diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriter.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriter.java
index df76891e..5319959d 100644
--- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriter.java
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriter.java
@@ -33,6 +33,7 @@
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
 import com.mongodb.MongoException;
 import com.mongodb.client.MongoClient;
@@ -45,6 +46,10 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -68,11 +73,18 @@ public class MongoWriter<IN> implements SinkWriter<IN> {
     private final Collector<WriteModel<BsonDocument>> collector;
     private final Counter numRecordsOut;
     private final MongoClient mongoClient;
+    private final long batchIntervalMs;
+    private final int batchSize;
 
     private boolean checkpointInProgress = false;
     private volatile long lastSendTime = 0L;
     private volatile long ackTime = Long.MAX_VALUE;
+    private transient volatile boolean closed = false;
+    private transient ScheduledExecutorService scheduler;
+    private transient ScheduledFuture<?> scheduledFuture;
+    private transient volatile Exception flushException;
+
     public MongoWriter(
             MongoConnectionOptions connectionOptions,
             MongoWriteOptions writeOptions,
@@ -83,6 +95,8 @@ public MongoWriter(
         this.writeOptions = checkNotNull(writeOptions);
         this.serializationSchema = checkNotNull(serializationSchema);
         this.flushOnCheckpoint = flushOnCheckpoint;
+        this.batchIntervalMs = writeOptions.getBatchIntervalMs();
+        this.batchSize = writeOptions.getBatchSize();
 
         checkNotNull(initContext);
         this.mailboxExecutor = checkNotNull(initContext.getMailboxExecutor());
@@ -105,10 +119,37 @@ public MongoWriter(
         // Initialize the mongo client.
         this.mongoClient = MongoClients.create(connectionOptions.getUri());
+
+        boolean flushOnlyOnCheckpoint = batchIntervalMs == -1 && batchSize == -1;
+
+        if (!flushOnlyOnCheckpoint && batchIntervalMs > 0) {
+            this.scheduler =
+                    Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("mongo-writer"));
+
+            this.scheduledFuture =
+                    this.scheduler.scheduleWithFixedDelay(
+                            () -> {
+                                synchronized (MongoWriter.this) {
+                                    if (!closed && isOverMaxBatchIntervalLimit()) {
+                                        try {
+                                            doBulkWrite();
+                                        } catch (Exception e) {
+                                            flushException = e;
+                                        }
+                                    }
+                                }
+                            },
+                            batchIntervalMs,
+                            batchIntervalMs,
+                            TimeUnit.MILLISECONDS);
+        }
     }
 
     @Override
-    public void write(IN element, Context context) throws IOException, InterruptedException {
+    public synchronized void write(IN element, Context context)
+            throws IOException, InterruptedException {
+        checkFlushException();
+
         // do not allow new bulk writes until all actions are flushed
         while (checkpointInProgress) {
             mailboxExecutor.yield();
@@ -122,7 +163,9 @@ public void write(IN element, Context context) throws IOException, InterruptedEx
     }
 
     @Override
-    public void flush(boolean endOfInput) throws IOException {
+    public synchronized void flush(boolean endOfInput) throws IOException {
+        checkFlushException();
+
         checkpointInProgress = true;
         while (!bulkRequests.isEmpty() && (flushOnCheckpoint || endOfInput)) {
             doBulkWrite();
@@ -131,8 +174,28 @@ public void flush(boolean endOfInput) throws IOException {
     }
 
     @Override
-    public void close() {
-        mongoClient.close();
+    public synchronized void close() throws Exception {
+        if (!closed) {
+            if (scheduledFuture != null) {
+                scheduledFuture.cancel(false);
+                scheduler.shutdown();
+            }
+
+            if (!bulkRequests.isEmpty()) {
+                try {
+                    doBulkWrite();
+                } catch (Exception e) {
+                    LOG.error("Writing records to MongoDB failed when closing MongoWriter", e);
+                    throw new IOException("Writing records to MongoDB failed.", e);
+                } finally {
+                    mongoClient.close();
+                    closed = true;
+                }
+            } else {
+                mongoClient.close();
+                closed = true;
+            }
+        }
     }
 
     @VisibleForTesting
@@ -172,13 +235,17 @@ void doBulkWrite() throws IOException {
     }
 
     private boolean isOverMaxBatchSizeLimit() {
-        int bulkActions = writeOptions.getBatchSize();
-        return bulkActions != -1 && bulkRequests.size() >= bulkActions;
+        return batchSize != -1 && bulkRequests.size() >= batchSize;
     }
 
     private boolean isOverMaxBatchIntervalLimit() {
-        long bulkFlushInterval = writeOptions.getBatchIntervalMs();
         long lastSentInterval = System.currentTimeMillis() - lastSendTime;
-        return bulkFlushInterval != -1 && lastSentInterval >= bulkFlushInterval;
+        return batchIntervalMs != -1 && lastSentInterval >= batchIntervalMs;
+    }
+
+    private void checkFlushException() {
+        if (flushException != null) {
+            throw new RuntimeException("Writing records to MongoDB failed.", flushException);
+        }
     }
 }
diff --git a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
index c0717214..84767e8e 100644
--- a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
+++ b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
@@ -53,6 +53,7 @@
 import static org.apache.flink.connector.mongodb.testutils.MongoTestUtil.assertThatIdsAreNotWritten;
 import static org.apache.flink.connector.mongodb.testutils.MongoTestUtil.assertThatIdsAreWritten;
+import static org.apache.flink.connector.mongodb.testutils.MongoTestUtil.assertThatIdsAreWrittenWithMaxWaitTime;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.fail;
@@ -138,12 +139,12 @@ void testWriteOnBatchIntervalFlush() throws Exception {
                 createWriter(collection, batchSize, batchIntervalMs, flushOnCheckpoint)) {
             writer.write(buildMessage(1), null);
             writer.write(buildMessage(2), null);
+            writer.doBulkWrite();
             writer.write(buildMessage(3), null);
             writer.write(buildMessage(4), null);
-            writer.doBulkWrite();
-        }
 
-        assertThatIdsAreWritten(collectionOf(collection), 1, 2, 3, 4);
+            assertThatIdsAreWrittenWithMaxWaitTime(collectionOf(collection), 10000L, 1, 2, 3, 4);
+        }
     }
 
     @Test
diff --git a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/testutils/MongoTestUtil.java b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/testutils/MongoTestUtil.java
index 7d8c3d0f..246f2bc9 100644
--- a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/testutils/MongoTestUtil.java
+++ b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/testutils/MongoTestUtil.java
@@ -77,4 +77,17 @@ public static void assertThatIdsAreWritten(MongoCollection<Document> coll, Integ
         assertThat(actualIds).containsExactlyInAnyOrder(ids);
     }
+
+    public static void assertThatIdsAreWrittenWithMaxWaitTime(
+            MongoCollection<Document> coll, long maxWaitTimeMs, Integer... ids)
+            throws InterruptedException {
+        long startTimeMillis = System.currentTimeMillis();
+        while (System.currentTimeMillis() - startTimeMillis < maxWaitTimeMs) {
+            if (coll.countDocuments(Filters.in("_id", ids)) == ids.length) {
+                break;
+            }
+            Thread.sleep(1000L);
+        }
+        assertThatIdsAreWritten(coll, ids);
+    }
 }
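For context, a minimal usage sketch of a sink configured so that the scheduled flusher added to MongoWriter above actually triggers: it assumes the connector's public MongoSink builder exposes setBatchSize/setBatchIntervalMs (the values behind the writeOptions.getBatchSize() and getBatchIntervalMs() calls read in this patch). Everything in the snippet is illustrative and is not part of the diff.

import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.mongodb.sink.MongoSink;

import com.mongodb.client.model.InsertOneModel;
import org.bson.BsonDocument;

public class MongoSinkBatchIntervalSketch {
    // Hypothetical helper, not part of the patch: builds a sink that flushes after
    // 1000 buffered writes or, at the latest, 1000 ms after the last flush.
    public static MongoSink<String> buildSink() {
        return MongoSink.<String>builder()
                .setUri("mongodb://127.0.0.1:27017")
                .setDatabase("test_db")
                .setCollection("test_coll")
                .setBatchSize(1000)
                .setBatchIntervalMs(1000)
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .setSerializationSchema(
                        (input, context) -> new InsertOneModel<>(BsonDocument.parse(input)))
                .build();
    }
}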