-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add shuffleSegmentPusher for data shuffle #8115
Conversation
/subscribe |
@himanshug I requested a review to you :) |
@jihoonson sure, I did want to go through it. |
final String relativeSegmentPath = localtionPath | ||
.relativize(eachFile.toPath().toAbsolutePath()) | ||
.toString(); | ||
final File reservedFile = location.reserve( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
couldn't understand why we need to do this, can you please add some comments.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added.
// Create a zipped segment in a temp directory. | ||
final File taskTempDir = taskConfig.getTaskTempDir(subTaskId); | ||
if (taskTempDir.mkdirs()) { | ||
taskTempDir.deleteOnExit(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this will delete on jvm exit, it is probably ok for existing peon processes as they do exit but wouldn't log anything if jvm was not able to delete this location at exit.
it wouldn't work for the long running indexer
process
I think we should do the cleanup in the code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, yeah. Don't remember why I wrote this code. Fixed to clean up properly.
taskTempDir.deleteOnExit(); | ||
} | ||
final File tempZippedFile = new File(taskTempDir, segment.getId().toString()); | ||
final long unzippedSizeBytes = CompressionUtils.zip(segmentDir, tempZippedFile, true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fsync=true
here is useless as this is only a temp location
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
if (destFile != null) { | ||
try { | ||
FileUtils.forceMkdirParent(destFile); | ||
StreamUtils.retryCopy( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here we should use FileUtils.writeAtomically(..)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
overall lgtm, +1 after @himanshug's comments are addressed
import java.util.Map; | ||
|
||
/** | ||
* DataSegmentPusher used for storing intermeidary data in local storage during data shuffle of native parallel |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: intermeidary
-> intermediary
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
@@ -174,6 +184,19 @@ private void discoverSupervisorTaskPartitions() | |||
supervisorTaskCheckTimes.computeIfAbsent( | |||
supervisorTaskId, | |||
k -> { | |||
for (File eachFile : FileUtils.listFiles(supervisorTaskDir, null, true)) { | |||
final String relativeSegmentPath = localtionPath |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: localtionPath
-> locationPath
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
@himanshug do you have more comments? |
@himanshug @clintropolis thank you for the review. |
This PR is for #8061 and based on #8114.
Description
ShuffleDataSegmentPusher
is a dataSegmentPusher used for writing shuffle data in local storage.ShuffleDataSegmentPusher
usesIntermediaryDataManager
internally which coordinates the segment writes in a round-robin fashion per supervisor task across sub tasks. This is to fully utilize the local disk bandwidth for shuffle.The middleManager and the indexer can use this. However, with the middleManager, each task uses a separate
IntermediaryDataManager
instance. This could potentially result in two issues:IntermediaryDataSegment
needs to smoosh segment files into larger ones to avoid "too many open files" problem. This could also be an issue if there are a lot of tasks sinceIntermediaryDataSegment
can't smoosh files across tasks with middleManager.I think this would be ok for now and could be improved if required in the future.
This PR has: