Skip to content

feat(taskworker): Add Push Mode to Taskworker#576

Merged
george-sentry merged 12 commits intomainfrom
george/push-taskbroker/add-push-mode-to-taskworker
Mar 26, 2026
Merged

feat(taskworker): Add Push Mode to Taskworker#576
george-sentry merged 12 commits intomainfrom
george/push-taskbroker/add-push-mode-to-taskworker

Conversation

@george-sentry
Copy link
Copy Markdown
Member

Linear

Completes STREAM-822

Description

Currently, taskworkers pull tasks from taskbrokers via RPC. This approach works, but has some drawbacks. Therefore, we want taskbrokers to push tasks to taskworkers instead. Read this page on Notion for more information.

This PR allows users to run the taskworker in push mode via the --push-mode and --grpc-port CLI options.

Option Type Default Description
--push-mode bool False Whether to run in push or pull mode.
--grpc-port int 50052 The port to use for the taskworker gRPC server.

Details

Dependencies

  • Upgrade sentry-protos from 0.4.11 to 0.8.5 (to use the new worker service schema)

Additions

  • Define WorkerServicer class in worker.py
  • Add push_mode and grpc_port fields to TaskWorker and TaskbrokerClient classes
  • Add push_task method to TaskWorker class
  • Add gauge method to metrics backend abstract class
  • Add unit tests for push mode

Modifications

  • Start gRPC server if push mode, enter pull loop if pull mode
  • Add _get_stub method to TaskbrokerClient to connect to brokers on the fly by using the activation callback URL

@george-sentry george-sentry requested a review from a team as a code owner March 19, 2026 00:20
@linear-code
Copy link
Copy Markdown

linear-code bot commented Mar 19, 2026

Copy link
Copy Markdown
Member

@evanh evanh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still a couple bugs from the AI, but otherwise looks good!

Copy link
Copy Markdown

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Autofix Details

Bugbot Autofix prepared a fix for the issue found in the latest run.

  • ✅ Fixed: Unused _push_mode and _grpc_port in TaskbrokerClient
    • Removed the unused self._push_mode and self._grpc_port assignments from TaskbrokerClient.__init__, eliminating the dead stores without changing behavior.

Create PR

Or push these changes by commenting:

@cursor push fb5751843e
Preview (fb5751843e)
diff --git a/clients/python/src/taskbroker_client/worker/client.py b/clients/python/src/taskbroker_client/worker/client.py
--- a/clients/python/src/taskbroker_client/worker/client.py
+++ b/clients/python/src/taskbroker_client/worker/client.py
@@ -147,8 +147,6 @@
         self._hosts = hosts
         self._rpc_secret = rpc_secret
         self._metrics = metrics
-        self._push_mode = push_mode
-        self._grpc_port = grpc_port
 
         self._grpc_options: list[tuple[str, Any]] = [
             ("grpc.max_receive_message_length", MAX_ACTIVATION_SIZE)

This Bugbot Autofix run was free. To enable autofix for future PRs, go to the Cursor dashboard.

Comment on lines +313 to +317
mock_metrics = mock.MagicMock()
taskworker._metrics = mock_metrics
mock_queue = mock.MagicMock()
mock_queue.full.return_value = False
taskworker._child_tasks = mock_queue
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have optional constructor parameters on Taskworker to make it easier to pass mocks in?

"""
Records a gauge metric (a point-in-time value).
"""
raise NotImplementedError
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll have to be careful about implementing this when sentry's client library version is updated. I don't think there are any tests in sentry that will fail because of this new abstract method.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you expand on this?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sentry (and launchpad) now have implementations of this MetricsBackend but the won't have this method defined. When the worker runtime calls metrics.gauge() it will raise as the applications won't have implemented this abstract method.

When we update applications to the new client library release, we have to remember to implement this method, or the push worker will be broken.

Copy link
Copy Markdown
Member

@evanh evanh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Except for those bugs around auth I think this PR looks good!

Copy link
Copy Markdown

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Fix All in Cursor

Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.

if getattr(_RPC_SIGNATURE_AUTH_TLS, "failed", False):
context.abort(grpc.StatusCode.UNAUTHENTICATED, "Authentication failed")

return original(request, context)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Auth bypass: context.abort() doesn't stop handler execution

High Severity

In gRPC Python's synchronous server, context.abort() does not raise an exception — it sends the error to the client but execution continues (this is a known, unresolved gRPC Python bug: grpc/grpc#30306, #37518). In RequestSignatureServerInterceptor, when HMAC verification fails, context.abort(UNAUTHENTICATED) is called but original(request, context) still executes on the next line, passing the (empty-deserialized) request to the real PushTask handler. This effectively bypasses authentication, allowing unauthenticated requests to enqueue tasks. The same pattern in WorkerServicer.PushTask is less severe but similarly relies on abort() halting execution. A return statement is needed after each context.abort() call.

Additional Locations (1)
Fix in Cursor Fix in Web

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pretty sure this is not a problem. But something to remember for the future.

@george-sentry george-sentry merged commit a035439 into main Mar 26, 2026
23 checks passed
@george-sentry george-sentry deleted the george/push-taskbroker/add-push-mode-to-taskworker branch March 26, 2026 00:47
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants