Conversation

Jiabao-Sun
Contributor

What is the purpose of the change

[FLINK-21949][table] Support ARRAY_AGG aggregate function

Some NoSQL databases, such as MongoDB and Elasticsearch, support nested data types.
Aggregating multiple rows into an ARRAY is a common requirement.

Brief change log

Introduce the built-in function ARRAY_AGG([ ALL | DISTINCT ] expression), which returns an array that concatenates the input rows
and returns NULL if there are no input rows. NULL values are ignored. Use DISTINCT for one unique instance of each value.

SELECT ARRAY_AGG(f1)
  FROM tmp
 GROUP BY f0

Note that we have made some simplifications based on Calcite's SqlLibraryOperators.ARRAY_AGG.

-- calcite
ARRAY_AGG([ ALL | DISTINCT ] value [ RESPECT NULLS | IGNORE NULLS ] [ ORDER BY orderItem [, orderItem ]* ] )
-- flink
ARRAY_AGG([ ALL | DISTINCT ] expression)

The differences from Calcite are as follows:

  1. Null values are ignored.
  2. The order by expression within the function is not supported because the complete row record cannot be accessed within the function implementation.
  3. The function returns NULL when there are no input rows, whereas the Calcite definition returns an empty array. This behavior follows BigQuery and Postgres.
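
For illustration, the semantics listed above can be modeled in plain Java. This is only a sketch of the behavior as described in this PR description, not Flink's implementation; the class and method names are hypothetical, and treating a group that contains only NULL values like an empty group is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;

/** Illustrative model of the proposed semantics; not Flink's ArrayAggFunction. */
final class ArrayAggSemanticsSketch {

    /** Skips NULL inputs, optionally deduplicates, and returns null if nothing was collected. */
    static <T> List<T> arrayAgg(List<T> rows, boolean distinct) {
        List<T> nonNull = new ArrayList<>();
        for (T row : rows) {
            if (row != null) { // difference 1: NULL values are ignored
                nonNull.add(row);
            }
        }
        if (nonNull.isEmpty()) {
            return null; // difference 3: NULL instead of an empty array (assumed for all-NULL input too)
        }
        // LinkedHashSet keeps the first occurrence of each value in encounter order.
        return distinct ? new ArrayList<>(new LinkedHashSet<>(nonNull)) : nonNull;
    }

    public static void main(String[] args) {
        System.out.println(arrayAgg(Arrays.asList(1, 2, null, 2), false)); // [1, 2, 2]
        System.out.println(arrayAgg(Arrays.asList(1, 2, null, 2), true));  // [1, 2]
        System.out.println(arrayAgg(new ArrayList<Integer>(), false));     // null
    }
}
```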

Verifying this change

ITCase and UnitCase are added.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes)
  • The serializers: (no )
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (docs)

@flinkbot
Collaborator

flinkbot commented Sep 13, 2023

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@Jiabao-Sun Jiabao-Sun force-pushed the array_agg_function branch 5 times, most recently from ffb85dd to 8cee2f8 Compare September 16, 2023 17:34
@Jiabao-Sun
Contributor Author

Hi @wuchong, could you help review this when you have time?

Contributor

@snuyanzin snuyanzin left a comment

thanks for the contribution and sorry for the delay

one thing I noticed: a Java HashMap is used during aggregation.
Unfortunately, Java comparison rules are not fully in sync with SQL comparison rules,
so we cannot use hash maps here for generic cases.

We need to use a custom expression evaluator; an example can be seen in the ArrayDistinct implementation.

@Jiabao-Sun
Contributor Author

Thanks @snuyanzin for the review.
Sorry, I was a little puzzled because I did not use HashMap in ArrayAggFunction.
Could you help pinpoint exactly where it is?

@Jiabao-Sun
Contributor Author

Do you mean using equalityEvaluator to compare when merging and retracting lists?

@snuyanzin
Contributor

yes, sorry was not clear enough

to compare when merging and retracting lists?

yes

@Jiabao-Sun Jiabao-Sun force-pushed the array_agg_function branch 2 times, most recently from 72323f4 to 7fbdcf3 Compare December 18, 2023 08:24
@Jiabao-Sun
Contributor Author

@flinkbot run azure

@Jiabao-Sun Jiabao-Sun force-pushed the array_agg_function branch 3 times, most recently from 7991087 to a860e51 Compare December 19, 2023 02:38
@Jiabao-Sun Jiabao-Sun requested a review from snuyanzin December 19, 2023 09:55
@Jiabao-Sun
Contributor Author

Hi @snuyanzin, could you help take a look again when you have time?
Thanks.

@dawidwys dawidwys self-assigned this Jan 11, 2024
@dawidwys
Contributor

#23411 (review)

It took me a while to understand that myself, but I think it's actually OK to depend on Object#equals/hashCode in AggregateFunctions. @snuyanzin, what you said is correct for ArrayDistinctFunction, because:

  1. It is a scalar function, so it can be chained with other functions without an exchange before it.
  2. It gets data from two inputs (the previous operator and, e.g., a literal): haystack and needle may come from different operators, which may produce data in different formats, e.g. GenericRowData and BinaryRowData.

In AggregateFunctions we will always have all records as BinaryRowData (and alike) and those equals/hashcode should work just fine. (It may be a different story when we support STRUCTURED_TYPE#equals/hashcode, but we will need to revisit most of the operators then, because all MapView(s) will work incorrectly). This is what we do in other aggregate functions already. Take a look at e.g. JsonArrayAggFunction or FirstValueWithRetractAggFunction

I haven't seen the previous version @Jiabao-Sun , but I believe we can remove the equalityHandler and make the ArrayAggFunction look similar to JsonArrayAggFunction.
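
The concern about mixed data formats can be illustrated with a small plain-Java sketch. `ObjectRow` and `BytesRow` below are hypothetical stand-ins for two row formats; they are not Flink's GenericRowData or BinaryRowData, and the byte encoding is made up. The sketch only shows why Object#equals/hashCode is adequate when every element shares one format and breaks down when formats are mixed.

```java
import java.util.Arrays;
import java.util.HashSet;

final class RowFormatEqualitySketch {

    /** Hypothetical object-array row format. */
    static final class ObjectRow {
        final Object[] fields;
        ObjectRow(Object... fields) { this.fields = fields; }
        @Override public boolean equals(Object o) {
            return o instanceof ObjectRow && Arrays.equals(fields, ((ObjectRow) o).fields);
        }
        @Override public int hashCode() { return Arrays.hashCode(fields); }
    }

    /** Hypothetical serialized row format. */
    static final class BytesRow {
        final byte[] bytes;
        BytesRow(byte... bytes) { this.bytes = bytes; }
        @Override public boolean equals(Object o) {
            return o instanceof BytesRow && Arrays.equals(bytes, ((BytesRow) o).bytes);
        }
        @Override public int hashCode() { return Arrays.hashCode(bytes); }
    }

    /** Counts distinct elements according to Object#equals/hashCode. */
    static int distinctCount(Object... rows) {
        return new HashSet<>(Arrays.asList(rows)).size();
    }

    public static void main(String[] args) {
        // Same format on both sides: the duplicates collapse.
        System.out.println(distinctCount(new BytesRow((byte) 1), new BytesRow((byte) 1))); // 1
        // Mixed formats for the same logical value: equals never matches across formats.
        System.out.println(distinctCount(new ObjectRow(1), new BytesRow((byte) 1)));       // 2
    }
}
```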

private final List<RelDataType> paramTypes;

private BridgingSqlAggFunction(
FlinkContext context,
Contributor

What is the reason for those changes? Is it only to be able to create the equalityHandler? If so, let's keep it simple and remove those.

Contributor Author

Yes, I'll revert those changes.
Thanks.

@snuyanzin
Contributor

thanks a lot for the explanation @dawidwys you're right

Arrays.asList(
Row.of("A", new Integer[] {1, 2}),
Row.of("B", new Integer[] {2, 2, 3}),
Row.of("C", new Integer[] {3}),
Contributor

I wonder why the expected value here is just

Row.of("C", new Integer[] {3}),

based on input I would expect

Row.of("C", new Integer[] {3, null}),

or did I miss anything?

Contributor

Also, a similar query in Postgres

with input(a, c) as (
select 'a', 1
union all 
select 'a', 2
union all 
select 'c', 3
union all 
select 'c', null
)
select a, array_agg(distinct c) from input group by a

gives

a|array_agg|
-+---------+
a|{1,2}    |
c|{3,NULL} |

Contributor Author

-- calcite
ARRAY_AGG([ ALL | DISTINCT ] value [ RESPECT NULLS | IGNORE NULLS ] [ ORDER BY orderItem [, orderItem ]* ] )
-- flink
ARRAY_AGG([ ALL | DISTINCT ] expression)

This function simplifies some aspects compared to Calcite.

  1. Currently, the Flink parser does not support parsing RESPECT NULLS | IGNORE NULLS, but it is still possible to make changes to support it.
  2. AggregateUtil#extractDistinctInformation (lines 848~858) ignores the ignoreNulls field.
    AggregateCall.create(
    call.getAggregation,
    false,
    false,
    false,
    call.getArgList,
    -1, // remove filterArg
    null,
    RelCollations.EMPTY,
    call.getType,
    call.getName)
  3. ListView does not support null values.
    * <p>Note: Elements of a {@link ListView} must not be null. For heap-based state backends, {@code
    * hashCode/equals} of the original (i.e. external) class are used. However, the serialization
    * format will use internal data structures.

For these reasons I made some simplifications. If we need it, I can try to support it.

Contributor

@snuyanzin snuyanzin Jan 16, 2024

It's OK to support it only partially; however, the main concern here is that
I would expect a more consistent result,
since there is also the input Row.ofKind(INSERT, "D", null) and the expected value Row.of("D", null).
So I would expect either both results to contain nulls or both not to contain nulls, but not a mix.

UPD: to be more clear:
after playing with Postgres and BigQuery I noticed that both respect nulls by default, and in BigQuery ignoring nulls has to be specified explicitly. Is there a reason why it is done differently for Flink? It's probably better to have similar behaviour.

I'm not saying that we need to support the RESPECT NULLS | IGNORE NULLS syntax; however, we need to make RESPECT NULLS the default behaviour to be on the same page with vendors.

Contributor Author

Thanks @snuyanzin, I'm trying to support RESPECT NULLS | IGNORE NULLS syntax.

Contributor Author

@snuyanzin
ListView<T> cannot hold null values, and because it is a generic list, there is no specific object that could stand in for null values. Is there a good solution for this?

Contributor

@dawidwys dawidwys Jan 16, 2024

I think it's a very good point. It would be nice to be compatible with SQL and other vendors.

One idea is we could keep the null indices and set those while retrieving the result. Happy to hear better solutions though.

Another is to wrap all values in a GenericRowData, but we'd need to iterate over the data to unwrap it.
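
As a rough sketch of the first idea (keeping the null indices and setting them while retrieving the result), assuming a plain in-memory accumulator rather than a ListView; the class and field names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical accumulator: non-null values in one list, positions of nulls in another. */
final class NullIndexAccumulatorSketch<T> {
    private final List<T> values = new ArrayList<>();            // never contains null
    private final List<Integer> nullIndices = new ArrayList<>(); // ascending result positions
    private int size = 0;                                         // values plus nulls seen so far

    void accumulate(T value) {
        if (value == null) {
            nullIndices.add(size); // remember where the null belongs in the result
        } else {
            values.add(value);
        }
        size++;
    }

    /** Re-inserts nulls at their recorded positions; returns null if nothing was accumulated. */
    List<T> getValue() {
        if (size == 0) {
            return null;
        }
        List<T> result = new ArrayList<>(size);
        int nextNull = 0;
        int nextValue = 0;
        for (int i = 0; i < size; i++) {
            if (nextNull < nullIndices.size() && nullIndices.get(nextNull) == i) {
                result.add(null);
                nextNull++;
            } else {
                result.add(values.get(nextValue++));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        NullIndexAccumulatorSketch<Integer> acc = new NullIndexAccumulatorSketch<>();
        acc.accumulate(3);
        acc.accumulate(null);
        System.out.println(acc.getValue()); // [3, null]
    }
}
```

Retraction is left out here; removing an element would also require shifting the recorded indices, which is part of what makes this option more involved.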

Contributor

yes, I was also thinking about storing null indexes

Contributor Author

@Jiabao-Sun Jiabao-Sun Jan 17, 2024

Hi @dawidwys, @snuyanzin,

I referenced LagAggFunction to use LinkedList to hold null values and it works. Now RESPECT NULLS and IGNORE NULLS are supported.

public static class LagAcc<T> {
public int offset = 1;
public T defaultValue = null;
public LinkedList<T> buffer = new LinkedList<>();

Contributor

The downside of the solution is that the array must fit into memory at all times. The difference with the LagAggFunction is that LAG keeps at most n elements where n is controlled by the user.

Still I am reasonably good with the LinkedList approach because it anyhow needs to fit into memory when we emit it at the end as a single record. Writing this down for awareness.
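
A minimal sketch of the LinkedList approach discussed here, assuming a simplified accumulator with an `ignoreNulls` flag; the names are hypothetical and this is not the merged ArrayAggFunction. It relies only on the fact that java.util.LinkedList permits null elements, and it keeps the whole buffer on the heap, which is the trade-off noted above.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

/** Hypothetical in-memory accumulator backed by a LinkedList. */
final class LinkedListArrayAggSketch<T> {
    private final boolean ignoreNulls;
    private final LinkedList<T> buffer = new LinkedList<>(); // may contain null elements

    LinkedListArrayAggSketch(boolean ignoreNulls) {
        this.ignoreNulls = ignoreNulls;
    }

    void accumulate(T value) {
        if (value == null && ignoreNulls) {
            return; // IGNORE NULLS: drop the value
        }
        buffer.add(value); // RESPECT NULLS (the default): nulls are kept in order
    }

    /** Returns null when nothing was collected, otherwise a copy of the buffer. */
    List<T> getValue() {
        return buffer.isEmpty() ? null : new ArrayList<>(buffer);
    }

    public static void main(String[] args) {
        LinkedListArrayAggSketch<Integer> respect = new LinkedListArrayAggSketch<>(false);
        respect.accumulate(3);
        respect.accumulate(null);
        System.out.println(respect.getValue()); // [3, null]

        LinkedListArrayAggSketch<Integer> ignore = new LinkedListArrayAggSketch<>(true);
        ignore.accumulate(3);
        ignore.accumulate(null);
        System.out.println(ignore.getValue()); // [3]
    }
}
```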

Arrays.asList(
Row.of("A", new Integer[] {1, 2}),
Row.of("B", new Integer[] {2, 3}),
Row.of("C", new Integer[] {3}),
Contributor

same here
based on input I would expect

Row.of("C", new Integer[] {3, null}),

@Jiabao-Sun
Contributor Author

Thanks @snuyanzin @dawidwys for the review.
Could you help review it again?

TestSpec.forFunction(BuiltInFunctionDefinitions.ARRAY_AGG)
.withDescription("ARRAY changelog stream aggregation")
.withSource(
ROW(STRING(), INT()),
Contributor

@snuyanzin snuyanzin Jan 16, 2024

One more thing
currently it checks only ability to work with INT input for ARRAY_AGG
it would be great to have tests for other types
especially ROW, ARRAY, MAP where expected output should be ARRAY<ROW>, ARRAY<ARRAY>, ARRAY<MAP>

Contributor Author

Full types test is in ArrayAggFunctionTest.

@snuyanzin
Contributor

Thanks for addressing comments
in general it looks ok from my side
I guess there is one little thing: since it is based on the Calcite parser, it allows ORDER BY inside...
At the same time, it is currently not supported at the Flink level. I'm not sure whether we can redefine this behavior; however, it would at least make sense to mention in the docs that it is not supported.

@Jiabao-Sun
Contributor Author

Thanks for addressing comments in general it looks ok from my side i guess there is one little thing: since it is based on Calcite parser it allows to have ORDER BY inside... At the same time it is currently not supported on Flink level, not sure whether we can redefine this behavior however at least it would make sense to mention it in doc that it is not supported

Yes, ORDER BY allows sorting of any field in the input rows, but currently it is difficult to obtain the complete input rows for sorting in the function implementation. Therefore, the ORDER BY clause is not supported yet.
I have added an explanation in the documentation.

@snuyanzin, please help take a look again when you have time.

@Jiabao-Sun
Contributor Author

@flinkbot run azure

description: |
By default or with keyword `ALL` and, return an array that concatenates the input rows
and returns `NULL` if there are no input rows. Use `DISTINCT` for one unique instance of each value.
By default null values are respected, use `IGNORE NULLS` to skip null values.
Contributor

@snuyanzin snuyanzin Jan 18, 2024

Suggested change
- By default null values are respected, use `IGNORE NULLS` to skip null values.
+ By default `NULL` values are respected, use `IGNORE NULLS` to skip `NULL` values.

Contributor

@snuyanzin snuyanzin left a comment

Looks ok from my side

@dawidwys do you have anything to add (asking since you're also participating here)

@Jiabao-Sun
Contributor Author

@flinkbot run azure

@Jiabao-Sun Jiabao-Sun requested a review from dawidwys January 19, 2024 06:12
@Jiabao-Sun
Contributor Author

Hi @dawidwys, please help review it again when you have time.
Thanks a lot.

@dawidwys
Contributor

dawidwys commented Feb 2, 2024

Sorry, it takes such a long time from my side. I had a vacation in the meantime. I'll try to check it Monday. Nevertheless if you're comfortable with the PR @snuyanzin feel free to merge it without waiting for my review.

@snuyanzin
Contributor

snuyanzin commented Feb 2, 2024

I think we could wait until Monday or even more since right now there is a feature freeze and need to wait for cutting release branch

Contributor

@dawidwys dawidwys left a comment

I put some comments to the implementation


/**
* Use the definitions in Flink instead of {@link SqlLibraryOperators#ARRAY_AGG}, because we
* ignore nulls and returns nullable ARRAY type. Order by clause like <code>
Contributor

is the comment still correct after the last changes?

Comment on lines 151 to 141
List<T> retractBuffer = new ArrayList<>();
for (T element : acc.retractList) {
retractBuffer.add(element);
}
for (T element : otherAcc.retractList) {
retractBuffer.add(element);
}
Contributor

Do we need the retractBuffer? Can't we just iterate over both retractLists and create only the final newRetractBuffer? It seems we create quite a few unnecessary objects here and potentially trigger list resizing.


// update to acc
acc.list.clear();
acc.list.addAll(buffer);
Contributor

can't we populate acc.list in a single go? The current approach does make sense with a ListView but it does not with a LinkedList kept in memory.

I believe we don't need the intermediate buffer and newRetractBuffer
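
One way to read this suggestion, sketched in plain Java: merge directly into the target accumulator and cancel pending retractions in place, without intermediate copies. The accumulator shape (`list` plus `retractList`) follows the snippets quoted in this review; everything else, including the class names, is an assumption and not the merged implementation.

```java
import java.util.Iterator;
import java.util.LinkedList;

final class ArrayAggMergeSketch {

    /** Hypothetical accumulator: collected values plus retractions not yet matched. */
    static final class Acc<T> {
        final LinkedList<T> list = new LinkedList<>();
        final LinkedList<T> retractList = new LinkedList<>();
    }

    /** Merges other into acc in place, cancelling pending retractions against values. */
    static <T> void merge(Acc<T> acc, Acc<T> other) {
        acc.list.addAll(other.list);
        acc.retractList.addAll(other.retractList);
        // Walk the pending retractions once; drop each one that cancels a collected value.
        Iterator<T> pending = acc.retractList.iterator();
        while (pending.hasNext()) {
            // LinkedList#remove(Object) removes the first equal element and also matches null.
            if (acc.list.remove(pending.next())) {
                pending.remove();
            }
        }
    }

    public static void main(String[] args) {
        Acc<Integer> a = new Acc<>();
        a.list.add(1);
        a.list.add(2);
        a.retractList.add(3);
        Acc<Integer> b = new Acc<>();
        b.list.add(3);
        b.list.add(4);
        b.retractList.add(1);
        b.retractList.add(5);
        merge(a, b);
        System.out.println(a.list + " " + a.retractList); // [2, 4] [5]
    }
}
```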

}

public void retract(ArrayAggAccumulator<T> acc, T value) throws Exception {
if (value != null) {
Contributor

what about retracting nulls?

Contributor

would be super nice to have a test case for that
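
A sketch of what retracting nulls could look like, assuming RESPECT NULLS mode and the same hypothetical `list`/`retractList` accumulator shape; this is not the merged code. Because LinkedList#remove(Object) matches null elements, no `value != null` guard is needed in this mode.

```java
import java.util.LinkedList;

/** Hypothetical accumulator showing retraction that treats null like any other element. */
final class RetractNullSketch<T> {
    final LinkedList<T> list = new LinkedList<>();
    final LinkedList<T> retractList = new LinkedList<>();

    void accumulate(T value) {
        list.add(value);
    }

    /** Removes one occurrence of value (null included), or records the retraction for later. */
    void retract(T value) {
        if (!list.remove(value)) {
            retractList.add(value); // not accumulated here yet; keep it for a later merge
        }
    }

    public static void main(String[] args) {
        RetractNullSketch<Integer> acc = new RetractNullSketch<>();
        acc.accumulate(3);
        acc.accumulate(null);
        acc.retract(null);
        System.out.println(acc.list); // [3]
    }
}
```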

@dawidwys
Contributor

dawidwys commented Feb 6, 2024

Thanks for the update @Jiabao-Sun The implementation looks good now. I want to go through the tests again, but I need a bit more time. I hope this is fine, cause anyway we need to wait for a branch cut.

Contributor

@dawidwys dawidwys left a comment

LGTM, thank you for the contribution @Jiabao-Sun !

@dawidwys dawidwys merged commit 042a4d2 into apache:master Feb 7, 2024
pnowojski pushed a commit to pnowojski/flink that referenced this pull request Mar 8, 2024