From 852633bd7f3f43204afc4c850f0d06e41fcb5d58 Mon Sep 17 00:00:00 2001 From: Hyunsik Choi Date: Tue, 24 Nov 2015 01:28:47 -0800 Subject: [PATCH 1/2] TAJO-1987: Add stream API to Projectable and refactor its usage codes. --- .../java/org/apache/tajo/catalog/Schema.java | 5 ++ .../engine/codegen/ExecutorPreCompiler.java | 4 +- .../planner/physical/BSTIndexScanExec.java | 4 +- .../engine/planner/physical/EvalExprExec.java | 4 +- .../engine/planner/physical/SeqScanExec.java | 4 +- .../org/apache/tajo/plan/LogicalPlanner.java | 10 +-- .../plan/logical/DistinctGroupbyNode.java | 6 ++ .../tajo/plan/logical/EvalExprNode.java | 6 ++ .../apache/tajo/plan/logical/GroupbyNode.java | 8 ++- .../apache/tajo/plan/logical/JoinNode.java | 6 ++ .../apache/tajo/plan/logical/Projectable.java | 12 +++- .../tajo/plan/logical/ProjectionNode.java | 10 ++- .../apache/tajo/plan/logical/ScanNode.java | 16 ++++- .../tajo/plan/logical/TableSubQueryNode.java | 6 ++ .../tajo/plan/logical/WindowAggNode.java | 8 ++- .../rewrite/rules/FilterPushDownRule.java | 6 +- .../tajo/plan/serder/EvalNodeSerializer.java | 3 +- .../plan/serder/LogicalNodeSerializer.java | 65 ++++++++++++------- .../plan/verifier/LogicalPlanVerifier.java | 12 +--- 19 files changed, 132 insertions(+), 63 deletions(-) diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java index ae7d27446e..8fdba30aa1 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java @@ -35,6 +35,7 @@ import org.apache.tajo.util.TUtil; import java.util.*; +import java.util.stream.Stream; public class Schema implements ProtoObject, Cloneable, GsonObject { @@ -327,6 +328,10 @@ public void visit(int depth, List path, Column column) { return columnList; } + public Stream columns() { + return fields.stream(); + } + public boolean contains(String name) { // TODO - It's a hack if (NestedPathUtil.isPath(name)) { diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/codegen/ExecutorPreCompiler.java b/tajo-core/src/main/java/org/apache/tajo/engine/codegen/ExecutorPreCompiler.java index 15df76a446..2033c66f9c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/codegen/ExecutorPreCompiler.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/codegen/ExecutorPreCompiler.java @@ -190,9 +190,7 @@ public LogicalNode visitTableSubQuery(CompilationContext context, LogicalPlan pl stack.pop(); if (node.hasTargets()) { - for (Target target : node.getTargets()) { - compileIfAbsent(context, node.getLogicalSchema(), target.getEvalTree()); - } + node.targets().forEach(t -> compileIfAbsent(context, node.getLogicalSchema(), t.getEvalTree())); } return node; diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java index ee3762fa9a..a4240cab33 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java @@ -139,9 +139,7 @@ public void init() throws IOException { columnSet.addAll(EvalTreeUtil.findUniqueColumns(qual)); } - for (Target t : plan.getTargets()) { - columnSet.addAll(EvalTreeUtil.findUniqueColumns(t.getEvalTree())); - } + plan.targets().forEach(t -> columnSet.addAll(EvalTreeUtil.findUniqueColumns(t.getEvalTree()))); for (Column column : inSchema.getAllColumns()) { if (columnSet.contains(column)) { diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java index 72ab608847..39aade01cd 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java @@ -41,9 +41,7 @@ public EvalExprExec(final TaskAttemptContext context, final EvalExprNode plan) { public void init() throws IOException { super.init(); progress = 0.0f; - for (Target target : plan.getTargets()) { - target.getEvalTree().bind(context.getEvalContext(), inSchema); - } + plan.targets().forEach(t -> t.getEvalTree().bind(context.getEvalContext(), inSchema)); } @Override diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java index 3ddad1e5dc..e0a2443d8b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java @@ -149,9 +149,7 @@ public Schema getProjectSchema() { columnSet.addAll(EvalTreeUtil.findUniqueColumns(qual)); } - for (Target t : plan.getTargets()) { - columnSet.addAll(EvalTreeUtil.findUniqueColumns(t.getEvalTree())); - } + plan.targets().forEach(t -> columnSet.addAll(EvalTreeUtil.findUniqueColumns(t.getEvalTree()))); for (Column column : inSchema.getAllColumns()) { if (columnSet.contains(column)) { diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java index 061c433f7c..43488eb4a5 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java @@ -577,10 +577,12 @@ public static void verifyProjectedFields(QueryBlock block, Projectable projectab public static void prohibitNestedRecordProjection(Projectable projectable) throws TajoException { - for (Target t : projectable.getTargets()) { - if (t.getEvalTree().getValueType().getType() == TajoDataTypes.Type.RECORD) { - throw new NotImplementedException("record field projection"); - } + if (projectable.targets() + .map(t -> t.getEvalTree().getValueType().getType()) + .filter(t -> t == TajoDataTypes.Type.RECORD) + .findAny() + .isPresent()) { + throw new NotImplementedException("record field projection"); } } diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/DistinctGroupbyNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/DistinctGroupbyNode.java index a36f9a4f3d..93a5206eb7 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/DistinctGroupbyNode.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/DistinctGroupbyNode.java @@ -31,6 +31,7 @@ import java.util.Arrays; import java.util.List; import java.util.Objects; +import java.util.stream.Stream; public class DistinctGroupbyNode extends UnaryNode implements Projectable, Cloneable { @Expose @@ -75,6 +76,11 @@ public List getTargets() { } } + @Override + public Stream targets() { + return getTargets().stream(); + } + public void setSubPlans(List groupByNodes) { this.subGroupbyPlan = groupByNodes; } diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/EvalExprNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/EvalExprNode.java index d796443cd4..a6d0ff5270 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/EvalExprNode.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/EvalExprNode.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Objects; +import java.util.stream.Stream; public class EvalExprNode extends LogicalNode implements Projectable { @Expose private List exprs = null; @@ -64,6 +65,11 @@ public List getTargets() { return exprs; } + @Override + public Stream targets() { + return getTargets().stream(); + } + public List getExprs() { return this.exprs; } diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/GroupbyNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/GroupbyNode.java index 4845c1d6fe..a156afd939 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/GroupbyNode.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/GroupbyNode.java @@ -32,6 +32,7 @@ import java.util.Arrays; import java.util.List; import java.util.Objects; +import java.util.stream.Stream; public class GroupbyNode extends UnaryNode implements Projectable, Cloneable { /** Grouping key sets */ @@ -117,7 +118,12 @@ public void setTargets(List targets) { public List getTargets() { return this.targets; } - + + @Override + public Stream targets() { + return getTargets().stream(); + } + public void setChild(LogicalNode subNode) { super.setChild(subNode); } diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/JoinNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/JoinNode.java index 4584140c2a..e25f6a6343 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/JoinNode.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/JoinNode.java @@ -32,6 +32,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.stream.Stream; public class JoinNode extends BinaryNode implements Projectable, Cloneable { @Expose private JoinSpec joinSpec = new JoinSpec(); @@ -85,6 +86,11 @@ public List getTargets() { return this.targets; } + @Override + public Stream targets() { + return getTargets().stream(); + } + @Override public void setTargets(List targets) { this.targets = targets; diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/Projectable.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/Projectable.java index 858a585f0c..1219d0b9d7 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/Projectable.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/Projectable.java @@ -22,6 +22,7 @@ import org.apache.tajo.plan.Target; import java.util.List; +import java.util.stream.Stream; /** * Projectable is an interface for a LogicalNode which has a list of targets. @@ -60,16 +61,23 @@ public interface Projectable { */ List getTargets(); + /** + * Get a stream pipeline for Targets + * + * @return Stream for Targets + */ + Stream targets(); + /** * Get an input schema * @return The input schema */ - public Schema getInSchema(); + Schema getInSchema(); /** * Get an output schema * * @return The output schema */ - public Schema getOutSchema(); + Schema getOutSchema(); } diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ProjectionNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ProjectionNode.java index 35ca47fbb2..0ac0f45b64 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ProjectionNode.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ProjectionNode.java @@ -28,6 +28,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.stream.Stream; public class ProjectionNode extends UnaryNode implements Projectable { @@ -64,8 +65,13 @@ public void setTargets(List targets) { public List getTargets() { return this.targets; } - - public void setChild(LogicalNode subNode) { + + @Override + public Stream targets() { + return getTargets().stream(); + } + + public void setChild(LogicalNode subNode) { super.setChild(subNode); } diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java index 3de8c1d249..a3e4e62e87 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java @@ -32,7 +32,10 @@ import org.apache.tajo.util.TUtil; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; public class ScanNode extends RelationNode implements Projectable, SelectableNode, Cloneable { @Expose protected TableDesc tableDesc; @@ -159,10 +162,19 @@ public List getTargets() { } } + @Override + public Stream targets() { + if (hasTargets()) { + return this.targets.stream(); + } else { + return Collections.EMPTY_LIST.stream(); + } + } + /** + * Has limit num or not. * - * - * @return + * @return true if limit is given. Otherwise, it will return false. */ public boolean hasLimit() { return limit > 0; diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/TableSubQueryNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/TableSubQueryNode.java index bcf8851d6f..6be79edd64 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/TableSubQueryNode.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/TableSubQueryNode.java @@ -28,6 +28,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.stream.Stream; public class TableSubQueryNode extends RelationNode implements Projectable { @Expose private String tableName; @@ -126,6 +127,11 @@ public List getTargets() { } + @Override + public Stream targets() { + return getTargets().stream(); + } + @Override public PlanString getPlanString() { PlanString planStr = new PlanString(this); diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/WindowAggNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/WindowAggNode.java index b0b87882da..dfbd5ca29c 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/WindowAggNode.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/WindowAggNode.java @@ -31,6 +31,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.stream.Stream; public class WindowAggNode extends UnaryNode implements Projectable, Cloneable { /** partition key sets */ @@ -110,7 +111,12 @@ public void setTargets(List targets) { public List getTargets() { return this.targets; } - + + @Override + public Stream targets() { + return getTargets().stream(); + } + public void setChild(LogicalNode subNode) { super.setChild(subNode); } diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java index 43868a5b59..229b159064 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java @@ -41,6 +41,7 @@ import org.apache.tajo.plan.rewrite.rules.IndexScanInfo.SimplePredicate; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor; +import org.apache.tajo.util.Pair; import org.apache.tajo.util.TUtil; import java.util.*; @@ -655,10 +656,9 @@ private BiMap findCanPushdownAndTransform( LogicalNode childNode, List notMatched, Set partitionColumns, int columnOffset) throws TajoException { // canonical name -> target + Map nodeTargetMap = new HashMap<>(); - for (Target target : node.getTargets()) { - nodeTargetMap.put(target.getCanonicalName(), target); - } + node.targets().forEach(t -> nodeTargetMap.put(t.getCanonicalName(), t)); // copy -> origin BiMap matched = HashBiMap.create(); diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeSerializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeSerializer.java index 3532b37ddf..8b87edd47d 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeSerializer.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeSerializer.java @@ -295,8 +295,7 @@ public EvalNode visitFuncCall(EvalTreeProtoBuilderContext context, FunctionEval WinFunctionEvalSpec.Builder windowFuncBuilder = WinFunctionEvalSpec.newBuilder(); if (winFunc.hasSortSpecs()) { - windowFuncBuilder.addAllSortSpec(ProtoUtil.toProtoObjects - (winFunc.getSortSpecs())); + windowFuncBuilder.addAllSortSpec(LogicalNodeSerializer.toProtos(winFunc.getSortSpecs())); } windowFuncBuilder.setWindowFrame(buildWindowFrame(winFunc.getWindowFrame())); diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java index 6b082f7159..b49699837b 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java @@ -41,9 +41,12 @@ import org.apache.tajo.util.ProtoUtil; import org.apache.tajo.util.TUtil; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Stack; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * It serializes a logical plan into a protobuf-based serialized bytes. @@ -148,8 +151,7 @@ public LogicalNode visitSetSession(SerializeContext context, LogicalPlan plan, L public LogicalNode visitEvalExpr(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, EvalExprNode exprEval, Stack stack) throws TajoException { PlanProto.EvalExprNode.Builder exprEvalBuilder = PlanProto.EvalExprNode.newBuilder(); - exprEvalBuilder.addAllTargets( - ProtoUtil.toProtoObjects(exprEval.getTargets().toArray(new ProtoObject[exprEval.getTargets().size()]))); + exprEvalBuilder.addAllTargets(toProtos(exprEval.targets())); PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, exprEval); nodeBuilder.setExprEval(exprEvalBuilder); @@ -167,8 +169,7 @@ public LogicalNode visitProjection(SerializeContext context, LogicalPlan plan, L PlanProto.ProjectionNode.Builder projectionBuilder = PlanProto.ProjectionNode.newBuilder(); projectionBuilder.setChildSeq(childIds[0]); - projectionBuilder.addAllTargets( - ProtoUtil.toProtoObjects(projection.getTargets().toArray(new ProtoObject[projection.getTargets().size()]))); + projectionBuilder.addAllTargets(toProtos(projection.targets())); projectionBuilder.setDistinct(projection.isDistinct()); PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, projection); @@ -207,23 +208,19 @@ public LogicalNode visitWindowAgg(SerializeContext context, LogicalPlan plan, Lo windowAggBuilder.setChildSeq(childIds[0]); if (windowAgg.hasPartitionKeys()) { - windowAggBuilder.addAllPartitionKeys( - ProtoUtil.toProtoObjects(windowAgg.getPartitionKeys())); + windowAggBuilder.addAllPartitionKeys(toProtos(windowAgg.getPartitionKeys())); } if (windowAgg.hasAggFunctions()) { - windowAggBuilder.addAllWindowFunctions( - ProtoUtil.toProtoObjects(windowAgg.getWindowFunctions())); + windowAggBuilder.addAllWindowFunctions(toProtos(windowAgg.getWindowFunctions())); } windowAggBuilder.setDistinct(windowAgg.isDistinct()); if (windowAgg.hasSortSpecs()) { - windowAggBuilder.addAllSortSpecs( - ProtoUtil.toProtoObjects(windowAgg.getSortSpecs())); + windowAggBuilder.addAllSortSpecs(toProtos(windowAgg.getSortSpecs())); } if (windowAgg.hasTargets()) { - windowAggBuilder.addAllTargets( - ProtoUtil.toProtoObjects(windowAgg.getTargets().toArray(new ProtoObject[windowAgg.getTargets().size()]))); + windowAggBuilder.addAllTargets(toProtos(windowAgg.targets())); } PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, windowAgg); @@ -290,15 +287,13 @@ private PlanProto.LogicalNode.Builder buildGroupby(SerializeContext context, Gro groupbyBuilder.setDistinct(node.isDistinct()); if (node.groupingKeyNum() > 0) { - groupbyBuilder.addAllGroupingKeys( - ProtoUtil.toProtoObjects(node.getGroupingColumns())); + groupbyBuilder.addAllGroupingKeys(toProtos(node.getGroupingColumns())); } if (node.hasAggFunctions()) { - groupbyBuilder.addAllAggFunctions( - ProtoUtil.toProtoObjects(node.getAggFunctions())); + groupbyBuilder.addAllAggFunctions(toProtos(node.getAggFunctions())); } if (node.hasTargets()) { - groupbyBuilder.addAllTargets(ProtoUtil.toProtoObjects(node.getTargets().toArray(new ProtoObject[node.getTargets().size()]))); + groupbyBuilder.addAllTargets(toProtos(node.targets())); } PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, node); @@ -325,15 +320,13 @@ public LogicalNode visitDistinctGroupby(SerializeContext context, LogicalPlan pl } if (node.getGroupingColumns().length > 0) { - distGroupbyBuilder.addAllGroupingKeys( - ProtoUtil.toProtoObjects(node.getGroupingColumns())); + distGroupbyBuilder.addAllGroupingKeys(toProtos(node.getGroupingColumns())); } if (node.getAggFunctions().length > 0) { - distGroupbyBuilder.addAllAggFunctions( - ProtoUtil.toProtoObjects(node.getAggFunctions())); + distGroupbyBuilder.addAllAggFunctions(toProtos(node.getAggFunctions())); } if (node.hasTargets()) { - distGroupbyBuilder.addAllTargets(ProtoUtil.toProtoObjects(node.getTargets().toArray(new ProtoObject[node.getTargets().size()]))); + distGroupbyBuilder.addAllTargets(toProtos(node.targets())); } for (int cid : node.getResultColumnIds()) { distGroupbyBuilder.addResultId(cid); @@ -382,7 +375,7 @@ public LogicalNode visitJoin(SerializeContext context, LogicalPlan plan, Logical if (join.hasTargets()) { joinBuilder.setExistsTargets(true); - joinBuilder.addAllTargets(ProtoUtil.toProtoObjects(join.getTargets().toArray(new ProtoObject[join.getTargets().size()]))); + joinBuilder.addAllTargets(toProtos(join.targets())); } else { joinBuilder.setExistsTargets(false); } @@ -435,7 +428,7 @@ public PlanProto.ScanNode.Builder buildScanNode(ScanNode scan) { if (scan.hasTargets()) { scanBuilder.setExistTargets(true); - scanBuilder.addAllTargets(ProtoUtil.toProtoObjects(scan.getTargets().toArray(new ProtoObject[scan.getTargets().size()]))); + scanBuilder.addAllTargets(toProtos(scan.targets())); } else { scanBuilder.setExistTargets(false); } @@ -506,7 +499,7 @@ public LogicalNode visitTableSubQuery(SerializeContext context, LogicalPlan plan builder.setTableName(node.getTableName()); if (node.hasTargets()) { - builder.addAllTargets(ProtoUtil.toProtoObjects(node.getTargets().toArray(new ProtoObject[node.getTargets().size()]))); + builder.addAllTargets(toProtos(node.targets())); } builder.setNameResolveBase(node.isNameResolveBase()); @@ -848,4 +841,26 @@ public static PlanProto.Target convertTarget(Target target) { } return childIds; } + + /** + * It converts a stream of ProtoObjects into Iteratable one. + * + * @param s ProtoObject Stream + * @param Protobuf message type + * @return Iteratable object for T + */ + public static Iterable toProtos(Stream s) { + return s.map(p -> (T)p.getProto()).collect(Collectors.toList()); + } + + /** + * It converts an array of ProtoObjects into Iteratable one. + * + * @param protos ProtoObject array + * @param Protobuf message type + * @return Iteratable object for T + */ + public static Iterable toProtos(ProtoObject [] protos) { + return Arrays.stream(protos).map(p -> (T)p.getProto()).collect(Collectors.toList()); + } } diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/LogicalPlanVerifier.java b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/LogicalPlanVerifier.java index aff95df3e3..0a4d71f9e6 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/LogicalPlanVerifier.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/LogicalPlanVerifier.java @@ -90,9 +90,7 @@ public LogicalNode visitProjection(Context state, LogicalPlan plan, LogicalPlan. ProjectionNode node, Stack stack) throws TajoException { super.visitProjection(state, plan, block, node, stack); - for (Target target : node.getTargets()) { - ExprsVerifier.verify(state.state, node, target.getEvalTree()); - } + node.targets().forEach(t -> ExprsVerifier.verify(state.state, node, t.getEvalTree())); verifyProjectableOutputSchema(state, node); @@ -196,9 +194,7 @@ public LogicalNode visitTableSubQuery(Context context, LogicalPlan plan, Logical TableSubQueryNode node, Stack stack) throws TajoException { super.visitTableSubQuery(context, plan, block, node, stack); if (node.hasTargets()) { - for (Target target : node.getTargets()) { - ExprsVerifier.verify(context.state, node, target.getEvalTree()); - } + node.targets().forEach(t -> ExprsVerifier.verify(context.state, node, t.getEvalTree())); } verifyProjectableOutputSchema(context, node); @@ -209,9 +205,7 @@ public LogicalNode visitTableSubQuery(Context context, LogicalPlan plan, Logical public LogicalNode visitScan(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, ScanNode node, Stack stack) throws TajoException { if (node.hasTargets()) { - for (Target target : node.getTargets()) { - ExprsVerifier.verify(context.state, node, target.getEvalTree()); - } + node.targets().forEach(t -> ExprsVerifier.verify(context.state, node, t.getEvalTree())); } if (node.hasQual()) { From a94f45084357aaf4551871ca8268aa0d3720876c Mon Sep 17 00:00:00 2001 From: Hyunsik Choi Date: Wed, 2 Dec 2015 22:48:58 -0800 Subject: [PATCH 2/2] Removed unused imports. --- .../src/main/java/org/apache/tajo/plan/logical/ScanNode.java | 1 - .../java/org/apache/tajo/plan/verifier/LogicalPlanVerifier.java | 1 - 2 files changed, 2 deletions(-) diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java index 8fe888988c..bac4790524 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java @@ -36,7 +36,6 @@ import java.util.Collections; import java.util.List; import java.util.stream.Stream; -import java.util.stream.StreamSupport; import static org.apache.commons.lang.StringUtils.capitalize; diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/LogicalPlanVerifier.java b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/LogicalPlanVerifier.java index 0a4d71f9e6..8e9009fae9 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/LogicalPlanVerifier.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/LogicalPlanVerifier.java @@ -28,7 +28,6 @@ import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.plan.LogicalPlan; -import org.apache.tajo.plan.Target; import org.apache.tajo.plan.logical.*; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor;