From aabeb89dd1259174c786f19b7e97c4c50038610f Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Fri, 2 Mar 2018 14:38:20 +0100 Subject: [PATCH] [FLINK-8517] fix missing synchronization in TaskEventDispatcher --- .../flink/runtime/io/network/TaskEventDispatcher.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java index 1ec4ade85ad43..c9de902e63aed 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java @@ -102,7 +102,10 @@ public void subscribeToEvent( checkNotNull(eventListener); checkNotNull(eventType); - TaskEventHandler taskEventHandler = registeredHandlers.get(partitionId); + TaskEventHandler taskEventHandler; + synchronized (registeredHandlers) { + taskEventHandler = registeredHandlers.get(partitionId); + } if (taskEventHandler == null) { throw new IllegalStateException( "Partition " + partitionId + " not registered at task event dispatcher."); @@ -123,7 +126,10 @@ public boolean publish(ResultPartitionID partitionId, TaskEvent event) { checkNotNull(partitionId); checkNotNull(event); - TaskEventHandler taskEventHandler = registeredHandlers.get(partitionId); + TaskEventHandler taskEventHandler; + synchronized (registeredHandlers) { + taskEventHandler = registeredHandlers.get(partitionId); + } if (taskEventHandler != null) { taskEventHandler.publish(event);