Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,28 @@ public MongoSinkBuilder<IN> setDeliveryGuarantee(DeliveryGuarantee deliveryGuara
return this;
}

/**
* Sets the {@code ordered} flag used for the sink's MongoDB bulk writes (see {@link
* com.mongodb.client.model.BulkWriteOptions#ordered(boolean)} for the driver semantics).
*
* <p>The value is forwarded unchanged to the underlying write-options builder.
*
* @param ordered {@code true} to request ordered bulk writes, {@code false} for unordered ones
* @return this builder
*/
public MongoSinkBuilder<IN> setOrderedWrites(boolean ordered) {
writeOptionsBuilder.setOrderedWrites(ordered);
return this;
}

/**
* Sets the {@code bypassDocumentValidation} flag used for the sink's MongoDB bulk writes (see
* {@link com.mongodb.client.model.BulkWriteOptions#bypassDocumentValidation(Boolean)} for the
* driver semantics).
*
* <p>The value is forwarded unchanged to the underlying write-options builder.
*
* @param bypassDocumentValidation {@code true} to ask MongoDB to skip document validation for
*     the bulk writes, {@code false} otherwise
* @return this builder
*/
public MongoSinkBuilder<IN> setBypassDocumentValidation(boolean bypassDocumentValidation) {
writeOptionsBuilder.setBypassDocumentValidation(bypassDocumentValidation);
return this;
}

/**
* Sets the serialization schema which is invoked on every record to convert it to MongoDB bulk
* request.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.BUFFER_FLUSH_INTERVAL;
import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.BUFFER_FLUSH_MAX_ROWS;
import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.DELIVERY_GUARANTEE;
import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SINK_BYPASS_VALIDATION;
import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SINK_MAX_RETRIES;
import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SINK_ORDERED_WRITES;
import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SINK_RETRY_INTERVAL;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
Expand All @@ -40,25 +42,39 @@ public final class MongoWriteOptions implements Serializable {

private static final long serialVersionUID = 1L;

private final boolean orderedWrites;
private final boolean bypassDocumentValidation;
private final int batchSize;
private final long batchIntervalMs;
private final int maxRetries;
private final long retryIntervalMs;
private final DeliveryGuarantee deliveryGuarantee;

/**
* Creates the immutable options object by storing every argument in the field of the same
* name. No validation happens here; the constructor is private and is invoked from the
* builder's {@code build()} method.
*
* @param orderedWrites {@code ordered} flag for bulk writes
* @param bypassDocumentValidation {@code bypassDocumentValidation} flag for bulk writes
* @param batchSize value stored as the batch size
* @param batchIntervalMs value stored as the batch interval, in milliseconds
* @param maxRetries value stored as the maximum number of retries
* @param retryIntervalMs value stored as the retry interval, in milliseconds
* @param deliveryGuarantee delivery guarantee of the sink
*/
private MongoWriteOptions(
boolean orderedWrites,
boolean bypassDocumentValidation,
int batchSize,
long batchIntervalMs,
int maxRetries,
long retryIntervalMs,
DeliveryGuarantee deliveryGuarantee) {
this.orderedWrites = orderedWrites;
this.bypassDocumentValidation = bypassDocumentValidation;
this.batchSize = batchSize;
this.batchIntervalMs = batchIntervalMs;
this.maxRetries = maxRetries;
this.retryIntervalMs = retryIntervalMs;
this.deliveryGuarantee = deliveryGuarantee;
}

/**
* Returns the configured {@code ordered} flag for MongoDB bulk writes.
*
* @return {@code true} if bulk writes are requested as ordered
*/
public boolean isOrderedWrites() {
return orderedWrites;
}

/**
* Returns the configured {@code bypassDocumentValidation} flag for MongoDB bulk writes.
*
* @return {@code true} if bulk writes are requested to bypass document validation
*/
public boolean isBypassDocumentValidation() {
return bypassDocumentValidation;
}

public int getBatchSize() {
return batchSize;
}
Expand Down Expand Up @@ -88,7 +104,9 @@ public boolean equals(Object o) {
return false;
}
MongoWriteOptions that = (MongoWriteOptions) o;
return batchSize == that.batchSize
return orderedWrites == that.orderedWrites
&& bypassDocumentValidation == that.bypassDocumentValidation
&& batchSize == that.batchSize
&& batchIntervalMs == that.batchIntervalMs
&& maxRetries == that.maxRetries
&& retryIntervalMs == that.retryIntervalMs
Expand All @@ -98,7 +116,13 @@ public boolean equals(Object o) {
/**
* Derives the hash code from the write-option fields, including the {@code orderedWrites} and
* {@code bypassDocumentValidation} flags.
*/
@Override
public int hashCode() {
return Objects.hash(
batchSize, batchIntervalMs, maxRetries, retryIntervalMs, deliveryGuarantee);
orderedWrites,
bypassDocumentValidation,
batchSize,
batchIntervalMs,
maxRetries,
retryIntervalMs,
deliveryGuarantee);
}

