Skip to content

[HUDI-9520] Support new sink based on Flink sink V2 API #13423

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 17, 2025

Conversation

cshuo
Copy link
Contributor

@cshuo cshuo commented Jun 12, 2025

Change Logs

  • Sink V2 is supported by putting the write pipeline into the PreWriteTopology of sink v2.
  • Sink V2 is only supported for hudi-flink 1.19+.
  • Sink V2 is added for datastream pipeline only, which is used to facilitate the expansion of the ​Hudi ecosystem, e.g., Flink CDC 3.0 pipeline connector.

Impact

Provide new new sink with Flink Sink V2 API

Risk level (write none, low medium or high below)

medium

Documentation Update

Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".

  • The config description must be updated if new configs are added or the default value of the configs are changed
  • Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the
    ticket number here and follow the instruction to make
    changes to the website.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@cshuo cshuo marked this pull request as draft June 12, 2025 11:41
@github-actions github-actions bot added the size:L PR with lines of changes in (300, 1000] label Jun 12, 2025
@cshuo cshuo changed the title [HUDI-9520] Support new sink based on Flink V2 sink API [HUDI-9520] Support new sink based on Flink sink V2 API Jun 12, 2025
@cshuo cshuo marked this pull request as ready for review June 16, 2025 02:05
@@ -40,7 +42,7 @@
* at a time, a new task can not be scheduled until the last task finished(fails or normally succeed).
* The cleaning task never expects to throw but only log.
*/
public class CleanFunction<T> extends AbstractRichFunction
public class CleanFunction<T> extends ProcessFunction<T, RowData>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just copy the whole class instead. CleanFunction -> CleanFunctionV2.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

@@ -123,6 +127,14 @@ public void invoke(ClusteringCommitEvent event, Context context) throws Exceptio
commitIfNecessary(instant, commitBuffer.get(instant).values());
}

@Override
public void processElement(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's weird we have both #invoke and #processElement.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I'll make a copy then.

/**
* Get parallelism for Sink V2 writer to make sure the dummy writer operator can be chained with upstream operators.
*/
private static int getParallelismForSinkV2(Configuration conf) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there an elegant way to infer the sink parallelism as the upstream operators?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • getParallelismForSinkV2 is getting parallelism for HoodieSink#SinkWriter
  • the value is parallelism of the final operator of write pipeline which is build in HoodieSink#addPreWriteTopology.

The write pipeline cannot be pre-built outside HoodieSink, so it seems we cannot fetch the parallelism from operator/transformation directly.


private static final String SINK_V2_NAME = "sink_v2";

public static DataStreamSink<RowData> sink(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add doc for it.

.name(SINK_V2_NAME);
}

public static DataStream<RowData> composePipeline(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add doc for it.

@github-actions github-actions bot added size:XL PR with lines of changes > 1000 and removed size:L PR with lines of changes in (300, 1000] labels Jun 17, 2025
/**
* Utilities to generate pipelines for hudi sink V2.
*/
public class PipelinesV2 {
Copy link
Contributor

@danny0405 danny0405 Jun 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

either we move all the classes into v2 package or put each of the class into it's original package.

  • .../v2/CleanFunctionV2
  • .../v2/utils/PipelinesV2
  • .../v2/compact/CompactionCommitSinkV2
  • .../v2/clustering/ClusteringCommitSinkV2

@hudi-bot
Copy link

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

Copy link
Contributor

@danny0405 danny0405 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

@danny0405 danny0405 merged commit cb566b9 into apache:master Jun 17, 2025
60 of 62 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
size:XL PR with lines of changes > 1000
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants