From dc67a90b5a7c5bf6f36c336dd44b4c5d72abc5ad Mon Sep 17 00:00:00 2001 From: mingmxu Date: Fri, 12 May 2017 21:14:29 -0700 Subject: [PATCH] change return type of buildBeamPipeline to `PCollection` --- .../dsls/sql/planner/BeamPipelineCreator.java | 15 --------------- .../beam/dsls/sql/planner/BeamQueryPlanner.java | 4 +++- .../beam/dsls/sql/rel/BeamAggregationRel.java | 12 +++++------- .../apache/beam/dsls/sql/rel/BeamFilterRel.java | 14 ++++++-------- .../apache/beam/dsls/sql/rel/BeamIOSinkRel.java | 17 ++++++++--------- .../beam/dsls/sql/rel/BeamIOSourceRel.java | 8 +++----- .../beam/dsls/sql/rel/BeamProjectRel.java | 13 +++++-------- .../apache/beam/dsls/sql/rel/BeamRelNode.java | 11 +++++------ 8 files changed, 35 insertions(+), 59 deletions(-) diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java index 1d7cfd1b5704..98ccb57e939b 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java @@ -19,9 +19,6 @@ import java.util.Map; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; - import org.apache.beam.dsls.sql.rel.BeamRelNode; import org.apache.beam.dsls.sql.schema.BaseBeamTable; import org.apache.beam.dsls.sql.schema.BeamSQLRecordType; @@ -32,7 +29,6 @@ import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.values.PCollection; /** * {@link BeamPipelineCreator} converts a {@link BeamRelNode} tree, into a Beam @@ -41,7 +37,6 @@ */ public class BeamPipelineCreator { private Map sourceTables; - private Queue> upStreamQueue; private PipelineOptions options; @@ -56,22 +51,12 @@ public BeamPipelineCreator(Map sourceTables) { .as(PipelineOptions.class); // 
FlinkPipelineOptions.class options.setJobName("BeamPlanCreator"); - upStreamQueue = new ConcurrentLinkedQueue<>(); - pipeline = Pipeline.create(options); CoderRegistry cr = pipeline.getCoderRegistry(); cr.registerCoder(BeamSQLRow.class, BeamSqlRowCoder.of()); cr.registerCoder(BeamSQLRecordType.class, BeamSQLRecordTypeCoder.of()); } - public PCollection popUpstream() { - return upStreamQueue.poll(); - } - - public void pushUpstream(PCollection upstream) { - this.upStreamQueue.add(upstream); - } - public Map getSourceTables() { return sourceTables; } diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java index 935dae768401..29b3f1d09eed 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java @@ -112,7 +112,9 @@ public Pipeline compileBeamPipeline(String sqlStatement) throws Exception { BeamRelNode relNode = convertToBeamRel(sqlStatement); BeamPipelineCreator planCreator = new BeamPipelineCreator(sourceTables); - return relNode.buildBeamPipeline(planCreator); + relNode.buildBeamPipeline(planCreator); + + return planCreator.getPipeline(); } /** diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java index ab98cc470a1c..3e147aade901 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java @@ -23,7 +23,6 @@ import org.apache.beam.dsls.sql.schema.BeamSQLRecordType; import org.apache.beam.dsls.sql.schema.BeamSQLRow; import org.apache.beam.dsls.sql.transform.BeamAggregationTransforms; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.GroupByKey; 
import org.apache.beam.sdk.transforms.ParDo; @@ -68,13 +67,13 @@ public BeamAggregationRel(RelOptCluster cluster, RelTraitSet traits } @Override - public Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception { + public PCollection buildBeamPipeline(BeamPipelineCreator planCreator) + throws Exception { RelNode input = getInput(); - BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(planCreator); - String stageName = BeamSQLRelUtils.getStageName(this); - PCollection upstream = planCreator.popUpstream(); + PCollection upstream = + BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(planCreator); if (windowFieldIdx != -1) { upstream = upstream.apply("assignEventTimestamp", WithTimestamps .of(new BeamAggregationTransforms.WindowTimestampFn(windowFieldIdx))); @@ -105,9 +104,8 @@ public Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Except PCollection mergedStream = aggregatedStream.apply("mergeRecord", ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord( BeamSQLRecordType.from(getRowType()), getAggCallList()))); - planCreator.pushUpstream(mergedStream); - return planCreator.getPipeline(); + return mergedStream; } @Override diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java index e1c5b3e7ebdc..f2c1bba337c2 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java @@ -23,7 +23,6 @@ import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils; import org.apache.beam.dsls.sql.schema.BeamSQLRow; import org.apache.beam.dsls.sql.transform.BeamSQLFilterFn; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.apache.calcite.plan.RelOptCluster; @@ -49,23 +48,22 @@ public Filter copy(RelTraitSet traitSet, RelNode input, RexNode 
condition) { } @Override - public Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception { + public PCollection buildBeamPipeline(BeamPipelineCreator planCreator) + throws Exception { RelNode input = getInput(); - BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(planCreator); String stageName = BeamSQLRelUtils.getStageName(this); - PCollection upstream = planCreator.popUpstream(); + PCollection upstream = BeamSQLRelUtils.getBeamRelInput(input) + .buildBeamPipeline(planCreator); BeamSQLExpressionExecutor executor = new BeamSQLFnExecutor(this); - PCollection projectStream = upstream.apply(stageName, + PCollection filterStream = upstream.apply(stageName, ParDo.of(new BeamSQLFilterFn(getRelTypeName(), executor))); - planCreator.pushUpstream(projectStream); - - return planCreator.getPipeline(); + return filterStream; } } diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java index f38b9e16c501..bc94ab8102e1 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java @@ -24,7 +24,6 @@ import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils; import org.apache.beam.dsls.sql.schema.BaseBeamTable; import org.apache.beam.dsls.sql.schema.BeamSQLRow; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.values.PCollection; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptTable; @@ -52,15 +51,17 @@ public RelNode copy(RelTraitSet traitSet, List inputs) { getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened()); } + /** + * Note that {@code BeamIOSinkRel} returns the input PCollection. 
+ */ @Override - public Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception { - + public PCollection buildBeamPipeline(BeamPipelineCreator planCreator) + throws Exception { RelNode input = getInput(); - BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(planCreator); - String stageName = BeamSQLRelUtils.getStageName(this); - PCollection upstream = planCreator.popUpstream(); + PCollection upstream = + BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(planCreator); String sourceName = Joiner.on('.').join(getTable().getQualifiedName()); @@ -68,9 +69,7 @@ public Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Except upstream.apply(stageName, targetTable.buildIOWriter()); - planCreator.setHasPersistent(true); - - return planCreator.getPipeline(); + return upstream; } } diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java index 35382730881b..61f53eb14893 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java @@ -23,7 +23,6 @@ import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils; import org.apache.beam.dsls.sql.schema.BaseBeamTable; import org.apache.beam.dsls.sql.schema.BeamSQLRow; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.values.PCollection; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptTable; @@ -41,7 +40,8 @@ public BeamIOSourceRel(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable } @Override - public Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception { + public PCollection buildBeamPipeline(BeamPipelineCreator planCreator) + throws Exception { String sourceName = Joiner.on('.').join(getTable().getQualifiedName()).replace(".(STREAM)", ""); @@ -52,9 +52,7 @@ public Pipeline 
buildBeamPipeline(BeamPipelineCreator planCreator) throws Except PCollection sourceStream = planCreator.getPipeline().apply(stageName, sourceTable.buildIOReader()); - planCreator.pushUpstream(sourceStream); - - return planCreator.getPipeline(); + return sourceStream; } } diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java index 65f5b2003dba..954868d4b77b 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java @@ -26,7 +26,6 @@ import org.apache.beam.dsls.sql.schema.BeamSQLRecordType; import org.apache.beam.dsls.sql.schema.BeamSQLRow; import org.apache.beam.dsls.sql.transform.BeamSQLProjectFn; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.apache.calcite.plan.RelOptCluster; @@ -61,22 +60,20 @@ public Project copy(RelTraitSet traitSet, RelNode input, List projects, } @Override - public Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception { + public PCollection buildBeamPipeline(BeamPipelineCreator planCreator) + throws Exception { RelNode input = getInput(); - BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(planCreator); - String stageName = BeamSQLRelUtils.getStageName(this); - PCollection upstream = planCreator.popUpstream(); + PCollection upstream = BeamSQLRelUtils.getBeamRelInput(input) + .buildBeamPipeline(planCreator); BeamSQLExpressionExecutor executor = new BeamSQLFnExecutor(this); PCollection projectStream = upstream.apply(stageName, ParDo .of(new BeamSQLProjectFn(getRelTypeName(), executor, BeamSQLRecordType.from(rowType)))); - planCreator.pushUpstream(projectStream); - - return planCreator.getPipeline(); + return projectStream; } diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java index e50d71a282d3..ff2b5b652b2f 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java @@ -18,7 +18,8 @@ package org.apache.beam.dsls.sql.rel; import org.apache.beam.dsls.sql.planner.BeamPipelineCreator; -import org.apache.beam.sdk.Pipeline; +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.sdk.values.PCollection; import org.apache.calcite.rel.RelNode; /** @@ -29,10 +30,8 @@ public interface BeamRelNode extends RelNode { /** - * A {@link BeamRelNode} is a recursive structure, the - * {@link BeamPipelineCreator} visits it with a DFS(Depth-First-Search) - * algorithm. - * + * {@code #buildBeamPipeline(BeamPipelineCreator)} applies a transform to upstream, + * and generates an output {@code PCollection}. */ - Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception; + PCollection buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception; }