Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 0 additions & 25 deletions CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -161,21 +161,6 @@ Release 0.9.0 - unreleased

BUG FIXES

TAJO-1067: INSERT OVERWRITE INTO should not remove all partitions. (jaehwa)

TAJO-1065: The \admin -cluster argument doesn't run as expected.
(Jongyoung Park via hyunsik)

TAJO-1072: CLI gets stuck when wrong host/port is provided.
(Jihun Kang via hyunsik)

TAJO-1081: Non-forwarded (simple) query shows wrong rows. (hyunsik)

TAJO-981: Help command (\?) in tsql takes too long time. (YeonSu Han via
jaehwa)

TAJO-962: Column reference used in LIMIT clause incurs NPE.

TAJO-1074: Query calculates wrong progress before subquery init. (jinho)

TAJO-1025: Network disconnection during query processing can cause
Expand Down Expand Up @@ -457,12 +442,6 @@ Release 0.9.0 - unreleased

TASKS

TAJO-668: Add datetime function documentation. (Jongyoung Park via hyunsik)

TAJO-1077: Add Derby configuration documentation. (hyunsik)

TAJO-1068: Add SQL Query documentation. (hyunsik)

TAJO-1078: Update contributor list. (hyunsik)

TAJO-1070: BSTIndexScanExec should not seek a negative offset. (jinho)
Expand Down Expand Up @@ -500,10 +479,6 @@ Release 0.9.0 - unreleased

SUB TASKS

TAJO-1096: Update download source documentation (Mai Hai Thanh via jaehwa)

TAJO-1062: Update TSQL documentation. (jaehwa)

TAJO-1061: Update build documentation. (jaehwa)

