HIVE-24991: Enable fetching deleted rows in vectorized mode (Krisztian Kasa, reviewed by Panos Garefalakis)
kasakrisz committed Jun 15, 2021
1 parent 88048e7 commit 6a7d4ba
Showing 262 changed files with 3,591 additions and 2,780 deletions.
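In outline, the change makes Hive's vectorized ORC ACID reader able to surface deleted rows: VirtualColumn.ROWISDELETED joins the set of virtual columns the Vectorizer may keep vectorized, VectorizedRowBatchCtx learns to resolve a virtual column's actual slot in the row schema, and VectorizedOrcAcidRowBatchReader fills a ROW__IS__DELETED column when a table is read with the acid.fetch.deleted.rows property. The hunks below are the key excerpts; the bulk of the 262 touched files are presumably regenerated .q.out golden outputs.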
@@ -185,13 +185,20 @@ public boolean isVirtualColumnNeeded(String virtualColumnName) {
 
   public int findVirtualColumnNum(VirtualColumn virtualColumn) {
     // Virtual columns start after the last partition column.
-    int resultColumnNum = dataColumnCount + partitionColumnCount;
-    for (VirtualColumn neededVirtualColumn : neededVirtualColumns) {
-      if (neededVirtualColumn.equals(virtualColumn)) {
-        return resultColumnNum;
+    int partitionEndColumnNum = dataColumnCount + partitionColumnCount;
+    final int virtualEndColumnNum = partitionEndColumnNum + virtualColumnCount;
+    for (int virtualColumnNum = partitionEndColumnNum; virtualColumnNum < virtualEndColumnNum; virtualColumnNum++) {
+      String virtualColumnName = rowColumnNames[virtualColumnNum];
+      if (!virtualColumnName.equals(virtualColumn.getName())) {
+        continue;
       }
-      resultColumnNum++;
+      if (!isVirtualColumnNeeded(virtualColumnName)) {
+        continue;
+      }
+
+      return virtualColumnNum;
     }
 
     return -1;
   }

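The semantic shift in this hunk: the old loop returned the requested column's position among the needed virtual columns, offset from the partition columns, while the new loop resolves the column's actual slot in rowColumnNames and confirms it is needed. A minimal sketch of the new behavior, reusing the constructor arguments from the unit test further down (the class name and the "payload" column are illustrative only, not from the commit):

import org.apache.hadoop.hive.common.type.DataTypePhysicalVariation;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

public class FindVirtualColumnNumSketch {
  public static void main(String[] args) {
    // Row schema: one data column ("payload"), no partition columns, and
    // the ROW__IS__DELETED virtual column in the last slot.
    VectorizedRowBatchCtx rbCtx = new VectorizedRowBatchCtx(
        new String[] { "payload", VirtualColumn.ROWISDELETED.getName() },
        new TypeInfo[] { TypeInfoFactory.longTypeInfo, VirtualColumn.ROWISDELETED.getTypeInfo() },
        new DataTypePhysicalVariation[] { DataTypePhysicalVariation.NONE, DataTypePhysicalVariation.NONE },
        new int[] { 0 }, 0, 1,
        new VirtualColumn[] { VirtualColumn.ROWISDELETED },
        new String[0],
        new DataTypePhysicalVariation[] { DataTypePhysicalVariation.NONE, DataTypePhysicalVariation.NONE });
    // Resolves to slot 1, the column's actual position in the row schema;
    // -1 would be returned if the column were absent or not needed.
    System.out.println(rbCtx.findVirtualColumnNum(VirtualColumn.ROWISDELETED));
  }
}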

Large diffs are not rendered by default.

@@ -295,7 +295,7 @@ public class Vectorizer implements PhysicalPlanResolver {
 
   // The set of virtual columns that vectorized readers *MAY* support.
   public static final ImmutableSet<VirtualColumn> vectorizableVirtualColumns =
-      ImmutableSet.of(VirtualColumn.ROWID);
+      ImmutableSet.of(VirtualColumn.ROWID, VirtualColumn.ROWISDELETED);
 
   private HiveConf hiveConf;

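Admitting ROWISDELETED to this set is the switch that lets the Vectorizer keep a plan vectorized when the new ROW__IS__DELETED virtual column is projected; before this change only ROW__ID qualified, so such queries presumably fell back to non-vectorized execution.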
@@ -31,6 +31,8 @@
 import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.common.type.DataTypePhysicalVariation;
+import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -51,9 +53,12 @@
 
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentImpl;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapred.JobConf;
@@ -961,6 +966,21 @@ private void testDeleteEventOriginalFiltering2() throws Exception {
 
   @Test
   public void testVectorizedOrcAcidRowBatchReader() throws Exception {
+    setupTestData();
+
+    testVectorizedOrcAcidRowBatchReader(ColumnizedDeleteEventRegistry.class.getName());
+
+    // To test the SortMergedDeleteEventRegistry, we need to explicitly set the
+    // HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY constant to a smaller value.
+    int oldValue = conf.getInt(HiveConf.ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname, 1000000);
+    conf.setInt(HiveConf.ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname, 1000);
+    testVectorizedOrcAcidRowBatchReader(SortMergedDeleteEventRegistry.class.getName());
+
+    // Restore the old value.
+    conf.setInt(HiveConf.ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname, oldValue);
+  }
+
+  private void setupTestData() throws IOException {
     conf.set("bucket_count", "1");
     conf.set(ValidTxnList.VALID_TXNS_KEY,
         new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
@@ -1030,17 +1050,6 @@ public void testVectorizedOrcAcidRowBatchReader() throws Exception {
       }
     }
     updater.close(false);
-
-    testVectorizedOrcAcidRowBatchReader(ColumnizedDeleteEventRegistry.class.getName());
-
-    // To test the SortMergedDeleteEventRegistry, we need to explicitly set the
-    // HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY constant to a smaller value.
-    int oldValue = conf.getInt(HiveConf.ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname, 1000000);
-    conf.setInt(HiveConf.ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname, 1000);
-    testVectorizedOrcAcidRowBatchReader(SortMergedDeleteEventRegistry.class.getName());
-
-    // Restore the old value.
-    conf.setInt(HiveConf.ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname, oldValue);
   }


@@ -1085,6 +1094,75 @@ private void testVectorizedOrcAcidRowBatchReader(String deleteEventRegistry) thr
       }
     }
   }
+
+  @Test
+  public void testFetchDeletedRowsUsingColumnizedDeleteEventRegistry() throws Exception {
+    setupTestData();
+    testFetchDeletedRows();
+  }
+
+  @Test
+  public void testFetchDeletedRowsUsingSortMergedDeleteEventRegistry() throws Exception {
+    setupTestData();
+
+    // To test the SortMergedDeleteEventRegistry, we need to explicitly set the
+    // HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY constant to a smaller value.
+    int oldValue = conf.getInt(HiveConf.ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname, 1000000);
+    try {
+      conf.setInt(HiveConf.ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname, 1000);
+      testFetchDeletedRows();
+    }
+    finally {
+      // Restore the old value.
+      conf.setInt(HiveConf.ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname, oldValue);
+    }
+  }
+
+  private void testFetchDeletedRows() throws Exception {
+    List<OrcInputFormat.SplitStrategy<?>> splitStrategies = getSplitStrategies();
+    List<OrcSplit> splits = ((OrcInputFormat.ACIDSplitStrategy) splitStrategies.get(0)).getSplits();
+
+    // Mark one of the transactions as an exception to test that invalid transactions
+    // are being handled properly.
+    conf.set(ValidWriteIdList.VALID_WRITEIDS_KEY, "tbl:14:1:1:5"); // Exclude transaction 5
+
+    // enable fetching deleted rows
+    conf.set(Constants.ACID_FETCH_DELETED_ROWS, "true");
+
+    // Project ROW__IS__DELETED
+    VectorizedRowBatchCtx rbCtx = new VectorizedRowBatchCtx(
+        new String[] { "payload", VirtualColumn.ROWISDELETED.getName() },
+        new TypeInfo[] { TypeInfoFactory.longTypeInfo, VirtualColumn.ROWISDELETED.getTypeInfo() },
+        new DataTypePhysicalVariation[] { DataTypePhysicalVariation.NONE, DataTypePhysicalVariation.NONE },
+        new int[] { 0 }, 0, 1,
+        new VirtualColumn[] { VirtualColumn.ROWISDELETED },
+        new String[0],
+        new DataTypePhysicalVariation[] { DataTypePhysicalVariation.NONE, DataTypePhysicalVariation.NONE });
+    VectorizedOrcAcidRowBatchReader vectorizedReader =
+        new VectorizedOrcAcidRowBatchReader(splits.get(0), conf, Reporter.NULL, rbCtx);
+    VectorizedRowBatch vectorizedRowBatch = rbCtx.createVectorizedRowBatch();
+    vectorizedRowBatch.setPartitionInfo(1, 0); // set data column count as 1.
+    long previousPayload = Long.MIN_VALUE;
+    while (vectorizedReader.next(null, vectorizedRowBatch)) {
+      LongColumnVector col = (LongColumnVector) vectorizedRowBatch.cols[0];
+      LongColumnVector rowIsDeletedColumnVector = (LongColumnVector) vectorizedRowBatch.cols[1];
+      for (int i = 0; i < vectorizedRowBatch.size; ++i) {
+        int idx = vectorizedRowBatch.selected[i];
+        long payload = col.vector[idx];
+        long owid = (payload / NUM_ROWID_PER_OWID) + 1;
+        long rowId = payload % NUM_ROWID_PER_OWID;
+        if (rowId % 2 == 0 || rowId % 3 == 0) {
+          assertEquals(1, rowIsDeletedColumnVector.vector[idx]);
+        } else {
+          assertEquals(0, rowIsDeletedColumnVector.vector[idx]);
+        }
+        assertTrue(owid != 5); // Check that writeid#5 has been excluded.
+        assertTrue(payload >= previousPayload); // Check that the data is in sorted order.
+        previousPayload = payload;
+      }
+    }
+  }
+
   private List<OrcInputFormat.SplitStrategy<?>> getSplitStrategies() throws Exception {
     conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
         AcidUtils.AcidOperationalProperties.getDefault().toInt());
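As an aside on the assertions above: setupTestData packs each row's original write id and row id into the single long payload column. A hedged standalone mirror of that decoding (NUM_ROWID_PER_OWID is a constant of the test class; the value below is an example, not taken from the commit):

// Hypothetical mirror of the decoding used in the assertions above; rows
// whose rowId is divisible by 2 or by 3 are the ones expected to be
// reported as deleted.
final class PayloadDecode {
  static long originalWriteId(long payload, long numRowIdPerOwid) {
    return (payload / numRowIdPerOwid) + 1;
  }

  static long rowId(long payload, long numRowIdPerOwid) {
    return payload % numRowIdPerOwid;
  }

  public static void main(String[] args) {
    long numRowIdPerOwid = 2500; // example value only
    long payload = 2503;
    // With 2500 rows per write id, payload 2503 decodes to write id 2, row id 3.
    System.out.println(originalWriteId(payload, numRowIdPerOwid)); // 2
    System.out.println(rowId(payload, numRowIdPerOwid));           // 3
  }
}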
@@ -171,3 +171,5 @@ group by t;
 set hive.exec.orc.split.strategy=BI;
 select t, count(*) from over10k_orc
 group by t;
+
+select oo.ROW__ID.writeId, oo.ROW__IS__DELETED, oo.* from over10k_orc('acid.fetch.deleted.rows'='true') oo order by si;
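Tacked onto the existing split-strategy checks, the new query reads the table with 'acid.fetch.deleted.rows'='true' and projects ROW__ID.writeId alongside ROW__IS__DELETED, verifying deleted-row fetching under the BI split strategy.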
19 changes: 12 additions & 7 deletions ql/src/test/queries/clientpositive/fetch_deleted_rows.q
@@ -1,6 +1,6 @@
 set hive.support.concurrency=true;
 set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
-SET hive.vectorized.execution.enabled=false;
+set hive.vectorized.execution.enabled=false;
 
 create table t1(a int, b varchar(128)) stored as orc tblproperties ('transactional'='true');
 
@@ -10,29 +10,34 @@ delete from t1 where a = 1;

 insert into t1(a,b) values (3, 'three'), (4, 'four'), (4, 'four again'), (5, 'five');
 
-select t1.ROW__IS__DELETED, * from t1('acid.fetch.deleted.rows'='true');
+select t1.ROW__IS__DELETED, * from t1('acid.fetch.deleted.rows'='true') order by a;
 
 
 update t1
 set b = 'updated'
 where a = 3;
 
-select t1.ROW__IS__DELETED, * from t1('acid.fetch.deleted.rows'='true');
+select t1.ROW__IS__DELETED, * from t1('acid.fetch.deleted.rows'='true') order by a;
 
 
 create table t2(a int, c float) stored as orc tblproperties ('transactional'='true');
 
 insert into t2(a,c) values (1, 1.0), (2, 2.0), (3, 3.3), (4, 4.4), (4, 4.5), (5, 5.5);
 
 select t1.*, t2.* from t1
-join t2 on t1.a = t2.a;
+join t2 on t1.a = t2.a
+order by t1.a;
 
 select t1.ROW__IS__DELETED, t1.*, t2.ROW__IS__DELETED, t2.* from t1('acid.fetch.deleted.rows'='true')
-join t2('acid.fetch.deleted.rows'='true') on t1.a = t2.a;
+join t2('acid.fetch.deleted.rows'='true') on t1.a = t2.a
+order by t1.a;
 
 delete from t2 where a in (1, 4);
 
 select t1.*, t2.* from t1
-join t2 on t1.a = t2.a;
+join t2 on t1.a = t2.a
+order by t1.a;
 
 select t1.ROW__IS__DELETED, t1.*, t2.ROW__IS__DELETED, t2.* from t1('acid.fetch.deleted.rows'='true')
-join t2('acid.fetch.deleted.rows'='true') on t1.a = t2.a;
+join t2('acid.fetch.deleted.rows'='true') on t1.a = t2.a
+order by t1.a;
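The recurring edit in this file adds an ORDER BY to every select so the rows arrive in a deterministic order, presumably to keep the recorded .q.out golden output stable; the lowercased SET in the first hunk is a cosmetic cleanup.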
57 changes: 57 additions & 0 deletions ql/src/test/queries/clientpositive/fetch_deleted_rows_vector.q
@@ -0,0 +1,57 @@
set hive.support.concurrency=true;
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;


create table t1(a int, b varchar(128)) stored as orc tblproperties ('transactional'='true');

insert into t1(a,b) values (1, 'one'), (2, 'two');

delete from t1 where a = 1;

insert into t1(a,b) values (3, 'three'), (4, 'four'), (4, 'four again'), (5, 'five');

explain vectorization
select t1.ROW__IS__DELETED, * from t1('acid.fetch.deleted.rows'='true') order by a;

select t1.ROW__IS__DELETED, * from t1('acid.fetch.deleted.rows'='true') order by a;


update t1
set b = 'updated'
where a = 3;

select t1.ROW__IS__DELETED, * from t1('acid.fetch.deleted.rows'='true') order by a;


create table t2(a int, c float) stored as orc tblproperties ('transactional'='true');

insert into t2(a,c) values (1, 1.0), (2, 2.0), (3, 3.3), (4, 4.4), (4, 4.5), (5, 5.5);

select t1.*, t2.* from t1
join t2 on t1.a = t2.a
order by t1.a;

select t1.ROW__IS__DELETED, t1.*, t2.ROW__IS__DELETED, t2.* from t1('acid.fetch.deleted.rows'='true')
join t2('acid.fetch.deleted.rows'='true') on t1.a = t2.a
order by t1.a;

delete from t2 where a in (1, 4);

select t1.*, t2.* from t1
join t2 on t1.a = t2.a
order by t1.a;

explain vectorization
select t1.ROW__IS__DELETED, t1.ROW__ID.writeId, t1.*, t2.ROW__IS__DELETED, t2.ROW__ID.writeId, t2.* from t1('acid.fetch.deleted.rows'='true')
join t2('acid.fetch.deleted.rows'='true') on t1.a = t2.a
order by t1.a;

select t1.ROW__IS__DELETED, t1.ROW__ID.writeId, t1.*, t2.ROW__IS__DELETED, t2.ROW__ID.writeId, t2.* from t1('acid.fetch.deleted.rows'='true')
join t2('acid.fetch.deleted.rows'='true') on t1.a = t2.a
order by t1.a;

set hive.transactional.events.mem=0;

select t1.ROW__IS__DELETED, t1.ROW__ID.writeId, t1.*, t2.ROW__IS__DELETED, t2.ROW__ID.writeId, t2.* from t1('acid.fetch.deleted.rows'='true')
join t2('acid.fetch.deleted.rows'='true') on t1.a = t2.a
order by t1.a;
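The final SET presumably corresponds to HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY from the unit tests above: forcing hive.transactional.events.mem to 0 pushes the delete events out of memory, so the last query exercises the SortMergedDeleteEventRegistry path while the earlier ones use the default ColumnizedDeleteEventRegistry.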
