[HUDI-1054][Performance] Several performance fixes during finalizing writes #1768
Conversation
Force-pushed 12debb4 to 6fb6941
@bvaradar fyi
hudi-common/pom.xml
Outdated
<!-- Spark -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_${scala.binary.version}</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_${scala.binary.version}</artifactId>
</dependency>
hudi-common is a base module, and it feels a little weird that it relies on spark-core/spark-sql. Could we remove them and move the getAllDataFilesForMarkers method to the hudi-client module, wdyt? cc @bvaradar @vinothchandar
yes.. we cannot depend on spark in hudi-common
Okay, so is the suggestion to move this method over to HoodieTable in the hudi-client module, considering that is the only place this method is used?
yes.. that ll do..
@umehrot2 Thanks for raising this.. Seems like an important difference w.r.t. HDFS.
Waiting on some more feedback.. left some minor comments
String pathStr = status.getPath().toString();
if (pathStr.endsWith(HoodieTableMetaClient.MARKER_EXTN)) {
  dataFiles.add(FSUtils.translateMarkerToDataPath(basePath, pathStr, instantTs, baseFileExtension));

public static Set<String> getAllDataFilesForMarkers(JavaSparkContext jsc, FileSystem fs, String basePath,
i think this is the reason for needing spark in hudi-common.. we can refactor the code into hudi-client.. In fact, #1755 has already modularized this more..
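For context, the marker-to-data-path translation in the snippet above is essentially string surgery: strip the temp-directory prefix for the instant and swap the marker extension for the base file extension. Here is a minimal sketch; the directory layout and the literal `.marker` suffix are assumptions for illustration, not the exact FSUtils implementation:

```java
public class MarkerPaths {
  // Hypothetical layout, for illustration only: a marker file at
  //   <basePath>/.hoodie/.temp/<instantTs>/<partition>/<file>.marker
  // maps back to the data file
  //   <basePath>/<partition>/<file><baseFileExtension>
  static String translateMarkerToDataPath(String basePath, String markerPath,
                                          String instantTs, String baseFileExtension) {
    String tempRoot = basePath + "/.hoodie/.temp/" + instantTs + "/";
    // Path relative to the temp root, e.g. "2020/01/f1_w1.marker"
    String relative = markerPath.substring(tempRoot.length());
    // Drop the marker suffix and re-attach the data file extension
    String withoutMarkerExtn = relative.substring(0, relative.lastIndexOf(".marker"));
    return basePath + "/" + withoutMarkerExtn + baseFileExtension;
  }
}
```

The actual code resolves paths through Hadoop's `Path`/`FileSystem` abstractions rather than raw strings, but the mapping it computes has this shape.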
- LOG.info("Removing marker directory=" + markerDir);
+ LOG.info("Removing marker directory = " + markerDir);

FileStatus[] fileStatuses = fs.listStatus(markerDir);
@umehrot2 so it seems like, for object stores, this is different.. and it makes complete sense to do parallel cleaning of individual files.
cc @n3nash should we have a flag to protect this for HDFS.. i.e. if the recursive delete works better there (IIUC), you might want to trade off fewer RPCs..? We can override defaults at the spark datasource level, and set these based on StorageSchemes as well.
@vinothchandar I don't think there is any difference between EmrFS (which uses S3) and HDFS w.r.t. the RPC calls made here. EmrFS just implements the HDFS interface, but the internals like RPC calls to the namenode etc. remain the same.
}, false);
}

parallelism = subDirectories.size() < parallelism ? subDirectories.size() : parallelism;
Math.min(subDirectories.size(), parallelism)?
Will address in the next revision, once I have more feedback as needed from @n3nash
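The suggested simplification, shown in isolation (variable names assumed from the diff above), just clamps the requested parallelism to the available units of work:

```java
public class ParallelismClamp {
  // Equivalent, more readable form of the ternary in the diff:
  // never ask Spark for more tasks than there are sub-directories.
  static int effectiveParallelism(int subDirCount, int configured) {
    return Math.min(subDirCount, configured);
  }
}
```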
}

parallelism = subDirectories.size() < parallelism ? subDirectories.size() : parallelism;
dataFiles.addAll(jsc.parallelize(subDirectories, parallelism).flatMap(directory -> {
similar question here.. cc @n3nash ..
marked this as a blocker for 0.6.0.. @n3nash can you please chime in with any side effects you see for HDFS?
@umehrot2 just landed the changes I mentioned. Can we rework this PR and try again? We can make things parallel, i.e. working for S3, for now, and then adjust for HDFS later on, so we should be able to close the loop faster. I do want to get this into 0.6.0, so please also let me know if you are unable to take a stab at this.
Working on it @vinothchandar. There has been quite a bit of refactoring it seems, which is making the rebasing tricky, as now these functions are being called from places which do not even have
Force-pushed 39108c1 to be8e6f4
@vinothchandar finally got the unit and integration tests to pass. This is ready for review.
LGTM overall.
One high level question though: this parallelizes till the first level only, right? So we are assuming this helps the common cases like date based tables with multiple years of data? I mean - if you only have a few years of data (say <10) and yyyy is the top level partitioning field, would this parallelization still help?
jsc.parallelize(markerDirSubPaths, parallelism).foreach(subPathStr -> {
  Path subPath = new Path(subPathStr);
  FileSystem fileSystem = subPath.getFileSystem(conf.get());
  fileSystem.delete(subPath, true);
note to self: this will still work when subPath is a file, i.e. non-partitioned tables
@vinothchandar you are right about this. It will parallelize only on the top level partition folders. I think this will still help with parallelization, and it would work best where there is only one level of partitioning. But I agree there is scope to further improve this by instead getting the leaf level partition directories, to help with the multi level partitioning scenario. Is it okay if I open a JIRA for this and pursue it separately?
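A possible shape for the leaf-level improvement described above: enumerate the deepest partition directories so each one becomes a unit of work for `jsc.parallelize(...)`, instead of only the first level. This sketch uses plain `java.nio` for illustration; the real change would go through Hadoop's `FileSystem` API:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LeafDirs {
  // Collect leaf directories (those containing no sub-directories) under a
  // root, e.g. yyyy/mm/dd partitions, so deletes/listings can be
  // parallelized at the finest partition granularity.
  static List<Path> leafDirectories(Path root) throws IOException {
    try (Stream<Path> walk = Files.walk(root)) {
      return walk.filter(Files::isDirectory)
          .filter(dir -> {
            try (Stream<Path> children = Files.list(dir)) {
              return children.noneMatch(Files::isDirectory);
            } catch (IOException e) {
              throw new UncheckedIOException(e);
            }
          })
          .collect(Collectors.toList());
    }
  }
}
```

For a `yyyy/mm` layout with a handful of years, this yields years-times-months work units rather than just the few top-level year folders, which is the scenario the question above is concerned with.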
Sounds good. Let's lump this into the JIRA we have for marker file improvements more holistically?
Added a comment about this on https://issues.apache.org/jira/browse/HUDI-1138. Let me try to fix some conflicts that it is showing with the current master.
Force-pushed be8e6f4 to 8ad8a26
Force-pushed 8ad8a26 to 4cecc15
Hi guys, this PR helps a lot with doing archival work on S3! I also just found another performance issue and raised #3920 trying to fix it.
What is the purpose of the pull request
This PR makes several performance improvements, described in https://issues.apache.org/jira/browse/HUDI-1054, that are especially useful for S3 but will be beneficial for hudi performance in general too.
My sample test data set is 1 TB, with 8000 partitions and approximately 190000 files, for which finalizing writes used to take 35-40 minutes. With these changes I am able to bring it down to less than 5 minutes.
Brief change log
Verify this pull request
Committer checklist
Has a corresponding JIRA in PR title & commit
Commit message is descriptive of the change
CI is green
Necessary doc changes done or have another open PR
For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.