From 18338c7c51c3f1ada3541ee988c8028d3a87d38a Mon Sep 17 00:00:00 2001 From: tedyu Date: Thu, 28 Jan 2016 20:47:24 -0800 Subject: [PATCH] FLINK-3301 Ineffective synchronization in MessageAcknowledgingSourceBase#restoreState --- .../MessageAcknowledgingSourceBase.java | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java index 2f865d161f253..db35b675f62db 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java @@ -124,7 +124,7 @@ protected MessageAcknowledgingSourceBase(TypeInformation idTypeInfo) { } @Override - public void open(Configuration parameters) throws Exception { + public synchronized void open(Configuration parameters) throws Exception { idsForCurrentCheckpoint = new ArrayList<>(64); pendingCheckpoints = new ArrayDeque<>(numCheckpointsToKeep); idsProcessedButNotAcknowledged = new HashSet<>(); @@ -167,7 +167,7 @@ public SerializedCheckpointData[] snapshotState(long checkpointId, long checkpoi LOG.debug("Snapshotting state. Messages: {}, checkpoint id: {}, timestamp: {}", idsForCurrentCheckpoint, checkpointId, checkpointTimestamp); - synchronized (pendingCheckpoints) { + synchronized (this) { pendingCheckpoints.addLast(new Tuple2<>(checkpointId, idsForCurrentCheckpoint)); idsForCurrentCheckpoint = new ArrayList<>(64); @@ -177,21 +177,19 @@ public SerializedCheckpointData[] snapshotState(long checkpointId, long checkpoi } @Override - public void restoreState(SerializedCheckpointData[] state) throws Exception { - synchronized (pendingCheckpoints) { - pendingCheckpoints = SerializedCheckpointData.toDeque(state, idSerializer); - // build a set which contains all processed ids. It may be used to check if we have - // already processed an incoming message. - for (Tuple2> checkpoint : pendingCheckpoints) { - idsProcessedButNotAcknowledged.addAll(checkpoint.f1); - } + public synchronized void restoreState(SerializedCheckpointData[] state) throws Exception { + pendingCheckpoints = SerializedCheckpointData.toDeque(state, idSerializer); + // build a set which contains all processed ids. It may be used to check if we have + // already processed an incoming message. + for (Tuple2> checkpoint : pendingCheckpoints) { + idsProcessedButNotAcknowledged.addAll(checkpoint.f1); } } @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { LOG.debug("Committing Messages externally for checkpoint {}", checkpointId); - synchronized (pendingCheckpoints) { + synchronized (this) { for (Iterator>> iter = pendingCheckpoints.iterator(); iter.hasNext(); ) { Tuple2> checkpoint = iter.next(); long id = checkpoint.f0;