From 159a536385c5c6f4f4708101f21f5ebe785a1ba5 Mon Sep 17 00:00:00 2001
From: Rahul Teke
Date: Sun, 16 Nov 2025 15:29:37 +0000
Subject: [PATCH] [FLINK-37950][Connectors/MongoDB] Supporting ordered &
 bypassdocumentValidation behaviour for sink writer.

---
 .../mongodb/sink/MongoSinkBuilder.java        | 22 +++++++
 .../sink/config/MongoWriteOptions.java        | 63 ++++++++++++++++++-
 .../mongodb/sink/writer/MongoWriter.java      |  8 ++-
 .../mongodb/table/MongoConnectorOptions.java  | 12 ++++
 .../mongodb/sink/MongoSinkITCase.java         | 40 +++++++++++-
 5 files changed, 138 insertions(+), 7 deletions(-)

diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/MongoSinkBuilder.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/MongoSinkBuilder.java
index 49aa35bd..7199b2ab 100644
--- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/MongoSinkBuilder.java
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/MongoSinkBuilder.java
@@ -126,6 +126,28 @@ public MongoSinkBuilder setDeliveryGuarantee(DeliveryGuarantee deliveryGuara
         return this;
     }
 
+    /**
+     * Sets the {@code ordered} flag of {@link com.mongodb.client.model.BulkWriteOptions}.
+     *
+     * @param ordered whether the operations of a bulk write are executed in order
+     * @return this builder
+     */
+    public MongoSinkBuilder setOrderedWrites(boolean ordered) {
+        writeOptionsBuilder.setOrderedWrites(ordered);
+        return this;
+    }
+
+    /**
+     * Sets the bypass-validation flag of {@link com.mongodb.client.model.BulkWriteOptions}.
+     *
+     * @param bypassDocumentValidation whether bulk writes bypass document validation
+     * @return this builder
+     */
+    public MongoSinkBuilder setBypassDocumentValidation(boolean bypassDocumentValidation) {
+        writeOptionsBuilder.setBypassDocumentValidation(bypassDocumentValidation);
+        return this;
+    }
+
     /**
      * Sets the serialization schema which is invoked on every record to convert it to MongoDB bulk
      * request.
diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/config/MongoWriteOptions.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/config/MongoWriteOptions.java
index 15f42939..576d1991 100644
--- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/config/MongoWriteOptions.java
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/config/MongoWriteOptions.java
@@ -26,7 +26,9 @@
 import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.BUFFER_FLUSH_INTERVAL;
 import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.BUFFER_FLUSH_MAX_ROWS;
 import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.DELIVERY_GUARANTEE;
+import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SINK_BYPASS_VALIDATION;
 import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SINK_MAX_RETRIES;
+import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SINK_ORDERED_WRITES;
 import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SINK_RETRY_INTERVAL;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -40,6 +42,8 @@ public final class MongoWriteOptions implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
+    private final boolean orderedWrites;
+    private final boolean bypassDocumentValidation;
     private final int batchSize;
     private final long batchIntervalMs;
     private final int maxRetries;
@@ -47,11 +51,15 @@ public final class MongoWriteOptions implements Serializable {
     private final DeliveryGuarantee deliveryGuarantee;
 
     private MongoWriteOptions(
+            boolean orderedWrites,
+            boolean bypassDocumentValidation,
             int batchSize,
             long batchIntervalMs,
             int maxRetries,
             long retryIntervalMs,
             DeliveryGuarantee deliveryGuarantee) {
+        this.orderedWrites = orderedWrites;
+        this.bypassDocumentValidation = bypassDocumentValidation;
         this.batchSize = batchSize;
         this.batchIntervalMs = batchIntervalMs;
         this.maxRetries = maxRetries;
@@ -59,6 +67,14 @@ private MongoWriteOptions(
         this.deliveryGuarantee = deliveryGuarantee;
     }
 
+    public boolean isOrderedWrites() {
+        return orderedWrites;
+    }
+
+    public boolean isBypassDocumentValidation() {
+        return bypassDocumentValidation;
+    }
+
     public int getBatchSize() {
         return batchSize;
     }
@@ -88,7 +104,9 @@ public boolean equals(Object o) {
             return false;
         }
         MongoWriteOptions that = (MongoWriteOptions) o;
-        return batchSize == that.batchSize
+        return orderedWrites == that.orderedWrites
+                && bypassDocumentValidation == that.bypassDocumentValidation
+                && batchSize == that.batchSize
                 && batchIntervalMs == that.batchIntervalMs
                 && maxRetries == that.maxRetries
                 && retryIntervalMs == that.retryIntervalMs
@@ -98,7 +116,13 @@ public boolean equals(Object o) {
     @Override
     public int hashCode() {
         return Objects.hash(
-                batchSize, batchIntervalMs, maxRetries, retryIntervalMs, deliveryGuarantee);
+                orderedWrites,
+                bypassDocumentValidation,
+                batchSize,
+                batchIntervalMs,
+                maxRetries,
+                retryIntervalMs,
+                deliveryGuarantee);
     }
 
     public static MongoWriteOptionsBuilder builder() {
@@ -108,6 +132,8 @@ public static MongoWriteOptionsBuilder builder() {
     /** Builder for {@link MongoWriteOptions}. */
     @PublicEvolving
     public static class MongoWriteOptionsBuilder {
+        private boolean orderedWrites = SINK_ORDERED_WRITES.defaultValue();
+        private boolean bypassDocumentValidation = SINK_BYPASS_VALIDATION.defaultValue();
         private int batchSize = BUFFER_FLUSH_MAX_ROWS.defaultValue();
         private long batchIntervalMs = BUFFER_FLUSH_INTERVAL.defaultValue().toMillis();
         private int maxRetries = SINK_MAX_RETRIES.defaultValue();
@@ -116,6 +142,31 @@ public static class MongoWriteOptionsBuilder {
 
         private MongoWriteOptionsBuilder() {}
 
+        /**
+         * Sets whether the MongoDB bulk write is ordered, see {@link
+         * com.mongodb.client.model.BulkWriteOptions}. Defaults to {@code true}.
+         *
+         * @param orderedWrites whether bulk write operations are executed in order
+         * @return this builder
+         */
+        public MongoWriteOptionsBuilder setOrderedWrites(boolean orderedWrites) {
+            this.orderedWrites = orderedWrites;
+            return this;
+        }
+
+        /**
+         * Sets whether the MongoDB bulk write bypasses document validation, see {@link
+         * com.mongodb.client.model.BulkWriteOptions}. Defaults to {@code false}.
+         *
+         * @param bypassDocumentValidation whether bulk writes bypass document validation
+         * @return this builder
+         */
+        public MongoWriteOptionsBuilder setBypassDocumentValidation(
+                boolean bypassDocumentValidation) {
+            this.bypassDocumentValidation = bypassDocumentValidation;
+            return this;
+        }
+
         /**
          * Sets the maximum number of actions to buffer for each batch request. You can pass -1 to
          * disable batching.
@@ -195,7 +246,13 @@ public MongoWriteOptionsBuilder setDeliveryGuarantee(DeliveryGuarantee deliveryG
          */
         public MongoWriteOptions build() {
             return new MongoWriteOptions(
-                    batchSize, batchIntervalMs, maxRetries, retryIntervalMs, deliveryGuarantee);
+                    orderedWrites,
+                    bypassDocumentValidation,
+                    batchSize,
+                    batchIntervalMs,
+                    maxRetries,
+                    retryIntervalMs,
+                    deliveryGuarantee);
         }
     }
 }
diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriter.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriter.java
index 5319959d..a8a81378 100644
--- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriter.java
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriter.java
@@ -38,6 +38,7 @@
 import com.mongodb.MongoException;
 import com.mongodb.client.MongoClient;
 import com.mongodb.client.MongoClients;
+import com.mongodb.client.model.BulkWriteOptions;
 import com.mongodb.client.model.WriteModel;
 import org.bson.BsonDocument;
 import org.slf4j.Logger;
@@ -213,7 +214,12 @@ void doBulkWrite() throws IOException {
                 mongoClient
                         .getDatabase(connectionOptions.getDatabase())
                         .getCollection(connectionOptions.getCollection(), BsonDocument.class)
-                        .bulkWrite(bulkRequests);
+                        .bulkWrite(
+                                bulkRequests,
+                                new BulkWriteOptions()
+                                        .ordered(writeOptions.isOrderedWrites())
+                                        .bypassDocumentValidation(
+                                                writeOptions.isBypassDocumentValidation()));
                 ackTime = System.currentTimeMillis();
                 bulkRequests.clear();
                 break;
diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoConnectorOptions.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoConnectorOptions.java
index 36948f29..4ea63a4e 100644
--- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoConnectorOptions.java
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoConnectorOptions.java
@@ -115,6 +115,18 @@ private MongoConnectorOptions() {}
                     .withDescription(
                             "Specifies the retry time interval if lookup records from database failed.");
 
+    public static final ConfigOption<Boolean> SINK_ORDERED_WRITES =
+            ConfigOptions.key("sink.ordered-writes")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("Specifies mongodb bulk write ordered option");
+
+    public static final ConfigOption<Boolean> SINK_BYPASS_VALIDATION =
+            ConfigOptions.key("sink.bypass-document-validation")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Specifies mongodb bulk write option to bypass validation");
+
     public static final ConfigOption<Integer> BUFFER_FLUSH_MAX_ROWS =
             ConfigOptions.key("sink.buffer-flush.max-rows")
                     .intType()
diff --git a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java
index 867f1f05..c0cfa28d 100644
--- a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java
+++ b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java
@@ -92,6 +92,34 @@ static void tearDown() {
         }
     }
 
