[HUDI-1517][HUDI-6758][HUDI-6761] Adding support for per-logfile marker to track all log files added by a commit and to assist with rollbacks#9553

Closed
jonvex wants to merge 24 commits intoapache:masterfrom
jonvex:rollback_mdt_mor_fix

Conversation

@jonvex
Contributor

@jonvex jonvex commented Aug 27, 2023

Change Logs

We are looking to fix two problems by adding a per-log-file marker.
a.i. MOR data table rollbacks fail to sync the original log files from a failed commit to the MDT.
a.ii. Along the same lines, if a rollback instant is retried multiple times, any log files added by failed rollback attempts should also be synced to the MDT.
b. If spurious log files are created even with successful commits, we need to ensure these spurious log files are also synced to the MDT.

So, to fix all of the above, we are adding a per-log-file marker. Any log file added or appended to will create a marker. We don't really need to distinguish between create and append, so we go with the APPEND IOType for all log file markers.
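To make the marker mechanics concrete, here is a minimal sketch (illustrative only: the `.hoodie/.temp` layout string and the helper name are assumptions, not Hudi's exact API) of how a per-log-file APPEND marker path is derived:

```java
// Illustrative sketch: a marker is a small file under the instant's temporary
// marker directory whose name encodes the target file name and the IO type.
// The layout below mirrors the common "<base>/.hoodie/.temp/<instant>" scheme
// but is an assumption, not Hudi's exact implementation.
public class LogFileMarkerSketch {
    static String markerPath(String basePath, String instantTime,
                             String partitionPath, String logFileName) {
        // Create vs. append need not be distinguished, so every log file
        // touched by the commit gets an APPEND marker.
        return basePath + "/.hoodie/.temp/" + instantTime + "/"
                + partitionPath + "/" + logFileName + ".marker.APPEND";
    }

    public static void main(String[] args) {
        System.out.println(markerPath("s3://bucket/tbl", "20230827101530000",
                "2023/08/27", ".f1_1-0-1.log.1_0-1-0"));
    }
}
```

Because the marker is written before the log file itself, a crash in between leaves a marker with no file, which the rollback planner must tolerate (see the rollback plan changes below).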

Fix for (a.i): Any log file added emits a marker. If the commit of interest failed, Hudi triggers a rollback. During rollback planning, we use the markers to identify the original log files added by the failed commit and track them as part of the rollback plan. These also get tracked in HoodieRollbackMetadata (the schema had to be upgraded for this purpose).

Fix for (a.ii): Whenever a rollback is triggered, Hudi adds a rollback command block. With this patch, we also emit markers for those log files. During rollback execution, apart from adding the log files from the failed commit to HoodieRollbackMetadata, we also add the log files that could have been added by previous rollback attempts for the same instant.

Fix for (b): During the marker-based reconciliation step, we check the log files known from markers against HoodieCommitMetadata's HoodieWriteStats. For any additional files tracked only by markers (which can happen due to Spark retries), we add a new HoodieWriteStat and update HoodieCommitMetadata, so that when it syncs to the MDT we don't fail to track these spurious log files. We will use #9545 to skip such spurious log files on the reader side. On the writer side, we just want to ensure we don't fail to track any log file created by Hudi.

Note: The reconciliation for log files is the opposite of what happens with data files. With data files, any extraneous files are deleted. But we can't afford to delete extraneous log files, since a concurrent reader could be reading the file slice of interest. During query execution, the reader will parse the log block header and skip the block if it belongs to a partially failed or inflight commit. In short, we can't afford to delete any log files at any point in time except in the cleaner. So, for any extraneous log files detected, we fix HoodieCommitMetadata to track these additional log files as well.

Notes to reviewers to assist in reviewing:

I will break down the different sets of changes and the classes to review for each.

  1. Adding a per-log-file marker for regular log files: added a callback (AppendLogWriteCallback) for this purpose since we may not know the log file name upfront (unlike data files). New APIs are introduced to the markers for this purpose.
    Check files: HoodieWriteHandle, HoodieAppendHandle, HoodieLogFormatWriter, HoodieLogFormat, HoodieLogFileWriterCallback,
    DirectWriteMarkers, TimelineServerBasedWriteMarkers, WriteMarkers, RequestHandler.
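Since the final log file name is only known when the writer actually creates (or rolls over to) a file, the callback idea can be sketched roughly like this (interface and method names are illustrative stand-ins for HoodieLogFileWriterCallback's role, not the real signatures):

```java
import java.util.ArrayList;
import java.util.List;

public class AppendCallbackSketch {
    // Invoked by the log writer right before it materializes a new log file,
    // which is the first moment the final file name is known.
    interface LogFileWriterCallback {
        boolean preLogFileCreate(String logFileName);
    }

    static class LogWriter {
        private final LogFileWriterCallback callback;
        LogWriter(LogFileWriterCallback cb) { this.callback = cb; }

        void rollOver(String newLogFileName) {
            // The marker must exist before the file, so a crash in between
            // leaves a marker without a file, which rollback planning tolerates.
            if (!callback.preLogFileCreate(newLogFileName)) {
                throw new IllegalStateException("could not create marker for " + newLogFileName);
            }
            // ... create and start writing the new log file ...
        }
    }

    public static void main(String[] args) {
        List<String> markers = new ArrayList<>();
        LogWriter w = new LogWriter(name -> markers.add(name + ".marker.APPEND"));
        w.rollOver(".f1.log.1_0-1-1");
        w.rollOver(".f1.log.2_0-1-1");
        System.out.println(markers);
    }
}
```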

  2. Schema upgrade for rollback metadata.
    Check file HoodieRollbackMetadata.avsc

  3. Rollback from DT when synced to MDT: with the schema upgrade, we will fetch "logFilesFromFailedCommit" from HoodieRollbackMetadata and make a delta commit to the MDT.
    Check files: HoodieBackedTableMetadataWriter and HoodieTableMetadataUtil.

  4. Rollback plan changes:
    When using the marker-based rollback strategy, we poll the markers to find the log files added. Apart from log file names, we also need the actual sizes, so we do a FS listing to fetch the file lengths. We track these as part of HoodieRollbackRequest.logFilesWithBlocksToRollback. Some files tracked in markers could be missing; we can ignore those, since this can legitimately happen (say the process crashed just after creating the marker file but before creating the actual log file).
    Classes to check: MarkerBasedRollbackStrategy
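A hedged sketch of this planning step (types simplified to plain maps; the real logic lives in MarkerBasedRollbackStrategy): markers give the candidate log file names, a listing gives the lengths, and markers without a matching file are skipped:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class RollbackPlanSketch {
    // Keep only markers whose log file actually exists on storage; a marker
    // without a file means the writer crashed between marker creation and
    // file creation, which rollback planning can safely ignore.
    static Map<String, Long> logFilesWithBlocksToRollback(
            Set<String> appendMarkers, Map<String, Long> listedFileLengths) {
        Map<String, Long> result = new HashMap<>();
        for (String logFile : appendMarkers) {
            Long length = listedFileLengths.get(logFile);
            if (length != null) {
                result.put(logFile, length);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Set<String> markers = new HashSet<>(Arrays.asList(".f1.log.1", ".f1.log.2"));
        Map<String, Long> listed = new HashMap<>();
        listed.put(".f1.log.1", 1024L); // .f1.log.2 was never materialized
        System.out.println(logFilesWithBlocksToRollback(markers, listed)); // {.f1.log.1=1024}
    }
}
```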

  5. New argument to StorageScheme named listStatusUnfriendly: depending on the storage scheme, the file-system-based listing used to determine the actual sizes of the log files of interest could change. As of this patch we only have one way to do this, but a follow-up patch might have to fix that.

  6. Fixing HoodieCommitMetadata to include a HoodieWriteStat for any extraneous log files discovered through markers.
    Classes to check: SparkRDDWriteClient.commit and addMissingLogFileIfNeeded(). This also includes the file system listing to fetch the actual sizes of the spurious log files, if any.
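This reconciliation boils down to a set difference between marker paths and write-stat paths; a simplified sketch (helper name and types are illustrative, not the actual addMissingLogFileIfNeeded signature):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class ReconcileSketch {
    // Log files seen in markers but absent from the commit metadata's write
    // stats (e.g. leftovers from Spark task retries) must gain a stubbed
    // write stat rather than be deleted, since readers may hold the file slice.
    static Set<String> missingLogFiles(Set<String> markerLogPaths,
                                       Set<String> writeStatPaths) {
        Set<String> missing = new HashSet<>(markerLogPaths);
        missing.removeAll(writeStatPaths);
        return missing;
    }

    public static void main(String[] args) {
        Set<String> markers = new HashSet<>(Arrays.asList("p1/.f1.log.1", "p1/.f1.log.2"));
        Set<String> stats = Collections.singleton("p1/.f1.log.1");
        System.out.println(missingLogFiles(markers, stats)); // [p1/.f1.log.2]
    }
}
```

Note that when this difference is empty (every marker path already has a write stat), no additional FS calls are needed.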

  7. Rollback execution:
    a. With this patch, we also emit markers for rollback command blocks (log files).
    b. During execution, apart from the log files added by the failed commit, we also need to fetch any log files added by previous attempts of the rollback and update HoodieRollbackStat if need be. Note that we can have only one HoodieRollbackPlan per partition.
    Classes: BaseRollbackHelper.
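A rough sketch of the bookkeeping during execution (illustrative names, not BaseRollbackHelper's actual types): log files from the failed commit and command blocks from earlier rollback attempts are folded into one tracked set per partition:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

public class RollbackExecSketch {
    // Union the log files written by the failed commit (from its markers)
    // with the rollback command blocks written by earlier rollback attempts
    // (from the rollback instant's own markers), since only one rollback
    // plan exists per partition.
    static Set<String> logFilesToTrack(Set<String> failedCommitLogFiles,
                                       Set<String> priorAttemptCommandBlocks) {
        Set<String> all = new TreeSet<>(failedCommitLogFiles);
        all.addAll(priorAttemptCommandBlocks);
        return all;
    }

    public static void main(String[] args) {
        System.out.println(logFilesToTrack(
                new HashSet<>(Arrays.asList(".f1.log.2")),
                new HashSet<>(Arrays.asList(".f1.log.3")))); // [.f1.log.2, .f1.log.3]
    }
}
```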

  8. Misc:
    HoodiePairData: add join() support. HoodieListPairData, HoodieJavaPairRDD.
    FSUtils.getFileStatusesUnderPartition to assist in fetching file statuses for a subset of log files of interest.
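The join() addition can be illustrated with plain maps (an inner join keyed by partition path; the real API operates on HoodiePairData instances such as HoodieJavaPairRDD, so this is only a semantic sketch):

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;

public class PairJoinSketch {
    // Inner join of two keyed collections: keep only keys present on both
    // sides, pairing the left and right values for each surviving key.
    static <K, V, W> Map<K, Map.Entry<V, W>> join(Map<K, V> left, Map<K, W> right) {
        Map<K, Map.Entry<V, W>> out = new HashMap<>();
        for (Map.Entry<K, V> e : left.entrySet()) {
            W w = right.get(e.getKey());
            if (w != null) {
                out.put(e.getKey(), new AbstractMap.SimpleEntry<>(e.getValue(), w));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> stats = Map.of("p1", "writeStat1", "p2", "writeStat2");
        Map<String, String> missing = Map.of("p1", ".f1.log.2");
        System.out.println(join(stats, missing).keySet()); // [p1]
    }
}
```

This mirrors how the patch pairs each partition's in-memory write stats with its missing log files before generating new write stats.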

Impact

MDT in sync with filesystem

Risk level (write none, low medium or high below)

High.
More rigorous testing will be done.

Documentation Update

N/A

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@jonvex jonvex changed the title Rollback mdt mor fix [HUDI-1517] Adding support for per-logfile marker to track all log files added by a commit and to assist with rollbacks Aug 27, 2023
@nsivabalan nsivabalan added release-0.14.0 priority:blocker Production down; release blocker labels Aug 28, 2023
@nsivabalan nsivabalan changed the title [HUDI-1517] Adding support for per-logfile marker to track all log files added by a commit and to assist with rollbacks [HUDI-1517][HUDI-6758][HUDI-6761] Adding support for per-logfile marker to track all log files added by a commit and to assist with rollbacks Aug 28, 2023
// remaining are log files generated by failed spark task, let's generate write stat for them
if (logFilesMarkerPath.size() > 0) {
// populate partition -> map (fileId -> HoodieWriteStat) // we just need one write stat per fileID to fetch some info about the file slice of interest when we want to add a new WriteStat.
List<Pair<String, Map<String, HoodieWriteStat>>> partitionToFileIdAndWriteStatList = new ArrayList<>();
Contributor

We might need to make similar changes to BaseSparkDeltaCommitActionExecutor

@nsivabalan nsivabalan force-pushed the rollback_mdt_mor_fix branch from 4be2f9e to a974474 on August 28, 2023 09:22
Member

@codope codope left a comment

Just skimmed through the patch and left some comments. We definitely need more tests; this is a core change. A few more high-level thoughts:

  1. Introducing markers for log files, including rollback blocks, would add another layer of complexity to the system, which means more code paths to maintain, test, and debug.
  2. Handling a large number of markers can become a challenge, especially during the cleanup, commit, and rollback phases. Are we doing batch creation/deletion?
  3. Perf + storage footprint: creating a marker for every log file could introduce significant overhead, slowing down the write operation. I also see some extra (but needed) list calls (per rollback) and reduceByKey operations in the current logic. Let's keep S3/GCS list/put costs in mind as well. We should benchmark performance and understand the overhead introduced by the markers, especially in high-update scenarios (2d/2e-mutable).

I would caution against landing it in 0.14.0.

// list files may bring pressure to storage with centralized meta service like HDFS.
// when we want to get only part of files under a directory rather than all files, use getStatus may be more friendly than listStatus.
// here is a trade-off between rpc times and throughput of storage meta service
private Boolean listStatusUnfriendly;
Member

Do consider cloud costs as well; list calls on cloud object storage are not cheap.

Contributor

Yes. This is to assist when we really need to fetch file statuses for just a subset of files under a given path; we could have different implementations per scheme.


private boolean createAppendMarker(HoodieLogFile logFileToAppend) {
WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime);
return writeMarkers.create(partitionPath, logFileToAppend.getFileName(), IOType.APPEND,
Member

createIfNotExists?

Contributor

Fix this

String rollbackInstantTime = createRollbackTimestamp(instantTime);
if (deltacommitsSinceCompaction.containsInstant(instantTime)) {
LOG.info("Rolling back MDT deltacommit for " + instantTime + ", since previous attempt failed");
if (!getWriteClient().rollback(instantTime)) {
Member

so a rollback in DT could trigger 2 rollbacks in MDT?

Contributor Author

If the first rollback fails, we need to attempt the rollback again. If it failed after the MDT write, then we need to rollback the changes that the first rollback did to the MDT

Contributor

Yeah, this was a miss. We can remove these explicit rollbacks since any delta commit to the MDT will be handled within commitInternal in HoodieBackedTableMetadataWriter.

if (!metadataMetaClient.getActiveTimeline().getCommitsTimeline().containsInstant(instantTime)) {

.anyMatch(writeStat -> FSUtils.isLogFile(new Path(config.getBasePath(), writeStat.getPath()).getName()));
if (hasLogFileInDeltaCommit) {
// get all log files generated by log mark file
Set<String> logFilesMarkerPath = new HashSet<>(markers.getAppendedLogPaths(context, config.getFinalizeWriteParallelism()));
Member

this will incur a list call for the marker dir (per deltacommit)

// fetch file sizes for missing log files
Path fullPartitionPath = new Path(config.getBasePath(), partitionPath);
FileSystem fileSystem = fullPartitionPath.getFileSystem(serializableConfiguration.get());
List<Option<FileStatus>> fileStatues = FSUtils.getFileStatusesUnderPartition(fileSystem, fullPartitionPath, missingLogFilesForPartition, true);
Member

Then again, that's a list status per missing file, right? Can we not change WriteMarkers#getAppendedLogPaths to return a collection of FileStatus instead of strings?

Contributor

@nsivabalan nsivabalan Aug 29, 2023

Nope, there are some minor differences.
For rollback planning, I agree we will need the FileStatus anyway.
But during reconciliation of a regular delta commit, if every marker file returned by the markers is present in WriteStatus, we will not trigger any additional FS calls.

// NOTE: Marker's don't carry information about the cumulative size of the blocks that have been appended,
// therefore we simply stub this value.
FileSystem fileSystem = table.getMetaClient().getFs();
List<Option<FileStatus>> fileStatuses = FSUtils.getFileStatusesUnderPartition(fileSystem, filePath.getParent(), Collections.singletonList(filePath.getName()), true);
Contributor

Is it better to use getFileStatus here, as we only have one file? FSUtils.getFileStatusesUnderPartition improves performance when we are trying to get more than one file under a directory.

Contributor

I have re-written some code snippets here.
We will avoid doing FS calls per log file: once we get hold of all rollbackRequests, we merge them so that we have one per file slice.


@danny0405
Contributor

I would caution against landing it in 0.14.0.

-1 for landing it in 0.14.0.

Contributor

@danny0405 danny0405 left a comment

-1 for a huge change like this to be in 0.14.0. We need enough tests, either at OneHouse or in open source.


Member

@codope codope left a comment

Thanks for adding tests. PR looks in good shape. Few more comments.


// lets join both to generate write stats for missing log files
List<Pair<String, List<HoodieWriteStat>>> additionalLogFileWriteStat = partitionToWriteStatHoodieData
.join(partitionToMissingLogFilesHoodieData)
Member

Is it possible that the inner join can drop some missing log files? I don't think this can happen on object storage with strong read-after-write consistency.

Contributor

If there is no data file produced, i.e. if a file slice is not part of the in-memory WriteStatus, I don't think there could be a scenario where we find additional missing files. These missing files are created due to Spark retries, and this targets a successful commit. So I can't think of a scenario where the in-memory write status lacks a file slice while retries created one.
For a partially failed commit it could happen, but our markers will handle that.
Good point though.

});
}

private static HoodiePairData<String, Map<String, List<String>>> getPartitionToFileIdToMissingLogFileMap(String basePathStr, Set<String> logFilesMarkerPath, HoodieEngineContext context,
Member

If I understand correctly, this method aims to convert a list of log file paths into a mapping from partition paths to a second mapping from file IDs to the corresponding missing log files. The method looks fine, but I guess there are some opportunities for optimization. Instead of first converting paths to pairs of partition paths and log file names and then reducing them, you can directly use groupByKey to group all log file paths by partition, and then the conversion from log file paths to file IDs and their associated log files can be part of the map function post-grouping.

Member

Something like below:

  private static HoodiePairData<String, Map<String, List<String>>> getPartitionToFileIdToMissingLogFileMap(
      String basePathStr,
      Set<String> logFilesMarkerPath,
      HoodieEngineContext context,
      int parallelism) {
    List<String> logFilePaths = new ArrayList<>(logFilesMarkerPath);

    return context.parallelize(logFilePaths, parallelism).mapToPair(logFilePath -> {
      Path logFileFullPath = new Path(basePathStr, logFilePath);
      String partitionPath = FSUtils.getRelativePartitionPath(new Path(basePathStr), logFileFullPath.getParent());
      return Pair.of(partitionPath, logFileFullPath.getName());
    })
        .groupByKey()
        .mapToPair(partitionAndLogFiles -> {
          String partitionPath = partitionAndLogFiles.getKey();
          Path fullPartitionPath = StringUtils.isNullOrEmpty(partitionPath) ? new Path(basePathStr) : new Path(basePathStr, partitionPath);
          Map<String, List<String>> fileIdToLogFiles = new HashMap<>();
          for (String logFile : partitionAndLogFiles.getValue()) {
            String fileId = FSUtils.getFileIdFromLogPath(new Path(fullPartitionPath, logFile));
            fileIdToLogFiles
                .computeIfAbsent(fileId, k -> new ArrayList<>())
                .add(logFile);
          }
          return Pair.of(partitionPath, fileIdToLogFiles);
        });
  }

Contributor

This looks OK to me. After this, we have to do the join anyway so that we can create a write stat for each missing file.
Instead, in the current logic, we keep everything in one join-and-map call.
Let me know if you think we have a compelling reason to go with your suggestion.


// Old marker files may be generated from base file name before HUDI-1517. keep compatible with them.
// TODO: deprecated in HUDI-1517, may be removed in the future. @guanziyue.gzy

Member

let's remove these unnecessary newlines.

* @return all the log file paths of write IO type "APPEND"
* @throws IOException
*/
public abstract Set<String> getAppendedLogPaths(HoodieEngineContext context, int parallelism) throws IOException;
Member

A unit test for this method

.withParallelism(2, 2)
.withDeleteParallelism(2)
.withAutoCommit(autoCommit)
.withEmbeddedTimelineServerEnabled(false).forTable("test-trip-table")
Member

why is timeline server disabled?

Contributor

These tests operate at lower layers like RollbackActionExecutor etc., so I did not want to set up timeline servers for them.

import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;

@Tag("functional")
public class TestHoodieSparkCopyOnWriteTableRollback extends TestHoodieSparkRollback {
Member

For all the tests here, the behavior shouldn't change, right? As this feature is only for MOR.

Contributor

Yes, these are only for MOR tables. We did not have tests for some of these flows, hence adding them now.

@hudi-bot
Collaborator

hudi-bot commented Sep 6, 2023

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

@danny0405
Contributor

I took a glimpse of the description and I'm not very approving of these changes:

  1. Adding extraneous log files to the commit metadata, either for a writer commit or for repetitive rollback attempts: somehow we should fix the file slice to shadow/hide these log files instead;
  2. The new argument to StorageScheme named listStatusUnfriendly: I don't think we should rely on the list API of the filesystem to fix the issue.


for (Pair<String, List<HoodieWriteStat>> partitionDeltaStats : additionalLogFileWriteStat) {
String partitionPath = partitionDeltaStats.getKey();
partitionDeltaStats.getValue().forEach(ws -> commitMetadata.addWriteStat(partitionPath, ws));
Contributor

So incremental queries would see these missing log files, right? If there are any valid log blocks in these files, could there be any duplication for consumers?

@nsivabalan nsivabalan added priority:critical Production degraded; pipelines stalled release-0.14.1 and removed priority:blocker Production down; release blocker release-0.14.0 labels Sep 12, 2023
@github-actions github-actions bot added the size:XL PR with lines of changes > 1000 label Feb 26, 2024
@yihua
Contributor

yihua commented Sep 11, 2024

The log file marker support is added by #11187 to branch-0.x for 0.15.0 release. Closing this PR.

@yihua yihua closed this Sep 11, 2024

Labels

priority:critical Production degraded; pipelines stalled release-0.14.1 size:XL PR with lines of changes > 1000

Projects

Status: ✅ Done

Development

Successfully merging this pull request may close these issues.

7 participants