From faef4298668c26b1d12b3ee6f60bf3d6674efc83 Mon Sep 17 00:00:00 2001 From: Mehant Baid Date: Tue, 25 Feb 2014 01:09:06 -0800 Subject: [PATCH] DRILL-505 Hash Join basic changes Support for left outer, right outer and full joins Tests and multi batch outer join WIP Support for multiple join conditions Add following tests - Multiple condition join - Join on JSON scan - Multi batch join - Simple equality join Fix memory leaks Cleanup Move logic that invokes run time generated code into the template. Incorporate suggestions by Aman. Cleanup --- .../base/AbstractPhysicalVisitor.java | 5 + .../exec/physical/base/PhysicalVisitor.java | 1 + .../exec/physical/config/HashJoinPOP.java | 143 +++++++ .../impl/common/ChainedHashTable.java | 40 +- .../exec/physical/impl/common/HashTable.java | 2 +- .../impl/common/HashTableTemplate.java | 11 +- .../physical/impl/join/HashJoinBatch.java | 355 ++++++++++++++++++ .../impl/join/HashJoinBatchCreator.java | 37 ++ .../physical/impl/join/HashJoinHelper.java | 222 +++++++++++ .../physical/impl/join/HashJoinProbe.java | 56 +++ .../impl/join/HashJoinProbeTemplate.java | 226 +++++++++++ .../exec/physical/impl/join/TestHashJoin.java | 216 +++++++++++ .../drill/exec/pop/PopUnitTestBase.java | 2 +- .../src/test/resources/build_side_input.json | 24 ++ .../src/test/resources/join/hash_join.json | 63 ++++ .../resources/join/hash_join_multi_batch.json | 47 +++ .../join/hj_left_outer_multi_batch.json | 48 +++ .../join/hj_multi_condition_join.json | 66 ++++ .../join/hj_right_outer_multi_batch.json | 48 +++ .../src/test/resources/probe_side_input.json | 28 ++ 20 files changed, 1618 insertions(+), 22 deletions(-) create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java create mode 100644 exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java create mode 100644 exec/java-exec/src/test/resources/build_side_input.json create mode 100644 exec/java-exec/src/test/resources/join/hash_join.json create mode 100644 exec/java-exec/src/test/resources/join/hash_join_multi_batch.json create mode 100644 exec/java-exec/src/test/resources/join/hj_left_outer_multi_batch.json create mode 100644 exec/java-exec/src/test/resources/join/hj_multi_condition_join.json create mode 100644 exec/java-exec/src/test/resources/join/hj_right_outer_multi_batch.json create mode 100644 exec/java-exec/src/test/resources/probe_side_input.json diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java index 5f37487a2ad..abafc423f16 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java @@ -105,6 +105,11 @@ public T visitMergeJoin(MergeJoinPOP join, X value) throws E { return visitOp(join, value); } + 
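The addition that follows gives AbstractPhysicalVisitor a default visitHashJoin() that falls back to visitOp(); PhysicalVisitor gains the corresponding abstract method in the next hunk. A minimal, self-contained sketch of the double dispatch these hunks extend (simplified names and generics, not the actual Drill signatures):

    // Hypothetical, trimmed-down model of the visitor pattern in play here.
    interface MiniVisitor<T> {
      T visitHashJoin(MiniHashJoin join);
      T visitOp(MiniOp op);                 // generic fallback, like visitOp() above
    }

    abstract class MiniAbstractVisitor<T> implements MiniVisitor<T> {
      // The default added by this patch: route hash joins to the generic
      // handler so existing visitor subclasses compile unchanged.
      @Override
      public T visitHashJoin(MiniHashJoin join) { return visitOp(join); }
    }

    class MiniOp {}

    class MiniHashJoin extends MiniOp {
      // Double dispatch: the operator picks the visit method, the visitor the
      // behavior. HashJoinPOP.accept() later in this patch does exactly this.
      <T> T accept(MiniVisitor<T> v) { return v.visitHashJoin(this); }
    }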
@Override + public T visitHashJoin(HashJoinPOP join, X value) throws E { + return visitOp(join, value); + } + @Override public T visitHashPartitionSender(HashPartitionSender op, X value) throws E { return visitSender(op, value); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java index 712fafe8a56..208dab97851 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java @@ -41,6 +41,7 @@ public interface PhysicalVisitor { public RETURN visitSort(Sort sort, EXTRA value) throws EXCEP; public RETURN visitLimit(Limit limit, EXTRA value) throws EXCEP; public RETURN visitMergeJoin(MergeJoinPOP join, EXTRA value) throws EXCEP; + public RETURN visitHashJoin(HashJoinPOP join, EXTRA value) throws EXCEP; public RETURN visitSender(Sender sender, EXTRA value) throws EXCEP; public RETURN visitReceiver(Receiver receiver, EXTRA value) throws EXCEP; public RETURN visitStreamingAggregate(StreamingAggregate agg, EXTRA value) throws EXCEP; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java new file mode 100644 index 00000000000..f4a1fc7494c --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.drill.exec.physical.config; + +import com.fasterxml.jackson.annotation.JsonTypeName; + +import java.util.Iterator; +import java.util.List; + +import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.common.logical.data.JoinCondition; +import org.apache.drill.common.logical.data.NamedExpression; +import org.apache.drill.exec.physical.OperatorCost; +import org.apache.drill.exec.physical.base.AbstractBase; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.PhysicalVisitor; +import org.apache.drill.exec.physical.base.Size; + +import com.beust.jcommander.internal.Lists; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterators; +import org.apache.drill.exec.physical.impl.common.HashTable; +import org.apache.drill.exec.physical.impl.common.HashTableConfig; + +import org.eigenbase.rel.JoinRelType; + +@JsonTypeName("hash-join") +public class HashJoinPOP extends AbstractBase { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashJoinPOP.class); + + + private final PhysicalOperator left; + private final PhysicalOperator right; + private final List conditions; + private final JoinRelType joinType; + private final HashTableConfig htConfig; + + @Override + public OperatorCost getCost() { + return new OperatorCost(0,0,0,0); + } + + @JsonCreator + public HashJoinPOP( + @JsonProperty("left") PhysicalOperator left, + @JsonProperty("right") PhysicalOperator right, + @JsonProperty("join-conditions") List conditions, + @JsonProperty("join-type") JoinRelType joinType + ) { + this.left = left; + this.right = right; + this.conditions = conditions; + this.joinType = joinType; + + int conditionsSize = conditions.size(); + + NamedExpression rightExpr[] = new NamedExpression[conditionsSize]; + NamedExpression leftExpr[] = new NamedExpression[conditionsSize]; + + for (int i = 0; i < conditionsSize; i++) { + rightExpr[i] = new NamedExpression(conditions.get(i).getRight(), new FieldReference("build_side_" + i )); + leftExpr[i] = new NamedExpression(conditions.get(i).getLeft(), new FieldReference("probe_side_" + i)); + + // Hash join only supports equality currently. 
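The assert that follows only fires when the JVM runs with assertions enabled (-ea). For illustration only (this is not what the patch does), an explicit runtime check would look like:

    // Illustration: reject non-equality conditions even without -ea.
    String relationship = conditions.get(i).getRelationship();
    if (!"==".equals(relationship)) {
      throw new UnsupportedOperationException(
          "Hash join currently supports only equality join conditions, got: " + relationship);
    }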
+ assert conditions.get(i).getRelationship().equals("=="); + } + + this.htConfig = new HashTableConfig(HashTable.DEFAULT_INITIAL_CAPACITY, + HashTable.DEFAULT_LOAD_FACTOR, + rightExpr, leftExpr); + } + + @Override + public Size getSize() { + return left.getSize().add(right.getSize()); + } + + @Override + public T accept(PhysicalVisitor physicalVisitor, X value) throws E { + return physicalVisitor.visitHashJoin(this, value); + } + + @Override + public PhysicalOperator getNewWithChildren(List children) { + Preconditions.checkArgument(children.size() == 2); + return new HashJoinPOP(children.get(0), children.get(1), conditions, joinType); + } + + @Override + public Iterator iterator() { + return Iterators.forArray(left, right); + } + + public PhysicalOperator getLeft() { + return left; + } + + public PhysicalOperator getRight() { + return right; + } + + public JoinRelType getJoinType() { + return joinType; + } + + public List getConditions() { + return conditions; + } + + public HashTableConfig getHtConfig() { + return htConfig; + } + + public HashJoinPOP flipIfRight(){ + if(joinType == JoinRelType.RIGHT){ + List flippedConditions = Lists.newArrayList(conditions.size()); + for(JoinCondition c : conditions){ + flippedConditions.add(c.flip()); + } + return new HashJoinPOP(right, left, flippedConditions, JoinRelType.LEFT); + }else{ + return this; + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java index 507be33e424..ec579fc5ba6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java @@ -84,6 +84,7 @@ public class ChainedHashTable { private final MappingSet KeyMatchIncomingBuildMapping = new MappingSet("incomingRowIdx", null, "incomingBuild", null, KEY_MATCH_BUILD, KEY_MATCH_BUILD); private final MappingSet KeyMatchIncomingProbeMapping = new MappingSet("incomingRowIdx", null, "incomingProbe", null, KEY_MATCH_PROBE, KEY_MATCH_PROBE); private final MappingSet KeyMatchHtableMapping = new MappingSet("htRowIdx", null, "htContainer", null, KEY_MATCH_BUILD, KEY_MATCH_BUILD); + private final MappingSet KeyMatchHtableProbeMapping = new MappingSet("htRowIdx", null, "htContainer", null, KEY_MATCH_PROBE, KEY_MATCH_PROBE); private final MappingSet GetHashIncomingBuildMapping = new MappingSet("incomingRowIdx", null, "incomingBuild", null, GET_HASH_BUILD, GET_HASH_BUILD); private final MappingSet GetHashIncomingProbeMapping = new MappingSet("incomingRowIdx", null, "incomingProbe", null, GET_HASH_PROBE, GET_HASH_PROBE); private final MappingSet SetValueMapping = new MappingSet("incomingRowIdx" /* read index */, "htRowIdx" /* write index */, "incomingBuild" /* read container */, "htContainer" /* write container */, SET_VALUE, SET_VALUE); @@ -114,10 +115,6 @@ public HashTable createAndSetupHashTable (TypedFieldId[] outKeyFieldIds) throws ClassGenerator cg = top.getRoot(); ClassGenerator cgInner = cg.getInnerGenerator("BatchHolder"); - if (outKeyFieldIds.length > htConfig.getKeyExprsBuild().length) { - throw new IllegalArgumentException("Mismatched number of output key fields."); - } - LogicalExpression[] keyExprsBuild = new LogicalExpression[htConfig.getKeyExprsBuild().length]; LogicalExpression[] keyExprsProbe = null; boolean isProbe = (htConfig.getKeyExprsProbe() != null) ; @@ -146,27 +143,34 @@ public 
HashTable createAndSetupHashTable (TypedFieldId[] outKeyFieldIds) throws i++; } - if (isProbe) { + if (isProbe) { + i = 0; for (NamedExpression ne : htConfig.getKeyExprsProbe()) { final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incomingProbe, collector, context.getFunctionRegistry()); if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); if (expr == null) continue; keyExprsProbe[i] = expr; + i++; } } // generate code for isKeyMatch(), setValue(), getHash() and outputRecordKeys() - setupIsKeyMatchInternal(cgInner, KeyMatchIncomingBuildMapping, KeyMatchHtableMapping, keyExprsBuild, htKeyFieldIds); - setupIsKeyMatchInternal(cgInner, KeyMatchIncomingProbeMapping, KeyMatchHtableMapping, keyExprsProbe, htKeyFieldIds) ; + setupIsKeyMatchInternal(cgInner, KeyMatchIncomingProbeMapping, KeyMatchHtableProbeMapping, keyExprsProbe, htKeyFieldIds) ; setupSetValue(cgInner, keyExprsBuild, htKeyFieldIds); - setupOutputRecordKeys(cgInner, htKeyFieldIds, outKeyFieldIds); + if (outgoing != null) { + + if (outKeyFieldIds.length > htConfig.getKeyExprsBuild().length) { + throw new IllegalArgumentException("Mismatched number of output key fields."); + } + } + setupOutputRecordKeys(cgInner, htKeyFieldIds, outKeyFieldIds); setupGetHash(cg /* use top level code generator for getHash */, GetHashIncomingBuildMapping, keyExprsBuild); setupGetHash(cg /* use top level code generator for getHash */, GetHashIncomingProbeMapping, keyExprsProbe); - HashTable ht = context.getImplementationClass(top); + HashTable ht = context.getImplementationClass(top); ht.setup(htConfig, context, incomingBuild, incomingProbe, outgoing, htContainerOrig); return ht; @@ -227,14 +231,18 @@ private void setupOutputRecordKeys(ClassGenerator cg, TypedFieldId[] cg.setMappingSet(OutputRecordKeysMapping); - for (int i = 0; i < outKeyFieldIds.length; i++) { - ValueVectorReadExpression vvrExpr = new ValueVectorReadExpression(htKeyFieldIds[i]); - ValueVectorWriteExpression vvwExpr = new ValueVectorWriteExpression(outKeyFieldIds[i], vvrExpr, true); - HoldingContainer hc = cg.addExpr(vvwExpr); - cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE); - } + if (outKeyFieldIds != null) { + for (int i = 0; i < outKeyFieldIds.length; i++) { + ValueVectorReadExpression vvrExpr = new ValueVectorReadExpression(htKeyFieldIds[i]); + ValueVectorWriteExpression vvwExpr = new ValueVectorWriteExpression(outKeyFieldIds[i], vvrExpr, true); + HoldingContainer hc = cg.addExpr(vvwExpr); + cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE); + } - cg.getEvalBlock()._return(JExpr.TRUE); + cg.getEvalBlock()._return(JExpr.TRUE); + } else { + cg.getEvalBlock()._return(JExpr.FALSE); + } } private void setupGetHash(ClassGenerator cg, MappingSet incomingMapping, LogicalExpression[] keyExprs) throws SchemaChangeException { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java index d9321b90a1f..2f1172a6c8b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java @@ -49,7 +49,7 @@ public void setup(HashTableConfig htConfig, FragmentContext context, public PutStatus put(int incomingRowIdx, IntHolder htIdxHolder); - public int containsKey(int 
incomingRowIdx); + public int containsKey(int incomingRowIdx, boolean isProbe); public int size(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java index 5a7a6fac4ed..7045b25cf25 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java @@ -372,11 +372,13 @@ public boolean isEmpty() { } public void clear() { - for (BatchHolder bh : batchHolders) { - bh.clear(); + if (batchHolders != null) { + for (BatchHolder bh : batchHolders) { + bh.clear(); + } + batchHolders.clear(); + batchHolders = null; } - batchHolders.clear(); - batchHolders = null; startIndices.clear(); currentIdxHolder = null; numEntries = 0; @@ -487,6 +489,7 @@ private boolean insertEntry(int incomingRowIdx, int currentIdx, int hashValue, B } // Return -1 if key is not found in the hash table. Otherwise, return the global index of the key + @Override public int containsKey(int incomingRowIdx, boolean isProbe) { int hash = isProbe ? getHashProbe(incomingRowIdx) : getHashBuild(incomingRowIdx); int i = getBucketIndex(hash, numBuckets()); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java new file mode 100644 index 00000000000..7ada651a21a --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -0,0 +1,355 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.impl.join; + +import org.apache.drill.exec.record.*; +import org.eigenbase.rel.JoinRelType; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import com.sun.codemodel.JExpression; +import com.sun.codemodel.JVar; +import com.sun.codemodel.JExpr; + +import org.apache.drill.exec.compile.sig.GeneratorMapping; +import org.apache.drill.exec.compile.sig.MappingSet; +import org.apache.drill.exec.exception.ClassTransformationException; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.expr.ClassGenerator; +import org.apache.drill.exec.expr.CodeGenerator; +import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.expr.holders.IntHolder; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.config.HashJoinPOP; +import org.apache.drill.exec.physical.impl.common.ChainedHashTable; +import org.apache.drill.exec.physical.impl.common.HashTable; +import org.apache.drill.exec.physical.impl.common.HashTableConfig; +import org.apache.drill.exec.physical.impl.sort.RecordBatchData; +import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.allocator.VectorAllocator; + +public class HashJoinBatch extends AbstractRecordBatch { + // Probe side record batch + private final RecordBatch left; + + // Build side record batch + private final RecordBatch right; + + // Join type, INNER, LEFT, RIGHT or OUTER + private final JoinRelType joinType; + + // hash table configuration, created in HashJoinPOP + private HashTableConfig htConfig; + + // Runtime generated class implementing HashJoinProbe interface + private HashJoinProbe hashJoinProbe = null; + + /* Helper class + * Maintains linked list of build side records with the same key + * Keeps information about which build records have a corresponding + * matching key in the probe side (for outer, right joins) + */ + private HashJoinHelper hjHelper = null; + + // Underlying hashtable used by the hash join + private HashTable hashTable = null; + + /* Hyper container to store all build side record batches. 
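An aside on the addressing convention used from here on: a single int composite index packs a position in the hyper container, with the upper 16 bits selecting the batch and the lower 16 bits the record within it (see SHIFT_SIZE in HashJoinHelper and the band/shrz arithmetic in the generated code below). A minimal sketch with hypothetical helper names:

    // Hypothetical helpers mirroring the <<16 / >>>16 arithmetic used by
    // HashJoinHelper and by the generated copyFrom() calls in this file.
    static int compose(int batchIndex, int recordIndex) {
      return (batchIndex << 16) | (recordIndex & Character.MAX_VALUE); // MAX_VALUE == 0xFFFF
    }
    static int batchIndexOf(int composite)  { return composite >>> 16; }
    static int recordIndexOf(int composite) { return composite & Character.MAX_VALUE; }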
+ * Records are retrieved from this container when there is a matching record
+ * on the probe side
+ */
+  private ExpandableHyperContainer hyperContainer;
+
+  // Number of records in the output container
+  private int outputRecords;
+
+  // Current batch index on the build side
+  private int buildBatchIndex = 0;
+
+  // List of vector allocators
+  private List<VectorAllocator> allocators = null;
+
+  // Schema of the build side
+  private BatchSchema rightSchema = null;
+
+  // Generator mapping for the build side
+  private static final GeneratorMapping PROJECT_BUILD = GeneratorMapping.create("doSetup"/* setup method */,
+                                                                                "projectBuildRecord" /* eval method */,
+                                                                                null /* reset */, null /* cleanup */);
+
+  // Generator mapping for the probe side
+  private static final GeneratorMapping PROJECT_PROBE = GeneratorMapping.create("doSetup" /* setup method */,
+                                                                                "projectProbeRecord" /* eval method */,
+                                                                                null /* reset */, null /* cleanup */);
+
+  // Mapping set for the build side
+  private final MappingSet projectBuildMapping = new MappingSet("buildIndex" /* read index */, "outIndex" /* write index */,
+                                                                "buildBatch" /* read container */,
+                                                                "outgoing" /* write container */,
+                                                                PROJECT_BUILD, PROJECT_BUILD);
+
+  // Mapping set for the probe side
+  private final MappingSet projectProbeMapping = new MappingSet("probeIndex" /* read index */, "outIndex" /* write index */,
+                                                                "probeBatch" /* read container */,
+                                                                "outgoing" /* write container */,
+                                                                PROJECT_PROBE, PROJECT_PROBE);
+
+  @Override
+  public int getRecordCount() {
+    return outputRecords;
+  }
+
+  @Override
+  public IterOutcome next() {
+
+    try {
+      /* If we are here for the first time, execute the build phase of the
+       * hash join and set up the runtime-generated class for the probe side
+       */
+      if (hashJoinProbe == null) {
+
+        // Initialize the hash join helper context
+        hjHelper = new HashJoinHelper(context);
+
+        /* The build phase requires setting up the hash table. The hash table will
+         * materialize both the build and probe side expressions while
+         * creating the hash table. So we need to invoke next() on our probe batch
+         * as well, for the materialization to be successful. This batch will not be used
+         * till we complete the build phase.
+         */
+        left.next();
+
+        // Build the hash table, using the build side record batches.
+        executeBuildPhase();
+
+        // Create the runtime-generated code needed to probe and project
+        hashJoinProbe = setupHashJoinProbe();
+      }
+
+      // Allocate the memory for the vectors in the output container
+      allocateVectors();
+
+      // Store the number of records projected
+      outputRecords = hashJoinProbe.probeAndProject();
+
+      /* We are here because of one of the following:
+       * 1. We have completed processing all the records and we are done
+       * 2. We've filled up the outgoing batch to the maximum and we need to return upstream
+       * In either case, build the output container's schema and return
+       */
+      if (outputRecords > 0) {
+
+        // Build the container schema and set the counts
+        container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+        container.setRecordCount(outputRecords);
+
+        for (VectorWrapper<?> v : container) {
+          v.getValueVector().getMutator().setValueCount(outputRecords);
+        }
+
+        return IterOutcome.OK_NEW_SCHEMA;
+      }
+
+      // No more output records, clean up and return
+      cleanup();
+      return IterOutcome.NONE;
+
+    } catch (ClassTransformationException | SchemaChangeException | IOException e) {
+      context.fail(e);
+      killIncoming();
+      cleanup();
+      return IterOutcome.STOP;
+    }
+  }
+
+  public void setupHashTable() throws IOException, SchemaChangeException, ClassTransformationException {
+
+    // We shouldn't be recreating the hash table; this should be done only once
+    assert hashTable == null;
+
+    ChainedHashTable ht = new ChainedHashTable(htConfig, context, this.right, this.left, null);
+    hashTable = ht.createAndSetupHashTable(null);
+  }
+
+  public void executeBuildPhase() throws SchemaChangeException, ClassTransformationException, IOException {
+
+    // Set up the underlying hash table
+    IterOutcome rightUpstream = right.next();
+
+    boolean moreData = true;
+
+    setupHashTable();
+
+    while (moreData) {
+
+      switch (rightUpstream) {
+
+      case NONE:
+      case NOT_YET:
+      case STOP:
+        moreData = false;
+        continue;
+
+      case OK_NEW_SCHEMA:
+        if (rightSchema == null) {
+          rightSchema = right.getSchema();
+        } else {
+          throw new SchemaChangeException("Hash join does not support schema changes");
+        }
+        // Fall through
+      case OK:
+        int currentRecordCount = right.getRecordCount();
+
+        /* For every new build batch, we store some state in the helper context.
+         * Add new state to the helper context
+         */
+        hjHelper.addNewBatch(currentRecordCount);
+
+        // The holder contains the global index where the key is hashed into using the hash table
+        IntHolder htIndex = new IntHolder();
+
+        // For every record in the build batch, hash the key columns
+        for (int i = 0; i < currentRecordCount; i++) {
+
+          HashTable.PutStatus status = hashTable.put(i, htIndex);
+
+          if (status != HashTable.PutStatus.PUT_FAILED) {
+            /* Use the global index returned by the hash table to store
+             * the current record index and batch index. This will be used
+             * later when we probe and find a match.
+             */
+            hjHelper.setCurrentIndex(htIndex.value, buildBatchIndex, i);
+          }
+        }
+
+        /* Completed hashing all records in this batch. Transfer the batch
+         * to the hyper vector container. Will be used when we want to retrieve
+         * records that have matching keys on the probe side. 
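To make setCurrentIndex's bookkeeping concrete, a self-contained simulation (all names invented, plain arrays standing in for SV4s) of the chain that forms when three build rows share one key:

    // Suppose rows (batch 0, row 0), (batch 0, row 5) and (batch 1, row 2)
    // share a key whose hash-table put() reports global index 7.
    int INDEX_EMPTY = -1;
    int[] startIndices = new int[16];                    // one slot per global key index
    java.util.Arrays.fill(startIndices, INDEX_EMPTY);
    java.util.Map<Integer, Integer> links = new java.util.HashMap<>(); // composite -> next

    int htIndex = 7;
    for (int composite : new int[]{(0 << 16) | 0, (0 << 16) | 5, (1 << 16) | 2}) {
      if (startIndices[htIndex] == INDEX_EMPTY) {
        startIndices[htIndex] = composite;               // first row with this key
      } else {
        int tail = startIndices[htIndex];                // walk to the end of the chain
        while (links.getOrDefault(tail, INDEX_EMPTY) != INDEX_EMPTY) {
          tail = links.get(tail);
        }
        links.put(tail, composite);                      // append the new occurrence
      }
    }
    // A probe hit on this key starts at startIndices[7] and follows the links:
    // (0,0) -> (0,5) -> (1,2), i.e. every build row carrying the matching key.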
+ */ + RecordBatchData nextBatch = new RecordBatchData(right); + if (hyperContainer == null) { + hyperContainer = new ExpandableHyperContainer(nextBatch.getContainer()); + } else { + hyperContainer.addBatch(nextBatch.getContainer()); + } + + // completed processing a batch, increment batch index + buildBatchIndex++; + break; + } + // Get the next record batch + rightUpstream = right.next(); + } + } + + public HashJoinProbe setupHashJoinProbe() throws ClassTransformationException, IOException { + + allocators = new ArrayList<>(); + + final CodeGenerator cg = CodeGenerator.get(HashJoinProbe.TEMPLATE_DEFINITION, context.getFunctionRegistry()); + ClassGenerator g = cg.getRoot(); + + // Generate the code to project build side records + g.setMappingSet(projectBuildMapping); + + + int fieldId = 0; + JExpression buildIndex = JExpr.direct("buildIndex"); + JExpression outIndex = JExpr.direct("outIndex"); + g.rotateBlock(); + for(VectorWrapper vv : hyperContainer) { + + // Add the vector to our output container + ValueVector v = TypeHelper.getNewVector(vv.getField(), context.getAllocator()); + container.add(v); + allocators.add(RemovingRecordBatch.getAllocator4(v)); + + JVar inVV = g.declareVectorValueSetupAndMember("buildBatch", new TypedFieldId(vv.getField().getType(), fieldId, true)); + JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), fieldId, false)); + + g.getEvalBlock().add(outVV.invoke("copyFrom") + .arg(buildIndex.band(JExpr.lit((int) Character.MAX_VALUE))) + .arg(outIndex) + .arg(inVV.component(buildIndex.shrz(JExpr.lit(16))))); + + fieldId++; + } + + // Generate the code to project probe side records + g.setMappingSet(projectProbeMapping); + + int outputFieldId = fieldId; + fieldId = 0; + JExpression probeIndex = JExpr.direct("probeIndex"); + for (VectorWrapper vv : left) { + + ValueVector v = TypeHelper.getNewVector(vv.getField(), context.getAllocator()); + container.add(v); + allocators.add(RemovingRecordBatch.getAllocator4(v)); + + JVar inVV = g.declareVectorValueSetupAndMember("probeBatch", new TypedFieldId(vv.getField().getType(), fieldId, false)); + JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), outputFieldId, false)); + + g.getEvalBlock().add(outVV.invoke("copyFrom").arg(probeIndex).arg(outIndex).arg(inVV)); + + fieldId++; + outputFieldId++; + } + + HashJoinProbe hj = context.getImplementationClass(cg); + hj.setupHashJoinProbe(context, hyperContainer, left, this, hashTable, hjHelper, joinType); + return hj; + } + + private void allocateVectors(){ + for(VectorAllocator a : allocators){ + a.alloc(RecordBatch.MAX_BATCH_SIZE); + } + } + + public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) { + super(popConfig, context); + this.left = left; + this.right = right; + this.joinType = popConfig.getJoinType(); + this.htConfig = popConfig.getHtConfig(); + } + + @Override + public void killIncoming() { + this.left.kill(); + this.right.kill(); + cleanup(); + } + + @Override + public void cleanup() { + left.cleanup(); + right.cleanup(); + hyperContainer.clear(); + hjHelper.clear(); + container.clear(); + hashTable.clear(); + super.cleanup(); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java new file mode 100644 index 00000000000..19a4a293600 --- /dev/null +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.join; + +import com.google.common.base.Preconditions; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.config.HashJoinPOP; +import org.apache.drill.exec.physical.impl.BatchCreator; +import org.apache.drill.exec.record.RecordBatch; + +import java.util.List; + +public class HashJoinBatchCreator implements BatchCreator { + + @Override + public RecordBatch getBatch(FragmentContext context, HashJoinPOP config, List children) throws ExecutionSetupException { + Preconditions.checkArgument(children.size() == 2); + return new HashJoinBatch(config, context, children.get(0), children.get(1)); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java new file mode 100644 index 00000000000..e0098b1591e --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java @@ -0,0 +1,222 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.physical.impl.join; + +import java.util.ArrayList; +import java.util.BitSet; +import java.util.List; + +import io.netty.buffer.ByteBuf; + +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.record.selection.SelectionVector4; +import org.apache.drill.exec.physical.impl.common.HashTable; + + +/* + * Helper class for hash join. Keeps track of information about the build side batches. + * + * Hash join is a blocking operator, so we consume all the batches on the build side and + * store them in a hyper container. 
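The blocking-build / streaming-probe shape described here can be summarized with a plain HashMap standing in for the runtime-generated hash table. This is a rough analogy only; Drill hashes value vectors through generated code:

    static void hashJoinOutline(java.util.Iterator<int[]> buildBatches,
                                java.util.Iterator<int[]> probeBatches) {
      java.util.Map<Integer, java.util.List<Integer>> table = new java.util.HashMap<>();
      int composite = 0;
      // Build phase: blocking -- every build batch is consumed up front.
      while (buildBatches.hasNext()) {
        for (int key : buildBatches.next()) {
          table.computeIfAbsent(key, k -> new java.util.ArrayList<>()).add(composite++);
        }
      }
      // Probe phase: streaming -- one probe batch at a time, matches emitted as found.
      while (probeBatches.hasNext()) {
        for (int key : probeBatches.next()) {
          for (int buildRow : table.getOrDefault(key, java.util.Collections.<Integer>emptyList())) {
            System.out.println("match: build row " + buildRow + " for key " + key);
          }
        }
      }
    }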
The way we can retrieve records from the hyper container
+ * is by providing the record index and batch index in the hyper container. When we invoke put()
+ * for a given row, the hash table returns a global index. We store the current row's record index
+ * and batch index at this global index in the startIndices structure.
+ *
+ * Since there can be many rows with the same key on the build side, we store the first
+ * index in the startIndices list and the remaining ones are stored as a logical linked list using
+ * the 'links' field in the BuildInfo structures.
+ *
+ * Apart from the indexes into the hyper container, this class also stores information about
+ * which records of the build side had a matching record on the probe side. Stored in a bitvector,
+ * keyMatchBitVector, it is used to retrieve all records that did not match a record on the probe
+ * side for right outer and full outer joins.
+ */
+public class HashJoinHelper {
+
+  /* List of start indexes. Stores the record and batch index of the first record
+   * with a given key.
+   */
+  List<SelectionVector4> startIndices = new ArrayList<>();
+
+  // List of BuildInfo structures. Used to maintain auxiliary information about the build batches
+  List<BuildInfo> buildInfoList = new ArrayList<>();
+
+  // Fragment context
+  FragmentContext context;
+
+  // Constant to indicate the index is empty.
+  static final int INDEX_EMPTY = -1;
+
+  // Bits to shift while obtaining the batch index from an SV4
+  static final int SHIFT_SIZE = 16;
+
+  public HashJoinHelper(FragmentContext context) {
+    this.context = context;
+  }
+
+  public void addStartIndexBatch() throws SchemaChangeException {
+    startIndices.add(getNewSV4(HashTable.BATCH_SIZE));
+  }
+
+  public class BuildInfo {
+    // List of links. Logically it helps maintain a linked list of records with the same key value
+    private SelectionVector4 links;
+
+    // List of bitvectors. 
Keeps track of records on the build side that matched a record on the probe side + private BitSet keyMatchBitVector; + + // number of records in this batch + private int recordCount; + + public BuildInfo(SelectionVector4 links, BitSet keyMatchBitVector, int recordCount) { + this.links = links; + this.keyMatchBitVector = keyMatchBitVector; + this.recordCount = recordCount; + } + + public SelectionVector4 getLinks() { + return links; + } + + public BitSet getKeyMatchBitVector() { + return keyMatchBitVector; + } + } + + public SelectionVector4 getNewSV4(int recordCount) throws SchemaChangeException { + + ByteBuf vector = context.getAllocator().buffer((recordCount * 4)); + + SelectionVector4 sv4 = new SelectionVector4(vector, recordCount, recordCount); + + // Initialize the vector + for (int i = 0; i < recordCount; i++) { + sv4.set(i, INDEX_EMPTY); + } + + return sv4; + } + + public void addNewBatch(int recordCount) throws SchemaChangeException { + // Add a node to the list of BuildInfo's + BuildInfo info = new BuildInfo(getNewSV4(recordCount), new BitSet(recordCount), recordCount); + buildInfoList.add(info); + } + + public int getStartIndex(int keyIndex) { + int batchIdx = keyIndex / HashTable.BATCH_SIZE; + int offsetIdx = keyIndex % HashTable.BATCH_SIZE; + + assert batchIdx < startIndices.size(); + + SelectionVector4 sv4 = startIndices.get(batchIdx); + + return sv4.get(offsetIdx); + } + + public int getNextIndex(int currentIdx) { + // Get to the links field of the current index to get the next index + int batchIdx = currentIdx >>> SHIFT_SIZE; + int recordIdx = currentIdx & HashTable.BATCH_MASK; + + assert batchIdx < buildInfoList.size(); + + // Get the corresponding BuildInfo node + BuildInfo info = buildInfoList.get(batchIdx); + return info.getLinks().get(recordIdx); + } + + public List getNextUnmatchedIndex() { + List compositeIndexes = new ArrayList<>(); + + for (int i = 0; i < buildInfoList.size(); i++) { + BuildInfo info = buildInfoList.get(i); + int fromIndex = 0; + + while (((fromIndex = info.getKeyMatchBitVector().nextClearBit(fromIndex)) != -1) && (fromIndex < info.recordCount)) { + compositeIndexes.add((i << SHIFT_SIZE) | (fromIndex & HashTable.BATCH_MASK)); + fromIndex++; + } + } + return compositeIndexes; + } + + public void setRecordMatched(int index) { + int batchIdx = index >>> SHIFT_SIZE; + int recordIdx = index & HashTable.BATCH_MASK; + + // Get the BitVector for the appropriate batch and set the bit to indicate the record matched + BuildInfo info = buildInfoList.get(batchIdx); + BitSet bitVector = info.getKeyMatchBitVector(); + + bitVector.set(recordIdx); + } + + public void setCurrentIndex(int keyIndex, int batchIndex, int recordIndex) throws SchemaChangeException { + + /* set the current record batch index and the index + * within the batch at the specified keyIndex. 
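getNextUnmatchedIndex() above is what drives right-outer and full-outer projection. A self-contained rendition of the BitSet walk, with illustrative values:

    int recordCount = 6, batchIndex = 0;
    java.util.BitSet matched = new java.util.BitSet(recordCount);
    matched.set(1);                                 // pretend rows 1 and 4 found
    matched.set(4);                                 // probe-side matches
    java.util.List<Integer> unmatched = new java.util.ArrayList<>();
    for (int i = matched.nextClearBit(0); i < recordCount; i = matched.nextClearBit(i + 1)) {
      unmatched.add((batchIndex << 16) | i);        // composite index, as elsewhere
    }
    // unmatched now addresses rows 0, 2, 3 and 5 -- exactly the build records
    // that must still be projected, with no probe-side match to copy from.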
The keyIndex
+     * denotes the global index where the key for this record is
+     * stored in the hash table
+     */
+    int batchIdx  = keyIndex / HashTable.BATCH_SIZE;
+    int offsetIdx = keyIndex % HashTable.BATCH_SIZE;
+
+    if (keyIndex >= (HashTable.BATCH_SIZE * startIndices.size())) {
+      // allocate a new batch
+      addStartIndexBatch();
+    }
+
+    SelectionVector4 startIndex = startIndices.get(batchIdx);
+    int linkIndex;
+
+    // If it's the first value for this key
+    if ((linkIndex = (startIndex.get(offsetIdx))) == INDEX_EMPTY) {
+      startIndex.set(offsetIdx, batchIndex, recordIndex);
+    } else {
+      /* We have already encountered a record with the same key;
+       * use links to store this value
+       */
+      SelectionVector4 link;
+      do {
+        // Traverse the links to get an empty slot to insert the current index
+        batchIdx  = linkIndex >>> SHIFT_SIZE;
+        offsetIdx = linkIndex & Character.MAX_VALUE;
+
+        // get the next link
+        link = buildInfoList.get(batchIdx).getLinks();
+      } while ((linkIndex = link.get(offsetIdx)) != INDEX_EMPTY);
+
+      // We have the correct batchIdx and offset within the batch to store the next link
+      link.set(offsetIdx, batchIndex, recordIndex);
+    }
+  }
+
+  public void clear() {
+    // Clear the SV4s used for start indices
+    for (SelectionVector4 sv4 : startIndices) {
+      sv4.clear();
+    }
+
+    for (BuildInfo info : buildInfoList) {
+      info.getLinks().clear();
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
new file mode 100644
index 00000000000..c99f2a6a81a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.impl.join;
+
+import org.apache.drill.exec.compile.TemplateClassDefinition;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.common.HashTable;
+import org.apache.drill.exec.physical.impl.common.HashTableConfig;
+import org.apache.drill.exec.record.ExpandableHyperContainer;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+import org.apache.drill.exec.record.VectorContainer;
+import org.eigenbase.rel.JoinRelType;
+
+import java.io.IOException;
+
+public interface HashJoinProbe {
+  public static TemplateClassDefinition<HashJoinProbe> TEMPLATE_DEFINITION = new TemplateClassDefinition<HashJoinProbe>(HashJoinProbe.class, HashJoinProbeTemplate.class);
+
+  /* The probe side of the hash join can be in one of the following three states:
+   * 1. 
PROBE_PROJECT: Inner join case, we probe our hash table to see if we have a + * key match and if we do we project the record + * 2. PROJECT_RIGHT: Right Outer or Full Outer joins where we are projecting the records + * from the build side that did not match any records on the probe side. For Left outer + * case we handle it internally by projecting the record if there isn't a match on the build side + * 3. DONE: Once we have projected all possible records we are done + */ + public static enum ProbeState { + PROBE_PROJECT, PROJECT_RIGHT, DONE + } + + public abstract void setupHashJoinProbe(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch, + RecordBatch outgoing, HashTable hashTable, HashJoinHelper hjHelper, JoinRelType joinRelType); + public abstract void doSetup(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch, RecordBatch outgoing); + public abstract int probeAndProject() throws SchemaChangeException, ClassTransformationException, IOException; + public abstract void projectBuildRecord(int buildIndex, int outIndex); + public abstract void projectProbeRecord(int probeIndex, int outIndex); +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java new file mode 100644 index 00000000000..cc1a257ed5f --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java @@ -0,0 +1,226 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.impl.join; + +import javax.inject.Named; + +import org.apache.drill.exec.exception.ClassTransformationException; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.impl.common.ChainedHashTable; +import org.apache.drill.exec.physical.impl.common.HashTable; +import org.apache.drill.exec.physical.impl.common.HashTableConfig; +import org.apache.drill.exec.physical.impl.sort.RecordBatchData; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.ExpandableHyperContainer; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.RecordBatch.IterOutcome; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.vector.allocator.VectorAllocator; +import org.apache.drill.exec.expr.holders.IntHolder; +import org.eigenbase.rel.JoinRelType; +import org.eigenbase.sql2rel.StandardConvertletTable; + +import java.io.IOException; +import java.util.List; + +public abstract class HashJoinProbeTemplate implements HashJoinProbe { + + // Probe side record batch + private RecordBatch probeBatch; + + // Join type, INNER, LEFT, RIGHT or OUTER + private JoinRelType joinType; + + /* Helper class + * Maintains linked list of build side records with the same key + * Keeps information about which build records have a corresponding + * matching key in the probe side (for outer, right joins) + */ + private HashJoinHelper hjHelper = null; + + // Underlying hashtable used by the hash join + private HashTable hashTable = null; + + // Number of records to process on the probe side + private int recordsToProcess = 0; + + // Number of records processed on the probe side + private int recordsProcessed = 0; + + // Number of records in the output container + private int outputRecords; + + // Indicate if we should drain the next record from the probe side + private boolean getNextRecord = true; + + // Contains both batch idx and record idx of the matching record in the build side + private int currentCompositeIdx = -1; + + // Current state the hash join algorithm is in + private ProbeState probeState = ProbeState.PROBE_PROJECT; + + // For outer or right joins, this is a list of unmatched records that needs to be projected + private List unmatchedBuildIndexes = null; + + @Override + public void setupHashJoinProbe(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch, + RecordBatch outgoing, HashTable hashTable, HashJoinHelper hjHelper, + JoinRelType joinRelType) { + + this.probeBatch = probeBatch; + this.joinType = joinRelType; + this.recordsToProcess = probeBatch.getRecordCount(); + this.hashTable = hashTable; + this.hjHelper = hjHelper; + + doSetup(context, buildBatch, probeBatch, outgoing); + } + + public void executeProjectRightPhase() { + while (outputRecords < RecordBatch.MAX_BATCH_SIZE && recordsProcessed < recordsToProcess) { + projectBuildRecord(unmatchedBuildIndexes.get(recordsProcessed++), outputRecords++); + } + } + + public void executeProbePhase() throws SchemaChangeException { + while (outputRecords < RecordBatch.MAX_BATCH_SIZE && recordsToProcess > 0) { + + // Check if we have processed all records in this batch we need to invoke next + if (recordsProcessed == recordsToProcess) { + IterOutcome leftUpstream = probeBatch.next(); + + switch (leftUpstream) { + case NONE: + case NOT_YET: + case STOP: + recordsProcessed = 0; + recordsToProcess = 0; + probeState = 
ProbeState.DONE; + + // We are done with the probe phase. If its a RIGHT or a FULL join get the unmatched indexes from the build side + if (joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL) { + probeState = ProbeState.PROJECT_RIGHT; + } + + continue; + + case OK_NEW_SCHEMA: + throw new SchemaChangeException("Hash join does not support schema changes"); + case OK: + recordsToProcess = probeBatch.getRecordCount(); + recordsProcessed = 0; + } + } + int probeIndex; + + // Check if we need to drain the next row in the probe side + if (getNextRecord) { + probeIndex = hashTable.containsKey(recordsProcessed, true); + + if (probeIndex != -1) { + + /* The current probe record has a key that matches. Get the index + * of the first row in the build side that matches the current key + */ + currentCompositeIdx = hjHelper.getStartIndex(probeIndex); + + /* Record in the build side at currentCompositeIdx has a matching record in the probe + * side. Set the bit corresponding to this index so if we are doing a FULL or RIGHT + * join we keep track of which records we need to project at the end + */ + hjHelper.setRecordMatched(currentCompositeIdx); + + projectBuildRecord(currentCompositeIdx, outputRecords); + projectProbeRecord(recordsProcessed, outputRecords); + outputRecords++; + + /* Projected single row from the build side with matching key but there + * may be more rows with the same key. Check if that's the case + */ + currentCompositeIdx = hjHelper.getNextIndex(currentCompositeIdx); + if (currentCompositeIdx == -1) { + /* We only had one row in the build side that matched the current key + * from the probe side. Drain the next row in the probe side. + */ + recordsProcessed++; + } + else { + /* There is more than one row with the same key on the build side + * don't drain more records from the probe side till we have projected + * all the rows with this key + */ + getNextRecord = false; + } + } + else { // No matching key + + // If we have a left outer join, project the keys + if (joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) { + projectProbeRecord(recordsProcessed, outputRecords++); + } + recordsProcessed++; + } + } + else { + hjHelper.setRecordMatched(currentCompositeIdx); + projectBuildRecord(currentCompositeIdx, outputRecords); + projectProbeRecord(recordsProcessed, outputRecords); + outputRecords++; + + currentCompositeIdx = hjHelper.getNextIndex(currentCompositeIdx); + + if (currentCompositeIdx == -1) { + // We don't have any more rows matching the current key on the build side, move on to the next probe row + getNextRecord = true; + recordsProcessed++; + } + } + } + } + + public int probeAndProject() throws SchemaChangeException, ClassTransformationException, IOException { + + outputRecords = 0; + + if (probeState == ProbeState.PROBE_PROJECT) { + executeProbePhase(); + } + + if (probeState == ProbeState.PROJECT_RIGHT) { + + // We are here because we have a RIGHT OUTER or a FULL join + if (unmatchedBuildIndexes == null) { + // Initialize list of build indexes that didn't match a record on the probe side + unmatchedBuildIndexes = hjHelper.getNextUnmatchedIndex(); + recordsToProcess = unmatchedBuildIndexes.size(); + recordsProcessed = 0; + } + + // Project the list of unmatched records on the build side + executeProjectRightPhase(); + } + + return outputRecords; + } + + public abstract void doSetup(@Named("context") FragmentContext context, @Named("buildBatch") VectorContainer buildBatch, @Named("probeBatch") RecordBatch probeBatch, + @Named("outgoing") RecordBatch 
outgoing); + public abstract void projectBuildRecord(@Named("buildIndex") int buildIndex, @Named("outIndex") int outIndex); + public abstract void projectProbeRecord(@Named("probeIndex") int probeIndex, @Named("outIndex") int outIndex); +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java new file mode 100644 index 00000000000..529563a6877 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java @@ -0,0 +1,216 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.join; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.List; + +import mockit.Injectable; +import mockit.NonStrictExpectations; + +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.util.FileUtils; +import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.TopLevelAllocator; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.PhysicalPlan; +import org.apache.drill.exec.physical.base.FragmentRoot; +import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry; +import org.apache.drill.exec.physical.impl.ImplCreator; +import org.apache.drill.exec.physical.impl.SimpleRootExec; +import org.apache.drill.exec.planner.PhysicalPlanReader; +import org.apache.drill.exec.proto.CoordinationProtos; +import org.apache.drill.exec.proto.ExecProtos; +import org.apache.drill.exec.proto.BitControl.PlanFragment; +import org.apache.drill.exec.rpc.user.UserServer; +import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.store.StoragePluginRegistry; +import org.apache.drill.exec.vector.ValueVector; +import org.junit.AfterClass; +import org.junit.Test; + +import com.beust.jcommander.internal.Lists; +import com.google.common.base.Charsets; +import com.google.common.io.Files; +import com.codahale.metrics.MetricRegistry; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.Iterator; +import java.util.List; + +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.util.FileUtils; +import org.apache.drill.exec.client.DrillClient; +import org.apache.drill.exec.pop.PopUnitTestBase; +import org.apache.drill.exec.proto.UserProtos; +import org.apache.drill.exec.record.RecordBatchLoader; 
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VarCharVector;
+import org.apache.drill.exec.vector.IntVector;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+
+
+public class TestHashJoin extends PopUnitTestBase {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestHashJoin.class);
+
+  DrillConfig c = DrillConfig.create();
+
+  private void testHJMockScanCommon(final DrillbitContext bitContext, UserServer.UserClientConnection connection, String physicalPlan, int expectedRows) throws Throwable {
+    new NonStrictExpectations(){{
+      bitContext.getMetrics(); result = new MetricRegistry();
+      bitContext.getAllocator(); result = new TopLevelAllocator();
+      bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
+    }};
+
+    PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
+    PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile(physicalPlan), Charsets.UTF_8));
+    FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
+    FragmentContext context = new FragmentContext(bitContext, PlanFragment.getDefaultInstance(), connection, registry);
+    SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
+
+    int totalRecordCount = 0;
+    while (exec.next()) {
+      totalRecordCount += exec.getRecordCount();
+    }
+    assertEquals(expectedRows, totalRecordCount);
+    System.out.println("Total Record Count: " + totalRecordCount);
+    if (context.getFailureCause() != null)
+      throw context.getFailureCause();
+    assertTrue(!context.isFailed());
+  }
+
+  @Test
+  public void multiBatchEqualityJoin(@Injectable final DrillbitContext bitContext,
+                                     @Injectable UserServer.UserClientConnection connection) throws Throwable {
+
+    testHJMockScanCommon(bitContext, connection, "/join/hash_join_multi_batch.json", 200000);
+  }
+
+  @Test
+  public void multiBatchRightOuterJoin(@Injectable final DrillbitContext bitContext,
+                                       @Injectable UserServer.UserClientConnection connection) throws Throwable {
+
+    testHJMockScanCommon(bitContext, connection, "/join/hj_right_outer_multi_batch.json", 100000);
+  }
+
+  @Test
+  public void multiBatchLeftOuterJoin(@Injectable final DrillbitContext bitContext,
+                                      @Injectable UserServer.UserClientConnection connection) throws Throwable {
+
+    testHJMockScanCommon(bitContext, connection, "/join/hj_left_outer_multi_batch.json", 100000);
+  }
+
+  @Test
+  public void simpleEqualityJoin(@Injectable final DrillbitContext bitContext,
+                                 @Injectable UserServer.UserClientConnection connection) throws Throwable {
+
+    // Tests a simple single-condition equality join over the JSON build/probe side inputs
+    try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+         Drillbit bit = new Drillbit(CONFIG, serviceSet);
+         DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) {
+
+      // Run the query. 
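For context on the values asserted below: build_side_input.json (later in this patch) carries A = {1, 1, 2, 3, 4, 5}, with key 1 duplicated, and the probe file is not shown in this excerpt. Assuming probe keys B = {1, 2, 2, 1}, consistent with the expected output, a tiny in-memory rendition of the same inner join reproduces the asserted column:

    int[] buildA = {1, 1, 2, 3, 4, 5};            // A column; key 1 appears twice
    int[] probeB = {1, 2, 2, 1};                  // assumed probe keys (file not shown)
    java.util.List<Integer> joinedA = new java.util.ArrayList<>();
    for (int b : probeB) {
      for (int a : buildA) {
        if (a == b) {
          joinedA.add(a);                         // one output row per matching build row
        }
      }
    }
    // joinedA is [1, 1, 2, 2, 1, 1] -- the colA this test expects, 6 rows in all.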
+      bit.run();
+      client.connect();
+      List<QueryResultBatch> results = client.runQuery(UserProtos.QueryType.PHYSICAL,
+          Files.toString(FileUtils.getResourceAsFile("/join/hash_join.json"), Charsets.UTF_8)
+              .replace("#{TEST_FILE_1}", FileUtils.getResourceAsFile("/build_side_input.json").toURI().toString())
+              .replace("#{TEST_FILE_2}", FileUtils.getResourceAsFile("/probe_side_input.json").toURI().toString()));
+
+      RecordBatchLoader batchLoader = new RecordBatchLoader(bit.getContext().getAllocator());
+
+      QueryResultBatch batch = results.get(0);
+      assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData()));
+
+      batchLoader.getValueAccessorById(0, IntVector.class);
+
+      Iterator<VectorWrapper<?>> itr = batchLoader.iterator();
+
+      // Just test the join key
+      long[] colA = {1, 1, 2, 2, 1, 1};
+
+      // Check the values of the join key column
+      ValueVector.Accessor intAccessor1 = itr.next().getValueVector().getAccessor();
+
+      for (int i = 0; i < intAccessor1.getValueCount(); i++) {
+        assertEquals(colA[i], intAccessor1.getObject(i));
+      }
+      assertEquals(6, intAccessor1.getValueCount());
+    }
+  }
+
+  @Test
+  public void multipleConditionJoin(@Injectable final DrillbitContext bitContext,
+                                    @Injectable UserServer.UserClientConnection connection) throws Throwable {
+
+    // Checks a join on two equality conditions (B == A and DCOL == CCOL).
+    try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+        Drillbit bit = new Drillbit(CONFIG, serviceSet);
+        DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) {
+
+      // run query.
+      bit.run();
+      client.connect();
+      List<QueryResultBatch> results = client.runQuery(UserProtos.QueryType.PHYSICAL,
+          Files.toString(FileUtils.getResourceAsFile("/join/hj_multi_condition_join.json"), Charsets.UTF_8)
+              .replace("#{TEST_FILE_1}", FileUtils.getResourceAsFile("/build_side_input.json").toURI().toString())
+              .replace("#{TEST_FILE_2}", FileUtils.getResourceAsFile("/probe_side_input.json").toURI().toString()));
+
+      RecordBatchLoader batchLoader = new RecordBatchLoader(bit.getContext().getAllocator());
+
+      QueryResultBatch batch = results.get(0);
+      assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData()));
+
+      batchLoader.getValueAccessorById(0, IntVector.class);
+
+      Iterator<VectorWrapper<?>> itr = batchLoader.iterator();
+
+      // Just test the join keys
+      long[] colA = {1, 2, 1};
+      long[] colC = {100, 200, 500};
+
+      // Check the values of the two join key columns
+      ValueVector.Accessor intAccessor1 = itr.next().getValueVector().getAccessor();
+      ValueVector.Accessor intAccessor2 = itr.next().getValueVector().getAccessor();
+
+      for (int i = 0; i < intAccessor1.getValueCount(); i++) {
+        assertEquals(colA[i], intAccessor1.getObject(i));
+        assertEquals(colC[i], intAccessor2.getObject(i));
+      }
+      assertEquals(3, intAccessor1.getValueCount());
+    }
+  }
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java
index 78f7e43af96..e5cd5088422 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java
@@ -45,7 +45,7 @@ public abstract class PopUnitTestBase {
   protected static DrillConfig CONFIG;
 
   // Set a timeout unless we're debugging.
-  @Rule public TestRule globalTimeout = IS_DEBUG ? new TestName() : new Timeout(25000);
+  @Rule public TestRule globalTimeout = IS_DEBUG ? new TestName() : new Timeout(500000);
 
   @BeforeClass
   public static void setup() {
diff --git a/exec/java-exec/src/test/resources/build_side_input.json b/exec/java-exec/src/test/resources/build_side_input.json
new file mode 100644
index 00000000000..31006a6a5b9
--- /dev/null
+++ b/exec/java-exec/src/test/resources/build_side_input.json
@@ -0,0 +1,24 @@
+{
+"A": 1,
+"C": 100
+}
+{
+"A": 1,
+"C": 500
+}
+{
+"A": 2,
+"C": 200
+}
+{
+"A": 3,
+"C": 300
+}
+{
+"A": 4,
+"C": 400
+}
+{
+"A": 5,
+"C": 500
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/test/resources/join/hash_join.json b/exec/java-exec/src/test/resources/join/hash_join.json
new file mode 100644
index 00000000000..60b915bed56
--- /dev/null
+++ b/exec/java-exec/src/test/resources/join/hash_join.json
@@ -0,0 +1,63 @@
+{
+  "head" : {
+    "version" : 1,
+    "generator" : {
+      "type" : "org.apache.drill.exec.planner.logical.DrillImplementor",
+      "info" : ""
+    },
+    "type" : "APACHE_DRILL_PHYSICAL",
+    "resultMode" : "EXEC"
+  },
+  graph:[
+    {
+      @id:1,
+      pop:"fs-scan",
+      format: {type: "json"},
+      storage:{type: "file", connection: "file:///"},
+      files:["#{TEST_FILE_1}"]
+    },
+    {
+      @id:2,
+      pop:"fs-scan",
+      format: {type: "json"},
+      storage:{type: "file", connection: "file:///"},
+      files:["#{TEST_FILE_2}"]
+    },
+    {
+      "pop" : "project",
+      "@id" : 3,
+      "exprs" : [
+        { "ref" : "output.A", "expr" : "A" },
+        { "ref" : "output.CCOL", "expr" : "C" }
+      ],
+      "child" : 1
+    },
+    {
+      "pop" : "project",
+      "@id" : 4,
+      "exprs" : [
+        { "ref" : "output.B", "expr" : "B" },
+        { "ref" : "output.DCOL", "expr" : "D" }
+      ],
+      "child" : 2
+    },
+    {
+      @id: 5,
+      right: 3,
+      left: 4,
+      pop: "hash-join",
+      join-conditions: [ {relationship: "==", left: "B", right: "A"} ]
+    },
+    {
+      @id: 6,
+      child: 5,
+      pop: "screen"
+    }
+  ]
+}
diff --git a/exec/java-exec/src/test/resources/join/hash_join_multi_batch.json b/exec/java-exec/src/test/resources/join/hash_join_multi_batch.json
new file mode 100644
index 00000000000..7e218a40945
--- /dev/null
+++ b/exec/java-exec/src/test/resources/join/hash_join_multi_batch.json
@@ -0,0 +1,47 @@
+{
+  head:{
+    type:"APACHE_DRILL_PHYSICAL",
+    version:"1",
+    generator:{
+      type:"manual"
+    }
+  },
+  graph:[
+    {
+      @id:1,
+      pop:"mock-sub-scan",
+      url: "http://source1.apache.org",
+      entries:[
+        {records: 4, types: [
+          {name: "blue", type: "INT", mode: "REQUIRED"},
+          {name: "red", type: "INT", mode: "REQUIRED"},
+          {name: "green", type: "INT", mode: "REQUIRED"}
+        ]}
+      ]
+    },
+    {
+      @id:2,
+      pop:"mock-sub-scan",
+      url: "http://source2.apache.org",
+      entries:[
+        {records: 100000, types: [
+          {name: "blue1", type: "INT", mode: "REQUIRED"},
+          {name: "red1", type: "INT", mode: "REQUIRED"},
+          {name: "green1", type: "INT", mode: "REQUIRED"}
+        ]}
+      ]
+    },
+    {
+      @id: 3,
+      right: 1,
+      left: 2,
+      pop: "hash-join",
+      join-conditions: [ {relationship: "==", left: "blue1", right: "blue"} ]
+    },
+    {
+      @id: 4,
+      child: 3,
+      pop: "screen"
+    }
+  ]
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/test/resources/join/hj_left_outer_multi_batch.json b/exec/java-exec/src/test/resources/join/hj_left_outer_multi_batch.json
new file mode 100644
index 00000000000..0cb5a03197c
--- /dev/null
+++ b/exec/java-exec/src/test/resources/join/hj_left_outer_multi_batch.json
@@ -0,0 +1,48 @@
+{
+  head:{
+    type:"APACHE_DRILL_PHYSICAL",
+    version:"1",
+    generator:{
+      type:"manual"
+    }
+  },
+  graph:[
+    {
+      @id:1,
+      pop:"mock-sub-scan",
+      url: "http://source1.apache.org",
+      entries:[
+        {records: 1, types: [
+          {name: "blue", type: "INT", mode: "REQUIRED"},
+          {name: "red", type: "INT", mode: "REQUIRED"},
+          {name: "green", type: "INT", mode: "REQUIRED"}
+        ]}
+      ]
+    },
+    {
+      @id:2,
+      pop:"mock-sub-scan",
+      url: "http://source2.apache.org",
+      entries:[
+        {records: 100000, types: [
+          {name: "blue1", type: "INT", mode: "REQUIRED"},
+          {name: "red1", type: "INT", mode: "REQUIRED"},
+          {name: "green1", type: "INT", mode: "REQUIRED"}
+        ]}
+      ]
+    },
+    {
+      @id: 3,
+      right: 1,
+      left: 2,
+      pop: "hash-join",
+      join-type: "LEFT",
+      join-conditions: [ {relationship: "==", left: "blue1", right: "blue"} ]
+    },
+    {
+      @id: 4,
+      child: 3,
+      pop: "screen"
+    }
+  ]
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/test/resources/join/hj_multi_condition_join.json b/exec/java-exec/src/test/resources/join/hj_multi_condition_join.json
new file mode 100644
index 00000000000..fd680d0a5ec
--- /dev/null
+++ b/exec/java-exec/src/test/resources/join/hj_multi_condition_join.json
@@ -0,0 +1,66 @@
+{
+  "head" : {
+    "version" : 1,
+    "generator" : {
+      "type" : "org.apache.drill.exec.planner.logical.DrillImplementor",
+      "info" : ""
+    },
+    "type" : "APACHE_DRILL_PHYSICAL",
+    "resultMode" : "EXEC"
+  },
+  graph:[
+    {
+      @id:1,
+      pop:"fs-scan",
+      format: {type: "json"},
+      storage:{type: "file", connection: "file:///"},
+      files:["#{TEST_FILE_1}"]
+    },
+    {
+      @id:2,
+      pop:"fs-scan",
+      format: {type: "json"},
+      storage:{type: "file", connection: "file:///"},
+      files:["#{TEST_FILE_2}"]
+    },
+    {
+      "pop" : "project",
+      "@id" : 3,
+      "exprs" : [
+        { "ref" : "output.A", "expr" : "A" },
+        { "ref" : "output.CCOL", "expr" : "C" }
+      ],
+      "child" : 1
+    },
+    {
+      "pop" : "project",
+      "@id" : 4,
+      "exprs" : [
+        { "ref" : "output.B", "expr" : "B" },
+        { "ref" : "output.DCOL", "expr" : "D" }
+      ],
+      "child" : 2
+    },
+    {
+      @id: 5,
+      right: 3,
+      left: 4,
+      pop: "hash-join",
+      join-conditions: [
+        {relationship: "==", left: "B", right: "A"},
+        {relationship: "==", left: "DCOL", right: "CCOL"}
+      ]
+    },
+    {
+      @id: 6,
+      child: 5,
+      pop: "screen"
+    }
+  ]
+}
diff --git a/exec/java-exec/src/test/resources/join/hj_right_outer_multi_batch.json b/exec/java-exec/src/test/resources/join/hj_right_outer_multi_batch.json
new file mode 100644
index 00000000000..b8723aac8dc
--- /dev/null
+++ b/exec/java-exec/src/test/resources/join/hj_right_outer_multi_batch.json
@@ -0,0 +1,48 @@
+{
+  head:{
+    type:"APACHE_DRILL_PHYSICAL",
+    version:"1",
+    generator:{
+      type:"manual"
+    }
+  },
+  graph:[
+    {
+      @id:1,
+      pop:"mock-sub-scan",
+      url: "http://source1.apache.org",
+      entries:[
+        {records: 100000, types: [
+          {name: "blue", type: "INT", mode: "REQUIRED"},
+          {name: "red", type: "INT", mode: "REQUIRED"},
+          {name: "green", type: "INT", mode: "REQUIRED"}
+        ]}
+      ]
+    },
+    {
+      @id:2,
+      pop:"mock-sub-scan",
+      url: "http://source2.apache.org",
+      entries:[
+        {records: 1, types: [
+          {name: "blue1", type: "INT", mode: "REQUIRED"},
+          {name: "red1", type: "INT", mode: "REQUIRED"},
+          {name: "green1", type: "INT", mode: "REQUIRED"}
+        ]}
+      ]
+    },
+    {
+      @id: 3,
+      right: 1,
+      left: 2,
+      pop: "hash-join",
+      join-type: "RIGHT",
+      join-conditions: [ {relationship: "==", left: "blue1", right: "blue"} ]
+    },
+    {
+      @id: 4,
+      child: 3,
+      pop: "screen"
+    }
+  ]
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/test/resources/probe_side_input.json b/exec/java-exec/src/test/resources/probe_side_input.json
new file mode 100644
index 00000000000..d3dbbb342f6
--- /dev/null
+++ b/exec/java-exec/src/test/resources/probe_side_input.json
@@ -0,0 +1,28 @@
+{
+"B": 1,
+"D": 100
+}
+{
+"B": 2,
+"D": 200
+}
+{
+"B": 2,
+"D": 300
+}
+{
+"B": 9,
+"D": 900
+}
+{
+"B": 1,
+"D": 500
+}
+{
+"B": 10,
+"D": 1000
+}
+{
+"B": 11,
+"D": 1100
+}
\ No newline at end of file
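
Reviewer note (not part of the patch): the snippet below is a minimal sketch of how the new hash join operator can be exercised by hand against a local Drillbit, mirroring simpleEqualityJoin() in TestHashJoin above. The resource paths and the CONFIG constant are the ones this patch introduces; the client calls are standard Drill test-harness API, and the expected row count of 6 comes from the sample build/probe inputs.

    // Stand up an in-process Drillbit and submit the hash join physical plan.
    try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
         Drillbit bit = new Drillbit(CONFIG, serviceSet);
         DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) {
      bit.run();
      client.connect();

      // Substitute the JSON build/probe inputs into the plan template.
      String plan = Files.toString(FileUtils.getResourceAsFile("/join/hash_join.json"), Charsets.UTF_8)
          .replace("#{TEST_FILE_1}", FileUtils.getResourceAsFile("/build_side_input.json").toURI().toString())
          .replace("#{TEST_FILE_2}", FileUtils.getResourceAsFile("/probe_side_input.json").toURI().toString());

      List<QueryResultBatch> results = client.runQuery(UserProtos.QueryType.PHYSICAL, plan);

      // Count joined records across all returned batches, releasing buffers as we go.
      int rows = 0;
      for (QueryResultBatch b : results) {
        rows += b.getHeader().getRowCount();
        b.release();
      }
      System.out.println("Joined rows: " + rows);  // 6 for the sample inputs
    }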