+    @Test
+    void testUnorderedWrite() throws Exception {
+        final String collection = "test-sink-with-unordered-write";
+        final MongoSink sink =
+                createSink(collection, DeliveryGuarantee.AT_LEAST_ONCE, false, false);
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(100L);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+
+        env.fromSequence(1, 5).map(new TestMapFunction()).sinkTo(sink);
+        env.execute();
+        assertThatIdsAreWritten(collectionOf(collection), 1, 2, 3, 4, 5);
+    }
+
+    @Test
+    void testBypassDocumentValidation() throws Exception {
+        final String collection = "test-sink-with-bypass-doc-validation";
+        final MongoSink sink =
+                createSink(collection, DeliveryGuarantee.AT_LEAST_ONCE, true, true);
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(100L);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+
+        env.fromSequence(1, 5).map(new TestMapFunction()).sinkTo(sink);
+        env.execute();
+        assertThatIdsAreWritten(collectionOf(collection), 1, 2, 3, 4, 5);
+    }
+
     @ParameterizedTest
     @EnumSource(
             value = DeliveryGuarantee.class,
@@ -100,7 +128,7 @@ static void tearDown() {
     void testWriteToMongoWithDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee)
             throws Exception {
         final String collection = "test-sink-with-delivery-" + deliveryGuarantee;
-        final MongoSink sink = createSink(collection, deliveryGuarantee);
+        final MongoSink sink = createSink(collection, deliveryGuarantee, true, false);
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.enableCheckpointing(100L);
         env.setRestartStrategy(RestartStrategies.noRestart());
@@ -113,7 +141,8 @@ void testWriteToMongoWithDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee)
     @Test
     void testRecovery() throws Exception {
         final String collection = "test-recovery-mongo-sink";
-        final MongoSink sink = createSink(collection, DeliveryGuarantee.AT_LEAST_ONCE);
+        final MongoSink sink =
+                createSink(collection, DeliveryGuarantee.AT_LEAST_ONCE, true, false);
 
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.enableCheckpointing(100L);
@@ -131,13 +160,18 @@ void testRecovery() throws Exception {
     }
 
     private static MongoSink createSink(
-            String collection, DeliveryGuarantee deliveryGuarantee) {
+            String collection,
+            DeliveryGuarantee deliveryGuarantee,
+            boolean ordered,
+            boolean bypassDocumentValidation) {
         return MongoSink.builder()
                 .setUri(MONGO_CONTAINER.getConnectionString())
                 .setDatabase(TEST_DATABASE)
                 .setCollection(collection)
                 .setBatchSize(5)
                 .setDeliveryGuarantee(deliveryGuarantee)
+                .setOrderedWrites(ordered)
+                .setBypassDocumentValidation(bypassDocumentValidation)
                 .setSerializationSchema(new UpsertSerializationSchema())
                 .build();
     }