[CELEBORN-914] Support memory file storage#2300
Conversation
fdfff8d to
a0e672f
Compare
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #2300 +/- ##
==========================================
+ Coverage 48.96% 49.33% +0.38%
==========================================
Files 209 211 +2
Lines 13102 13238 +136
Branches 1134 1149 +15
==========================================
+ Hits 6414 6530 +116
- Misses 6270 6276 +6
- Partials 418 432 +14 ☔ View full report in Codecov by Sentry. |
efdf6e3 to
6cc81ac
Compare
a29e592 to
462f5c4
Compare
client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
Show resolved
Hide resolved
| } | ||
|
|
||
| public boolean addStream(long streamId) { | ||
| ReduceFileMeta reduceFileMeta = (ReduceFileMeta) fileMeta; |
There was a problem hiding this comment.
should not use actual type ReduceFileMeta directly, Can we delegate stream operation to FileMeta or use instanceof?
| return length; | ||
| } | ||
|
|
||
| public void addLength(int length) { |
There was a problem hiding this comment.
MemoryInfo.addLength seems the same like DiskFileInfo.updateBytesFlushed
| import org.apache.celeborn.common.network.buffer.ManagedBuffer; | ||
|
|
||
| public interface ManagedBuffers { | ||
| int numChunks(); |
| private final int numChunks; | ||
| private final CompositeByteBuf buffer; | ||
|
|
||
| public MemoryFileManagedBuffers(MemoryFileInfo memoryFileInfo) { |
There was a problem hiding this comment.
We can create BaseChunk/ReduceManagedBuffers to eliminate duplicate codes
| return indexMap; | ||
| } | ||
|
|
||
| public static void reorganizeBuffer( |
There was a problem hiding this comment.
reorganizeBuffer -> sortBufferByMapRange?
| .categories("worker") | ||
| .doc("Max ratio of direct memory to store shuffle data") | ||
| .version("0.2.0") | ||
| .doc("Max ratio of direct memory to store shuffle data.") |
There was a problem hiding this comment.
0.4.1->0.5, what's the meaning about the default value
| null) | ||
| } | ||
|
|
||
| if (location.getStorageInfo.localDiskAvailable() || location.getStorageInfo.HDFSAvailable()) { |
| <include>**/*Test*.*</include> | ||
| </includes> | ||
| <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory> | ||
| <argLine>${argLine} -ea -Xmx4g -Xss4m -XX:MaxMetaspaceSize=2g -XX:ReservedCodeCacheSize=128m ${extraJavaTestArgs} -Dio.netty.tryReflectionSetAccessible=true</argLine> |
There was a problem hiding this comment.
Don't change that in this pr
worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
Outdated
Show resolved
Hide resolved
worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
Show resolved
Hide resolved
worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
Outdated
Show resolved
Hide resolved
worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
Outdated
Show resolved
Hide resolved
| createFile(writerContext); | ||
|
|
||
| // Reduce partition data writers support memory storage now | ||
| if (supportInMemory && createFileResult._1() != null) { |
There was a problem hiding this comment.
IMO better to delegate the actual writing and eviction to something like a TieredFileStorage. This would make the PartitionDataWriter more clear.
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
Show resolved
Hide resolved
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
Show resolved
Hide resolved
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
Outdated
Show resolved
Hide resolved
client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
Outdated
Show resolved
Hide resolved
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
Outdated
Show resolved
Hide resolved
...c/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionDataWriter.java
Show resolved
Hide resolved
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
Outdated
Show resolved
Hide resolved
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
Outdated
Show resolved
Hide resolved
# Conflicts: # worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
# Conflicts: # client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java # worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #2300 +/- ##
==========================================
- Coverage 40.17% 38.98% -1.19%
==========================================
Files 218 219 +1
Lines 13742 13547 -195
Branches 1214 1191 -23
==========================================
- Hits 5520 5280 -240
- Misses 7905 7966 +61
+ Partials 317 301 -16 ☔ View full report in Codecov by Sentry. |
# Conflicts: # worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskReducePartitionDataWriterSuiteJ.java
common/src/main/java/org/apache/celeborn/common/meta/ReduceFileMeta.java
Outdated
Show resolved
Hide resolved
common/src/main/java/org/apache/celeborn/common/meta/ReduceFileMeta.java
Outdated
Show resolved
Hide resolved
common/src/main/java/org/apache/celeborn/common/network/buffer/FileChunkBuffers.java
Outdated
Show resolved
Hide resolved
common/src/main/java/org/apache/celeborn/common/network/buffer/ChunkBuffers.java
Outdated
Show resolved
Hide resolved
common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java
Outdated
Show resolved
Hide resolved
| for (int i = startMapIndex; i < endMapIndex; i++) { | ||
| List<ShuffleBlockInfo> blockInfos = indexMap.get(i); | ||
| if (blockInfos != null) { | ||
| for (ShuffleBlockInfo blockInfo : blockInfos) { |
There was a problem hiding this comment.
Just calculate start and end, then slice once from sortedByteBuf. Check contiguous during calculation.
worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
Outdated
Show resolved
Hide resolved
worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
Outdated
Show resolved
Hide resolved
worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
Outdated
Show resolved
Hide resolved
| } | ||
| // trigger resume | ||
| if (memoryUsage < resumeThreshold) { | ||
| if (workerMemoryUsageRatio() < resumeRatio) { |
There was a problem hiding this comment.
Just use memoryUsage / (double) (maxDirectMemory) here to avoid duplicate calculation
There was a problem hiding this comment.
There is a gauge to export this usage ratio to Grafana.
worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
Outdated
Show resolved
Hide resolved
| MemoryManager.instance().incrementDiskBuffer(numBytes); | ||
| // read flush buffer to generate correct chunk offsets | ||
| // data header layout (mapId, attemptId, nextBatchId, length) | ||
| ByteBuffer headerBuf = ByteBuffer.allocate(16); |
There was a problem hiding this comment.
Better to check if flushBuffer exceeds chunkSize, if yes, do the following logic, else just use numBytes.
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
Outdated
Show resolved
Hide resolved
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
Outdated
Show resolved
Hide resolved
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
Show resolved
Hide resolved
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
Outdated
Show resolved
Hide resolved
client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
Outdated
Show resolved
Hide resolved
common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
Outdated
Show resolved
Hide resolved
common/src/main/java/org/apache/celeborn/common/meta/MemoryFileInfo.java
Outdated
Show resolved
Hide resolved
common/src/main/java/org/apache/celeborn/common/meta/ReduceFileMeta.java
Outdated
Show resolved
Hide resolved
common/src/main/java/org/apache/celeborn/common/util/ShuffleBlockInfoUtils.java
Outdated
Show resolved
Hide resolved
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ChunkStreamManager.java
Outdated
Show resolved
Hide resolved
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
Show resolved
Hide resolved
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
Outdated
Show resolved
Hide resolved
# Conflicts: # assets/grafana/celeborn-dashboard.json
waitinfuture
left a comment
There was a problem hiding this comment.
LGTM, thanks for the effort! Merging to main(v0.5.0). Please add more UTs for this feature in followup PRs.
|
@FMX metrics added by this PR is not added to the Celeborn website Monitoring page. Also, should we start adding such changes in the release notes as well. WDYT? |
…nitoring.md ### What changes were proposed in this pull request? Adding documentation for missing memory file storage metrics. ### Why are the changes needed? Few new metrics were added in #2300 but they were missing their documentation in monitoring.md ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? NA Closes #2705 from s0nskar/memory_metrics. Authored-by: Sanskar Modi <sanskarmodi97@gmail.com> Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
…nitoring.md Adding documentation for missing memory file storage metrics. Few new metrics were added in #2300 but they were missing their documentation in monitoring.md NO NA Closes #2705 from s0nskar/memory_metrics. Authored-by: Sanskar Modi <sanskarmodi97@gmail.com> Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com> (cherry picked from commit b7027b6) Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
…nitoring.md ### What changes were proposed in this pull request? Adding documentation for missing memory file storage metrics. ### Why are the changes needed? Few new metrics were added in apache#2300 but they were missing their documentation in monitoring.md ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? NA Closes apache#2705 from s0nskar/memory_metrics. Authored-by: Sanskar Modi <sanskarmodi97@gmail.com> Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
…Ids of worker service log in startup document ### What changes were proposed in this pull request? Add `emptyFilePrimaryIds` and `emptyFileReplicaIds` of worker service log in startup document. ### Why are the changes needed? #2300 has added `emptyFilePrimaryIds` and `emptyFileReplicaIds` of startup log of for worker service in `Controller`, which should also add into the log of startup document. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? No. Closes #2774 from SteNicholas/CELEBORN-914. Authored-by: SteNicholas <programgeek@163.com> Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
…Ids of worker service log in startup document ### What changes were proposed in this pull request? Add `emptyFilePrimaryIds` and `emptyFileReplicaIds` of worker service log in startup document. ### Why are the changes needed? #2300 has added `emptyFilePrimaryIds` and `emptyFileReplicaIds` of startup log of for worker service in `Controller`, which should also add into the log of startup document. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? No. Closes #2774 from SteNicholas/CELEBORN-914. Authored-by: SteNicholas <programgeek@163.com> Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com> (cherry picked from commit c5ff12b) Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
…Ids of worker service log in startup document ### What changes were proposed in this pull request? Add `emptyFilePrimaryIds` and `emptyFileReplicaIds` of worker service log in startup document. ### Why are the changes needed? apache#2300 has added `emptyFilePrimaryIds` and `emptyFileReplicaIds` of startup log of for worker service in `Controller`, which should also add into the log of startup document. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? No. Closes apache#2774 from SteNicholas/CELEBORN-914. Authored-by: SteNicholas <programgeek@163.com> Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
…nitoring.md ### What changes were proposed in this pull request? Adding documentation for missing memory file storage metrics. ### Why are the changes needed? Few new metrics were added in apache#2300 but they were missing their documentation in monitoring.md ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? NA Closes apache#2705 from s0nskar/memory_metrics. Authored-by: Sanskar Modi <sanskarmodi97@gmail.com> Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
…Ids of worker service log in startup document ### What changes were proposed in this pull request? Add `emptyFilePrimaryIds` and `emptyFileReplicaIds` of worker service log in startup document. ### Why are the changes needed? apache#2300 has added `emptyFilePrimaryIds` and `emptyFileReplicaIds` of startup log of for worker service in `Controller`, which should also add into the log of startup document. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? No. Closes apache#2774 from SteNicholas/CELEBORN-914. Authored-by: SteNicholas <programgeek@163.com> Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
What changes were proposed in this pull request?
To support memory file storage.
Why are the changes needed?
To improve shuffle performance for small shuffle files.
Design doc: https://docs.google.com/document/d/1SM-oOM0JHEIoRHTYhE9PYH60_1D3NMxDR50LZIM7uW0/edit?usp=sharing
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Pass GA and manually test on a cluster.