Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions tajo-common/src/main/java/org/apache/tajo/SessionVars.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,16 @@ public enum SessionVars implements ConfigKey {

// for distributed query strategies
BROADCAST_NON_CROSS_JOIN_THRESHOLD(ConfVars.$DIST_QUERY_BROADCAST_NON_CROSS_JOIN_THRESHOLD,
"restriction for the total bytes of broadcasted table for non-cross join", DEFAULT, Long.class,
"restriction for the total size of broadcasted table for non-cross join (kb)", DEFAULT, Long.class,
Validators.min("0")),
BROADCAST_CROSS_JOIN_THRESHOLD(ConfVars.$DIST_QUERY_BROADCAST_CROSS_JOIN_THRESHOLD,
"restriction for the total bytes of broadcasted table for cross join", DEFAULT, Long.class, Validators.min("0")),
"restriction for the total size of broadcasted table for cross join (kb)", DEFAULT, Long.class,
Validators.min("0")),

JOIN_TASK_INPUT_SIZE(ConfVars.$DIST_QUERY_JOIN_TASK_VOLUME, "join task input size (mb) ", DEFAULT,
Integer.class, Validators.min("1")),
SORT_TASK_INPUT_SIZE(ConfVars.$DIST_QUERY_SORT_TASK_VOLUME, "sort task input size (mb)", DEFAULT),
SORT_TASK_INPUT_SIZE(ConfVars.$DIST_QUERY_SORT_TASK_VOLUME, "sort task input size (mb)", DEFAULT,
Integer.class, Validators.min("1")),
GROUPBY_TASK_INPUT_SIZE(ConfVars.$DIST_QUERY_GROUPBY_TASK_VOLUME, "group by task input size (mb)", DEFAULT),

JOIN_PER_SHUFFLE_SIZE(ConfVars.$DIST_QUERY_JOIN_PARTITION_VOLUME, "shuffle output size for join (mb)", DEFAULT,
Expand Down
33 changes: 16 additions & 17 deletions tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public static enum ConfVars implements ConfigKey {

// Query Configuration
QUERY_SESSION_TIMEOUT("tajo.query.session.timeout-sec", 60, Validators.min("0")),
QUERY_SESSION_QUERY_CACHE_SIZE("tajo.query.session.query-cache-size-kb", 1024, Validators.min("0")),
QUERY_SESSION_QUERY_CACHE_SIZE("tajo.query.session.query-cache-size-kb", 0, Validators.min("0")),

// Shuffle Configuration --------------------------------------------------
PULLSERVER_PORT("tajo.pullserver.port", 0, Validators.range("0", "65535")),
Expand Down Expand Up @@ -310,14 +310,14 @@ public static enum ConfVars implements ConfigKey {
// Query and Optimization ---------------------------------------------------

// for distributed query strategies
$DIST_QUERY_BROADCAST_NON_CROSS_JOIN_THRESHOLD("tajo.dist-query.broadcast.non-cross-join.threshold-bytes",
(long)5 * 1048576), // 5 MB
$DIST_QUERY_BROADCAST_CROSS_JOIN_THRESHOLD("tajo.dist-query.broadcast.cross-join.threshold-bytes",
(long)1 * 1048576), // 1 MB

$DIST_QUERY_JOIN_TASK_VOLUME("tajo.dist-query.join.task-volume-mb", 128),
$DIST_QUERY_SORT_TASK_VOLUME("tajo.dist-query.sort.task-volume-mb", 128),
$DIST_QUERY_GROUPBY_TASK_VOLUME("tajo.dist-query.groupby.task-volume-mb", 128),
$DIST_QUERY_BROADCAST_NON_CROSS_JOIN_THRESHOLD("tajo.dist-query.broadcast.non-cross-join.threshold-kb", 5 * 1024l,
Validators.min("0")), // 5 MB
$DIST_QUERY_BROADCAST_CROSS_JOIN_THRESHOLD("tajo.dist-query.broadcast.cross-join.threshold-kb", 1 * 1024l,
Validators.min("0")), // 1 MB

$DIST_QUERY_JOIN_TASK_VOLUME("tajo.dist-query.join.task-volume-mb", 64),
$DIST_QUERY_SORT_TASK_VOLUME("tajo.dist-query.sort.task-volume-mb", 64),
$DIST_QUERY_GROUPBY_TASK_VOLUME("tajo.dist-query.groupby.task-volume-mb", 64),
$DIST_QUERY_JOIN_PARTITION_VOLUME("tajo.dist-query.join.partition-volume-mb", 128, Validators.min("1")),
$DIST_QUERY_GROUPBY_PARTITION_VOLUME("tajo.dist-query.groupby.partition-volume-mb", 256, Validators.min("1")),
$DIST_QUERY_TABLE_PARTITION_VOLUME("tajo.dist-query.table-partition.task-volume-mb", 256, Validators.min("1")),
Expand All @@ -328,14 +328,13 @@ public static enum ConfVars implements ConfigKey {

// for physical Executors
$EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE("tajo.executor.external-sort.buffer-mb", 200L),
$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.common.in-memory-hash-threshold-bytes",
(long)256 * 1048576),
$EXECUTOR_INNER_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.inner.in-memory-hash-threshold-bytes",
(long)256 * 1048576),
$EXECUTOR_OUTER_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.outer.in-memory-hash-threshold-bytes",
(long)256 * 1048576),
$EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD("tajo.executor.groupby.in-memory-hash-threshold-bytes",
(long)256 * 1048576),
$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.common.in-memory-hash-threshold-mb", 64l, Validators.min("0")),
$EXECUTOR_INNER_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.inner.in-memory-hash-threshold-mb", 64l,
Validators.min("0")),
$EXECUTOR_OUTER_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.outer.in-memory-hash-threshold-mb", 64l,
Validators.min("0")),
$EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD("tajo.executor.groupby.in-memory-hash-threshold-mb", 64l,
Validators.min("0")),
$MAX_OUTPUT_FILE_SIZE("tajo.query.max-outfile-size-mb", 0), // zero means infinite
$CODEGEN("tajo.executor.codegen.enabled", false), // Runtime code generation (todo this is broken)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,6 @@ public TooLargeInputForCrossJoinException(ReturnState e) {
}

