feat(taskbroker): Dual-write new parameters_bytes #602
Conversation
…ameters (JSON)

Introduces the new parameters_bytes field on TaskActivation so tasks can carry raw bytes via msgpack, and adds the worker-side reader that prefers parameters_bytes with a fallback to the legacy JSON parameters field.

The producer dual-writes both fields by default so this rolls out in a single commit regardless of worker/broker upgrade order. The TASKBROKER_CLIENT_PARAMETERS_FORMAT env var (both|json|msgpack) narrows this once everything is on the new reader.

First step toward STREAM-882 (taskbroker passthrough mode for arbitrary Kafka topics): https://linear.app/getsentry/issue/STREAM-882

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…bytes

# Conflicts:
#	clients/python/tests/worker/test_worker.py
- upkeep.rs: move #[allow(deprecated)] from individual rstest-expanded tests (where it gets swallowed by the macro expansion) to the test module as a whole.
- task.py: black formatting.
- pyproject.toml: tell mypy to ignore missing msgpack stubs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replaces the module-wide allow with three narrow block-scoped allows exactly where the tests touch the legacy parameters field, so we keep deprecation warnings active for everything else in the test module. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
```python
should_compress = (
    self.compression_type == CompressionType.ZSTD
    or (len(msgpack_bytes) or len(json_bytes or b""))
```
Do you want the sum of the bytes in both mode?
yeah that makes sense.
| "taskname": self.name, | ||
| "topic": self._namespace.topic, | ||
| }, | ||
| len(parameters_bytes_val) or len(parameters_str), |
Bug: The parameter size metric `len(parameters_bytes_val) or len(parameters_str)` incorrectly reports only the size of `parameters_bytes_val` in BOTH mode, ignoring `parameters_str`.
Severity: MEDIUM
Suggested Fix
The metric calculation should be changed to sum the lengths of both fields to accurately reflect the total size. Modify the line to be `len(parameters_bytes_val) + len(parameters_str)`.
Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's
not valid.
Location: clients/python/src/taskbroker_client/task.py#L256
Potential issue: When `parameters_format` is `BOTH` (the default), both `msgpack` and
`json` parameter representations are sent. The metric intended to track the size of
these parameters is calculated with `len(parameters_bytes_val) or len(parameters_str)`.
Because `parameters_bytes_val` is non-empty in this mode, Python's `or` operator causes
the expression to always evaluate to `len(parameters_bytes_val)`, completely ignoring
the size of the `parameters_str` field. This results in the metric underreporting the
actual wire size by approximately 50%, which undermines the stated goal of monitoring
for message bloat.
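A minimal sketch of the short-circuit Bugbot describes (the payload contents below are made up and stand in for the real serialized parameters):

```python
# Hypothetical stand-ins for the two serialized representations.
parameters_bytes_val = b"\x81\xa3key\xa5value"  # non-empty msgpack bytes (11 bytes)
parameters_str = '{"key": "value"}'              # JSON twin (16 chars)

# `or` returns its first truthy operand, so in BOTH mode the JSON size
# is never counted.
reported = len(parameters_bytes_val) or len(parameters_str)
print(reported)  # 11, not 27

# The suggested fix counts both representations.
total = len(parameters_bytes_val) + len(parameters_str)
print(total)  # 27
```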
I guess that would be more correct, but I think changing this would cause too many artifacts in metrics, for no obvious gain.
Yeah, metrics will spike up when the both mode is used. I think what you have is fine as we mostly use this metric to understand the shape of production traffic.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Reviewed by Cursor Bugbot for commit f549b2d.
```python
    self.compression_type == CompressionType.ZSTD
    or (len(msgpack_bytes) + len(json_bytes or b""))
    > MAX_PARAMETER_BYTES_BEFORE_COMPRESSION
)
```
Auto-compression threshold effectively halved in default mode
Medium Severity
In the default BOTH mode, `should_compress` checks `len(msgpack_bytes) + len(json_bytes or b"")` against `MAX_PARAMETER_BYTES_BEFORE_COMPRESSION` (3 MB). Since both serializations are roughly the same size, the sum is ~2× a single field, so compression now triggers at ~1.5 MB per field instead of the original 3 MB. This changes the auto-compression behavior for every task using the default `ParametersFormat.BOTH`, adding compression CPU overhead for payloads that previously didn't need it. The threshold likely needs to compare each field individually, or use `max()` instead of `+`.
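For illustration, here is how the summed check compares to a per-field check under the assumption of roughly equal serialization sizes (the 3 MB constant follows the review; the 1.6 MB payload sizes are made up):

```python
MAX_PARAMETER_BYTES_BEFORE_COMPRESSION = 3 * 1024 * 1024  # 3 MB, per the review

# Hypothetical, roughly equal-sized serializations of ~1.6 MB each.
msgpack_len = json_len = 1_600_000

# Summed check (what the PR ships): BOTH mode trips at ~1.5 MB per field.
compress_summed = (msgpack_len + json_len) > MAX_PARAMETER_BYTES_BEFORE_COMPRESSION

# Per-field check (Bugbot's alternative): keeps the original 3 MB threshold.
compress_per_field = max(msgpack_len, json_len) > MAX_PARAMETER_BYTES_BEFORE_COMPRESSION

print(compress_summed, compress_per_field)  # True False
```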
we're trying to bring down the overall size of the activation so I think the new version makes more sense. on that note, I think we should just unconditionally compress everything, I think zstd is probably fast enough.
Yeah, compression being enabled earlier is desirable as we want to keep the overall size of activations low.
| "taskname": self.name, | ||
| "topic": self._namespace.topic, | ||
| }, | ||
| len(parameters_bytes_val) or len(parameters_str), |
There was a problem hiding this comment.
Yeah, metrics will spike up when the both mode is used. I think what you have is fine as we mostly use this metric to understand the shape production traffic.


Introduces the new parameters_bytes field on TaskActivation so tasks can
carry raw bytes via msgpack, and adds the worker-side reader that prefers
parameters_bytes with a fallback to the legacy JSON parameters field.
The producer dual-writes both fields by default so this rolls out in a
single commit regardless of worker/broker upgrade order. This, I think,
is a good default for self-hosted, as that effectively makes workers and
clients forwards- and backwards-compatible, and we can delete
`parameters` one month from now.
In prod, however, I think this will bloat activations too much, particularly
for activations that are already at the brink of being too large for Kafka.
So the actual rollout plan there would be:
- set the TASKBROKER_CLIENT_PARAMETERS_FORMAT=json env var for all getsentry and launchpad pods
- switch to TASKBROKER_CLIENT_PARAMETERS_FORMAT=msgpack a few days after

ref https://linear.app/getsentry/issue/STREAM-882
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
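Putting the description together, the dual-write/read scheme could look roughly like the sketch below. The field names (`parameters_bytes`, `parameters`) and the env var follow the PR; everything else (function names, the dict-based activation) is illustrative, not the actual client code.

```python
import json
import os

import msgpack  # third-party dependency, per the PR (mypy stubs ignored)

# Format selection follows the PR's env var; "both" is the dual-write default.
FORMAT = os.environ.get("TASKBROKER_CLIENT_PARAMETERS_FORMAT", "both")


def build_activation_fields(params: dict) -> dict:
    """Producer side: write parameters_bytes, legacy parameters, or both."""
    fields = {}
    if FORMAT in ("both", "msgpack"):
        fields["parameters_bytes"] = msgpack.packb(params)
    if FORMAT in ("both", "json"):
        fields["parameters"] = json.dumps(params)
    return fields


def read_parameters(activation: dict) -> dict:
    """Worker side: prefer parameters_bytes, fall back to legacy JSON."""
    raw = activation.get("parameters_bytes")
    if raw:
        return msgpack.unpackb(raw)
    return json.loads(activation["parameters"])
```

In "both" mode this round-trips either way: a new worker reads the msgpack bytes, while an old worker that only knows `parameters` still finds valid JSON, which is what makes the single-commit rollout order-independent.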