Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SEDONA-142] Add ST_Collect to Flink Catalog #662

Closed

Conversation

Elephantusparvus
Copy link
Contributor

@Elephantusparvus Elephantusparvus commented Aug 15, 2022

Did you read the Contributor Guide?

Is this PR related to a JIRA ticket?

I actually created to JIRA tickets by accident (SEDONA-141). One could/should be deleted. Sorry for that.

What changes were proposed in this PR?

This is the first attempt to add ST_Collect aggregation to the flink api.

How was this patch tested?

In progress
For myself I only tested aggregations with group by.
I guess it would be necessary to test window/over aggregations as well. Does anyone have specific data/queries for these in mind?

Did this PR include necessary documentation updates?

Currently not.

In advance this pull request is mainly done to discuss if this is a feasible approach to implement ST_Collect.
The PR would have to be cleaned up (split up), tests would need to be added and it would need to be documented.

I basically took the Internal Collect Aggregator from flink-table and adjusted it's types, letting it return a GeometryCollection.
The getTypeInference needs to be overriden otherwise reflection fails for the mapview / selecting the right serializer fails.
One Problem is/was that this function is not instantiated from a BuiltInFunctionDefinition/,
so it is not possible to implement it as BuiltInAggregateFunction and let it pass a SpecializedFunction.SpecializedContext to its constructor. And flinks collect function is constructed through its AggFunctionFactory which also passes the argType.
So I needed to pass the Datatype for the geometry manually when constructing and adding it to the catalog, therefore needing a util function in the registrator.

If needed I add separate prs to ST_MakeValid which is an almost 1-t port/copy from the spark version, and the ST_Intersection, ST_Difference which just wrap the jts locationtech functions.

Comment on lines 35 to 37
new Functions.ST_MakeValid(),
new Functions.ST_Intersection(),
new Functions.ST_Difference(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you just make a separate PR for these and add them to the new common module created in #647? Even though they're pretty simple it will help not to maintain both definitions in flink and spark

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, just saw that #647 got merged earlier this morning, so i have to merge it in my branch anyways. I'll clean it up and make a seperate PR for them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed a rebased version with #647 only containing the ST_Collect function.
Apparently #647 broke the build process for me, see: #647 (comment). I did NOT include the fix in my branch/this commit.

@jiayuasu
Copy link
Member

Thanks for your contribution. I have a much neat implementation of spatial aggregation in Flink using the UDAF api. It has been there for a while and I just forgot to create a PR.

I will create a new PR in one or two days. Then you can follow my implementation to tweak your ST_Collect a bit.

@Elephantusparvus
Copy link
Contributor Author

Elephantusparvus commented Aug 17, 2022

Thanks for your contribution. I have a much neat implementation of spatial aggregation in Flink using the UDAF api. It has been there for a while and I just forgot to create a PR.

I will create a new PR in one or two days. Then you can follow my implementation to tweak your ST_Collect a bit.

Sounds good. As stated earlier this really just is a copy/minimal port of flinks built in CollectAggFunction with a different output type (GeometryCollection instead of a MultiSet ) using the AggregateFunction Interface instead of BuiltInAggregateFunction.

@jiayuasu
Copy link
Member

Please see PR #672 for the updates

@Elephantusparvus
Copy link
Contributor Author

Hi, sorry for the late response.
I saw that this was marked as part of the milestone.
Unfortunately time is limited for me right now and I am not able to work on this for the next 1-2 weeks.
If there is a scheduled timeline for the release of 1.3, I guess this PR should not hold it back.
If in the meantime someone else wants to improve this PR feel free to do so, otherwise I hopefully will be able to do this roughly until the beginning of October.

/**
* This is needed because for Resolving the RAW Geometry DataType in ST_Collect (MapView in Accumulator)
* the serializer needs to be known. To get the registered Serializer the ExecutionEnvironment is needed and
* to add the UDF the TableEnvironment ist needed.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* to add the UDF the TableEnvironment ist needed.
* to add the UDF the TableEnvironment is needed.

@jiayuasu jiayuasu closed this Jul 24, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants