diff --git a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
index 7d47c23a32..36234e061f 100644
--- a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
+++ b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
@@ -90,14 +90,16 @@ public enum SessionVars implements ConfigKey {
// for distributed query strategies
BROADCAST_NON_CROSS_JOIN_THRESHOLD(ConfVars.$DIST_QUERY_BROADCAST_NON_CROSS_JOIN_THRESHOLD,
- "restriction for the total bytes of broadcasted table for non-cross join", DEFAULT, Long.class,
+ "restriction for the total size of broadcasted table for non-cross join (kb)", DEFAULT, Long.class,
Validators.min("0")),
BROADCAST_CROSS_JOIN_THRESHOLD(ConfVars.$DIST_QUERY_BROADCAST_CROSS_JOIN_THRESHOLD,
- "restriction for the total bytes of broadcasted table for cross join", DEFAULT, Long.class, Validators.min("0")),
+ "restriction for the total size of broadcasted table for cross join (kb)", DEFAULT, Long.class,
+ Validators.min("0")),
JOIN_TASK_INPUT_SIZE(ConfVars.$DIST_QUERY_JOIN_TASK_VOLUME, "join task input size (mb) ", DEFAULT,
Integer.class, Validators.min("1")),
- SORT_TASK_INPUT_SIZE(ConfVars.$DIST_QUERY_SORT_TASK_VOLUME, "sort task input size (mb)", DEFAULT),
+ SORT_TASK_INPUT_SIZE(ConfVars.$DIST_QUERY_SORT_TASK_VOLUME, "sort task input size (mb)", DEFAULT,
+ Integer.class, Validators.min("1")),
GROUPBY_TASK_INPUT_SIZE(ConfVars.$DIST_QUERY_GROUPBY_TASK_VOLUME, "group by task input size (mb)", DEFAULT),
JOIN_PER_SHUFFLE_SIZE(ConfVars.$DIST_QUERY_JOIN_PARTITION_VOLUME, "shuffle output size for join (mb)", DEFAULT,
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 75826e69a2..295e63b979 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -202,7 +202,7 @@ public static enum ConfVars implements ConfigKey {
// Query Configuration
QUERY_SESSION_TIMEOUT("tajo.query.session.timeout-sec", 60, Validators.min("0")),
- QUERY_SESSION_QUERY_CACHE_SIZE("tajo.query.session.query-cache-size-kb", 1024, Validators.min("0")),
+ QUERY_SESSION_QUERY_CACHE_SIZE("tajo.query.session.query-cache-size-kb", 0, Validators.min("0")),
// Shuffle Configuration --------------------------------------------------
PULLSERVER_PORT("tajo.pullserver.port", 0, Validators.range("0", "65535")),
@@ -310,14 +310,14 @@ public static enum ConfVars implements ConfigKey {
// Query and Optimization ---------------------------------------------------
// for distributed query strategies
- $DIST_QUERY_BROADCAST_NON_CROSS_JOIN_THRESHOLD("tajo.dist-query.broadcast.non-cross-join.threshold-bytes",
- (long)5 * 1048576), // 5 MB
- $DIST_QUERY_BROADCAST_CROSS_JOIN_THRESHOLD("tajo.dist-query.broadcast.cross-join.threshold-bytes",
- (long)1 * 1048576), // 1 MB
-
- $DIST_QUERY_JOIN_TASK_VOLUME("tajo.dist-query.join.task-volume-mb", 128),
- $DIST_QUERY_SORT_TASK_VOLUME("tajo.dist-query.sort.task-volume-mb", 128),
- $DIST_QUERY_GROUPBY_TASK_VOLUME("tajo.dist-query.groupby.task-volume-mb", 128),
+ $DIST_QUERY_BROADCAST_NON_CROSS_JOIN_THRESHOLD("tajo.dist-query.broadcast.non-cross-join.threshold-kb", 5 * 1024l,
+ Validators.min("0")), // 5 MB
+ $DIST_QUERY_BROADCAST_CROSS_JOIN_THRESHOLD("tajo.dist-query.broadcast.cross-join.threshold-kb", 1 * 1024l,
+ Validators.min("0")), // 1 MB
+
+ $DIST_QUERY_JOIN_TASK_VOLUME("tajo.dist-query.join.task-volume-mb", 64),
+ $DIST_QUERY_SORT_TASK_VOLUME("tajo.dist-query.sort.task-volume-mb", 64),
+ $DIST_QUERY_GROUPBY_TASK_VOLUME("tajo.dist-query.groupby.task-volume-mb", 64),
$DIST_QUERY_JOIN_PARTITION_VOLUME("tajo.dist-query.join.partition-volume-mb", 128, Validators.min("1")),
$DIST_QUERY_GROUPBY_PARTITION_VOLUME("tajo.dist-query.groupby.partition-volume-mb", 256, Validators.min("1")),
$DIST_QUERY_TABLE_PARTITION_VOLUME("tajo.dist-query.table-partition.task-volume-mb", 256, Validators.min("1")),
@@ -328,14 +328,13 @@ public static enum ConfVars implements ConfigKey {
// for physical Executors
$EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE("tajo.executor.external-sort.buffer-mb", 200L),
- $EXECUTOR_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.common.in-memory-hash-threshold-bytes",
- (long)256 * 1048576),
- $EXECUTOR_INNER_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.inner.in-memory-hash-threshold-bytes",
- (long)256 * 1048576),
- $EXECUTOR_OUTER_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.outer.in-memory-hash-threshold-bytes",
- (long)256 * 1048576),
- $EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD("tajo.executor.groupby.in-memory-hash-threshold-bytes",
- (long)256 * 1048576),
+ $EXECUTOR_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.common.in-memory-hash-threshold-mb", 64l, Validators.min("0")),
+ $EXECUTOR_INNER_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.inner.in-memory-hash-threshold-mb", 64l,
+ Validators.min("0")),
+ $EXECUTOR_OUTER_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.outer.in-memory-hash-threshold-mb", 64l,
+ Validators.min("0")),
+ $EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD("tajo.executor.groupby.in-memory-hash-threshold-mb", 64l,
+ Validators.min("0")),
$MAX_OUTPUT_FILE_SIZE("tajo.query.max-outfile-size-mb", 0), // zero means infinite
$CODEGEN("tajo.executor.codegen.enabled", false), // Runtime code generation (todo this is broken)
diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/TooLargeInputForCrossJoinException.java b/tajo-common/src/main/java/org/apache/tajo/exception/TooLargeInputForCrossJoinException.java
index 32cd44e3f4..55d5f46cb1 100644
--- a/tajo-common/src/main/java/org/apache/tajo/exception/TooLargeInputForCrossJoinException.java
+++ b/tajo-common/src/main/java/org/apache/tajo/exception/TooLargeInputForCrossJoinException.java
@@ -33,6 +33,6 @@ public TooLargeInputForCrossJoinException(ReturnState e) {
}
public TooLargeInputForCrossJoinException(String[] relations, long currentBroadcastThreshold) {
- super(ResultCode.TOO_LARGE_INPUT_FOR_CROSS_JOIN, StringUtils.join(relations), "" + currentBroadcastThreshold);
+ super(ResultCode.TOO_LARGE_INPUT_FOR_CROSS_JOIN, StringUtils.join(relations), currentBroadcastThreshold + " MB");
}
}
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
index 4550db9c30..6f665eac41 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
@@ -42,6 +42,7 @@
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.storage.*;
import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.worker.TaskAttemptContext;
@@ -205,7 +206,7 @@ public final void testCheckIfInMemoryInnerJoinIsPossible() throws IOException, T
LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- ctx.getQueryContext().setLong(SessionVars.HASH_JOIN_SIZE_LIMIT.keyname(), 100l);
+ ctx.getQueryContext().setLong(SessionVars.HASH_JOIN_SIZE_LIMIT.keyname(), 1); // set hash join limit as 1 MB
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf);
PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
@@ -224,9 +225,9 @@ public final void testCheckIfInMemoryInnerJoinIsPossible() throws IOException, T
* we use some boolean variable leftSmaller to indicate which side is small.
*/
private static boolean assertCheckInnerJoinRelatedFunctions(TaskAttemptContext ctx,
- PhysicalPlannerImpl phyPlanner,
- JoinNode joinNode, BinaryPhysicalExec joinExec) throws
- IOException {
+ PhysicalPlannerImpl phyPlanner,
+ JoinNode joinNode, BinaryPhysicalExec joinExec)
+ throws IOException {
String [] left = PlannerUtil.getRelationLineage(joinNode.getLeftChild());
String [] right = PlannerUtil.getRelationLineage(joinNode.getRightChild());
@@ -268,12 +269,18 @@ private static boolean assertCheckInnerJoinRelatedFunctions(TaskAttemptContext c
assertEquals("default.p", right[0]);
}
+ // To test the behaviour of PhysicalPlannerImpl.checkIfInMemoryInnerJoinIsPossible(),
+ // use a fake value for table volumes.
if (leftSmaller) {
- assertTrue(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getLeftChild(), true));
- assertFalse(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getRightChild(), false));
+ assertTrue(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx,
+ PlannerUtil.getRelationLineage(joinNode.getLeftChild()), 1 * StorageUnit.MB, true));
+ assertFalse(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx,
+ PlannerUtil.getRelationLineage(joinNode.getRightChild()), 5 * StorageUnit.MB, false));
} else {
- assertFalse(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getLeftChild(), true));
- assertTrue(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getRightChild(), false));
+ assertFalse(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx,
+ PlannerUtil.getRelationLineage(joinNode.getLeftChild()), 5 * StorageUnit.MB, true));
+ assertTrue(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx,
+ PlannerUtil.getRelationLineage(joinNode.getRightChild()), 1 * StorageUnit.MB, false));
}
return leftSmaller;
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
index 706f201547..4dcf562abb 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
@@ -60,9 +60,9 @@ public TestJoinQuery(String joinOption) throws Exception {
testingCluster.setAllTajoDaemonConfValue(ConfVars.$TEST_BROADCAST_JOIN_ENABLED.varname, "true");
testingCluster.setAllTajoDaemonConfValue(ConfVars.$DIST_QUERY_BROADCAST_NON_CROSS_JOIN_THRESHOLD.varname,
- "" + (5 * 1024));
+ "" + 5);
testingCluster.setAllTajoDaemonConfValue(ConfVars.$DIST_QUERY_BROADCAST_CROSS_JOIN_THRESHOLD.varname,
- "" + (2 * 1024));
+ "" + 2);
testingCluster.setAllTajoDaemonConfValue(
ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname,
@@ -79,19 +79,19 @@ public TestJoinQuery(String joinOption) throws Exception {
if (joinOption.indexOf("Hash") >= 0) {
testingCluster.setAllTajoDaemonConfValue(
- ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname, String.valueOf(256 * 1048576));
+ ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname, String.valueOf(256));
testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname,
- String.valueOf(256 * 1048576));
+ String.valueOf(256));
testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD.varname,
- String.valueOf(256 * 1048576));
+ String.valueOf(256));
}
if (joinOption.indexOf("Sort") >= 0) {
testingCluster.setAllTajoDaemonConfValue(
ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname, String.valueOf(1));
testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname,
- String.valueOf(1));
+ String.valueOf(0));
testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD.varname,
- String.valueOf(1));
+ String.valueOf(0));
}
}
diff --git a/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result b/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
index 33443da521..51fb61fb1e 100644
--- a/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
+++ b/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
@@ -18,8 +18,8 @@ Available Session Variables:
\set LC_MONETARY [text value] - Formatting of currency amounts
\set LC_NUMERIC [text value] - Formatting of numbers
\set LC_TIME [text value] - Formatting of dates and times
-\set BROADCAST_NON_CROSS_JOIN_THRESHOLD [long value] - restriction for the total bytes of broadcasted table for non-cross join
-\set BROADCAST_CROSS_JOIN_THRESHOLD [long value] - restriction for the total bytes of broadcasted table for cross join
+\set BROADCAST_NON_CROSS_JOIN_THRESHOLD [long value] - restriction for the total size of broadcasted table for non-cross join (kb)
+\set BROADCAST_CROSS_JOIN_THRESHOLD [long value] - restriction for the total size of broadcasted table for cross join (kb)
\set JOIN_TASK_INPUT_SIZE [int value] - join task input size (mb)
\set SORT_TASK_INPUT_SIZE [int value] - sort task input size (mb)
\set GROUPBY_TASK_INPUT_SIZE [int value] - group by task input size (mb)
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 9847ff6553..36d80da8e1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -53,6 +53,7 @@
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.storage.fragment.FragmentConvertor;
+import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.util.FileUtil;
import org.apache.tajo.util.StringUtils;
import org.apache.tajo.util.TUtil;
@@ -261,26 +262,33 @@ public long estimateSizeRecursive(TaskAttemptContext ctx, String [] tableIds) th
return size;
}
- @VisibleForTesting
- public boolean checkIfInMemoryInnerJoinIsPossible(TaskAttemptContext context, LogicalNode node, boolean left)
+ private boolean checkIfInMemoryInnerJoinIsPossible(TaskAttemptContext context, LogicalNode node, boolean left)
throws IOException {
String [] lineage = PlannerUtil.getRelationLineage(node);
long volume = estimateSizeRecursive(context, lineage);
- boolean inMemoryInnerJoinFlag = false;
+ return checkIfInMemoryInnerJoinIsPossible(context, lineage, volume, left);
+ }
+
+ @VisibleForTesting
+ public boolean checkIfInMemoryInnerJoinIsPossible(TaskAttemptContext context, String [] lineage, long tableVolume,
+ boolean left) throws IOException {
+ boolean inMemoryInnerJoinFlag;
QueryContext queryContext = context.getQueryContext();
if (queryContext.containsKey(SessionVars.INNER_HASH_JOIN_SIZE_LIMIT)) {
- inMemoryInnerJoinFlag = volume <= context.getQueryContext().getLong(SessionVars.INNER_HASH_JOIN_SIZE_LIMIT);
+ inMemoryInnerJoinFlag = tableVolume <= context.getQueryContext().getLong(SessionVars.INNER_HASH_JOIN_SIZE_LIMIT)
+ * StorageUnit.MB;
} else {
- inMemoryInnerJoinFlag = volume <= context.getQueryContext().getLong(SessionVars.HASH_JOIN_SIZE_LIMIT);
+ inMemoryInnerJoinFlag = tableVolume <= context.getQueryContext().getLong(SessionVars.HASH_JOIN_SIZE_LIMIT)
+ * StorageUnit.MB;
}
LOG.info(String.format("[%s] the volume of %s relations (%s) is %s and is %sfit to main maemory.",
context.getTaskId().toString(),
(left ? "Left" : "Right"),
StringUtils.join(lineage),
- FileUtil.humanReadableByteCount(volume, false),
+ FileUtil.humanReadableByteCount(tableVolume, false),
(inMemoryInnerJoinFlag ? "" : "not ")));
return inMemoryInnerJoinFlag;
}
@@ -484,9 +492,9 @@ private PhysicalExec createBestLeftOuterJoinPlan(TaskAttemptContext context, Joi
QueryContext queryContext = context.getQueryContext();
if (queryContext.containsKey(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT)) {
- hashJoin = rightTableVolume < queryContext.getLong(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT);
+ hashJoin = rightTableVolume <= queryContext.getLong(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT) * StorageUnit.MB;
} else {
- hashJoin = rightTableVolume < queryContext.getLong(SessionVars.HASH_JOIN_SIZE_LIMIT);
+ hashJoin = rightTableVolume <= queryContext.getLong(SessionVars.HASH_JOIN_SIZE_LIMIT) * StorageUnit.MB;
}
if (hashJoin) {
@@ -512,9 +520,9 @@ private PhysicalExec createBestRightJoinPlan(TaskAttemptContext context, JoinNod
QueryContext queryContext = context.getQueryContext();
if (queryContext.containsKey(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT)) {
- hashJoin = leftTableVolume < queryContext.getLong(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT);
+ hashJoin = leftTableVolume <= queryContext.getLong(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT) * StorageUnit.MB;
} else {
- hashJoin = leftTableVolume < queryContext.getLong(SessionVars.HASH_JOIN_SIZE_LIMIT);
+ hashJoin = leftTableVolume <= queryContext.getLong(SessionVars.HASH_JOIN_SIZE_LIMIT)* StorageUnit.MB;
}
if (hashJoin){
@@ -1015,7 +1023,7 @@ private PhysicalExec createBestAggregationPlan(TaskAttemptContext context, Group
String [] outerLineage = PlannerUtil.getRelationLineage(groupbyNode.getChild());
long estimatedSize = estimateSizeRecursive(context, outerLineage);
- final long threshold = context.getQueryContext().getLong(SessionVars.HASH_GROUPBY_SIZE_LIMIT);
+ final long threshold = context.getQueryContext().getLong(SessionVars.HASH_GROUPBY_SIZE_LIMIT) * StorageUnit.MB;
// if the relation size is less than the threshold,
// the hash aggregation will be used.
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java
index dbb92e10bb..b320a81f87 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java
@@ -32,6 +32,7 @@
import org.apache.tajo.plan.joinorder.GreedyHeuristicJoinOrderAlgorithm;
import org.apache.tajo.plan.logical.*;
import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.util.graph.DirectedGraphVisitor;
@@ -80,8 +81,10 @@ public String getName() {
@Override
public boolean isEligible(OverridableConf queryContext, MasterPlan plan) {
- long thresholdForNonCrossJoin = queryContext.getLong(SessionVars.BROADCAST_NON_CROSS_JOIN_THRESHOLD);
- long thresholdForCrossJoin = queryContext.getLong(SessionVars.BROADCAST_CROSS_JOIN_THRESHOLD);
+ long thresholdForNonCrossJoin = queryContext.getLong(SessionVars.BROADCAST_NON_CROSS_JOIN_THRESHOLD) *
+ StorageUnit.KB;
+ long thresholdForCrossJoin = queryContext.getLong(SessionVars.BROADCAST_CROSS_JOIN_THRESHOLD) *
+ StorageUnit.KB;
boolean broadcastJoinEnabled = queryContext.getBool(SessionVars.TEST_BROADCAST_JOIN_ENABLED);
if (broadcastJoinEnabled &&
(thresholdForNonCrossJoin > 0 || thresholdForCrossJoin > 0)) {
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
index 12e43664a5..61d26081a9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
@@ -1043,14 +1043,47 @@ private static void schedule(Stage stage) throws IOException, TajoException {
* @return
*/
public static int getNonLeafTaskNum(Stage stage) {
+ // This method is assumed to be called only for aggregation or sort.
+ LogicalNode plan = stage.getBlock().getPlan();
+ LogicalNode sortNode = PlannerUtil.findTopNode(plan, NodeType.SORT);
+ LogicalNode groupbyNode = PlannerUtil.findTopNode(plan, NodeType.GROUP_BY);
+
+ // Task volume is assumed to be 64 MB by default.
+ long taskVolume = 64;
+
+ if (groupbyNode != null && sortNode == null) {
+ // aggregation plan
+ taskVolume = stage.getContext().getQueryContext().getLong(SessionVars.GROUPBY_TASK_INPUT_SIZE);
+ } else if (sortNode != null && groupbyNode == null) {
+ // sort plan
+ taskVolume = stage.getContext().getQueryContext().getLong(SessionVars.SORT_TASK_INPUT_SIZE);
+ } else if (sortNode != null /* && groupbyNode != null */) {
+ // NOTE: when the plan includes both aggregation and sort, usually aggregation is executed first.
+ // If not, we need to check the query plan is valid.
+ LogicalNode aggChildOfSort = PlannerUtil.findTopNode(sortNode, NodeType.GROUP_BY);
+ boolean aggFirst = aggChildOfSort != null && aggChildOfSort.equals(groupbyNode);
+ // Set task volume according to the operator which will be executed first.
+ if (aggFirst) {
+ // choose aggregation task volume
+ taskVolume = stage.getContext().getQueryContext().getLong(SessionVars.GROUPBY_TASK_INPUT_SIZE);
+ } else {
+ // choose sort task volume
+ LOG.warn("Sort is executed before aggregation.");
+ taskVolume = stage.getContext().getQueryContext().getLong(SessionVars.SORT_TASK_INPUT_SIZE);
+ }
+ } else {
+ LOG.warn("Task volume is chosen as " + taskVolume + " in unexpected case.");
+ }
+
// Getting intermediate data size
long volume = getInputVolume(stage.getMasterPlan(), stage.context, stage.getBlock());
- int mb = (int) Math.ceil((double)volume / 1048576);
+ int mb = (int) Math.ceil((double)volume / (double)StorageUnit.MB);
LOG.info(stage.getId() + ", Table's volume is approximately " + mb + " MB");
- // determine the number of task per 64MB
- int minTaskNum = Math.max(1, stage.getContext().getQueryMasterContext().getConf().getInt(ConfVars.$TEST_MIN_TASK_NUM.varname, 1));
- int maxTaskNum = Math.max(minTaskNum, (int) Math.ceil((double)mb / 64));
+ // determine the number of task
+ int minTaskNum = Math.max(1, stage.getContext().getQueryMasterContext().getConf().
+ getInt(ConfVars.$TEST_MIN_TASK_NUM.varname, 1));
+ int maxTaskNum = Math.max(minTaskNum, (int) Math.ceil((double)mb / taskVolume));
LOG.info(stage.getId() + ", The determined number of non-leaf tasks is " + maxTaskNum);
return maxTaskNum;
}
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PostLogicalPlanVerifier.java b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PostLogicalPlanVerifier.java
index a46d66e344..744256136b 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PostLogicalPlanVerifier.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PostLogicalPlanVerifier.java
@@ -26,6 +26,7 @@
import org.apache.tajo.plan.logical.*;
import org.apache.tajo.plan.verifier.PostLogicalPlanVerifier.Context;
import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor;
+import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.util.TUtil;
import java.util.List;
@@ -89,7 +90,7 @@ public Object visitJoin(Context context, LogicalPlan plan, LogicalPlan.QueryBloc
List largeRelationNames = TUtil.newList();
if (isSimpleRelationNode(node.getLeftChild())) {
- if (getTableVolume((ScanNode) node.getLeftChild()) <= context.bcastLimitForCrossJoin) {
+ if (getTableVolume((ScanNode) node.getLeftChild()) <= context.bcastLimitForCrossJoin * StorageUnit.KB) {
crossJoinAllowed = true;
} else {
largeRelationNames.add(((ScanNode) node.getLeftChild()).getCanonicalName());
@@ -97,7 +98,7 @@ public Object visitJoin(Context context, LogicalPlan plan, LogicalPlan.QueryBloc
}
if (isSimpleRelationNode(node.getRightChild())) {
- if (getTableVolume((ScanNode) node.getRightChild()) <= context.bcastLimitForCrossJoin) {
+ if (getTableVolume((ScanNode) node.getRightChild()) <= context.bcastLimitForCrossJoin * StorageUnit.KB) {
crossJoinAllowed = true;
} else {
largeRelationNames.add(((ScanNode) node.getRightChild()).getCanonicalName());