
Conversation

@wsry wsry (Contributor) commented Jan 25, 2022

What is the purpose of the change

Currently, for the result partition of sort-shuffle, there is extra record copy overhead introduced by clustering records by subpartition index. For small records, this overhead can cause a performance regression of up to 20%. This ticket aims to solve that problem.

In fact, the hash-based implementation is a natural way to achieve the goal of sorting records by partition index. However, it has some serious weaknesses. For example, when there are not enough buffers or the data is skewed, it can waste buffers and hurt compression efficiency, which can cause performance regression.

This ticket solves the issue by dynamically switching between the two implementations: if there are enough buffers, the hash-based implementation is used; otherwise, the sort-based implementation is used.
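In code, the switching decision boils down to something like this (a minimal, self-contained sketch; the class and method names are illustrative, not this PR's code, and the two-buffers-per-subpartition threshold follows the review discussion below):

enum DataBufferType { HASH_BASED, SORT_BASED }

final class DataBufferSelector {
    /** Picks the buffer implementation from the available buffer budget. */
    static DataBufferType select(int numAvailableBuffers, int numSubpartitions) {
        // With enough buffers, records can be clustered per subpartition directly
        // (no extra copy); otherwise fall back to sort-based appending.
        return numAvailableBuffers >= 2 * numSubpartitions
                ? DataBufferType.HASH_BASED
                : DataBufferType.SORT_BASED;
    }

    public static void main(String[] args) {
        System.out.println(select(200, 50)); // HASH_BASED
        System.out.println(select(60, 50));  // SORT_BASED
    }
}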

Brief change log

  • Dynamically switch between the two implementations: if there are enough buffers, the hash-based implementation is used; otherwise, the sort-based implementation is used.

Verifying this change

This change adds tests, and existing tests also help to verify it.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot (Collaborator)

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 7937706 (Tue Jan 25 13:09:59 UTC 2022)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot flinkbot (Collaborator) commented Jan 25, 2022

CI report:

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot run azure to re-run the last Azure build

wsry pushed a commit to wsry/flink that referenced this pull request Jan 26, 2022
…-shuffle if there are enough buffers for better performance

Currently, for the result partition of sort-shuffle, there is extra record copy overhead introduced by clustering records by subpartition index. For small records, this overhead can cause a performance regression of up to 20%. This patch aims to solve that problem.

In fact, the hash-based implementation is a natural way to achieve the goal of sorting records by partition index. However, it has some serious weaknesses. For example, when there are not enough buffers or the data is skewed, it can waste buffers and hurt compression efficiency, which can cause performance regression.

This patch solves the issue by dynamically switching between the two implementations: if there are enough buffers, the hash-based implementation is used; otherwise, the sort-based implementation is used.

This closes apache#18505.
@gaoyunhaii gaoyunhaii (Contributor) left a comment

Many thanks @wsry for the PR! I have left some comments.

In the long run, I tend to think we should move the differing implementations into the implementation classes, like the strategy for splitting write buffers and sort buffers and how they write the buffers to the files. Perhaps we could create a new issue for the future refactoring?

Also, if possible, I tend to think we should rename the classes to DataBuffer, HashBasedDataBuffer and SortBasedDataBuffer, and also the variables, to avoid reusing the word sort.

private void writeLargeRecord(
        ByteBuffer record, int targetSubpartition, DataType dataType, boolean isBroadcast)
        throws IOException {
    checkState(numBuffersForWrite > 0, "No buffers available for writing.");
Contributor

Would this cause problems if there are large records when using the hash-based implementation? Might we keep at least one buffer for writing?

Contributor Author

For the hash-based implementation, a large record will be appended to the sort buffer; when the data buffer is full, the partial data of the record will be spilled as a data region, and the remaining data of the large record will be appended to the sort buffer again. That is to say, a large record can span multiple data regions.
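A toy sketch of that spilling loop (all names here are hypothetical; it only illustrates how one record ends up split across several data regions):

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

final class LargeRecordAppendSketch {
    // Splits a record into region-sized chunks, "spilling" each chunk as one region.
    static List<byte[]> appendAcrossRegions(ByteBuffer record, int regionCapacityBytes) {
        List<byte[]> spilledRegions = new ArrayList<>();
        while (record.hasRemaining()) {
            int chunk = Math.min(regionCapacityBytes, record.remaining());
            byte[] region = new byte[chunk];
            record.get(region);         // copy the next chunk of the record
            spilledRegions.add(region); // spill it as one data region
        }
        return spilledRegions;
    }

    public static void main(String[] args) {
        ByteBuffer record = ByteBuffer.allocate(10_000);                // larger than one region
        System.out.println(appendAcrossRegions(record, 4_096).size()); // prints 3
    }
}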

if (!isFull) {
    ++numTotalRecords;
}
numTotalBytes += totalBytes - source.remaining();
Contributor

If the source takes 5 buffers and only 3 buffers are written, do we expect to write the remaining buffers in the next call? If so, might we add some comments in the method docs?

Contributor Author

Yes, I will add some comments to explain that.
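For illustration, the added comments could read roughly like this (a sketch only; the method name and exact contract wording are assumptions, not this PR's final javadoc):

interface DataBuffer {
    /**
     * Appends data from the given source buffer to the target subpartition.
     *
     * <p>If this buffer becomes full before the whole source is consumed, the
     * append stops early and {@code source} keeps its unconsumed remainder; the
     * caller is expected to spill/reset this buffer and then call this method
     * again with the same {@code source} to write the rest.
     *
     * @return true if this buffer is full and must be spilled before retrying.
     */
    boolean append(java.nio.ByteBuffer source, int targetSubpartition)
            throws java.io.IOException;
}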

boolean isReleased();

/** Resets this {@link SortBuffer} to be reused for data appending. */
void reset();
Contributor

We might move this method before finish().

Perhaps we could also add some description of the lifecycle of the SortBuffer in the class documentation? For example, describing the process of write, write, full, read, read, reset, ..., finish, release.
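Such a lifecycle description could be accompanied by a walk-through like the following (a toy sketch with made-up types; only the ordering of the calls reflects the lifecycle under discussion):

import java.util.ArrayDeque;
import java.util.Deque;

final class LifecycleSketch {
    /** Toy stand-in for a data buffer; real implementations hold memory segments. */
    static final class ToyDataBuffer {
        private final Deque<Integer> records = new ArrayDeque<>();
        private final int capacity;

        ToyDataBuffer(int capacity) { this.capacity = capacity; }

        /** Returns false when full; the caller must read and reset before retrying. */
        boolean append(int record) {
            if (records.size() == capacity) {
                return false;
            }
            records.add(record);
            return true;
        }

        Integer read() { return records.poll(); } // read buffered data out
        void reset() { records.clear(); }         // reuse for the next round of writes
        void finish() {}                          // no more appends after this
        void release() {}                         // free underlying resources
    }

    public static void main(String[] args) {
        ToyDataBuffer buffer = new ToyDataBuffer(2);
        for (int record = 0; record < 5; record++) {
            if (!buffer.append(record)) {        // write, write, full ...
                while (buffer.read() != null) {} // ... read, read ...
                buffer.reset();                  // ... reset, and write again
                buffer.append(record);
            }
        }
        buffer.finish();                         // finish once all data is appended
        while (buffer.read() != null) {}         // drain the remainder
        buffer.release();                        // finally release
    }
}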

* Number of reserved network buffers for data writing. This value can be 0 and 0 means that
* {@link HashBasedPartitionSortedBuffer} will be used.
*/
private int numBuffersForWrite;
Contributor

From the following modification, it seems the variable here mainly serves to indicate whether we want to use the hash-based or the sort-based implementation. I think perhaps we could directly use a variable like sortBufferType or useHashBuffer to make it clearer. numBuffersForWrite could then become a local variable in the constructor. We could change the implementation to be like:

useHashBuffer = numRequiredBuffer >= 2 * numSubpartitions;

if (!useHashBuffer) {
    int expectedWriteBuffers;
    if (networkBufferSize >= NUM_WRITE_BUFFER_BYTES) {
        expectedWriteBuffers = 1;
    } else {
        expectedWriteBuffers =
            Math.min(EXPECTED_WRITE_BATCH_SIZE, NUM_WRITE_BUFFER_BYTES / networkBufferSize);
    }

    int numBuffersForWrite = Math.min(numRequiredBuffer / 2, expectedWriteBuffers);
    numBuffersForSort = numRequiredBuffer - numBuffersForWrite;

    try {
        for (int i = 0; i < numBuffersForWrite; ++i) {
            MemorySegment segment = bufferPool.requestMemorySegmentBlocking();
            writeSegments.add(segment);
        }
    } catch (InterruptedException exception) {
        // the setup method does not allow InterruptedException
        throw new IOException(exception);
    }
}


fileWriter.writeBuffers(toWrite);
}
BufferWithChannel bufferWithChannel = sortBuffer.copyIntoSegment(segments.poll());
Contributor

It is also a bit weird that copyIntoSegment might be passed a null segment. In consideration of the deadline, I think we might rename the method to something like getNextBuffer(@Nullable MemorySegment transitBuffer) and add proper comments?
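A sketch of what the renamed method could look like (the javadoc wording is illustrative; MemorySegment and the nested BufferWithChannel type are the existing classes referenced in this thread, so the snippet assumes they are on the classpath):

import javax.annotation.Nullable;

import org.apache.flink.core.memory.MemorySegment;

interface DataBuffer {
    /**
     * Returns the next buffer of data to spill, or null if no data is available.
     * The sort-based implementation copies its data into the given transit
     * buffer, while the hash-based implementation already holds finished
     * buffers, so callers of that implementation may pass null here.
     */
    BufferWithChannel getNextBuffer(@Nullable MemorySegment transitBuffer);
}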

@wsry wsry (Contributor Author) commented Feb 9, 2022

Many thanks @wsry for the PR! I have left some comments.

In the long run, I tend to think we should move the differing implementations into the implementation classes, like the strategy for splitting write buffers and sort buffers and how they write the buffers to the files. Perhaps we could create a new issue for the future refactoring?

Also, if possible, I tend to think we should rename the classes to DataBuffer, HashBasedDataBuffer and SortBasedDataBuffer, and also the variables, to avoid reusing the word sort.

@gaoyunhaii Thanks for the review and comments. I agree that we can rename the sort buffer classes in this PR. As for the other refactoring, I will create a new ticket to do it later. I will update the PR soon.

wsry pushed a commit to wsry/flink that referenced this pull request Feb 10, 2022
…-shuffle if there are enough buffers for better performance
@wsry wsry (Contributor Author) commented Feb 10, 2022

@gaoyunhaii I have updated the PR.

@gaoyunhaii gaoyunhaii (Contributor) left a comment

LGTM % one small comment.

Also, @wsry, could you squash the commits and rebase onto the latest master to retrigger the CI pipeline? The architecture check issue should be fixed.


private static final int numThreads = 4;

private final boolean useHashBasedSortBuffer;
Contributor

nit: might we change this to useHashBasedDataBuffer?

wsry pushed a commit to wsry/flink that referenced this pull request Feb 11, 2022
…-shuffle if there are enough buffers for better performance
@wsry wsry (Contributor Author) commented Feb 11, 2022

@flinkbot run azure

@wsry wsry closed this in 3be35d9 Feb 12, 2022
MrWhiteSike pushed a commit to MrWhiteSike/flink that referenced this pull request Mar 3, 2022
…-shuffle if there are enough buffers for better performance
RexXiong pushed a commit to apache/celeborn that referenced this pull request Apr 9, 2024
### What changes were proposed in this pull request?

Refactor `SortBuffer` and `PartitionSortedBuffer` with introduction of `DataBuffer` and `SortBasedDataBuffer`.

### Why are the changes needed?

`SortBuffer` and `PartitionSortedBuffer` were refactored in apache/flink#18505. Celeborn's Flink integration should also refactor `SortBuffer` and `PartitionSortedBuffer` to sync with the interface changes in Flink. Meanwhile, `SortBuffer` and `PartitionSortedBuffer` should distinguish between channel and subpartition for apache/flink#23927.

### Does this PR introduce _any_ user-facing change?

- `SortBuffer` is renamed to `DataBuffer`.
- `PartitionSortedBuffer` is renamed to `SortBasedDataBuffer`.
- `SortBuffer.BufferWithChannel` is renamed to `BufferWithSubpartition`.

### How was this patch tested?

UT and IT.

Closes #2448 from SteNicholas/CELEBORN-1374.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>