Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,13 @@ On merge, CI will:

## [Unreleased]

_Add unreleased changes here._
### Changed

- `JobManager.GetRobotsRules` now caches results per normalised domain (1h
positive TTL, 60s negative TTL), and collapses concurrent misses onto a single
origin fetch via singleflight. Previously, a long crawl refetched `/robots.txt`
every five minutes (the stream worker's job-info TTL), and a 429 on `/robots.txt`
was hit again on the very next read; both are now bounded.

## Full changelog history

Expand Down
104 changes: 95 additions & 9 deletions internal/jobs/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/good-native/hover/internal/util"
"github.com/google/uuid"
"github.com/lib/pq"
"golang.org/x/sync/singleflight"
)

var jobsLog = logging.Component("jobs")
Expand Down Expand Up @@ -96,12 +97,39 @@ type JobManager struct {
pagesMutex sync.RWMutex

sitemapSem chan struct{} // caps concurrent sitemap batch insertions across all jobs

// In-memory cache for robots.txt rules. Previously every job-info
// cache miss in the stream worker (5 min TTL) refetched /robots.txt;
// a long crawl hammered the origin and produced repeat 429s. Negative
// entries are kept too so a 429 on /robots.txt doesn't get re-hit
// every Release. Single-flight collapses concurrent misses for the
// same domain into one origin fetch.
robotsCache map[string]robotsCacheEntry
robotsMutex sync.RWMutex
robotsGroup singleflight.Group
robotsTTLPos time.Duration // success TTL
robotsTTLNeg time.Duration // failure / nil-rules TTL
}

// robotsCacheEntry is one memoised outcome of a robots.txt fetch, stored in
// JobManager.robotsCache keyed by normalised domain. Either rules (success)
// or err (failure) is populated — both may be nil, which callers treat as
// "no restriction". An entry whose expiresAt has passed is treated as a miss.
type robotsCacheEntry struct {
	rules     *crawler.RobotsRules // parsed rules; may be nil even on success ("no restriction")
	err       error                // non-nil when the origin fetch failed; replayed to callers until expiry
	expiresAt time.Time            // after this instant the entry is ignored and refetched
}

// ProgressMilestoneCallback receives a job ID together with the previous and
// new progress percentages. Presumably fired when a job crosses a milestone
// boundary (see the lastMilestoneFired map) — the invocation site is outside
// this view; confirm there.
type ProgressMilestoneCallback func(ctx context.Context, jobID string, oldPct, newPct int)

// JobTerminatedCallback receives the ID of a job that has terminated. The
// invocation site is outside this view.
type JobTerminatedCallback func(ctx context.Context, jobID string)

// Defaults sized to swallow the stream worker's 5-minute job-info refresh
// without re-hitting the origin: 1h success TTL means a long crawl fetches
// /robots.txt at most a handful of times. Negative TTL is short so a
// transient 429 on /robots.txt unblocks within ~60s.
const (
defaultRobotsTTLPositive = time.Hour
defaultRobotsTTLNegative = 60 * time.Second
)

func NewJobManager(db *sql.DB, dbQueue DbQueueProvider, crawler CrawlerInterface) *JobManager {
sitemapConcurrency := sitemapInsertConcurrency()
return &JobManager{
Expand All @@ -111,6 +139,9 @@ func NewJobManager(db *sql.DB, dbQueue DbQueueProvider, crawler CrawlerInterface
processedPages: make(map[string]struct{}),
lastMilestoneFired: make(map[string]int),
sitemapSem: make(chan struct{}, sitemapConcurrency),
robotsCache: make(map[string]robotsCacheEntry),
robotsTTLPos: defaultRobotsTTLPositive,
robotsTTLNeg: defaultRobotsTTLNegative,
}
}

Expand Down Expand Up @@ -368,19 +399,74 @@ func (jm *JobManager) setupJobDatabase(ctx context.Context, job *Job, normalised
const wafCacheTTL = 24 * time.Hour

// GetRobotsRules returns the robots.txt rules for domain.
//
// Returns nil (no error) when the crawler is unavailable; callers treat
// nil rules as "no restriction". Results are cached per normalised domain:
// successful fetches for robotsTTLPos, errors for robotsTTLNeg. Concurrent
// misses for the same domain collapse onto a single origin fetch via
// singleflight so a job-info burst can't trigger N parallel /robots.txt
// requests.
//
// FIX: the superseded pre-cache implementation had been left inline ahead of
// the cached path; its early return made the cache and singleflight code
// below unreachable, so every call refetched /robots.txt. The dead duplicate
// fetch is removed.
func (jm *JobManager) GetRobotsRules(ctx context.Context, domain string) (*crawler.RobotsRules, error) {
	if jm.crawler == nil {
		return nil, nil
	}

	// Lowercase first: util.NormaliseDomain uses case-sensitive
	// TrimPrefix for the scheme/www strips, so an upper-case "HTTPS://"
	// would otherwise survive and split the cache.
	key := util.NormaliseDomain(strings.ToLower(domain))
	now := time.Now()

	// Fast path: serve a live cache entry (positive or negative) under the
	// read lock.
	jm.robotsMutex.RLock()
	if entry, ok := jm.robotsCache[key]; ok && now.Before(entry.expiresAt) {
		jm.robotsMutex.RUnlock()
		return entry.rules, entry.err
	}
	jm.robotsMutex.RUnlock()

	type robotsResult struct {
		rules *crawler.RobotsRules
		err   error
	}
	val, _, _ := jm.robotsGroup.Do(key, func() (any, error) {
		// Fetch with the canonical key so the request and the cache
		// entry agree on the origin string. Without this, a scheme- or
		// case-variant input would fetch under the raw form while the
		// failure outcome got stored against the normalised key.
		result, fetchErr := jm.crawler.DiscoverSitemapsAndRobots(ctx, key)
		out := robotsResult{}
		if fetchErr != nil {
			out.err = fmt.Errorf("jobs: fetch robots for %s: %w", key, fetchErr)
		} else if result != nil {
			out.rules = result.RobotsRules
		}

		// Don't negative-cache caller-side cancellations: one timed-out
		// caller would otherwise poison the shared cache for everyone
		// else for `robotsTTLNeg`.
		transient := out.err != nil && (errors.Is(fetchErr, context.Canceled) ||
			errors.Is(fetchErr, context.DeadlineExceeded))

		if !transient {
			ttl := jm.robotsTTLPos
			if out.err != nil {
				ttl = jm.robotsTTLNeg
			}
			jm.robotsMutex.Lock()
			jm.robotsCache[key] = robotsCacheEntry{
				rules:     out.rules,
				err:       out.err,
				expiresAt: time.Now().Add(ttl),
			}
			jm.robotsMutex.Unlock()
		}

		// singleflight.Do returns the inner error to all sharers; return
		// nil here and propagate err via the typed payload so a context
		// cancellation in one caller doesn't poison the cached failure.
		return out, nil
	})

	res := val.(robotsResult)
	return res.rules, res.err
}

func (jm *JobManager) validateRootURLAccess(ctx context.Context, job *Job, normalisedDomain string, rootPath string) (*crawler.RobotsRules, error) {
Expand Down
Loading
Loading