
[SPARK-56023][SS] Better load balance in LowLatencyMemoryStream#54848

Closed
eason-yuchen-liu wants to merge 5 commits into apache:master from eason-yuchen-liu:lowLatencyMemoryStreamLoadBalance

Conversation

@eason-yuchen-liu
Contributor

What changes were proposed in this pull request?

Rewrite addData to use records.size % numPartitions for better load balance across partitions.

Why are the changes needed?

Previously, `addData` balanced load across partitions only when a sequence of rows was added in a single call, because the partition was chosen from each row's index within that call. This change also balances load for one-row-at-a-time input patterns.
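
For illustration, here is a minimal sketch of the motivation, not the actual `LowLatencyMemoryStream` code. It assumes the old `addData` picked the partition as `index % numPartitions`, with `index` being the row's position within a single call. `PerCallIndexSketch`, the `Int` rows, and `partitionSizes` are hypothetical stand-ins.

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified stand-in for the stream: one buffer per partition, Int rows.
object PerCallIndexSketch {
  // Assumed old behavior: the partition comes from the row's index within this call.
  def addData(records: Array[ArrayBuffer[Int]], data: Seq[Int]): Unit = {
    val numPartitions = records.length
    data.iterator.zipWithIndex.foreach { case (item, index) =>
      records(index % numPartitions) += item
    }
  }

  // Replays a sequence of addData calls and reports how many rows each partition got.
  def partitionSizes(numPartitions: Int, calls: Seq[Seq[Int]]): List[Int] = {
    val records = Array.fill(numPartitions)(ArrayBuffer.empty[Int])
    calls.foreach(addData(records, _))
    records.map(_.size).toList
  }
}
```

Under these assumptions, one call with four rows spreads them over three partitions, while four single-row calls all land in partition 0, since `index` restarts at 0 on every call.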

Does this PR introduce any user-facing change?

No. This is a test-only source.

How was this patch tested?

CI.

Was this patch authored or co-authored using generative AI tooling?

No.

val partitionId = index % numPartitions
records(partitionId) += ((toRow(item).copy().asInstanceOf[UnsafeRow], timestamp))
data.iterator.foreach { item =>
val partitionId = records.size % numPartitions
Contributor

@HeartSaVioR HeartSaVioR Mar 18, 2026

How does this work? `records.size` is always the same (= `numPartitions`) regardless of how the events are currently distributed, right? That means `records.size % numPartitions` is always 0, so this change simply puts all the data in a single partition, the first one.

While we are here, I'd love to see a test for this.
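
The concern above can be reproduced with a small sketch. It assumes `records` is a fixed-length collection holding one buffer per partition, as the comment implies. `OuterSizeSketch` and the `Int` rows are hypothetical simplifications, not the real stream.

```scala
import scala.collection.mutable.ArrayBuffer

object OuterSizeSketch {
  // Adds rows one at a time using the expression from the first revision of the patch.
  def addOneAtATime(numPartitions: Int, rows: Seq[Int]): List[Int] = {
    val records = Array.fill(numPartitions)(ArrayBuffer.empty[Int])
    rows.foreach { item =>
      // records.size is the number of buffers (numPartitions), not the number of rows,
      // so this expression always evaluates to 0.
      val partitionId = records.size % numPartitions
      records(partitionId) += item
    }
    records.map(_.size).toList
  }
}
```

With three partitions and five rows, every row ends up in the first buffer, which is the kind of case a unit test for this change could assert against.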

Contributor Author

Thanks for catching it. My bad for making such a silly mistake. I have fixed the issue and added a new unit test. Thanks.

val partitionId = index % numPartitions
records(partitionId) += ((toRow(item).copy().asInstanceOf[UnsafeRow], timestamp))
data.iterator.foreach { item =>
val partitionId = records.map(_.size).sum % numPartitions
Contributor

nit: shall we just track the overall count separately?

Contributor Author

Sure
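
As a rough sketch of the suggestion in this thread, a running counter can replace the per-row sum over all buffers. It assumes the partition is chosen as `numRecords % numPartitions`; the thread only shows the counter being incremented, so that expression is a guess. `RoundRobinSketch`, the `Int` rows, and the two `nextPartitionBy...` helpers are hypothetical.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the stream: one buffer per partition, Int rows.
class RoundRobinSketch(numPartitions: Int) {
  private val records = Array.fill(numPartitions)(ArrayBuffer.empty[Int])
  // Total rows added so far, tracked separately from the buffers.
  private var numRecords = 0

  def addData(data: Seq[Int]): Unit = {
    data.iterator.foreach { item =>
      // Assumed selection rule: round-robin on the running count.
      val partitionId = numRecords % numPartitions
      records(partitionId) += item
      numRecords += 1
    }
  }

  // Sums every buffer's size: cost grows with the number of partitions.
  def nextPartitionBySum: Int = records.map(_.size).sum % numPartitions

  // Reads the counter: constant cost.
  def nextPartitionByCounter: Int = numRecords % numPartitions

  def partitionSizes: List[Int] = records.map(_.size).toList
}
```

As long as every write goes through `addData`, both helpers return the same partition, so the counter only saves the repeated summation and does not change where rows go.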

val timestamp = clock.getTimeMillis()
data.iterator.foreach { item =>
records(partitionId) += ((toRow(item).copy().asInstanceOf[UnsafeRow], timestamp))
numRecords += 1
Contributor

This is beyond the scope of the PR, so this comment doesn't block it from merging.

Do we have a pattern that mixes writing to a specific partition with writing without specifying a partition? We would probably need to be smarter to keep the balance for that pattern, but I agree this is sort of over-engineering, and I don't know whether we ever have that pattern.
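
To make the mixed pattern concrete, here is a hedged sketch that is not taken from the PR. It assumes explicit-partition writes do not advance the running counter. `MixedWritesSketch`, `addDataToPartition`, and `addDataLeastLoaded` are hypothetical names, and the least-loaded policy is only one possible "smarter" choice, not something this PR implements.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the stream: one buffer per partition, Int rows.
class MixedWritesSketch(numPartitions: Int) {
  private val records = Array.fill(numPartitions)(ArrayBuffer.empty[Int])
  private var numRecords = 0

  // Write without specifying a partition: round-robin on the running counter.
  def addData(data: Seq[Int]): Unit = data.foreach { item =>
    records(numRecords % numPartitions) += item
    numRecords += 1
  }

  // Hypothetical explicit-partition write; assumed not to touch the counter.
  def addDataToPartition(partitionId: Int, data: Seq[Int]): Unit =
    records(partitionId) ++= data

  // One possible "smarter" policy: send each row to the currently smallest buffer.
  def addDataLeastLoaded(data: Seq[Int]): Unit = data.foreach { item =>
    val partitionId = records.indices.minBy(i => records(i).size)
    records(partitionId) += item
  }

  def partitionSizes: List[Int] = records.map(_.size).toList
}
```

In this sketch, three explicit rows in partition 0 followed by three round-robin rows leave sizes of 4, 1, 1, whereas the least-loaded policy would give 3, 2, 1. Whether that extra bookkeeping is worth it depends on whether the mixed pattern ever occurs.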

Contributor

@HeartSaVioR HeartSaVioR left a comment

+1 pending CI

@HeartSaVioR
Contributor

@eason-yuchen-liu Looks like there is a build failure. Could you please check the CI and fix it? Thanks in advance!

@HeartSaVioR
Contributor

Thanks! Merging to master.

terana pushed a commit to terana/spark that referenced this pull request Mar 23, 2026
Closes apache#54848 from eason-yuchen-liu/lowLatencyMemoryStreamLoadBalance.

Authored-by: Yuchen Liu <170372783+eason-yuchen-liu@users.noreply.github.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>