
[FLINK-16645] Limit the maximum backlogs in subpartitions #11567

Closed

Conversation

@Jiayi-Liao (Contributor) commented Mar 30, 2020

What is the purpose of the change

In the case of data skew, most of the buffers in the LocalBufferPool can be requested by one particular subpartition, which increases the in-flight data and slows down checkpoint barrier alignment.

To solve this problem, we add a config option to limit the maximum backlog per subpartition. This way, we can check each subpartition's backlog and make the stream task unavailable if the limit is exceeded.

Brief change log

  1. Add taskmanager.network.max-backlogs-per-subpartition to limit the maximum number of backlogs per subpartition.
  2. Make the stream task unavailable/available by monitoring the buffers of each subpartition in LocalBufferPool.
  3. Add a class SubpartitionRecycler in LocalBufferPool as the recycler for buffers requested by subpartitions.
  4. Add subpartition-parameter versions of requestSegment() and recycle(MemorySegment segment).
  5. Rename getBufferBuilder() to getNewBufferBuilder(int targetChannel) in RecordWriter, because there is already a function named getBufferBuilder(int targetChannel).

Verifying this change

  1. testIsAvailableOrNot tests ResultPartition#getAvailableFuture when taskmanager.network.max-backlogs-per-subpartition is not set (the default value is Integer.MAX_VALUE).
  2. testMaxBuffersPerChannelAndAvailability tests LocalBufferPool#getAvailableFuture when taskmanager.network.max-backlogs-per-subpartition is set.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): yes
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? yes

@flinkbot (Collaborator) commented:

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit b0ea170 (Mon Mar 30 13:41:41 UTC 2020)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot (Collaborator) commented Mar 30, 2020

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

@pnowojski (Contributor) left a comment:

Thanks for the contribution @Jiayi-Liao. I've left a couple of comments. I think the code looks mostly OK, but I have to think more about the concurrency issue.

@Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
public static final ConfigOption<Integer> NETWORK_MAX_BACKLOGS_PER_SUBPARTITION =
key("taskmanager.network.max-backlogs-per-subpartition")
Contributor:

rename to max-buffers-per-channel to be consistent with buffers-per-channel?

Comment on lines 184 to 189
.withDescription("Number of max backlogs can be used for each output subpartition." +
" If a subpartition exceeds the number of max backlogs, it will make the ResultPartition unavailable and" +
" block the processing. This benefits in reducing the in-flight data and speeding up the barrier alignment" +
" when most of the buffers are going to one subpartition (data skew). This limitation is not strictly" +
" guaranteed, which usually happens in one-to-many operators like flatmap.");

Contributor:

Number of max buffers that can be used for each channel. If a channel exceeds the number of max buffers, it will become unavailable, cause back pressure and block the data processing. This might speed up checkpoint alignment by preventing excessive growth of the buffered in-flight data in case of data skew and a high number of configured floating buffers. This limit is not strictly guaranteed, and can be exceeded by things like flatMap operators, records spanning multiple buffers or a single timer producing a large amount of data.

?
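For reference, a minimal sketch of how the renamed option could look, combining the suggested key and wording with the default of 10 mentioned in the merge comment below. The constant name and the typed builder chain are assumptions here, not taken from the PR:

@Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
public static final ConfigOption<Integer> NETWORK_MAX_BUFFERS_PER_CHANNEL =
	key("taskmanager.network.max-buffers-per-channel")
		.intType()
		.defaultValue(10)
		.withDescription("Number of max buffers that can be used for each channel.");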


private final int maxBacklogInSubpartition;

private int unavailableSubpartitionsCnt;
Contributor:

unavailableSubpartitionsCnt -> unavailableSubpartitionsCount

* Check whether all subpartitions' backlogs are less than the limitation of max backlogs, and make this partition
* available again if yes.
*/
@GuardedBy("buffers")
Contributor:

That's not true. First of all, this class cannot rely on an implementation detail of PipelinedSubpartition (which is only one of the two implementations of the ResultSubpartition class).

Secondly, each PipelinedSubpartition has its own buffers field, so the updates here to unavailableSubpartitionsCnt are completely unsynchronized. A simple solution might be to change unavailableSubpartitionsCnt to an AtomicLong, but let me think about alternative solutions.

Contributor Author:

Another solution that comes to mind is a BitSet structure where each subpartition takes care of its own single bit. We can easily construct such a structure using a long[] (new long[1024] covers 64 * 1024 subpartitions). And instead of using a counter, we can use bit operations to check whether all long values in the long[] are 0.

Update: this solution still doesn't look thread-safe...

Contributor:

I would expect the smallest atomic writes/reads to be bytes, not bits. With longs, threads performing

long l = bitSet[i];
l |= 1L << n;
bitSet[i] = l;

would be overwriting bits written by other threads sharing the same long. So you would need a single byte per subpartition.
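To make the lost-update problem concrete, here is a minimal, self-contained demonstration (all names are illustrative): the compound |= on a shared long is a non-atomic read-modify-write, so one thread's bit can be silently dropped.

public class LostBitUpdate {

	// Shared word of the hypothetical bit set; volatile does not make |= atomic.
	static volatile long sharedWord = 0L;

	public static void main(String[] args) throws InterruptedException {
		for (int run = 0; run < 100_000; run++) {
			sharedWord = 0L;
			// Two threads set different bits of the same long without any CAS.
			Thread t1 = new Thread(() -> sharedWord |= 1L << 3);
			Thread t2 = new Thread(() -> sharedWord |= 1L << 7);
			t1.start();
			t2.start();
			t1.join();
			t2.join();
			if (sharedWord != ((1L << 3) | (1L << 7))) {
				// One read-modify-write overwrote the other: a bit was lost.
				System.out.println("Lost update in run " + run);
			}
		}
	}
}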

@@ -342,6 +342,7 @@ private void decreaseBuffersInBacklogUnsafe(boolean isBuffer) {
assert Thread.holdsLock(buffers);
if (isBuffer) {
buffersInBacklog--;
parent.notifyDecreaseBacklog(buffersInBacklog);
Contributor:

the above two lines can be replaced with:
parent.notifyDecreaseBacklog(--buffersInBacklog);

@@ -355,6 +356,7 @@ private void increaseBuffersInBacklog(BufferConsumer buffer) {

if (buffer != null && buffer.isBuffer()) {
buffersInBacklog++;
parent.notifyIncreaseBacklog(buffersInBacklog);
Contributor:

the above two lines can be replaced with:
parent.notifyIncreaseBacklog(++buffersInBacklog);

if (unavailableSubpartitionsCount == 0) {
CompletableFuture<?> toNotify = availabilityHelper.getUnavailableToResetAvailable();
toNotify.complete(null);
int[] a = new int[1024];
Contributor:

why do we need this?

Contributor Author:

It's an accident... removing now.

public void notifyDecreaseBacklog(int buffersInBacklog) {
if (buffersInBacklog == maxBuffersPerChannel) {
unavailableSubpartitionsCount--;
if (unavailableSubpartitionsCount == 0) {
Contributor:

the above two lines can be replaced with:
if (--unavailableSubpartitionsCount == 0) {

public void notifyIncreaseBacklog(int buffersInBacklog) {
if (buffersInBacklog == maxBuffersPerChannel + 1) {
unavailableSubpartitionsCount++;
if (unavailableSubpartitionsCount == 1) {
Contributor:

the above two lines can be replaced with:
if (++unavailableSubpartitionsCount == 1) {

@@ -74,6 +74,8 @@

private final String compressionCodec;

private final int maxBuffersPerChannel;
Contributor:

it's better to update the hashCode, equals and toString methods
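A minimal sketch of what that update could look like; the class name is hypothetical and the fields are taken from the diff context above, so this is not the actual Flink class:

import java.util.Objects;

final class ShuffleConfigSketch {

	private final String compressionCodec;
	private final int maxBuffersPerChannel;

	ShuffleConfigSketch(String compressionCodec, int maxBuffersPerChannel) {
		this.compressionCodec = compressionCodec;
		this.maxBuffersPerChannel = maxBuffersPerChannel;
	}

	@Override
	public boolean equals(Object o) {
		if (this == o) {
			return true;
		}
		if (!(o instanceof ShuffleConfigSketch)) {
			return false;
		}
		ShuffleConfigSketch that = (ShuffleConfigSketch) o;
		// The new field must participate, or two configs differing only in
		// maxBuffersPerChannel would compare equal.
		return maxBuffersPerChannel == that.maxBuffersPerChannel
			&& Objects.equals(compressionCodec, that.compressionCodec);
	}

	@Override
	public int hashCode() {
		return Objects.hash(compressionCodec, maxBuffersPerChannel);
	}

	@Override
	public String toString() {
		return "ShuffleConfigSketch{compressionCodec=" + compressionCodec
			+ ", maxBuffersPerChannel=" + maxBuffersPerChannel + "}";
	}
}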


assertTrue(resultPartition.getAvailableFuture().isDone());

BufferAvailabilityListener listener0 = mock(BufferAvailabilityListener.class);
Contributor:

it's better to avoid the use of mock

Contributor Author:

You mean I should replace "mock" with a "TestBufferAvailabilityListener" here? I can do this, but I wonder what the difference is.

Contributor:

I heard from others that the community is trying to avoid using mock; it's not a problem for me if there is no concern from other people.

@pnowojski (Contributor):
From the Travis failure, it looks like you need to update the docs.

if (buffersInBacklog == maxBuffersPerChannel) {
if (--unavailableSubpartitionsCount == 0) {
CompletableFuture<?> toNotify = availabilityHelper.getUnavailableToResetAvailable();
toNotify.complete(null);
@pnowojski (Contributor) commented Apr 4, 2020:

this is being executed under the buffers lock. In the past we had quite a few problems with executing callbacks under a lock, leading to deadlocks. However, since we introduced the mailbox model it might be less risky to do so, as I think the only thing this callback can do is enqueue something into the mailbox / resume default action processing? What do you think @wsry? Could you double check that I'm not missing anything?

Contributor:

I think you are right; the callback only enqueues a resume mail, so there should be no deadlock issue.

Comment on lines 405 to 424
public void notifyDecreaseBacklog(int buffersInBacklog) {
if (buffersInBacklog == maxBuffersPerChannel) {
if (--unavailableSubpartitionsCount == 0) {
CompletableFuture<?> toNotify = availabilityHelper.getUnavailableToResetAvailable();
toNotify.complete(null);
}
}
}

/**
* Check whether any subpartition's backlog exceeds the limitation of max backlogs, and make this partition
* unavailable if yes.
*/
public void notifyIncreaseBacklog(int buffersInBacklog) {
if (buffersInBacklog == maxBuffersPerChannel + 1) {
if (++unavailableSubpartitionsCount == 1) {
availabilityHelper.resetUnavailable();
}
}
}
@pnowojski (Contributor) commented Apr 4, 2020:

Continuing the previous thread:

each PipelinedSubpartition has its own buffers field, so the updates here to unavailableSubpartitionsCnt are completely unsynchronized. A simple solution might be to change unavailableSubpartitionsCnt to an AtomicLong, but let me think about alternative solutions.

There is also another issue. AvailabilityHelper is not a thread-safe class, and you are accessing and modifying its state from different threads, so my previous idea of an AtomicLong unavailableSubpartitionsCount wouldn't work. This also affects getAvailableFuture.

I don't know if that can be solved in an efficient way. One thing that could help a bit is an old idea of mine,
pnowojski@24e360e:
there is no reason for each subpartition to have a unique lock. Having one global per-partition lock would help with a couple of issues, like flushAll(), some load-rebalancing ideas that we had, or this one here. However, we would also have to synchronize ResultPartition#getAvailableFuture(...), which would affect performance on the happy/hot path :/

Maybe a solution would be to not use availabilityHelper here at all. Instead of availabilityHelper.resetUnavailable(), enqueue an action in the mailbox which would suspend the default action, and instead of toNotify.complete(null), also enqueue a mailbox action to re-enable the default action. That would increase the cost of the #notifyIncreaseBacklog call, but only when this task is being back-pressured, not on the happy/hot path, so performance shouldn't matter there.

However, we would still need either an AtomicLong or a global ResultPartition lock to avoid race conditions on unavailableSubpartitionsCount. A global ResultPartition lock would be the cleaner and faster solution.

(let's think this through before jumping to implementation)

Contributor:

Ok, I've just thought of probably a simpler and better idea. Why don't we move this anti-data-skew logic inside the LocalBufferPool? It's already synchronized for recycling memory segments from netty threads and requesting them from the task thread, and it already implements AvailabilityProvider with an AvailabilityHelper. It currently doesn't know which channel/subpartition is assigned to which buffer, but it might be doable to pass this knowledge there:

  • add a subpartition index argument to ResultPartitionWriter#getBufferBuilder
  • embed it into the NetworkBuffer inside LocalBufferPool#toBuffer? Maybe via passing a custom BufferRecycler?
  • use the subpartition id during recycling

A couple of caveats:

  • LocalBufferPool is also used for input gates at least, so it might need to support managing buffers both with and without channel accounting.
  • for BroadcastRecordWriter the accounting would either need to be disabled, or Integer.MAX_VALUE could be used to indicate accounting against all channels?
  • there might be some other code paths leading to LocalBufferPool that do not care about anti-data-skew accounting.

@Jiayi-Liao (Contributor Author) commented Apr 5, 2020:

Mmm.. it seems to make sense, but let me make sure I understand this correctly.
First, we maintain an int[] subpartitionsBufferCount in LocalBufferPool to represent the buffer count of each subpartition.
(1) Adding buffers. We add a new version of requestMemorySegment(), maybe something like requestMemorySegment(int subpartitionIndex), and update subpartitionsBufferCount inside the new function, which will only be called by ResultPartition without affecting the call stack of InputGate.
(2) Recycling. Create a custom BufferRecycler carrying channel information, like new CustomBufferRecycler(subpartitionIndex), to make sure we know where the buffer comes from when recycling. This means we need to create a separate BufferRecycler instance for every subpartition. If I understand this part correctly, wouldn't that add too many (thousands of) BufferRecycler instances?

Contributor:

Yes, more or less that was my last idea (keep in mind that I might be missing something).

If I understand this part correctly, wouldn't that add too many (thousands of) BufferRecycler instances?

For one thing, we could create one instance of CustomBufferRecycler per subpartitionIndex and keep it referenced in some array field CustomBufferRecycler[] recyclers; we don't have to keep creating them on each request, so it shouldn't affect performance/GC pressure.

Secondly, we already have multiple classes per subpartition index (for example PipelinedSubpartition, PipelinedSubpartitionView, RemoteInputChannel, BufferBuilder, BufferConsumer and all of their fields, plus the buffers/segments themselves). One more shouldn't matter.

Thirdly, a CustomBufferRecycler should be ~16 bytes, which is just 160KB for 10000 channels.

Lastly, we could also hide subpartitionIndex inside the NetworkBuffer class, but I think that might be less clean, taking into account that InputGate won't be using this field. But I might be wrong/persuaded in the other direction here.
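To make the first point concrete, a sketch of the create-once recycler array as it might look inside LocalBufferPool's constructor. All names here are assumptions, with CustomBufferRecycler standing in for whatever the recycler class ends up being called:

// One recycler per subpartition, created up front so that nothing is
// allocated on the per-buffer request path.
private final CustomBufferRecycler[] recyclers;

LocalBufferPool(/* existing constructor args, */ int numberOfSubpartitions) {
	// ... existing initialization ...
	this.recyclers = new CustomBufferRecycler[numberOfSubpartitions];
	for (int i = 0; i < numberOfSubpartitions; i++) {
		recyclers[i] = new CustomBufferRecycler(i, this);
	}
}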

Contributor:

@Jiayi-Liao just a small heads up. There is another PR in progress that touches similar places (take a look at RecordWriter#getBufferBuilder in #11564); it adds a non-blocking variant of requesting a BufferBuilder, from the LocalBufferPool up to the RecordWriter. It will probably be merged before your PR, so you will likely have some conflicts during one of the rebases. They should not be that difficult to resolve, but keep it in mind.

Contributor Author:

@pnowojski Thanks for the reminder :). A new idea occurred to me this morning; maybe it'll open up our thinking a little. Since each subpartition is synchronized on buffers, what if we let each subpartition know its buffer limit and create the availability future in every subpartition? A subpartition would become unavailable when the number of buffers increases past the limit, and available again when the number of buffers decreases. In ResultPartition, there would be something like this:

public boolean addBufferConsumer(BufferConsumer bufferConsumer, int subpartitionIndex) {
	...
	subpartition.add(bufferConsumer);

	if (!subpartition.isAvailable()) {
		subpartitionsFuture.combine(subpartition.availableFuture);
	}
	...
}

But the problem here is that we may end up with hundreds of futures combined in subpartitionsFuture if the user sets a low value for the maximum number of buffers.

Contributor:

Yes, I guess you are right.

But the problem here is that we may end up with hundreds of futures combined in subpartitionsFuture if the user sets a low value for the maximum number of buffers.

I think that wouldn't be a serious issue. Once a single subpartition is backpressured because of data skew, data processing should pause. So as long as we respect the availability, there should be at most one blocked subpartition.

But it would be an issue nonetheless. It could happen, for example, with a flatMap operator producing a lot of output in a single call.

So the subpartitionsFuture would have to maintain a List<CompletableFuture> to keep track of all of the blocked subpartitions. With this in mind, I'm not entirely sure this approach would be easier/better than going through LocalBufferPool. Maybe it would be better to have a single source of truth for output availability (LocalBufferPool), instead of distributing the logic between LocalBufferPool and the subpartitions?
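As a side note, a minimal sketch of the bookkeeping this alternative would need (class and method names are hypothetical): the combined future has to be rebuilt from every currently blocked subpartition's future, for example via CompletableFuture.allOf:

import java.util.List;
import java.util.concurrent.CompletableFuture;

final class SubpartitionAvailabilitySketch {

	// Completes only once every blocked subpartition has become available again.
	static CompletableFuture<Void> combine(List<CompletableFuture<?>> blocked) {
		return CompletableFuture.allOf(blocked.toArray(new CompletableFuture<?>[0]));
	}
}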

Contributor Author:

Okay, I'm going to implement it your way. This will introduce a few changes in LocalBufferPool:

  1. Interface changes in BufferProvider. Add subpartitionIndex as a parameter to requestBufferBuilder and requestBufferBuilderBlocking so that the BufferPool knows which subpartition the request comes from.
  2. Add requestMemorySegment(int subpartitionIndex) and recycle(int subpartitionIndex); the new functions will reuse most of the logic of the original ones, and the original will look like this:

private MemorySegment requestMemorySegment() throws IOException {
	return requestMemorySegment(INVALID_SUBPARTITION_INDEX);
}

  3. Introduce class SubpartitionBufferRecycler:
class SubpartitionBufferRecycler implements BufferRecycler {

	private final int subpartitionIndex;
	private final LocalBufferPool bufferPool;

	SubpartitionBufferRecycler(int subpartitionIndex, LocalBufferPool bufferPool) {
		this.subpartitionIndex = subpartitionIndex;
		this.bufferPool = bufferPool;
	}

	@Override
	public void recycle(MemorySegment memorySegment) {
		// Hand the segment back to the pool together with the subpartition
		// it was originally requested for.
		bufferPool.recycle(memorySegment, subpartitionIndex);
	}
}
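To show how this would be wired up, a hypothetical call site (names assumed, not part of the list above): a pre-created per-subpartition recycler is attached when a buffer is handed out, so that recycling later credits the right subpartition's count.

public BufferBuilder requestBufferBuilder(int subpartitionIndex) throws IOException {
	MemorySegment segment = requestMemorySegment(subpartitionIndex);
	return segment == null
		? null
		: new BufferBuilder(segment, subpartitionBufferRecyclers[subpartitionIndex]);
}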

@pnowojski (Contributor) commented Apr 20, 2020:

  1. 👍
  2. INVALID_SUBPARTITION_INDEX -> UNKNOWN_SUBPARTITION_INDEX? It would be nicer to avoid magic constants and just keep requestMemorySegment() without the subpartitionIndex parameter alongside the version with the parameter, but I don't see how that could be done without duplicating quite a bit of code.
  3. 👍

Contributor Author:

@pnowojski InputChannel and Subpartition share the function requestMemorySegment(), so it looks like we have no other way except using UNKNOWN_SUBPARTITION_INDEX if we want to reuse the logic of the original function. BTW, I've updated the description and the code; perhaps you can take a look.

@pnowojski (Contributor) left a comment:

Thanks for the update @Jiayi-Liao. I think the change looks mostly good. I've left a couple of smaller comments.

@Jiayi-Liao (Contributor Author) commented Apr 25, 2020:

@pnowojski I've updated the code according to your comments; here are the latest changes:

  1. Rename getNewBufferBuilder(int) to requestNewBufferBuilder(int).
  2. Add numSubpartitions (equal to 0 if not set) and maxBuffersPerChannel (equal to Integer.MAX_VALUE if not set) to the constructor of LocalBufferPool.
  3. Keep requestBufferBuilder() and requestBufferBuilderBlocking(), and make UNKNOWN_CHANNEL only visible inside LocalBufferPool.
  4. Add an unavailableSubpartitionsCount > 0 condition in LocalBufferPool#getAvailableFuture() (see the sketch below).
  5. Optimize the tests.
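A rough sketch of the condition from item 4 (field names assumed from the discussion; not the exact merged code):

@Override
public CompletableFuture<?> getAvailableFuture() {
	// The pool is unavailable either when it has handed out all of its
	// segments or when some subpartition exceeded maxBuffersPerChannel.
	if (numberOfRequestedMemorySegments >= currentPoolSize || unavailableSubpartitionsCount > 0) {
		return availabilityHelper.getAvailableFuture();
	}
	return AVAILABLE;
}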

@pnowojski (Contributor) left a comment:

Thanks for the update @Jiayi-Liao :) The change LGTM. I've fixed a couple of minor formatting issues, squashed the commits and added a proper commit message:

pnowojski@6b6c066

I'm waiting for the Travis run and benchmarks to complete on this commit before merging.

@pnowojski (Contributor):

Thanks for your work @Jiayi-Liao. I applied a couple of whitespace fixes, changed the default value of the config parameter to 10, and merged to master as 02978da after a passing private Travis build: https://travis-ci.org/github/pnowojski/flink/builds/680603777

A quick glance at the benchmarks before merging didn't reveal any performance regression, but we should check the long-term trends.

@pnowojski pnowojski closed this Apr 29, 2020
@Jiayi-Liao (Contributor Author):

@pnowojski @wsry @zhijiangW Thanks for your review!

@Jiayi-Liao Jiayi-Liao deleted the FLINK-16645-limit-backlog branch May 23, 2020 14:23