Skip to content

feat(taskbroker): Batch Claimed to Processing Updates#654

Open
george-sentry wants to merge 4 commits into
mainfrom
george/push-taskbroker/batched-claimed-to-processing-updates
Open

feat(taskbroker): Batch Claimed to Processing Updates#654
george-sentry wants to merge 4 commits into
mainfrom
george/push-taskbroker/batched-claimed-to-processing-updates

Conversation

@george-sentry
Copy link
Copy Markdown
Member

Linear

Refs STREAM-920

Description

After a "claimed" activation is pushed successfully, it needs to be updated to "processing." Right now this happens one at a time, which is a performance bottleneck. This PR adds optional batching for these updates, which adds several thousand additional tasks per second on my typical testing workload (100 millisecond dummy tasks).

Concerning data loss, what if taskbroker crashes while these updates are queued? Those updates are lost, meaning even though those tasks were claimed and sent successfully, they weren't updated to processing. Three things can happen here...

  • If the worker completes a task fast enough, it will be updated to "completed" directly from "claimed" and later removed by upkeep ✅
  • If the worker doesn't complete a task fast enough, it may be reset back to "pending" before the worker completes it. In that case, it will be updated from "pending" directly to "completed" and later removed by upkeep ✅
  • If the worker doesn't complete a task fast enough and the taskbroker has enough time to reclaim and / or resend it again, the task will be completed twice ✅ ✅

In all cases, tasks aren't dropped but may be repeated.

@george-sentry george-sentry requested a review from a team as a code owner May 29, 2026 06:27
@linear-code
Copy link
Copy Markdown

linear-code Bot commented May 29, 2026

STREAM-920

Comment thread src/push/updater.rs
Comment thread src/push/updater.rs
Comment thread src/push/thread.rs
Comment thread src/push/updater.rs Outdated
Comment thread src/push/updater.rs
Comment thread src/push/tests.rs Outdated
Comment thread src/push/updater.rs Outdated
Comment thread src/push/thread.rs Outdated
Comment thread src/push/updater.rs Outdated

// Make sure we aren't flushing too soon
let now = Utc::now().timestamp_millis();
let elapsed = self.last_flush.timestamp_millis() - now;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be now - last_update?

Comment thread src/push/mod.rs Outdated
Comment on lines +62 to +65
let updaterd = tokio::spawn({
let updater = updater.clone();
async move { updater.start().await }
});
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do the push threads append to this updater?

Base automatically changed from george/push-taskbroker/add-push-thread-abstraction to main May 29, 2026 17:31
@george-sentry george-sentry force-pushed the george/push-taskbroker/batched-claimed-to-processing-updates branch from c78e0a7 to 814e9cd Compare May 29, 2026 17:50
Comment thread src/push/updater.rs Outdated
Comment thread src/push/updater.rs
Copy link
Copy Markdown

@cursor cursor Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Fix All in Cursor

❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.

Reviewed by Cursor Bugbot for commit 6d6f61d. Configure here.

Comment thread src/push/mod.rs Outdated
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants