diff --git a/cmd/goose/cache.go b/cmd/goose/cache.go index b5e7820..61d1e1b 100644 --- a/cmd/goose/cache.go +++ b/cmd/goose/cache.go @@ -25,26 +25,34 @@ type cacheEntry struct { // checkCache checks the cache for a PR and returns the cached data if valid. // Returns (cachedData, cacheHit, hasRunningTests). func (app *App) checkCache(cacheFile, url string, updatedAt time.Time) (cachedData *turn.CheckResponse, cacheHit bool, hasRunningTests bool) { - fileData, readErr := os.ReadFile(cacheFile) - if readErr != nil { - if !os.IsNotExist(readErr) { - slog.Debug("[CACHE] Cache file read error", "url", url, "error", readErr) + fileData, err := os.ReadFile(cacheFile) + if err != nil { + if !os.IsNotExist(err) { + slog.Debug("[CACHE] Cache file read error", "url", url, "error", err) } return nil, false, false } var entry cacheEntry - if unmarshalErr := json.Unmarshal(fileData, &entry); unmarshalErr != nil { - slog.Warn("Failed to unmarshal cache data", "url", url, "error", unmarshalErr) + if err := json.Unmarshal(fileData, &entry); err != nil { + slog.Warn("Failed to unmarshal cache data", "url", url, "error", err) // Remove corrupted cache file - if removeErr := os.Remove(cacheFile); removeErr != nil { - slog.Error("Failed to remove corrupted cache file", "error", removeErr) + if err := os.Remove(cacheFile); err != nil { + slog.Error("Failed to remove corrupted cache file", "error", err) } return nil, false, false } + // Determine TTL based on test state - use shorter TTL for incomplete tests + testState := entry.Data.PullRequest.TestState + isTestIncomplete := testState == "running" || testState == "queued" || testState == "pending" + ttl := cacheTTL + if isTestIncomplete { + ttl = runningTestsCacheTTL + } + // Check if cache is expired or PR updated - if time.Since(entry.CachedAt) >= cacheTTL || !entry.UpdatedAt.Equal(updatedAt) { + if time.Since(entry.CachedAt) >= ttl || !entry.UpdatedAt.Equal(updatedAt) { // Log why cache was invalid if !entry.UpdatedAt.Equal(updatedAt) { slog.Debug("[CACHE] Cache miss - PR updated", @@ -56,15 +64,14 @@ func (app *App) checkCache(cacheFile, url string, updatedAt time.Time) (cachedDa "url", url, "cached_at", entry.CachedAt.Format(time.RFC3339), "cache_age", time.Since(entry.CachedAt).Round(time.Second), - "ttl", cacheTTL) + "ttl", ttl, + "test_state", testState) } - return nil, false, false + return nil, false, isTestIncomplete } - // Check for incomplete tests that should invalidate cache + // Check for incomplete tests that should invalidate cache and trigger Turn API cache bypass cacheAge := time.Since(entry.CachedAt) - testState := entry.Data.PullRequest.TestState - isTestIncomplete := testState == "running" || testState == "queued" || testState == "pending" if entry.Data != nil && isTestIncomplete && cacheAge < runningTestsCacheBypass { slog.Debug("[CACHE] Cache invalidated - tests incomplete and cache entry is fresh", "url", url, @@ -144,15 +151,15 @@ func (app *App) turnData(ctx context.Context, url string, updatedAt time.Time) ( "timestamp_sent", timestampToSend.Format(time.RFC3339)) } - var retryErr error + var err error slog.Debug("[TURN] Making API call", "url", url, "user", app.currentUser.GetLogin(), "pr_updated_at", timestampToSend.Format(time.RFC3339)) - data, retryErr = app.turnClient.Check(turnCtx, url, app.currentUser.GetLogin(), timestampToSend) - if retryErr != nil { - slog.Warn("Turn API error (will retry)", "error", retryErr) - return retryErr + data, err = app.turnClient.Check(turnCtx, url, app.currentUser.GetLogin(), timestampToSend) + if err != nil { + slog.Warn("Turn API error (will retry)", "error", err) + return err } slog.Debug("[TURN] API call successful", "url", url) return nil @@ -188,45 +195,36 @@ func (app *App) turnData(ctx context.Context, url string, updatedAt time.Time) ( } // Save to cache (don't fail if caching fails) - skip if --no-cache is set - // Don't cache when tests are incomplete - always re-poll to catch completion - if !app.noCache { - shouldCache := true - - // Never cache PRs with incomplete tests - we want fresh data on every poll - testState := "" - if data != nil { - testState = data.PullRequest.TestState - } + // Cache PRs with incomplete tests using short TTL to catch completion quickly + if !app.noCache && data != nil { + testState := data.PullRequest.TestState isTestIncomplete := testState == "running" || testState == "queued" || testState == "pending" - if data != nil && isTestIncomplete { - shouldCache = false - slog.Debug("[CACHE] Skipping cache for PR with incomplete tests", - "url", url, - "test_state", testState, - "pending_checks", len(data.PullRequest.CheckSummary.Pending)) - } - if shouldCache { - entry := cacheEntry{ - Data: data, - CachedAt: time.Now(), - UpdatedAt: updatedAt, - } - if cacheData, marshalErr := json.Marshal(entry); marshalErr != nil { - slog.Error("Failed to marshal cache data", "url", url, "error", marshalErr) + entry := cacheEntry{ + Data: data, + CachedAt: time.Now(), + UpdatedAt: updatedAt, + } + if cacheData, err := json.Marshal(entry); err != nil { + slog.Error("Failed to marshal cache data", "url", url, "error", err) + } else { + // Ensure cache directory exists with secure permissions + if err := os.MkdirAll(filepath.Dir(cacheFile), 0o700); err != nil { + slog.Error("Failed to create cache directory", "error", err) + } else if err := os.WriteFile(cacheFile, cacheData, 0o600); err != nil { + slog.Error("Failed to write cache file", "error", err) } else { - // Ensure cache directory exists with secure permissions - if dirErr := os.MkdirAll(filepath.Dir(cacheFile), 0o700); dirErr != nil { - slog.Error("Failed to create cache directory", "error", dirErr) - } else if writeErr := os.WriteFile(cacheFile, cacheData, 0o600); writeErr != nil { - slog.Error("Failed to write cache file", "error", writeErr) - } else { - slog.Debug("[CACHE] Saved to cache", - "url", url, - "cached_at", entry.CachedAt.Format(time.RFC3339), - "pr_updated_at", entry.UpdatedAt.Format(time.RFC3339), - "cache_file", filepath.Base(cacheFile)) + ttl := cacheTTL + if isTestIncomplete { + ttl = runningTestsCacheTTL } + slog.Debug("[CACHE] Saved to cache", + "url", url, + "cached_at", entry.CachedAt.Format(time.RFC3339), + "pr_updated_at", entry.UpdatedAt.Format(time.RFC3339), + "ttl", ttl, + "test_state", testState, + "cache_file", filepath.Base(cacheFile)) } } } @@ -258,8 +256,8 @@ func (app *App) cleanupOldCache() { // Remove cache files older than cleanup interval (15 days) if time.Since(info.ModTime()) > cacheCleanupInterval { filePath := filepath.Join(app.cacheDir, entry.Name()) - if removeErr := os.Remove(filePath); removeErr != nil { - slog.Error("Failed to remove old cache file", "file", filePath, "error", removeErr) + if err := os.Remove(filePath); err != nil { + slog.Error("Failed to remove old cache file", "file", filePath, "error", err) errorCount++ } else { cleanupCount++ diff --git a/cmd/goose/main.go b/cmd/goose/main.go index 5db7335..f001710 100644 --- a/cmd/goose/main.go +++ b/cmd/goose/main.go @@ -32,6 +32,7 @@ var ( const ( cacheTTL = 10 * 24 * time.Hour // 10 days - rely mostly on PR UpdatedAt + runningTestsCacheTTL = 2 * time.Minute // Short TTL for PRs with incomplete tests to catch completions quickly cacheCleanupInterval = 15 * 24 * time.Hour // 15 days - cleanup older than cache TTL stalePRThreshold = 90 * 24 * time.Hour runningTestsCacheBypass = 90 * time.Minute // Don't cache PRs with running tests if fresher than this @@ -249,11 +250,11 @@ func main() { if app.client != nil { var user *github.User err := retry.Do(func() error { - var retryErr error - user, _, retryErr = app.client.Users.Get(ctx, "") - if retryErr != nil { - slog.Warn("GitHub Users.Get failed (will retry)", "error", retryErr) - return retryErr + var err error + user, _, err = app.client.Users.Get(ctx, "") + if err != nil { + slog.Warn("GitHub Users.Get failed (will retry)", "error", err) + return err } return nil }, @@ -327,11 +328,11 @@ func (app *App) handleReauthentication(ctx context.Context) { if app.client != nil { var user *github.User err := retry.Do(func() error { - var retryErr error - user, _, retryErr = app.client.Users.Get(ctx, "") - if retryErr != nil { - slog.Warn("GitHub Users.Get failed (will retry)", "error", retryErr) - return retryErr + var err error + user, _, err = app.client.Users.Get(ctx, "") + if err != nil { + slog.Warn("GitHub Users.Get failed (will retry)", "error", err) + return err } return nil }, @@ -536,9 +537,9 @@ func (app *App) updatePRs(ctx context.Context) { var incoming, outgoing []PR err := safeExecute("fetchPRs", func() error { - var fetchErr error - incoming, outgoing, fetchErr = app.fetchPRsInternal(ctx) - return fetchErr + var err error + incoming, outgoing, err = app.fetchPRsInternal(ctx) + return err }) if err != nil { slog.Error("Error fetching PRs", "error", err) diff --git a/cmd/goose/sprinkler.go b/cmd/goose/sprinkler.go index 323d03d..d6f2bcf 100644 --- a/cmd/goose/sprinkler.go +++ b/cmd/goose/sprinkler.go @@ -24,13 +24,19 @@ const ( sprinklerMaxDelay = 10 * time.Second // Max delay between retries ) +// prEvent captures the essential details from a sprinkler event. +type prEvent struct { + timestamp time.Time + url string +} + // sprinklerMonitor manages WebSocket event subscriptions for all user orgs. type sprinklerMonitor struct { lastConnectedAt time.Time app *App client *client.Client cancel context.CancelFunc - eventChan chan string + eventChan chan prEvent lastEventMap map[string]time.Time token string orgs []string @@ -45,7 +51,7 @@ func newSprinklerMonitor(app *App, token string) *sprinklerMonitor { app: app, token: token, orgs: make([]string, 0), - eventChan: make(chan string, eventChannelSize), + eventChan: make(chan prEvent, eventChannelSize), lastEventMap: make(map[string]time.Time), } } @@ -243,12 +249,15 @@ func (sm *sprinklerMonitor) handleEvent(event client.Event) { slog.Info("[SPRINKLER] PR event received", "url", event.URL, - "org", org) + "org", org, + "timestamp", event.Timestamp.Format(time.RFC3339)) // Send to event channel for processing (non-blocking) select { - case sm.eventChan <- event.URL: - slog.Debug("[SPRINKLER] Event queued for processing", "url", event.URL) + case sm.eventChan <- prEvent{timestamp: event.Timestamp, url: event.URL}: + slog.Debug("[SPRINKLER] Event queued for processing", + "url", event.URL, + "timestamp", event.Timestamp.Format(time.RFC3339)) default: slog.Warn("[SPRINKLER] Event channel full, dropping event", "url", event.URL, @@ -268,34 +277,34 @@ func (sm *sprinklerMonitor) processEvents(ctx context.Context) { select { case <-ctx.Done(): return - case prURL := <-sm.eventChan: - sm.checkAndNotify(ctx, prURL) + case evt := <-sm.eventChan: + sm.checkAndNotify(ctx, evt) } } } // checkAndNotify checks if a PR is blocking and sends notification if needed. -func (sm *sprinklerMonitor) checkAndNotify(ctx context.Context, url string) { +func (sm *sprinklerMonitor) checkAndNotify(ctx context.Context, evt prEvent) { start := time.Now() user := sm.currentUser() if user == "" { - slog.Debug("[SPRINKLER] Skipping check - no user configured", "url", url) + slog.Debug("[SPRINKLER] Skipping check - no user configured", "url", evt.url) return } - repo, n := parseRepoAndNumberFromURL(url) + repo, n := parseRepoAndNumberFromURL(evt.url) if repo == "" || n == 0 { - slog.Warn("[SPRINKLER] Failed to parse PR URL", "url", url) + slog.Warn("[SPRINKLER] Failed to parse PR URL", "url", evt.url) return } - data, cached := sm.fetchTurnData(ctx, url, repo, n, start) + data, cached := sm.fetchTurnData(ctx, evt, repo, n, start) if data == nil { return } - if sm.handleClosedPR(ctx, data, url, repo, n, cached) { + if sm.handleClosedPR(ctx, data, evt.url, repo, n, cached) { return } @@ -304,11 +313,11 @@ func (sm *sprinklerMonitor) checkAndNotify(ctx context.Context, url string) { return } - if sm.handleNewPR(ctx, url, repo, n, act) { + if sm.handleNewPR(ctx, evt.url, repo, n, act) { return } - if sm.isAlreadyTrackedAsBlocked(url, repo, n) { + if sm.isAlreadyTrackedAsBlocked(evt.url, repo, n) { return } @@ -317,9 +326,10 @@ func (sm *sprinklerMonitor) checkAndNotify(ctx context.Context, url string) { "number", n, "action", act.Kind, "reason", act.Reason, + "event_timestamp", evt.timestamp.Format(time.RFC3339), "elapsed", time.Since(start).Round(time.Millisecond)) - sm.sendNotifications(ctx, url, repo, n, act) + sm.sendNotifications(ctx, evt.url, repo, n, act) } // currentUser returns the configured user for the sprinkler monitor. @@ -335,16 +345,20 @@ func (sm *sprinklerMonitor) currentUser() string { } // fetchTurnData retrieves PR data from Turn API with retry logic. -func (sm *sprinklerMonitor) fetchTurnData(ctx context.Context, url, repo string, n int, start time.Time) (*turn.CheckResponse, bool) { +func (sm *sprinklerMonitor) fetchTurnData(ctx context.Context, evt prEvent, repo string, n int, start time.Time) (*turn.CheckResponse, bool) { var data *turn.CheckResponse var cached bool err := retry.Do(func() error { var err error - data, cached, err = sm.app.turnData(ctx, url, time.Now()) + // Use event timestamp to bypass caching - this ensures we get fresh data for real-time events + data, cached, err = sm.app.turnData(ctx, evt.url, evt.timestamp) if err != nil { slog.Debug("[SPRINKLER] Turn API call failed (will retry)", - "repo", repo, "number", n, "error", err) + "repo", repo, + "number", n, + "event_timestamp", evt.timestamp.Format(time.RFC3339), + "error", err) return err } return nil @@ -365,6 +379,7 @@ func (sm *sprinklerMonitor) fetchTurnData(ctx context.Context, url, repo string, slog.Warn("[SPRINKLER] Failed to get turn data after retries", "repo", repo, "number", n, + "event_timestamp", evt.timestamp.Format(time.RFC3339), "elapsed", time.Since(start).Round(time.Millisecond), "error", err) return nil, false