[FLINK-39410] The flink-cdc-pipeline-connector-elasticsearch module offers better compatibility with Flink 1.20 and Flink 2.x#4368

Open
sd4324530 wants to merge 6 commits into apache:master from sd4324530:flink2-es

Conversation

@sd4324530
Contributor

@sd4324530 sd4324530 commented Apr 8, 2026

Currently, the flink-sql-connector-elasticsearchx component, which the flink-cdc-pipeline-connector-elasticsearch module depends on, is still compatible with Flink 1.17. This is unnecessary; compatibility with Flink 1.20 and Flink 2.2+ is sufficient.

4 new adapters:
AsyncSinkBaseAdapter
AsyncSinkWriterAdapter
StatefulSinkWriterAdapter
WriterInitContextAdapter

3 new e2e tests:
MysqlToElasticsearch6E2eITCase
MysqlToElasticsearch7E2eITCase
MysqlToElasticsearch8E2eITCase

@sd4324530
Contributor Author

The CI error doesn't seem to be related to my PR. I ran the failing test case locally and it passes.

Contributor

Copilot AI left a comment

Pull request overview

This PR updates the Elasticsearch pipeline connector to target newer Flink versions (Flink 1.20 by default, and Flink 2.x via profile) by introducing a small compatibility layer around Flink’s async sink APIs and aligning the connector implementation with the newer flink-sql-connector-elasticsearchx artifacts.

Changes:

  • Add Flink 1.20 and Flink 2.2 compatibility adapters for async sink base/writer and init-context bridging.
  • Update the Elasticsearch 8 async sink/writer implementation to use the new adapter types and newer async sink callback API (ResultHandler).
  • Bump flink-sql-connector-elasticsearch6/7 dependency version for Flink 1.20 and add a flink2 Maven profile to switch to the Flink 2.x connector artifact line.

Reviewed changes

Copilot reviewed 12 out of 12 changed files in this pull request and generated 3 comments.

| File | Description |
| --- | --- |
| flink-cdc-flink2-compat/.../AsyncSinkWriterAdapter.java | Adds Flink 2.2 adapter to bridge Sink.InitContext to WriterInitContext for AsyncSinkWriter. |
| flink-cdc-flink2-compat/.../AsyncSinkBaseAdapter.java | Adds Flink 2.2 adapter surface for restoring writers via restoreWriterAdapter. |
| flink-cdc-flink2-compat/.../WriterInitContextAdapter.java | Implements a WriterInitContext wrapper around Sink.InitContext. |
| flink-cdc-flink2-compat/.../StatefulSinkWriterAdapter.java | Adds a compat interface to represent a stateful writer in Flink 2.2. |
| flink-cdc-flink1-compat/.../AsyncSinkWriterAdapter.java | Adds Flink 1.20 adapter wrapper for AsyncSinkWriter. |
| flink-cdc-flink1-compat/.../AsyncSinkBaseAdapter.java | Adds Flink 1.20 adapter surface for writer restoration (currently has a compile issue). |
| flink-cdc-flink1-compat/.../WriterInitContextAdapter.java | Implements WriterInitContext wrapper around Sink.InitContext for Flink 1.20. |
| flink-cdc-flink1-compat/.../StatefulSinkWriterAdapter.java | Adds a Flink 1.20 compat interface for a stateful writer type. |
| .../Elasticsearch8AsyncWriter.java | Migrates writer to adapter base + new ResultHandler callback style. |
| .../Elasticsearch8AsyncSink.java | Migrates sink to adapter base and updates writer creation/restore paths for compatibility. |
| .../ElasticsearchEventSerializer.java | Removes now-unneeded open(Sink.InitContext) override. |
| .../pom.xml | Bumps Flink ES SQL connector version and adds a flink2 profile to switch artifact line. |
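
As a rough illustration of the wrapper idea summarized above (exposing an older init context through a newer context interface), here is a minimal Java sketch. Both interfaces and all method names are hypothetical stand-ins, not Flink's actual Sink.InitContext or WriterInitContext, and the adapters in this PR may be structured differently.

```java
import java.util.Objects;
import java.util.OptionalLong;

/** Hypothetical stand-in for the older context type (the role played by Sink.InitContext). */
interface LegacyInitContextSketch {
    int getSubtaskId();

    OptionalLong getRestoredCheckpointId();
}

/** Hypothetical stand-in for the newer context type (the role played by WriterInitContext). */
interface WriterInitContextSketch {
    int subtaskIndex();

    OptionalLong restoredCheckpointId();
}

/** Exposes a legacy context through the newer interface by delegating every call. */
final class WriterInitContextAdapterSketch implements WriterInitContextSketch {
    private final LegacyInitContextSketch delegate;

    WriterInitContextAdapterSketch(LegacyInitContextSketch delegate) {
        this.delegate = Objects.requireNonNull(delegate, "delegate");
    }

    @Override
    public int subtaskIndex() {
        // Only the method name differs between the two stand-in interfaces.
        return delegate.getSubtaskId();
    }

    @Override
    public OptionalLong restoredCheckpointId() {
        return delegate.getRestoredCheckpointId();
    }
}
```

Code written against the newer stand-in interface can then accept a wrapped legacy context without knowing which one it was given.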

Comment on lines +56 to +59
    public StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> restoreWriter(
            WriterInitContext context,
            Collection<BufferedRequestState<RequestEntryT>> recoveredState)
            throws IOException {

Copilot AI Apr 9, 2026

StatefulSinkWriter is referenced in the restoreWriter(...) override but isn’t imported or qualified anywhere in this file. Given this module targets Flink 1.20 where the writer type may be StatefulSink.StatefulSinkWriter, this currently won’t compile. Fix by using the correct fully-qualified type (or importing the proper Flink 1.20 type) so the override matches AsyncSinkBase#restoreWriter’s signature.

Suggested change
-    public StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> restoreWriter(
-            WriterInitContext context,
-            Collection<BufferedRequestState<RequestEntryT>> recoveredState)
-            throws IOException {
+    public org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter<
+                    InputT, BufferedRequestState<RequestEntryT>>
+            restoreWriter(
+                    WriterInitContext context,
+                    Collection<BufferedRequestState<RequestEntryT>> recoveredState)
+                    throws IOException {

Contributor Author

This class is only compiled against Flink 1.x, where the method signatures match.

@@ -166,13 +167,13 @@ private void handleFailedRequest(
        numRecordsOutErrorsCounter.inc(requestEntries.size());

        if (isRetryable(error.getCause())) {

Copilot AI Apr 9, 2026

handleFailedRequest(...) calls isRetryable(error.getCause()). If error.getCause() is null (which is possible for many exception types), this will lead to a NullPointerException inside isRetryable/FatalExceptionClassifier, masking the real failure. Pass a non-null throwable (e.g., error.getCause() != null ? error.getCause() : error) or update isRetryable to handle null safely.

Suggested change
-        if (isRetryable(error.getCause())) {
+        Throwable retryableError = error.getCause() != null ? error.getCause() : error;
+        if (isRetryable(retryableError)) {
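
To make the null-cause concern concrete, here is a small self-contained Java sketch of the fallback. The class name and the retry rule (treating I/O errors as transient) are assumptions for illustration only; the connector's real isRetryable logic is not shown in this thread.

```java
import java.io.IOException;
import java.net.ConnectException;

/** Illustrative stand-in; not the connector's actual classifier. */
final class RetryCheckSketch {

    private RetryCheckSketch() {}

    /** Returns the cause when one is attached, otherwise the error itself, so the result is never null. */
    static Throwable effectiveError(Throwable error) {
        Throwable cause = error.getCause();
        return cause != null ? cause : error;
    }

    /** Hypothetical retry rule: treat I/O problems as transient. */
    static boolean isRetryable(Throwable t) {
        return t instanceof IOException;
    }

    public static void main(String[] args) {
        // A wrapper exception whose cause is an I/O failure.
        Throwable wrapped = new RuntimeException("bulk failed", new ConnectException("refused"));
        // An exception with no cause; getCause() returns null here.
        Throwable bare = new IllegalStateException("no cause attached");

        System.out.println(isRetryable(effectiveError(wrapped)));
        System.out.println(isRetryable(effectiveError(bare)));
    }
}
```

With the fallback in place, the classifier always receives a non-null throwable, whether or not the original error carries a cause.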

Contributor Author

done

Comment on lines 81 to +88
     /**
      * Creates a new {@link StatefulSinkWriter} for writing elements to Elasticsearch.
      *
      * @param context the initialization context.
      * @return a new instance of {@link Elasticsearch8AsyncWriter}.
      */
     @Override
-    public StatefulSinkWriter<InputT, BufferedRequestState<Operation>> createWriter(
-            InitContext context) {
+    public SinkWriter<InputT> createWriter(InitContext context) {

Copilot AI Apr 9, 2026

The Javadocs for createWriter/restoreWriterAdapter still refer to returning/creating a StatefulSinkWriter, but the method now returns SinkWriter<InputT> and the restore method returns StatefulSinkWriterAdapter. Please update the links/wording to match the new signatures to avoid misleading API documentation.

Contributor Author

done

@sd4324530
Contributor Author

This CI error also doesn't seem to be related to my PR.

@sd4324530
Contributor Author

Error:  Errors: 
Error:    OceanBaseE2eITCase » ContainerLaunch Container startup failed for image oceanb...

@lvyanquan
Contributor

lvyanquan commented May 8, 2026

Overall, +1 for the change.

It would be better if you could add an e2e test for this.

Contributor

@ChengbingLiu ChengbingLiu left a comment

Thanks @sd4324530 for submitting this PR. I have verified it in our test environment and it works well. I left a comment, PTAL.

        Throwable retryableError = error.getCause() != null ? error.getCause() : error;
        if (isRetryable(retryableError)) {
            resultHandler.retryForEntries(requestEntries);
        }

Contributor

The ResultHandler API has a completeExceptionally(Exception) method for failing fast. I think we should also handle the case where the error is not retryable:

        if (isRetryable(retryableError)) {
            resultHandler.retryForEntries(requestEntries);
        } else {
            resultHandler.completeExceptionally(
                    retryableError instanceof Exception
                            ? (Exception) retryableError
                            : new FlinkRuntimeException(retryableError));
        }
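
The branching suggested above can be exercised in isolation. The following Java sketch uses a stand-in handler interface that mirrors the three callbacks mentioned in this thread; it is not Flink's own ResultHandler, the retry rule is hypothetical, and RuntimeException stands in for FlinkRuntimeException so the example has no Flink dependency.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-in mirroring the callbacks discussed here; not Flink's own interface. */
interface ResultHandlerSketch<T> {
    void complete();

    void completeExceptionally(Exception e);

    void retryForEntries(List<T> entries);
}

/** Records which callback was invoked so the outcome can be inspected. */
final class RecordingHandler<T> implements ResultHandlerSketch<T> {
    final List<T> retried = new ArrayList<>();
    Exception failure;
    boolean completed;

    @Override
    public void complete() {
        completed = true;
    }

    @Override
    public void completeExceptionally(Exception e) {
        failure = e;
    }

    @Override
    public void retryForEntries(List<T> entries) {
        retried.addAll(entries);
    }
}

final class FailedRequestSketch {
    private FailedRequestSketch() {}

    /** Hypothetical rule: only I/O errors are worth retrying. */
    static boolean isRetryable(Throwable t) {
        return t instanceof java.io.IOException;
    }

    static <T> void handleFailedRequest(
            Throwable error, List<T> entries, ResultHandlerSketch<T> handler) {
        // Fall back to the error itself when no cause is attached.
        Throwable effective = error.getCause() != null ? error.getCause() : error;
        if (isRetryable(effective)) {
            handler.retryForEntries(entries);
        } else {
            // Fail fast; wrap non-Exception throwables (for example Errors) first.
            handler.completeExceptionally(
                    effective instanceof Exception
                            ? (Exception) effective
                            : new RuntimeException(effective));
        }
    }
}
```

Without the else branch, a non-retryable failure would invoke neither callback, so the request would be neither retried nor reported as failed.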

Contributor Author

thanks, done

sd4324530 added 6 commits May 8, 2026 21:12
… compatibility with Flink 1.20 and Flink 2.x.

Signed-off-by: Pei Yu <125331682@qq.com>
Signed-off-by: Pei Yu <125331682@qq.com>
Signed-off-by: Pei Yu <125331682@qq.com>
Signed-off-by: Pei Yu <125331682@qq.com>
Signed-off-by: Pei Yu <125331682@qq.com>
github-actions bot added the docs and e2e-tests labels May 9, 2026
@sd4324530
Contributor Author

sd4324530 commented May 9, 2026

> Overall, +1 for the change.
>
> It would be better if you could add an e2e test for this.

@lvyanquan Hi, e2e tests have been added.
While running them, I found that the default value of the config option record.size.max.bytes was incorrect: record.size.max.bytes cannot be greater than batch.size.max.bytes.

See:
Flink 1.20: https://github.com/apache/flink/blob/release-1.20.3/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L363-L366

Flink 2.2: https://github.com/apache/flink/blob/release-2.2.0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L262-L265

I've fixed it.

@lvyanquan
Contributor

lvyanquan commented May 11, 2026

> When I ran the e2e test, I found that the default value of the config option record.size.max.bytes was incorrect.
> The value of record.size.max.bytes cannot be greater than the value of batch.size.max.bytes.

@sd4324530 Could you please explain the reason in more detail?

@sd4324530
Contributor Author

sd4324530 commented May 11, 2026

>> When I ran the e2e test, I found that the default value of the config option record.size.max.bytes was incorrect.
>> The value of record.size.max.bytes cannot be greater than the value of batch.size.max.bytes.
>
> @sd4324530 Could you please explain the reason in more detail?

@lvyanquan
In the current implementation of ElasticsearchDataSink, the EventSinkProvider for ES6 and ES7 uses the ElasticsearchSink provided by flink-connector-elasticsearch. Call this Solution A [1].
For ES8, we use our own implementation, Elasticsearch8AsyncSink. Call this Solution B [2].

Differences in the SinkWriter implementation:
Solution A uses the ElasticsearchWriter class provided by flink-connector-elasticsearch, which implements the SinkWriter interface [3].
Solution B uses our own Elasticsearch8AsyncWriter, which, unlike Solution A, extends the base class AsyncSinkWriter [4].

Config options batch.size.max.bytes and record.size.max.bytes:
Solution A: the EventSinkProvider does not use these two options.
Solution B: the EventSinkProvider uses both options, and they are validated when Elasticsearch8AsyncWriter is constructed (the check lives in the base class AsyncSinkWriter) [5][6].

So batch.size.max.bytes and record.size.max.bytes currently apply only to ES8, and they must satisfy the AsyncSinkWriter requirement that the maximum allowed size in bytes per flush be greater than or equal to the maximum allowed size in bytes of a single record.

[1] https://github.com/apache/flink-cdc/blob/master/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSink.java#L69-L72
[2] https://github.com/apache/flink-cdc/blob/master/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSink.java#L73-L74
[3] https://github.com/apache/flink-connector-elasticsearch/blob/v3.1.0/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriter.java#L55
[4] https://github.com/apache/flink-cdc/blob/master/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncWriter.java#L56
[5] https://github.com/apache/flink/blob/release-1.20.3/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L363-L366
[6] https://github.com/apache/flink/blob/release-2.2.0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L262-L265
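
The constraint described above can be sketched as a constructor-time check. This is a minimal Java illustration rather than Flink's code: the class name is hypothetical, the exception type is an assumption about how a violation surfaces, and only the rule itself (batch limit >= record limit) and the message wording are taken from the explanation above.

```java
/** Hypothetical holder for the two options; the real check lives in Flink's AsyncSinkWriter. */
final class SizeLimitsSketch {
    final long maxBatchSizeInBytes;  // corresponds to batch.size.max.bytes
    final long maxRecordSizeInBytes; // corresponds to record.size.max.bytes

    SizeLimitsSketch(long maxBatchSizeInBytes, long maxRecordSizeInBytes) {
        // A single record must always fit into one flush.
        if (maxBatchSizeInBytes < maxRecordSizeInBytes) {
            throw new IllegalArgumentException(
                    "The maximum allowed size in bytes per flush must be greater than or equal "
                            + "to the maximum allowed size in bytes of a single record.");
        }
        this.maxBatchSizeInBytes = maxBatchSizeInBytes;
        this.maxRecordSizeInBytes = maxRecordSizeInBytes;
    }
}
```

Under this rule, a 5 MiB batch limit with a 1 MiB record limit is accepted, while a record limit larger than the batch limit is rejected as soon as the writer is constructed, which is how a bad default would show up in an e2e run.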

@sd4324530
Contributor Author

sd4324530 commented May 11, 2026

@lvyanquan
I found that an ES8 sink is already implemented in flink-connector-elasticsearch. Could we consider adopting that implementation instead of maintaining our own, to better align with the community? What do you think?
https://github.com/apache/flink-connector-elasticsearch/tree/v3.1.0/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink

Labels

docs, e2e-tests, elasticsearch-pipeline-connector

4 participants