Skip to content

Commit

Permalink
DRILL-4927 (part 2): Add support for Null Equality Joins (mixed compa…
Browse files Browse the repository at this point in the history
…rators)

These changes are a subset of the original pull request from DRILL-4539 (PR-462).
- Added changes to support mixed comparators;
- Added tests for it.

closes #635
  • Loading branch information
KulykRoman authored and Sudheesh Katkam committed Nov 2, 2016
1 parent 83513da commit 5f34c96
Show file tree
Hide file tree
Showing 12 changed files with 277 additions and 81 deletions.
Expand Up @@ -18,7 +18,9 @@
package org.apache.drill.exec.physical.impl.aggregate;

import java.io.IOException;
import java.util.List;

import com.google.common.collect.Lists;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.ErrorCollector;
Expand All @@ -40,6 +42,7 @@
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.HashAggregate;
import org.apache.drill.exec.physical.impl.aggregate.HashAggregator.AggOutcome;
import org.apache.drill.exec.physical.impl.common.Comparator;
import org.apache.drill.exec.physical.impl.common.HashTable;
import org.apache.drill.exec.physical.impl.common.HashTableConfig;
import org.apache.drill.exec.record.AbstractRecordBatch;
Expand All @@ -64,6 +67,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
private LogicalExpression[] aggrExprs;
private TypedFieldId[] groupByOutFieldIds;
private TypedFieldId[] aggrOutFieldIds; // field ids for the outgoing batch
private final List<Comparator> comparators;

private final GeneratorMapping UPDATE_AGGR_INSIDE =
GeneratorMapping.create("setupInterior" /* setup method */, "updateAggrValuesInternal" /* eval method */,
Expand All @@ -82,6 +86,13 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
/**
 * Creates a hash aggregate batch reading from the given incoming batch.
 *
 * <p>Also builds the comparator list handed to the hash table configuration: one
 * {@link Comparator#IS_NOT_DISTINCT_FROM} entry per group-by expression.
 *
 * @param popConfig hash aggregate operator configuration; supplies the group-by expressions
 * @param incoming  batch whose records are aggregated
 * @param context   fragment context forwarded to the parent class
 * @throws ExecutionSetupException declared by the constructor signature
 */
public HashAggBatch(HashAggregate popConfig, RecordBatch incoming, FragmentContext context) throws ExecutionSetupException {
  super(popConfig, context);
  this.incoming = incoming;

  // Group-by keys treat nulls as equal, so every key gets IS_NOT_DISTINCT_FROM.
  final int keyCount = popConfig.getGroupByExprs().size();
  comparators = Lists.newArrayListWithExpectedSize(keyCount);
  int remaining = keyCount;
  while (remaining-- > 0) {
    comparators.add(Comparator.IS_NOT_DISTINCT_FROM);
  }
}

@Override
Expand Down Expand Up @@ -241,7 +252,7 @@ private HashAggregator createAggregatorInternal() throws SchemaChangeException,
HashTableConfig htConfig =
// TODO - fix the validator on this option
new HashTableConfig((int)context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE),
HashTable.DEFAULT_LOAD_FACTOR, popConfig.getGroupByExprs(), null /* no probe exprs */);
HashTable.DEFAULT_LOAD_FACTOR, popConfig.getGroupByExprs(), null /* no probe exprs */, comparators);

agg.setup(popConfig, htConfig, context, this.stats,
oContext.getAllocator(), incoming, this,
Expand Down
Expand Up @@ -268,8 +268,7 @@ public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, Fragme
}

ChainedHashTable ht =
new ChainedHashTable(htConfig, context, allocator, incoming, null /* no incoming probe */, outgoing,
true /* nulls are equal */);
new ChainedHashTable(htConfig, context, allocator, incoming, null /* no incoming probe */, outgoing);
this.htable = ht.createAndSetupHashTable(groupByOutFieldIds);