public TooLargeInputForCrossJoinException(String[] relations, long currentBroadcastThreshold) {
super(ResultCode.TOO_LARGE_INPUT_FOR_CROSS_JOIN, StringUtils.join(relations), "" + currentBroadcastThreshold);
super(ResultCode.TOO_LARGE_INPUT_FOR_CROSS_JOIN, StringUtils.join(relations), currentBroadcastThreshold + " MB");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.storage.*;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.worker.TaskAttemptContext;
Expand Down Expand Up @@ -205,7 +206,7 @@ public final void testCheckIfInMemoryInnerJoinIsPossible() throws IOException, T
LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);

ctx.getQueryContext().setLong(SessionVars.HASH_JOIN_SIZE_LIMIT.keyname(), 100l);
ctx.getQueryContext().setLong(SessionVars.HASH_JOIN_SIZE_LIMIT.keyname(), 1); // set hash join limit as 1 MB
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf);
PhysicalExec exec = phyPlanner.createPlan(ctx, plan);

Expand All @@ -224,9 +225,9 @@ public final void testCheckIfInMemoryInnerJoinIsPossible() throws IOException, T
* we use a boolean variable <code>leftSmaller</code> to indicate which side is smaller.
*/
private static boolean assertCheckInnerJoinRelatedFunctions(TaskAttemptContext ctx,
PhysicalPlannerImpl phyPlanner,
JoinNode joinNode, BinaryPhysicalExec joinExec) throws
IOException {
PhysicalPlannerImpl phyPlanner,
JoinNode joinNode, BinaryPhysicalExec joinExec)
throws IOException {

String [] left = PlannerUtil.getRelationLineage(joinNode.getLeftChild());
String [] right = PlannerUtil.getRelationLineage(joinNode.getRightChild());
Expand Down Expand Up @@ -268,12 +269,18 @@ private static boolean assertCheckInnerJoinRelatedFunctions(TaskAttemptContext c
assertEquals("default.p", right[0]);
}

// To test the behaviour of PhysicalPlannerImpl.checkIfInMemoryInnerJoinIsPossible(),
// use a fake value for table volumes.
if (leftSmaller) {
assertTrue(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getLeftChild(), true));
assertFalse(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getRightChild(), false));
assertTrue(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx,
PlannerUtil.getRelationLineage(joinNode.getLeftChild()), 1 * StorageUnit.MB, true));
assertFalse(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx,
PlannerUtil.getRelationLineage(joinNode.getRightChild()), 5 * StorageUnit.MB, false));
} else {
assertFalse(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getLeftChild(), true));
assertTrue(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getRightChild(), false));
assertFalse(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx,
PlannerUtil.getRelationLineage(joinNode.getLeftChild()), 5 * StorageUnit.MB, true));
assertTrue(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx,
PlannerUtil.getRelationLineage(joinNode.getRightChild()), 1 * StorageUnit.MB, false));
}

