Skip to content

Commit

Permalink
Unconditionally read channel state
Browse files Browse the repository at this point in the history
  • Loading branch information
rkhachatryan committed Aug 27, 2020
1 parent 817a460 commit 3c79716
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,6 @@
@Internal
public interface SequentialChannelStateReader extends AutoCloseable {

/**
* Return whether there are any channel states to be read.
*/
boolean hasChannelStates();

void readInputData(InputGate[] inputGates) throws IOException, InterruptedException;

void readOutputData(ResultPartitionWriter[] writers) throws IOException, InterruptedException;
Expand All @@ -42,11 +37,6 @@ public interface SequentialChannelStateReader extends AutoCloseable {

SequentialChannelStateReader NO_OP = new SequentialChannelStateReader() {

@Override
public boolean hasChannelStates() {
return false;
}

@Override
public void readInputData(InputGate[] inputGates) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,6 @@ public SequentialChannelStateReaderImpl(TaskStateSnapshot taskStateSnapshot, Cha
this.chunkReader = chunkReader;
}

@Override
public boolean hasChannelStates() {
return taskStateSnapshot.getSubtaskStateMappings().stream().anyMatch(subtaskStateEntry ->
subtaskStateEntry.getValue().getInputChannelState().stream().anyMatch(h -> !h.getOffsets().isEmpty()) ||
subtaskStateEntry.getValue().getResultSubpartitionState().stream().anyMatch(h -> !h.getOffsets().isEmpty()));
}

@Override
public void readInputData(InputGate[] inputGates) throws IOException {
try (InputChannelRecoveredStateHandler stateHandler = new InputChannelRecoveredStateHandler(inputGates)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ public static Object[][] parameters() {
private final int stateBytesPerPart;
private final int bufferSize;
private final int stateParLevel;
private final boolean expectToHaveState;
private final int buffersPerChannel;

public SequentialChannelStateReaderImplTest(String desc, int stateParLevel, int statePartsPerChannel, int stateBytesPerPart, int parLevel, int bufferSize) {
Expand All @@ -100,7 +99,6 @@ public SequentialChannelStateReaderImplTest(String desc, int stateParLevel, int
this.stateBytesPerPart = stateBytesPerPart;
this.bufferSize = bufferSize;
this.stateParLevel = stateParLevel;
this.expectToHaveState = stateParLevel * statePartsPerChannel * stateBytesPerPart > 0;
// will read without waiting for consumption
this.buffersPerChannel = Math.max(1, statePartsPerChannel * (bufferSize >= stateBytesPerPart ? 1 : stateBytesPerPart / bufferSize));
}
Expand All @@ -111,7 +109,6 @@ public void testReadPermutedState() throws Exception {
Map<ResultSubpartitionInfo, List<byte[]>> resultPartitionsData = generateState(ResultSubpartitionInfo::new);

SequentialChannelStateReader reader = new SequentialChannelStateReaderImpl(buildSnapshot(writePermuted(inputChannelsData, resultPartitionsData)));
assertEquals(expectToHaveState, reader.hasChannelStates());

withResultPartitions(resultPartitions -> {
reader.readOutputData(resultPartitions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,24 +488,19 @@ protected void beforeInvoke() throws Exception {

private void readRecoveredChannelStateSequentially() throws IOException, InterruptedException {
SequentialChannelStateReader reader = getEnvironment().getTaskStateManager().getSequentialChannelStateReader();
if (reader.hasChannelStates()) {
reader.readOutputData(getEnvironment().getAllWriters());
channelIOExecutor.execute(() -> {
try {
reader.readInputData(getEnvironment().getAllInputGates());
} catch (Exception e) {
asyncExceptionHandler.handleAsyncException("Unable to read channel state", e);
}
});

CompletableFuture.allOf(Arrays.stream(getEnvironment().getAllInputGates())
.map(InputGate::getStateConsumedFuture)
.collect(Collectors.toList())
.toArray(new CompletableFuture[]{})
).thenRun(() -> mainMailboxExecutor.execute(this::requestPartitions, "Input gates request partitions"));
} else {
requestPartitions();
}
reader.readOutputData(getEnvironment().getAllWriters());
channelIOExecutor.execute(() -> {
try {
reader.readInputData(getEnvironment().getAllInputGates());
} catch (Exception e) {
asyncExceptionHandler.handleAsyncException("Unable to read channel state", e);
}
});
CompletableFuture.allOf(Arrays.stream(getEnvironment().getAllInputGates())
.map(InputGate::getStateConsumedFuture)
.collect(Collectors.toList())
.toArray(new CompletableFuture[]{})
).thenRun(() -> mainMailboxExecutor.execute(this::requestPartitions, "Input gates request partitions"));
}

private void requestPartitions() throws IOException {
Expand Down

0 comments on commit 3c79716

Please sign in to comment.