HIVE-24991: Enable fetching deleted rows in vectorized mode #2264

Merged (23 commits) on Jun 15, 2021
@@ -185,13 +185,20 @@ public boolean isVirtualColumnNeeded(String virtualColumnName) {
 
   public int findVirtualColumnNum(VirtualColumn virtualColumn) {
     // Virtual columns start after the last partition column.
-    int resultColumnNum = dataColumnCount + partitionColumnCount;
-    for (VirtualColumn neededVirtualColumn : neededVirtualColumns) {
-      if (neededVirtualColumn.equals(virtualColumn)) {
-        return resultColumnNum;
+    int partitionEndColumnNum = dataColumnCount + partitionColumnCount;
+    final int virtualEndColumnNum = partitionEndColumnNum + virtualColumnCount;
+    for (int virtualColumnNum = partitionEndColumnNum; virtualColumnNum < virtualEndColumnNum; virtualColumnNum++) {
+      String virtualColumnName = rowColumnNames[virtualColumnNum];
+      if (!virtualColumnName.equals(virtualColumn.getName())) {
+        continue;
+      }
+      if (!isVirtualColumnNeeded(virtualColumnName)) {
+        continue;
       }
-      resultColumnNum++;
+
+      return virtualColumnNum;
     }
 
     return -1;
   }
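The rewrite resolves the index by matching names in rowColumnNames and then checking isVirtualColumnNeeded, instead of counting positions within neededVirtualColumns. To make the lookup concrete, here is a minimal standalone sketch that inlines the relevant fields with the values used by the new test further down (one data column "payload", no partition columns, ROW__IS__DELETED as the only needed virtual column); the wrapper class and the inlined set are illustrative, not part of the patch:

import java.util.Set;

public class FindVirtualColumnNumSketch {
  public static void main(String[] args) {
    // Row layout: data columns, then partition columns, then virtual columns.
    String[] rowColumnNames = { "payload", "ROW__IS__DELETED" };
    int dataColumnCount = 1;
    int partitionColumnCount = 0;
    int virtualColumnCount = 1;
    // Stand-in for isVirtualColumnNeeded(...): the projected virtual columns.
    Set<String> neededVirtualColumnNames = Set.of("ROW__IS__DELETED");
    String wanted = "ROW__IS__DELETED"; // virtualColumn.getName()

    int partitionEndColumnNum = dataColumnCount + partitionColumnCount;
    int virtualEndColumnNum = partitionEndColumnNum + virtualColumnCount;
    int result = -1;
    for (int i = partitionEndColumnNum; i < virtualEndColumnNum; i++) {
      if (rowColumnNames[i].equals(wanted) && neededVirtualColumnNames.contains(rowColumnNames[i])) {
        result = i;
        break;
      }
    }
    System.out.println(result); // prints 1, the slot the test reads as vectorizedRowBatch.cols[1]
  }
}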

(One changed file's diff is large and is not rendered in this view.)

@@ -295,7 +295,7 @@ public class Vectorizer implements PhysicalPlanResolver {
 
   // The set of virtual columns that vectorized readers *MAY* support.
   public static final ImmutableSet<VirtualColumn> vectorizableVirtualColumns =
-      ImmutableSet.of(VirtualColumn.ROWID);
+      ImmutableSet.of(VirtualColumn.ROWID, VirtualColumn.ROWISDELETED);
 
   private HiveConf hiveConf;
 
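For context on how a whitelist like this is typically consulted, here is a hypothetical, self-contained sketch; the helper, its names, and the use of plain strings are all illustrative, not from the patch. Only the whitelist contents mirror the patched set, which now admits ROW__IS__DELETED alongside ROW__ID:

import java.util.Collection;
import java.util.Set;

final class VirtualColumnGateSketch {
  // Returns true only when every referenced virtual column is whitelisted;
  // otherwise the planner would have to fall back to row-mode execution.
  static boolean supported(Set<String> whitelist, Collection<String> referenced) {
    for (String vc : referenced) {
      if (!whitelist.contains(vc)) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    Set<String> whitelist = Set.of("ROW__ID", "ROW__IS__DELETED");
    System.out.println(supported(whitelist, Set.of("ROW__IS__DELETED")));  // true
    System.out.println(supported(whitelist, Set.of("INPUT__FILE__NAME"))); // false
  }
}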
@@ -31,6 +31,8 @@
 import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.common.type.DataTypePhysicalVariation;
+import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -51,9 +53,12 @@
 
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentImpl;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapred.JobConf;
@@ -961,6 +966,21 @@ private void testDeleteEventOriginalFiltering2() throws Exception {
 
   @Test
   public void testVectorizedOrcAcidRowBatchReader() throws Exception {
+    setupTestData();
+
+    testVectorizedOrcAcidRowBatchReader(ColumnizedDeleteEventRegistry.class.getName());
+
+    // To test the SortMergedDeleteEventRegistry, we need to explicitly set the
+    // HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY constant to a smaller value.
+    int oldValue = conf.getInt(HiveConf.ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname, 1000000);
+    conf.setInt(HiveConf.ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname, 1000);
+    testVectorizedOrcAcidRowBatchReader(SortMergedDeleteEventRegistry.class.getName());
+
+    // Restore the old value.
+    conf.setInt(HiveConf.ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname, oldValue);
+  }
+
+  private void setupTestData() throws IOException {
     conf.set("bucket_count", "1");
     conf.set(ValidTxnList.VALID_TXNS_KEY,
         new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
@@ -1030,17 +1050,6 @@ public void testVectorizedOrcAcidRowBatchReader() throws Exception {
       }
     }
     updater.close(false);
-
-    testVectorizedOrcAcidRowBatchReader(ColumnizedDeleteEventRegistry.class.getName());
-
-    // To test the SortMergedDeleteEventRegistry, we need to explicitly set the
-    // HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY constant to a smaller value.
-    int oldValue = conf.getInt(HiveConf.ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname, 1000000);
-    conf.setInt(HiveConf.ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname, 1000);
-    testVectorizedOrcAcidRowBatchReader(SortMergedDeleteEventRegistry.class.getName());
-
-    // Restore the old value.
-    conf.setInt(HiveConf.ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname, oldValue);
   }


@@ -1085,6 +1094,75 @@ private void testVectorizedOrcAcidRowBatchReader(String deleteEventRegistry) throws Exception {
       }
     }
   }
+
+  @Test
+  public void testFetchDeletedRowsUsingColumnizedDeleteEventRegistry() throws Exception {
+    setupTestData();
+    testFetchDeletedRows();
+  }
+
+  @Test
+  public void testFetchDeletedRowsUsingSortMergedDeleteEventRegistry() throws Exception {
+    setupTestData();
+
+    // To test the SortMergedDeleteEventRegistry, we need to explicitly set the
+    // HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY constant to a smaller value.
+    int oldValue = conf.getInt(HiveConf.ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname, 1000000);
+    try {
+      conf.setInt(HiveConf.ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname, 1000);
+      testFetchDeletedRows();
+    }
+    finally {
+      // Restore the old value.
+      conf.setInt(HiveConf.ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname, oldValue);
+    }
+  }
+
+  private void testFetchDeletedRows() throws Exception {
+    List<OrcInputFormat.SplitStrategy<?>> splitStrategies = getSplitStrategies();
+    List<OrcSplit> splits = ((OrcInputFormat.ACIDSplitStrategy) splitStrategies.get(0)).getSplits();
+
+    // Mark one of the transactions as an exception to test that invalid transactions
+    // are being handled properly.
+    conf.set(ValidWriteIdList.VALID_WRITEIDS_KEY, "tbl:14:1:1:5"); // Exclude transaction 5
+
+    // enable fetching deleted rows
+    conf.set(Constants.ACID_FETCH_DELETED_ROWS, "true");
+
+    // Project ROW__IS__DELETED
+    VectorizedRowBatchCtx rbCtx = new VectorizedRowBatchCtx(
+        new String[] { "payload", VirtualColumn.ROWISDELETED.getName() },
+        new TypeInfo[] { TypeInfoFactory.longTypeInfo, VirtualColumn.ROWISDELETED.getTypeInfo() },
+        new DataTypePhysicalVariation[] { DataTypePhysicalVariation.NONE, DataTypePhysicalVariation.NONE },
+        new int[] { 0 }, 0, 1,
+        new VirtualColumn[] { VirtualColumn.ROWISDELETED },
+        new String[0],
+        new DataTypePhysicalVariation[] { DataTypePhysicalVariation.NONE, DataTypePhysicalVariation.NONE });
+    VectorizedOrcAcidRowBatchReader vectorizedReader =
+        new VectorizedOrcAcidRowBatchReader(splits.get(0), conf, Reporter.NULL, rbCtx);
+    VectorizedRowBatch vectorizedRowBatch = rbCtx.createVectorizedRowBatch();
+    vectorizedRowBatch.setPartitionInfo(1, 0); // set data column count as 1.
+    long previousPayload = Long.MIN_VALUE;
+    while (vectorizedReader.next(null, vectorizedRowBatch)) {
+      LongColumnVector col = (LongColumnVector) vectorizedRowBatch.cols[0];
+      LongColumnVector rowIsDeletedColumnVector = (LongColumnVector) vectorizedRowBatch.cols[1];
+      for (int i = 0; i < vectorizedRowBatch.size; ++i) {
+        int idx = vectorizedRowBatch.selected[i];
+        long payload = col.vector[idx];
+        long owid = (payload / NUM_ROWID_PER_OWID) + 1;
+        long rowId = payload % NUM_ROWID_PER_OWID;
+        if (rowId % 2 == 0 || rowId % 3 == 0) {
+          assertEquals(1, rowIsDeletedColumnVector.vector[idx]);
+        } else {
+          assertEquals(0, rowIsDeletedColumnVector.vector[idx]);
+        }
+        assertTrue(owid != 5); // Check that writeid#5 has been excluded.
+        assertTrue(payload >= previousPayload); // Check that the data is in sorted order.
+        previousPayload = payload;
+      }
+    }
+  }
+
   private List<OrcInputFormat.SplitStrategy<?>> getSplitStrategies() throws Exception {
     conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
         AcidUtils.AcidOperationalProperties.getDefault().toInt());
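One note on the write-id string the new test installs, since it is easy to misread. As I read the ValidReaderWriteIdList serialization, the layout is <table>:<highWatermark>:<minOpenWriteId>:<openWriteIds>:<abortedWriteIds>, so "tbl:14:1:1:5" marks write id 1 as open and write id 5 as aborted, both invalid to readers, which is what the owid != 5 assertion exercises. A minimal sketch under that format assumption:

import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
import org.apache.hadoop.hive.common.ValidWriteIdList;

class WriteIdListSketch {
  public static void main(String[] args) {
    // Decodes the same string the test sets under ValidWriteIdList.VALID_WRITEIDS_KEY.
    ValidWriteIdList writeIds = new ValidReaderWriteIdList("tbl:14:1:1:5");
    System.out.println(writeIds.isWriteIdValid(5)); // false: aborted
    System.out.println(writeIds.isWriteIdValid(1)); // false: open
    System.out.println(writeIds.isWriteIdValid(2)); // true: committed below the high-water mark
  }
}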
@@ -171,3 +171,5 @@ group by t;
 set hive.exec.orc.split.strategy=BI;
 select t, count(*) from over10k_orc
 group by t;
+
+select oo.ROW__ID.writeId, oo.ROW__IS__DELETED, oo.* from over10k_orc('acid.fetch.deleted.rows'='true') oo order by si;
ql/src/test/queries/clientpositive/fetch_deleted_rows.q (12 additions, 7 deletions)

@@ -1,6 +1,6 @@
 set hive.support.concurrency=true;
 set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
-SET hive.vectorized.execution.enabled=false;
+set hive.vectorized.execution.enabled=false;
 
 create table t1(a int, b varchar(128)) stored as orc tblproperties ('transactional'='true');
 
@@ -10,29 +10,34 @@ delete from t1 where a = 1;
 
 insert into t1(a,b) values (3, 'three'), (4, 'four'), (4, 'four again'), (5, 'five');
 
-select t1.ROW__IS__DELETED, * from t1('acid.fetch.deleted.rows'='true');
+select t1.ROW__IS__DELETED, * from t1('acid.fetch.deleted.rows'='true') order by a;
 
 
 update t1
 set b = 'updated'
 where a = 3;
 
-select t1.ROW__IS__DELETED, * from t1('acid.fetch.deleted.rows'='true');
+select t1.ROW__IS__DELETED, * from t1('acid.fetch.deleted.rows'='true') order by a;
 
 
 create table t2(a int, c float) stored as orc tblproperties ('transactional'='true');
 
 insert into t2(a,c) values (1, 1.0), (2, 2.0), (3, 3.3), (4, 4.4), (4, 4.5), (5, 5.5);
 
 select t1.*, t2.* from t1
-join t2 on t1.a = t2.a;
+join t2 on t1.a = t2.a
+order by t1.a;
 
 select t1.ROW__IS__DELETED, t1.*, t2.ROW__IS__DELETED, t2.* from t1('acid.fetch.deleted.rows'='true')
-join t2('acid.fetch.deleted.rows'='true') on t1.a = t2.a;
+join t2('acid.fetch.deleted.rows'='true') on t1.a = t2.a
+order by t1.a;
 
 delete from t2 where a in (1, 4);
 
 select t1.*, t2.* from t1
-join t2 on t1.a = t2.a;
+join t2 on t1.a = t2.a
+order by t1.a;
 
 select t1.ROW__IS__DELETED, t1.*, t2.ROW__IS__DELETED, t2.* from t1('acid.fetch.deleted.rows'='true')
-join t2('acid.fetch.deleted.rows'='true') on t1.a = t2.a;
+join t2('acid.fetch.deleted.rows'='true') on t1.a = t2.a
+order by t1.a;
ql/src/test/queries/clientpositive/fetch_deleted_rows_vector.q (new file, 57 additions)

@@ -0,0 +1,57 @@
set hive.support.concurrency=true;
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;


create table t1(a int, b varchar(128)) stored as orc tblproperties ('transactional'='true');

insert into t1(a,b) values (1, 'one'), (2, 'two');

delete from t1 where a = 1;

insert into t1(a,b) values (3, 'three'), (4, 'four'), (4, 'four again'), (5, 'five');

explain vectorization
select t1.ROW__IS__DELETED, * from t1('acid.fetch.deleted.rows'='true') order by a;

select t1.ROW__IS__DELETED, * from t1('acid.fetch.deleted.rows'='true') order by a;


update t1
set b = 'updated'
where a = 3;

select t1.ROW__IS__DELETED, * from t1('acid.fetch.deleted.rows'='true') order by a;


create table t2(a int, c float) stored as orc tblproperties ('transactional'='true');

insert into t2(a,c) values (1, 1.0), (2, 2.0), (3, 3.3), (4, 4.4), (4, 4.5), (5, 5.5);

select t1.*, t2.* from t1
join t2 on t1.a = t2.a
order by t1.a;

select t1.ROW__IS__DELETED, t1.*, t2.ROW__IS__DELETED, t2.* from t1('acid.fetch.deleted.rows'='true')
join t2('acid.fetch.deleted.rows'='true') on t1.a = t2.a
order by t1.a;

delete from t2 where a in (1, 4);

select t1.*, t2.* from t1
join t2 on t1.a = t2.a
order by t1.a;

explain vectorization
select t1.ROW__IS__DELETED, t1.ROW__ID.writeId, t1.*, t2.ROW__IS__DELETED, t2.ROW__ID.writeId, t2.* from t1('acid.fetch.deleted.rows'='true')
join t2('acid.fetch.deleted.rows'='true') on t1.a = t2.a
order by t1.a;

select t1.ROW__IS__DELETED, t1.ROW__ID.writeId, t1.*, t2.ROW__IS__DELETED, t2.ROW__ID.writeId, t2.* from t1('acid.fetch.deleted.rows'='true')
join t2('acid.fetch.deleted.rows'='true') on t1.a = t2.a
order by t1.a;

set hive.transactional.events.mem=0;

select t1.ROW__IS__DELETED, t1.ROW__ID.writeId, t1.*, t2.ROW__IS__DELETED, t2.ROW__ID.writeId, t2.* from t1('acid.fetch.deleted.rows'='true')
join t2('acid.fetch.deleted.rows'='true') on t1.a = t2.a
order by t1.a;
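The final set hive.transactional.events.mem=0; mirrors the Java tests above: hive.transactional.events.mem is the setting behind HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY, so dropping it to 0 should force the SortMergedDeleteEventRegistry path and re-run the deleted-rows join under that registry as well.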