
[HUDI-6528] Fix premature RDD unpersist during index lookup #9188

Draft · wants to merge 1 commit into base: master

Conversation

@xushiyan (Member) commented Jul 13, 2023

Change Logs

Currently, when the bloom/simple index tags locations for input records, the incoming RDDs are supposed to be cached (by default), but rdd.unpersist() was invoked prematurely, rendering the caching ineffective. This PR fixes the behavior by marking the cached RDDs for uncaching at the SparkRDDWriteClient#releaseResources stage.
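
To illustrate the intended lifecycle, here is a minimal sketch using plain Spark APIs. The CachedRddTracker class below is hypothetical and not part of Hudi; it only mirrors the idea of deferring unpersist() to a releaseResources-style stage instead of calling it inside the index lookup.

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.storage.StorageLevel;

import java.util.ArrayList;
import java.util.List;

// Hypothetical tracker: the incoming records RDD stays cached across the
// tagging and write stages and is released in one place at the end, instead
// of being unpersisted inside the index lookup itself.
class CachedRddTracker {
  private final List<JavaRDD<?>> cached = new ArrayList<>();

  <T> JavaRDD<T> cache(JavaRDD<T> rdd, StorageLevel level) {
    rdd.persist(level);   // cache for reuse by later stages
    cached.add(rdd);      // remember it so it can be released later
    return rdd;
  }

  // Analogue of the SparkRDDWriteClient#releaseResources stage mentioned
  // above: the only place where unpersist() is called.
  void releaseAll() {
    cached.forEach(rdd -> rdd.unpersist(false));
    cached.clear();
  }
}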

Impact

Indexing performance

Risk level

Medium

  • e2e testing & verification

Documentation Update

NA

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@xushiyan closed this Jul 13, 2023
@xushiyan reopened this Jul 13, 2023
@apache deleted a comment from hudi-bot Jul 14, 2023
@@ -80,7 +81,7 @@ public O updateLocation(O writeStatuses, HoodieEngineContext context,
   @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   public abstract <R> HoodieData<HoodieRecord<R>> tagLocation(
       HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
-      HoodieTable hoodieTable) throws HoodieIndexException;
+      HoodieTable hoodieTable, Option<String> instantTime) throws HoodieIndexException;
Contributor:

This is a public API. We might have to deprecate it and add a new one if we wish to change the signature.

Member Author:

The API is marked as "Evolving", so changes are expected in a major release.

Contributor:

I get it. We do the same for key generation and other public interfaces. Let's add a new method without breaking any existing users.
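
For reference, one common non-breaking evolution path for an abstract @PublicAPIMethod looks roughly like the sketch below, placed inside the HoodieIndex abstract class. This is illustrative only, not necessarily the exact change adopted in this PR: keep the old abstract method and deprecate it, and add the new variant with a default body that delegates to it.

  // Existing abstract method kept (and deprecated) so current HoodieIndex
  // implementations keep compiling.
  @Deprecated
  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
  public abstract <R> HoodieData<HoodieRecord<R>> tagLocation(
      HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
      HoodieTable hoodieTable) throws HoodieIndexException;

  // New variant carrying an instant time; the default body ignores it and
  // delegates to the old method, so only implementations that care about the
  // instant time need to override this.
  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
  public <R> HoodieData<HoodieRecord<R>> tagLocation(
      HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
      HoodieTable hoodieTable, Option<String> instantTime) throws HoodieIndexException {
    return tagLocation(records, context, hoodieTable);
  }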

-      records.persist(new HoodieConfig(config.getProps())
-          .getString(HoodieIndexConfig.BLOOM_INDEX_INPUT_STORAGE_LEVEL_VALUE));
+    if (config.getBloomIndexUseCaching() && instantTime.isPresent()) {
+      String storageLevel = config.getString(HoodieIndexConfig.BLOOM_INDEX_INPUT_STORAGE_LEVEL_VALUE);
Contributor:

Can we move this to the constructor and use it everywhere instead of parsing it multiple times?

Member Author:

sounds good
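
For reference, a minimal self-contained sketch of that suggestion; the class and field names here are illustrative, not the actual Hudi code. The idea is to resolve the configured storage level once, in the constructor, and reuse it on every lookup.

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.storage.StorageLevel;

// Illustrative helper: the configured storage level is parsed once, in the
// constructor, instead of being re-read from the config on every
// tagLocation call.
class BloomIndexCachingHelper {
  private final StorageLevel inputStorageLevel;

  BloomIndexCachingHelper(String configuredLevel) {
    // configuredLevel would come from
    // HoodieIndexConfig.BLOOM_INDEX_INPUT_STORAGE_LEVEL_VALUE, e.g. "MEMORY_AND_DISK_SER"
    this.inputStorageLevel = StorageLevel.fromString(configuredLevel);
  }

  <T> JavaRDD<T> cacheInput(JavaRDD<T> records) {
    return records.persist(inputStorageLevel);
  }
}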

@@ -103,11 +102,6 @@ record -> new ImmutablePair<>(record.getPartitionPath(), record.getRecordKey()))
     // Step 3: Tag the incoming records, as inserts or updates, by joining with existing record keys
     HoodieData<HoodieRecord<R>> taggedRecords = tagLocationBacktoRecords(keyFilenamePairs, records, hoodieTable);

-    if (config.getBloomIndexUseCaching()) {
Contributor:

I guess this was intentional. After this, taggedRecords is what gets used, and we do cache that in BaseSparkCommitActionExecutor.execute:

  @Override
  public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(HoodieData<HoodieRecord<T>> inputRecords) {
    // Cache the tagged records, so we don't end up computing both
    JavaRDD<HoodieRecord<T>> inputRDD = HoodieJavaRDD.getJavaRDD(inputRecords);
    if (inputRDD.getStorageLevel() == StorageLevel.NONE()) {
      HoodieJavaRDD.of(inputRDD).persist(config.getTaggedRecordStorageLevel(),
          context, HoodieDataCacheKey.of(config.getBasePath(), instantTime));
    } else {
      LOG.info("RDD PreppedRecords was persisted at: " + inputRDD.getStorageLevel());
    }
    ...

So, I'm not sure we want to keep the persistence until the very end for these RDDs, which may no longer be used.

Member Author:

The main purpose of this PR is to fix premature un-persisting, like this example here.
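
As a standalone illustration of that point (plain Spark, not Hudi code): if the parent RDD is unpersisted before the derived RDD is ever materialized, the cache is never used and Spark recomputes the parent from its lineage.

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

import java.util.Arrays;

public class PrematureUnpersistDemo {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext("local[2]", "premature-unpersist-demo");

    // Analogue of the incoming records RDD that the index is asked to tag.
    JavaRDD<Integer> records = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5))
        .map(x -> x * 10)
        .persist(StorageLevel.MEMORY_ONLY());

    // Analogue of tagLocationBacktoRecords(...): a derived, still-lazy RDD.
    JavaRDD<Integer> tagged = records.map(x -> x + 1);

    // Premature unpersist: nothing has been computed yet, so the cache is
    // dropped before it is ever filled or reused.
    records.unpersist(true);

    // This action recomputes 'records' from scratch; the earlier persist()
    // bought nothing.
    System.out.println(tagged.collect());

    sc.stop();
  }
}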


@nsivabalan (Contributor) left a review:
Let's do a manual verification that both RDDs are persisted and are not unpersisted until releaseResources is invoked.
LGTM otherwise.
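
A quick way to do that manual check is sketched below using plain Spark APIs; the surrounding write-client steps (tagging, commit, releaseResources) are placeholders and not shown. The idea is to list the RDDs the SparkContext still holds persisted before and after resources are released.

import org.apache.spark.api.java.JavaSparkContext;

// Sketch of the manual verification: dump whatever the SparkContext still
// holds persisted. Run it after tagging/commit (both cached RDDs should
// appear) and again after SparkRDDWriteClient#releaseResources (they should
// be gone).
class PersistedRddInspector {
  static void dumpPersistedRdds(JavaSparkContext jsc, String label) {
    System.out.println("=== persisted RDDs " + label + " ===");
    jsc.getPersistentRDDs().forEach((id, rdd) ->
        System.out.println("rdd id=" + id + " storageLevel=" + rdd.getStorageLevel().description()));
  }
}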


@codope added the priority:critical label and removed the priority:blocker label Aug 4, 2023
@github-actions bot added the size:M (lines of changes in (100, 300]) label Feb 26, 2024