public static MongoWriteOptionsBuilder builder() {
Expand All @@ -108,6 +132,8 @@ public static MongoWriteOptionsBuilder builder() {
/** Builder for {@link MongoWriteOptions}. */
@PublicEvolving
public static class MongoWriteOptionsBuilder {
private boolean orderedWrites = SINK_ORDERED_WRITES.defaultValue();
private boolean bypassDocumentValidation = SINK_BYPASS_VALIDATION.defaultValue();
private int batchSize = BUFFER_FLUSH_MAX_ROWS.defaultValue();
private long batchIntervalMs = BUFFER_FLUSH_INTERVAL.defaultValue().toMillis();
private int maxRetries = SINK_MAX_RETRIES.defaultValue();
Expand All @@ -116,6 +142,31 @@ public static class MongoWriteOptionsBuilder {

private MongoWriteOptionsBuilder() {}

/**
* Sets the {@code ordered} flag for MongoDB bulk writes (see {@link
* com.mongodb.client.model.BulkWriteOptions#ordered(boolean)}). If this method is not
* called, the builder keeps the default value of {@code SINK_ORDERED_WRITES}.
*
* @param orderedWrites {@code true} to request ordered bulk writes, {@code false} for
*     unordered ones
* @return this builder
*/
public MongoWriteOptionsBuilder setOrderedWrites(boolean orderedWrites) {
this.orderedWrites = orderedWrites;
return this;
}

/**
* Sets the {@code bypassDocumentValidation} flag for MongoDB bulk writes (see {@link
* com.mongodb.client.model.BulkWriteOptions#bypassDocumentValidation(Boolean)}). If this
* method is not called, the builder keeps the default value of {@code
* SINK_BYPASS_VALIDATION}.
*
* @param bypassDocumentValidation {@code true} to ask MongoDB to skip document validation
*     for the bulk writes, {@code false} otherwise
* @return this builder
*/
public MongoWriteOptionsBuilder setBypassDocumentValidation(
boolean bypassDocumentValidation) {
this.bypassDocumentValidation = bypassDocumentValidation;
return this;
}

/**
* Sets the maximum number of actions to buffer for each batch request. You can pass -1 to
* disable batching.
Expand Down Expand Up @@ -195,7 +246,13 @@ public MongoWriteOptionsBuilder setDeliveryGuarantee(DeliveryGuarantee deliveryG
*/
public MongoWriteOptions build() {
return new MongoWriteOptions(
batchSize, batchIntervalMs, maxRetries, retryIntervalMs, deliveryGuarantee);
orderedWrites,
bypassDocumentValidation,
batchSize,
batchIntervalMs,
maxRetries,
retryIntervalMs,
deliveryGuarantee);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.mongodb.MongoException;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.WriteModel;
import org.bson.BsonDocument;
import org.slf4j.Logger;
Expand Down Expand Up @@ -213,7 +214,12 @@ void doBulkWrite() throws IOException {
mongoClient
.getDatabase(connectionOptions.getDatabase())
.getCollection(connectionOptions.getCollection(), BsonDocument.class)
.bulkWrite(bulkRequests);
.bulkWrite(
bulkRequests,
new BulkWriteOptions()
.ordered(writeOptions.isOrderedWrites())
.bypassDocumentValidation(
writeOptions.isBypassDocumentValidation()));
ackTime = System.currentTimeMillis();
bulkRequests.clear();
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,18 @@ private MongoConnectorOptions() {}
.withDescription(
"Specifies the retry time interval if lookup records from database failed.");

/**
 * Whether the sink issues its MongoDB bulk writes with the {@code ordered} flag set.
 *
 * <p>The key uses the same kebab-case style as the other sink options (for example {@code
 * sink.buffer-flush.max-rows}). The earlier camelCase spelling {@code sink.orderedWrites} is
 * still accepted as a deprecated key, so existing configurations keep working.
 */
public static final ConfigOption<Boolean> SINK_ORDERED_WRITES =
        ConfigOptions.key("sink.ordered-writes")
                .booleanType()
                .defaultValue(true)
                .withDeprecatedKeys("sink.orderedWrites")
                .withDescription("Specifies mongodb bulk write ordered option");

/**
 * Whether the sink asks MongoDB to bypass document validation for its bulk writes.
 *
 * <p>The key uses the same kebab-case style as the other sink options (for example {@code
 * sink.buffer-flush.max-rows}). The earlier camelCase spelling {@code
 * sink.bypassDocumentValidation} is still accepted as a deprecated key, so existing
 * configurations keep working.
 */
public static final ConfigOption<Boolean> SINK_BYPASS_VALIDATION =
        ConfigOptions.key("sink.bypass-document-validation")
                .booleanType()
                .defaultValue(false)
                .withDeprecatedKeys("sink.bypassDocumentValidation")
                .withDescription("Specifies mongodb bulk write option to bypass validation");

public static final ConfigOption<Integer> BUFFER_FLUSH_MAX_ROWS =
ConfigOptions.key("sink.buffer-flush.max-rows")
.intType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,34 @@ static void tearDown() {
}
}

/**
 * Verifies that ids 1..5 reach MongoDB when the sink is configured for unordered bulk writes.
 *
 * <p>Only the {@code ordered} flag deviates from its default here; document-validation bypass
 * stays off so that this test exercises a single option.
 */
@Test
void unorderedWrite() throws Exception {
    final String collection = "test-sink-with-unordered-write";
    // createSink takes (collection, guarantee, ordered, bypassDocumentValidation).
    final boolean ordered = false;
    final boolean bypassDocumentValidation = false;
    final MongoSink<Document> sink =
            createSink(
                    collection,
                    DeliveryGuarantee.AT_LEAST_ONCE,
                    ordered,
                    bypassDocumentValidation);
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(100L);
    env.setRestartStrategy(RestartStrategies.noRestart());

    env.fromSequence(1, 5).map(new TestMapFunction()).sinkTo(sink);
    env.execute();
    assertThatIdsAreWritten(collectionOf(collection), 1, 2, 3, 4, 5);
}

/**
 * Verifies that ids 1..5 reach MongoDB when the sink is configured to bypass document
 * validation.
 *
 * <p>The previous arguments ({@code true, false}) mapped to ordered=true and
 * bypassDocumentValidation=false, i.e. the defaults, so the option under test was never
 * switched on.
 *
 * <p>NOTE(review): no collection validator is set up in the visible test code, so this looks
 * like a smoke test of the option rather than proof that validation is skipped - confirm.
 */
@Test
void bypassDocumentValidation() throws Exception {
    final String collection = "test-sink-with-bypass-doc-validation";
    // createSink takes (collection, guarantee, ordered, bypassDocumentValidation).
    final boolean ordered = true;
    final boolean bypassDocumentValidation = true;
    final MongoSink<Document> sink =
            createSink(
                    collection,
                    DeliveryGuarantee.AT_LEAST_ONCE,
                    ordered,
                    bypassDocumentValidation);
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(100L);
    env.setRestartStrategy(RestartStrategies.noRestart());

    env.fromSequence(1, 5).map(new TestMapFunction()).sinkTo(sink);
    env.execute();
    assertThatIdsAreWritten(collectionOf(collection), 1, 2, 3, 4, 5);
}

@ParameterizedTest
@EnumSource(
value = DeliveryGuarantee.class,
Expand All @@ -100,7 +128,7 @@ static void tearDown() {
void testWriteToMongoWithDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee)
throws Exception {
final String collection = "test-sink-with-delivery-" + deliveryGuarantee;
final MongoSink<Document> sink = createSink(collection, deliveryGuarantee);
final MongoSink<Document> sink = createSink(collection, deliveryGuarantee, true, false);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(100L);
env.setRestartStrategy(RestartStrategies.noRestart());
Expand All @@ -113,7 +141,8 @@ void testWriteToMongoWithDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee)
@Test
void testRecovery() throws Exception {
final String collection = "test-recovery-mongo-sink";
final MongoSink<Document> sink = createSink(collection, DeliveryGuarantee.AT_LEAST_ONCE);
final MongoSink<Document> sink =
createSink(collection, DeliveryGuarantee.AT_LEAST_ONCE, true, false);

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(100L);
Expand All @@ -131,13 +160,18 @@ void testRecovery() throws Exception {
}

/**
* Builds a {@link MongoSink} that writes to the given collection of the test database with a
* batch size of 5 and an {@code UpsertSerializationSchema}.
*
* @param collection name of the target collection
* @param deliveryGuarantee delivery guarantee configured on the sink
* @param ordered value passed to {@code setOrderedWrites}
* @param bypassDocumentValidation value passed to {@code setBypassDocumentValidation}
* @return the configured sink
*/
private static MongoSink<Document> createSink(
String collection, DeliveryGuarantee deliveryGuarantee) {
String collection,
DeliveryGuarantee deliveryGuarantee,
boolean ordered,
boolean bypassDocumentValidation) {
return MongoSink.<Document>builder()
.setUri(MONGO_CONTAINER.getConnectionString())
.setDatabase(TEST_DATABASE)
.setCollection(collection)
.setBatchSize(5)
.setDeliveryGuarantee(deliveryGuarantee)
.setOrderedWrites(ordered)
.setBypassDocumentValidation(bypassDocumentValidation)
.setSerializationSchema(new UpsertSerializationSchema())
.build();
}
Expand Down