From 0b6c086af921ec1abc0757680ddb3285a8828bca Mon Sep 17 00:00:00 2001 From: Roman Khachatryan Date: Wed, 23 Sep 2020 15:56:07 +0200 Subject: [PATCH] [FLINK-18989][task] Remove non-sequental ChannelStateReader and the related code - ChannelStateReader and supporting classes - methods for channel state recovery in inputChannels, subpartitions, etc. - tests for reading channel state non-sequentially --- .../runtime/SavepointTaskStateManager.java | 6 +- .../channel/ChannelStateReader.java | 77 ----- .../channel/ChannelStateReaderImpl.java | 124 --------- .../channel/ChannelStateStreamReader.java | 111 -------- .../channel/RefCountingFSDataInputStream.java | 129 --------- .../CheckpointedResultPartition.java | 9 - .../CheckpointedResultSubpartition.java | 3 - .../partition/PipelinedResultPartition.java | 10 - .../partition/PipelinedSubpartition.java | 25 -- .../network/partition/consumer/InputGate.java | 11 - .../consumer/LocalRecoveredInputChannel.java | 3 +- .../consumer/RecoveredInputChannel.java | 30 +- .../consumer/RemoteRecoveredInputChannel.java | 3 +- .../partition/consumer/SingleInputGate.java | 38 --- .../partition/consumer/UnionInputGate.java | 7 - .../flink/runtime/state/TaskStateManager.java | 3 - .../runtime/state/TaskStateManagerImpl.java | 30 -- ...tifyingResultPartitionWriterDecorator.java | 6 - .../taskmanager/InputGateWithMetrics.java | 7 - .../channel/ChannelStateReaderImplTest.java | 196 ------------- .../ChannelStateSerializerImplTest.java | 7 +- .../network/api/writer/RecordWriterTest.java | 64 ----- .../partition/ResultPartitionTest.java | 198 ------------- .../consumer/RecoveredInputChannelTest.java | 262 ------------------ .../consumer/SingleInputGateTest.java | 116 -------- .../runtime/state/TestTaskStateManager.java | 6 - ...kpointBarrierAlignerMassiveRandomTest.java | 4 +- .../runtime/io/MockIndexedInputGate.java | 4 +- .../streaming/runtime/io/MockInputGate.java | 4 +- .../runtime/tasks/StreamTaskTest.java | 145 ---------- 30 files changed, 15 insertions(+), 1623 deletions(-) delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateReader.java delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateReaderImpl.java delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateStreamReader.java delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RefCountingFSDataInputStream.java delete mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateReaderImplTest.java diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java index 4fb15a2c9590e..4380b906cfaee 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java @@ -22,7 +22,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; -import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader; +import org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReader; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.LocalRecoveryConfig; import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider; @@ -68,8 +68,8 @@ public LocalRecoveryConfig createLocalRecoveryConfig() { } @Override - public ChannelStateReader getChannelStateReader() { - return ChannelStateReader.NO_OP; + public SequentialChannelStateReader getSequentialChannelStateReader() { + return SequentialChannelStateReader.NO_OP; } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateReader.java deleted file mode 100644 index 81ff48fdc0e54..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateReader.java +++ /dev/null @@ -1,77 +0,0 @@ -package org.apache.flink.runtime.checkpoint.channel; -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import org.apache.flink.annotation.Internal; -import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.buffer.BufferBuilder; - -import java.io.IOException; - -/** - * Reads channel state saved during checkpoint/savepoint. - */ -@Internal -public interface ChannelStateReader extends AutoCloseable { - - /** - * Status of reading result. - */ - enum ReadResult { HAS_MORE_DATA, NO_MORE_DATA } - - /** - * Return whether there are any channel states to be read. - */ - boolean hasChannelStates(); - - /** - * Put data into the supplied buffer to be injected into - * {@link org.apache.flink.runtime.io.network.partition.consumer.InputChannel InputChannel}. - */ - ReadResult readInputData(InputChannelInfo info, Buffer buffer) throws IOException; - - /** - * Put data into the supplied buffer to be injected into - * {@link org.apache.flink.runtime.io.network.partition.ResultSubpartition ResultSubpartition}. - */ - ReadResult readOutputData(ResultSubpartitionInfo info, BufferBuilder bufferBuilder) throws IOException; - - @Override - void close() throws Exception; - - ChannelStateReader NO_OP = new ChannelStateReader() { - - @Override - public boolean hasChannelStates() { - return false; - } - - @Override - public ReadResult readInputData(InputChannelInfo info, Buffer buffer) { - return ReadResult.NO_MORE_DATA; - } - - @Override - public ReadResult readOutputData(ResultSubpartitionInfo info, BufferBuilder bufferBuilder) { - return ReadResult.NO_MORE_DATA; - } - - @Override - public void close() { - } - }; -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateReaderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateReaderImpl.java deleted file mode 100644 index dbc5bb8b45fff..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateReaderImpl.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.checkpoint.channel; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; -import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; -import org.apache.flink.runtime.checkpoint.channel.RefCountingFSDataInputStream.RefCountingFSDataInputStreamFactory; -import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.buffer.BufferBuilder; -import org.apache.flink.runtime.jobgraph.OperatorID; -import org.apache.flink.runtime.state.AbstractChannelStateHandle; -import org.apache.flink.util.Preconditions; - -import org.apache.flink.shaded.guava18.com.google.common.io.Closer; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.concurrent.NotThreadSafe; - -import java.io.IOException; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; - -import static java.util.Arrays.asList; -import static org.apache.flink.util.Preconditions.checkState; - -/** - * {@link ChannelStateReader} implementation. Usage considerations: - *
    - *
  1. state of a channel can be read once per instance of this class; once done it returns - * {@link org.apache.flink.runtime.checkpoint.channel.ChannelStateReader.ReadResult#NO_MORE_DATA NO_MORE_DATA}
  2. - *
  3. reader/writer indices of the passed buffer are respected and updated
  4. - *
  5. buffers must be prepared (cleared) before passing to reader
  6. - *
  7. buffers must be released after use
  8. - *
- */ -@NotThreadSafe -@Internal -public class ChannelStateReaderImpl implements ChannelStateReader { - private static final Logger log = LoggerFactory.getLogger(ChannelStateReaderImpl.class); - - private final Map inputChannelHandleReaders; - private final Map resultSubpartitionHandleReaders; - private boolean isClosed = false; - - public ChannelStateReaderImpl(TaskStateSnapshot snapshot) { - this(snapshot, new ChannelStateSerializerImpl()); - } - - ChannelStateReaderImpl(TaskStateSnapshot snapshot, ChannelStateSerializer serializer) { - RefCountingFSDataInputStreamFactory streamFactory = new RefCountingFSDataInputStreamFactory(serializer); - final HashMap inputChannelHandleReadersTmp = new HashMap<>(); - final HashMap resultSubpartitionHandleReadersTmp = new HashMap<>(); - for (Map.Entry e : snapshot.getSubtaskStateMappings()) { - addReaders(inputChannelHandleReadersTmp, e.getValue().getInputChannelState(), streamFactory); - addReaders(resultSubpartitionHandleReadersTmp, e.getValue().getResultSubpartitionState(), streamFactory); - } - inputChannelHandleReaders = inputChannelHandleReadersTmp; // memory barrier to allow another thread call clear() - resultSubpartitionHandleReaders = resultSubpartitionHandleReadersTmp; // memory barrier to allow another thread call clear() - } - - private void addReaders( - Map readerMap, - Collection> handles, - RefCountingFSDataInputStreamFactory streamFactory) { - for (AbstractChannelStateHandle handle : handles) { - checkState(!readerMap.containsKey(handle.getInfo()), "multiple states exist for channel: " + handle.getInfo()); - readerMap.put(handle.getInfo(), new ChannelStateStreamReader(handle, streamFactory)); - } - } - - @Override - public boolean hasChannelStates() { - return !(inputChannelHandleReaders.isEmpty() && resultSubpartitionHandleReaders.isEmpty()); - } - - @Override - public ReadResult readInputData(InputChannelInfo info, Buffer buffer) throws IOException { - Preconditions.checkState(!isClosed, "reader is closed"); - log.debug("readInputData, resultSubpartitionInfo: {} , buffer {}", info, buffer); - ChannelStateStreamReader reader = inputChannelHandleReaders.get(info); - return reader == null ? ReadResult.NO_MORE_DATA : reader.readInto(buffer); - } - - @Override - public ReadResult readOutputData(ResultSubpartitionInfo info, BufferBuilder bufferBuilder) throws IOException { - Preconditions.checkState(!isClosed, "reader is closed"); - log.debug("readOutputData, resultSubpartitionInfo: {} , bufferBuilder {}", info, bufferBuilder); - ChannelStateStreamReader reader = resultSubpartitionHandleReaders.get(info); - return reader == null ? ReadResult.NO_MORE_DATA : reader.readInto(bufferBuilder); - } - - @Override - public void close() throws Exception { - isClosed = true; - try (Closer closer = Closer.create()) { - for (Map map : asList(inputChannelHandleReaders, resultSubpartitionHandleReaders)) { - for (ChannelStateStreamReader reader : map.values()) { - closer.register(reader); - } - map.clear(); - } - } - } - -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateStreamReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateStreamReader.java deleted file mode 100644 index edc2406b718df..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateStreamReader.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.checkpoint.channel; - -import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader.ReadResult; -import org.apache.flink.runtime.checkpoint.channel.RefCountingFSDataInputStream.RefCountingFSDataInputStreamFactory; -import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.buffer.BufferBuilder; -import org.apache.flink.runtime.state.AbstractChannelStateHandle; -import org.apache.flink.util.Preconditions; - -import javax.annotation.concurrent.NotThreadSafe; - -import java.io.Closeable; -import java.io.IOException; -import java.util.LinkedList; -import java.util.List; -import java.util.Queue; - -import static org.apache.flink.runtime.checkpoint.channel.ChannelStateByteBuffer.wrap; -import static org.apache.flink.runtime.checkpoint.channel.ChannelStateReader.ReadResult.HAS_MORE_DATA; -import static org.apache.flink.runtime.checkpoint.channel.ChannelStateReader.ReadResult.NO_MORE_DATA; - -/** - * Reads the state of a single channel pointed by {@link org.apache.flink.runtime.state.AbstractChannelStateHandle AbstractChannelStateHandle}. - * Once all data is read, this class can't be used anymore. - * Uses {@link RefCountingFSDataInputStream} internally. - */ -@NotThreadSafe -class ChannelStateStreamReader implements Closeable { - - private final RefCountingFSDataInputStream stream; - private final ChannelStateSerializer serializer; - private final Queue offsets; - private int remainingBytes = -1; - private boolean closed = false; - - ChannelStateStreamReader(AbstractChannelStateHandle handle, RefCountingFSDataInputStreamFactory streamFactory) { - this(streamFactory.getOrCreate(handle), handle.getOffsets(), streamFactory.getSerializer()); - } - - private ChannelStateStreamReader(RefCountingFSDataInputStream stream, List offsets, ChannelStateSerializer serializer) { - this.stream = stream; - this.stream.incRef(); - this.serializer = serializer; - this.offsets = new LinkedList<>(offsets); - } - - ReadResult readInto(Buffer buffer) throws IOException { - return readInto(wrap(buffer)); - } - - ReadResult readInto(BufferBuilder bufferBuilder) throws IOException { - return readInto(wrap(bufferBuilder)); - } - - private ReadResult readInto(ChannelStateByteBuffer buffer) throws IOException { - Preconditions.checkState(!closed, "reader is closed"); - readWhilePossible(buffer); - if (haveMoreData()) { - return HAS_MORE_DATA; - } else { - closed = true; - stream.decRef(); - return NO_MORE_DATA; - } - } - - private void readWhilePossible(ChannelStateByteBuffer buffer) throws IOException { - while (haveMoreData() && buffer.isWritable()) { - if (remainingBytes <= 0) { - advanceOffset(); - } - int bytesRead = serializer.readData(stream, buffer, remainingBytes); - remainingBytes -= bytesRead; - } - } - - private boolean haveMoreData() { - return remainingBytes > 0 || !offsets.isEmpty(); - } - - @SuppressWarnings("ConstantConditions") - private void advanceOffset() throws IOException { - stream.seek(offsets.poll()); - remainingBytes = serializer.readLength(stream); - } - - @Override - public void close() throws IOException { - if (!closed) { - closed = true; - stream.close(); - } - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RefCountingFSDataInputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RefCountingFSDataInputStream.java deleted file mode 100644 index 5dd16b177c6cb..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RefCountingFSDataInputStream.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.checkpoint.channel; - -import org.apache.flink.core.fs.FSDataInputStream; -import org.apache.flink.runtime.state.AbstractChannelStateHandle; -import org.apache.flink.runtime.state.StreamStateHandle; -import org.apache.flink.util.Preconditions; -import org.apache.flink.util.function.SupplierWithException; - -import javax.annotation.concurrent.NotThreadSafe; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.Preconditions.checkState; - -@NotThreadSafe -class RefCountingFSDataInputStream extends FSDataInputStream { - - private enum State {NEW, OPENED, CLOSED} - - private final SupplierWithException streamSupplier; - private FSDataInputStream stream; - private final ChannelStateSerializer serializer; - private int refCount = 0; - private State state = State.NEW; - - private RefCountingFSDataInputStream( - SupplierWithException streamSupplier, - ChannelStateSerializer serializer) { - this.streamSupplier = checkNotNull(streamSupplier); - this.serializer = checkNotNull(serializer); - } - - void incRef() { - checkNotClosed(); - refCount++; - } - - void decRef() throws IOException { - checkNotClosed(); - refCount--; - if (refCount == 0) { - close(); - } - } - - @Override - public int read() throws IOException { - ensureOpen(); - return stream.read(); - } - - @Override - public void seek(long pos) throws IOException { - ensureOpen(); - stream.seek(pos); - } - - @Override - public long getPos() throws IOException { - ensureOpen(); - return stream.getPos(); - } - - public void close() throws IOException { - state = State.CLOSED; - if (stream != null) { - stream.close(); - stream = null; - } - } - - private void ensureOpen() throws IOException { - checkNotClosed(); - if (state == State.NEW) { - stream = Preconditions.checkNotNull(streamSupplier.get()); - serializer.readHeader(stream); - state = State.OPENED; - } - } - - private void checkNotClosed() { - checkState(state != State.CLOSED, "stream is closed"); - } - - @NotThreadSafe - static class RefCountingFSDataInputStreamFactory { - private final Map streams = new HashMap<>(); // not clearing: expecting short life - private final ChannelStateSerializer serializer; - - RefCountingFSDataInputStreamFactory(ChannelStateSerializer serializer) { - this.serializer = checkNotNull(serializer); - } - - RefCountingFSDataInputStream getOrCreate(AbstractChannelStateHandle handle) { - StreamStateHandle streamStateHandle = handle.getDelegate(); - RefCountingFSDataInputStream stream = streams.get(streamStateHandle); - if (stream == null) { - stream = new RefCountingFSDataInputStream(streamStateHandle::openInputStream, serializer); - streams.put(streamStateHandle, stream); - } - return stream; - } - - ChannelStateSerializer getSerializer() { - return serializer; - } - } - -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/CheckpointedResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/CheckpointedResultPartition.java index f3e2c2fba832b..530822bf7b6ac 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/CheckpointedResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/CheckpointedResultPartition.java @@ -18,10 +18,6 @@ package org.apache.flink.runtime.io.network.partition; -import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader; - -import java.io.IOException; - /** * Interface for partitions that are checkpointed, meaning they store data as part of unaligned checkpoints. */ @@ -32,9 +28,4 @@ public interface CheckpointedResultPartition { */ CheckpointedResultSubpartition getCheckpointedSubpartition(int subpartitionIndex); - /** - * Reads the previous output states with the given reader for unaligned checkpoint. - * It should be done before task processing the inputs. - */ - void readRecoveredState(ChannelStateReader stateReader) throws IOException, InterruptedException; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/CheckpointedResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/CheckpointedResultSubpartition.java index 0e00d8df82a41..e23e7dfa40a1b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/CheckpointedResultSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/CheckpointedResultSubpartition.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.io.network.partition; -import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader; import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo; import org.apache.flink.runtime.io.network.buffer.BufferBuilder; import org.apache.flink.runtime.io.network.buffer.BufferConsumer; @@ -35,6 +34,4 @@ public interface CheckpointedResultSubpartition { BufferBuilder requestBufferBuilderBlocking() throws IOException, RuntimeException, InterruptedException; void addBufferConsumer(BufferConsumer bufferConsumer); - - void readRecoveredState(ChannelStateReader stateReader) throws IOException, InterruptedException; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java index 9ad2cf704c769..e42623faa63dc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.io.network.partition; -import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader; import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.runtime.io.network.buffer.BufferCompressor; import org.apache.flink.runtime.io.network.buffer.BufferPool; @@ -136,15 +135,6 @@ public CheckpointedResultSubpartition getCheckpointedSubpartition(int subpartiti return (CheckpointedResultSubpartition) subpartitions[subpartitionIndex]; } - @Override - public void readRecoveredState(ChannelStateReader stateReader) throws IOException, InterruptedException { - for (ResultSubpartition subPar : subpartitions) { - ((PipelinedSubpartition) subPar).readRecoveredState(stateReader); - } - - LOG.debug("{}: Finished reading recovered state.", this); - } - @Override public void flushAll() { flushAllSubpartitions(false); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java index b8fa654c02180..86f8bc069afb0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java @@ -19,8 +19,6 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader; -import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader.ReadResult; import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; @@ -115,29 +113,6 @@ public void setChannelStateWriter(ChannelStateWriter channelStateWriter) { this.channelStateWriter = checkNotNull(channelStateWriter); } - @Override - public void readRecoveredState(ChannelStateReader stateReader) throws IOException, InterruptedException { - boolean recycleBuffer = true; - for (ReadResult readResult = ReadResult.HAS_MORE_DATA; readResult == ReadResult.HAS_MORE_DATA;) { - BufferBuilder bufferBuilder = parent.getBufferPool().requestBufferBuilderBlocking(subpartitionInfo.getSubPartitionIdx()); - BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer(); - try { - readResult = stateReader.readOutputData(subpartitionInfo, bufferBuilder); - - // check whether there are some states data filled in this time - if (bufferConsumer.isDataAvailable()) { - add(bufferConsumer, false); - recycleBuffer = false; - bufferBuilder.finish(); - } - } finally { - if (recycleBuffer) { - bufferConsumer.close(); - } - } - } - } - @Override public boolean add(BufferConsumer bufferConsumer) { return add(bufferConsumer, false); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java index 3e90f435a568d..3fc66497d617d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.io.network.partition.consumer; -import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader; import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo; import org.apache.flink.runtime.event.TaskEvent; @@ -29,7 +28,6 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -183,15 +181,6 @@ public String toString() { */ public abstract void setup() throws IOException; - /** - * Reads the previous unaligned checkpoint states before requesting partition data. - * - * @param executor the dedicated executor for performing this action for all the internal channels. - * @param reader the dedicated reader for unspilling the respective channel state from snapshots. - * @return the future indicates whether the recovered states have already been drained or not. - */ - public abstract CompletableFuture readRecoveredState(ExecutorService executor, ChannelStateReader reader) throws IOException; - public abstract void requestPartitions() throws IOException; public abstract CompletableFuture getStateConsumedFuture(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalRecoveredInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalRecoveredInputChannel.java index 04a192b408280..db03d534f2083 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalRecoveredInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalRecoveredInputChannel.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.io.network.partition.consumer; -import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader; import org.apache.flink.runtime.io.network.TaskEventPublisher; import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; @@ -28,7 +27,7 @@ /** * An input channel reads recovered state from previous unaligned checkpoint snapshots - * via {@link ChannelStateReader} and then converts into {@link LocalInputChannel} finally. + * and then converts into {@link LocalInputChannel} finally. */ public class LocalRecoveredInputChannel extends RecoveredInputChannel { private final ResultPartitionManager partitionManager; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java index caf1363f7210c..7cc8480a39aaa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java @@ -20,8 +20,6 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.metrics.Counter; -import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader; -import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader.ReadResult; import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.event.TaskEvent; @@ -46,8 +44,7 @@ import static org.apache.flink.util.Preconditions.checkState; /** - * An input channel reads recovered state from previous unaligned checkpoint snapshots - * via {@link ChannelStateReader}. + * An input channel reads recovered state from previous unaligned checkpoint snapshots. */ public abstract class RecoveredInputChannel extends InputChannel implements ChannelStateHolder { @@ -100,31 +97,6 @@ CompletableFuture getStateConsumedFuture() { return stateConsumedFuture; } - protected void readRecoveredState(ChannelStateReader reader) throws IOException, InterruptedException { - ReadResult result = ReadResult.HAS_MORE_DATA; - while (result == ReadResult.HAS_MORE_DATA) { - Buffer buffer = bufferManager.requestBufferBlocking(); - result = internalReaderRecoveredState(reader, buffer); - } - finishReadRecoveredState(); - } - - private ReadResult internalReaderRecoveredState(ChannelStateReader reader, Buffer buffer) throws IOException { - ReadResult result; - try { - result = reader.readInputData(channelInfo, buffer); - } catch (Throwable t) { - buffer.recycleBuffer(); - throw t; - } - if (buffer.readableBytes() > 0) { - onRecoveredStateBuffer(buffer); - } else { - buffer.recycleBuffer(); - } - return result; - } - public void onRecoveredStateBuffer(Buffer buffer) { boolean recycleBuffer = true; try { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteRecoveredInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteRecoveredInputChannel.java index 7f30f4c3675be..a89b71ff291af 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteRecoveredInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteRecoveredInputChannel.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.io.network.partition.consumer; -import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.ConnectionManager; import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics; @@ -30,7 +29,7 @@ /** * An input channel reads recovered state from previous unaligned checkpoint snapshots - * via {@link ChannelStateReader} and then converts into {@link RemoteInputChannel} finally. + * and then converts into {@link RemoteInputChannel} finally. */ public class RemoteRecoveredInputChannel extends RecoveredInputChannel { private final ConnectionID connectionId; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index 5791a096517c9..76418c9d4bfc8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -20,9 +20,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.core.memory.MemorySegmentProvider; -import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader; import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.execution.CancelTaskException; @@ -54,14 +52,12 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.BitSet; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Timer; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -246,27 +242,6 @@ public void setup() throws IOException { setBufferPool(bufferPool); } - @Override - public CompletableFuture readRecoveredState(ExecutorService executor, ChannelStateReader reader) { - synchronized (requestLock) { - if (closeFuture.isDone()) { - return FutureUtils.completedVoidFuture(); - } - } - - CompletableFuture stateConsumedFuture = getStateConsumedFuture(); - - executor.submit(() -> { - Collection channels; - synchronized (requestLock) { - channels = inputChannels.values(); - } - internalReadRecoveredState(reader, channels); - }); - - return stateConsumedFuture; - } - @Override public CompletableFuture getStateConsumedFuture() { synchronized (requestLock) { @@ -280,19 +255,6 @@ public CompletableFuture getStateConsumedFuture() { } } - private void internalReadRecoveredState(ChannelStateReader reader, Collection inputChannels) { - for (InputChannel inputChannel : inputChannels) { - try { - if (inputChannel instanceof RecoveredInputChannel) { - ((RecoveredInputChannel) inputChannel).readRecoveredState(reader); - } - } catch (Throwable t) { - inputChannel.setError(t); - return; - } - } - } - @Override public void requestPartitions() { synchronized (requestLock) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java index df7ef49a76d79..f7bf13360db8c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.io.network.partition.consumer; -import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader; import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.partition.PrioritizedDeque; @@ -32,7 +31,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkArgument; @@ -279,11 +277,6 @@ public void resumeConsumption(int channelIndex) throws IOException { public void setup() { } - @Override - public CompletableFuture readRecoveredState(ExecutorService executor, ChannelStateReader reader) { - throw new UnsupportedOperationException("This method should never be called."); - } - @Override public CompletableFuture getStateConsumedFuture() { return CompletableFuture.allOf( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java index c910a80b6e881..ff8e8683c00dd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java @@ -22,7 +22,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; -import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader; import org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReader; import org.apache.flink.runtime.jobgraph.OperatorID; @@ -72,7 +71,5 @@ void reportTaskStateSnapshots( @Nonnull LocalRecoveryConfig createLocalRecoveryConfig(); - ChannelStateReader getChannelStateReader(); - SequentialChannelStateReader getSequentialChannelStateReader(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java index 16c7c41fc31eb..4d453421700d1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java @@ -25,8 +25,6 @@ import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; -import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader; -import org.apache.flink.runtime.checkpoint.channel.ChannelStateReaderImpl; import org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReader; import org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReaderImpl; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; @@ -72,8 +70,6 @@ public class TaskStateManagerImpl implements TaskStateManager { /** The checkpoint responder through which this manager can report to the job manager. */ private final CheckpointResponder checkpointResponder; - private final ChannelStateReader channelStateReader; - private final SequentialChannelStateReader sequentialChannelStateReader; public TaskStateManagerImpl( @@ -88,24 +84,6 @@ public TaskStateManagerImpl( localStateStore, jobManagerTaskRestore, checkpointResponder, - new ChannelStateReaderImpl(jobManagerTaskRestore == null ? new TaskStateSnapshot() : jobManagerTaskRestore.getTaskStateSnapshot()) - ); - } - - public TaskStateManagerImpl( - @Nonnull JobID jobId, - @Nonnull ExecutionAttemptID executionAttemptID, - @Nonnull TaskLocalStateStore localStateStore, - @Nullable JobManagerTaskRestore jobManagerTaskRestore, - @Nonnull CheckpointResponder checkpointResponder, - @Nonnull ChannelStateReader channelStateReader) { - this( - jobId, - executionAttemptID, - localStateStore, - jobManagerTaskRestore, - checkpointResponder, - channelStateReader, new SequentialChannelStateReaderImpl(jobManagerTaskRestore == null ? new TaskStateSnapshot() : jobManagerTaskRestore.getTaskStateSnapshot())); } @@ -115,14 +93,12 @@ public TaskStateManagerImpl( @Nonnull TaskLocalStateStore localStateStore, @Nullable JobManagerTaskRestore jobManagerTaskRestore, @Nonnull CheckpointResponder checkpointResponder, - @Nonnull ChannelStateReader channelStateReader, @Nonnull SequentialChannelStateReaderImpl sequentialChannelStateReader) { this.jobId = jobId; this.localStateStore = localStateStore; this.jobManagerTaskRestore = jobManagerTaskRestore; this.executionAttemptID = executionAttemptID; this.checkpointResponder = checkpointResponder; - this.channelStateReader = channelStateReader; this.sequentialChannelStateReader = sequentialChannelStateReader; } @@ -197,11 +173,6 @@ public LocalRecoveryConfig createLocalRecoveryConfig() { return localStateStore.getLocalRecoveryConfig(); } - @Override - public ChannelStateReader getChannelStateReader() { - return channelStateReader; - } - @Override public SequentialChannelStateReader getSequentialChannelStateReader() { return sequentialChannelStateReader; @@ -225,7 +196,6 @@ public void notifyCheckpointAborted(long checkpointId) { @Override public void close() throws Exception { - channelStateReader.close(); sequentialChannelStateReader.close(); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java index 51e01e32a4d69..730e3cf262232 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.taskmanager; import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; @@ -226,11 +225,6 @@ public CheckpointedResultSubpartition getCheckpointedSubpartition(int subpartiti return getCheckpointablePartition().getCheckpointedSubpartition(subpartitionIndex); } - @Override - public void readRecoveredState(ChannelStateReader stateReader) throws IOException, InterruptedException { - getCheckpointablePartition().readRecoveredState(stateReader); - } - private CheckpointedResultPartition getCheckpointablePartition() { if (partitionWriter instanceof CheckpointedResultPartition) { return (CheckpointedResultPartition) partitionWriter; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java index 16b59dc3fad73..59b422ac8ed11 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.taskmanager; import org.apache.flink.metrics.Counter; -import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader; import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; @@ -31,7 +30,6 @@ import java.io.IOException; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -85,11 +83,6 @@ public void setup() throws IOException { inputGate.setup(); } - @Override - public CompletableFuture readRecoveredState(ExecutorService executor, ChannelStateReader reader) throws IOException { - return inputGate.readRecoveredState(executor, reader); - } - @Override public CompletableFuture getStateConsumedFuture() { return inputGate.getStateConsumedFuture(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateReaderImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateReaderImplTest.java deleted file mode 100644 index fbe1ade8ec9a6..0000000000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateReaderImplTest.java +++ /dev/null @@ -1,196 +0,0 @@ -package org.apache.flink.runtime.checkpoint.channel; -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import org.apache.flink.core.memory.HeapMemorySegment; -import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; -import org.apache.flink.runtime.checkpoint.StateObjectCollection; -import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; -import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; -import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; -import org.apache.flink.runtime.jobgraph.OperatorID; -import org.apache.flink.runtime.state.InputChannelStateHandle; -import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Random; - -import static java.util.Collections.singletonList; -import static java.util.stream.Collectors.toMap; -import static org.apache.flink.runtime.checkpoint.channel.ChannelStateReader.ReadResult.HAS_MORE_DATA; -import static org.apache.flink.runtime.checkpoint.channel.ChannelStateReader.ReadResult.NO_MORE_DATA; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; - -/** - * {@link ChannelStateReaderImpl} test. - */ -public class ChannelStateReaderImplTest { - - private static final InputChannelInfo CHANNEL = new InputChannelInfo(1, 2); - private static final byte[] DATA = generateData(10); - private ChannelStateReaderImpl reader; - - @Before - public void init() { - reader = getReader(CHANNEL, DATA); - } - - @After - public void tearDown() throws Exception { - reader.close(); - } - - @Test - public void testDifferentBufferSizes() throws Exception { - for (int bufferSize = 1; bufferSize < 2 * DATA.length; bufferSize++) { - try (ChannelStateReaderImpl reader = getReader(CHANNEL, DATA)) { // re-create reader to re-read the same channel - readAndVerify(bufferSize, CHANNEL, DATA, reader); - } - } - } - - @Test - public void testWithOffsets() throws IOException { - Map handlesAndBytes = generateHandlesWithBytes(10, 20); - ChannelStateReader reader = new ChannelStateReaderImpl(taskStateSnapshot(handlesAndBytes.keySet()), new ChannelStateSerializerImpl()); - for (Map.Entry e : handlesAndBytes.entrySet()) { - readAndVerify(42, e.getKey().getInfo(), e.getValue(), reader); - } - } - - @Test(expected = Exception.class) - public void testReadOnlyOnce() throws IOException { - reader.readInputData(CHANNEL, getBuffer(DATA.length)); - reader.readInputData(CHANNEL, getBuffer(DATA.length)); - } - - @Test(expected = IllegalStateException.class) - public void testReadClosed() throws Exception { - reader.close(); - reader.readInputData(CHANNEL, getBuffer(DATA.length)); - } - - @Test - public void testReadUnknownChannelState() throws IOException { - InputChannelInfo unknownChannel = new InputChannelInfo(CHANNEL.getGateIdx() + 1, CHANNEL.getInputChannelIdx() + 1); - assertEquals(NO_MORE_DATA, reader.readInputData(unknownChannel, getBuffer(DATA.length))); - } - - private TaskStateSnapshot taskStateSnapshot(Collection inputChannelStateHandles) { - return new TaskStateSnapshot(Collections.singletonMap( - new OperatorID(), - new OperatorSubtaskState( - StateObjectCollection.empty(), - StateObjectCollection.empty(), - StateObjectCollection.empty(), - StateObjectCollection.empty(), - new StateObjectCollection<>(inputChannelStateHandles), - StateObjectCollection.empty() - ))); - } - - static byte[] generateData(int len) { - byte[] bytes = new byte[len]; - new Random().nextBytes(bytes); - return bytes; - } - - private byte[] toBytes(NetworkBuffer buffer) { - byte[] buf = new byte[buffer.readableBytes()]; - buffer.readBytes(buf); - return buf; - } - - private ChannelStateReaderImpl getReader(InputChannelInfo channel, byte[] data) { - return new ChannelStateReaderImpl( - taskStateSnapshot(singletonList(new InputChannelStateHandle(channel, new ByteStreamStateHandle("", data), singletonList(0L)))), - new ChannelStateSerializerImpl() { - @Override - public void readHeader(InputStream stream) { - } - - @Override - public int readLength(InputStream stream) { - return data.length; - } - }); - } - - private void readAndVerify(int bufferSize, InputChannelInfo channelInfo, byte[] data, ChannelStateReader reader) throws IOException { - int dataSize = data.length; - int iterations = dataSize / bufferSize + (-(dataSize % bufferSize) >>> 31); - NetworkBuffer buffer = getBuffer(bufferSize); - try { - for (int i = 0; i < iterations; i++) { - String hint = String.format("dataSize=%d, bufferSize=%d, iteration=%d/%d", dataSize, bufferSize, i + 1, iterations); - boolean isLast = i == iterations - 1; - assertEquals(hint, isLast ? NO_MORE_DATA : HAS_MORE_DATA, reader.readInputData(channelInfo, buffer)); - assertEquals(hint, isLast ? dataSize - bufferSize * i : bufferSize, buffer.readableBytes()); - assertArrayEquals(hint, Arrays.copyOfRange(data, i * bufferSize, Math.min(dataSize, (i + 1) * bufferSize)), toBytes(buffer)); - buffer.resetReaderIndex(); - buffer.resetWriterIndex(); - } - } finally { - buffer.release(); - } - } - - private NetworkBuffer getBuffer(int len) { - return new NetworkBuffer(HeapMemorySegment.FACTORY.allocateUnpooledSegment(len, null), FreeingBufferRecycler.INSTANCE); - } - - private Map generateHandlesWithBytes(int numHandles, int handleDataSize) throws IOException { - Map offsetsAndBytes = new HashMap<>(); - ByteArrayOutputStream baos = new ByteArrayOutputStream(100); - DataOutputStream out = new DataOutputStream(baos); - ChannelStateSerializerImpl serializer = new ChannelStateSerializerImpl(); - serializer.writeHeader(out); - for (int i = 0; i < numHandles; i++) { - offsetsAndBytes.put(baos.size(), writeSomeBytes(handleDataSize, out, serializer)); - } - ByteStreamStateHandle sharedUnderlyingHandle = new ByteStreamStateHandle("", baos.toByteArray()); - return offsetsAndBytes.entrySet().stream().collect(toMap( - e -> new InputChannelStateHandle(new InputChannelInfo(e.getKey(), e.getKey()), sharedUnderlyingHandle, singletonList((long) e.getKey())), - Map.Entry::getValue)); - } - - private byte[] writeSomeBytes(int bytesCount, DataOutputStream out, ChannelStateSerializer serializer) throws IOException { - byte[] bytes = generateData(bytesCount); - NetworkBuffer buf = getBuffer(bytesCount); - try { - buf.writeBytes(bytes); - serializer.writeData(out, buf); - return bytes; - } finally { - buf.release(); - } - } - -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateSerializerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateSerializerImplTest.java index 79a3b04210042..4b23e61dd6a25 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateSerializerImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateSerializerImplTest.java @@ -36,7 +36,6 @@ import java.util.Random; import static org.apache.flink.runtime.checkpoint.channel.ChannelStateByteBuffer.wrap; -import static org.apache.flink.runtime.checkpoint.channel.ChannelStateReaderImplTest.generateData; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -146,4 +145,10 @@ private void readAndCheck(byte[] data, ChannelStateSerializerImpl serializer, By } } + static byte[] generateData(int len) { + byte[] bytes = new byte[len]; + new Random().nextBytes(bytes); + return bytes; + } + } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java index bf6330bb8b57d..07bcbbdb76086 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java @@ -22,7 +22,6 @@ import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.runtime.checkpoint.CheckpointOptions; -import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader; import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.network.NettyShuffleEnvironment; @@ -34,19 +33,13 @@ import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferBuilder; -import org.apache.flink.runtime.io.network.buffer.BufferBuilderAndConsumerTest; import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener; -import org.apache.flink.runtime.io.network.partition.PipelinedResultPartition; -import org.apache.flink.runtime.io.network.partition.PipelinedSubpartition; -import org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView; import org.apache.flink.runtime.io.network.partition.ResultPartition; import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder; -import org.apache.flink.runtime.io.network.partition.ResultPartitionTest; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; -import org.apache.flink.runtime.io.network.partition.ResultSubpartition; import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.util.DeserializationUtils; @@ -325,63 +318,6 @@ public void testIsAvailableOrNot() throws Exception { } } - @Test - public void testEmitRecordWithPartitionStateRecovery() throws Exception { - final int totalBuffers = 10; // enough for both states and normal records - final int totalStates = 2; - final int[] states = {1, 2, 3, 4}; - final int[] records = {5, 6, 7, 8}; - final int bufferSize = states.length * Integer.BYTES; - - final NetworkBufferPool globalPool = new NetworkBufferPool(totalBuffers, bufferSize); - final ChannelStateReader stateReader = new ResultPartitionTest.FiniteChannelStateReader(totalStates, states); - final PipelinedResultPartition partition = (PipelinedResultPartition) new ResultPartitionBuilder() - .setNetworkBufferPool(globalPool) - .build(); - final RecordWriter recordWriter = new RecordWriterBuilder().build(partition); - - try { - partition.setup(); - partition.readRecoveredState(stateReader); - - for (int record: records) { - // the record length 4 is also written into buffer for every emit - recordWriter.broadcastEmit(new IntValue(record)); - } - - // every buffer can contain 2 int records with 2 int length(4) - final int[][] expectedRecordsInBuffer = {{4, 5, 4, 6}, {4, 7, 4, 8}}; - - for (ResultSubpartition subpartition : partition.getAllPartitions()) { - // create the view to consume all the buffers with states and records - final ResultSubpartitionView view = new PipelinedSubpartitionView( - (PipelinedSubpartition) subpartition, - new NoOpBufferAvailablityListener()); - - int numConsumedBuffers = 0; - ResultSubpartition.BufferAndBacklog bufferAndBacklog; - while ((bufferAndBacklog = view.getNextBuffer()) != null) { - Buffer buffer = bufferAndBacklog.buffer(); - int[] expected = numConsumedBuffers < totalStates ? states : expectedRecordsInBuffer[numConsumedBuffers - totalStates]; - BufferBuilderAndConsumerTest.assertContent( - buffer, - partition.getBufferPool() - .getSubpartitionBufferRecyclers()[subpartition.getSubPartitionIndex()], - expected); - - buffer.recycleBuffer(); - numConsumedBuffers++; - } - - assertEquals(totalStates + expectedRecordsInBuffer.length, numConsumedBuffers); - } - } finally { - // cleanup - globalPool.destroyAllBufferPools(); - globalPool.destroy(); - } - } - private void verifyBroadcastBufferOrEventIndependence(boolean broadcastEvent) throws Exception { ResultPartition partition = createResultPartition(4096, 2); RecordWriter writer = createRecordWriter(partition); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java index a3db15a0a20ba..785ba97ff0301 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java @@ -19,8 +19,6 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader; -import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo; import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo; import org.apache.flink.runtime.io.disk.FileChannelManager; import org.apache.flink.runtime.io.disk.FileChannelManagerImpl; @@ -29,8 +27,6 @@ import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.buffer.BufferBuilder; -import org.apache.flink.runtime.io.network.buffer.BufferBuilderAndConsumerTest; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.taskmanager.ConsumableNotifyingResultPartitionWriterDecorator; @@ -48,14 +44,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.createPartition; import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.verifyCreateSubpartitionViewThrowsException; @@ -454,119 +443,6 @@ private BufferWritingResultPartition createResultPartition(ResultPartitionType p return (BufferWritingResultPartition) resultPartition; } - @Test - public void testInitializeEmptyState() throws Exception { - final int totalBuffers = 2; - final NetworkBufferPool globalPool = new NetworkBufferPool(totalBuffers, 1); - final PipelinedResultPartition partition = (PipelinedResultPartition) new ResultPartitionBuilder() - .setNetworkBufferPool(globalPool) - .build(); - final ChannelStateReader stateReader = ChannelStateReader.NO_OP; - try { - partition.setup(); - partition.readRecoveredState(stateReader); - - for (ResultSubpartition subpartition : partition.getAllPartitions()) { - // no buffers are added into the queue for empty states - assertEquals(0, subpartition.getTotalNumberOfBuffers()); - } - - // destroy the local pool to verify that all the requested buffers by partition are recycled - partition.getBufferPool().lazyDestroy(); - assertEquals(totalBuffers, globalPool.getNumberOfAvailableMemorySegments()); - } finally { - // cleanup - globalPool.destroyAllBufferPools(); - globalPool.destroy(); - } - } - - @Test - public void testInitializeMoreStateThanBuffer() throws Exception { - final int totalBuffers = 2; // the total buffers are less than the requirement from total states - final int totalStates = 5; - final int[] states = {1, 2, 3, 4}; - final int bufferSize = states.length * Integer.BYTES; - - final NetworkBufferPool globalPool = new NetworkBufferPool(totalBuffers, bufferSize); - final ChannelStateReader stateReader = new FiniteChannelStateReader(totalStates, states); - final PipelinedResultPartition partition = (PipelinedResultPartition) new ResultPartitionBuilder() - .setNetworkBufferPool(globalPool) - .build(); - final ExecutorService executor = Executors.newFixedThreadPool(1); - - try { - final Callable partitionConsumeTask = () -> { - for (ResultSubpartition subpartition : partition.getAllPartitions()) { - final ResultSubpartitionView view = new PipelinedSubpartitionView( - (PipelinedSubpartition) subpartition, - new NoOpBufferAvailablityListener()); - - int numConsumedBuffers = 0; - while (numConsumedBuffers != totalStates) { - ResultSubpartition.BufferAndBacklog bufferAndBacklog = view.getNextBuffer(); - if (bufferAndBacklog != null) { - Buffer buffer = bufferAndBacklog.buffer(); - BufferBuilderAndConsumerTest.assertContent( - buffer, - partition.getBufferPool() - .getSubpartitionBufferRecyclers()[subpartition.getSubPartitionIndex()], - states); - buffer.recycleBuffer(); - numConsumedBuffers++; - } else { - Thread.sleep(5); - } - } - assertNull(view.getNextBuffer()); - } - return null; - }; - Future result = executor.submit(partitionConsumeTask); - - partition.setup(); - partition.readRecoveredState(stateReader); - - // wait the partition consume task finish - result.get(20, TimeUnit.SECONDS); - - // destroy the local pool to verify that all the requested buffers by partition are recycled - partition.getBufferPool().lazyDestroy(); - assertEquals(totalBuffers, globalPool.getNumberOfAvailableMemorySegments()); - } finally { - // cleanup - executor.shutdown(); - globalPool.destroyAllBufferPools(); - globalPool.destroy(); - } - } - - /** - * Tests that the buffer is recycled correctly if exception is thrown during - * {@link ChannelStateReader#readOutputData(ResultSubpartitionInfo, BufferBuilder)}. - */ - @Test - public void testReadRecoveredStateWithException() throws Exception { - final int totalBuffers = 2; - final NetworkBufferPool globalPool = new NetworkBufferPool(totalBuffers, 1); - final PipelinedResultPartition partition = (PipelinedResultPartition) new ResultPartitionBuilder() - .setNetworkBufferPool(globalPool) - .build(); - final ChannelStateReader stateReader = new ChannelStateReaderWithException(); - - try { - partition.setup(); - partition.readRecoveredState(stateReader); - } catch (IOException e) { - assertThat("should throw custom exception message", e.getMessage().contains("test")); - } finally { - globalPool.destroyAllBufferPools(); - // verify whether there are any buffers leak - assertEquals(totalBuffers, globalPool.getNumberOfAvailableMemorySegments()); - globalPool.destroy(); - } - } - @Test public void testIdleTime() throws IOException, InterruptedException { // setup @@ -650,80 +526,6 @@ public void testFlushBoundedBlockingResultPartition() throws IOException { assertNull(readView2.getNextBuffer()); } - /** - * The {@link ChannelStateReader} instance for restoring the specific number of states. - */ - public static final class FiniteChannelStateReader implements ChannelStateReader { - private final int totalStates; - private int numRestoredStates; - private final int[] states; - private final Map counters = new HashMap<>(); - - public FiniteChannelStateReader(int totalStates, int[] states) { - this.totalStates = totalStates; - this.states = states; - } - - @Override - public boolean hasChannelStates() { - return true; - } - - @Override - public ReadResult readInputData(InputChannelInfo info, Buffer buffer) { - for (int state: states) { - buffer.asByteBuf().writeInt(state); - } - int result = counters.compute(info, (unused, counter) -> (counter == null) ? 1 : ++counter); - - return getReadResult(result); - } - - @Override - public ReadResult readOutputData(ResultSubpartitionInfo info, BufferBuilder bufferBuilder) { - bufferBuilder.appendAndCommit(BufferBuilderAndConsumerTest.toByteBuffer(states)); - return getReadResult(++numRestoredStates); - } - - private ReadResult getReadResult(int numRestoredStates) { - if (numRestoredStates < totalStates) { - return ReadResult.HAS_MORE_DATA; - } else { - return ReadResult.NO_MORE_DATA; - } - } - - @Override - public void close() { - } - } - - /** - * The {@link ChannelStateReader} instance for throwing exception when - * {@link #readOutputData(ResultSubpartitionInfo, BufferBuilder)} and {@link #readInputData(InputChannelInfo, Buffer)}. - */ - public static final class ChannelStateReaderWithException implements ChannelStateReader { - - @Override - public boolean hasChannelStates() { - return true; - } - - @Override - public ReadResult readInputData(InputChannelInfo info, Buffer buffer) throws IOException { - throw new IOException("test"); - } - - @Override - public ReadResult readOutputData(ResultSubpartitionInfo info, BufferBuilder bufferBuilder) throws IOException { - throw new IOException("test"); - } - - @Override - public void close() { - } - } - private static class TestResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier { private JobID jobID; private ResultPartitionID partitionID; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannelTest.java index e7128dcfd8cc7..b6e8c222de122 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannelTest.java @@ -19,279 +19,17 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.metrics.SimpleCounter; -import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader; -import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.buffer.BufferBuilderAndConsumerTest; -import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; -import org.apache.flink.runtime.io.network.partition.ResultPartitionTest; -import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; -import java.util.Optional; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import static org.apache.flink.runtime.io.network.partition.ResultPartitionTest.ChannelStateReaderWithException; -import static org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannelTest.cleanup; -import static org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannelTest.submitTasksAndWaitForResults; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; /** * Tests for {@link RecoveredInputChannel}. */ -@RunWith(Parameterized.class) public class RecoveredInputChannelTest { - private final boolean isRemote; - - @Parameterized.Parameters(name = "isRemote = {0}") - public static Collection parameters() { - return Arrays.asList(new Object[][] { - {true}, - {false}, - }); - } - - public RecoveredInputChannelTest(boolean isRemote) { - this.isRemote = isRemote; - } - - @Test - public void testConcurrentReadStateAndProcess() throws Exception { - testConcurrentReadStateAndProcess(isRemote); - } - - @Test - public void testConcurrentReadStateAndRelease() throws Exception { - testConcurrentReadStateAndRelease(isRemote); - } - - /** - * Tests that there are no potential deadlock and buffer leak issues while the following actions happen concurrently: - * 1. Task thread processes the recovered state buffer from RecoveredInputChannel. - * 2. Unspilling IO thread reads the recovered state and queues the buffer into RecoveredInputChannel. - * 3. Canceler thread closes the input gate and releases the RecoveredInputChannel. - */ - @Test - public void testConcurrentReadStateAndProcessAndRelease() throws Exception { - testConcurrentReadStateAndProcessAndRelease(isRemote); - } - - /** - * Tests that there are no buffers leak while recovering the empty input channel state. - */ - @Test - public void testReadEmptyState() throws Exception { - testReadEmptyStateOrThrowException(isRemote, ChannelStateReader.NO_OP); - } - - /** - * Tests that there are no buffers leak while throwing exception during state recovery. - */ - @Test(expected = IOException.class) - public void testReadStateWithException() throws Exception { - testReadEmptyStateOrThrowException(isRemote, new ChannelStateReaderWithException()); - } - - private void testReadEmptyStateOrThrowException(boolean isRemote, ChannelStateReader reader) throws Exception { - // setup - final int totalBuffers = 10; - final NetworkBufferPool globalPool = new NetworkBufferPool(totalBuffers, 32); - final SingleInputGate inputGate = createInputGate(globalPool); - final RecoveredInputChannel inputChannel = createRecoveredChannel(isRemote, inputGate); - - try { - inputGate.setInputChannels(inputChannel); - inputGate.setup(); - - // it would throw expected exception for the case of testReadStateWithException. - inputChannel.readRecoveredState(reader); - - // the channel only has one EndOfChannelStateEvent in the queue for the case of testReadEmptyState. - assertEquals(1, inputChannel.getNumberOfQueuedBuffers()); - assertFalse(inputChannel.getNextBuffer().isPresent()); - assertTrue(inputChannel.getStateConsumedFuture().isDone()); - } finally { - // cleanup and verify no buffer leak - inputGate.close(); - globalPool.destroyAllBufferPools(); - assertEquals(totalBuffers, globalPool.getNumberOfAvailableMemorySegments()); - globalPool.destroy(); - } - } - - /** - * Tests that the process of reading recovered state executes concurrently with channel - * buffer processing, based on the condition of the total number of states is more that - * the total buffer amount, to confirm that the lifecycle(recycle) of exclusive/floating - * buffers works well. - */ - private void testConcurrentReadStateAndProcess(boolean isRemote) throws Exception { - // setup - final int totalBuffers = 10; - final NetworkBufferPool globalPool = new NetworkBufferPool(totalBuffers, 32); - final SingleInputGate inputGate = createInputGate(globalPool); - final RecoveredInputChannel inputChannel = createRecoveredChannel(isRemote, inputGate); - - // the number of states is more that the total buffer amount - final int totalStates = 15; - final int[] states = {1, 2, 3, 4}; - final ChannelStateReader reader = new ResultPartitionTest.FiniteChannelStateReader(totalStates, states); - final ExecutorService executor = Executors.newFixedThreadPool(2); - - Throwable thrown = null; - try { - inputGate.setInputChannels(inputChannel); - inputGate.setup(); - - final Callable processTask = processRecoveredBufferTask(inputChannel, totalStates, states, false); - final Callable readStateTask = readRecoveredStateTask(inputChannel, reader, false); - - submitTasksAndWaitForResults(executor, new Callable[] {readStateTask, processTask}); - } catch (Throwable t) { - thrown = t; - } finally { - // cleanup - cleanup(globalPool, executor, null, thrown, inputChannel); - } - } - - private void testConcurrentReadStateAndRelease(boolean isRemote) throws Exception { - // setup - final int totalBuffers = 10; - final NetworkBufferPool globalPool = new NetworkBufferPool(totalBuffers, 32); - final SingleInputGate inputGate = createInputGate(globalPool); - final RecoveredInputChannel inputChannel = createRecoveredChannel(isRemote, inputGate); - - // the number of states is more that the total buffer amount - final int totalStates = 15; - final int[] states = {1, 2, 3, 4}; - final ChannelStateReader reader = new ResultPartitionTest.FiniteChannelStateReader(totalStates, states); - final ExecutorService executor = Executors.newFixedThreadPool(2); - - Throwable thrown = null; - try { - inputGate.setInputChannels(inputChannel); - inputGate.setup(); - - submitTasksAndWaitForResults( - executor, - new Callable[] {readRecoveredStateTask(inputChannel, reader, true), releaseChannelTask(inputChannel)}); - } catch (Throwable t) { - thrown = t; - } finally { - // cleanup - cleanup(globalPool, executor, null, thrown, inputChannel); - } - } - - private void testConcurrentReadStateAndProcessAndRelease(boolean isRemote) throws Exception { - // setup - final int totalBuffers = 10; - final NetworkBufferPool globalPool = new NetworkBufferPool(totalBuffers, 32); - final SingleInputGate inputGate = createInputGate(globalPool); - final RecoveredInputChannel inputChannel = createRecoveredChannel(isRemote, inputGate); - - // the number of states is more that the total buffer amount - final int totalStates = 15; - final int[] states = {1, 2, 3, 4}; - final ChannelStateReader reader = new ResultPartitionTest.FiniteChannelStateReader(totalStates, states); - final ExecutorService executor = Executors.newFixedThreadPool(2); - Throwable thrown = null; - try { - inputGate.setInputChannels(inputChannel); - inputGate.setup(); - - final Callable processTask = processRecoveredBufferTask(inputChannel, totalStates, states, true); - final Callable readStateTask = readRecoveredStateTask(inputChannel, reader, true); - final Callable releaseTask = releaseChannelTask(inputChannel); - - submitTasksAndWaitForResults(executor, new Callable[] {readStateTask, processTask, releaseTask}); - } catch (Throwable t) { - thrown = t; - } finally { - // cleanup - cleanup(globalPool, executor, null, thrown, inputChannel); - } - } - - private Callable readRecoveredStateTask(RecoveredInputChannel inputChannel, ChannelStateReader reader, boolean verifyRelease) { - return () -> { - try { - inputChannel.readRecoveredState(reader); - } catch (Throwable t) { - if (!(verifyRelease && inputChannel.isReleased())) { - throw new AssertionError("Exceptions are expected here only if the input channel was released", t); - } - } - - return null; - }; - } - - private Callable processRecoveredBufferTask(RecoveredInputChannel inputChannel, int totalStates, int[] states, boolean verifyRelease) { - return () -> { - // process all the queued state buffers and verify the data - int numProcessedStates = 0; - while (numProcessedStates < totalStates) { - if (verifyRelease && inputChannel.isReleased()) { - break; - } - if (inputChannel.getNumberOfQueuedBuffers() == 0) { - Thread.sleep(1); - continue; - } - try { - Optional bufferAndAvailability = inputChannel.getNextBuffer(); - if (bufferAndAvailability.isPresent()) { - Buffer buffer = bufferAndAvailability.get().buffer(); - BufferBuilderAndConsumerTest.assertContent(buffer, null, states); - buffer.recycleBuffer(); - numProcessedStates++; - } - } catch (Throwable t) { - if (!(verifyRelease && inputChannel.isReleased())) { - throw new AssertionError("Exceptions are expected here only if the input channel was released", t); - } - } - } - - return null; - }; - } - - private Callable releaseChannelTask(RecoveredInputChannel inputChannel) { - return () -> { - inputChannel.releaseAllResources(); - return null; - }; - } - - private RecoveredInputChannel createRecoveredChannel(boolean isRemote, SingleInputGate gate) { - if (isRemote) { - return new InputChannelBuilder().buildRemoteRecoveredChannel(gate); - } else { - return new InputChannelBuilder().buildLocalRecoveredChannel(gate); - } - } - - private SingleInputGate createInputGate(NetworkBufferPool globalPool) throws Exception { - return new SingleInputGateBuilder() - .setBufferPoolFactory(globalPool.createBufferPool(8, 8)) - .setSegmentProvider(globalPool) - .build(); - } - @Test(expected = IllegalStateException.class) public void testConversionOnlyPossibleAfterConsumed() throws IOException { buildChannel().toInputChannel(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java index c066c95e27d7b..d164f946e1ee4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java @@ -21,7 +21,6 @@ import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; -import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader; import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; @@ -33,7 +32,6 @@ import org.apache.flink.runtime.io.network.TaskEventPublisher; import org.apache.flink.runtime.io.network.TestingConnectionManager; import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.buffer.BufferBuilderAndConsumerTest; import org.apache.flink.runtime.io.network.buffer.BufferCompressor; import org.apache.flink.runtime.io.network.buffer.BufferDecompressor; import org.apache.flink.runtime.io.network.buffer.BufferPool; @@ -49,7 +47,6 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; -import org.apache.flink.runtime.io.network.partition.ResultPartitionTest; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView; import org.apache.flink.runtime.io.network.util.TestTaskEvent; @@ -66,14 +63,9 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; -import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.Optional; -import java.util.concurrent.Callable; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; import static java.util.Arrays.asList; @@ -145,102 +137,6 @@ public void testSetupLogic() throws Exception { } } - /** - * Tests when the total buffer amount is less than the number of recovered states, all the states - * can be read and processed correctly via reusing the recycled buffers. - */ - @Test - public void testReadRecoveredState() throws Exception { - final int totalStates = 7; - final int[] states = {1, 2, 3, 4, 5, 6}; - final ChannelStateReader stateReader = new ResultPartitionTest.FiniteChannelStateReader(totalStates, states); - - final int totalBuffers = 5; // the total buffers are less than the requirement from total states - final NettyShuffleEnvironment environment = new NettyShuffleEnvironmentBuilder() - .setBufferSize(states.length * Integer.BYTES) - .setNumNetworkBuffers(totalBuffers) - .build(); - final SingleInputGate inputGate = createInputGate(environment); - - final ExecutorService executor = Executors.newFixedThreadPool(1); - try (Closer closer = Closer.create()) { - closer.register(environment::close); - closer.register(executor::shutdown); - - inputGate.setup(); - CompletableFuture future = inputGate.readRecoveredState(executor, stateReader); - - int numConsumedBuffers = 0; - // consume all the state buffers from one local channel and one remote channel - while (!(future.isDone() && numConsumedBuffers == totalStates * 2)) { - if (getNextBufferAndVerify(inputGate, states)) { - numConsumedBuffers++; - } else { - Thread.sleep(3); - } - } - - // release the gate to verify that all the requested buffers are recycled - inputGate.close(); - assertEquals(totalBuffers, environment.getNetworkBufferPool().getNumberOfAvailableMemorySegments()); - } - } - - /** - * Tests that {@link SingleInputGate#readRecoveredState(ExecutorService, ChannelStateReader)} and - * {@link SingleInputGate#pollNext()} and {@link SingleInputGate#close()} execute concurrently, then - * it should not have deadlock issue and potential buffer leak. - */ - @Test - public void testConcurrentReadStateAndProcessAndClose() throws Exception { - final int totalStates = 7; - final int[] states = {1, 2, 3, 4, 5, 6}; - final ChannelStateReader stateReader = new ResultPartitionTest.FiniteChannelStateReader(totalStates, states); - - final int totalBuffers = 5; - final NettyShuffleEnvironment environment = new NettyShuffleEnvironmentBuilder() - .setBufferSize(states.length * Integer.BYTES) - .setNumNetworkBuffers(totalBuffers) - .build(); - final SingleInputGate inputGate = createInputGate(environment); - - final ExecutorService executor = Executors.newFixedThreadPool(3); - try (Closer closer = Closer.create()) { - closer.register(environment::close); - closer.register(executor::shutdown); - - inputGate.setup(); - - Callable closeTask = () -> { - inputGate.close(); - return null; - }; - - Callable readRecoveredStateTask = () -> { - inputGate.readRecoveredState(executor, stateReader); - return null; - }; - - Callable processStateTask = () -> { - while (true) { - try { - if (!getNextBufferAndVerify(inputGate, states)) { - Thread.sleep(1); - } - } catch (Throwable t) { - return null; - } - } - }; - - executor.invokeAll(Arrays.asList(closeTask, readRecoveredStateTask, processStateTask)); - - // wait until the internal channel state recover task finishes - assertEquals(totalBuffers, environment.getNetworkBufferPool().getNumberOfAvailableMemorySegments()); - assertTrue(inputGate.getCloseFuture().isDone()); - } - } - @Test public void testPartitionRequestLogic() throws Exception { final NettyShuffleEnvironment environment = new NettyShuffleEnvironmentBuilder().build(); @@ -953,18 +849,6 @@ private SingleInputGate createInputGate(NettyShuffleEnvironment environment) { return inputGate; } - private boolean getNextBufferAndVerify(SingleInputGate inputGate, int[] states) throws Exception { - Optional bufferOrEvent = inputGate.pollNext(); - if (bufferOrEvent.isPresent()) { - assertTrue(bufferOrEvent.get().isBuffer()); - Buffer buffer = bufferOrEvent.get().getBuffer(); - BufferBuilderAndConsumerTest.assertContent(buffer, null, states); - buffer.recycleBuffer(); - return true; - } - return false; - } - /** * A testing implementation of {@link ResultPartitionManager} which counts the number of * {@link ResultSubpartitionView} created. diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java index 6329df78061e8..74be46a9197b5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java @@ -25,7 +25,6 @@ import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; -import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader; import org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReader; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.jobgraph.OperatorID; @@ -164,11 +163,6 @@ public LocalRecoveryConfig createLocalRecoveryConfig() { "Local state directory was never set for this test object!"); } - @Override - public ChannelStateReader getChannelStateReader() { - return ChannelStateReader.NO_OP; - } - @Override public SequentialChannelStateReader getSequentialChannelStateReader() { return SequentialChannelStateReader.NO_OP; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerMassiveRandomTest.java index 56ccf00fd05c3..d4c4a3ba3fab7 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerMassiveRandomTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerMassiveRandomTest.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.runtime.checkpoint.CheckpointOptions; -import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader; import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo; import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; @@ -39,7 +38,6 @@ import java.util.Optional; import java.util.Random; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -229,7 +227,7 @@ public void setup() { } @Override - public CompletableFuture readRecoveredState(ExecutorService executor, ChannelStateReader reader) { + public CompletableFuture getStateConsumedFuture() { return CompletableFuture.completedFuture(null); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java index 2b976ae932bf3..6d56806b820d4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.runtime.io; -import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader; import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo; import org.apache.flink.runtime.event.TaskEvent; @@ -29,7 +28,6 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -54,7 +52,7 @@ public void setup() { } @Override - public CompletableFuture readRecoveredState(ExecutorService executor, ChannelStateReader reader) { + public CompletableFuture getStateConsumedFuture() { return CompletableFuture.completedFuture(null); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java index e91eb48c8aacf..55f93c34bc296 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.runtime.io; -import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader; import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo; import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; @@ -33,7 +32,6 @@ import java.util.Optional; import java.util.Queue; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -73,7 +71,7 @@ public void setup() { } @Override - public CompletableFuture readRecoveredState(ExecutorService executor, ChannelStateReader reader) { + public CompletableFuture getStateConsumedFuture() { return CompletableFuture.completedFuture(null); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 8a9f301da7f5a..23e231d49a1af 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -35,7 +35,6 @@ import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; -import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader; import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.TestingUncaughtExceptionHandler; @@ -48,10 +47,6 @@ import org.apache.flink.runtime.io.network.api.writer.AvailabilityTestResultPartitionWriter; import org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; -import org.apache.flink.runtime.io.network.partition.CheckpointedResultPartition; -import org.apache.flink.runtime.io.network.partition.CheckpointedResultSubpartition; -import org.apache.flink.runtime.io.network.partition.MockResultPartitionWriter; -import org.apache.flink.runtime.io.network.partition.ResultPartitionTest; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; @@ -81,10 +76,8 @@ import org.apache.flink.runtime.state.TaskLocalStateStoreImpl; import org.apache.flink.runtime.state.TaskStateManager; import org.apache.flink.runtime.state.TaskStateManagerImpl; -import org.apache.flink.runtime.state.TestTaskLocalStateStore; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.runtime.taskmanager.CheckpointResponder; -import org.apache.flink.runtime.taskmanager.NoOpCheckpointResponder; import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions; import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.taskmanager.TaskExecutionState; @@ -108,7 +101,6 @@ import org.apache.flink.streaming.api.operators.StreamOperatorStateContext; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer; -import org.apache.flink.streaming.runtime.io.MockIndexedInputGate; import org.apache.flink.streaming.runtime.io.StreamInputProcessor; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; @@ -157,7 +149,6 @@ import static java.util.Arrays.asList; import static org.apache.flink.runtime.checkpoint.StateObjectCollection.singleton; import static org.apache.flink.streaming.util.StreamTaskUtil.waitTaskIsRunning; -import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; @@ -182,7 +173,6 @@ /** * Tests for {@link StreamTask}. */ -@SuppressWarnings("deprecation") public class StreamTaskTest extends TestLogger { private static OneShotLatch syncLatch; @@ -1063,87 +1053,6 @@ protected void processInput(MailboxDefaultAction.Controller controller) throws E } } - @Test - public void testBeforeInvokeWithoutChannelStates() throws Exception { - int numWriters = 2; - int numGates = 2; - RecoveryResultPartition[] partitions = new RecoveryResultPartition[numWriters]; - for (int i = 0; i < numWriters; i++) { - partitions[i] = new RecoveryResultPartition(); - } - RecoveryInputGate[] gates = new RecoveryInputGate[numGates]; - for (int i = 0; i < numGates; i++) { - gates[i] = new RecoveryInputGate(partitions); - } - - MockEnvironment mockEnvironment = new MockEnvironmentBuilder().build(); - mockEnvironment.addOutputs(asList(partitions)); - mockEnvironment.addInputs(asList(gates)); - StreamTask task = new MockStreamTaskBuilder(mockEnvironment).build(); - try { - verifyResults(gates, partitions, false, false); - - task.beforeInvoke(); - - verifyResults(gates, partitions, false, true); - } finally { - task.cleanUpInvoke(); - } - } - - @Test - public void testBeforeInvokeWithChannelStates() throws Exception { - int numWriters = 2; - int numGates = 2; - RecoveryResultPartition[] partitions = new RecoveryResultPartition[numWriters]; - for (int i = 0; i < numWriters; i++) { - partitions[i] = new RecoveryResultPartition(); - } - RecoveryInputGate[] gates = new RecoveryInputGate[numGates]; - for (int i = 0; i < numGates; i++) { - gates[i] = new RecoveryInputGate(partitions); - } - - ChannelStateReader reader = new ResultPartitionTest.FiniteChannelStateReader(1, new int[] {0}); - TaskStateManager taskStateManager = new TaskStateManagerImpl( - new JobID(), - new ExecutionAttemptID(), - new TestTaskLocalStateStore(), - null, - NoOpCheckpointResponder.INSTANCE, - reader); - MockEnvironment mockEnvironment = new MockEnvironmentBuilder().setTaskStateManager(taskStateManager).build(); - mockEnvironment.addOutputs(asList(partitions)); - mockEnvironment.addInputs(asList(gates)); - StreamTask task = new MockStreamTaskBuilder(mockEnvironment).build(); - try { - verifyResults(gates, partitions, false, false); - - task.beforeInvoke(); - - verifyResults(gates, partitions, true, false); - - // execute the partition request mail inserted after input recovery completes - task.mailboxProcessor.drain(); - - for (RecoveryInputGate inputGate : gates) { - assertTrue(inputGate.isPartitionRequested()); - } - } finally { - task.cleanUpInvoke(); - } - } - - private void verifyResults(RecoveryInputGate[] gates, RecoveryResultPartition[] partitions, boolean recoveryExpected, boolean requestExpected) { - for (RecoveryResultPartition resultPartition : partitions) { - assertEquals(recoveryExpected, resultPartition.isStateRecovered()); - } - for (RecoveryInputGate inputGate : gates) { - assertEquals(recoveryExpected, inputGate.isStateRecovered()); - assertEquals(requestExpected, inputGate.isPartitionRequested()); - } - } - /** * Tests that some StreamTask methods are called only in the main task's thread. * Currently, the main task's thread is the thread that creates the task. @@ -1990,60 +1899,6 @@ public Class getStreamOperatorClass(ClassLoader classL } } - private static class RecoveryResultPartition extends MockResultPartitionWriter implements CheckpointedResultPartition { - private boolean isStateRecovered; - - RecoveryResultPartition() { - } - - @Override - public CheckpointedResultSubpartition getCheckpointedSubpartition(int subpartitionIndex) { - throw new UnsupportedOperationException(); - } - - @Override - public void readRecoveredState(ChannelStateReader stateReader) { - isStateRecovered = true; - } - - boolean isStateRecovered() { - return isStateRecovered; - } - } - - private static class RecoveryInputGate extends MockIndexedInputGate { - private final RecoveryResultPartition[] partitions; - private boolean isStateRecovered; - private boolean isPartitionRequested; - - RecoveryInputGate(RecoveryResultPartition[] partitions) { - this.partitions = checkNotNull(partitions); - } - - @Override - public CompletableFuture readRecoveredState(ExecutorService executor, ChannelStateReader reader) { - for (RecoveryResultPartition partition : partitions) { - checkState(partition.isStateRecovered(), "The output state recovery should happen before input state recovery."); - checkState(!isPartitionRequested, "The partition request should happen after completing all input gates recovery."); - } - isStateRecovered = true; - return CompletableFuture.completedFuture(null); - } - - @Override - public void requestPartitions() { - isPartitionRequested = true; - } - - boolean isStateRecovered() { - return isStateRecovered; - } - - boolean isPartitionRequested() { - return isPartitionRequested; - } - } - private static class ClosingOperator extends AbstractStreamOperator implements OneInputStreamOperator { static AtomicBoolean closed = new AtomicBoolean(); static AtomicInteger notified = new AtomicInteger();