Merged
50 changes: 49 additions & 1 deletion CHANGELOG.md
@@ -28,7 +28,55 @@ On merge, CI will:

## [Unreleased]

_Add unreleased changes here._
### Fixed

- WAF pre-flight no longer strands jobs in `pending` if `BlockJob`'s DB write
fails — a fallback transition writes `failed` with an explanatory message so
the job always reaches a terminal state.
- Customer-facing `jobs.error_message` for the WAF fallback path is now a stable
`"WAF detected but block transition failed"` string. The raw underlying error
(which could include DB driver text like
`pq: SSL is not enabled on the server`) is still logged via the structured ops
logger with vendor/reason/domain context, but no longer leaks into the
customer-visible field.
- `JobStatusBlocked` now triggers the same per-job in-process state cleanup
(`processedPages`, milestones) as the other terminal statuses; long-running
workers no longer leak map entries for blocked jobs.
- WAF probe scheme detection is now case-insensitive — `HTTPS://example.com` no
longer double-prefixes to `https://HTTPS://...` and silently skips the
verdict.
- `BlockJob` now CAS-guards the terminal `UPDATE jobs` against a stale pre-read
status, so a freshly-completed/failed/cancelled job from a concurrent worker
is no longer overwritten with `blocked` (and the domain row no longer stamped
off a verdict that didn't actually land for that run). A lost race rolls the
whole transaction back and surfaces as nil success.
- The WAF mid-job circuit breaker now dispatches `BlockJob` in a detached
goroutine with a 30 s timeout, so the stream worker hot path can't stall on
terminal-state DB lock contention. On `BlockJob` failure the breaker re-arms
for the job, so a transient DB blip no longer permanently disables it.
- `EnqueueURLs` now short-circuits under its existing `FOR UPDATE OF j` row lock
when the target job is in a terminal status (blocked, cancelled, failed,
completed, archived). Without this, sitemap discovery and link extraction kept
inserting orphan tasks for jobs that had already transitioned terminal
mid-flight — kmart.com.au-class jobs were accreting 32k+ pending rows after
the circuit breaker had already fired. The sitemap-discovery loop additionally
reads job status between batches as a cheap pre-flight, so terminal jobs stop
parsing remaining batches instead of round-tripping to the DB just to be
rejected.

### Changed

- WAF detector now recognises Akamai Bot Manager `_abck` and `bm_sz` cookies on
blocking status codes (403/202) as Akamai signals. Catches BM-fronted sites
that don't emit `Server: AkamaiGHost` or `akaalb_*` cookies (e.g.
kmart.com.au) and gives the mid-job circuit breaker `vendor=akamai`
attribution instead of falling through to `generic`. Cookies on a 200 response
are explicitly NOT treated as a block — many sites run BM in monitor mode
without ever blocking.
- WAF mid-job circuit breaker default threshold lowered from 3 to 2 consecutive
WAF responses. The breaker now trips one response earlier (~33% sooner),
capping orphan-task accumulation when a large sitemap is mid-discovery.
Override via `GNH_WAF_CIRCUIT_BREAKER_THRESHOLD`.

## Full changelog history

10 changes: 9 additions & 1 deletion internal/crawler/probe.go
@@ -68,9 +68,17 @@ func Probe(ctx context.Context, domain string, userAgent string, transport http.
return DetectWAF(resp.StatusCode, resp.Header, body), nil
}

