Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ Key settings in `internal/config/config.go`:
- `LOG_FTS_ENABLED` (false) — when truthy (`true`/`yes`/`on`/`1`), provisions the SQLite FTS5 `logs_fts` virtual table + sync triggers at startup; when false, log-search uses vectordb (semantic) plus a 24h-clamped LIKE fallback. Toggle off and reclaim disk via `POST /api/admin/drop_fts` (refused while the flag is on).
- `DLQ_MAX_FILES` (1000), `DLQ_MAX_DISK_MB` (500), `DLQ_MAX_RETRIES` (10)
- `GRAPHRAG_WORKER_COUNT` (16), `GRAPHRAG_EVENT_QUEUE_SIZE` (100000) — sized for 100–200 services; raise further if `otelcontext_graphrag_events_dropped_total` climbs
- `INGEST_MIN_SEVERITY` (`INFO`), `STORE_MIN_SEVERITY` (`""` = same as ingest) — two-tier log severity gate. The ingest gate runs at the OTLP receiver and **drops the log entirely** below the threshold (no in-memory enrichment either). The store gate runs at the persist boundary inside the async pipeline (`internal/ingest/pipeline.go:process`) and **only skips the DB row write** — the log still flows through `LogCallback` so vectordb indexing, GraphRAG Drain template mining, and span/trace correlation see it. Use case: `INGEST_MIN_SEVERITY=DEBUG STORE_MIN_SEVERITY=WARN` keeps SQLite small while letting in-memory anomaly detection benefit from the verbose stream. Setting `STORE_MIN_SEVERITY` strictly below `INGEST_MIN_SEVERITY` is a no-op (logged as a warning at startup); setting it equal to the ingest threshold is also a no-op, silently. Drops surface via `Pipeline.Stats().StoreFiltered`.
- `INGEST_ASYNC_ENABLED` (true), `INGEST_PIPELINE_QUEUE_SIZE` (50000), `INGEST_PIPELINE_WORKERS` (8) — async ingest pipeline (`internal/ingest/pipeline.go`). Hybrid backpressure: <90% accept all, 90–100% drop healthy batches (errors/slow always pass), 100% return gRPC `RESOURCE_EXHAUSTED`. Set `INGEST_ASYNC_ENABLED=false` to revert to synchronous DB writes inside `Export()`. Drops surface as `otelcontext_ingest_pipeline_dropped_total{signal,reason}`.
- `GRPC_MAX_RECV_MB` (16), `GRPC_MAX_CONCURRENT_STREAMS` (1000) — OTLP gRPC server caps, validated to 1..256 and 1..1_000_000
- `RETENTION_BATCH_SIZE` (50000), `RETENTION_BATCH_SLEEP_MS` (1) — purge pacing; raise the sleep on busy production DBs
Expand Down
9 changes: 9 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ type Config struct {
IngestAllowedServices string
IngestExcludedServices string

// Storage Filtering. Logs that pass IngestMinSeverity (so they reach the
// receiver and feed in-memory consumers like vectordb / GraphRAG) but
// fall below StoreMinSeverity are skipped during the DB persist pass —
// only the row-write is dropped, not the in-memory enrichment. Empty
// (default) means StoreMinSeverity == IngestMinSeverity, i.e. no
// behavior change vs. the single-threshold semantics.
StoreMinSeverity string

// DB Connection Pool
DBMaxOpenConns int
DBMaxIdleConns int
Expand Down Expand Up @@ -244,6 +252,7 @@ func Load(customPath string) (*Config, error) {
DLQReplayInterval: getEnv("DLQ_REPLAY_INTERVAL", "5m"),

IngestMinSeverity: getEnv("INGEST_MIN_SEVERITY", "INFO"),
StoreMinSeverity: getEnv("STORE_MIN_SEVERITY", ""),
IngestAllowedServices: getEnv("INGEST_ALLOWED_SERVICES", ""),
IngestExcludedServices: getEnv("INGEST_EXCLUDED_SERVICES", ""),

Expand Down
5 changes: 5 additions & 0 deletions internal/ingest/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,11 @@ func getServiceName(attrs []*commonpb.KeyValue) string {
return "unknown-service"
}

// ParseSeverity exposes parseSeverity outside the package. main.go uses it
// to convert the STORE_MIN_SEVERITY env string into the integer severity
// rank that the pipeline's second-tier (persist-time) filter operates on.
func ParseSeverity(level string) int {
	return parseSeverity(level)
}

// Filtering Helpers
func parseSeverity(level string) int {
switch strings.ToUpper(level) {
Expand Down
49 changes: 47 additions & 2 deletions internal/ingest/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,15 @@ type Pipeline struct {
tenantMu sync.Mutex
tenantInFlight map[string]int

// storeMinSeverity is the second-tier severity gate applied at persist
// time inside process(). Logs in a Batch with severity below this
// threshold are dropped from the BatchCreateAll write but still feed
// the LogCallback (so vectordb / GraphRAG / Drain mining still see
// them). 0 disables the second tier — every log that survived
// IngestMinSeverity at the receiver is also persisted.
storeMinSeverity int
storeFiltered atomic.Int64

stopCh chan struct{}
once sync.Once
wg sync.WaitGroup
Expand Down Expand Up @@ -225,6 +234,22 @@ func (p *Pipeline) SetPerTenantCap(n int) {
p.perTenantCap = n
}

// SetStoreMinSeverity installs the persist-time (second-tier) severity gate.
// A log whose severity rank falls below `level` is excluded from the
// BatchCreateAll write but still flows through LogCallback, so in-memory
// consumers (vectordb, GraphRAG Drain mining, anomaly correlation) keep
// seeing it. Passing 0 — or any negative value, which is clamped to 0 —
// disables the gate entirely: every log that survived IngestMinSeverity at
// the receiver is also persisted (legacy behavior).
//
// `level` is the integer rank from parseSeverity ("DEBUG"=10 .. "FATAL"=50).
// Startup-only — call before Start().
func (p *Pipeline) SetStoreMinSeverity(level int) {
	if level > 0 {
		p.storeMinSeverity = level
		return
	}
	p.storeMinSeverity = 0
}

// TenantDropped reports the cumulative number of healthy submissions
// rejected because the submitting tenant was at the per-tenant cap.
// Distinct from RejectedFull (queue at hard capacity) and
Expand Down Expand Up @@ -353,6 +378,7 @@ func (p *Pipeline) Stats() PipelineStats {
DroppedHealthy: p.droppedHealthy.Load(),
RejectedFull: p.rejectedFull.Load(),
ProcessFailures: p.processFailures.Load(),
StoreFiltered: p.storeFiltered.Load(),
QueueDepth: len(p.queue),
Capacity: p.cfg.Capacity,
}
Expand All @@ -365,6 +391,7 @@ type PipelineStats struct {
DroppedHealthy int64
RejectedFull int64
ProcessFailures int64
StoreFiltered int64 // logs dropped by STORE_MIN_SEVERITY at persist time
QueueDepth int
Capacity int
}
Expand Down Expand Up @@ -434,15 +461,33 @@ func (p *Pipeline) process(b *Batch) {
return
}

if err := p.writer.BatchCreateAll(b.Traces, b.Spans, b.Logs); err != nil {
// Apply the second-tier store-severity gate. Logs below the threshold
// are dropped from the persist set but still flow through the callback
// so in-memory enrichers (vectordb, GraphRAG Drain) keep seeing them.
logsToPersist := b.Logs
if p.storeMinSeverity > 0 && len(b.Logs) > 0 {
kept := make([]storage.Log, 0, len(b.Logs))
for _, l := range b.Logs {
if shouldIngestSeverity(l.Severity, p.storeMinSeverity) {
kept = append(kept, l)
} else {
p.storeFiltered.Add(1)
}
}
logsToPersist = kept
}

if err := p.writer.BatchCreateAll(b.Traces, b.Spans, logsToPersist); err != nil {
slog.Error("ingest pipeline: BatchCreateAll failed", "error", err)
p.processFailures.Add(1)
return
}

// Callbacks fire only after the transaction commits successfully — a
// rolled-back batch must not feed downstream consumers (GraphRAG etc.)
// data that no longer exists in the DB.
// data that no longer exists in the DB. The LogCallback intentionally
// iterates over the FULL b.Logs slice, not logsToPersist — even logs
// dropped by the store-severity gate must reach in-memory enrichers.
if b.SpanCallback != nil {
for _, s := range b.Spans {
b.SpanCallback(s)
Expand Down
124 changes: 124 additions & 0 deletions internal/ingest/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,3 +547,127 @@ func TestPipeline_PanicInCallbackRecovered(t *testing.T) {
t.Errorf("expected ProcessFailures > 0 after callback panic")
}
}

// TestPipeline_StoreMinSeverity_DropsBelowThresholdFromPersist verifies the
// split behavior of the second-tier gate: with SetStoreMinSeverity configured,
// logs below the threshold never reach BatchCreateAll, yet the LogCallback
// still fires for every one of them — the in-memory enrichment path (vectordb,
// GraphRAG) must see everything that passed IngestMinSeverity at the receiver.
func TestPipeline_StoreMinSeverity_DropsBelowThresholdFromPersist(t *testing.T) {
	t.Parallel()
	writer := &fakeWriter{}
	p := NewPipeline(writer, nil, PipelineConfig{Capacity: 10, Workers: 1, SoftThreshold: 0.9})
	// WARN (rank 30) gates INFO (20) out of the persist set; ERROR (40) passes.
	p.SetStoreMinSeverity(ParseSeverity("WARN"))
	p.Start(context.Background())
	defer p.Stop()

	var (
		cbMu sync.Mutex
		seen []string
	)
	batch := &Batch{
		Type:   SignalLogs,
		Tenant: "t1",
		Logs: []storage.Log{
			{Body: "info-row", Severity: "INFO"},
			{Body: "warn-row", Severity: "WARN"},
			{Body: "err-row", Severity: "ERROR"},
		},
		LogCallback: func(l storage.Log) {
			cbMu.Lock()
			seen = append(seen, l.Severity)
			cbMu.Unlock()
		},
	}
	if err := p.Submit(batch); err != nil {
		t.Fatalf("submit: %v", err)
	}

	if !waitFor(t, 5*time.Second, func() bool { return p.Stats().Processed >= 1 }) {
		t.Fatalf("batch never processed")
	}

	// Persist side: exactly WARN and ERROR must have reached the writer.
	writer.mu.Lock()
	var persisted []string
	for _, call := range writer.logsCalls {
		for _, l := range call {
			persisted = append(persisted, l.Severity)
		}
	}
	writer.mu.Unlock()
	if len(persisted) != 2 {
		t.Fatalf("expected 2 logs persisted (WARN+ERROR), got %d: %v", len(persisted), persisted)
	}
	for _, sev := range persisted {
		if sev == "INFO" {
			t.Errorf("INFO log was persisted but should have been gated by store-min-severity")
		}
	}

	// Callback side: all three logs — the gated INFO included — must have
	// reached the enrichment path, which is independent of the persist gate.
	cbMu.Lock()
	defer cbMu.Unlock()
	if len(seen) != 3 {
		t.Fatalf("expected LogCallback to fire 3 times (incl. gated INFO), got %d: %v", len(seen), seen)
	}
	sawInfo := false
	for _, sev := range seen {
		sawInfo = sawInfo || sev == "INFO"
	}
	if !sawInfo {
		t.Errorf("INFO log did not reach LogCallback — in-memory enrichment path broken: %v", seen)
	}

	// Counter side: the single INFO drop is accounted in StoreFiltered.
	if got := p.Stats().StoreFiltered; got != 1 {
		t.Errorf("Stats().StoreFiltered = %d, want 1", got)
	}
}

// TestPipeline_StoreMinSeverity_Disabled_PersistsAllLogs pins the legacy
// default: with no SetStoreMinSeverity call (gate = 0), every log in a Batch
// reaches the writer regardless of severity, and StoreFiltered stays zero.
func TestPipeline_StoreMinSeverity_Disabled_PersistsAllLogs(t *testing.T) {
	t.Parallel()
	writer := &fakeWriter{}
	p := NewPipeline(writer, nil, PipelineConfig{Capacity: 10, Workers: 1, SoftThreshold: 0.9})
	// Deliberately no SetStoreMinSeverity — the persist gate stays disabled.
	p.Start(context.Background())
	defer p.Stop()

	err := p.Submit(&Batch{
		Type:   SignalLogs,
		Tenant: "t1",
		Logs: []storage.Log{
			{Body: "info-row", Severity: "INFO"},
			{Body: "debug-row", Severity: "DEBUG"},
			{Body: "err-row", Severity: "ERROR"},
		},
	})
	if err != nil {
		t.Fatalf("submit: %v", err)
	}
	if !waitFor(t, 5*time.Second, func() bool { return p.Stats().Processed >= 1 }) {
		t.Fatalf("batch never processed")
	}

	writer.mu.Lock()
	defer writer.mu.Unlock()
	persisted := 0
	for _, call := range writer.logsCalls {
		persisted += len(call)
	}
	if persisted != 3 {
		t.Fatalf("expected all 3 logs persisted with gate disabled, got %d", persisted)
	}
	if got := p.Stats().StoreFiltered; got != 0 {
		t.Errorf("Stats().StoreFiltered = %d, want 0 with gate disabled", got)
	}
}
26 changes: 26 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,32 @@ func main() {
Workers: cfg.IngestPipelineWorkers,
})
ingestPipeline.SetPerTenantCap(cfg.IngestPipelinePerTenantCap)

// Second-tier severity gate. Empty STORE_MIN_SEVERITY means "use the
// same threshold as INGEST_MIN_SEVERITY" — i.e. behavior is identical
// to the legacy single-threshold path. Only enable the gate when the
// store threshold is strictly higher than the ingest threshold; equal
// or lower is wasted work since the receiver has already dropped the
// affected logs.
ingestRank := ingest.ParseSeverity(cfg.IngestMinSeverity)
storeRank := ingestRank
if cfg.StoreMinSeverity != "" {
storeRank = ingest.ParseSeverity(cfg.StoreMinSeverity)
}
if storeRank > ingestRank {
ingestPipeline.SetStoreMinSeverity(storeRank)
slog.Info("🪛 Store-severity gate enabled",
"ingest_min", cfg.IngestMinSeverity,
"store_min", cfg.StoreMinSeverity,
"note", "logs below store_min reach in-memory consumers but are not persisted",
)
} else if cfg.StoreMinSeverity != "" && storeRank < ingestRank {
slog.Warn("STORE_MIN_SEVERITY is lower than INGEST_MIN_SEVERITY — has no effect; receiver already filters",
"ingest_min", cfg.IngestMinSeverity,
"store_min", cfg.StoreMinSeverity,
)
}

ingestPipeline.Start(context.Background())
traceServer.SetPipeline(ingestPipeline)
logsServer.SetPipeline(ingestPipeline)
Expand Down
Loading