return leftSmaller;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ public TestJoinQuery(String joinOption) throws Exception {

testingCluster.setAllTajoDaemonConfValue(ConfVars.$TEST_BROADCAST_JOIN_ENABLED.varname, "true");
testingCluster.setAllTajoDaemonConfValue(ConfVars.$DIST_QUERY_BROADCAST_NON_CROSS_JOIN_THRESHOLD.varname,
"" + (5 * 1024));
"" + 5);
testingCluster.setAllTajoDaemonConfValue(ConfVars.$DIST_QUERY_BROADCAST_CROSS_JOIN_THRESHOLD.varname,
"" + (2 * 1024));
"" + 2);

testingCluster.setAllTajoDaemonConfValue(
ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname,
Expand All @@ -79,19 +79,19 @@ public TestJoinQuery(String joinOption) throws Exception {

if (joinOption.indexOf("Hash") >= 0) {
testingCluster.setAllTajoDaemonConfValue(
ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname, String.valueOf(256 * 1048576));
ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname, String.valueOf(256));
testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname,
String.valueOf(256 * 1048576));
String.valueOf(256));
testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD.varname,
String.valueOf(256 * 1048576));
String.valueOf(256));
}
if (joinOption.indexOf("Sort") >= 0) {
testingCluster.setAllTajoDaemonConfValue(
ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname, String.valueOf(1));
testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname,
String.valueOf(1));
String.valueOf(0));
testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD.varname,
String.valueOf(1));
String.valueOf(0));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ Available Session Variables:
\set LC_MONETARY [text value] - Formatting of currency amounts
\set LC_NUMERIC [text value] - Formatting of numbers
\set LC_TIME [text value] - Formatting of dates and times
\set BROADCAST_NON_CROSS_JOIN_THRESHOLD [long value] - restriction for the total bytes of broadcasted table for non-cross join
\set BROADCAST_CROSS_JOIN_THRESHOLD [long value] - restriction for the total bytes of broadcasted table for cross join
\set BROADCAST_NON_CROSS_JOIN_THRESHOLD [long value] - restriction for the total size of broadcasted table for non-cross join (kb)
\set BROADCAST_CROSS_JOIN_THRESHOLD [long value] - restriction for the total size of broadcasted table for cross join (kb)
\set JOIN_TASK_INPUT_SIZE [int value] - join task input size (mb)
\set SORT_TASK_INPUT_SIZE [int value] - sort task input size (mb)
\set GROUPBY_TASK_INPUT_SIZE [int value] - group by task input size (mb)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.storage.fragment.FragmentConvertor;
import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.util.FileUtil;
import org.apache.tajo.util.StringUtils;
import org.apache.tajo.util.TUtil;
Expand Down Expand Up @@ -261,26 +262,33 @@ public long estimateSizeRecursive(TaskAttemptContext ctx, String [] tableIds) th
return size;
}

@VisibleForTesting
public boolean checkIfInMemoryInnerJoinIsPossible(TaskAttemptContext context, LogicalNode node, boolean left)
private boolean checkIfInMemoryInnerJoinIsPossible(TaskAttemptContext context, LogicalNode node, boolean left)
throws IOException {
String [] lineage = PlannerUtil.getRelationLineage(node);
long volume = estimateSizeRecursive(context, lineage);
boolean inMemoryInnerJoinFlag = false;
return checkIfInMemoryInnerJoinIsPossible(context, lineage, volume, left);
}