TAJO-1060: Apply updated hadoop versions to README and BUILDING files.
Expand Down
2 changes: 2 additions & 0 deletions tajo-common/src/main/java/org/apache/tajo/SessionVars.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ public enum SessionVars implements ConfigKey {
TABLE_PARTITION_PER_SHUFFLE_SIZE(ConfVars.$DIST_QUERY_TABLE_PARTITION_VOLUME,
"shuffle output size for partition table write (mb)", DEFAULT),

GROUPBY_MULTI_LEVEL_ENABLED(ConfVars.$GROUPBY_MULTI_LEVEL_ENABLED, "Multiple level groupby enabled", DEFAULT),

// for physical Executors
EXTSORT_BUFFER_SIZE(ConfVars.$EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE, "sort buffer size for external sort (mb)", DEFAULT),
HASH_JOIN_SIZE_LIMIT(ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD, "limited size for hash join (mb)", DEFAULT),
Expand Down
2 changes: 2 additions & 0 deletions tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,8 @@ public static enum ConfVars implements ConfigKey {
$DIST_QUERY_GROUPBY_PARTITION_VOLUME("tajo.dist-query.groupby.partition-volume-mb", 256),
$DIST_QUERY_TABLE_PARTITION_VOLUME("tajo.dist-query.table-partition.task-volume-mb", 256),

$GROUPBY_MULTI_LEVEL_ENABLED("tajo.dist-query.groupby.multi-level-aggr", true),

// for physical Executors
$EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE("tajo.executor.external-sort.buffer-mb", 200L),
$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.common.in-memory-hash-threshold-bytes",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@

public class AggregationFunctionCallEval extends FunctionEval implements Cloneable {
@Expose protected AggFunction instance;
@Expose boolean firstPhase = false;
@Expose boolean intermediatePhase = false;
@Expose boolean finalPhase = true;
@Expose String alias;

private Tuple params;

protected AggregationFunctionCallEval(EvalType type, FunctionDesc desc, AggFunction instance, EvalNode[] givenArgs) {
Expand Down Expand Up @@ -58,7 +61,8 @@ public void merge(FunctionContext context, Schema schema, Tuple tuple) {
}
}

if (firstPhase) {
if (!intermediatePhase && !finalPhase) {
// firstPhase
instance.eval(context, params);
} else {
instance.merge(context, params);
Expand All @@ -71,7 +75,7 @@ public Datum eval(Schema schema, Tuple tuple) {
}

public Datum terminate(FunctionContext context) {
if (firstPhase) {
if (!finalPhase) {
return instance.getPartialResult(context);
} else {
return instance.terminate(context);
Expand All @@ -80,18 +84,40 @@ public Datum terminate(FunctionContext context) {

@Override
public DataType getValueType() {
if (firstPhase) {
if (!finalPhase) {
return instance.getPartialResultType();
} else {
return funcDesc.getReturnType();
}
}

public void setAlias(String alias) { this.alias = alias; }

public String getAlias() { return this.alias; }

  /**
   * Creates a copy of this aggregation eval with an independently cloned
   * {@link AggFunction} instance, so the copy's function state cannot be
   * shared with this eval's.
   *
   * The phase flags and alias are re-assigned explicitly even though a
   * field-copying ancestor clone() would already carry them over; kept
   * defensively because the superclass clone implementation is not visible
   * here — NOTE(review): confirm whether these explicit copies are redundant.
   */
  public Object clone() throws CloneNotSupportedException {
    AggregationFunctionCallEval clone = (AggregationFunctionCallEval)super.clone();

    clone.finalPhase = finalPhase;
    clone.intermediatePhase = intermediatePhase;
    clone.alias = alias;
    // Deep-copy the function so per-function state is not shared.
    clone.instance = (AggFunction)instance.clone();

    return clone;
  }

  /**
   * Marks this eval as running in the first aggregation phase: with both
   * intermediatePhase and finalPhase false, merge() feeds raw input rows to
   * the function via eval(), and terminate() emits a partial result.
   */
  public void setFirstPhase() {
    // NOTE(review): no `firstPhase` field is visible in this change's final
    // state — the phase checks were rewritten to use intermediatePhase /
    // finalPhase only. Confirm this field still exists and is read anywhere;
    // if not, this assignment is dead and should be removed.
    this.firstPhase = true;
    this.finalPhase = false;
    this.intermediatePhase = false;
  }

public void setFinalPhase() {
this.finalPhase = true;
this.intermediatePhase = false;
}

public void setIntermediatePhase() {
this.finalPhase = false;
this.intermediatePhase = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer;
import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.DistinctAggregationAlgorithm;
import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.SortSpecArray;
import org.apache.tajo.storage.AbstractStorageManager;
import org.apache.tajo.storage.StorageConstants;
Expand All @@ -56,6 +57,7 @@

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
import java.util.Stack;

Expand Down Expand Up @@ -1047,17 +1049,61 @@ public PhysicalExec createDistinctGroupByPlan(TaskAttemptContext context,
Enforcer enforcer = context.getEnforcer();
EnforceProperty property = getAlgorithmEnforceProperty(enforcer, distinctNode);
if (property != null) {
DistinctAggregationAlgorithm algorithm = property.getDistinct().getAlgorithm();
if (algorithm == DistinctAggregationAlgorithm.HASH_AGGREGATION) {
return createInMemoryDistinctGroupbyExec(context, distinctNode, subOp);
if (property.getDistinct().getIsMultipleAggregation()) {
MultipleAggregationStage stage = property.getDistinct().getMultipleAggregationStage();

if (stage == MultipleAggregationStage.FIRST_STAGE) {
return new DistinctGroupbyFirstAggregationExec(context, distinctNode, subOp);
} else if (stage == MultipleAggregationStage.SECOND_STAGE) {
return new DistinctGroupbySecondAggregationExec(context, distinctNode,
createSortExecForDistinctGroupby(context, distinctNode, subOp, 2));
} else {
return new DistinctGroupbyThirdAggregationExec(context, distinctNode,
createSortExecForDistinctGroupby(context, distinctNode, subOp, 3));
}
} else {
return createSortAggregationDistinctGroupbyExec(context, distinctNode, subOp, property.getDistinct());
DistinctAggregationAlgorithm algorithm = property.getDistinct().getAlgorithm();
if (algorithm == DistinctAggregationAlgorithm.HASH_AGGREGATION) {
return createInMemoryDistinctGroupbyExec(context, distinctNode, subOp);
} else {
return createSortAggregationDistinctGroupbyExec(context, distinctNode, subOp, property.getDistinct());
}
}
} else {
return createInMemoryDistinctGroupbyExec(context, distinctNode, subOp);
}
}

private SortExec createSortExecForDistinctGroupby(TaskAttemptContext context,
DistinctGroupbyNode distinctNode,
PhysicalExec subOp,
int phase) throws IOException {
SortNode sortNode = LogicalPlan.createNodeWithoutPID(SortNode.class);
//2 phase: seq, groupby columns, distinct1 keys, distinct2 keys,
//3 phase: groupby columns, seq, distinct1 keys, distinct2 keys,
List<SortSpec> sortSpecs = new ArrayList<SortSpec>();
if (phase == 2) {
sortSpecs.add(new SortSpec(distinctNode.getTargets()[0].getNamedColumn()));
}
for (Column eachColumn: distinctNode.getGroupingColumns()) {
sortSpecs.add(new SortSpec(eachColumn));
}
if (phase == 3) {
sortSpecs.add(new SortSpec(distinctNode.getTargets()[0].getNamedColumn()));
}
for (GroupbyNode eachGroupbyNode: distinctNode.getGroupByNodes()) {
for (Column eachColumn: eachGroupbyNode.getGroupingColumns()) {
sortSpecs.add(new SortSpec(eachColumn));
}
}
sortNode.setSortSpecs(sortSpecs.toArray(new SortSpec[]{}));
sortNode.setInSchema(distinctNode.getInSchema());
sortNode.setOutSchema(distinctNode.getInSchema());
ExternalSortExec sortExec = new ExternalSortExec(context, sm, sortNode, subOp);

return sortExec;
}

private PhysicalExec createInMemoryDistinctGroupbyExec(TaskAttemptContext ctx,
DistinctGroupbyNode distinctGroupbyNode, PhysicalExec subOp) throws IOException {
return new DistinctGroupbyHashAggregationExec(ctx, distinctGroupbyNode, subOp);
Expand Down Expand Up @@ -1145,7 +1191,7 @@ public PhysicalExec createIndexScanExec(TaskAttemptContext ctx,

}

private EnforceProperty getAlgorithmEnforceProperty(Enforcer enforcer, LogicalNode node) {
public static EnforceProperty getAlgorithmEnforceProperty(Enforcer enforcer, LogicalNode node) {
if (enforcer == null) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.common.ProtoObject;
import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.DistinctAggregationAlgorithm;
import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.SortSpecArray;
import org.apache.tajo.util.TUtil;

Expand Down Expand Up @@ -135,13 +136,25 @@ public void enforceHashAggregation(int pid) {
  /**
   * Enforces a distinct-aggregation algorithm for the plan node {@code pid}.
   * Convenience overload that delegates to the extended form with multiple
   * aggregation disabled ({@code false}) and no aggregation stage ({@code null}).
   */
  public void enforceDistinctAggregation(int pid,
                                         DistinctAggregationAlgorithm algorithm,
                                         List<SortSpecArray> sortSpecArrays) {
    enforceDistinctAggregation(pid, false, null, algorithm, sortSpecArrays);
  }

public void enforceDistinctAggregation(int pid,
boolean isMultipleAggregation,
MultipleAggregationStage stage,
DistinctAggregationAlgorithm algorithm,
List<SortSpecArray> sortSpecArrays) {
EnforceProperty.Builder builder = newProperty();
DistinctGroupbyEnforcer.Builder enforce = DistinctGroupbyEnforcer.newBuilder();
enforce.setPid(pid);
enforce.setIsMultipleAggregation(isMultipleAggregation);
enforce.setAlgorithm(algorithm);
if (sortSpecArrays != null) {
enforce.addAllSortSpecArrays(sortSpecArrays);
}
if (stage != null) {
enforce.setMultipleAggregationStage(stage);
}

builder.setType(EnforceType.DISTINCT_GROUP_BY);
builder.setDistinct(enforce.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -658,47 +658,6 @@ private RewrittenFunctions rewriteAggFunctionsForDistinctAggregation(GlobalPlanC
return rewritten;
}

  /**
   * Builds a two-phase distinct group-by plan over a union: the first-phase
   * group-by is pushed down into every child execution block, and the
   * second-phase group-by consumes their hash-shuffled output in
   * {@code lastBlock}.
   *
   * @param masterPlan        the distributed master plan being assembled
   * @param lastBlock         the execution block that will run the second phase
   * @param firstPhaseGroupBy group-by pushed into each child block (cloned per child)
   * @param secondPhaseGroupBy group-by placed at the root of {@code lastBlock}
   * @return {@code lastBlock}, now rooted at {@code secondPhaseGroupBy}
   */
  public ExecutionBlock buildDistinctGroupbyAndUnionPlan(MasterPlan masterPlan, ExecutionBlock lastBlock,
                                                         DistinctGroupbyNode firstPhaseGroupBy,
                                                         DistinctGroupbyNode secondPhaseGroupBy) {
    DataChannel lastDataChannel = null;

    // It pushes down the first phase group-by operator into all child blocks.
    //
    // (second phase)    G   (currentBlock)
    //                  /|\
    //                 / / | \
    // (first phase)  G G  G  G (child block)

    // They are already connected one another.
    // So, we don't need to connect them again.
    for (DataChannel dataChannel : masterPlan.getIncomingChannels(lastBlock.getId())) {
      // 1 shuffle partition when there are no grouping keys, 32 otherwise.
      if (firstPhaseGroupBy.isEmptyGrouping()) {
        dataChannel.setShuffle(HASH_SHUFFLE, firstPhaseGroupBy.getGroupingColumns(), 1);
      } else {
        dataChannel.setShuffle(HASH_SHUFFLE, firstPhaseGroupBy.getGroupingColumns(), 32);
      }
      dataChannel.setSchema(firstPhaseGroupBy.getOutSchema());
      ExecutionBlock childBlock = masterPlan.getExecBlock(dataChannel.getSrcId());

      // Why must firstPhaseGroupby be copied?
      //
      // A groupby in each execution block can have different child.
      // It affects groupby's input schema.
      DistinctGroupbyNode firstPhaseGroupbyCopy = PlannerUtil.clone(masterPlan.getLogicalPlan(), firstPhaseGroupBy);
      firstPhaseGroupbyCopy.setChild(childBlock.getPlan());
      childBlock.setPlan(firstPhaseGroupbyCopy);

      // just keep the last data channel.
      lastDataChannel = dataChannel;
    }

    // NOTE(review): if lastBlock has no incoming channels, lastDataChannel stays
    // null here — presumably callers guarantee at least one child; confirm.
    ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), lastDataChannel);
    secondPhaseGroupBy.setChild(scanNode);
    lastBlock.setPlan(secondPhaseGroupBy);
    return lastBlock;
  }

/**
* If there are at least one distinct aggregation function, a query works as if the query is rewritten as follows:
*
Expand Down Expand Up @@ -824,8 +783,20 @@ private ExecutionBlock buildGroupBy(GlobalPlanContext context, ExecutionBlock la
ExecutionBlock currentBlock;

if (groupbyNode.isDistinct()) { // if there is at one distinct aggregation function
DistinctGroupbyBuilder builder = new DistinctGroupbyBuilder(this);
return builder.buildPlan(context, lastBlock, groupbyNode);
boolean multiLevelEnabled = context.getPlan().getContext().getBool(SessionVars.GROUPBY_MULTI_LEVEL_ENABLED);

if (multiLevelEnabled) {
if (PlannerUtil.findTopNode(groupbyNode, NodeType.UNION) == null) {
DistinctGroupbyBuilder builder = new DistinctGroupbyBuilder(this);
return builder.buildMultiLevelPlan(context, lastBlock, groupbyNode);
} else {
DistinctGroupbyBuilder builder = new DistinctGroupbyBuilder(this);
return builder.buildPlan(context, lastBlock, groupbyNode);
}
} else {
DistinctGroupbyBuilder builder = new DistinctGroupbyBuilder(this);
return builder.buildPlan(context, lastBlock, groupbyNode);
}
} else {
GroupbyNode firstPhaseGroupby = createFirstPhaseGroupBy(masterPlan.getLogicalPlan(), groupbyNode);

Expand Down Expand Up @@ -968,6 +939,7 @@ public static GroupbyNode createFirstPhaseGroupBy(LogicalPlan plan, GroupbyNode
firstPhaseEvals[i].setFirstPhase();
firstPhaseEvalNames[i] = plan.generateUniqueColumnName(firstPhaseEvals[i]);
FieldEval param = new FieldEval(firstPhaseEvalNames[i], firstPhaseEvals[i].getValueType());
secondPhaseEvals[i].setFinalPhase();
secondPhaseEvals[i].setArgs(new EvalNode[] {param});
}

Expand Down
Loading