Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-1575] Early Conflict Detection For Multi-writer #6133

Conversation

zhangyue19921010
Copy link
Contributor

@zhangyue19921010 zhangyue19921010 commented Jul 18, 2022

Replaced #6059

Change Logs

Please take a look at #6003 for more details.

Impact

no impact

Risk level low

Documentation Update

Please take a look at #6003 for more details.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@yanghua
Copy link
Contributor

yanghua commented Jul 20, 2022

@zhangyue19921010 Would you please update the PR to fix the conflicts.

@zhangyue19921010
Copy link
Contributor Author

Hi @yanghua and @yihua Sorry for the late response.
Resolved conflict! PTAL :)

@zhangyue19921010
Copy link
Contributor Author

@hudi-bot run azure

@zhangyue19921010
Copy link
Contributor Author

@hudi-bot run azure

Copy link
Contributor

@yihua yihua left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally, the logic looks good and follows the design. We need to think about better code abstraction and reuse to avoid any discrepancy compared to the existing conflict detection and resolution strategy.

*/
private TypedProperties refreshLockConfig(HoodieWriteConfig writeConfig, String key) {
TypedProperties props = new TypedProperties(writeConfig.getProps());
props.setProperty(LockConfiguration.ZK_LOCK_KEY_PROP_KEY, key);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here it should check if the ZK-based lock is configured. Otherwise, it should throw an exception.

Generally, we should think about how to support different lock provider implementations. For the first cut, it may be okay to have this specific logic here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure thing Changed. Also we could mark a TODO here to support more lock provider as next step

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sg. Let's use LOCK_PROVIDER_CLASS_NAME instead of ZK_BASE_PATH_PROP_KEY for checking whether ZK-based lock is configured.

@zhangyue19921010 could you file a JIRA ticket besides the TODO because this requires more work?

}

