From 4e1097b7e932f4ab6817843755160230cd61278c Mon Sep 17 00:00:00 2001 From: James Xu Date: Fri, 19 May 2017 21:47:10 +0800 Subject: [PATCH 1/5] support Set operator: intersect & except --- .../beam/dsls/sql/planner/BeamRuleSets.java | 5 +- .../beam/dsls/sql/rel/BeamIntersectRel.java | 83 +++++++++++++ .../beam/dsls/sql/rel/BeamMinusRel.java | 80 +++++++++++++ .../beam/dsls/sql/rule/BeamIntersectRule.java | 51 ++++++++ .../beam/dsls/sql/rule/BeamMinusRule.java | 51 ++++++++ .../beam/dsls/sql/schema/BeamSqlRow.java | 4 +- .../dsls/sql/transform/BeamSQLRow2KvFn.java | 40 +++++++ .../transform/SetOperatorFilteringDiffFn.java | 49 ++++++++ .../transform/SetOperatorFilteringDoFn.java | 91 ++++++++++++++ .../dsls/sql/rel/BeamIntersectRelTest.java | 111 ++++++++++++++++++ .../beam/dsls/sql/rel/BeamMinusRelTest.java | 110 +++++++++++++++++ .../apache/beam/dsls/sql/rel/CheckSize.java | 41 +++++++ 12 files changed, 713 insertions(+), 3 deletions(-) create mode 100644 dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java create mode 100644 dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java create mode 100644 dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIntersectRule.java create mode 100644 dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamMinusRule.java create mode 100644 dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLRow2KvFn.java create mode 100644 dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/SetOperatorFilteringDiffFn.java create mode 100644 dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/SetOperatorFilteringDoFn.java create mode 100644 dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java create mode 100644 dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java create mode 100644 dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/CheckSize.java diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java index 1ad62bcd6b73..8ef814686c9a 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java @@ -26,6 +26,8 @@ import org.apache.beam.dsls.sql.rule.BeamFilterRule; import org.apache.beam.dsls.sql.rule.BeamIOSinkRule; import org.apache.beam.dsls.sql.rule.BeamIOSourceRule; +import org.apache.beam.dsls.sql.rule.BeamIntersectRule; +import org.apache.beam.dsls.sql.rule.BeamMinusRule; import org.apache.beam.dsls.sql.rule.BeamProjectRule; import org.apache.beam.dsls.sql.rule.BeamSortRule; import org.apache.beam.dsls.sql.rule.BeamValuesRule; @@ -42,7 +44,8 @@ public class BeamRuleSets { private static final ImmutableSet calciteToBeamConversionRules = ImmutableSet .builder().add(BeamIOSourceRule.INSTANCE, BeamProjectRule.INSTANCE, BeamFilterRule.INSTANCE, BeamIOSinkRule.INSTANCE, - BeamAggregationRule.INSTANCE, BeamSortRule.INSTANCE, BeamValuesRule.INSTANCE) + BeamAggregationRule.INSTANCE, BeamSortRule.INSTANCE, BeamValuesRule.INSTANCE, + BeamIntersectRule.INSTANCE, BeamMinusRule.INSTANCE) .build(); public static RuleSet[] getRuleSets() { diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java new file mode 100644 index 000000000000..baa3ee069007 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.rel; + +import java.util.List; + +import org.apache.beam.dsls.sql.planner.BeamPipelineCreator; +import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils; +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.dsls.sql.transform.BeamSQLRow2KvFn; +import org.apache.beam.dsls.sql.transform.SetOperatorFilteringDoFn; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.join.CoGbkResult; +import org.apache.beam.sdk.transforms.join.CoGroupByKey; +import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Intersect; +import org.apache.calcite.rel.core.SetOp; + +/** + * {@code BeamRelNode} to replace a {@code Intersect} node. + * + *

This is used to combine two SELECT statements, but returns rows only from the + * first SELECT statement that are identical to a row in the second SELECT statement. + */ +public class BeamIntersectRel extends Intersect implements BeamRelNode { + public BeamIntersectRel( + RelOptCluster cluster, + RelTraitSet traits, + List inputs, + boolean all) { + super(cluster, traits, inputs, all); + } + + @Override public SetOp copy(RelTraitSet traitSet, List inputs, boolean all) { + return new BeamIntersectRel(getCluster(), traitSet, inputs, all); + } + + @Override public PCollection buildBeamPipeline( + BeamPipelineCreator planCreator) throws Exception { + List input = getInputs(); + PCollection leftRows = BeamSQLRelUtils.getBeamRelInput(input.get(0)) + .buildBeamPipeline(planCreator); + PCollection rightRows = BeamSQLRelUtils.getBeamRelInput(input.get(1)) + .buildBeamPipeline(planCreator); + + final TupleTag leftTag = new TupleTag<>(); + final TupleTag rightTag = new TupleTag<>(); + // co-group + PCollection> coGbkResultCollection = KeyedPCollectionTuple + .of(leftTag, leftRows.apply( + "CreateLeftIndex", MapElements.via(new BeamSQLRow2KvFn(!all)))) + .and(rightTag, rightRows.apply( + "CreateRightIndex", MapElements.via(new BeamSQLRow2KvFn(!all)))) + .apply(CoGroupByKey.create()); + PCollection ret = coGbkResultCollection + .apply(ParDo.of(new SetOperatorFilteringDoFn(leftTag, rightTag, + SetOperatorFilteringDoFn.OpType.INTERSECT, all))); + return ret; + } +} diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java new file mode 100644 index 000000000000..e7265c844bbc --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.rel; + +import java.util.List; + +import org.apache.beam.dsls.sql.planner.BeamPipelineCreator; +import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils; +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.dsls.sql.transform.BeamSQLRow2KvFn; +import org.apache.beam.dsls.sql.transform.SetOperatorFilteringDoFn; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.join.CoGbkResult; +import org.apache.beam.sdk.transforms.join.CoGroupByKey; +import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Minus; +import org.apache.calcite.rel.core.SetOp; + +/** + * {@code BeamRelNode} to replace a {@code Minus} node. + * + *

Corresponds to the SQL {@code EXCEPT} operator. + */ +public class BeamMinusRel extends Minus implements BeamRelNode { + + public BeamMinusRel(RelOptCluster cluster, RelTraitSet traits, List inputs, + boolean all) { + super(cluster, traits, inputs, all); + } + + @Override public SetOp copy(RelTraitSet traitSet, List inputs, boolean all) { + return new BeamMinusRel(getCluster(), traitSet, inputs, all); + } + + @Override public PCollection buildBeamPipeline(BeamPipelineCreator planCreator) + throws Exception { + List input = getInputs(); + PCollection leftRows = BeamSQLRelUtils.getBeamRelInput(input.get(0)) + .buildBeamPipeline(planCreator); + PCollection rightRows = BeamSQLRelUtils.getBeamRelInput(input.get(1)) + .buildBeamPipeline(planCreator); + + final TupleTag leftTag = new TupleTag<>(); + final TupleTag rightTag = new TupleTag<>(); + // co-group + PCollection> coGbkResultCollection = KeyedPCollectionTuple + .of(leftTag, leftRows.apply("CreateLeftIndex", + MapElements.via(new BeamSQLRow2KvFn(false)))) + .and(rightTag, rightRows.apply("CreateRightIndex", + MapElements.via(new BeamSQLRow2KvFn(false)))) + .apply(CoGroupByKey.create()); + PCollection ret = coGbkResultCollection + .apply(ParDo.of(new SetOperatorFilteringDoFn(leftTag, rightTag, + SetOperatorFilteringDoFn.OpType.MINUS, all))); + return ret; + } +} diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIntersectRule.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIntersectRule.java new file mode 100644 index 000000000000..70716c509ef0 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIntersectRule.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.rule; + +import java.util.List; + +import org.apache.beam.dsls.sql.rel.BeamIntersectRel; +import org.apache.beam.dsls.sql.rel.BeamLogicalConvention; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Intersect; +import org.apache.calcite.rel.logical.LogicalIntersect; + +/** + * {@code ConverterRule} to replace {@code Intersect} with {@code BeamIntersectRel}. 
+ */ +public class BeamIntersectRule extends ConverterRule { + public static final BeamIntersectRule INSTANCE = new BeamIntersectRule(); + private BeamIntersectRule() { + super(LogicalIntersect.class, Convention.NONE, + BeamLogicalConvention.INSTANCE, "BeamIntersectRule"); + } + + @Override public RelNode convert(RelNode rel) { + Intersect intersect = (Intersect) rel; + final List inputs = intersect.getInputs(); + return new BeamIntersectRel( + intersect.getCluster(), + intersect.getTraitSet().replace(BeamLogicalConvention.INSTANCE), + convertList(inputs, BeamLogicalConvention.INSTANCE), + intersect.all + ); + } +} diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamMinusRule.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamMinusRule.java new file mode 100644 index 000000000000..ca93c714985b --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamMinusRule.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.dsls.sql.rule; + +import java.util.List; + +import org.apache.beam.dsls.sql.rel.BeamLogicalConvention; +import org.apache.beam.dsls.sql.rel.BeamMinusRel; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Minus; +import org.apache.calcite.rel.logical.LogicalMinus; + +/** + * {@code ConverterRule} to replace {@code Minus} with {@code BeamMinusRel}. + */ +public class BeamMinusRule extends ConverterRule { + public static final BeamMinusRule INSTANCE = new BeamMinusRule(); + private BeamMinusRule() { + super(LogicalMinus.class, Convention.NONE, + BeamLogicalConvention.INSTANCE, "BeamMinusRule"); + } + + @Override public RelNode convert(RelNode rel) { + Minus minus = (Minus) rel; + final List inputs = minus.getInputs(); + return new BeamMinusRel( + minus.getCluster(), + minus.getTraitSet().replace(BeamLogicalConvention.INSTANCE), + convertList(inputs, BeamLogicalConvention.INSTANCE), + minus.all + ); + } +} diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java index 0f8273308334..8b85327d3cca 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java @@ -24,6 +24,7 @@ import java.util.GregorianCalendar; import java.util.List; import java.util.concurrent.TimeUnit; + import org.apache.beam.dsls.sql.exception.InvalidFieldException; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; @@ -31,11 +32,10 @@ import org.joda.time.Instant; /** - * Repersent a generic ROW record in Beam SQL. + * Represent a generic ROW record in Beam SQL. 
* */ public class BeamSqlRow implements Serializable { - private List nullFields = new ArrayList<>(); private List dataValues; private BeamSqlRecordType dataType; diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLRow2KvFn.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLRow2KvFn.java new file mode 100644 index 000000000000..35cbb5ce6674 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLRow2KvFn.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.transform; + +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.values.KV; + +/** + * Transform a {@code BeamSQLRow} to a {@code KV}. 
+ */ +public class BeamSQLRow2KvFn extends SimpleFunction> { + private boolean useEmptyRow; + public BeamSQLRow2KvFn(boolean useEmptyRow) { + this.useEmptyRow = useEmptyRow; + } + @Override public KV apply(BeamSQLRow input) { + if (useEmptyRow) { + return KV.of(input, BeamSQLRow.EMPTY_ROW); + } else { + return KV.of(input, input); + } + } +} diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/SetOperatorFilteringDiffFn.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/SetOperatorFilteringDiffFn.java new file mode 100644 index 000000000000..edb024544cf7 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/SetOperatorFilteringDiffFn.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.transform; + +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.join.CoGbkResult; +import org.apache.beam.sdk.transforms.join.CoGroupByKey; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; + +/** + * Function to filter the `diff` from a {@link CoGroupByKey} result. 
+ */ +public class SetOperatorFilteringDiffFn + implements SerializableFunction, Boolean> { + + private TupleTag leftTag; + private TupleTag rightTag; + + public SetOperatorFilteringDiffFn(TupleTag leftTag, TupleTag rightTag) { + this.leftTag = leftTag; + this.rightTag = rightTag; + } + + @Override public Boolean apply(KV input) { + Iterable leftRows = input.getValue().getAll(leftTag); + Iterable rightRows = input.getValue().getAll(rightTag); + + return leftRows.iterator().hasNext() && rightRows.iterator().hasNext(); + } + +} diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/SetOperatorFilteringDoFn.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/SetOperatorFilteringDoFn.java new file mode 100644 index 000000000000..2b54b02ca57f --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/SetOperatorFilteringDoFn.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.dsls.sql.transform; + +import java.io.Serializable; +import java.util.Iterator; + +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.join.CoGbkResult; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; + +/** + * Filter function used for Set operators. + */ +public class SetOperatorFilteringDoFn extends DoFn, BeamSQLRow> { + + /** + * Set operator type. + */ + public enum OpType implements Serializable { + INTERSECT, + MINUS + } + + private TupleTag leftTag; + private TupleTag rightTag; + private OpType opType; + // ALL? + private boolean all; + + public SetOperatorFilteringDoFn(TupleTag leftTag, TupleTag rightTag, + OpType opType, boolean all) { + this.leftTag = leftTag; + this.rightTag = rightTag; + this.opType = opType; + this.all = all; + } + + @ProcessElement public void processElement(ProcessContext ctx) { + CoGbkResult coGbkResult = ctx.element().getValue(); + Iterable leftRows = coGbkResult.getAll(leftTag); + Iterable rightRows = coGbkResult.getAll(rightTag); + switch (opType) { + case INTERSECT: + if (leftRows.iterator().hasNext() && rightRows.iterator().hasNext()) { + if (all) { + Iterator iter = leftRows.iterator(); + while (iter.hasNext()) { + ctx.output(iter.next()); + } + } else { + ctx.output(ctx.element().getKey()); + } + } + break; + case MINUS: + if (leftRows.iterator().hasNext() && !rightRows.iterator().hasNext()) { + Iterator iter = leftRows.iterator(); + if (all) { + // output all + while (iter.hasNext()) { + ctx.output(iter.next()); + } + } else { + // only output one + ctx.output(iter.next()); + } + } + } + } + + +} diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java new file mode 100644 index 000000000000..289f825736f5 --- /dev/null +++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.rel; + +import org.apache.beam.dsls.sql.BeamSQLEnvironment; +import org.apache.beam.dsls.sql.planner.MockedBeamSQLTable; +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.PCollection; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +/** + * Test for {@code BeamIntersectRel}. 
+ */ +public class BeamIntersectRelTest { + public static BeamSQLEnvironment runner = BeamSQLEnvironment.create(); + @Rule + public final TestPipeline pipeline = TestPipeline.create(); + private MockedBeamSQLTable orderDetailsTable1 = MockedBeamSQLTable + .of(SqlTypeName.BIGINT, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.DOUBLE, "price", + 1L, 1, 1.0, + 1L, 1, 1.0, + 2L, 2, 2.0, + 4L, 4, 4.0 + ); + + private MockedBeamSQLTable orderDetailsTable2 = MockedBeamSQLTable + .of(SqlTypeName.BIGINT, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.DOUBLE, "price", + 1L, 1, 1.0, + 2L, 2, 2.0, + 3L, 3, 3.0 + ); + + @Before + public void setUp() { + runner.addTableMetadata("ORDER_DETAILS1", orderDetailsTable1); + runner.addTableMetadata("ORDER_DETAILS2", orderDetailsTable2); + MockedBeamSQLTable.CONTENT.clear(); + } + + @Test + public void testIntersect() throws Exception { + String sql = ""; + sql += "SELECT order_id, site_id, price " + + "FROM ORDER_DETAILS1 " + + " INTERSECT " + + "SELECT order_id, site_id, price " + + "FROM ORDER_DETAILS2 "; + + PCollection rows = runner.compileBeamPipeline(sql, pipeline); + PAssert.that(rows).containsInAnyOrder( + MockedBeamSQLTable.of( + SqlTypeName.BIGINT, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.DOUBLE, "price", + 1L, 1, 1.0, + 2L, 2, 2.0 + ).getInputRecords()); + + pipeline.run(); + } + + @Test + public void testIntersectAll() throws Exception { + String sql = ""; + sql += "SELECT order_id, site_id, price " + + "FROM ORDER_DETAILS1 " + + " INTERSECT ALL " + + "SELECT order_id, site_id, price " + + "FROM ORDER_DETAILS2 "; + + PCollection rows = runner.compileBeamPipeline(sql, pipeline); + PAssert.that(rows).satisfies(new CheckSize(3)); + + PAssert.that(rows).containsInAnyOrder( + MockedBeamSQLTable.of( + SqlTypeName.BIGINT, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.DOUBLE, "price", + 1L, 1, 1.0, + 1L, 1, 1.0, + 2L, 2, 2.0 + ).getInputRecords()); + + pipeline.run(); + } 
+} diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java new file mode 100644 index 000000000000..01fc33ed312e --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.rel; + +import org.apache.beam.dsls.sql.BeamSQLEnvironment; +import org.apache.beam.dsls.sql.planner.MockedBeamSQLTable; +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.PCollection; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +/** + * Test for {@code BeamMinusRel}. 
+ */ +public class BeamMinusRelTest { + public static BeamSQLEnvironment runner = BeamSQLEnvironment.create(); + @Rule + public final TestPipeline pipeline = TestPipeline.create(); + private MockedBeamSQLTable orderDetailsTable1 = MockedBeamSQLTable + .of(SqlTypeName.BIGINT, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.DOUBLE, "price", + 1L, 1, 1.0, + 1L, 1, 1.0, + 2L, 2, 2.0, + 4L, 4, 4.0, + 4L, 4, 4.0 + ); + + private MockedBeamSQLTable orderDetailsTable2 = MockedBeamSQLTable + .of(SqlTypeName.BIGINT, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.DOUBLE, "price", + 1L, 1, 1.0, + 2L, 2, 2.0, + 3L, 3, 3.0 + ); + + @Before + public void setUp() { + runner.addTableMetadata("ORDER_DETAILS1", orderDetailsTable1); + runner.addTableMetadata("ORDER_DETAILS2", orderDetailsTable2); + MockedBeamSQLTable.CONTENT.clear(); + } + + @Test + public void testExcept() throws Exception { + String sql = ""; + sql += "SELECT order_id, site_id, price " + + "FROM ORDER_DETAILS1 " + + " EXCEPT " + + "SELECT order_id, site_id, price " + + "FROM ORDER_DETAILS2 "; + + PCollection rows = runner.compileBeamPipeline(sql, pipeline); + PAssert.that(rows).containsInAnyOrder( + MockedBeamSQLTable.of( + SqlTypeName.BIGINT, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.DOUBLE, "price", + 4L, 4, 4.0 + ).getInputRecords()); + + pipeline.run(); + } + + @Test + public void testExceptAll() throws Exception { + String sql = ""; + sql += "SELECT order_id, site_id, price " + + "FROM ORDER_DETAILS1 " + + " EXCEPT ALL " + + "SELECT order_id, site_id, price " + + "FROM ORDER_DETAILS2 "; + + PCollection rows = runner.compileBeamPipeline(sql, pipeline); + PAssert.that(rows).satisfies(new CheckSize(2)); + + PAssert.that(rows).containsInAnyOrder( + MockedBeamSQLTable.of( + SqlTypeName.BIGINT, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.DOUBLE, "price", + 4L, 4, 4.0, + 4L, 4, 4.0 + ).getInputRecords()); + + pipeline.run(); + } +} diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/CheckSize.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/CheckSize.java new file mode 100644 index 000000000000..774c42159e42 --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/CheckSize.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.rel; + +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.junit.Assert; + +/** + * Utility class to check size of BeamSQLRow iterable. 
+ */ +public class CheckSize implements SerializableFunction, Void> { + private int size; + public CheckSize(int size) { + this.size = size; + } + @Override public Void apply(Iterable input) { + int count = 0; + for (BeamSQLRow row : input) { + count++; + } + Assert.assertEquals(size, count); + return null; + } +} From 572fe3f1bb10bf9230795d2a681011d9a63d4881 Mon Sep 17 00:00:00 2001 From: James Xu Date: Sat, 20 May 2017 16:43:59 +0800 Subject: [PATCH 2/5] refactor to include UNION --- .../beam/dsls/sql/planner/BeamRuleSets.java | 4 +- .../beam/dsls/sql/rel/BeamAggregationRel.java | 28 ++-- .../beam/dsls/sql/rel/BeamIntersectRel.java | 39 +----- .../beam/dsls/sql/rel/BeamMinusRel.java | 37 +----- .../dsls/sql/rel/BeamSetOperatorRelBase.java | 106 +++++++++++++++ .../beam/dsls/sql/rel/BeamUnionRel.java | 87 +++++++++++++ .../beam/dsls/sql/rule/BeamUnionRule.java | 50 +++++++ .../transform/SetOperatorFilteringDiffFn.java | 49 ------- .../transform/SetOperatorFilteringDoFn.java | 33 +++-- .../dsls/sql/planner/MockedBeamSqlTable.java | 7 +- .../dsls/sql/rel/BeamIntersectRelTest.java | 26 ++-- .../beam/dsls/sql/rel/BeamMinusRelTest.java | 12 +- .../sql/rel/BeamSetOperatorRelBaseTest.java | 122 ++++++++++++++++++ .../beam/dsls/sql/rel/BeamUnionRelTest.java | 99 ++++++++++++++ 14 files changed, 539 insertions(+), 160 deletions(-) create mode 100644 dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java create mode 100644 dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java create mode 100644 dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamUnionRule.java delete mode 100644 dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/SetOperatorFilteringDiffFn.java create mode 100644 dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java create mode 100644 dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java index 8ef814686c9a..6c73558602a2 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java @@ -19,6 +19,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; + import java.util.Iterator; import org.apache.beam.dsls.sql.rel.BeamRelNode; @@ -30,6 +31,7 @@ import org.apache.beam.dsls.sql.rule.BeamMinusRule; import org.apache.beam.dsls.sql.rule.BeamProjectRule; import org.apache.beam.dsls.sql.rule.BeamSortRule; +import org.apache.beam.dsls.sql.rule.BeamUnionRule; import org.apache.beam.dsls.sql.rule.BeamValuesRule; import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.rel.RelNode; @@ -45,7 +47,7 @@ public class BeamRuleSets { .builder().add(BeamIOSourceRule.INSTANCE, BeamProjectRule.INSTANCE, BeamFilterRule.INSTANCE, BeamIOSinkRule.INSTANCE, BeamAggregationRule.INSTANCE, BeamSortRule.INSTANCE, BeamValuesRule.INSTANCE, - BeamIntersectRule.INSTANCE, BeamMinusRule.INSTANCE) + BeamIntersectRule.INSTANCE, BeamMinusRule.INSTANCE, BeamUnionRule.INSTANCE) .build(); public static RuleSet[] getRuleSets() { diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java index c0d278328260..3c7eeb05e45c 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java @@ -21,6 +21,8 @@ import org.apache.beam.dsls.sql.planner.BeamSqlRelUtils; import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; + + import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; import 
org.apache.beam.dsls.sql.transform.BeamAggregationTransforms; import org.apache.beam.sdk.coders.IterableCoder; @@ -79,37 +81,39 @@ public PCollection buildBeamPipeline(PCollectionTuple inputPCollecti PCollection upstream = BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections); if (windowFieldIdx != -1) { - upstream = upstream.apply("assignEventTimestamp", WithTimestamps - .of(new BeamAggregationTransforms.WindowTimestampFn(windowFieldIdx))) + upstream = upstream.apply(stageName + "_assignEventTimestamp", WithTimestamps + .of(new BeamAggregationTransforms.WindowTimestampFn(windowFieldIdx))) .setCoder(upstream.getCoder()); } - PCollection windowStream = upstream.apply("window", - Window.into(windowFn) + PCollection windowStream = upstream.apply(stageName + "_window", + Window.into(windowFn) .triggering(trigger) .withAllowedLateness(allowedLatence) .accumulatingFiredPanes()); BeamSqlRowCoder keyCoder = new BeamSqlRowCoder(exKeyFieldsSchema(input.getRowType())); - PCollection> exGroupByStream = windowStream.apply("exGroupBy", + PCollection> exGroupByStream = windowStream.apply( + stageName + "_exGroupBy", WithKeys .of(new BeamAggregationTransforms.AggregationGroupByKeyFn( windowFieldIdx, groupSet))) .setCoder(KvCoder.of(keyCoder, upstream.getCoder())); - PCollection>> groupedStream = exGroupByStream - .apply("groupBy", GroupByKey.create()) - .setCoder(KvCoder.>of(keyCoder, - IterableCoder.of(upstream.getCoder()))); + PCollection>> groupedStream = exGroupByStream + .apply(stageName + "_groupBy", GroupByKey.create()) + .setCoder(KvCoder.>of(keyCoder, + IterableCoder.of(upstream.getCoder()))); BeamSqlRowCoder aggCoder = new BeamSqlRowCoder(exAggFieldsSchema()); - PCollection> aggregatedStream = groupedStream.apply("aggregation", - Combine.groupedValues( + PCollection> aggregatedStream = groupedStream.apply( + stageName + "_aggregation", + Combine.groupedValues( new BeamAggregationTransforms.AggregationCombineFn(getAggCallList(), 
BeamSqlRecordType.from(input.getRowType())))) .setCoder(KvCoder.of(keyCoder, aggCoder)); - PCollection mergedStream = aggregatedStream.apply("mergeRecord", + PCollection mergedStream = aggregatedStream.apply(stageName + "_mergeRecord", ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord( BeamSqlRecordType.from(getRowType()), getAggCallList()))); mergedStream.setCoder(new BeamSqlRowCoder(BeamSqlRecordType.from(getRowType()))); diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java index baa3ee069007..fd8fb5a6fd4f 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java @@ -20,19 +20,9 @@ import java.util.List; -import org.apache.beam.dsls.sql.planner.BeamPipelineCreator; -import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils; import org.apache.beam.dsls.sql.schema.BeamSQLRow; -import org.apache.beam.dsls.sql.transform.BeamSQLRow2KvFn; -import org.apache.beam.dsls.sql.transform.SetOperatorFilteringDoFn; -import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.join.CoGbkResult; -import org.apache.beam.sdk.transforms.join.CoGroupByKey; -import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; -import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; @@ -46,38 +36,23 @@ * first SELECT statement that are identical to a row in the second SELECT statement. 
*/ public class BeamIntersectRel extends Intersect implements BeamRelNode { + private BeamSetOperatorRelBase delegate; public BeamIntersectRel( RelOptCluster cluster, RelTraitSet traits, List inputs, boolean all) { super(cluster, traits, inputs, all); + delegate = new BeamSetOperatorRelBase(BeamSetOperatorRelBase.OpType.INTERSECT, + inputs, all); } @Override public SetOp copy(RelTraitSet traitSet, List inputs, boolean all) { return new BeamIntersectRel(getCluster(), traitSet, inputs, all); } - @Override public PCollection buildBeamPipeline( - BeamPipelineCreator planCreator) throws Exception { - List input = getInputs(); - PCollection leftRows = BeamSQLRelUtils.getBeamRelInput(input.get(0)) - .buildBeamPipeline(planCreator); - PCollection rightRows = BeamSQLRelUtils.getBeamRelInput(input.get(1)) - .buildBeamPipeline(planCreator); - - final TupleTag leftTag = new TupleTag<>(); - final TupleTag rightTag = new TupleTag<>(); - // co-group - PCollection> coGbkResultCollection = KeyedPCollectionTuple - .of(leftTag, leftRows.apply( - "CreateLeftIndex", MapElements.via(new BeamSQLRow2KvFn(!all)))) - .and(rightTag, rightRows.apply( - "CreateRightIndex", MapElements.via(new BeamSQLRow2KvFn(!all)))) - .apply(CoGroupByKey.create()); - PCollection ret = coGbkResultCollection - .apply(ParDo.of(new SetOperatorFilteringDoFn(leftTag, rightTag, - SetOperatorFilteringDoFn.OpType.INTERSECT, all))); - return ret; + @Override public PCollection buildBeamPipeline(PCollectionTuple inputPCollections) + throws Exception { + return delegate.buildBeamPipeline(inputPCollections); } } diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java index e7265c844bbc..bf283af881aa 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java @@ -20,19 +20,9 @@ import java.util.List; -import 
org.apache.beam.dsls.sql.planner.BeamPipelineCreator; -import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils; import org.apache.beam.dsls.sql.schema.BeamSQLRow; -import org.apache.beam.dsls.sql.transform.BeamSQLRow2KvFn; -import org.apache.beam.dsls.sql.transform.SetOperatorFilteringDoFn; -import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.join.CoGbkResult; -import org.apache.beam.sdk.transforms.join.CoGroupByKey; -import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; -import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; @@ -46,35 +36,20 @@ */ public class BeamMinusRel extends Minus implements BeamRelNode { + private BeamSetOperatorRelBase delegate; + public BeamMinusRel(RelOptCluster cluster, RelTraitSet traits, List inputs, boolean all) { super(cluster, traits, inputs, all); + delegate = new BeamSetOperatorRelBase(BeamSetOperatorRelBase.OpType.MINUS, inputs, all); } @Override public SetOp copy(RelTraitSet traitSet, List inputs, boolean all) { return new BeamMinusRel(getCluster(), traitSet, inputs, all); } - @Override public PCollection buildBeamPipeline(BeamPipelineCreator planCreator) + @Override public PCollection buildBeamPipeline(PCollectionTuple inputPCollections) throws Exception { - List input = getInputs(); - PCollection leftRows = BeamSQLRelUtils.getBeamRelInput(input.get(0)) - .buildBeamPipeline(planCreator); - PCollection rightRows = BeamSQLRelUtils.getBeamRelInput(input.get(1)) - .buildBeamPipeline(planCreator); - - final TupleTag leftTag = new TupleTag<>(); - final TupleTag rightTag = new TupleTag<>(); - // co-group - PCollection> coGbkResultCollection = KeyedPCollectionTuple - .of(leftTag, 
leftRows.apply("CreateLeftIndex", - MapElements.via(new BeamSQLRow2KvFn(false)))) - .and(rightTag, rightRows.apply("CreateRightIndex", - MapElements.via(new BeamSQLRow2KvFn(false)))) - .apply(CoGroupByKey.create()); - PCollection ret = coGbkResultCollection - .apply(ParDo.of(new SetOperatorFilteringDoFn(leftTag, rightTag, - SetOperatorFilteringDoFn.OpType.MINUS, all))); - return ret; + return delegate.buildBeamPipeline(inputPCollections); } } diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java new file mode 100644 index 000000000000..2d953a54248c --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.dsls.sql.rel; + +import java.io.Serializable; +import java.util.List; + +import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils; +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.dsls.sql.transform.BeamSQLRow2KvFn; +import org.apache.beam.dsls.sql.transform.SetOperatorFilteringDoFn; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.join.CoGbkResult; +import org.apache.beam.sdk.transforms.join.CoGroupByKey; +import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.calcite.rel.RelNode; + +/** + * Delegate for Set operators: {@code BeamUnionRel}, {@code BeamIntersectRel} + * and {@code BeamMinusRel}. + */ +public class BeamSetOperatorRelBase { + /** + * Set operator type. 
+ */ + public enum OpType implements Serializable { + UNION, + INTERSECT, + MINUS + } + + private List inputs; + private boolean all; + private OpType opType; + public BeamSetOperatorRelBase(OpType opType, List inputs, boolean all) { + this.opType = opType; + this.inputs = inputs; + this.all = all; + } + + public PCollection buildBeamPipeline( + PCollectionTuple inputPCollections) throws Exception { + PCollection leftRows = BeamSQLRelUtils.getBeamRelInput(inputs.get(0)) + .buildBeamPipeline(inputPCollections); + PCollection rightRows = BeamSQLRelUtils.getBeamRelInput(inputs.get(1)) + .buildBeamPipeline(inputPCollections); + + WindowFn leftWindow = leftRows.getWindowingStrategy().getWindowFn(); + WindowFn rightWindow = rightRows.getWindowingStrategy().getWindowFn(); + if (!leftWindow.isCompatible(rightWindow)) { + throw new IllegalArgumentException( + "inputs of " + opType + " have different window strategy: " + + leftWindow + " VS " + rightWindow); + } + + final TupleTag leftTag = new TupleTag<>(); + final TupleTag rightTag = new TupleTag<>(); + + boolean useEmptyRow; + switch (opType) { + case INTERSECT: + case UNION: + useEmptyRow = !all; + break; + case MINUS: + useEmptyRow = false; + break; + default: + throw new IllegalArgumentException("Unexpected set operator type: " + opType); + } + + // co-group + PCollection> coGbkResultCollection = KeyedPCollectionTuple + .of(leftTag, leftRows.apply( + "CreateLeftIndex", MapElements.via(new BeamSQLRow2KvFn(useEmptyRow)))) + .and(rightTag, rightRows.apply( + "CreateRightIndex", MapElements.via(new BeamSQLRow2KvFn(useEmptyRow)))) + .apply(CoGroupByKey.create()); + PCollection ret = coGbkResultCollection + .apply(ParDo.of(new SetOperatorFilteringDoFn(leftTag, rightTag, + opType, all))); + return ret; + } +} diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java new file mode 100644 index 000000000000..0a341b6b7912 --- 
/dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.rel; + +import java.util.List; + +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelInput; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.SetOp; +import org.apache.calcite.rel.core.Union; + +/** + * {@link BeamRelNode} to replace a {@link Union}. + * + *

{@code BeamUnionRel} requires all of its inputs to have the same {@link WindowFn}. From the SQL + * perspective, two cases are supported: + * + *

1) Do not use {@code grouped window function}: + * + *

{@code
+ *   select * from person UNION select * from person
+ * }
+ * + *

2) Use the same {@code grouped window function}, with the same param: + *

{@code
+ *   select id, count(*) from person
+ *   group by id, TUMBLE(order_time, INTERVAL '1' HOUR)
+ *   UNION
+ *   select id, count(*) from person
+ *   group by id, TUMBLE(order_time, INTERVAL '1' HOUR)
+ * }
+ * + *

Inputs with different grouped window functions are NOT supported: + *

{@code
+ *   select id, count(*) from person
+ *   group by id, TUMBLE(order_time, INTERVAL '1' HOUR)
+ *   UNION
+ *   select id, count(*) from person
+ *   group by id, TUMBLE(order_time, INTERVAL '2' HOUR)
+ * }
+ */ +public class BeamUnionRel extends Union implements BeamRelNode { + private BeamSetOperatorRelBase delegate; + public BeamUnionRel(RelOptCluster cluster, + RelTraitSet traits, + List inputs, + boolean all) { + super(cluster, traits, inputs, all); + this.delegate = new BeamSetOperatorRelBase(BeamSetOperatorRelBase.OpType.UNION, + inputs, all); + } + + public BeamUnionRel(RelInput input) { + super(input); + } + + @Override public SetOp copy(RelTraitSet traitSet, List inputs, boolean all) { + return new BeamUnionRel(getCluster(), traitSet, inputs, all); + } + + @Override public PCollection buildBeamPipeline(PCollectionTuple inputPCollections) + throws Exception { + return delegate.buildBeamPipeline(inputPCollections); + } +} diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamUnionRule.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamUnionRule.java new file mode 100644 index 000000000000..b8430b9f7e55 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamUnionRule.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.dsls.sql.rule; + +import org.apache.beam.dsls.sql.rel.BeamLogicalConvention; +import org.apache.beam.dsls.sql.rel.BeamUnionRel; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Union; +import org.apache.calcite.rel.logical.LogicalUnion; + +/** + * A {@code ConverterRule} to replace {@link org.apache.calcite.rel.core.Union} with + * {@link BeamUnionRule}. + */ +public class BeamUnionRule extends ConverterRule { + public static final BeamUnionRule INSTANCE = new BeamUnionRule(); + private BeamUnionRule() { + super(LogicalUnion.class, Convention.NONE, BeamLogicalConvention.INSTANCE, + "BeamUnionRule"); + } + + @Override public RelNode convert(RelNode rel) { + Union union = (Union) rel; + + return new BeamUnionRel( + union.getCluster(), + union.getTraitSet().replace(BeamLogicalConvention.INSTANCE), + convertList(union.getInputs(), BeamLogicalConvention.INSTANCE), + union.all + ); + } +} diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/SetOperatorFilteringDiffFn.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/SetOperatorFilteringDiffFn.java deleted file mode 100644 index edb024544cf7..000000000000 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/SetOperatorFilteringDiffFn.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.transform; - -import org.apache.beam.dsls.sql.schema.BeamSQLRow; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.join.CoGbkResult; -import org.apache.beam.sdk.transforms.join.CoGroupByKey; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.TupleTag; - -/** - * Function to filter the `diff` from a {@link CoGroupByKey} result. - */ -public class SetOperatorFilteringDiffFn - implements SerializableFunction, Boolean> { - - private TupleTag leftTag; - private TupleTag rightTag; - - public SetOperatorFilteringDiffFn(TupleTag leftTag, TupleTag rightTag) { - this.leftTag = leftTag; - this.rightTag = rightTag; - } - - @Override public Boolean apply(KV input) { - Iterable leftRows = input.getValue().getAll(leftTag); - Iterable rightRows = input.getValue().getAll(rightTag); - - return leftRows.iterator().hasNext() && rightRows.iterator().hasNext(); - } - -} diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/SetOperatorFilteringDoFn.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/SetOperatorFilteringDoFn.java index 2b54b02ca57f..27b5d18dba5d 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/SetOperatorFilteringDoFn.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/SetOperatorFilteringDoFn.java @@ -18,9 +18,9 @@ package org.apache.beam.dsls.sql.transform; -import java.io.Serializable; import java.util.Iterator; +import org.apache.beam.dsls.sql.rel.BeamSetOperatorRelBase; 
import org.apache.beam.dsls.sql.schema.BeamSQLRow; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.join.CoGbkResult; @@ -31,23 +31,14 @@ * Filter function used for Set operators. */ public class SetOperatorFilteringDoFn extends DoFn, BeamSQLRow> { - - /** - * Set operator type. - */ - public enum OpType implements Serializable { - INTERSECT, - MINUS - } - private TupleTag leftTag; private TupleTag rightTag; - private OpType opType; + private BeamSetOperatorRelBase.OpType opType; // ALL? private boolean all; public SetOperatorFilteringDoFn(TupleTag leftTag, TupleTag rightTag, - OpType opType, boolean all) { + BeamSetOperatorRelBase.OpType opType, boolean all) { this.leftTag = leftTag; this.rightTag = rightTag; this.opType = opType; @@ -59,6 +50,22 @@ public SetOperatorFilteringDoFn(TupleTag leftTag, TupleTag leftRows = coGbkResult.getAll(leftTag); Iterable rightRows = coGbkResult.getAll(rightTag); switch (opType) { + case UNION: + if (all) { + // output both left & right + Iterator iter = leftRows.iterator(); + while (iter.hasNext()) { + ctx.output(iter.next()); + } + iter = rightRows.iterator(); + while (iter.hasNext()) { + ctx.output(iter.next()); + } + } else { + // only output the key + ctx.output(ctx.element().getKey()); + } + break; case INTERSECT: if (leftRows.iterator().hasNext() && rightRows.iterator().hasNext()) { if (all) { @@ -86,6 +93,4 @@ public SetOperatorFilteringDoFn(TupleTag leftTag, TupleTag buildIOReader(Pipeline pipeline) { - return PBegin.in(pipeline).apply(Create.of(inputRecords)); + public PCollection buildIOReader(Pipeline pipeline) { + return PBegin.in(pipeline).apply( + UUID.randomUUID().toString() + "_MockedBeamSQLTable_buildIOReader", + Create.of(inputRecords)); } @Override diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java index 289f825736f5..567fafe9252f 100644 --- 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java @@ -18,14 +18,15 @@ package org.apache.beam.dsls.sql.rel; -import org.apache.beam.dsls.sql.BeamSQLEnvironment; +import org.apache.beam.dsls.sql.BeamSqlCli; +import org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.planner.MockedBeamSQLTable; import org.apache.beam.dsls.sql.schema.BeamSQLRow; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.PCollection; import org.apache.calcite.sql.type.SqlTypeName; -import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -33,10 +34,9 @@ * Test for {@code BeamIntersectRel}. */ public class BeamIntersectRelTest { - public static BeamSQLEnvironment runner = BeamSQLEnvironment.create(); @Rule public final TestPipeline pipeline = TestPipeline.create(); - private MockedBeamSQLTable orderDetailsTable1 = MockedBeamSQLTable + private static MockedBeamSQLTable orderDetailsTable1 = MockedBeamSQLTable .of(SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price", @@ -46,7 +46,7 @@ public class BeamIntersectRelTest { 4L, 4, 4.0 ); - private MockedBeamSQLTable orderDetailsTable2 = MockedBeamSQLTable + private static MockedBeamSQLTable orderDetailsTable2 = MockedBeamSQLTable .of(SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price", @@ -55,11 +55,10 @@ public class BeamIntersectRelTest { 3L, 3, 3.0 ); - @Before - public void setUp() { - runner.addTableMetadata("ORDER_DETAILS1", orderDetailsTable1); - runner.addTableMetadata("ORDER_DETAILS2", orderDetailsTable2); - MockedBeamSQLTable.CONTENT.clear(); + @BeforeClass + public static void setUp() { + BeamSqlEnv.registerTable("ORDER_DETAILS1", orderDetailsTable1); + BeamSqlEnv.registerTable("ORDER_DETAILS2", 
orderDetailsTable2); } @Test @@ -71,17 +70,18 @@ public void testIntersect() throws Exception { + "SELECT order_id, site_id, price " + "FROM ORDER_DETAILS2 "; - PCollection rows = runner.compileBeamPipeline(sql, pipeline); + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline); PAssert.that(rows).containsInAnyOrder( MockedBeamSQLTable.of( SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price", + 1L, 1, 1.0, 2L, 2, 2.0 ).getInputRecords()); - pipeline.run(); + pipeline.run().waitUntilFinish(); } @Test @@ -93,7 +93,7 @@ public void testIntersectAll() throws Exception { + "SELECT order_id, site_id, price " + "FROM ORDER_DETAILS2 "; - PCollection rows = runner.compileBeamPipeline(sql, pipeline); + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline); PAssert.that(rows).satisfies(new CheckSize(3)); PAssert.that(rows).containsInAnyOrder( diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java index 01fc33ed312e..1980ef569169 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java @@ -18,7 +18,8 @@ package org.apache.beam.dsls.sql.rel; -import org.apache.beam.dsls.sql.BeamSQLEnvironment; +import org.apache.beam.dsls.sql.BeamSqlCli; +import org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.planner.MockedBeamSQLTable; import org.apache.beam.dsls.sql.schema.BeamSQLRow; import org.apache.beam.sdk.testing.PAssert; @@ -33,7 +34,6 @@ * Test for {@code BeamMinusRel}. 
*/ public class BeamMinusRelTest { - public static BeamSQLEnvironment runner = BeamSQLEnvironment.create(); @Rule public final TestPipeline pipeline = TestPipeline.create(); private MockedBeamSQLTable orderDetailsTable1 = MockedBeamSQLTable @@ -58,8 +58,8 @@ public class BeamMinusRelTest { @Before public void setUp() { - runner.addTableMetadata("ORDER_DETAILS1", orderDetailsTable1); - runner.addTableMetadata("ORDER_DETAILS2", orderDetailsTable2); + BeamSqlEnv.registerTable("ORDER_DETAILS1", orderDetailsTable1); + BeamSqlEnv.registerTable("ORDER_DETAILS2", orderDetailsTable2); MockedBeamSQLTable.CONTENT.clear(); } @@ -72,7 +72,7 @@ public void testExcept() throws Exception { + "SELECT order_id, site_id, price " + "FROM ORDER_DETAILS2 "; - PCollection rows = runner.compileBeamPipeline(sql, pipeline); + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline); PAssert.that(rows).containsInAnyOrder( MockedBeamSQLTable.of( SqlTypeName.BIGINT, "order_id", @@ -93,7 +93,7 @@ public void testExceptAll() throws Exception { + "SELECT order_id, site_id, price " + "FROM ORDER_DETAILS2 "; - PCollection rows = runner.compileBeamPipeline(sql, pipeline); + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline); PAssert.that(rows).satisfies(new CheckSize(2)); PAssert.that(rows).containsInAnyOrder( diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java new file mode 100644 index 000000000000..c0943af19e2f --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.rel; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +import org.apache.beam.dsls.sql.BeamSqlCli; +import org.apache.beam.dsls.sql.BeamSqlEnv; +import org.apache.beam.dsls.sql.planner.MockedBeamSQLTable; +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +/** + * Test for {@code BeamSetOperatorRelBase}. 
+ */ +public class BeamSetOperatorRelBaseTest { + @Rule + public final TestPipeline pipeline = TestPipeline.create(); + public static final Date THE_DATE = new Date(); + private static MockedBeamSQLTable orderDetailsTable = MockedBeamSQLTable + .of(SqlTypeName.BIGINT, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.DOUBLE, "price", + SqlTypeName.TIMESTAMP, "order_time", + + 1L, 1, 1.0, THE_DATE, + 2L, 2, 2.0, THE_DATE); + + @BeforeClass + public static void prepare() { + THE_DATE.setTime(100000); + BeamSqlEnv.registerTable("ORDER_DETAILS", orderDetailsTable); + } + + @Test + public void testSameWindow() throws Exception { + String sql = "SELECT " + + " order_id, site_id, count(*) as cnt " + + "FROM ORDER_DETAILS GROUP BY order_id, site_id" + + ", TUMBLE(order_time, INTERVAL '1' HOUR) " + + " UNION SELECT " + + " order_id, site_id, count(*) as cnt " + + "FROM ORDER_DETAILS GROUP BY order_id, site_id" + + ", TUMBLE(order_time, INTERVAL '1' HOUR) "; + + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline); + List expRows = + MockedBeamSQLTable.of( + SqlTypeName.BIGINT, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.BIGINT, "cnt", + + 1L, 1, 1L, + 2L, 2, 1L + ).getInputRecords(); + // compare valueInString to ignore the windowStart & windowEnd + PAssert.that(rows.apply(ParDo.of(new ToString()))).containsInAnyOrder(toString(expRows)); + pipeline.run(); + } + + @Test(expected = IllegalArgumentException.class) + public void testDifferentWindows() throws Exception { + String sql = "SELECT " + + " order_id, site_id, count(*) as cnt " + + "FROM ORDER_DETAILS GROUP BY order_id, site_id" + + ", TUMBLE(order_time, INTERVAL '1' HOUR) " + + " UNION SELECT " + + " order_id, site_id, count(*) as cnt " + + "FROM ORDER_DETAILS GROUP BY order_id, site_id" + + ", TUMBLE(order_time, INTERVAL '2' HOUR) "; + + // use a real pipeline rather than the TestPipeline because we are + // testing exceptions, the pipeline will not actually run. 
+ Pipeline pipeline1 = Pipeline.create(PipelineOptionsFactory.create()); + BeamSqlCli.compilePipeline(sql, pipeline1); + pipeline.run(); + } + + static class ToString extends DoFn { + @ProcessElement + public void processElement(ProcessContext ctx) { + ctx.output(ctx.element().valueInString()); + } + } + + static List toString (List rows) { + List strs = new ArrayList<>(); + for (BeamSQLRow row : rows) { + strs.add(row.valueInString()); + } + + return strs; + } +} diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java new file mode 100644 index 000000000000..cd059344d72b --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.dsls.sql.rel; + +import org.apache.beam.dsls.sql.BeamSqlCli; +import org.apache.beam.dsls.sql.BeamSqlEnv; +import org.apache.beam.dsls.sql.planner.MockedBeamSQLTable; +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.PCollection; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +/** + * Test for {@code BeamUnionRel}. + */ +public class BeamUnionRelTest { + @Rule + public final TestPipeline pipeline = TestPipeline.create(); + private static MockedBeamSQLTable orderDetailsTable = MockedBeamSQLTable + .of(SqlTypeName.BIGINT, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.DOUBLE, "price", + + 1L, 1, 1.0, + 2L, 2, 2.0); + + @BeforeClass + public static void prepare() { + BeamSqlEnv.registerTable("ORDER_DETAILS", orderDetailsTable); + } + + @Test + public void testUnion() throws Exception { + String sql = "SELECT " + + " order_id, site_id, price " + + "FROM ORDER_DETAILS " + + " UNION SELECT " + + " order_id, site_id, price " + + "FROM ORDER_DETAILS "; + + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline); + PAssert.that(rows).containsInAnyOrder( + MockedBeamSQLTable.of( + SqlTypeName.BIGINT, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.DOUBLE, "price", + + 1L, 1, 1.0, + 2L, 2, 2.0 + ).getInputRecords() + ); + pipeline.run(); + } + + @Test + public void testUnionAll() throws Exception { + String sql = "SELECT " + + " order_id, site_id, price " + + "FROM ORDER_DETAILS" + + " UNION ALL " + + " SELECT order_id, site_id, price " + + "FROM ORDER_DETAILS"; + + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline); + PAssert.that(rows).containsInAnyOrder( + MockedBeamSQLTable.of( + SqlTypeName.BIGINT, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.DOUBLE, "price", + + 1L, 1, 1.0, 
+ 1L, 1, 1.0, + 2L, 2, 2.0, + 2L, 2, 2.0 + ).getInputRecords() + ); + pipeline.run(); + } +} From fc6302eafa04568fa006adba171c1f4d32277ee2 Mon Sep 17 00:00:00 2001 From: James Xu Date: Wed, 24 May 2017 19:26:04 +0800 Subject: [PATCH 3/5] give a unique name for MockedBeamSQLTable.buildIOReader() --- .../apache/beam/dsls/sql/planner/MockedBeamSqlTable.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java index cc6ef82b4461..662a28cfc9e9 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.beam.dsls.sql.schema.BaseBeamTable; import org.apache.beam.dsls.sql.schema.BeamIOType; @@ -44,8 +45,8 @@ * */ public class MockedBeamSqlTable extends BaseBeamTable { - - public static final ConcurrentLinkedQueue CONTENT = new ConcurrentLinkedQueue<>(); + public static final AtomicInteger COUNTER = new AtomicInteger(); + public static final ConcurrentLinkedQueue CONTENT = new ConcurrentLinkedQueue<>(); private List inputRecords; @@ -124,8 +125,7 @@ public BeamIOType getSourceType() { @Override public PCollection buildIOReader(Pipeline pipeline) { return PBegin.in(pipeline).apply( - UUID.randomUUID().toString() + "_MockedBeamSQLTable_buildIOReader", - Create.of(inputRecords)); + "MockedBeamSQLTable_Reader_" + COUNTER.incrementAndGet(), Create.of(inputRecords)); } @Override From b4cc981adcbe5c56e21c032c9f51dfa027c29516 Mon Sep 17 00:00:00 2001 From: James Xu Date: Fri, 9 Jun 2017 11:30:03 +0800 Subject: [PATCH 4/5] cleanup --- .../beam/dsls/sql/rel/BeamIntersectRel.java | 4 +- 
.../beam/dsls/sql/rel/BeamMinusRel.java | 3 +- .../dsls/sql/rel/BeamSetOperatorRelBase.java | 31 ++--- .../beam/dsls/sql/rel/BeamUnionRel.java | 3 +- .../dsls/sql/transform/BeamSQLRow2KvFn.java | 40 ------- .../transform/BeamSetOperatorsTransforms.java | 113 ++++++++++++++++++ .../transform/SetOperatorFilteringDoFn.java | 96 --------------- .../dsls/sql/planner/MockedBeamSqlTable.java | 1 - 8 files changed, 131 insertions(+), 160 deletions(-) delete mode 100644 dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLRow2KvFn.java create mode 100644 dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java delete mode 100644 dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/SetOperatorFilteringDoFn.java diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java index fd8fb5a6fd4f..5fef568906a0 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java @@ -43,8 +43,8 @@ public BeamIntersectRel( List inputs, boolean all) { super(cluster, traits, inputs, all); - delegate = new BeamSetOperatorRelBase(BeamSetOperatorRelBase.OpType.INTERSECT, - inputs, all); + delegate = new BeamSetOperatorRelBase(this, + BeamSetOperatorRelBase.OpType.INTERSECT, inputs, all); } @Override public SetOp copy(RelTraitSet traitSet, List inputs, boolean all) { diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java index bf283af881aa..afe9053df28d 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java @@ -41,7 +41,8 @@ public class BeamMinusRel extends Minus implements BeamRelNode { public BeamMinusRel(RelOptCluster cluster, RelTraitSet 
traits, List inputs, boolean all) { super(cluster, traits, inputs, all); - delegate = new BeamSetOperatorRelBase(BeamSetOperatorRelBase.OpType.MINUS, inputs, all); + delegate = new BeamSetOperatorRelBase(this, + BeamSetOperatorRelBase.OpType.MINUS, inputs, all); } @Override public SetOp copy(RelTraitSet traitSet, List inputs, boolean all) { diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java index 2d953a54248c..b0c5b723d325 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java @@ -23,8 +23,7 @@ import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils; import org.apache.beam.dsls.sql.schema.BeamSQLRow; -import org.apache.beam.dsls.sql.transform.BeamSQLRow2KvFn; -import org.apache.beam.dsls.sql.transform.SetOperatorFilteringDoFn; +import org.apache.beam.dsls.sql.transform.BeamSetOperatorsTransforms; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.join.CoGbkResult; @@ -51,10 +50,14 @@ public enum OpType implements Serializable { MINUS } + private BeamRelNode beamRelNode; private List inputs; private boolean all; private OpType opType; - public BeamSetOperatorRelBase(OpType opType, List inputs, boolean all) { + + public BeamSetOperatorRelBase(BeamRelNode beamRelNode, OpType opType, + List inputs, boolean all) { + this.beamRelNode = beamRelNode; this.opType = opType; this.inputs = inputs; this.all = all; @@ -78,28 +81,18 @@ public PCollection buildBeamPipeline( final TupleTag leftTag = new TupleTag<>(); final TupleTag rightTag = new TupleTag<>(); - boolean useEmptyRow; - switch (opType) { - case INTERSECT: - case UNION: - useEmptyRow = !all; - break; - case MINUS: - useEmptyRow = false; - break; - default: - throw new 
IllegalArgumentException("Unexpected set operator type: " + opType); - } - // co-group + String stageName = BeamSQLRelUtils.getStageName(beamRelNode); PCollection> coGbkResultCollection = KeyedPCollectionTuple .of(leftTag, leftRows.apply( - "CreateLeftIndex", MapElements.via(new BeamSQLRow2KvFn(useEmptyRow)))) + stageName + "_CreateLeftIndex", MapElements.via( + new BeamSetOperatorsTransforms.BeamSQLRow2KvFn()))) .and(rightTag, rightRows.apply( - "CreateRightIndex", MapElements.via(new BeamSQLRow2KvFn(useEmptyRow)))) + stageName + "_CreateRightIndex", MapElements.via( + new BeamSetOperatorsTransforms.BeamSQLRow2KvFn()))) .apply(CoGroupByKey.create()); PCollection ret = coGbkResultCollection - .apply(ParDo.of(new SetOperatorFilteringDoFn(leftTag, rightTag, + .apply(ParDo.of(new BeamSetOperatorsTransforms.SetOperatorFilteringDoFn(leftTag, rightTag, opType, all))); return ret; } diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java index 0a341b6b7912..eaa850de9e8f 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java @@ -68,7 +68,8 @@ public BeamUnionRel(RelOptCluster cluster, List inputs, boolean all) { super(cluster, traits, inputs, all); - this.delegate = new BeamSetOperatorRelBase(BeamSetOperatorRelBase.OpType.UNION, + this.delegate = new BeamSetOperatorRelBase(this, + BeamSetOperatorRelBase.OpType.UNION, inputs, all); } diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLRow2KvFn.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLRow2KvFn.java deleted file mode 100644 index 35cbb5ce6674..000000000000 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLRow2KvFn.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license 
agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.transform; - -import org.apache.beam.dsls.sql.schema.BeamSQLRow; -import org.apache.beam.sdk.transforms.SimpleFunction; -import org.apache.beam.sdk.values.KV; - -/** - * Transform a {@code BeamSQLRow} to a {@code KV}. - */ -public class BeamSQLRow2KvFn extends SimpleFunction> { - private boolean useEmptyRow; - public BeamSQLRow2KvFn(boolean useEmptyRow) { - this.useEmptyRow = useEmptyRow; - } - @Override public KV apply(BeamSQLRow input) { - if (useEmptyRow) { - return KV.of(input, BeamSQLRow.EMPTY_ROW); - } else { - return KV.of(input, input); - } - } -} diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java new file mode 100644 index 000000000000..8992f9feae16 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.transform; + +import java.util.Iterator; + +import org.apache.beam.dsls.sql.rel.BeamSetOperatorRelBase; +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.join.CoGbkResult; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; + +/** + * Collections of {@code PTransform} and {@code DoFn} used to perform Set operations. + */ +public abstract class BeamSetOperatorsTransforms { + /** + * Transform a {@code BeamSQLRow} to a {@code KV}. + */ + public static class BeamSQLRow2KvFn extends + SimpleFunction> { + @Override public KV apply(BeamSQLRow input) { + return KV.of(input, input); + } + } + + /** + * Filter function used for Set operators. + */ + public static class SetOperatorFilteringDoFn extends + DoFn, BeamSQLRow> { + private TupleTag leftTag; + private TupleTag rightTag; + private BeamSetOperatorRelBase.OpType opType; + // ALL? 
+ private boolean all; + + public SetOperatorFilteringDoFn(TupleTag leftTag, TupleTag rightTag, + BeamSetOperatorRelBase.OpType opType, boolean all) { + this.leftTag = leftTag; + this.rightTag = rightTag; + this.opType = opType; + this.all = all; + } + + @ProcessElement public void processElement(ProcessContext ctx) { + CoGbkResult coGbkResult = ctx.element().getValue(); + Iterable leftRows = coGbkResult.getAll(leftTag); + Iterable rightRows = coGbkResult.getAll(rightTag); + switch (opType) { + case UNION: + if (all) { + // output both left & right + Iterator iter = leftRows.iterator(); + while (iter.hasNext()) { + ctx.output(iter.next()); + } + iter = rightRows.iterator(); + while (iter.hasNext()) { + ctx.output(iter.next()); + } + } else { + // only output the key + ctx.output(ctx.element().getKey()); + } + break; + case INTERSECT: + if (leftRows.iterator().hasNext() && rightRows.iterator().hasNext()) { + if (all) { + Iterator iter = leftRows.iterator(); + while (iter.hasNext()) { + ctx.output(iter.next()); + } + } else { + ctx.output(ctx.element().getKey()); + } + } + break; + case MINUS: + if (leftRows.iterator().hasNext() && !rightRows.iterator().hasNext()) { + Iterator iter = leftRows.iterator(); + if (all) { + // output all + while (iter.hasNext()) { + ctx.output(iter.next()); + } + } else { + // only output one + ctx.output(iter.next()); + } + } + } + } + } +} diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/SetOperatorFilteringDoFn.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/SetOperatorFilteringDoFn.java deleted file mode 100644 index 27b5d18dba5d..000000000000 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/SetOperatorFilteringDoFn.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.transform; - -import java.util.Iterator; - -import org.apache.beam.dsls.sql.rel.BeamSetOperatorRelBase; -import org.apache.beam.dsls.sql.schema.BeamSQLRow; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.join.CoGbkResult; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.TupleTag; - -/** - * Filter function used for Set operators. - */ -public class SetOperatorFilteringDoFn extends DoFn, BeamSQLRow> { - private TupleTag leftTag; - private TupleTag rightTag; - private BeamSetOperatorRelBase.OpType opType; - // ALL? 
- private boolean all; - - public SetOperatorFilteringDoFn(TupleTag leftTag, TupleTag rightTag, - BeamSetOperatorRelBase.OpType opType, boolean all) { - this.leftTag = leftTag; - this.rightTag = rightTag; - this.opType = opType; - this.all = all; - } - - @ProcessElement public void processElement(ProcessContext ctx) { - CoGbkResult coGbkResult = ctx.element().getValue(); - Iterable leftRows = coGbkResult.getAll(leftTag); - Iterable rightRows = coGbkResult.getAll(rightTag); - switch (opType) { - case UNION: - if (all) { - // output both left & right - Iterator iter = leftRows.iterator(); - while (iter.hasNext()) { - ctx.output(iter.next()); - } - iter = rightRows.iterator(); - while (iter.hasNext()) { - ctx.output(iter.next()); - } - } else { - // only output the key - ctx.output(ctx.element().getKey()); - } - break; - case INTERSECT: - if (leftRows.iterator().hasNext() && rightRows.iterator().hasNext()) { - if (all) { - Iterator iter = leftRows.iterator(); - while (iter.hasNext()) { - ctx.output(iter.next()); - } - } else { - ctx.output(ctx.element().getKey()); - } - } - break; - case MINUS: - if (leftRows.iterator().hasNext() && !rightRows.iterator().hasNext()) { - Iterator iter = leftRows.iterator(); - if (all) { - // output all - while (iter.hasNext()) { - ctx.output(iter.next()); - } - } else { - // only output one - ctx.output(iter.next()); - } - } - } - } -} diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java index 662a28cfc9e9..c39fd72c4300 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java @@ -19,7 +19,6 @@ import java.util.ArrayList; import java.util.List; -import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; From 
27d521503dc57d6708426b9f542d5b5c8eccf87b Mon Sep 17 00:00:00 2001 From: James Xu Date: Tue, 13 Jun 2017 16:31:35 +0800 Subject: [PATCH 5/5] SQL -> sql --- .../beam/dsls/sql/rel/BeamAggregationRel.java | 25 ++++++++--------- .../beam/dsls/sql/rel/BeamIntersectRel.java | 4 +-- .../beam/dsls/sql/rel/BeamMinusRel.java | 4 +-- .../dsls/sql/rel/BeamSetOperatorRelBase.java | 26 ++++++++--------- .../beam/dsls/sql/rel/BeamUnionRel.java | 4 +-- .../transform/BeamSetOperatorsTransforms.java | 28 +++++++++---------- .../dsls/sql/planner/MockedBeamSqlTable.java | 4 +-- .../dsls/sql/rel/BeamIntersectRelTest.java | 16 +++++------ .../beam/dsls/sql/rel/BeamMinusRelTest.java | 18 ++++++------ .../sql/rel/BeamSetOperatorRelBaseTest.java | 18 ++++++------ .../beam/dsls/sql/rel/BeamUnionRelTest.java | 14 +++++----- .../apache/beam/dsls/sql/rel/CheckSize.java | 8 +++--- 12 files changed, 84 insertions(+), 85 deletions(-) diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java index 3c7eeb05e45c..9951536d914f 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java @@ -18,11 +18,10 @@ package org.apache.beam.dsls.sql.rel; import java.util.List; + import org.apache.beam.dsls.sql.planner.BeamSqlRelUtils; import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; - - import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; import org.apache.beam.dsls.sql.transform.BeamAggregationTransforms; import org.apache.beam.sdk.coders.IterableCoder; @@ -82,38 +81,38 @@ public PCollection buildBeamPipeline(PCollectionTuple inputPCollecti BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections); if (windowFieldIdx != -1) { upstream = upstream.apply(stageName + "_assignEventTimestamp", WithTimestamps - .of(new 
BeamAggregationTransforms.WindowTimestampFn(windowFieldIdx))) + .of(new BeamAggregationTransforms.WindowTimestampFn(windowFieldIdx))) .setCoder(upstream.getCoder()); } - PCollection windowStream = upstream.apply(stageName + "_window", - Window.into(windowFn) + PCollection windowStream = upstream.apply(stageName + "_window", + Window.into(windowFn) .triggering(trigger) .withAllowedLateness(allowedLatence) .accumulatingFiredPanes()); BeamSqlRowCoder keyCoder = new BeamSqlRowCoder(exKeyFieldsSchema(input.getRowType())); - PCollection> exGroupByStream = windowStream.apply( + PCollection> exGroupByStream = windowStream.apply( stageName + "_exGroupBy", WithKeys .of(new BeamAggregationTransforms.AggregationGroupByKeyFn( windowFieldIdx, groupSet))) .setCoder(KvCoder.of(keyCoder, upstream.getCoder())); - PCollection>> groupedStream = exGroupByStream - .apply(stageName + "_groupBy", GroupByKey.create()) - .setCoder(KvCoder.>of(keyCoder, - IterableCoder.of(upstream.getCoder()))); + PCollection>> groupedStream = exGroupByStream + .apply(stageName + "_groupBy", GroupByKey.create()) + .setCoder(KvCoder.>of(keyCoder, + IterableCoder.of(upstream.getCoder()))); BeamSqlRowCoder aggCoder = new BeamSqlRowCoder(exAggFieldsSchema()); - PCollection> aggregatedStream = groupedStream.apply( + PCollection> aggregatedStream = groupedStream.apply( stageName + "_aggregation", - Combine.groupedValues( + Combine.groupedValues( new BeamAggregationTransforms.AggregationCombineFn(getAggCallList(), BeamSqlRecordType.from(input.getRowType())))) .setCoder(KvCoder.of(keyCoder, aggCoder)); - PCollection mergedStream = aggregatedStream.apply(stageName + "_mergeRecord", + PCollection mergedStream = aggregatedStream.apply(stageName + "_mergeRecord", ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord( BeamSqlRecordType.from(getRowType()), getAggCallList()))); mergedStream.setCoder(new BeamSqlRowCoder(BeamSqlRecordType.from(getRowType()))); diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java index 5fef568906a0..01e1c336b76a 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java @@ -20,7 +20,7 @@ import java.util.List; -import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.calcite.plan.RelOptCluster; @@ -51,7 +51,7 @@ public BeamIntersectRel( return new BeamIntersectRel(getCluster(), traitSet, inputs, all); } - @Override public PCollection buildBeamPipeline(PCollectionTuple inputPCollections) + @Override public PCollection buildBeamPipeline(PCollectionTuple inputPCollections) throws Exception { return delegate.buildBeamPipeline(inputPCollections); } diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java index afe9053df28d..bee6c11ac475 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java @@ -20,7 +20,7 @@ import java.util.List; -import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.calcite.plan.RelOptCluster; @@ -49,7 +49,7 @@ public BeamMinusRel(RelOptCluster cluster, RelTraitSet traits, List inp return new BeamMinusRel(getCluster(), traitSet, inputs, all); } - @Override public PCollection buildBeamPipeline(PCollectionTuple inputPCollections) + @Override public PCollection buildBeamPipeline(PCollectionTuple inputPCollections) throws Exception { return 
delegate.buildBeamPipeline(inputPCollections); } diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java index b0c5b723d325..271e98f5f251 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java @@ -21,8 +21,8 @@ import java.io.Serializable; import java.util.List; -import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils; -import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.dsls.sql.planner.BeamSqlRelUtils; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.dsls.sql.transform.BeamSetOperatorsTransforms; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.ParDo; @@ -63,11 +63,11 @@ public BeamSetOperatorRelBase(BeamRelNode beamRelNode, OpType opType, this.all = all; } - public PCollection buildBeamPipeline( + public PCollection buildBeamPipeline( PCollectionTuple inputPCollections) throws Exception { - PCollection leftRows = BeamSQLRelUtils.getBeamRelInput(inputs.get(0)) + PCollection leftRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(0)) .buildBeamPipeline(inputPCollections); - PCollection rightRows = BeamSQLRelUtils.getBeamRelInput(inputs.get(1)) + PCollection rightRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(1)) .buildBeamPipeline(inputPCollections); WindowFn leftWindow = leftRows.getWindowingStrategy().getWindowFn(); @@ -78,20 +78,20 @@ public PCollection buildBeamPipeline( + leftWindow + " VS " + rightWindow); } - final TupleTag leftTag = new TupleTag<>(); - final TupleTag rightTag = new TupleTag<>(); + final TupleTag leftTag = new TupleTag<>(); + final TupleTag rightTag = new TupleTag<>(); // co-group - String stageName = BeamSQLRelUtils.getStageName(beamRelNode); - PCollection> coGbkResultCollection = KeyedPCollectionTuple + String 
stageName = BeamSqlRelUtils.getStageName(beamRelNode); + PCollection> coGbkResultCollection = KeyedPCollectionTuple .of(leftTag, leftRows.apply( stageName + "_CreateLeftIndex", MapElements.via( - new BeamSetOperatorsTransforms.BeamSQLRow2KvFn()))) + new BeamSetOperatorsTransforms.BeamSqlRow2KvFn()))) .and(rightTag, rightRows.apply( stageName + "_CreateRightIndex", MapElements.via( - new BeamSetOperatorsTransforms.BeamSQLRow2KvFn()))) - .apply(CoGroupByKey.create()); - PCollection ret = coGbkResultCollection + new BeamSetOperatorsTransforms.BeamSqlRow2KvFn()))) + .apply(CoGroupByKey.create()); + PCollection ret = coGbkResultCollection .apply(ParDo.of(new BeamSetOperatorsTransforms.SetOperatorFilteringDoFn(leftTag, rightTag, opType, all))); return ret; diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java index eaa850de9e8f..63cf11afa127 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java @@ -20,7 +20,7 @@ import java.util.List; -import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; @@ -81,7 +81,7 @@ public BeamUnionRel(RelInput input) { return new BeamUnionRel(getCluster(), traitSet, inputs, all); } - @Override public PCollection buildBeamPipeline(PCollectionTuple inputPCollections) + @Override public PCollection buildBeamPipeline(PCollectionTuple inputPCollections) throws Exception { return delegate.buildBeamPipeline(inputPCollections); } diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java index 
8992f9feae16..56b3e149605c 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java @@ -21,7 +21,7 @@ import java.util.Iterator; import org.apache.beam.dsls.sql.rel.BeamSetOperatorRelBase; -import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.join.CoGbkResult; @@ -33,11 +33,11 @@ */ public abstract class BeamSetOperatorsTransforms { /** - * Transform a {@code BeamSQLRow} to a {@code KV}. + * Transform a {@code BeamSqlRow} to a {@code KV}. */ - public static class BeamSQLRow2KvFn extends - SimpleFunction> { - @Override public KV apply(BeamSQLRow input) { + public static class BeamSqlRow2KvFn extends + SimpleFunction> { + @Override public KV apply(BeamSqlRow input) { return KV.of(input, input); } } @@ -46,14 +46,14 @@ public static class BeamSQLRow2KvFn extends * Filter function used for Set operators. */ public static class SetOperatorFilteringDoFn extends - DoFn, BeamSQLRow> { - private TupleTag leftTag; - private TupleTag rightTag; + DoFn, BeamSqlRow> { + private TupleTag leftTag; + private TupleTag rightTag; private BeamSetOperatorRelBase.OpType opType; // ALL? 
private boolean all; - public SetOperatorFilteringDoFn(TupleTag leftTag, TupleTag rightTag, + public SetOperatorFilteringDoFn(TupleTag leftTag, TupleTag rightTag, BeamSetOperatorRelBase.OpType opType, boolean all) { this.leftTag = leftTag; this.rightTag = rightTag; @@ -63,13 +63,13 @@ public SetOperatorFilteringDoFn(TupleTag leftTag, TupleTag leftRows = coGbkResult.getAll(leftTag); - Iterable rightRows = coGbkResult.getAll(rightTag); + Iterable leftRows = coGbkResult.getAll(leftTag); + Iterable rightRows = coGbkResult.getAll(rightTag); switch (opType) { case UNION: if (all) { // output both left & right - Iterator iter = leftRows.iterator(); + Iterator iter = leftRows.iterator(); while (iter.hasNext()) { ctx.output(iter.next()); } @@ -85,7 +85,7 @@ public SetOperatorFilteringDoFn(TupleTag leftTag, TupleTag iter = leftRows.iterator(); + Iterator iter = leftRows.iterator(); while (iter.hasNext()) { ctx.output(iter.next()); } @@ -96,7 +96,7 @@ public SetOperatorFilteringDoFn(TupleTag leftTag, TupleTag iter = leftRows.iterator(); + Iterator iter = leftRows.iterator(); if (all) { // output all while (iter.hasNext()) { diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java index c39fd72c4300..185e95ab0053 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java @@ -45,7 +45,7 @@ */ public class MockedBeamSqlTable extends BaseBeamTable { public static final AtomicInteger COUNTER = new AtomicInteger(); - public static final ConcurrentLinkedQueue CONTENT = new ConcurrentLinkedQueue<>(); + public static final ConcurrentLinkedQueue CONTENT = new ConcurrentLinkedQueue<>(); private List inputRecords; @@ -122,7 +122,7 @@ public BeamIOType getSourceType() { } @Override - public PCollection buildIOReader(Pipeline pipeline) { + public 
PCollection buildIOReader(Pipeline pipeline) { return PBegin.in(pipeline).apply( "MockedBeamSQLTable_Reader_" + COUNTER.incrementAndGet(), Create.of(inputRecords)); } diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java index 567fafe9252f..02223c2acede 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java @@ -20,8 +20,8 @@ import org.apache.beam.dsls.sql.BeamSqlCli; import org.apache.beam.dsls.sql.BeamSqlEnv; -import org.apache.beam.dsls.sql.planner.MockedBeamSQLTable; -import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.PCollection; @@ -36,7 +36,7 @@ public class BeamIntersectRelTest { @Rule public final TestPipeline pipeline = TestPipeline.create(); - private static MockedBeamSQLTable orderDetailsTable1 = MockedBeamSQLTable + private static MockedBeamSqlTable orderDetailsTable1 = MockedBeamSqlTable .of(SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price", @@ -46,7 +46,7 @@ public class BeamIntersectRelTest { 4L, 4, 4.0 ); - private static MockedBeamSQLTable orderDetailsTable2 = MockedBeamSQLTable + private static MockedBeamSqlTable orderDetailsTable2 = MockedBeamSqlTable .of(SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price", @@ -70,9 +70,9 @@ public void testIntersect() throws Exception { + "SELECT order_id, site_id, price " + "FROM ORDER_DETAILS2 "; - PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline); + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline); PAssert.that(rows).containsInAnyOrder( 
- MockedBeamSQLTable.of( + MockedBeamSqlTable.of( SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price", @@ -93,11 +93,11 @@ public void testIntersectAll() throws Exception { + "SELECT order_id, site_id, price " + "FROM ORDER_DETAILS2 "; - PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline); + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline); PAssert.that(rows).satisfies(new CheckSize(3)); PAssert.that(rows).containsInAnyOrder( - MockedBeamSQLTable.of( + MockedBeamSqlTable.of( SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price", diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java index 1980ef569169..cd6ba163f3d1 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java @@ -20,8 +20,8 @@ import org.apache.beam.dsls.sql.BeamSqlCli; import org.apache.beam.dsls.sql.BeamSqlEnv; -import org.apache.beam.dsls.sql.planner.MockedBeamSQLTable; -import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.PCollection; @@ -36,7 +36,7 @@ public class BeamMinusRelTest { @Rule public final TestPipeline pipeline = TestPipeline.create(); - private MockedBeamSQLTable orderDetailsTable1 = MockedBeamSQLTable + private MockedBeamSqlTable orderDetailsTable1 = MockedBeamSqlTable .of(SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price", @@ -47,7 +47,7 @@ public class BeamMinusRelTest { 4L, 4, 4.0 ); - private MockedBeamSQLTable orderDetailsTable2 = MockedBeamSQLTable + private MockedBeamSqlTable 
orderDetailsTable2 = MockedBeamSqlTable .of(SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price", @@ -60,7 +60,7 @@ public class BeamMinusRelTest { public void setUp() { BeamSqlEnv.registerTable("ORDER_DETAILS1", orderDetailsTable1); BeamSqlEnv.registerTable("ORDER_DETAILS2", orderDetailsTable2); - MockedBeamSQLTable.CONTENT.clear(); + MockedBeamSqlTable.CONTENT.clear(); } @Test @@ -72,9 +72,9 @@ public void testExcept() throws Exception { + "SELECT order_id, site_id, price " + "FROM ORDER_DETAILS2 "; - PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline); + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline); PAssert.that(rows).containsInAnyOrder( - MockedBeamSQLTable.of( + MockedBeamSqlTable.of( SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price", @@ -93,11 +93,11 @@ public void testExceptAll() throws Exception { + "SELECT order_id, site_id, price " + "FROM ORDER_DETAILS2 "; - PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline); + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline); PAssert.that(rows).satisfies(new CheckSize(2)); PAssert.that(rows).containsInAnyOrder( - MockedBeamSQLTable.of( + MockedBeamSqlTable.of( SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price", diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java index c0943af19e2f..4936062bf0c0 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java @@ -24,8 +24,8 @@ import org.apache.beam.dsls.sql.BeamSqlCli; import org.apache.beam.dsls.sql.BeamSqlEnv; -import org.apache.beam.dsls.sql.planner.MockedBeamSQLTable; -import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import 
org.apache.beam.dsls.sql.planner.MockedBeamSqlTable; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; @@ -45,7 +45,7 @@ public class BeamSetOperatorRelBaseTest { @Rule public final TestPipeline pipeline = TestPipeline.create(); public static final Date THE_DATE = new Date(); - private static MockedBeamSQLTable orderDetailsTable = MockedBeamSQLTable + private static MockedBeamSqlTable orderDetailsTable = MockedBeamSqlTable .of(SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price", @@ -71,9 +71,9 @@ public void testSameWindow() throws Exception { + "FROM ORDER_DETAILS GROUP BY order_id, site_id" + ", TUMBLE(order_time, INTERVAL '1' HOUR) "; - PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline); - List expRows = - MockedBeamSQLTable.of( + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline); + List expRows = + MockedBeamSqlTable.of( SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.BIGINT, "cnt", @@ -104,16 +104,16 @@ public void testDifferentWindows() throws Exception { pipeline.run(); } - static class ToString extends DoFn { + static class ToString extends DoFn { @ProcessElement public void processElement(ProcessContext ctx) { ctx.output(ctx.element().valueInString()); } } - static List toString (List rows) { + static List toString (List rows) { List strs = new ArrayList<>(); - for (BeamSQLRow row : rows) { + for (BeamSqlRow row : rows) { strs.add(row.valueInString()); } diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java index cd059344d72b..c2a05973a59e 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java @@ -20,8 +20,8 @@ 
import org.apache.beam.dsls.sql.BeamSqlCli; import org.apache.beam.dsls.sql.BeamSqlEnv; -import org.apache.beam.dsls.sql.planner.MockedBeamSQLTable; -import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.PCollection; @@ -36,7 +36,7 @@ public class BeamUnionRelTest { @Rule public final TestPipeline pipeline = TestPipeline.create(); - private static MockedBeamSQLTable orderDetailsTable = MockedBeamSQLTable + private static MockedBeamSqlTable orderDetailsTable = MockedBeamSqlTable .of(SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price", @@ -58,9 +58,9 @@ public void testUnion() throws Exception { + " order_id, site_id, price " + "FROM ORDER_DETAILS "; - PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline); + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline); PAssert.that(rows).containsInAnyOrder( - MockedBeamSQLTable.of( + MockedBeamSqlTable.of( SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price", @@ -81,9 +81,9 @@ public void testUnionAll() throws Exception { + " SELECT order_id, site_id, price " + "FROM ORDER_DETAILS"; - PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline); + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline); PAssert.that(rows).containsInAnyOrder( - MockedBeamSQLTable.of( + MockedBeamSqlTable.of( SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price", diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/CheckSize.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/CheckSize.java index 774c42159e42..ce532df5806c 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/CheckSize.java +++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/CheckSize.java @@ -18,21 +18,21 @@ package org.apache.beam.dsls.sql.rel; -import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.sdk.transforms.SerializableFunction; import org.junit.Assert; /** * Utility class to check size of BeamSQLRow iterable. */ -public class CheckSize implements SerializableFunction, Void> { +public class CheckSize implements SerializableFunction, Void> { private int size; public CheckSize(int size) { this.size = size; } - @Override public Void apply(Iterable input) { + @Override public Void apply(Iterable input) { int count = 0; - for (BeamSQLRow row : input) { + for (BeamSqlRow row : input) { count++; } Assert.assertEquals(size, count);