-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[HUDI-2243] Support Time Travel Query For Hoodie Table #3360
Conversation
CI report:
Bot commands@hudi-bot supports the following commands:
|
07ef77f
to
6b944e2
Compare
val df2 = Seq((1, "a1", 12, 1001)).toDF("id", "name", "value", "version") | ||
df2.write.format("org.apache.hudi") | ||
.options(commonOpts) | ||
.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY.key, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't we set this (and other instances below) to tableType
just like on line 70?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch!
@@ -58,6 +58,11 @@ | |||
*/ | |||
Stream<HoodieBaseFile> getLatestBaseFiles(); | |||
|
|||
/** |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
/**
* Stream all the latest version data files across partitions with precondition that commitTime(file) before
* maxCommitTime.
*/
More in line with the existing doc. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make sense!
metaClient.reloadActiveTimeline() | ||
val secondCommit = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp | ||
|
||
// Third write |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wondering what happens when clean commits are interleaved in between, say as.of.instant is 1002 and there are couple of clean commits before that. I believe the behavior would be same as we have today when latest instant is passed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I think so. It should be the same behavior with query the latest instant.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pengzhiwei2018 Thanks for this. Maybe we could have couple of follow on tasks.
- Allow user to specify
as.of.instant
inYYYY-MM-DD
toYYYY-MM-DD hh:mm:ss
formts. - Support this with SQL dml as well, e.g.
select a,b,c from hudi_table AS OF 20210728141108
. This would really help useers to rollback using CTAS directly. What do you think?
Both Agreed! I will submit a PR to support time travel query for spark sql after #3277 has merged as we need do some sql extension for |
7fe97f7
to
f421cd8
Compare
@pengzhiwei2018 Thanks for quickly adding the first suggestion. This diff looks good to me. Can you resolve the conflicts and then we can land it? |
f421cd8
to
e84a04f
Compare
The PR has updated to solve the conflicts. Please take a review again~ |
1bd1655
to
b9e30d2
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added feedback for source code
public final Stream<HoodieBaseFile> getLatestBaseFilesBeforeOrOn(String maxCommitTime) { | ||
try { | ||
readLock.lock(); | ||
return fetchAllStoredFileGroups() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see an opportunity for code re-use between this and getLatestBaseFilesBeforeOrOn(String partitionStr, String maxCommitTime)(lines 470 to 486).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Infact we could change the signature of existing method to
getLatestBaseFilesBeforeOrOn(Option<String> partitionStr, String maxCommitTime)
and not introduce a new method.
@@ -162,28 +163,19 @@ | |||
} | |||
|
|||
// Return parquet file with a list of log files in the same file group. | |||
public static List<Pair<Option<HoodieBaseFile>, List<String>>> groupLogsByBaseFile(Configuration conf, List<Path> partitionPaths) { | |||
public static List<Pair<Option<HoodieBaseFile>, List<String>>> groupLogsByBaseFile(HoodieTableMetaClient metaClient, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are the changes in this method an optimization or is there anything required for this patch as such?
I am not aware of why this was designed this way. But there should a reason for it. Lets take this up once we have the release. so that we can consult w/ vinoth on the improvisations.
Can we please revert those changes not really required for this patch.
I meant the perpartitionMetaclient related changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I think we can revert these changes.
@@ -238,7 +249,12 @@ case class HoodieFileIndex( | |||
case (_, _) => | |||
// Fetch and store latest base files and its sizes | |||
cachedAllInputFileSlices = partitionFiles.map(p => { | |||
(p._1, fileSystemView.getLatestFileSlices(p._1.partitionPath).iterator().asScala.toSeq) | |||
val fileSlices = (if (queryInstant.isDefined) { | |||
fileSystemView.getLatestFileSlicesBeforeOrOn(p._1.partitionPath, queryInstant.get, true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to do Option.map().OrElse() to make it nicer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will try it.
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala
Show resolved
Hide resolved
return viewManager.getFileSystemView(basePath).getLatestBaseFiles().map(BaseFileDTO::fromHoodieBaseFile) | ||
.collect(Collectors.toList()); | ||
public List<BaseFileDTO> getLatestDataFiles(String basePath, Option<String> maxCommitTime) { | ||
if (maxCommitTime.isPresent()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if possible, Option.map().OrElse()
d70057b
to
f13bafc
Compare
|
||
(tableType, queryType) match { | ||
case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL) => | ||
// Fetch and store latest base and log files, and their sizes | ||
cachedAllInputFileSlices = partitionFiles.map(p => { | ||
val latestSlices = if (activeInstants.lastInstant().isPresent) { | ||
fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p._1.partitionPath, | ||
activeInstants.lastInstant().get().getTimestamp).iterator().asScala.toSeq | ||
fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p._1.partitionPath, queryInstant.get) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't line no 234 be like
if (queryInstant.isPresent)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch!
@@ -238,7 +249,11 @@ case class HoodieFileIndex( | |||
case (_, _) => | |||
// Fetch and store latest base files and its sizes | |||
cachedAllInputFileSlices = partitionFiles.map(p => { | |||
(p._1, fileSystemView.getLatestFileSlices(p._1.partitionPath).iterator().asScala.toSeq) | |||
val fileSlices = queryInstant |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just so I understand correctly. Here, we are changing existing behavior. Even if not for time travel query, previsouly we were calling fileSystemView.getLatestFileSlices. But now, since we assign queryInstant upfront (either to specified query or latest instant), we will call fileSystemView.getLatestFileSlicesBeforeOrOn.
Is that intentional ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will change the code to keep the origin behavior for non-time travel query.
d9d348f
to
c6d8da9
Compare
@hudi-bot run azure |
@hudi-bot run azure |
77bf630
to
a58afb7
Compare
@hudi-bot run azure |
a58afb7
to
096eea6
Compare
@hudi-bot run azure |
trinodb/trino#8773 after that maybe hudi time travel could be added to trino |
What is the purpose of the pull request
Support time travel query for hoodie table for both COW and MOR table.
spark.read.format("hudi").option("as.of.instant", "20210728141108").load(basePath).show()
spark.read.format("hudi").option("as.of.instant", "2021-07-28 14: 11: 08").load(basePath).show()
spark.read.format("hudi").option("as.of.instant", "2021-07-28").load(basePath).show()
Brief change log
(for example:)
Verify this pull request
(Please pick either of the following options)
This pull request is a trivial rework / code cleanup without any test coverage.
(or)
This pull request is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Committer checklist
Has a corresponding JIRA in PR title & commit
Commit message is descriptive of the change
CI is green
Necessary doc changes done or have another open PR
For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.