From 12dfd6d7cf935eb351f142302097456f096919b6 Mon Sep 17 00:00:00 2001 From: Robert Metzger Date: Thu, 21 Jan 2016 12:22:21 +0100 Subject: [PATCH] [FLINK-3265] Make RabbitMQ source threadsafe --- .../connectors/rabbitmq/RMQSource.java | 4 +- .../connectors/rabbitmq/RMQSourceTest.java | 80 ++++++++++++++++++- .../MessageAcknowledgingSourceBase.java | 51 ++++++------ ...ipleIdsMessageAcknowledgingSourceBase.java | 24 +++--- 4 files changed, 124 insertions(+), 35 deletions(-) diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java index 09bb07c708290..59bc057fd399c 100644 --- a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java +++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java @@ -196,7 +196,9 @@ public void run(SourceContext ctx) throws Exception { continue; } } - sessionIds.add(deliveryTag); + synchronized (sessionIdsPerSnapshot) { + sessionIds.add(deliveryTag); + } } ctx.collect(result); diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java index 1da9d27c3a2b6..0a3de844dcaa6 100644 --- a/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java +++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java @@ -21,9 +21,9 @@ import com.rabbitmq.client.Channel; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.QueueingConsumer; -import junit.framework.Assert; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.configuration.Configuration; @@ -32,6 +32,7 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.util.serialization.DeserializationSchema; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -104,6 +105,83 @@ public void afterTest() throws Exception { sourceThread.join(); } + /** + * Make sure concurrent access to snapshotState() and notifyCheckpointComplete() don't cause + * an issue. + * + * Without proper synchronization, the test will fail with a concurrent modification exception + * + */ + @Test + public void testConcurrentAccess() throws Exception { + source.autoAck = false; + sourceThread.start(); + + final Tuple1 error = new Tuple1<>(null); + + Thread.sleep(5); + + Thread snapshotThread = new Thread(new Runnable() { + public long id = 0; + + @Override + public void run() { + while (!Thread.interrupted()) { + try { + source.snapshotState(id++, 0); + } catch (Exception e) { + error.f0 = e; + break; // stop thread + } + } + } + }); + + Thread notifyThread = new Thread(new Runnable() { + @Override + public void run() { + while (!Thread.interrupted()) { + try { + // always remove all checkpoints + source.notifyCheckpointComplete(Long.MAX_VALUE); + } catch (Exception e) { + error.f0 = e; + break; // stop thread + } + } + } + }); + + snapshotThread.start(); + notifyThread.start(); + + long deadline = System.currentTimeMillis() + 1000L; + while(System.currentTimeMillis() < deadline) { + if(!snapshotThread.isAlive()) { + notifyThread.interrupt(); + break; + } + if(!notifyThread.isAlive()) { + snapshotThread.interrupt(); + break; + } + Thread.sleep(10); + } + if(snapshotThread.isAlive()) { + snapshotThread.interrupt(); + snapshotThread.join(); + } + if(notifyThread.isAlive()) { + notifyThread.interrupt(); + notifyThread.join(); + } + if(error.f0 != null) { + error.f0.printStackTrace(); + Assert.fail("Test failed with " + error.f0.getClass().getCanonicalName()); + } + + } + @Test public void testCheckpointing() throws Exception { source.autoAck = false; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java index 43858845aab3d..2f865d161f253 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java @@ -76,6 +76,7 @@ * @param The type of the messages created by the source. * @param The type of unique IDs which may be used to acknowledge elements. */ +@SuppressWarnings("SynchronizeOnNonFinalField") public abstract class MessageAcknowledgingSourceBase extends RichSourceFunction implements Checkpointed, CheckpointNotifier { @@ -166,41 +167,45 @@ public SerializedCheckpointData[] snapshotState(long checkpointId, long checkpoi LOG.debug("Snapshotting state. Messages: {}, checkpoint id: {}, timestamp: {}", idsForCurrentCheckpoint, checkpointId, checkpointTimestamp); - pendingCheckpoints.addLast(new Tuple2<>(checkpointId, idsForCurrentCheckpoint)); + synchronized (pendingCheckpoints) { + pendingCheckpoints.addLast(new Tuple2<>(checkpointId, idsForCurrentCheckpoint)); - idsForCurrentCheckpoint = new ArrayList<>(64); + idsForCurrentCheckpoint = new ArrayList<>(64); - return SerializedCheckpointData.fromDeque(pendingCheckpoints, idSerializer); + return SerializedCheckpointData.fromDeque(pendingCheckpoints, idSerializer); + } } @Override public void restoreState(SerializedCheckpointData[] state) throws Exception { - pendingCheckpoints = SerializedCheckpointData.toDeque(state, idSerializer); - // build a set which contains all processed ids. It may be used to check if we have - // already processed an incoming message. - for (Tuple2> checkpoint : pendingCheckpoints) { - idsProcessedButNotAcknowledged.addAll(checkpoint.f1); + synchronized (pendingCheckpoints) { + pendingCheckpoints = SerializedCheckpointData.toDeque(state, idSerializer); + // build a set which contains all processed ids. It may be used to check if we have + // already processed an incoming message. + for (Tuple2> checkpoint : pendingCheckpoints) { + idsProcessedButNotAcknowledged.addAll(checkpoint.f1); + } } } @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { LOG.debug("Committing Messages externally for checkpoint {}", checkpointId); - - for (Iterator>> iter = pendingCheckpoints.iterator(); iter.hasNext();) { - Tuple2> checkpoint = iter.next(); - long id = checkpoint.f0; - - if (id <= checkpointId) { - LOG.trace("Committing Messages with following IDs {}", checkpoint.f1); - acknowledgeIDs(checkpointId, checkpoint.f1); - // remove deduplication data - idsProcessedButNotAcknowledged.removeAll(checkpoint.f1); - // remove checkpoint data - iter.remove(); - } - else { - break; + synchronized (pendingCheckpoints) { + for (Iterator>> iter = pendingCheckpoints.iterator(); iter.hasNext(); ) { + Tuple2> checkpoint = iter.next(); + long id = checkpoint.f0; + + if (id <= checkpointId) { + LOG.trace("Committing Messages with following IDs {}", checkpoint.f1); + acknowledgeIDs(checkpointId, checkpoint.f1); + // remove deduplication data + idsProcessedButNotAcknowledged.removeAll(checkpoint.f1); + // remove checkpoint data + iter.remove(); + } else { + break; + } } } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java index c097066770237..4709759e8031e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java @@ -107,14 +107,16 @@ public void close() throws Exception { */ protected final void acknowledgeIDs(long checkpointId, List uniqueIds) { LOG.debug("Acknowledging ids for checkpoint {}", checkpointId); - Iterator>> iterator = sessionIdsPerSnapshot.iterator(); - while (iterator.hasNext()) { - final Tuple2> next = iterator.next(); - long id = next.f0; - if (id <= checkpointId) { - acknowledgeSessionIDs(next.f1); - // remove ids for this session - iterator.remove(); + synchronized (sessionIdsPerSnapshot) { + Iterator>> iterator = sessionIdsPerSnapshot.iterator(); + while (iterator.hasNext()) { + final Tuple2> next = iterator.next(); + long id = next.f0; + if (id <= checkpointId) { + acknowledgeSessionIDs(next.f1); + // remove ids for this session + iterator.remove(); + } } } } @@ -132,8 +134,10 @@ protected final void acknowledgeIDs(long checkpointId, List uniqueIds) { @Override public SerializedCheckpointData[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { - sessionIdsPerSnapshot.add(new Tuple2<>(checkpointId, sessionIds)); - sessionIds = new ArrayList<>(64); + synchronized (sessionIdsPerSnapshot) { + sessionIdsPerSnapshot.add(new Tuple2<>(checkpointId, sessionIds)); + sessionIds = new ArrayList<>(64); + } return super.snapshotState(checkpointId, checkpointTimestamp); } }