From 2ed6075ece20b553047e5ee3918460ea4e2c995f Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Tue, 20 Jun 2017 09:20:19 +0800 Subject: [PATCH] [BEAM-2477] BeamAggregationRel should use Combine.perKey instead of GroupByKey --- .../beam/dsls/sql/rel/BeamAggregationRel.java | 20 +++++++------------ 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java index bcdc44f32093..7e80fc474751 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java @@ -24,10 +24,8 @@ import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; import org.apache.beam.dsls.sql.transform.BeamAggregationTransforms; import org.apache.beam.dsls.sql.utils.CalciteUtils; -import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.transforms.WithTimestamps; @@ -92,25 +90,21 @@ public PCollection buildBeamPipeline(PCollectionTuple inputPCollecti .accumulatingFiredPanes()); BeamSqlRowCoder keyCoder = new BeamSqlRowCoder(exKeyFieldsSchema(input.getRowType())); - PCollection> exGroupByStream = windowStream.apply( - stageName + "_exGroupBy", + PCollection> exCombineByStream = windowStream.apply( + stageName + "_exCombineBy", WithKeys .of(new BeamAggregationTransforms.AggregationGroupByKeyFn( windowFieldIdx, groupSet))) .setCoder(KvCoder.of(keyCoder, upstream.getCoder())); - PCollection>> groupedStream = exGroupByStream - .apply(stageName + "_groupBy", GroupByKey.create()) - .setCoder(KvCoder.>of(keyCoder, - IterableCoder.of(upstream.getCoder()))); - BeamSqlRowCoder aggCoder = new BeamSqlRowCoder(exAggFieldsSchema()); - PCollection> aggregatedStream = groupedStream.apply( - stageName + "_aggregation", - Combine.groupedValues( + + PCollection> aggregatedStream = exCombineByStream.apply( + stageName + "_combineBy", + Combine.perKey( new BeamAggregationTransforms.AggregationCombineFn(getAggCallList(), CalciteUtils.toBeamRecordType(input.getRowType())))) - .setCoder(KvCoder.of(keyCoder, aggCoder)); + .setCoder(KvCoder.of(keyCoder, aggCoder)); PCollection mergedStream = aggregatedStream.apply(stageName + "_mergeRecord", ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(