Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ public TajoConf getConf() {
* @param query a query for execution
* @param condition this parameter means whether it is for success case or is not for failure case.
* @return
* @throws PlanningException
*/
private static Target[] getRawTargets(QueryContext context, String query, boolean condition)
throws TajoException, InvalidStatementException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.SortSpecArray;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.plan.PlanningException;
import org.apache.tajo.plan.Target;
import org.apache.tajo.plan.expr.AggregationFunctionCallEval;
import org.apache.tajo.plan.expr.EvalNode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.tajo.OverridableConf;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.exception.TajoException;
import org.apache.tajo.plan.PlanningException;

/**
* A rewrite rule for global plans
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanRewriteRule;
import org.apache.tajo.exception.TajoException;
import org.apache.tajo.exception.TajoInternalError;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.PlanningException;
import org.apache.tajo.plan.logical.*;
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.util.TUtil;
Expand Down Expand Up @@ -181,7 +181,7 @@ private void visitLeafNode(ExecutionBlock current) {
if (fullyBroadcastable && current.getScanNodes().length == 1) {
try {
updateScanOfParentAsBroadcastable(plan, current);
} catch (PlanningException e) {
} catch (NoScanNodeForChildEbException e) {
// This case is when the current has two or more inputs via union, and simply ignored.
}
}
Expand All @@ -199,24 +199,16 @@ private void visitNonLeafNode(ExecutionBlock current) {
if (current.hasBroadcastRelation()) {
// The current execution block and its every child are able to be merged.
for (ExecutionBlock child : childs) {
try {
addUnionNodeIfNecessary(unionScanMap, plan, child, current);
mergeTwoPhaseJoin(plan, child, current);
} catch (PlanningException e) {
throw new RuntimeException(e);
}
addUnionNodeIfNecessary(unionScanMap, plan, child, current);
mergeTwoPhaseJoin(plan, child, current);
}

checkTotalSizeOfBroadcastableRelations(current);

// We assume that if every input of an execution block is broadcastable,
// the output of the execution block is also broadcastable.
if (!current.isPreservedRow() && isFullyBroadcastable(current)) {
try {
updateScanOfParentAsBroadcastable(plan, current);
} catch (PlanningException e) {
throw new RuntimeException(e);
}
updateScanOfParentAsBroadcastable(plan, current);
}
}
} else {
Expand Down Expand Up @@ -261,10 +253,10 @@ private void checkTotalSizeOfBroadcastableRelations(ExecutionBlock block) {
}
}

private void updateScanOfParentAsBroadcastable(MasterPlan plan, ExecutionBlock current) throws PlanningException {
private void updateScanOfParentAsBroadcastable(MasterPlan plan, ExecutionBlock current) {
ExecutionBlock parent = plan.getParent(current);
if (parent != null && !plan.isTerminal(parent)) {
ScanNode scanForCurrent = GlobalPlanRewriteUtil.findScanForChildEb(current, parent);
ScanNode scanForCurrent = findScanForChildEb(current, parent);
parent.addBroadcastRelation(scanForCurrent);
}
}
Expand All @@ -277,9 +269,8 @@ private void updateScanOfParentAsBroadcastable(MasterPlan plan, ExecutionBlock c
* @param parent parent block who has join nodes
* @return
*/
private ExecutionBlock mergeTwoPhaseJoin(MasterPlan plan, ExecutionBlock child, ExecutionBlock parent)
throws PlanningException {
ScanNode scanForChild = GlobalPlanRewriteUtil.findScanForChildEb(child, parent);
private ExecutionBlock mergeTwoPhaseJoin(MasterPlan plan, ExecutionBlock child, ExecutionBlock parent) {
ScanNode scanForChild = findScanForChildEb(child, parent);

parentFinder.set(scanForChild);
parentFinder.find(parent.getPlan());
Expand All @@ -301,8 +292,7 @@ private ExecutionBlock mergeTwoPhaseJoin(MasterPlan plan, ExecutionBlock child,
}

private void addUnionNodeIfNecessary(Map<ExecutionBlockId, ExecutionBlockId> unionScanMap, MasterPlan plan,
ExecutionBlock child, ExecutionBlock current)
throws PlanningException {
ExecutionBlock child, ExecutionBlock current) {
if (unionScanMap != null) {
List<ExecutionBlockId> unionScans = TUtil.newList();
ExecutionBlockId representativeId = null;
Expand All @@ -326,14 +316,15 @@ private void addUnionNodeIfNecessary(Map<ExecutionBlockId, ExecutionBlockId> uni
// left must not be null
UnionNode unionNode = plan.getLogicalPlan().createNode(UnionNode.class);
unionNode.setLeftChild(left);
unionNode.setRightChild(GlobalPlanner.buildInputExecutor(plan.getLogicalPlan(), plan.getChannel(unionScans.get(i), current.getId())));
unionNode.setRightChild(GlobalPlanner.buildInputExecutor(plan.getLogicalPlan(),
plan.getChannel(unionScans.get(i), current.getId())));
unionNode.setInSchema(left.getOutSchema());
unionNode.setOutSchema(left.getOutSchema());
topUnion = unionNode;
left = unionNode;
}

ScanNode scanForChild = GlobalPlanRewriteUtil.findScanForChildEb(plan.getExecBlock(representativeId), current);
ScanNode scanForChild = findScanForChildEb(plan.getExecBlock(representativeId), current);
PlannerUtil.replaceNode(plan.getLogicalPlan(), current.getPlan(), scanForChild, topUnion);

current.getUnionScanMap().clear();
Expand All @@ -346,4 +337,32 @@ private void addUnionNodeIfNecessary(Map<ExecutionBlockId, ExecutionBlockId> uni
private static boolean isFullyBroadcastable(ExecutionBlock block) {
return block.getBroadcastRelations().size() == block.getScanNodes().length;
}

/**
* Find a scan node in the plan of the parent EB corresponding to the output of the child EB.
*
* @param child
* @param parent
* @return ScanNode
*/
private static ScanNode findScanForChildEb(ExecutionBlock child, ExecutionBlock parent) {
ScanNode scanForChild = null;
for (ScanNode scanNode : parent.getScanNodes()) {
if (scanNode.getTableName().equals(child.getId().toString())) {
scanForChild = scanNode;
break;
}
}
if (scanForChild == null) {
throw new NoScanNodeForChildEbException(
"cannot find any scan nodes for " + child.getId() + " in " + parent.getId());
}
return scanForChild;
}

private static class NoScanNodeForChildEbException extends RuntimeException {
NoScanNodeForChildEbException(String message) {
super(message);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.engine.planner.global.ExecutionBlock;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.plan.PlanningException;
import org.apache.tajo.exception.TajoInternalError;
import org.apache.tajo.plan.logical.*;

import java.util.List;
Expand Down Expand Up @@ -71,10 +71,8 @@ public static ExecutionBlock mergeExecutionBlocks(MasterPlan plan, ExecutionBloc
* @param newChild
* @param originalChild
* @param parent
* @throws PlanningException
*/
public static void replaceChild(LogicalNode newChild, ScanNode originalChild, LogicalNode parent)
throws PlanningException {
public static void replaceChild(LogicalNode newChild, ScanNode originalChild, LogicalNode parent) {
if (parent instanceof UnaryNode) {
((UnaryNode) parent).setChild(newChild);
} else if (parent instanceof BinaryNode) {
Expand All @@ -84,35 +82,13 @@ public static void replaceChild(LogicalNode newChild, ScanNode originalChild, Lo
} else if (binary.getRightChild().equals(originalChild)) {
binary.setRightChild(newChild);
} else {
throw new PlanningException(originalChild.getPID() + " is not a child of " + parent.getPID());
throw new TajoInternalError(originalChild.getPID() + " is not a child of " + parent.getPID());
}
} else {
throw new PlanningException(parent.getPID() + " seems to not have any children");
throw new TajoInternalError(parent.getPID() + " seems to not have any children");
}
}

/**
* Find a scan node in the plan of the parent EB corresponding to the output of the child EB.
*
* @param child
* @param parent
* @return
* @throws PlanningException
*/
public static ScanNode findScanForChildEb(ExecutionBlock child, ExecutionBlock parent) throws PlanningException {
ScanNode scanForChild = null;
for (ScanNode scanNode : parent.getScanNodes()) {
if (scanNode.getTableName().equals(child.getId().toString())) {
scanForChild = scanNode;
break;
}
}
if (scanForChild == null) {
throw new PlanningException("Cannot find any scan nodes for " + child.getId() + " in " + parent.getId());
}
return scanForChild;
}

/**
* Get a volume of a table of a partitioned table
* @param scanNode ScanNode corresponding to a table
Expand All @@ -137,7 +113,7 @@ public static long getTableVolume(ScanNode scanNode) {
/**
* It calculates the total volume of all descendent relation nodes.
*/
public static long computeDescendentVolume(LogicalNode node) throws PlanningException {
public static long computeDescendentVolume(LogicalNode node) {

if (node instanceof RelationNode) {
switch (node.getType()) {
Expand Down Expand Up @@ -176,7 +152,7 @@ public static long computeDescendentVolume(LogicalNode node) throws PlanningExce
return computeDescendentVolume(binaryNode.getLeftChild()) + computeDescendentVolume(binaryNode.getRightChild());
}

throw new PlanningException("Invalid State");
throw new TajoInternalError("invalid state");
}

public static class ParentFinder implements LogicalNodeVisitor {
Expand All @@ -192,9 +168,9 @@ public void find(LogicalNode root) {
this.visit(root);
}

public LogicalNode getFound() throws PlanningException {
public LogicalNode getFound() {
if (found == null) {
throw new PlanningException("Cannot find the parent of " + target.getPID());
throw new TajoInternalError("cannot find the parent of " + target.getPID());
}
return this.found;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.SchemaUtil;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.plan.InvalidQueryException;
import org.apache.tajo.exception.TajoInternalError;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.worker.TaskAttemptContext;

Expand All @@ -37,8 +37,7 @@ public SortIntersectExec(TaskAttemptContext context, PhysicalExec left, Physical
TajoDataTypes.DataType[] leftTypes = SchemaUtil.toDataTypes(left.getSchema());
TajoDataTypes.DataType[] rightTypes = SchemaUtil.toDataTypes(right.getSchema());
if (!CatalogUtil.isMatchedFunction(Arrays.asList(leftTypes), Arrays.asList(rightTypes))) {
throw new InvalidQueryException(
"The both schemas are not compatible");
throw new TajoInternalError("the both schemas are not compatible");
}
comparator = new SetTupleComparator(left.getSchema(), right.getSchema());
this.isDistinct = isDistinct;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
*/
package org.apache.tajo.engine.planner.physical;

import org.apache.tajo.exception.TajoInternalError;
import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.plan.InvalidQueryException;
import org.apache.tajo.storage.Tuple;

import java.io.IOException;
Expand All @@ -34,8 +34,7 @@ public class UnionExec extends BinaryPhysicalExec {
public UnionExec(TaskAttemptContext context, PhysicalExec outer, PhysicalExec inner) {
super(context, outer.getSchema(), inner.getSchema(), outer, inner);
if (!outer.getSchema().equals(inner.getSchema())) {
throw new InvalidQueryException(
"The both schemas are not same");
throw new TajoInternalError("the both schemas are not same");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,12 @@
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.exception.*;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.exception.*;
import org.apache.tajo.master.TajoMaster.MasterContext;
import org.apache.tajo.master.exec.DDLExecutor;
import org.apache.tajo.master.exec.QueryExecutor;
import org.apache.tajo.metrics.Master;
import org.apache.tajo.plan.IllegalQueryStatusException;
import org.apache.tajo.plan.LogicalOptimizer;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.LogicalPlanner;
Expand All @@ -58,7 +57,6 @@
import org.apache.tajo.storage.TablespaceManager;
import org.apache.tajo.util.CommonTestingUtil;

import java.io.IOException;
import java.sql.SQLException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.exception.TajoException;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.PlanningException;
import org.apache.tajo.plan.Target;
import org.apache.tajo.plan.expr.AlgebraicUtil;
import org.apache.tajo.plan.expr.EvalNode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@
import org.apache.tajo.exception.TajoInternalError;
import org.apache.tajo.master.TajoMaster.MasterContext;
import org.apache.tajo.master.rm.NodeStatus;
import org.apache.tajo.plan.InvalidQueryException;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.PlanningException;
import org.apache.tajo.plan.expr.EvalNode;
import org.apache.tajo.plan.logical.IndexScanNode;
import org.apache.tajo.plan.logical.LogicalNode;
Expand Down Expand Up @@ -121,7 +119,7 @@ public void init() throws IOException {
}

if (leafBlock == null) {
throw new InvalidQueryException("Global planner could not find any leaf block.");
throw new TajoInternalError("global planner could not find any leaf block.");
}

taskContext = new TaskAttemptContext(queryContext, null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.*;
import org.apache.tajo.BuiltinStorages;
import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.SessionVars;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.catalog.*;
import org.apache.tajo.exception.DuplicateIndexException;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.common.TajoDataTypes;
Expand All @@ -37,6 +39,9 @@
import org.apache.tajo.engine.planner.physical.EvalExprExec;
import org.apache.tajo.engine.planner.physical.InsertRowsExec;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.exception.DuplicateIndexException;
import org.apache.tajo.exception.TajoInternalError;
import org.apache.tajo.exception.UnsupportedException;
import org.apache.tajo.ipc.ClientProtos.SerializedResultSet;
import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse;
import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse.ResultType;
Expand All @@ -47,7 +52,6 @@
import org.apache.tajo.master.exec.prehook.DistributedQueryHookManager;
import org.apache.tajo.master.exec.prehook.InsertIntoHook;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.PlanningException;
import org.apache.tajo.plan.Target;
import org.apache.tajo.plan.expr.EvalContext;
import org.apache.tajo.plan.expr.EvalNode;
Expand All @@ -56,7 +60,6 @@
import org.apache.tajo.plan.function.python.TajoScriptEngine;
import org.apache.tajo.plan.logical.*;
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.plan.verifier.VerifyException;
import org.apache.tajo.session.Session;
import org.apache.tajo.storage.*;
import org.apache.tajo.util.ProtoUtil;
Expand Down Expand Up @@ -287,7 +290,7 @@ public void execNonFromQuery(QueryContext queryContext, Session session, String
EvalContext evalContext = new EvalContext();
Target[] targets = plan.getRootBlock().getRawTargets();
if (targets == null) {
throw new PlanningException("No targets");
throw new TajoInternalError("no targets");
}
try {
// start script executor
Expand Down Expand Up @@ -501,8 +504,8 @@ public void executeDistributedQuery(QueryContext queryContext, Session session,
FormatProperty formatProperty = space.getFormatProperty(tableDesc.getMeta());

if (!formatProperty.isInsertable()) {
throw new VerifyException(
String.format("%s tablespace does not allow INSERT operation.", tableDesc.getUri().toString()));
throw new UnsupportedException(
String.format("INSERT operation on %s tablespace", tableDesc.getUri().toString()));
}

space.prepareTable(rootNode.getChild());
Expand Down
Loading