DRILL-6542: IndexOutOfBoundsException for multilevel lateral queries with schema changed partitioned complex data

closes #1374
Sorabh Hamirwasia authored and sohami committed Jul 13, 2018
1 parent 94186fc commit 4168e1e84d57b15d7667f7a768a0a47a577d0e79
Showing 4 changed files with 107 additions and 16 deletions.
@@ -64,4 +64,6 @@ void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing,
* time a new batch comes in.
*/
void resetGroupIndex();

/**
 * Releases resources held by this unnest, in particular the output
 * ValueVectors owned by its transfer pairs, so that unnest can be set
 * up again for a new schema without leaking memory.
 */
void close();
}
@@ -25,6 +25,7 @@
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.vector.SchemaChangeCallBack;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.RepeatedValueVector;
import org.slf4j.Logger;
@@ -51,6 +52,7 @@ public class UnnestImpl implements Unnest {
private SelectionVectorMode svMode;
private RepeatedValueVector fieldToUnnest;
private RepeatedValueVector.RepeatedAccessor accessor;
private RecordBatch outgoing;

/**
* The output batch limit starts at OUTPUT_ROW_COUNT, but may be decreased
@@ -97,8 +99,16 @@ public final int unnestRecords(final int recordCount) {

logger.debug("Unnest: innerValueCount: {}, record count: {}, output limit: {}", innerValueCount,
recordCount, outputLimit);
final SchemaChangeCallBack callBack = new SchemaChangeCallBack();
for (TransferPair t : transfers) {
t.splitAndTransfer(innerValueIndex, count);

// Get the corresponding ValueVector in output container and transfer the data
final ValueVector vectorWithData = t.getTo();
final ValueVector outputVector = outgoing.getContainer().addOrGet(vectorWithData.getField(), callBack);
Preconditions.checkState(!callBack.getSchemaChangedAndReset(), "Outgoing container doesn't have " +
"expected ValueVector of type %s, present in TransferPair of unnest field", vectorWithData.getClass());
vectorWithData.makeTransferPair(outputVector).transfer();
}
innerValueIndex += count;
return count;
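
For context, a minimal sketch (not part of the patch; simplified, with names like field as stand-ins) of the VectorContainer.addOrGet contract the loop above relies on: the container returns its existing vector when the field matches, and only flags the callback when it had to create a new one — the case the Preconditions check treats as an error.

final SchemaChangeCallBack cb = new SchemaChangeCallBack();
final ValueVector v = outgoing.getContainer().addOrGet(field, cb);
if (cb.getSchemaChangedAndReset()) {
  // No matching vector existed, so one was created: the outgoing schema
  // changed mid-stream, which unnestRecords() must not allow.
}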
@@ -110,6 +120,7 @@ public final void setup(FragmentContext context, RecordBatch incoming, RecordBat
List<TransferPair> transfers, LateralContract lateral) throws SchemaChangeException {

this.svMode = incoming.getSchema().getSelectionVectorMode();
this.outgoing = outgoing;
if (svMode == NONE) {
this.transfers = ImmutableList.copyOf(transfers);
this.lateral = lateral;
@@ -123,4 +134,13 @@ public void resetGroupIndex() {
this.innerValueIndex = 0;
}

@Override
public void close() {
if (transfers != null) {
for (TransferPair tp : transfers) {
tp.getTo().close();
}
transfers = null;
}
}
}
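
Note that nulling out transfers makes close() idempotent; a second call is a no-op:

unnest.close();   // closes each tp.getTo() and sets transfers to null
unnest.close();   // safe: transfers == null, nothing left to release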
@@ -24,6 +24,7 @@
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.MetricDef;
import org.apache.drill.exec.physical.config.UnnestPOP;
@@ -48,7 +49,7 @@
public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPOP> {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnnestRecordBatch.class);

private Unnest unnest;
private Unnest unnest = new UnnestImpl();
private boolean hasNewSchema = false; // set to true if a new schema was encountered and an empty batch was
// sent. On the next iteration, we need to make sure the record batch sizer
// is updated before we process the actual data.
@@ -234,8 +235,23 @@ public IterOutcome innerNext() {
return IterOutcome.STOP;
}
return OK_NEW_SCHEMA;
}
// else
} else { // Unnest field schema didn't change, but a new left (empty or non-empty) batch may come with OK_NEW_SCHEMA
try {
// Even though there is no schema change for the unnest field, the reference of the unnest field
// ValueVector may have changed, so we should just refresh the transfer pairs and keep the output
// vectors the same as before. When a new left batch with a schema change is empty, Lateral will not
// call next() on unnest and will change its left outcome to OK, whereas for a non-empty batch next()
// will be called on unnest by Lateral. Hence UNNEST cannot rely on Lateral's current outcome to set up
// the transfer pairs; it should do so for each new left incoming batch.
resetUnnestTransferPair();
container.zeroVectors();
} catch (SchemaChangeException ex) {
kill(false);
logger.error("Failure during query", ex);
context.getExecutorState().fail(ex);
return IterOutcome.STOP;
}
} // else
unnest.resetGroupIndex();
memoryManager.update();
}
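
Sketched simply (simplified pseudocode, not the literal control flow), the handling of each new left batch in innerNext() becomes:

if (unnestFieldSchemaChanged) {   // unnest column's type/structure changed
  setupNewSchema();               // fresh output vector, emit OK_NEW_SCHEMA
} else {
  resetUnnestTransferPair();      // same schema, but possibly new incoming
  container.zeroVectors();        // vector references: re-wire transfers,
}                                 // keep the existing output vectors
unnest.resetGroupIndex();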
@@ -353,26 +369,27 @@ protected IterOutcome doWork() {
return tp;
}

@Override
protected boolean setupNewSchema() throws SchemaChangeException {
Preconditions.checkNotNull(lateral);
container.clear();
recordCount = 0;
private TransferPair resetUnnestTransferPair() throws SchemaChangeException {
final List<TransferPair> transfers = Lists.newArrayList();

final FieldReference fieldReference = new FieldReference(popConfig.getColumn());

final TransferPair transferPair = getUnnestFieldTransferPair(fieldReference);

final ValueVector unnestVector = transferPair.getTo();
transfers.add(transferPair);
container.add(unnestVector);
logger.debug("Added transfer for unnest expression.");
container.buildSchema(SelectionVectorMode.NONE);

this.unnest = new UnnestImpl();
unnest.close();
unnest.setup(context, incoming, this, transfers, lateral);
setUnnestVector();
return transferPair;
}

@Override
protected boolean setupNewSchema() throws SchemaChangeException {
Preconditions.checkNotNull(lateral);
container.clear();
recordCount = 0;
unnest = new UnnestImpl();
final TransferPair tp = resetUnnestTransferPair();
container.add(TypeHelper.getNewVector(tp.getTo().getField(), oContext.getAllocator()));
container.buildSchema(SelectionVectorMode.NONE);
return true;
}
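
The split is deliberate: setupNewSchema() is the only path that hands downstream a new output vector (hence OK_NEW_SCHEMA), while resetUnnestTransferPair() re-wires only the input side and keeps the output vectors stable. A hypothetical sketch of why that stability matters to the parent LateralJoin:

// Lateral captures unnest's output vector once per schema (hypothetical):
final ValueVector captured = unnestOutput.getValueVector(0).getValueVector();
// On later batches unnest must refill this same vector, since Lateral keeps
// reading through the captured reference. Swapping in a new vector without
// signaling OK_NEW_SCHEMA is what produced the IndexOutOfBoundsException
// this commit fixes.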

@@ -428,6 +445,7 @@ private void updateStats() {
@Override
public void close() {
updateStats();
unnest.close();
super.close();
}

@@ -370,6 +370,37 @@ public void testSchemaChangeOnNonUnnestColumn() throws Exception {
}
}

/**
 * This test differs from {@link TestE2EUnnestAndLateral#testSchemaChangeOnNonUnnestColumn()} because in the
 * multilevel case, when the first Lateral sees a schema change it creates a new batch with new vector
 * references. Hence the second Lateral receives a new incoming batch with new vector references and an
 * OK_NEW_SCHEMA outcome. Even though the schema change is on a non-unnest column, the second Unnest has to
 * set up its transfer pairs again, since the vector reference for its unnest field has changed.
 * In the other test, where there is only one Lateral followed by a Scan, the incoming batch with the schema
 * change is handled by the Scan such that only the vector of the affected column is updated. In that case the
 * vector corresponding to the unnest field is not affected and the query works fine.
 * @throws Exception
 */
@Test
public void testSchemaChangeOnNonUnnestColumn_InMultilevelCase() throws Exception {

try {
dirTestWatcher.copyResourceToRoot(Paths.get("lateraljoin", "multipleFiles", schemaChangeFile_1));
String sql = "SELECT customer.c_custkey, customer.c_name, customer.c_nationkey, orders.orderkey, " +
"orders.totalprice, olineitems.l_partkey, olineitems.l_linenumber, olineitems.l_quantity " +
"FROM dfs.`lateraljoin/multipleFiles` customer, " +
"LATERAL (SELECT t1.o.o_orderkey as orderkey, t1.o.o_totalprice as totalprice, t1.o.o_lineitems as lineitems " +
"FROM UNNEST(customer.c_orders) t1(o)) orders, " +
"LATERAL (SELECT t2.l.l_partkey as l_partkey, t2.l.l_linenumber as l_linenumber, t2.l.l_quantity as l_quantity " +
"FROM UNNEST(orders.lineitems) t2(l)) olineitems";
test(sql);
} catch (Exception ex) {
fail();
} finally {
dirTestWatcher.removeFileFromRoot(Paths.get("lateraljoin", "multipleFiles", schemaChangeFile_1));
}
}

@Test
public void testSchemaChangeOnUnnestColumn() throws Exception {
try {
@@ -386,6 +417,26 @@ public void testSchemaChangeOnUnnestColumn() throws Exception {
}
}

@Test
public void testSchemaChangeOnUnnestColumn_InMultilevelCase() throws Exception {
try {
dirTestWatcher.copyResourceToRoot(Paths.get("lateraljoin", "multipleFiles", schemaChangeFile_2));

String sql = "SELECT customer.c_custkey, customer.c_name, customer.c_nationkey, orders.orderkey, " +
"orders.totalprice, orders.spriority, olineitems.l_partkey, olineitems.l_linenumber, olineitems.l_quantity " +
"FROM dfs.`lateraljoin/multipleFiles` customer, " +
"LATERAL (SELECT t1.o.o_orderkey as orderkey, t1.o.o_totalprice as totalprice, t1.o.o_lineitems as lineitems," +
" t1.o.o_shippriority as spriority FROM UNNEST(customer.c_orders) t1(o)) orders, " +
"LATERAL (SELECT t2.l.l_partkey as l_partkey, t2.l.l_linenumber as l_linenumber, t2.l.l_quantity as l_quantity " +
"FROM UNNEST(orders.lineitems) t2(l)) olineitems";
test(sql);
} catch (Exception ex) {
fail();
} finally {
dirTestWatcher.removeFileFromRoot(Paths.get("lateraljoin", "multipleFiles", schemaChangeFile_2));
}
}

@Test
public void testSchemaChangeOnMultipleColumns() throws Exception {
try {
