diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java index 631f8d06dcf45..95a8f1c8c21f8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java @@ -354,18 +354,19 @@ private void handleRegisterResource(ActorRef jobManager, ActorRef taskManager, ResourceID resourceID = msg.resourceId(); try { Preconditions.checkNotNull(resourceID); - WorkerType newWorker = workerRegistered(resourceID); - WorkerType oldWorker = registeredWorkers.put(resourceID, newWorker); + // check if resourceID is already registered (TaskManager may send duplicate register messages) + WorkerType oldWorker = registeredWorkers.get(resourceID); if (oldWorker != null) { - LOG.warn("TaskManager {} had been registered before.", resourceID); + LOG.debug("TaskManager {} had been registered before.", resourceID); } else { + WorkerType newWorker = workerRegistered(resourceID); + registeredWorkers.put(resourceID, newWorker); LOG.info("TaskManager {} has registered.", resourceID); } jobManager.tell(decorateMessage( new RegisterResourceSuccessful(taskManager, msg)), self()); } catch (Exception e) { - // This may happen on duplicate task manager registration message to the job manager LOG.warn("TaskManager resource registration failed for {}", resourceID, e); // tell the JobManager about the failure