[HUDI-9520] Support new sink based on Flink sink V2 API #13423
@@ -40,7 +42,7 @@
  * at a time, a new task can not be scheduled until the last task finished(fails or normally succeed).
  * The cleaning task never expects to throw but only log.
  */
-public class CleanFunction<T> extends AbstractRichFunction
+public class CleanFunction<T> extends ProcessFunction<T, RowData>
Can we just copy the whole class instead? CleanFunction -> CleanFunctionV2.
ok
@@ -123,6 +127,14 @@ public void invoke(ClusteringCommitEvent event, Context context) throws Exceptio
     commitIfNecessary(instant, commitBuffer.get(instant).values());
   }

+  @Override
+  public void processElement(
It's weird that we have both #invoke and #processElement.
ok, I'll make a copy then.
/**
 * Get parallelism for Sink V2 writer to make sure the dummy writer operator can be chained with upstream operators.
 */
private static int getParallelismForSinkV2(Configuration conf) {
Is there an elegant way to infer the sink parallelism from the upstream operators?
getParallelismForSinkV2 gets the parallelism for HoodieSink#SinkWriter. The value is the parallelism of the final operator of the write pipeline, which is built in HoodieSink#addPreWriteTopology.
The write pipeline cannot be pre-built outside HoodieSink, so it seems we cannot fetch the parallelism from the operator/transformation directly.
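Since the write pipeline only exists once HoodieSink builds it, the writer parallelism has to come from configuration rather than from an upstream transformation. The following is a minimal sketch of that idea, not the PR's actual logic: a plain Map stands in for Flink's Configuration, the class and method names are hypothetical, and both the option keys ("write.tasks", "parallelism.default") and the fallback order are assumptions made for illustration.

```java
import java.util.Map;

/** Hypothetical sketch; the real getParallelismForSinkV2 may resolve the value differently. */
final class SinkV2ParallelismSketch {
  // Assumed option keys, used here only for illustration.
  static final String WRITE_TASKS = "write.tasks";
  static final String DEFAULT_PARALLELISM = "parallelism.default";

  /**
   * Resolves the writer parallelism from configuration alone:
   * an explicit write-task setting wins, otherwise fall back to the
   * job-wide default, otherwise 1.
   */
  static int resolveWriterParallelism(Map<String, String> conf) {
    String writeTasks = conf.get(WRITE_TASKS);
    if (writeTasks != null) {
      return Integer.parseInt(writeTasks);
    }
    return Integer.parseInt(conf.getOrDefault(DEFAULT_PARALLELISM, "1"));
  }

  public static void main(String[] args) {
    // Explicit write-task setting takes precedence over the default.
    Map<String, String> conf = Map.of(WRITE_TASKS, "4", DEFAULT_PARALLELISM, "2");
    System.out.println(resolveWriterParallelism(conf));
  }
}
```

If the dummy writer is given the same parallelism as the last operator of the write pipeline, the two can be chained, which is the stated purpose of the method's Javadoc.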
private static final String SINK_V2_NAME = "sink_v2";

public static DataStreamSink<RowData> sink(
add doc for it.
.name(SINK_V2_NAME);
}

public static DataStream<RowData> composePipeline(
add doc for it.
/**
 * Utilities to generate pipelines for hudi sink V2.
 */
public class PipelinesV2 {
Either we move all the classes into the v2 package, or we put each class into its original package:
.../v2/CleanFunctionV2
.../v2/utils/PipelinesV2
.../v2/compact/CompactionCommitSinkV2
.../v2/clustering/ClusteringCommitSinkV2
+1
Change Logs
Impact
Provide a new sink based on the Flink Sink V2 API.
Risk level (write none, low, medium or high below)
medium
Documentation Update
Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".
ticket number here and follow the instruction to make
changes to the website.