-
Notifications
You must be signed in to change notification settings - Fork 13k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel #23927
Conversation
35eb891
to
b32f79c
Compare
Hi @TanYuxin-tyx could you please take a look at this PR? |
b32f79c
to
7fba841
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@yunfengzhou-hub Thanks for the contribution. I have some comments on the change, PTAL.
.../src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionIndexSet.java
Outdated
Show resolved
Hide resolved
.../src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionIndexSet.java
Outdated
Show resolved
Hide resolved
...me/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
Show resolved
Hide resolved
...time/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
Show resolved
Hide resolved
...rc/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionIndexRange.java
Outdated
Show resolved
Hide resolved
...test/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionViewTest.java
Outdated
Show resolved
Hide resolved
...test/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionViewTest.java
Outdated
Show resolved
Hide resolved
...test/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionViewTest.java
Show resolved
Hide resolved
...rc/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
Outdated
Show resolved
Hide resolved
1722add
to
2cef883
Compare
...apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriterImpl.java
Outdated
Show resolved
Hide resolved
2cef883
to
0766c3f
Compare
@yunfengzhou-hub Thanks for the update. I have no more comments on the change. |
.../src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionIndexSet.java
Outdated
Show resolved
Hide resolved
...rc/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionIndexRange.java
Outdated
Show resolved
Hide resolved
...me/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
Show resolved
Hide resolved
...org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionReader.java
Outdated
Show resolved
Hide resolved
...he/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerClient.java
Outdated
Show resolved
Hide resolved
...he/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierConsumerAgent.java
Outdated
Show resolved
Hide resolved
...he/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierConsumerAgent.java
Outdated
Show resolved
Hide resolved
eaea6b4
to
5d18a44
Compare
Hi @reswqa Could you please take a look at this PR? |
5d18a44
to
a95fb6e
Compare
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
Outdated
Show resolved
Hide resolved
...he/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerClient.java
Outdated
Show resolved
Hide resolved
...n/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
Show resolved
Hide resolved
...n/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
Show resolved
Hide resolved
7758b0a
to
54200b3
Compare
.../src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionIndexSet.java
Show resolved
Hide resolved
...time/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
Show resolved
Hide resolved
...n/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
Outdated
Show resolved
Hide resolved
...n/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
Outdated
Show resolved
Hide resolved
...a/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/BufferAccumulator.java
Outdated
Show resolved
Hide resolved
7b58307
to
841bba0
Compare
…mically in tiered shuffle
…e in ResultSubpartitionView#getAvailabilityAndBacklog
…DataAvailable() method
841bba0
to
c1ad90d
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @yunfengzhou-hub, merged(I somehow forgot to approval :)).
### What changes were proposed in this pull request? Support Flink 1.19. ### Why are the changes needed? Flink 1.19.0 is announced to release: [Announcing the Release of Apache Flink 1.19] (https://flink.apache.org/2024/03/18/announcing-the-release-of-apache-flink-1.19). The main changes includes: - `org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel` constructor change parameters: - `consumedSubpartitionIndex` changes to `consumedSubpartitionIndexSet`: [[FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel](apache/flink#23927). - adds `partitionRequestListenerTimeout`: [[FLINK-25055][network] Support listen and notify mechanism for partition request](apache/flink#23565). - `org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate` constructor removes parameters `subpartitionIndexRange`, `tieredStorageConsumerClient`, `nettyService` and `tieredStorageConsumerSpecs`: [[FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel](apache/flink#23927). - Change the default config file to `config.yaml` in `flink-dist`: [[FLINK-33577][dist] Change the default config file to config.yaml in flink-dist](apache/flink#24177). - `org.apache.flink.configuration.RestartStrategyOptions` uses `org.apache.commons.compress.utils.Sets` of `commons-compress` dependency: [[FLINK-33865][runtime] Adding an ITCase to ensure exponential-delay.attempts-before-reset-backoff works well](apache/flink#23942). ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Local test: - Flink batch job submission ``` $ ./bin/flink run examples/streaming/WordCount.jar --execution-mode BATCH Executing example with default input data. Use --input to specify file input. Printing result to stdout. Use --output to specify output path. Job has been submitted with JobID 2e9fb659991a9c29d376151783bdf6de Program execution finished Job with JobID 2e9fb659991a9c29d376151783bdf6de has finished. Job Runtime: 1912 ms ``` - Flink batch job execution ![image](https://github.com/apache/incubator-celeborn/assets/10048174/18b60861-cafc-4df3-b94d-93307e728be2) - Celeborn master log ``` 24/03/18 20:52:47,513 INFO [celeborn-dispatcher-42] Master: Offer slots successfully for 1 reducers of 1710766312631-2e9fb659991a9c29d376151783bdf6de-0 on 1 workers. ``` - Celeborn worker log ``` 24/03/18 20:52:47,704 INFO [celeborn-dispatcher-1] StorageManager: created file at /Users/nicholas/Software/Celeborn/apache-celeborn-0.5.0-SNAPSHOT/shuffle/celeborn-worker/shuffle_data/1710766312631-2e9fb659991a9c29d376151783bdf6de/0/0-0-0 24/03/18 20:52:47,707 INFO [celeborn-dispatcher-1] Controller: Reserved 1 primary location and 0 replica location for 1710766312631-2e9fb659991a9c29d376151783bdf6de-0 24/03/18 20:52:47,874 INFO [celeborn-dispatcher-2] Controller: Start commitFiles for 1710766312631-2e9fb659991a9c29d376151783bdf6de-0 24/03/18 20:52:47,890 INFO [worker-rpc-async-replier] Controller: CommitFiles for 1710766312631-2e9fb659991a9c29d376151783bdf6de-0 success with 1 committed primary partitions, 0 empty primary partitions, 0 failed primary partitions, 0 committed replica partitions, 0 empty replica partitions, 0 failed replica partitions. ``` Closes #2399 from SteNicholas/CELEBORN-1310. Authored-by: SteNicholas <programgeek@163.com> Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request? Refactor `SortBuffer` and `PartitionSortedBuffer` with introduction of `DataBuffer` and `SortBasedDataBuffer`. ### Why are the changes needed? `SortBuffer` and `PartitionSortedBuffer` is refactored in apache/flink#18505. Celeborn Flink should also refactor `SortBuffer` and `PartitionSortedBuffer` to sync the interface changes in Flink. Meanwhile, `SortBuffer` and `PartitionSortedBuffer` should distinguish channel and subpartition for apache/flink#23927. ### Does this PR introduce _any_ user-facing change? - `SortBuffer` renames to `DataBuffer`. - `PartitionSortedBuffer` renames to `SortBasedDataBuffer`. - `SortBuffer.BufferWithChannel` renames to `BufferWithSubpartition` ### How was this patch tested? UT and IT. Closes #2448 from SteNicholas/CELEBORN-1374. Authored-by: SteNicholas <programgeek@163.com> Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
What is the purpose of the change
This pull request adds support for consuming multiple subpartitions in a single input channel.
Brief change log
Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: noDocumentation