Skip to content

[CELEBORN-2331] Parallelize batch open stream client creation#3692

Closed
sunchao wants to merge 4 commits into
apache:mainfrom
sunchao:dev/chao/codex/port-pr72-to-oss-main
Closed

[CELEBORN-2331] Parallelize batch open stream client creation#3692
sunchao wants to merge 4 commits into
apache:mainfrom
sunchao:dev/chao/codex/port-pr72-to-oss-main

Conversation

@sunchao

@sunchao sunchao commented May 17, 2026

Copy link
Copy Markdown
Member

Why are the changes needed?

CelebornShuffleReader batches stream-open requests by worker, but it previously created the data client for each worker serially before sending those already-parallel batch requests. When a reducer reads from multiple workers, connection setup for a slow or unavailable worker can delay useful work against the remaining healthy workers.

Parallelizing this setup removes the worker-by-worker wait from the normal path. Because this changes task-side connection scheduling, the optimization also needs an operational fallback that restores the prior behavior without requiring a code rollback.

What changes were proposed in this PR?

The reader now first gathers pending stream-open locations by worker address, then creates one data client per distinct worker concurrently using the existing stream-creator pool. Once client setup completes, it sends the existing BATCH_OPEN_STREAM requests only for workers with an available client, allowing healthy workers to proceed even if another worker fails during setup.

The client-creation phase preserves the prior retry behavior for later locations on the same worker when an earlier client attempt fails. It also handles task cancellation explicitly: if the waiting Spark task is interrupted, it restores the interrupt status and cancels unfinished setup work; worker-side interruption is propagated rather than treated as an ordinary retryable failure.

This optimization is controlled by celeborn.client.spark.batch.openStream.parallelClientCreation.enabled, which defaults to true. Setting it to false selects the original serial client-creation and request-building flow, giving deployments a targeted rollback switch if parallel connection setup causes unexpected operational behavior.

How was this PR tested?

  • Unit tests for parallel client setup, failure/retry handling, cancellation on interruption, and the new configuration default and override.
  • Configuration documentation generation validation for the new client setting.
  • Spotless formatting validation.

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.

@RexXiong

Copy link
Copy Markdown
Contributor

Overall: Clean refactoring that parallelizes data-client creation across workers. The separation into group → parallel create → build request is well-structured, and the test coverage is thorough (parallel execution, failure isolation, retry, interruption).

1. Redundant retries with identical connection parameters

createClientsInParallel iterates through all locations for a given hostPort until createClient succeeds. However, all locations at the same hostPort share identical host and fetchPort, so each retry is an identical connection attempt:

while (!clientCreated && locationsIterator.hasNext) {
  val location = locationsIterator.next()
  try {
    clientsByHostPort.put(hostPort, createClient(location))  // same host:port every time
    clientCreated = true
  } catch { ... }
}

For a reducer reading 1000 partitions from a failing worker, this means up to 1000 identical connection attempts (each paying the full connection timeout). Since futures.foreach(_.get()) blocks until ALL futures complete, one slow-failing worker would dominate the entire parallel creation phase — partially defeating the purpose of parallelization.

Consider either:

  • Taking only the first location per hostPort for client creation (the original behavior was also 1 attempt)
  • Adding a configurable max retry count (e.g., 2-3 attempts per hostPort)

2. +1 to SteNicholas's suggestion for a config switch

A celeborn.client.spark.batch.openStream.parallelClientCreation.enabled (or similar) flag would be prudent for a new parallelization path. This allows users to fall back to serial behavior if unexpected issues arise in production. Could default to true.

3. Minor: onClientCreateFailure invoked per failed location

In the original code, excludeFailedFetchLocation was called at most once per failing worker. Now it can be called N times (once per location at that hostPort). While excludeFailedFetchLocation is idempotent (ConcurrentHashMap), the associated logWarning will emit N log lines for the same worker, which could be noisy for large reducers. This ties back to point 1 — bounding retries would also bound log spam.

Reviewed with Claude Code

@sunchao

sunchao commented May 25, 2026

Copy link
Copy Markdown
Member Author

@RexXiong Thanks for reviewing. The requested rollback switch is added in d44fa47 as celeborn.client.spark.batch.openStream.parallelClientCreation.enabled, defaulting to true; setting it to false restores the pre-change serial client-creation and request-building path. For the retry and warning-count point, I kept the existing semantics intentionally: before this PR, if createClient failed for a location, workerRequestMap remained empty and a later location on the same hostPort attempted createClient again, with failure handling per failed attempt. Reducing this to one attempt would be a separate behavioral change rather than preserving the existing reader behavior.

@SteNicholas

Copy link
Copy Markdown
Member

Thanks. Merged to main(v0.7.0).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants