-
Notifications
You must be signed in to change notification settings - Fork 470
[Connector] Pojo To InternalRow(Fluss) Utility #726
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
wuchong
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the contribution @MehulBatra , I left some comments about the implementation, please let me know if you have any questions.
...k/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/PojoToRowDataConverter.java
Outdated
Show resolved
Hide resolved
...k/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/PojoToRowDataConverter.java
Outdated
Show resolved
Hide resolved
Thanks for the feedback. I will address these comments over the coming weekend and get back to you in case I am stuck! |
|
@wuchong Jark made changes as per the comments, please have a look and let me know if I underdid or overdid anything, I will accommodate that. |
2c520b6 to
c7aaf2b
Compare
wuchong
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The pull request looks good to me overall, but I have one key concern: it currently implements a conversion from POJO to Flink's RowData. In my opinion, what we actually need is a utility for converting POJOs to Fluss' InternalRow. This utility would be essential for writing a DataStream<POJO> to the Fluss Sink, as all data types must eventually be converted into Fluss' InternalRow—this is the data type expected by AppendWriter and UpsertWriter.
On the other hand, the conversion from POJO to Flink's RowData is already implemented by Flink itself. This is used in scenarios where a DataStream<T> is converted into a Flink Table (e.g., via org.apache.flink.table.api.bridge.java.StreamTableEnvironment#fromDataStream(org.apache.flink.streaming.api.datastream.DataStream<T>)).
To address this, I have updated the pull request to focus on converting POJOs to Fluss' InternalRow. Additionally, I have added a pre-check for the field types of the POJO to ensure compatibility and prevent potential issues.
|
Merging... |
Thanks @wuchong for identifying and fixing this critical issue! I initially misunderstood the relationship between Flink's RowData and Fluss' InternalRow. |
Purpose
Linked issue: #723 , #569
Brief change log:
Tests:
API and Format
No API or storage format changes.
Documentation
No documentation changes are required.