numGroupByOutFields = groupByOutFieldIds.length;
Expand Down
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
Expand Down Expand Up @@ -116,19 +117,16 @@ public class ChainedHashTable {
private final RecordBatch incomingBuild;
private final RecordBatch incomingProbe;
private final RecordBatch outgoing;
private final boolean areNullsEqual;

public ChainedHashTable(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator,
RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing,
boolean areNullsEqual) {
RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing) {

this.htConfig = htConfig;
this.context = context;
this.allocator = allocator;
this.incomingBuild = incomingBuild;
this.incomingProbe = incomingProbe;
this.outgoing = outgoing;
this.areNullsEqual = areNullsEqual;
}

public HashTable createAndSetupHashTable(TypedFieldId[] outKeyFieldIds) throws ClassTransformationException,
Expand Down Expand Up @@ -197,9 +195,10 @@ public HashTable createAndSetupHashTable(TypedFieldId[] outKeyFieldIds) throws C


// generate code for isKeyMatch(), setValue(), getHash() and outputRecordKeys()
setupIsKeyMatchInternal(cgInner, KeyMatchIncomingBuildMapping, KeyMatchHtableMapping, keyExprsBuild, htKeyFieldIds);
setupIsKeyMatchInternal(cgInner, KeyMatchIncomingBuildMapping, KeyMatchHtableMapping, keyExprsBuild,
htConfig.getComparators(), htKeyFieldIds);
setupIsKeyMatchInternal(cgInner, KeyMatchIncomingProbeMapping, KeyMatchHtableProbeMapping, keyExprsProbe,
htKeyFieldIds);
htConfig.getComparators(), htKeyFieldIds);

setupSetValue(cgInner, keyExprsBuild, htKeyFieldIds);
if (outgoing != null) {
Expand All @@ -221,7 +220,7 @@ public HashTable createAndSetupHashTable(TypedFieldId[] outKeyFieldIds) throws C


private void setupIsKeyMatchInternal(ClassGenerator<HashTable> cg, MappingSet incomingMapping, MappingSet htableMapping,
LogicalExpression[] keyExprs, TypedFieldId[] htKeyFieldIds)
LogicalExpression[] keyExprs, List<Comparator> comparators, TypedFieldId[] htKeyFieldIds)
throws SchemaChangeException {
cg.setMappingSet(incomingMapping);

Expand All @@ -230,19 +229,20 @@ private void setupIsKeyMatchInternal(ClassGenerator<HashTable> cg, MappingSet in
return;
}

int i = 0;
for (LogicalExpression expr : keyExprs) {
for (int i=0; i<keyExprs.length; i++) {
final LogicalExpression expr = keyExprs[i];
cg.setMappingSet(incomingMapping);
HoldingContainer left = cg.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);

cg.setMappingSet(htableMapping);
ValueVectorReadExpression vvrExpr = new ValueVectorReadExpression(htKeyFieldIds[i++]);
ValueVectorReadExpression vvrExpr = new ValueVectorReadExpression(htKeyFieldIds[i]);
HoldingContainer right = cg.addExpr(vvrExpr, ClassGenerator.BlkCreateMode.FALSE);

JConditional jc;

// codegen for nullable columns if nulls are not equal
if (!areNullsEqual && left.isOptional() && right.isOptional()) {
if (comparators.get(i) == Comparator.EQUALS
&& left.isOptional() && right.isOptional()) {
jc = cg.getEvalBlock()._if(left.getIsSet().eq(JExpr.lit(0)).
cand(right.getIsSet().eq(JExpr.lit(0))));
jc._then()._return(JExpr.FALSE);
Expand Down
@@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.common;

/**
 * Kind of comparison applied to a key column. Used in Join and Aggregation operators.
 */
public enum Comparator {

  /** No comparator. */
  NONE,

  /** Equality comparator. */
  EQUALS,

  /** 'IS NOT DISTINCT FROM' comparator. */
  IS_NOT_DISTINCT_FROM
}
Expand Up @@ -28,21 +28,22 @@
@JsonTypeName("hashtable-config")
public class HashTableConfig {

static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashTableConfig.class);

private final int initialCapacity;
private final float loadFactor;
private final List<NamedExpression> keyExprsBuild;
private final List<NamedExpression> keyExprsProbe;
private final List<Comparator> comparators;

@JsonCreator
public HashTableConfig(@JsonProperty("initialCapacity") int initialCapacity, @JsonProperty("loadFactor") float loadFactor,
@JsonProperty("keyExprsBuild") List<NamedExpression> keyExprsBuild,
@JsonProperty("keyExprsProbe") List<NamedExpression> keyExprsProbe) {
@JsonProperty("keyExprsProbe") List<NamedExpression> keyExprsProbe,
@JsonProperty("comparators") List<Comparator> comparators) {
this.initialCapacity = initialCapacity;
this.loadFactor = loadFactor;
this.keyExprsBuild = keyExprsBuild;
this.keyExprsProbe = keyExprsProbe;
this.comparators = comparators;
}

public int getInitialCapacity() {
Expand All @@ -61,4 +62,8 @@ public List<NamedExpression> getKeyExprsProbe() {
return keyExprsProbe;
}

/**
 * Returns the comparator list this configuration was constructed with.
 *
 * @return the comparators, exactly as supplied to the constructor (not copied)
 */
public List<Comparator> getComparators() {
  return this.comparators;
}

}
Expand Up @@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.List;

import com.google.common.collect.Lists;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.logical.data.JoinCondition;
import org.apache.drill.common.logical.data.NamedExpression;
Expand All @@ -44,7 +45,7 @@
import org.apache.drill.exec.physical.impl.common.HashTableConfig;
import org.apache.drill.exec.physical.impl.common.HashTableStats;
import org.apache.drill.exec.physical.impl.common.IndexPointer;
import org.apache.drill.exec.physical.impl.join.JoinUtils.JoinComparator;
import org.apache.drill.exec.physical.impl.common.Comparator;
import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
Expand Down Expand Up @@ -79,6 +80,8 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
// Join conditions
private final List<JoinCondition> conditions;

private final List<Comparator> comparators;

// Runtime generated class implementing HashJoinProbe interface
private HashJoinProbe hashJoinProbe = null;

Expand Down Expand Up @@ -285,19 +288,12 @@ public void setupHashTable() throws IOException, SchemaChangeException, ClassTra
final List<NamedExpression> rightExpr = new ArrayList<>(conditionsSize);
List<NamedExpression> leftExpr = new ArrayList<>(conditionsSize);

JoinComparator comparator = JoinComparator.NONE;
// Create named expressions from the conditions
for (int i = 0; i < conditionsSize; i++) {
rightExpr.add(new NamedExpression(conditions.get(i).getRight(), new FieldReference("build_side_" + i)));
leftExpr.add(new NamedExpression(conditions.get(i).getLeft(), new FieldReference("probe_side_" + i)));

// Hash join only supports certain types of comparisons
comparator = JoinUtils.checkAndSetComparison(conditions.get(i), comparator);
}

assert comparator != JoinComparator.NONE;
final boolean areNullsEqual = (comparator == JoinComparator.IS_NOT_DISTINCT_FROM) ? true : false;

// Set the left named expression to be null if the probe batch is empty.
if (leftUpstream != IterOutcome.OK_NEW_SCHEMA && leftUpstream != IterOutcome.OK) {
leftExpr = null;
Expand All @@ -309,12 +305,11 @@ public void setupHashTable() throws IOException, SchemaChangeException, ClassTra

final HashTableConfig htConfig =
new HashTableConfig((int) context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE),
HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr);
HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr, comparators);

// Create the chained hash table
final ChainedHashTable ht =
new ChainedHashTable(htConfig, context, oContext.getAllocator(), this.right, this.left, null,
areNullsEqual);
new ChainedHashTable(htConfig, context, oContext.getAllocator(), this.right, this.left, null);
hashTable = ht.createAndSetupHashTable(null);
}

Expand Down Expand Up @@ -500,6 +495,12 @@ public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context, RecordBatch
this.right = right;
joinType = popConfig.getJoinType();
conditions = popConfig.getConditions();

comparators = Lists.newArrayListWithExpectedSize(conditions.size());
for (int i=0; i<conditions.size(); i++) {
JoinCondition cond = conditions.get(i);
comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond));
}
}

