[HUDI-1371] [HUDI-1893] Support metadata based listing for Spark DataSource and Spark SQL #2893
Conversation
Codecov Report
@@ Coverage Diff @@
## master #2893 +/- ##
============================================
+ Coverage 45.79% 45.84% +0.04%
- Complexity 5270 5274 +4
============================================
Files 909 908 -1
Lines 39390 39400 +10
Branches 4244 4253 +9
============================================
+ Hits 18039 18063 +24
+ Misses 19508 19480 -28
- Partials 1843 1857 +14
@pengzhiwei2018 could you also please review this PR?
That is all right! I will spend some time on this PR tonight.
  }
} catch (Exception e) {
  if (metadataConfig.enableFallback()) {
    LOG.error("Failed to retrieve files in partitions from metadata", e);
If the fallback is enabled here, an empty partitionsFilesMap will be returned if an exception happens, is that right?
Good catch. It wasn't falling back to the file system to do the listing. Will fix it.
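The fix being discussed can be sketched as follows. This is a minimal, self-contained illustration of the intended fallback behavior, not Hudi's actual API: the names `listPartitions`, `metadataLister`, and `fsLister` are hypothetical stand-ins, with plain maps in place of Hudi's partition-to-FileStatus structures.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

class FallbackListing {
    // Try metadata-based listing first; if it throws and fallback is enabled,
    // run the filesystem-based lister instead of returning an empty result.
    static Map<String, List<String>> listPartitions(
            Supplier<Map<String, List<String>>> metadataLister,
            Supplier<Map<String, List<String>>> fsLister,
            boolean fallbackEnabled) {
        try {
            return metadataLister.get();
        } catch (RuntimeException e) {
            if (!fallbackEnabled) {
                throw e;
            }
            // Mirrors the LOG.error(...) in the snippet above, then falls back.
            System.err.println("Failed to retrieve files in partitions from metadata: " + e.getMessage());
            return fsLister.get();
        }
    }
}
```

The key point raised in the review: on failure the method must return the filesystem lister's result, not the partially built (empty) map.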
hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
val properties = new Properties()
// To support metadata listing via Spark SQL we allow users to pass the config via Hadoop Conf. Spark SQL does not
Should we get these configurations from spark.sessionState.conf for Spark?
Makes sense. It will be a better experience than having to pass it via the Hadoop conf. Customers would be able to enable it in the Spark SQL session using SET commands.
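The precedence being agreed on here can be illustrated with a small sketch. Plain maps stand in for the Hadoop configuration and for `spark.sessionState.conf` (this is not Spark's or Hudi's actual API); the point is simply that a session-level `SET` value should win over a value passed through the Hadoop conf.

```java
import java.util.Map;
import java.util.Properties;

class ConfigPrecedence {
    // Hadoop-conf values act as defaults; session-level SET values
    // (a stand-in for spark.sessionState.conf) take precedence.
    static Properties effectiveConfig(Map<String, String> hadoopConf,
                                      Map<String, String> sessionConf) {
        Properties props = new Properties();
        props.putAll(hadoopConf);   // defaults from the Hadoop configuration
        props.putAll(sessionConf);  // session overrides win
        return props;
    }
}
```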
Force-pushed 1ce0f37 to c50f97d
@pengzhiwei2018 @vinothchandar I have further re-factored HoodieFileIndex. This ensures that the filesystem is listed just once if filesystem listing is used. In the case of metadata, it ensures the table will be read just once and no additional listing or reading is done to fetch log files.
That sounds good to me!
Force-pushed c50f97d to aa544d7
@vinothchandar @pengzhiwei2018 do take a look again.
OK, will start reviewing tomorrow. But can you fix the CI?
Force-pushed aa544d7 to 50e2f1b
Left some comments. At a high level it looks OK to me. If you both are happy, feel free to land this @umehrot2
@@ -107,34 +113,61 @@ case class HoodieFileIndex(
  }

  @transient @volatile private var fileSystemView: HoodieTableFileSystemView = _
  @transient @volatile private var cachedAllInputFiles: Array[HoodieBaseFile] = _
  @transient @volatile private var cachedAllInputFiles: Map[PartitionRowPath, Map[HoodieBaseFile, Seq[HoodieLogFile]]] = _
Could we just reuse a FileSlice object here, instead of the map of base to log files?
Made the change.
prunedPartitions.map { partition =>
  val fileStatues = fileSystemView.getLatestBaseFiles(partition.partitionPath).iterator()
So previously, we were actually performing the listing for each listFiles() call, without actually using the cached values?
No! We will cache the files in the fileSystemView. So each call of listFiles will reuse the cached values.
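The caching pattern being described (populate once, reuse on every subsequent `listFiles` call) can be sketched like this. The class and field names are hypothetical; a loaded-counter is included only to make the "listed just once" property observable.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class CachedFileView {
    private final Supplier<List<String>> loader;   // the expensive listing
    private volatile List<String> cached;          // populated on first access
    final AtomicInteger loadCount = new AtomicInteger();

    CachedFileView(Supplier<List<String>> loader) {
        this.loader = loader;
    }

    // Every listFiles() call after the first reuses the cached listing,
    // so the underlying storage is listed exactly once.
    List<String> listFiles() {
        if (cached == null) {
            synchronized (this) {
                if (cached == null) {
                    loadCount.incrementAndGet();
                    cached = loader.get();
                }
            }
        }
        return cached;
    }
}
```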
override def inputFiles: Array[String] = {
  cachedAllInputFiles.map(_.getFileStatus.getPath.toString)
  cachedAllInputFiles.values.flatten.flatMap(baseLogFilesMapping => {
    Iterator(baseLogFilesMapping._1.getPath) ++ baseLogFilesMapping._2.map(_.getFileStatus.getPath.toString)
Any chance we can reuse code across this method and allFiles below?
Done.
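The de-duplication asked for above amounts to a single helper that both `inputFiles` and `allFiles` delegate to. A sketch, with plain strings standing in for `HoodieBaseFile`/`HoodieLogFile` and a hypothetical helper name:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class FileFlattener {
    // Shared helper: flatten partition -> (baseFile -> logFiles) into one
    // list of paths, so inputFiles and allFiles need not duplicate the loop.
    static List<String> allFilePaths(Map<String, Map<String, List<String>>> partitionFiles) {
        List<String> paths = new ArrayList<>();
        for (Map<String, List<String>> baseToLogs : partitionFiles.values()) {
            for (Map.Entry<String, List<String>> e : baseToLogs.entrySet()) {
                paths.add(e.getKey());      // base file path
                paths.addAll(e.getValue()); // its log file paths
            }
        }
        return paths;
    }
}
```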
case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL) =>
  // Fetch and store latest base and log files, and their sizes
  cachedAllInputFiles = partitionFiles.map(p => {
    val latestSlices = fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p._1.partitionPath, activeInstants.lastInstant().get().getTimestamp)
Any error handling needed for the case where the timeline is empty?
If there is no successful commit yet, activeInstants.lastInstant().get() may crash the query. So we'd better return an empty file list.
Added check.
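The guard being added is essentially an empty-Optional check before dereferencing the last instant: short-circuit to an empty listing rather than calling `get()` on an empty `Optional`. A minimal sketch with hypothetical names (`latestFiles`, `slicesBeforeOrOn`), using a string timestamp in place of a Hudi instant:

```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

class TimelineGuard {
    // If no commit has completed, lastInstant() is empty and calling get()
    // would throw NoSuchElementException; return an empty listing instead.
    static List<String> latestFiles(Optional<String> lastInstant,
                                    Function<String, List<String>> slicesBeforeOrOn) {
        if (!lastInstant.isPresent()) {
            return Collections.emptyList();
        }
        return slicesBeforeOrOn.apply(lastInstant.get());
    }
}
```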
cachedAllInputFiles = partitionFiles.map(p => {
  val latestSlices = fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p._1.partitionPath, activeInstants.lastInstant().get().getTimestamp)
  val baseAndLogFilesMapping = latestSlices.iterator().asScala.map(slice => {
    (slice.getBaseFile.get(), slice.getLogFiles.sorted(HoodieLogFile.getLogFileComparator).iterator().asScala.toSeq)
Worth double-checking that the comparator is actually sorting in the desired order. Just a random word of caution.
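That kind of ordering check is cheap to make explicit. The sketch below does not use Hudi's actual `HoodieLogFile.getLogFileComparator`; it sorts stand-in log file names of the hypothetical form `.fileid_version.log` by their version number, ascending (oldest first), which is the kind of invariant the caution above suggests verifying with a small test.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

class LogFileOrderCheck {
    // Extract the version from a stand-in name like ".f1_3.log" -> 3.
    static int version(String name) {
        String[] parts = name.split("_");
        return Integer.parseInt(parts[1].split("\\.")[0]);
    }

    // Sort ascending by version, so the oldest log file comes first.
    static List<String> sortByVersion(List<String> logFiles) {
        return logFiles.stream()
                .sorted(Comparator.comparingInt(LogFileOrderCheck::version))
                .collect(Collectors.toList());
    }
}
```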
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
(partitionRowPath, filesInPartition)
}.collect().map(f => f._1 -> f._2).toMap

var fetchedPartition2Files: Map[PartitionRowPath, Array[FileStatus]] = Map()
Rename to fetchedPartitionToFiles?
Done.
List<Pair<String, FileStatus[]>> partitionToFiles = engineContext.map(partitionPaths, partitionPathStr -> {
  Path partitionPath = new Path(partitionPathStr);
  FileSystem fs = partitionPath.getFileSystem(hadoopConf.get());
  return Pair.of(partitionPathStr, FSUtils.getAllDataFilesInPartition(fs, partitionPath));
FileStatus is not a Serializable class with the default Spark serializer. If the user has not set the serializer to Kryo when querying a Hudi table, a NotSerializableException will be thrown. The same problem exists for FSUtils.getAllPartitionPaths. But I think we can fix this in another PR.
I see, yeah: FileStatus is not serializable in Hadoop 2, but has been made Serializable in Hadoop 3. We should fix this in a separate PR for all methods by introducing a SerializableFileStatus similar to Spark's https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala#L347.
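The proposed wrapper can be sketched as below. The field selection (path, length, directory flag, modification time) is an assumption modeled on Spark's `SerializableFileStatus`, not a definitive list; the `roundTrip` helper exists only to demonstrate that the flat wrapper survives plain Java serialization, which Hadoop 2's `FileStatus` does not.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializableFileStatus implements Serializable {
    // Carry only plain serializable fields of a Hadoop FileStatus, so the
    // default Java serializer (not just Kryo) can ship it across executors.
    final String path;
    final long length;
    final boolean isDir;
    final long modificationTime;

    SerializableFileStatus(String path, long length, boolean isDir, long modificationTime) {
        this.path = path;
        this.length = length;
        this.isDir = isDir;
        this.modificationTime = modificationTime;
    }

    // Round-trip through plain Java serialization to prove serializability.
    static SerializableFileStatus roundTrip(SerializableFileStatus s) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(s);
            }
            try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
                return (SerializableFileStatus) ois.readObject();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```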
Hi @umehrot2, overall LGTM! I have left some comments on the PR. After these comments and the conflicts are resolved, we can land it.
Force-pushed 70b70e9 to 8dddccd
@vinothchandar @pengzhiwei2018 addressed the latest comments. If it looks good to you guys, I can land it.
@hudi-bot run azure
LGTM! Thanks for this great contribution @umehrot2.
Force-pushed 8dddccd to 88b2110
I rebased this off master and started fixing the compile errors. It still needs work to get into a mergeable state; more compile errors remain to be resolved due to the config PR landing.
@umehrot2 I tried to rebase this off Wenning's PR while you were gone. It's midway now. Could you rebase and re-push? We can then work on landing this.
Force-pushed 0d885a5 to 25302f8
@hudi-bot run azure
Force-pushed 25302f8 to 68bf6fc
LGTM overall
What is the purpose of the pull request
This PR adds support for metadata-based listing for Hudi Spark DataSource and Spark SQL based queries. The detailed design for Spark integration (the V2 implementation specifically) can be found at https://cwiki.apache.org/confluence/display/HUDI/RFC+-+15%3A+HUDI+File+Listing+Improvements#RFC15:HUDIFileListingImprovements-Spark. Two parts of the V2 design have already been implemented:
In this PR we build on top of the FileIndex implementation to get file listings using Hudi's metadata table if the feature is enabled, and otherwise fall back to distributed listing using the Spark context. The metadata table will be read just once, and this reduces O(N) list calls to O(1) get calls for N partitions. We also refactor the Hudi metadata table contract to add a new API which can fetch listings for multiple partitions (opening the reader just once).
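The O(N)-to-O(1) claim above can be illustrated with a toy comparison. Plain maps stand in for the storage layer and the metadata table (none of these names are Hudi's actual API); a counter records how many "storage calls" each strategy makes for N partitions.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

class BatchedListing {
    // Naive approach: one list call per partition -> O(N) storage calls.
    static Map<String, List<String>> listPerPartition(
            List<String> partitions, Map<String, List<String>> store, AtomicInteger calls) {
        Map<String, List<String>> out = new HashMap<>();
        for (String p : partitions) {
            calls.incrementAndGet();  // one storage call per partition
            out.put(p, store.getOrDefault(p, List.of()));
        }
        return out;
    }

    // Batched approach: open the metadata reader once and fetch all
    // partitions in a single pass -> O(1) reader opens.
    static Map<String, List<String>> listBatched(
            List<String> partitions, Map<String, List<String>> store, AtomicInteger calls) {
        calls.incrementAndGet();      // single reader open
        Map<String, List<String>> out = new HashMap<>();
        for (String p : partitions) {
            out.put(p, store.getOrDefault(p, List.of()));
        }
        return out;
    }
}
```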
I have further re-factored HoodieFileIndex for more efficient integration in the case of MOR real-time queries. Earlier we were just listing base files using the file index, and later it would again perform listing for log files in MergeOnReadSnapshotRelation using groupLogsByBaseFile. Now, I will be storing and fetching both base and log files in the case of real-time queries. This ensures that the filesystem is listed just once if filesystem listing is used. In the case of metadata, it ensures the table will be read just once and no additional listing or reading is done to fetch log files.

Fixes #2935
Committer checklist
Has a corresponding JIRA in PR title & commit
Commit message is descriptive of the change
CI is green
Necessary doc changes done or have another open PR
For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.