diff --git a/pkg/bot/bot.go b/pkg/bot/bot.go index 4555d60..e88b986 100644 --- a/pkg/bot/bot.go +++ b/pkg/bot/bot.go @@ -734,6 +734,25 @@ func (c *Coordinator) handlePullRequestEventWithData(ctx context.Context, owner, slog.Info("no users blocking PR - no notifications needed", logFieldPR, fmt.Sprintf(prFormatString, owner, repo, prNumber), "pr_state", prState) + + // For merged/closed PRs, still update existing DMs even if no one is blocking + // This ensures users see the final state (🚀 merged or ❌ closed) + if prState == "merged" || prState == "closed" { + slog.Info("updating DMs for terminal PR state", + logFieldPR, fmt.Sprintf(prFormatString, owner, repo, prNumber), + "pr_state", prState, + "reason", "terminal_state_update") + c.updateDMMessagesForPR(ctx, prUpdateInfo{ + Owner: owner, + Repo: repo, + PRNumber: prNumber, + PRTitle: event.PullRequest.Title, + PRAuthor: event.PullRequest.User.Login, + PRState: prState, + PRURL: event.PullRequest.HTMLURL, + CheckResult: checkResult, + }) + } } } @@ -876,12 +895,18 @@ func (*Coordinator) getStateQueryParam(prState string) string { } } +// UserTagInfo contains information about a user who was tagged in PR notifications. +type UserTagInfo struct { + UserID string + IsInAnyChannel bool // True if user is member of at least one channel where they were tagged +} + // processChannelsInParallel processes multiple channels concurrently for better performance. // processChannelsInParallel processes PR notifications for multiple channels concurrently. -// Returns a map of Slack user IDs that were successfully tagged in at least one channel. +// Returns a map of Slack user IDs to tag info for users successfully tagged in at least one channel. func (c *Coordinator) processChannelsInParallel( ctx context.Context, prCtx prContext, channels []string, workspaceID string, -) map[string]bool { +) map[string]UserTagInfo { event, ok := prCtx.Event.(struct { Action string `json:"action"` PullRequest struct { @@ -951,7 +976,7 @@ func (c *Coordinator) processChannelsInParallel( // Track which Slack users were successfully tagged across all channels var taggedUsersMu sync.Mutex - taggedUsers := make(map[string]bool) + taggedUsers := make(map[string]UserTagInfo) // Process channels in parallel for better performance // Use WaitGroup instead of errgroup since we don't want one failure to cancel others @@ -967,8 +992,18 @@ func (c *Coordinator) processChannelsInParallel( // Merge tagged users from this channel into the overall set taggedUsersMu.Lock() - for userID := range channelTaggedUsers { - taggedUsers[userID] = true + for userID, info := range channelTaggedUsers { + existing, exists := taggedUsers[userID] + if !exists { + // First time seeing this user + taggedUsers[userID] = info + } else { + // User already tagged in another channel - update IsInAnyChannel if this channel has them + if info.IsInAnyChannel { + existing.IsInAnyChannel = true + taggedUsers[userID] = existing + } + } } taggedUsersMu.Unlock() }(channelName) @@ -985,10 +1020,10 @@ func (c *Coordinator) processChannelsInParallel( } // processPRForChannel handles PR processing for a single channel (extracted from the main loop). -// Returns a map of Slack user IDs that were successfully tagged in this channel. +// Returns a map of Slack user IDs to UserTagInfo for users successfully tagged in this channel. func (c *Coordinator) processPRForChannel( ctx context.Context, prCtx prContext, channelName, workspaceID string, -) map[string]bool { +) map[string]UserTagInfo { owner, repo, prNumber, prState := prCtx.Owner, prCtx.Repo, prCtx.Number, prCtx.State checkResult := prCtx.CheckRes event, ok := prCtx.Event.(struct { @@ -1145,11 +1180,12 @@ func (c *Coordinator) resolveAndValidateChannel( } // trackUserTagsForDMDelay tracks user tags in channel for DM delay logic. +// Returns map of Slack user IDs to UserTagInfo with channel membership status. func (c *Coordinator) trackUserTagsForDMDelay( ctx context.Context, workspaceID, channelID, channelDisplay, owner, repo string, prNumber int, checkResult *turn.CheckResponse, -) map[string]bool { - taggedUsers := make(map[string]bool) +) map[string]UserTagInfo { + taggedUsers := make(map[string]UserTagInfo) blockedUsers := c.extractBlockedUsersFromTurnclient(checkResult) if len(blockedUsers) == 0 { return taggedUsers @@ -1166,13 +1202,22 @@ func (c *Coordinator) trackUserTagsForDMDelay( if c.notifier != nil && c.notifier.Tracker != nil { c.notifier.Tracker.UpdateUserPRChannelTag(workspaceID, slackUserID, channelID, owner, repo, prNumber) } - taggedUsers[slackUserID] = true + + // Check if user is member of this channel (for DM delay decision) + isInChannel := c.slack.IsUserInChannel(ctx, channelID, slackUserID) + + taggedUsers[slackUserID] = UserTagInfo{ + UserID: slackUserID, + IsInAnyChannel: isInChannel, + } + slog.Debug("tracked user tag in channel", "workspace", workspaceID, "github_user", githubUser, "slack_user", slackUserID, "channel", channelDisplay, "channel_id", channelID, + "is_in_channel", isInChannel, "pr", fmt.Sprintf(prFormatString, owner, repo, prNumber)) } } diff --git a/pkg/bot/dm.go b/pkg/bot/dm.go index dafab34..74e84bc 100644 --- a/pkg/bot/dm.go +++ b/pkg/bot/dm.go @@ -33,7 +33,7 @@ type dmNotificationRequest struct { // Updates to existing DMs happen immediately (no delay). // New DMs respect reminder_dm_delay (queue for later if user in channel). // -//nolint:maintidx,revive // This function coordinates all DM scenarios (queued/sent, update/create, delay logic) and benefits from being in one place +//nolint:maintidx,revive,gocognit // This function coordinates all DM scenarios (queued/sent, update/create, delay logic) and benefits from being in one place func (c *Coordinator) sendPRNotification(ctx context.Context, req dmNotificationRequest) error { // Lock per user+PR to prevent concurrent goroutines from sending duplicate DMs lockKey := req.UserID + ":" + req.PRURL @@ -53,12 +53,24 @@ func (c *Coordinator) sendPRNotification(ctx context.Context, req dmNotification "error", err) } - // Find any pending DM for this user+PR - var pendingDM *state.PendingDM + // Find ALL pending DMs for this user+PR (there may be multiple if user tagged in multiple channels) + var matchingPendingDMs []*state.PendingDM for i := range pendingDMs { if pendingDMs[i].UserID == req.UserID && pendingDMs[i].PRURL == req.PRURL { - pendingDM = &pendingDMs[i] - break + matchingPendingDMs = append(matchingPendingDMs, &pendingDMs[i]) + } + } + + // Use the first pending DM for decision-making (they should all have same state) + var pendingDM *state.PendingDM + if len(matchingPendingDMs) > 0 { + pendingDM = matchingPendingDMs[0] + // Log if we found duplicates (indicates user was tagged in multiple channels) + if len(matchingPendingDMs) > 1 { + slog.Info("found multiple queued DMs for same user+PR (user tagged in multiple channels)", + "user", req.UserID, + "pr", req.PRURL, + "count", len(matchingPendingDMs)) } } @@ -73,19 +85,22 @@ func (c *Coordinator) sendPRNotification(ctx context.Context, req dmNotification userStillBlocked = len(req.CheckResult.Analysis.NextAction) > 0 } - // If user no longer blocked, cancel the queued DM + // If user no longer blocked, cancel ALL queued DMs if !userStillBlocked { - slog.Info("cancelling queued DM - user no longer blocked", + slog.Info("cancelling queued DMs - user no longer blocked", "user", req.UserID, "pr", req.PRURL, "old_state", pendingDM.PRState, - "new_state", prState) - if err := c.stateStore.RemovePendingDM(ctx, pendingDM.ID); err != nil { - slog.Warn("failed to remove pending DM", - "user", req.UserID, - "pr", req.PRURL, - "dm_id", pendingDM.ID, - "error", err) + "new_state", prState, + "count", len(matchingPendingDMs)) + for _, dm := range matchingPendingDMs { + if err := c.stateStore.RemovePendingDM(ctx, dm.ID); err != nil { + slog.Warn("failed to remove pending DM", + "user", req.UserID, + "pr", req.PRURL, + "dm_id", dm.ID, + "error", err) + } } return nil } @@ -97,23 +112,27 @@ func (c *Coordinator) sendPRNotification(ctx context.Context, req dmNotification "pr", req.PRURL, "old_state", pendingDM.PRState, "new_state", prState, - "scheduled_send", pendingDM.SendAfter) - // Remove old queued DM and queue new one with updated state - if err := c.stateStore.RemovePendingDM(ctx, pendingDM.ID); err != nil { - slog.Warn("failed to remove pending DM for update", - "user", req.UserID, - "pr", req.PRURL, - "dm_id", pendingDM.ID, - "error", err) - // Continue anyway - attempt to queue new DM + "scheduled_send", pendingDM.SendAfter, + "removing_duplicates", len(matchingPendingDMs)) + // Remove ALL old queued DMs and queue ONE new one with updated state + for _, dm := range matchingPendingDMs { + if err := c.stateStore.RemovePendingDM(ctx, dm.ID); err != nil { + slog.Warn("failed to remove pending DM for update", + "user", req.UserID, + "pr", req.PRURL, + "dm_id", dm.ID, + "error", err) + } } + // Queue single new DM with updated state return c.queueDMForUser(ctx, req, prState, pendingDM.SendAfter) } // State unchanged, queued DM is still valid slog.Debug("DM already queued with same state", "user", req.UserID, "pr", req.PRURL, - "state", prState) + "state", prState, + "queued_count", len(matchingPendingDMs)) return nil } @@ -223,10 +242,28 @@ func (c *Coordinator) sendPRNotification(ctx context.Context, req dmNotification // All updates failed - fall through to send new DM } + // If we know a DM exists but couldn't find/update it, don't send duplicate + // This prevents duplicate DMs when history search fails or Slack API is slow + if exists { + slog.Warn("DM exists but couldn't be located or updated - skipping to prevent duplicate", + "user", req.UserID, + "pr", req.PRURL, + "last_state", lastNotif.LastState, + "new_state", prState, + "has_channel_id", lastNotif.ChannelID != "", + "has_message_ts", lastNotif.MessageTS != "", + "impact", "user may see stale state temporarily") + return nil + } + // Path 2: Send new DM (check delay logic) shouldQueue, sendAfter := c.shouldDelayNewDM(ctx, req.UserID, req.ChannelID, req.ChannelName, req.Owner, req.Repo) if shouldQueue { + // Cancel any existing pending DMs for this user+PR before queueing new one + // This ensures we never have duplicate queued DMs (e.g., from multiple channels) + c.cancelPendingDMs(ctx, req.UserID, req.PRURL) + // Queue for later delivery slog.Info("queueing DM for delayed delivery", "user", req.UserID, @@ -307,39 +344,37 @@ func (c *Coordinator) findDMInHistory(ctx context.Context, userID, prURL string) // shouldDelayNewDM determines if a new DM should be queued for later. // Returns (shouldQueue bool, sendAfter time.Time). -// Simplified version of evaluateDMDelay - removes user presence checking and anti-spam. +// Channel membership is determined by caller - if channelID is non-empty, user was in at least one channel. func (c *Coordinator) shouldDelayNewDM( ctx context.Context, userID, channelID, channelName string, owner, _ string, ) (bool, time.Time) { - // Get configured delay for this channel (in minutes) - delayMinutes := c.configManager.ReminderDMDelay(owner, channelName) - delay := time.Duration(delayMinutes) * time.Minute - - // If delay is 0, feature is disabled - send immediately - if delay == 0 { - return false, time.Time{} - } - - // If user wasn't tagged in a channel, send immediately + // If channelID is empty, user wasn't in any channel we notified - send immediately if channelID == "" { + slog.Debug("user not in any channel, sending DM immediately", + "user", userID) return false, time.Time{} } - // Check if user is in the channel where they were tagged - isInChannel := c.slack.IsUserInChannel(ctx, channelID, userID) + // User was in at least one channel - apply configured delay + delayMinutes := c.configManager.ReminderDMDelay(owner, channelName) + delay := time.Duration(delayMinutes) * time.Minute - // If user is NOT in channel, they can't see the tag - send immediately - if !isInChannel { - slog.Debug("user not in channel, sending DM immediately", + // If delay is 0, feature is disabled - send immediately even if user in channel + if delay == 0 { + slog.Debug("DM delay feature disabled, sending immediately", "user", userID, - "channel", channelID) + "owner", owner) return false, time.Time{} } // User is in channel - queue for delayed delivery sendAfter := time.Now().Add(delay) + slog.Debug("user in channel, delaying DM", + "user", userID, + "delay_minutes", delayMinutes, + "send_after", sendAfter) return true, sendAfter } @@ -446,10 +481,10 @@ func getSentAt(info state.DMInfo, exists bool) time.Time { // sendDMNotificationsToTaggedUsers sends DM notifications to Slack users who were tagged in channels. // This runs in a separate goroutine to avoid blocking event processing. -// Uses the simplified sendPRNotification() for all DM operations. +// Decides per-user whether to send immediately or delay based on channel membership. func (c *Coordinator) sendDMNotificationsToTaggedUsers( ctx context.Context, workspaceID, owner, repo string, - prNumber int, slackUsers map[string]bool, + prNumber int, taggedUsers map[string]UserTagInfo, event struct { Action string `json:"action"` PullRequest struct { @@ -468,25 +503,30 @@ func (c *Coordinator) sendDMNotificationsToTaggedUsers( slog.Info("starting DM notification batch for tagged Slack users", "pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber), "workspace", workspaceID, - "user_count", len(slackUsers)) + "user_count", len(taggedUsers)) sentCount := 0 failedCount := 0 + delayedCount := 0 - for slackUserID := range slackUsers { - // Get tag info to determine which channel the user was tagged in + for _, userInfo := range taggedUsers { + // Determine delay based on channel membership from THIS event + // If user is NOT in any channel we notified → send immediately + // If user IS in at least one channel → apply configured delay var channelID string - if c.notifier != nil && c.notifier.Tracker != nil { - tagInfo := c.notifier.Tracker.LastUserPRChannelTag(workspaceID, slackUserID, owner, repo, prNumber) - channelID = tagInfo.ChannelID + if userInfo.IsInAnyChannel { + // User is in a channel - apply delay logic + // We don't need the specific channel, just any channel ID to trigger delay + // Use a placeholder to signal "user was in a channel" + channelID = "delay" + delayedCount++ } + // If IsInAnyChannel is false, channelID stays empty → immediate send - // ChannelName is not available (no reverse lookup), so pass empty string - // The delay logic will use the default config for the org err := c.sendPRNotification(ctx, dmNotificationRequest{ - UserID: slackUserID, - ChannelID: channelID, - ChannelName: "", // not available + UserID: userInfo.UserID, + ChannelID: channelID, // "delay" or empty based on channel membership + ChannelName: "", // not used Owner: owner, Repo: repo, PRNumber: prNumber, @@ -498,7 +538,7 @@ func (c *Coordinator) sendDMNotificationsToTaggedUsers( if err != nil { slog.Warn("failed to notify user", "pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber), - "slack_user", slackUserID, + "slack_user", userInfo.UserID, "error", err) failedCount++ } else { @@ -510,8 +550,9 @@ func (c *Coordinator) sendDMNotificationsToTaggedUsers( "pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber), "workspace", workspaceID, "sent_count", sentCount, + "delayed_count", delayedCount, "failed_count", failedCount, - "total_users", len(slackUsers)) + "total_users", len(taggedUsers)) } // sendDMNotificationsToBlockedUsers sends immediate DM notifications to blocked GitHub users. diff --git a/pkg/bot/dm_simplified_test.go b/pkg/bot/dm_simplified_test.go index f1d84c6..d5196c3 100644 --- a/pkg/bot/dm_simplified_test.go +++ b/pkg/bot/dm_simplified_test.go @@ -275,42 +275,33 @@ func TestShouldDelayNewDM_NoChannel(t *testing.T) { } // TestShouldDelayNewDM_UserNotInChannel verifies immediate send when user not in channel. +// Channel membership is now determined by caller - empty channelID means user not in any channel. func TestShouldDelayNewDM_UserNotInChannel(t *testing.T) { - slack := &mockSlackClient{ - isUserInChannelFunc: func(ctx context.Context, channelID, userID string) bool { - return false - }, - } config := &mockConfigManager{ dmDelay: 30, } c := &Coordinator{ - slack: slack, configManager: config, userMapper: &mockUserMapper{}, } - shouldQueue, _ := c.shouldDelayNewDM(context.Background(), "U123", "C123", "general", "owner", "repo") + // Empty channelID signals user was not in any channel + shouldQueue, _ := c.shouldDelayNewDM(context.Background(), "U123", "", "general", "owner", "repo") if shouldQueue { - t.Error("Expected shouldQueue=false when user not in channel") + t.Error("Expected shouldQueue=false when user not in channel (empty channelID)") } } // TestShouldDelayNewDM_UserInChannel verifies delayed send when user is in channel. +// Channel membership is now determined by caller - non-empty channelID means user was in a channel. func TestShouldDelayNewDM_UserInChannel(t *testing.T) { - slack := &mockSlackClient{ - isUserInChannelFunc: func(ctx context.Context, channelID, userID string) bool { - return true - }, - } config := &mockConfigManager{ dmDelay: 30, } c := &Coordinator{ - slack: slack, configManager: config, userMapper: &mockUserMapper{}, } @@ -569,8 +560,9 @@ func TestUpdateDMMessagesForPR_NoUsers_Simplified(t *testing.T) { } } -// TestSendPRNotification_UpdateFailsFallbackToNew tests fallback to new DM when update fails. -func TestSendPRNotification_UpdateFailsFallbackToNew(t *testing.T) { +// TestSendPRNotification_UpdateFails tests that we skip sending duplicate when update fails. +// This prevents duplicate DMs when Slack API is slow or history search fails. +func TestSendPRNotification_UpdateFails(t *testing.T) { store := &mockStateStore{ dmMessages: map[string]state.DMInfo{ "U123:https://github.com/owner/repo/pull/1": { @@ -616,13 +608,14 @@ func TestSendPRNotification_UpdateFailsFallbackToNew(t *testing.T) { t.Errorf("Expected no error, got: %v", err) } - // Verify both UpdateMessage and SendDirectMessage were called + // Verify UpdateMessage was attempted if len(slack.updatedMessages) == 0 { t.Error("Expected UpdateMessage to be attempted") } - if len(slack.sentDirectMessages) != 1 { - t.Fatal("Expected SendDirectMessage to be called as fallback") + // CRITICAL: No new DM should be sent to prevent duplicates + if len(slack.sentDirectMessages) != 0 { + t.Errorf("Expected no new DM when update fails (to prevent duplicates), got %d DMs", len(slack.sentDirectMessages)) } } @@ -1248,3 +1241,118 @@ func TestSendPRNotification_QueuedDMBehaviorAcrossRestarts(t *testing.T) { t.Errorf("Expected no immediate DM (should update queued DM), got %d DMs", len(slack.sentDirectMessages)) } } + +// TestSendDMNotificationsToTaggedUsers_ChannelMembershipDecision tests that users +// not in any channel get immediate DMs while users in channels get delayed DMs. +func TestSendDMNotificationsToTaggedUsers_ChannelMembershipDecision(t *testing.T) { + store := &mockStateStore{} + slack := &mockSlackClient{} + config := &mockConfigManager{ + dmDelay: 60, // 60 minute delay configured + domain: "example.com", + } + + c := &Coordinator{ + stateStore: store, + slack: slack, + configManager: config, + userMapper: &mockUserMapper{}, + } + + checkResult := newCheckResponse("awaiting_review") + + taggedUsers := map[string]UserTagInfo{ + "U123": { + UserID: "U123", + IsInAnyChannel: false, // User NOT in any channel -> immediate + }, + "U456": { + UserID: "U456", + IsInAnyChannel: true, // User IS in a channel -> delayed + }, + } + + event := struct { + Action string `json:"action"` + PullRequest struct { + HTMLURL string `json:"html_url"` + Title string `json:"title"` + CreatedAt time.Time `json:"created_at"` + User struct { + Login string `json:"login"` + } `json:"user"` + Number int `json:"number"` + } `json:"pull_request"` + Number int `json:"number"` + }{ + Action: "opened", + PullRequest: struct { + HTMLURL string `json:"html_url"` + Title string `json:"title"` + CreatedAt time.Time `json:"created_at"` + User struct { + Login string `json:"login"` + } `json:"user"` + Number int `json:"number"` + }{ + HTMLURL: "https://github.com/owner/repo/pull/1", + Title: "Test PR", + CreatedAt: time.Now(), + User: struct { + Login string `json:"login"` + }{ + Login: "author", + }, + Number: 1, + }, + Number: 1, + } + + c.sendDMNotificationsToTaggedUsers( + context.Background(), + "test-workspace", + "owner", + "repo", + 1, + taggedUsers, + event, + checkResult, + ) + + // U123 (not in channel) should get immediate DM + foundImmediateDM := false + for _, msg := range slack.sentDirectMessages { + if msg.UserID == "U123" { + foundImmediateDM = true + break + } + } + if !foundImmediateDM { + t.Error("Expected immediate DM for user U123 (not in any channel)") + } + + // U456 (in channel) should get queued DM + pendingDMs, err := store.PendingDMs(context.Background(), time.Now().Add(24*time.Hour)) + if err != nil { + t.Fatal("Failed to get pending DMs:", err) + } + + foundQueuedDM := false + for _, dm := range pendingDMs { + if dm.UserID == "U456" { + foundQueuedDM = true + break + } + } + if !foundQueuedDM { + t.Error("Expected queued DM for user U456 (in channel)") + } + + // Verify exactly 1 immediate and 1 queued + if len(slack.sentDirectMessages) != 1 { + t.Errorf("Expected exactly 1 immediate DM, got %d", len(slack.sentDirectMessages)) + } + if len(pendingDMs) != 1 { + t.Errorf("Expected exactly 1 queued DM, got %d", len(pendingDMs)) + } +}