
[HUDI-4521] Ensure flink coordinator do re-commit during restart for changing write.tasks number (i.e., write parallelism)#6273

Open
YuweiXiao wants to merge 2 commits into apache:master from YuweiXiao:HUDI-4521

Conversation

@YuweiXiao
Contributor


What is the purpose of the pull request

Patch to ensure StreamWriteOperatorCoordinator does the re-commit during restart when the write.tasks parameter changes.

For details, please check out HUDI-4521.

Brief change log

  • Persist write.tasks parameter (i.e., parallelism) in writers' ckp states.
  • Include parallelism and total event number in the bootstrap metadata event.
  • Use parallelism & total event number to determine whether or not to do the re-commit, rather than relying on the slot number (i.e., the current write.tasks setup).
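The decision described in the last bullet can be sketched as follows. This is a minimal illustration, not Hudi's actual implementation: the class RecommitCheck, the method shouldRecommit, and the lastRunParallelism parameter are hypothetical names.

```java
import java.util.Arrays;

public class RecommitCheck {
  // Each writer reports how many metadata states it restored from its
  // checkpointed state (0 for a fresh task after a parallelism increase).
  static boolean shouldRecommit(int lastRunParallelism, int[] numOfMetadataStates) {
    int total = Arrays.stream(numOfMetadataStates).sum();
    // Re-commit only when every metadata state of the last run is present,
    // regardless of how many slots the current run happens to have.
    return total == lastRunParallelism;
  }

  public static void main(String[] args) {
    // Last run used 4 writers; current run has 8 slots, 4 of them empty.
    System.out.println(shouldRecommit(4, new int[]{1, 1, 1, 1, 0, 0, 0, 0})); // true
    // One writer's flush from the last run is missing: do not re-commit.
    System.out.println(shouldRecommit(4, new int[]{1, 1, 1, 0, 0, 0, 0, 0})); // false
  }
}
```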

Verify this pull request

Unit tests are added to verify the changes.

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

ValidationUtils.checkArgument(Arrays.stream(eventBuffer).allMatch(evt -> evt != null && evt.isBootstrap()));
List<WriteMetadataEvent> events = Arrays.stream(eventBuffer).filter(evt -> !evt.getInstantTime().equals("")).collect(Collectors.toList());
String instant = events.stream().map(WriteMetadataEvent::getInstantTime).reduce((a, b) -> a.equals(b) ? a : "").orElse("");
// instant and parallelism should be unique
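The reduce step in the snippet above collapses the instant times to their common value, or to "" when any two disagree. A standalone demo of that trick (class and method names here are mine, not Hudi's):

```java
import java.util.List;

public class InstantReduceDemo {
  // Returns the shared instant time, or "" if the events disagree or the list is empty.
  static String uniqueInstant(List<String> instants) {
    return instants.stream()
        .reduce((a, b) -> a.equals(b) ? a : "")
        .orElse("");
  }

  public static void main(String[] args) {
    System.out.println(uniqueInstant(List.of("20220802", "20220802"))); // 20220802
    System.out.println(uniqueInstant(List.of("20220802", "20220803"))); // empty string
  }
}
```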
Contributor


The fix may be right but there are some thoughts about code engineering:

  1. we should not put static config options in WriteMetadataEvent which is a dynamic metadata event from write task.

  2. do we really need to care about the parallelism of last run ? Can we just merge the bootstrap events first before sending it to the coordinator ?

Contributor Author


We have to persist the parallelism somewhere because it is used to validate whether all flushes from the last run completed, and it may change across runs. So I chose the most straightforward place to store it (i.e., in the metadata state). Another option is to let the user pass in the parallelism of the last run through configs, which is not user-friendly because users would need to understand the logic behind it.

Merging bootstrap events before sending is a good point! I will improve it, and it seems we could remove the transient variable mergeCount based on it.

Contributor

@danny0405 danny0405 Aug 2, 2022


We have to persist the parallelism somewhere because it is used to validate if all flushes in the last run complete. And it may change across runs

Did you notice that we send an empty bootstrap event even if there is nothing in the metadata state? There are two cases where the metadata state is empty:

  1. the first run of the app
  2. the parallelism is increased, say from 4 to 8

And in these cases, we still send a bootstrap event to the coordinator so that it can be used for validation.

Contributor Author

@YuweiXiao YuweiXiao Aug 2, 2022


Yes, and those empty events will be excluded because their instant time is empty.

Let's say we increase write.tasks from 4 to 8. During restart, we will receive 8 metadata events (with 4 empty bootstrap events, and potentially more empty ones if some tasks failed to flush in the last run). To know whether the remaining non-empty metadata events are the full collection of events from the last run, we have to check against the parallelism of the last run (i.e., 4) rather than the new parallelism (i.e., 8).
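The 4-to-8 scenario can be illustrated with a toy contrast between the two checks. This is a hedged sketch with invented names (OldVsNewCheck, oldCheck, newCheck); it ignores the merged-event case and only models instant-time emptiness:

```java
import java.util.Arrays;

public class OldVsNewCheck {
  // Old-style logic: commit when every slot of the CURRENT run reported a
  // non-empty bootstrap event; this can never pass after the parallelism grows.
  static boolean oldCheck(String[] instants) {
    return Arrays.stream(instants).noneMatch(String::isEmpty);
  }

  // New-style logic: count the non-empty events against the LAST run's parallelism.
  static boolean newCheck(String[] instants, int lastRunParallelism) {
    return Arrays.stream(instants).filter(s -> !s.isEmpty()).count() == lastRunParallelism;
  }

  public static void main(String[] args) {
    // write.tasks went 4 -> 8: four real events plus four empty ones.
    String[] instants = {"t", "t", "t", "t", "", "", "", ""};
    System.out.println(oldCheck(instants));    // false: the re-commit would be skipped
    System.out.println(newCheck(instants, 4)); // true: all 4 events of the last run arrived
  }
}
```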

Contributor


The operator state backend spreads the events evenly across the write tasks, and I want to note that the empty bootstrap event is also valid for coordinator validation.

But here is a problem: when we decrease the parallelism, one write task may hold several bootstrap events. We should merge these events before sending them to the coordinator, or the coordinator may commit eagerly before it receives all the bootstrap events.
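The "merge before send" idea can be sketched like this. The Event class below is a deliberately minimal stand-in, not Hudi's WriteMetadataEvent, and the numOfMetadataState field is an assumption modeled on the accessor quoted elsewhere in this review:

```java
import java.util.ArrayList;
import java.util.List;

public class BootstrapMerge {
  static class Event {
    final String instant;
    final List<String> writeStatuses;
    final int numOfMetadataState; // how many last-run states this event covers

    Event(String instant, List<String> writeStatuses, int numOfMetadataState) {
      this.instant = instant;
      this.writeStatuses = writeStatuses;
      this.numOfMetadataState = numOfMetadataState;
    }
  }

  // Collapse all events a task restored into one before sending it upstream.
  static Event merge(List<Event> restored) {
    String instant = restored.stream().map(e -> e.instant)
        .reduce((a, b) -> a.equals(b) ? a : "").orElse("");
    List<String> statuses = new ArrayList<>();
    restored.forEach(e -> statuses.addAll(e.writeStatuses));
    int total = restored.stream().mapToInt(e -> e.numOfMetadataState).sum();
    return new Event(instant, statuses, total);
  }

  public static void main(String[] args) {
    // Parallelism went 8 -> 4: this task restored two of the old states and
    // sends a single merged event, so the coordinator cannot commit early.
    Event merged = merge(List.of(
        new Event("20220803000000", List.of("s0"), 1),
        new Event("20220803000000", List.of("s1"), 1)));
    System.out.println(merged.numOfMetadataState); // 2
  }
}
```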

Contributor Author


Yes, that is why I added the eventNumOfTask/mergeCount variables and changed the eager-commit behavior. Following your 'merge before send' suggestion, mergeCount is no longer needed.

Contributor


Yes, merging events before sending them to the coordinator makes the logic much simpler.

@danny0405 danny0405 added the priority:medium Moderate impact; usability gaps label Aug 3, 2022
@danny0405 danny0405 self-assigned this Aug 3, 2022
@YuweiXiao
Contributor Author

@hudi-bot run azure

@hudi-bot
Collaborator

hudi-bot commented Aug 3, 2022

CI report:

Bot commands: @hudi-bot supports the following command:
  • @hudi-bot run azure: re-run the last Azure build

return Option.empty();
}

int totalNumOfMetadataStates = events.stream().mapToInt(WriteMetadataEvent::getNumOfMetadataState).sum();
Contributor


We can handle the bootstrap events just as before, because we already merge them before sending.

Contributor Author


What does "before" mean here? We have to validate the number of events against the parallelism of the last run.

Contributor


we have to validate number of events against parallelism of the last run.

There is no need to validate it, the checkpoint mechanism would ensure the integrity of the metadata events.

Contributor Author


Hey Danny, it is true that the validation is not necessary if we use the same write.tasks across runs. But that may not be the case when users change the write parallelism across runs.

For example, when the user reduces the parallelism from 8 to 4, the coordinator will receive 4 merged events with the merging optimization you proposed. To determine whether we can do the re-commit, we need to validate the events against 8 rather than only checking that all 4 event buffers are non-empty. This is also why I now persist the parallelism in the writer state, since it may be needed in the next run.

I am not sure whether there is a better solution, and the fix may not be helpful in most cases. But for use cases where the source purely relies on the Flink checkpoint to determine the consumption offset, mis-handling the re-commit during restart may cause the loss of one batch of data on the Hudi side (or duplicate data).

Contributor Author


And in the case where we increase the parallelism (e.g., 4 to 8), empty bootstrap events will be sent by the writers. Following the old logic, the re-commit may be skipped because the last message received by the coordinator may be an empty event rather than one carrying an instant time.

Contributor


we need to validate the events against 8 rather than only checking if all 4 event buffer are not empty

We do not need to, because we know that each write task sends exactly one event.

Contributor Author

@YuweiXiao YuweiXiao Aug 15, 2022


Hello, is there any progress on this PR?

Hey, sorry for the late reply. Could you help me understand why the validation is not necessary? The events sent by the writers could be empty or carry metadata, which should be normal when the parallelism changes.

@danny0405
Contributor

Hello, is there any progress on this PR?

@danny0405 danny0405 added engine:flink Flink integration writer-core labels Aug 15, 2022
@github-actions github-actions bot added the size:M PR with lines of changes in (100, 300] label Feb 26, 2024