From 5ca897db78383db6e8772daf211d6ffba87ff3e9 Mon Sep 17 00:00:00 2001
From: Gleb Kanterov
Date: Wed, 13 Nov 2019 19:46:10 +0100
Subject: [PATCH] [BEAM-8042] Add test case to reproduce the issue

DO NOT MERGE
---
 .../sql/impl/planner/BeamRuleSets.java        |  3 +-
 .../sql/zetasql/ZetaSQLAggregationTest.java   | 99 +++++++++++++++++++
 2 files changed, 101 insertions(+), 1 deletion(-)
 create mode 100644 sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLAggregationTest.java

diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
index f30f9f344052..c65e9894b28b 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
@@ -96,7 +96,8 @@ public class BeamRuleSets {
           ProjectSetOpTransposeRule.INSTANCE,
 
           // aggregation and projection rules
-          AggregateProjectMergeRule.INSTANCE,
+          // try to disable AggregateProjectMergeRule
+          // AggregateProjectMergeRule.INSTANCE,
           // push a projection past a filter or vice versa
           ProjectFilterTransposeRule.INSTANCE,
           FilterProjectTransposeRule.INSTANCE,
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLAggregationTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLAggregationTest.java
new file mode 100644
index 000000000000..76f4c3c50a17
--- /dev/null
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLAggregationTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.zetasql;
+
+import org.apache.beam.sdk.extensions.sql.SqlTransform;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Reproduces BEAM-8042: grouped aggregation queries planned by {@link ZetaSQLQueryPlanner}.
+ *
+ * <p>Both tests only build and run the pipeline over an empty input; they make no assertions on
+ * the output.
+ */
+public class ZetaSQLAggregationTest {
+
+  /** Schema shared by both tests: a byte-array {@code id} field and six INT64 fields. */
+  private static final Schema INPUT_SCHEMA =
+      Schema.builder()
+          .addByteArrayField("id")
+          .addInt64Field("has_f1")
+          .addInt64Field("has_f2")
+          .addInt64Field("has_f3")
+          .addInt64Field("has_f4")
+          .addInt64Field("has_f5")
+          .addInt64Field("has_f6")
+          .build();
+
+  @Rule public TestPipeline pipeline = TestPipeline.fromOptions(createPipelineOptions());
+
+  /** Builds pipeline options whose planner name is set to {@link ZetaSQLQueryPlanner}. */
+  private static PipelineOptions createPipelineOptions() {
+    BeamSqlPipelineOptions opts = PipelineOptionsFactory.create().as(BeamSqlPipelineOptions.class);
+    opts.setPlannerName(ZetaSQLQueryPlanner.class.getName());
+    return opts;
+  }
+
+  /** Groups by {@code id} with {@code COUNT(*)} followed by six {@code SUM} aggregates. */
+  @Test
+  public void testAggregate() {
+    String sql =
+        "SELECT \n"
+            + " id, \n"
+            + " COUNT(*) as count, \n"
+            + " SUM(has_f1) as f1_count, \n"
+            + " SUM(has_f2) as f2_count, \n"
+            + " SUM(has_f3) as f3_count, \n"
+            + " SUM(has_f4) as f4_count, \n"
+            + " SUM(has_f5) as f5_count, \n"
+            + " SUM(has_f6) as f6_count \n"
+            + "FROM PCOLLECTION \n"
+            + "GROUP BY id";
+
+    pipeline.apply(Create.empty(INPUT_SCHEMA)).apply(SqlTransform.query(sql));
+
+    pipeline.run();
+  }
+
+  /** Same query as {@link #testAggregate()} but without the {@code COUNT(*)} column. */
+  @Test
+  public void testAggregate2() {
+    String sql =
+        "SELECT \n"
+            + " id, \n"
+            + " SUM(has_f1) as f1_count, \n"
+            + " SUM(has_f2) as f2_count, \n"
+            + " SUM(has_f3) as f3_count, \n"
+            + " SUM(has_f4) as f4_count, \n"
+            + " SUM(has_f5) as f5_count, \n"
+            + " SUM(has_f6) as f6_count \n"
+            + "FROM PCOLLECTION \n"
+            + "GROUP BY id";
+
+    pipeline.apply(Create.empty(INPUT_SCHEMA)).apply(SqlTransform.query(sql));
+
+    pipeline.run();
+  }
+}