private void updateStats(HashTable htable) {
Expand Down
Expand Up @@ -21,10 +21,12 @@
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rex.RexNode;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.logical.data.JoinCondition;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.volcano.RelSubset;
import org.apache.drill.exec.physical.impl.common.Comparator;
import org.apache.drill.exec.planner.logical.DrillAggregateRel;
import org.apache.drill.exec.planner.logical.DrillFilterRel;
import org.apache.drill.exec.planner.logical.DrillProjectRel;
Expand All @@ -46,40 +48,28 @@
import com.google.common.collect.Lists;

public class JoinUtils {
public static enum JoinComparator {
NONE, // No comparator
EQUALS, // Equality comparator
IS_NOT_DISTINCT_FROM // 'IS NOT DISTINCT FROM' comparator
}

public static enum JoinCategory {
public enum JoinCategory {
EQUALITY, // equality join
INEQUALITY, // inequality join: <>, <, >
CARTESIAN // no join condition
}
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinUtils.class);

// Check the comparator for the join condition. Note that a similar check is also
// Check the comparator is supported in join condition. Note that a similar check is also
// done in JoinPrel; however we have to repeat it here because a physical plan
// may be submitted directly to Drill.
public static JoinComparator checkAndSetComparison(JoinCondition condition,
JoinComparator comparator) {
if (condition.getRelationship().equalsIgnoreCase("EQUALS") ||
condition.getRelationship().equals("==") /* older json plans still have '==' */) {
if (comparator == JoinComparator.NONE ||
comparator == JoinComparator.EQUALS) {
return JoinComparator.EQUALS;
} else {
throw new IllegalArgumentException("This type of join does not support mixed comparators.");
}
} else if (condition.getRelationship().equalsIgnoreCase("IS_NOT_DISTINCT_FROM")) {
if (comparator == JoinComparator.NONE ||
comparator == JoinComparator.IS_NOT_DISTINCT_FROM) {
return JoinComparator.IS_NOT_DISTINCT_FROM;
} else {
throw new IllegalArgumentException("This type of join does not support mixed comparators.");
}
/**
 * Maps the relationship of a join condition onto the comparator used by the join operators.
 *
 * <p>{@code EQUALS} (in any letter case) and the legacy {@code ==} map to
 * {@link Comparator#EQUALS}; {@code IS_NOT_DISTINCT_FROM} (in any letter case) maps to
 * {@link Comparator#IS_NOT_DISTINCT_FROM}.
 *
 * @param condition join condition whose relationship string is inspected
 * @return the comparator matching the condition's relationship
 * @throws UserException (unsupported-operation error) if the relationship is none of the above
 */
public static Comparator checkAndReturnSupportedJoinComparator(JoinCondition condition) {
  // Locale.ROOT keeps the case folding independent of the JVM default locale
  // (e.g. Turkish maps 'i' to a dotted capital, which would never match below).
  switch (condition.getRelationship().toUpperCase(java.util.Locale.ROOT)) {
    case "EQUALS":
    case "==": /* older json plans still have '==' */
      return Comparator.EQUALS;
    case "IS_NOT_DISTINCT_FROM":
      return Comparator.IS_NOT_DISTINCT_FROM;
    default:
      // The message is a format string: without the %s placeholder the offending
      // relationship would be silently omitted from the error text.
      throw UserException.unsupportedError()
          .message("Invalid comparator supplied to this join: %s", condition.getRelationship())
          .build(logger);
  }
}

/**
Expand Down
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import java.util.List;

import com.google.common.collect.Lists;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
Expand All @@ -43,7 +44,7 @@
import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.MergeJoinPOP;
import org.apache.drill.exec.physical.impl.join.JoinUtils.JoinComparator;
import org.apache.drill.exec.physical.impl.common.Comparator;
import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
Expand Down Expand Up @@ -98,10 +99,9 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
private final RecordIterator rightIterator;
private final JoinStatus status;
private final List<JoinCondition> conditions;
private final List<Comparator> comparators;
private final JoinRelType joinType;
private JoinWorker worker;
private boolean areNullsEqual = false; // whether nulls compare equal


private static final String LEFT_INPUT = "LEFT INPUT";
private static final String RIGHT_INPUT = "RIGHT INPUT";
Expand All @@ -120,12 +120,10 @@ protected MergeJoinBatch(MergeJoinPOP popConfig, FragmentContext context, Record
this.status = new JoinStatus(leftIterator, rightIterator, this);
this.conditions = popConfig.getConditions();

JoinComparator comparator = JoinComparator.NONE;
this.comparators = Lists.newArrayListWithExpectedSize(conditions.size());
for (JoinCondition condition : conditions) {
comparator = JoinUtils.checkAndSetComparison(condition, comparator);
this.comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(condition));
}
assert comparator != JoinComparator.NONE;
areNullsEqual = (comparator == JoinComparator.IS_NOT_DISTINCT_FROM);
}

public JoinRelType getJoinType() {
Expand Down Expand Up @@ -461,7 +459,7 @@ private void generateDoCompare(ClassGenerator<JoinWorker> cg, JVar incomingRecor
// If not 0, it means not equal.
// Comparing Null to Null should return null (unknown). In that case, we return 1 to indicate they are not equal.
if (compareLeftExprHolder.isOptional() && compareRightExprHolder.isOptional()
&& ! areNullsEqual) {
&& comparators.get(i) == Comparator.EQUALS) {
JConditional jc = cg.getEvalBlock()._if(compareLeftExprHolder.getIsSet().eq(JExpr.lit(0)).
cand(compareRightExprHolder.getIsSet().eq(JExpr.lit(0))));
jc._then()._return(JExpr.lit(1));
Expand Down

0 comments on commit 5f34c96

Please sign in to comment.