option to use deep storage for storing shuffle data #11507
Conversation
Force-pushed from 67689d0 to a40c852 (compare)
Overall implementation looks good to me. Please add some unit tests before merging (especially for the new classes like DeepStorageShuffleClient, DeepStorageIntermediaryDataManager, etc.).
```java
  segment.getInterval(),
  bucketNumberedShardSpec.getBucketId() // we must use the bucket ID instead of partition ID
);
return dataSegmentPusher.pushToPath(segmentDir, segment, SHUFFLE_DATA_DIR_PREFIX + "/" + partitionFilePath);
```
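As a rough illustration of the path layout being built here, the sketch below joins the components described in this thread (supervisor task ID, segment interval start/end, bucket ID, sub-task ID) under a shuffle-data prefix. The prefix value `"shuffle_data"` and the helper method are assumptions for illustration, not the PR's actual constants.

```java
import java.nio.file.Paths;

public class Main
{
  // Hypothetical stand-in for the PR's SHUFFLE_DATA_DIR_PREFIX constant
  static final String SHUFFLE_DATA_DIR_PREFIX = "shuffle_data";

  // Illustrative layout: supervisorTaskId/startTime/endTime/bucketId/subTaskId
  static String partitionFilePath(String supervisorTaskId, String start, String end, int bucketId, String subTaskId)
  {
    return Paths.get(supervisorTaskId, start, end, String.valueOf(bucketId), subTaskId).toString();
  }

  public static void main(String[] args)
  {
    System.out.println(SHUFFLE_DATA_DIR_PREFIX + "/" + partitionFilePath("supervisor1", "2020-01-01", "2020-01-02", 0, "subtask1"));
  }
}
```

Keying the path on the bucket ID (rather than the partition ID, as the inline comment notes) keeps all sub-task outputs for the same bucket grouped under one directory.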
The temporary zip file created will no longer be at taskConfig.getTaskTempDir(subTaskId). Will this be a problem? Should we document this change?
I think the path taskConfig.getTaskTempDir(subTaskId) is more relevant for local storage and not for deep storage. For deep storage it would make sense to have a fixed, separate directory for shuffle data, so that either a coordinator duty can clean it up or the directory can be marked for auto-cleanup. I can document this and, if required, make this prefix configurable.
taskConfig.getTaskTempDir(subTaskId) is not the location for storing the segment in local storage. The config is for the temp directory that holds the zipped file before it is moved to StorageLocation#path/supervisorTaskId/startTimeOfSegment/endTimeOfSegment/bucketIdOfSegment.
Deep storage also creates a zip file (same as local storage) before copying it over to the final deep storage location. However, this temporary zip file will not be created at taskConfig.getTaskTempDir(subTaskId) but at File.createTempFile("druid", "index.zip").
I am not sure of the exact purpose of taskConfig.getTaskTempDir(subTaskId), but it no longer holds for the temporary zip file location before pushing to deep storage.
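To make the distinction concrete, here is a minimal sketch of the File.createTempFile behavior described above: the file lands in the JVM's default temp directory (java.io.tmpdir, e.g. /tmp), not under the task's temp dir. The surrounding class is illustrative, not Druid code.

```java
import java.io.File;
import java.io.IOException;

public class Main
{
  public static void main(String[] args) throws IOException
  {
    // Same call as in the deep-storage push path discussed above:
    // creates druid<random>index.zip in java.io.tmpdir.
    File tmpZip = File.createTempFile("druid", "index.zip");
    try {
      System.out.println(tmpZip.getName().startsWith("druid"));
      System.out.println(tmpZip.getName().endsWith("index.zip"));
    }
    finally {
      // In the real code path, the zip is deleted after the push succeeds.
      tmpZip.delete();
    }
  }
}
```

Because the file is outside taskTempDir, it would not be covered by the automatic post-task cleanup of the task directory that the discussion below raises.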
I see what you are saying; the temporary zip file created by DeepStorageIntermediaryDataManager will be cleaned up after the push, so I don't think it matters.
That was my thought too; however, I'm not sure why it is configurable and exposed as a taskConfig. I tried looking at the PR that added this taskConfig but am not sure. Maybe @jihoonson can confirm? Thanks!
Hmm, where will that temp file be created? My concerns are twofold:
- In general, I prefer having all temp files in one place; it simplifies cleaning them up. How will the temp file be deleted if the task fails before it deletes the file? taskTempDir is cleaned up automatically after a task failure.
- Users could want to use a particular disk for ingestion temp files and allocate a reasonable amount of disk space for taskDir.
My thought here is simple: tasks already push segments to deep storage and use some temporary space for that. I am just using the same mechanism, so the shuffle data will have the same lifecycle as a segment being pushed. Am I missing something here?
@pjain1 thank you for the PR! It looks nice. I left a couple of comments, especially on the interface design. Also, please check the CI failure; it seems legit.
```java
  return Paths.get(getPartitionDir(supervisorTaskId, interval, bucketId), subTaskId).toString();
}

default String getPartitionDir(
```
nit: maybe getPartitionDirPath()
@pjain1, thank you for your contribution! This is awesome. Would you please add unit tests for the DeepStorageIntermediaryDataManager, DeepStorageShuffleClient, DeepStoragePartitionLocation, and DeepStoragePartitionStat classes, and integration tests as well? We want to make sure that this feature does not break between releases. For the integration tests, you should be able to piggyback on the existing S3 deep storage integration tests.
- test of deep storage intermediary data manager
- change PartitionStat and PartitionLocation to interfaces
@jihoonson I moved PartitionLocation to be an interface but realized there are other abstract classes like
```diff
@@ -106,7 +111,7 @@ public DataSegment push(final File indexFilesDir, DataSegment segment, final boo
 }

 segment = segment.withSize(indexSize)
-                 .withLoadSpec(ImmutableMap.of("type", "c*", "key", key))
+                 .withLoadSpec(ImmutableMap.of("type", "c*", "key", storageDirSuffix))
```
Thanks, it's a mistake.
@pjain1, thanks again for your contribution. We'd like to help you accelerate this PR if possible. What is your timeline for getting this in, with proper unit tests and integration tests written: this week, two weeks, a month? What is a good email for us to use to communicate with you?
@zachjsh I want to get this in as soon as possible. I added a few tests and was adding more, but I need some clarification on this. My email is pjain1[at]apache[dot]org. Once things are clear I can add unit tests, which should not take much time; I am not familiar with the IT test framework, so that may take some time. Overall I am targeting to finish this week.
Thanks for your quick reply @pjain1. Happy to hear that you are trying to get this in as soon as possible. We will work with you through any doubts or uncertainties that you have, and will let @jihoonson reply to the comment that you linked in your previous message. As for adding an integration test for this, I think ITPerfectRollupParallelIndexTest is a good example test to base this on, or extend.
@pjain1 I wanted to understand what the difficulty is, so I took a stab and implemented what I suggested in my previous comment. The code is available in my branch. I haven't tested my change, but at least it compiles successfully.
@jihoonson All the different classes with
@jihoonson I see in your branch you have made
Added IT.
Thanks @pjain1! I noticed that you are only running with MM; the existing rollup IT runs on both MM and Indexer. Any reason to not also run with the Indexer?
@zachjsh added a test with the Indexer. BTW, some checks are failing because of a branch coverage issue in classes
Travis checks passed except for the test coverage issues mentioned in the comment above.
@pjain1 the second phase integration tests did not run because of the coverage failure; I've manually started those just now. It looks like a lot of the missed branches are coming from the equals methods. Can you add tests for these? You can use EqualsVerifier to make your life easier. It also looks like you have legitimate failures.
@zachjsh they ran once, but then I pushed some minor doc change so they did not run again. Anyways, now the
Travis checks passed except for the test coverage failures mentioned here. @jihoonson @maytasm @zachjsh
@pjain1 it is also failing a test, and the stacktrace shows classes that you've modified. If you look at the failing phase 1 test, you will see this.
Also, for the coverage, it looks like a lot of the missed branches are from equals methods. This can be tested using EqualsVerifier, as mentioned before.
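For reference, the equals() branches that coverage flags can be exercised by hand as sketched below; `Stat` is a hypothetical stand-in for the PR's value classes, and in practice a single `EqualsVerifier.forClass(...).verify()` call automates these checks.

```java
import java.util.Objects;

public class Main
{
  // Hypothetical value class standing in for e.g. DeepStoragePartitionStat
  static final class Stat
  {
    final String dir;
    final int bucketId;

    Stat(String dir, int bucketId)
    {
      this.dir = dir;
      this.bucketId = bucketId;
    }

    @Override
    public boolean equals(Object o)
    {
      if (this == o) {
        return true;               // reflexive branch
      }
      if (o == null || getClass() != o.getClass()) {
        return false;              // null / wrong-class branches
      }
      Stat stat = (Stat) o;
      return bucketId == stat.bucketId && Objects.equals(dir, stat.dir);
    }

    @Override
    public int hashCode()
    {
      return Objects.hash(dir, bucketId);
    }
  }

  public static void main(String[] args)
  {
    Stat a = new Stat("dir", 1);
    // One check per equals() branch -- these are the "missed branches" in the coverage report
    System.out.println(a.equals(a));                  // same reference
    System.out.println(a.equals(null));               // null argument
    System.out.println(a.equals("other type"));       // different class
    System.out.println(a.equals(new Stat("dir", 1))); // equal fields
    System.out.println(a.equals(new Stat("dir", 2))); // differing field
  }
}
```

EqualsVerifier additionally checks symmetry, transitivity, and hashCode consistency, which is why it is the lower-effort option suggested in this thread.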
@zachjsh thanks! I missed that; fixed it and added tests using EqualsVerifier. All checks are passing now.
Thanks for your hard work in getting all the tests and IT working, @pjain1! The change looks good to me, but I would like @maytasm and @jihoonson to take a final look here. One more thing: can you provide the specific steps you took to manually verify this? A specific ingestion spec and the data used would be very helpful. Thanks again!
@zachjsh I tested it by indexing the example wikipedia data on a local druid cluster with
LGTM
LGTM. Thanks @pjain1
Fixes #11297.

Description

Description and design are in the proposal #11297.

Key changed/added classes in this PR:
- DataSegmentPusher
- ShuffleClient
- PartitionStat
- PartitionLocation
- IntermediaryDataManager

This PR has: