New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-19026][network] Improve threading model of CheckpointBarrierUnaligner #13228
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit 36c3664 (Fri Feb 19 07:24:38 UTC 2021) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
CI shows a timeout in {{DataSinkTaskTest.testDataSinkTask}}. |
Thanks. I converted to draft until I resolved it. I also wanted to collect some feedback anyways before considering it a serious PR (in the sense of being mergeable). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the draft @AHeise. I've left some comments/questions.
...ntime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
Outdated
Show resolved
Hide resolved
...ntime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
Outdated
Show resolved
Hide resolved
...n/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
Outdated
Show resolved
Hide resolved
...n/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
Outdated
Show resolved
Hide resolved
...me/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
Outdated
Show resolved
Hide resolved
...treaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
Show resolved
Hide resolved
while (pollNext().map(BufferOrEvent::morePriorityEvents).orElse(false)) { | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As I understand it, it assumes that this pollNext()
can not return anything else besides a priority event?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, first this method checks if there is at least one priority event (priority future completed). If there is at least one, it starts processing the first one. At this point, it relies on BufferOrEvent::morePriorityEvents
to be correct in both directions (no false positives or negatives; although a false negative would just be a tad slower).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe add a checkState
, that we are not loosing some unexpected data?
...reaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
Show resolved
Hide resolved
...treaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
Outdated
Show resolved
Hide resolved
...ing-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the update @AHeise. So far I haven't got to the new parts, mostly revisited the first commits to refresh my memory.
note to myself, I've finished reading the code until:
Use futures to listen to priority events and handle them in StreamTaskNetworkInput.
...time/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
Show resolved
Hide resolved
...ntime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
Show resolved
Hide resolved
...ntime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
Outdated
Show resolved
Hide resolved
private boolean isUnalignedCheckpoint(BufferConsumer bufferConsumer) { | ||
boolean unalignedCheckpoint; | ||
try (BufferConsumer bc = bufferConsumer.copy()) { | ||
Buffer buffer = bc.build(); | ||
try { | ||
final AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader()); | ||
unalignedCheckpoint = event instanceof CheckpointBarrier; | ||
} catch (IOException e) { | ||
throw new IllegalStateException("Should always be able to deserialize in-memory event", e); | ||
} finally { | ||
buffer.recycleBuffer(); | ||
} | ||
} | ||
return unalignedCheckpoint; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this method is only isCheckpointBarrier
and it seems to not care if it's aligned or not, right?
Besides, do we really need to deserialise the event? Previously we were snapshotting in-flight data every time we were inserting buffer as a head. I think it was just as not elegant, but simpler.
I guess this is currently a dead code, but would change if we ever want to have priority cancelation markers? If that's a sole motivation, I would revisit this problem in the future. Who knows if we will need this with checkpoint abort RPC. And if we will do, there is also another option:
Inserting priority UC barrier, could go through a separate method , that would return overtaken in-flight data:
Collection<Buffer> insertAsHeadAndGetInFlightData(checkpointBarrier)
which would also eliminate the currently existing assumption/hack that requestInflightBufferSnapshot
has to be always called immediately after inserting as a head.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved this method to the commit that spills immediately. We need it in that method to retrieve the checkpoint id to spill correctly.
Deserialization is only necessary for priority events, which are very rare and rather cheap (30 bytes). I'd argue that adding a new call chain just to optimize it is not warranted.
* @param priority flag indicating if it's a priority or non-priority element | ||
* @param alreadyContained flag that hints that the element is already in this deque, potentially as non-priority element. | ||
*/ | ||
public void add(T element, boolean priority, boolean alreadyContained) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this method being used? I think at least not in this commit
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm moving it to the commit that starts using it.
...me/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
Show resolved
Hide resolved
...ime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
Outdated
Show resolved
Hide resolved
|
||
CompletableFuture<?> toNotifyPriority = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
optional nit:
extract toNotify
and toNotifyPriority
pair to some simple inner class `
public static class DataNotification() {
@Nullable
CompletableFuture<?> toNotifyPriority = null;
@Nullable
CompletableFuture<?> toNotify = null;
// two setters
setXYZ(...);
void complete() {
if (toNotifyPriority != null) {
toNotifyPriority.complete(null);
}
if (toNotify != null) {
toNotify.complete(null);
}
}
}
and re-use in UnionInputGate
as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extracted into GateNotificationHelper
, please check if it's actually helping to reduce complexity.
...ime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
Outdated
Show resolved
Hide resolved
*/ | ||
default boolean notifyPriorityEvent(BufferConsumer eventBufferConsumer) throws IOException { | ||
return false; | ||
default void notifyPriorityEvent() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could you elaborate a bit more in the commit message, what has been simplified, why and what are the benefits?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a few thoughts. Let me know if it makes things clearer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the update @AHeise .
I've left a couple of comments more. I think there are no ground braking issues.
...g-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
Outdated
Show resolved
Hide resolved
...reaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
Show resolved
Hide resolved
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
Outdated
Show resolved
Hide resolved
channelStateWriter.addOutputData( | ||
barrier.getId(), | ||
subpartitionInfo, | ||
ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN, | ||
inflightBuffers.toArray(new Buffer[0])); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again I would point to the previous comment:
#13228 (comment)
Collection<Buffer> insertAsHeadAndGetInFlightData(checkpointBarrier)
might be a better option. (It might not, as I haven't tried to implement it)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not relevant in the final version where the channel spills by itself (no return value on this method). I can make it clearer in the commit message if you want.
...ntime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
Show resolved
Hide resolved
@@ -369,9 +369,7 @@ public void testMissingCancellationBarriers() throws Exception { | |||
inputGate = createBarrierBuffer(2, sequence, validator); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why has this test and CheckpointBarrierTrackerTest.java
changed in this commit? Rebasing/squashing mistake, or am I missing something about this commit (I thought it's a pure clean up without functional changes).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A side-effect of this commit is that all events are handed over from CheckpointedInputGate to StreamTaskNetworkInput and break up the poll loop. However, since events are rare, it should have no visible impact on the throughput.
The changes to the tests are now handling the additionally emitted events. Imho tests are easier to read now (no magically disappearing buffers in the sequence).
mainMailboxExecutor.execute(() -> { | ||
prioritySelectionHandler.setAvailableInput(index); | ||
priorityAvailability.getUnavailableToResetAvailable().complete(null); | ||
}, "priority event {}", index); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: add StreamMultipleInputProcessor
to the mail's name?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Commit removed; on CheckpointedInputGate
, I'm adding the gate.toString().
// set the priority flag in a mail before notifying StreamTask of availability | ||
mainMailboxExecutor.execute(() -> { | ||
prioritySelectionHandler.setAvailableInput(index); | ||
priorityAvailability.getUnavailableToResetAvailable().complete(null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can not you maybe handle the priority message directly here, in this mail? Instead of relaying on the processDefaultAction
to pick this up?
(I'm asking/loudly thinking)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved it even further up towards CheckpointedInputGate
. At this point, we need to make sure that a priority event is really at the top (hence the optimistic lock protocol for notification).
for (final InputGate gate : inputGates) { | ||
for (int index = 0, numChannels = gate.getNumberOfInputChannels(); index < numChannels; index++) { | ||
gate.getChannel(index).checkpointStopped(currentCheckpointId); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the story behind this change?
- it seems it differs only by a single line
numBarriersReceived = 0;
, so at the very least we should deduplicate some code here - can you explain what's the functional change?
- aren't we missing a unit test for that? It would help answer point 2., and if there was a bug discovered in e2e test, it would be nice to have a faster unit test for that as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry that was just a test commit to see if the stuck e2e failed because of this change. I removed it. The original change is covered by a few unit tests already.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed all comments but also did major changes as a result:
- Priority events are completely handled in CheckpointedInputGate; no processing on
StreamTaskNetworkInput
,Processors
,StreamTask
. The last commit is dropped for that reason. - To avoid spurious priority notifications, there is a new commit
[FLINK-19026][network] Move sequence number into PipelinedSubpartition and relay through BufferAndAvailability and BufferAndBacklog.
I'd like to squash it with the previous commit as it touches very similar files. I left it unsquashed for easier review.
...ntime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
Show resolved
Hide resolved
private boolean isUnalignedCheckpoint(BufferConsumer bufferConsumer) { | ||
boolean unalignedCheckpoint; | ||
try (BufferConsumer bc = bufferConsumer.copy()) { | ||
Buffer buffer = bc.build(); | ||
try { | ||
final AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader()); | ||
unalignedCheckpoint = event instanceof CheckpointBarrier; | ||
} catch (IOException e) { | ||
throw new IllegalStateException("Should always be able to deserialize in-memory event", e); | ||
} finally { | ||
buffer.recycleBuffer(); | ||
} | ||
} | ||
return unalignedCheckpoint; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved this method to the commit that spills immediately. We need it in that method to retrieve the checkpoint id to spill correctly.
Deserialization is only necessary for priority events, which are very rare and rather cheap (30 bytes). I'd argue that adding a new call chain just to optimize it is not warranted.
* @param priority flag indicating if it's a priority or non-priority element | ||
* @param alreadyContained flag that hints that the element is already in this deque, potentially as non-priority element. | ||
*/ | ||
public void add(T element, boolean priority, boolean alreadyContained) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm moving it to the commit that starts using it.
...n/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
Show resolved
Hide resolved
...n/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
Outdated
Show resolved
Hide resolved
...ntime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
Show resolved
Hide resolved
...reaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
Show resolved
Hide resolved
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
Show resolved
Hide resolved
@@ -369,9 +369,7 @@ public void testMissingCancellationBarriers() throws Exception { | |||
inputGate = createBarrierBuffer(2, sequence, validator); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A side-effect of this commit is that all events are handed over from CheckpointedInputGate to StreamTaskNetworkInput and break up the poll loop. However, since events are rare, it should have no visible impact on the throughput.
The changes to the tests are now handling the additionally emitted events. Imho tests are easier to read now (no magically disappearing buffers in the sequence).
@@ -22,13 +22,10 @@ | |||
import org.apache.flink.annotation.VisibleForTesting; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I admit that it's an awkward cut.
However, it's only spilled in one place as the BufferReceivedListener
methods are effectively not called in the previous commits anymore. I will make a later pass to see that all tests pass though.
...ntime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
Outdated
Show resolved
Hide resolved
...ntime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
Show resolved
Hide resolved
...ing-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
Show resolved
Hide resolved
@@ -812,242 +788,13 @@ public void testQueuedBuffers() throws Exception { | |||
} | |||
} | |||
|
|||
@Test | |||
public void testBufferReceivedListener() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you double check if indeed all of those test should be removed? I don't see how things like testPartitionNotFoundExceptionWhileGetNextBuffer
should be related to this commit/PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch, my intent was to delete testBufferReceivedListener
and testPartitionNotFoundExceptionWhileGetNextBuffer
but not the test in between them.
testBufferReceivedListener
tests BufferReceivedListener
which this commits renders useless (and is later removed).
testPartitionNotFoundExceptionWhileGetNextBuffer
tests concurrent spilling of lingering buffers and receiving of such lingering buffers. Both now happens in the same thread, so the test does not make any sense.
while (pollNext().map(BufferOrEvent::morePriorityEvents).orElse(false)) { | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe add a checkState
, that we are not loosing some unexpected data?
...treaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
Show resolved
Hide resolved
…atorContext#unregisterSourceReader. If an error happens during startup, the reader may not be registered (yet), but cleanup is triggered anyways.
It generalizes the current special treatment of unaligned checkpoints and will allow a consistent treatment of priority events on both input and output. The new type facilitates detection of priority events without the need to inspect the contents of a buffer on input side. It also eases the special treatment of priority event on output side as the contextual priority flag is now inlined after the buffer has been created.
…dSubpartition. PrioritizedDeque supports enqueue elements with priority such that it will be polled after all existing priority elements but before any non-priority element. It is a building block for fair scheduling with priority elevation that will be used also on input side in the next commits.
…ide. The priority information is fully incorporated in Buffer.DataType now.
…ndBacklog to capture the DataType of the next record. The data type of the next record allows to check for availability, whether it's an event, and whether the event has a priority. It also will allow handling future data types more smoothly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the udpates @AHeise . I think the code LGTM %:
- https://github.com/apache/flink/pull/13228/files#r493039856 is still not addressed I think?
- azure green
- benchmark request will not show a visible regression :)
…n and relay through BufferAndAvailability and BufferAndBacklog. The sequence number will be used in input side to avoid spurious priority notification in later commits.
Better distinguishes between optional and non-optional variables. Add short cut for empty buffers. Next commit will build on this refactoring to incorporate priority events.
…polling. This is an alternative fix for FLINK-12510, where a cyclic deadlock can happen when a subpartition is being requested, another subpartition is freed, and data is being polled at the same time on an UnionInputGate. This fix avoids double-lock acquisition on polling by moving the availability notification for a newly acquired subpartitions outside of the lock of ResultPartitionManager. This change may trigger a availability notification on Subpartition without data being available, however, all relevant components are guarded against spurious wakeups.
PriorityDeque is also used in InputGates. BufferOrEvent and InputWithData are enriched with a flag indicating that there are more priority events. (Note relaying the DataType as on the output side would require lock acquisitions which are not warranted at this point in time)
BufferAvailabilityListener#notifyPriorityEvent now is a simple notification to avoid any kind of secondary data flow on output side as it was originally intended before. For remote channels, notifyPriorityEvent behaves like an extra notifyDataAvailable call as an event is always pollable. For local channels, notifyPriorityEvent ultimately informs InputGate that the respective channel has a priority event.
A future commit moves the Unaligner completely into task thread, which would result in late spilling of in-flight data during polling and potentially delay un Because channels are now responsible for spilling in-flight data during unaligned checkpoint, in-flight data can be spilled as soon as the checkpoint has been.
…ivedListener to CheckpointedInputGate. This commit renders BufferReceivedListener obsolete and will allow a following commit to remove it entirely. A side-effect of this commit is that all events are handed over from CheckpointedInputGate to StreamTaskNetworkInput and break up the poll loop. However, since events are rare, it should have no visible impact on the throughput.
…rrierUnaligner. This concludes the refactoring: All priority events use the same buffer hand-over as normal events; the buffers are just reordered. Notification of priority event bypasses CheckpointBarrierHandler and directly triggers CheckpointedInputGate#processPriorityEvents. Note that checkpoints are not cancelled anymore if Unaligner received all barriers. This behavior is now in line with Aligner.
AbstractInvokable#executeInTaskThread.
…ource interface. The rewritten test induces heavy backpressure which would not work at all with aligned checkpoints or legacy sources during the timeout period.
✅
✅
✅ |
What is the purpose of the change
Currently, netty threads and task thread are both accessing Unaligner. Netty thread access it indirectly through
BufferReceivedListener
. Thus, we need some costly synchronization and weaken the threading model of the Unaligner.This PR mitigates it by letting priority buffers use the same handover protocol as regular buffers. Instead buffers are reordered where needed an additional wake-up mechanism is used for priority events, such that they can interrupt regular buffer processing.
Brief change log
BufferAndAvailability
,BufferAndBacklog
,InputWithData
, ...) directly or indirectly.PriorityDeque
to support such reordering.CheckpointedInputGate
(moved out from Unaligner). Generalized hand-over of events fromCheckpointedInputGate
toStreamTaskNetworkInput
, such that all events are forwarded.StreamTaskNetworkInput
to Unaligner.BufferReceivedListener
and letting Unaligner react on in-flight data.Verifying this change
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (yes / no)Documentation
This change is