From e81a8a8fdfe09f61846d7ed23dbb959e1dd1bcc4 Mon Sep 17 00:00:00 2001
From: Rahul Teke
Date: Fri, 31 Oct 2025 18:02:40 +0000
Subject: [PATCH 1/2] [FLINK-38606][Connectors/MongoDB] Support for Flink 2.1.0
 API

---
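Note (illustrative, placed between the --- marker and the diffstat so it is ignored when
the patch is applied): the core of this series is the Flink 2.x Sink V2 rename, where
Sink.InitContext became WriterInitContext and the subtask index moved behind getTaskInfo().
The sketch below shows a trivial sink written against the new createWriter signature; the
class name RecordSink and the String element type are assumptions for illustration only and
are not part of this connector.

import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;

import java.io.IOException;

/** Illustrative only: a minimal Sink implemented against the Flink 2.x createWriter signature. */
public class RecordSink implements Sink<String> {

    @Override
    public SinkWriter<String> createWriter(WriterInitContext context) throws IOException {
        // Flink 2.x exposes the subtask index via TaskInfo rather than InitContext#getSubtaskId();
        // the variable below only demonstrates the new accessor.
        final int subtaskIndex = context.getTaskInfo().getIndexOfThisSubtask();

        return new SinkWriter<String>() {
            @Override
            public void write(String element, SinkWriter.Context ctx) {
                // No-op sketch: a real writer would buffer the element and flush it to the backend.
            }

            @Override
            public void flush(boolean endOfInput) {}

            @Override
            public void close() {}
        };
    }
}
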
 .../apache/flink/connector/mongodb/sink/MongoSink.java      | 3 ++-
 .../flink/connector/mongodb/sink/writer/MongoWriter.java    | 4 ++--
 .../sink/writer/context/DefaultMongoSinkContext.java        | 8 ++++----
 .../mongodb/sink/writer/context/MongoSinkContext.java       | 4 ++--
 .../flink/connector/mongodb/source/MongoSource.java         | 5 -----
 .../mongodb/source/reader/MongoSourceReader.java            | 6 +-----
 .../flink/connector/mongodb/sink/MongoSinkITCase.java       | 8 +++++---
 .../connector/mongodb/sink/writer/MongoWriterITCase.java    | 3 ++-
 .../mongodb/table/MongoFilterPushDownVisitorTest.java       | 2 +-
 pom.xml                                                     | 2 +-
 10 files changed, 20 insertions(+), 25 deletions(-)

diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/MongoSink.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/MongoSink.java
index 33596c7b..be1dc62b 100644
--- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/MongoSink.java
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/MongoSink.java
@@ -21,6 +21,7 @@
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
@@ -78,7 +79,7 @@ public static <IN> MongoSinkBuilder<IN> builder() {
     }
 
     @Override
-    public SinkWriter<IN> createWriter(InitContext context) {
+    public SinkWriter<IN> createWriter(WriterInitContext context) {
         return new MongoWriter<>(
                 connectionOptions,
                 writeOptions,
diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriter.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriter.java
index 5319959d..cbd9a61e 100644
--- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriter.java
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriter.java
@@ -22,8 +22,8 @@
 import org.apache.flink.api.common.functions.util.ListCollector;
 import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
 import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
 import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
 import org.apache.flink.connector.mongodb.sink.writer.context.DefaultMongoSinkContext;
@@ -89,7 +89,7 @@ public MongoWriter(
             MongoConnectionOptions connectionOptions,
             MongoWriteOptions writeOptions,
             boolean flushOnCheckpoint,
-            Sink.InitContext initContext,
+            WriterInitContext initContext,
             MongoSerializationSchema<IN> serializationSchema) {
         this.connectionOptions = checkNotNull(connectionOptions);
         this.writeOptions = checkNotNull(writeOptions);
diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/context/DefaultMongoSinkContext.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/context/DefaultMongoSinkContext.java
index f2958e18..d62094e2 100644
--- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/context/DefaultMongoSinkContext.java
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/context/DefaultMongoSinkContext.java
@@ -18,23 +18,23 @@
 package org.apache.flink.connector.mongodb.sink.writer.context;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
 import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
 
 /** Default {@link MongoSinkContext} implementation. */
 @Internal
 public class DefaultMongoSinkContext implements MongoSinkContext {
 
-    private final Sink.InitContext initContext;
+    private final WriterInitContext initContext;
     private final MongoWriteOptions writeOptions;
 
-    public DefaultMongoSinkContext(Sink.InitContext initContext, MongoWriteOptions writeOptions) {
+    public DefaultMongoSinkContext(WriterInitContext initContext, MongoWriteOptions writeOptions) {
         this.initContext = initContext;
         this.writeOptions = writeOptions;
     }
 
     @Override
-    public Sink.InitContext getInitContext() {
+    public WriterInitContext getInitContext() {
         return initContext;
     }
 
diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/context/MongoSinkContext.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/context/MongoSinkContext.java
index 55e89d80..645c3daa 100644
--- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/context/MongoSinkContext.java
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/context/MongoSinkContext.java
@@ -18,7 +18,7 @@
 package org.apache.flink.connector.mongodb.sink.writer.context;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
 import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
 import org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializationSchema;
 
@@ -27,7 +27,7 @@ public interface MongoSinkContext {
 
     /** Returns the current sink's init context. */
-    Sink.InitContext getInitContext();
+    WriterInitContext getInitContext();
 
     /** Returns the current process time in flink. */
     long processTime();
 
diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/MongoSource.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/MongoSource.java
index bbec5625..ea57a838 100644
--- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/MongoSource.java
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/MongoSource.java
@@ -26,9 +26,7 @@
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
-import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
 import org.apache.flink.connector.mongodb.source.config.MongoReadOptions;
 import org.apache.flink.connector.mongodb.source.enumerator.MongoSourceEnumState;
@@ -134,8 +132,6 @@ public Boundedness getBoundedness() {
 
     @Override
     public SourceReader<OUT, MongoSourceSplit> createReader(SourceReaderContext readerContext) {
-        FutureCompletingBlockingQueue<RecordsWithSplitIds<BsonDocument>> elementsQueue =
-                new FutureCompletingBlockingQueue<>();
 
         MongoSourceReaderContext mongoReaderContext =
                 new MongoSourceReaderContext(readerContext, limit);
@@ -150,7 +146,6 @@ public SourceReader<OUT, MongoSourceSplit> createReader(SourceReaderContext read
                         mongoReaderContext);
 
         return new MongoSourceReader<>(
-                elementsQueue,
                 splitReaderSupplier,
                 new MongoRecordEmitter<>(deserializationSchema),
                 mongoReaderContext);
diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/MongoSourceReader.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/MongoSourceReader.java
index 5eb6669d..7182b19d 100644
--- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/MongoSourceReader.java
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/MongoSourceReader.java
@@ -19,11 +19,9 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.connector.base.source.reader.RecordEmitter;
-import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
 import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
-import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 import org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplit;
 import org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplitState;
 import org.apache.flink.connector.mongodb.source.split.MongoSourceSplit;
 
@@ -49,13 +47,11 @@ public class MongoSourceReader<OUT>
     private static final Logger LOG = LoggerFactory.getLogger(MongoSourceReader.class);
 
     public MongoSourceReader(
-            FutureCompletingBlockingQueue<RecordsWithSplitIds<BsonDocument>> elementQueue,
             Supplier<SplitReader<BsonDocument, MongoSourceSplit>> splitReaderSupplier,
             RecordEmitter<BsonDocument, OUT, MongoScanSourceSplitState> recordEmitter,
             MongoSourceReaderContext readerContext) {
         super(
-                elementQueue,
-                new SingleThreadFetcherManager<>(elementQueue, splitReaderSupplier),
+                new SingleThreadFetcherManager<>(splitReaderSupplier, readerContext.getConfiguration()),
                 recordEmitter,
                 readerContext.getConfiguration(),
                 readerContext);
diff --git a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java
index 867f1f05..326c99a6 100644
--- a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java
+++ b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java
@@ -18,8 +18,9 @@
 package org.apache.flink.connector.mongodb.sink;
 
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestartStrategyOptions;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.mongodb.sink.writer.context.MongoSinkContext;
 import org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializationSchema;
@@ -101,9 +102,10 @@ void testWriteToMongoWithDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee)
             throws Exception {
         final String collection = "test-sink-with-delivery-" + deliveryGuarantee;
         final MongoSink<Document> sink = createSink(collection, deliveryGuarantee);
-        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        Configuration config = new Configuration();
+        config.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
         env.enableCheckpointing(100L);
-        env.setRestartStrategy(RestartStrategies.noRestart());
 
         env.fromSequence(1, 5).map(new TestMapFunction()).sinkTo(sink);
         env.execute();
diff --git a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
index daa5b8bf..2c5ba849 100644
--- a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
+++ b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
@@ -243,7 +243,8 @@ void testSinkContext() throws Exception {
 
         MongoSerializationSchema<Document> testSerializationSchema =
                 (element, context) -> {
-                    assertThat(context.getInitContext().getSubtaskId()).isEqualTo(0);
+                    assertThat(context.getInitContext().getTaskInfo().getIndexOfThisSubtask())
+                            .isEqualTo(0);
                     assertThat(context.getWriteOptions()).isEqualTo(expectOptions);
                     assertThat(context.processTime())
                             .isEqualTo(
diff --git a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoFilterPushDownVisitorTest.java b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoFilterPushDownVisitorTest.java
index f92ce175..8d7e9f15 100644
--- a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoFilterPushDownVisitorTest.java
+++ b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoFilterPushDownVisitorTest.java
@@ -286,7 +286,7 @@ private List<ResolvedExpression> resolveSQLFilterToExpression(
         RexNodeExpression rexExp =
                 (RexNodeExpression) tbImpl.getParser().parseSqlExpression(sqlExp, sourceType, null);
 
-        RexNode cnf = FlinkRexUtil.toCnf(rexBuilder, -1, rexExp.getRexNode());
+        RexNode cnf = FlinkRexUtil.toCnf(rexBuilder, rexExp.getRexNode());
 
         // converts the cnf condition to a list of AND conditions
         List<RexNode> conjunctions = RelOptUtil.conjunctions(cnf);
diff --git a/pom.xml b/pom.xml
index 7ceeae5d..9b147f1e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -57,7 +57,7 @@ under the License.
         <mongodb7.version>7.0.12</mongodb7.version>
         <mongodb.version>${mongodb4.version}</mongodb.version>
 
-        <flink.version>1.20.0</flink.version>
+        <flink.version>2.1.0</flink.version>
         <scala.binary.version>2.12</scala.binary.version>
         <scala-library.version>2.12.7</scala-library.version>
         <junit5.version>5.8.1</junit5.version>

From 0d3d8245430d2f4f2a7a8c535fea94372040a0b0 Mon Sep 17 00:00:00 2001
From: Rahul Teke
Date: Thu, 13 Nov 2025 17:08:57 +0000
Subject: [PATCH 2/2] [FLINK-38606][Connectors/MongoDB] Fix formatting issues
 with spotless:apply and restrict CI to JDK 17 and 21

---
 .github/workflows/push_pr.yml                                  | 4 ++--
 .../connector/mongodb/source/reader/MongoSourceReader.java     | 3 ++-
 .../apache/flink/connector/mongodb/sink/MongoSinkITCase.java   | 3 ++-
 3 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml
index 37f8922d..8cfc6549 100644
--- a/.github/workflows/push_pr.yml
+++ b/.github/workflows/push_pr.yml
@@ -29,8 +29,8 @@ jobs:
     strategy:
       matrix:
         mongodb: [ mongodb4, mongodb5, mongodb6, mongodb7 ]
-        flink: [ 1.19-SNAPSHOT, 1.20-SNAPSHOT ]
-        jdk: [ '8, 11, 17, 21' ]
+        flink: [ 2.1-SNAPSHOT ]
+        jdk: [ '17, 21' ]
 
     uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
     with:
diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/MongoSourceReader.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/MongoSourceReader.java
index 7182b19d..57c126d3 100644
--- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/MongoSourceReader.java
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/MongoSourceReader.java
@@ -51,7 +51,8 @@ public MongoSourceReader(
             RecordEmitter<BsonDocument, OUT, MongoScanSourceSplitState> recordEmitter,
             MongoSourceReaderContext readerContext) {
         super(
-                new SingleThreadFetcherManager<>(splitReaderSupplier, readerContext.getConfiguration()),
+                new SingleThreadFetcherManager<>(
+                        splitReaderSupplier, readerContext.getConfiguration()),
                 recordEmitter,
                 readerContext.getConfiguration(),
                 readerContext);
diff --git a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java
index 326c99a6..d7b4a07d 100644
--- a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java
+++ b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java
@@ -104,7 +104,8 @@ void testWriteToMongoWithDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee)
         final MongoSink<Document> sink = createSink(collection, deliveryGuarantee);
         Configuration config = new Configuration();
         config.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
-        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        final StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(config);
         env.enableCheckpointing(100L);
 
         env.fromSequence(1, 5).map(new TestMapFunction()).sinkTo(sink);
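
Note (illustrative, not part of the patch series): the MongoSinkITCase change above replaces the
removed env.setRestartStrategy(RestartStrategies.noRestart()) call with configuration-based setup,
which is how restart behaviour is disabled on Flink 2.x. A self-contained sketch of that pattern,
assuming a Flink 2.x classpath, could look like the following; the class name
RestartStrategyConfigExample and the toy pipeline are assumptions for illustration only.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartStrategyConfigExample {

    public static void main(String[] args) throws Exception {
        // Flink 2.x style: restart behaviour is passed in via Configuration
        // instead of the removed env.setRestartStrategy(RestartStrategies.noRestart()).
        Configuration config = new Configuration();
        config.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(config);
        env.enableCheckpointing(100L);

        // Tiny placeholder pipeline so the example runs end to end.
        env.fromSequence(1, 5).map(String::valueOf).print();
        env.execute("restart-strategy-config-example");
    }
}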