From 322103dbca567a02f016b95417694ac859071f03 Mon Sep 17 00:00:00 2001 From: Simon Smallchua <40650011+simonsmallchua@users.noreply.github.com> Date: Wed, 27 May 2026 21:10:44 +1000 Subject: [PATCH 1/2] Pace job starts, raise rate-limit retries --- CHANGELOG.md | 14 ++++++++++- docs/operations/ENV_VARS.md | 4 +++- internal/jobs/executor.go | 2 +- internal/jobs/pacer_seed_test.go | 40 ++++++++++++++++++++++++++++++++ internal/jobs/stream_worker.go | 38 ++++++++++++++++++++++++------ internal/jobs/types.go | 2 ++ 6 files changed, 90 insertions(+), 10 deletions(-) create mode 100644 internal/jobs/pacer_seed_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index f0a81bd9..a2e82df2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,7 +28,19 @@ On merge, CI will: ## [Unreleased] -_Add unreleased changes here._ +### Changed + +- Rate-limited (429/403/503) tasks now retry up to 8 times before hard-failing, + raised from 3, so a domain that throttles hard is paced-and-deferred rather + than failed mid-job. Tunable via `GNH_RATE_LIMIT_MAX_RETRIES` (previously + documented but never wired). +- Every job now starts paced instead of bursting the full per-job concurrency at + a domain. Previously only a never-crawled domain got a warm-up delay; a domain + with a learned (or zero) adaptive delay seeded un-paced and could trigger an + instant 429 storm. Job starts are now floored to a minimum seed delay, so a + previously-crawled domain ramps up from a few concurrent requests and widens + on success. Tunable via `GNH_PACER_START_FLOOR_DELAY_MS` (`0` restores the + prior behaviour). ## Full changelog history diff --git a/docs/operations/ENV_VARS.md b/docs/operations/ENV_VARS.md index fb0f9855..2fc41465 100644 --- a/docs/operations/ENV_VARS.md +++ b/docs/operations/ENV_VARS.md @@ -46,7 +46,9 @@ the TOML `[env]` section. If it grants access to anything, it is a secret. | `GNH_RATE_LIMIT_MAX_DELAY_SECONDS` | Maximum adaptive delay | `60` | | `GNH_RATE_LIMIT_SUCCESS_THRESHOLD` | Successes before delay reduction | `5` | | `GNH_RATE_LIMIT_DELAY_STEP_MS` | Adaptive delay increment | `500` | -| `GNH_RATE_LIMIT_MAX_RETRIES` | Max blocking retries on rate limit | `3` | +| `GNH_RATE_LIMIT_MAX_RETRIES` | Max blocking retries on rate limit | `8` | +| `GNH_PACER_WARMUP_DELAY_MS` | Seed delay for a never-crawled domain (starts at ~1 worker) | `2000` | +| `GNH_PACER_START_FLOOR_DELAY_MS` | Min seed delay on job start; caps initial burst (`0` disables) | `500` | | `GNH_RATE_LIMIT_CANCEL_THRESHOLD` | Consecutive errors before cancellation | `20` | | `GNH_RATE_LIMIT_CANCEL_DELAY_SECONDS` | Delay before auto-cancel | `60` | | `GNH_RATE_LIMIT_CANCEL_ENABLED` | Enable auto-cancellation | `false` | diff --git a/internal/jobs/executor.go b/internal/jobs/executor.go index 4563f9b1..1a7dd659 100644 --- a/internal/jobs/executor.go +++ b/internal/jobs/executor.go @@ -63,7 +63,7 @@ type ExecutorConfig struct { func DefaultExecutorConfig() ExecutorConfig { return ExecutorConfig{ - MaxBlockingRetries: 3, + MaxBlockingRetries: envIntWithDefault("GNH_RATE_LIMIT_MAX_RETRIES", DefaultMaxBlockingRetries), MaxTaskRetries: MaxTaskRetries, BaseDelayMS: envIntWithDefault("GNH_RATE_LIMIT_BASE_DELAY_MS", 50), MaxDelayMS: envIntWithDefault("GNH_RATE_LIMIT_MAX_DELAY_MS", 60000), diff --git a/internal/jobs/pacer_seed_test.go b/internal/jobs/pacer_seed_test.go new file mode 100644 index 00000000..1dd1f845 --- /dev/null +++ b/internal/jobs/pacer_seed_test.go @@ -0,0 +1,40 @@ +package jobs + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestSeedAdaptiveDelayMS(t *testing.T) { + t.Setenv("GNH_PACER_WARMUP_DELAY_MS", "2000") + t.Setenv("GNH_PACER_START_FLOOR_DELAY_MS", "500") + + t.Run("never crawled uses warm-up", func(t *testing.T) { + assert.Equal(t, 2000, seedAdaptiveDelayMS(false, 0)) + }) + + t.Run("learned zero is floored, not burst", func(t *testing.T) { + assert.Equal(t, 500, seedAdaptiveDelayMS(true, 0)) + }) + + t.Run("learned below floor is raised to floor", func(t *testing.T) { + assert.Equal(t, 500, seedAdaptiveDelayMS(true, 200)) + }) + + t.Run("learned at floor is unchanged", func(t *testing.T) { + assert.Equal(t, 500, seedAdaptiveDelayMS(true, 500)) + }) + + t.Run("learned above floor is trusted as-is", func(t *testing.T) { + assert.Equal(t, 3000, seedAdaptiveDelayMS(true, 3000)) + }) +} + +func TestSeedAdaptiveDelayMS_FloorDisabled(t *testing.T) { + t.Setenv("GNH_PACER_WARMUP_DELAY_MS", "2000") + t.Setenv("GNH_PACER_START_FLOOR_DELAY_MS", "0") + + assert.Equal(t, 0, seedAdaptiveDelayMS(true, 0)) + assert.Equal(t, 2000, seedAdaptiveDelayMS(false, 0)) +} diff --git a/internal/jobs/stream_worker.go b/internal/jobs/stream_worker.go index 2f265a06..d11f8e16 100644 --- a/internal/jobs/stream_worker.go +++ b/internal/jobs/stream_worker.go @@ -87,6 +87,10 @@ const jobInfoTTL = 5 * time.Minute const defaultPacerWarmupDelayMS = 2000 +// Floors job-start seed delay so a learned-zero domain ramps from ~3 +// concurrent (at EstResponseMS=1500) instead of bursting full concurrency. +const defaultPacerStartFloorDelayMS = 500 + func pacerWarmupDelayMS() int { if v := strings.TrimSpace(os.Getenv("GNH_PACER_WARMUP_DELAY_MS")); v != "" { if n, err := strconv.Atoi(v); err == nil && n >= 0 { @@ -98,6 +102,32 @@ func pacerWarmupDelayMS() int { return defaultPacerWarmupDelayMS } +func pacerStartFloorDelayMS() int { + if v := strings.TrimSpace(os.Getenv("GNH_PACER_START_FLOOR_DELAY_MS")); v != "" { + if n, err := strconv.Atoi(v); err == nil && n >= 0 { + return n + } + jobsLog.Warn("invalid GNH_PACER_START_FLOOR_DELAY_MS; using default", + "value", v, "default", defaultPacerStartFloorDelayMS) + } + return defaultPacerStartFloorDelayMS +} + +// Never-crawled domains seed the warm-up; crawled domains use their learned +// delay but never below the start floor, so none bursts on the first wave. +func seedAdaptiveDelayMS(learned bool, learnedMS int) int { + if !learned { + if warmup := pacerWarmupDelayMS(); warmup > 0 { + return warmup + } + return learnedMS + } + if floor := pacerStartFloorDelayMS(); floor > 0 && learnedMS < floor { + return floor + } + return learnedMS +} + type cachedJobInfo struct { info *JobInfo expiresAt time.Time @@ -770,14 +800,8 @@ func (swp *StreamWorkerPool) fetchJobInfo(ctx context.Context, jobID string) (*J // on every job-info cache miss is safe and cheap. if swp.pacer != nil { baseDelayMS := info.CrawlDelay * 1000 - adaptiveDelayMS := info.AdaptiveDelay * 1000 floorMS := info.AdaptiveDelayFloor * 1000 - // NULL means never crawled; a stored 0 is a learned value we trust. - if !adaptiveDelay.Valid { - if warmup := pacerWarmupDelayMS(); warmup > 0 { - adaptiveDelayMS = warmup - } - } + adaptiveDelayMS := seedAdaptiveDelayMS(adaptiveDelay.Valid, info.AdaptiveDelay*1000) if seedErr := swp.pacer.Seed(ctx, info.DomainName, baseDelayMS, adaptiveDelayMS, floorMS); seedErr != nil { jobsLog.Warn("pacer seed from postgres failed, continuing", "error", seedErr, "domain", info.DomainName) diff --git a/internal/jobs/types.go b/internal/jobs/types.go index 4f6a19ab..07d8c431 100644 --- a/internal/jobs/types.go +++ b/internal/jobs/types.go @@ -45,6 +45,8 @@ const ( const ( TaskStaleTimeout = 3 * time.Minute MaxTaskRetries = 5 + // High so a persistently-throttling domain is deferred, not failed. + DefaultMaxBlockingRetries = 8 ) // CHECK: Do all of these currently get utilised somewhere in the app? From e1a0c67e6d6f568bdc70fb2141487dd11bf366e3 Mon Sep 17 00:00:00 2001 From: Simon Smallchua <40650011+simonsmallchua@users.noreply.github.com> Date: Wed, 27 May 2026 21:15:07 +1000 Subject: [PATCH 2/2] Cover rate-limit config helpers in tests --- internal/jobs/pacer_seed_test.go | 37 ++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/internal/jobs/pacer_seed_test.go b/internal/jobs/pacer_seed_test.go index 1dd1f845..49f4b971 100644 --- a/internal/jobs/pacer_seed_test.go +++ b/internal/jobs/pacer_seed_test.go @@ -38,3 +38,40 @@ func TestSeedAdaptiveDelayMS_FloorDisabled(t *testing.T) { assert.Equal(t, 0, seedAdaptiveDelayMS(true, 0)) assert.Equal(t, 2000, seedAdaptiveDelayMS(false, 0)) } + +func TestPacerStartFloorDelayMS(t *testing.T) { + t.Run("default when unset", func(t *testing.T) { + t.Setenv("GNH_PACER_START_FLOOR_DELAY_MS", "") + assert.Equal(t, defaultPacerStartFloorDelayMS, pacerStartFloorDelayMS()) + }) + t.Run("override when valid", func(t *testing.T) { + t.Setenv("GNH_PACER_START_FLOOR_DELAY_MS", "750") + assert.Equal(t, 750, pacerStartFloorDelayMS()) + }) + t.Run("default when invalid", func(t *testing.T) { + t.Setenv("GNH_PACER_START_FLOOR_DELAY_MS", "nope") + assert.Equal(t, defaultPacerStartFloorDelayMS, pacerStartFloorDelayMS()) + }) +} + +func TestPacerWarmupDelayMS(t *testing.T) { + t.Run("default when unset", func(t *testing.T) { + t.Setenv("GNH_PACER_WARMUP_DELAY_MS", "") + assert.Equal(t, defaultPacerWarmupDelayMS, pacerWarmupDelayMS()) + }) + t.Run("default when invalid", func(t *testing.T) { + t.Setenv("GNH_PACER_WARMUP_DELAY_MS", "-") + assert.Equal(t, defaultPacerWarmupDelayMS, pacerWarmupDelayMS()) + }) +} + +func TestDefaultExecutorConfig_MaxBlockingRetries(t *testing.T) { + t.Run("default", func(t *testing.T) { + t.Setenv("GNH_RATE_LIMIT_MAX_RETRIES", "") + assert.Equal(t, DefaultMaxBlockingRetries, DefaultExecutorConfig().MaxBlockingRetries) + }) + t.Run("env override", func(t *testing.T) { + t.Setenv("GNH_RATE_LIMIT_MAX_RETRIES", "15") + assert.Equal(t, 15, DefaultExecutorConfig().MaxBlockingRetries) + }) +}