[HUDI-4372] Enable metadata table by default for flink #11124
Conversation
Force-pushed from 8c1fe04 to d18ce47
@@ -554,8 +554,7 @@ protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata, Stri
     */
    protected void mayBeCleanAndArchive(HoodieTable table) {
      autoCleanOnCommit();
-     // reload table to that timeline reflects the clean commit
-     autoArchiveOnCommit(createTable(config, hadoopConf));
+     autoArchiveOnCommit(table);
This is unnecessary, we just need to refresh the table metadata active timeline.
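Read as code, the suggestion amounts to something like the sketch below; method names are taken from the diff above, and reloadActiveTimeline is the refresh being referred to, so this is an illustration rather than the exact patch:

```java
// Sketch only: keep the table instance and refresh its active timeline
// so the archiver sees the clean commit, instead of rebuilding the whole
// table via createTable(config, hadoopConf).
protected void mayBeCleanAndArchive(HoodieTable table) {
  autoCleanOnCommit();
  // Reload the metadata active timeline to pick up the clean commit.
  table.getMetaClient().reloadActiveTimeline();
  autoArchiveOnCommit(table);
}
```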
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java (review thread resolved)
@@ -289,6 +289,14 @@ public HoodieTestTable moveInflightCommitToComplete(String instantTime, HoodieCo
      return this;
    }

+   public void moveCompleteCommitToInflight(String instantTime) throws IOException {
Is this method also used for MOR testing with compaction? In that case, deleteCommit should be called instead of deleteDeltaCommit, right?
Yeah, maybe we can extend it when the need arises.
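For reference, such an extension might look roughly like the sketch below; the delete/create helper names follow the deleteCommit vs. deleteDeltaCommit distinction raised above and are assumptions, not the exact HoodieTestTable API:

```java
// Hypothetical MOR-aware variant: compaction completes as a commit,
// while regular MOR writes complete as delta commits.
public void moveCompleteCommitToInflight(String instantTime, String action) throws IOException {
  if (HoodieTimeline.COMMIT_ACTION.equals(action)) {
    deleteCommit(basePath, instantTime);            // compaction commit
    createInflightCommit(basePath, instantTime);
  } else {
    deleteDeltaCommit(basePath, instantTime);       // regular delta commit
    createInflightDeltaCommit(basePath, instantTime);
  }
}
```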
@@ -1100,7 +1106,7 @@ public void testArchiveRollbacksAndCleanTestTable() throws Exception {
      testTable.doClean(cleanInstant, partitionToFileDeleteCount);
    }

-   for (int i = 5; i <= 13; i += 3) {
+   for (int i = 5; i <= 11; i += 2) {
Can we add a comment above explaining why we need to jump in steps of 2?
The jump step per run is actually the number of new commits created in one loop iteration.
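So the requested comment could simply record that relationship; illustrative wording only:

```java
// The loop counter advances in steps of 2 because every iteration
// produces two new commits on the timeline.
for (int i = 5; i <= 11; i += 2) {
  // ... one iteration's writes and table services ...
}
```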
@@ -663,7 +665,11 @@ public void testArchivalWithMultiWriters(boolean enableMetadata) throws Exceptio

    // do ingestion and trigger archive actions here.
    for (int i = 1; i < 30; i++) {
-     testTable.doWriteOperation("0000000" + String.format("%02d", i), WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
+     String instant = metaClient.createNewInstantTime();
+     if (i == 29) {
maybe declare the number of rounds before this loop and use that variable?
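For instance (numRounds is a hypothetical name, and the loop body is abbreviated from the diff above):

```java
// Name the bound so the last-round check below explains itself.
int numRounds = 29;
for (int i = 1; i <= numRounds; i++) {
  String instant = metaClient.createNewInstantTime();
  if (i == numRounds) {
    // trigger the archive action on the final round
  }
  // ... ingestion for this round ...
}
```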
    for (int i = 1; i <= 4; i++) {
-     doWriteOperation(testTable, "000000" + (commitTime.getAndIncrement()), INSERT);
+     String instant = metaClient.createNewInstantTime();
Same here, maybe we can declare a variable numWrites = 4. The test would be more readable and easier to understand.
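i.e., something along the lines of the sketch below (numWrites is hypothetical, and the body is abbreviated from the diff above):

```java
int numWrites = 4; // number of insert rounds, named for readability
for (int i = 1; i <= numWrites; i++) {
  String instant = metaClient.createNewInstantTime();
  doWriteOperation(testTable, instant, INSERT);
}
```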
-     // We cannot have unbounded commit files. Archive commits if we have to archive
+     // We cannot have unbounded commit files. Archive commits if we have to archive.
+     // Reload table timeline to reflect the latest commit.
+     table.getMetaClient().reloadActiveTimeline();
If we reload the timeline here, it is possible that the write client and the table service client have different fs views. Is that expected? Also, why should we not then reload the timeline for other table services before executing them?
Only the archive needs this now, because the cleaning may be executed before the archiving, and the archiving needs to see these clean commits. In all other cases, there is no need to refresh either the timeline or the fs view.
@@ -558,9 +558,6 @@ protected void runTableServicesInline(HoodieTable table, HoodieCommitMetadata me
      return;
    }

-   if (config.isMetadataTableEnabled()) {
I remember there’s a reason to sync the FS view for MDT. Is this unnecessary now?
I have checked all the inline table services; all we need is the timeline. There is no need to sync the view, because each service actually re-creates its own table when necessary.
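A hedged sketch of the pattern being described, with createTable mirroring the call seen in the first diff and the service body elided:

```java
// An inline table service builds its own HoodieTable (and with it a fresh
// file-system view) when it actually runs, so the caller only needs the
// timeline to be current, not a synced fs view.
HoodieTable serviceTable = createTable(config, hadoopConf);
// ... run the clean/compact/cluster service against serviceTable ...
```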
+     // Reload table timeline to reflect the latest commits,
+     // there are some table services (e.g., the cleaning) that executed right before the archiving.
+     table.getMetaClient().reloadActiveTimeline();
Is this going to archive more instants since the view is refreshed? Maybe we can avoid the overhead if that’s the case, since the next write should get the timeline refreshed.
The master branch re-creates the hoodie table to do the same thing, and I have checked that the archiving does not utilize the fs view from the table at all.
Change Logs
Enable the Flink metadata table by default; the MDT now works smoothly with the async table services.
Impact
none
Risk level (write none, low medium or high below)
none
Documentation Update
metadata.enabled is switched from false to true.
Contributor's checklist
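Since metadata.enabled now defaults to true, a job that wants the previous behavior has to opt out explicitly. A minimal sketch using Flink's Configuration API (the option key comes from this PR's description; setString is the generic string setter):

```java
import org.apache.flink.configuration.Configuration;

public class DisableMetadataTableExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Flip the new default back off for this writer.
    conf.setString("metadata.enabled", "false");
  }
}
```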