Skip to content

ref(spans): Extract flush_segment pipeline helpers#116149

Merged
lvthanh03 merged 1 commit into
masterfrom
tony/flush_segments_refactor
May 26, 2026
Merged

ref(spans): Extract flush_segment pipeline helpers#116149
lvthanh03 merged 1 commit into
masterfrom
tony/flush_segments_refactor

Conversation

@lvthanh03
Copy link
Copy Markdown
Member

@lvthanh03 lvthanh03 commented May 25, 2026

Refs STREAM-1001

Refactors the flush_segments to be an orchestration function with unit testable helpers:

  • _load_flush_candidates: loading ready flush candidates from queue shards
  • _acquire_locks_for_flush_candidates: acquiring per-segment flush locks
  • _load_segment_data: loading payload keys and payload bytes
  • _build_flushed_segments: building producer-ready FlushedSegment objects
  • _record_segment_loss_metrics: recording segment loss / expiration metrics

Also adds/moves datamodels (FlushCandidate, FlushedSegment, OutputSpan) into buffer_types.py alongside the existing span buffer value types.

@lvthanh03 lvthanh03 requested review from a team as code owners May 25, 2026 19:55
@github-actions github-actions Bot added the Scope: Backend Automatically applied to PRs that change backend components label May 25, 2026
Base automatically changed from tony/spans-refactor-flush to master May 26, 2026 14:08
@lvthanh03 lvthanh03 force-pushed the tony/flush_segments_refactor branch from d4c017a to 596579f Compare May 26, 2026 15:23
@lvthanh03 lvthanh03 merged commit 34d677b into master May 26, 2026
62 checks passed
@lvthanh03 lvthanh03 deleted the tony/flush_segments_refactor branch May 26, 2026 16:24
max_flush_segments = options.get("spans.buffer.max-flush-segments")
max_segments_per_shard = math.ceil(max_flush_segments / shard_factor)

flush_candidates, load_ids_latency_ms = self._load_flush_candidates(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One tip to separate observability better:
create a FlushStats class with the following methods:

def add_load_ids_latency
def add_load_data_latency_ms
def add_decompress_latency_ms
def add_segments(segment_keys, segment_to_queue, payloads)

def flush()

Create the object at the beginning of the methods and pass it to the other funcitons.
Each of the functions contributes by adding observability data.
At the end of the method you flush it once.

Now the observability code is totally outside this method.

Comment on lines +702 to +703
flush_candidates: Sequence[FlushCandidate],
loaded_segment_data: LoadedSegmentData,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tip:
you are passing two sequences that are supposed to match 1:1. This generally introduces ways this method can break and additional conditions to verify, which increase complexity: what if the two lists do not match ?
This is one of the reasons dealing with the redis api is such a pain.

I'd recommend adjusting the data structures so you have to pass only one (maybe loadedSegmentData) and ensure it contains all the objects you need from flush_candidates.

@linear-code
Copy link
Copy Markdown

linear-code Bot commented May 27, 2026

STREAM-1001

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Scope: Backend Automatically applied to PRs that change backend components

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants