From 3c6cf30d27e5ba4fb7b8cef39ab55e0a9449a9db Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 1 Oct 2015 19:24:23 +0900 Subject: [PATCH 1/2] TAJO-1904 --- .../apache/tajo/util/graph/DirectedGraph.java | 2 +- .../tajo/util/graph/DirectedGraphVisitor.java | 4 +- .../tajo/util/graph/SimpleDirectedGraph.java | 17 +++-- .../util/graph/TestSimpleDirectedGraph.java | 6 +- .../engine/planner/global/MasterPlan.java | 4 +- .../rewriter/GlobalPlanRewriteEngine.java | 2 +- .../rewriter/GlobalPlanRewriteRule.java | 2 +- .../rewriter/rules/BroadcastJoinRule.java | 76 ++++++++++--------- .../rules/GlobalPlanEqualityTester.java | 2 +- .../rewrite/SelfDescSchemaBuildPhase.java | 6 +- 10 files changed, 64 insertions(+), 57 deletions(-) diff --git a/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraph.java b/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraph.java index 5433ef577c..d8d5ced7c5 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraph.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraph.java @@ -60,5 +60,5 @@ public interface DirectedGraph extends Graph { /** * It visits all vertices in a post-order traverse way. */ - void accept(V src, DirectedGraphVisitor visitor); + void accept(CONTEXT context, V src, DirectedGraphVisitor visitor); } diff --git a/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraphVisitor.java b/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraphVisitor.java index 139c2b4da6..8e0ce87984 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraphVisitor.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraphVisitor.java @@ -20,6 +20,6 @@ import java.util.Stack; -public interface DirectedGraphVisitor { - void visit(Stack stack, V v); +public interface DirectedGraphVisitor { + void visit(CONTEXT context, Stack stack, V v); } diff --git a/tajo-common/src/main/java/org/apache/tajo/util/graph/SimpleDirectedGraph.java b/tajo-common/src/main/java/org/apache/tajo/util/graph/SimpleDirectedGraph.java index d3a3a1e693..c40338e0bb 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/graph/SimpleDirectedGraph.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/graph/SimpleDirectedGraph.java @@ -219,18 +219,19 @@ public int getParentCount(V block) { } @Override - public void accept(V source, DirectedGraphVisitor visitor) { + public void accept(CONTEXT context, V source, DirectedGraphVisitor visitor) { Stack stack = new Stack<>(); - visitRecursive(stack, source, visitor); + visitRecursive(context, stack, source, visitor); } - private void visitRecursive(Stack stack, V current, DirectedGraphVisitor visitor) { + private void visitRecursive(CONTEXT context, Stack stack, V current, + DirectedGraphVisitor visitor) { stack.push(current); for (V child : getChilds(current)) { - visitRecursive(stack, child, visitor); + visitRecursive(context, stack, child, visitor); } stack.pop(); - visitor.visit(stack, current); + visitor.visit(context, stack, current); } public String toString() { @@ -248,7 +249,7 @@ public String printDepthString(DepthString planStr) { public String toStringGraph(V vertex) { StringBuilder sb = new StringBuilder(); QueryGraphTopologyStringBuilder visitor = new QueryGraphTopologyStringBuilder(); - accept(vertex, visitor); + accept(null, vertex, visitor); Stack depthStrings = visitor.getDepthStrings(); while(!depthStrings.isEmpty()) { sb.append(printDepthString(depthStrings.pop())); @@ -266,11 +267,11 @@ private class DepthString { } } - private class QueryGraphTopologyStringBuilder implements DirectedGraphVisitor { + private class QueryGraphTopologyStringBuilder implements DirectedGraphVisitor { Stack depthString = new Stack<>(); @Override - public void visit(Stack stack, V vertex) { + public void visit(Object context, Stack stack, V vertex) { depthString.push(new DepthString(stack.size(), vertex.toString())); } diff --git a/tajo-common/src/test/java/org/apache/tajo/util/graph/TestSimpleDirectedGraph.java b/tajo-common/src/test/java/org/apache/tajo/util/graph/TestSimpleDirectedGraph.java index 676d39f780..c683728875 100644 --- a/tajo-common/src/test/java/org/apache/tajo/util/graph/TestSimpleDirectedGraph.java +++ b/tajo-common/src/test/java/org/apache/tajo/util/graph/TestSimpleDirectedGraph.java @@ -69,13 +69,13 @@ public final void test() { assertEquals(2, graph.getChildCount(child2)); // visitor - graph.accept(root, new Visitor()); + graph.accept(null, root, new Visitor()); } - private class Visitor implements DirectedGraphVisitor { + private class Visitor implements DirectedGraphVisitor { @Override - public void visit(Stack stack, String s) { + public void visit(Object context, Stack stack, String s) { if(LOG.isDebugEnabled()) { LOG.debug("Element:" + s); } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java index 90ccc3a41a..feaba76e06 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java @@ -215,8 +215,8 @@ public ExecutionBlock getChild(ExecutionBlock executionBlock, int idx) { return getChild(executionBlock.getId(), idx); } - public void accept(ExecutionBlockId v, DirectedGraphVisitor visitor) { - execBlockGraph.accept(v, visitor); + public void accept(CONTEXT context, ExecutionBlockId v, DirectedGraphVisitor visitor) { + execBlockGraph.accept(context, v, visitor); } @Override diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanRewriteEngine.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanRewriteEngine.java index 7132e78b8a..039dde5e18 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanRewriteEngine.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanRewriteEngine.java @@ -73,7 +73,7 @@ public MasterPlan rewrite(OverridableConf queryContext, MasterPlan plan) throws for (Map.Entry rewriteRule : rewriteRules.entrySet()) { rule = rewriteRule.getValue(); if (rule.isEligible(queryContext, plan)) { - plan = rule.rewrite(plan); + plan = rule.rewrite(queryContext, plan); if (LOG.isDebugEnabled()) { LOG.debug("The rule \"" + rule.getName() + " \" rewrites the query."); } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanRewriteRule.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanRewriteRule.java index f681d2e7a4..0acf956de6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanRewriteRule.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanRewriteRule.java @@ -48,5 +48,5 @@ public interface GlobalPlanRewriteRule { * @param plan Global Plan * @return */ - MasterPlan rewrite(MasterPlan plan) throws TajoException; + MasterPlan rewrite(OverridableConf queryContext, MasterPlan plan) throws TajoException; } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java index b320a81f87..ffa2b63bc3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java @@ -65,12 +65,10 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule { private BroadcastJoinPlanBuilder planBuilder; private BroadcastJoinPlanFinalizer planFinalizer; - protected void init(MasterPlan plan, long thresholdForNonCrossJoin, long thresholdForCrossJoin, - boolean broadcastForNonCrossJoinEnabled) { + protected void init(MasterPlan plan) { GlobalPlanRewriteUtil.ParentFinder parentFinder = new GlobalPlanRewriteUtil.ParentFinder(); RelationSizeComparator relSizeComparator = new RelationSizeComparator(); - planBuilder = new BroadcastJoinPlanBuilder(plan, relSizeComparator, parentFinder, thresholdForNonCrossJoin, - thresholdForCrossJoin, broadcastForNonCrossJoinEnabled); + planBuilder = new BroadcastJoinPlanBuilder(plan, relSizeComparator, parentFinder); planFinalizer = new BroadcastJoinPlanFinalizer(plan, relSizeComparator); } @@ -90,7 +88,7 @@ public boolean isEligible(OverridableConf queryContext, MasterPlan plan) { (thresholdForNonCrossJoin > 0 || thresholdForCrossJoin > 0)) { for (LogicalPlan.QueryBlock block : plan.getLogicalPlan().getQueryBlocks()) { if (block.hasNode(NodeType.JOIN)) { - init(plan, thresholdForNonCrossJoin, thresholdForCrossJoin, broadcastJoinEnabled); + init(plan); return true; } } @@ -99,9 +97,13 @@ public boolean isEligible(OverridableConf queryContext, MasterPlan plan) { } @Override - public MasterPlan rewrite(MasterPlan plan) throws TajoException { - plan.accept(plan.getRoot().getId(), planBuilder); - plan.accept(plan.getRoot().getId(), planFinalizer); + public MasterPlan rewrite(OverridableConf queryContext, MasterPlan plan) throws TajoException { + long thresholdForNonCrossJoin = queryContext.getLong(SessionVars.BROADCAST_NON_CROSS_JOIN_THRESHOLD) * + StorageUnit.KB; + long thresholdForCrossJoin = queryContext.getLong(SessionVars.BROADCAST_CROSS_JOIN_THRESHOLD) * + StorageUnit.KB; + plan.accept(new Context(thresholdForNonCrossJoin, thresholdForCrossJoin), plan.getRoot().getId(), planBuilder); + plan.accept(null, plan.getRoot().getId(), planFinalizer); return plan; } @@ -125,7 +127,7 @@ public int compare(ScanNode o1, ScanNode o2) { * {@Link BroadcastJoinPlanFinalizer} checks whether every input is the broadcast candidate or not. * If so, it removes the broadcast property from the largest relation. */ - private class BroadcastJoinPlanFinalizer implements DirectedGraphVisitor { + private class BroadcastJoinPlanFinalizer implements DirectedGraphVisitor { private final MasterPlan plan; private final RelationSizeComparator relSizeComparator; @@ -135,7 +137,7 @@ public BroadcastJoinPlanFinalizer(MasterPlan plan, RelationSizeComparator relati } @Override - public void visit(Stack stack, ExecutionBlockId currentId) { + public void visit(Object context, Stack stack, ExecutionBlockId currentId) { ExecutionBlock current = plan.getExecBlock(currentId); if (!plan.isTerminal(current)) { // When every child is a broadcast candidate, enforce non-broadcast for the largest relation for the join to be @@ -150,35 +152,39 @@ public void visit(Stack stack, ExecutionBlockId currentId) { } } - private class BroadcastJoinPlanBuilder implements DirectedGraphVisitor { - private final MasterPlan plan; - private final RelationSizeComparator relSizeComparator; + private static class Context { private final long thresholdForNonCrossJoin; private final long thresholdForCrossJoin; - private final boolean broadcastForNonCrossJoinEnabled; - private final GlobalPlanRewriteUtil.ParentFinder parentFinder; private final Map estimatedEbOutputSize = TUtil.newHashMap(); + public Context(long thresholdForNonCrossJoin, long thresholdForCrossJoin) { + this.thresholdForNonCrossJoin = thresholdForNonCrossJoin; + this.thresholdForCrossJoin = thresholdForCrossJoin; + } + } + + private class BroadcastJoinPlanBuilder implements DirectedGraphVisitor { + private final MasterPlan plan; + private final RelationSizeComparator relSizeComparator; + private final GlobalPlanRewriteUtil.ParentFinder parentFinder; + public BroadcastJoinPlanBuilder(MasterPlan plan, RelationSizeComparator relationSizeComparator, - GlobalPlanRewriteUtil.ParentFinder parentFinder, - long thresholdForNonCrossJoin, long thresholdForCrossJoin, - boolean broadcastForNonCrossJoinEnabled) { + GlobalPlanRewriteUtil.ParentFinder parentFinder) { this.plan = plan; this.relSizeComparator = relationSizeComparator; - this.thresholdForNonCrossJoin = thresholdForNonCrossJoin; - this.thresholdForCrossJoin = thresholdForCrossJoin; +// this.thresholdForNonCrossJoin = thresholdForNonCrossJoin; +// this.thresholdForCrossJoin = thresholdForCrossJoin; this.parentFinder = parentFinder; - this.broadcastForNonCrossJoinEnabled = broadcastForNonCrossJoinEnabled; } @Override - public void visit(Stack stack, ExecutionBlockId executionBlockId) { + public void visit(Context context, Stack stack, ExecutionBlockId executionBlockId) { ExecutionBlock current = plan.getExecBlock(executionBlockId); if (plan.isLeaf(current)) { - visitLeafNode(current); + visitLeafNode(context, current); } else { - visitNonLeafNode(current); + visitNonLeafNode(context, current); } } @@ -187,14 +193,14 @@ public void visit(Stack stack, ExecutionBlockId executionBlock * * @param current */ - private void visitLeafNode(ExecutionBlock current) { + private void visitLeafNode(Context context, ExecutionBlock current) { // Preserved-row relations must not be broadcasted to avoid data duplication. if (!current.isPreservedRow()) { long totalVolume = 0; for (ScanNode scanNode : current.getScanNodes()) { totalVolume += GlobalPlanRewriteUtil.getTableVolume(scanNode); } - estimatedEbOutputSize.put(current.getId(), totalVolume); + context.estimatedEbOutputSize.put(current.getId(), totalVolume); } } @@ -206,7 +212,7 @@ private void visitLeafNode(ExecutionBlock current) { * * @param current */ - private void visitNonLeafNode(ExecutionBlock current) { + private void visitNonLeafNode(Context context, ExecutionBlock current) { // At non-leaf execution blocks, merge broadcastable children's plan with the current plan. if (!plan.isTerminal(current)) { @@ -222,7 +228,7 @@ private void visitNonLeafNode(ExecutionBlock current) { for (ExecutionBlock child : childs) { if (!child.isPreservedRow()) { - updateBroadcastableRelForChildEb(child, joinType); + updateBroadcastableRelForChildEb(context, child, joinType); updateInputBasedOnChildEb(child, current); } } @@ -234,10 +240,10 @@ private void visitNonLeafNode(ExecutionBlock current) { mergeTwoPhaseJoinIfPossible(plan, child, current); } - checkTotalSizeOfBroadcastableRelations(current); + checkTotalSizeOfBroadcastableRelations(context, current); long outputVolume = estimateOutputVolume(current); - estimatedEbOutputSize.put(current.getId(), outputVolume); + context.estimatedEbOutputSize.put(current.getId(), outputVolume); } } else { List relations = TUtil.newList(current.getBroadcastRelations()); @@ -262,8 +268,8 @@ private void updateInputBasedOnChildEb(ExecutionBlock child, ExecutionBlock pare } } - private void updateBroadcastableRelForChildEb(ExecutionBlock child, JoinType joinType) { - long threshold = joinType == JoinType.CROSS ? thresholdForCrossJoin : thresholdForNonCrossJoin; + private void updateBroadcastableRelForChildEb(Context context, ExecutionBlock child, JoinType joinType) { + long threshold = joinType == JoinType.CROSS ? context.thresholdForCrossJoin : context.thresholdForNonCrossJoin; for (ScanNode scanNode : child.getScanNodes()) { long volume = GlobalPlanRewriteUtil.getTableVolume(scanNode); if (volume >= 0 && volume <= threshold) { @@ -382,14 +388,14 @@ private long estimateOutputVolumeInternal(LogicalNode node) throws TajoInternalE * * @param block */ - private void checkTotalSizeOfBroadcastableRelations(ExecutionBlock block) { + private void checkTotalSizeOfBroadcastableRelations(Context context, ExecutionBlock block) { List broadcastCandidates = TUtil.newList(block.getBroadcastRelations()); Collections.sort(broadcastCandidates, relSizeComparator); // Enforce broadcast for candidates in ascending order of relation size long totalBroadcastVolume = 0; - long largeThreshold = thresholdForCrossJoin > thresholdForNonCrossJoin ? - thresholdForCrossJoin : thresholdForNonCrossJoin; + long largeThreshold = context.thresholdForCrossJoin > context.thresholdForNonCrossJoin ? + context.thresholdForCrossJoin : context.thresholdForNonCrossJoin; int i; for (i = 0; i < broadcastCandidates.size(); i++) { long volumeOfCandidate = GlobalPlanRewriteUtil.getTableVolume(broadcastCandidates.get(i)); diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanEqualityTester.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanEqualityTester.java index 9f27eed048..5758f5ece8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanEqualityTester.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanEqualityTester.java @@ -44,7 +44,7 @@ public boolean isEligible(OverridableConf queryContext, MasterPlan plan) { } @Override - public MasterPlan rewrite(MasterPlan plan) { + public MasterPlan rewrite(OverridableConf queryContext, MasterPlan plan) { try { ExecutionBlockCursor cursor = new ExecutionBlockCursor(plan); for (ExecutionBlock eb : cursor) { diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/SelfDescSchemaBuildPhase.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/SelfDescSchemaBuildPhase.java index f609db2705..c58cab3a9f 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/SelfDescSchemaBuildPhase.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/SelfDescSchemaBuildPhase.java @@ -427,7 +427,7 @@ private Schema buildSchemaFromColumnSet(Set columns) { // Build record columns RecordColumnBuilder builder = new RecordColumnBuilder(schemaGraph); for (ColumnVertex eachRoot : rootVertexes) { - schemaGraph.accept(eachRoot, builder); + schemaGraph.accept(null, eachRoot, builder); schema.addColumn(eachRoot.column); } @@ -479,7 +479,7 @@ public void addEdge(ColumnEdge edge) { } } - private static class RecordColumnBuilder implements DirectedGraphVisitor { + private static class RecordColumnBuilder implements DirectedGraphVisitor { private final SchemaGraph graph; public RecordColumnBuilder(SchemaGraph graph) { @@ -487,7 +487,7 @@ public RecordColumnBuilder(SchemaGraph graph) { } @Override - public void visit(Stack stack, ColumnVertex schemaVertex) { + public void visit(Object context, Stack stack, ColumnVertex schemaVertex) { if (graph.isLeaf(schemaVertex)) { schemaVertex.column = new Column(schemaVertex.name, schemaVertex.type); } else { From 1c1ebbe50b2fac38a9094197e1038208b4f7cf34 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 1 Oct 2015 22:58:50 +0900 Subject: [PATCH 2/2] Remove commented out codes --- .../engine/planner/global/rewriter/rules/BroadcastJoinRule.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java index ffa2b63bc3..a33093c948 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java @@ -172,8 +172,6 @@ public BroadcastJoinPlanBuilder(MasterPlan plan, RelationSizeComparator relation GlobalPlanRewriteUtil.ParentFinder parentFinder) { this.plan = plan; this.relSizeComparator = relationSizeComparator; -// this.thresholdForNonCrossJoin = thresholdForNonCrossJoin; -// this.thresholdForCrossJoin = thresholdForCrossJoin; this.parentFinder = parentFinder; }