Span Buffer Multiprocess Enhancement with Health Monitoring#78

Open
camcalaquian wants to merge 1 commit into tenki/base-6 from tenki/head-6

Conversation

@camcalaquian commented Apr 28, 2026

@camcalaquian camcalaquian marked this pull request as draft April 30, 2026 16:11
@camcalaquian camcalaquian marked this pull request as ready for review April 30, 2026 16:11

@devin-ai-integration Bot left a comment


Devin Review found 2 potential issues.

View 5 additional findings in Devin Review.

Comment on lines +253 to +256
    try:
        if isinstance(process, multiprocessing.Process):
            process.kill()
    except (ValueError, AttributeError):


🔴 isinstance check against multiprocessing.Process fails for SpawnProcess, preventing kill() of hung/crashed flusher processes

The isinstance(process, multiprocessing.Process) check on line 254 is always False for processes created via multiprocessing.get_context("spawn").Process, because SpawnProcess and multiprocessing.Process are sibling classes (both inherit from BaseProcess), not parent-child. I verified this in the repo's Python runtime: isinstance(ctx.Process(target=lambda: None), multiprocessing.Process) is False.

This is a regression from the old code at flusher.py:186-188 which simply called self.process.kill() without the isinstance guard. Now, when a flusher process crashes or hangs, the code correctly detects the unhealthy state but fails to kill the old process before spawning a new one via _create_process_for_shards. This leads to two processes simultaneously flushing the same shards, which can cause duplicate segment production to Kafka. After MAX_PROCESS_RESTARTS (10) failed restarts, the consumer crashes entirely.

Suggested change:

    try:
        if isinstance(process, multiprocessing.Process):
            process.kill()
    except (ValueError, AttributeError):

becomes

    try:
        if not isinstance(process, threading.Thread):
            process.kill()
    except (ValueError, AttributeError):
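The sibling-class relationship the reviewer describes is easy to confirm in isolation (a standalone sketch, not code from this PR):

```python
import multiprocessing
from multiprocessing.process import BaseProcess

# A process created through the spawn context is a SpawnProcess, which is a
# sibling of multiprocessing.Process (both subclass BaseProcess directly).
p = multiprocessing.get_context("spawn").Process(target=int)

print(type(p).__name__)                        # SpawnProcess
print(isinstance(p, multiprocessing.Process))  # False
print(isinstance(p, BaseProcess))              # True
```

A guard written against `BaseProcess`, or the suggested `not isinstance(process, threading.Thread)`, matches processes created via any start method.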

Comment on lines +346 to +347
    if isinstance(process, multiprocessing.Process):
        process.terminate()


🟡 isinstance check against multiprocessing.Process fails for SpawnProcess in join(), preventing terminate()

Same root cause as the kill() issue: isinstance(process, multiprocessing.Process) is False for SpawnProcess instances. In join(), after waiting for the process to stop, terminate() is never actually called. This was a pre-existing bug (the old code had the same check), but since this PR rewrote the join loop to iterate over multiple processes, it's worth fixing here. The impact is that if a flusher process doesn't exit cleanly via the stopped flag, it won't be terminated and will linger until the parent process exits (since it's a daemon process).

Suggested change:

    if isinstance(process, multiprocessing.Process):
        process.terminate()

becomes

    if not isinstance(process, threading.Thread):
        process.terminate()

@camcalaquian camcalaquian marked this pull request as draft May 5, 2026 07:59
@camcalaquian camcalaquian marked this pull request as ready for review May 5, 2026 07:59
@camcalaquian camcalaquian marked this pull request as draft May 5, 2026 08:00
@camcalaquian camcalaquian marked this pull request as ready for review May 5, 2026 08:00
@camcalaquian camcalaquian marked this pull request as draft May 5, 2026 08:04
@camcalaquian camcalaquian marked this pull request as ready for review May 5, 2026 08:04
@camcalaquian (Author) commented:

@tenki-reviewer

tenki-reviewer Bot commented May 5, 2026

Insufficient balance to process this code review. Please add funds or upgrade your plan in billing.

@camcalaquian (Author) commented:

@tenki-reviewer

tenki-reviewer Bot commented May 5, 2026

Tenki Code Review - Complete

Files Reviewed: 6
Findings: 6

By Severity:

  • 🔴 High: 2
  • 🟠 Medium: 3
  • 🟡 Low: 1

This PR refactors the span flusher from a single-process design to a multi-process architecture with per-shard load distribution. However, 6 issues were identified: a silent behavioral regression in default process scaling (default=1 instead of scaling with shards), unterminated child processes on shutdown timeout, redundant Redis INFO commands, dead code, misleading help text, and a no-op sleep in a test. The high-severity issues require fixes before merge.

Files Reviewed (6 files)
CLAUDE.md
src/sentry/consumers/__init__.py
src/sentry/spans/consumers/process/factory.py
src/sentry/spans/consumers/process/flusher.py
tests/sentry/spans/consumers/process/test_consumer.py
tests/sentry/spans/consumers/process/test_flusher.py


@tenki-reviewer Bot left a comment


Overview

This PR introduces configurable multi-process flushing for Sentry span processing, adding a new --flusher-processes CLI option and refactoring SpanFlusher to manage multiple child processes with shard-based load balancing.

Issues Found

🔴 High-Severity Issues (2)

  1. Default flusher_processes=1 silently downgrades scaling behavior (src/sentry/consumers/__init__.py)

    • Impact: Before this PR, the flusher scaled to one process per assigned shard. The new CLI option defaults to 1, causing single-process operation even when multiple shards exist.
    • Fix: Change default=1 to default=None to preserve the original "scale with shards" behavior.
  2. Processes skipped by timeout break are never terminated (src/sentry/spans/consumers/process/flusher.py)

    • Impact: If the join() timeout expires early, remaining processes are skipped by the break statement and never receive process.terminate(), potentially leaving daemon processes running after shutdown.
    • Fix: Separate the wait-and-break pass from an unconditional terminate-all pass.

🟠 Medium-Severity Issues (3)

  1. Redundant Redis INFO calls per sub-buffer (src/sentry/spans/consumers/process/flusher.py, line 299)

    • Impact: Each sub-buffer calls get_memory_info() which issues a Redis cluster INFO command. With N flusher processes, this becomes N×(cluster_node_count) redundant calls per submit.
    • Fix: Call get_memory_info() on a single buffer (all share the same Redis cluster).
  2. Dead code: _create_process_for_shard never called (src/sentry/spans/consumers/process/flusher.py, line 127)

    • Impact: Unused method with a confusingly similar name to the active _create_process_for_shards can mislead maintainers.
    • Fix: Remove the unused method.
  3. Misleading help text for --flusher-processes (src/sentry/consumers/__init__.py, line 436)

    • Impact: Help text doesn't mention the shard-count cap (min(flusher_processes, assigned_shards)), and redundantly restates the default.
    • Fix: Update to clarify actual process count and shard-count cap.

🟡 Low-Severity Issues (1)

  1. time.sleep(0.1) in test_basic is a no-op (tests/sentry/spans/consumers/process/test_consumer.py, line 62)
    • Impact: The sleep is monkeypatched away but the comment claims it gives threads time to process. Test relies on step.join(), not the (ineffective) sleep.
    • Fix: Remove the sleep or use a real sleep captured before the monkeypatch.

Code Quality Summary

  • Parameter threading: The new flusher_processes parameter flows correctly through factory → SpanFlusher → child process args, with proper type annotations.
  • Multiprocessing safety: Uses multiprocessing.Value("i", ...) for IPC, appropriate for single-writer pattern.
  • Resource management: Each process gets a fresh SpansBuffer(shards) instance, properly isolating shard assignments.
  • Test coverage: New test for process limit is construction-only; actual flushing behavior not validated.

Recommendation

Request Changes: Fix the two high-severity issues before merge. The medium/low issues should also be addressed for code quality.

    *multiprocessing_options(default_max_batch_size=100),
    click.Option(
        ["--flusher-processes", "flusher_processes"],
        default=1,


🔴 Default flusher_processes=1 silently changes existing scaling behavior (bug)

Before this PR, ProcessSpansStrategyFactory received flusher_processes=None (its own default), and SpanFlusher.__init__ handled None by setting max_processes = len(buffer.assigned_shards) — one process per shard (line 51: self.max_processes = max_processes or len(buffer.assigned_shards)). The new CLI option unconditionally passes default=1 to the factory, overriding that None-based default. Any deployment that does not explicitly pass --flusher-processes will now run the flusher with a single process regardless of shard count, potentially reducing throughput by a factor equal to the number of assigned shards.

💡 Suggestion: Either align the CLI default with the actual flusher's implicit default by using default=None (and type=int, required=False), or document the intentional change and pick a sensible non-1 default matching typical production shard counts. If the intent is to keep 1 as the new default, the SpanFlusher docstring and factory should be updated to reflect this change.

Suggested change:

    default=1,

becomes

    default=None,
📋 Prompt for AI Agents

In src/sentry/consumers/__init__.py around line 434, the new --flusher-processes Click option uses default=1, which silently overrides the existing behavior in SpanFlusher.__init__ where max_processes=None triggers self.max_processes = len(buffer.assigned_shards) (one process per shard). Change the default from 1 to None so that omitting --flusher-processes at the CLI preserves the previous scale-with-shards behavior. If default=1 is intentional (e.g. to reduce resource usage), add a prominent comment explaining the behavioral change and update the help text to say 'Defaults to one process per assigned shard if not set' rather than the current incorrect implication.
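The interaction between the CLI default and the flusher's None handling can be sketched as follows (the helper name is hypothetical; the real logic is spread across SpanFlusher.__init__ lines 51 and 60 as quoted above):

```python
def resolve_process_count(flusher_processes, assigned_shards):
    """Mirror of the flusher's sizing logic as described in the review.

    None (or 0) means "scale with shards" -- the pre-PR behavior;
    any explicit value is capped at the number of assigned shards.
    """
    max_processes = flusher_processes or len(assigned_shards)
    return min(max_processes, len(assigned_shards))

print(resolve_process_count(None, [0, 1, 2, 3]))  # 4: one process per shard
print(resolve_process_count(8, [0, 1, 2, 3]))     # 4: capped at shard count
print(resolve_process_count(2, [0, 1, 2, 3]))     # 2: explicit limit respected
```

With default=1 at the CLI, the first case can never occur, which is exactly the regression described.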

Comment on lines +340 to +347
    if remaining_time <= 0:
        break

    while process.is_alive() and (deadline is None or deadline > time.time()):
        time.sleep(0.1)

    if isinstance(self.process, multiprocessing.Process):
        self.process.terminate()
    if isinstance(process, multiprocessing.Process):
        process.terminate()


🔴 Processes skipped by timeout break are never terminated in join() (bug)

In join() (line 340-341), when remaining_time <= 0 the outer loop calls break, immediately skipping all remaining processes. The process.terminate() call at line 346 is inside the loop body after the per-process wait, so any processes whose turn never came (because break fired) are left running as daemon processes that were never explicitly terminated. With the old single-process design this was impossible; with N processes it can silently leave N-1 child processes running during a deadline-constrained shutdown.

💡 Suggestion: Separate the termination pass from the wait pass. First wait for each process up to the deadline, then in a second pass call terminate() on all multiprocessing.Process instances regardless of whether the deadline was hit.

📋 Prompt for AI Agents

In src/sentry/spans/consumers/process/flusher.py, in the join() method (lines 336-347), refactor the process-joining loop so that process.terminate() is called for all multiprocessing.Process instances even when the deadline expires early. Currently, the break on line 341 skips remaining processes entirely, leaving them without a terminate() call. Fix by splitting into two loops: one that waits (with break on deadline) and a second unconditional pass that terminates all still-alive multiprocessing.Process instances.

Comment on lines +299 to 304
    memory_infos: list[ServiceMemory] = []
    for buffer in self.buffers.values():
        memory_infos.extend(buffer.get_memory_info())
    used = sum(x.used for x in memory_infos)
    available = sum(x.available for x in memory_infos)
    if available > 0 and used / available > max_memory_percentage:


🟠 Redundant Redis INFO calls per sub-buffer in memory check (bug)

In submit() (lines 298-304), buffer.get_memory_info() is called for each sub-buffer in self.buffers. Each call invokes iter_cluster_memory_usage(self.client) which issues a Redis INFO command to every node in the cluster. Since all sub-buffers share the same underlying Redis cluster client (same SENTRY_SPAN_BUFFER_CLUSTER), N sub-buffers issue N×(cluster_node_count) INFO commands per submit() call. With 8 flusher processes and a 3-node cluster, that is 24 INFO calls where 3 are sufficient. The resulting used/available ratio is mathematically identical (both are scaled by N) so there is no wrong result, but the overhead is O(N) where it should be O(1).

💡 Suggestion: Collect memory info from a single buffer (they all query the same cluster), or deduplicate by cluster client identity before calling get_memory_info().

📋 Prompt for AI Agents

In src/sentry/spans/consumers/process/flusher.py in the submit() method around lines 298-304, replace the loop that calls buffer.get_memory_info() on each sub-buffer with a single call on the first available buffer (or any one buffer), since all sub-buffers share the same Redis cluster. This eliminates N-1 redundant Redis cluster INFO round-trips per submit() call.
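The deduplicated check could look like the following sketch (FakeBuffer stands in for SpansBuffer, and the ServiceMemory shape is assumed from the snippet above):

```python
from collections import namedtuple

ServiceMemory = namedtuple("ServiceMemory", ["used", "available"])

class FakeBuffer:
    """Stand-in for a SpansBuffer; returns fixed per-node memory stats."""
    def get_memory_info(self):
        # One entry per Redis cluster node; values are illustrative.
        return [ServiceMemory(used=80, available=100)]

def is_over_memory_limit(buffers, max_memory_percentage):
    # All sub-buffers share the same Redis cluster, so querying a single
    # one yields the same used/available ratio with 1/N the INFO calls.
    any_buffer = next(iter(buffers.values()))
    memory_infos = any_buffer.get_memory_info()
    used = sum(x.used for x in memory_infos)
    available = sum(x.available for x in memory_infos)
    return available > 0 and used / available > max_memory_percentage

buffers = {0: FakeBuffer(), 1: FakeBuffer(), 2: FakeBuffer()}
print(is_over_memory_limit(buffers, 0.75))  # True: 80/100 exceeds 0.75
```

Because used and available were previously both scaled by N, the ratio is unchanged; only the number of round-trips drops.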

Comment on lines +127 to +132
    def _create_process_for_shard(self, shard: int):
        # Find which process this shard belongs to and restart that process
        for process_index, shards in self.process_to_shards_map.items():
            if shard in shards:
                self._create_process_for_shards(process_index, shards)
                break


🟠 _create_process_for_shard is dead code, never called (bug)

The method _create_process_for_shard(self, shard: int) defined at line 127 is never invoked anywhere in the file. The actual restart path in _ensure_processes_alive calls _create_process_for_shards(process_index, shards) directly. The orphaned method has a near-identical name (_shard vs _shards) which could mislead future maintainers into thinking there are two active code paths for process restart (per-shard and per-process-group).

💡 Suggestion: Remove _create_process_for_shard if there is no planned use. If it is intended as a future API, add a comment explaining its intended future use.

📋 Prompt for AI Agents

In src/sentry/spans/consumers/process/flusher.py, remove the unused method _create_process_for_shard at lines 127-132. This method is never called anywhere in the file or codebase — the actual restart logic in _ensure_processes_alive calls _create_process_for_shards (plural) directly.

["--flusher-processes", "flusher_processes"],
default=1,
type=int,
help="Maximum number of processes for the span flusher. Defaults to 1.",


🟡 Help text for --flusher-processes is misleading about the actual process count (style)

The help string reads "Maximum number of processes for the span flusher. Defaults to 1." but SpanFlusher.__init__ computes self.num_processes = min(self.max_processes, len(buffer.assigned_shards)) (line 60). The actual number of processes spawned is therefore at most min(flusher_processes, num_assigned_shards). Describing it as 'Maximum number' is technically correct but incomplete; without mentioning the shard-count cap, operators may pass a large value expecting proportional parallelism and be surprised.

💡 Suggestion: Update the help text to clarify the shard-count cap and remove the manually-stated default. For example: "Maximum number of processes for the span flusher. Capped at the number of assigned shards. Set to 0 or omit to use one process per shard."

📋 Prompt for AI Agents

In src/sentry/consumers/__init__.py at line 436, update the help string for the --flusher-processes click.Option to accurately reflect how SpanFlusher.__init__ uses the value, mentioning the shard-count cap.

Comment on lines +62 to +63
time.sleep(0.1)



🟠 time.sleep(0.1) in test_basic is a no-op due to the active monkeypatch (bug)

At line 15, monkeypatch.setattr('time.sleep', lambda _: None) replaces time.sleep on the time module with a no-op lambda. At line 62, the test then calls time.sleep(0.1) (via import time at the top of the file), which resolves to the same patched attribute. The comment # Give flusher threads time to process after drift change is therefore misleading — no real delay occurs. The test's correctness after the drift change depends entirely on step.join() (which sets stopped=True and waits), not on this sleep.

💡 Suggestion: Either use the real sleep by importing it with a different alias before monkeypatching (e.g. from time import sleep as real_sleep before monkeypatch.setattr), or remove the sleep and rely solely on step.join(), which already waits for the background thread to exit by setting stopped=True.

📋 Prompt for AI Agents

In tests/sentry/spans/consumers/process/test_consumer.py around lines 1-62: the time.sleep(0.1) call on line 62 is a no-op because monkeypatch.setattr('time.sleep', lambda _: None) on line 15 has already replaced time.sleep on the module. Fix by either: (a) capturing real_sleep = time.sleep before the monkeypatch and using real_sleep(0.1) at line 62, or (b) removing lines 61-62 entirely and relying on step.join() (line 64) to wait for the background thread.
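The capture-before-patch pattern can be demonstrated without pytest (plain attribute assignment stands in for monkeypatch.setattr, which pytest would restore automatically on teardown):

```python
import time

real_sleep = time.sleep  # capture the real function before patching

time.sleep = lambda _: None  # what monkeypatch.setattr("time.sleep", ...) does
try:
    start = time.monotonic()
    time.sleep(0.1)  # no-op: resolves to the patched module attribute
    patched_elapsed = time.monotonic() - start

    start = time.monotonic()
    real_sleep(0.1)  # the captured reference still actually sleeps
    real_elapsed = time.monotonic() - start
finally:
    time.sleep = real_sleep  # restore the original

print(patched_elapsed < 0.01)  # True: the patched call returned immediately
print(real_elapsed >= 0.09)    # True: the captured sleep really waited
```

This is why the test's `time.sleep(0.1)` at line 62 contributes nothing: it resolves through the patched module attribute, so only step.join() provides the synchronization.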
