[HUDI-6366] Prevent flink offline table service rerun completed instant#8950

Merged
yihua merged 3 commits into apache:master from hbgstc123:fix_rerun_complete_service_instant
Jun 15, 2023

Conversation

@hbgstc123
Contributor

If the Flink offline table service fails after committing the compaction/clustering instant and a restart strategy is enabled, the completed instant is rerun.

As a consequence, if the completed instant has already been archived, the rerun hits a file-not-found error; if it has not been archived, the rerun produces duplicate base files.

Change Logs

Check that the compaction/clustering instant is still pending in the active timeline before running the Flink offline table service.
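
A minimal sketch of this guard, using hypothetical stand-in types (`Instant`, `State`, `PendingGuard`) rather than Hudi's real timeline classes. It assumes an instant should run only if it is still present in the active timeline and not completed; an archived instant is simply absent from the list, so it is skipped as well.

```java
import java.util.List;

// Hypothetical stand-ins for illustration only; these are not Hudi classes.
enum State { REQUESTED, INFLIGHT, COMPLETED }

final class Instant {
  final String timestamp;
  final State state;

  Instant(String timestamp, State state) {
    this.timestamp = timestamp;
    this.state = state;
  }

  boolean isCompleted() {
    return state == State.COMPLETED;
  }
}

class PendingGuard {
  // True only if the instant is in the active timeline and not yet completed.
  static boolean isPending(List<Instant> activeTimeline, String instantTime) {
    return activeTimeline.stream()
        .anyMatch(i -> instantTime.equals(i.timestamp) && !i.isCompleted());
  }

  public static void main(String[] args) {
    List<Instant> active = List.of(
        new Instant("001", State.COMPLETED),
        new Instant("002", State.INFLIGHT));
    System.out.println(isPending(active, "001")); // completed: skip
    System.out.println(isPending(active, "002")); // still pending: run
    System.out.println(isPending(active, "000")); // not in active timeline (archived): skip
  }
}
```

With a check of this shape, a restarted task would return early for a completed or archived instant instead of rewriting files.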

Impact

none

Risk level (write none, low medium or high below)

none

Documentation Update

Describe any necessary documentation update if there is any new feature, config, or user-facing change

  • The config description must be updated if new configs are added or the default value of the configs are changed
  • Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the
    ticket number here and follow the instruction to make
    changes to the website.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@hbgstc123 hbgstc123 force-pushed the fix_rerun_complete_service_instant branch from 3c3905b to 379ab53 Compare June 13, 2023 02:40
@danny0405
Contributor

if not archived, then there will be duplicated base files.

How are these duplicates generated?

@hbgstc123
Contributor Author

hbgstc123 commented Jun 13, 2023

if not archived, then there will be duplicated base files.

How are these duplicates generated?

For compaction: suppose the first run compacts fileID1_timestamp1.log and fileID1_0-1-0_timestamp1.parquet and generates fileID1_0-1-0_timestamp2.parquet, and the job fails after the compaction is committed. When the job restarts, it reruns this compaction instant. The second run compacts fileID1_timestamp1.log and fileID1_0-1-0_timestamp1.parquet again but generates fileID1_0-1-1_timestamp2.parquet, and then fails to complete because the instant was already completed by the first run. The two files fileID1_0-1-0_timestamp2.parquet and fileID1_0-1-1_timestamp2.parquet are duplicates.

Clustering is similar but worse: the second run generates a parquet file with a new file ID, so subsequent reads of the table return wrong results.
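
The compaction case can be sketched as follows. The snippet assumes the `<fileId>_<writeToken>_<instantTime>.parquet` naming seen in the file names above, with the last component of the write token changing on the retried attempt; `baseFileName` is a hypothetical helper, not a Hudi API.

```java
import java.util.LinkedHashSet;
import java.util.Set;

class DuplicateBaseFiles {
  // Hypothetical helper mirroring the naming pattern quoted in this thread.
  static String baseFileName(String fileId, String writeToken, String instantTime) {
    return fileId + "_" + writeToken + "_" + instantTime + ".parquet";
  }

  public static void main(String[] args) {
    Set<String> filesOnStorage = new LinkedHashSet<>();

    // First run: the compaction commits, then the job fails.
    filesOnStorage.add(baseFileName("fileID1", "0-1-0", "timestamp2"));

    // Rerun after restart: same file group and instant time, different write token.
    filesOnStorage.add(baseFileName("fileID1", "0-1-1", "timestamp2"));

    // The names differ, so the second file does not overwrite the first.
    System.out.println(filesOnStorage);
    // prints [fileID1_0-1-0_timestamp2.parquet, fileID1_0-1-1_timestamp2.parquet]
  }
}
```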

@danny0405
Contributor

the job fail after compaction committed, then job restart and rerun this compaction instant, this second run will again compact fileID1_timestamp1.log and fileID1_0-1-0_timestamp1.parquet,

We should skip the completed instant, is that the behavior of current code then?

      // fetch the instant based on the configured execution sequence
      HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();

@danny0405 danny0405 self-assigned this Jun 14, 2023
@danny0405 danny0405 added engine:flink Flink integration area:table-service Table services usability priority:medium Moderate impact; usability gaps labels Jun 14, 2023
@hbgstc123
Contributor Author

the job fail after compaction committed, then job restart and rerun this compaction instant, this second run will again compact fileID1_timestamp1.log and fileID1_0-1-0_timestamp1.parquet,

We should skip the completed instant, is that the behavior of current code then?

      // fetch the instant based on the configured execution sequence
      HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();

Yes, but that is the client code; when a task fails and restarts, that code is not rerun.
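
To illustrate, here is a toy simulation with hypothetical names (`RestartReplay`, `planOnce`, `runTask`); it does not use Flink or Hudi APIs. It assumes the pending instant is selected once when the job is built and that every task attempt reuses that captured value, so a restart executes the instant again even though it is already completed.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class RestartReplay {
  final Set<String> completed = new HashSet<>();
  final List<String> executions = new ArrayList<>();

  // Client side: runs once at job submission and picks the first pending instant.
  String planOnce(List<String> timeline) {
    return timeline.stream()
        .filter(t -> !completed.contains(t))
        .findFirst()
        .orElse(null);
  }

  // Task side: runs on every attempt with the instant captured at submission.
  void runTask(String capturedInstant) {
    executions.add(capturedInstant); // files for the instant are written again
    completed.add(capturedInstant);  // mark the instant as committed
  }

  public static void main(String[] args) {
    RestartReplay job = new RestartReplay();
    String instant = job.planOnce(List.of("timestamp2"));

    job.runTask(instant); // attempt 1: commits, then the job fails
    job.runTask(instant); // attempt 2 after restart: planOnce is not called again

    System.out.println(job.executions.size()); // prints 2
  }
}
```

The pending-timeline filter only protects the planning step; the task itself needs its own check against the active timeline.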

Contributor

@danny0405 danny0405 left a comment

+1, nice catch ~

public void open(Configuration parameters) throws Exception {
  // no operation
  isPending = StreamerUtil.createMetaClient(conf).getActiveTimeline()
      .getInstantsAsStream().anyMatch(i -> clusteringInstantTime.equals(i.getTimestamp()) && !i.isCompleted());

There is a method named containsInstant, can we use that?

Maybe we can also move the code into run so there is no need to keep a class member isPending.

@yihua
Contributor

yihua commented Jun 14, 2023

Rebased this PR to fix build error.

@yihua yihua force-pushed the fix_rerun_complete_service_instant branch from d9be799 to 79add0e Compare June 14, 2023 23:38
@hbgstc123 hbgstc123 force-pushed the fix_rerun_complete_service_instant branch from 79add0e to 102ae95 Compare June 15, 2023 06:43
@hbgstc123 hbgstc123 force-pushed the fix_rerun_complete_service_instant branch from 102ae95 to 87974a6 Compare June 15, 2023 07:47
@yihua yihua merged commit cc33f53 into apache:master Jun 15, 2023