-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-2287] UDAF support #3447
[BEAM-2287] UDAF support #3447
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One question: Can we do not expose BeamSqlRow
to UDF writer, it might look strange to them.
* Build-in aggregation for SUM. | ||
*/ | ||
public static class Sum<T> extends BeamSqlUdaf<T, T> { | ||
private static List<Integer> supportedType = Arrays.asList(Types.INTEGER, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about Decimal
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Decimal is supported, will update.
} | ||
|
||
/** | ||
* Build-in aggregation for MIN. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Built-in
Other suggestion? |
No need to write customized |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
1. support DECIMAL in built-in aggregators; 2. add JavaDoc for BeamSqlUdaf;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! This will be nice to have. Some initial comments.
* /Float/Date/BigDecimal, mapping as SQL type INTEGER/BIGINT/SMALLINT/TINYINE/DOUBLE/FLOAT | ||
* /TIMESTAMP/DECIMAL;<br> | ||
* 3. wrap intermediate data in a {@link BeamSqlRow}, and do not rely on elements in class;<br> | ||
* 4. The intermediate value of UDAF function is stored in a {@code BeamSqlRow} object.<br> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is constraint #4 necessary? Are SQL operations actually applied somewhere to the intermediate result? It looks like BeamSqlRow is just being used for the convenience of its built-in casting methods, which seems unnecessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, BeamSqlRow
is introduced here to hold variable data types, the intermediate result is only used by CombineFn. I'm thinking of a generic type here as well, and ask developer to provide a coder if customized object.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like that. It sounds cleaner.
|
||
//The test case is disabled temporally as BeamSql doesn't have methods to regester UDF/UDAF, | ||
//pending on task BEAM-2520 | ||
// BeamSqlEnv.registerUdaf("squaresum", SquareSum.class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Uncomment and remove note?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe let's remove this test case, and move to BEAM-2520
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fine by me
*/ | ||
public static class SquareSum extends BeamSqlUdaf<Integer, Integer> { | ||
private int outputFieldType; | ||
private BeamSqlRecordType accType; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
final on both?
* Built-in aggregation for COUNT. | ||
*/ | ||
public static class Count<T> extends BeamSqlUdaf<T, Long> { | ||
private BeamSqlRecordType accType; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
final?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Y
throw new IllegalStateException(e); | ||
} | ||
} else { | ||
throw new UnsupportedOperationException(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Provide more details here? This will be a pretty opaque error when it gets hit otherwise.
case "SUM": | ||
//for both AVG/SUM, a summary value is hold at first. | ||
switch (ex.getOutputType()) { | ||
switch (call.type.getSqlTypeName()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you please pull each of these switch statements on getSqlTypeName into a factory method in the corresponding aggregation (e.g., BeamBuiltinAggregations.Max.create(SqlTypeName typeName))? That will help manage some of the size of this method, and also put the per-type construction logic somewhere reusable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
return deltaAcc; | ||
} | ||
@Override | ||
public List<BeamSqlRow> mergeAccumulators(Iterable<List<BeamSqlRow>> accumulators) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add tests to exercising merging. It looks like none of the merge methods are getting tested; many of them loop forever.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems merge is not covered in unit test with TestPipeline
, any suggestion?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it just a matter of exercising a UDAF query with session windowing?
List<BeamSqlRow> deltaAcc = new ArrayList<>(); | ||
for (int idx = 0; idx < aggregators.size(); ++idx) { | ||
List<BeamSqlRow> accs = new ArrayList<>(); | ||
while (accumulators.iterator().hasNext()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use a for loop.
Also note that Iterable.iterator() always returns a new iterator, so you need to store it in a variable before calling hasNext() and next() unless you're only trying to get the very first element. There are a bunch of instances of this across this PR that should be fixed.
/** | ||
* GROUP-BY with UDAF. | ||
*/ | ||
@Ignore |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rm?
@takidau I change the definition of @xumingming do you want to have a peak again as it's a little big change? |
retest this please |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Much nicer, thank you! A few more minor comments, then I think this is probably ready to go.
public Max(int outputFieldType) { | ||
this.accType = BeamSqlRecordType.create(Arrays.asList("__max"), | ||
Arrays.asList(outputFieldType)); | ||
private SqlTypeName fieldType; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
final
public Min(int outputFieldType) { | ||
this.accType = BeamSqlRecordType.create(Arrays.asList("__min"), | ||
Arrays.asList(outputFieldType)); | ||
private SqlTypeName fieldType; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
final
} | ||
|
||
@Override | ||
public T result(BeamSqlRow accumulator) { | ||
return (T) accumulator.getFieldValue(0); | ||
public Coder<T> getAccumulatorCoder(CoderRegistry registry) throws CannotProvideCoderException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull this into a shared getNumericSqlTypeCoder method that MAX and MIN can both use?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you give more hints, do you mean a helper function for subclass? Don't think it can be called in BeamSqlUdaf.getAccumulatorCoder()
as no guarantee that AccuT
is one of Beam SQL type.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was just thinking a static method in BeamBuiltinAggregations. It could take a registry parameter and a String describing the command name for the UnsupportedOperationException. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree, that can avoid duplicate code in MAX and MIN. Meanwhile, I could add a default coder for Short/Float/Date/BigDecimal in BeamSqlUdaf.getAccumulatorCoder
, so that developers donot need to care about coder if is uses Beam SQL field types.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Helper method looks good, thanks! Please see my other concern about the auto-registration stuff, though.
registry.registerCoderForClass(Short.class, SerializableCoder.of(Short.class)); | ||
registry.registerCoderForClass(Float.class, SerializableCoder.of(Float.class)); | ||
registry.registerCoderForClass(BigDecimal.class, BigDecimalCoder.of()); | ||
registry.registerCoderForClass(Date.class, SerializableCoder.of(Date.class)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding coders to the registry automatically in a get method feels wrong. The user won't be expecting that, and if for some reason they've registered their own custom coders for those types, they won't be able to use SQL accumulators with them. I'd rather see us look into adding these to the common registry somehow as a separate change, possibly as part of creating a BeamSqlEnv or something similar?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sounds reasonable, let me remove these lines and leave a a further task.
} | ||
|
||
@Override | ||
public T result(BeamSqlRow accumulator) { | ||
return (T) accumulator.getFieldValue(0); | ||
public Coder<T> getAccumulatorCoder(CoderRegistry registry) throws CannotProvideCoderException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Helper method looks good, thanks! Please see my other concern about the auto-registration stuff, though.
Am I right that we still have no test coverage for the merge methods in the accumulators? If so, what's the plan for getting test coverage there? Other than that, this PR LGTM. |
I guess it's only called with a distributed environment, and TestPipeline is not. I could add some tests directly for the built-in aggregators.
…Sent from my iPhone
On Jun 30, 2017, at 2:01 PM, Tyler Akidau ***@***.***> wrote:
Am I right that we still have no test coverage for the merge methods in the accumulators? If so, what's the plan for getting test coverage there?
Other than that, this PR LGTM.
—
You are receiving this because you authored the thread.
Reply to this email directly, view it on GitHub, or mute the thread.
|
That's okay, let's call this good enough for now. LGTM, I'll go ahead and merge. However, can you please:
Thank you! |
Merged. |
Thanks @takidau @xumingming ! |
R: @xumingming @takidau
add an abstract class
BeamSqlUdaf
following the UDAF definition in Calcite, also COUNT/SUM/AVG/MAX/MIN/ are rewritten with this new format.Note that the unit test is ignored after rebase BEAM-2446. Will re-open it in BEAM-2520.