diff --git a/CLAUDE.md b/CLAUDE.md index 62368de..a410516 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -217,6 +217,7 @@ Key settings in `internal/config/config.go`: - `LOG_FTS_ENABLED` (false) — when truthy (`true`/`yes`/`on`/`1`), provisions the SQLite FTS5 `logs_fts` virtual table + sync triggers at startup; when false, log-search uses vectordb (semantic) plus a 24h-clamped LIKE fallback. Toggle off and reclaim disk via `POST /api/admin/drop_fts` (refused while the flag is on). - `DLQ_MAX_FILES` (1000), `DLQ_MAX_DISK_MB` (500), `DLQ_MAX_RETRIES` (10) - `GRAPHRAG_WORKER_COUNT` (16), `GRAPHRAG_EVENT_QUEUE_SIZE` (100000) — sized for 100–200 services; raise further if `otelcontext_graphrag_events_dropped_total` climbs +- `INGEST_MIN_SEVERITY` (`INFO`), `STORE_MIN_SEVERITY` (`""` = same as ingest) — two-tier log severity gate. The ingest gate runs at the OTLP receiver and **drops the log entirely** below the threshold (no in-memory enrichment either). The store gate runs at the persist boundary inside the async pipeline (`internal/ingest/pipeline.go:process`) and **only skips the DB row write** — the log still flows through `LogCallback` so vectordb indexing, GraphRAG Drain template mining, and span/trace correlation see it. Use case: `INGEST_MIN_SEVERITY=DEBUG STORE_MIN_SEVERITY=WARN` keeps SQLite small while letting in-memory anomaly detection benefit from the verbose stream. Setting `STORE_MIN_SEVERITY` ≤ `INGEST_MIN_SEVERITY` is a no-op (a warning is logged at startup when it is strictly lower; setting it equal to the ingest threshold is silently ignored). Drops surface via `Pipeline.Stats().StoreFiltered`. - `INGEST_ASYNC_ENABLED` (true), `INGEST_PIPELINE_QUEUE_SIZE` (50000), `INGEST_PIPELINE_WORKERS` (8) — async ingest pipeline (`internal/ingest/pipeline.go`). Hybrid backpressure: <90% accept all, 90–100% drop healthy batches (errors/slow always pass), 100% return gRPC `RESOURCE_EXHAUSTED`. Set `INGEST_ASYNC_ENABLED=false` to revert to synchronous DB writes inside `Export()`. Drops surface as `otelcontext_ingest_pipeline_dropped_total{signal,reason}`. 
- `GRPC_MAX_RECV_MB` (16), `GRPC_MAX_CONCURRENT_STREAMS` (1000) — OTLP gRPC server caps, validated to 1..256 and 1..1_000_000 - `RETENTION_BATCH_SIZE` (50000), `RETENTION_BATCH_SLEEP_MS` (1) — purge pacing; raise the sleep on busy production DBs diff --git a/internal/config/config.go b/internal/config/config.go index 18a7047..68c6423 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -25,6 +25,14 @@ type Config struct { IngestAllowedServices string IngestExcludedServices string + // Storage Filtering. Logs that pass IngestMinSeverity (so they reach the + // receiver and feed in-memory consumers like vectordb / GraphRAG) but + // fall below StoreMinSeverity are skipped during the DB persist pass — + // only the row-write is dropped, not the in-memory enrichment. Empty + // (default) means StoreMinSeverity == IngestMinSeverity, i.e. no + // behavior change vs. the single-threshold semantics. + StoreMinSeverity string + // DB Connection Pool DBMaxOpenConns int DBMaxIdleConns int @@ -244,6 +252,7 @@ func Load(customPath string) (*Config, error) { DLQReplayInterval: getEnv("DLQ_REPLAY_INTERVAL", "5m"), IngestMinSeverity: getEnv("INGEST_MIN_SEVERITY", "INFO"), + StoreMinSeverity: getEnv("STORE_MIN_SEVERITY", ""), IngestAllowedServices: getEnv("INGEST_ALLOWED_SERVICES", ""), IngestExcludedServices: getEnv("INGEST_EXCLUDED_SERVICES", ""), diff --git a/internal/ingest/otlp.go b/internal/ingest/otlp.go index dce13d6..d78d054 100644 --- a/internal/ingest/otlp.go +++ b/internal/ingest/otlp.go @@ -682,6 +682,11 @@ func getServiceName(attrs []*commonpb.KeyValue) string { return "unknown-service" } +// ParseSeverity is the exported wrapper for parseSeverity. Used by main.go +// to translate the STORE_MIN_SEVERITY env value into the integer rank the +// pipeline's second-tier filter expects. 
+func ParseSeverity(level string) int { return parseSeverity(level) } + // Filtering Helpers func parseSeverity(level string) int { switch strings.ToUpper(level) { diff --git a/internal/ingest/pipeline.go b/internal/ingest/pipeline.go index b28ca83..868a300 100644 --- a/internal/ingest/pipeline.go +++ b/internal/ingest/pipeline.go @@ -152,6 +152,15 @@ type Pipeline struct { tenantMu sync.Mutex tenantInFlight map[string]int + // storeMinSeverity is the second-tier severity gate applied at persist + // time inside process(). Logs in a Batch with severity below this + // threshold are dropped from the BatchCreateAll write but still feed + // the LogCallback (so vectordb / GraphRAG / Drain mining still see + // them). 0 disables the second tier — every log that survived + // IngestMinSeverity at the receiver is also persisted. + storeMinSeverity int + storeFiltered atomic.Int64 + stopCh chan struct{} once sync.Once wg sync.WaitGroup @@ -225,6 +234,22 @@ func (p *Pipeline) SetPerTenantCap(n int) { p.perTenantCap = n } +// SetStoreMinSeverity configures the second-tier severity gate applied at +// persist time. Logs below `level` are dropped from the BatchCreateAll write +// but still flow through the LogCallback so in-memory consumers (vectordb, +// GraphRAG Drain mining, anomaly correlation) keep working. 0 disables the +// second tier — every log surviving IngestMinSeverity at the receiver is +// also persisted (legacy behavior). +// +// `level` is the integer rank from parseSeverity ("DEBUG"=10 .. "FATAL"=50). +// Startup-only — call before Start(). +func (p *Pipeline) SetStoreMinSeverity(level int) { + if level < 0 { + level = 0 + } + p.storeMinSeverity = level +} + // TenantDropped reports the cumulative number of healthy submissions // rejected because the submitting tenant was at the per-tenant cap. 
// Distinct from RejectedFull (queue at hard capacity) and @@ -353,6 +378,7 @@ func (p *Pipeline) Stats() PipelineStats { DroppedHealthy: p.droppedHealthy.Load(), RejectedFull: p.rejectedFull.Load(), ProcessFailures: p.processFailures.Load(), + StoreFiltered: p.storeFiltered.Load(), QueueDepth: len(p.queue), Capacity: p.cfg.Capacity, } @@ -365,6 +391,7 @@ type PipelineStats struct { DroppedHealthy int64 RejectedFull int64 ProcessFailures int64 + StoreFiltered int64 // logs dropped by STORE_MIN_SEVERITY at persist time QueueDepth int Capacity int } @@ -434,7 +461,23 @@ func (p *Pipeline) process(b *Batch) { return } - if err := p.writer.BatchCreateAll(b.Traces, b.Spans, b.Logs); err != nil { + // Apply the second-tier store-severity gate. Logs below the threshold + // are dropped from the persist set but still flow through the callback + // so in-memory enrichers (vectordb, GraphRAG Drain) keep seeing them. + logsToPersist := b.Logs + if p.storeMinSeverity > 0 && len(b.Logs) > 0 { + kept := make([]storage.Log, 0, len(b.Logs)) + for _, l := range b.Logs { + if shouldIngestSeverity(l.Severity, p.storeMinSeverity) { + kept = append(kept, l) + } else { + p.storeFiltered.Add(1) + } + } + logsToPersist = kept + } + + if err := p.writer.BatchCreateAll(b.Traces, b.Spans, logsToPersist); err != nil { slog.Error("ingest pipeline: BatchCreateAll failed", "error", err) p.processFailures.Add(1) return @@ -442,7 +485,9 @@ func (p *Pipeline) process(b *Batch) { // Callbacks fire only after the transaction commits successfully — a // rolled-back batch must not feed downstream consumers (GraphRAG etc.) - // data that no longer exists in the DB. + // data that no longer exists in the DB. The LogCallback intentionally + // iterates over the FULL b.Logs slice, not logsToPersist — even logs + // dropped by the store-severity gate must reach in-memory enrichers. 
if b.SpanCallback != nil { for _, s := range b.Spans { b.SpanCallback(s) diff --git a/internal/ingest/pipeline_test.go b/internal/ingest/pipeline_test.go index 1374f13..fdf1a9a 100644 --- a/internal/ingest/pipeline_test.go +++ b/internal/ingest/pipeline_test.go @@ -547,3 +547,127 @@ func TestPipeline_PanicInCallbackRecovered(t *testing.T) { t.Errorf("expected ProcessFailures > 0 after callback panic") } } + +// TestPipeline_StoreMinSeverity_DropsBelowThresholdFromPersist verifies that +// when SetStoreMinSeverity is configured, logs below the threshold are +// dropped from BatchCreateAll — but the LogCallback still fires for them +// so in-memory enrichers (vectordb, GraphRAG) keep seeing every log that +// passed IngestMinSeverity at the receiver. +func TestPipeline_StoreMinSeverity_DropsBelowThresholdFromPersist(t *testing.T) { + t.Parallel() + w := &fakeWriter{} + p := NewPipeline(w, nil, PipelineConfig{Capacity: 10, Workers: 1, SoftThreshold: 0.9}) + // Threshold = WARN (rank 30); INFO (20) is below, ERROR (40) is above. + p.SetStoreMinSeverity(ParseSeverity("WARN")) + p.Start(context.Background()) + defer p.Stop() + + var callbackSeen []string + var cbMu sync.Mutex + cb := func(l storage.Log) { + cbMu.Lock() + defer cbMu.Unlock() + callbackSeen = append(callbackSeen, l.Severity) + } + + b := &Batch{ + Type: SignalLogs, + Tenant: "t1", + Logs: []storage.Log{ + {Body: "info-row", Severity: "INFO"}, + {Body: "warn-row", Severity: "WARN"}, + {Body: "err-row", Severity: "ERROR"}, + }, + LogCallback: cb, + } + if err := p.Submit(b); err != nil { + t.Fatalf("submit: %v", err) + } + + if !waitFor(t, 5*time.Second, func() bool { return p.Stats().Processed >= 1 }) { + t.Fatalf("batch never processed") + } + + // Persist: only WARN + ERROR should reach the writer. 
+ w.mu.Lock() + persistedCount := 0 + persistedSeverities := []string{} + for _, call := range w.logsCalls { + for _, l := range call { + persistedCount++ + persistedSeverities = append(persistedSeverities, l.Severity) + } + } + w.mu.Unlock() + if persistedCount != 2 { + t.Fatalf("expected 2 logs persisted (WARN+ERROR), got %d: %v", persistedCount, persistedSeverities) + } + for _, sev := range persistedSeverities { + if sev == "INFO" { + t.Errorf("INFO log was persisted but should have been gated by store-min-severity") + } + } + + // Callback: must fire for ALL THREE logs (INFO included), since the + // in-memory enrichment path is independent of the persist gate. + cbMu.Lock() + defer cbMu.Unlock() + if len(callbackSeen) != 3 { + t.Fatalf("expected LogCallback to fire 3 times (incl. gated INFO), got %d: %v", len(callbackSeen), callbackSeen) + } + infoCb := false + for _, sev := range callbackSeen { + if sev == "INFO" { + infoCb = true + } + } + if !infoCb { + t.Errorf("INFO log did not reach LogCallback — in-memory enrichment path broken: %v", callbackSeen) + } + + // Stats: storeFiltered should report exactly 1 (the INFO drop). + if got := p.Stats().StoreFiltered; got != 1 { + t.Errorf("Stats().StoreFiltered = %d, want 1", got) + } +} + +// TestPipeline_StoreMinSeverity_Disabled_PersistsAllLogs verifies the legacy +// path: when SetStoreMinSeverity is NOT called (or set to 0), every log in a +// Batch is persisted regardless of severity. +func TestPipeline_StoreMinSeverity_Disabled_PersistsAllLogs(t *testing.T) { + t.Parallel() + w := &fakeWriter{} + p := NewPipeline(w, nil, PipelineConfig{Capacity: 10, Workers: 1, SoftThreshold: 0.9}) + // No SetStoreMinSeverity call → gate disabled. 
+ p.Start(context.Background()) + defer p.Stop() + + b := &Batch{ + Type: SignalLogs, + Tenant: "t1", + Logs: []storage.Log{ + {Body: "info-row", Severity: "INFO"}, + {Body: "debug-row", Severity: "DEBUG"}, + {Body: "err-row", Severity: "ERROR"}, + }, + } + if err := p.Submit(b); err != nil { + t.Fatalf("submit: %v", err) + } + if !waitFor(t, 5*time.Second, func() bool { return p.Stats().Processed >= 1 }) { + t.Fatalf("batch never processed") + } + + w.mu.Lock() + defer w.mu.Unlock() + total := 0 + for _, call := range w.logsCalls { + total += len(call) + } + if total != 3 { + t.Fatalf("expected all 3 logs persisted with gate disabled, got %d", total) + } + if got := p.Stats().StoreFiltered; got != 0 { + t.Errorf("Stats().StoreFiltered = %d, want 0 with gate disabled", got) + } +} diff --git a/main.go b/main.go index bbfbe47..7bd412f 100644 --- a/main.go +++ b/main.go @@ -505,6 +505,32 @@ func main() { Workers: cfg.IngestPipelineWorkers, }) ingestPipeline.SetPerTenantCap(cfg.IngestPipelinePerTenantCap) + + // Second-tier severity gate. Empty STORE_MIN_SEVERITY means "use the + // same threshold as INGEST_MIN_SEVERITY" — i.e. behavior is identical + // to the legacy single-threshold path. Only enable the gate when the + // store threshold is strictly higher than the ingest threshold; equal + // or lower is wasted work since the receiver has already dropped the + // affected logs. 
+ ingestRank := ingest.ParseSeverity(cfg.IngestMinSeverity) + storeRank := ingestRank + if cfg.StoreMinSeverity != "" { + storeRank = ingest.ParseSeverity(cfg.StoreMinSeverity) + } + if storeRank > ingestRank { + ingestPipeline.SetStoreMinSeverity(storeRank) + slog.Info("🪛 Store-severity gate enabled", + "ingest_min", cfg.IngestMinSeverity, + "store_min", cfg.StoreMinSeverity, + "note", "logs below store_min reach in-memory consumers but are not persisted", + ) + } else if cfg.StoreMinSeverity != "" && storeRank < ingestRank { + slog.Warn("STORE_MIN_SEVERITY is lower than INGEST_MIN_SEVERITY — has no effect; receiver already filters", + "ingest_min", cfg.IngestMinSeverity, + "store_min", cfg.StoreMinSeverity, + ) + } + ingestPipeline.Start(context.Background()) traceServer.SetPipeline(ingestPipeline) logsServer.SetPipeline(ingestPipeline)