
[HUDI-8469][WIP][DNM] Adding optimized writes to MDT#12236

Closed
nsivabalan wants to merge 3 commits into apache:master from nsivabalan:mdt_dag_rewrite3

Conversation


@nsivabalan nsivabalan commented Nov 12, 2024

Change Logs

Rewriting the write DAG so that writes go to both the data table (DT) and the metadata table (MDT) within the same stage boundaries. This avoids the out-of-sync issues that currently require special handling. The intention behind this DAG rewrite is to write to both the DT and the MDT using a single DAG, without any breaks in between.

This is a WIP patch which might get split into multiple patches depending on feedback.

Before we go into the new DAG, let's revisit the current one. Here is a pictorial representation of today's write DAG.

Screenshot 2024-11-11 at 6 14 29 PM

Screenshot 2024-11-11 at 6 14 38 PM

Given the current DAG, we believe we can do a streaming write to the MDT directly, relying on the writes to the data table, without triggering intermediate actions. In case of task or stage retries, our marker reconciliation will automatically take care of cleaning up any spurious data files.

With that in mind, here is the proposal for the new DAG.

Screenshot 2024-11-11 at 6 19 49 PM
Screenshot 2024-11-11 at 6 20 15 PM

I am opening this patch to get feedback on the design while we iterate towards a full-blown implementation.

Let's go over it one piece at a time.

1. Enabling NBCC for MT

We are enabling NBCC (Non-Blocking Concurrency Control) and multi-writer support for the MDT so that multiple writers can write to the metadata table concurrently. This is a prerequisite, since the data table can have multiple writers and each of their DAGs could be running concurrently. The previous DAG did not need this, but the redesigned DAG requires NBCC for the MDT. Draft patch: #12209. Those changes have been incorporated into this patch as well.

2. Removing auto commit flow to reduce complexity.

In general we have two flows w.r.t. write commits: the auto-commit-enabled flow and the auto-commit-disabled flow. We are unifying these, and we plan to support only the auto-commit-disabled flow. All user-facing writers (batch and streaming) already use the auto-commit-disabled flow, so this should not have any impact on end users.

However, a lot of tests are written using the auto-commit-enabled flow and need to be fixed to use the auto-commit-disabled flow. Draft patch: #12204. Again, those changes have been incorporated into this patch as well.

Auto commit enabled flow

writeClient.startcommit(instantTime)
returnVal = writeClient.upsert(RDD<records>, instantTime)

By this time, the DAG has been triggered, and the commit is expected to be complete if there are no errors.

Auto commit disabled flow

writeClient.startcommit(instantTime)
returnVal = writeClient.upsert(RDD<records>, instantTime)
writeClient.commitStats(returnVal,....) 

So, unless the user calls `writeClient.commitStats`, the DAG is not triggered and the write is not completed.
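To make the deferred trigger concrete, here is a minimal plain-Java sketch (illustrative only; the names are hypothetical and this is not Hudi or Spark code) of how building the write pipeline does no work until a commit-style call dereferences it:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Toy model of the auto-commit-disabled contract: upsert() only *builds*
// the pipeline; commit() is the first call that actually executes it.
public class LazyCommitSketch {
    static final List<String> LOG = new ArrayList<>();

    // Analogous to writeClient.upsert(): returns a lazy computation.
    static Supplier<List<String>> upsert(List<String> records) {
        return () -> {
            LOG.add("dag-triggered");
            List<String> out = new ArrayList<>();
            for (String r : records) {
                out.add(r.toUpperCase());
            }
            return out;
        };
    }

    // Analogous to writeClient.commitStats(): dereferences the pipeline.
    static List<String> commit(Supplier<List<String>> pending) {
        return pending.get();
    }

    public static void main(String[] args) {
        Supplier<List<String>> pending = upsert(List.of("a", "b"));
        System.out.println(LOG);              // [] -- nothing ran yet
        System.out.println(commit(pending));  // [A, B]
        System.out.println(LOG);              // [dag-triggered]
    }
}
```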

3. MDT Writer instance lifecycle:

4. Notes on ingestion writes dag.

Let's go over the upsert operation in the data table.

  • a. BaseHoodieWriteClient.startCommitWithTime() -> starts a new commit in the data table. It also instantiates a new HoodieTableMetadataWriter and starts a new commit there. We are introducing new APIs in HoodieTableMetadataWriter to support this directly from the data table write client.
void startCommit(String instantTime);
  • b. User calls writeClient.upsert()
    Let's unpack what happens within this method.
.
.
HoodieWriteMetadata writeMetadata = HoodieTable.upsert(records, commitTime, ...)
writeMetadata holds a reference to RDD<WriteStatus> 
if (metadata table is enabled) {
 RDD<WriteStatus> mdtWriteStatuses =   getMetadataWriter(commitTime).get().prepareAndWriteToMDT(writeMetadata.getDataTableWriteStatuses(), commitTime);
 writeMetadata.setAllWriteStatus(writeMetadata.getDataTableWriteStatuses().union(mdtWriteStatuses))
}
return writeMetadata.clone(allWriteStats)

After we write to the data table, we have an RDD<WriteStatus>.
We have introduced a new API in HoodieTableMetadataWriter (prepareAndWriteToMDT) to write to the MDT directly based on that RDD from the data table.

  HoodieData<WriteStatus> prepareAndWriteToMDT(HoodieData<WriteStatus> writeStatus, String instantTime);

We wanted to keep the FILES partition out of this write so that we can write it at the very end, after reconciling the commit metadata for the data table. So, every other partition or index in the metadata table gets written using this API.

This method (prepareAndWriteToMDT) returns the metadata table's RDD<WriteStatus>.
We stitch both sets of write statuses together and send them back. So, WriteClient.upsert() returns an RDD containing a mix of data table write statuses and metadata table write statuses.

By the way, do remember that the DAG is not triggered until the next API (c) is called. In other words, just by calling writeClient.upsert(), not even the data files for the data table will be written.

Ref code to review:
https://github.com/apache/hudi/pull/12236/files#r1839032523
https://github.com/apache/hudi/pull/12236/files#r1839024169
https://github.com/apache/hudi/pull/12236/files#r1839020184
https://github.com/apache/hudi/pull/12236/files#r1839025342

c. User calls writeClient.commit(commitTime, return value from (b) above)
Let's unpack what happens within this call.

  • i. We finally trigger the action on the RDD (i.e., the return value from (b) above). This results in all data files being written to the data table and all files being written to the metadata table, for all partitions except the FILES partition.
  • ii. Prepare the HoodieCommitMetadata for the data table.
  • iii. Perform marker reconciliation for the data table.
  • iv. In step c.i above, we would have collected the list of metadata table HoodieWriteStats as well. We re-use this and call HoodieTableMetadataWriter.writeToFilesPartitionAndCommit(instantTime, mdtHoodieWriteStats, the HoodieCommitMetadata of interest).
  void writeToFilesPartitionAndCommit(String instantTime, HoodieEngineContext context, List<HoodieWriteStat> partialMdtWriteStats, HoodieCommitMetadata commitMetadata);

What this API does:

  • Using the HoodieCommitMetadata from the data table, prepare and write records to the FILES partition in the MDT.

  • Stitch the List<HoodieWriteStat> from c.i (i.e., partialMdtWriteStats) with the List<HoodieWriteStat> from the FILES partition writes above, and complete the commit to the metadata table. As part of this step we also perform marker reconciliation for the metadata table, i.e., delete any spurious files in the MDT.

  • v. Wrap up the commit in Data table.
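The two-phase FILES-partition step above can be sketched roughly as follows (plain Java with hypothetical stand-in types; the real API operates on HoodieWriteStat lists and HoodieCommitMetadata):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch (names hypothetical) of the final commit step:
// FILES partition records are derived from the data table's commit
// metadata, then stitched with the partial MDT write stats collected
// earlier, and only then is the MDT delta commit completed.
public class FilesPartitionCommitSketch {
    static List<String> writeToFilesPartitionAndCommit(
            List<String> partialMdtWriteStats, List<String> commitMetadataFiles) {
        // 1. Derive FILES partition writes from the DT commit metadata.
        List<String> filesPartitionStats = new ArrayList<>();
        for (String f : commitMetadataFiles) {
            filesPartitionStats.add("files:" + f);
        }
        // 2. Stitch partial stats (all other MDT partitions) with FILES stats.
        List<String> allStats = new ArrayList<>(partialMdtWriteStats);
        allStats.addAll(filesPartitionStats);
        // 3. Completing the commit here is also where marker reconciliation
        //    for the MDT would delete any spurious files.
        return allStats;
    }

    public static void main(String[] args) {
        List<String> all = writeToFilesPartitionAndCommit(
            List.of("rli:0", "colstats:0"), List.of("p1/base.parquet"));
        System.out.println(all); // [rli:0, colstats:0, files:p1/base.parquet]
    }
}
```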

Please check out the changes in SparkRDDWriteClient, HoodieTableMetadataWriter, HoodieBackedTableMetadataWriter and SparkHoodieBackedTableMetadataWriter.

In this patch, we have fixed the upsert() operation to test this DAG, and it works as expected: writes to both the data table and the metadata table happen within a single DAG without any breaks. Writes to the FILES partition in the MDT happen at the end, and finally we wrap up the commit in both the metadata table and the data table.

Ref code:
https://github.com/apache/hudi/pull/12236/files#r1839037534
https://github.com/apache/hudi/pull/12236/files#r1839039325
https://github.com/apache/hudi/pull/12236/files#r1839040074

5. Metadata Partitioner:

One tricky part of achieving the above is the design of the metadata table partitioner. If we used the out-of-the-box UpsertPartitioner, the workload-profile-building stage would trigger the action. Here is what we have done to circumvent that DAG trigger.
While initializing the HoodieTableMetadataWriter itself, we already know which partitions in the MDT are enabled and the file group count for each. So, we use that info to build the SparkMetadataUpsertPartitioner. All records are expected to be tagged with their fileID location by the time we reach the metadata table upsertPrepped call, so we leverage that to drive the Spark partitioner.
With this trick, we completely avoid triggering the DAG and keep the pipeline streaming from the data table writes all the way to the metadata table writes.
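A rough sketch of the partitioner idea (plain Java, hypothetical names; the real SparkMetadataUpsertPartitioner plugs into Spark's `Partitioner` contract of `numPartitions()`/`getPartition()`): because the MDT file group counts are fixed and known when the metadata writer is initialized, and every record is already tagged with its target fileID, routing is a pure lookup and never needs a workload profile.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: a static fileId -> partition-index mapping built at
// writer initialization, so repartitioning never triggers a Spark action.
public class MetadataPartitionerSketch {
    private final Map<String, Integer> fileIdToPartition = new HashMap<>();

    public MetadataPartitionerSketch(List<String> knownFileIds) {
        // File groups are enumerated when the metadata writer is initialized.
        for (int i = 0; i < knownFileIds.size(); i++) {
            fileIdToPartition.put(knownFileIds.get(i), i);
        }
    }

    public int numPartitions() {
        return fileIdToPartition.size();
    }

    // Every MDT record is already tagged with its target fileId, so the
    // lookup is a pure map access: no workload profile, no DAG trigger.
    public int getPartition(String taggedFileId) {
        return fileIdToPartition.get(taggedFileId);
    }

    public static void main(String[] args) {
        MetadataPartitionerSketch p = new MetadataPartitionerSketch(
            List.of("files-0000", "col-stats-0000", "col-stats-0001"));
        System.out.println(p.numPartitions());                // 3
        System.out.println(p.getPartition("col-stats-0001")); // 2
    }
}
```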

Ref code: https://github.com/apache/hudi/pull/12236/files#diff-c13aba8c32dad9b38d5b82bd5f6c99c26b932d79d89e42ef83c8206b5e3005db

6. UpsertPreppedPartial:

Based on the DAG redesign, we write to the metadata table twice using the same delta commit time. None of our writeClient APIs are designed to work that way, so we are introducing upsertPreppedPartial to support this case. We have validations in place to ensure it is used only for metadata table writes. So, it is feasible to call writeClient.startCommit(t10.dc), writeClient.upsertPreppedPartial(batch1), writeClient.upsertPreppedPartial(batch2),
and finally writeClient.commit(t10.dc..)

Ref code: https://github.com/apache/hudi/pull/12236/files#r1839046857

7. New MT Upsert Commit Action Executor

We have introduced SparkMetadataTableUpsertCommitActionExecutor to assist with writing to the metadata table. It receives the RDD, creates an empty inflight file (empty workload profile), uses SparkMetadataUpsertPartitioner to repartition the records, and writes them.

Ref code: https://github.com/apache/hudi/pull/12236/files#diff-4bc75096f7c67d1b0e302b4d2be9e01702db698821c6dfe8c7a28c1614b6f89f

8. Zooming in on prepareAndWriteToMDT Impl:

The high-level steps of this method's implementation are:

  • Input: the RDD<WriteStatus> from the data table write.
  • We flatMap over the RDD and prepare the metadata table records (col stats, RLI, bloom, etc.).
  • Some partitions in the MDT need one Spark task per physical hudi partition, for example the partition stats index. So, we also repartition by hudi partition and process all the write statuses together to prepare the partition stats records.
  • Union the above sets of records.
  • Tag location.
  • Return the RDD.

We have introduced MetadataIndexGenerator and SparkMetadataIndexGenerator to assist with preparing MDT records.
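A self-contained sketch of the two preparation paths (hypothetical record shapes; the real MetadataIndexGenerator emits HoodieRecords, and the repartitioning happens on Spark rather than in a local map):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: a flatMap-style pass turns each write status into
// per-file index records (RLI, col stats, ...), while partition stats need
// all statuses of a hudi partition together; both sets are unioned.
public class IndexRecordPrepSketch {
    static class Status {
        final String hudiPartition;
        final String fileId;
        Status(String hudiPartition, String fileId) {
            this.hudiPartition = hudiPartition;
            this.fileId = fileId;
        }
    }

    static List<String> prepareRecords(List<Status> writeStatuses) {
        List<String> records = new ArrayList<>();
        // Per-WriteStatus records (flatMap over the RDD in the real flow).
        for (Status s : writeStatuses) {
            records.add("rli:" + s.fileId);
            records.add("colstats:" + s.fileId);
        }
        // Partition stats: group all statuses of a hudi partition together.
        Map<String, List<Status>> byPartition = new HashMap<>();
        for (Status s : writeStatuses) {
            byPartition.computeIfAbsent(s.hudiPartition, k -> new ArrayList<>()).add(s);
        }
        for (Map.Entry<String, List<Status>> e : byPartition.entrySet()) {
            records.add("partition-stats:" + e.getKey() + ":" + e.getValue().size());
        }
        return records; // union of both record sets, ready for tagging
    }

    public static void main(String[] args) {
        List<String> recs = prepareRecords(List.of(
            new Status("p1", "f1"), new Status("p1", "f2")));
        System.out.println(recs.size()); // 5: 2x(rli+colstats) + 1 partition-stats
    }
}
```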

Note: We will fix all other write operations (bulk insert, insert, delete, insert overwrite, etc.) in a similar fashion. In this patch, upsert() is implemented as a reference.

Ref code:
https://github.com/apache/hudi/pull/12236/files#diff-4bad9395622cd04b5332d2dd195b7a407487cfcd43ed421f74c74ce1ead564b9
https://github.com/apache/hudi/pull/12236/files#diff-1f59d767b16a65c15c2795a81bbb46edf03b2c41152339259582cdf40059bdc8

11. Metadata table rollbacks:

Prior to this design, the failed-writes clean policy for the MDT was eager: whenever we start a new commit in the MDT, any pending commits get rolled back automatically. But with NBCC, the failed-writes clean policy is lazy. This means the MDT writeClient emits a heartbeat when the delta commit starts; if the process crashes, the cleaner in the MDT later checks for failed writes (elapsed heartbeats) and triggers a rollback. With the DAG redesign, we can't let this happen, so we are disabling this cleaner-driven rollback for the metadata table. Any failed write in the metadata table implies a failed write in the data table as well, so the data table will have to trigger a rollback at some point (depending on whether it is single-writer or multi-writer). That rollback will in turn trigger a rollback in the metadata table if the commit of interest exists there. Hence, it is safe to disable the auto rollbacks in the MDT.

Ref code:
https://github.com/apache/hudi/pull/12236/files#diff-87151a25afbb138b6d05bc0f187b3111af5199c8f0f2628a8beb8a21c3c14980

12. WriteStatus changes:

Since we have a single RDD at the end of writeClient.upsert(), we have introduced a boolean flag in WriteStatus to denote whether it belongs to the data table or the metadata table. We use that to bucketize the write statuses and prepare the HoodieCommitMetadata for the data table.
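The bucketizing step can be sketched as follows (hypothetical Status class standing in for WriteStatus; the real flag lives on WriteStatus and the split happens on the collected statuses):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: the single RDD returned by upsert() carries
// statuses for both tables, and a boolean flag on each status tells them
// apart when preparing the data table's commit metadata.
public class WriteStatusBucketizer {
    static class Status {
        final String fileId;
        final boolean isMetadataTable;
        Status(String fileId, boolean isMetadataTable) {
            this.fileId = fileId;
            this.isMetadataTable = isMetadataTable;
        }
    }

    // Splits a mixed list into [dataTableStatuses, metadataTableStatuses].
    static List<List<Status>> bucketize(List<Status> mixed) {
        List<Status> dt = new ArrayList<>();
        List<Status> mdt = new ArrayList<>();
        for (Status s : mixed) {
            (s.isMetadataTable ? mdt : dt).add(s);
        }
        return List.of(dt, mdt);
    }

    public static void main(String[] args) {
        List<List<Status>> buckets = bucketize(List.of(
            new Status("dt-file-1", false),
            new Status("mdt-file-1", true),
            new Status("dt-file-2", false)));
        System.out.println(buckets.get(0).size()); // 2 data table statuses
        System.out.println(buckets.get(1).size()); // 1 metadata table status
    }
}
```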

Ref code: https://github.com/apache/hudi/pull/12236/files#r1839056049

13. DT Compaction

Now that you have a grasp of the ingestion commit DAG, let's go to compaction. I have not fixed data table clustering yet in this patch, but the changes to compaction should give you a good idea.

Let's take a peek at the compaction control flow with the new DAG.
We are not touching the scheduling of compaction in this DAG-rewrite exercise; we only touch the MDT during compaction execution.

  • a. WriteClient.compact(compactionTime)
  •  b. (a) will call into TableServiceClient.compact(compactionTime, shouldComplete). 
    
  •  All of our HoodieTableMetadataWriter instances and their map are maintained by the data table BaseHoodieTableWriteClient. So, while instantiating the TableServiceClient, we pass in a function that can fetch the metadata writer instance:
    
Functions.Function2<String, HoodieTableMetaClient, Option<HoodieTableMetadataWriter>>
  •      c. Let's expand on the compact() impl in TableServiceClient.
    
.
.
Apply the metadataWriterFunc to get an instance of HoodieTableMetadataWriter. 
HoodieTableMetadataWriter.startCommit(compactionCommit)
HoodieWriteMetadata compactionWriteMetadata = HoodieTable.compact(...) 
// this compactionWriteMetadata contains RDD<WriteStatus> which is not yet triggered. 
Write to Metadata table (partial writes except FILES partition) and get hold of RDD<WriteStatus> for metadata table. 
Stitch both writeStatuses and set it as part of compactionWriteMetadata. 
return compactionWriteMetadata

So, tableServiceWriteClient and writeClient.compact() return a HoodieWriteMetadata compactionWriteMetadata which contains the RDD; the DAG is not yet triggered.

  • d. The caller is expected to call writeClient.commitCompaction(compactionTime, compactionWriteMetadata (from the compact() call), ..)
    Let's zoom in a bit here.
  • We call collect on the RDD in compactionWriteMetadata. This is when the DAG is triggered (i.e., writes to both the data table and the MDT partitions (except FILES) happen).
  • Bucketize data table HoodieWriteStat and MetadataTable HoodieWriteStats.
  • Prepare HoodieCommitMetadata for data table.
  • Perform marker reconciliation for data table.
  • Write to MDT files partition using writeToFilesPartitionAndCommit.
  • Complete the dc in MDT for compactionTime
  • Complete the compaction commit in Data table.

Changes for ingestion writes were mostly straightforward with the revamped DAG, but for compaction we had to make some changes.

  • Before this patch, the HoodieTable.compact() call itself triggered the compaction DAG in the data table. We prepared the HoodieCommitMetadata and returned it as part of HoodieTable.compact(), so the HoodieWriteMetadata returned from that call contained both a List<HoodieWriteStat> and an RDD<WriteStatus>.
    But this is against our goal of making the DAG fully streaming from the data table all the way to the metadata table. So, we had to change HoodieTable.compact() to not trigger the DAG. Hence the HoodieCommitMetadata cannot be prepared until writeClient.commitCompaction() is invoked, which is when the DAG is triggered.

Note: As you can see, we are changing the compaction DAG here. For example, even if someone disables the metadata table completely, compaction execution uses a different flow with this revamp. So, if you are reviewing, do pay close attention to these code blocks.

Ref code:
https://github.com/apache/hudi/pull/12236/files#r1839059354
https://github.com/apache/hudi/pull/12236/files#r1839062145
https://github.com/apache/hudi/pull/12236/files#r1839063857

14. MDT stats generation:

As per the design, our intention is to generate all the stats required for MDT record generation in the data table write handle itself and pass them along with the WriteStatus. FILES, RLI, and col stats are taken care of in this patch. For col stats, prior to this patch, only the append handle generated the stats; with this patch, the create handle and merge handle also generate col stats and attach them to the WriteStatus. The general idea is to embed all required stats (col stats, bloom filter, functional index, secondary index) in the WriteStatus returned from the write handles in the data table.

Ref code:
https://github.com/apache/hudi/pull/12236/files#diff-b85f7289adc6d3bd6ecc32feab502e2e95f70539e1186d479fd0815d58485f84
https://github.com/apache/hudi/pull/12236/files#diff-44402d60ebabe11a4826e9c8a548b1737b32b0664f6af3c379d933b899e295a8
https://github.com/apache/hudi/pull/12236/files#diff-63a77e05c924278c190061a1a18a992a7f9480af14f0f34f4328bf72ae673fe9

Things to flesh out/Pending items:

  • Non-incremental write operations might need special handling for some MDT partitions. In general, our design relies on the fact that everything is embedded in the write status by the write handles. In the case of insert_overwrite, for the RLI partition we might have to poll the FSV to fetch the latest base files and then the record keys in them. Polling the FSV from an executor is generally not recommended, and here we would need to do it for a lot of files, so we can't just delegate the work to one write handle. This needs to be figured out.
  • One more pending piece in the overall picture is the row writer flow for bulk insert. We also need to align the row writer code path with the new DAG; we do not want any divergence with the DAG rewrite. The current patch does not have the fix for the row writer.
  • In the interest of time, I am wondering if we should fix the auto-commit-enabled flow as well for upserts and other write operations, so that we do not need to chase 300+ test failures, and take up the deprecation of the auto-commit-enabled flow separately.

Tests:

Note: We will introduce a flag to enable the new optimized DAG, turned off by default. The optimized DAG will be fully implemented for Spark; Flink and Java will be taken up later.

Feel free to comment on the overall design or on any code blocks in particular. Since this has a very large blast radius, we want to ensure we don't have any gaps in the design or implementation.

In the next day or two, I will try to add `Notes to reviewer` as well, to help with reviewing.

Impact

Describe any public API or user-facing feature change or any performance impact.

Risk level (write none, low medium or high below)

If medium or high, explain what verification was done to mitigate the risks.

Documentation Update

Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".

  • The config description must be updated if new configs are added or the default value of the configs are changed
  • Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the
    ticket number here and follow the instruction to make
    changes to the website.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@github-actions github-actions bot added the size:XL PR with lines of changes > 1000 label Nov 12, 2024
initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime));
table.validateUpsertSchema();
preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient());
Option<HoodieTableMetadataWriter> metadataWriterOpt = getMetadataWriter(instantTime, table.getMetaClient());
Contributor Author

changes in upsert

initializeFromFilesystem(instantTime, partitionTypes, Option.empty());
}

public void startCommit(String instantTime) {
Contributor Author

starting a commit in MDT

/**
*
*/
public HoodieData<WriteStatus> prepareAndWriteToMDT(HoodieData<WriteStatus> writeStatus, String instantTime) {
Contributor Author

write data to MDT for all partitions except FILES

instantTime));
}

Option<HoodieTableMetadataWriter> metadataWriterOpt = getMetadataWriter(instantTime, metaClient);
Contributor Author

starting a commit in MDT

context.setJobStatus(this.getClass().getSimpleName(), "Committing stats: " + config.getTableName());
List<HoodieWriteStat> writeStats = writeStatuses.map(WriteStatus::getStat).collect();
return commitStats(instantTime, HoodieJavaRDD.of(writeStatuses), writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds, extraPreCommitFunc);
// writeStatuses is a mix of data table write status and mdt write status
Contributor Author

triggering the dag for both data table and mdt


protected void commit(HoodieTable table, String commitActionType, String instantTime, HoodieCommitMetadata metadata,
List<HoodieWriteStat> stats, HoodieData<WriteStatus> writeStatuses) throws IOException {
List<HoodieWriteStat> dataTablestats, List<HoodieWriteStat> mdtStats, HoodieData<WriteStatus> writeStatuses) throws IOException {
Contributor Author

wrapping up the commit in DT.

try {
// wraps the commit in MDT
// todo: do we need to add completion time to MDT or not yet.
metadataWriterMap.get(instantTime).get().writeToFilesPartitionAndCommit(instantTime, context, mdtStats, metadata);
Contributor Author

writing to FILES partition in MDT and committing in MDT

getWriteClient().startCommitWithTime(instantTime, HoodieTimeline.DELTA_COMMIT_ACTION);
}

public void writeToFilesPartitionAndCommit(String instantTime, HoodieEngineContext context, List<HoodieWriteStat> partialMdtWriteStats, HoodieCommitMetadata metadata) {
Contributor Author

writing to FILES partition

Member

we can break up the writeXXX method and committing? i.e call both from layer above.

}

@Override
public JavaRDD<WriteStatus> upsertPreppedPartialRecords(JavaRDD<HoodieRecord<T>> preppedRecords, String instantTime, boolean initialCall,
Contributor Author

upsert prepped partial

private final transient Random random;

public WriteStatus(Boolean trackSuccessRecords, Double failureFraction) {
public WriteStatus(Boolean trackSuccessRecords, Double failureFraction, Boolean isMetadataTable) {
Contributor Author

writeStatus to hold isMetadata flag.

Member

all the naming needs to be revisited.. not adding any code comments for now

table.getMetaClient().reloadActiveTimeline();
}
compactionTimer = metrics.getCompactionCtx();
// start commit in MDT if enabled
Contributor Author

compaction changes starts here


public void commitCompaction(String compactionInstantTime, HoodieWriteMetadata<O> compactionWriteMetadata, Option<HoodieTable> tableOpt,
Option<HoodieTableMetadataWriter> metadataWriterOpt) {
// dereferencing the write dag for compaction for the first time.
Contributor Author

committing compaction.
here is where (processAndFetchHoodieWriteStats) we trigger the DAG, which ends up writing to both the data table and the metadata table.

Option<HoodieTableMetadataWriter> metadataWriterOpt) {
// dereferencing the write dag for compaction for the first time.
Pair<List<HoodieWriteStat>, List<HoodieWriteStat>> dataTableAndMetadataTableHoodieWriteStats = processAndFetchHoodieWriteStats(compactionWriteMetadata);
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(true);
Contributor Author

here is where the commit metadata is prepared for Compaction

List<HoodieWriteStat> updateStatusMap = statuses.map(WriteStatus::getStat).collectAsList();

// we are triggering the dag here.
// thinking if we can keep the RDD<WriteStatus> as is and dereference it in BaseHoodieTableServiceClient just before complete Compaction.
Contributor Author

we are removing collect() from here

@danny0405

High-level, it seems we introduce many MDT-specific APIs and components that I don't like, just to avoid the Spark DAG retries; let's try to avoid this. Also think through the effect on other engines like Flink and Java.


@vinothchandar vinothchandar left a comment


@nsivabalan Thanks for this.

On open items:

  • I don't think deprecating autocommit is a requirement per se. If we think, this can be implemented reliably regardless, then we can revisit that later.
  • We need to do this with row writer code path handled. Cannot special case that. Can we start looking into what those changes will be like.
  • Can we add a table for each write operation and how MT is updated for them. Love to get a comprehensive discussion on that before we go ahead. for e.g insert_overwrite on DT translates to _______. on MT..

Reviewing code itself now.

public void close() {
stopEmbeddedServerView(true);
// close all metadata writer instances
metadataWriterMap.entrySet().forEach(entry -> {
Member

So if a writeClient is reused across writes, these MDTWriter instances will hang around till close()?

@vinothchandar

vinothchandar commented Nov 13, 2024

@danny0405 Code/APIs aside.

Actually this is a general idea, not specific to Spark retry handling. I think this design is lot more friendly to pure streaming engines like Flink. Don't you think? otherwise how'd we update larger MT partitions like RLI from Flink.


@vinothchandar vinothchandar left a comment


Left some comments.. overall - lets flesh all open items out. and work through how this interplays with Flink (or) keep that unaffected.

compactionTimer = metrics.getCompactionCtx();
// start commit in MDT if enabled
Option<HoodieTableMetadataWriter> metadataWriterOpt = getMetadataWriterFunc.apply(compactionInstantTime, table.getMetaClient());
if (metadataWriterOpt.isPresent()) {
Member

Could this be not done in a single place - where we start the commit for the write operation? i.e whoever generates compactionInstantTime

Contributor Author

Table services are done this way (differently from ingestion commits) because the scheduling and execution can happen separately. With the MDT, if we start the commit during compaction scheduling in the data table and defer the execution until later, some other thread in the MDT could detect failed heartbeats for the corresponding DC in the MDT and trigger a rollback. So, for data table table services, we defer starting the DC in the MDT until execution of the table service starts. That way the heartbeats are continuous, and if anything fails mid-way it gets lazily rolled back.

But I wanted to jam on something here: can we completely disable auto rollbacks in the MDT? The data table writer is the only one that can trigger rollbacks for the current commit it is dealing with.

What this means is:
When an ingestion commit in DT fails mid-way in MDT:

  • the resp. DC in the MDT stays inflight until the rollback of the data table kicks in. When the rollback in the data table reaches the MDT layer, it rolls back as usual.

For compaction and clustering:

  • Compaction in the DT fails mid-way while writing to the resp. DC in the MDT. This stays inflight until the next attempt of DT compaction resumes. On resuming, Hudi triggers a rollback of the compaction commit in the DT, which then gets applied to the MDT as well, i.e., it rolls back the compaction commit. Then the second attempt of the DT compaction goes through, which in turn gets applied as a DC in the MDT.

So, if any table service or ingestion commit stays inflight in the data table for a long duration, this could also mean an inflight commit hanging around in the MDT.

finalizeWrite(table, compactionCommitTime, writeStats);
// commit to data table after committing to metadata table.
writeTableMetadata(table, compactionCommitTime, metadata, context.emptyHoodieData());
// write to MDT FILES partition and commit
Member

we need to nicely compartmentalize all this MT vs DT write stat splitting and committing.. so we don't change every method call path.. Will take some work

Member

Kind of what @danny0405 is pointing to

return Option.empty();
}

public void maybeDeleteMetadataTable(HoodieTableMetaClient metaClient) {
Member

we should simplify all this enabled/disable business for MDT. just turn it on by default and be the only mode, now that Flink is also turned on

// wraps the commit in MDT
// todo: do we need to add completion time to MDT or not yet.
metadataWriterMap.get(instantTime).get().writeToFilesPartitionAndCommit(instantTime, context, mdtStats, metadata);
metadataWriterMap.get(instantTime).get().close();
Member

answered my previous q here.

private final transient Random random;

public WriteStatus(Boolean trackSuccessRecords, Double failureFraction) {
public WriteStatus(Boolean trackSuccessRecords, Double failureFraction, Boolean isMetadataTable) {
Member

all the naming needs to be revisited.. not adding any code comments for now

}
}

protected void attachColStats(HoodieWriteStat stat, List<HoodieRecord> recordList, List<Schema.Field> fieldsToIndex,
Member

needs to be done on-the-fly

getWriteClient().startCommitWithTime(instantTime, HoodieTimeline.DELTA_COMMIT_ACTION);
}

public void writeToFilesPartitionAndCommit(String instantTime, HoodieEngineContext context, List<HoodieWriteStat> partialMdtWriteStats, HoodieCommitMetadata metadata) {
Member

we can break up the writeXXX method and committing? i.e call both from layer above.

import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX;

public class MetadataIndexGenerator implements Serializable {
Member

we need some sort of interface for generating index record for each type of index given WriteStat..


private final List<Pair<HoodieRecordDelegate, Throwable>> failedRecords = new ArrayList<>();

private boolean isMetadataTable;
Member

Does this track col-stats already? I guess yes

HoodieWriteConfig clientConfig,
Option<EmbeddedTimelineService> timelineService) {
super(context, clientConfig, timelineService);
Option<EmbeddedTimelineService> timelineService,
Member

@danny0405 can you put together or have a diagram that shows the class dependencies in Flink.. i.e what code runs on Driver/Coordinator vs Executor. classes involved etc.

I want to make sure this works for Flink easily as well

Contributor

The write to MDT all happens on the Driver/coordinator, the commit also happens on Driver for DT, the actual write happens on executors.

@danny0405

danny0405 commented Nov 14, 2024

@danny0405 Code/APIs aside.

Actually this is a general idea, not specific to Spark retry handling. I think this design is lot more friendly to pure streaming engines like Flink. Don't you think? otherwise how'd we update larger MT partitions like RLI from Flink.

I don't see any gains from this change for Flink; there is still no chance to plug in the Flink RLI operators on the fly flexibly. Actually, the union of the DT/MDT RDDs makes the pipeline even more complex.


@yihua yihua left a comment


Closing this PR as the streaming write DAG for MDT was released in Hudi 1.1, with changes merged in a sequence of PRs including #13292 #13295 #13305 #13307 #13402 #13491

@yihua yihua closed this Dec 17, 2025