From 15d146f30c8f591196e32e8f8f03bc1da25ed8fc Mon Sep 17 00:00:00 2001 From: simonsmallchua Date: Sat, 25 Apr 2026 08:58:58 +1000 Subject: [PATCH 1/5] Stabilise worker under heavy load Fixes the two root causes Sentry exposed during the 2026-04-24 overnight load test: deterministic lock ordering in promote_waiting_with_outbox (HOVER-K2) and in counters.go DefaultDBSyncFunc (HOVER-K4), so concurrent promoters and counter syncs no longer deadlock on tasks/jobs. Also bounds link-discovery fan-out (HOVER-KG, 2,153 live goroutines), extends the outbox sweeper timeout (HOVER-K3), removes the redundant GNH_PRESSURE_INITIAL_LIMIT env var from prod fly tomls, flips ForceAttemptHTTP2 off to silence H2 DATA-after-END_STREAM log noise, demotes the body-cap warn to debug, and drops job.id labels from broker gauges / skew histogram to cut Mimir series cardinality ~85x (probe aggregates totals once per tick so dashboard sum() queries still work). --- cmd/worker/main.go | 3 + fly.toml | 4 +- fly.worker.toml | 4 +- internal/broker/counters.go | 42 +++++++-- internal/broker/outbox.go | 9 +- internal/broker/probe.go | 34 +++++--- internal/crawler/crawler.go | 13 ++- internal/jobs/executor.go | 6 +- internal/jobs/stream_worker.go | 57 +++++++++--- internal/observability/observability.go | 39 +++++---- ...omote_waiting_deterministic_lock_order.sql | 86 +++++++++++++++++++ 11 files changed, 241 insertions(+), 56 deletions(-) create mode 100644 supabase/migrations/20260425000001_promote_waiting_deterministic_lock_order.sql diff --git a/cmd/worker/main.go b/cmd/worker/main.go index 7abe10bd..d193b09f 100644 --- a/cmd/worker/main.go +++ b/cmd/worker/main.go @@ -298,6 +298,9 @@ func main() { if v := envInt("OUTBOX_SWEEP_BATCH_SIZE", 0); v > 0 { outboxOpts.BatchSize = v } + if v := envInt("OUTBOX_SWEEP_STATEMENT_TIMEOUT_MS", 0); v > 0 { + outboxOpts.StatementTimeout = time.Duration(v) * time.Millisecond + } // Sweeper reads task_outbox, which is written in the same tx as tasks // — so it 
belongs on queueDB in split deployments. outboxSweeper := broker.NewOutboxSweeper(queueDB.GetDB(), scheduler, outboxOpts) diff --git a/fly.toml b/fly.toml index 17aa98e4..f0966846 100644 --- a/fly.toml +++ b/fly.toml @@ -20,7 +20,9 @@ primary_region = 'syd' DB_QUEUE_MAX_CONCURRENCY = "100" GNH_PRESSURE_HIGH_MARK_MS = "80" GNH_PRESSURE_LOW_MARK_MS = "40" - GNH_PRESSURE_INITIAL_LIMIT = "100" + # GNH_PRESSURE_INITIAL_LIMIT intentionally unset — pressure.go defaults it to + # DB_QUEUE_MAX_CONCURRENCY (safeMax). Setting it explicitly invites drift and + # the "exceeds queue cap" clamp warning if the two values get out of sync. GNH_PRESSURE_MIN_LIMIT = "30" GNH_PRESSURE_STEP_DOWN = "5" REDIS_POOL_SIZE = "50" diff --git a/fly.worker.toml b/fly.worker.toml index 1d079c7c..492ab9dc 100644 --- a/fly.worker.toml +++ b/fly.worker.toml @@ -32,7 +32,9 @@ primary_region = 'syd' DB_TX_MAX_RETRIES = "5" GNH_PRESSURE_HIGH_MARK_MS = "80" GNH_PRESSURE_LOW_MARK_MS = "40" - GNH_PRESSURE_INITIAL_LIMIT = "100" + # GNH_PRESSURE_INITIAL_LIMIT intentionally unset — pressure.go defaults it to + # DB_QUEUE_MAX_CONCURRENCY (safeMax). Setting it explicitly invites drift and + # the "exceeds queue cap" clamp warning if the two values get out of sync. GNH_PRESSURE_MIN_LIMIT = "30" GNH_PRESSURE_STEP_DOWN = "5" REDIS_POOL_SIZE = "200" diff --git a/internal/broker/counters.go b/internal/broker/counters.go index a3f62b31..9ab1a8f0 100644 --- a/internal/broker/counters.go +++ b/internal/broker/counters.go @@ -5,6 +5,7 @@ import ( "database/sql" "fmt" "math" + "sort" "strconv" "time" @@ -145,9 +146,19 @@ func DefaultDBSyncFunc(sqlDB *sql.DB) DBSyncFunc { return func(ctx context.Context, counts map[string]int64) error { // When counts is empty all jobs have finished — reset any stale // positive running_tasks left in Postgres. + // ORDER BY id ensures deterministic row-lock order; without it + // concurrent sync ticks across worker VMs deadlock (HOVER-K4). 
if len(counts) == 0 { _, err := sqlDB.ExecContext(ctx, - `UPDATE jobs SET running_tasks = 0 WHERE running_tasks > 0 AND status IN ('running', 'pending')`) + `WITH targets AS ( + SELECT id FROM jobs + WHERE running_tasks > 0 + AND status IN ('running', 'pending') + ORDER BY id + FOR UPDATE + ) + UPDATE jobs SET running_tasks = 0 + FROM targets WHERE jobs.id = targets.id`) return err } @@ -161,10 +172,14 @@ func DefaultDBSyncFunc(sqlDB *sql.DB) DBSyncFunc { // Redis-vs-PG skew per job. A consistent skew > small noise // usually means a counter leak (increment/decrement imbalance) // or a sync lag spike. + // Sorted job IDs give every worker the same row-lock order + // when the per-job UPDATEs below run inside this transaction + // (fixes 40P01 deadlock, HOVER-K4). jobIDsForSnapshot := make([]string, 0, len(counts)) for jobID := range counts { jobIDsForSnapshot = append(jobIDsForSnapshot, jobID) } + sort.Strings(jobIDsForSnapshot) priorCounts := make(map[string]int64, len(counts)) if len(jobIDsForSnapshot) > 0 { rows, qerr := tx.QueryContext(ctx, @@ -190,26 +205,35 @@ func DefaultDBSyncFunc(sqlDB *sql.DB) DBSyncFunc { // "prepared statement already exists" (SQLSTATE 42P05) and kills // the sync tick. ExecContext honours default_query_exec_mode= // simple_protocol set on the pool, avoiding PREPARE entirely. - jobIDs := make([]string, 0, len(counts)) - for jobID, count := range counts { + // Iterate jobs in sorted order so every worker takes per-row + // locks in the same sequence (fixes HOVER-K4 deadlock). 
+ jobIDs := jobIDsForSnapshot + for _, jobID := range jobIDs { + count := counts[jobID] if _, err := tx.ExecContext(ctx, `UPDATE jobs SET running_tasks = $1 WHERE id = $2 AND status IN ('running', 'pending')`, count, jobID); err != nil { return fmt.Errorf("update job %s: %w", jobID, err) } - jobIDs = append(jobIDs, jobID) observability.RecordBrokerCounterSyncSkew(ctx, jobID, math.Abs(float64(count-priorCounts[jobID]))) } // Zero out any active jobs whose counters are no longer tracked - // (they finished between sync intervals). + // (they finished between sync intervals). ORDER BY id keeps the + // row-lock order deterministic across concurrent sync ticks. if len(jobIDs) > 0 { if _, err := tx.ExecContext(ctx, - `UPDATE jobs SET running_tasks = 0 - WHERE running_tasks > 0 - AND status IN ('running', 'pending') - AND id != ALL($1)`, + `WITH targets AS ( + SELECT id FROM jobs + WHERE running_tasks > 0 + AND status IN ('running', 'pending') + AND id != ALL($1) + ORDER BY id + FOR UPDATE + ) + UPDATE jobs SET running_tasks = 0 + FROM targets WHERE jobs.id = targets.id`, pq.Array(jobIDs), ); err != nil { return fmt.Errorf("zero stale running_tasks: %w", err) diff --git a/internal/broker/outbox.go b/internal/broker/outbox.go index 9b60effc..4dbf56bc 100644 --- a/internal/broker/outbox.go +++ b/internal/broker/outbox.go @@ -45,6 +45,13 @@ type OutboxSweeperOpts struct { // up to 5s for its newly-discovered siblings to reach a worker, which // dominated end-to-end throughput on small jobs. The sweep is an // index-only SKIP LOCKED query; running it 10× more often is cheap. +// +// StatementTimeout was raised from 5s to 15s after HOVER-K3: when the +// shared queue pool saturates under bulk-lane load, pool acquire alone +// can eat several seconds of the tick budget, leaving sub-second +// headroom for the actual SELECT/UPDATE work and surfacing as +// "bump attempts: context deadline exceeded". 
15s is comfortably +// shorter than session/idle timeouts but tolerates pool wait spikes. func DefaultOutboxSweeperOpts() OutboxSweeperOpts { return OutboxSweeperOpts{ Interval: 500 * time.Millisecond, @@ -52,7 +59,7 @@ func DefaultOutboxSweeperOpts() OutboxSweeperOpts { BaseBackoff: 2 * time.Second, MaxBackoff: 5 * time.Minute, MaxAttempts: DefaultOutboxMaxAttempts, - StatementTimeout: 5 * time.Second, + StatementTimeout: 15 * time.Second, } } diff --git a/internal/broker/probe.go b/internal/broker/probe.go index 23e6f0bd..d84471d8 100644 --- a/internal/broker/probe.go +++ b/internal/broker/probe.go @@ -144,18 +144,31 @@ func (p *Probe) probeJobs(ctx context.Context) { return } + // Accumulate totals across all active jobs. Per-job labels were removed + // from the gauges to bound Mimir series cardinality — we emit the + // aggregate once per tick so dashboard sum(...) queries keep working. + var totals observability.BrokerStreamStats for _, jobID := range jobIDs { if ctx.Err() != nil { return } - p.probeJob(ctx, jobID) + streamLen, zDepth, pendingCount, ok := p.probeJob(ctx, jobID) + if !ok { + continue + } + totals.StreamLength += streamLen + totals.ScheduledDepth += zDepth + totals.Pending += pendingCount } + observability.RecordBrokerStreamStats(ctx, totals) } // probeJob issues XLEN, ZCARD, and XLEN-of-pending via XPENDING for a // single job. The three calls are issued in one pipeline so the probe -// adds one round-trip per job rather than three. -func (p *Probe) probeJob(ctx context.Context, jobID string) { +// adds one round-trip per job rather than three. Returns ok=false when +// the pipeline errored with anything other than NOGROUP so the caller +// skips accumulation rather than polluting the aggregate with zeroes. 
+func (p *Probe) probeJob(ctx context.Context, jobID string) (streamLen, zDepth, pendingCount int64, ok bool) { pipe := p.client.rdb.Pipeline() streamLenCmd := pipe.XLen(ctx, StreamKey(jobID)) zsetCmd := pipe.ZCard(ctx, ScheduleKey(jobID)) @@ -170,22 +183,15 @@ func (p *Probe) probeJob(ctx context.Context, jobID string) { // masquerade as a healthy empty queue. if !isNoGroupErr(err) { brokerLog.Debug("broker probe pipeline error", "error", err, "job_id", jobID) - return + return 0, 0, 0, false } } - streamLen, _ := streamLenCmd.Result() - zDepth, _ := zsetCmd.Result() + streamLen, _ = streamLenCmd.Result() + zDepth, _ = zsetCmd.Result() - var pendingCount int64 if pending, err := pendingCmd.Result(); err == nil && pending != nil { pendingCount = pending.Count } - - observability.RecordBrokerStreamStats(ctx, observability.BrokerStreamStats{ - JobID: jobID, - StreamLength: streamLen, - ScheduledDepth: zDepth, - Pending: pendingCount, - }) + return streamLen, zDepth, pendingCount, true } diff --git a/internal/crawler/crawler.go b/internal/crawler/crawler.go index a014e330..78ae6d63 100644 --- a/internal/crawler/crawler.go +++ b/internal/crawler/crawler.go @@ -362,7 +362,13 @@ func New(config *Config, id ...string) *Crawler { // Create metrics map for this crawler instance metricsMap := &sync.Map{} - // Set up base transport with SSRF-safe dialer + // Set up base transport with SSRF-safe dialer. + // ForceAttemptHTTP2 disabled: under sustained crawl load some upstreams + // (notably misbehaving HTTP/2 servers) trigger Go's net/http2 stream + // state machine to log "received DATA after END_STREAM" by the + // thousand. Falling back to ALPN-negotiated HTTP/1.1 removes the noise + // without measurable throughput impact for short-lived single-page + // fetches. 
baseTransport := &http.Transport{ MaxIdleConns: 150, // Global cap — prevents idle socket accumulation across hosts MaxIdleConnsPerHost: 25, @@ -370,7 +376,7 @@ func New(config *Config, id ...string) *Crawler { IdleConnTimeout: 120 * time.Second, TLSHandshakeTimeout: 10 * time.Second, DisableCompression: true, - ForceAttemptHTTP2: true, + ForceAttemptHTTP2: false, } // Add SSRF-safe DialContext if protection is enabled @@ -1205,6 +1211,7 @@ func (c *Crawler) CreateHTTPClient(timeout time.Duration) *http.Client { timeout = c.config.DefaultTimeout } + // ForceAttemptHTTP2 disabled — see baseTransport above. transport := &http.Transport{ MaxIdleConns: 150, // Global cap — prevents idle socket accumulation across hosts MaxIdleConnsPerHost: 25, @@ -1212,7 +1219,7 @@ func (c *Crawler) CreateHTTPClient(timeout time.Duration) *http.Client { IdleConnTimeout: 120 * time.Second, TLSHandshakeTimeout: 10 * time.Second, DisableCompression: true, - ForceAttemptHTTP2: true, + ForceAttemptHTTP2: false, } // Add SSRF-safe DialContext if protection is enabled diff --git a/internal/jobs/executor.go b/internal/jobs/executor.go index da4ab4cf..86da6124 100644 --- a/internal/jobs/executor.go +++ b/internal/jobs/executor.go @@ -540,7 +540,11 @@ func buildHTMLUpload(task *db.Task, result *crawler.CrawlResult, capturedAt time } if len(result.Body) > maxHTMLUploadBodyBytes { - jobsLog.Warn("Skipping HTML archive upload: body exceeds size cap", + // Demoted from Warn to Debug: the skip is safe (no archive is + // uploaded but crawl results still propagate) and a single large + // site can spam thousands of these per job. Volume metrics already + // surface the skip count at the dashboard level. 
+ jobsLog.Debug("Skipping HTML archive upload: body exceeds size cap", "task_id", task.ID, "size_bytes", len(result.Body), "cap_bytes", maxHTMLUploadBodyBytes, diff --git a/internal/jobs/stream_worker.go b/internal/jobs/stream_worker.go index ed879c50..786064a9 100644 --- a/internal/jobs/stream_worker.go +++ b/internal/jobs/stream_worker.go @@ -126,23 +126,47 @@ type StreamWorkerPool struct { activeJobs []string activeJobsMu sync.RWMutex + // linkDiscoverySem caps concurrent calls into ProcessDiscoveredLinks. + // Each link-discovery call runs CreatePageRecords + EnqueueJobURLs on + // the bulk DB lane, so under heavy load all NumWorkers × TasksPerWorker + // goroutines used to pile onto the pool simultaneously and saturate it + // (HOVER-KG, 2k+ goroutines at one event). Capping this path leaves + // pool headroom for promotion, counter sync, and the outbox sweeper. + linkDiscoverySem chan struct{} + wg sync.WaitGroup cancel context.CancelFunc } +const defaultLinkDiscoveryMaxInflight = 32 + +// linkDiscoveryMaxInflight returns the configured cap on concurrent +// in-flight ProcessDiscoveredLinks executions. +func linkDiscoveryMaxInflight() int { + if v := strings.TrimSpace(os.Getenv("JOBS_LINK_DISCOVERY_MAX_INFLIGHT")); v != "" { + if n, err := strconv.Atoi(v); err == nil && n > 0 { + return n + } + jobsLog.Warn("invalid JOBS_LINK_DISCOVERY_MAX_INFLIGHT, using default", + "value", v, "default", defaultLinkDiscoveryMaxInflight) + } + return defaultLinkDiscoveryMaxInflight +} + // NewStreamWorkerPool creates a StreamWorkerPool. 
func NewStreamWorkerPool(deps StreamWorkerDeps, opts StreamWorkerOpts) *StreamWorkerPool { return &StreamWorkerPool{ - consumer: deps.Consumer, - scheduler: deps.Scheduler, - counters: deps.Counters, - pacer: deps.Pacer, - executor: deps.Executor, - batchManager: deps.BatchManager, - dbQueue: deps.DBQueue, - jobManager: deps.JobManager, - opts: opts, - jobInfoCache: make(map[string]*cachedJobInfo), + consumer: deps.Consumer, + scheduler: deps.Scheduler, + counters: deps.Counters, + pacer: deps.Pacer, + executor: deps.Executor, + batchManager: deps.BatchManager, + dbQueue: deps.DBQueue, + jobManager: deps.JobManager, + opts: opts, + jobInfoCache: make(map[string]*cachedJobInfo), + linkDiscoverySem: make(chan struct{}, linkDiscoveryMaxInflight()), } } @@ -409,6 +433,19 @@ func (swp *StreamWorkerPool) handleDiscoveredLinks(ctx context.Context, outcome return } + // Cap concurrent link-discovery DB chains so the bulk pool isn't + // drained by every worker hitting CreatePageRecords + EnqueueJobURLs + // simultaneously (HOVER-KG). Block on the semaphore so the caller + // goroutine backpressures naturally; ctx cancellation aborts the wait. + if swp.linkDiscoverySem != nil { + select { + case swp.linkDiscoverySem <- struct{}{}: + defer func() { <-swp.linkDiscoverySem }() + case <-ctx.Done(): + return + } + } + // Build a Task with the fields ProcessDiscoveredLinks needs. task := &Task{ ID: outcome.Task.ID, diff --git a/internal/observability/observability.go b/internal/observability/observability.go index a8d37ce3..f71ba48e 100644 --- a/internal/observability/observability.go +++ b/internal/observability/observability.go @@ -810,19 +810,19 @@ func RecordWorkerConcurrency(ctx context.Context, workerID int, delta int64, cap } } -// RecordJobConcurrencySnapshot captures the running task count and concurrency limit for a job. +// RecordJobConcurrencySnapshot captures the running task count and concurrency +// limit for a job. 
jobID is retained in the signature for call-site stability +// but omitted from metric labels — per-job cardinality is unbounded at launch +// scale and these gauges are intended for worker-wide snapshots. func RecordJobConcurrencySnapshot(ctx context.Context, jobID string, runningTasks int64, concurrencyLimit int64, unlimited bool) { + _ = jobID if jobRunningTasksGauge != nil { - jobRunningTasksGauge.Record(ctx, runningTasks, - metric.WithAttributes(attribute.String("job.id", jobID))) + jobRunningTasksGauge.Record(ctx, runningTasks) } if jobConcurrencyLimitGauge != nil { jobConcurrencyLimitGauge.Record(ctx, concurrencyLimit, - metric.WithAttributes( - attribute.String("job.id", jobID), - attribute.Bool("job.concurrency_unlimited", unlimited), - )) + metric.WithAttributes(attribute.Bool("job.concurrency_unlimited", unlimited))) } } @@ -1142,25 +1142,29 @@ func initBrokerInstruments(meterProvider *sdkmetric.MeterProvider) error { return err } -// BrokerStreamStats captures per-job broker depth probed from Redis. +// BrokerStreamStats captures worker-wide broker depth, aggregated across all +// active jobs. Previously emitted per-job with a job.id label; the label was +// removed to keep Mimir series cardinality bounded as job counts grow. The +// original dashboard queries used sum(...) across the per-job series, so +// emitting the pre-aggregated total preserves the dashboard semantics. type BrokerStreamStats struct { - JobID string StreamLength int64 ScheduledDepth int64 Pending int64 } -// RecordBrokerStreamStats emits Tier 1 per-job depth gauges. +// RecordBrokerStreamStats emits Tier 1 broker depth gauges aggregated across +// all active jobs in the probe tick. Per-job drill-down is intentionally +// unavailable — use traces or logs (which carry job_id) for that. 
func RecordBrokerStreamStats(ctx context.Context, s BrokerStreamStats) { - attrs := metric.WithAttributes(attribute.String("job.id", s.JobID)) if brokerStreamLengthGauge != nil { - brokerStreamLengthGauge.Record(ctx, s.StreamLength, attrs) + brokerStreamLengthGauge.Record(ctx, s.StreamLength) } if brokerScheduledDepthGauge != nil { - brokerScheduledDepthGauge.Record(ctx, s.ScheduledDepth, attrs) + brokerScheduledDepthGauge.Record(ctx, s.ScheduledDepth) } if brokerConsumerPendingGauge != nil { - brokerConsumerPendingGauge.Record(ctx, s.Pending, attrs) + brokerConsumerPendingGauge.Record(ctx, s.Pending) } } @@ -1257,12 +1261,15 @@ func RecordBrokerPacerDelay(ctx context.Context, domain string, delayMs float64) // RecordBrokerCounterSyncSkew records the absolute difference between // the Redis running counter and Postgres running_tasks at sync time. +// jobID retained for call-site stability; omitted from labels to keep +// cardinality bounded — the histogram captures the distribution across +// all jobs, which is the signal dashboards care about. func RecordBrokerCounterSyncSkew(ctx context.Context, jobID string, skew float64) { + _ = jobID if brokerCounterSyncSkew == nil { return } - brokerCounterSyncSkew.Record(ctx, skew, - metric.WithAttributes(attribute.String("job.id", jobID))) + brokerCounterSyncSkew.Record(ctx, skew) } // RecordBrokerCounterPELSkew records the absolute difference between the diff --git a/supabase/migrations/20260425000001_promote_waiting_deterministic_lock_order.sql b/supabase/migrations/20260425000001_promote_waiting_deterministic_lock_order.sql new file mode 100644 index 00000000..8568e6d8 --- /dev/null +++ b/supabase/migrations/20260425000001_promote_waiting_deterministic_lock_order.sql @@ -0,0 +1,86 @@ +-- Make promote_waiting_with_outbox lock task rows in deterministic order. +-- +-- Background (HOVER-K2): +-- Sentry shows persistent 40P01 ("deadlock detected") on attempt 5/5 +-- inside DbQueue.PromoteWaitingToPending. 
The function updates a set of +-- `tasks` rows whose AFTER trigger (trg_update_job_queue_counters) then +-- updates the parent `jobs` row. Because the picked CTE only orders by +-- priority_score / created_at, two concurrent promoters (different jobs, +-- or the same job retrying) can lock task rows in different sequences, +-- and downstream lock ordering becomes non-deterministic. +-- +-- Adding `ORDER BY id` after the priority/created sort tie-breakers makes +-- the outer UPDATE always touch task rows in the same order across +-- concurrent transactions, so the row-lock graph becomes acyclic and the +-- 40P01 cannot form. +-- +-- Semantics preserved: the new ORDER BY uses the existing priority and +-- created_at as primary keys; `id` is only a tie-breaker, so the same set +-- of tasks gets picked. FOR UPDATE SKIP LOCKED still avoids contention on +-- the pick. ON CONFLICT (task_id) DO NOTHING in the outbox insert is +-- unchanged. + +CREATE OR REPLACE FUNCTION promote_waiting_with_outbox( + p_job_id UUID, + p_slots INTEGER +) +RETURNS INTEGER +LANGUAGE plpgsql +SET search_path = public +AS $$ +DECLARE + promoted INTEGER; +BEGIN + IF p_slots <= 0 THEN + RETURN 0; + END IF; + + WITH picked AS ( + SELECT id + FROM tasks + WHERE job_id = p_job_id + AND status = 'waiting' + ORDER BY priority_score DESC, created_at ASC, id ASC + LIMIT p_slots + FOR UPDATE SKIP LOCKED + ), + picked_ordered AS ( + SELECT id FROM picked ORDER BY id + ), + updated AS ( + UPDATE tasks t + SET status = 'pending' + FROM picked_ordered + WHERE t.id = picked_ordered.id + RETURNING t.id, t.job_id, t.page_id, t.host, t.path, + t.priority_score, t.retry_count, + t.source_type, + COALESCE(t.source_url, '') AS source_url, + COALESCE(t.run_at, NOW()) AS run_at + ), + inserted AS ( + INSERT INTO task_outbox ( + task_id, job_id, page_id, host, path, priority, + retry_count, source_type, source_url, run_at, + attempts, created_at + ) + SELECT id, job_id, page_id, host, path, priority_score, + 
retry_count, source_type, source_url, run_at, + 0, NOW() + FROM updated + ORDER BY id + ON CONFLICT (task_id) DO NOTHING + RETURNING 1 + ) + SELECT COUNT(*)::INTEGER INTO promoted FROM updated; + + RETURN COALESCE(promoted, 0); +END; +$$; + +COMMENT ON FUNCTION promote_waiting_with_outbox(UUID, INTEGER) IS + 'Atomically promote up to p_slots waiting tasks for a job to pending ' + 'and enqueue corresponding task_outbox rows. Locks task rows in id ' + 'order (after priority/created tie-breakers) to keep the row-lock ' + 'graph acyclic across concurrent promoters and avoid 40P01 deadlocks ' + 'against the trg_update_job_queue_counters AFTER trigger.'; From ee162fa22543d4ceef80768ee30c4157906a2f3b Mon Sep 17 00:00:00 2001 From: simonsmallchua Date: Sat, 25 Apr 2026 09:18:56 +1000 Subject: [PATCH 2/5] Extract shared crawler HTTP transport Both http.Transport literals in crawler.go were byte-identical after the ForceAttemptHTTP2 flip. Pull them into newBaseHTTPTransport() so future tuning can't leave the two sites out of sync. --- internal/crawler/crawler.go | 48 ++++++++++++++++++------------------- 1 file changed, 23 insertions(+), 25 deletions(-) diff --git a/internal/crawler/crawler.go b/internal/crawler/crawler.go index 78ae6d63..706984f1 100644 --- a/internal/crawler/crawler.go +++ b/internal/crawler/crawler.go @@ -363,21 +363,7 @@ func New(config *Config, id ...string) *Crawler { metricsMap := &sync.Map{} // Set up base transport with SSRF-safe dialer. - // ForceAttemptHTTP2 disabled: under sustained crawl load some upstreams - // (notably misbehaving HTTP/2 servers) trigger Go's net/http2 stream - // state machine to log "received DATA after END_STREAM" by the - // thousand. Falling back to ALPN-negotiated HTTP/1.1 removes the noise - // without measurable throughput impact for short-lived single-page - // fetches. 
- baseTransport := &http.Transport{ - MaxIdleConns: 150, // Global cap — prevents idle socket accumulation across hosts - MaxIdleConnsPerHost: 25, - MaxConnsPerHost: 50, - IdleConnTimeout: 120 * time.Second, - TLSHandshakeTimeout: 10 * time.Second, - DisableCompression: true, - ForceAttemptHTTP2: false, - } + baseTransport := newBaseHTTPTransport() // Add SSRF-safe DialContext if protection is enabled // This validates IPs at connection time to prevent DNS rebinding attacks @@ -1211,16 +1197,7 @@ func (c *Crawler) CreateHTTPClient(timeout time.Duration) *http.Client { timeout = c.config.DefaultTimeout } - // ForceAttemptHTTP2 disabled — see baseTransport above. - transport := &http.Transport{ - MaxIdleConns: 150, // Global cap — prevents idle socket accumulation across hosts - MaxIdleConnsPerHost: 25, - MaxConnsPerHost: 50, - IdleConnTimeout: 120 * time.Second, - TLSHandshakeTimeout: 10 * time.Second, - DisableCompression: true, - ForceAttemptHTTP2: false, - } + transport := newBaseHTTPTransport() // Add SSRF-safe DialContext if protection is enabled if !c.config.SkipSSRFCheck { @@ -1233,6 +1210,27 @@ func (c *Crawler) CreateHTTPClient(timeout time.Duration) *http.Client { } } +// newBaseHTTPTransport returns the shared *http.Transport tuning used by +// both the colly crawler and CreateHTTPClient. Callers are responsible for +// attaching SSRF-safe DialContext and any wrapping round trippers. +// +// ForceAttemptHTTP2 is disabled: under sustained crawl load some upstreams +// (notably misbehaving HTTP/2 servers) trigger Go's net/http2 stream state +// machine to log "received DATA after END_STREAM" by the thousand. Falling +// back to ALPN-negotiated HTTP/1.1 removes the noise without measurable +// throughput impact for short-lived single-page fetches. 
+func newBaseHTTPTransport() *http.Transport { + return &http.Transport{ + MaxIdleConns: 150, // Global cap — prevents idle socket accumulation across hosts + MaxIdleConnsPerHost: 25, + MaxConnsPerHost: 50, + IdleConnTimeout: 120 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + DisableCompression: true, + ForceAttemptHTTP2: false, + } +} + // Config returns the Crawler's configuration. func (c *Crawler) Config() *Config { return c.config From adf0cbd83dc10afd1876122c9147649c05cc6bd4 Mon Sep 17 00:00:00 2001 From: simonsmallchua Date: Sat, 25 Apr 2026 09:27:42 +1000 Subject: [PATCH 3/5] Derive probe transport from shared helper --- internal/crawler/crawler.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/internal/crawler/crawler.go b/internal/crawler/crawler.go index 706984f1..92c8c9b4 100644 --- a/internal/crawler/crawler.go +++ b/internal/crawler/crawler.go @@ -406,13 +406,14 @@ func New(config *Config, id ...string) *Crawler { // Build a shared probe transport for cache-status HEAD requests. // Reusing a single client avoids leaking a new http.Transport per probe call. - probeTransport := &http.Transport{ - MaxIdleConns: 20, - MaxIdleConnsPerHost: 5, - MaxConnsPerHost: 10, - IdleConnTimeout: 30 * time.Second, - TLSHandshakeTimeout: 10 * time.Second, - } + // Derived from newBaseHTTPTransport so the H2 + compression posture stays + // consistent across the three crawler clients; only the pool sizes and + // idle timeout differ (probes are HEAD-only and can run with a smaller pool). 
+ probeTransport := newBaseHTTPTransport() + probeTransport.MaxIdleConns = 20 + probeTransport.MaxIdleConnsPerHost = 5 + probeTransport.MaxConnsPerHost = 10 + probeTransport.IdleConnTimeout = 30 * time.Second if !config.SkipSSRFCheck { probeTransport.DialContext = ssrfSafeDialContext() } From 09bcfa22743a9409b292916d4fffa3a169f956b2 Mon Sep 17 00:00:00 2001 From: simonsmallchua Date: Sat, 25 Apr 2026 09:42:04 +1000 Subject: [PATCH 4/5] Add changelog entry for load-test stabilisation --- CHANGELOG.md | 52 +++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 51 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a5885dcf..eb9851dd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,7 +28,57 @@ On merge, CI will: ## [Unreleased] -_Add unreleased changes here._ +### Fixed + +- `promote_waiting_with_outbox` now sorts the picked CTE with `id ASC` as a + tie-breaker and orders the outbox `INSERT … SELECT` by `id`, making concurrent + promoters lock task rows in a deterministic sequence. The AFTER trigger + `trg_update_job_queue_counters` then updates the parent `jobs` row on top of + an acyclic lock graph, so the 40P01 deadlock that fired ~3,000 times during + the 2026-04-24 overnight load test (Sentry HOVER-K2) cannot form. Migration + `20260425000001_promote_waiting_deterministic_lock_order`. +- `broker.DefaultDBSyncFunc` now sorts `jobIDs` before issuing per-job + `UPDATE jobs SET running_tasks = …` and wraps the zero-stale UPDATE in a + `WITH targets AS (… ORDER BY id FOR UPDATE) UPDATE jobs … FROM targets` CTE. + Previously the sync loop iterated a Go map (random order), so two worker VMs + ticking concurrently locked `jobs` rows in different sequences and deadlocked + (Sentry HOVER-K4, ~2,000 events/12h, escalating). +- `StreamWorkerPool.handleDiscoveredLinks` now acquires from a bounded semaphore + (`JOBS_LINK_DISCOVERY_MAX_INFLIGHT`, default 32) before each `ProcessDiscoveredLinks` call. 
+ Previously the fan-out was unbounded; one Sentry event captured 2,153 live + goroutines in the worker (HOVER-KG, ~2,500 events/12h). +- Outbox sweeper `StatementTimeout` raised from 5 s to 15 s and made + env-overridable via `OUTBOX_SWEEP_STATEMENT_TIMEOUT_MS`, so the + `bump attempts` UPDATE no longer hits the context deadline when the shared + queue pool is under pressure (Sentry HOVER-K3). + +### Changed + +- Per-job `job.id` label removed from the broker stream-depth gauges + (`bee.broker.stream_length`, `bee.broker.scheduled_zset_depth`, + `bee.broker.consumer_pending`), the running-tasks / concurrency-limit gauges + (`bee.jobs.running_tasks`, `bee.jobs.concurrency_limit`), and the counter-sync + skew histogram (`bee.broker.counter_sync_skew`). The probe loop now sums + per-job depths across all active jobs and emits one aggregate per tick, so + dashboard `sum(...)` queries continue to render the correct totals while Mimir + series cardinality drops from `6N + 1` to `7` per worker (≈85× reduction at + 100 jobs × 30 workers). Per-job drill-down on these metrics is intentionally + gone — use trace spans (which still carry `task.domain` / `task.id` / + `job.id`) for that. +- Crawler `http.Transport` configuration centralised in `newBaseHTTPTransport`. + Both `CreateHTTPClient` and the colly base transport now derive from the + helper, and the probe-only HEAD client derives from it too with overrides for + its smaller pool. Among other things this makes `ForceAttemptHTTP2: false` + apply uniformly across the three clients. +- `ForceAttemptHTTP2: false` on crawler transports silences ~1,200 + `received DATA after END_STREAM` log lines per heavy-load window; ALPN still + negotiates HTTP/1.1 and per-request throughput is unchanged. +- HTML body-cap skip log demoted from Warn to Debug — a single large domain was + producing thousands of warns per job. +- `GNH_PRESSURE_INITIAL_LIMIT` removed from `fly.toml` and `fly.worker.toml`. 
+ `pressure.go` already defaults the initial limit to `DB_QUEUE_MAX_CONCURRENCY` + (the safe maximum), so setting it explicitly only invited drift and the + "exceeds queue cap" clamp warning when the two values got out of sync. ## Full changelog history From 6c2bb74c9c2e5f22b5aad3e78914a0533fc024a3 Mon Sep 17 00:00:00 2001 From: simonsmallchua Date: Sat, 25 Apr 2026 10:39:23 +1000 Subject: [PATCH 5/5] Lower prod API log level to info --- CHANGELOG.md | 3 +++ fly.toml | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index eb9851dd..bf66d621 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -79,6 +79,9 @@ On merge, CI will: `pressure.go` already defaults the initial limit to `DB_QUEUE_MAX_CONCURRENCY` (the safe maximum), so setting it explicitly only invited drift and the "exceeds queue cap" clamp warning when the two values got out of sync. +- Production API `LOG_LEVEL` lowered from `debug` to `info` in `fly.toml`, now + matching the worker. Debug verbosity was retained while we were chasing the + load-test issues and is no longer needed; review apps stay on `debug`. ## Full changelog history diff --git a/fly.toml b/fly.toml index f0966846..89a5109d 100644 --- a/fly.toml +++ b/fly.toml @@ -34,7 +34,7 @@ primary_region = 'syd' GNH_LINK_DISCOVERY_MIN_PRIORITY = "0.5" DB_TX_MAX_RETRIES = "5" FLIGHT_RECORDER_ENABLED = "false" - LOG_LEVEL = "debug" + LOG_LEVEL = "info" GNH_BATCH_CHANNEL_SIZE = "5000" OBSERVABILITY_ENABLED = "true" SUPABASE_AUTH_URL = "https://hover.auth.goodnative.co"