if (earlyConflictDetectionStrategy.hasMarkerConflict()) {
earlyConflictDetectionStrategy.resolveMarkerConflict(basePath, markerDir, markerName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No exception should be thrown here at the timeline server if there is detected conflict. The timeline server should simply return false for the marker creation request and let the executor/write handle resolve the marker conflict (throw the exception).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same reason, this check is batch and async. For specific request get false result. It means maker checker find a conflict but maybe it is not current request related marker conflict.

So is it possible to let current executor to handle others' conflict based on timeline sever in async and batch mode. :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The timeline server should simply return false for the marker creation request

Totally agree with it.
For now timeline server will return false for executor request and and executor will

    if (success) {
      return Option.of(new Path(FSUtils.getPartitionPath(markerDirPath, partitionPath), markerFileName));
    } else {
      // this failed may due to early conflict detection, so we need to throw out.
      throw new HoodieEarlyConflictDetectionException(new ConcurrentModificationException("Early conflict detected but cannot resolve conflicts for overlapping writes"));
    }

* @param instants
* @return
*/
private List<String> getCandidateInstants(List<Path> instants, String currentInstantTime) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we adapt the common logic from ConflictResolutionStrategy instead of reinventing similar logic?

Copy link
Contributor Author

@zhangyue19921010 zhangyue19921010 Nov 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeap, actually there are some diff here using the same name :)
for occ getCandidateInstants which depends on a state:


    // To find which instants are conflicting, we apply the following logic
    // 1. Get completed instants timeline only for commits that have happened since the last successful write.
    // 2. Get any scheduled or completed compaction or clustering operations that have started and/or finished
    // after the current instant. We need to check for write conflicts since they may have mutated the same files
    // that are being newly created by the current write.

For current early conflict detection getCandidateInstants:

  /**
   * Get Candidate Instant to do conflict checking:
   * 1. Skip current writer related instant(currentInstantTime)
   * 2. Skip all instants after currentInstantTime
   * 3. Skip dead writers related instants based on heart-beat
   * @param instants
   * @return
   */

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also Thanks for your reviewing here! Really appreciate!

*/
private TypedProperties refreshLockConfig(HoodieWriteConfig writeConfig, String key) {
TypedProperties props = new TypedProperties(writeConfig.getProps());
props.setProperty(LockConfiguration.ZK_LOCK_KEY_PROP_KEY, key);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sg. Let's use LOCK_PROVIDER_CLASS_NAME instead of ZK_BASE_PATH_PROP_KEY for checking whether ZK-based lock is configured.

@zhangyue19921010 could you file a JIRA ticket besides the TODO because this requires more work?

long maxAllowableHeartbeatIntervalInMs = config.getHoodieClientHeartbeatIntervalInMs() * config.getHoodieClientHeartbeatTolerableMisses();

HoodieDirectMarkerBasedEarlyConflictDetectionStrategy strategy =
(HoodieDirectMarkerBasedEarlyConflictDetectionStrategy) ReflectionUtils.loadClass(config.getEarlyConflictDetectionStrategyClassName(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we can think about loading the strategy class through reflection in a common place for reuse, instead of loading for every marker creation.

@zhangyue19921010
Copy link
Contributor Author

@hudi-bot run azure

Copy link
Contributor

@yihua yihua left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. The marker APIs can be further improved which can be addressed in a separate PR. @zhangyue19921010 Good job on getting the implementation done!

@yihua
Copy link
Contributor

yihua commented Jan 20, 2023

I'm doing more thorough tests in CI. Please do not merge this PR now.

@yihua
Copy link
Contributor

yihua commented Jan 23, 2023

@hudi-bot run azure

@hudi-bot
Copy link

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

@yihua
Copy link
Contributor

yihua commented Jan 23, 2023

The Azure CI run with the feature flag turned on by default (#7703) has succeeded. The CI failure of this PR is due to flaky tests. Merging this PR.

Screen Shot 2023-01-23 at 09 37 28

@yihua yihua merged commit c18d615 into apache:master Jan 23, 2023
fengjian428 pushed a commit to fengjian428/hudi that referenced this pull request Jan 31, 2023
Before this PR, Hudi implements OCC (Optimistic Concurrency Control) to detect the write conflict at the pre-commit time to ensure data consistency, integrity, and correctness between multiple writers. OCC detects the conflict at Hudi's file group level, i.e., two concurrent writers updating the same file group are detected as a conflict.  Currently, conflict detection is performed before committing metadata and after the data writing is completed. If any conflict is detected, it leads to a waste of cluster resources because computing and writing are finished already.

To solve this problem, this change implements an early conflict detection mechanism to detect the conflict during the data writing phase and abort the writing early if a conflict is detected, using Hudi's marker mechanism. Before writing each data file, the writer creates a corresponding marker to mark that the file is created, so that later on, the writer can use the markers to automatically clean up uncommitted data files in the failure and rollback scenarios. We leverage the markers to identify the conflict at the file group level during writing data. There are subtle differences in the early conflict detection workflow among different types of markers.  For direct markers, the writer lists necessary marker files directly and checks the conflict before creating markers and starting to write the corresponding data file. For the timeline-server-based markers, the writer gets the result of marker conflict detection by contacting the timeline server. The conflict detection is asynchronously and periodically executed at the timeline server so that the write conflicts can be detected as early as possible.  Both writers may still write the data files of the same file slice until the conflict is detected in the next round of detection.

Note that, the implemented early conflict detection operates within OCC. Any conflict detection outside the scope of OCC is not handled.  For example, the current OCC for multiple writers cannot detect the conflict if two concurrent writers perform INSERT operations for the same set of record keys, because the writers write to different file groups. This set of changes does not intend to address this problem.

Co-authored-by: yuezhang <yuezhang@freewheel.tv>
Co-authored-by: Y Ethan Guo <ethan.guoyihua@gmail.com>
fengjian428 pushed a commit to fengjian428/hudi that referenced this pull request Apr 5, 2023
Before this PR, Hudi implements OCC (Optimistic Concurrency Control) to detect the write conflict at the pre-commit time to ensure data consistency, integrity, and correctness between multiple writers. OCC detects the conflict at Hudi's file group level, i.e., two concurrent writers updating the same file group are detected as a conflict.  Currently, conflict detection is performed before committing metadata and after the data writing is completed. If any conflict is detected, it leads to a waste of cluster resources because computing and writing are finished already.

To solve this problem, this change implements an early conflict detection mechanism to detect the conflict during the data writing phase and abort the writing early if a conflict is detected, using Hudi's marker mechanism. Before writing each data file, the writer creates a corresponding marker to mark that the file is created, so that later on, the writer can use the markers to automatically clean up uncommitted data files in the failure and rollback scenarios. We leverage the markers to identify the conflict at the file group level during writing data. There are subtle differences in the early conflict detection workflow among different types of markers.  For direct markers, the writer lists necessary marker files directly and checks the conflict before creating markers and starting to write the corresponding data file. For the timeline-server-based markers, the writer gets the result of marker conflict detection by contacting the timeline server. The conflict detection is asynchronously and periodically executed at the timeline server so that the write conflicts can be detected as early as possible.  Both writers may still write the data files of the same file slice until the conflict is detected in the next round of detection.

Note that, the implemented early conflict detection operates within OCC. Any conflict detection outside the scope of OCC is not handled.  For example, the current OCC for multiple writers cannot detect the conflict if two concurrent writers perform INSERT operations for the same set of record keys, because the writers write to different file groups. This set of changes does not intend to address this problem.

Co-authored-by: yuezhang <yuezhang@freewheel.tv>
Co-authored-by: Y Ethan Guo <ethan.guoyihua@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
big-needle-movers multi-writer priority:blocker writer-core Issues relating to core transactions/write actions
Projects
Status: 🚧 Needs Repro
Status: ✅ Done
Development

Successfully merging this pull request may close these issues.

5 participants