[FLINK-35415][base] Fix compatibility with Flink 1.19 #3348

Merged · 5 commits · Jun 6, 2024

Changes from all commits
@@ -30,6 +30,7 @@
import org.apache.flink.cdc.composer.definition.SinkDef;
import org.apache.flink.cdc.runtime.operators.sink.DataSinkFunctionOperator;
import org.apache.flink.cdc.runtime.operators.sink.DataSinkWriterOperatorFactory;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
@@ -39,9 +40,11 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;

import java.lang.reflect.InvocationTargetException;

/** Translator used to build {@link DataSink} for given {@link DataStream}. */
@Internal
@@ -117,10 +120,8 @@ private <CommT> void addCommittingTopology(
            DataStream<Event> inputStream,
            String sinkName,
            OperatorID schemaOperatorID) {
-        TwoPhaseCommittingSink<Event, CommT> committingSink =
-                (TwoPhaseCommittingSink<Event, CommT>) sink;
        TypeInformation<CommittableMessage<CommT>> typeInformation =
-                CommittableMessageTypeInfo.of(committingSink::getCommittableSerializer);
+                CommittableMessageTypeInfo.of(() -> getCommittableSerializer(sink));
        DataStream<CommittableMessage<CommT>> written =
                inputStream.transform(
                        SINK_WRITER_PREFIX + sinkName,
@@ -140,8 +141,7 @@
        preCommitted.transform(
                SINK_COMMITTER_PREFIX + sinkName,
                typeInformation,
-                new CommitterOperatorFactory<>(
-                        committingSink, isBatchMode, isCheckpointingEnabled));
+                getCommitterOperatorFactory(sink, isBatchMode, isCheckpointingEnabled));

        if (sink instanceof WithPostCommitTopology) {
            ((WithPostCommitTopology<Event, CommT>) sink).addPostCommitTopology(committed);
@@ -152,4 +152,37 @@ private String generateSinkName(SinkDef sinkDef) {
        return sinkDef.getName()
                .orElse(String.format("Flink CDC Event Sink: %s", sinkDef.getType()));
    }

    private static <CommT> SimpleVersionedSerializer<CommT> getCommittableSerializer(Object sink) {
        // FIX ME: TwoPhaseCommittingSink has been deprecated, and its signature changed
        // between Flink 1.18 and 1.19. Remove this when Flink 1.18 is no longer supported.
        try {
            return (SimpleVersionedSerializer<CommT>)
                    sink.getClass().getDeclaredMethod("getCommittableSerializer").invoke(sink);

[Review comment] Contributor: Considering that we have had similar operations before in DataSinkWriterOperator, this change is acceptable to me.

[Review comment] yuxiqian (Author), May 23, 2024: Yeah, but it's definitely not an encouraged way to play with Flink :( Maybe we could decouple the Flink base from specific connectors and distribute CDC builds for each supported Flink version.

[Review comment] Contributor: +1, distributing CDC per Flink version in the future is recommended, just like other Flink external connectors.

        } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
            throw new RuntimeException("Failed to get CommittableSerializer", e);
        }
    }

    private static <CommT>
            OneInputStreamOperatorFactory<CommittableMessage<CommT>, CommittableMessage<CommT>>
                    getCommitterOperatorFactory(
                            Sink<Event> sink, boolean isBatchMode, boolean isCheckpointingEnabled) {
        // FIX ME: CommitterOperatorFactory is an @Internal class, and its constructor signature
        // changed between Flink 1.18 and 1.19. Remove this when Flink 1.18 is no longer supported.
        try {
            return (OneInputStreamOperatorFactory<
                            CommittableMessage<CommT>, CommittableMessage<CommT>>)
                    Class.forName(
                                    "org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory")
                            .getDeclaredConstructors()[0]
                            .newInstance(sink, isBatchMode, isCheckpointingEnabled);
        } catch (ClassNotFoundException
                | InstantiationException
                | IllegalAccessException
                | InvocationTargetException e) {
            throw new RuntimeException("Failed to create CommitterOperatorFactory", e);
        }
    }
}
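Both helpers in this file rely on the same trick: resolve the member by name at runtime instead of compiling against a signature that differs between Flink 1.18 and 1.19. A minimal sketch of that pattern outside Flink, where `FakeSink` and `FakeFactory` are hypothetical stand-ins (nothing below is a real Flink API):

```java
import java.lang.reflect.InvocationTargetException;

// Demonstrates the reflective-shim pattern from DataSinkTranslator with
// hypothetical stand-in classes; FakeSink and FakeFactory are NOT Flink APIs.
public class ReflectionShimDemo {

    // Stands in for a sink whose getCommittableSerializer() declaration
    // differs across library versions; reflection only needs the name.
    static class FakeSink {
        public String getCommittableSerializer() {
            return "serializer-v1";
        }
    }

    // Stands in for an internal factory whose constructor signature changed.
    static class FakeFactory {
        final boolean batchMode;

        FakeFactory(Object sink, boolean batchMode, boolean checkpointing) {
            this.batchMode = batchMode;
        }
    }

    // Mirrors getCommittableSerializer(Object): look the method up by name.
    static Object getSerializer(Object sink) {
        try {
            return sink.getClass().getDeclaredMethod("getCommittableSerializer").invoke(sink);
        } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
            throw new RuntimeException("Failed to get CommittableSerializer", e);
        }
    }

    // Mirrors getCommitterOperatorFactory: load the class by name and call
    // whatever constructor it declares, so either signature variant works.
    static Object newFactory(Object sink, boolean isBatchMode, boolean isCheckpointingEnabled) {
        try {
            return Class.forName("ReflectionShimDemo$FakeFactory")
                    .getDeclaredConstructors()[0]
                    .newInstance(sink, isBatchMode, isCheckpointingEnabled);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Failed to create factory", e);
        }
    }

    public static void main(String[] args) {
        if (!"serializer-v1".equals(getSerializer(new FakeSink()))) {
            throw new AssertionError("method lookup failed");
        }
        FakeFactory factory = (FakeFactory) newFactory(new FakeSink(), true, false);
        if (!factory.batchMode) {
            throw new AssertionError("constructor lookup failed");
        }
        System.out.println("ok");
    }
}
```

The trade-off, as the reviewers note below, is that reflection defers signature mismatches from compile time to runtime, which is why both FIX ME comments ask for removal once Flink 1.18 support is dropped.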
@@ -46,6 +46,49 @@ limitations under the License.
<artifactId>flink-doris-connector-${flink.major.version}</artifactId>
<version>1.6.0</version>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils-junit</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>jdbc</artifactId>
<version>1.18.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.26</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>