From c3be863524cf1317b33e06e8c0ed9910ef4a47dc Mon Sep 17 00:00:00 2001 From: James Xu Date: Mon, 29 May 2017 11:11:34 +0800 Subject: [PATCH 1/6] [BEAM-2193] implement join --- dsls/pom.xml | 2 +- dsls/sql/pom.xml | 10 +- .../beam/dsls/sql/planner/BeamRuleSets.java | 6 +- .../beam/dsls/sql/rel/BeamAggregationRel.java | 19 +- .../apache/beam/dsls/sql/rel/BeamJoinRel.java | 273 ++++++++++++++++++ .../beam/dsls/sql/rule/BeamJoinRule.java | 53 ++++ .../dsls/sql/schema/BeamSqlRecordType.java | 2 +- .../beam/dsls/sql/schema/BeamSqlRow.java | 2 +- .../beam/dsls/sql/schema/BeamSqlRowCoder.java | 4 +- .../sql/transform/BeamJoinTransforms.java | 166 +++++++++++ .../transform/BeamSqlOutputToConsoleFn.java | 1 + .../org/apache/beam/dsls/sql/TestUtils.java | 53 ++++ .../dsls/sql/planner/MockedBeamSqlTable.java | 16 +- .../rel/BeamJoinRelBoundedVsBoundedTest.java | 165 +++++++++++ .../BeamJoinRelUnboundedVsBoundedTest.java | 187 ++++++++++++ .../BeamJoinRelUnboundedVsUnboundedTest.java | 203 +++++++++++++ .../dsls/sql/schema/BeamSqlRowCoderTest.java | 89 ++++-- 17 files changed, 1200 insertions(+), 51 deletions(-) create mode 100644 dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java create mode 100644 dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamJoinRule.java create mode 100644 dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java create mode 100644 dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java create mode 100644 dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java create mode 100644 dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java create mode 100644 dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java diff --git a/dsls/pom.xml b/dsls/pom.xml index d9326985a239..a518d030b176 100644 --- a/dsls/pom.xml +++ b/dsls/pom.xml @@ -66,7 +66,7 @@ - + org.apache.maven.plugins diff --git 
a/dsls/sql/pom.xml b/dsls/sql/pom.xml index a2279d5a7447..551c4bac133b 100644 --- a/dsls/sql/pom.xml +++ b/dsls/sql/pom.xml @@ -156,6 +156,11 @@ beam-sdks-java-io-kafka provided + + org.apache.kafka + kafka-clients + 0.9.0.1 + com.google.guava guava @@ -203,11 +208,14 @@ auto-value provided - org.apache.beam beam-runners-direct-java test + + org.apache.beam + beam-sdks-java-extensions-join-library + diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java index 6c73558602a2..552ff8fc986c 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java @@ -19,15 +19,14 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; - import java.util.Iterator; - import org.apache.beam.dsls.sql.rel.BeamRelNode; import org.apache.beam.dsls.sql.rule.BeamAggregationRule; import org.apache.beam.dsls.sql.rule.BeamFilterRule; import org.apache.beam.dsls.sql.rule.BeamIOSinkRule; import org.apache.beam.dsls.sql.rule.BeamIOSourceRule; import org.apache.beam.dsls.sql.rule.BeamIntersectRule; +import org.apache.beam.dsls.sql.rule.BeamJoinRule; import org.apache.beam.dsls.sql.rule.BeamMinusRule; import org.apache.beam.dsls.sql.rule.BeamProjectRule; import org.apache.beam.dsls.sql.rule.BeamSortRule; @@ -47,7 +46,8 @@ public class BeamRuleSets { .builder().add(BeamIOSourceRule.INSTANCE, BeamProjectRule.INSTANCE, BeamFilterRule.INSTANCE, BeamIOSinkRule.INSTANCE, BeamAggregationRule.INSTANCE, BeamSortRule.INSTANCE, BeamValuesRule.INSTANCE, - BeamIntersectRule.INSTANCE, BeamMinusRule.INSTANCE, BeamUnionRule.INSTANCE) + BeamIntersectRule.INSTANCE, BeamMinusRule.INSTANCE, BeamUnionRule.INSTANCE, + BeamJoinRule.INSTANCE) .build(); public static RuleSet[] getRuleSets() { diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java index 701f6206add6..9ec9e9fd8f29 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java @@ -74,40 +74,41 @@ public BeamAggregationRel(RelOptCluster cluster, RelTraitSet traits public PCollection buildBeamPipeline(PCollectionTuple inputPCollections , BeamSqlEnv sqlEnv) throws Exception { RelNode input = getInput(); - String stageName = BeamSqlRelUtils.getStageName(this); + String stageName = BeamSqlRelUtils.getStageName(this) + "_"; PCollection upstream = BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv); if (windowFieldIdx != -1) { - upstream = upstream.apply(stageName + "_assignEventTimestamp", WithTimestamps - .of(new BeamAggregationTransforms.WindowTimestampFn(windowFieldIdx))) + upstream = upstream.apply(stageName + "assignEventTimestamp", WithTimestamps + .of(new BeamAggregationTransforms.WindowTimestampFn(windowFieldIdx))) .setCoder(upstream.getCoder()); } - PCollection windowStream = upstream.apply(stageName + "_window", - Window.into(windowFn) + PCollection windowStream = upstream.apply(stageName + "window", + Window.into(windowFn) .triggering(trigger) .withAllowedLateness(allowedLatence) .accumulatingFiredPanes()); BeamSqlRowCoder keyCoder = new BeamSqlRowCoder(exKeyFieldsSchema(input.getRowType())); PCollection> exCombineByStream = windowStream.apply( - stageName + "_exCombineBy", + stageName + "exCombineBy", WithKeys .of(new BeamAggregationTransforms.AggregationGroupByKeyFn( windowFieldIdx, groupSet))) - .setCoder(KvCoder.of(keyCoder, upstream.getCoder())); + .setCoder(KvCoder.of(keyCoder, upstream.getCoder())); + BeamSqlRowCoder aggCoder = new BeamSqlRowCoder(exAggFieldsSchema()); PCollection> aggregatedStream = exCombineByStream.apply( - stageName + 
"_combineBy", + stageName + "combineBy", Combine.perKey( new BeamAggregationTransforms.AggregationCombineFn(getAggCallList(), CalciteUtils.toBeamRecordType(input.getRowType())))) .setCoder(KvCoder.of(keyCoder, aggCoder)); - PCollection mergedStream = aggregatedStream.apply(stageName + "_mergeRecord", + PCollection mergedStream = aggregatedStream.apply(stageName + "mergeRecord", ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord( CalciteUtils.toBeamRecordType(getRowType()), getAggCallList()))); mergedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType()))); diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java new file mode 100644 index 000000000000..558270659dc3 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.dsls.sql.rel; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.beam.dsls.sql.BeamSqlEnv; +import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; +import org.apache.beam.dsls.sql.transform.BeamJoinTransforms; +import org.apache.beam.dsls.sql.utils.CalciteUtils; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.CorrelationId; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.util.Pair; + +/** + * {@code BeamRelNode} to replace a {@code Join} node. + * + *

<p>Support for join can be categorized into 3 cases:
 + *
 + * <ul>
 + *   <li>BoundedTable JOIN BoundedTable</li>
 + *   <li>UnboundedTable JOIN UnboundedTable</li>
 + *   <li>BoundedTable JOIN UnboundedTable</li>
 + * </ul>
 + *

<p>For the first two cases, a standard join can be utilized to implement them as long as the + * windowFn of both sides match. For the third case, {@code sideInput} is utilized to implement + * the join, hence there are some constraints for the third case: 1) FULL JOIN is not supported; + * 2) the unbounded table must be at the left side of the OUTER JOIN. + * + *

<p>There are also some overall constraints:
 + *
 + * <ul>
 + *   <li>Only equi-join is supported</li>
 + *   <li>CROSS JOIN is not supported</li>
 + * </ul>
 + *
+ */ +public class BeamJoinRel extends Join implements BeamRelNode { + public BeamJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, + RexNode condition, Set variablesSet, JoinRelType joinType) { + super(cluster, traits, left, right, condition, variablesSet, joinType); + } + + @Override public Join copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode left, + RelNode right, JoinRelType joinType, boolean semiJoinDone) { + return new BeamJoinRel(getCluster(), traitSet, left, right, conditionExpr, variablesSet, + joinType); + } + + @Override public PCollection buildBeamPipeline(PCollectionTuple inputPCollections, + BeamSqlEnv sqlEnv) + throws Exception { + BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left); + BeamSqlRecordType leftRowType = CalciteUtils.toBeamRecordType(left.getRowType()); + PCollection leftRows = leftRelNode.buildBeamPipeline(inputPCollections, sqlEnv); + leftRows.setCoder(new BeamSqlRowCoder(leftRowType)); + + final BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right); + BeamSqlRecordType rightRowType = CalciteUtils.toBeamRecordType(right.getRowType()); + PCollection rightRows = rightRelNode.buildBeamPipeline(inputPCollections, sqlEnv); + rightRows.setCoder(new BeamSqlRowCoder(rightRowType)); + + String stageName = BeamSqlRelUtils.getStageName(this); + WindowFn leftWinFn = leftRows.getWindowingStrategy().getWindowFn(); + WindowFn rightWinFn = rightRows.getWindowingStrategy().getWindowFn(); + + // extract the join fields + List> pairs = extractJoinColumns( + leftRelNode.getRowType().getFieldCount()); + + // build the extract key type + // the name of the join field is not important + List names = new ArrayList<>(pairs.size()); + List types = new ArrayList<>(pairs.size()); + for (int i = 0; i < pairs.size(); i++) { + names.add("c" + i); + types.add(leftRowType.getFieldsType().get(pairs.get(i).getKey())); + } + BeamSqlRecordType extractKeyRowType = BeamSqlRecordType.create(names, types); + + 
Coder extractKeyRowCoder = new BeamSqlRowCoder(extractKeyRowType); + + // BeamSqlRow -> KV + PCollection> extractedLeftRows = leftRows + .apply(stageName + "_left_ExtractJoinFields", + MapElements.via(new BeamJoinTransforms.ExtractJoinFields(true, pairs))) + .setCoder(KvCoder.of(extractKeyRowCoder, leftRows.getCoder())); + + PCollection> extractedRightRows = rightRows + .apply(stageName + "_right_ExtractJoinFields", + MapElements.via(new BeamJoinTransforms.ExtractJoinFields(false, pairs))) + .setCoder(KvCoder.of(extractKeyRowCoder, rightRows.getCoder())); + + // prepare the NullRows + BeamSqlRow leftNullRow = buildNullRow(leftRelNode); + BeamSqlRow rightNullRow = buildNullRow(rightRelNode); + + // a regular join + if (leftWinFn.isCompatible(rightWinFn) + && ((leftRows.isBounded() == PCollection.IsBounded.BOUNDED + && rightRows.isBounded() == PCollection.IsBounded.BOUNDED) + || (leftRows.isBounded() == PCollection.IsBounded.UNBOUNDED + && rightRows.isBounded() == PCollection.IsBounded.UNBOUNDED) + ) + ) { + return standardJoin(extractedLeftRows, extractedRightRows, + leftNullRow, rightNullRow, stageName); + } else if ( + (leftRows.isBounded() == PCollection.IsBounded.BOUNDED + && rightRows.isBounded() == PCollection.IsBounded.UNBOUNDED) + || (leftRows.isBounded() == PCollection.IsBounded.UNBOUNDED + && rightRows.isBounded() == PCollection.IsBounded.BOUNDED) + ) { + // if one of the sides is Bounded & the other is Unbounded + // then do a sideInput + // when doing a sideInput, the windowFn does not need to match + // Only support INNER JOIN & LEFT OUTER JOIN where left side of the join must be + // the unbounded + if (joinType == JoinRelType.FULL) { + throw new UnsupportedOperationException("FULL OUTER JOIN is not supported when join " + + "a bounded table with an unbounded table."); + } + + if ((joinType == JoinRelType.LEFT + && leftRows.isBounded() == PCollection.IsBounded.BOUNDED) + || (joinType == JoinRelType.RIGHT + && rightRows.isBounded() == 
PCollection.IsBounded.BOUNDED)) { + throw new UnsupportedOperationException( + "LEFT side of an OUTER JOIN must be Unbounded table."); + } + + return sideInputJoin(extractedLeftRows, extractedRightRows, + leftNullRow, rightNullRow); + } else { + throw new UnsupportedOperationException( + "The inputs to the JOIN have un-joinnable windowFns: " + leftWinFn + ", " + rightWinFn); + } + } + + private PCollection standardJoin( + PCollection> extractedLeftRows, + PCollection> extractedRightRows, + BeamSqlRow leftNullRow, BeamSqlRow rightNullRow, String stageName) { + PCollection>> joinedRows = null; + switch (joinType) { + case INNER: + joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join + .innerJoin(extractedLeftRows, extractedRightRows); + break; + case LEFT: + joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join + .leftOuterJoin(extractedLeftRows, extractedRightRows, rightNullRow); + break; + case RIGHT: + joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join + .rightOuterJoin(extractedLeftRows, extractedRightRows, leftNullRow); + break; + case FULL: + joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join + .fullOuterJoin(extractedLeftRows, extractedRightRows, leftNullRow, + rightNullRow); + } + + PCollection ret = joinedRows + .apply(stageName + "_JoinParts2WholeRow", + MapElements.via(new BeamJoinTransforms.JoinParts2WholeRow())) + .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType()))); + return ret; + } + + public PCollection sideInputJoin( + PCollection> extractedLeftRows, + PCollection> extractedRightRows, + BeamSqlRow leftNullRow, BeamSqlRow rightNullRow) { + // if the join is not a INNER JOIN we convert the join to a left join + // by swap the left/right side of the rows + boolean swapped = joinType != JoinRelType.INNER + && extractedLeftRows.isBounded() == PCollection.IsBounded.BOUNDED; + + PCollection> realLeftRows = + swapped ? extractedRightRows : extractedLeftRows; + PCollection> realRightRows = + swapped ? 
extractedLeftRows : extractedRightRows; + BeamSqlRow realRightNullRow = swapped ? leftNullRow : rightNullRow; + JoinRelType realJoinType = swapped ? JoinRelType.LEFT : joinType; + + final PCollectionView>> rowsView = realRightRows + .apply(View.asMultimap()); + + PCollection ret = realLeftRows + .apply(ParDo.of(new BeamJoinTransforms.SideInputJoinDoFn( + realJoinType, realRightNullRow, rowsView, swapped)).withSideInputs(rowsView)) + .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType()))); + + return ret; + } + + private BeamSqlRow buildNullRow(BeamRelNode relNode) { + BeamSqlRecordType leftType = CalciteUtils.toBeamRecordType(relNode.getRowType()); + BeamSqlRow nullRow = new BeamSqlRow(leftType); + for (int i = 0; i < leftType.size(); i++) { + nullRow.addField(i, null); + } + return nullRow; + } + + private List> extractJoinColumns(int separator) { + RexCall call = (RexCall) condition; + List> pairs = new ArrayList<>(); + if ("AND".equals(call.getOperator().getName())) { + List operands = call.getOperands(); + for (RexNode rexNode : operands) { + Pair pair = extractOneJoinColumn((RexCall) rexNode, separator); + pairs.add(pair); + } + } else if ("=".equals(call.getOperator().getName())) { + pairs.add(extractOneJoinColumn(call, separator)); + } else { + throw new UnsupportedOperationException( + "Operator " + call.getOperator().getName() + " is not supported in join condition"); + } + + return pairs; + } + + private Pair extractOneJoinColumn(RexCall oneCondition, int separator) { + List operands = oneCondition.getOperands(); + final int leftIndex = Math.min(((RexInputRef) operands.get(0)).getIndex(), + ((RexInputRef) operands.get(1)).getIndex()); + + final int rightIndex1 = Math.max(((RexInputRef) operands.get(0)).getIndex(), + ((RexInputRef) operands.get(1)).getIndex()); + final int rightIndex = rightIndex1 - separator; + + return new Pair<>(leftIndex, rightIndex); + } +} diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamJoinRule.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamJoinRule.java new file mode 100644 index 000000000000..78253fe716c3 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamJoinRule.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.rule; + +import org.apache.beam.dsls.sql.rel.BeamJoinRel; +import org.apache.beam.dsls.sql.rel.BeamLogicalConvention; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.logical.LogicalJoin; + +/** + * {@code ConverterRule} to replace {@code Join} with {@code BeamJoinRel}. 
+ */ +public class BeamJoinRule extends ConverterRule { + public static final BeamJoinRule INSTANCE = new BeamJoinRule(); + private BeamJoinRule() { + super(LogicalJoin.class, Convention.NONE, + BeamLogicalConvention.INSTANCE, "BeamJoinRule"); + } + + @Override public RelNode convert(RelNode rel) { + Join join = (Join) rel; + return new BeamJoinRel( + join.getCluster(), + join.getTraitSet().replace(BeamLogicalConvention.INSTANCE), + convert(join.getLeft(), + join.getLeft().getTraitSet().replace(BeamLogicalConvention.INSTANCE)), + convert(join.getRight(), + join.getRight().getTraitSet().replace(BeamLogicalConvention.INSTANCE)), + join.getCondition(), + join.getVariablesSet(), + join.getJoinType() + ); + } +} diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java index 9fc39451866f..52bd652e65d4 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java @@ -31,7 +31,7 @@ public abstract class BeamSqlRecordType implements Serializable { public abstract List getFieldsType(); public static BeamSqlRecordType create(List fieldNames, List fieldTypes) { - return new AutoValue_BeamSqlRecordType(fieldNames, fieldTypes); + return new org.apache.beam.dsls.sql.schema.AutoValue_BeamSqlRecordType(fieldNames, fieldTypes); } public int size() { diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java index 213dcd51a958..2d7e350a21eb 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java @@ -365,6 +365,6 @@ public boolean equals(Object obj) { } @Override public int hashCode() { - return toString().hashCode(); + return 31 * (31 * dataType.hashCode() + 
dataValues.hashCode()) + nullFields.hashCode(); } } diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java index e86fb3ff92a9..2621001532ce 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java @@ -23,7 +23,6 @@ import java.util.Date; import java.util.GregorianCalendar; import java.util.List; - import org.apache.beam.dsls.sql.utils.CalciteUtils; import org.apache.beam.sdk.coders.BigDecimalCoder; import org.apache.beam.sdk.coders.BigEndianIntegerCoder; @@ -58,7 +57,7 @@ public BeamSqlRowCoder(BeamSqlRecordType tableSchema) { @Override public void encode(BeamSqlRow value, OutputStream outStream) throws CoderException, IOException { listCoder.encode(value.getNullFields(), outStream); - + System.out.println("Encode: nullFields: " + value.getNullFields()); for (int idx = 0; idx < value.size(); ++idx) { if (value.getNullFields().contains(idx)) { continue; @@ -113,7 +112,6 @@ public BeamSqlRow decode(InputStream inStream) throws CoderException, IOExceptio BeamSqlRow record = new BeamSqlRow(tableSchema); record.setNullFields(nullFields); - for (int idx = 0; idx < tableSchema.size(); ++idx) { if (nullFields.contains(idx)) { continue; diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java new file mode 100644 index 000000000000..bc739dd157e9 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.transform; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.util.Pair; + +/** + * Collections of {@code PTransform} and {@code DoFn} used to perform JOIN operation. + */ +public class BeamJoinTransforms { + + /** + * A {@code SimpleFunction} to extract join fields from the specified row. + */ + public static class ExtractJoinFields + extends SimpleFunction> { + private boolean isLeft; + private List> joinColumns; + + public ExtractJoinFields(boolean isLeft, List> joinColumns) { + this.isLeft = isLeft; + this.joinColumns = joinColumns; + } + + @Override public KV apply(BeamSqlRow input) { + // build the type + // the name of the join field is not important + List names = new ArrayList<>(joinColumns.size()); + List types = new ArrayList<>(joinColumns.size()); + for (int i = 0; i < joinColumns.size(); i++) { + names.add("c" + i); + types.add(isLeft + ? 
input.getDataType().getFieldsType().get(joinColumns.get(i).getKey()) : + input.getDataType().getFieldsType().get(joinColumns.get(i).getValue())); + } + BeamSqlRecordType type = BeamSqlRecordType.create(names, types); + + // build the row + BeamSqlRow row = new BeamSqlRow(type); + for (int i = 0; i < joinColumns.size(); i++) { + row.addField(i, input + .getFieldValue(isLeft ? joinColumns.get(i).getKey() : joinColumns.get(i).getValue())); + } + return KV.of(row, input); + } + } + + + /** + * A {@code DoFn} which implement the sideInput-JOIN. + */ + public static class SideInputJoinDoFn extends DoFn, BeamSqlRow> { + private PCollectionView>> sideInputView; + private JoinRelType joinType; + private BeamSqlRow rightNullRow; + private boolean swap; + + public SideInputJoinDoFn(JoinRelType joinType, BeamSqlRow rightNullRow, + PCollectionView>> sideInputView, + boolean swap) { + this.joinType = joinType; + this.rightNullRow = rightNullRow; + this.sideInputView = sideInputView; + this.swap = swap; + } + + @ProcessElement public void processElement(ProcessContext context) { + BeamSqlRow key = context.element().getKey(); + BeamSqlRow leftRow = context.element().getValue(); + Map> key2Rows = context.sideInput(sideInputView); + Iterable rightRowsIterable = key2Rows.get(key); + + if (rightRowsIterable != null && rightRowsIterable.iterator().hasNext()) { + Iterator it = rightRowsIterable.iterator(); + while (it.hasNext()) { + context.output(combineTwoRowsIntoOne(leftRow, it.next(), swap)); + } + } else { + if (joinType == JoinRelType.LEFT) { + context.output(combineTwoRowsIntoOne(leftRow, rightNullRow, swap)); + } + } + } + } + + + /** + * A {@code SimpleFunction} to combine two rows into one. 
+ */ + public static class JoinParts2WholeRow + extends SimpleFunction>, BeamSqlRow> { + @Override public BeamSqlRow apply(KV> input) { + KV parts = input.getValue(); + BeamSqlRow leftRow = parts.getKey(); + BeamSqlRow rightRow = parts.getValue(); + return combineTwoRowsIntoOne(leftRow, rightRow); + } + } + + private static BeamSqlRow combineTwoRowsIntoOne(BeamSqlRow leftRow, BeamSqlRow rightRow) { + return combineTwoRowsIntoOne(leftRow, rightRow, false); + } + + /** + * As the method name suggests: combine two rows into one wide row. + */ + private static BeamSqlRow combineTwoRowsIntoOne(BeamSqlRow leftRow, + BeamSqlRow rightRow, boolean swap) { + // build the type + List names = new ArrayList<>(leftRow.size() + rightRow.size()); + names.addAll( + swap ? rightRow.getDataType().getFieldsName() : leftRow.getDataType().getFieldsName()); + names.addAll( + swap ? leftRow.getDataType().getFieldsName() : rightRow.getDataType().getFieldsName()); + + List types = new ArrayList<>(leftRow.size() + rightRow.size()); + types.addAll( + swap ? rightRow.getDataType().getFieldsType() : leftRow.getDataType().getFieldsType()); + types.addAll( + swap ? leftRow.getDataType().getFieldsType() : rightRow.getDataType().getFieldsType()); + + BeamSqlRecordType type = BeamSqlRecordType.create(names, types); + + BeamSqlRow row = new BeamSqlRow(type); + BeamSqlRow currentRow = swap ? rightRow : leftRow; + int leftRowSize = currentRow.size(); + // build the row + for (int i = 0; i < currentRow.size(); i++) { + row.addField(i, currentRow.getFieldValue(i)); + } + + currentRow = swap ? 
leftRow : rightRow; + for (int i = 0; i < currentRow.size(); i++) { + row.addField(i + leftRowSize, currentRow.getFieldValue(i)); + } + + return row; + } +} diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlOutputToConsoleFn.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlOutputToConsoleFn.java index d8a2a63528c3..781fcbfb48d3 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlOutputToConsoleFn.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlOutputToConsoleFn.java @@ -35,6 +35,7 @@ public BeamSqlOutputToConsoleFn(String stepName) { @ProcessElement public void processElement(ProcessContext c) { + //System.out.println(c.element().getDataType().getFieldsName()); System.out.println("Output: " + c.element().getDataValues()); } diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java new file mode 100644 index 000000000000..4074515930e1 --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.dsls.sql; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.transforms.DoFn; + +/** + * Test utilities. + */ +public class TestUtils { + + /** + * A {@code DoFn} to convert a {@code BeamSqlRow} to a comparable {@code}. + */ + public static class BeamSqlRow2StringDoFn extends DoFn { + @ProcessElement + public void processElement(ProcessContext ctx) { + ctx.output(ctx.element().valueInString()); + } + } + + /** + * Convert list of {@code BeamSqlRow} to list of {@code String}. + */ + public static List beamSqlRows2Strings(List rows) { + List strs = new ArrayList<>(); + for (BeamSqlRow row : rows) { + strs.add(row.valueInString()); + } + + return strs; + } +} diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java index f651f6a41274..751705e9e968 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java @@ -21,7 +21,6 @@ import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; - import org.apache.beam.dsls.sql.schema.BaseBeamTable; import org.apache.beam.dsls.sql.schema.BeamIOType; import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; @@ -49,6 +48,7 @@ public class MockedBeamSqlTable extends BaseBeamTable { public static final ConcurrentLinkedQueue CONTENT = new ConcurrentLinkedQueue<>(); private List inputRecords; + private PCollection.IsBounded isBounded = PCollection.IsBounded.BOUNDED; public MockedBeamSqlTable(BeamSqlRecordType beamSqlRecordType) { super(beamSqlRecordType); @@ -119,13 +119,19 @@ public RelDataType apply(RelDataTypeFactory a0) { @Override public BeamIOType getSourceType() { - return BeamIOType.UNBOUNDED; + if (isBounded == 
PCollection.IsBounded.BOUNDED) { + return BeamIOType.BOUNDED; + } else { + return BeamIOType.UNBOUNDED; + } } @Override + public PCollection buildIOReader(Pipeline pipeline) { return PBegin.in(pipeline).apply( - "MockedBeamSQLTable_Reader_" + COUNTER.incrementAndGet(), Create.of(inputRecords)); + "MockedBeamSQLTable_Reader_" + COUNTER.incrementAndGet(), Create.of(inputRecords)) + .setIsBoundedInternal(isBounded); } @Override @@ -137,6 +143,10 @@ public List getInputRecords() { return inputRecords; } + public MockedBeamSqlTable withIsBounded(PCollection.IsBounded isBounded) { + this.isBounded = isBounded; + return this; + } /** * Keep output in {@code CONTENT} for validation. * diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java new file mode 100644 index 000000000000..af45d0752525 --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.dsls.sql.rel; + +import org.apache.beam.dsls.sql.BeamSqlCli; +import org.apache.beam.dsls.sql.BeamSqlEnv; +import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.PCollection; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +/** + * Bounded + Bounded Test for {@code BeamJoinRel}. + */ +public class BeamJoinRelBoundedVsBoundedTest { + @Rule + public final TestPipeline pipeline = TestPipeline.create(); + private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv(); + + @Test + public void testInnerJoin() throws Exception { + String sql = + "SELECT * " + + "FROM ORDER_DETAILS o1" + + " JOIN ORDER_DETAILS o2" + + " on " + + " o1.order_id=o2.site_id AND o2.price=o1.site_id" + ; + + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of( + SqlTypeName.INTEGER, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.INTEGER, "price", + SqlTypeName.INTEGER, "order_id0", + SqlTypeName.INTEGER, "site_id0", + SqlTypeName.INTEGER, "price0", + + 2, 3, 3, 1, 2, 3 + ).getInputRecords()); + pipeline.run(); + } + + @Test + public void testLeftOuterJoin() throws Exception { + String sql = + "SELECT * " + + "FROM ORDER_DETAILS o1" + + " LEFT OUTER JOIN ORDER_DETAILS0 o2" + + " on " + + " o1.order_id=o2.site_id0 AND o2.price0=o1.site_id" + ; + + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + pipeline.enableAbandonedNodeEnforcement(false); + PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of( + SqlTypeName.INTEGER, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.INTEGER, "price", + SqlTypeName.INTEGER, "order_id0", + SqlTypeName.INTEGER, "site_id0", + 
SqlTypeName.INTEGER, "price0", + + 1, 2, 3, null, null, null, + 2, 3, 3, 1, 2, 3, + 3, 4, 5, null, null, null + ).getInputRecords()); + pipeline.run(); + } + + @Test + public void testRightOuterJoin() throws Exception { + String sql = + "SELECT * " + + "FROM ORDER_DETAILS o1" + + " RIGHT OUTER JOIN ORDER_DETAILS o2" + + " on " + + " o1.order_id=o2.site_id AND o2.price=o1.site_id" + ; + + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of( + SqlTypeName.INTEGER, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.INTEGER, "price", + SqlTypeName.INTEGER, "order_id0", + SqlTypeName.INTEGER, "site_id0", + SqlTypeName.INTEGER, "price0", + + 2, 3, 3, 1, 2, 3, + null, null, null, 2, 3, 3, + null, null, null, 3, 4, 5 + ).getInputRecords()); + pipeline.run(); + } + + @Test + public void testFullOuterJoin() throws Exception { + String sql = + "SELECT * " + + "FROM ORDER_DETAILS o1" + + " FULL OUTER JOIN ORDER_DETAILS o2" + + " on " + + " o1.order_id=o2.site_id AND o2.price=o1.site_id" + ; + + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of( + SqlTypeName.INTEGER, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.INTEGER, "price", + SqlTypeName.INTEGER, "order_id0", + SqlTypeName.INTEGER, "site_id0", + SqlTypeName.INTEGER, "price0", + + 2, 3, 3, 1, 2, 3, + 1, 2, 3, null, null, null, + 3, 4, 5, null, null, null, + null, null, null, 2, 3, 3, + null, null, null, 3, 4, 5 + ).getInputRecords()); + pipeline.run(); + } + + @BeforeClass + public static void prepare() { + beamSqlEnv.registerTable("ORDER_DETAILS", + MockedBeamSqlTable + .of(SqlTypeName.INTEGER, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.INTEGER, "price", + + 1, 2, 3, 2, 3, 3, 3, 4, 5)); + + beamSqlEnv.registerTable("ORDER_DETAILS0", + MockedBeamSqlTable + .of(SqlTypeName.INTEGER, "order_id0", + SqlTypeName.INTEGER, 
"site_id0", + SqlTypeName.INTEGER, "price0", + + 1, 2, 3, 2, 3, 3, 3, 4, 5)); + + } +} diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java new file mode 100644 index 000000000000..ff054b2dd0f3 --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.dsls.sql.rel; + +import static org.apache.beam.dsls.sql.TestUtils.beamSqlRows2Strings; + +import java.util.Date; +import org.apache.beam.dsls.sql.BeamSqlCli; +import org.apache.beam.dsls.sql.BeamSqlEnv; +import org.apache.beam.dsls.sql.TestUtils; +import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.dsls.sql.transform.BeamSqlOutputToConsoleFn; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +/** + * Unbounded + Unbounded Test for {@code BeamJoinRel}. + */ +public class BeamJoinRelUnboundedVsBoundedTest { + @Rule + public final TestPipeline pipeline = TestPipeline.create(); + private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv(); + public static final Date FIRST_DATE = new Date(); + public static final Date SECOND_DATE = new Date(); + public static final Date THIRD_DATE = new Date(); + + @BeforeClass + public static void prepare() { + FIRST_DATE.setTime(1); + SECOND_DATE.setTime(1 + 3600 * 1000); + THIRD_DATE.setTime(1 + 3600 * 1000 + 3600 * 1000 + 1); + beamSqlEnv.registerTable("ORDER_DETAILS", MockedBeamSqlTable + .of(SqlTypeName.INTEGER, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.INTEGER, + "price", SqlTypeName.TIMESTAMP, "order_time", + + 1, 1, 1, FIRST_DATE, 1, 2, 2, FIRST_DATE, 2, 2, 3, SECOND_DATE, 2, 3, 3, SECOND_DATE, 3, + 3, 3, THIRD_DATE).withIsBounded(PCollection.IsBounded.UNBOUNDED)); + + beamSqlEnv.registerTable("ORDER_DETAILS1", MockedBeamSqlTable + .of(SqlTypeName.INTEGER, "order_id", + SqlTypeName.VARCHAR, "buyer", + + 1, "james", + 2, "bond" + ).withIsBounded(PCollection.IsBounded.BOUNDED)); + } + + @Test + public void testInnerJoin() throws 
Exception { + String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " JOIN " + + " ORDER_DETAILS1 o2 " + + " on " + + " o1.order_id=o2.order_id" + ; + + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) + .containsInAnyOrder(beamSqlRows2Strings(MockedBeamSqlTable.of( + SqlTypeName.INTEGER, "order_id", + SqlTypeName.INTEGER, "sum_site_id", + SqlTypeName.VARCHAR, "buyer", + 1, 3, "james", + 2, 5, "bond" + ).getInputRecords())); + pipeline.run(); + } + + @Test + public void testLeftOuterJoin() throws Exception { + String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " LEFT OUTER JOIN " + + " ORDER_DETAILS1 o2 " + + " on " + + " o1.order_id=o2.order_id" + ; + + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + rows.apply(ParDo.of(new BeamSqlOutputToConsoleFn("helloworld"))); + PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) + .containsInAnyOrder(beamSqlRows2Strings(MockedBeamSqlTable.of( + SqlTypeName.INTEGER, "order_id", + SqlTypeName.INTEGER, "sum_site_id", + SqlTypeName.VARCHAR, "buyer", + 1, 3, "james", + 2, 5, "bond", + 3, 3, null + ).getInputRecords())); + pipeline.run(); + } + + @Test(expected = UnsupportedOperationException.class) + public void testLeftOuterJoinError() throws Exception { + String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM " + + " ORDER_DETAILS1 o2 " + + " LEFT OUTER JOIN " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " on " + + " o1.order_id=o2.order_id" + ; + 
pipeline.enableAbandonedNodeEnforcement(false); + BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + pipeline.run(); + } + + @Test + public void testRightOuterJoin() throws Exception { + String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM " + + " ORDER_DETAILS1 o2 " + + " RIGHT OUTER JOIN " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " on " + + " o1.order_id=o2.order_id" + ; + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) + .containsInAnyOrder(beamSqlRows2Strings(MockedBeamSqlTable.of( + SqlTypeName.INTEGER, "order_id", + SqlTypeName.INTEGER, "sum_site_id", + SqlTypeName.VARCHAR, "buyer", + 1, 3, "james", + 2, 5, "bond", + 3, 3, null + ).getInputRecords())); + pipeline.run(); + } + + @Test(expected = UnsupportedOperationException.class) + public void testRightOuterJoinError() throws Exception { + String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " RIGHT OUTER JOIN " + + " ORDER_DETAILS1 o2 " + + " on " + + " o1.order_id=o2.order_id" + ; + + pipeline.enableAbandonedNodeEnforcement(false); + BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + pipeline.run(); + } + + @Test(expected = UnsupportedOperationException.class) + public void testFullOuterJoinError() throws Exception { + String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM " + + " ORDER_DETAILS1 o2 " + + " FULL OUTER JOIN " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " on " + + " o1.order_id=o2.order_id" + ; + pipeline.enableAbandonedNodeEnforcement(false); + BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + 
pipeline.run(); + } +} diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java new file mode 100644 index 000000000000..a1686f3d8e2c --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.rel; + +import static org.apache.beam.dsls.sql.TestUtils.beamSqlRows2Strings; + +import java.util.Date; +import org.apache.beam.dsls.sql.BeamSqlCli; +import org.apache.beam.dsls.sql.BeamSqlEnv; +import org.apache.beam.dsls.sql.TestUtils; +import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +/** + * Unbounded + Unbounded Test for {@code BeamJoinRel}. 
+ */ +public class BeamJoinRelUnboundedVsUnboundedTest { + @Rule + public final TestPipeline pipeline = TestPipeline.create(); + private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv(); + public static final Date FIRST_DATE = new Date(); + public static final Date SECOND_DATE = new Date(); + + @BeforeClass + public static void prepare() { + FIRST_DATE.setTime(1); + SECOND_DATE.setTime(1 + 3600 * 1000); + beamSqlEnv.registerTable("ORDER_DETAILS", + MockedBeamSqlTable + .of(SqlTypeName.INTEGER, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.INTEGER, "price", + SqlTypeName.TIMESTAMP, "order_time", + + 1, 1, 1, FIRST_DATE, + 1, 2, 2, FIRST_DATE, + 2, 2, 3, SECOND_DATE, + 2, 3, 3, SECOND_DATE + ).withIsBounded(PCollection.IsBounded.UNBOUNDED)); + } + + @Test + public void testInnerJoin() throws Exception { + String sql = "SELECT * FROM " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " JOIN " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 " + + " on " + + " o1.order_id=o2.order_id" + ; + + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) + .containsInAnyOrder(beamSqlRows2Strings(MockedBeamSqlTable.of( + SqlTypeName.INTEGER, "order_id", + SqlTypeName.INTEGER, "sum_site_id", + SqlTypeName.INTEGER, "order_id0", + SqlTypeName.INTEGER, "sum_site_id0", + 1, 3, 1, 3, + 2, 5, 2, 5 + ).getInputRecords())); + pipeline.run(); + } + + @Test + public void testLeftOuterJoin() throws Exception { + String sql = "SELECT * FROM " + + "(select site_id as order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY site_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " LEFT OUTER JOIN " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP 
BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 " + + " on " + + " o1.order_id=o2.order_id" + ; + + // 1, 1 | 1, 3 + // 2, 2 | NULL, NULL + // ---- | ----- + // 2, 2 | 2, 5 + // 3, 3 | NULL, NULL + + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) + .containsInAnyOrder(beamSqlRows2Strings(MockedBeamSqlTable.of( + SqlTypeName.INTEGER, "order_id", + SqlTypeName.INTEGER, "sum_site_id", + SqlTypeName.INTEGER, "order_id0", + SqlTypeName.INTEGER, "sum_site_id0", + + 1, 1, 1, 3, + 2, 2, null, null, + 2, 2, 2, 5, + 3, 3, null, null + ).getInputRecords())); + pipeline.run(); + } + + @Test + public void testRightOuterJoin() throws Exception { + String sql = "SELECT * FROM " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " RIGHT OUTER JOIN " + + "(select site_id as order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY site_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 " + + " on " + + " o1.order_id=o2.order_id" + ; + + System.out.println(sql); + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) + .containsInAnyOrder(beamSqlRows2Strings(MockedBeamSqlTable.of( + SqlTypeName.INTEGER, "order_id", + SqlTypeName.INTEGER, "sum_site_id", + SqlTypeName.INTEGER, "order_id0", + SqlTypeName.INTEGER, "sum_site_id0", + + 1, 3, 1, 1, + null, null, 2, 2, + 2, 5, 2, 2, + null, null, 3, 3 + ).getInputRecords())); + pipeline.run(); + } + + @Test + public void testFullOuterJoin() throws Exception { + String sql = "SELECT * FROM " + + "(select site_id as order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY site_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " LEFT OUTER JOIN " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + 
+ " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 " + + " on " + + " o1.order_id=o2.order_id" + ; + + // 1, 1 | 1, 3 + // 2, 2 | NULL, NULL + // ---- | ----- + // 2, 2 | 2, 5 + // 3, 3 | NULL, NULL + + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) + .containsInAnyOrder(beamSqlRows2Strings(MockedBeamSqlTable.of( + SqlTypeName.INTEGER, "order_id", + SqlTypeName.INTEGER, "sum_site_id", + SqlTypeName.INTEGER, "order_id0", + SqlTypeName.INTEGER, "sum_site_id0", + + 1, 1, 1, 3, + 2, 2, null, null, + 2, 2, 2, 5, + 3, 3, null, null + ).getInputRecords())); + pipeline.run(); + } + + @Test(expected = UnsupportedOperationException.class) + public void testWindowsMismatch() throws Exception { + String sql = "SELECT * FROM " + + "(select site_id as order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY site_id, TUMBLE(order_time, INTERVAL '2' HOUR)) o1 " + + " LEFT OUTER JOIN " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 " + + " on " + + " o1.order_id=o2.order_id" + ; + pipeline.enableAbandonedNodeEnforcement(false); + BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + pipeline.run(); + } +} diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java index b358fe1b2a03..749405f2b6fd 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java @@ -19,47 +19,35 @@ package org.apache.beam.dsls.sql.schema; import java.math.BigDecimal; +import java.sql.Types; +import java.util.Arrays; import java.util.Date; import java.util.GregorianCalendar; - -import org.apache.beam.dsls.sql.utils.CalciteUtils; +import java.util.List; 
import org.apache.beam.sdk.testing.CoderProperties; -import org.apache.calcite.jdbc.JavaTypeFactoryImpl; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.rel.type.RelDataTypeSystem; -import org.apache.calcite.rel.type.RelProtoDataType; -import org.apache.calcite.sql.type.SqlTypeName; import org.junit.Test; /** * Tests for BeamSqlRowCoder. */ public class BeamSqlRowCoderTest { + private static final List columnNames = Arrays.asList( + "col_tinyint", "col_smallint", "col_integer", + "col_bigint", "col_float", "col_double", "col_decimal", + "col_string_varchar", "col_time", "col_timestamp" + ); + + private static final List columnTypes = Arrays.asList( + Types.TINYINT, Types.SMALLINT, Types.INTEGER, + Types.BIGINT, Types.FLOAT, Types.DOUBLE, + Types.DECIMAL, Types.VARCHAR, Types.TIME, Types.TIMESTAMP + ); + + private static final BeamSqlRecordType beamSQLRecordType = + BeamSqlRecordType.create(columnNames, columnTypes); @Test public void encodeAndDecode() throws Exception { - final RelProtoDataType protoRowType = new RelProtoDataType() { - @Override - public RelDataType apply(RelDataTypeFactory a0) { - return a0.builder() - .add("col_tinyint", SqlTypeName.TINYINT) - .add("col_smallint", SqlTypeName.SMALLINT) - .add("col_integer", SqlTypeName.INTEGER) - .add("col_bigint", SqlTypeName.BIGINT) - .add("col_float", SqlTypeName.FLOAT) - .add("col_double", SqlTypeName.DOUBLE) - .add("col_decimal", SqlTypeName.DECIMAL) - .add("col_string_varchar", SqlTypeName.VARCHAR) - .add("col_time", SqlTypeName.TIME) - .add("col_timestamp", SqlTypeName.TIMESTAMP) - .build(); - } - }; - - BeamSqlRecordType beamSQLRecordType = CalciteUtils.toBeamRecordType( - protoRowType.apply(new JavaTypeFactoryImpl( - RelDataTypeSystem.DEFAULT))); BeamSqlRow row = new BeamSqlRow(beamSQLRecordType); row.addField("col_tinyint", Byte.valueOf("1")); row.addField("col_smallint", Short.valueOf("1")); @@ -75,6 +63,49 @@ public 
RelDataType apply(RelDataTypeFactory a0) { row.addField("col_timestamp", new Date()); + BeamSqlRowCoder coder = new BeamSqlRowCoder(beamSQLRecordType); + CoderProperties.coderDecodeEncodeEqual(coder, row); + } + + @Test + public void encodeAndDecode_withNulls() throws Exception { + BeamSqlRow row = new BeamSqlRow(beamSQLRecordType); + row.addField("col_tinyint", null); + row.addField("col_smallint", null); + row.addField("col_integer", null); + row.addField("col_bigint", null); + row.addField("col_float", null); + row.addField("col_double", 1.1); + row.addField("col_decimal", null); + row.addField("col_string_varchar", null); + row.addField("col_time", null); + row.addField("col_timestamp", null); + + + BeamSqlRowCoder coder = new BeamSqlRowCoder(beamSQLRecordType); + CoderProperties.coderDecodeEncodeEqual(coder, row); + } + + @Test + public void encodeAndDecode_allIntegers() throws Exception { + List columnNames = Arrays.asList( + "i1", "i2", "i3", "i4" + ); + + List columnTypes = Arrays.asList( + Types.INTEGER, Types.INTEGER, Types.INTEGER, Types.INTEGER + ); + + BeamSqlRecordType beamSQLRecordType = + BeamSqlRecordType.create(columnNames, columnTypes); + + BeamSqlRow row = new BeamSqlRow(beamSQLRecordType); + row.addField("i1", 1); + row.addField("i2", null); + row.addField("i3", null); + row.addField("i4", null); + + BeamSqlRowCoder coder = new BeamSqlRowCoder(beamSQLRecordType); CoderProperties.coderDecodeEncodeEqual(coder, row); } From 0623bbae3672ebcfa6963526aca651c18b27633b Mon Sep 17 00:00:00 2001 From: James Xu Date: Fri, 23 Jun 2017 17:26:18 +0800 Subject: [PATCH 2/6] cleanup --- dsls/sql/pom.xml | 6 -- .../beam/dsls/sql/schema/BeamSqlRowCoder.java | 1 - .../transform/BeamSqlOutputToConsoleFn.java | 1 - .../dsls/sql/schema/BeamSqlRowCoderTest.java | 89 ++++++------------- 4 files changed, 29 insertions(+), 68 deletions(-) diff --git a/dsls/sql/pom.xml b/dsls/sql/pom.xml index 551c4bac133b..54f590ed3d27 100644 --- a/dsls/sql/pom.xml +++ b/dsls/sql/pom.xml 
@@ -197,12 +197,6 @@ joda-time joda-time - - org.apache.kafka - kafka-clients - 0.10.1.0 - provided - com.google.auto.value auto-value diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java index 2621001532ce..d53ba8d0cf47 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java @@ -57,7 +57,6 @@ public BeamSqlRowCoder(BeamSqlRecordType tableSchema) { @Override public void encode(BeamSqlRow value, OutputStream outStream) throws CoderException, IOException { listCoder.encode(value.getNullFields(), outStream); - System.out.println("Encode: nullFields: " + value.getNullFields()); for (int idx = 0; idx < value.size(); ++idx) { if (value.getNullFields().contains(idx)) { continue; diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlOutputToConsoleFn.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlOutputToConsoleFn.java index 781fcbfb48d3..d8a2a63528c3 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlOutputToConsoleFn.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlOutputToConsoleFn.java @@ -35,7 +35,6 @@ public BeamSqlOutputToConsoleFn(String stepName) { @ProcessElement public void processElement(ProcessContext c) { - //System.out.println(c.element().getDataType().getFieldsName()); System.out.println("Output: " + c.element().getDataValues()); } diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java index 749405f2b6fd..f8eaa5131bb4 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java @@ -19,35 +19,47 @@ 
package org.apache.beam.dsls.sql.schema; import java.math.BigDecimal; -import java.sql.Types; -import java.util.Arrays; import java.util.Date; import java.util.GregorianCalendar; -import java.util.List; + +import org.apache.beam.dsls.sql.utils.CalciteUtils; import org.apache.beam.sdk.testing.CoderProperties; +import org.apache.calcite.jdbc.JavaTypeFactoryImpl; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.sql.type.SqlTypeName; import org.junit.Test; /** * Tests for BeamSqlRowCoder. */ public class BeamSqlRowCoderTest { - private static final List columnNames = Arrays.asList( - "col_tinyint", "col_smallint", "col_integer", - "col_bigint", "col_float", "col_double", "col_decimal", - "col_string_varchar", "col_time", "col_timestamp" - ); - - private static final List columnTypes = Arrays.asList( - Types.TINYINT, Types.SMALLINT, Types.INTEGER, - Types.BIGINT, Types.FLOAT, Types.DOUBLE, - Types.DECIMAL, Types.VARCHAR, Types.TIME, Types.TIMESTAMP - ); - - private static final BeamSqlRecordType beamSQLRecordType = - BeamSqlRecordType.create(columnNames, columnTypes); @Test public void encodeAndDecode() throws Exception { + final RelProtoDataType protoRowType = new RelProtoDataType() { + @Override + public RelDataType apply(RelDataTypeFactory a0) { + return a0.builder() + .add("col_tinyint", SqlTypeName.TINYINT) + .add("col_smallint", SqlTypeName.SMALLINT) + .add("col_integer", SqlTypeName.INTEGER) + .add("col_bigint", SqlTypeName.BIGINT) + .add("col_float", SqlTypeName.FLOAT) + .add("col_double", SqlTypeName.DOUBLE) + .add("col_decimal", SqlTypeName.DECIMAL) + .add("col_string_varchar", SqlTypeName.VARCHAR) + .add("col_time", SqlTypeName.TIME) + .add("col_timestamp", SqlTypeName.TIMESTAMP) + .build(); + } + }; + + BeamSqlRecordType beamSQLRecordType = 
CalciteUtils.toBeamRecordType( + protoRowType.apply(new JavaTypeFactoryImpl( + RelDataTypeSystem.DEFAULT))); BeamSqlRow row = new BeamSqlRow(beamSQLRecordType); row.addField("col_tinyint", Byte.valueOf("1")); row.addField("col_smallint", Short.valueOf("1")); @@ -63,49 +75,6 @@ public void encodeAndDecode() throws Exception { row.addField("col_timestamp", new Date()); - BeamSqlRowCoder coder = new BeamSqlRowCoder(beamSQLRecordType); - CoderProperties.coderDecodeEncodeEqual(coder, row); - } - - @Test - public void encodeAndDecode_withNulls() throws Exception { - BeamSqlRow row = new BeamSqlRow(beamSQLRecordType); - row.addField("col_tinyint", null); - row.addField("col_smallint", null); - row.addField("col_integer", null); - row.addField("col_bigint", null); - row.addField("col_float", null); - row.addField("col_double", 1.1); - row.addField("col_decimal", null); - row.addField("col_string_varchar", null); - row.addField("col_time", null); - row.addField("col_timestamp", null); - - - BeamSqlRowCoder coder = new BeamSqlRowCoder(beamSQLRecordType); - CoderProperties.coderDecodeEncodeEqual(coder, row); - } - - @Test - public void encodeAndDecode_allIntegers() throws Exception { - List columnNames = Arrays.asList( - "i1", "i2", "i3", "i4" - ); - - List columnTypes = Arrays.asList( - Types.INTEGER, Types.INTEGER, Types.INTEGER, Types.INTEGER - ); - - BeamSqlRecordType beamSQLRecordType = - BeamSqlRecordType.create(columnNames, columnTypes); - - BeamSqlRow row = new BeamSqlRow(beamSQLRecordType); - row.addField("i1", 1); - row.addField("i2", null); - row.addField("i3", null); - row.addField("i4", null); - - BeamSqlRowCoder coder = new BeamSqlRowCoder(beamSQLRecordType); CoderProperties.coderDecodeEncodeEqual(coder, row); } From c2576bb44e7c9c1cb300cee2b129c66119c2c8d9 Mon Sep 17 00:00:00 2001 From: James Xu Date: Mon, 26 Jun 2017 19:09:42 +0800 Subject: [PATCH 3/6] add MockedUnboundedTable --- .../dsls/sql/planner/MockedBeamSqlTable.java | 15 +- 
.../beam/dsls/sql/planner/MockedTable.java | 33 ++++ .../sql/planner/MockedUnboundedTable.java | 164 ++++++++++++++++++ .../rel/BeamJoinRelBoundedVsBoundedTest.java | 8 +- .../BeamJoinRelUnboundedVsBoundedTest.java | 18 +- .../BeamJoinRelUnboundedVsUnboundedTest.java | 20 ++- 6 files changed, 232 insertions(+), 26 deletions(-) create mode 100644 dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedTable.java create mode 100644 dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedUnboundedTable.java diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java index 751705e9e968..fa80cc1a9d1f 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java @@ -48,8 +48,6 @@ public class MockedBeamSqlTable extends BaseBeamTable { public static final ConcurrentLinkedQueue CONTENT = new ConcurrentLinkedQueue<>(); private List inputRecords; - private PCollection.IsBounded isBounded = PCollection.IsBounded.BOUNDED; - public MockedBeamSqlTable(BeamSqlRecordType beamSqlRecordType) { super(beamSqlRecordType); } @@ -119,19 +117,14 @@ public RelDataType apply(RelDataTypeFactory a0) { @Override public BeamIOType getSourceType() { - if (isBounded == PCollection.IsBounded.BOUNDED) { - return BeamIOType.BOUNDED; - } else { - return BeamIOType.UNBOUNDED; - } + return BeamIOType.BOUNDED; } @Override public PCollection buildIOReader(Pipeline pipeline) { return PBegin.in(pipeline).apply( - "MockedBeamSQLTable_Reader_" + COUNTER.incrementAndGet(), Create.of(inputRecords)) - .setIsBoundedInternal(isBounded); + "MockedBeamSQLTable_Reader_" + COUNTER.incrementAndGet(), Create.of(inputRecords)); } @Override @@ -143,10 +136,6 @@ public List getInputRecords() { return inputRecords; } - public MockedBeamSqlTable withIsBounded(PCollection.IsBounded 
isBounded) { - this.isBounded = isBounded; - return this; - } /** * Keep output in {@code CONTENT} for validation. * diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedTable.java new file mode 100644 index 000000000000..d096a61756b6 --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedTable.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.planner; + +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.beam.dsls.sql.schema.BaseBeamTable; +import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; + +/** + * Base class for mocked table. 
+ */ +public abstract class MockedTable extends BaseBeamTable { + public static final AtomicInteger COUNTER = new AtomicInteger(); + public MockedTable(BeamSqlRecordType beamSqlRecordType) { + super(beamSqlRecordType); + } +} diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedUnboundedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedUnboundedTable.java new file mode 100644 index 000000000000..27fa75d25f15 --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedUnboundedTable.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.dsls.sql.planner; + +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.dsls.sql.schema.BeamIOType; +import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; +import org.apache.beam.dsls.sql.utils.CalciteUtils; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.commons.lang3.tuple.Pair; +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** + * A mocked unbounded table. + */ +public class MockedUnboundedTable extends MockedTable { + private List>> timestampedRows = new ArrayList<>(); + private int timestampField; + public MockedUnboundedTable(BeamSqlRecordType beamSqlRecordType, int timestampField) { + super(beamSqlRecordType); + this.timestampField = timestampField; + } + + /** + * Convenient way to build a mocked table with mock data: + * + *

e.g. + * + *

{@code
+   * MockedUnboundedTable
+   *   .of(SqlTypeName.BIGINT, "order_id",
+   *       SqlTypeName.INTEGER, "site_id",
+   *       SqlTypeName.DOUBLE, "price",
+   *       SqlTypeName.TIMESTAMP, "order_time",
+   *
+   *       1L, 2, 1.0, new Date(),
+   *       1L, 1, 2.0, new Date(),
+   *       2L, 4, 3.0, new Date(),
+   *       2L, 1, 4.0, new Date(),
+   *       5L, 5, 5.0, new Date(),
+   *       6L, 6, 6.0, new Date(),
+   *       7L, 7, 7.0, new Date(),
+   *       8L, 8888, 8.0, new Date(),
+   *       8L, 999, 9.0, new Date(),
+   *       10L, 100, 10.0, new Date())
+   * }
+ */ + public static MockedUnboundedTable of(final Object... args){ + final RelProtoDataType protoRowType = new RelProtoDataType() { + @Override + public RelDataType apply(RelDataTypeFactory a0) { + RelDataTypeFactory.FieldInfoBuilder builder = a0.builder(); + + int lastTypeIndex = 0; + for (; lastTypeIndex < args.length; lastTypeIndex += 2) { + if (args[lastTypeIndex] instanceof SqlTypeName) { + builder.add(args[lastTypeIndex + 1].toString(), + (SqlTypeName) args[lastTypeIndex]); + } else { + break; + } + } + return builder.build(); + } + }; + + + List rows = new ArrayList<>(); + BeamSqlRecordType beamSQLRecordType = CalciteUtils + .toBeamRecordType(protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY)); + int fieldCount = beamSQLRecordType.size(); + + + + for (int i = fieldCount * 2 + 1; i < args.length; i += fieldCount) { + BeamSqlRow row = new BeamSqlRow(beamSQLRecordType); + for (int j = 0; j < fieldCount; j++) { + row.addField(j, args[i + j]); + } + rows.add(row); + } + MockedUnboundedTable table = new MockedUnboundedTable( + beamSQLRecordType, (int) args[fieldCount * 2]); + table.addInputRecords(rows); + + return table; + } + + public MockedUnboundedTable addInputRecords(List rows) { + this.timestampedRows.add(Pair.of(Duration.standardMinutes(0), rows)); + + return this; + } + + public MockedUnboundedTable addInputRecords(Duration duration, Object... 
args) { + List rows = new ArrayList<>(); + int fieldCount = getRecordType().size(); + + for (int i = 0; i < args.length; i += fieldCount) { + BeamSqlRow row = new BeamSqlRow(getRecordType()); + for (int j = 0; j < fieldCount; j++) { + row.addField(j, args[i + j]); + } + rows.add(row); + } + + this.timestampedRows.add(Pair.of(duration, rows)); + return this; + } + + @Override public BeamIOType getSourceType() { + return BeamIOType.UNBOUNDED; + } + + @Override public PCollection buildIOReader(Pipeline pipeline) { + TestStream.Builder values = TestStream.create( + new BeamSqlRowCoder(beamSqlRecordType)); + + for (Pair> pair : timestampedRows) { + values = values.advanceWatermarkTo(new Instant(0).plus(pair.getKey())); + for (int i = 0; i < pair.getValue().size(); i++) { + values = values.addElements(TimestampedValue.of(pair.getValue().get(i), + new Instant(pair.getValue().get(i).getDate(timestampField)))); + } + } + + return pipeline.begin().apply( + "MockedUnboundedTable_" + COUNTER.incrementAndGet(), + values.advanceWatermarkToInfinity()); + } + + @Override public PTransform, PDone> buildIOWriter() { + throw new UnsupportedOperationException("MockedUnboundedTable#buildIOWriter unsupported!"); + } +} diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java index af45d0752525..4e3497f7dc82 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java @@ -151,7 +151,9 @@ public static void prepare() { SqlTypeName.INTEGER, "site_id", SqlTypeName.INTEGER, "price", - 1, 2, 3, 2, 3, 3, 3, 4, 5)); + 1, 2, 3, + 2, 3, 3, + 3, 4, 5)); beamSqlEnv.registerTable("ORDER_DETAILS0", MockedBeamSqlTable @@ -159,7 +161,9 @@ public static void prepare() { SqlTypeName.INTEGER, "site_id0", SqlTypeName.INTEGER, "price0", - 
1, 2, 3, 2, 3, 3, 3, 4, 5)); + 1, 2, 3, + 2, 3, 3, + 3, 4, 5)); } } diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java index ff054b2dd0f3..ecadd5103bec 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java @@ -25,6 +25,7 @@ import org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.TestUtils; import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable; +import org.apache.beam.dsls.sql.planner.MockedUnboundedTable; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.dsls.sql.transform.BeamSqlOutputToConsoleFn; import org.apache.beam.sdk.testing.PAssert; @@ -52,12 +53,17 @@ public static void prepare() { FIRST_DATE.setTime(1); SECOND_DATE.setTime(1 + 3600 * 1000); THIRD_DATE.setTime(1 + 3600 * 1000 + 3600 * 1000 + 1); - beamSqlEnv.registerTable("ORDER_DETAILS", MockedBeamSqlTable - .of(SqlTypeName.INTEGER, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.INTEGER, - "price", SqlTypeName.TIMESTAMP, "order_time", + beamSqlEnv.registerTable("ORDER_DETAILS", MockedUnboundedTable + .of(SqlTypeName.INTEGER, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.INTEGER, "price", + SqlTypeName.TIMESTAMP, "order_time", - 1, 1, 1, FIRST_DATE, 1, 2, 2, FIRST_DATE, 2, 2, 3, SECOND_DATE, 2, 3, 3, SECOND_DATE, 3, - 3, 3, THIRD_DATE).withIsBounded(PCollection.IsBounded.UNBOUNDED)); + 1, 1, 1, FIRST_DATE, + 1, 2, 2, FIRST_DATE, + 2, 2, 3, SECOND_DATE, + 2, 3, 3, SECOND_DATE, + 3, 3, 3, THIRD_DATE)); beamSqlEnv.registerTable("ORDER_DETAILS1", MockedBeamSqlTable .of(SqlTypeName.INTEGER, "order_id", @@ -65,7 +71,7 @@ public static void prepare() { 1, "james", 2, "bond" - ).withIsBounded(PCollection.IsBounded.BOUNDED)); + )); } @Test diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java index a1686f3d8e2c..e555d4005bc0 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java @@ -25,12 +25,14 @@ import org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.TestUtils; import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable; +import org.apache.beam.dsls.sql.planner.MockedUnboundedTable; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.apache.calcite.sql.type.SqlTypeName; +import org.joda.time.Duration; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -49,18 +51,26 @@ public class BeamJoinRelUnboundedVsUnboundedTest { public static void prepare() { FIRST_DATE.setTime(1); SECOND_DATE.setTime(1 + 3600 * 1000); - beamSqlEnv.registerTable("ORDER_DETAILS", - MockedBeamSqlTable + beamSqlEnv.registerTable("ORDER_DETAILS", MockedUnboundedTable .of(SqlTypeName.INTEGER, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.INTEGER, "price", SqlTypeName.TIMESTAMP, "order_time", + 3, + 1, 1, 1, FIRST_DATE, 1, 2, 2, FIRST_DATE, 2, 2, 3, SECOND_DATE, 2, 3, 3, SECOND_DATE - ).withIsBounded(PCollection.IsBounded.UNBOUNDED)); + ) + .addInputRecords( + // this late record is omitted + Duration.standardHours(1).plus(Duration.standardMinutes(40)), + 2, 3, 3, SECOND_DATE + ) + + ); } @Test @@ -83,7 +93,8 @@ public void testInnerJoin() throws Exception { SqlTypeName.INTEGER, "order_id0", SqlTypeName.INTEGER, "sum_site_id0", 1, 3, 1, 3, - 2, 5, 2, 5 + //2, 5, 2, 5 + 2, 8, 2, 8 ).getInputRecords())); 
pipeline.run(); } @@ -134,7 +145,6 @@ public void testRightOuterJoin() throws Exception { + " o1.order_id=o2.order_id" ; - System.out.println(sql); PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) .containsInAnyOrder(beamSqlRows2Strings(MockedBeamSqlTable.of( From 79a2ded5b87e22eca1990df1eccfb05bf25269e3 Mon Sep 17 00:00:00 2001 From: James Xu Date: Mon, 26 Jun 2017 23:32:54 +0800 Subject: [PATCH 4/6] refactor JOIN test with MockedUnboundedTable --- .../org/apache/beam/dsls/sql/TestUtils.java | 46 ++++++ .../sql/planner/MockedUnboundedTable.java | 84 +++-------- .../BeamJoinRelUnboundedVsBoundedTest.java | 99 ++++++++----- .../BeamJoinRelUnboundedVsUnboundedTest.java | 137 ++++++++++-------- 4 files changed, 200 insertions(+), 166 deletions(-) diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java index 4074515930e1..3b9b2c2e0ffa 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.List; +import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.sdk.transforms.DoFn; @@ -50,4 +51,49 @@ public static List beamSqlRows2Strings(List rows) { return strs; } + + /** + * Convenient way to build a list of {@code BeamSqlRow}s. + */ + public static class RowsBuilder { + private BeamSqlRecordType type; + private List rows = new ArrayList<>(); + + public static RowsBuilder of(final Object... 
args) { + List types = new ArrayList<>(); + List names = new ArrayList<>(); + int lastTypeIndex = 0; + for (; lastTypeIndex < args.length; lastTypeIndex += 2) { + types.add((int) args[lastTypeIndex]); + names.add((String) args[lastTypeIndex + 1]); + } + + BeamSqlRecordType beamSQLRecordType = BeamSqlRecordType.create(names, types); + RowsBuilder builder = new RowsBuilder(); + builder.type = beamSQLRecordType; + + return builder; + } + + public RowsBuilder values(final Object... args) { + int fieldCount = type.size(); + for (int i = 0; i < args.length; i += fieldCount) { + BeamSqlRow row = new BeamSqlRow(type); + for (int j = 0; j < fieldCount; j++) { + row.addField(j, args[i + j]); + } + this.rows.add(row); + } + + return this; + } + + public List getRows() { + return rows; + } + + public List getStrRows() { + return beamSqlRows2Strings(rows); + } + } } diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedUnboundedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedUnboundedTable.java index 27fa75d25f15..3f22df3c30d6 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedUnboundedTable.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedUnboundedTable.java @@ -24,18 +24,13 @@ import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; -import org.apache.beam.dsls.sql.utils.CalciteUtils; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.testing.TestStream; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.TimestampedValue; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.rel.type.RelProtoDataType; -import org.apache.calcite.sql.type.SqlTypeName; -import 
org.apache.commons.lang3.tuple.Pair; +import org.apache.calcite.util.Pair; import org.joda.time.Duration; import org.joda.time.Instant; @@ -45,83 +40,43 @@ public class MockedUnboundedTable extends MockedTable { private List>> timestampedRows = new ArrayList<>(); private int timestampField; - public MockedUnboundedTable(BeamSqlRecordType beamSqlRecordType, int timestampField) { + private MockedUnboundedTable(BeamSqlRecordType beamSqlRecordType) { super(beamSqlRecordType); - this.timestampField = timestampField; } /** - * Convenient way to build a mocked table with mock data: + * Convenient way to build a mocked table. * *

e.g. * *

{@code
    * MockedUnboundedTable
-   *   .of(SqlTypeName.BIGINT, "order_id",
-   *       SqlTypeName.INTEGER, "site_id",
-   *       SqlTypeName.DOUBLE, "price",
-   *       SqlTypeName.TIMESTAMP, "order_time",
-   *
-   *       1L, 2, 1.0, new Date(),
-   *       1L, 1, 2.0, new Date(),
-   *       2L, 4, 3.0, new Date(),
-   *       2L, 1, 4.0, new Date(),
-   *       5L, 5, 5.0, new Date(),
-   *       6L, 6, 6.0, new Date(),
-   *       7L, 7, 7.0, new Date(),
-   *       8L, 8888, 8.0, new Date(),
-   *       8L, 999, 9.0, new Date(),
-   *       10L, 100, 10.0, new Date())
+   *   .of(Types.BIGINT, "order_id",
+   *       Types.INTEGER, "site_id",
+   *       Types.DOUBLE, "price",
+   *       Types.TIMESTAMP, "order_time")
    * }
*/ public static MockedUnboundedTable of(final Object... args){ - final RelProtoDataType protoRowType = new RelProtoDataType() { - @Override - public RelDataType apply(RelDataTypeFactory a0) { - RelDataTypeFactory.FieldInfoBuilder builder = a0.builder(); - - int lastTypeIndex = 0; - for (; lastTypeIndex < args.length; lastTypeIndex += 2) { - if (args[lastTypeIndex] instanceof SqlTypeName) { - builder.add(args[lastTypeIndex + 1].toString(), - (SqlTypeName) args[lastTypeIndex]); - } else { - break; - } - } - return builder.build(); - } - }; - - - List rows = new ArrayList<>(); - BeamSqlRecordType beamSQLRecordType = CalciteUtils - .toBeamRecordType(protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY)); - int fieldCount = beamSQLRecordType.size(); - - - - for (int i = fieldCount * 2 + 1; i < args.length; i += fieldCount) { - BeamSqlRow row = new BeamSqlRow(beamSQLRecordType); - for (int j = 0; j < fieldCount; j++) { - row.addField(j, args[i + j]); - } - rows.add(row); + List types = new ArrayList<>(); + List names = new ArrayList<>(); + int lastTypeIndex = 0; + for (; lastTypeIndex < args.length; lastTypeIndex += 2) { + types.add((int) args[lastTypeIndex]); + names.add((String) args[lastTypeIndex + 1]); } - MockedUnboundedTable table = new MockedUnboundedTable( - beamSQLRecordType, (int) args[fieldCount * 2]); - table.addInputRecords(rows); - return table; + return new MockedUnboundedTable( + BeamSqlRecordType.create(names, types) + ); } - public MockedUnboundedTable addInputRecords(List rows) { - this.timestampedRows.add(Pair.of(Duration.standardMinutes(0), rows)); - + public MockedUnboundedTable timestampColumnIndex(int idx) { + this.timestampField = idx; return this; } - public MockedUnboundedTable addInputRecords(Duration duration, Object... args) { + public MockedUnboundedTable addRows(Duration duration, Object... 
args) { List rows = new ArrayList<>(); int fieldCount = getRecordType().size(); @@ -133,6 +88,7 @@ public MockedUnboundedTable addInputRecords(Duration duration, Object... args) { rows.add(row); } + // record the watermark + rows this.timestampedRows.add(Pair.of(duration, rows)); return this; } diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java index ecadd5103bec..4aeec5a9a317 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java @@ -18,8 +18,7 @@ package org.apache.beam.dsls.sql.rel; -import static org.apache.beam.dsls.sql.TestUtils.beamSqlRows2Strings; - +import java.sql.Types; import java.util.Date; import org.apache.beam.dsls.sql.BeamSqlCli; import org.apache.beam.dsls.sql.BeamSqlEnv; @@ -33,6 +32,7 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.apache.calcite.sql.type.SqlTypeName; +import org.joda.time.Duration; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -44,26 +44,40 @@ public class BeamJoinRelUnboundedVsBoundedTest { @Rule public final TestPipeline pipeline = TestPipeline.create(); private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv(); - public static final Date FIRST_DATE = new Date(); - public static final Date SECOND_DATE = new Date(); - public static final Date THIRD_DATE = new Date(); + public static final Date FIRST_DATE = new Date(1); + public static final Date SECOND_DATE = new Date(1 + 3600 * 1000); + public static final Date THIRD_DATE = new Date(1 + 3600 * 1000 + 3600 * 1000 + 1); + private static final Duration WINDOW_SIZE = Duration.standardHours(1); @BeforeClass public static void prepare() { - FIRST_DATE.setTime(1); - SECOND_DATE.setTime(1 + 3600 * 1000); 
- THIRD_DATE.setTime(1 + 3600 * 1000 + 3600 * 1000 + 1); beamSqlEnv.registerTable("ORDER_DETAILS", MockedUnboundedTable - .of(SqlTypeName.INTEGER, "order_id", - SqlTypeName.INTEGER, "site_id", - SqlTypeName.INTEGER, "price", - SqlTypeName.TIMESTAMP, "order_time", - + .of( + Types.INTEGER, "order_id", + Types.INTEGER, "site_id", + Types.INTEGER, "price", + Types.TIMESTAMP, "order_time" + ) + .timestampColumnIndex(3) + .addRows( + Duration.ZERO, 1, 1, 1, FIRST_DATE, - 1, 2, 2, FIRST_DATE, + 1, 2, 2, FIRST_DATE + ) + .addRows( + WINDOW_SIZE.plus(Duration.standardSeconds(1)), 2, 2, 3, SECOND_DATE, 2, 3, 3, SECOND_DATE, - 3, 3, 3, THIRD_DATE)); + // this late data is omitted + 1, 2, 3, FIRST_DATE + ) + .addRows( + WINDOW_SIZE.plus(WINDOW_SIZE).plus(Duration.standardSeconds(1)), + 3, 3, 3, THIRD_DATE, + // this late data is omitted + 2, 2, 3, SECOND_DATE + ) + ); beamSqlEnv.registerTable("ORDER_DETAILS1", MockedBeamSqlTable .of(SqlTypeName.INTEGER, "order_id", @@ -87,13 +101,16 @@ public void testInnerJoin() throws Exception { PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) - .containsInAnyOrder(beamSqlRows2Strings(MockedBeamSqlTable.of( - SqlTypeName.INTEGER, "order_id", - SqlTypeName.INTEGER, "sum_site_id", - SqlTypeName.VARCHAR, "buyer", - 1, 3, "james", - 2, 5, "bond" - ).getInputRecords())); + .containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.INTEGER, "order_id", + Types.INTEGER, "sum_site_id", + Types.VARCHAR, "buyer" + ).values( + 1, 3, "james", + 2, 5, "bond" + ).getStrRows() + ); pipeline.run(); } @@ -111,14 +128,17 @@ public void testLeftOuterJoin() throws Exception { PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); rows.apply(ParDo.of(new BeamSqlOutputToConsoleFn("helloworld"))); PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) - .containsInAnyOrder(beamSqlRows2Strings(MockedBeamSqlTable.of( - 
SqlTypeName.INTEGER, "order_id", - SqlTypeName.INTEGER, "sum_site_id", - SqlTypeName.VARCHAR, "buyer", - 1, 3, "james", - 2, 5, "bond", - 3, 3, null - ).getInputRecords())); + .containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.INTEGER, "order_id", + Types.INTEGER, "sum_site_id", + Types.VARCHAR, "buyer" + ).values( + 1, 3, "james", + 2, 5, "bond", + 3, 3, null + ).getStrRows() + ); pipeline.run(); } @@ -149,14 +169,17 @@ public void testRightOuterJoin() throws Exception { ; PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) - .containsInAnyOrder(beamSqlRows2Strings(MockedBeamSqlTable.of( - SqlTypeName.INTEGER, "order_id", - SqlTypeName.INTEGER, "sum_site_id", - SqlTypeName.VARCHAR, "buyer", - 1, 3, "james", - 2, 5, "bond", - 3, 3, null - ).getInputRecords())); + .containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.INTEGER, "order_id", + Types.INTEGER, "sum_site_id", + Types.VARCHAR, "buyer" + ).values( + 1, 3, "james", + 2, 5, "bond", + 3, 3, null + ).getStrRows() + ); pipeline.run(); } diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java index e555d4005bc0..55300afb0c46 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java @@ -18,20 +18,17 @@ package org.apache.beam.dsls.sql.rel; -import static org.apache.beam.dsls.sql.TestUtils.beamSqlRows2Strings; - +import java.sql.Types; import java.util.Date; import org.apache.beam.dsls.sql.BeamSqlCli; import org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.TestUtils; -import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable; import org.apache.beam.dsls.sql.planner.MockedUnboundedTable; import 
org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; -import org.apache.calcite.sql.type.SqlTypeName; import org.joda.time.Duration; import org.junit.BeforeClass; import org.junit.Rule; @@ -44,32 +41,37 @@ public class BeamJoinRelUnboundedVsUnboundedTest { @Rule public final TestPipeline pipeline = TestPipeline.create(); private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv(); - public static final Date FIRST_DATE = new Date(); - public static final Date SECOND_DATE = new Date(); + public static final Date FIRST_DATE = new Date(1); + public static final Date SECOND_DATE = new Date(1 + 3600 * 1000); + + private static final Duration WINDOW_SIZE = Duration.standardHours(1); @BeforeClass public static void prepare() { - FIRST_DATE.setTime(1); - SECOND_DATE.setTime(1 + 3600 * 1000); beamSqlEnv.registerTable("ORDER_DETAILS", MockedUnboundedTable - .of(SqlTypeName.INTEGER, "order_id", - SqlTypeName.INTEGER, "site_id", - SqlTypeName.INTEGER, "price", - SqlTypeName.TIMESTAMP, "order_time", - - 3, - + .of(Types.INTEGER, "order_id", + Types.INTEGER, "site_id", + Types.INTEGER, "price", + Types.TIMESTAMP, "order_time" + ) + .timestampColumnIndex(3) + .addRows( + Duration.ZERO, 1, 1, 1, FIRST_DATE, - 1, 2, 2, FIRST_DATE, + 1, 2, 2, FIRST_DATE + ) + .addRows( + WINDOW_SIZE.plus(Duration.standardMinutes(1)), 2, 2, 3, SECOND_DATE, - 2, 3, 3, SECOND_DATE - ) - .addInputRecords( - // this late record is omitted - Duration.standardHours(1).plus(Duration.standardMinutes(40)), + 2, 3, 3, SECOND_DATE, + // this late record is omitted(First window) + 1, 3, 3, FIRST_DATE + ) + .addRows( + // this late record is omitted(Second window) + WINDOW_SIZE.plus(WINDOW_SIZE).plus(Duration.standardMinutes(1)), 2, 3, 3, SECOND_DATE ) - ); } @@ -87,15 +89,16 @@ public void testInnerJoin() throws Exception { PCollection 
rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) - .containsInAnyOrder(beamSqlRows2Strings(MockedBeamSqlTable.of( - SqlTypeName.INTEGER, "order_id", - SqlTypeName.INTEGER, "sum_site_id", - SqlTypeName.INTEGER, "order_id0", - SqlTypeName.INTEGER, "sum_site_id0", - 1, 3, 1, 3, - //2, 5, 2, 5 - 2, 8, 2, 8 - ).getInputRecords())); + .containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.INTEGER, "order_id", + Types.INTEGER, "sum_site_id", + Types.INTEGER, "order_id0", + Types.INTEGER, "sum_site_id0").values( + 1, 3, 1, 3, + 2, 5, 2, 5 + ).getStrRows() + ); pipeline.run(); } @@ -119,17 +122,19 @@ public void testLeftOuterJoin() throws Exception { PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) - .containsInAnyOrder(beamSqlRows2Strings(MockedBeamSqlTable.of( - SqlTypeName.INTEGER, "order_id", - SqlTypeName.INTEGER, "sum_site_id", - SqlTypeName.INTEGER, "order_id0", - SqlTypeName.INTEGER, "sum_site_id0", - - 1, 1, 1, 3, - 2, 2, null, null, - 2, 2, 2, 5, - 3, 3, null, null - ).getInputRecords())); + .containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.INTEGER, "order_id", + Types.INTEGER, "sum_site_id", + Types.INTEGER, "order_id0", + Types.INTEGER, "sum_site_id0" + ).values( + 1, 1, 1, 3, + 2, 2, null, null, + 2, 2, 2, 5, + 3, 3, null, null + ).getStrRows() + ); pipeline.run(); } @@ -147,17 +152,19 @@ public void testRightOuterJoin() throws Exception { PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) - .containsInAnyOrder(beamSqlRows2Strings(MockedBeamSqlTable.of( - SqlTypeName.INTEGER, "order_id", - SqlTypeName.INTEGER, "sum_site_id", - SqlTypeName.INTEGER, "order_id0", - SqlTypeName.INTEGER, "sum_site_id0", - - 1, 3, 1, 1, - null, null, 2, 2, - 2, 5, 2, 2, - null, null, 3, 3 - 
).getInputRecords())); + .containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.INTEGER, "order_id", + Types.INTEGER, "sum_site_id", + Types.INTEGER, "order_id0", + Types.INTEGER, "sum_site_id0" + ).values( + 1, 3, 1, 1, + null, null, 2, 2, + 2, 5, 2, 2, + null, null, 3, 3 + ).getStrRows() + ); pipeline.run(); } @@ -181,17 +188,19 @@ public void testFullOuterJoin() throws Exception { PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) - .containsInAnyOrder(beamSqlRows2Strings(MockedBeamSqlTable.of( - SqlTypeName.INTEGER, "order_id", - SqlTypeName.INTEGER, "sum_site_id", - SqlTypeName.INTEGER, "order_id0", - SqlTypeName.INTEGER, "sum_site_id0", - - 1, 1, 1, 3, - 2, 2, null, null, - 2, 2, 2, 5, - 3, 3, null, null - ).getInputRecords())); + .containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.INTEGER, "order_id", + Types.INTEGER, "sum_site_id", + Types.INTEGER, "order_id0", + Types.INTEGER, "sum_site_id0" + ).values( + 1, 1, 1, 3, + 2, 2, null, null, + 2, 2, 2, 5, + 3, 3, null, null + ).getStrRows() + ); pipeline.run(); } From e7b2b26f8939b760a4ae91055b6394298d4c4b26 Mon Sep 17 00:00:00 2001 From: James Xu Date: Wed, 28 Jun 2017 14:05:36 +0800 Subject: [PATCH 5/6] fix findbugs bug --- .../java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java index 558270659dc3..a01d2ad8c158 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java @@ -179,10 +179,6 @@ private PCollection standardJoin( BeamSqlRow leftNullRow, BeamSqlRow rightNullRow, String stageName) { PCollection>> joinedRows = null; switch (joinType) { - case INNER: - joinedRows = 
org.apache.beam.sdk.extensions.joinlibrary.Join - .innerJoin(extractedLeftRows, extractedRightRows); - break; case LEFT: joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join .leftOuterJoin(extractedLeftRows, extractedRightRows, rightNullRow); @@ -195,6 +191,12 @@ private PCollection standardJoin( joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join .fullOuterJoin(extractedLeftRows, extractedRightRows, leftNullRow, rightNullRow); + break; + case INNER: + default: + joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join + .innerJoin(extractedLeftRows, extractedRightRows); + break; } PCollection ret = joinedRows From deaea0725ac706c93aad4c5412be1646b7fcdad4 Mon Sep 17 00:00:00 2001 From: James Xu Date: Thu, 29 Jun 2017 13:58:02 +0800 Subject: [PATCH 6/6] update according to the review comments --- .../apache/beam/dsls/sql/rel/BeamJoinRel.java | 84 +++++++++++++------ .../sql/transform/BeamJoinTransforms.java | 54 ++++++------ .../org/apache/beam/dsls/sql/TestUtils.java | 30 ++++++- .../rel/BeamJoinRelBoundedVsBoundedTest.java | 64 +++++++++----- .../BeamJoinRelUnboundedVsBoundedTest.java | 34 +++++++- .../BeamJoinRelUnboundedVsUnboundedTest.java | 45 +++++----- 6 files changed, 208 insertions(+), 103 deletions(-) diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java index a01d2ad8c158..e85368e3e2c9 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java @@ -33,6 +33,7 @@ import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -46,6 +47,7 @@ import 
org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; import org.apache.calcite.util.Pair; @@ -59,16 +61,24 @@ *
  • BoundedTable JOIN UnboundedTable
  • * * - *

    For the first two cases, a standard join can be utilized to implement them as long as the - * windowFn of the both sides match. For the third case, {@code sideInput} is utilized to implement - * the join, hence there are some constrains for the third case: 1) FULL JOIN is not supported - * 2) The unbounded table must be at the left side of the OUTER JOIN. + *

    For the first two cases, a standard join is utilized as long as the windowFn of the both + * sides match. * - *

    There is also some overall constrains: + *

    For the third case, {@code sideInput} is utilized to implement the join, so there are some + * constraints: * *

      - *
    • Only equi-join is supported
    • - *
    • CROSS JOIN is not supported
    • + *
    • {@code FULL OUTER JOIN} is not supported.
    • + *
    • If it's a {@code LEFT OUTER JOIN}, the unbounded table should be on the left side.
    • + *
    • If it's a {@code RIGHT OUTER JOIN}, the unbounded table should be on the right side.
    • + *
    + * + * + *

    There are also some general constraints: + * + *

      + *
    • Only equi-join is supported.
    • + *
    • CROSS JOIN is not supported.
    • *
    */ public class BeamJoinRel extends Join implements BeamRelNode { @@ -132,13 +142,17 @@ public BeamJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelN BeamSqlRow rightNullRow = buildNullRow(rightRelNode); // a regular join - if (leftWinFn.isCompatible(rightWinFn) - && ((leftRows.isBounded() == PCollection.IsBounded.BOUNDED + if ((leftRows.isBounded() == PCollection.IsBounded.BOUNDED && rightRows.isBounded() == PCollection.IsBounded.BOUNDED) || (leftRows.isBounded() == PCollection.IsBounded.UNBOUNDED - && rightRows.isBounded() == PCollection.IsBounded.UNBOUNDED) - ) - ) { + && rightRows.isBounded() == PCollection.IsBounded.UNBOUNDED)) { + try { + leftWinFn.verifyCompatibility(rightWinFn); + } catch (IncompatibleWindowException e) { + throw new IllegalArgumentException( + "WindowFns must match for a bounded-vs-bounded/unbounded-vs-unbounded join.", e); + } + return standardJoin(extractedLeftRows, extractedRightRows, leftNullRow, rightNullRow, stageName); } else if ( @@ -148,8 +162,8 @@ public BeamJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelN && rightRows.isBounded() == PCollection.IsBounded.BOUNDED) ) { // if one of the sides is Bounded & the other is Unbounded - // then do a sideInput - // when doing a sideInput, the windowFn does not need to match + // then do a sideInput join + // when doing a sideInput join, the windowFn does not need to match // Only support INNER JOIN & LEFT OUTER JOIN where left side of the join must be // the unbounded if (joinType == JoinRelType.FULL) { @@ -210,24 +224,34 @@ public PCollection sideInputJoin( PCollection> extractedLeftRows, PCollection> extractedRightRows, BeamSqlRow leftNullRow, BeamSqlRow rightNullRow) { - // if the join is not a INNER JOIN we convert the join to a left join - // by swap the left/right side of the rows - boolean swapped = joinType != JoinRelType.INNER - && extractedLeftRows.isBounded() == PCollection.IsBounded.BOUNDED; + // we always make the Unbounded table on 
the left to do the sideInput join + // (will convert the result accordingly before return) + boolean swapped = (extractedLeftRows.isBounded() == PCollection.IsBounded.BOUNDED); + JoinRelType realJoinType = + (swapped && joinType != JoinRelType.INNER) ? JoinRelType.LEFT : joinType; PCollection> realLeftRows = swapped ? extractedRightRows : extractedLeftRows; PCollection> realRightRows = swapped ? extractedLeftRows : extractedRightRows; BeamSqlRow realRightNullRow = swapped ? leftNullRow : rightNullRow; - JoinRelType realJoinType = swapped ? JoinRelType.LEFT : joinType; - final PCollectionView>> rowsView = realRightRows + // swapped still need to pass down because, we need to swap the result back. + return sideInputJoinHelper(realJoinType, realLeftRows, realRightRows, + realRightNullRow, swapped); + } + + private PCollection sideInputJoinHelper( + JoinRelType joinType, + PCollection> leftRows, + PCollection> rightRows, + BeamSqlRow rightNullRow, boolean swapped) { + final PCollectionView>> rowsView = rightRows .apply(View.asMultimap()); - PCollection ret = realLeftRows + PCollection ret = leftRows .apply(ParDo.of(new BeamJoinTransforms.SideInputJoinDoFn( - realJoinType, realRightNullRow, rowsView, swapped)).withSideInputs(rowsView)) + joinType, rightNullRow, rowsView, swapped)).withSideInputs(rowsView)) .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType()))); return ret; @@ -242,17 +266,22 @@ private BeamSqlRow buildNullRow(BeamRelNode relNode) { return nullRow; } - private List> extractJoinColumns(int separator) { + private List> extractJoinColumns(int leftRowColumnCount) { + // it's a CROSS JOIN because: condition == true + if (condition instanceof RexLiteral && (Boolean) ((RexLiteral) condition).getValue()) { + throw new UnsupportedOperationException("CROSS JOIN is not supported!"); + } + RexCall call = (RexCall) condition; List> pairs = new ArrayList<>(); if ("AND".equals(call.getOperator().getName())) { List operands = call.getOperands(); for 
(RexNode rexNode : operands) { - Pair pair = extractOneJoinColumn((RexCall) rexNode, separator); + Pair pair = extractOneJoinColumn((RexCall) rexNode, leftRowColumnCount); pairs.add(pair); } } else if ("=".equals(call.getOperator().getName())) { - pairs.add(extractOneJoinColumn(call, separator)); + pairs.add(extractOneJoinColumn(call, leftRowColumnCount)); } else { throw new UnsupportedOperationException( "Operator " + call.getOperator().getName() + " is not supported in join condition"); @@ -261,14 +290,15 @@ private List> extractJoinColumns(int separator) { return pairs; } - private Pair extractOneJoinColumn(RexCall oneCondition, int separator) { + private Pair extractOneJoinColumn(RexCall oneCondition, + int leftRowColumnCount) { List operands = oneCondition.getOperands(); final int leftIndex = Math.min(((RexInputRef) operands.get(0)).getIndex(), ((RexInputRef) operands.get(1)).getIndex()); final int rightIndex1 = Math.max(((RexInputRef) operands.get(0)).getIndex(), ((RexInputRef) operands.get(1)).getIndex()); - final int rightIndex = rightIndex1 - separator; + final int rightIndex = rightIndex1 - leftRowColumnCount; return new Pair<>(leftIndex, rightIndex); } diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java index bc739dd157e9..8169b837b4c4 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java @@ -41,8 +41,8 @@ public class BeamJoinTransforms { */ public static class ExtractJoinFields extends SimpleFunction> { - private boolean isLeft; - private List> joinColumns; + private final boolean isLeft; + private final List> joinColumns; public ExtractJoinFields(boolean isLeft, List> joinColumns) { this.isLeft = isLeft; @@ -77,10 +77,10 @@ public ExtractJoinFields(boolean isLeft, List> joinColumn * A {@code DoFn} 
which implement the sideInput-JOIN. */ public static class SideInputJoinDoFn extends DoFn, BeamSqlRow> { - private PCollectionView>> sideInputView; - private JoinRelType joinType; - private BeamSqlRow rightNullRow; - private boolean swap; + private final PCollectionView>> sideInputView; + private final JoinRelType joinType; + private final BeamSqlRow rightNullRow; + private final boolean swap; public SideInputJoinDoFn(JoinRelType joinType, BeamSqlRow rightNullRow, PCollectionView>> sideInputView, @@ -120,45 +120,45 @@ public static class JoinParts2WholeRow KV parts = input.getValue(); BeamSqlRow leftRow = parts.getKey(); BeamSqlRow rightRow = parts.getValue(); - return combineTwoRowsIntoOne(leftRow, rightRow); + return combineTwoRowsIntoOne(leftRow, rightRow, false); } } - private static BeamSqlRow combineTwoRowsIntoOne(BeamSqlRow leftRow, BeamSqlRow rightRow) { - return combineTwoRowsIntoOne(leftRow, rightRow, false); - } - /** * As the method name suggests: combine two rows into one wide row. */ private static BeamSqlRow combineTwoRowsIntoOne(BeamSqlRow leftRow, BeamSqlRow rightRow, boolean swap) { + if (swap) { + return combineTwoRowsIntoOneHelper(rightRow, leftRow); + } else { + return combineTwoRowsIntoOneHelper(leftRow, rightRow); + } + } + + /** + * As the method name suggests: combine two rows into one wide row. + */ + private static BeamSqlRow combineTwoRowsIntoOneHelper(BeamSqlRow leftRow, + BeamSqlRow rightRow) { // build the type List names = new ArrayList<>(leftRow.size() + rightRow.size()); - names.addAll( - swap ? rightRow.getDataType().getFieldsName() : leftRow.getDataType().getFieldsName()); - names.addAll( - swap ? leftRow.getDataType().getFieldsName() : rightRow.getDataType().getFieldsName()); + names.addAll(leftRow.getDataType().getFieldsName()); + names.addAll(rightRow.getDataType().getFieldsName()); List types = new ArrayList<>(leftRow.size() + rightRow.size()); - types.addAll( - swap ? 
rightRow.getDataType().getFieldsType() : leftRow.getDataType().getFieldsType()); - types.addAll( - swap ? leftRow.getDataType().getFieldsType() : rightRow.getDataType().getFieldsType()); - + types.addAll(leftRow.getDataType().getFieldsType()); + types.addAll(rightRow.getDataType().getFieldsType()); BeamSqlRecordType type = BeamSqlRecordType.create(names, types); BeamSqlRow row = new BeamSqlRow(type); - BeamSqlRow currentRow = swap ? rightRow : leftRow; - int leftRowSize = currentRow.size(); // build the row - for (int i = 0; i < currentRow.size(); i++) { - row.addField(i, currentRow.getFieldValue(i)); + for (int i = 0; i < leftRow.size(); i++) { + row.addField(i, leftRow.getFieldValue(i)); } - currentRow = swap ? leftRow : rightRow; - for (int i = 0; i < currentRow.size(); i++) { - row.addField(i + leftRowSize, currentRow.getFieldValue(i)); + for (int i = 0; i < rightRow.size(); i++) { + row.addField(i + leftRow.size(), rightRow.getFieldValue(i)); } return row; diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java index 3b9b2c2e0ffa..375027a37675 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java @@ -31,7 +31,7 @@ public class TestUtils { /** - * A {@code DoFn} to convert a {@code BeamSqlRow} to a comparable {@code}. + * A {@code DoFn} to convert a {@code BeamSqlRow} to a comparable {@code String}. */ public static class BeamSqlRow2StringDoFn extends DoFn { @ProcessElement @@ -54,11 +54,32 @@ public static List beamSqlRows2Strings(List rows) { /** * Convenient way to build a list of {@code BeamSqlRow}s. + * + *

    You can use it like this: + * + *

    {@code
    +   * TestUtils.RowsBuilder.of(
    +   *   Types.INTEGER, "order_id",
    +   *   Types.INTEGER, "sum_site_id",
    +   *   Types.VARCHAR, "buyer"
    +   * ).values(
    +   *   1, 3, "james",
    +   *   2, 5, "bond"
    +   *   ).getStringRows()
    +   * }
    + * {@code} */ public static class RowsBuilder { private BeamSqlRecordType type; private List rows = new ArrayList<>(); + /** + * Create a RowsBuilder with the specified row type info. + * + *

    Note: check the class javadoc for a detailed example. + * + * @param args pairs of column type and column names. */ public static RowsBuilder of(final Object... args) { List types = new ArrayList<>(); List names = new ArrayList<>(); @@ -75,6 +96,11 @@ public static RowsBuilder of(final Object... args) { return builder; } + /** + * Add values to the builder. + * + *

    Note: check the class javadoc for for detailed example. + */ public RowsBuilder values(final Object... args) { int fieldCount = type.size(); for (int i = 0; i < args.length; i += fieldCount) { @@ -92,7 +118,7 @@ public List getRows() { return rows; } - public List getStrRows() { + public List getStringRows() { return beamSqlRows2Strings(rows); } } diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java index 4e3497f7dc82..505b742855ac 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java @@ -38,6 +38,30 @@ public class BeamJoinRelBoundedVsBoundedTest { public final TestPipeline pipeline = TestPipeline.create(); private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv(); + @BeforeClass + public static void prepare() { + beamSqlEnv.registerTable("ORDER_DETAILS", + MockedBeamSqlTable + .of(SqlTypeName.INTEGER, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.INTEGER, "price", + + 1, 2, 3, + 2, 3, 3, + 3, 4, 5)); + + beamSqlEnv.registerTable("ORDER_DETAILS0", + MockedBeamSqlTable + .of(SqlTypeName.INTEGER, "order_id0", + SqlTypeName.INTEGER, "site_id0", + SqlTypeName.INTEGER, "price0", + + 1, 2, 3, + 2, 3, 3, + 3, 4, 5)); + + } + @Test public void testInnerJoin() throws Exception { String sql = @@ -143,27 +167,29 @@ public void testFullOuterJoin() throws Exception { pipeline.run(); } - @BeforeClass - public static void prepare() { - beamSqlEnv.registerTable("ORDER_DETAILS", - MockedBeamSqlTable - .of(SqlTypeName.INTEGER, "order_id", - SqlTypeName.INTEGER, "site_id", - SqlTypeName.INTEGER, "price", - - 1, 2, 3, - 2, 3, 3, - 3, 4, 5)); + @Test(expected = UnsupportedOperationException.class) + public void testException_nonEqualJoin() throws Exception { + String sql = + "SELECT * 
" + + "FROM ORDER_DETAILS o1" + + " JOIN ORDER_DETAILS o2" + + " on " + + " o1.order_id>o2.site_id" + ; - beamSqlEnv.registerTable("ORDER_DETAILS0", - MockedBeamSqlTable - .of(SqlTypeName.INTEGER, "order_id0", - SqlTypeName.INTEGER, "site_id0", - SqlTypeName.INTEGER, "price0", + pipeline.enableAbandonedNodeEnforcement(false); + BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + pipeline.run(); + } - 1, 2, 3, - 2, 3, 3, - 3, 4, 5)); + @Test(expected = UnsupportedOperationException.class) + public void testException_crossJoin() throws Exception { + String sql = + "SELECT * " + + "FROM ORDER_DETAILS o1, ORDER_DETAILS o2"; + pipeline.enableAbandonedNodeEnforcement(false); + BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + pipeline.run(); } } diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java index 4aeec5a9a317..2ddb00b13e9f 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java @@ -89,7 +89,7 @@ public static void prepare() { } @Test - public void testInnerJoin() throws Exception { + public void testInnerJoin_unboundedTableOnTheLeftSide() throws Exception { String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM " + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " @@ -109,7 +109,33 @@ public void testInnerJoin() throws Exception { ).values( 1, 3, "james", 2, 5, "bond" - ).getStrRows() + ).getStringRows() + ); + pipeline.run(); + } + + @Test + public void testInnerJoin_boundedTableOnTheLeftSide() throws Exception { + String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM " + + " ORDER_DETAILS1 o2 " + + " JOIN " + + "(select order_id, sum(site_id) as sum_site_id FROM 
ORDER_DETAILS " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " on " + + " o1.order_id=o2.order_id" + ; + + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) + .containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.INTEGER, "order_id", + Types.INTEGER, "sum_site_id", + Types.VARCHAR, "buyer" + ).values( + 1, 3, "james", + 2, 5, "bond" + ).getStringRows() ); pipeline.run(); } @@ -137,7 +163,7 @@ public void testLeftOuterJoin() throws Exception { 1, 3, "james", 2, 5, "bond", 3, 3, null - ).getStrRows() + ).getStringRows() ); pipeline.run(); } @@ -178,7 +204,7 @@ public void testRightOuterJoin() throws Exception { 1, 3, "james", 2, 5, "bond", 3, 3, null - ).getStrRows() + ).getStringRows() ); pipeline.run(); } diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java index 55300afb0c46..18a5f608aae2 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java @@ -25,6 +25,7 @@ import org.apache.beam.dsls.sql.TestUtils; import org.apache.beam.dsls.sql.planner.MockedUnboundedTable; import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.dsls.sql.transform.BeamSqlOutputToConsoleFn; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.ParDo; @@ -58,12 +59,12 @@ public static void prepare() { .addRows( Duration.ZERO, 1, 1, 1, FIRST_DATE, - 1, 2, 2, FIRST_DATE + 1, 2, 6, FIRST_DATE ) .addRows( WINDOW_SIZE.plus(Duration.standardMinutes(1)), - 2, 2, 3, SECOND_DATE, - 2, 3, 3, SECOND_DATE, + 2, 2, 7, SECOND_DATE, + 2, 3, 8, SECOND_DATE, // this late record is omitted(First 
window) 1, 3, 3, FIRST_DATE ) @@ -97,7 +98,7 @@ public void testInnerJoin() throws Exception { Types.INTEGER, "sum_site_id0").values( 1, 3, 1, 3, 2, 5, 2, 5 - ).getStrRows() + ).getStringRows() ); pipeline.run(); } @@ -133,7 +134,7 @@ public void testLeftOuterJoin() throws Exception { 2, 2, null, null, 2, 2, 2, 5, 3, 3, null, null - ).getStrRows() + ).getStringRows() ); pipeline.run(); } @@ -163,7 +164,7 @@ public void testRightOuterJoin() throws Exception { null, null, 2, 2, 2, 5, 2, 2, null, null, 3, 3 - ).getStrRows() + ).getStringRows() ); pipeline.run(); } @@ -171,40 +172,36 @@ public void testRightOuterJoin() throws Exception { @Test public void testFullOuterJoin() throws Exception { String sql = "SELECT * FROM " - + "(select site_id as order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " - + " GROUP BY site_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " - + " LEFT OUTER JOIN " + + "(select price as order_id1, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY price, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " FULL OUTER JOIN " + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " - + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 " + + " GROUP BY order_id , TUMBLE(order_time, INTERVAL '1' HOUR)) o2 " + " on " - + " o1.order_id=o2.order_id" + + " o1.order_id1=o2.order_id" ; - // 1, 1 | 1, 3 - // 2, 2 | NULL, NULL - // ---- | ----- - // 2, 2 | 2, 5 - // 3, 3 | NULL, NULL - PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + rows.apply(ParDo.of(new BeamSqlOutputToConsoleFn("hello"))); PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) .containsInAnyOrder( TestUtils.RowsBuilder.of( - Types.INTEGER, "order_id", + Types.INTEGER, "order_id1", Types.INTEGER, "sum_site_id", - Types.INTEGER, "order_id0", + Types.INTEGER, "order_id", Types.INTEGER, "sum_site_id0" ).values( 1, 1, 1, 3, - 2, 2, null, null, - 2, 2, 2, 5, - 3, 3, null, null - ).getStrRows() + 6, 2, 
null, null, + 7, 2, null, null, + 8, 3, null, null, + null, null, 2, 5 + ).getStringRows() ); pipeline.run(); } - @Test(expected = UnsupportedOperationException.class) + @Test(expected = IllegalArgumentException.class) public void testWindowsMismatch() throws Exception { String sql = "SELECT * FROM " + "(select site_id as order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "