From 251b652d3dd70ffa4167a6d6f43dc4e59a53e4ab Mon Sep 17 00:00:00 2001 From: Giannis Gkiortzis Date: Thu, 30 Oct 2025 11:22:54 +0100 Subject: [PATCH 1/9] add initial buffer docs --- .../sdk/miscellaneous/telemetry-buffer.mdx | 475 +++++++++++------- 1 file changed, 296 insertions(+), 179 deletions(-) diff --git a/develop-docs/sdk/miscellaneous/telemetry-buffer.mdx b/develop-docs/sdk/miscellaneous/telemetry-buffer.mdx index ef745a05f0973..904723de7cf4f 100644 --- a/develop-docs/sdk/miscellaneous/telemetry-buffer.mdx +++ b/develop-docs/sdk/miscellaneous/telemetry-buffer.mdx @@ -5,215 +5,332 @@ sidebar_order: 35 ## Telemetry Buffer Layer: Prioritized, Bounded, Rate-Aware Envelope Delivery -### Current State +### Overview -The current transport implementation of most of the SDKs uses a simple FIFO queue that processes all envelope types with equal priority. As our SDKs collect more and more telemetry with different types, it may become a problem that critical telemetry (such as errors/crashes) get delayed and de-prioritized due to other telemetry (traces, replays, logs) occupying the queue. This is especially relevant with replays, logs and continuous profiling which we periodically flush to the queue. +The buffer system sits between the SDK client and the HTTP transport layer, ensuring that critical telemetry like errors take priority over high-volume data like logs and traces. This prevents important events from getting lost when your application is under heavy load or sending large amounts of telemetry. 
-### Proposal +### Motivation -Introduce a per-telemetry buffer layer between `Client` and `Transport` to: -- Batch telemetry when protocol-safe (all types, not only logs) -- Apply early rate limiting (don’t enqueue when limited) -- Enforce bounded memory via fixed-capacity ring buffers -- Prioritize delivery by telemetry criticality via weighted round-robin -- Keep existing transport/offline cache semantics unchanged (might change) -- (Stretch) Have a http connection per telemetry type (only backend SDKs) +- Aggregation lives in a unified buffer layer (this way we avoid creating multiple batch processors for different telemetry types). +- All telemetry types use capture APIs (CaptureX) routed through the Client. +- Rate-limit awareness is built-in across categories. +- Buffers support two modes: normal ring buffer and bucket-by-trace (for spans). +- For spans, dropping an entire trace under pressure is preferable. ### Architecture Overview +Introduce a `Buffer` layer between the `Client` and the `Transport`. This `Buffer` wraps prioritization and scheduling and exposes a minimal API to the SDK: + +- Add(item). +- Flush(timeout). +- Close(timeout). + ``` -┌───────────────────────────────────────────────────────────────────────────┐ -│ Client │ -│ captureEvent / captureTransaction / captureReplay / captureLogs / ... 
│ -└───────────────────────────────────────────────────────────────────────────┘ - │ - ▼ -┌───────────────────────────────────────────────────────────────────────────┐ -│ TelemetryBuffer │ -│ - Holds per-category buffers │ -│ - Early rate-limit check (shared RateLimiter) │ -│ - Method-based submit to per-category buffers │ -└───────────────────────────────────────────────────────────────────────────┘ - │ - ┌───────────────┼────────────────────────────────┐ - ▼ ▼ ▼ -┌───────────────────────┐ ┌───────────────────────┐ ┌───────────────────────┐ -│ Errors/Feedback │ │ Sessions/CheckIns │ │ Log │ -│ (CRITICAL) │ │ (HIGH) │ │ (MEDIUM) │ -│ RingBuffer + Batcher │ │ RingBuffer + Batcher │ │ RingBuffer + Batcher │ -└───────────────────────┘ └───────────────────────┘ └───────────────────────┘ - │ │ │ - ▼ ▼ ▼ -┌───────────────────────────────────────────────────────────────────────────┐ -│ EnvelopeScheduler (Weighted RR) │ -│ - Cross-buffer selection by priority (5..1) │ -│ - Re-checks RateLimiter before send │ -│ - Submits envelopes to transport │ -└───────────────────────────────────────────────────────────────────────────┘ - │ - ▼ -┌───────────────────────────────────────────────────────────────────────────┐ -│ Transport (unchanged). 
│ -│ - Single worker, disk cache, offline retry, client reports │ -└───────────────────────────────────────────────────────────────────────────┘ +┌────────────────────────────────────────────────────────────────────────────┐ +│ Client │ +│ captureEvent / captureTransaction / captureCheckIn / captureLog │ +└────────────────────────────────────────────────────────────────────────────┘ + + ▼ +┌────────────────────────────────────────────────────────────────────────────┐ +│ Buffer │ +│ Add(item) · Flush(timeout) · Close(timeout) │ +│ │ +│ ┌──────────────────────┐ ┌──────────────────────┐ ┌──────────────────┐ │ +│ │ Error Store │ │ Check-in Store │ │ Log Store │ │ +│ │ (CRITICAL) │ │ (HIGH) │ │ (MEDIUM) │ │ +│ │ Timeout: N/A │ │ Timeout: N/A │ │ Timeout: 5s │ │ +│ │ BatchSize: 1 │ │ BatchSize: 1 │ │ BatchSize: 100 │ │ +│ └──────────────────────┘ └──────────────────────┘ └──────────────────┘ │ +│ │ │ +│ ▼ │ +│ ┌─────────────────────────────────────────────────────────────────────┐ │ +│ │ Scheduler (Weighted Round-Robin) │ │ +│ │ - Priority weights: CRITICAL=5, HIGH=4, MEDIUM=3, LOW=2, LOWEST=1 │ │ +│ │ - Processes a batch of items based on BatchSize and/or Timeout │ │ +│ │ - Builds envelopes from batch │ │ +│ │ - Submits envelopes to transport │ │ +│ └─────────────────────────────────────────────────────────────────────┘ │ +└────────────────────────────────────────────────────────────────────────────┘ + + ▼ +┌────────────────────────────────────────────────────────────────────────────┐ +│ Transport │ +│ - Single worker, disk cache, offline retry, client reports │ +└────────────────────────────────────────────────────────────────────────────┘ ``` -### Priorities (TBD) -- CRITICAL: Error, Feedback -- HIGH: Session, CheckIn -- MEDIUM: Log, ClientReport, Span -- LOW: Transaction, Profile, ProfileChunk -- LOWEST: Replay +#### How the Buffer works + +- **Smart batching**: Logs are batched into single requests; errors, transactions, and monitors are sent immediately. 
+- **Pre-send rate limiting**: The scheduler checks rate limits before dispatching, avoiding unnecessary requests while keeping items buffered. +- **Category isolation**: Separate ring buffers for each telemetry type prevent head-of-line blocking. +- **Weighted scheduling**: High-priority telemetry gets sent more frequently via round-robin selection. +- **Transport compatibility**: Works with existing HTTP transport implementations without modification. + +### Priorities +- CRITICAL: Error, Feedback. +- HIGH: Session, CheckIn. +- MEDIUM: Log, ClientReport, Span. +- LOW: Transaction, Profile, ProfileChunk. +- LOWEST: Replay. Configurable via weights. ### Components -- **Client** - - Owns per-category buffers and is the single entry for all capture paths. - - Consults `RateLimiter` early; on active rate limit do not enqueue and record `DiscardReason.RATELIMIT_BACKOFF`. - - Submits items from capture methods to the matching per-category buffer. - -- **TelemetryBuffer\** (per DataCategory) - - Fixed-capacity ring buffer (bounded memory). - - Stores raw items (pre-envelope). - - Type-aware batching policy (size and/or time). Examples: - - Errors/Feedback/Sessions/CheckIns: typically single-item; allow small batch if protocol-safe. - - Logs: size/time-based (reuse semantics of `LoggerBatchProcessor`). - - Spans: trace-based. - - Transactions/Profiles/Replay: default single-item; - - Overflow policy: drop-oldest (default). Record `DiscardReason.QUEUE_OVERFLOW`. - -- **EnvelopeScheduler** - - Single worker; weighted round-robin across priorities: weights 5,4,3,2,1. - - Pulls ready batches from buffers; builds envelopes from batches; re-checks `RateLimiter` at send-time. - - Submits envelopes to `ITransport`. If transport unhealthy or recently rejected, backs off briefly. - -- **Rate Limiting** - - Shared `RateLimiter` is the source of truth. - - Checked both at buffer ingress (to avoid queueing) and at egress (to avoid sending). 
- -- **Transport and Offline Cache** - - Unchanged. Disk caching, retry semantics, client report recording remain in transport. - - Buffers are in-memory only (for now). - -### Configuration (new options) -- `bufferCapacityByCategory`: map\ (defaults tuned per volume) -- `priorityWeights`: CRITICAL..LOWEST (default 5,4,3,2,1) -- `overflowPolicy`: `drop_oldest` | `drop_newest` (default `drop_oldest`) -- `preemptLowerPriorityForCritical`: boolean (default false) -- `scheduler`: - - `backoffMsOnTransportUnhealthy` (e.g., 250–1000 ms for backpressure) - - `maxQueueSize` (soft cap; default derived from transport queue) - -### Pseudocode (Kotlin-ish) - -```kotlin -enum class EnvelopePriority(val weight: Int) { - CRITICAL(5), HIGH(4), MEDIUM(3), LOW(2), LOWEST(1) +#### Storage + +Each telemetry category maintains a store interface; a fixed-size circular array/ring buffer (not to be confused with the `Buffer` wrapper) that stores items before transmission: + +- **Bounded capacity**: Default to 100 items for errors, logs, and monitors; 1000 for transactions. This prevents unbounded memory growth regardless of telemetry volume and backpressure handling. +- **Overflow policies**: + - `drop_oldest` (default): Evicts the oldest item when the buffer is full, making room for new data. + - `drop_newest`: Rejects incoming items when full, preserving what's already queued. +- **Batching configuration**: + - `batchSize`: Number of items to combine into a single batch (1 for errors, transactions, and monitors; 100 for logs). + - `timeout`: Maximum time to wait before sending a partial batch (5 seconds for logs). +- **Bucketed Storage Support**: The storage interface should satisfy both bucketed and single-item implementations, allowing sending spans per trace id. +- **Observability**: Each store tracks offered, accepted, and dropped item counts for client reports. 
+ +##### Single-item ring buffer (default) + +- Data structure: fixed-size circular array with head/tail indices; O(1) `Offer`/`Poll`. +- Offer semantics: if not full, append; when full, apply `overflowPolicy`: + - `drop_oldest`: evict the oldest item, insert the new one, and invoke the dropped callback with reason `buffer_full_drop_oldest`. + - `drop_newest`: reject the new item and invoke the dropped callback with reason `buffer_full_drop_newest`. +- Readiness: a store is ready when `size >= batchSize` or when `timeout` has elapsed since `lastFlushTime` (and it is non-empty). +- Polling: `PollIfReady()` returns up to `batchSize` items and updates `lastFlushTime`; `Drain()` empties the store. + +##### Bucketed-by-trace storage (spans) + +- Purpose: keep spans from the same trace together and flush them as a unit to avoid partial-trace delivery under pressure. +- Grouping: a new bucket is created per trace id; a map (`traceIndex`) provides O(1) lookup. Items without a trace id are accepted but grouped without an index. +- Capacity model: two limits are enforced—overall `itemCapacity` and a derived `bucketCapacity ~= capacity/10` (minimum 10). Additionally, a `perBucketItemLimit` (100) prevents a single trace from monopolizing storage. +- Readiness: when total buffered items reach `batchSize` or `timeout` elapses, the entire oldest bucket is flushed to preserve trace coherence. +- Overflow behavior: + - `drop_oldest`: evict the oldest bucket (dropping all its items) and invoke the dropped callback for each (`buffer_full_drop_oldest_bucket`). Preferred for spans to drop an entire trace. + - `drop_newest`: reject the incoming item (`buffer_full_drop_newest`). +- Lifecycle: empty buckets are removed and their trace ids are purged from the index; `MarkFlushed()` updates `lastFlushTime`. 
+ +Stores are mapped to [DataCategories](https://github.com/getsentry/relay/blob/master/relay-base-schema/src/data_category.rs), which determine their scheduling priority and rate limits. + +#### Scheduler + +The scheduler runs as a background worker, coordinating the flow of telemetry from storage to the transport: + +- **Initialization**: Constructs a weighted priority cycle (e.g., `[CRITICAL×5, HIGH×4, MEDIUM×3, ...]`) based on configured weights. +- **Event loop**: Wakes when explicitly signaled from the `captureX` methods on the client when new data is available (if the language does not support this, then a periodic ticker can be used). +- **Buffer selection**: Iterates through the priority cycle, selecting buffers that are ready to flush and not rate limited. +- **Rate limit coordination**: Queries the transport's rate limit state before attempting to send any category. +- **Envelope construction**: Converts buffered items into Sentry protocol envelopes. + - Log items are batched together into a single envelope with multiple log entries. + - Other categories typically send one item per envelope. +- **Graceful shutdown**: During client shutdown, force-drains all buffers to prevent data loss. + +#### Transport + +The transport layer handles HTTP communication with Sentry's ingestion endpoints: + +### Configuration + +#### Buffer Options +- **Capacity**: 100 items for errors, logs, and monitors; 1000 for transactions. +- **Overflow policy**: `drop_oldest`. +- **Batch size**: 1 for errors and monitors (immediate send), 100 for logs. +- **Batch timeout**: 5 seconds for logs. + +#### Scheduler Options +- **Priority weights**: CRITICAL=5, HIGH=4, MEDIUM=3, LOW=2, LOWEST=1. + +#### Transport Options +- **Queue size**: 1000 envelopes for AsyncTransport. +- **HTTP timeout**: 30 seconds. 
+ +### Implementation Example (Go) + +The `sentry-go` SDK provides a reference implementation of this architecture: + +#### Storage Interface + +```go +type Storage[T any] interface { + // Core operations + Offer(item T) bool + Poll() (T, bool) + PollBatch(maxItems int) []T + PollIfReady() []T + Drain() []T + Peek() (T, bool) + + // State queries + Size() int + Capacity() int + IsEmpty() bool + IsFull() bool + Utilization() float64 + + // Flush management + IsReadyToFlush() bool + MarkFlushed() + + // Category/Priority + Category() ratelimit.Category + Priority() ratelimit.Priority + + // Metrics + OfferedCount() int64 + DroppedCount() int64 + AcceptedCount() int64 + DropRate() float64 + GetMetrics() BufferMetrics + + // Configuration + SetDroppedCallback(callback func(item T, reason string)) + Clear() } -interface TelemetryBuffer { - val category: DataCategory - val priority: EnvelopePriority - fun offer(item: T): OfferResult // may drop; record client report - fun nextBatchReady(nowMs: Long): Boolean - fun drainBatch(maxItems: Int, nowMs: Long): List - fun size(): Int + +// Single item store +func (b *RingBuffer[T]) PollIfReady() []T { + b.mu.Lock() + defer b.mu.Unlock() + + if b.size == 0 { + return nil + } + + ready := b.size >= b.batchSize || + (b.timeout > 0 && time.Since(b.lastFlushTime) >= b.timeout) + + if !ready { + return nil + } + + itemCount := b.batchSize + if itemCount > b.size { + itemCount = b.size + } + + result := make([]T, itemCount) + var zero T + + for i := 0; i < itemCount; i++ { + result[i] = b.items[b.head] + b.items[b.head] = zero + b.head = (b.head + 1) % b.capacity + b.size-- + } + + b.lastFlushTime = time.Now() + return result } -class Client( - private val buffers: Map>, - private val rateLimiter: RateLimiter, - private val clientReports: ClientReportRecorder -) { - fun captureEvent(event: Any) = submit(DataCategory.Error, event) - fun captureTransaction(tx: Any) = submit(DataCategory.Transaction, tx) - fun captureReplay(replay: Any) = 
submit(DataCategory.Replay, replay) - fun captureLog(log: Any) = submit(DataCategory.Log, log) - // ... other capture methods ... - - private fun submit(category: DataCategory, item: Any) { - if (rateLimiter.isRateLimitActive(category)) { - clientReports.recordLostEvent(DiscardReason.RATELIMIT_BACKOFF, category) - return - } - val res = buffers[category]?.offer(item) - if (res is OfferResult.Dropped) { - clientReports.recordLostEvent(DiscardReason.QUEUE_OVERFLOW, category, res.count) - } - } +// Bucketed store +func (b *BucketedBuffer[T]) PollIfReady() []T { + b.mu.Lock() + defer b.mu.Unlock() + if b.bucketCount == 0 { + return nil + } + // the batchSize is satisfied based on total items + ready := b.totalItems >= b.batchSize || (b.timeout > 0 && time.Since(b.lastFlushTime) >= b.timeout) + if !ready { + return nil + } + // keep track of oldest bucket + oldest := b.buckets[b.head] + if oldest == nil { + return nil + } + items := oldest.items + if oldest.traceID != "" { + delete(b.traceIndex, oldest.traceID) + } + b.buckets[b.head] = nil + b.head = (b.head + 1) % b.bucketCapacity + b.totalItems -= len(items) + b.bucketCount-- + b.lastFlushTime = time.Now() + return items } -class EnvelopeScheduler( - private val buffersByPriority: Map>, - private val transport: ITransport, - private val rateLimiter: RateLimiter, - private val clientReports: ClientReportRecorder, - private val weights: Map, - private val backoffMs: Long -) : Thread("TelemetryEnvelopeScheduler") { - override fun run() { - val order = generatePriorityCycle(weights) // e.g., [CRITICAL×5, HIGH×4, ...] 
- while (true) { - var sentSomething = false - for (p in order) { - val buf = selectReadyBuffer(buffersByPriority[p]) - if (buf != null) { - val cat = buf.category - val batch = buf.drainBatch(maxItemsFor(cat), nowMs()) - if (batch.isNotEmpty()) { - if (!rateLimiter.isRateLimitActive(cat)) { - val envelopes = buildEnvelopes(cat, batch) - for (env in envelopes) { - transport.send(env) - } - sentSomething = true - } else { - clientReports.recordLostEvent(DiscardReason.RATELIMIT_BACKOFF, cat, batch.size) - } - } +``` + +#### Scheduler Processing + +```go +func (s *Scheduler) processNextBatch() { + // Select priority from weighted cycle + priority := s.currentCycle[s.cyclePos] + s.cyclePos = (s.cyclePos + 1) % len(s.currentCycle) + + // Find ready buffer for this priority + for category, buffer := range s.buffers { + if buffer.Priority() == priority && + !s.transport.IsRateLimited(category) && + buffer.IsReadyToFlush() { + items := buffer.PollIfReady() + s.sendItems(category, items) + // only process one batch per tick + break } - } - if (!sentSomething) sleep(backoffMs) } - } +} +``` + + +#### Flushing + +```go +func (s *Scheduler) flush() { + // should process all store buffers and send to transport + for category, buffer := range s.buffers { + if !buffer.IsEmpty() { + s.processItems(buffer, category, true) + } + } +} - private fun buildEnvelopes(category: DataCategory, batch: List): List { - // Applies SDK version, trace context if provided, and size constraints - // Returns one or more envelopes constructed from the batch - TODO() - } +// The Buffer exposes the flush method that calls both +func (b *Buffer) Flush(timeout time.Time) { + scheduler.flush() + transport.flush(timeout) } ``` -Ring buffer semantics (per buffer): -- Fixed capacity N; on offer when full: - - `drop_oldest`: evict 1 from head, enqueue new; record queue overflow client report - - `drop_newest`: reject incoming; record queue overflow client report +### Batching Policies -Batching policy 
examples: -- Logs: batch up to 100 items or 5s; split if envelope size limit reached -- Spans: batch per trace -- Errors/Feedback: batch size 1 (default) -- Sessions/CheckIns: small batches if safe (e.g., 10 or 1s) -- Transactions/Profiles/Replay: default 1 +Different telemetry types use batching strategies optimized for their characteristics: -### Observability +- **Errors**: Single-item envelopes for immediate delivery (latency-sensitive). +- **Monitors**: Single-item envelopes to maintain check-in timing accuracy. +- **Logs**: Batches of up to 100 items or 5-second timeout, whichever comes first (volume-optimized). +- **Transactions**: Single-item envelopes (trace-aware batching is a future enhancement). + +#### Batch Processing Details + +For high-volume telemetry like logs, the buffer uses time and count-based batching: -- Counters per category: enqueued, sent, queue_overflow, rate_limited - - Create a dashboard per category with reasons for dropped events -- Health: include buffer health in `Client.isHealthy()` alongside transport +**Timeout-based flushing**: +- When the first item enters an empty log buffer, a timeout starts (5 seconds). +- When the timeout expires, all buffered log items are sent regardless of batch size. +- The timeout resets after each flush. -### Defaults and Safety +**Count-based flushing**: +- When the number of buffered log items reaches the batch size (100), they are sent immediately. + +**Ordering and lifecycle**: +- Filtering and sampling happen before buffering to avoid wasting buffer space. +- Rate limiting is checked before dispatch; if limited, items remain buffered. +- Items are batched into a single envelope with multiple entries of the same type (logs). 
+ +### Observability -- Enabled by default with conservative capacities and weights -- No change to envelope format, transport, or disk caching +The buffer system exposes metrics to help you understand telemetry flow and identify issues: -### Open Questions +- **Per-category counters**: Items offered, sent successfully, and dropped. +- **Drop reasons**: Distinguish between buffer overflow and rate limit drops. +- **Buffer utilization**: Current size vs. capacity for each category. -- Default capacities per category (especially logs/replay vs. critical) -- Category weights (logs and spans will be high volume, so we might want to send them more often) -- Safe batching across categories beyond logs/client reports - - Shall we adapt Relay to accept multiple "top-level" items (like errors or transactions) in a single envelope? -- Multiple http connections per telemetry type +These metrics enable dashboards that visualize why events are being dropped, helping you tune buffer sizes or identify rate limiting issues. From e89c19be0d148ba50e1e442c1a63aa971453b721 Mon Sep 17 00:00:00 2001 From: Giannis Gkiortzis Date: Fri, 7 Nov 2025 10:29:53 +0100 Subject: [PATCH 2/9] remove redundancies --- .../sdk/miscellaneous/telemetry-buffer.mdx | 80 ++++--------------- 1 file changed, 17 insertions(+), 63 deletions(-) diff --git a/develop-docs/sdk/miscellaneous/telemetry-buffer.mdx b/develop-docs/sdk/miscellaneous/telemetry-buffer.mdx index 904723de7cf4f..377a7f74b3752 100644 --- a/develop-docs/sdk/miscellaneous/telemetry-buffer.mdx +++ b/develop-docs/sdk/miscellaneous/telemetry-buffer.mdx @@ -34,7 +34,7 @@ Introduce a `Buffer` layer between the `Client` and the `Transport`. 
This `Buffe ▼ ┌────────────────────────────────────────────────────────────────────────────┐ │ Buffer │ -│ Add(item) · Flush(timeout) · Close(timeout) │ +│ Add(item) · Flush(timeout) · Close(timeout) │ │ │ │ ┌──────────────────────┐ ┌──────────────────────┐ ┌──────────────────┐ │ │ │ Error Store │ │ Check-in Store │ │ Log Store │ │ @@ -71,8 +71,8 @@ Introduce a `Buffer` layer between the `Client` and the `Transport`. This `Buffe ### Priorities - CRITICAL: Error, Feedback. - HIGH: Session, CheckIn. -- MEDIUM: Log, ClientReport, Span. -- LOW: Transaction, Profile, ProfileChunk. +- MEDIUM: Transaction, ClientReport, Span. +- LOW: Log, Profile, ProfileChunk. - LOWEST: Replay. Configurable via weights. @@ -90,8 +90,8 @@ Each telemetry category maintains a store interface; a fixed-size circular array - **Batching configuration**: - `batchSize`: Number of items to combine into a single batch (1 for errors, transactions, and monitors; 100 for logs). - `timeout`: Maximum time to wait before sending a partial batch (5 seconds for logs). -- **Bucketed Storage Support**: The storage interface should satisfy both bucketed and single-item implementations, allowing sending spans per trace id. -- **Observability**: Each store tracks offered, accepted, and dropped item counts for client reports. +- **Bucketed Storage Support**: The storage interface should satisfy both bucketed and single-item implementations, allowing sending spans per trace id (required for Span First). +- **Observability**: Each store tracks dropped item counts for client reports. ##### Single-item ring buffer (default) @@ -104,15 +104,20 @@ Each telemetry category maintains a store interface; a fixed-size circular array ##### Bucketed-by-trace storage (spans) -- Purpose: keep spans from the same trace together and flush them as a unit to avoid partial-trace delivery under pressure. -- Grouping: a new bucket is created per trace id; a map (`traceIndex`) provides O(1) lookup. 
Items without a trace id are accepted but grouped without an index. -- Capacity model: two limits are enforced—overall `itemCapacity` and a derived `bucketCapacity ~= capacity/10` (minimum 10). Additionally, a `perBucketItemLimit` (100) prevents a single trace from monopolizing storage. -- Readiness: when total buffered items reach `batchSize` or `timeout` elapses, the entire oldest bucket is flushed to preserve trace coherence. -- Overflow behavior: - - `drop_oldest`: evict the oldest bucket (dropping all its items) and invoke the dropped callback for each (`buffer_full_drop_oldest_bucket`). Preferred for spans to drop an entire trace. +- **Purpose**: keep spans from the same trace together and flush them as a unit to avoid partial-trace delivery under pressure. This addresses a gap in standard implementations where individual span drops can create incomplete traces. +- **Grouping**: a new bucket is created per trace id; a map (`traceIndex`) provides O(1) lookup. +- **Capacity model**: two limits are enforced—overall `itemCapacity` and a derived `bucketCapacity ~= capacity/10` (minimum 10). +- **Readiness**: when total buffered items reach `batchSize` or `timeout` elapses, the entire oldest bucket is flushed to preserve trace coherence. +- **Overflow behavior**: + - `drop_oldest`: evict the oldest bucket (dropping all its items). Preferred for spans to drop an entire trace. - `drop_newest`: reject the incoming item (`buffer_full_drop_newest`). - Lifecycle: empty buckets are removed and their trace ids are purged from the index; `MarkFlushed()` updates `lastFlushTime`. +##### Trace Consistency Trade-offs + +There still remains a small subset of cases that might result in partial traces, where either an old trace bucket was dropped and a new span with the same trace arrived, or we dropped an incoming span of this trace. +The preferred overflow behavior in most cases should be `drop_oldest` since it results in the fewest incomplete traces from the two scenarios. 
+ Stores are mapped to [DataCategories](https://github.com/getsentry/relay/blob/master/relay-base-schema/src/data_category.rs), which determine their scheduling priority and rate limits. #### Scheduler @@ -135,7 +140,7 @@ The transport layer handles HTTP communication with Sentry's ingestion endpoints ### Configuration #### Buffer Options -- **Capacity**: 100 items for errors, logs, and monitors; 1000 for transactions. +- **Capacity**: 100 items for errors and check-ins, 10*BATCH_SIZE for logs, 1000 for transactions. - **Overflow policy**: `drop_oldest`. - **Batch size**: 1 for errors and monitors (immediate send), 100 for logs. - **Batch timeout**: 5 seconds for logs. @@ -143,10 +148,6 @@ The transport layer handles HTTP communication with Sentry's ingestion endpoints #### Scheduler Options - **Priority weights**: CRITICAL=5, HIGH=4, MEDIUM=3, LOW=2, LOWEST=1. -#### Transport Options -- **Queue size**: 1000 envelopes for AsyncTransport. -- **HTTP timeout**: 30 seconds. - ### Implementation Example (Go) The `sentry-go` SDK provides a reference implementation of this architecture: @@ -177,17 +178,6 @@ type Storage[T any] interface { // Category/Priority Category() ratelimit.Category Priority() ratelimit.Priority - - // Metrics - OfferedCount() int64 - DroppedCount() int64 - AcceptedCount() int64 - DropRate() float64 - GetMetrics() BufferMetrics - - // Configuration - SetDroppedCallback(callback func(item T, reason string)) - Clear() } @@ -298,39 +288,3 @@ func (b *Buffer) Flush(timeout time.Time) { transport.flush(timeout) } ``` - -### Batching Policies - -Different telemetry types use batching strategies optimized for their characteristics: - -- **Errors**: Single-item envelopes for immediate delivery (latency-sensitive). -- **Monitors**: Single-item envelopes to maintain check-in timing accuracy. -- **Logs**: Batches of up to 100 items or 5-second timeout, whichever comes first (volume-optimized). 
-- **Transactions**: Single-item envelopes (trace-aware batching is a future enhancement). - -#### Batch Processing Details - -For high-volume telemetry like logs, the buffer uses time and count-based batching: - -**Timeout-based flushing**: -- When the first item enters an empty log buffer, a timeout starts (5 seconds). -- When the timeout expires, all buffered log items are sent regardless of batch size. -- The timeout resets after each flush. - -**Count-based flushing**: -- When the number of buffered log items reaches the batch size (100), they are sent immediately. - -**Ordering and lifecycle**: -- Filtering and sampling happen before buffering to avoid wasting buffer space. -- Rate limiting is checked before dispatch; if limited, items remain buffered. -- Items are batched into a single envelope with multiple entries of the same type (logs). - -### Observability - -The buffer system exposes metrics to help you understand telemetry flow and identify issues: - -- **Per-category counters**: Items offered, sent successfully, and dropped. -- **Drop reasons**: Distinguish between buffer overflow and rate limit drops. -- **Buffer utilization**: Current size vs. capacity for each category. - -These metrics enable dashboards that visualize why events are being dropped, helping you tune buffer sizes or identify rate limiting issues. 
From d677af4254e0c244e9499995516fc508eb5a5112 Mon Sep 17 00:00:00 2001 From: Giannis Gkiortzis Date: Fri, 7 Nov 2025 12:20:15 +0100 Subject: [PATCH 3/9] add backend telemetry buffer --- .../sdk/miscellaneous/telemetry-buffer.mdx | 290 ------------------ .../backend-telemetry-buffer.mdx | 286 ++++++++++++++++- 2 files changed, 285 insertions(+), 291 deletions(-) delete mode 100644 develop-docs/sdk/miscellaneous/telemetry-buffer.mdx diff --git a/develop-docs/sdk/miscellaneous/telemetry-buffer.mdx b/develop-docs/sdk/miscellaneous/telemetry-buffer.mdx deleted file mode 100644 index 377a7f74b3752..0000000000000 --- a/develop-docs/sdk/miscellaneous/telemetry-buffer.mdx +++ /dev/null @@ -1,290 +0,0 @@ ---- -title: Telemetry Buffer -sidebar_order: 35 ---- - -## Telemetry Buffer Layer: Prioritized, Bounded, Rate-Aware Envelope Delivery - -### Overview - -The buffer system sits between the SDK client and the HTTP transport layer, ensuring that critical telemetry like errors take priority over high-volume data like logs and traces. This prevents important events from getting lost when your application is under heavy load or sending large amounts of telemetry. - -### Motivation - -- Aggregation lives in a unified buffer layer (this way we avoid creating multiple batch processors for different telemetry types). -- All telemetry types use capture APIs (CaptureX) routed through the Client. -- Rate-limit awareness is built-in across categories. -- Buffers support two modes: normal ring buffer and bucket-by-trace (for spans). -- For spans, dropping an entire trace under pressure is preferable. - -### Architecture Overview - -Introduce a `Buffer` layer between the `Client` and the `Transport`. This `Buffer` wraps prioritization and scheduling and exposes a minimal API to the SDK: - -- Add(item). -- Flush(timeout). -- Close(timeout). 
- -``` -┌────────────────────────────────────────────────────────────────────────────┐ -│ Client │ -│ captureEvent / captureTransaction / captureCheckIn / captureLog │ -└────────────────────────────────────────────────────────────────────────────┘ - - ▼ -┌────────────────────────────────────────────────────────────────────────────┐ -│ Buffer │ -│ Add(item) · Flush(timeout) · Close(timeout) │ -│ │ -│ ┌──────────────────────┐ ┌──────────────────────┐ ┌──────────────────┐ │ -│ │ Error Store │ │ Check-in Store │ │ Log Store │ │ -│ │ (CRITICAL) │ │ (HIGH) │ │ (MEDIUM) │ │ -│ │ Timeout: N/A │ │ Timeout: N/A │ │ Timeout: 5s │ │ -│ │ BatchSize: 1 │ │ BatchSize: 1 │ │ BatchSize: 100 │ │ -│ └──────────────────────┘ └──────────────────────┘ └──────────────────┘ │ -│ │ │ -│ ▼ │ -│ ┌─────────────────────────────────────────────────────────────────────┐ │ -│ │ Scheduler (Weighted Round-Robin) │ │ -│ │ - Priority weights: CRITICAL=5, HIGH=4, MEDIUM=3, LOW=2, LOWEST=1 │ │ -│ │ - Processes a batch of items based on BatchSize and/or Timeout │ │ -│ │ - Builds envelopes from batch │ │ -│ │ - Submits envelopes to transport │ │ -│ └─────────────────────────────────────────────────────────────────────┘ │ -└────────────────────────────────────────────────────────────────────────────┘ - - ▼ -┌────────────────────────────────────────────────────────────────────────────┐ -│ Transport │ -│ - Single worker, disk cache, offline retry, client reports │ -└────────────────────────────────────────────────────────────────────────────┘ -``` - -#### How the Buffer works - -- **Smart batching**: Logs are batched into single requests; errors, transactions, and monitors are sent immediately. -- **Pre-send rate limiting**: The scheduler checks rate limits before dispatching, avoiding unnecessary requests while keeping items buffered. -- **Category isolation**: Separate ring buffers for each telemetry type prevent head-of-line blocking. 
-- **Weighted scheduling**: High-priority telemetry gets sent more frequently via round-robin selection. -- **Transport compatibility**: Works with existing HTTP transport implementations without modification. - -### Priorities -- CRITICAL: Error, Feedback. -- HIGH: Session, CheckIn. -- MEDIUM: Transaction, ClientReport, Span. -- LOW: Log, Profile, ProfileChunk. -- LOWEST: Replay. - -Configurable via weights. - -### Components - -#### Storage - -Each telemetry category maintains a store interface; a fixed-size circular array/ring buffer (not to be confused with the `Buffer` wrapper) that stores items before transmission: - -- **Bounded capacity**: Default to 100 items for errors, logs, and monitors; 1000 for transactions. This prevents unbounded memory growth regardless of telemetry volume and backpressure handling. -- **Overflow policies**: - - `drop_oldest` (default): Evicts the oldest item when the buffer is full, making room for new data. - - `drop_newest`: Rejects incoming items when full, preserving what's already queued. -- **Batching configuration**: - - `batchSize`: Number of items to combine into a single batch (1 for errors, transactions, and monitors; 100 for logs). - - `timeout`: Maximum time to wait before sending a partial batch (5 seconds for logs). -- **Bucketed Storage Support**: The storage interface should satisfy both bucketed and single-item implementations, allowing sending spans per trace id (required for Span First). -- **Observability**: Each store tracks dropped item counts for client reports. - -##### Single-item ring buffer (default) - -- Data structure: fixed-size circular array with head/tail indices; O(1) `Offer`/`Poll`. -- Offer semantics: if not full, append; when full, apply `overflowPolicy`: - - `drop_oldest`: evict the oldest item, insert the new one, and invoke the dropped callback with reason `buffer_full_drop_oldest`. - - `drop_newest`: reject the new item and invoke the dropped callback with reason `buffer_full_drop_newest`. 
-- Readiness: a store is ready when `size >= batchSize` or when `timeout` has elapsed since `lastFlushTime` (and it is non-empty). -- Polling: `PollIfReady()` returns up to `batchSize` items and updates `lastFlushTime`; `Drain()` empties the store. - -##### Bucketed-by-trace storage (spans) - -- **Purpose**: keep spans from the same trace together and flush them as a unit to avoid partial-trace delivery under pressure. This addresses a gap in standard implementations where individual span drops can create incomplete traces. -- **Grouping**: a new bucket is created per trace id; a map (`traceIndex`) provides O(1) lookup. -- **Capacity model**: two limits are enforced—overall `itemCapacity` and a derived `bucketCapacity ~= capacity/10` (minimum 10). -- **Readiness**: when total buffered items reach `batchSize` or `timeout` elapses, the entire oldest bucket is flushed to preserve trace coherence. -- **Overflow behavior**: - - `drop_oldest`: evict the oldest bucket (dropping all its items). Preferred for spans to drop an entire trace. - - `drop_newest`: reject the incoming item (`buffer_full_drop_newest`). -- Lifecycle: empty buckets are removed and their trace ids are purged from the index; `MarkFlushed()` updates `lastFlushTime`. - -##### Trace Consistency Trade-offs - -There still remains a small subset of cases that might result in partial traces, where either an old trace bucket was dropped and a new span with the same trace arrived, or we dropped an incoming span of this trace. -The preferred overflow behavior in most cases should be `drop_oldest` since it results in the fewest incomplete traces from the two scenarios. - -Stores are mapped to [DataCategories](https://github.com/getsentry/relay/blob/master/relay-base-schema/src/data_category.rs), which determine their scheduling priority and rate limits. 
- -#### Scheduler - -The scheduler runs as a background worker, coordinating the flow of telemetry from storage to the transport: - -- **Initialization**: Constructs a weighted priority cycle (e.g., `[CRITICAL×5, HIGH×4, MEDIUM×3, ...]`) based on configured weights. -- **Event loop**: Wakes when explicitly signaled from the `captureX` methods on the client when new data is available (if the language does not support this, then a periodic ticker can be used). -- **Buffer selection**: Iterates through the priority cycle, selecting buffers that are ready to flush and not rate limited. -- **Rate limit coordination**: Queries the transport's rate limit state before attempting to send any category. -- **Envelope construction**: Converts buffered items into Sentry protocol envelopes. - - Log items are batched together into a single envelope with multiple log entries. - - Other categories typically send one item per envelope. -- **Graceful shutdown**: During client shutdown, force-drains all buffers to prevent data loss. - -#### Transport - -The transport layer handles HTTP communication with Sentry's ingestion endpoints: - -### Configuration - -#### Buffer Options -- **Capacity**: 100 items for errors and check-ins, 10*BATCH_SIZE for logs, 1000 for transactions. -- **Overflow policy**: `drop_oldest`. -- **Batch size**: 1 for errors and monitors (immediate send), 100 for logs. -- **Batch timeout**: 5 seconds for logs. - -#### Scheduler Options -- **Priority weights**: CRITICAL=5, HIGH=4, MEDIUM=3, LOW=2, LOWEST=1. 
- -### Implementation Example (Go) - -The `sentry-go` SDK provides a reference implementation of this architecture: - -#### Storage Interface - -```go -type Storage[T any] interface { - // Core operations - Offer(item T) bool - Poll() (T, bool) - PollBatch(maxItems int) []T - PollIfReady() []T - Drain() []T - Peek() (T, bool) - - // State queries - Size() int - Capacity() int - IsEmpty() bool - IsFull() bool - Utilization() float64 - - // Flush management - IsReadyToFlush() bool - MarkFlushed() - - // Category/Priority - Category() ratelimit.Category - Priority() ratelimit.Priority -} - - -// Single item store -func (b *RingBuffer[T]) PollIfReady() []T { - b.mu.Lock() - defer b.mu.Unlock() - - if b.size == 0 { - return nil - } - - ready := b.size >= b.batchSize || - (b.timeout > 0 && time.Since(b.lastFlushTime) >= b.timeout) - - if !ready { - return nil - } - - itemCount := b.batchSize - if itemCount > b.size { - itemCount = b.size - } - - result := make([]T, itemCount) - var zero T - - for i := 0; i < itemCount; i++ { - result[i] = b.items[b.head] - b.items[b.head] = zero - b.head = (b.head + 1) % b.capacity - b.size-- - } - - b.lastFlushTime = time.Now() - return result -} - -// Bucketed store -func (b *BucketedBuffer[T]) PollIfReady() []T { - b.mu.Lock() - defer b.mu.Unlock() - if b.bucketCount == 0 { - return nil - } - // the batchSize is satisfied based on total items - ready := b.totalItems >= b.batchSize || (b.timeout > 0 && time.Since(b.lastFlushTime) >= b.timeout) - if !ready { - return nil - } - // keep track of oldest bucket - oldest := b.buckets[b.head] - if oldest == nil { - return nil - } - items := oldest.items - if oldest.traceID != "" { - delete(b.traceIndex, oldest.traceID) - } - b.buckets[b.head] = nil - b.head = (b.head + 1) % b.bucketCapacity - b.totalItems -= len(items) - b.bucketCount-- - b.lastFlushTime = time.Now() - return items -} - -``` - -#### Scheduler Processing - -```go -func (s *Scheduler) processNextBatch() { - // Select priority 
from weighted cycle - priority := s.currentCycle[s.cyclePos] - s.cyclePos = (s.cyclePos + 1) % len(s.currentCycle) - - // Find ready buffer for this priority - for category, buffer := range s.buffers { - if buffer.Priority() == priority && - !s.transport.IsRateLimited(category) && - buffer.IsReadyToFlush() { - items := buffer.PollIfReady() - s.sendItems(category, items) - // only process one batch per tick - break - } - } -} -``` - - -#### Flushing - -```go -func (s *Scheduler) flush() { - // should process all store buffers and send to transport - for category, buffer := range s.buffers { - if !buffer.IsEmpty() { - s.processItems(buffer, category, true) - } - } -} - -// The Buffer exposes the flush method that calls both -func (b *Buffer) Flush(timeout time.Time) { - scheduler.flush() - transport.flush(timeout) -} -``` diff --git a/develop-docs/sdk/telemetry/telemetry-buffer/backend-telemetry-buffer.mdx b/develop-docs/sdk/telemetry/telemetry-buffer/backend-telemetry-buffer.mdx index 27c0b9cf2c5b5..7d9307b46aa33 100644 --- a/develop-docs/sdk/telemetry/telemetry-buffer/backend-telemetry-buffer.mdx +++ b/develop-docs/sdk/telemetry/telemetry-buffer/backend-telemetry-buffer.mdx @@ -4,4 +4,288 @@ description: Detailed backend telemetry buffer design. sidebar_order: 1 --- -To be defined — full spec lives here. +## Telemetry Buffer Layer: Prioritized, Bounded, Rate-Aware Envelope Delivery + +### Overview + +The buffer system sits between the SDK client and the HTTP transport layer, ensuring that critical telemetry like errors take priority over high-volume data like logs and traces. This prevents important events from getting lost when your application is under heavy load or sending large amounts of telemetry. + +### Motivation + +- Aggregation lives in a unified buffer layer (this way we avoid creating multiple batch processors for different telemetry types). +- All telemetry types use capture APIs (CaptureX) routed through the Client. 
+- Rate-limit awareness is built-in across categories. +- Buffers support two modes: normal ring buffer and bucket-by-trace (for spans). +- For spans, dropping an entire trace under pressure is preferable. + +### Architecture Overview + +Introduce a `Buffer` layer between the `Client` and the `Transport`. This `Buffer` wraps prioritization and scheduling and exposes a minimal API to the SDK: + +- Add(item). +- Flush(timeout). +- Close(timeout). + +``` +┌────────────────────────────────────────────────────────────────────────────┐ +│ Client │ +│ captureEvent / captureTransaction / captureCheckIn / captureLog │ +└────────────────────────────────────────────────────────────────────────────┘ + + ▼ +┌────────────────────────────────────────────────────────────────────────────┐ +│ Buffer │ +│ Add(item) · Flush(timeout) · Close(timeout) │ +│ │ +│ ┌──────────────────────┐ ┌──────────────────────┐ ┌──────────────────┐ │ +│ │ Error Store │ │ Check-in Store │ │ Log Store │ │ +│ │ (CRITICAL) │ │ (HIGH) │ │ (MEDIUM) │ │ +│ │ Timeout: N/A │ │ Timeout: N/A │ │ Timeout: 5s │ │ +│ │ BatchSize: 1 │ │ BatchSize: 1 │ │ BatchSize: 100 │ │ +│ └──────────────────────┘ └──────────────────────┘ └──────────────────┘ │ +│ │ │ +│ ▼ │ +│ ┌─────────────────────────────────────────────────────────────────────┐ │ +│ │ Scheduler (Weighted Round-Robin) │ │ +│ │ - Priority weights: CRITICAL=5, HIGH=4, MEDIUM=3, LOW=2, LOWEST=1 │ │ +│ │ - Processes a batch of items based on BatchSize and/or Timeout │ │ +│ │ - Builds envelopes from batch │ │ +│ │ - Submits envelopes to transport │ │ +│ └─────────────────────────────────────────────────────────────────────┘ │ +└────────────────────────────────────────────────────────────────────────────┘ + + ▼ +┌────────────────────────────────────────────────────────────────────────────┐ +│ Transport │ +│ - Single worker, disk cache, offline retry, client reports │ +└────────────────────────────────────────────────────────────────────────────┘ +``` + +#### How the 
Buffer works + +- **Smart batching**: Logs are batched into single requests; errors, transactions, and monitors are sent immediately. +- **Pre-send rate limiting**: The scheduler checks rate limits before dispatching, avoiding unnecessary requests while keeping items buffered. +- **Category isolation**: Separate ring buffers for each telemetry type prevent head-of-line blocking. +- **Weighted scheduling**: High-priority telemetry gets sent more frequently via round-robin selection. +- **Transport compatibility**: Works with existing HTTP transport implementations without modification. + +### Priorities +- CRITICAL: Error, Feedback. +- HIGH: Session, CheckIn. +- MEDIUM: Transaction, ClientReport, Span. +- LOW: Log, Profile, ProfileChunk. +- LOWEST: Replay. + +Configurable via weights. + +### Components + +#### Storage + +Each telemetry category maintains a store interface; a fixed-size circular array/ring buffer (not to be confused with the `Buffer` wrapper) that stores items before transmission: + +- **Bounded capacity**: Default to 100 items for errors, logs, and monitors; 1000 for transactions. This prevents unbounded memory growth regardless of telemetry volume and backpressure handling. +- **Overflow policies**: + - `drop_oldest` (default): Evicts the oldest item when the buffer is full, making room for new data. + - `drop_newest`: Rejects incoming items when full, preserving what's already queued. +- **Batching configuration**: + - `batchSize`: Number of items to combine into a single batch (1 for errors, transactions, and monitors; 100 for logs). + - `timeout`: Maximum time to wait before sending a partial batch (5 seconds for logs). +- **Bucketed Storage Support**: The storage interface should satisfy both bucketed and single-item implementations, allowing sending spans per trace id (required for Span First). +- **Observability**: Each store tracks dropped item counts for client reports. 
+ +##### Single-item ring buffer (default) + +- Data structure: fixed-size circular array with head/tail indices; O(1) `Offer`/`Poll`. +- Offer semantics: if not full, append; when full, apply `overflowPolicy`: + - `drop_oldest`: evict the oldest item, insert the new one, and invoke the dropped callback with reason `buffer_full_drop_oldest`. + - `drop_newest`: reject the new item and invoke the dropped callback with reason `buffer_full_drop_newest`. +- Readiness: a store is ready when `size >= batchSize` or when `timeout` has elapsed since `lastFlushTime` (and it is non-empty). +- Polling: `PollIfReady()` returns up to `batchSize` items and updates `lastFlushTime`; `Drain()` empties the store. + +##### Bucketed-by-trace storage (spans) + +- **Purpose**: keep spans from the same trace together and flush them as a unit to avoid partial-trace delivery under pressure. This addresses a gap in standard implementations where individual span drops can create incomplete traces. +- **Grouping**: a new bucket is created per trace id; a map (`traceIndex`) provides O(1) lookup. +- **Capacity model**: two limits are enforced—overall `itemCapacity` and a derived `bucketCapacity ~= capacity/10` (minimum 10). +- **Readiness**: when total buffered items reach `batchSize` or `timeout` elapses, the entire oldest bucket is flushed to preserve trace coherence. +- **Overflow behavior**: + - `drop_oldest`: evict the oldest bucket (dropping all its items). Preferred for spans to drop an entire trace. + - `drop_newest`: reject the incoming item (`buffer_full_drop_newest`). +- Lifecycle: empty buckets are removed and their trace ids are purged from the index; `MarkFlushed()` updates `lastFlushTime`. + +##### Trace Consistency Trade-offs + +There still remains a small subset of cases that might result in partial traces, where either an old trace bucket was dropped and a new span with the same trace arrived, or we dropped an incoming span of this trace. 
+The preferred overflow behavior in most cases should be `drop_oldest` since it results in the fewest incomplete traces from the two scenarios. + +Stores are mapped to [DataCategories](https://github.com/getsentry/relay/blob/master/relay-base-schema/src/data_category.rs), which determine their scheduling priority and rate limits. + +#### Scheduler + +The scheduler runs as a background worker, coordinating the flow of telemetry from storage to the transport: + +- **Initialization**: Constructs a weighted priority cycle (e.g., `[CRITICAL×5, HIGH×4, MEDIUM×3, ...]`) based on configured weights. +- **Event loop**: Wakes when explicitly signaled from the `captureX` methods on the client when new data is available (if the language does not support this, then a periodic ticker can be used). +- **Buffer selection**: Iterates through the priority cycle, selecting buffers that are ready to flush and not rate limited. +- **Rate limit coordination**: Queries the transport's rate limit state before attempting to send any category. +- **Envelope construction**: Converts buffered items into Sentry protocol envelopes. + - Log items are batched together into a single envelope with multiple log entries. + - Other categories typically send one item per envelope. +- **Graceful shutdown**: During client shutdown, force-drains all buffers to prevent data loss. + +#### Transport + +The transport layer handles HTTP communication with Sentry's ingestion endpoints: + +### Configuration + +#### Buffer Options +- **Capacity**: 100 items for errors and check-ins, 10*BATCH_SIZE for logs, 1000 for transactions. +- **Overflow policy**: `drop_oldest`. +- **Batch size**: 1 for errors and monitors (immediate send), 100 for logs. +- **Batch timeout**: 5 seconds for logs. + +#### Scheduler Options +- **Priority weights**: CRITICAL=5, HIGH=4, MEDIUM=3, LOW=2, LOWEST=1. 
+ +### Implementation Example (Go) + +The `sentry-go` SDK provides a reference implementation of this architecture: + +#### Storage Interface + +```go +type Storage[T any] interface { + // Core operations + Offer(item T) bool + Poll() (T, bool) + PollBatch(maxItems int) []T + PollIfReady() []T + Drain() []T + Peek() (T, bool) + + // State queries + Size() int + Capacity() int + IsEmpty() bool + IsFull() bool + Utilization() float64 + + // Flush management + IsReadyToFlush() bool + MarkFlushed() + + // Category/Priority + Category() ratelimit.Category + Priority() ratelimit.Priority +} + + +// Single item store +func (b *RingBuffer[T]) PollIfReady() []T { + b.mu.Lock() + defer b.mu.Unlock() + + if b.size == 0 { + return nil + } + + ready := b.size >= b.batchSize || + (b.timeout > 0 && time.Since(b.lastFlushTime) >= b.timeout) + + if !ready { + return nil + } + + itemCount := b.batchSize + if itemCount > b.size { + itemCount = b.size + } + + result := make([]T, itemCount) + var zero T + + for i := 0; i < itemCount; i++ { + result[i] = b.items[b.head] + b.items[b.head] = zero + b.head = (b.head + 1) % b.capacity + b.size-- + } + + b.lastFlushTime = time.Now() + return result +} + +// Bucketed store +func (b *BucketedBuffer[T]) PollIfReady() []T { + b.mu.Lock() + defer b.mu.Unlock() + if b.bucketCount == 0 { + return nil + } + // the batchSize is satisfied based on total items + ready := b.totalItems >= b.batchSize || (b.timeout > 0 && time.Since(b.lastFlushTime) >= b.timeout) + if !ready { + return nil + } + // keep track of oldest bucket + oldest := b.buckets[b.head] + if oldest == nil { + return nil + } + items := oldest.items + if oldest.traceID != "" { + delete(b.traceIndex, oldest.traceID) + } + b.buckets[b.head] = nil + b.head = (b.head + 1) % b.bucketCapacity + b.totalItems -= len(items) + b.bucketCount-- + b.lastFlushTime = time.Now() + return items +} + +``` + +#### Scheduler Processing + +```go +func (s *Scheduler) processNextBatch() { + // Select priority 
from weighted cycle + priority := s.currentCycle[s.cyclePos] + s.cyclePos = (s.cyclePos + 1) % len(s.currentCycle) + + // Find ready buffer for this priority + for category, buffer := range s.buffers { + if buffer.Priority() == priority && + !s.transport.IsRateLimited(category) && + buffer.IsReadyToFlush() { + items := buffer.PollIfReady() + s.sendItems(category, items) + // only process one batch per tick + break + } + } +} +``` + + +#### Flushing + +```go +func (s *Scheduler) flush() { + // should process all store buffers and send to transport + for category, buffer := range s.buffers { + if !buffer.IsEmpty() { + s.processItems(buffer, category, true) + } + } +} + +// The Buffer exposes the flush method that calls both +func (b *Buffer) Flush(timeout time.Time) { + scheduler.flush() + transport.flush(timeout) +} +``` From 690c090870e66d2b15aaa7779822b07e803e7d9b Mon Sep 17 00:00:00 2001 From: Giannis Gkiortzis Date: Mon, 17 Nov 2025 17:27:36 +0100 Subject: [PATCH 4/9] add drop specifications --- .../backend-telemetry-buffer.mdx | 34 +++++++++++++------ 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/develop-docs/sdk/telemetry/telemetry-buffer/backend-telemetry-buffer.mdx b/develop-docs/sdk/telemetry/telemetry-buffer/backend-telemetry-buffer.mdx index 7d9307b46aa33..ba9729c893ee6 100644 --- a/develop-docs/sdk/telemetry/telemetry-buffer/backend-telemetry-buffer.mdx +++ b/develop-docs/sdk/telemetry/telemetry-buffer/backend-telemetry-buffer.mdx @@ -64,7 +64,8 @@ Introduce a `Buffer` layer between the `Client` and the `Transport`. This `Buffe #### How the Buffer works - **Smart batching**: Logs are batched into single requests; errors, transactions, and monitors are sent immediately. -- **Pre-send rate limiting**: The scheduler checks rate limits before dispatching, avoiding unnecessary requests while keeping items buffered. +- **Pre-send rate limiting**: The scheduler checks rate limits before serialization to avoid unnecessary processing. 
When a telemetry category is rate-limited, the selected batch should +be dropped, to avoid filling up the buffers. - **Category isolation**: Separate ring buffers for each telemetry type prevent head-of-line blocking. - **Weighted scheduling**: High-priority telemetry gets sent more frequently via round-robin selection. - **Transport compatibility**: Works with existing HTTP transport implementations without modification. @@ -136,11 +137,20 @@ The scheduler runs as a background worker, coordinating the flow of telemetry fr #### Transport -The transport layer handles HTTP communication with Sentry's ingestion endpoints: +The transport layer handles HTTP communication with Sentry's ingestion endpoints. + + + +The only layer responsible for dropping events is the Buffer. If the transport is full, the Buffer should drop the batch. + + ### Configuration -#### Buffer Options +#### Transport Options +- **Capacity**: 1000 items. + +#### Telemetry Buffer Options - **Capacity**: 100 items for errors and check-ins, 10*BATCH_SIZE for logs, 1000 for transactions. - **Overflow policy**: `drop_oldest`. - **Batch size**: 1 for errors and monitors (immediate send), 100 for logs.
@@ -258,13 +268,15 @@ func (s *Scheduler) processNextBatch() { // Find ready buffer for this priority for category, buffer := range s.buffers { - if buffer.Priority() == priority && - !s.transport.IsRateLimited(category) && - buffer.IsReadyToFlush() { - items := buffer.PollIfReady() - s.sendItems(category, items) - // only process one batch per tick - break + if buffer.Priority() == priority && buffer.IsReadyToFlush() { + items := buffer.PollIfReady() + if s.transport.IsRateLimited(category) { + // drop the batch and return + return + } + s.sendItems(category, items) + // only process one batch per tick + break } } } @@ -276,7 +288,7 @@ func (s *Scheduler) processNextBatch() { ```go func (s *Scheduler) flush() { // should process all store buffers and send to transport - for category, buffer := range s.buffers { + for category, buffer := range s.buffers { if !buffer.IsEmpty() { s.processItems(buffer, category, true) } From 2cad4c93802d9686bf2a0852a97f81052872bda5 Mon Sep 17 00:00:00 2001 From: Giannis Gkiortzis Date: Tue, 25 Nov 2025 11:34:45 +0100 Subject: [PATCH 5/9] chore: wording fixes --- .../telemetry/telemetry-buffer/backend-telemetry-buffer.mdx | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/develop-docs/sdk/telemetry/telemetry-buffer/backend-telemetry-buffer.mdx b/develop-docs/sdk/telemetry/telemetry-buffer/backend-telemetry-buffer.mdx index ba9729c893ee6..5a149963549e5 100644 --- a/develop-docs/sdk/telemetry/telemetry-buffer/backend-telemetry-buffer.mdx +++ b/develop-docs/sdk/telemetry/telemetry-buffer/backend-telemetry-buffer.mdx @@ -86,14 +86,14 @@ Configurable via weights. Each telemetry category maintains a store interface; a fixed-size circular array/ring buffer (not to be confused with the `Buffer` wrapper) that stores items before transmission: - **Bounded capacity**: Default to 100 items for errors, logs, and monitors; 1000 for transactions. 
This prevents unbounded memory growth regardless of telemetry volume and backpressure handling. -- **Overflow policies**: - - `drop_oldest` (default): Evicts the oldest item when the buffer is full, making room for new data. +- **Overflow policies** (optional): + - `drop_oldest` (default): Evicts the oldest item when the buffer is full, making room for new data. This should be the normal SDK behavior. - `drop_newest`: Rejects incoming items when full, preserving what's already queued. - **Batching configuration**: - `batchSize`: Number of items to combine into a single batch (1 for errors, transactions, and monitors; 100 for logs). - `timeout`: Maximum time to wait before sending a partial batch (5 seconds for logs). - **Bucketed Storage Support**: The storage interface should satisfy both bucketed and single-item implementations, allowing sending spans per trace id (required for Span First). -- **Observability**: Each store tracks dropped item counts for client reports. +- **Observability**: Each store tracks dropped item counts for client reports. Depending on implementation we can either send reports by adding them to existing events or on a timeout basis (Consult [Client Reports](https://develop.sentry.dev/sdk/telemetry/client-reports) for more details). In both cases our source of truth is the dropped counter. 
##### Single-item ring buffer (default) From 304cd53f68b9a9394793f6926e3cb3b4f6a63233 Mon Sep 17 00:00:00 2001 From: Giannis Gkiortzis Date: Tue, 25 Nov 2025 11:36:30 +0100 Subject: [PATCH 6/9] chore: use relative link --- .../sdk/telemetry/telemetry-buffer/backend-telemetry-buffer.mdx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/develop-docs/sdk/telemetry/telemetry-buffer/backend-telemetry-buffer.mdx b/develop-docs/sdk/telemetry/telemetry-buffer/backend-telemetry-buffer.mdx index 5a149963549e5..ed62b26e262f4 100644 --- a/develop-docs/sdk/telemetry/telemetry-buffer/backend-telemetry-buffer.mdx +++ b/develop-docs/sdk/telemetry/telemetry-buffer/backend-telemetry-buffer.mdx @@ -93,7 +93,7 @@ Each telemetry category maintains a store interface; a fixed-size circular array - `batchSize`: Number of items to combine into a single batch (1 for errors, transactions, and monitors; 100 for logs). - `timeout`: Maximum time to wait before sending a partial batch (5 seconds for logs). - **Bucketed Storage Support**: The storage interface should satisfy both bucketed and single-item implementations, allowing sending spans per trace id (required for Span First). -- **Observability**: Each store tracks dropped item counts for client reports. Depending on implementation we can either send reports by adding them to existing events or on a timeout basis (Consult [Client Reports](https://develop.sentry.dev/sdk/telemetry/client-reports) for more details). In both cases our source of truth is the dropped counter. +- **Observability**: Each store tracks dropped item counts for client reports. Depending on implementation we can either send reports by adding them to existing events or on a timeout basis (Consult [Client Reports](/sdk/telemetry/client-reports/) for more details). In both cases our source of truth is the dropped counter. 
##### Single-item ring buffer (default) From 1f5b54fc2ab80956a4bed66d34c6a369c635901e Mon Sep 17 00:00:00 2001 From: Giannis Gkiortzis Date: Tue, 25 Nov 2025 11:48:28 +0100 Subject: [PATCH 7/9] chore: remove client reports line --- .../sdk/telemetry/telemetry-buffer/backend-telemetry-buffer.mdx | 1 - 1 file changed, 1 deletion(-) diff --git a/develop-docs/sdk/telemetry/telemetry-buffer/backend-telemetry-buffer.mdx b/develop-docs/sdk/telemetry/telemetry-buffer/backend-telemetry-buffer.mdx index ed62b26e262f4..5153ebaf795b4 100644 --- a/develop-docs/sdk/telemetry/telemetry-buffer/backend-telemetry-buffer.mdx +++ b/develop-docs/sdk/telemetry/telemetry-buffer/backend-telemetry-buffer.mdx @@ -93,7 +93,6 @@ Each telemetry category maintains a store interface; a fixed-size circular array - `batchSize`: Number of items to combine into a single batch (1 for errors, transactions, and monitors; 100 for logs). - `timeout`: Maximum time to wait before sending a partial batch (5 seconds for logs). - **Bucketed Storage Support**: The storage interface should satisfy both bucketed and single-item implementations, allowing sending spans per trace id (required for Span First). -- **Observability**: Each store tracks dropped item counts for client reports. Depending on implementation we can either send reports by adding them to existing events or on a timeout basis (Consult [Client Reports](/sdk/telemetry/client-reports/) for more details). In both cases our source of truth is the dropped counter. 
##### Single-item ring buffer (default) From bd49c968d4343f588731a8072038e9e475a0dfba Mon Sep 17 00:00:00 2001 From: Giannis Gkiortzis Date: Tue, 25 Nov 2025 11:56:29 +0100 Subject: [PATCH 8/9] update snippet examples --- .../backend-telemetry-buffer.mdx | 62 +++++++++++++------ 1 file changed, 44 insertions(+), 18 deletions(-) diff --git a/develop-docs/sdk/telemetry/telemetry-buffer/backend-telemetry-buffer.mdx b/develop-docs/sdk/telemetry/telemetry-buffer/backend-telemetry-buffer.mdx index 5153ebaf795b4..232983aea9ce0 100644 --- a/develop-docs/sdk/telemetry/telemetry-buffer/backend-telemetry-buffer.mdx +++ b/develop-docs/sdk/telemetry/telemetry-buffer/backend-telemetry-buffer.mdx @@ -260,24 +260,50 @@ func (b *BucketedBuffer[T]) PollIfReady() []T { #### Scheduler Processing ```go +func (s *Scheduler) run() { + for { + s.mu.Lock() + + for !s.hasWork() && s.ctx.Err() == nil { + // block here until the scheduler is signaled that an item was added. + s.cond.Wait() + } + + s.mu.Unlock() + s.processNextBatch() + } +} + +func (s *Scheduler) hasWork() bool { + for _, buffer := range s.buffers { + if buffer.IsReadyToFlush() { + return true + } + } + return false +} + func (s *Scheduler) processNextBatch() { - // Select priority from weighted cycle - priority := s.currentCycle[s.cyclePos] - s.cyclePos = (s.cyclePos + 1) % len(s.currentCycle) - - // Find ready buffer for this priority - for category, buffer := range s.buffers { - if buffer.Priority() == priority && buffer.IsReadyToFlush() { - items := buffer.PollIfReady() - if s.transport.IsRateLimited(category) { - // drop the batch and return - return - } - s.sendItems(category, items) - // only process one batch per tick - break - } - } + if len(s.currentCycle) == 0 { + return + } + + priority := s.currentCycle[s.cyclePos] + s.cyclePos = (s.cyclePos + 1) % len(s.currentCycle) + + var bufferToProcess Storage[protocol.EnvelopeItemConvertible] + var categoryToProcess ratelimit.Category + for category, buffer := range
s.buffers { + if buffer.Priority() == priority && buffer.IsReadyToFlush() { + bufferToProcess = buffer + categoryToProcess = category + break + } + } + + if bufferToProcess != nil { + s.processItems(bufferToProcess, categoryToProcess, false) + } } ``` @@ -295,7 +321,7 @@ func (s *Scheduler) flush() { } // The Buffer exposes the flush method that calls both -func (b *Buffer) Flush(timeout time.Time) { +func (b *Buffer) Flush(timeout time.Duration) { scheduler.flush() transport.flush(timeout) } From bce0b1bee8fc47fb7b2b02ec17549c6888391912 Mon Sep 17 00:00:00 2001 From: Giannis Gkiortzis Date: Tue, 25 Nov 2025 12:00:02 +0100 Subject: [PATCH 9/9] fix: diagram priorities --- .../sdk/telemetry/telemetry-buffer/backend-telemetry-buffer.mdx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/develop-docs/sdk/telemetry/telemetry-buffer/backend-telemetry-buffer.mdx b/develop-docs/sdk/telemetry/telemetry-buffer/backend-telemetry-buffer.mdx index 232983aea9ce0..bbcfe96595fdd 100644 --- a/develop-docs/sdk/telemetry/telemetry-buffer/backend-telemetry-buffer.mdx +++ b/develop-docs/sdk/telemetry/telemetry-buffer/backend-telemetry-buffer.mdx @@ -39,7 +39,7 @@ Introduce a `Buffer` layer between the `Client` and the `Transport`. This `Buffe │ │ │ ┌──────────────────────┐ ┌──────────────────────┐ ┌──────────────────┐ │ │ │ Error Store │ │ Check-in Store │ │ Log Store │ │ -│ │ (CRITICAL) │ │ (HIGH) │ │ (MEDIUM) │ │ +│ │ (CRITICAL) │ │ (HIGH) │ │ (LOW) │ │ │ │ Timeout: N/A │ │ Timeout: N/A │ │ Timeout: 5s │ │ │ │ BatchSize: 1 │ │ BatchSize: 1 │ │ BatchSize: 100 │ │ │ └──────────────────────┘ └──────────────────────┘ └──────────────────┘ │