Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,19 @@ On merge, CI will:

## [Unreleased]

_Add unreleased changes here._
### Changed

- Rate-limited (429/403/503) tasks now retry up to 8 times before hard-failing,
raised from 3, so a domain that throttles hard is paced-and-deferred rather
than failed mid-job. Tunable via `GNH_RATE_LIMIT_MAX_RETRIES` (previously
documented but never wired).
- Every job now starts paced instead of bursting the full per-job concurrency at
a domain. Previously only a never-crawled domain got a warm-up delay; a domain
with a learned (or zero) adaptive delay seeded un-paced and could trigger an
instant 429 storm. Job starts are now floored to a minimum seed delay, so a
previously-crawled domain ramps up from a few concurrent requests and widens
on success. Tunable via `GNH_PACER_START_FLOOR_DELAY_MS` (`0` restores the
prior behaviour).

## Full changelog history

Expand Down
4 changes: 3 additions & 1 deletion docs/operations/ENV_VARS.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ the TOML `[env]` section. If it grants access to anything, it is a secret.
| `GNH_RATE_LIMIT_MAX_DELAY_SECONDS` | Maximum adaptive delay | `60` |
| `GNH_RATE_LIMIT_SUCCESS_THRESHOLD` | Successes before delay reduction | `5` |
| `GNH_RATE_LIMIT_DELAY_STEP_MS` | Adaptive delay increment | `500` |
| `GNH_RATE_LIMIT_MAX_RETRIES` | Max blocking retries on rate limit | `3` |
| `GNH_RATE_LIMIT_MAX_RETRIES` | Max blocking retries on rate limit | `8` |
| `GNH_PACER_WARMUP_DELAY_MS` | Seed delay for a never-crawled domain (starts at ~1 worker) | `2000` |
| `GNH_PACER_START_FLOOR_DELAY_MS` | Min seed delay on job start; caps initial burst (`0` disables) | `500` |
| `GNH_RATE_LIMIT_CANCEL_THRESHOLD` | Consecutive errors before cancellation | `20` |
| `GNH_RATE_LIMIT_CANCEL_DELAY_SECONDS` | Delay before auto-cancel | `60` |
| `GNH_RATE_LIMIT_CANCEL_ENABLED` | Enable auto-cancellation | `false` |
Expand Down
2 changes: 1 addition & 1 deletion internal/jobs/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type ExecutorConfig struct {

func DefaultExecutorConfig() ExecutorConfig {
return ExecutorConfig{
MaxBlockingRetries: 3,
MaxBlockingRetries: envIntWithDefault("GNH_RATE_LIMIT_MAX_RETRIES", DefaultMaxBlockingRetries),
MaxTaskRetries: MaxTaskRetries,
BaseDelayMS: envIntWithDefault("GNH_RATE_LIMIT_BASE_DELAY_MS", 50),
MaxDelayMS: envIntWithDefault("GNH_RATE_LIMIT_MAX_DELAY_MS", 60000),
Expand Down
77 changes: 77 additions & 0 deletions internal/jobs/pacer_seed_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package jobs

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestSeedAdaptiveDelayMS(t *testing.T) {
t.Setenv("GNH_PACER_WARMUP_DELAY_MS", "2000")
t.Setenv("GNH_PACER_START_FLOOR_DELAY_MS", "500")

t.Run("never crawled uses warm-up", func(t *testing.T) {
assert.Equal(t, 2000, seedAdaptiveDelayMS(false, 0))
})

t.Run("learned zero is floored, not burst", func(t *testing.T) {
assert.Equal(t, 500, seedAdaptiveDelayMS(true, 0))
})

t.Run("learned below floor is raised to floor", func(t *testing.T) {
assert.Equal(t, 500, seedAdaptiveDelayMS(true, 200))
})

t.Run("learned at floor is unchanged", func(t *testing.T) {
assert.Equal(t, 500, seedAdaptiveDelayMS(true, 500))
})

t.Run("learned above floor is trusted as-is", func(t *testing.T) {
assert.Equal(t, 3000, seedAdaptiveDelayMS(true, 3000))
})
}

func TestSeedAdaptiveDelayMS_FloorDisabled(t *testing.T) {
t.Setenv("GNH_PACER_WARMUP_DELAY_MS", "2000")
t.Setenv("GNH_PACER_START_FLOOR_DELAY_MS", "0")

assert.Equal(t, 0, seedAdaptiveDelayMS(true, 0))
assert.Equal(t, 2000, seedAdaptiveDelayMS(false, 0))
}

func TestPacerStartFloorDelayMS(t *testing.T) {
t.Run("default when unset", func(t *testing.T) {
t.Setenv("GNH_PACER_START_FLOOR_DELAY_MS", "")
assert.Equal(t, defaultPacerStartFloorDelayMS, pacerStartFloorDelayMS())
})
t.Run("override when valid", func(t *testing.T) {
t.Setenv("GNH_PACER_START_FLOOR_DELAY_MS", "750")
assert.Equal(t, 750, pacerStartFloorDelayMS())
})
t.Run("default when invalid", func(t *testing.T) {
t.Setenv("GNH_PACER_START_FLOOR_DELAY_MS", "nope")
assert.Equal(t, defaultPacerStartFloorDelayMS, pacerStartFloorDelayMS())
})
}

func TestPacerWarmupDelayMS(t *testing.T) {
t.Run("default when unset", func(t *testing.T) {
t.Setenv("GNH_PACER_WARMUP_DELAY_MS", "")
assert.Equal(t, defaultPacerWarmupDelayMS, pacerWarmupDelayMS())
})
t.Run("default when invalid", func(t *testing.T) {
t.Setenv("GNH_PACER_WARMUP_DELAY_MS", "-")
assert.Equal(t, defaultPacerWarmupDelayMS, pacerWarmupDelayMS())
})
}

func TestDefaultExecutorConfig_MaxBlockingRetries(t *testing.T) {
t.Run("default", func(t *testing.T) {
t.Setenv("GNH_RATE_LIMIT_MAX_RETRIES", "")
assert.Equal(t, DefaultMaxBlockingRetries, DefaultExecutorConfig().MaxBlockingRetries)
})
t.Run("env override", func(t *testing.T) {
t.Setenv("GNH_RATE_LIMIT_MAX_RETRIES", "15")
assert.Equal(t, 15, DefaultExecutorConfig().MaxBlockingRetries)
})
}
38 changes: 31 additions & 7 deletions internal/jobs/stream_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ const jobInfoTTL = 5 * time.Minute

const defaultPacerWarmupDelayMS = 2000

// Floors job-start seed delay so a learned-zero domain ramps from ~3
// concurrent (at EstResponseMS=1500) instead of bursting full concurrency.
const defaultPacerStartFloorDelayMS = 500

func pacerWarmupDelayMS() int {
if v := strings.TrimSpace(os.Getenv("GNH_PACER_WARMUP_DELAY_MS")); v != "" {
if n, err := strconv.Atoi(v); err == nil && n >= 0 {
Expand All @@ -98,6 +102,32 @@ func pacerWarmupDelayMS() int {
return defaultPacerWarmupDelayMS
}

func pacerStartFloorDelayMS() int {
if v := strings.TrimSpace(os.Getenv("GNH_PACER_START_FLOOR_DELAY_MS")); v != "" {
if n, err := strconv.Atoi(v); err == nil && n >= 0 {
return n
}
jobsLog.Warn("invalid GNH_PACER_START_FLOOR_DELAY_MS; using default",
"value", v, "default", defaultPacerStartFloorDelayMS)
}
return defaultPacerStartFloorDelayMS
}

// Never-crawled domains seed the warm-up; crawled domains use their learned
// delay but never below the start floor, so none bursts on the first wave.
func seedAdaptiveDelayMS(learned bool, learnedMS int) int {
if !learned {
if warmup := pacerWarmupDelayMS(); warmup > 0 {
return warmup
}
return learnedMS
}
if floor := pacerStartFloorDelayMS(); floor > 0 && learnedMS < floor {
return floor
}
return learnedMS
}

type cachedJobInfo struct {
info *JobInfo
expiresAt time.Time
Expand Down Expand Up @@ -770,14 +800,8 @@ func (swp *StreamWorkerPool) fetchJobInfo(ctx context.Context, jobID string) (*J
// on every job-info cache miss is safe and cheap.
if swp.pacer != nil {
baseDelayMS := info.CrawlDelay * 1000
adaptiveDelayMS := info.AdaptiveDelay * 1000
floorMS := info.AdaptiveDelayFloor * 1000
// NULL means never crawled; a stored 0 is a learned value we trust.
if !adaptiveDelay.Valid {
if warmup := pacerWarmupDelayMS(); warmup > 0 {
adaptiveDelayMS = warmup
}
}
adaptiveDelayMS := seedAdaptiveDelayMS(adaptiveDelay.Valid, info.AdaptiveDelay*1000)
if seedErr := swp.pacer.Seed(ctx, info.DomainName, baseDelayMS, adaptiveDelayMS, floorMS); seedErr != nil {
jobsLog.Warn("pacer seed from postgres failed, continuing",
"error", seedErr, "domain", info.DomainName)
Expand Down
2 changes: 2 additions & 0 deletions internal/jobs/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ const (
const (
TaskStaleTimeout = 3 * time.Minute
MaxTaskRetries = 5
// High so a persistently-throttling domain is deferred, not failed.
DefaultMaxBlockingRetries = 8
)

// CHECK: Do all of these currently get utilised somewhere in the app?
Expand Down
Loading