Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@

package org.apache.tajo.engine.planner.physical;

import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.engine.eval.AlgebraicUtil;
import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.eval.EvalTreeUtil;
import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.Projector;
import org.apache.tajo.engine.planner.logical.JoinNode;
Expand All @@ -39,7 +42,8 @@
public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
// from logical plan
protected JoinNode plan;
protected EvalNode joinQual;
protected EvalNode joinQual; // ex) a.id = b.id
protected EvalNode joinFilter; // ex) a > 10

protected List<Column[]> joinKeyPairs;

Expand Down Expand Up @@ -69,7 +73,24 @@ public HashLeftOuterJoinExec(TaskAttemptContext context, JoinNode plan, Physical
super(context, SchemaUtil.merge(leftChild.getSchema(), rightChild.getSchema()),
plan.getOutSchema(), leftChild, rightChild);
this.plan = plan;
this.joinQual = plan.getJoinQual();

List<EvalNode> joinQuals = Lists.newArrayList();
List<EvalNode> joinFilters = Lists.newArrayList();
for (EvalNode eachQual : AlgebraicUtil.toConjunctiveNormalFormArray(plan.getJoinQual())) {
if (EvalTreeUtil.isJoinQual(eachQual, true)) {
joinQuals.add(eachQual);
} else {
joinFilters.add(eachQual);
}
}

this.joinQual = AlgebraicUtil.createSingletonExprFromCNF(joinQuals.toArray(new EvalNode[joinQuals.size()]));
if (joinFilters.size() > 0) {
this.joinFilter = AlgebraicUtil.createSingletonExprFromCNF(joinFilters.toArray(new EvalNode[joinFilters.size()]));
} else {
this.joinFilter = null;
}

this.tupleSlots = new HashMap<Tuple, List<Tuple>>(10000);

// HashJoin only can manage equi join key pairs.
Expand Down Expand Up @@ -146,10 +167,23 @@ public Tuple next() throws IOException {
}

frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples
if (joinQual.eval(inSchema, frameTuple).isTrue()) { // if both tuples are joinable

// if there is no join filter, it is always true.
boolean satisfiedWithFilter = joinFilter == null ? true : joinFilter.eval(inSchema, frameTuple).isTrue();
boolean satisfiedWithJoinCondition = joinQual.eval(inSchema, frameTuple).isTrue();

// if a composited tuple satisfies with both join filter and join condition
if (satisfiedWithFilter && satisfiedWithJoinCondition) {
projector.eval(frameTuple, outTuple);
return outTuple;
} else {

// if join filter is satisfied, the left outer join (LOJ) operator should return the null padded tuple
// only once. Then, LOJ operator should take the next left tuple.
if (!satisfiedWithFilter) {
shouldGetLeftTuple = true;
}

// null padding
Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
frameTuple.set(leftTuple, nullPaddedTuple);
Expand Down Expand Up @@ -204,6 +238,7 @@ public void close() throws IOException {
iterator = null;
plan = null;
joinQual = null;
joinFilter = null;
projector = null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -834,7 +834,9 @@ private static void pushDownIfComplexTermInJoinCondition(Context ctx, EvalNode c
// If one of both terms in a binary operator is a complex expression, the binary operator will require
// multiple phases. In this case, join cannot evaluate a binary operator.
// So, we should prevent dividing the binary operator into more subexpressions.
if (term.getType() != EvalType.FIELD && !(term instanceof BinaryEval)) {
if (term.getType() != EvalType.FIELD &&
!(term instanceof BinaryEval) &&
!(term.getType() == EvalType.ROW_CONSTANT)) {
String refName = ctx.addExpr(term);
EvalTreeUtil.replace(cnf, term, new FieldEval(refName, term.getValueType()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1051,4 +1051,12 @@ public void testFullOuterJoinWithEmptyIntermediateData() throws Exception {
cleanupQuery(res);
}
}

@Test
public final void testJoinFilterOfRowPreservedTable1() throws Exception {
// this test is for join filter of a row preserved table.
ResultSet res = executeQuery();
assertResultSet(res);
cleanupQuery(res);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
select
r_name,
r_regionkey,
n_name,
n_regionkey
from
region left outer join nation on n_regionkey = r_regionkey and r_name in ('AMERICA', 'ASIA')
order by r_name;
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
r_name,r_regionkey,n_name,n_regionkey
-------------------------------
AFRICA,0,null,null
AMERICA,1,ARGENTINA,1
AMERICA,1,BRAZIL,1
AMERICA,1,CANADA,1
AMERICA,1,PERU,1
AMERICA,1,UNITED STATES,1
ASIA,2,INDIA,2
ASIA,2,INDONESIA,2
ASIA,2,JAPAN,2
ASIA,2,CHINA,2
ASIA,2,VIETNAM,2
EUROPE,3,null,null
MIDDLE EAST,4,null,null