[BEAM-12100][BEAM-10379][BEAM-9514][BEAM-12647][BEAM-12099] AssertionError type mismatch from AggregateScanConverter #15174
Conversation
Hi @ibzib, I am working on a group of related issues using some of your work in #14392. Could you take a look at the progress so far and provide some guidance on the next steps? I am planning to include BEAM-12098 and BEAM-12099 in this PR, but for those I have a problem: the transformation related to UNNEST does not return anything when it receives an empty array, so it fails before the aggregation transforms (COUNT, BIT_OR). I think the problem could be in this part: https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/unnest/BeamZetaSqlUncollectRel.java#L115 because when the array is empty, this transform does not output anything. Any guidance on this would be appreciated.
Hi Benjamin, thanks for working on this. If I understand correctly, I experimented with other queries and found that
```java
private PTransform<PCollection<Row>, PCollection<Row>> createGlobalCombiner() {
  org.apache.beam.sdk.schemas.transforms.Group.Global<Row> globally =
      org.apache.beam.sdk.schemas.transforms.Group.globally();
  org.apache.beam.sdk.schemas.transforms.Group.CombineFieldsGlobally<Row> combined = null;
  for (FieldAggregation fieldAggregation : fieldAggregations) {
    List<Integer> inputs = fieldAggregation.inputs;
    CombineFn combineFn = fieldAggregation.combineFn;
    if (inputs.size() > 1 || inputs.isEmpty()) {
      // In this path we extract a Row (an empty row if inputs.isEmpty).
      combined =
          (combined == null)
              ? globally.aggregateFieldsById(inputs, combineFn, fieldAggregation.outputField)
              : combined.aggregateFieldsById(inputs, combineFn, fieldAggregation.outputField);
    } else {
      // Combining over a single field, so extract just that field.
      combined =
          (combined == null)
              ? globally.aggregateField(inputs.get(0), combineFn, fieldAggregation.outputField)
              : combined.aggregateField(inputs.get(0), combineFn, fieldAggregation.outputField);
    }
  }

  PTransform<PCollection<Row>, PCollection<Row>> combiner = combined;
  if (combiner == null) {
    // If no field aggregations were specified, we run a constant combiner that always returns
    // a single empty row for each key. This is used by the SELECT DISTINCT query plan - in this
    // case a group by is generated to determine unique keys, and a constant null combiner is
    // used.
    combiner =
        globally.aggregateField(
            "*",
            AggregationCombineFnAdapter.createConstantCombineFn(),
            Field.of(
                "e",
                FieldType.row(AggregationCombineFnAdapter.EMPTY_SCHEMA).withNullable(true)));
    ignoreValues = true;
  }
  return combiner;
}
```
I created this method to process aggregations without GROUP BY.
Some tests related to aggregations on empty tables are passing now, but different tests like this (line 2216 in bdee3bd):

```java
public void testWithQueryEight() {
```
It looks like the incorrect null value is coming from here:

```java
return null;
```

You can fix it by overriding the identity() method in LongSum. Or maybe you could change it back to Sum.ofLongs; I'm not sure why it needed a separate definition here?
I created LongSum to handle ArithmeticExceptions from sum overflow and underflow, without modifying the original Sum.ofLongs.
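The overflow handling described above can be sketched as follows. This is a minimal illustration, not Beam's actual LongSum class: the point is that Math.addExact throws ArithmeticException on long overflow/underflow instead of silently wrapping around.

```java
// Hypothetical sketch of an overflow-aware long sum; names are illustrative.
public class OverflowAwareSum {
    // Math.addExact throws ArithmeticException on overflow/underflow
    // rather than wrapping around like the + operator does.
    public static long add(long accumulator, long input) {
        return Math.addExact(accumulator, input);
    }

    public static void main(String[] args) {
        System.out.println(add(2L, 3L)); // 5
        try {
            add(Long.MAX_VALUE, 1L);
        } catch (ArithmeticException e) {
            System.out.println("overflow detected");
        }
    }
}
```

In a CombineFn this check would live in addInput and mergeAccumulators, which is why a separate class was preferred over changing Sum.ofLongs for everyone.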
Codecov Report

```
@@           Coverage Diff           @@
##           master   #15174   +/- ##
=====================================
  Coverage   83.74%   83.74%
=====================================
  Files         443      443
  Lines       60077    60077
=====================================
+ Hits        50312    50313      +1
+ Misses       9765     9764      -1
```

Continue to review full report at Codecov.
Hi @ibzib, I overrode

I'm guessing the reason the error message mentions "elements of type class java.lang.Object" is because of the type signature of DropNullFn.

I used generics in line 3843 in 072a47e
@benWize the other option suggested in the error message was to use

Yes, I tried, but it causes a compilation error because of the return type of
I think what we really want is to check whether or not the PCollection is in the global window, which is orthogonal to whether or not it's bounded. (In the test data, the bounded input is globally windowed while the unbounded input is not, but that's not always true in general.) I'll try to do a full review of this PR tomorrow, but I might not get to it till next week.

I changed the condition to validate whether the PCollection is in the global window.
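The condition being discussed can be sketched like this. The WindowFn/GlobalWindows classes below are simplified stand-ins for Beam's real types, not the SDK API; in Beam itself the check corresponds roughly to `input.getWindowingStrategy().getWindowFn() instanceof GlobalWindows`.

```java
// Sketch of the decision only; these nested classes are placeholders
// for Beam's org.apache.beam.sdk.transforms.windowing types.
public class WindowCheckSketch {
    interface WindowFn {}
    static class GlobalWindows implements WindowFn {}
    static class FixedWindows implements WindowFn {}

    // Only a globally windowed input can receive Combine.globally's default
    // value; any other windowing requires .withoutDefaults().
    static boolean canUseDefaults(WindowFn windowFn) {
        return windowFn instanceof GlobalWindows;
    }

    public static void main(String[] args) {
        System.out.println(canUseDefaults(new GlobalWindows())); // true
        System.out.println(canUseDefaults(new FixedWindows()));  // false
    }
}
```

This captures why the check is orthogonal to boundedness: a bounded PCollection can carry non-global windows, and an unbounded one can be globally windowed.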
@benWize Sorry I haven't been able to review this promptly. I'm planning on reviewing and merging #14729 first, since it's critical we upgrade Calcite. Unfortunately there's a chance that could break this PR. Just a heads up. cc @kileys

OK Kyle, thanks for the heads up!
Thank you @benWize ! Comments inline.
```java
@@ -216,6 +216,8 @@ public RelWriter explainTerms(RelWriter pw) {
  private WindowFn<Row, IntervalWindow> windowFn;
  private int windowFieldIndex;
  private List<FieldAggregation> fieldAggregations;
  private int groupSetCount;
```
Should these be `final`?
Yes, fixed!
```java
for (FieldAggregation fieldAggregation : fieldAggregations) {
  List<Integer> inputs = fieldAggregation.inputs;
  CombineFn combineFn = fieldAggregation.combineFn;
  if (inputs.size() > 1 || inputs.isEmpty()) {
```
Nit: this logic is simpler if we put the single field case first.

```java
if (inputs.size() == 1) {
} else {
}
```
Changed.
```java
private PTransform<PCollection<Row>, PCollection<Row>> createGlobalCombiner() {
```
createGroupCombiner and createGlobalCombiner are mostly the same. Can we find a way to reduce code duplication?
I created a new interface in order to make the code reusable for both global and byFields combiners.
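The shape of that refactor can be sketched as follows. This is a toy model, not Beam's Group API: both builder classes implement one interface, so the loop over field aggregations can be written once and reused for the global and the per-key case. All names here are illustrative stand-ins.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the refactor: one AggregateCombiner-style interface,
// two implementations, one shared aggregation loop.
public class CombinerReuseSketch {
    interface AggregateCombiner<T extends AggregateCombiner<T>> {
        T aggregateField(int fieldIndex, String fnName);
    }

    static class GlobalBuilder implements AggregateCombiner<GlobalBuilder> {
        final List<String> plan = new ArrayList<>();
        public GlobalBuilder aggregateField(int fieldIndex, String fnName) {
            plan.add("global:" + fnName + "(" + fieldIndex + ")");
            return this;
        }
    }

    static class ByFieldsBuilder implements AggregateCombiner<ByFieldsBuilder> {
        final List<String> plan = new ArrayList<>();
        public ByFieldsBuilder aggregateField(int fieldIndex, String fnName) {
            plan.add("byFields:" + fnName + "(" + fieldIndex + ")");
            return this;
        }
    }

    // The shared loop: written once, works for either builder.
    static <T extends AggregateCombiner<T>> T addAggregations(T builder) {
        builder = builder.aggregateField(0, "SUM");
        builder = builder.aggregateField(1, "COUNT");
        return builder;
    }

    public static void main(String[] args) {
        System.out.println(addAggregations(new GlobalBuilder()).plan);
        System.out.println(addAggregations(new ByFieldsBuilder()).plan);
    }
}
```

The self-referential type parameter lets each aggregateField call return the concrete builder type, which mirrors how the real CombineFieldsGlobally/CombineFieldsByFields builders chain.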
```java
.put("SUM", BeamBuiltinAggregations::createSum)
.put("$SUM0", BeamBuiltinAggregations::createSum)
.put("AVG", BeamBuiltinAggregations::createAvg)
// Drop null elements for these aggregations BEAM-10379
```
It might be confusing to link to that jira here, since it's referring to functions that don't want to drop null elements.

```diff
-// Drop null elements for these aggregations BEAM-10379
+// Drop null elements for these aggregations.
```
Comment changed
```java
}

static class DropNullFn<InputT, AccumT, OutputT> extends CombineFn<InputT, AccumT, OutputT> {
  CombineFn<InputT, AccumT, OutputT> combineFn;
```
Can this be `private final`?
Yes, fixed
```java
  }
}

static class DropNullFn<InputT, AccumT, OutputT> extends CombineFn<InputT, AccumT, OutputT> {
```
Should this be private?
Yes, fixed
```diff
-public Long addInput(Long accum, T input) {
-  return accum | input.longValue();
+public Accum addInput(Accum accum, T input) {
+  accum.bitOr |= input.longValue();
```
What if `input` is null?
I added a validation similar to BitAnd.
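The null guard under discussion can be sketched like this. This is an illustrative stand-alone version, not the actual Beam CombineFn: a null input is skipped rather than dereferenced, and the accumulator stays null if every input was null.

```java
import java.util.Arrays;
import java.util.List;

// Sketch of null-safe BIT_OR accumulation; names are illustrative.
public class BitOrSketch {
    static Long bitOr(List<Long> values) {
        Long acc = null; // stays null if every input is null
        for (Long v : values) {
            if (v == null) {
                continue; // the guard discussed in the review
            }
            acc = (acc == null) ? v : (acc | v);
        }
        return acc;
    }

    public static void main(String[] args) {
        System.out.println(bitOr(Arrays.asList(1L, null, 4L))); // 5
        System.out.println(bitOr(Arrays.asList((Long) null)));  // null
    }
}
```

Without the guard, `input.longValue()` would throw a NullPointerException on the first null element.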
```java
@@ -124,6 +124,11 @@ private BigDecimal getCovariance(CovarianceAccumulator covariance) {
  BigDecimal adjustedCount =
      this.isSample ? covariance.count().subtract(BigDecimal.ONE) : covariance.count();

  // Avoid ArithmeticException: Division is undefined when adjustedCount and covariance are 0
```
Isn't division undefined whenever the denominator is 0, regardless of the numerator?
Yes, I changed the validation and comment
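The corrected guard can be sketched as follows. This is an illustrative stand-alone helper, not the actual CovarianceFn code: the check is on the denominator alone, since division is undefined whenever the denominator is zero, regardless of the numerator.

```java
import java.math.BigDecimal;
import java.math.MathContext;

// Sketch of the division guard discussed in the review; names are illustrative.
public class CovarianceGuardSketch {
    static BigDecimal safeDivide(BigDecimal numerator, BigDecimal denominator) {
        if (denominator.compareTo(BigDecimal.ZERO) == 0) {
            // Mirror SQL semantics: an undefined result becomes NULL
            // instead of throwing ArithmeticException.
            return null;
        }
        return numerator.divide(denominator, MathContext.DECIMAL64);
    }

    public static void main(String[] args) {
        System.out.println(safeDivide(BigDecimal.TEN, new BigDecimal(4))); // 2.5
        System.out.println(safeDivide(BigDecimal.ZERO, BigDecimal.ZERO)); // null
    }
}
```

Note that `compareTo` is used rather than `equals`, since BigDecimal.equals also compares scale (0 vs 0.00).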
There is one ZetaSQL compliance test failure. (ZetaSQL is mostly open source but its compliance tests are not yet, so we usually end up duplicating the tests in Beam where necessary.)
It seems we are not handling this case properly: Returns NULL if the input contains only NULLs. (from https://github.com/google/zetasql/blob/master/docs/aggregate_functions.md#sum)
Can you write a test for this case and fix it?
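The ZetaSQL rule in question can be sketched as follows. This is an illustrative stand-alone version, not the PR's CombineFn: nulls are ignored during accumulation, and the result is null only when no non-null input was ever seen.

```java
import java.util.Arrays;
import java.util.List;

// Sketch of SUM's null semantics per the ZetaSQL docs; names are illustrative.
public class SumNullSketch {
    static Long sum(List<Long> values) {
        Long acc = null; // null until the first non-null input arrives
        for (Long v : values) {
            if (v == null) {
                continue; // nulls are ignored by SUM
            }
            acc = (acc == null) ? v : Math.addExact(acc, v);
        }
        return acc; // null when the input contained only nulls
    }

    public static void main(String[] args) {
        System.out.println(sum(Arrays.asList(1L, null, 2L)));     // 3
        System.out.println(sum(Arrays.asList((Long) null, null))); // null
    }
}
```

The key detail is that the accumulator must distinguish "no non-null input yet" (null) from "sum so far is 0", which a plain long accumulator cannot do.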
```java
@@ -364,13 +369,29 @@
  }
}

public interface AggregateCombinerInterface<InputT> {
```
Nit: don't include Interface in the name of an interface.

```diff
-public interface AggregateCombinerInterface<InputT> {
+public interface AggregateCombiner<InputT> {
```
Changed
```java
@@ -364,13 +369,29 @@
  }
}

public interface AggregateCombinerInterface<InputT> {
  <CombineInputT, AccumT, CombineOutputT> AggregateCombinerInterface<InputT> aggregateField(
      int inputFielId, CombineFn<CombineInputT, AccumT, CombineOutputT> fn, Field outputField);
```
Typo:

```diff
-    int inputFielId, CombineFn<CombineInputT, AccumT, CombineOutputT> fn, Field outputField);
+    int inputFieldId, CombineFn<CombineInputT, AccumT, CombineOutputT> fn, Field outputField);
```
Fixed
```diff
-PTransform<PCollection<Row>, PCollection<Row>> combiner = combined;
 boolean ignoreValues = false;
+PTransform<PCollection<Row>, PCollection<Row>> combiner =
+    (PTransform<PCollection<Row>, PCollection<Row>>) combined;
```
These casts indicate that AggregateCombinerInterface should probably extend PTransform.
I tried to extend from PTransform, but I got several conflicts because of the different output types of the Global, CombineFieldsGlobally, and CombineFieldsByFields classes.
It looks like the problem is that CombineFieldsGlobally and CombineFieldsByFields extend PTransform<PCollection<InputT>, PCollection<Row>>, but Global extends PTransform<PCollection<InputT>, PCollection<Iterable<InputT>>>. However, it looks like you only use Global.globally().aggregateField(...), which returns a CombineFieldsGlobally transform anyway.

beam/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java, line 171 in 3a7b8e7:

```java
return new CombineFieldsGlobally<>(
```

So can you try using CombineFieldsGlobally as the initial combiner instead of Global? Then Global doesn't have to extend AggregateCombiner at all.
I did this, but I had to create a new method to make an instance of CombineFieldsGlobally with a null schemaAggregateFn, because that param is initialized here (line 284 in 22e2adb):

```java
? initialCombiner.aggregateField(
```
If we always turn the null schemaAggregateFn into SchemaAggregateFn.create(), why not initialize with SchemaAggregateFn.create() instead of null?
I made the change and it worked, thanks!
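The null-object style the reviewer suggests can be sketched like this. Everything below is an illustrative stand-in (EmptyableFn is made up, not Beam's SchemaAggregateFn): the field starts as an empty default instance, so callers never branch on null.

```java
import java.util.ArrayList;
import java.util.List;

// Toy illustration of initializing with an empty default instance
// (like SchemaAggregateFn.create()) instead of null.
public class DefaultInitSketch {
    static class EmptyableFn {
        final List<String> aggregations;
        EmptyableFn(List<String> aggregations) { this.aggregations = aggregations; }
        // Factory returning an empty-but-valid instance, analogous to create().
        static EmptyableFn create() { return new EmptyableFn(new ArrayList<>()); }
        // Immutable-style builder step returning a new instance.
        EmptyableFn with(String agg) {
            List<String> next = new ArrayList<>(aggregations);
            next.add(agg);
            return new EmptyableFn(next);
        }
    }

    // Starts as an empty instance, never null, so no null checks downstream.
    private EmptyableFn fn = EmptyableFn.create();

    public static void main(String[] args) {
        DefaultInitSketch sketch = new DefaultInitSketch();
        sketch.fn = sketch.fn.with("SUM");
        System.out.println(sketch.fn.aggregations);
    }
}
```

The design point: an empty default instance and a null reference are behaviorally equivalent at construction time, but only the former lets every call site drop its null branch.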
I added the test and fixed the null handling case, but I had to branch
```java
    return new BigDecimalSum0();
  default:
    throw new UnsupportedOperationException(
        String.format("[%s] is not supported in SUM", fieldType));
```
```diff
-        String.format("[%s] is not supported in SUM", fieldType));
+        String.format("[%s] is not supported in SUM0", fieldType));
```
Fixed
We just merged #14729, so you will need to rebase this PR.
Please rebase this PR onto the master branch (instead of merging it).
```java
private final SchemaAggregateFn.Inner schemaAggregateFn;

CombineFieldsGlobally(SchemaAggregateFn.Inner schemaAggregateFn) {
  this.schemaAggregateFn = schemaAggregateFn;
}

public static CombineFieldsGlobally initialCombiner() {
```
Add a javadoc comment for this method. Also consider renaming it to something more standard: factory methods like this are usually just called create (like SchemaAggregateFn.create()), so we can follow that pattern here.
Done
…create a DropNull wrapper for the aggregations that rely on dropping nulls
…undefined division
Run Java PreCommit

run sql postcommit
@benWize I tried running Google compliance tests on this PR again. Most tests pass, but this test crashes:
I copied the test and fixed the error in

Run SQL PreCommit

Run Java_Examples_Dataflow PreCommit
@benWize Thanks, that fixed the error. The tests are no longer crashing, but 6 failures remain:

I took a quick look at the errors, and it seems they are all related to null handling. (I'm guessing these are all test cases where the SQL query failed to compile before, but now they're compiling and giving wrong answers.)
@ibzib I copied most of the tests to check the fixes. I was able to fix null handling for line 259 in 8adc437
Hi @benWize, the operators for BIT_OR etc. are taken from Calcite's SqlStdOperatorTable (line 79 in be08391). To override their return type inference, we will need to replace the operators from SqlStdOperatorTable with custom operators defined in SqlOperators, and make sure nullability is set correctly. For example, BIT_XOR already has a custom operator, but it looks like the return type is non-null (line 164 in be08391).
Thanks, @ibzib. I fixed null handling for BitXor with your suggestion, and for the case of line 92 in dcc1812, but an assert fails with `java.lang.AssertionError: use createArrayType() instead`. I looked into the createArrayType() implementation, and it creates an ArraySqlType with nullable = false.
ArraySqlType has a public constructor, so can't we just call it directly with nullable = true? https://calcite.apache.org/javadocAggregate/org/apache/calcite/sql/type/ArraySqlType.html#%3Cinit%3E(org.apache.calcite.rel.type.RelDataType,boolean)

Yes, it is fixed now.
All compliance tests are passing now. Thanks for all your hard work!
LGTM
Thank you for all your guidance!
These are passing now thanks to PR apache#15174.
…Error type mismatch from AggregateScanConverter (apache#15174)

* [BEAM-12100] SUM throws error when overflow/underflow occurs
* [BEAM-10379] Remove filter of nulls in AggregateCombineFnAdapter and create a DropNull wrapper for the aggregations that rely on dropping nulls
* Fix checkstyle issues
* [BEAM-12647] Branch combiner in Aggregations
* [BEAM-12647] Create different paths for aggregations with and without GroupBy
* [BEAM-12647] Override identity in LongSums
* [BEAM-12647] Override identity in CombineFns, add condition to avoid undefined division
* [BEAM-12647] Fix Coder Cast Exceptions
* [BEAM-12647] Fix CountIf coder and result bug
* Fix spotless
* [BEAM-12099] Fix return value in Bit_OR for empty arrays
* [BEAM-12647] Add .withoutDefaults() to GloballyCombineFn for unbounded pcollections
* [BEAM-12647] Change condition in Group GloballyCombine to check if the input has a GlobalWindow
* Fix accesors and comments
* [BEAM-12647] Refactor aggregation combineFn to avoid code duplication
* Change interface name and fix typo
* Branch SUM and SUM0 for null management
* Change AggregateCombiner from interface to abstract class to extend PTransform
* Fix SUM0 message
* Change schemaFn initialize from null to SchemaAggregateFn.create()
* Add javadocs to AggregationCombiner
* Fix Coder error in bitAnd operator
* Fix null handling for bitOr and bitAnd ZetaSQL
* Fix null handling BitXor
* Fix null handling array_agg
* Fix @nullable checker error
Working on related issues:

[BEAM-9514] For aggregate return type, use Calcite type inference instead of ZetaSQL. Reference #14392
[BEAM-12100] LongSum uses Math.addExact, which handles overflow and underflow in sums.
[BEAM-10379] Remove the default filter of nulls in AggregationCombineFn and create a DropNull wrapper for the aggregations that rely on dropping nulls.
[BEAM-12647] Branch aggregation functions to use Combine.GroupedValues or Combine.Globally depending on whether GROUP BY is used.
[BEAM-12099] Fix the return value of the BitOr aggregation when the input is an empty array.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

* Choose reviewer(s) and mention them in a comment (R: @username).
* Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
* Update CHANGES.md with noteworthy changes.

See the Contributor Guide for more tips on how to make review process smoother.

Post-Commit Tests Status (on master branch): ValidatesRunner compliance status, examples testing status on various runners, Post-Commit SDK/Transform Integration Tests Status, and Pre-Commit Tests Status. See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

GitHub Actions Tests Status (on master branch): see CI.md for more information about GitHub Actions CI.