DRILL-6606: Fixed bug in HashJoin that caused it not to return OK_NEW_SCHEMA in some cases.

closes #1384
ilooner authored and sohami committed Jul 21, 2018
1 parent 4fd9cba commit 91b5e4d0d84dae2c6af19f82df11bb4c493a6ce8
Showing 2 changed files with 163 additions and 49 deletions.
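
The contract at the heart of this fix: Drill's iterator protocol expects an operator to report OK_NEW_SCHEMA before any terminal NONE, so downstream operators can build their output schema even when no rows flow (as in the LIMIT 0 test added below). A minimal sketch of that contract, using a hypothetical Upstream interface and drain() consumer rather than Drill's real RecordBatch API:

import java.util.Iterator;
import java.util.List;

public class IterOutcomeSketch {
  // Mirrors the outcome names used in this commit; the rest is hypothetical.
  enum IterOutcome { OK_NEW_SCHEMA, OK, NONE }

  interface Upstream {
    IterOutcome next();
    String schema(); // meaningful only after OK_NEW_SCHEMA has been seen
  }

  // A downstream consumer can only learn the schema from an OK_NEW_SCHEMA
  // outcome. An upstream that jumps straight to NONE (the pre-fix HashJoin
  // behavior on empty inputs) leaves the consumer with no schema at all.
  static String drain(Upstream up) {
    String schema = null;
    for (IterOutcome out = up.next(); out != IterOutcome.NONE; out = up.next()) {
      if (out == IterOutcome.OK_NEW_SCHEMA) {
        schema = up.schema(); // (re)build output using the announced schema
      }
      // OK: row processing elided
    }
    return schema; // null means the protocol was violated upstream
  }

  static Upstream upstream(List<IterOutcome> outcomes) {
    Iterator<IterOutcome> it = outcomes.iterator();
    return new Upstream() {
      public IterOutcome next() { return it.hasNext() ? it.next() : IterOutcome.NONE; }
      public String schema() { return "(l_quantity, l_shipdate, o_custkey)"; }
    };
  }

  public static void main(String[] args) {
    // Fixed behavior: an empty input still announces its schema first.
    System.out.println(drain(upstream(List.of(IterOutcome.OK_NEW_SCHEMA))));
    // Buggy behavior: NONE with no preceding OK_NEW_SCHEMA -> null schema.
    System.out.println(drain(upstream(List.of(IterOutcome.NONE))));
  }
}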
@@ -149,6 +149,11 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
private RecordBatch buildBatch;
private RecordBatch probeBatch;

/**
* Flag indicating whether the first data-holding batch still needs to be fetched.
*/
private boolean prefetched;

// For handling spilling
private SpillSet spillSet;
HashJoinPOP popConfig;
@@ -174,7 +179,7 @@ public static class HJSpilledPartition {
/**
* Queue of spilled partitions to process.
*/
private ArrayList<HJSpilledPartition> spilledPartitionsList;
private ArrayList<HJSpilledPartition> spilledPartitionsList = new ArrayList<>();
private HJSpilledPartition spilledInners[]; // for the outer to find the partition

public enum Metric implements MetricDef {
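
A side note on the field change above: spilledPartitionsList is now created at declaration instead of inside delayedSetup() (whose initialization line is removed further down). A plausible reading, sketched here as an assumption rather than something stated in the commit: cleanup paths can now run before delayedSetup(), so a lazily created list could still be null when they touch it. A minimal illustration with hypothetical names:

import java.util.ArrayList;
import java.util.List;

public class EagerInitSketch {
  // Eager initialization: the field is usable on every code path,
  // even when setup() never runs (e.g. schema building short-circuits).
  private final List<String> spilled = new ArrayList<>();

  void cleanup() {
    spilled.clear(); // with lazy init in setup(), this could NPE
  }

  public static void main(String[] args) {
    new EagerInitSketch().cleanup(); // safe despite setup() never running
  }
}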
@@ -213,86 +218,145 @@ public int getRecordCount() {

@Override
protected void buildSchema() throws SchemaChangeException {
if (! prefetchFirstBatchFromBothSides()) {
return;
// We must first get the schemas from upstream operators before we can build
// our schema.
boolean validSchema = sniffNewSchemas();

if (validSchema) {
// We are able to construct a valid schema from the upstream data.
// Setting the state here makes sure AbstractRecordBatch returns OK_NEW_SCHEMA
state = BatchState.BUILD_SCHEMA;
} else {
verifyOutcomeToSetBatchState(leftUpstream, rightUpstream);
}

// If we have a valid schema, this will build a valid container. If we were unable to obtain a valid schema,
// we still need to build a dummy schema. This code handles both cases for us.
setupOutputContainerSchema();
container.buildSchema(BatchSchema.SelectionVectorMode.NONE);

// Initialize the hash join helper context
if (rightUpstream != IterOutcome.NONE) {
if (rightUpstream == IterOutcome.OK_NEW_SCHEMA) {
// We only need the hash tables if we have data on the build side.
setupHashTable();
}
setupOutputContainerSchema();

try {
hashJoinProbe = setupHashJoinProbe();
} catch (IOException | ClassTransformationException e) {
throw new SchemaChangeException(e);
}

container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
}

@Override
protected boolean prefetchFirstBatchFromBothSides() {
leftUpstream = sniffNonEmptyBatch(0, left);
rightUpstream = sniffNonEmptyBatch(1, right);
if (leftUpstream != IterOutcome.NONE) {
// We can only get data if there is data available
leftUpstream = sniffNonEmptyBatch(leftUpstream, LEFT_INDEX, left);
}

if (rightUpstream != IterOutcome.NONE) {
// We can only get data if there is data available
rightUpstream = sniffNonEmptyBatch(rightUpstream, RIGHT_INDEX, right);
}

buildSideIsEmpty = rightUpstream == IterOutcome.NONE;

// For build side, use aggregate i.e. average row width across batches
batchMemoryManager.update(LEFT_INDEX, 0);
batchMemoryManager.update(RIGHT_INDEX, 0, true);
if (verifyOutcomeToSetBatchState(leftUpstream, rightUpstream)) {
// For build side, use aggregate i.e. average row width across batches
batchMemoryManager.update(LEFT_INDEX, 0);
batchMemoryManager.update(RIGHT_INDEX, 0, true);

logger.debug("BATCH_STATS, incoming left: {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
logger.debug("BATCH_STATS, incoming left: {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));

if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) {
state = BatchState.STOP;
// Got our first batch(es)
state = BatchState.FIRST;
return true;
} else {
return false;
}
}

if (leftUpstream == IterOutcome.OUT_OF_MEMORY || rightUpstream == IterOutcome.OUT_OF_MEMORY) {
state = BatchState.OUT_OF_MEMORY;
return false;
/**
* Sniffs all data necessary to construct a schema.
* @return True if all the data necessary to construct a schema has been retrieved. False otherwise.
*/
private boolean sniffNewSchemas() {
do {
// Ask for data until we get a valid result.
leftUpstream = next(LEFT_INDEX, left);
} while (leftUpstream == IterOutcome.NOT_YET);

boolean isValidLeft = false;

switch (leftUpstream) {
case OK_NEW_SCHEMA:
probeSchema = probeBatch.getSchema();
case NONE:
isValidLeft = true;
break;
case OK:
case EMIT:
throw new IllegalStateException("Unsupported outcome while building schema " + leftUpstream);
default:
// Termination condition
}

if (checkForEarlyFinish(leftUpstream, rightUpstream)) {
state = BatchState.DONE;
return false;
do {
// Ask for data until we get a valid result.
rightUpstream = next(RIGHT_INDEX, right);
} while (rightUpstream == IterOutcome.NOT_YET);

boolean isValidRight = false;

switch (rightUpstream) {
case OK_NEW_SCHEMA:
// We need to have the schema of the build side even when the build side is empty
rightSchema = buildBatch.getSchema();
// position of the new "column" for keeping the hash values (after the real columns)
rightHVColPosition = buildBatch.getContainer().getNumberOfColumns();
case NONE:
isValidRight = true;
break;
case OK:
case EMIT:
throw new IllegalStateException("Unsupported outcome while building schema " + rightUpstream);
default:
// Termination condition
}

state = BatchState.FIRST; // Got our first batches on both sides
return true;
// Left and right sides must return a valid response, and neither side may be NONE.
return (isValidLeft && isValidRight) &&
(leftUpstream != IterOutcome.NONE && rightUpstream != IterOutcome.NONE);
}

/**
* In order to accurately predict memory usage for spilling, the first non-empty batch is needed on both the build side and the probe side. This method
* fetches the first non-empty batch from the left or right side.
* @param curr The current outcome.
* @param inputIndex Index specifying whether to work with the left or right input.
* @param recordBatch The left or right record batch.
* @return The {@link org.apache.drill.exec.record.RecordBatch.IterOutcome} for the left or right record batch.
*/
private IterOutcome sniffNonEmptyBatch(int inputIndex, RecordBatch recordBatch) {
private IterOutcome sniffNonEmptyBatch(IterOutcome curr, int inputIndex, RecordBatch recordBatch) {
while (true) {
IterOutcome outcome = next(inputIndex, recordBatch);
if (recordBatch.getRecordCount() != 0) {
return curr;
}

switch (outcome) {
case OK_NEW_SCHEMA:
if ( inputIndex == 0 ) {
// Indicate that a schema was seen (in case probe side is empty)
probeSchema = probeBatch.getSchema();
} else {
// We need to have the schema of the build side even when the build side is empty
rightSchema = buildBatch.getSchema();
// position of the new "column" for keeping the hash values (after the real columns)
rightHVColPosition = buildBatch.getContainer().getNumberOfColumns();
// new schema can also contain records
}
curr = next(inputIndex, recordBatch);

switch (curr) {
case OK:
if (recordBatch.getRecordCount() == 0) {
continue;
}
// We got a non-empty batch
// We got a data batch
break;
case NOT_YET:
// We need to try again
break;
default:
// Other cases are termination conditions
return outcome;
return curr;
}
}
}
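
The two methods above split the old prefetch into phases: sniffNewSchemas() drives each input until it produces a schema-bearing outcome (OK_NEW_SCHEMA or NONE), and sniffNonEmptyBatch() then keeps polling until a batch actually holds rows, which the spilling memory calculator needs. A distilled sketch of the second loop, with a hypothetical Source standing in for Drill's RecordBatch (like the real method, it first re-checks the batch fetched during schema sniffing):

public class SniffSketch {
  enum Outcome { OK_NEW_SCHEMA, OK, NOT_YET, NONE }

  interface Source {
    Outcome next();
    int recordCount(); // rows in the most recently fetched batch
  }

  // Keep asking for batches until one holds rows: empty OK batches and
  // NOT_YET are skipped; any other outcome ends the sniff and is returned.
  static Outcome sniffNonEmptyBatch(Outcome curr, Source source) {
    while (true) {
      if (source.recordCount() != 0) {
        return curr; // first data-holding batch found
      }
      curr = source.next();
      switch (curr) {
        case OK:      // possibly empty; the loop re-checks recordCount
        case NOT_YET: // upstream not ready; try again
          break;
        default:      // NONE etc.: termination condition
          return curr;
      }
    }
  }

  public static void main(String[] args) {
    // Upstream delivers an empty OK batch, a NOT_YET, then 5 rows.
    Outcome[] outcomes = { Outcome.OK, Outcome.NOT_YET, Outcome.OK };
    int[] counts = { 0, 0, 5 };
    Source source = new Source() {
      int i = -1;
      public Outcome next() { return outcomes[++i]; }
      public int recordCount() { return i < 0 ? 0 : counts[i]; }
    };
    // The schema batch sniffed earlier was empty, so sniffing continues
    // until the 5-row batch arrives and its outcome (OK) is returned.
    System.out.println(sniffNonEmptyBatch(Outcome.OK_NEW_SCHEMA, source));
  }
}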
@@ -317,7 +381,25 @@ public HashJoinMemoryCalculator getCalculatorImpl() {

@Override
public IterOutcome innerNext() {
// In case incoming was killed before, just cleanup and return
if (!prefetched) {
// If we didn't retrieve our first data-holding batch yet, we need to do it now.
prefetched = true;
prefetchFirstBatchFromBothSides();

// Handle emitting the correct outcome for termination conditions
// Use the state set by prefetchFirstBatchFromBothSides to emit the correct termination outcome.
switch (state) {
case DONE:
return IterOutcome.NONE;
case STOP:
return IterOutcome.STOP;
case OUT_OF_MEMORY:
return IterOutcome.OUT_OF_MEMORY;
default:
// No termination condition so continue processing.
}
}

if ( wasKilled ) {
this.cleanup();
super.close();
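
With prefetching moved out of buildSchema(), the first innerNext() call performs it and then translates any terminal BatchState set during the prefetch into the matching IterOutcome, which is how the schema-only OK_NEW_SCHEMA first response stays intact. A compact sketch of that translation, using hypothetical enums in place of Drill's:

public class TerminalOutcomeSketch {
  enum BatchState { FIRST, DONE, STOP, OUT_OF_MEMORY }
  enum IterOutcome { NONE, STOP, OUT_OF_MEMORY }

  // Mirrors the switch above: terminal states discovered while prefetching
  // are surfaced to the caller as the corresponding iteration outcome.
  static IterOutcome terminalOutcome(BatchState state) {
    switch (state) {
      case DONE:          return IterOutcome.NONE;
      case STOP:          return IterOutcome.STOP;
      case OUT_OF_MEMORY: return IterOutcome.OUT_OF_MEMORY;
      default:            return null; // no termination; keep processing
    }
  }

  public static void main(String[] args) {
    System.out.println(terminalOutcome(BatchState.DONE));  // NONE
    System.out.println(terminalOutcome(BatchState.FIRST)); // null -> continue
  }
}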
@@ -504,13 +586,8 @@ private void delayedSetup() {
partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F
bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5

// Create the FIFO list of spilled partitions (pairs - inner/outer)
spilledPartitionsList = new ArrayList<>();

// Create array for the partitions
partitions = new HashPartition[numPartitions];

buildSideIsEmpty = false;
}

/**
@@ -19,16 +19,24 @@

import org.apache.drill.categories.OperatorTest;
import org.apache.drill.categories.UnlikelyTest;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.test.BaseTestQuery;
import org.apache.drill.test.rowSet.schema.SchemaBuilder;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.nio.file.Paths;
import java.util.List;
import java.util.regex.Pattern;

@Category(OperatorTest.class)
@@ -206,4 +214,33 @@ public void testJoinOrdering() throws Exception {
final Pattern sortHashJoinPattern = Pattern.compile(".*Sort.*HashJoin", Pattern.DOTALL);
testPlanMatchingPatterns(query, new Pattern[]{sortHashJoinPattern}, null);
}

@Test // DRILL-6606
public void testJoinLimit0Schema() throws Exception {
String query = "SELECT l.l_quantity, l.l_shipdate, o.o_custkey\n" +
"FROM (SELECT * FROM cp.`tpch/lineitem.parquet` LIMIT 0) l\n" +
" JOIN (SELECT * FROM cp.`tpch/orders.parquet` LIMIT 0) o \n" +
" ON l.l_orderkey = o.o_orderkey\n";
final List<QueryDataBatch> dataBatches = client.runQuery(UserBitShared.QueryType.SQL, query);

Assert.assertEquals(1, dataBatches.size());

final QueryDataBatch queryDataBatch = dataBatches.get(0);
final RecordBatchLoader batchLoader = new RecordBatchLoader(getAllocator());

try {
batchLoader.load(queryDataBatch.getHeader().getDef(), queryDataBatch.getData());

final BatchSchema actualSchema = batchLoader.getSchema();
final BatchSchema expectedSchema = new SchemaBuilder()
.add("l_quantity", TypeProtos.MinorType.FLOAT8, TypeProtos.DataMode.REQUIRED)
.add("l_shipdate", TypeProtos.MinorType.DATE, TypeProtos.DataMode.REQUIRED)
.add("o_custkey", TypeProtos.MinorType.INT, TypeProtos.DataMode.REQUIRED)
.build();

Assert.assertTrue(expectedSchema.isEquivalent(actualSchema));
} finally {
batchLoader.clear();
}
}
}
