Skip to content

Commit

Permalink
[FLINK-35415][base] Fix compatibility with Flink 1.19
Browse files Browse the repository at this point in the history
  • Loading branch information
yuxiqian committed May 22, 2024
1 parent d386c7c commit c7e8b74
Show file tree
Hide file tree
Showing 21 changed files with 2,502 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.cdc.composer.definition.SinkDef;
import org.apache.flink.cdc.runtime.operators.sink.DataSinkFunctionOperator;
import org.apache.flink.cdc.runtime.operators.sink.DataSinkWriterOperatorFactory;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
Expand All @@ -38,9 +39,11 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;

import java.lang.reflect.InvocationTargetException;

/** Translator used to build {@link DataSink} for given {@link DataStream}. */
@Internal
Expand Down Expand Up @@ -115,10 +118,8 @@ private <CommT> void addCommittingTopology(
DataStream<Event> inputStream,
String sinkName,
OperatorID schemaOperatorID) {
TwoPhaseCommittingSink<Event, CommT> committingSink =
(TwoPhaseCommittingSink<Event, CommT>) sink;
TypeInformation<CommittableMessage<CommT>> typeInformation =
CommittableMessageTypeInfo.of(committingSink::getCommittableSerializer);
CommittableMessageTypeInfo.of(() -> getCommittableSerializer(sink));
DataStream<CommittableMessage<CommT>> written =
inputStream.transform(
SINK_WRITER_PREFIX + sinkName,
Expand All @@ -138,8 +139,7 @@ private <CommT> void addCommittingTopology(
preCommitted.transform(
SINK_COMMITTER_PREFIX + sinkName,
typeInformation,
new CommitterOperatorFactory<>(
committingSink, isBatchMode, isCheckpointingEnabled));
getCommitterOperatorFactory(sink, isBatchMode, isCheckpointingEnabled));

if (sink instanceof WithPostCommitTopology) {
((WithPostCommitTopology<Event, CommT>) sink).addPostCommitTopology(committed);
Expand All @@ -150,4 +150,37 @@ private String generateSinkName(SinkDef sinkDef) {
return sinkDef.getName()
.orElse(String.format("Flink CDC Event Sink: %s", sinkDef.getType()));
}

private <CommT> SimpleVersionedSerializer<CommT> getCommittableSerializer(Object sink) {
// TwoPhaseCommittingSink has been deprecated, and its signature has changed
// during Flink 1.18 to 1.19. Remove this when Flink 1.18 is no longer supported.
try {
return (SimpleVersionedSerializer<CommT>)
sink.getClass().getDeclaredMethod("getCommittableSerializer").invoke(sink);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException("Failed to get CommittableSerializer", e);
}
}

private <CommT>
OneInputStreamOperatorFactory<CommittableMessage<CommT>, CommittableMessage<CommT>>
getCommitterOperatorFactory(
Sink<Event> sink, boolean isBatchMode, boolean isCheckpointingEnabled) {
// OneInputStreamOperatorFactory is an @Internal class, and its signature has changed
// during Flink 1.18 to 1.19. Remove this when Flink 1.18 is no longer supported.
try {
return (OneInputStreamOperatorFactory<
CommittableMessage<CommT>, CommittableMessage<CommT>>)
Class.forName(
"org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory")
.getDeclaredConstructors()[0]
.newInstance(sink, isBatchMode, isCheckpointingEnabled);

} catch (ClassNotFoundException
| InstantiationException
| IllegalAccessException
| InvocationTargetException e) {
throw new RuntimeException("Failed to create CommitterOperatorFactory", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,49 @@ limitations under the License.
<artifactId>flink-doris-connector-${flink.major.version}</artifactId>
<version>1.6.0</version>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils-junit</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>jdbc</artifactId>
<version>1.18.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.26</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Loading

0 comments on commit c7e8b74

Please sign in to comment.