// normaliseProbeTarget produces a probeable URL for the given domain.
// Surrounding whitespace is trimmed first. If the input already includes a
// scheme ("http://" or "https://", matched case-insensitively), it is kept as
// written, including its original scheme casing; otherwise "https://" is
// prefixed. In both cases one trailing slash, if present, is trimmed and a
// single "/" is appended.
func normaliseProbeTarget(domain string) string {
d := strings.TrimSpace(domain)
if strings.HasPrefix(d, "http://") || strings.HasPrefix(d, "https://") {
// Scheme detection is case-insensitive — "HTTPS://example.com"
// otherwise double-prefixes to "https://HTTPS://example.com/" and
// the request build fails, silently skipping the WAF verdict.
dl := strings.ToLower(d)
if strings.HasPrefix(dl, "http://") || strings.HasPrefix(dl, "https://") {
return strings.TrimSuffix(d, "/") + "/"
}
return "https://" + strings.TrimSuffix(d, "/") + "/"
26 changes: 26 additions & 0 deletions internal/crawler/probe_test.go
@@ -66,6 +66,32 @@ func TestProbe_BodyTruncation(t *testing.T) {
}
}

func TestNormaliseProbeTarget(t *testing.T) {
cases := []struct {
in string
want string
}{
{"example.com", "https://example.com/"},
{"example.com/", "https://example.com/"},
{" example.com ", "https://example.com/"},
{"https://example.com", "https://example.com/"},
{"http://example.com/", "http://example.com/"},
// Case-insensitive scheme detection — without it
// "HTTPS://example.com" double-prefixed to
// "https://HTTPS://example.com/" and silently failed.
{"HTTPS://example.com", "HTTPS://example.com/"},
{"Http://example.com/", "Http://example.com/"},
}
for _, tc := range cases {
t.Run(tc.in, func(t *testing.T) {
got := normaliseProbeTarget(tc.in)
if got != tc.want {
t.Errorf("normaliseProbeTarget(%q) = %q, want %q", tc.in, got, tc.want)
}
})
}
}

// redirectingTransport rewrites probe requests (which target https://<host>/)
// onto the local httptest server, so we exercise the production
// host-resolution code path while staying offline.
27 changes: 25 additions & 2 deletions internal/crawler/waf.go
@@ -38,7 +38,7 @@ const (
// - Cloudflare: cf-mitigated header set on a non-200 response
// - Imperva: body contains _Incapsula_Resource
// - DataDome: Server header equals DataDome
// - Akamai: Server header AkamaiGHost OR akaalb_ cookie OR
// - Akamai: Server header AkamaiGHost OR akaalb_/_abck/bm_sz cookie OR
// Server-Timing ak_p marker, all on a blocking status
// - Generic: tiny body (<500 bytes) on 403 or 202 with no other signal
func DetectWAF(statusCode int, headers http.Header, bodySample []byte) WAFDetection {
@@ -84,13 +84,36 @@ func DetectWAF(statusCode int, headers http.Header, bodySample []byte) WAFDetect

if blocking {
for _, c := range headers.Values("Set-Cookie") {
if strings.Contains(strings.ToLower(c), "akaalb_") {
cl := strings.ToLower(c)
if strings.Contains(cl, "akaalb_") {
return WAFDetection{
Blocked: true,
Vendor: WAFVendorAkamai,
Reason: "akaalb_ cookie on " + statusLabel(statusCode),
}
}
// _abck and bm_sz are Akamai Bot Manager cookies (sensor +
// session). They appear on every BM-fronted response, so by
// themselves they're not a block marker — many sites run BM
// in monitor mode without ever blocking. Combined with a
// blocking status code (403/202), they're unambiguous BM
// walls and give better attribution than the generic
// tiny-body fallback (kmart.com.au-class sites that don't
// emit Server: AkamaiGHost or akaalb_*).
if strings.Contains(cl, "_abck=") { // Contains already covers the prefix case
return WAFDetection{
Blocked: true,
Vendor: WAFVendorAkamai,
Reason: "_abck (Bot Manager session) cookie on " + statusLabel(statusCode),
}
}
if strings.Contains(cl, "bm_sz=") { // Contains already covers the prefix case
return WAFDetection{
Blocked: true,
Vendor: WAFVendorAkamai,
Reason: "bm_sz (Bot Manager sensor) cookie on " + statusLabel(statusCode),
}
}
}
for _, st := range headers.Values("Server-Timing") {
if strings.Contains(strings.ToLower(st), "ak_p;") {
47 changes: 47 additions & 0 deletions internal/crawler/waf_test.go
@@ -43,6 +43,53 @@ func TestDetectWAF(t *testing.T) {
wantVendor: WAFVendorAkamai,
reasonPrefix: "akaalb_ cookie",
},
{
// kmart.com.au-class: deeper page returns 403 with Akamai
// Bot Manager session cookie. Without this signal the
// detector would fall through to "generic tiny body".
name: "akamai BM — _abck cookie on 403",
status: http.StatusForbidden,
headers: http.Header{
"Set-Cookie": []string{"_abck=06F95C1AA35B5110~-1~YAAQjyw...~-1~-1~-1~-1~-1; Domain=.example.com; Path=/"},
},
body: []byte("blocked"),
wantBlocked: true,
wantVendor: WAFVendorAkamai,
reasonPrefix: "_abck",
},
{
name: "akamai BM — bm_sz cookie on 403",
status: http.StatusForbidden,
headers: http.Header{
"Set-Cookie": []string{"bm_sz=E18A0D7D1B94A12A~YAAQjyw...; Domain=.example.com; Path=/"},
},
body: []byte("blocked"),
wantBlocked: true,
wantVendor: WAFVendorAkamai,
reasonPrefix: "bm_sz",
},
{
// Critical negative test: Akamai BM cookies appear on every
// BM-fronted response, including legitimate 200s in monitor
// mode. Detector must NOT flag these — would over-trigger
// massively (every kmart.com.au homepage hit would block).
name: "akamai BM — _abck on 200 must NOT trip (monitor mode)",
status: http.StatusOK,
headers: http.Header{
"Set-Cookie": []string{"_abck=06F95C1AA35B5110~-1~YAAQjyw...~-1~-1~-1~-1~-1; Domain=.example.com; Path=/"},
},
body: []byte("<html>real content from monitor-mode site</html>"),
wantBlocked: false,
},
{
name: "akamai BM — bm_sz on 200 must NOT trip (monitor mode)",
status: http.StatusOK,
headers: http.Header{
"Set-Cookie": []string{"bm_sz=E18A0D7D~YAAQjyw...; Domain=.example.com; Path=/"},
},
body: []byte("<html>real content</html>"),
wantBlocked: false,
},
{
name: "akamai — Server-Timing ak_p on 403",
status: http.StatusForbidden,
46 changes: 43 additions & 3 deletions internal/db/queue.go
@@ -888,6 +888,27 @@ type enqueueJobConfig struct {
orgID sql.NullString
quotaRemaining sql.NullInt64
currentTaskCount int
status string
}

// terminalJobStatuses are job statuses past which no further task
// inserts should land. The set mirrors the application-side
// JobStatusCompleted/Failed/Cancelled/Archived/Blocked values.
// Defined as a slice so the SQL ANY($N) form stays readable.
var terminalJobStatuses = []string{"completed", "failed", "cancelled", "archived", "blocked"}

// IsTerminalJobStatus reports whether the supplied status string is a
// terminal job state past which task enqueueing should be a no-op.
// It returns true for "completed", "failed", "cancelled", "archived", and
// "blocked", and false otherwise. The comparison is case-sensitive.
// Exposed for callers that hold a status they read separately.
func IsTerminalJobStatus(status string) bool {
for _, t := range terminalJobStatuses {
if status == t {
return true
}
}
return false
}

// deduplicatePages removes duplicate pages, keeping highest priority for each page ID
@@ -1009,7 +1030,14 @@ func (q *DbQueue) EnqueueURLs(ctx context.Context, jobID string, pages []Page, s
return uniquePages[i].ID < uniquePages[j].ID
})

// Get job's max_pages, concurrency, domain, org, and current task counts.
// Get job's max_pages, concurrency, domain, org, status, and current
// task counts. Reading j.status under the same FOR UPDATE OF j as
// the INSERT below makes the terminal-state guard race-free against
// a concurrent BlockJob/CancelJob: either we acquire the lock
// first and see the pre-terminal status (the terminal write blocks
// until our tx commits, then it sees zero matching pending rows),
// or it commits first and we see the terminal status here and
// short-circuit without inserting orphan tasks.
// total_tasks - skipped_tasks is maintained incrementally by triggers and
// avoids the correlated COUNT(*) subquery that previously ran under the job lock.
err := tx.QueryRowContext(ctx, `
@@ -1019,17 +1047,29 @@ func (q *DbQueue) EnqueueURLs(ctx context.Context, jobID string, pages []Page, s
CASE WHEN j.organisation_id IS NOT NULL
THEN get_daily_quota_remaining(j.organisation_id)
ELSE NULL
END
END,
j.status
FROM jobs j
LEFT JOIN domains d ON j.domain_id = d.id
WHERE j.id = $1
FOR UPDATE OF j
`, jobID).Scan(&cfg.maxPages, &cfg.concurrency, &cfg.runningTasks, &cfg.pendingTaskCount,
&cfg.domainID, &cfg.domainName, &cfg.currentTaskCount, &cfg.orgID, &cfg.quotaRemaining)
&cfg.domainID, &cfg.domainName, &cfg.currentTaskCount, &cfg.orgID, &cfg.quotaRemaining,
&cfg.status)
if err != nil {
return fmt.Errorf("failed to get job configuration and task count: %w", err)
}

// Terminal-status guard: skip the entire INSERT path so a sitemap
// loop or link-discovery callback that's still running after a
// BlockJob/CancelJob doesn't accrete orphan tasks against the
// terminal job. Issue #365 row 1: kmart.com.au saw 32k+ rows
// land in tasks for a job that had already transitioned to
// `blocked` mid-discovery.
if IsTerminalJobStatus(cfg.status) {
return nil
}

// Calculate available slots with concurrency override and quota limits
effectiveConcurrency := q.calculateEffectiveConcurrency(jobID, cfg.concurrency, cfg.domainName)
concurrencySlots, _ := calculateAvailableSlots(effectiveConcurrency, cfg.runningTasks, cfg.pendingTaskCount, sql.NullInt64{})
33 changes: 33 additions & 0 deletions internal/db/queue_test.go
@@ -2,6 +2,39 @@ package db

import "testing"

func TestIsTerminalJobStatus(t *testing.T) {
cases := []struct {
status string
want bool
}{
// Terminal statuses — EnqueueURLs short-circuits for these.
{"completed", true},
{"failed", true},
{"cancelled", true},
{"archived", true},
{"blocked", true},

// Non-terminal — task inserts proceed.
{"pending", false},
{"running", false},
{"initializing", false},
{"paused", false},

// Unknown / empty — must not be treated as terminal so a typo in
// the DB doesn't silently disable enqueueing.
{"", false},
{"unknown", false},
{"BLOCKED", false}, // case-sensitive: DB uses lowercase
}
for _, tc := range cases {
t.Run(tc.status, func(t *testing.T) {
if got := IsTerminalJobStatus(tc.status); got != tc.want {
t.Errorf("IsTerminalJobStatus(%q) = %v, want %v", tc.status, got, tc.want)
}
})
}
}

func TestClassifyEnqueuedTaskDropsOverflowAtMaxPages(t *testing.T) {
disposition := classifyEnqueuedTask(10, 10, 0, 0, 3)
if disposition != enqueueTaskDrop {
59 changes: 56 additions & 3 deletions internal/jobs/block_job_test.go
@@ -46,9 +46,13 @@ func TestBlockJob_LockOrder(t *testing.T) {
WithArgs(jobID, string(TaskStatusSkipped), string(TaskStatusPending), string(TaskStatusWaiting)).
WillReturnResult(sqlmock.NewResult(0, 5))

// 2. Jobs second — error_message carries the WAF reason.
mock.ExpectExec(`UPDATE jobs\s+SET status = \$1, completed_at = \$2, error_message = \$3\s+WHERE id = \$4`).
WithArgs(string(JobStatusBlocked), sqlmock.AnyArg(), sqlmock.AnyArg(), jobID).
// 2. Jobs second — error_message carries the WAF reason. The
// status IN (...) clause is the CAS guard: we only write when
// the job is still in a pre-terminal state, so a concurrent
// completion can't be silently overwritten.
mock.ExpectExec(`UPDATE jobs\s+SET status = \$1, completed_at = \$2, error_message = \$3\s+WHERE id = \$4\s+AND status IN \(\$5, \$6, \$7, \$8\)`).
WithArgs(string(JobStatusBlocked), sqlmock.AnyArg(), sqlmock.AnyArg(), jobID,
string(JobStatusRunning), string(JobStatusPending), string(JobStatusPaused), string(JobStatusInitialising)).
WillReturnResult(sqlmock.NewResult(0, 1))

// 3. Outbox cleanup.
@@ -95,6 +99,55 @@ func TestBlockJob_AlreadyBlockedIsNoOp(t *testing.T) {
assert.NoError(t, mock.ExpectationsWereMet())
}

// TestBlockJob_RaceLostReturnsNil simulates the GetJob-pre-read-then-
// concurrent-completion race: the pre-read sees `running`, the worker
// completes the job, then BlockJob's tx fires and the CAS WHERE clause
// matches zero rows. The whole tx must roll back (no outbox/domains
// writes) and BlockJob must return nil — surfacing an error here
// would be a red toast for a benign race.
func TestBlockJob_RaceLostReturnsNil(t *testing.T) {
mockDB, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherRegexp))
require.NoError(t, err)
defer mockDB.Close()

jm := &JobManager{
db: mockDB,
dbQueue: &mockDbQueueWrapper{mockDB: mockDB},
processedPages: make(map[string]struct{}),
}

const jobID = "job-race-lost"

mock.ExpectBegin()
mock.ExpectQuery(`SELECT[\s\S]+FROM jobs j[\s\S]+JOIN domains d`).
WithArgs(jobID).
WillReturnRows(jobRow(jobID, JobStatusRunning))
mock.ExpectCommit()

mock.ExpectBegin()

// Tasks update can affect any number of rows — irrelevant once
// the CAS misses; the tx will roll back.
mock.ExpectExec(`(?s)WITH picked AS \(\s*SELECT id FROM tasks`).
WithArgs(jobID, string(TaskStatusSkipped), string(TaskStatusPending), string(TaskStatusWaiting)).
WillReturnResult(sqlmock.NewResult(0, 0))

// CAS UPDATE matches zero rows — the concurrent terminal write
// removed the job from the eligible status set.
mock.ExpectExec(`UPDATE jobs\s+SET status = \$1, completed_at = \$2, error_message = \$3\s+WHERE id = \$4\s+AND status IN`).
WithArgs(string(JobStatusBlocked), sqlmock.AnyArg(), sqlmock.AnyArg(), jobID,
string(JobStatusRunning), string(JobStatusPending), string(JobStatusPaused), string(JobStatusInitialising)).
WillReturnResult(sqlmock.NewResult(0, 0))

// No DELETE FROM task_outbox, no UPDATE domains — the tx must
// roll back as soon as the CAS RowsAffected check trips.
mock.ExpectRollback()

err = jm.BlockJob(context.Background(), jobID, "akamai", "Server: AkamaiGHost on 403")
require.NoError(t, err, "race-lost must surface as nil success, not error")
assert.NoError(t, mock.ExpectationsWereMet())
}

// TestBlockJob_TerminalStatusErrors asserts a block against a
// completed/failed/cancelled job returns an error rather than
// silently overwriting a finished result.