diff --git a/CHANGELOG.md b/CHANGELOG.md
index aa274966..5e97614c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -28,7 +28,13 @@ On merge, CI will:
 
 ## [Unreleased]
 
-_Add unreleased changes here._
+### Changed
+
+- `JobManager.GetRobotsRules` now caches results per normalised domain (1h
+  positive TTL, 60s negative TTL) and collapses concurrent misses onto a single
+  origin fetch via singleflight. A long crawl previously refetched `/robots.txt`
+  every five minutes (the stream worker's job-info TTL), and a 429 on
+  `/robots.txt` was fetched again on the very next read; both are now bounded.
 
 ## Full changelog history
 
diff --git a/internal/jobs/manager.go b/internal/jobs/manager.go
index c1c10504..48c0531d 100644
--- a/internal/jobs/manager.go
+++ b/internal/jobs/manager.go
@@ -21,6 +21,7 @@ import (
 	"github.com/good-native/hover/internal/util"
 	"github.com/google/uuid"
 	"github.com/lib/pq"
+	"golang.org/x/sync/singleflight"
 )
 
 var jobsLog = logging.Component("jobs")
@@ -96,12 +97,39 @@ type JobManager struct {
 	pagesMutex         sync.RWMutex
 	sitemapSem         chan struct{} // caps concurrent sitemap batch insertions across all jobs
+
+	// In-memory cache for robots.txt rules. Previously every job-info
+	// cache miss in the stream worker (5 min TTL) refetched /robots.txt;
+	// a long crawl hammered the origin and produced repeat 429s. Negative
+	// entries are kept too so a 429 on /robots.txt doesn't trigger a fresh
+	// origin hit on every Release. Single-flight collapses concurrent
+	// misses for the same domain into one origin fetch.
+	robotsCache  map[string]robotsCacheEntry
+	robotsMutex  sync.RWMutex
+	robotsGroup  singleflight.Group
+	robotsTTLPos time.Duration // success TTL (also covers nil rules)
+	robotsTTLNeg time.Duration // failure TTL
+}
+
+type robotsCacheEntry struct {
+	rules     *crawler.RobotsRules
+	err       error
+	expiresAt time.Time
 }
 
 type ProgressMilestoneCallback func(ctx context.Context, jobID string, oldPct, newPct int)
 
 type JobTerminatedCallback func(ctx context.Context, jobID string)
 
+// Defaults sized to swallow the stream worker's 5-minute job-info refresh
+// without re-hitting the origin: the 1h success TTL means a long crawl fetches
+// /robots.txt at most a handful of times. The negative TTL is short so a
+// transient 429 on /robots.txt unblocks within ~60s.
+const (
+	defaultRobotsTTLPositive = time.Hour
+	defaultRobotsTTLNegative = 60 * time.Second
+)
+
 func NewJobManager(db *sql.DB, dbQueue DbQueueProvider, crawler CrawlerInterface) *JobManager {
 	sitemapConcurrency := sitemapInsertConcurrency()
 	return &JobManager{
@@ -111,6 +139,9 @@ func NewJobManager(db *sql.DB, dbQueue DbQueueProvider, crawler CrawlerInterface
 		processedPages:     make(map[string]struct{}),
 		lastMilestoneFired: make(map[string]int),
 		sitemapSem:         make(chan struct{}, sitemapConcurrency),
+		robotsCache:        make(map[string]robotsCacheEntry),
+		robotsTTLPos:       defaultRobotsTTLPositive,
+		robotsTTLNeg:       defaultRobotsTTLNegative,
 	}
 }
 
@@ -368,19 +399,74 @@ func (jm *JobManager) setupJobDatabase(ctx context.Context, job *Job, normalised
 const wafCacheTTL = 24 * time.Hour
 
 // Returns nil (no error) when the crawler is unavailable; callers treat
-// nil rules as "no restriction".
+// nil rules as "no restriction". Results are cached per normalised domain:
+// successful fetches for robotsTTLPos, errors for robotsTTLNeg. Concurrent
+// misses for the same domain collapse onto a single origin fetch via
+// singleflight so a job-info burst can't trigger N parallel /robots.txt
+// requests.
func (jm *JobManager) GetRobotsRules(ctx context.Context, domain string) (*crawler.RobotsRules, error) {
 	if jm.crawler == nil {
 		return nil, nil
 	}
-	result, err := jm.crawler.DiscoverSitemapsAndRobots(ctx, domain)
-	if err != nil {
-		return nil, fmt.Errorf("jobs: fetch robots for %s: %w", domain, err)
-	}
-	if result == nil {
-		return nil, nil
-	}
-	return result.RobotsRules, nil
+
+	// Lowercase first: util.NormaliseDomain uses case-sensitive
+	// TrimPrefix for the scheme/www strips, so an upper-case "HTTPS://"
+	// would otherwise survive and split the cache.
+	key := util.NormaliseDomain(strings.ToLower(domain))
+	now := time.Now()
+
+	jm.robotsMutex.RLock()
+	if entry, ok := jm.robotsCache[key]; ok && now.Before(entry.expiresAt) {
+		jm.robotsMutex.RUnlock()
+		return entry.rules, entry.err
+	}
+	jm.robotsMutex.RUnlock()
+
+	type robotsResult struct {
+		rules *crawler.RobotsRules
+		err   error
+	}
+	val, _, _ := jm.robotsGroup.Do(key, func() (any, error) {
+		// Fetch with the canonical key so the request and the cache
+		// entry agree on the origin string. Without this, a scheme- or
+		// case-variant input would fetch under the raw form while the
+		// failure outcome got stored against the normalised key.
+		result, fetchErr := jm.crawler.DiscoverSitemapsAndRobots(ctx, key)
+		out := robotsResult{}
+		if fetchErr != nil {
+			out.err = fmt.Errorf("jobs: fetch robots for %s: %w", key, fetchErr)
+		} else if result != nil {
+			out.rules = result.RobotsRules
+		}
+
+		// Don't negative-cache caller-side cancellations: one timed-out
+		// caller would otherwise poison the shared cache for everyone
+		// else for `robotsTTLNeg`.
+		transient := out.err != nil && (errors.Is(fetchErr, context.Canceled) ||
+			errors.Is(fetchErr, context.DeadlineExceeded))
+
+		if !transient {
+			ttl := jm.robotsTTLPos
+			if out.err != nil {
+				ttl = jm.robotsTTLNeg
+			}
+			jm.robotsMutex.Lock()
+			jm.robotsCache[key] = robotsCacheEntry{
+				rules:     out.rules,
+				err:       out.err,
+				expiresAt: time.Now().Add(ttl),
+			}
+			jm.robotsMutex.Unlock()
+		}
+
+		// Return a nil error from the singleflight fn and carry the real
+		// error in the typed payload: every sharer then receives the same
+		// robotsResult, and the type assertion below can never see nil.
+		return out, nil
+	})
+
+	res := val.(robotsResult)
+	return res.rules, res.err
 }
 
 func (jm *JobManager) validateRootURLAccess(ctx context.Context, job *Job, normalisedDomain string, rootPath string) (*crawler.RobotsRules, error) {
diff --git a/internal/jobs/robots_cache_test.go b/internal/jobs/robots_cache_test.go
new file mode 100644
index 00000000..2cc4c42f
--- /dev/null
+++ b/internal/jobs/robots_cache_test.go
@@ -0,0 +1,257 @@
+package jobs
+
+import (
+	"context"
+	"errors"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/good-native/hover/internal/crawler"
+)
+
+// stubRobotsCrawler implements CrawlerInterface, but only the
+// DiscoverSitemapsAndRobots method is exercised by the cache. Everything
+// else returns zero values; tests must not call those methods.
+type stubRobotsCrawler struct { + discoverFn func(ctx context.Context, domain string) (*crawler.SitemapDiscoveryResult, error) + calls atomic.Int32 +} + +func (s *stubRobotsCrawler) DiscoverSitemapsAndRobots(ctx context.Context, domain string) (*crawler.SitemapDiscoveryResult, error) { + s.calls.Add(1) + return s.discoverFn(ctx, domain) +} + +func (s *stubRobotsCrawler) WarmURL(context.Context, string, bool) (*crawler.CrawlResult, error) { + return nil, nil +} +func (s *stubRobotsCrawler) ParseSitemap(context.Context, string) ([]string, error) { + return nil, nil +} +func (s *stubRobotsCrawler) FilterURLs(urls []string, _, _ []string) []string { return urls } +func (s *stubRobotsCrawler) GetUserAgent() string { return "test" } +func (s *stubRobotsCrawler) Probe(context.Context, string) (crawler.WAFDetection, error) { + return crawler.WAFDetection{}, nil +} + +func newJobManagerWithCrawler(c CrawlerInterface) *JobManager { + return &JobManager{ + crawler: c, + robotsCache: make(map[string]robotsCacheEntry), + robotsTTLPos: defaultRobotsTTLPositive, + robotsTTLNeg: defaultRobotsTTLNegative, + } +} + +func TestGetRobotsRules_CachesSuccessForPositiveTTL(t *testing.T) { + rules := &crawler.RobotsRules{CrawlDelay: 5} + stub := &stubRobotsCrawler{ + discoverFn: func(context.Context, string) (*crawler.SitemapDiscoveryResult, error) { + return &crawler.SitemapDiscoveryResult{RobotsRules: rules}, nil + }, + } + jm := newJobManagerWithCrawler(stub) + ctx := context.Background() + + for i := 0; i < 10; i++ { + got, err := jm.GetRobotsRules(ctx, "example.com") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got != rules { + t.Fatalf("expected cached rules pointer, got %v", got) + } + } + if c := stub.calls.Load(); c != 1 { + t.Fatalf("expected one origin fetch under positive cache, got %d", c) + } +} + +func TestGetRobotsRules_CachesErrorForNegativeTTL(t *testing.T) { + fetchErr := errors.New("429 Too Many Requests") + stub := &stubRobotsCrawler{ + discoverFn: func(context.Context, string) (*crawler.SitemapDiscoveryResult, error) { + return nil, fetchErr + }, + } + jm := newJobManagerWithCrawler(stub) + ctx := context.Background() + + for i := 0; i < 5; i++ { + _, err := jm.GetRobotsRules(ctx, "throttled.com") + if err == nil { + t.Fatalf("expected wrapped fetch error, got nil") + } + if !errors.Is(err, fetchErr) { + t.Fatalf("expected wrapped 429 error, got %v", err) + } + } + if c := stub.calls.Load(); c != 1 { + t.Fatalf("expected one origin fetch under negative cache, got %d", c) + } +} + +func TestGetRobotsRules_RefetchesAfterPositiveTTL(t *testing.T) { + rules := &crawler.RobotsRules{} + stub := &stubRobotsCrawler{ + discoverFn: func(context.Context, string) (*crawler.SitemapDiscoveryResult, error) { + return &crawler.SitemapDiscoveryResult{RobotsRules: rules}, nil + }, + } + jm := newJobManagerWithCrawler(stub) + jm.robotsTTLPos = 0 // every read is a miss + ctx := context.Background() + + for i := 0; i < 3; i++ { + if _, err := jm.GetRobotsRules(ctx, "expiring.com"); err != nil { + t.Fatalf("unexpected error: %v", err) + } + } + if c := stub.calls.Load(); c != 3 { + t.Fatalf("expected 3 origin fetches with zero TTL, got %d", c) + } +} + +func TestGetRobotsRules_RefetchesAfterNegativeTTLExpires(t *testing.T) { + var attempts atomic.Int32 + stub := &stubRobotsCrawler{ + discoverFn: func(context.Context, string) (*crawler.SitemapDiscoveryResult, error) { + if attempts.Add(1) == 1 { + return nil, errors.New("transient 429") + } + return 
&crawler.SitemapDiscoveryResult{RobotsRules: &crawler.RobotsRules{}}, nil
+		},
+	}
+	jm := newJobManagerWithCrawler(stub)
+	jm.robotsTTLNeg = 0 // failure entry expires immediately
+	ctx := context.Background()
+
+	if _, err := jm.GetRobotsRules(ctx, "recover.com"); err == nil {
+		t.Fatalf("expected initial failure")
+	}
+	// The negative entry has already expired; the second call must refetch and succeed.
+	if _, err := jm.GetRobotsRules(ctx, "recover.com"); err != nil {
+		t.Fatalf("expected recovery on refetch, got %v", err)
+	}
+	if c := stub.calls.Load(); c != 2 {
+		t.Fatalf("expected 2 origin fetches after negative TTL expiry, got %d", c)
+	}
+}
+
+func TestGetRobotsRules_NormalisesDomain(t *testing.T) {
+	rules := &crawler.RobotsRules{}
+	stub := &stubRobotsCrawler{
+		discoverFn: func(context.Context, string) (*crawler.SitemapDiscoveryResult, error) {
+			return &crawler.SitemapDiscoveryResult{RobotsRules: rules}, nil
+		},
+	}
+	jm := newJobManagerWithCrawler(stub)
+	ctx := context.Background()
+
+	if _, err := jm.GetRobotsRules(ctx, "https://Example.com/"); err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	if _, err := jm.GetRobotsRules(ctx, "example.com"); err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	if c := stub.calls.Load(); c != 1 {
+		t.Fatalf("normalised keys should share cache; got %d origin fetches", c)
+	}
+}
+
+func TestGetRobotsRules_CollapsesConcurrentMisses(t *testing.T) {
+	gate := make(chan struct{})
+	stub := &stubRobotsCrawler{
+		discoverFn: func(context.Context, string) (*crawler.SitemapDiscoveryResult, error) {
+			<-gate
+			return &crawler.SitemapDiscoveryResult{RobotsRules: &crawler.RobotsRules{}}, nil
+		},
+	}
+	jm := newJobManagerWithCrawler(stub)
+	ctx := context.Background()
+
+	const fanout = 20
+	var wg sync.WaitGroup
+	errCh := make(chan error, fanout)
+	wg.Add(fanout)
+	for i := 0; i < fanout; i++ {
+		go func() {
+			defer wg.Done()
+			if _, err := jm.GetRobotsRules(ctx, "swarm.com"); err != nil {
+				errCh <- err
+			}
+		}()
+	}
+
+	// Wait until the discoverFn is parked on the gate, then release.
+	deadline := time.Now().Add(time.Second)
+	for time.Now().Before(deadline) && stub.calls.Load() == 0 {
+		time.Sleep(2 * time.Millisecond)
+	}
+	if stub.calls.Load() == 0 {
+		t.Fatal("origin fetch never started before the deadline")
+	}
+	close(gate)
+	wg.Wait()
+	close(errCh)
+	for err := range errCh {
+		t.Fatalf("unexpected error from concurrent caller: %v", err)
+	}
+
+	if c := stub.calls.Load(); c != 1 {
+		t.Fatalf("singleflight should collapse %d concurrent misses to one fetch, got %d", fanout, c)
+	}
+}
+
+func TestGetRobotsRules_DoesNotCacheContextCancellation(t *testing.T) {
+	var attempts atomic.Int32
+	stub := &stubRobotsCrawler{
+		discoverFn: func(_ context.Context, _ string) (*crawler.SitemapDiscoveryResult, error) {
+			if attempts.Add(1) == 1 {
+				return nil, context.Canceled
+			}
+			return &crawler.SitemapDiscoveryResult{RobotsRules: &crawler.RobotsRules{}}, nil
+		},
+	}
+	jm := newJobManagerWithCrawler(stub)
+	jm.robotsTTLNeg = time.Hour // would normally suppress a refetch
+
+	// First call surfaces the cancellation error but must NOT cache it.
+	if _, err := jm.GetRobotsRules(context.Background(), "cancel.com"); !errors.Is(err, context.Canceled) {
+		t.Fatalf("expected wrapped context.Canceled, got %v", err)
+	}
+
+	// The second call must refetch and succeed; a transient cancel
+	// cannot poison the shared cache.
+ if _, err := jm.GetRobotsRules(context.Background(), "cancel.com"); err != nil { + t.Fatalf("expected recovery on refetch, got %v", err) + } + if c := stub.calls.Load(); c != 2 { + t.Fatalf("expected 2 origin fetches when cancellation is not cached, got %d", c) + } +} + +func TestGetRobotsRules_FetchesUnderCanonicalDomain(t *testing.T) { + var seen string + stub := &stubRobotsCrawler{ + discoverFn: func(_ context.Context, domain string) (*crawler.SitemapDiscoveryResult, error) { + seen = domain + return &crawler.SitemapDiscoveryResult{RobotsRules: &crawler.RobotsRules{}}, nil + }, + } + jm := newJobManagerWithCrawler(stub) + + if _, err := jm.GetRobotsRules(context.Background(), "HTTPS://www.Example.COM/"); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if seen != "example.com" { + t.Fatalf("origin fetch must use the canonical key; got %q", seen) + } +} + +// Compile-time guard that the stub satisfies CrawlerInterface; if a method +// is added to the interface, this assertion breaks at build rather than +// at test time. +var _ CrawlerInterface = (*stubRobotsCrawler)(nil)
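
// Usage sketch (illustrative only, not part of the patch). It shows what the
// caching change means for a caller such as the stream worker: repeated reads
// inside the job-info refresh loop become one cached map lookup per call, and
// during an origin 429 the stored error is returned until the 60s negative
// TTL lapses instead of triggering a fresh /robots.txt fetch. The worker
// package name and the crawlDelayFor helper are hypothetical, as is the
// assumption that CrawlDelay counts seconds; only jobs.JobManager,
// GetRobotsRules, and crawler.RobotsRules come from the diff above.
package worker

import (
	"context"
	"time"

	"github.com/good-native/hover/internal/jobs"
)

// crawlDelayFor maps a domain to the politeness delay its robots.txt asks
// for. Safe to call on every page fetch: under the cache this is a memory
// read, not an HTTP request.
func crawlDelayFor(ctx context.Context, jm *jobs.JobManager, domain string) (time.Duration, error) {
	rules, err := jm.GetRobotsRules(ctx, domain)
	if err != nil {
		// Negative-cached failure; surfaces without re-hitting the origin.
		return 0, err
	}
	if rules == nil {
		// nil rules with a nil error means "no restriction", e.g. the
		// crawler is unavailable.
		return 0, nil
	}
	// Assumes CrawlDelay is a count of seconds, as suggested by the test
	// fixture RobotsRules{CrawlDelay: 5}; adjust if it is a Duration.
	return time.Duration(rules.CrawlDelay) * time.Second, nil
}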