From 9c4cc72a010eeffda9535186068ab2930e9ba514 Mon Sep 17 00:00:00 2001 From: Roman Khachatryan Date: Wed, 23 Sep 2020 15:32:45 +0200 Subject: [PATCH] [FLINK-19385] Request partitions for each InputGate independently --- .../flink/streaming/runtime/tasks/StreamTask.java | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index d9b9f73c0196f..57ab9782c4c05 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -508,14 +508,11 @@ private void readRecoveredChannelState() throws IOException, InterruptedExceptio // output can request more floating buffers from global firstly. InputGate[] inputGates = getEnvironment().getAllInputGates(); if (inputGates != null && inputGates.length > 0) { - CompletableFuture[] futures = new CompletableFuture[inputGates.length]; - for (int i = 0; i < inputGates.length; i++) { - futures[i] = inputGates[i].readRecoveredState(channelIOExecutor, reader); + for (InputGate inputGate : inputGates) { + inputGate + .readRecoveredState(channelIOExecutor, reader) + .thenRun(() -> mainMailboxExecutor.execute(inputGate::requestPartitions, "Input gate request partitions")); } - - // Note that we must request partition after all the single gates finished recovery. - CompletableFuture.allOf(futures).thenRun(() -> mainMailboxExecutor.execute( - this::requestPartitions, "Input gates request partitions")); } }