-
Notifications
You must be signed in to change notification settings - Fork 470
[Datastream] Introduce FlussSource #710
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
@wuchong RowConverters are used for testing now and can be improved and exposed as helper functions to users in another PR. |
wuchong
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the contribution @polyzos . I left some comments. Please also rebase your branch to the latest main branch, but NOT squash commits.
...link/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlussSourceBuilder.java
Outdated
Show resolved
Hide resolved
...link/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlussSourceBuilder.java
Outdated
Show resolved
Hide resolved
| return this; | ||
| } | ||
|
|
||
| public FlussSourceBuilder<IN> setProjectedFields(int[] projectedFields) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The indexes are not friendly to use for end-users. Let's support projection by field names setProjectedFields(String... fieldNames).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@wuchong currently changing to fieldNames might break compatibility with the FlinkTableSource as it uses the same FlinkSource constructor, but it uses the indexes.. maybe leave it as is and introduce a separate ticket/PR to address this properly so its easier to track later?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean we only expose fieldNames projection in the FlussSourceBuilder, but still use field index projection in the underlying FlinkSource. We can do the mapping from fieldNames to field index in the build() method, as we have got the schema in it.
We can implement it in a follow-up PR.
...link/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlussSourceBuilder.java
Outdated
Show resolved
Hide resolved
...link/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlussSourceBuilder.java
Outdated
Show resolved
Hide resolved
...nk/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/FlussSourceLogITCase.java
Outdated
Show resolved
Hide resolved
...nk/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/FlussSourceLogITCase.java
Outdated
Show resolved
Hide resolved
...nk/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/FlussSourceLogITCase.java
Outdated
Show resolved
Hide resolved
...nk/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/FlussSourceLogITCase.java
Outdated
Show resolved
Hide resolved
...ink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/FlussSourcePKITCase.java
Outdated
Show resolved
Hide resolved
...fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/testutils/MockDataUtils.java
Outdated
Show resolved
Hide resolved
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/row/RowConverters.java
Outdated
Show resolved
Hide resolved
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/row/RowConverters.java
Outdated
Show resolved
Hide resolved
09ff0e0 to
a947de9
Compare
a947de9 to
d62abee
Compare
|
@wuchong thank you for your time and detailed feedback. PTAL as all comments should have been addressed.. let me know if I missed anything |
wuchong
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I only left some minor comments and pushed a commit to fix it.
Will merge it once CI is passed.
| offsetsInitializer, | ||
| scanPartitionDiscoveryIntervalMs, | ||
| streaming); | ||
| true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why hard-cord this to true?
| } | ||
|
|
||
| @VisibleForTesting | ||
| public OffsetsInitializer getOffsetsInitializer() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can be package visible
| } | ||
|
|
||
| @VisibleForTesting | ||
| public long getScanPartitionDiscoveryIntervalMs() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
never used, can remove
| return this; | ||
| } | ||
|
|
||
| public FlussSourceBuilder<OUT> setIsStreaming(boolean isStreaming) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's remove it as we don't support it yet.
| public class FlussSource<OUT> extends FlinkSource<OUT> { | ||
| private static final long serialVersionUID = 1L; | ||
|
|
||
| public FlussSource( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change to package visibility to avoid users direct use this constructor. As this constructor is evolving frequently.
| return this; | ||
| } | ||
|
|
||
| public FlussSourceBuilder<IN> setProjectedFields(int[] projectedFields) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean we only expose fieldNames projection in the FlussSourceBuilder, but still use field index projection in the underlying FlinkSource. We can do the mapping from fieldNames to field index in the build() method, as we have got the schema in it.
We can implement it in a follow-up PR.
This PR introduces the FlussSource for the Datastream API