@VisibleForTesting
public boolean checkIfInMemoryInnerJoinIsPossible(TaskAttemptContext context, String [] lineage, long tableVolume,
boolean left) throws IOException {
boolean inMemoryInnerJoinFlag;

QueryContext queryContext = context.getQueryContext();

if (queryContext.containsKey(SessionVars.INNER_HASH_JOIN_SIZE_LIMIT)) {
inMemoryInnerJoinFlag = volume <= context.getQueryContext().getLong(SessionVars.INNER_HASH_JOIN_SIZE_LIMIT);
inMemoryInnerJoinFlag = tableVolume <= context.getQueryContext().getLong(SessionVars.INNER_HASH_JOIN_SIZE_LIMIT)
* StorageUnit.MB;
} else {
inMemoryInnerJoinFlag = volume <= context.getQueryContext().getLong(SessionVars.HASH_JOIN_SIZE_LIMIT);
inMemoryInnerJoinFlag = tableVolume <= context.getQueryContext().getLong(SessionVars.HASH_JOIN_SIZE_LIMIT)
* StorageUnit.MB;
}

LOG.info(String.format("[%s] the volume of %s relations (%s) is %s and is %sfit to main maemory.",
context.getTaskId().toString(),
(left ? "Left" : "Right"),
StringUtils.join(lineage),
FileUtil.humanReadableByteCount(volume, false),
FileUtil.humanReadableByteCount(tableVolume, false),
(inMemoryInnerJoinFlag ? "" : "not ")));
return inMemoryInnerJoinFlag;
}
Expand Down Expand Up @@ -484,9 +492,9 @@ private PhysicalExec createBestLeftOuterJoinPlan(TaskAttemptContext context, Joi
QueryContext queryContext = context.getQueryContext();

if (queryContext.containsKey(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT)) {
hashJoin = rightTableVolume < queryContext.getLong(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT);
hashJoin = rightTableVolume <= queryContext.getLong(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT) * StorageUnit.MB;
} else {
hashJoin = rightTableVolume < queryContext.getLong(SessionVars.HASH_JOIN_SIZE_LIMIT);
hashJoin = rightTableVolume <= queryContext.getLong(SessionVars.HASH_JOIN_SIZE_LIMIT) * StorageUnit.MB;
}

if (hashJoin) {
Expand All @@ -512,9 +520,9 @@ private PhysicalExec createBestRightJoinPlan(TaskAttemptContext context, JoinNod
QueryContext queryContext = context.getQueryContext();

if (queryContext.containsKey(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT)) {
hashJoin = leftTableVolume < queryContext.getLong(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT);
hashJoin = leftTableVolume <= queryContext.getLong(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT) * StorageUnit.MB;
} else {
hashJoin = leftTableVolume < queryContext.getLong(SessionVars.HASH_JOIN_SIZE_LIMIT);
hashJoin = leftTableVolume <= queryContext.getLong(SessionVars.HASH_JOIN_SIZE_LIMIT)* StorageUnit.MB;
}

if (hashJoin){
Expand Down Expand Up @@ -1015,7 +1023,7 @@ private PhysicalExec createBestAggregationPlan(TaskAttemptContext context, Group

String [] outerLineage = PlannerUtil.getRelationLineage(groupbyNode.getChild());
long estimatedSize = estimateSizeRecursive(context, outerLineage);
final long threshold = context.getQueryContext().getLong(SessionVars.HASH_GROUPBY_SIZE_LIMIT);
final long threshold = context.getQueryContext().getLong(SessionVars.HASH_GROUPBY_SIZE_LIMIT) * StorageUnit.MB;

// if the relation size is less than the threshold,
// the hash aggregation will be used.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.tajo.plan.joinorder.GreedyHeuristicJoinOrderAlgorithm;
import org.apache.tajo.plan.logical.*;
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.util.graph.DirectedGraphVisitor;

Expand Down Expand Up @@ -80,8 +81,10 @@ public String getName() {

@Override
public boolean isEligible(OverridableConf queryContext, MasterPlan plan) {
long thresholdForNonCrossJoin = queryContext.getLong(SessionVars.BROADCAST_NON_CROSS_JOIN_THRESHOLD);
long thresholdForCrossJoin = queryContext.getLong(SessionVars.BROADCAST_CROSS_JOIN_THRESHOLD);
long thresholdForNonCrossJoin = queryContext.getLong(SessionVars.BROADCAST_NON_CROSS_JOIN_THRESHOLD) *
StorageUnit.KB;
long thresholdForCrossJoin = queryContext.getLong(SessionVars.BROADCAST_CROSS_JOIN_THRESHOLD) *
StorageUnit.KB;
boolean broadcastJoinEnabled = queryContext.getBool(SessionVars.TEST_BROADCAST_JOIN_ENABLED);
if (broadcastJoinEnabled &&
(thresholdForNonCrossJoin > 0 || thresholdForCrossJoin > 0)) {
Expand Down
41 changes: 37 additions & 4 deletions tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
Original file line number Diff line number Diff line change
Expand Up @@ -1043,14 +1043,47 @@ private static void schedule(Stage stage) throws IOException, TajoException {
* @return
*/
public static int getNonLeafTaskNum(Stage stage) {
// This method is assumed to be called only for aggregation or sort.
LogicalNode plan = stage.getBlock().getPlan();
LogicalNode sortNode = PlannerUtil.findTopNode(plan, NodeType.SORT);
LogicalNode groupbyNode = PlannerUtil.findTopNode(plan, NodeType.GROUP_BY);

// Task volume is assumed to be 64 MB by default.
long taskVolume = 64;

if (groupbyNode != null && sortNode == null) {
// aggregation plan
taskVolume = stage.getContext().getQueryContext().getLong(SessionVars.GROUPBY_TASK_INPUT_SIZE);
} else if (sortNode != null && groupbyNode == null) {
// sort plan
taskVolume = stage.getContext().getQueryContext().getLong(SessionVars.SORT_TASK_INPUT_SIZE);
} else if (sortNode != null /* && groupbyNode != null */) {
// NOTE: when the plan includes both aggregation and sort, aggregation is usually executed first.
// If it is not, we need to check whether the query plan is valid.
LogicalNode aggChildOfSort = PlannerUtil.findTopNode(sortNode, NodeType.GROUP_BY);
boolean aggFirst = aggChildOfSort != null && aggChildOfSort.equals(groupbyNode);
// Set task volume according to the operator which will be executed first.
if (aggFirst) {
// choose aggregation task volume
taskVolume = stage.getContext().getQueryContext().getLong(SessionVars.GROUPBY_TASK_INPUT_SIZE);
} else {
// choose sort task volume
LOG.warn("Sort is executed before aggregation.");
taskVolume = stage.getContext().getQueryContext().getLong(SessionVars.SORT_TASK_INPUT_SIZE);
}
} else {
LOG.warn("Task volume is chosen as " + taskVolume + " in unexpected case.");
}

// Getting intermediate data size
long volume = getInputVolume(stage.getMasterPlan(), stage.context, stage.getBlock());

int mb = (int) Math.ceil((double)volume / 1048576);
int mb = (int) Math.ceil((double)volume / (double)StorageUnit.MB);
LOG.info(stage.getId() + ", Table's volume is approximately " + mb + " MB");
// determine the number of task per 64MB
int minTaskNum = Math.max(1, stage.getContext().getQueryMasterContext().getConf().getInt(ConfVars.$TEST_MIN_TASK_NUM.varname, 1));
int maxTaskNum = Math.max(minTaskNum, (int) Math.ceil((double)mb / 64));
// determine the number of tasks
int minTaskNum = Math.max(1, stage.getContext().getQueryMasterContext().getConf().
getInt(ConfVars.$TEST_MIN_TASK_NUM.varname, 1));
int maxTaskNum = Math.max(minTaskNum, (int) Math.ceil((double)mb / taskVolume));
LOG.info(stage.getId() + ", The determined number of non-leaf tasks is " + maxTaskNum);
return maxTaskNum;
}
Expand Down
Loading