
[HUDI-6404] Implement ParquetToolsExecutionStrategy for clustering#9006

Open
suryaprasanna wants to merge 7 commits into apache:master from suryaprasanna:parquet-tools

Conversation

@suryaprasanna
Contributor

Change Logs

This change implements a new clustering strategy that executes parquet-tools commands for file-rewriting use cases.
When pruning unused columns to reduce storage, the current clustering approach iterates over every record and strips the column out, which is very time-consuming. By invoking parquet-tools directly, the same rewrite can be achieved by running a single command within the clustering strategy.
Here, the logic goes through the usual process of creating marker files, so that on any failure the rollback's MarkerBasedRollbackStrategy can remove the inflight and parquet files.
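To make the idea concrete, here is a minimal, self-contained sketch of how a strategy could assemble a parquet-tools style prune invocation and hand it to a process runner. The binary name, subcommand, and flag names here are illustrative assumptions, not the exact tooling used in this patch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PruneCommandSketch {
  // Builds an argument list for a hypothetical parquet-tools "prune" invocation.
  // The binary name ("parquet-tools") and flags ("--drop-column") are assumptions.
  static List<String> buildPruneCommand(String srcPath, String destPath, List<String> dropColumns) {
    List<String> cmd = new ArrayList<>(Arrays.asList("parquet-tools", "prune"));
    for (String col : dropColumns) {
      cmd.add("--drop-column");
      cmd.add(col);
    }
    cmd.add(srcPath);
    cmd.add(destPath);
    return cmd;
  }

  public static void main(String[] args) {
    List<String> cmd = buildPruneCommand("/data/in.parquet", "/data/out.parquet",
        Arrays.asList("unused_col"));
    // The strategy could then run it with e.g.:
    //   new ProcessBuilder(cmd).inheritIO().start().waitFor();
    System.out.println(String.join(" ", cmd));
  }
}
```

The point of the sketch is that the rewrite happens file-to-file via one external command, instead of deserializing and re-serializing every record.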

Impact

No impact; this is the addition of a new feature.

Risk level (write none, low, medium, or high below)

None.

Documentation Update

Describe any necessary documentation update if there is any new feature, config, or user-facing change

  • The config description must be updated if new configs are added or the default value of a config is changed
  • Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the
    ticket number here and follow the instruction to make
    changes to the website.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@suryaprasanna changed the title from "HUDI-6404 Implement ParquetToolsExecutionStrategy for clustering" to "[HUDI-6404] Implement ParquetToolsExecutionStrategy for clustering" on Jun 17, 2023

private static final Logger LOG = LoggerFactory.getLogger(ParquetFileMetaToWriteStatusConvertor.class);
private final HoodieTable<T,I,K,O> hoodieTable;
private final HoodieWriteConfig writeConfig;
Contributor

HoodieTable<T, I, K, O> hoodieTable

Contributor Author

Fixed indentation.

stat.setPartitionPath(writeStatus.getPartitionPath());
stat.setPath(new Path(writeConfig.getBasePath()), parquetFilePath);
stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords());
stat.setPrevCommit(String.valueOf(executionConfigs.get("prevCommit")));
Contributor

It would be good if we could avoid hardcoding these keys.

Contributor Author

Moved to static final variables.

* Write handle that is used to work on top of files rather than on individual records.
*/
public class HoodieFileWriteHandler<T extends HoodieRecordPayload, I, K, O> extends HoodieWriteHandle<T, I, K, O> {

Contributor

HoodieFileWriteHandler -> HoodieFileWriteHandle

Contributor

I am also thinking we could name this HoodieFileRewriteHandle.


public HoodieFileWriteHandler(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, TaskContextSupplier taskContextSupplier,
Path srcPath) {
Contributor

Can we add some documentation for these parameters, especially srcPath?

Contributor Author

I think using srcPath is confusing; changed it to oldFilePath to keep it consistent with the other handles.

this.writeStatus = generateWriteStatus(path.toString(), partitionPath, executionConfigs);

// TODO: Create completed marker file here once the marker PR is landed.
// createCompleteMarkerFile throws hoodieException, if marker directory is not present.
Contributor

Not sure what these TODOs really mean; the marker file should be created anyway.

Contributor

Please add the JIRA ID here.

Contributor Author

This PR depends on the marker changes PR from Balajee. For now I have added these comments and will revert them as soon as those changes land.

Contributor Author

Added it as part of the TODO statement.

* that use parquet-tools commands.
*/
public abstract class ParquetToolsExecutionStrategy<T extends HoodieRecordPayload<T>>
extends SingleSparkJobExecutionStrategy<T> {
Contributor

Do we have any impl class that is not exclusively for testing?

Contributor Author

We have column-pruning and other encryption-related implementation classes that use parquet-tools. We will need to check with other teams before pushing them to OSS.

@danny0405 self-assigned this Jun 19, 2023
@danny0405 added the area:table-service (Table services) and engine:spark (Spark integration) labels Jun 19, 2023
private final HoodieWriteConfig writeConfig;
private final FileSystem fs;

public ParquetFileMetaToWriteStatusConvertor(HoodieTable<T, I, K, O> hoodieTable, HoodieWriteConfig writeConfig) {
Contributor

Awesome. We might need something like this to support insert_overwrite and delete_partition with RLI.

stat.setPrevCommit(String.valueOf(executionConfigs.get("prevCommit")));

writeStatus.setStat(stat);
}
Contributor

This is not populating any succeeded records. Anyway, we can build something on top of this for insert_overwrite support.

Contributor Author

We can improve this to include record keys later; for now we can keep it simple and just convert the parquet metadata to a write status.
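As an illustration of that conversion, here is a minimal sketch with simplified stand-in types; the field names mirror the stats set in this PR, but this is not the actual Hudi HoodieWriteStat/WriteStatus API.

```java
public class WriteStatusSketch {
  // Simplified stand-in for a Hudi write stat; illustrative only.
  static class Stat {
    String partitionPath;
    String path;
    long totalWriteErrors;
    String prevCommit;
  }

  // Populates a stat from file-level metadata instead of per-record bookkeeping.
  static Stat convert(String basePath, String partitionPath, String fileName,
                      long errorRecords, String prevCommit) {
    Stat stat = new Stat();
    stat.partitionPath = partitionPath;
    stat.path = basePath + "/" + partitionPath + "/" + fileName;
    stat.totalWriteErrors = errorRecords;
    stat.prevCommit = prevCommit;
    return stat;
  }

  public static void main(String[] args) {
    Stat s = convert("/tmp/base", "2023/06/17", "fg-1.parquet", 0L, "001");
    System.out.println(s.path);
  }
}
```

Because the conversion works purely off file metadata, it cannot report per-record success keys; that is exactly the limitation discussed above.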

/**
* Write handle that is used to work on top of files rather than on individual records.
*/
public class HoodieFileWriteHandler<T extends HoodieRecordPayload, I, K, O> extends HoodieWriteHandle<T, I, K, O> {
Contributor

Rename to "HoodieFileWriteHandle" (no "r" at the end). All of your other handles are named this way; let's not add an "r" at the end.

Contributor Author

Done.


// Create inProgress marker file
createMarkerFile(partitionPath, FSUtils.makeBaseFileName(this.instantTime, this.writeToken, this.fileId, hoodieTable.getBaseFileExtension()));
// TODO: Create inprogress marker here and remove above marker file creation, once the marker PR is landed.
Contributor

Can you add the JIRA number here, please?

Contributor Author

Added it.
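For context, the marker a handle creates is keyed by the base file name. A simplified sketch of how a Hudi-style base file name is composed is below; the exact format produced by FSUtils.makeBaseFileName is treated as an assumption here, for illustration only.

```java
public class BaseFileNameSketch {
  // Composes "<fileId>_<writeToken>_<instantTime><extension>", mirroring what
  // FSUtils.makeBaseFileName is assumed to produce for a base file.
  static String makeBaseFileName(String instantTime, String writeToken,
                                 String fileId, String extension) {
    return String.format("%s_%s_%s%s", fileId, writeToken, instantTime, extension);
  }

  public static void main(String[] args) {
    // A marker with this name lets MarkerBasedRollbackStrategy locate and
    // delete the matching data file on rollback.
    System.out.println(makeBaseFileName("20230617010203", "0-1-2", "fg-1", ".parquet"));
  }
}
```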

createMarkerFile(partitionPath, FSUtils.makeBaseFileName(this.instantTime, this.writeToken, this.fileId, hoodieTable.getBaseFileExtension()));
// TODO: Create inprogress marker here and remove above marker file creation, once the marker PR is landed.
// createInProgressMarkerFile(partitionPath,FSUtils.makeDataFileName(this.instantTime, this.writeToken, this.fileId, hoodieTable.getBaseFileExtension()));
LOG.info("New CreateHandle for partition :" + partitionPath + " with fileId " + fileId);
Contributor

Fix the logging; this is not a CreateHandle.

Contributor Author

Done.

* This class gives a skeleton implementation for the set of clustering execution strategies
* that use parquet-tools commands.
*/
public abstract class ParquetToolsExecutionStrategy<T extends HoodieRecordPayload<T>>
Contributor

Should we name this class EfficientParquetReWriteExecutionStrategy?

Embedding parquet-tools in the class name somehow does not sit well.

Contributor Author

Since the class runs parquet-tools commands, I thought ParquetToolsExecutionStrategy might be the better name. Renaming it to EfficientParquetReWriteExecutionStrategy would reduce the emphasis on parquet-tools. Let me know what you think.

Contributor

Taking a closer look at the patch, I don't see anything specific to parquet-tools here. Can you help me understand?
We could also name this BaseFileRewriteStrategy (or BaseFileTransformStrategy), since it takes in an old file and returns a new file; any base file format should be supported.
On top of BaseFileRewriteStrategy, we can introduce ParquetFileRewriteStrategy if need be.

Contributor

You are right; this patch only adds an interface, with no specific impl for parquet-tools.

LOG.info("Starting clustering operation on input file ids.");
List<ClusteringOperation> clusteringOperations = clusteringOps.getOperations();
if (clusteringOperations.size() > 1) {
throw new HoodieClusteringException("Expect only one clustering operation during rewrite: " + getClass().getName());
Contributor

Why throw if we have more than one clustering operation?

Contributor Author

Parquet-tools operates at the file level, so the HoodieClusteringGroups created during clustering plan creation contain one group per file group. That way the SingleSparkJobExecutionStrategy class can execute on all of these file groups in parallel.
Later, when including other tools like merge, we can relax this condition.
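As a minimal illustration of this one-file-per-group contract and the resulting parallelism (using simplified stand-in types, not the actual HoodieClusteringGroup/ClusteringOperation classes):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ClusteringGroupSketch {
  // Each "group" is just a list of file names here; the rewrite contract
  // requires exactly one file per group, mirroring the guard in the PR.
  static String rewriteGroup(List<String> filesInGroup) {
    if (filesInGroup.size() > 1) {
      throw new IllegalStateException("Expect only one clustering operation during rewrite");
    }
    return filesInGroup.get(0) + ".rewritten";
  }

  public static void main(String[] args) {
    List<List<String>> groups = Arrays.asList(
        Arrays.asList("fg-1.parquet"),
        Arrays.asList("fg-2.parquet"));
    // Because each group holds one file, all groups can be rewritten in parallel.
    List<String> out = groups.parallelStream()
        .map(ClusteringGroupSketch::rewriteGroup)
        .collect(Collectors.toList());
    System.out.println(out);
  }
}
```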

* In this method the parquet-tools command can be created and executed.
* Assuming that parquet-tools commands operate on a per-file basis, this interface allows the command to run once per file.
*/
protected abstract void executeTools(Path srcFilePath, Path destFilePath);
Contributor

executeRewrite or executeConvert

Contributor Author

Since the class is ParquetToolsExecutionStrategy, I thought executeTools might be better, since it actually runs parquet-tools commands here. Let me know what you think.

final Iterator<HoodieRecord<T>> records, final int numOutputGroups, final String instantTime,
final Map<String, String> strategyParams, final Schema schema, final List<HoodieFileGroupId> fileGroupIdList,
final boolean preserveHoodieMetadata, final TaskContextSupplier taskContextSupplier) {
return null;
Contributor

Should we throw here, then?

Contributor Author

Yeah, corrected it.

@nsivabalan added the priority:critical (Production degraded; pipelines stalled) label Jun 21, 2023
@danny0405
Contributor

If there is a use case of pruning some columns to save storage memory, current approach of clustering will iterate over every record and remove the unused column, this is so much time consuming.

Thanks @suryaprasanna, can you clarify the relationship between column pruning and clustering? In the regular notion of Hudi clustering, it only merges small file groups into larger ones, with optional sorting on columns; no pruning happens there. How does the user expect to improve efficiency with this patch overall?

@suryaprasanna
Contributor Author

suryaprasanna commented Jun 29, 2023

If there is a use case of pruning some columns to save storage memory, current approach of clustering will iterate over every record and remove the unused column, this is so much time consuming.

Thanks @suryaprasanna, can you clarify the relationship between column pruning and clustering? In the regular notion of Hudi clustering, it only merges small file groups into larger ones, with optional sorting on columns; no pruning happens there. How does the user expect to improve efficiency with this patch overall?

Clustering was initially added to do sorting and stitching, but its framework is flexible enough to accommodate a wide variety of rewriter use cases. The following are other rewriter use cases that can be done using the clustering framework:

  1. Encryption. Async encryption of data files can be done on demand by restricting the clustering group to 1, which then becomes an update of the file.
  2. Column pruning. This change can be used to run the parquet-tools prune command on unused columns to reduce the storage footprint.

@danny0405
Contributor

2. Column pruning. This change can be used to run the parquet-tools prune command on unused columns to reduce the storage footprint.

So you mean a user action like alter table drop column a, b, c may utilize this new strategy. Makes sense to me.

Contributor

@danny0405 left a comment

+1, I'm fine with the change.

@danny0405
Contributor

danny0405 commented Jul 3, 2023

Summary: Create a new ParquetToolsExecutionStrategy within Hudi clustering to support parquet-tools commands like column prune.

Reviewers: O955 Project Hoodie Project Reviewer: Add blocking reviewers!, PHID-PROJ-pxfpotkfgkanblb3detq!, #ldap_hudi

JIRA Issues: HUDI-1011

Differential Revision: https://code.uberinternal.com/D7271275
@hudi-bot
Collaborator

hudi-bot commented Jul 5, 2023

CI report:

@hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

partitionPath, fileId, taskContextSupplier, oldFilePath);

// Executes the parquet-tools command.
executeTools(oldFilePath, writeHandler.getPath());
Contributor

Can we move executeTools within HoodieFileWriteHandle?


Labels

area:table-service (Table services), engine:spark (Spark integration), priority:critical (Production degraded; pipelines stalled), size:L (PR with lines of changes in (300, 1000])

Projects

Status: 🏗 Under discussion

Development

Successfully merging this pull request may close these issues.

5 participants