From 99dc3f2ee138893e063b1a8e3919e19bfa71011f Mon Sep 17 00:00:00 2001 From: Thomas Stromberg Date: Tue, 28 Oct 2025 12:08:35 -0400 Subject: [PATCH] more action/mapping debugging --- internal/bot/bot.go | 278 +++++++++++++++++++++++++++++--------- internal/notify/notify.go | 30 +++- 2 files changed, 242 insertions(+), 66 deletions(-) diff --git a/internal/bot/bot.go b/internal/bot/bot.go index 71a8477..c786234 100644 --- a/internal/bot/bot.go +++ b/internal/bot/bot.go @@ -626,40 +626,65 @@ func (c *Coordinator) handlePullRequestEventWithData(ctx context.Context, owner, Event: event, CheckRes: checkResult, } - c.processChannelsInParallel(ctx, prCtx, channels, workspaceID) + taggedUsers := c.processChannelsInParallel(ctx, prCtx, channels, workspaceID) // Handle user notifications - send DMs to blocked users - // This happens AFTER channel processing completes to ensure tags are tracked first + // Logic: + // - If channels were notified (taggedUsers not empty): Only send DMs to those successfully tagged Slack users + // - If no channels were notified (taggedUsers empty/nil): Attempt DMs to all blocked GitHub users if len(blockedOn) > 0 { - // Deduplicate blocked users (same user might be blocked for multiple reasons) - uniqueUsers := make(map[string]bool) - for _, githubUser := range blockedOn { - uniqueUsers[githubUser] = true - } + if len(taggedUsers) > 0 { + // Channels were notified - only send DMs to successfully tagged Slack users + slog.Info("preparing to send DM notifications to users tagged in channels", + logFieldPR, fmt.Sprintf(prFormatString, owner, repo, prNumber), + "total_blocked_github_users", len(blockedOn), + "successfully_tagged_slack_users", len(taggedUsers), + "will_send_async", true, + "dm_logic", "immediate if not in channel, delayed if in channel") + + // Send DMs asynchronously to avoid blocking event processing + // SECURITY NOTE: Use detached context to allow graceful completion of DM notifications + // even if parent context is cancelled during shutdown. Operations are still bounded by + // explicit 60-second timeout for DM delivery (~1s each). + // Note: No panic recovery - we want panics to propagate and restart the service (Cloud Run will handle it) + // A quiet failure is worse than a visible crash that triggers automatic recovery + dmCtx, dmCancel := context.WithTimeout(context.WithoutCancel(ctx), 60*time.Second) + go func() { + defer dmCancel() + c.sendDMNotificationsToSlackUsers(dmCtx, workspaceID, owner, repo, prNumber, taggedUsers, event, prState, checkResult) + }() + } else { + // No channels were notified - attempt to send immediate DMs to all blocked GitHub users + // Deduplicate blocked users (same user might be blocked for multiple reasons) + uniqueGitHubUsers := make(map[string]bool) + for _, githubUser := range blockedOn { + uniqueGitHubUsers[githubUser] = true + } - slog.Info("preparing to send DM notifications", - logFieldPR, fmt.Sprintf(prFormatString, owner, repo, prNumber), - "total_blocked_users", len(blockedOn), - "unique_users", len(uniqueUsers), - "will_send_async", true, - "warning", "DMs are sent async - if instance crashes before completion, another instance may retry and send duplicates") - - // Send DMs asynchronously to avoid blocking event processing - // SECURITY NOTE: Use detached context to allow graceful completion of DM notifications - // even if parent context is cancelled during shutdown. Operations are still bounded by - // explicit 60-second timeout, allowing time for: - // - GitHub email lookup via gh-mailto (~5-10s with retries/timeouts) - // - Slack API user lookup for multiple guesses (~5-15s total) - // - DM delivery (~1s) - // This generous timeout prevents premature cancellation while still ensuring - // eventual completion during shutdown. Most requests complete in <10s. - // Note: No panic recovery - we want panics to propagate and restart the service (Cloud Run will handle it) - // A quiet failure is worse than a visible crash that triggers automatic recovery - dmCtx, dmCancel := context.WithTimeout(context.WithoutCancel(ctx), 60*time.Second) - go func() { - defer dmCancel() - c.sendDMNotifications(dmCtx, workspaceID, owner, repo, prNumber, uniqueUsers, event, prState, checkResult) - }() + slog.Info("preparing to send immediate DMs (no channels were notified)", + logFieldPR, fmt.Sprintf(prFormatString, owner, repo, prNumber), + "total_blocked_github_users", len(blockedOn), + "unique_github_users", len(uniqueGitHubUsers), + "will_send_async", true, + "warning", "DMs are sent async - if instance crashes before completion, another instance may retry and send duplicates") + + // Send DMs asynchronously to avoid blocking event processing + // SECURITY NOTE: Use detached context to allow graceful completion of DM notifications + // even if parent context is cancelled during shutdown. Operations are still bounded by + // explicit 60-second timeout, allowing time for: + // - GitHub email lookup via gh-mailto (~5-10s with retries/timeouts) + // - Slack API user lookup for multiple guesses (~5-15s total) + // - DM delivery (~1s) + // This generous timeout prevents premature cancellation while still ensuring + // eventual completion during shutdown. Most requests complete in <10s. + // Note: No panic recovery - we want panics to propagate and restart the service (Cloud Run will handle it) + // A quiet failure is worse than a visible crash that triggers automatic recovery + dmCtx, dmCancel := context.WithTimeout(context.WithoutCancel(ctx), 60*time.Second) + go func() { + defer dmCancel() + c.sendDMNotificationsToGitHubUsers(dmCtx, workspaceID, owner, repo, prNumber, uniqueGitHubUsers, event, prState, checkResult) + }() + } } else { slog.Info("no users blocking PR - no notifications needed", logFieldPR, fmt.Sprintf(prFormatString, owner, repo, prNumber), @@ -667,11 +692,12 @@ func (c *Coordinator) handlePullRequestEventWithData(ctx context.Context, owner, } } -// sendDMNotifications sends DM notifications to unique blocked users asynchronously. +// sendDMNotificationsToSlackUsers sends DM notifications to Slack users who were tagged in channels. // This runs in a separate goroutine to avoid blocking event processing. -func (c *Coordinator) sendDMNotifications( +// Uses Slack user IDs directly (no GitHub->Slack mapping needed). +func (c *Coordinator) sendDMNotificationsToSlackUsers( ctx context.Context, workspaceID, owner, repo string, - prNumber int, uniqueUsers map[string]bool, + prNumber int, slackUsers map[string]bool, event struct { Action string `json:"action"` PullRequest struct { @@ -688,27 +714,16 @@ func (c *Coordinator) sendDMNotifications( prState string, checkResult *turn.CheckResponse, ) { - slog.Info("starting DM notification batch", + slog.Info("starting DM notification batch for tagged Slack users", logFieldPR, fmt.Sprintf(prFormatString, owner, repo, prNumber), "workspace", workspaceID, - "user_count", len(uniqueUsers), + "user_count", len(slackUsers), "pr_state", prState) - domain := c.configManager.Domain(owner) sentCount := 0 failedCount := 0 - for githubUser := range uniqueUsers { - // Map GitHub username to Slack user ID - slackUserID, err := c.userMapper.SlackHandle(ctx, githubUser, owner, domain) - if err != nil || slackUserID == "" { - slog.Debug("could not map GitHub user to Slack - skipping DM", - logFieldPR, fmt.Sprintf(prFormatString, owner, repo, prNumber), - "github_user", githubUser, - "error", err) - continue - } - + for slackUserID := range slackUsers { // Get tag info to determine which channel the user was tagged in tagInfo := c.notifier.Tracker.LastUserPRChannelTag(workspaceID, slackUserID, owner, repo, prNumber) @@ -737,9 +752,93 @@ func (c *Coordinator) sendDMNotifications( prInfo.NextAction = checkResult.Analysis.NextAction } - err = c.notifier.NotifyUser(ctx, workspaceID, slackUserID, tagInfo.ChannelID, channelName, prInfo) + err := c.notifier.NotifyUser(ctx, workspaceID, slackUserID, tagInfo.ChannelID, channelName, prInfo) if err != nil { slog.Warn("failed to notify user", + logFieldPR, fmt.Sprintf(prFormatString, owner, repo, prNumber), + "slack_user", slackUserID, + "error", err) + failedCount++ + } else { + sentCount++ + } + } + + slog.Info("completed DM notification batch for tagged Slack users", + logFieldPR, fmt.Sprintf(prFormatString, owner, repo, prNumber), + "workspace", workspaceID, + "sent_count", sentCount, + "failed_count", failedCount, + "total_users", len(slackUsers)) +} + +// sendDMNotificationsToGitHubUsers sends immediate DM notifications to blocked GitHub users. +// This runs in a separate goroutine to avoid blocking event processing. +// Used when no channels were notified (performs GitHub->Slack mapping). +func (c *Coordinator) sendDMNotificationsToGitHubUsers( + ctx context.Context, workspaceID, owner, repo string, + prNumber int, uniqueUsers map[string]bool, + event struct { + Action string `json:"action"` + PullRequest struct { + HTMLURL string `json:"html_url"` + Title string `json:"title"` + CreatedAt time.Time `json:"created_at"` + User struct { + Login string `json:"login"` + } `json:"user"` + Number int `json:"number"` + } `json:"pull_request"` + Number int `json:"number"` + }, + prState string, + checkResult *turn.CheckResponse, +) { + slog.Info("starting immediate DM notification batch (no channels were notified)", + logFieldPR, fmt.Sprintf(prFormatString, owner, repo, prNumber), + "workspace", workspaceID, + "github_user_count", len(uniqueUsers), + "pr_state", prState, + "note", "will attempt GitHub->Slack mapping for each user") + + domain := c.configManager.Domain(owner) + sentCount := 0 + failedCount := 0 + mappingFailures := 0 + + for githubUser := range uniqueUsers { + // Map GitHub username to Slack user ID + slackUserID, err := c.userMapper.SlackHandle(ctx, githubUser, owner, domain) + if err != nil || slackUserID == "" { + slog.Info("could not map GitHub user to Slack - skipping immediate DM", + logFieldPR, fmt.Sprintf(prFormatString, owner, repo, prNumber), + "github_user", githubUser, + "error", err, + "impact", "user will not receive DM notification") + mappingFailures++ + continue + } + + // Send immediate DM (no channel tag delay logic since no channels were notified) + prInfo := notify.PRInfo{ + Owner: owner, + Repo: repo, + Number: prNumber, + Title: event.PullRequest.Title, + Author: event.PullRequest.User.Login, + State: prState, + HTMLURL: event.PullRequest.HTMLURL, + } + // Add workflow state and next actions if available + if checkResult != nil { + prInfo.WorkflowState = checkResult.Analysis.WorkflowState + prInfo.NextAction = checkResult.Analysis.NextAction + } + + // Send immediate DM (pass empty channelID and channelName since no channels were notified) + err = c.notifier.NotifyUser(ctx, workspaceID, slackUserID, "", "", prInfo) + if err != nil { + slog.Warn("failed to send immediate DM", logFieldPR, fmt.Sprintf(prFormatString, owner, repo, prNumber), "github_user", githubUser, "slack_user", slackUserID, @@ -750,12 +849,13 @@ func (c *Coordinator) sendDMNotifications( } } - slog.Info("completed DM notification batch", + slog.Info("completed immediate DM notification batch (no channels were notified)", logFieldPR, fmt.Sprintf(prFormatString, owner, repo, prNumber), "workspace", workspaceID, "sent_count", sentCount, "failed_count", failedCount, - "total_users", len(uniqueUsers)) + "mapping_failures", mappingFailures, + "total_github_users", len(uniqueUsers)) } // extractStateFromTurnclient extracts PR state from turnclient response without additional API calls. @@ -908,8 +1008,16 @@ func (c *Coordinator) updateDMMessagesForPR(ctx context.Context, pr prUpdateInfo // Determine prefix based on workflow state and next actions var prefix string if checkResult != nil { + slog.Info("determining DM emoji from analysis", + "pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber), + "workflow_state", checkResult.Analysis.WorkflowState, + "next_action_count", len(checkResult.Analysis.NextAction), + "pr_state_fallback", prState) prefix = notify.PrefixForAnalysis(checkResult.Analysis.WorkflowState, checkResult.Analysis.NextAction) } else { + slog.Info("no analysis available - using state-based emoji fallback", + "pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber), + "pr_state", prState) prefix = notify.PrefixForState(prState) } var action string @@ -1033,9 +1141,11 @@ func (*Coordinator) getStateQueryParam(prState string) string { } // processChannelsInParallel processes multiple channels concurrently for better performance. +// processChannelsInParallel processes PR notifications for multiple channels concurrently. +// Returns a map of Slack user IDs that were successfully tagged in at least one channel. func (c *Coordinator) processChannelsInParallel( ctx context.Context, prCtx prContext, channels []string, workspaceID string, -) { +) map[string]bool { event, ok := prCtx.Event.(struct { Action string `json:"action"` PullRequest struct { @@ -1051,7 +1161,7 @@ func (c *Coordinator) processChannelsInParallel( }) if !ok { slog.Error("invalid event type in prContext", "expected", "pull_request_event") - return + return nil } slog.Debug("processing PR for all configured channels", @@ -1094,7 +1204,7 @@ func (c *Coordinator) processChannelsInParallel( logFieldPR, fmt.Sprintf(prFormatString, prCtx.Owner, prCtx.Repo, prCtx.Number), "total_channels", len(channels), "valid_channels", 0) - return + return nil } slog.Debug("filtered channels for processing", @@ -1103,6 +1213,10 @@ func (c *Coordinator) processChannelsInParallel( "valid_channels", len(validChannels), "filtered_out", len(channels)-len(validChannels)) + // Track which Slack users were successfully tagged across all channels + var taggedUsersMu sync.Mutex + taggedUsers := make(map[string]bool) + // Process channels in parallel for better performance // Use WaitGroup instead of errgroup since we don't want one failure to cancel others // Note: No panic recovery - we want panics to propagate and restart the service (Cloud Run will handle it) @@ -1113,7 +1227,14 @@ func (c *Coordinator) processChannelsInParallel( for _, channelName := range validChannels { go func(ch string) { defer wg.Done() - c.processPRForChannel(ctx, prCtx, ch, workspaceID) + channelTaggedUsers := c.processPRForChannel(ctx, prCtx, ch, workspaceID) + + // Merge tagged users from this channel into the overall set + taggedUsersMu.Lock() + for userID := range channelTaggedUsers { + taggedUsers[userID] = true + } + taggedUsersMu.Unlock() }(channelName) } @@ -1121,13 +1242,17 @@ func (c *Coordinator) processChannelsInParallel( wg.Wait() slog.Debug("completed parallel channel processing", logFieldPR, fmt.Sprintf(prFormatString, prCtx.Owner, prCtx.Repo, prCtx.Number), - "channels_processed", len(validChannels)) + "channels_processed", len(validChannels), + "total_users_tagged", len(taggedUsers)) + + return taggedUsers } // processPRForChannel handles PR processing for a single channel (extracted from the main loop). +// Returns a map of Slack user IDs that were successfully tagged in this channel. func (c *Coordinator) processPRForChannel( ctx context.Context, prCtx prContext, channelName, workspaceID string, -) { +) map[string]bool { owner, repo, prNumber, prState := prCtx.Owner, prCtx.Repo, prCtx.Number, prCtx.State checkResult := prCtx.CheckRes event, ok := prCtx.Event.(struct { @@ -1145,7 +1270,7 @@ func (c *Coordinator) processPRForChannel( }) if !ok { slog.Error("invalid event type in prContext", "expected", "pull_request_event", "channel", channelName) - return + return nil } // Resolve channel name to ID for API calls @@ -1156,7 +1281,7 @@ func (c *Coordinator) processPRForChannel( logFieldPR, fmt.Sprintf(prFormatString, owner, repo, prNumber), "channel", channelName, "action_required", "verify channel exists and bot has access") - return + return nil } // For display purposes, show both name and ID @@ -1216,13 +1341,14 @@ func (c *Coordinator) processPRForChannel( "impact", "channel_update_skipped_will_retry_via_polling", "next_poll_in", "5m", "will_continue_with_next_channel", true) - return + return nil } // Track that we notified users in this channel for DM delay logic c.notifier.Tracker.UpdateChannelNotification(workspaceID, owner, repo, prNumber) - // Track user tags in channel for DM delay logic + // Track user tags in channel for DM delay logic and collect successfully tagged Slack users + taggedUsers := make(map[string]bool) blockedUsers := c.extractBlockedUsersFromTurnclient(checkResult) domain := c.configManager.Domain(owner) if len(blockedUsers) > 0 { @@ -1234,6 +1360,7 @@ func (c *Coordinator) processPRForChannel( slackUserID, err := c.userMapper.SlackHandle(lookupCtx, githubUser, owner, domain) if err == nil && slackUserID != "" { c.notifier.Tracker.UpdateUserPRChannelTag(workspaceID, slackUserID, channelID, owner, repo, prNumber) + taggedUsers[slackUserID] = true slog.Debug("tracked user tag in channel", "workspace", workspaceID, "github_user", githubUser, @@ -1251,8 +1378,16 @@ func (c *Coordinator) processPRForChannel( // Determine expected prefix based on workflow state and next actions var expectedPrefix string if checkResult != nil { + slog.Info("determining message update emoji from analysis", + "pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber), + "workflow_state", checkResult.Analysis.WorkflowState, + "next_action_count", len(checkResult.Analysis.NextAction), + "pr_state_fallback", prState) expectedPrefix = notify.PrefixForAnalysis(checkResult.Analysis.WorkflowState, checkResult.Analysis.NextAction) } else { + slog.Info("no analysis available - using state-based emoji fallback for message update", + "pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber), + "pr_state", prState) expectedPrefix = notify.PrefixForState(prState) } domain := c.configManager.Domain(owner) @@ -1341,7 +1476,10 @@ func (c *Coordinator) processPRForChannel( "thread_ts", threadTS, "action", event.Action, "pr_state", prState, - "had_state_change", oldState != "" && oldState != prState) + "had_state_change", oldState != "" && oldState != prState, + "users_tagged", len(taggedUsers)) + + return taggedUsers } // handlePullRequestFromSprinkler handles pull request events from sprinkler by fetching PR data from GitHub API. @@ -1494,9 +1632,17 @@ func (c *Coordinator) createPRThread(ctx context.Context, channel, owner, repo s // Get emoji prefix based on workflow state and next actions var prefix string if checkResult != nil { + slog.Info("determining new thread emoji from analysis", + "pr", fmt.Sprintf("%s/%s#%d", owner, repo, number), + "workflow_state", checkResult.Analysis.WorkflowState, + "next_action_count", len(checkResult.Analysis.NextAction), + "pr_state_fallback", prState) prefix = notify.PrefixForAnalysis(checkResult.Analysis.WorkflowState, checkResult.Analysis.NextAction) } else { // Fallback to state-based prefix if no checkResult available + slog.Info("no analysis available - using state-based emoji fallback for new thread", + "pr", fmt.Sprintf("%s/%s#%d", owner, repo, number), + "pr_state", prState) prefix = notify.PrefixForState(prState) } @@ -1551,10 +1697,12 @@ func (c *Coordinator) createPRThread(ctx context.Context, channel, owner, repo s domain := c.configManager.Domain(owner) if checkResult != nil && len(checkResult.Analysis.NextAction) > 0 { // SECURITY NOTE: Use detached context to complete message enrichment even during shutdown. - // Operations bounded by 15-second timeout. This ensures reasonably fast shutdown while - // completing active message updates (most lookups hit cache instantly, but occasional - // cold lookups can take 10+ seconds). - enrichCtx, enrichCancel := context.WithTimeout(context.WithoutCancel(ctx), 15*time.Second) + // Operations bounded by 60-second timeout, allowing time for: + // - GitHub email lookup via gh-mailto (~5-10s with retries/timeouts) + // - Slack API user lookups for multiple users (~5-15s total) + // - Message update API call (~1s) + // Most lookups hit cache instantly, but cold lookups can take 10+ seconds. + enrichCtx, enrichCancel := context.WithTimeout(context.WithoutCancel(ctx), 60*time.Second) // Capture variables to avoid data race capturedThreadTS := threadTS capturedOwner := owner diff --git a/internal/notify/notify.go b/internal/notify/notify.go index 9daf679..b645655 100644 --- a/internal/notify/notify.go +++ b/internal/notify/notify.go @@ -173,15 +173,24 @@ func PrimaryAction(nextActions map[string]turn.Action) string { var primaryAction string minPriority := 999 + // Track all actions considered for debugging + actionPriorities := make(map[string]int) + for _, action := range nextActions { kind := string(action.Kind) priority := actionPriority(kind) + actionPriorities[kind] = priority if priority < minPriority { minPriority = priority primaryAction = kind } } + slog.Debug("determined primary action from priorities", + "primary_action", primaryAction, + "primary_priority", minPriority, + "all_action_priorities", actionPriorities) + return primaryAction } @@ -190,18 +199,37 @@ func PrimaryAction(nextActions map[string]turn.Action) string { // 1. If workflow_state == "newly_published" → ":new:" // 2. Otherwise → emoji based on primary next_action func PrefixForAnalysis(workflowState string, nextActions map[string]turn.Action) string { + // Log input for debugging emoji selection + actionKinds := make([]string, 0, len(nextActions)) + for user, action := range nextActions { + actionKinds = append(actionKinds, fmt.Sprintf("%s:%s", user, action.Kind)) + } + slog.Debug("determining emoji prefix", + "workflow_state", workflowState, + "next_actions_count", len(nextActions), + "next_actions", actionKinds) + // Special case: newly published PRs always show :new: if workflowState == "newly_published" { + slog.Debug("using :new: emoji for newly published PR") return ":new:" } // Determine primary action and return corresponding emoji primaryAction := PrimaryAction(nextActions) if primaryAction != "" { - return PrefixForAction(primaryAction) + emoji := PrefixForAction(primaryAction) + slog.Debug("determined emoji from primary action", + "primary_action", primaryAction, + "emoji", emoji) + return emoji } // Fallback if no actions + slog.Info("no primary action found - using fallback emoji", + "workflow_state", workflowState, + "next_actions_count", len(nextActions), + "fallback_emoji", ":postal_horn:") return ":postal_horn:" }