From 3cb89481c9fadb2d9dc426f3e970eadea006e0e9 Mon Sep 17 00:00:00 2001 From: Simon Smallchua <40650011+simonsmallchua@users.noreply.github.com> Date: Wed, 29 Apr 2026 06:24:34 +1000 Subject: [PATCH 1/8] Harden WAF pre-flight edge cases --- CHANGELOG.md | 12 +++++++- internal/crawler/probe.go | 6 +++- internal/crawler/probe_test.go | 26 +++++++++++++++++ internal/jobs/fail_job_message_test.go | 39 ++++++++++++++++++++++++++ internal/jobs/manager.go | 32 +++++++++++++++++++-- 5 files changed, 111 insertions(+), 4 deletions(-) create mode 100644 internal/jobs/fail_job_message_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index bc33aedf..ff55a9f1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,7 +28,17 @@ On merge, CI will: ## [Unreleased] -_Add unreleased changes here._ +### Fixed + +- WAF pre-flight no longer strands jobs in `pending` if `BlockJob`'s DB write + fails — a fallback transition writes `failed` with an explanatory message so + the job always reaches a terminal state. +- `JobStatusBlocked` now triggers the same per-job in-process state cleanup + (`processedPages`, milestones) as the other terminal statuses; long-running + workers no longer leak map entries for blocked jobs. +- WAF probe scheme detection is now case-insensitive — `HTTPS://example.com` no + longer double-prefixes to `https://HTTPS://...` and silently skips the + verdict. ## Full changelog history diff --git a/internal/crawler/probe.go b/internal/crawler/probe.go index ed90fef5..8e1fa024 100644 --- a/internal/crawler/probe.go +++ b/internal/crawler/probe.go @@ -70,7 +70,11 @@ func Probe(ctx context.Context, domain string, userAgent string, transport http. func normaliseProbeTarget(domain string) string { d := strings.TrimSpace(domain) - if strings.HasPrefix(d, "http://") || strings.HasPrefix(d, "https://") { + // Scheme detection is case-insensitive — "HTTPS://example.com" + // otherwise double-prefixes to "https://HTTPS://example.com/" and + // the request build fails, silently skipping the WAF verdict. + dl := strings.ToLower(d) + if strings.HasPrefix(dl, "http://") || strings.HasPrefix(dl, "https://") { return strings.TrimSuffix(d, "/") + "/" } return "https://" + strings.TrimSuffix(d, "/") + "/" diff --git a/internal/crawler/probe_test.go b/internal/crawler/probe_test.go index efcafc5a..d46ee7b9 100644 --- a/internal/crawler/probe_test.go +++ b/internal/crawler/probe_test.go @@ -66,6 +66,32 @@ func TestProbe_BodyTruncation(t *testing.T) { } } +func TestNormaliseProbeTarget(t *testing.T) { + cases := []struct { + in string + want string + }{ + {"example.com", "https://example.com/"}, + {"example.com/", "https://example.com/"}, + {" example.com ", "https://example.com/"}, + {"https://example.com", "https://example.com/"}, + {"http://example.com/", "http://example.com/"}, + // Case-insensitive scheme detection — without it + // "HTTPS://example.com" double-prefixed to + // "https://HTTPS://example.com/" and silently failed. + {"HTTPS://example.com", "HTTPS://example.com/"}, + {"Http://example.com/", "Http://example.com/"}, + } + for _, tc := range cases { + t.Run(tc.in, func(t *testing.T) { + got := normaliseProbeTarget(tc.in) + if got != tc.want { + t.Errorf("normaliseProbeTarget(%q) = %q, want %q", tc.in, got, tc.want) + } + }) + } +} + // redirectingTransport rewrites probe requests (which target https:///) // onto the local httptest server, so we exercise the production // host-resolution code path while staying offline. diff --git a/internal/jobs/fail_job_message_test.go b/internal/jobs/fail_job_message_test.go new file mode 100644 index 00000000..c955c672 --- /dev/null +++ b/internal/jobs/fail_job_message_test.go @@ -0,0 +1,39 @@ +package jobs + +import ( + "context" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestFailJobWithMessage_StatusGuard asserts the fallback failJob +// transition only writes when the job is still in a pre-terminal +// status. Without the guard, a fallback after BlockJob's DB error +// could overwrite a freshly-completed job from a concurrent worker. +func TestFailJobWithMessage_StatusGuard(t *testing.T) { + mockDB, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherRegexp)) + require.NoError(t, err) + defer mockDB.Close() + + jm := &JobManager{ + db: mockDB, + dbQueue: &mockDbQueueWrapper{mockDB: mockDB}, + processedPages: make(map[string]struct{}), + } + + const jobID = "job-fallback-1" + + mock.ExpectBegin() + mock.ExpectExec(`UPDATE jobs\s+SET status = \$1, error_message = \$2, completed_at = \$3\s+WHERE id = \$4\s+AND status IN \(\$5, \$6, \$7, \$8\)`). + WithArgs(string(JobStatusFailed), "boom", sqlmock.AnyArg(), jobID, + string(JobStatusRunning), string(JobStatusPending), string(JobStatusPaused), string(JobStatusInitialising)). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectCommit() + + err = jm.failJobWithMessage(context.Background(), jobID, "boom") + require.NoError(t, err) + assert.NoError(t, mock.ExpectationsWereMet()) +} diff --git a/internal/jobs/manager.go b/internal/jobs/manager.go index cb2dfad5..948a835d 100644 --- a/internal/jobs/manager.go +++ b/internal/jobs/manager.go @@ -578,12 +578,40 @@ func (jm *JobManager) runWAFPreflight(ctx context.Context, job *Job, normalisedD "reason", det.Reason) if err := jm.BlockJob(ctx, job.ID, det.Vendor, det.Reason); err != nil { - jobsLog.Error("Failed to block job after WAF pre-flight detection", + jobsLog.Error("Failed to block job after WAF pre-flight detection; falling back to failed status", "error", err, "job_id", job.ID) + // Fail-safe: returning true after a failed BlockJob would + // strand the job in 'pending' with no tasks forever, because + // the caller skips discovery on the strength of our return + // value alone. Transition the job to failed via a separate + // path so the customer sees a terminal state either way. + if failErr := jm.failJobWithMessage(ctx, job.ID, "WAF detected but block transition failed: "+err.Error()); failErr != nil { + jobsLog.Error("Fallback failJob after BlockJob error also failed; allowing discovery to proceed", + "error", failErr, "job_id", job.ID) + return false + } } return true } +// failJobWithMessage transitions a job to JobStatusFailed with an +// explanatory message. Used as the fallback path when a more specific +// terminal transition (BlockJob) couldn't complete. The status guard +// keeps a concurrent terminal write safe — we never overwrite a +// completed/failed/cancelled/blocked row. +func (jm *JobManager) failJobWithMessage(ctx context.Context, jobID, message string) error { + return jm.dbQueue.Execute(ctx, func(tx *sql.Tx) error { + _, err := tx.ExecContext(ctx, ` + UPDATE jobs + SET status = $1, error_message = $2, completed_at = $3 + WHERE id = $4 + AND status IN ($5, $6, $7, $8) + `, JobStatusFailed, message, time.Now().UTC(), jobID, + JobStatusRunning, JobStatusPending, JobStatusPaused, JobStatusInitialising) + return err + }) +} + func (jm *JobManager) CreateJob(ctx context.Context, options *JobOptions) (*Job, error) { span := sentry.StartSpan(ctx, "manager.create_job") defer span.Finish() @@ -1302,7 +1330,7 @@ func (jm *JobManager) UpdateJobStatus(ctx context.Context, jobID string, status // Drop in-process state on terminal status so long-running workers don't accumulate per-job map entries. switch status { - case JobStatusCompleted, JobStatusFailed, JobStatusCancelled, JobStatusArchived: + case JobStatusCompleted, JobStatusFailed, JobStatusCancelled, JobStatusArchived, JobStatusBlocked: jm.clearProcessedPages(jobID) jm.clearMilestoneState(jobID) } From e3bbdb3e6631105ef36dfc248daa8f9d6985fdd4 Mon Sep 17 00:00:00 2001 From: Simon Smallchua <40650011+simonsmallchua@users.noreply.github.com> Date: Wed, 29 Apr 2026 06:52:06 +1000 Subject: [PATCH 2/8] Guard BlockJob race and async breaker --- CHANGELOG.md | 9 + internal/jobs/block_job_test.go | 59 +++++- internal/jobs/manager.go | 37 +++- internal/jobs/waf_circuit_breaker.go | 60 +++++-- .../jobs/waf_circuit_breaker_dispatch_test.go | 168 ++++++++++++++++++ 5 files changed, 316 insertions(+), 17 deletions(-) create mode 100644 internal/jobs/waf_circuit_breaker_dispatch_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index ff55a9f1..e76d7dc5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,6 +39,15 @@ On merge, CI will: - WAF probe scheme detection is now case-insensitive — `HTTPS://example.com` no longer double-prefixes to `https://HTTPS://...` and silently skips the verdict. +- `BlockJob` now CAS-guards the terminal `UPDATE jobs` against a stale pre-read + status, so a freshly-completed/failed/cancelled job from a concurrent worker + is no longer overwritten with `blocked` (and the domain row no longer stamped + off a verdict that didn't actually land for that run). A lost race rolls the + whole transaction back and surfaces as nil success. +- The WAF mid-job circuit breaker now dispatches `BlockJob` in a detached + goroutine with a 30 s timeout, so the stream worker hot path can't stall on + terminal-state DB lock contention. On `BlockJob` failure the breaker re-arms + for the job, so a transient DB blip no longer permanently disables it. ## Full changelog history diff --git a/internal/jobs/block_job_test.go b/internal/jobs/block_job_test.go index d40a4294..4a663268 100644 --- a/internal/jobs/block_job_test.go +++ b/internal/jobs/block_job_test.go @@ -46,9 +46,13 @@ func TestBlockJob_LockOrder(t *testing.T) { WithArgs(jobID, string(TaskStatusSkipped), string(TaskStatusPending), string(TaskStatusWaiting)). WillReturnResult(sqlmock.NewResult(0, 5)) - // 2. Jobs second — error_message carries the WAF reason. - mock.ExpectExec(`UPDATE jobs\s+SET status = \$1, completed_at = \$2, error_message = \$3\s+WHERE id = \$4`). - WithArgs(string(JobStatusBlocked), sqlmock.AnyArg(), sqlmock.AnyArg(), jobID). + // 2. Jobs second — error_message carries the WAF reason. The + // status IN (...) clause is the CAS guard: we only write when + // the job is still in a pre-terminal state, so a concurrent + // completion can't be silently overwritten. + mock.ExpectExec(`UPDATE jobs\s+SET status = \$1, completed_at = \$2, error_message = \$3\s+WHERE id = \$4\s+AND status IN \(\$5, \$6, \$7, \$8\)`). + WithArgs(string(JobStatusBlocked), sqlmock.AnyArg(), sqlmock.AnyArg(), jobID, + string(JobStatusRunning), string(JobStatusPending), string(JobStatusPaused), string(JobStatusInitialising)). WillReturnResult(sqlmock.NewResult(0, 1)) // 3. Outbox cleanup. @@ -95,6 +99,55 @@ func TestBlockJob_AlreadyBlockedIsNoOp(t *testing.T) { assert.NoError(t, mock.ExpectationsWereMet()) } +// TestBlockJob_RaceLostReturnsNil simulates the GetJob-pre-read-then- +// concurrent-completion race: the pre-read sees `running`, the worker +// completes the job, then BlockJob's tx fires and the CAS WHERE clause +// matches zero rows. The whole tx must roll back (no outbox/domains +// writes) and BlockJob must return nil — surfacing an error here +// would be a red toast for a benign race. +func TestBlockJob_RaceLostReturnsNil(t *testing.T) { + mockDB, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherRegexp)) + require.NoError(t, err) + defer mockDB.Close() + + jm := &JobManager{ + db: mockDB, + dbQueue: &mockDbQueueWrapper{mockDB: mockDB}, + processedPages: make(map[string]struct{}), + } + + const jobID = "job-race-lost" + + mock.ExpectBegin() + mock.ExpectQuery(`SELECT[\s\S]+FROM jobs j[\s\S]+JOIN domains d`). + WithArgs(jobID). + WillReturnRows(jobRow(jobID, JobStatusRunning)) + mock.ExpectCommit() + + mock.ExpectBegin() + + // Tasks update can affect any number of rows — irrelevant once + // the CAS misses; the tx will roll back. + mock.ExpectExec(`(?s)WITH picked AS \(\s*SELECT id FROM tasks`). + WithArgs(jobID, string(TaskStatusSkipped), string(TaskStatusPending), string(TaskStatusWaiting)). + WillReturnResult(sqlmock.NewResult(0, 0)) + + // CAS UPDATE matches zero rows — the concurrent terminal write + // removed the job from the eligible status set. + mock.ExpectExec(`UPDATE jobs\s+SET status = \$1, completed_at = \$2, error_message = \$3\s+WHERE id = \$4\s+AND status IN`). + WithArgs(string(JobStatusBlocked), sqlmock.AnyArg(), sqlmock.AnyArg(), jobID, + string(JobStatusRunning), string(JobStatusPending), string(JobStatusPaused), string(JobStatusInitialising)). + WillReturnResult(sqlmock.NewResult(0, 0)) + + // No DELETE FROM task_outbox, no UPDATE domains — the tx must + // roll back as soon as the CAS RowsAffected check trips. + mock.ExpectRollback() + + err = jm.BlockJob(context.Background(), jobID, "akamai", "Server: AkamaiGHost on 403") + require.NoError(t, err, "race-lost must surface as nil success, not error") + assert.NoError(t, mock.ExpectationsWereMet()) +} + // TestBlockJob_TerminalStatusErrors asserts a block against a // completed/failed/cancelled job returns an error rather than // silently overwriting a finished result. diff --git a/internal/jobs/manager.go b/internal/jobs/manager.go index 948a835d..4966fb29 100644 --- a/internal/jobs/manager.go +++ b/internal/jobs/manager.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "encoding/json" + "errors" "fmt" "net/url" "os" @@ -24,6 +25,12 @@ import ( var jobsLog = logging.Component("jobs") +// errBlockJobRaceLost is the sentinel a BlockJob transaction returns +// when its CAS guard finds the job already in a terminal status set by +// a concurrent writer. It is not a real failure — callers up-stack +// translate it to nil success. +var errBlockJobRaceLost = errors.New("block_job: lost race to terminal transition") + type DbQueueProvider interface { Execute(ctx context.Context, fn func(*sql.Tx) error) error EnqueueURLs(ctx context.Context, jobID string, pages []db.Page, sourceType string, sourceURL string) error @@ -944,14 +951,30 @@ func (jm *JobManager) BlockJob(ctx context.Context, jobID string, vendor, reason return err } - _, err = tx.ExecContext(ctx, ` + // CAS guard: GetJob ran outside this tx, so another worker + // could have written a terminal state in between. Restrict the + // UPDATE to pre-terminal statuses and bail if zero rows match. + // Without this, a freshly-completed/failed/cancelled row would + // be silently overwritten with `blocked`, and worse, we'd + // stamp domains.waf_blocked off a verdict that didn't actually + // land for this run. + res, err := tx.ExecContext(ctx, ` UPDATE jobs SET status = $1, completed_at = $2, error_message = $3 WHERE id = $4 - `, job.Status, job.CompletedAt, errorMessage, job.ID) + AND status IN ($5, $6, $7, $8) + `, job.Status, job.CompletedAt, errorMessage, job.ID, + JobStatusRunning, JobStatusPending, JobStatusPaused, JobStatusInitialising) if err != nil { return err } + affected, err := res.RowsAffected() + if err != nil { + return err + } + if affected != 1 { + return errBlockJobRaceLost + } _, err = tx.ExecContext(ctx, ` DELETE FROM task_outbox WHERE job_id = $1 @@ -972,6 +995,16 @@ func (jm *JobManager) BlockJob(ctx context.Context, jobID string, vendor, reason return err }) + // errBlockJobRaceLost is not a real failure — a concurrent writer + // reached terminal first. The whole transaction rolled back, so no + // stale state landed; report success-equivalent so callers don't + // surface a red error to the customer. + if errors.Is(err, errBlockJobRaceLost) { + jobsLog.Info("BlockJob lost race to another terminal transition; treating as no-op", + "job_id", job.ID, "domain", job.Domain) + return nil + } + if err != nil { span.SetTag("error", "true") span.SetData("error.message", err.Error()) diff --git a/internal/jobs/waf_circuit_breaker.go b/internal/jobs/waf_circuit_breaker.go index 2edf642c..9da39f4e 100644 --- a/internal/jobs/waf_circuit_breaker.go +++ b/internal/jobs/waf_circuit_breaker.go @@ -6,10 +6,17 @@ import ( "strconv" "strings" "sync" + "time" "github.com/Harvey-AU/hover/internal/crawler" ) +// blockJobDispatchTimeout bounds the detached terminal-state write +// fired when the breaker trips. The stream worker hot path must not +// stall on lock contention; if BlockJob can't land in this budget the +// caller logs and re-arms so a subsequent WAF response retries. +const blockJobDispatchTimeout = 30 * time.Second + // defaultWAFCircuitBreakerThreshold is the consecutive-WAF-response // count that trips the breaker mid-job. Tuned conservatively: once we // see three responses in a row carrying recognised WAF fingerprints we @@ -102,6 +109,20 @@ func (b *WAFCircuitBreaker) Forget(jobID string) { delete(b.tripped, jobID) } +// Rearm clears only the single-fire tripped flag for a job, leaving +// the consecutive-WAF counter at zero. Called when the dispatched +// BlockJob couldn't land — without this, a transient DB error during +// the trip would permanently disable the breaker for the job and +// every subsequent WAF response would slip through unchecked. +func (b *WAFCircuitBreaker) Rearm(jobID string) { + if b == nil || jobID == "" { + return + } + b.mu.Lock() + defer b.mu.Unlock() + delete(b.tripped, jobID) +} + // Threshold exposes the configured trip count for telemetry/logging. func (b *WAFCircuitBreaker) Threshold() int { if b == nil { @@ -112,9 +133,14 @@ func (b *WAFCircuitBreaker) Threshold() int { // MaybeTripFromOutcome is the convenience wrapper used from the stream // worker hot path. It pulls the WAF detection from the outcome, calls -// Observe, and on a trip dispatches BlockJob in a fresh detached -// context (the caller's context is the per-task one and may be on its -// way out). +// Observe, and on a trip dispatches BlockJob in a detached goroutine +// with a bounded timeout. If the dispatch fails the breaker is +// re-armed for that job so a subsequent WAF response can retry. +// +// The dispatch is asynchronous so the stream worker isn't held up by +// terminal-state DB lock contention — the per-task ACK / counter +// decrement / batch enqueue must not stall behind a multi-statement +// terminal transaction. func (b *WAFCircuitBreaker) MaybeTripFromOutcome(ctx context.Context, jm JobManagerInterface, outcome *TaskOutcome) { if b == nil || jm == nil || outcome == nil || outcome.Task == nil { return @@ -128,17 +154,27 @@ func (b *WAFCircuitBreaker) MaybeTripFromOutcome(ctx context.Context, jm JobMana return } - jobsLog.Info("WAF circuit breaker tripped", - "job_id", outcome.Task.JobID, + jobID := outcome.Task.JobID + jobsLog.Info("WAF circuit breaker tripped; dispatching BlockJob", + "job_id", jobID, "threshold", b.Threshold(), "vendor", vendor.Vendor, "reason", vendor.Reason) - // Detach so the per-task ctx cancellation doesn't truncate the - // terminal-state writes. - bgCtx := context.WithoutCancel(ctx) - if err := jm.BlockJob(bgCtx, outcome.Task.JobID, vendor.Vendor, "circuit breaker: "+vendor.Reason); err != nil { - jobsLog.Error("BlockJob from circuit breaker failed", - "error", err, "job_id", outcome.Task.JobID) - } + // Detach from the per-task ctx so the caller's cancellation + // doesn't truncate the terminal-state writes, then bound the + // dispatch so a wedged DB doesn't pin a goroutine forever. + parentCtx := context.WithoutCancel(ctx) + go func() { + dispatchCtx, cancel := context.WithTimeout(parentCtx, blockJobDispatchTimeout) + defer cancel() + if err := jm.BlockJob(dispatchCtx, jobID, vendor.Vendor, "circuit breaker: "+vendor.Reason); err != nil { + jobsLog.Error("BlockJob from circuit breaker failed; re-arming for retry", + "error", err, "job_id", jobID) + // Re-arm so the next WAF response for this job can trip + // again — without this a transient DB blip would silently + // permanently disable the breaker for the job. + b.Rearm(jobID) + } + }() } diff --git a/internal/jobs/waf_circuit_breaker_dispatch_test.go b/internal/jobs/waf_circuit_breaker_dispatch_test.go new file mode 100644 index 00000000..bf333640 --- /dev/null +++ b/internal/jobs/waf_circuit_breaker_dispatch_test.go @@ -0,0 +1,168 @@ +package jobs + +import ( + "context" + "errors" + "sync/atomic" + "testing" + "time" + + "github.com/Harvey-AU/hover/internal/crawler" + "github.com/Harvey-AU/hover/internal/db" +) + +// stubJobManager is a minimal JobManagerInterface impl whose BlockJob +// behaviour can be steered from a test. Other methods are stubbed +// because the breaker only calls BlockJob. +type stubJobManager struct { + blockJobErr error + blockJobBlock chan struct{} + calls atomic.Int32 +} + +func (s *stubJobManager) CreateJob(context.Context, *JobOptions) (*Job, error) { return nil, nil } +func (s *stubJobManager) CancelJob(context.Context, string) error { return nil } +func (s *stubJobManager) BlockJob(_ context.Context, _ string, _, _ string) error { + s.calls.Add(1) + if s.blockJobBlock != nil { + <-s.blockJobBlock + } + return s.blockJobErr +} +func (s *stubJobManager) GetJobStatus(context.Context, string) (*Job, error) { return nil, nil } +func (s *stubJobManager) GetJob(context.Context, string) (*Job, error) { return nil, nil } +func (s *stubJobManager) EnqueueJobURLs(context.Context, string, []db.Page, string, string) error { + return nil +} +func (s *stubJobManager) IsJobComplete(*Job) bool { return false } +func (s *stubJobManager) CalculateJobProgress(*Job) float64 { return 0 } +func (s *stubJobManager) ValidateStatusTransition(_, _ JobStatus) error { + return nil +} +func (s *stubJobManager) UpdateJobStatus(context.Context, string, JobStatus) error { return nil } +func (s *stubJobManager) MarkJobRunning(context.Context, string) error { return nil } +func (s *stubJobManager) GetRobotsRules(context.Context, string) (*crawler.RobotsRules, error) { + return nil, nil +} + +// TestMaybeTripFromOutcome_AsyncDispatch verifies the stream worker +// hot path isn't held up by BlockJob's terminal-state DB write. We +// hold BlockJob inside the stub via a channel; MaybeTripFromOutcome +// must return promptly anyway. +func TestMaybeTripFromOutcome_AsyncDispatch(t *testing.T) { + b := NewWAFCircuitBreaker() + b.threshold = 1 // trip immediately + + hold := make(chan struct{}) + jm := &stubJobManager{blockJobBlock: hold} + + outcome := &TaskOutcome{ + Task: &db.Task{ID: "t1", JobID: "job-async"}, + CrawlResult: &crawler.CrawlResult{ + WAF: &crawler.WAFDetection{Blocked: true, Vendor: "akamai", Reason: "Server: AkamaiGHost on 403"}, + }, + } + + done := make(chan struct{}) + go func() { + b.MaybeTripFromOutcome(context.Background(), jm, outcome) + close(done) + }() + + select { + case <-done: + // Good — returned without waiting on BlockJob. + case <-time.After(500 * time.Millisecond): + close(hold) // unblock the dispatched goroutine before failing + t.Fatalf("MaybeTripFromOutcome did not return promptly; the dispatch is not async") + } + + close(hold) // let BlockJob complete + + // Wait briefly for BlockJob to be called (still in the goroutine). + deadline := time.Now().Add(time.Second) + for time.Now().Before(deadline) { + if jm.calls.Load() == 1 { + return + } + time.Sleep(5 * time.Millisecond) + } + t.Fatalf("BlockJob was never called by the dispatch goroutine") +} + +// TestMaybeTripFromOutcome_RearmAfterFailure verifies that if BlockJob +// returns an error, a subsequent WAF observation for the same job can +// trip again. Without re-arm, a transient DB error would permanently +// disable the breaker for that job. +func TestMaybeTripFromOutcome_RearmAfterFailure(t *testing.T) { + b := NewWAFCircuitBreaker() + b.threshold = 1 // trip immediately + + jm := &stubJobManager{blockJobErr: errors.New("simulated DB blip")} + + outcome := &TaskOutcome{ + Task: &db.Task{ID: "t1", JobID: "job-rearm"}, + CrawlResult: &crawler.CrawlResult{ + WAF: &crawler.WAFDetection{Blocked: true, Vendor: "akamai", Reason: "AkamaiGHost"}, + }, + } + + b.MaybeTripFromOutcome(context.Background(), jm, outcome) + + // Wait for the goroutine to have called BlockJob and re-armed. + deadline := time.Now().Add(time.Second) + for time.Now().Before(deadline) { + if jm.calls.Load() == 1 { + break + } + time.Sleep(5 * time.Millisecond) + } + if jm.calls.Load() != 1 { + t.Fatalf("first BlockJob never called") + } + + // Give the goroutine a moment to re-arm after the err return. + time.Sleep(50 * time.Millisecond) + + // Second observation: must trip again because the breaker was + // re-armed. We use Observe directly so the test doesn't depend on + // goroutine ordering for the second dispatch. + tripped, _ := b.Observe("job-rearm", outcome.CrawlResult.WAF) + if !tripped { + t.Fatalf("breaker did not re-arm: second observation should have tripped") + } +} + +// TestMaybeTripFromOutcome_NoRearmOnSuccess asserts that a successful +// BlockJob does NOT re-arm the breaker. After a successful trip the +// job is terminal and any further observations should be ignored. +func TestMaybeTripFromOutcome_NoRearmOnSuccess(t *testing.T) { + b := NewWAFCircuitBreaker() + b.threshold = 1 + + jm := &stubJobManager{} // BlockJob returns nil + + outcome := &TaskOutcome{ + Task: &db.Task{ID: "t1", JobID: "job-once"}, + CrawlResult: &crawler.CrawlResult{ + WAF: &crawler.WAFDetection{Blocked: true, Vendor: "akamai"}, + }, + } + + b.MaybeTripFromOutcome(context.Background(), jm, outcome) + + deadline := time.Now().Add(time.Second) + for time.Now().Before(deadline) { + if jm.calls.Load() == 1 { + break + } + time.Sleep(5 * time.Millisecond) + } + time.Sleep(50 * time.Millisecond) // settle + + // Second observation must NOT trip — the job stays single-fired. + tripped, _ := b.Observe("job-once", outcome.CrawlResult.WAF) + if tripped { + t.Fatalf("breaker re-armed after a successful trip; expected single-fire") + } +} From 970ea58b1dce78b5daf011fd518ce3b8372f64ca Mon Sep 17 00:00:00 2001 From: Simon Smallchua <40650011+simonsmallchua@users.noreply.github.com> Date: Wed, 29 Apr 2026 06:59:51 +1000 Subject: [PATCH 3/8] Drop raw err from customer message --- internal/jobs/manager.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/internal/jobs/manager.go b/internal/jobs/manager.go index 4966fb29..fcf91493 100644 --- a/internal/jobs/manager.go +++ b/internal/jobs/manager.go @@ -586,13 +586,18 @@ func (jm *JobManager) runWAFPreflight(ctx context.Context, job *Job, normalisedD if err := jm.BlockJob(ctx, job.ID, det.Vendor, det.Reason); err != nil { jobsLog.Error("Failed to block job after WAF pre-flight detection; falling back to failed status", - "error", err, "job_id", job.ID) + "error", err, "job_id", job.ID, "domain", normalisedDomain, + "vendor", det.Vendor, "reason", det.Reason) // Fail-safe: returning true after a failed BlockJob would // strand the job in 'pending' with no tasks forever, because // the caller skips discovery on the strength of our return // value alone. Transition the job to failed via a separate // path so the customer sees a terminal state either way. - if failErr := jm.failJobWithMessage(ctx, job.ID, "WAF detected but block transition failed: "+err.Error()); failErr != nil { + // The customer-facing error_message stays stable; the raw + // underlying error is captured in the structured log above + // (with vendor/reason/domain context) for ops debugging. + const wafFallbackMsg = "WAF detected but block transition failed" + if failErr := jm.failJobWithMessage(ctx, job.ID, wafFallbackMsg); failErr != nil { jobsLog.Error("Fallback failJob after BlockJob error also failed; allowing discovery to proceed", "error", failErr, "job_id", job.ID) return false From a6799b9448335146d0d1f2956637cfb01295e755 Mon Sep 17 00:00:00 2001 From: Simon Smallchua <40650011+simonsmallchua@users.noreply.github.com> Date: Wed, 29 Apr 2026 07:50:25 +1000 Subject: [PATCH 4/8] Stop enqueueing against terminal jobs --- CHANGELOG.md | 9 ++ internal/db/queue.go | 45 ++++++++- internal/db/queue_test.go | 33 +++++++ internal/jobs/manager.go | 36 +++++++ internal/jobs/sitemap_terminal_guard_test.go | 98 ++++++++++++++++++++ 5 files changed, 218 insertions(+), 3 deletions(-) create mode 100644 internal/jobs/sitemap_terminal_guard_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index e76d7dc5..796976b8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -48,6 +48,15 @@ On merge, CI will: goroutine with a 30 s timeout, so the stream worker hot path can't stall on terminal-state DB lock contention. On `BlockJob` failure the breaker re-arms for the job, so a transient DB blip no longer permanently disables it. +- `EnqueueURLs` now short-circuits under its existing `FOR UPDATE OF j` row lock + when the target job is in a terminal status (blocked, cancelled, failed, + completed, archived). Without this, sitemap discovery and link extraction kept + inserting orphan tasks for jobs that had already transitioned terminal + mid-flight — kmart.com.au-class jobs were accreting 32k+ pending rows after + the circuit breaker had already fired. The sitemap-discovery loop additionally + reads job status between batches as a cheap pre-flight, so terminal jobs stop + parsing remaining batches instead of round-tripping to the DB just to be + rejected. ## Full changelog history diff --git a/internal/db/queue.go b/internal/db/queue.go index 396d558e..06a2a873 100644 --- a/internal/db/queue.go +++ b/internal/db/queue.go @@ -888,6 +888,26 @@ type enqueueJobConfig struct { orgID sql.NullString quotaRemaining sql.NullInt64 currentTaskCount int + status string +} + +// terminalJobStatuses are job statuses past which no further task +// inserts should land. The set mirrors the application-side +// JobStatusCompleted/Failed/Cancelled/Archived/Blocked values. +// Defined as a slice so the SQL ANY($N) form stays readable. +var terminalJobStatuses = []string{"completed", "failed", "cancelled", "archived", "blocked"} + +// IsTerminalJobStatus reports whether the supplied status string is a +// terminal job state past which task enqueueing should be a no-op. +// Exposed for use in callers that hold a status they read separately +// (e.g. the sitemap discovery loop). +func IsTerminalJobStatus(status string) bool { + for _, t := range terminalJobStatuses { + if status == t { + return true + } + } + return false } // deduplicatePages removes duplicate pages, keeping highest priority for each page ID @@ -1009,7 +1029,14 @@ func (q *DbQueue) EnqueueURLs(ctx context.Context, jobID string, pages []Page, s return uniquePages[i].ID < uniquePages[j].ID }) - // Get job's max_pages, concurrency, domain, org, and current task counts. + // Get job's max_pages, concurrency, domain, org, status, and current + // task counts. Reading j.status under the same FOR UPDATE OF j as + // the INSERT below makes the terminal-state guard race-free against + // a concurrent BlockJob/CancelJob: either we acquire the lock + // first and see the pre-terminal status (the terminal write blocks + // until our tx commits, then it sees zero matching pending rows), + // or it commits first and we see the terminal status here and + // short-circuit without inserting orphan tasks. // total_tasks - skipped_tasks is maintained incrementally by triggers and // avoids the correlated COUNT(*) subquery that previously ran under the job lock. err := tx.QueryRowContext(ctx, ` @@ -1019,17 +1046,29 @@ func (q *DbQueue) EnqueueURLs(ctx context.Context, jobID string, pages []Page, s CASE WHEN j.organisation_id IS NOT NULL THEN get_daily_quota_remaining(j.organisation_id) ELSE NULL - END + END, + j.status FROM jobs j LEFT JOIN domains d ON j.domain_id = d.id WHERE j.id = $1 FOR UPDATE OF j `, jobID).Scan(&cfg.maxPages, &cfg.concurrency, &cfg.runningTasks, &cfg.pendingTaskCount, - &cfg.domainID, &cfg.domainName, &cfg.currentTaskCount, &cfg.orgID, &cfg.quotaRemaining) + &cfg.domainID, &cfg.domainName, &cfg.currentTaskCount, &cfg.orgID, &cfg.quotaRemaining, + &cfg.status) if err != nil { return fmt.Errorf("failed to get job configuration and task count: %w", err) } + // Terminal-status guard: skip the entire INSERT path so a sitemap + // loop or link-discovery callback that's still running after a + // BlockJob/CancelJob doesn't accrete orphan tasks against the + // terminal job. Issue #365 row 1: kmart.com.au saw 32k+ rows + // land in tasks for a job that had already transitioned to + // `blocked` mid-discovery. + if IsTerminalJobStatus(cfg.status) { + return nil + } + // Calculate available slots with concurrency override and quota limits effectiveConcurrency := q.calculateEffectiveConcurrency(jobID, cfg.concurrency, cfg.domainName) concurrencySlots, _ := calculateAvailableSlots(effectiveConcurrency, cfg.runningTasks, cfg.pendingTaskCount, sql.NullInt64{}) diff --git a/internal/db/queue_test.go b/internal/db/queue_test.go index 9918135c..7c59e5d1 100644 --- a/internal/db/queue_test.go +++ b/internal/db/queue_test.go @@ -2,6 +2,39 @@ package db import "testing" +func TestIsTerminalJobStatus(t *testing.T) { + cases := []struct { + status string + want bool + }{ + // Terminal statuses — EnqueueURLs short-circuits for these. + {"completed", true}, + {"failed", true}, + {"cancelled", true}, + {"archived", true}, + {"blocked", true}, + + // Non-terminal — task inserts proceed. + {"pending", false}, + {"running", false}, + {"initializing", false}, + {"paused", false}, + + // Unknown / empty — must not be treated as terminal so a typo in + // the DB doesn't silently disable enqueueing. + {"", false}, + {"unknown", false}, + {"BLOCKED", false}, // case-sensitive: DB uses lowercase + } + for _, tc := range cases { + t.Run(tc.status, func(t *testing.T) { + if got := IsTerminalJobStatus(tc.status); got != tc.want { + t.Errorf("IsTerminalJobStatus(%q) = %v, want %v", tc.status, got, tc.want) + } + }) + } +} + func TestClassifyEnqueuedTaskDropsOverflowAtMaxPages(t *testing.T) { disposition := classifyEnqueuedTask(10, 10, 0, 0, 3) if disposition != enqueueTaskDrop { diff --git a/internal/jobs/manager.go b/internal/jobs/manager.go index fcf91493..2cbca4f7 100644 --- a/internal/jobs/manager.go +++ b/internal/jobs/manager.go @@ -606,6 +606,27 @@ func (jm *JobManager) runWAFPreflight(ctx context.Context, job *Job, normalisedD return true } +// isJobInTerminalStatus reports whether a job's current row status is +// one the discovery / link-extraction paths must stop adding tasks +// for. Used between sitemap batches as a cheap pre-flight before each +// EnqueueJobURLs round-trip; the DB-side guard in +// dbQueue.EnqueueURLs is the race-free safety net. +// +// Read errors are treated as "not terminal" — a transient query +// failure must not silently abort a healthy crawl. +func (jm *JobManager) isJobInTerminalStatus(ctx context.Context, jobID string) bool { + var status string + err := jm.dbQueue.Execute(ctx, func(tx *sql.Tx) error { + return tx.QueryRowContext(ctx, `SELECT status FROM jobs WHERE id = $1`, jobID).Scan(&status) + }) + if err != nil { + jobsLog.Warn("Could not read job status during terminal-state check; continuing", + "error", err, "job_id", jobID) + return false + } + return db.IsTerminalJobStatus(status) +} + // failJobWithMessage transitions a job to JobStatusFailed with an // explanatory message. Used as the fallback path when a more specific // terminal transition (BlockJob) couldn't complete. The status guard @@ -1513,6 +1534,21 @@ func (jm *JobManager) processSitemap(ctx context.Context, jobID, domain string, batch := urls[i:end] batchNum := (i / batchSize) + 1 + // Cheap status read between batches: a concurrent BlockJob + // (pre-flight or circuit breaker) or CancelJob may have + // flipped the job terminal mid-discovery. The DB guard in + // dbQueue.EnqueueURLs is the load-bearing safety net, but + // stopping here saves the per-batch sitemap parsing + DB + // round-trip that would otherwise be wasted work for every + // remaining batch (kmart.com.au-class jobs have hundreds). + if jm.isJobInTerminalStatus(ctx, jobID) { + jobsLog.Info("Sitemap discovery aborting: job reached terminal status mid-loop", + "job_id", jobID, "batch_number", batchNum, + "batches_remaining", totalBatches-batchNum+1, + "urls_remaining", len(urls)-i) + return + } + select { case jm.sitemapSem <- struct{}{}: case <-ctx.Done(): diff --git a/internal/jobs/sitemap_terminal_guard_test.go b/internal/jobs/sitemap_terminal_guard_test.go new file mode 100644 index 00000000..994059b0 --- /dev/null +++ b/internal/jobs/sitemap_terminal_guard_test.go @@ -0,0 +1,98 @@ +package jobs + +import ( + "context" + "errors" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestIsJobInTerminalStatus_Terminal asserts the cheap mid-loop status +// check returns true for every terminal status. Used by processSitemap +// between batches to short-circuit work for jobs already transitioned +// to blocked/cancelled/etc by a concurrent BlockJob/CancelJob. +func TestIsJobInTerminalStatus_Terminal(t *testing.T) { + for _, status := range []string{"blocked", "cancelled", "failed", "completed", "archived"} { + t.Run(status, func(t *testing.T) { + mockDB, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherRegexp)) + require.NoError(t, err) + defer mockDB.Close() + + jm := &JobManager{ + db: mockDB, + dbQueue: &mockDbQueueWrapper{mockDB: mockDB}, + processedPages: make(map[string]struct{}), + } + + mock.ExpectBegin() + mock.ExpectQuery(`SELECT status FROM jobs WHERE id = \$1`). + WithArgs("job-x"). + WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow(status)) + mock.ExpectCommit() + + if !jm.isJobInTerminalStatus(context.Background(), "job-x") { + t.Errorf("status %q must be reported as terminal", status) + } + assert.NoError(t, mock.ExpectationsWereMet()) + }) + } +} + +// TestIsJobInTerminalStatus_NotTerminal asserts the active statuses +// the sitemap loop legitimately encounters mid-discovery do not abort +// the loop. +func TestIsJobInTerminalStatus_NotTerminal(t *testing.T) { + for _, status := range []string{"pending", "running", "initializing", "paused"} { + t.Run(status, func(t *testing.T) { + mockDB, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherRegexp)) + require.NoError(t, err) + defer mockDB.Close() + + jm := &JobManager{ + db: mockDB, + dbQueue: &mockDbQueueWrapper{mockDB: mockDB}, + processedPages: make(map[string]struct{}), + } + + mock.ExpectBegin() + mock.ExpectQuery(`SELECT status FROM jobs WHERE id = \$1`). + WithArgs("job-x"). + WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow(status)) + mock.ExpectCommit() + + if jm.isJobInTerminalStatus(context.Background(), "job-x") { + t.Errorf("status %q must NOT be reported as terminal", status) + } + assert.NoError(t, mock.ExpectationsWereMet()) + }) + } +} + +// TestIsJobInTerminalStatus_QueryErrorContinues asserts a transient +// query failure does not silently abort a healthy crawl: the function +// returns false (not terminal) so the sitemap loop keeps going. +func TestIsJobInTerminalStatus_QueryErrorContinues(t *testing.T) { + mockDB, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherRegexp)) + require.NoError(t, err) + defer mockDB.Close() + + jm := &JobManager{ + db: mockDB, + dbQueue: &mockDbQueueWrapper{mockDB: mockDB}, + processedPages: make(map[string]struct{}), + } + + mock.ExpectBegin() + mock.ExpectQuery(`SELECT status FROM jobs WHERE id = \$1`). + WithArgs("job-x"). + WillReturnError(errors.New("transient DB blip")) + mock.ExpectRollback() + + if jm.isJobInTerminalStatus(context.Background(), "job-x") { + t.Errorf("DB error must not surface as terminal — false-positive would stall a healthy crawl") + } + assert.NoError(t, mock.ExpectationsWereMet()) +} From cdd37826bf22b0835f31eb20c2bcc09b785d5177 Mon Sep 17 00:00:00 2001 From: Simon Smallchua <40650011+simonsmallchua@users.noreply.github.com> Date: Wed, 29 Apr 2026 07:58:44 +1000 Subject: [PATCH 5/8] Detect Akamai BM cookies, breaker N=2 --- CHANGELOG.md | 14 +++++++++ internal/crawler/waf.go | 25 ++++++++++++++- internal/crawler/waf_test.go | 47 ++++++++++++++++++++++++++++ internal/jobs/waf_circuit_breaker.go | 12 ++++--- 4 files changed, 93 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 796976b8..7d521567 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -58,6 +58,20 @@ On merge, CI will: parsing remaining batches instead of round-tripping to the DB just to be rejected. +### Changed + +- WAF detector now recognises Akamai Bot Manager `_abck` and `bm_sz` cookies on + blocking status codes (403/202) as Akamai signals. Catches BM-fronted sites + that don't emit `Server: AkamaiGHost` or `akaalb_*` cookies (e.g. + kmart.com.au) and gives the mid-job circuit breaker `vendor=akamai` + attribution instead of falling through to `generic`. Cookies on a 200 response + are explicitly NOT treated as a block — many sites run BM in monitor mode + without ever blocking. +- WAF mid-job circuit breaker default threshold lowered from 3 → 2 consecutive + WAF responses. Trips ~33% earlier, capping orphan-task accumulation when a + large sitemap is mid-discovery. Override via + `GNH_WAF_CIRCUIT_BREAKER_THRESHOLD`. + ## Full changelog history ## [0.33.13] – 2026-04-28 diff --git a/internal/crawler/waf.go b/internal/crawler/waf.go index 4a96ea50..9bda26da 100644 --- a/internal/crawler/waf.go +++ b/internal/crawler/waf.go @@ -84,13 +84,36 @@ func DetectWAF(statusCode int, headers http.Header, bodySample []byte) WAFDetect if blocking { for _, c := range headers.Values("Set-Cookie") { - if strings.Contains(strings.ToLower(c), "akaalb_") { + cl := strings.ToLower(c) + if strings.Contains(cl, "akaalb_") { return WAFDetection{ Blocked: true, Vendor: WAFVendorAkamai, Reason: "akaalb_ cookie on " + statusLabel(statusCode), } } + // _abck and bm_sz are Akamai Bot Manager cookies (sensor + + // session). They appear on every BM-fronted response, so by + // themselves they're not a block marker — many sites run BM + // in monitor mode without ever blocking. Combined with a + // blocking status code (403/202), they're unambiguous BM + // walls and give better attribution than the generic + // tiny-body fallback (kmart.com.au-class sites that don't + // emit Server: AkamaiGHost or akaalb_*). + if strings.Contains(cl, "_abck=") || strings.HasPrefix(cl, "_abck=") { + return WAFDetection{ + Blocked: true, + Vendor: WAFVendorAkamai, + Reason: "_abck (Bot Manager session) cookie on " + statusLabel(statusCode), + } + } + if strings.Contains(cl, "bm_sz=") || strings.HasPrefix(cl, "bm_sz=") { + return WAFDetection{ + Blocked: true, + Vendor: WAFVendorAkamai, + Reason: "bm_sz (Bot Manager sensor) cookie on " + statusLabel(statusCode), + } + } } for _, st := range headers.Values("Server-Timing") { if strings.Contains(strings.ToLower(st), "ak_p;") { diff --git a/internal/crawler/waf_test.go b/internal/crawler/waf_test.go index 4ec75a85..b2b3e64f 100644 --- a/internal/crawler/waf_test.go +++ b/internal/crawler/waf_test.go @@ -43,6 +43,53 @@ func TestDetectWAF(t *testing.T) { wantVendor: WAFVendorAkamai, reasonPrefix: "akaalb_ cookie", }, + { + // kmart.com.au-class: deeper page returns 403 with Akamai + // Bot Manager session cookie. Without this signal the + // detector would fall through to "generic tiny body". + name: "akamai BM — _abck cookie on 403", + status: http.StatusForbidden, + headers: http.Header{ + "Set-Cookie": []string{"_abck=06F95C1AA35B5110~-1~YAAQjyw...~-1~-1~-1~-1~-1; Domain=.example.com; Path=/"}, + }, + body: []byte("blocked"), + wantBlocked: true, + wantVendor: WAFVendorAkamai, + reasonPrefix: "_abck", + }, + { + name: "akamai BM — bm_sz cookie on 403", + status: http.StatusForbidden, + headers: http.Header{ + "Set-Cookie": []string{"bm_sz=E18A0D7D1B94A12A~YAAQjyw...; Domain=.example.com; Path=/"}, + }, + body: []byte("blocked"), + wantBlocked: true, + wantVendor: WAFVendorAkamai, + reasonPrefix: "bm_sz", + }, + { + // Critical negative test: Akamai BM cookies appear on every + // BM-fronted response, including legitimate 200s in monitor + // mode. Detector must NOT flag these — would over-trigger + // massively (every kmart.com.au homepage hit would block). + name: "akamai BM — _abck on 200 must NOT trip (monitor mode)", + status: http.StatusOK, + headers: http.Header{ + "Set-Cookie": []string{"_abck=06F95C1AA35B5110~-1~YAAQjyw...~-1~-1~-1~-1~-1; Domain=.example.com; Path=/"}, + }, + body: []byte("real content from monitor-mode site"), + wantBlocked: false, + }, + { + name: "akamai BM — bm_sz on 200 must NOT trip (monitor mode)", + status: http.StatusOK, + headers: http.Header{ + "Set-Cookie": []string{"bm_sz=E18A0D7D~YAAQjyw...; Domain=.example.com; Path=/"}, + }, + body: []byte("real content"), + wantBlocked: false, + }, { name: "akamai — Server-Timing ak_p on 403", status: http.StatusForbidden, diff --git a/internal/jobs/waf_circuit_breaker.go b/internal/jobs/waf_circuit_breaker.go index 9da39f4e..43c75bee 100644 --- a/internal/jobs/waf_circuit_breaker.go +++ b/internal/jobs/waf_circuit_breaker.go @@ -18,10 +18,14 @@ import ( const blockJobDispatchTimeout = 30 * time.Second // defaultWAFCircuitBreakerThreshold is the consecutive-WAF-response -// count that trips the breaker mid-job. Tuned conservatively: once we -// see three responses in a row carrying recognised WAF fingerprints we -// have very high confidence the domain has flipped to blocking us. -const defaultWAFCircuitBreakerThreshold = 3 +// count that trips the breaker mid-job. Lowered from 3 to 2 after +// kmart.com.au-class observations: by the time three tasks have +// returned WAF fingerprints, the sitemap discovery loop has typically +// inserted thousands of URLs that all need to be skipped. Two +// consecutive WAF-flagged responses is still high-confidence (random +// transient 403s rarely cluster) and trips ~33% earlier, capping the +// orphan-task accumulation. Override via GNH_WAF_CIRCUIT_BREAKER_THRESHOLD. +const defaultWAFCircuitBreakerThreshold = 2 // WAFCircuitBreaker tracks per-job runs of consecutive WAF-flagged // responses and trips a callback once the threshold is reached. The From 50e6ab33be6c038da9b6398d6b4dfea68094395e Mon Sep 17 00:00:00 2001 From: "coderabbitai[bot]" <136622811+coderabbitai[bot]@users.noreply.github.com> Date: Tue, 28 Apr 2026 22:15:06 +0000 Subject: [PATCH 6/8] =?UTF-8?q?=F0=9F=93=9D=20Add=20docstrings=20to=20`wor?= =?UTF-8?q?k/waf-followups-a`?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Docstrings generation was requested by @simonsmallchua. The following files were modified: * `internal/crawler/probe.go` * `internal/crawler/waf.go` * `internal/db/queue.go` These files were ignored: * `internal/crawler/probe_test.go` * `internal/crawler/waf_test.go` * `internal/db/queue_test.go` * `internal/jobs/block_job_test.go` * `internal/jobs/fail_job_message_test.go` * `internal/jobs/sitemap_terminal_guard_test.go` * `internal/jobs/waf_circuit_breaker_dispatch_test.go` These file types are not supported: * `CHANGELOG.md` --- internal/crawler/probe.go | 4 ++++ internal/crawler/waf.go | 12 +++++++++++- internal/db/queue.go | 3 ++- 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/internal/crawler/probe.go b/internal/crawler/probe.go index 8e1fa024..07293b2a 100644 --- a/internal/crawler/probe.go +++ b/internal/crawler/probe.go @@ -68,6 +68,10 @@ func Probe(ctx context.Context, domain string, userAgent string, transport http. return DetectWAF(resp.StatusCode, resp.Header, body), nil } +// normaliseProbeTarget produces a probeable URL for the given domain. +// If the input already includes a scheme (case-insensitive "http://" or "https://"), +// it preserves the original input (minus any trailing slashes) and ensures exactly +// one trailing slash. Otherwise it prefixes "https://" and appends a trailing slash. func normaliseProbeTarget(domain string) string { d := strings.TrimSpace(domain) // Scheme detection is case-insensitive — "HTTPS://example.com" diff --git a/internal/crawler/waf.go b/internal/crawler/waf.go index 9bda26da..9407e53a 100644 --- a/internal/crawler/waf.go +++ b/internal/crawler/waf.go @@ -40,7 +40,17 @@ const ( // - DataDome: Server header equals DataDome // - Akamai: Server header AkamaiGHost OR akaalb_ cookie OR // Server-Timing ak_p marker, all on a blocking status -// - Generic: tiny body (<500 bytes) on 403 or 202 with no other signal +// DetectWAF inspects an HTTP status, headers, and a response body sample and returns a WAFDetection describing a detected WAF/bot-protection verdict, or the zero value when no signal is found. +// +// It treats a nil headers value as empty and considers status codes 403 and 202 as "blocking". Detection signals that produce a blocked verdict include: +// - Cloudflare: non-empty `Cf-Mitigated` header on a blocking status. +// - Imperva: `_Incapsula_Resource` marker present in the body sample. +// - DataDome/Akamai via `Server` header values (e.g., `datadome`, `akamaighost` with blocking status). +// - Akamai via cookie-based markers (`akaalb_`, `_abck=`, `bm_sz=`) when the response is blocking. +// - Akamai via `Server-Timing` containing `ak_p;`. +// - Generic fallback: blocking status with a non-empty body sample smaller than 500 bytes. +// +// The returned WAFDetection has Blocked=true and populated Vendor and Reason when a matching signal is found; otherwise it returns the zero-value WAFDetection (not blocked). func DetectWAF(statusCode int, headers http.Header, bodySample []byte) WAFDetection { if headers == nil { headers = http.Header{} diff --git a/internal/db/queue.go b/internal/db/queue.go index 06a2a873..bfedddb7 100644 --- a/internal/db/queue.go +++ b/internal/db/queue.go @@ -900,7 +900,8 @@ var terminalJobStatuses = []string{"completed", "failed", "cancelled", "archived // IsTerminalJobStatus reports whether the supplied status string is a // terminal job state past which task enqueueing should be a no-op. // Exposed for use in callers that hold a status they read separately -// (e.g. the sitemap discovery loop). +// IsTerminalJobStatus reports whether the provided job status is a terminal state. +// It returns `true` if the status is one of "completed", "failed", "cancelled", "archived", or "blocked", `false` otherwise. func IsTerminalJobStatus(status string) bool { for _, t := range terminalJobStatuses { if status == t { From f12bd33483b55d2b53c8478dfdeba92e40e61741 Mon Sep 17 00:00:00 2001 From: Simon Smallchua <40650011+simonsmallchua@users.noreply.github.com> Date: Wed, 29 Apr 2026 08:15:51 +1000 Subject: [PATCH 7/8] Note customer-safe error message --- CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7d521567..ce1ba84c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,12 @@ On merge, CI will: - WAF pre-flight no longer strands jobs in `pending` if `BlockJob`'s DB write fails — a fallback transition writes `failed` with an explanatory message so the job always reaches a terminal state. +- Customer-facing `jobs.error_message` for the WAF fallback path is now a stable + `"WAF detected but block transition failed"` string. The raw underlying error + (which could include DB driver text like + `pq: SSL is not enabled on the server`) is still logged via the structured ops + logger with vendor/reason/domain context, but no longer leaks into the + customer-visible field. - `JobStatusBlocked` now triggers the same per-job in-process state cleanup (`processedPages`, milestones) as the other terminal statuses; long-running workers no longer leak map entries for blocked jobs. From b6f9c175255592f9c6a2b66b30dbcd9bf0210b58 Mon Sep 17 00:00:00 2001 From: Simon Smallchua <40650011+simonsmallchua@users.noreply.github.com> Date: Wed, 29 Apr 2026 09:13:58 +1000 Subject: [PATCH 8/8] Seed breaker counter on rearm --- internal/crawler/waf.go | 14 +---- internal/jobs/waf_circuit_breaker.go | 28 ++++++--- internal/jobs/waf_circuit_breaker_test.go | 75 ++++++++++++++++++++++- 3 files changed, 96 insertions(+), 21 deletions(-) diff --git a/internal/crawler/waf.go b/internal/crawler/waf.go index 9407e53a..4387af62 100644 --- a/internal/crawler/waf.go +++ b/internal/crawler/waf.go @@ -38,19 +38,9 @@ const ( // - Cloudflare: cf-mitigated header set on a non-200 response // - Imperva: body contains _Incapsula_Resource // - DataDome: Server header equals DataDome -// - Akamai: Server header AkamaiGHost OR akaalb_ cookie OR +// - Akamai: Server header AkamaiGHost OR akaalb_/_abck/bm_sz cookie OR // Server-Timing ak_p marker, all on a blocking status -// DetectWAF inspects an HTTP status, headers, and a response body sample and returns a WAFDetection describing a detected WAF/bot-protection verdict, or the zero value when no signal is found. -// -// It treats a nil headers value as empty and considers status codes 403 and 202 as "blocking". Detection signals that produce a blocked verdict include: -// - Cloudflare: non-empty `Cf-Mitigated` header on a blocking status. -// - Imperva: `_Incapsula_Resource` marker present in the body sample. -// - DataDome/Akamai via `Server` header values (e.g., `datadome`, `akamaighost` with blocking status). -// - Akamai via cookie-based markers (`akaalb_`, `_abck=`, `bm_sz=`) when the response is blocking. -// - Akamai via `Server-Timing` containing `ak_p;`. -// - Generic fallback: blocking status with a non-empty body sample smaller than 500 bytes. -// -// The returned WAFDetection has Blocked=true and populated Vendor and Reason when a matching signal is found; otherwise it returns the zero-value WAFDetection (not blocked). +// - Generic: tiny body (<500 bytes) on 403 or 202 with no other signal func DetectWAF(statusCode int, headers http.Header, bodySample []byte) WAFDetection { if headers == nil { headers = http.Header{} diff --git a/internal/jobs/waf_circuit_breaker.go b/internal/jobs/waf_circuit_breaker.go index 43c75bee..08f9bfea 100644 --- a/internal/jobs/waf_circuit_breaker.go +++ b/internal/jobs/waf_circuit_breaker.go @@ -113,18 +113,28 @@ func (b *WAFCircuitBreaker) Forget(jobID string) { delete(b.tripped, jobID) } -// Rearm clears only the single-fire tripped flag for a job, leaving -// the consecutive-WAF counter at zero. Called when the dispatched -// BlockJob couldn't land — without this, a transient DB error during -// the trip would permanently disable the breaker for the job and -// every subsequent WAF response would slip through unchecked. -func (b *WAFCircuitBreaker) Rearm(jobID string) { +// Rearm clears the single-fire tripped flag for a job AND seeds the +// consecutive-WAF counter to threshold-1 (with the previous trip's +// vendor preserved) so a single subsequent blocked outcome immediately +// retrips. Called when the dispatched BlockJob couldn't land — at +// that point we've already proven the domain is consistently walling +// us; making the retry re-establish the full streak would waste N-1 +// blocked observations. The counter still resets on any non-blocked +// response (Observe), so a site that recovers between attempts still +// gets a clean slate. +func (b *WAFCircuitBreaker) Rearm(jobID string, lastVendor crawler.WAFDetection) { if b == nil || jobID == "" { return } b.mu.Lock() defer b.mu.Unlock() delete(b.tripped, jobID) + seed := b.threshold - 1 + if seed < 0 { + seed = 0 + } + b.counts[jobID] = seed + b.vendors[jobID] = lastVendor } // Threshold exposes the configured trip count for telemetry/logging. @@ -177,8 +187,10 @@ func (b *WAFCircuitBreaker) MaybeTripFromOutcome(ctx context.Context, jm JobMana "error", err, "job_id", jobID) // Re-arm so the next WAF response for this job can trip // again — without this a transient DB blip would silently - // permanently disable the breaker for the job. - b.Rearm(jobID) + // permanently disable the breaker for the job. The vendor + // from this trip is preserved so the retrip's BlockJob + // call carries accurate attribution. + b.Rearm(jobID, vendor) } }() } diff --git a/internal/jobs/waf_circuit_breaker_test.go b/internal/jobs/waf_circuit_breaker_test.go index 6d05c5d8..9208406d 100644 --- a/internal/jobs/waf_circuit_breaker_test.go +++ b/internal/jobs/waf_circuit_breaker_test.go @@ -132,5 +132,78 @@ func TestWAFCircuitBreaker_NilSafe(t *testing.T) { if tripped { t.Fatalf("nil receiver tripped") } - b.Forget("job-1") // must not panic + b.Forget("job-1") // must not panic + b.Rearm("job-1", crawler.WAFDetection{}) // must not panic +} + +// TestWAFCircuitBreaker_RearmSeedsToThresholdMinusOne verifies the +// post-Rearm fast-retrip behaviour: after a failed BlockJob dispatch, +// the breaker is primed so a single subsequent blocked outcome +// retrips immediately, rather than re-establishing the full +// consecutive-WAF streak from zero. The first trip already proved the +// site is consistently walling us; the retry shouldn't waste N-1 +// blocked observations re-proving it. +func TestWAFCircuitBreaker_RearmSeedsToThresholdMinusOne(t *testing.T) { + b := &WAFCircuitBreaker{ + threshold: 3, + counts: make(map[string]int), + tripped: make(map[string]struct{}), + vendors: make(map[string]crawler.WAFDetection), + } + + det := &crawler.WAFDetection{Blocked: true, Vendor: "akamai", Reason: "AkamaiGHost"} + + // Establish the first trip the long way. + b.Observe("job-1", det) // 1 + b.Observe("job-1", det) // 2 + tripped, vendor := b.Observe("job-1", det) + if !tripped { + t.Fatalf("expected first trip at threshold=3") + } + + // Simulate BlockJob failure: caller re-arms with the captured vendor. + b.Rearm("job-1", vendor) + + // One blocked observation must be enough to retrip — we already + // established consecutiveness on the first trip. + tripped, retripVendor := b.Observe("job-1", det) + if !tripped { + t.Fatalf("expected immediate retrip after Rearm; took more than 1 blocked observation") + } + if retripVendor.Vendor != "akamai" { + t.Errorf("retrip vendor lost: got %q, want %q", retripVendor.Vendor, "akamai") + } +} + +// TestWAFCircuitBreaker_RearmThenHealthyResetsStreak asserts the +// recovery path: if the site stops walling us between trip attempts +// (a non-blocked response arrives), the seeded counter clears so the +// breaker doesn't pre-load its retrip from stale evidence. +func TestWAFCircuitBreaker_RearmThenHealthyResetsStreak(t *testing.T) { + b := &WAFCircuitBreaker{ + threshold: 3, + counts: make(map[string]int), + tripped: make(map[string]struct{}), + vendors: make(map[string]crawler.WAFDetection), + } + + det := &crawler.WAFDetection{Blocked: true, Vendor: "akamai"} + + // Trip once. + b.Observe("job-1", det) + b.Observe("job-1", det) + _, vendor := b.Observe("job-1", det) + + // Re-arm (simulates BlockJob failure). + b.Rearm("job-1", vendor) + + // A healthy (non-blocked) response between trip attempts must + // clear the seeded counter — the site might have recovered. + b.Observe("job-1", nil) + + // One blocked observation now must NOT retrip — counter starts + // fresh, needs threshold=3 again. + if tripped, _ := b.Observe("job-1", det); tripped { + t.Fatalf("breaker retripped after a healthy response cleared the seed; recovery path broken") + } }