55 changes: 54 additions & 1 deletion CHANGELOG.md
@@ -28,7 +28,60 @@ On merge, CI will:

## [Unreleased]

_Add unreleased changes here._
### Fixed

- `promote_waiting_with_outbox` now sorts the picked CTE with `id ASC` as a
tie-breaker and orders the outbox `INSERT … SELECT` by `id`, making concurrent
promoters lock task rows in a deterministic sequence. The AFTER trigger
`trg_update_job_queue_counters` then updates the parent `jobs` row on top of
an acyclic lock graph, so the 40P01 deadlock that fired ~3,000 times during
the 2026-04-24 overnight load test (Sentry HOVER-K2) cannot form. Migration
`20260425000001_promote_waiting_deterministic_lock_order`.
- `broker.DefaultDBSyncFunc` now sorts `jobIDs` before issuing per-job
`UPDATE jobs SET running_tasks = …` and wraps the zero-stale UPDATE in a
`WITH targets AS (… ORDER BY id FOR UPDATE) UPDATE jobs … FROM targets` CTE.
Previously the sync loop iterated a Go map (random order), so two worker VMs
ticking concurrently locked `jobs` rows in different sequences and deadlocked
(Sentry HOVER-K4, ~2,000 events/12h, escalating).
- `StreamWorkerPool.handleDiscoveredLinks` now acquires from a bounded semaphore
(`JOBS_LINK_DISCOVERY_MAX_INFLIGHT`, default 32) before each link is enqueued.
Previously the fan-out was unbounded; one Sentry event captured 2,153 live
goroutines in the worker (HOVER-KG, ~2,500 events/12h).
- Outbox sweeper `StatementTimeout` raised from 5 s to 15 s and made
env-overridable via `OUTBOX_SWEEP_STATEMENT_TIMEOUT_MS`, so the
`bump attempts` UPDATE no longer hits the context deadline when the shedding
pool is under pressure (Sentry HOVER-K3).
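
The bounded fan-out described for `handleDiscoveredLinks` can be sketched with a buffered channel used as a counting semaphore. This is a minimal illustration, not the worker's actual code: `enqueueAll` and its `enqueue` callback are hypothetical names, and the limit of 32 simply mirrors the stated default of `JOBS_LINK_DISCOVERY_MAX_INFLIGHT`.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// enqueueAll is a hypothetical stand-in for the link fan-out. It calls enqueue
// once per link, never running more than maxInflight goroutines at a time, and
// returns the highest concurrency it observed.
func enqueueAll(links []string, maxInflight int, enqueue func(string)) int64 {
	sem := make(chan struct{}, maxInflight) // buffered channel as a counting semaphore
	var wg sync.WaitGroup
	var inflight, peak int64

	for _, link := range links {
		sem <- struct{}{} // acquire: blocks while maxInflight goroutines are running
		wg.Add(1)
		go func(l string) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot (runs before wg.Done)

			// Track the peak number of concurrently running goroutines.
			n := atomic.AddInt64(&inflight, 1)
			for {
				p := atomic.LoadInt64(&peak)
				if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
					break
				}
			}
			enqueue(l)
			atomic.AddInt64(&inflight, -1)
		}(link)
	}
	wg.Wait()
	return atomic.LoadInt64(&peak)
}

func main() {
	links := make([]string, 500)
	var done int64
	peak := enqueueAll(links, 32, func(string) { atomic.AddInt64(&done, 1) })
	fmt.Println("enqueued:", done, "peak within limit:", peak <= 32)
}
```

Acquiring before the goroutine is spawned is what bounds the goroutine count itself; acquiring inside the goroutine would bound only the work, not the number of parked goroutines.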

### Changed

- Per-job `job.id` label removed from the broker stream-depth gauges
(`bee.broker.stream_length`, `bee.broker.scheduled_zset_depth`,
`bee.broker.consumer_pending`), the running-tasks / concurrency-limit gauges
(`bee.jobs.running_tasks`, `bee.jobs.concurrency_limit`), and the counter-sync
skew histogram (`bee.broker.counter_sync_skew`). The probe loop now sums
per-job depths across all active jobs and emits one aggregate per tick, so
dashboard `sum(...)` queries continue to render the correct totals while Mimir
series cardinality drops from `6N + 1` to `7` per worker (≈86× reduction at
100 jobs × 30 workers). Per-job drill-down on these metrics is intentionally
gone — use trace spans (which still carry `task.domain` / `task.id` /
`job.id`) for that.
- Crawler `http.Transport` configuration centralised in `newBaseHTTPTransport`.
Both `CreateHTTPClient` and the colly base transport now derive from the
helper, and the probe-only HEAD client derives from it too with overrides for
its smaller pool. Among other things this makes `ForceAttemptHTTP2: false`
apply uniformly across the three clients.
- `ForceAttemptHTTP2: false` on crawler transports silences ~1,200
`received DATA after END_STREAM` log lines per heavy-load window; ALPN still
negotiates HTTP/1.1 and per-request throughput is unchanged.
- HTML body-cap skip log demoted from Warn to Debug — a single large domain was
producing thousands of warns per job.
- `GNH_PRESSURE_INITIAL_LIMIT` removed from `fly.toml` and `fly.worker.toml`.
`pressure.go` already defaults the initial limit to `DB_QUEUE_MAX_CONCURRENCY`
(the safe maximum), so setting it explicitly only invited drift and the
"exceeds queue cap" clamp warning when the two values got out of sync.
- Production API `LOG_LEVEL` lowered from `debug` to `info` in `fly.toml`, now
matching the worker. Debug verbosity was retained while we were chasing the
load-test issues and is no longer needed; review apps stay on `debug`.
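
As a worked check of the cardinality figures in the label-removal entry, the sketch below takes the `6N + 1` and `7` formulas at face value. The function names are hypothetical, and the count treats each metric as one series per label set, which is a simplification for histograms.

```go
package main

import "fmt"

// seriesBefore applies the changelog's 6N + 1 formula for N active jobs.
func seriesBefore(activeJobs int) int { return 6*activeJobs + 1 }

// seriesAfter is the constant per-worker series count once job.id is dropped.
func seriesAfter() int { return 7 }

func main() {
	before, after := seriesBefore(100), seriesAfter()
	fmt.Printf("%d -> %d series per worker (%.1fx fewer)\n",
		before, after, float64(before)/float64(after))
	// With 30 workers the fleet-wide totals scale by the same factor.
	fmt.Printf("%d -> %d series across 30 workers\n", 30*before, 30*after)
}
```

At 100 jobs this prints 601 -> 7 per worker, a ratio of about 85.9, and 18030 -> 210 across 30 workers.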

## Full changelog history

3 changes: 3 additions & 0 deletions cmd/worker/main.go
@@ -298,6 +298,9 @@ func main() {
if v := envInt("OUTBOX_SWEEP_BATCH_SIZE", 0); v > 0 {
outboxOpts.BatchSize = v
}
if v := envInt("OUTBOX_SWEEP_STATEMENT_TIMEOUT_MS", 0); v > 0 {
outboxOpts.StatementTimeout = time.Duration(v) * time.Millisecond
}
// Sweeper reads task_outbox, which is written in the same tx as tasks
// — so it belongs on queueDB in split deployments.
outboxSweeper := broker.NewOutboxSweeper(queueDB.GetDB(), scheduler, outboxOpts)
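
The millisecond override pattern used for `OUTBOX_SWEEP_STATEMENT_TIMEOUT_MS` can be sketched on its own. The repository's `envInt` helper is not shown in this diff, so `parseTimeoutMS` and `timeoutFromEnvMS` below are hypothetical stand-ins that assume the same "ignore unset, non-numeric, or non-positive values" behaviour.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"time"
)

// parseTimeoutMS converts a millisecond string into a Duration, returning
// fallback when the value is empty, not an integer, or not positive.
func parseTimeoutMS(raw string, fallback time.Duration) time.Duration {
	ms, err := strconv.Atoi(raw)
	if err != nil || ms <= 0 {
		return fallback
	}
	return time.Duration(ms) * time.Millisecond
}

// timeoutFromEnvMS reads key from the environment and applies parseTimeoutMS.
func timeoutFromEnvMS(key string, fallback time.Duration) time.Duration {
	return parseTimeoutMS(os.Getenv(key), fallback)
}

func main() {
	const key = "OUTBOX_SWEEP_STATEMENT_TIMEOUT_MS"

	os.Setenv(key, "20000")
	fmt.Println(timeoutFromEnvMS(key, 15*time.Second)) // 20s

	os.Setenv(key, "not-a-number")
	fmt.Println(timeoutFromEnvMS(key, 15*time.Second)) // 15s (fallback)
}
```

Keeping the default in code and treating the variable as an optional override means a missing or malformed value degrades to the 15 s default rather than to a zero timeout.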
6 changes: 4 additions & 2 deletions fly.toml
@@ -20,7 +20,9 @@ primary_region = 'syd'
DB_QUEUE_MAX_CONCURRENCY = "100"
GNH_PRESSURE_HIGH_MARK_MS = "80"
GNH_PRESSURE_LOW_MARK_MS = "40"
GNH_PRESSURE_INITIAL_LIMIT = "100"
# GNH_PRESSURE_INITIAL_LIMIT intentionally unset — pressure.go defaults it to
# DB_QUEUE_MAX_CONCURRENCY (safeMax). Setting it explicitly invites drift and
# the "exceeds queue cap" clamp warning if the two values get out of sync.
GNH_PRESSURE_MIN_LIMIT = "30"
GNH_PRESSURE_STEP_DOWN = "5"
REDIS_POOL_SIZE = "50"
@@ -32,7 +34,7 @@ primary_region = 'syd'
GNH_LINK_DISCOVERY_MIN_PRIORITY = "0.5"
DB_TX_MAX_RETRIES = "5"
FLIGHT_RECORDER_ENABLED = "false"
LOG_LEVEL = "debug"
LOG_LEVEL = "info"
GNH_BATCH_CHANNEL_SIZE = "5000"
OBSERVABILITY_ENABLED = "true"
SUPABASE_AUTH_URL = "https://hover.auth.goodnative.co"
4 changes: 3 additions & 1 deletion fly.worker.toml
@@ -32,7 +32,9 @@ primary_region = 'syd'
DB_TX_MAX_RETRIES = "5"
GNH_PRESSURE_HIGH_MARK_MS = "80"
GNH_PRESSURE_LOW_MARK_MS = "40"
GNH_PRESSURE_INITIAL_LIMIT = "100"
# GNH_PRESSURE_INITIAL_LIMIT intentionally unset — pressure.go defaults it to
# DB_QUEUE_MAX_CONCURRENCY (safeMax). Setting it explicitly invites drift and
# the "exceeds queue cap" clamp warning if the two values get out of sync.
GNH_PRESSURE_MIN_LIMIT = "30"
GNH_PRESSURE_STEP_DOWN = "5"
REDIS_POOL_SIZE = "200"
42 changes: 33 additions & 9 deletions internal/broker/counters.go
@@ -5,6 +5,7 @@ import (
"database/sql"
"fmt"
"math"
"sort"
"strconv"
"time"

@@ -145,9 +146,19 @@ func DefaultDBSyncFunc(sqlDB *sql.DB) DBSyncFunc {
return func(ctx context.Context, counts map[string]int64) error {
// When counts is empty all jobs have finished — reset any stale
// positive running_tasks left in Postgres.
// ORDER BY id ensures deterministic row-lock order; without it
// concurrent sync ticks across worker VMs deadlock (HOVER-K4).
if len(counts) == 0 {
_, err := sqlDB.ExecContext(ctx,
`UPDATE jobs SET running_tasks = 0 WHERE running_tasks > 0 AND status IN ('running', 'pending')`)
`WITH targets AS (
SELECT id FROM jobs
WHERE running_tasks > 0
AND status IN ('running', 'pending')
ORDER BY id
FOR UPDATE
)
UPDATE jobs SET running_tasks = 0
FROM targets WHERE jobs.id = targets.id`)
return err
}

@@ -161,10 +172,14 @@ func DefaultDBSyncFunc(sqlDB *sql.DB) DBSyncFunc {
// Redis-vs-PG skew per job. A consistent skew > small noise
// usually means a counter leak (increment/decrement imbalance)
// or a sync lag spike.
// Sorted job IDs give every worker the same row-lock order
// when the per-job UPDATEs below run inside this transaction
// (fixes 40P01 deadlock, HOVER-K4).
jobIDsForSnapshot := make([]string, 0, len(counts))
for jobID := range counts {
jobIDsForSnapshot = append(jobIDsForSnapshot, jobID)
}
sort.Strings(jobIDsForSnapshot)
priorCounts := make(map[string]int64, len(counts))
if len(jobIDsForSnapshot) > 0 {
rows, qerr := tx.QueryContext(ctx,
@@ -190,26 +205,35 @@ func DefaultDBSyncFunc(sqlDB *sql.DB) DBSyncFunc {
// "prepared statement already exists" (SQLSTATE 42P05) and kills
// the sync tick. ExecContext honours default_query_exec_mode=
// simple_protocol set on the pool, avoiding PREPARE entirely.
jobIDs := make([]string, 0, len(counts))
for jobID, count := range counts {
// Iterate jobs in sorted order so every worker takes per-row
// locks in the same sequence (fixes HOVER-K4 deadlock).
jobIDs := jobIDsForSnapshot
for _, jobID := range jobIDs {
count := counts[jobID]
if _, err := tx.ExecContext(ctx,
`UPDATE jobs SET running_tasks = $1 WHERE id = $2 AND status IN ('running', 'pending')`,
count, jobID); err != nil {
return fmt.Errorf("update job %s: %w", jobID, err)
}
jobIDs = append(jobIDs, jobID)
observability.RecordBrokerCounterSyncSkew(ctx, jobID,
math.Abs(float64(count-priorCounts[jobID])))
}

// Zero out any active jobs whose counters are no longer tracked
// (they finished between sync intervals).
// (they finished between sync intervals). ORDER BY id keeps the
// row-lock order deterministic across concurrent sync ticks.
if len(jobIDs) > 0 {
if _, err := tx.ExecContext(ctx,
`UPDATE jobs SET running_tasks = 0
WHERE running_tasks > 0
AND status IN ('running', 'pending')
AND id != ALL($1)`,
`WITH targets AS (
SELECT id FROM jobs
WHERE running_tasks > 0
AND status IN ('running', 'pending')
AND id != ALL($1)
ORDER BY id
FOR UPDATE
)
UPDATE jobs SET running_tasks = 0
FROM targets WHERE jobs.id = targets.id`,
pq.Array(jobIDs),
); err != nil {
return fmt.Errorf("zero stale running_tasks: %w", err)
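
The core of the HOVER-K4 fix is replacing Go's randomised map iteration with a sorted key slice, so every worker touches rows in the same sequence. A minimal standalone sketch of that step follows; `sortedJobIDs` is a hypothetical helper name, and the print statement stands in for the per-job `UPDATE`.

```go
package main

import (
	"fmt"
	"sort"
)

// sortedJobIDs returns the map's keys in ascending order. Ranging over the
// map directly yields a different order on each run, which is what allowed
// two workers to lock rows in conflicting sequences.
func sortedJobIDs(counts map[string]int64) []string {
	ids := make([]string, 0, len(counts))
	for id := range counts {
		ids = append(ids, id)
	}
	sort.Strings(ids)
	return ids
}

func main() {
	counts := map[string]int64{"job-c": 3, "job-a": 1, "job-b": 2}
	for _, id := range sortedJobIDs(counts) {
		// In the real sync loop this is where the per-job UPDATE would run.
		fmt.Println(id, counts[id])
	}
}
```

Any total order works as long as all writers agree on it; lexicographic order on the ID string is simply the cheapest one available here.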
9 changes: 8 additions & 1 deletion internal/broker/outbox.go
@@ -45,14 +45,21 @@ type OutboxSweeperOpts struct {
// up to 5s for its newly-discovered siblings to reach a worker, which
// dominated end-to-end throughput on small jobs. The sweep is an
// index-only SKIP LOCKED query; running it 10× more often is cheap.
//
// StatementTimeout was raised from 5s to 15s after HOVER-K3: when the
// shared queue pool saturates under bulk-lane load, pool acquire alone
// can eat several seconds of the tick budget, leaving sub-second
// headroom for the actual SELECT/UPDATE work and surfacing as
// "bump attempts: context deadline exceeded". 15s is comfortably
// shorter than session/idle timeouts but tolerates pool wait spikes.
func DefaultOutboxSweeperOpts() OutboxSweeperOpts {
return OutboxSweeperOpts{
Interval: 500 * time.Millisecond,
BatchSize: 200,
BaseBackoff: 2 * time.Second,
MaxBackoff: 5 * time.Minute,
MaxAttempts: DefaultOutboxMaxAttempts,
StatementTimeout: 5 * time.Second,
StatementTimeout: 15 * time.Second,
}
}

34 changes: 20 additions & 14 deletions internal/broker/probe.go
@@ -144,18 +144,31 @@ func (p *Probe) probeJobs(ctx context.Context) {
return
}

// Accumulate totals across all active jobs. Per-job labels were removed
// from the gauges to bound Mimir series cardinality — we emit the
// aggregate once per tick so dashboard sum(...) queries keep working.
var totals observability.BrokerStreamStats
for _, jobID := range jobIDs {
if ctx.Err() != nil {
return
}
p.probeJob(ctx, jobID)
streamLen, zDepth, pendingCount, ok := p.probeJob(ctx, jobID)
if !ok {
continue
}
totals.StreamLength += streamLen
totals.ScheduledDepth += zDepth
totals.Pending += pendingCount
}
observability.RecordBrokerStreamStats(ctx, totals)
}

// probeJob issues XLEN, ZCARD, and XPENDING (for the pending-entry count)
// for a single job. The three calls are issued in one pipeline so the probe
// adds one round-trip per job rather than three.
func (p *Probe) probeJob(ctx context.Context, jobID string) {
// adds one round-trip per job rather than three. Returns ok=false when
// the pipeline errored with anything other than NOGROUP so the caller
// skips accumulation rather than polluting the aggregate with zeroes.
func (p *Probe) probeJob(ctx context.Context, jobID string) (streamLen, zDepth, pendingCount int64, ok bool) {
pipe := p.client.rdb.Pipeline()
streamLenCmd := pipe.XLen(ctx, StreamKey(jobID))
zsetCmd := pipe.ZCard(ctx, ScheduleKey(jobID))
@@ -170,22 +183,15 @@ func (p *Probe) probeJob(ctx context.Context, jobID string) {
// masquerade as a healthy empty queue.
if !isNoGroupErr(err) {
brokerLog.Debug("broker probe pipeline error", "error", err, "job_id", jobID)
return
return 0, 0, 0, false
}
}

streamLen, _ := streamLenCmd.Result()
zDepth, _ := zsetCmd.Result()
streamLen, _ = streamLenCmd.Result()
zDepth, _ = zsetCmd.Result()

var pendingCount int64
if pending, err := pendingCmd.Result(); err == nil && pending != nil {
pendingCount = pending.Count
}

observability.RecordBrokerStreamStats(ctx, observability.BrokerStreamStats{
JobID: jobID,
StreamLength: streamLen,
ScheduledDepth: zDepth,
Pending: pendingCount,
})
return streamLen, zDepth, pendingCount, true
}
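
The accumulate-then-emit-once shape of the new probe loop can be sketched without Redis. The types and names below (`streamStats`, `probeFunc`, `aggregate`) are hypothetical, and the `skipped` counter is an illustrative extra that the diff itself does not add.

```go
package main

import "fmt"

// streamStats mirrors the three aggregate fields summed by the probe loop.
type streamStats struct {
	StreamLength, ScheduledDepth, Pending int64
}

// probeFunc is a stand-in for a per-job probe; ok=false signals a failed probe.
type probeFunc func(jobID string) (streamLen, zDepth, pending int64, ok bool)

// aggregate sums per-job depths and reports how many probes were skipped.
func aggregate(jobIDs []string, probe probeFunc) (totals streamStats, skipped int) {
	for _, id := range jobIDs {
		streamLen, zDepth, pending, ok := probe(id)
		if !ok {
			skipped++
			continue
		}
		totals.StreamLength += streamLen
		totals.ScheduledDepth += zDepth
		totals.Pending += pending
	}
	return totals, skipped
}

func main() {
	fake := func(id string) (int64, int64, int64, bool) {
		if id == "broken" {
			return 0, 0, 0, false
		}
		return 10, 2, 1, true
	}
	totals, skipped := aggregate([]string{"a", "broken", "b"}, fake)
	fmt.Printf("%+v skipped=%d\n", totals, skipped)
}
```

For a plain sum, skipping a failed probe and adding zeroes give the same total, so the `ok` flag matters mainly when the caller also wants to count or report failed probes, as the `skipped` value does here.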
58 changes: 32 additions & 26 deletions internal/crawler/crawler.go
@@ -362,16 +362,8 @@ func New(config *Config, id ...string) *Crawler {
// Create metrics map for this crawler instance
metricsMap := &sync.Map{}

// Set up base transport with SSRF-safe dialer
baseTransport := &http.Transport{
MaxIdleConns: 150, // Global cap — prevents idle socket accumulation across hosts
MaxIdleConnsPerHost: 25,
MaxConnsPerHost: 50,
IdleConnTimeout: 120 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
DisableCompression: true,
ForceAttemptHTTP2: true,
}
// Set up base transport with SSRF-safe dialer.
baseTransport := newBaseHTTPTransport()

// Add SSRF-safe DialContext if protection is enabled
// This validates IPs at connection time to prevent DNS rebinding attacks
@@ -414,13 +406,14 @@ func New(config *Config, id ...string) *Crawler {

// Build a shared probe transport for cache-status HEAD requests.
// Reusing a single client avoids leaking a new http.Transport per probe call.
probeTransport := &http.Transport{
MaxIdleConns: 20,
MaxIdleConnsPerHost: 5,
MaxConnsPerHost: 10,
IdleConnTimeout: 30 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
}
// Derived from newBaseHTTPTransport so the H2 + compression posture stays
// consistent across the three crawler clients; only the pool sizes and
// idle timeout differ (probes are HEAD-only and can run with a smaller pool).
probeTransport := newBaseHTTPTransport()
probeTransport.MaxIdleConns = 20
probeTransport.MaxIdleConnsPerHost = 5
probeTransport.MaxConnsPerHost = 10
probeTransport.IdleConnTimeout = 30 * time.Second
if !config.SkipSSRFCheck {
probeTransport.DialContext = ssrfSafeDialContext()
}
@@ -1205,15 +1198,7 @@ func (c *Crawler) CreateHTTPClient(timeout time.Duration) *http.Client {
timeout = c.config.DefaultTimeout
}

transport := &http.Transport{
MaxIdleConns: 150, // Global cap — prevents idle socket accumulation across hosts
MaxIdleConnsPerHost: 25,
MaxConnsPerHost: 50,
IdleConnTimeout: 120 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
DisableCompression: true,
ForceAttemptHTTP2: true,
}
transport := newBaseHTTPTransport()

// Add SSRF-safe DialContext if protection is enabled
if !c.config.SkipSSRFCheck {
@@ -1226,6 +1211,27 @@ func (c *Crawler) CreateHTTPClient(timeout time.Duration) *http.Client {
}
}

// newBaseHTTPTransport returns the shared *http.Transport tuning used by the
// colly crawler, CreateHTTPClient, and the cache-status probe client. Callers
// are responsible for attaching SSRF-safe DialContext and any wrapping round
// trippers.
//
// ForceAttemptHTTP2 is disabled: under sustained crawl load some upstreams
// (notably misbehaving HTTP/2 servers) trigger Go's net/http2 stream state
// machine to log "received DATA after END_STREAM" by the thousand. Falling
// back to ALPN-negotiated HTTP/1.1 removes the noise without measurable
// throughput impact for short-lived single-page fetches.
func newBaseHTTPTransport() *http.Transport {
return &http.Transport{
MaxIdleConns: 150, // Global cap — prevents idle socket accumulation across hosts
MaxIdleConnsPerHost: 25,
MaxConnsPerHost: 50,
IdleConnTimeout: 120 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
DisableCompression: true,
ForceAttemptHTTP2: false,
}
}

// Config returns the Crawler's configuration.
func (c *Crawler) Config() *Config {
return c.config
6 changes: 5 additions & 1 deletion internal/jobs/executor.go
@@ -540,7 +540,11 @@ func buildHTMLUpload(task *db.Task, result *crawler.CrawlResult, capturedAt time
}

if len(result.Body) > maxHTMLUploadBodyBytes {
jobsLog.Warn("Skipping HTML archive upload: body exceeds size cap",
// Demoted from Warn to Debug: the skip is safe (no archive is
// uploaded but crawl results still propagate) and a single large
// site can spam thousands of these per job. Volume metrics already
// surface the skip count at the dashboard level.
jobsLog.Debug("Skipping HTML archive upload: body exceeds size cap",
"task_id", task.ID,
"size_bytes", len(result.Body),
"cap_bytes", maxHTMLUploadBodyBytes,