Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,28 @@ public MongoSinkBuilder<IN> setDeliveryGuarantee(DeliveryGuarantee deliveryGuara
return this;
}

/**
* Set the ordered write {@link com.mongodb.client.model.BulkWriteOptions}.
*
* @param ordered describes the write behaviour
* @return this builder
*/
public MongoSinkBuilder<IN> setOrderedWrites(boolean ordered) {
writeOptionsBuilder.setOrderedWrites(ordered);
return this;
}

/**
* Set the bypass document validation {@link com.mongodb.client.model.BulkWriteOptions}.
*
* @param bypassDocumentValidation describes document validation behaviour
* @return this builder
*/
public MongoSinkBuilder<IN> setBypassDocumentValidation(boolean bypassDocumentValidation) {
writeOptionsBuilder.setBypassDocumentValidation(bypassDocumentValidation);
return this;
}

/**
* Sets the serialization schema which is invoked on every record to convert it to MongoDB bulk
* request.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.BUFFER_FLUSH_INTERVAL;
import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.BUFFER_FLUSH_MAX_ROWS;
import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.DELIVERY_GUARANTEE;
import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SINK_BYPASS_VALIDATION;
import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SINK_MAX_RETRIES;
import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SINK_ORDERED_WRITES;
import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SINK_RETRY_INTERVAL;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
Expand All @@ -40,25 +42,39 @@ public final class MongoWriteOptions implements Serializable {

private static final long serialVersionUID = 1L;

private final boolean orderedWrites;
private final boolean bypassDocumentValidation;
private final int batchSize;
private final long batchIntervalMs;
private final int maxRetries;
private final long retryIntervalMs;
private final DeliveryGuarantee deliveryGuarantee;

private MongoWriteOptions(
boolean orderedWrites,
boolean bypassDocumentValidation,
int batchSize,
long batchIntervalMs,
int maxRetries,
long retryIntervalMs,
DeliveryGuarantee deliveryGuarantee) {
this.orderedWrites = orderedWrites;
this.bypassDocumentValidation = bypassDocumentValidation;
this.batchSize = batchSize;
this.batchIntervalMs = batchIntervalMs;
this.maxRetries = maxRetries;
this.retryIntervalMs = retryIntervalMs;
this.deliveryGuarantee = deliveryGuarantee;
}

public boolean isOrderedWrites() {
return orderedWrites;
}

public boolean isBypassDocumentValidation() {
return bypassDocumentValidation;
}

public int getBatchSize() {
return batchSize;
}
Expand Down Expand Up @@ -88,7 +104,9 @@ public boolean equals(Object o) {
return false;
}
MongoWriteOptions that = (MongoWriteOptions) o;
return batchSize == that.batchSize
return orderedWrites == that.orderedWrites
&& bypassDocumentValidation == that.bypassDocumentValidation
&& batchSize == that.batchSize
&& batchIntervalMs == that.batchIntervalMs
&& maxRetries == that.maxRetries
&& retryIntervalMs == that.retryIntervalMs
Expand All @@ -98,7 +116,13 @@ public boolean equals(Object o) {
@Override
public int hashCode() {
return Objects.hash(
batchSize, batchIntervalMs, maxRetries, retryIntervalMs, deliveryGuarantee);
orderedWrites,
bypassDocumentValidation,
batchSize,
batchIntervalMs,
maxRetries,
retryIntervalMs,
deliveryGuarantee);
}

public static MongoWriteOptionsBuilder builder() {
Expand All @@ -108,6 +132,8 @@ public static MongoWriteOptionsBuilder builder() {
/** Builder for {@link MongoWriteOptions}. */
@PublicEvolving
public static class MongoWriteOptionsBuilder {
private boolean orderedWrites = SINK_ORDERED_WRITES.defaultValue();
private boolean bypassDocumentValidation = SINK_BYPASS_VALIDATION.defaultValue();
private int batchSize = BUFFER_FLUSH_MAX_ROWS.defaultValue();
private long batchIntervalMs = BUFFER_FLUSH_INTERVAL.defaultValue().toMillis();
private int maxRetries = SINK_MAX_RETRIES.defaultValue();
Expand All @@ -116,6 +142,31 @@ public static class MongoWriteOptionsBuilder {

private MongoWriteOptionsBuilder() {}

/**
* Sets the mongodb bulk write option ordered {@link
* com.mongodb.client.model.BulkWriteOptions}.
*
* @param orderedWrites bulk write option
* @return this builder
*/
public MongoWriteOptionsBuilder setOrderedWrites(boolean orderedWrites) {
this.orderedWrites = orderedWrites;
return this;
}

/**
* Sets the mongodb bulk write option bypassDocumentValidation {@link
* com.mongodb.client.model.BulkWriteOptions}.
*
* @param bypassDocumentValidation bulk write option to bypass document validation
* @return this builder
*/
public MongoWriteOptionsBuilder setBypassDocumentValidation(
boolean bypassDocumentValidation) {
this.bypassDocumentValidation = bypassDocumentValidation;
return this;
}

/**
* Sets the maximum number of actions to buffer for each batch request. You can pass -1 to
* disable batching.
Expand Down Expand Up @@ -195,7 +246,13 @@ public MongoWriteOptionsBuilder setDeliveryGuarantee(DeliveryGuarantee deliveryG
*/
public MongoWriteOptions build() {
return new MongoWriteOptions(
batchSize, batchIntervalMs, maxRetries, retryIntervalMs, deliveryGuarantee);
orderedWrites,
bypassDocumentValidation,
batchSize,
batchIntervalMs,
maxRetries,
retryIntervalMs,
deliveryGuarantee);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.mongodb.MongoException;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.WriteModel;
import org.bson.BsonDocument;
import org.slf4j.Logger;
Expand Down Expand Up @@ -213,7 +214,12 @@ void doBulkWrite() throws IOException {
mongoClient
.getDatabase(connectionOptions.getDatabase())
.getCollection(connectionOptions.getCollection(), BsonDocument.class)
.bulkWrite(bulkRequests);
.bulkWrite(
bulkRequests,
new BulkWriteOptions()
.ordered(writeOptions.isOrderedWrites())
.bypassDocumentValidation(
writeOptions.isBypassDocumentValidation()));
ackTime = System.currentTimeMillis();
bulkRequests.clear();
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,18 @@ private MongoConnectorOptions() {}
.withDescription(
"Specifies the retry time interval if lookup records from database failed.");

public static final ConfigOption<Boolean> SINK_ORDERED_WRITES =
ConfigOptions.key("sink.orderedWrites")
.booleanType()
.defaultValue(true)
.withDescription("Specifies mongodb bulk write ordered option");

public static final ConfigOption<Boolean> SINK_BYPASS_VALIDATION =
ConfigOptions.key("sink.bypassDocumentValidation")
.booleanType()
.defaultValue(false)
.withDescription("Specifies mongodb bulk write option to bypass validation");

public static final ConfigOption<Integer> BUFFER_FLUSH_MAX_ROWS =
ConfigOptions.key("sink.buffer-flush.max-rows")
.intType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,34 @@ static void tearDown() {
}
}

@Test
void unorderedWrite() throws Exception {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: The naming style is slightly inconsistent (unorderedWrite vs testRecovery)
Maybe it is cleaner to pick one style?

final String collection = "test-sink-with-unordered-write";
final MongoSink<Document> sink =
createSink(collection, DeliveryGuarantee.AT_LEAST_ONCE, false, true);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(100L);
env.setRestartStrategy(RestartStrategies.noRestart());

env.fromSequence(1, 5).map(new TestMapFunction()).sinkTo(sink);
env.execute();
assertThatIdsAreWritten(collectionOf(collection), 1, 2, 3, 4, 5);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current tests don't fully validate the behavioral difference between ordered and unordered writes.
It would be valuable to add a test case that injects a failure (e.g., a duplicate key error) in the middle of a batch.

}

@Test
void bypassDocumentValidation() throws Exception {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final String collection = "test-sink-with-bypass-doc-validation";
final MongoSink<Document> sink =
createSink(collection, DeliveryGuarantee.AT_LEAST_ONCE, true, false);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

createSink(collection, DeliveryGuarantee.AT_LEAST_ONCE, true, false);

In this test, you are passing false to the bypass parameter, effectively disabling the feature you intend to test. Or I'm missing something?

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(100L);
env.setRestartStrategy(RestartStrategies.noRestart());

env.fromSequence(1, 5).map(new TestMapFunction()).sinkTo(sink);
env.execute();
assertThatIdsAreWritten(collectionOf(collection), 1, 2, 3, 4, 5);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the current test passes regardless of the flag because the collection has no validation rules.
Maybe to verify bypassDocumentValidation works, we can create the collection with a validator and assert that writes only succeed when the bypass flag is enabled?

}

@ParameterizedTest
@EnumSource(
value = DeliveryGuarantee.class,
Expand All @@ -100,7 +128,7 @@ static void tearDown() {
void testWriteToMongoWithDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee)
throws Exception {
final String collection = "test-sink-with-delivery-" + deliveryGuarantee;
final MongoSink<Document> sink = createSink(collection, deliveryGuarantee);
final MongoSink<Document> sink = createSink(collection, deliveryGuarantee, true, false);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(100L);
env.setRestartStrategy(RestartStrategies.noRestart());
Expand All @@ -113,7 +141,8 @@ void testWriteToMongoWithDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee)
@Test
void testRecovery() throws Exception {
final String collection = "test-recovery-mongo-sink";
final MongoSink<Document> sink = createSink(collection, DeliveryGuarantee.AT_LEAST_ONCE);
final MongoSink<Document> sink =
createSink(collection, DeliveryGuarantee.AT_LEAST_ONCE, true, false);

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(100L);
Expand All @@ -131,13 +160,18 @@ void testRecovery() throws Exception {
}

private static MongoSink<Document> createSink(
String collection, DeliveryGuarantee deliveryGuarantee) {
String collection,
DeliveryGuarantee deliveryGuarantee,
boolean ordered,
boolean bypassDocumentValidation) {
return MongoSink.<Document>builder()
.setUri(MONGO_CONTAINER.getConnectionString())
.setDatabase(TEST_DATABASE)
.setCollection(collection)
.setBatchSize(5)
.setDeliveryGuarantee(deliveryGuarantee)
.setOrderedWrites(ordered)
.setBypassDocumentValidation(bypassDocumentValidation)
.setSerializationSchema(new UpsertSerializationSchema())
.build();
}
Expand Down