Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-11800] Support ARRAY_AGG fn for Zetasql dialect #13483

Merged
merged 7 commits into from Feb 19, 2021

Conversation

sonam-vend
Copy link
Contributor

Please add a meaningful description for your change here


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

Post-Commit Tests Status (on master branch)

Lang SDK Dataflow Flink Samza Spark Twister2
Go Build Status --- Build Status --- Build Status ---
Java Build Status Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status Build Status
Build Status
Build Status
Build Status
Python Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
--- Build Status ---
XLang Build Status Build Status Build Status --- Build Status ---

Pre-Commit Tests Status (on master branch)

--- Java Python Go Website Whitespace Typescript
Non-portable Build Status Build Status
Build Status
Build Status
Build Status
Build Status Build Status Build Status Build Status
Portable --- Build Status --- --- --- ---

See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests

See CI.md for more information about GitHub Actions CI.

BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);

Schema schema = Schema.builder().addArrayField("array_field", FieldType.of(Schema.TypeName.ARRAY)).build();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think here we expect schema:

Schema schema = Schema.builder().addArrayField("array_field", FieldType.of(FieldType.INT64)).build();

meaning it is an "array of int64"

public static final SqlOperator ARR_AGG_ARR_FN =
createUdafOperator(
"array_agg",
x -> createTypeFactory().createSqlType(SqlTypeName.ARRAY),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here you simply created an array type without specifying the element type. I guess that's why the error message says the "inferred type is ARRAY NOT NULL" (just array type, your can ignore the "NOT NULL" suffix), where it should be "BIGINT NOT NULL ARRAY NOT NULL" (array of bigint).

To create an array type with element type specified, you may want to use createTypeFactory().createArrayType(), like here: https://github.com/robinyqiu/beam/blob/cbe87445d4259b6b485bc010231dda1895022d83/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java#L170

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But ARRAY_AGG should support all element types, not just INT64. https://github.com/google/zetasql/blob/master/docs/aggregate_functions.md#array_agg

Copy link
Contributor Author

@sonam-vend sonam-vend Dec 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ibzib exactly. I try to implement generic array like not sure if it is the correct way.

private static RelDataType relDataType =
new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT)
.builder()
.add("col_tinyint", SqlTypeName.TINYINT)
.add("col_smallint", SqlTypeName.SMALLINT)
.add("col_integer", SqlTypeName.INTEGER)
.add("col_bigint", SqlTypeName.BIGINT)
.add("col_float", SqlTypeName.FLOAT)
.add("col_double", SqlTypeName.DOUBLE)
.add("col_decimal", SqlTypeName.DECIMAL)
.add("col_string_varchar", SqlTypeName.VARCHAR)
.add("col_time", SqlTypeName.TIME)
.add("col_date", SqlTypeName.DATE)
.add("col_timestamp_with_local_time_zone", SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE)
.add("col_timestamp", SqlTypeName.TIMESTAMP)
.add("col_boolean", SqlTypeName.BOOLEAN)
.build();

public static final SqlOperator ARR_AGG_ARR_FN =
createUdafOperator(
"array_agg",
x -> createTypeFactory().createArrayType(relDataType, -1),
new UdafImpl<>(new ArrayAgg.ArrayAggArray()));

@robinyqiu

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, ARRAY_AGG should be generic. I said the type should be "BIGINT NOT NULL ARRAY NOT NULL" because that's from the error message.

Sonam, I believe there is some way that you can get the element type from the input x. You don't need to define your new mapping.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

x -> createTypeFactory().createArrayType(x.getOperandType(0), -1) worked.

Thanks @robinyqiu @ibzib

@sonam-vend sonam-vend changed the title Implemeted ARRAY_AGG fn for Zetasql dialect [BEAM-11800] Support ARRAY_AGG fn for Zetasql dialect Feb 11, 2021
}

@Override
public Object[] extractOutput(List<Object> accumulator) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Beam ARRAY type expect the data to be a Collection type, not Object[] type (i.e Beam ARRAY != Java array). That's why you get the error during cast. The fix should be simple: change the return type of this function to list (also the third class generic parameter).

@sonam-vend sonam-vend marked this pull request as ready for review February 19, 2021 09:28
@sonam-vend
Copy link
Contributor Author

R: @robinyqiu

Copy link
Contributor

@robinyqiu robinyqiu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@robinyqiu robinyqiu merged commit 31e1b84 into apache:master Feb 19, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants