diff --git a/cmd/registrar/main.go b/cmd/registrar/main.go
index b5b990b..b800178 100644
--- a/cmd/registrar/main.go
+++ b/cmd/registrar/main.go
@@ -32,7 +32,7 @@ func main() {
 	logHandler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
 		AddSource: true,
 		Level:     slog.LevelInfo,
-		ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr {
+		ReplaceAttr: func(_ []string, a slog.Attr) slog.Attr {
 			// Shorten source paths to relative paths for cleaner logs
 			if a.Key == slog.SourceKey {
 				if source, ok := a.Value.Any().(*slog.Source); ok {
diff --git a/cmd/server/main.go b/cmd/server/main.go
index d898a55..99f7395 100644
--- a/cmd/server/main.go
+++ b/cmd/server/main.go
@@ -33,7 +33,8 @@ import (
 // Returns empty string if not running on GCP or detection fails.
 func detectGCPProjectID(ctx context.Context) string {
 	// Try metadata service (works on Cloud Run, GCE, GKE, Cloud Functions)
-	client := &http.Client{Timeout: 2 * time.Second}
+	httpClient := &http.Client{Timeout: 2 * time.Second}
+	//nolint:revive // GCP metadata service is internal and always accessed via HTTP
 	req, err := http.NewRequestWithContext(ctx, http.MethodGet,
 		"http://metadata.google.internal/computeMetadata/v1/project/project-id", http.NoBody)
 	if err != nil {
@@ -41,7 +42,7 @@ func detectGCPProjectID(ctx context.Context) string {
 	}
 	req.Header.Set("Metadata-Flavor", "Google")
 
-	resp, err := client.Do(req)
+	resp, err := httpClient.Do(req)
 	if err != nil {
 		slog.Debug("metadata service not available (not running on GCP?)", "error", err)
 		return ""
@@ -87,7 +88,7 @@ func main() {
 	logHandler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
 		AddSource: true,
 		Level:     slog.LevelInfo,
-		ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr {
+		ReplaceAttr: func(_ []string, a slog.Attr) slog.Attr {
 			// Shorten source paths to relative paths for cleaner logs
 			if a.Key == slog.SourceKey {
 				if source, ok := a.Value.Any().(*slog.Source); ok {
@@ -121,6 +122,7 @@ func main() {
 	os.Exit(exitCode)
 }
 
+//nolint:revive,maintidx // Complex initialization requires length for clarity and maintainability
 func run(ctx context.Context, cancel context.CancelFunc, cfg *config.ServerConfig) int {
 	// Handle graceful shutdown.
 	sigChan := make(chan os.Signal, 1)
@@ -160,6 +162,7 @@ func run(ctx context.Context, cancel context.CancelFunc, cfg *config.ServerConfi
 	slackManager := slack.NewManager(cfg.SlackSigningSecret)
 
 	// Initialize state store (in-memory + Datastore or JSON for persistence).
+	//nolint:interfacebloat // Interface mirrors state.Store for local type safety
 	var stateStore interface {
 		Thread(owner, repo string, number int, channelID string) (state.ThreadInfo, bool)
 		SaveThread(owner, repo string, number int, channelID string, info state.ThreadInfo) error
@@ -668,6 +671,8 @@ func (cm *coordinatorManager) handleRefreshInstallations(ctx context.Context) {
 // runBotCoordinators manages bot coordinators for all GitHub installations.
 // It spawns one coordinator per org and refreshes the list every 5 minutes.
 // Failed coordinators are automatically restarted every minute.
+//
+//nolint:interfacebloat // Interface mirrors state.Store for local type safety
 func runBotCoordinators(
 	ctx context.Context,
 	slackManager *slack.Manager,
diff --git a/internal/bot/bot.go b/internal/bot/bot.go
index a376020..139384e 100644
--- a/internal/bot/bot.go
+++ b/internal/bot/bot.go
@@ -41,11 +41,13 @@ type prContext struct {
 }
 
 // ThreadCache manages PR thread IDs for a workspace.
+//
+//nolint:govet // Field order optimized for logical grouping over memory alignment
 type ThreadCache struct {
-	prThreads map[string]ThreadInfo // "owner/repo#123" -> thread info
 	mu sync.RWMutex
-	creationLock sync.Mutex // Prevents concurrent creation of the same PR thread
-	creating map[string]bool // Track PRs currently being created
+	creationLock sync.Mutex            // Prevents concurrent creation of the same PR thread
+	prThreads    map[string]ThreadInfo // "owner/repo#123" -> thread info
+	creating     map[string]bool       // Track PRs currently being created
 }
 
 // ThreadInfo is an alias to state.ThreadInfo to avoid duplication.
@@ -82,18 +84,20 @@ func (tc *ThreadCache) Cleanup(maxAge time.Duration) {
 }
 
 // Coordinator coordinates between GitHub, Slack, and notifications for a single org.
+//
+//nolint:govet // Field order optimized for logical grouping over memory alignment
 type Coordinator struct {
+	processingEvents sync.WaitGroup // Tracks in-flight event processing for graceful shutdown
+	stateStore       StateStore     // Persistent state across restarts
+	sprinklerURL     string
+	workspaceName    string // Track workspace name for better logging
 	slack            *slackpkg.Client
 	github           *github.Client
 	configManager    *config.Manager
 	notifier         *notify.Manager
 	userMapper       *usermapping.Service
-	sprinklerURL     string
-	threadCache      *ThreadCache   // In-memory cache for fast lookups
-	stateStore       StateStore     // Persistent state across restarts
-	workspaceName    string         // Track workspace name for better logging
-	eventSemaphore   chan struct{}  // Limits concurrent event processing (prevents overwhelming APIs)
-	processingEvents sync.WaitGroup // Tracks in-flight event processing for graceful shutdown
+	threadCache    *ThreadCache  // In-memory cache for fast lookups
+	eventSemaphore chan struct{} // Limits concurrent event processing (prevents overwhelming APIs)
 }
 
 // StateStore interface for persistent state - allows dependency injection for testing.
@@ -157,16 +161,40 @@ func New(
 	return c
 }
 
+// saveThread persists thread info to both cache and persistent storage.
+// This ensures threads survive restarts and are available for closed PR updates.
+func (c *Coordinator) saveThread(owner, repo string, number int, channelID string, info ThreadInfo) {
+	// Save to in-memory cache for fast lookups
+	key := fmt.Sprintf("%s/%s#%d:%s", owner, repo, number, channelID)
+	c.threadCache.Set(key, info)
+
+	// Persist to state store for cross-instance sharing and restart recovery
+	if err := c.stateStore.SaveThread(owner, repo, number, channelID, info); err != nil {
+		slog.Warn("failed to persist thread to state store",
+			"pr", fmt.Sprintf("%s/%s#%d", owner, repo, number),
+			"channel_id", channelID,
+			"error", err,
+			"impact", "thread updates may fail after restart")
+	} else {
+		slog.Debug("persisted thread to state store",
+			"pr", fmt.Sprintf("%s/%s#%d", owner, repo, number),
+			"channel_id", channelID,
+			"thread_ts", info.ThreadTS)
+	}
+}
+
 // findOrCreatePRThread finds an existing thread or creates a new one for a PR.
-// Returns (threadTS, wasNewlyCreated, error).
+// Returns (threadTS, wasNewlyCreated, currentMessageText, error).
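+// currentMessageText is the thread's root message text as currently posted in Slack.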
+//
+//nolint:revive // Four return values needed to track thread state and creation status
 func (c *Coordinator) findOrCreatePRThread(ctx context.Context, channelID, owner, repo string, prNumber int, prState string,
 	pullRequest struct {
-		User struct {
+		CreatedAt time.Time `json:"created_at"`
+		User      struct {
 			Login string `json:"login"`
 		} `json:"user"`
-		HTMLURL   string    `json:"html_url"`
-		Title     string    `json:"title"`
-		Number    int       `json:"number"`
-		CreatedAt time.Time `json:"created_at"`
+		HTMLURL string `json:"html_url"`
+		Title   string `json:"title"`
+		Number  int    `json:"number"`
 	}, checkResult *turn.CheckResponse,
 ) (threadTS string, wasNewlyCreated bool, currentMessageText string, err error) {
 	// Use cache key that includes channel ID to support multiple channels per PR
@@ -212,8 +240,8 @@ func (c *Coordinator) findOrCreatePRThread(ctx context.Context, channelID, owner
 			logFieldChannel, channelID,
 			"current_message_preview", initialSearchText[:min(100, len(initialSearchText))])
 
-		// Cache the found thread with its current message text
-		c.threadCache.Set(cacheKey, ThreadInfo{
+		// Save the found thread (cache + persist)
+		c.saveThread(owner, repo, prNumber, channelID, ThreadInfo{
 			ThreadTS:  initialSearchTS,
 			ChannelID: channelID,
 			LastState: prState,
@@ -289,8 +317,8 @@ func (c *Coordinator) findOrCreatePRThread(ctx context.Context, channelID, owner
 			"current_message_preview", crossInstanceText[:min(100, len(crossInstanceText))],
 			"note", "this prevented duplicate thread creation during rolling deployment")
 
-		// Cache it and return
-		c.threadCache.Set(cacheKey, ThreadInfo{
+		// Save it and return (cache + persist)
+		c.saveThread(owner, repo, prNumber, channelID, ThreadInfo{
 			ThreadTS:  crossInstanceCheckTS,
 			ChannelID: channelID,
 			LastState: prState,
@@ -312,8 +340,8 @@ func (c *Coordinator) findOrCreatePRThread(ctx context.Context, channelID, owner
 		return "", false, "", fmt.Errorf("failed to create PR thread: %w", err)
 	}
 
-	// Cache the new thread with its message text
-	c.threadCache.Set(cacheKey, ThreadInfo{
+	// Save the new thread (cache + persist)
+	c.saveThread(owner, repo, prNumber, channelID, ThreadInfo{
 		ThreadTS:  newThreadTS,
 		ChannelID: channelID,
 		LastState: prState,
@@ -337,7 +365,7 @@ func (c *Coordinator) findOrCreatePRThread(ctx context.Context, channelID, owner
 // Note: This is more expensive than search API but works reliably with basic bot permissions.
 // Results are cached by the calling code to minimize API calls.
 // Returns (threadTS, currentMessageText) - both empty if not found.
-func (c *Coordinator) searchForPRThread(ctx context.Context, channelID, prURL string, prCreatedAt time.Time) (string, string) {
+func (c *Coordinator) searchForPRThread(ctx context.Context, channelID, prURL string, prCreatedAt time.Time) (threadTS string, messageText string) {
 	slog.Info("searching for existing PR thread using channel history",
 		logFieldChannel, channelID,
 		"pr_url", prURL)
@@ -789,6 +817,10 @@ func (*Coordinator) extractStateFromTurnclient(checkResult *turn.CheckResponse)
 func (*Coordinator) extractBlockedUsersFromTurnclient(checkResult *turn.CheckResponse) []string {
 	var blockedUsers []string
 	for user := range checkResult.Analysis.NextAction {
+		// Skip _system sentinel value - it indicates processing state, not an actual user
+		if user == "_system" {
+			continue
+		}
 		blockedUsers = append(blockedUsers, user)
 	}
 	return blockedUsers
@@ -899,10 +931,11 @@ func (c *Coordinator) updateDMMessagesForPR(ctx context.Context, pr prUpdateInfo
 
 	for _, slackUserID := range slackUserIDs {
 		if err := c.slack.UpdateDMMessage(ctx, slackUserID, prURL, message); err != nil {
-			slog.Debug("failed to update DM message",
+			slog.Warn("failed to update DM message",
 				"user", slackUserID,
 				"pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber),
 				"error", err,
+				"impact", "user sees stale PR state in DM",
 				"reason", "DM may not exist or too old")
 			skippedCount++
 		} else {
@@ -1140,13 +1173,13 @@ func (c *Coordinator) processPRForChannel(
 	// Find or create thread for this PR in this channel
 	// Convert to the expected struct format
 	pullRequestStruct := struct {
-		User struct {
+		CreatedAt time.Time `json:"created_at"`
+		User      struct {
 			Login string `json:"login"`
 		} `json:"user"`
-		HTMLURL   string    `json:"html_url"`
-		Title     string    `json:"title"`
-		Number    int       `json:"number"`
-		CreatedAt time.Time `json:"created_at"`
+		HTMLURL string `json:"html_url"`
+		Title   string `json:"title"`
+		Number  int    `json:"number"`
 	}{
 		User:      event.PullRequest.User,
 		HTMLURL:   event.PullRequest.HTMLURL,
@@ -1154,7 +1187,9 @@ func (c *Coordinator) processPRForChannel(
 		Title:     event.PullRequest.Title,
 		Number:    event.PullRequest.Number,
 		CreatedAt: event.PullRequest.CreatedAt,
 	}
-	threadTS, wasNewlyCreated, currentText, err := c.findOrCreatePRThread(ctx, channelID, owner, repo, prNumber, prState, pullRequestStruct, checkResult)
+	threadTS, wasNewlyCreated, currentText, err := c.findOrCreatePRThread(
+		ctx, channelID, owner, repo, prNumber, prState, pullRequestStruct, checkResult,
+	)
 	if err != nil {
 		slog.Error("failed to find or create PR thread",
 			"workspace", c.workspaceName,
 			"pr", prKey,
 			"channel", channelDisplay,
 			"channel_id", channelID,
 			"error", err,
+			"impact", "channel_update_skipped_will_retry_via_polling",
+			"next_poll_in", "5m",
 			"will_continue_with_next_channel", true)
 		return
 	}
@@ -1240,11 +1277,12 @@ func (c *Coordinator) processPRForChannel(
 			"channel", channelDisplay,
 			"channel_id", channelID,
 			"thread_ts", threadTS,
-			"error", err)
+			"error", err,
+			"impact", "message_update_skipped_will_retry_via_polling",
+			"next_poll_in", "5m")
 	} else {
-		// Update cache with new message text
-		cacheKey := fmt.Sprintf("%s/%s#%d:%s", owner, repo, prNumber, channelID)
-		c.threadCache.Set(cacheKey, ThreadInfo{
+		// Save updated thread info (cache + persist)
+		c.saveThread(owner, repo, prNumber, channelID, ThreadInfo{
 			ThreadTS:  threadTS,
 			ChannelID: channelID,
 			LastState: prState,
@@ -1411,7 +1449,9 @@ func (c *Coordinator) handlePullRequestFromSprinkler(
 // handlePullRequestReviewFromSprinkler handles PR review events from sprinkler.
 // Reviews update PR state (approved, changes requested, etc), so we treat them
 // like regular pull_request events and let turnclient analyze the current state.
-func (c *Coordinator) handlePullRequestReviewFromSprinkler(ctx context.Context, owner, repo string, prNumber int, sprinklerURL string, eventTimestamp time.Time) {
+func (c *Coordinator) handlePullRequestReviewFromSprinkler(
+	ctx context.Context, owner, repo string, prNumber int, sprinklerURL string, eventTimestamp time.Time,
+) {
 	slog.Info("handling PR review event from sprinkler",
 		logFieldOwner, owner,
 		logFieldRepo, repo,
@@ -1427,13 +1467,13 @@ func (c *Coordinator) handlePullRequestReviewFromSprinkler(ctx context.Context,
 // Critical performance optimization: Posts thread immediately WITHOUT user mentions,
 // then updates asynchronously once email lookups complete (which take 13-20 seconds each).
 func (c *Coordinator) createPRThread(ctx context.Context, channel, owner, repo string, number int, prState string, pr struct {
-	User struct {
+	CreatedAt time.Time `json:"created_at"`
+	User      struct {
 		Login string `json:"login"`
 	} `json:"user"`
-	HTMLURL   string    `json:"html_url"`
-	Title     string    `json:"title"`
-	Number    int       `json:"number"`
-	CreatedAt time.Time `json:"created_at"`
+	HTMLURL string `json:"html_url"`
+	Title   string `json:"title"`
+	Number  int    `json:"number"`
 }, checkResult *turn.CheckResponse,
 ) (threadTS string, messageText string, err error) {
 	// Get state-based prefix
diff --git a/internal/bot/bot_sprinkler.go b/internal/bot/bot_sprinkler.go
index e314132..fa8a754 100644
--- a/internal/bot/bot_sprinkler.go
+++ b/internal/bot/bot_sprinkler.go
@@ -67,13 +67,14 @@ func (c *Coordinator) handleSprinklerEvent(ctx context.Context, event client.Eve
 			"event_key", eventKey,
 			"reason", "deduplication_race")
 	} else {
-		slog.Warn("failed to mark event as processed - database error",
+		slog.Error("failed to mark event as processed - database error",
 			"organization", organization,
 			"type", event.Type,
 			"url", event.URL,
 			"event_key", eventKey,
 			"error", err,
-			"impact", "will_skip_event")
+			"impact", "event_dropped_will_retry_via_polling",
+			"next_poll_in", "5m")
 	}
 	return
 }
@@ -161,16 +162,24 @@ func (c *Coordinator) handleSprinklerEvent(ctx context.Context, event client.Eve
 		Timestamp: event.Timestamp,
 	}
 
-	if err := c.processEvent(ctx, msg); err != nil {
+	// Add timeout to prevent hanging on external API failures
+	processCtx, processCancel := context.WithTimeout(ctx, 30*time.Second)
+	defer processCancel()
+
+	if err := c.processEvent(processCtx, msg); err != nil {
+		timedOut := errors.Is(err, context.DeadlineExceeded)
 		slog.Error("error processing event",
 			"organization", organization,
 			"error", err,
 			"type", event.Type,
 			"url", event.URL,
-			"repo", repo)
+			"repo", repo,
+			"timed_out", timedOut,
+			"impact", "event_dropped_will_retry_via_polling",
+			"next_poll_in", "5m")
 		// Event already marked as processed before goroutine started.
 		// Failed processing won't be retried automatically.
-		// This is intentional - we don't want infinite retries of broken events.
+		// Polling will catch this within 5 minutes for open PRs.
 		return
 	}
 
@@ -184,6 +193,8 @@ func (c *Coordinator) handleSprinklerEvent(ctx context.Context, event client.Eve
 // waitForEventProcessing waits for all in-flight events to complete during shutdown.
 // Returns immediately if no events are being processed.
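+// maxWait bounds how long shutdown blocks waiting for in-flight events to drain.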
+//
+//nolint:unparam // maxWait parameter provides flexibility for different shutdown scenarios
 func (c *Coordinator) waitForEventProcessing(organization string, maxWait time.Duration) {
 	// Check if any events are being processed
 	queueLen := len(c.eventSemaphore)
@@ -252,6 +263,8 @@ func (c *Coordinator) handleAuthError(
 }
 
 // RunWithSprinklerClient runs the bot using the official sprinkler client library.
+//
+//nolint:revive,maintidx // Complex retry/reconnection logic requires length for robustness
 func (c *Coordinator) RunWithSprinklerClient(ctx context.Context) error {
 	slog.Info("starting bot coordinator with sprinkler client")
 
diff --git a/internal/bot/dedup_test.go b/internal/bot/dedup_test.go
index f39b25e..0cbf0cb 100644
--- a/internal/bot/dedup_test.go
+++ b/internal/bot/dedup_test.go
@@ -264,7 +264,7 @@ func TestConcurrentEventDeduplicationStress(t *testing.T) {
 	var wg sync.WaitGroup
 	wg.Add(numConcurrentEvents)
 
-	handleEvent := func(id int) {
+	handleEvent := func(_ int) {
 		defer wg.Done()
 
 		// Check if currently being processed
@@ -299,7 +299,7 @@ func TestConcurrentEventDeduplicationStress(t *testing.T) {
 	}
 
 	// Launch all goroutines simultaneously
-	for i := 0; i < numConcurrentEvents; i++ {
+	for i := range numConcurrentEvents {
 		go handleEvent(i)
 	}
 
diff --git a/internal/bot/polling.go b/internal/bot/polling.go
index 4267247..7e8dd96 100644
--- a/internal/bot/polling.go
+++ b/internal/bot/polling.go
@@ -264,7 +264,7 @@ func (c *Coordinator) updateClosedPRThread(ctx context.Context, pr *github.PRSna
 		return nil
 	}
 
-	n := 0
+	updatedCount := 0
 
 	for _, ch := range channels {
 		id := c.slack.ResolveChannelID(ctx, ch)
@@ -283,14 +283,46 @@ func (c *Coordinator) updateClosedPRThread(ctx context.Context, pr *github.PRSna
 
 		info, ok := c.stateStore.Thread(pr.Owner, pr.Repo, pr.Number, id)
 		if !ok {
-			slog.Debug("no thread found for closed PR in channel",
+			// Thread not in persistent storage - search channel history as fallback
+			// This handles cases where state was lost or thread created before persistence was added
+			slog.Debug("thread not in state store, searching channel history",
 				"pr", prKey,
 				"channel", ch,
 				"channel_id", id,
-				"pr_state", pr.State,
-				"pr_updated_at", pr.UpdatedAt,
-				"possible_reason", "PR closed before thread created or thread in different channel")
-			continue
+				"pr_state", pr.State)
+
+			threadTS, messageText := c.searchForPRThread(ctx, id, pr.URL, pr.CreatedAt)
+			if threadTS == "" {
+				slog.Debug("no thread found in channel history for closed PR",
+					"pr", prKey,
+					"channel", ch,
+					"channel_id", id,
+					"pr_state", pr.State,
+					"pr_created_at", pr.CreatedAt,
+					"possible_reason", "PR closed before thread created or thread in different channel")
+				continue
+			}
+
+			// Found via channel history - reconstruct ThreadInfo
+			info = ThreadInfo{
+				ThreadTS:    threadTS,
+				ChannelID:   id,
+				MessageText: messageText,
+				UpdatedAt:   time.Now(),
+			}
+
+			// Persist for future use (avoid redundant searches)
+			if err := c.stateStore.SaveThread(pr.Owner, pr.Repo, pr.Number, id, info); err != nil {
+				slog.Warn("failed to persist recovered thread",
+					"pr", prKey,
+					"error", err)
+			}
+
+			slog.Info("found thread via channel history search",
+				"pr", prKey,
+				"channel", ch,
+				"thread_ts", threadTS,
+				"message_preview", messageText[:min(len(messageText), 100)])
 		}
 
 		if err := c.updateThreadForClosedPR(ctx, pr, id, info); err != nil {
@@ -301,7 +333,7 @@ func (c *Coordinator) updateClosedPRThread(ctx context.Context, pr *github.PRSna
 			continue
 		}
 
-		n++
+		updatedCount++
 		slog.Info("updated thread for closed/merged PR",
PR", "pr", prKey, "state", pr.State, @@ -309,7 +341,7 @@ func (c *Coordinator) updateClosedPRThread(ctx context.Context, pr *github.PRSna "thread_ts", info.ThreadTS) } - if n == 0 { + if updatedCount == 0 { return errors.New("no threads found or updated for closed PR") } diff --git a/internal/config/config_race_test.go b/internal/config/config_race_test.go index 859d0da..6ea253c 100644 --- a/internal/config/config_race_test.go +++ b/internal/config/config_race_test.go @@ -24,11 +24,11 @@ func TestConfigCacheRace(t *testing.T) { var wg sync.WaitGroup wg.Add(numGoroutines) - for i := 0; i < numGoroutines; i++ { + for range numGoroutines { go func() { defer wg.Done() // Each goroutine performs 100 cache accesses - for j := 0; j < 100; j++ { + for j := range 100 { // Mix of hits and misses if j%2 == 0 { cache.get("test-org") // Cache hit @@ -69,7 +69,7 @@ func TestConfigCacheStatsNoLock(t *testing.T) { done := make(chan struct{}) // Reader goroutines - continuously call stats() - for i := 0; i < 10; i++ { + for range 10 { wg.Add(1) go func() { defer wg.Done() @@ -85,11 +85,11 @@ func TestConfigCacheStatsNoLock(t *testing.T) { } // Writer goroutines - continuously call get() - for i := 0; i < 10; i++ { + for range 10 { wg.Add(1) go func() { defer wg.Done() - for j := 0; j < 1000; j++ { + for range 1000 { cache.get("test-org") } }() diff --git a/internal/github/graphql.go b/internal/github/graphql.go index a2dd4b6..0a4347b 100644 --- a/internal/github/graphql.go +++ b/internal/github/graphql.go @@ -14,15 +14,15 @@ import ( // PRSnapshot contains minimal PR information from GraphQL query. type PRSnapshot struct { + UpdatedAt time.Time + CreatedAt time.Time Owner string Repo string - Number int Title string Author string URL string - UpdatedAt time.Time - CreatedAt time.Time State string // "OPEN", "CLOSED", "MERGED" + Number int IsDraft bool } @@ -67,16 +67,17 @@ func (c *GraphQLClient) ListClosedPRs(ctx context.Context, org string, updatedSi since := time.Now().Add(-time.Duration(updatedSinceHours) * time.Hour) // GraphQL query structure + //nolint:govet // Inline anonymous struct matches GraphQL API structure for clarity var query struct { Search struct { Nodes []struct { PullRequest struct { - Number int - Title string - URL string UpdatedAt time.Time CreatedAt time.Time + Title string + URL string State string + Number int IsDraft bool Merged bool Author struct { @@ -190,16 +191,17 @@ func (c *GraphQLClient) listOpenPRsGraphQL(ctx context.Context, org string, upda since := time.Now().Add(-time.Duration(updatedSinceHours) * time.Hour) // GraphQL query structure + //nolint:govet // Inline anonymous struct matches GraphQL API structure for clarity var query struct { Search struct { Nodes []struct { PullRequest struct { - Number int - Title string - URL string UpdatedAt time.Time CreatedAt time.Time + Title string + URL string State string + Number int IsDraft bool Author struct { Login string diff --git a/internal/notify/daily.go b/internal/notify/daily.go index 17c5b9c..870406a 100644 --- a/internal/notify/daily.go +++ b/internal/notify/daily.go @@ -198,7 +198,7 @@ func (d *DailyDigestScheduler) processOrgDigests(ctx context.Context, org string } // fetchOrgPRs fetches all open PRs for an organization. 
-func (d *DailyDigestScheduler) fetchOrgPRs(ctx context.Context, githubClient *github.Client, org string) ([]home.PR, error) {
+func (*DailyDigestScheduler) fetchOrgPRs(ctx context.Context, githubClient *github.Client, org string) ([]home.PR, error) {
 	client := githubClient.Client()
 
 	// Search for all open PRs in this org
@@ -267,7 +267,7 @@ func (d *DailyDigestScheduler) fetchOrgPRs(ctx context.Context, githubClient *gi
 }
 
 // analyzePR analyzes a PR with turnclient.
-func (d *DailyDigestScheduler) analyzePR(ctx context.Context, githubClient *github.Client, org string, pr home.PR) (*turn.CheckResponse, error) {
+func (*DailyDigestScheduler) analyzePR(ctx context.Context, githubClient *github.Client, _ string, pr home.PR) (*turn.CheckResponse, error) {
 	turnClient, err := turn.NewDefaultClient()
 	if err != nil {
 		return nil, fmt.Errorf("failed to create turn client: %w", err)
 	}
@@ -305,7 +305,7 @@ func (d *DailyDigestScheduler) analyzePR(ctx context.Context, githubClient *gith
 }
 
 // enrichPR enriches a PR with turnclient analysis results.
-func (d *DailyDigestScheduler) enrichPR(pr home.PR, checkResult *turn.CheckResponse, githubUser string, action turn.Action) home.PR {
+func (*DailyDigestScheduler) enrichPR(pr home.PR, _ *turn.CheckResponse, _ string, action turn.Action) home.PR {
 	pr.ActionKind = string(action.Kind)
 	pr.ActionReason = action.Reason
 	pr.NeedsReview = action.Kind == "review" || action.Kind == "approve"
@@ -315,7 +315,10 @@ func (d *DailyDigestScheduler) enrichPR(pr home.PR, checkResult *turn.CheckRespo
 }
 
 // shouldSendDigest determines if a digest should be sent to a user now.
-func (d *DailyDigestScheduler) shouldSendDigest(ctx context.Context, userMapper *usermapping.Service, slackClient *slackpkg.Client, githubUser, org, domain string, prs []home.PR) bool {
+func (d *DailyDigestScheduler) shouldSendDigest(
+	ctx context.Context, userMapper *usermapping.Service, slackClient *slackpkg.Client,
+	githubUser, org, domain string, _ []home.PR,
+) bool {
 	// Map to Slack user
 	slackUserID, err := userMapper.SlackHandle(ctx, githubUser, org, domain)
 	if err != nil || slackUserID == "" {
@@ -368,7 +371,10 @@ func (d *DailyDigestScheduler) shouldSendDigest(ctx context.Context, userMapper
 }
 
 // sendDigest sends a daily digest to a user.
-func (d *DailyDigestScheduler) sendDigest(ctx context.Context, userMapper *usermapping.Service, slackClient *slackpkg.Client, githubUser, org, domain string, prs []home.PR) error {
+func (d *DailyDigestScheduler) sendDigest(
+	ctx context.Context, userMapper *usermapping.Service, slackClient *slackpkg.Client,
+	githubUser, org, domain string, prs []home.PR,
+) error {
 	// Map to Slack user
 	slackUserID, err := userMapper.SlackHandle(ctx, githubUser, org, domain)
 	if err != nil {
@@ -435,7 +441,7 @@ func (d *DailyDigestScheduler) formatDigestMessage(incoming, outgoing []home.PR)
 }
 
 // formatDigestMessageAt formats a daily digest message at a specific time (for testing).
-func (d *DailyDigestScheduler) formatDigestMessageAt(incoming, outgoing []home.PR, now time.Time) string {
+func (*DailyDigestScheduler) formatDigestMessageAt(incoming, outgoing []home.PR, now time.Time) string {
 	var sb strings.Builder
 
 	// Friendly, happy greetings - keep it chill and inviting
diff --git a/internal/notify/daily_test.go b/internal/notify/daily_test.go
index db297de..6dd63da 100644
--- a/internal/notify/daily_test.go
+++ b/internal/notify/daily_test.go
@@ -172,7 +172,7 @@ func TestDailyDigestExample(t *testing.T) {
 	t.Logf("Example daily digest DM:\n\n%s\n", message)
 
 	// Verify it has the expected structure
-	if len(message) == 0 {
+	if message == "" {
 		t.Error("Message should not be empty")
 	}
 
@@ -185,7 +185,9 @@ func TestDailyDigestExample(t *testing.T) {
 	}
 
 	// Should contain all PR URLs
-	allPRs := append(exampleIncoming, exampleOutgoing...)
+	allPRs := make([]home.PR, 0, len(exampleIncoming)+len(exampleOutgoing))
+	allPRs = append(allPRs, exampleIncoming...)
+	allPRs = append(allPRs, exampleOutgoing...)
 	for _, pr := range allPRs {
 		if !strings.Contains(message, pr.URL) {
 			t.Errorf("Message should contain PR URL: %s", pr.URL)
diff --git a/internal/notify/notify.go b/internal/notify/notify.go
index 0451000..ae518a3 100644
--- a/internal/notify/notify.go
+++ b/internal/notify/notify.go
@@ -286,13 +286,16 @@ func (m *Manager) NotifyUser(ctx context.Context, workspaceID, userID, channelID
 		slog.Warn("failed to check for recent DM, will send anyway to avoid false negative",
 			"user", userID,
 			"pr", fmt.Sprintf("%s/%s#%d", pr.Owner, pr.Repo, pr.Number),
-			"error", err)
+			"error", err,
+			"check_window", "1h",
+			"impact", "possible_duplicate_dm")
 	} else if hasRecent {
 		slog.Info("skipping DM - already sent notification about this PR recently",
 			"user", userID,
 			"pr", fmt.Sprintf("%s/%s#%d", pr.Owner, pr.Repo, pr.Number),
-			"check_window", "1 hour",
-			"reason", "duplicate prevention during rolling deployment")
+			"check_window", "1h",
+			"reason", "duplicate prevention during rolling deployment",
+			"will_retry_later", false)
 		return nil
 	}
 
diff --git a/internal/slack/events_router_test.go b/internal/slack/events_router_test.go
index 03c4d41..8eba577 100644
--- a/internal/slack/events_router_test.go
+++ b/internal/slack/events_router_test.go
@@ -33,14 +33,6 @@ func (m *mockHomeHandler) handler(ctx context.Context, teamID, userID string) er
 	return m.callErr
 }
 
-func (m *mockHomeHandler) getCalls() []homeViewCall {
-	m.mu.Lock()
-	defer m.mu.Unlock()
-	result := make([]homeViewCall, len(m.calls))
-	copy(result, m.calls)
-	return result
-}
-
 // TestHandleInteractionsRefreshButton tests the refresh button interaction flow.
 func TestHandleInteractionsRefreshButton(t *testing.T) {
 	tests := []struct {
diff --git a/internal/slack/home_handler.go b/internal/slack/home_handler.go
index 3c0bb21..c501349 100644
--- a/internal/slack/home_handler.go
+++ b/internal/slack/home_handler.go
@@ -43,7 +43,7 @@ func (h *HomeHandler) HandleAppHomeOpened(ctx context.Context, teamID, slackUser
 		"slack_user_id", slackUserID)
 
 	// Try up to 2 times - first with cached client, second with fresh client after invalid_auth
-	for attempt := 0; attempt < 2; attempt++ {
+	for attempt := range 2 {
 		if attempt > 0 {
 			slog.Info("retrying home view after invalid_auth", "team_id", teamID, "attempt", attempt+1)
 		}
diff --git a/internal/slack/manager.go b/internal/slack/manager.go
index 4bb4000..b81aaf9 100644
--- a/internal/slack/manager.go
+++ b/internal/slack/manager.go
@@ -26,13 +26,15 @@ type StateStore interface {
 }
 
 // Manager manages Slack clients for multiple workspaces.
+//
+//nolint:govet // Field order optimized for logical grouping over memory alignment
 type Manager struct {
+	mu            sync.RWMutex
+	signingSecret string
+	stateStore    StateStore // State store for DM message tracking
 	clients       map[string]*Client // team_id -> client
 	metadata      map[string]*WorkspaceMetadata
-	signingSecret string
 	homeViewHandler func(ctx context.Context, teamID, userID string) error // Global home view handler
-	stateStore    StateStore // State store for DM message tracking
-	mu            sync.RWMutex
 }
 
 // NewManager creates a new Slack client manager.
diff --git a/internal/slack/slack.go b/internal/slack/slack.go
index ad782f4..405e041 100644
--- a/internal/slack/slack.go
+++ b/internal/slack/slack.go
@@ -8,6 +8,7 @@ import (
 	"crypto/sha256"
 	"encoding/hex"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"io"
 	"log/slog"
@@ -44,17 +45,32 @@ type apiCache struct {
 	misses int64 // Cache miss counter
 }
 
+// circuitBreaker implements simple circuit breaker pattern for API calls.
+//
+//nolint:govet // Field order prioritizes logical grouping over memory alignment optimization
+type circuitBreaker struct {
+	mu           sync.Mutex
+	lastFailure  time.Time
+	openUntil    time.Time
+	state        string // "closed", "open", "half-open"
+	failures     int
+	failureLimit int // Number of failures before opening circuit
+}
+
 // Client wraps the Slack API client with caching.
+//
+//nolint:govet // Field order optimized for logical grouping over memory alignment
 type Client struct {
+	homeViewHandlerMu sync.RWMutex
+	stateStoreMu      sync.RWMutex
+	signingSecret     string
+	teamID            string     // Workspace team ID
+	stateStore        StateStore // State store for DM message tracking
 	api     *slack.Client
 	cache   *apiCache
-	signingSecret string
-	teamID        string // Workspace team ID
+	breaker *circuitBreaker
 	manager *Manager // Reference to manager for cache invalidation
 	homeViewHandler func(ctx context.Context, teamID, userID string) error // Callback for app_home_opened events
-	homeViewHandlerMu sync.RWMutex
-	stateStore        StateStore // State store for DM message tracking
-	stateStoreMu      sync.RWMutex
 }
 
 // set stores a value in the cache with TTL.
@@ -111,6 +127,63 @@ func (c *Client) invalidateChannelCache(channelID string) {
 	slog.Debug("invalidated channel caches", "channel_id", channelID, "cleared", "membership")
 }
 
+// shouldSkipCall checks if circuit breaker is open (API unavailable).
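+// It also moves an expired open circuit to half-open (permitting one probe call) and
+// closes the circuit entirely once the last failure is more than a minute old.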
+func (cb *circuitBreaker) shouldSkipCall() bool {
+	cb.mu.Lock()
+	defer cb.mu.Unlock()
+
+	// Circuit open - fast-fail
+	if time.Now().Before(cb.openUntil) {
+		return true
+	}
+
+	// Circuit was open but timeout elapsed - move to half-open (allow one retry)
+	if cb.state == "open" {
+		cb.state = "half-open"
+	}
+
+	// Reset failure count if last failure was >1 minute ago
+	if time.Since(cb.lastFailure) > 1*time.Minute {
+		cb.failures = 0
+		cb.state = "closed"
+	}
+
+	return false
+}
+
+// recordSuccess resets circuit breaker on successful API call.
+func (cb *circuitBreaker) recordSuccess() {
+	cb.mu.Lock()
+	defer cb.mu.Unlock()
+
+	if cb.state == "half-open" {
+		slog.Info("Slack API circuit breaker recovered - back to normal operation")
+	}
+
+	cb.failures = 0
+	cb.state = "closed"
+	cb.openUntil = time.Time{}
+}
+
+// recordFailure tracks API failures and opens circuit if threshold exceeded.
+func (cb *circuitBreaker) recordFailure() {
+	cb.mu.Lock()
+	defer cb.mu.Unlock()
+
+	cb.failures++
+	cb.lastFailure = time.Now()
+
+	// Open circuit after threshold
+	if cb.failures >= cb.failureLimit && cb.state != "open" {
+		cb.state = "open"
+		cb.openUntil = time.Now().Add(1 * time.Minute)
+		slog.Error("Slack API circuit breaker opened - fast-failing for 1 minute",
+			"failure_count", cb.failures,
+			"will_retry_at", cb.openUntil.Format(time.RFC3339),
+			"impact", "Slack operations will fail-fast until circuit closes")
+	}
+}
+
 // New creates a new Slack client with caching.
 func New(token, signingSecret string) *Client {
 	return &Client{
@@ -119,6 +192,10 @@ func New(token, signingSecret string) *Client {
 		cache: &apiCache{
 			entries: make(map[string]cacheEntry),
 		},
+		breaker: &circuitBreaker{
+			state:        "closed",
+			failureLimit: 10, // Open circuit after 10 consecutive failures
+		},
 	}
 }
 
@@ -206,6 +283,11 @@ func (c *Client) WorkspaceInfo(ctx context.Context) (*slack.TeamInfo, error) {
 
 // PostThread creates a new thread in a channel for a PR with retry logic.
 func (c *Client) PostThread(ctx context.Context, channelID, text string, attachments []slack.Attachment) (string, error) {
+	// Check circuit breaker before making API call
+	if c.breaker.shouldSkipCall() {
+		return "", errors.New("slack API circuit breaker open - service temporarily unavailable")
+	}
+
 	slog.Info("posting thread to channel",
 		"channel_id", channelID,
 		"text_preview", func() string {
@@ -265,9 +347,11 @@ func (c *Client) PostThread(ctx context.Context, channelID, text string, attachm
 		retry.Context(ctx),
 	)
 	if err != nil {
+		c.breaker.recordFailure()
 		return "", fmt.Errorf("failed to post message after retries: %w", err)
 	}
 
+	c.breaker.recordSuccess()
 	slog.Info("successfully posted thread",
 		"thread_timestamp", timestamp,
 		"channel_id", channelID,
@@ -284,6 +368,10 @@ func (c *Client) PostThread(ctx context.Context, channelID, text string, attachm
 
 // UpdateMessage updates an existing message with retry logic.
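+// Like PostThread, it fails fast while the circuit breaker is open.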
 func (c *Client) UpdateMessage(ctx context.Context, channelID, timestamp, text string) error {
+	// Check circuit breaker before making API call
+	if c.breaker.shouldSkipCall() {
+		return errors.New("slack API circuit breaker open - service temporarily unavailable")
+	}
 	slog.Debug("updating message",
 		"channel_id", channelID,
 		"timestamp", timestamp,
@@ -328,9 +416,11 @@ func (c *Client) UpdateMessage(ctx context.Context, channelID, timestamp, text s
 		retry.Context(ctx),
 	)
 	if err != nil {
+		c.breaker.recordFailure()
 		return fmt.Errorf("failed to update message after retries: %w", err)
 	}
 
+	c.breaker.recordSuccess()
 	slog.Debug("successfully updated message",
 		"timestamp", timestamp,
 		"channel_id", channelID)
@@ -589,6 +679,11 @@ func (c *Client) HasRecentDMAboutPR(ctx context.Context, userID, prURL string) (
 
 // SendDirectMessage sends a direct message to a user with retry logic.
 func (c *Client) SendDirectMessage(ctx context.Context, userID, text string) (dmChannelID, messageTS string, err error) {
+	// Check circuit breaker before making API call
+	if c.breaker.shouldSkipCall() {
+		return "", "", errors.New("slack API circuit breaker open - service temporarily unavailable")
+	}
+
 	slog.Info("sending DM to user", "user", userID)
 
 	var channelID string
@@ -643,9 +738,11 @@ func (c *Client) SendDirectMessage(ctx context.Context, userID, text string) (dm
 		retry.Context(ctx),
 	)
 	if err != nil {
+		c.breaker.recordFailure()
 		return "", "", fmt.Errorf("failed to send DM after retries: %w", err)
 	}
 
+	c.breaker.recordSuccess()
 	slog.Info("successfully sent DM", "user", userID, "channel_id", channelID, "message_ts", msgTS)
 	return channelID, msgTS, nil
 }
diff --git a/internal/state/datastore.go b/internal/state/datastore.go
index 522319a..6fcdae3 100644
--- a/internal/state/datastore.go
+++ b/internal/state/datastore.go
@@ -34,46 +34,46 @@ var ErrAlreadyProcessed = errors.New("event already processed by another instanc
 
 // Thread entity for Datastore.
 type threadEntity struct {
+	UpdatedAt     time.Time `datastore:"updated_at"`
+	LastEventTime time.Time `datastore:"last_event_time"`
 	ThreadTS      string    `datastore:"thread_ts"`
 	ChannelID     string    `datastore:"channel_id"`
 	MessageText   string    `datastore:"message_text,noindex"`
-	UpdatedAt     time.Time `datastore:"updated_at"`
-	LastEventTime time.Time `datastore:"last_event_time"`
 }
 
 // DM tracking entity.
 type dmEntity struct {
+	SentAt time.Time `datastore:"sent_at"`
 	UserID string    `datastore:"user_id"`
 	PRURL  string    `datastore:"pr_url"`
-	SentAt time.Time `datastore:"sent_at"`
 }
 
 // DM message entity for updating messages.
type dmMessageEntity struct {
+	UpdatedAt   time.Time `datastore:"updated_at"`
+	SentAt      time.Time `datastore:"sent_at"`
 	ChannelID   string    `datastore:"channel_id"`
 	MessageTS   string    `datastore:"message_ts"`
 	MessageText string    `datastore:"message_text,noindex"`
-	UpdatedAt   time.Time `datastore:"updated_at"`
-	SentAt      time.Time `datastore:"sent_at"`
 }
 
 // Digest tracking entity.
 type digestEntity struct {
+	SentAt time.Time `datastore:"sent_at"`
 	UserID string    `datastore:"user_id"`
 	Date   string    `datastore:"date"` // YYYY-MM-DD format
-	SentAt time.Time `datastore:"sent_at"`
 }
 
 // Event deduplication entity.
 type eventEntity struct {
-	EventKey  string    `datastore:"event_key"`
 	Processed time.Time `datastore:"processed"`
+	EventKey  string    `datastore:"event_key"`
 }
 
 // Notification tracking entity.
 type notifyEntity struct {
-	PRURL      string    `datastore:"pr_url"`
 	NotifiedAt time.Time `datastore:"notified_at"`
+	PRURL      string    `datastore:"pr_url"`
 }
 
 // NewDatastoreStore creates a new Datastore-backed store with in-memory cache.
diff --git a/internal/state/json.go b/internal/state/json.go
index 868b7e4..3055068 100644
--- a/internal/state/json.go
+++ b/internal/state/json.go
@@ -13,12 +13,12 @@ import (
 
 // JSONStore implements Store using JSON files in the user cache directory.
 // Simple, reliable, and easy to debug.
+//
+//nolint:govet // Field order optimized for logical grouping over memory alignment
 type JSONStore struct {
-	baseDir string
-	mu      sync.RWMutex
-
-	// In-memory cache for fast lookups
-	threads map[string]ThreadInfo
+	mu         sync.RWMutex
+	baseDir    string
+	threads    map[string]ThreadInfo // In-memory cache for fast lookups
 	dms        map[string]time.Time
 	dmMessages map[string]DMInfo // DM message tracking for updates
 	digests    map[string]time.Time
diff --git a/internal/state/memory.go b/internal/state/memory.go
index d5b40ea..e796dc7 100644
--- a/internal/state/memory.go
+++ b/internal/state/memory.go
@@ -8,11 +8,11 @@ import (
 
 // MemoryStore implements Store using in-memory maps only (no persistence).
 // This is used as a cache layer on top of Datastore or JSON backends.
+//
+//nolint:govet // Field order optimized for logical grouping over memory alignment
 type MemoryStore struct {
-	mu sync.RWMutex
-
-	// In-memory cache for fast lookups
-	threads map[string]ThreadInfo
+	mu         sync.RWMutex
+	threads    map[string]ThreadInfo // In-memory cache for fast lookups
 	dms        map[string]time.Time
 	dmMessages map[string]DMInfo
 	digests    map[string]time.Time
diff --git a/internal/state/store.go b/internal/state/store.go
index 6c85275..78c2d79 100644
--- a/internal/state/store.go
+++ b/internal/state/store.go
@@ -17,15 +17,17 @@ type ThreadInfo struct {
 }
 
 // DMInfo stores information about a DM message for a PR.
 type DMInfo struct {
+	UpdatedAt   time.Time `json:"updated_at"`   // When we last updated this message
+	SentAt      time.Time `json:"sent_at"`      // When we first sent this message
 	ChannelID   string    `json:"channel_id"`   // DM conversation channel ID
 	MessageTS   string    `json:"message_ts"`   // Message timestamp for updating
 	MessageText string    `json:"message_text"` // Current message text
-	UpdatedAt   time.Time `json:"updated_at"`   // When we last updated this message
-	SentAt      time.Time `json:"sent_at"`      // When we first sent this message
 }
 
 // Store provides persistent storage for bot state.
 // Implementations must be safe for concurrent use.
+//
+//nolint:interfacebloat // Store intentionally groups all state operations for simplicity
 type Store interface {
 	// Thread operations - map PR to Slack thread
 	Thread(owner, repo string, number int, channelID string) (ThreadInfo, bool)
diff --git a/internal/usermapping/usermapping.go b/internal/usermapping/usermapping.go
index 0f6b870..bdb3605 100644
--- a/internal/usermapping/usermapping.go
+++ b/internal/usermapping/usermapping.go
@@ -48,13 +48,15 @@ type GitHubEmailLookup interface {
 }
 
 // Service handles GitHub-to-Slack user mapping.
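+// Identical concurrent lookups are collapsed via singleflight, and the total number
+// of in-flight lookups is bounded by the lookupSem semaphore.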
+//
+//nolint:govet // Field order optimized for logical grouping over memory alignment
 type Service struct {
+	cacheMu sync.RWMutex
+	flight  singleflight.Group // Deduplicates concurrent identical lookups
 	slackClient  SlackAPI
 	githubLookup GitHubEmailLookup
 	cache        map[string]*UserMapping
 	lookupSem    chan struct{} // Semaphore for limiting concurrent lookups
-	cacheMu sync.RWMutex
-	flight  singleflight.Group // Deduplicates concurrent identical lookups
 }
 
 // New creates a new user mapping service.
diff --git a/pkg/home/ui.go b/pkg/home/ui.go
index e8d3dcc..69b8d7d 100644
--- a/pkg/home/ui.go
+++ b/pkg/home/ui.go
@@ -65,10 +65,7 @@ func BuildBlocks(dashboard *Dashboard, primaryOrg string) []slack.Block {
 		slack.NewContextBlock("",
 			slack.NewTextBlockObject("mrkdwn", ctx, false, false),
 		),
-	)
-
-	// Refresh button
-	blocks = append(blocks,
+		// Refresh button
 		slack.NewActionBlock(
 			"refresh_actions",
 			slack.NewButtonBlockElement(
@@ -77,11 +74,10 @@ func BuildBlocks(dashboard *Dashboard, primaryOrg string) []slack.Block {
 				slack.NewTextBlockObject("plain_text", "πŸ”„ Refresh Dashboard", true, false),
 			).WithStyle("primary"),
 		),
+		// Incoming PRs section
+		slack.NewDividerBlock(),
 	)
 
-	// Incoming PRs section
-	blocks = append(blocks, slack.NewDividerBlock())
-
 	incoming := fmt.Sprintf(":arrow_down: *Incoming PRs* (%d total)", counts.IncomingTotal)
 	if counts.IncomingBlocked > 0 {
 		incoming = fmt.Sprintf(":rotating_light: *Incoming PRs* β€’ *%d blocked on you* β€’ %d total", counts.IncomingBlocked, counts.IncomingTotal)
@@ -160,21 +156,20 @@ func BuildBlocks(dashboard *Dashboard, primaryOrg string) []slack.Block {
 func formatEnhancedPRBlock(pr *PR) slack.Block {
 	// Status indicators - clear visual hierarchy
 	var emoji, status string
-	if pr.IsBlocked {
-		if pr.NeedsReview {
-			// Blocked on YOU - highest priority
-			emoji = "🚨"
-			status = "*BLOCKED ON YOU*"
-		} else {
-			// Blocked on author
-			emoji = "⏸️"
-			status = "Blocked on author"
-		}
-	} else if pr.NeedsReview {
+	switch {
+	case pr.IsBlocked && pr.NeedsReview:
+		// Blocked on YOU - highest priority
+		emoji = "🚨"
+		status = "*BLOCKED ON YOU*"
+	case pr.IsBlocked:
+		// Blocked on author
+		emoji = "⏸️"
+		status = "Blocked on author"
+	case pr.NeedsReview:
 		// Ready for your review
 		emoji = "πŸ‘€"
 		status = "Ready for review"
-	} else {
+	default:
 		// Waiting/in progress
 		emoji = "⏳"
 		status = "In progress"