Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 55 additions & 10 deletions pkg/bot/bot.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,25 @@ func (c *Coordinator) handlePullRequestEventWithData(ctx context.Context, owner,
slog.Info("no users blocking PR - no notifications needed",
logFieldPR, fmt.Sprintf(prFormatString, owner, repo, prNumber),
"pr_state", prState)

// For merged/closed PRs, still update existing DMs even if no one is blocking
// This ensures users see the final state (🚀 merged or ❌ closed)
if prState == "merged" || prState == "closed" {
slog.Info("updating DMs for terminal PR state",
logFieldPR, fmt.Sprintf(prFormatString, owner, repo, prNumber),
"pr_state", prState,
"reason", "terminal_state_update")
c.updateDMMessagesForPR(ctx, prUpdateInfo{
Owner: owner,
Repo: repo,
PRNumber: prNumber,
PRTitle: event.PullRequest.Title,
PRAuthor: event.PullRequest.User.Login,
PRState: prState,
PRURL: event.PullRequest.HTMLURL,
CheckResult: checkResult,
})
}
}
}

Expand Down Expand Up @@ -876,12 +895,18 @@ func (*Coordinator) getStateQueryParam(prState string) string {
}
}

// UserTagInfo contains information about a user who was tagged in PR notifications.
type UserTagInfo struct {
UserID string
IsInAnyChannel bool // True if user is member of at least one channel where they were tagged
}

// processChannelsInParallel processes multiple channels concurrently for better performance.
// processChannelsInParallel processes PR notifications for multiple channels concurrently.
// Returns a map of Slack user IDs that were successfully tagged in at least one channel.
// Returns a map of Slack user IDs to tag info for users successfully tagged in at least one channel.
func (c *Coordinator) processChannelsInParallel(
ctx context.Context, prCtx prContext, channels []string, workspaceID string,
) map[string]bool {
) map[string]UserTagInfo {
event, ok := prCtx.Event.(struct {
Action string `json:"action"`
PullRequest struct {
Expand Down Expand Up @@ -951,7 +976,7 @@ func (c *Coordinator) processChannelsInParallel(

// Track which Slack users were successfully tagged across all channels
var taggedUsersMu sync.Mutex
taggedUsers := make(map[string]bool)
taggedUsers := make(map[string]UserTagInfo)

// Process channels in parallel for better performance
// Use WaitGroup instead of errgroup since we don't want one failure to cancel others
Expand All @@ -967,8 +992,18 @@ func (c *Coordinator) processChannelsInParallel(

// Merge tagged users from this channel into the overall set
taggedUsersMu.Lock()
for userID := range channelTaggedUsers {
taggedUsers[userID] = true
for userID, info := range channelTaggedUsers {
existing, exists := taggedUsers[userID]
if !exists {
// First time seeing this user
taggedUsers[userID] = info
} else {
// User already tagged in another channel - update IsInAnyChannel if this channel has them
if info.IsInAnyChannel {
existing.IsInAnyChannel = true
taggedUsers[userID] = existing
}
}
}
taggedUsersMu.Unlock()
}(channelName)
Expand All @@ -985,10 +1020,10 @@ func (c *Coordinator) processChannelsInParallel(
}

// processPRForChannel handles PR processing for a single channel (extracted from the main loop).
// Returns a map of Slack user IDs that were successfully tagged in this channel.
// Returns a map of Slack user IDs to UserTagInfo for users successfully tagged in this channel.
func (c *Coordinator) processPRForChannel(
ctx context.Context, prCtx prContext, channelName, workspaceID string,
) map[string]bool {
) map[string]UserTagInfo {
owner, repo, prNumber, prState := prCtx.Owner, prCtx.Repo, prCtx.Number, prCtx.State
checkResult := prCtx.CheckRes
event, ok := prCtx.Event.(struct {
Expand Down Expand Up @@ -1145,11 +1180,12 @@ func (c *Coordinator) resolveAndValidateChannel(
}

// trackUserTagsForDMDelay tracks user tags in channel for DM delay logic.
// Returns map of Slack user IDs to UserTagInfo with channel membership status.
func (c *Coordinator) trackUserTagsForDMDelay(
ctx context.Context, workspaceID, channelID, channelDisplay, owner, repo string, prNumber int,
checkResult *turn.CheckResponse,
) map[string]bool {
taggedUsers := make(map[string]bool)
) map[string]UserTagInfo {
taggedUsers := make(map[string]UserTagInfo)
blockedUsers := c.extractBlockedUsersFromTurnclient(checkResult)
if len(blockedUsers) == 0 {
return taggedUsers
Expand All @@ -1166,13 +1202,22 @@ func (c *Coordinator) trackUserTagsForDMDelay(
if c.notifier != nil && c.notifier.Tracker != nil {
c.notifier.Tracker.UpdateUserPRChannelTag(workspaceID, slackUserID, channelID, owner, repo, prNumber)
}
taggedUsers[slackUserID] = true

// Check if user is member of this channel (for DM delay decision)
isInChannel := c.slack.IsUserInChannel(ctx, channelID, slackUserID)

taggedUsers[slackUserID] = UserTagInfo{
UserID: slackUserID,
IsInAnyChannel: isInChannel,
}

slog.Debug("tracked user tag in channel",
"workspace", workspaceID,
"github_user", githubUser,
"slack_user", slackUserID,
"channel", channelDisplay,
"channel_id", channelID,
"is_in_channel", isInChannel,
"pr", fmt.Sprintf(prFormatString, owner, repo, prNumber))
}
}
Expand Down
153 changes: 97 additions & 56 deletions pkg/bot/dm.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type dmNotificationRequest struct {
// Updates to existing DMs happen immediately (no delay).
// New DMs respect reminder_dm_delay (queue for later if user in channel).
//
//nolint:maintidx,revive // This function coordinates all DM scenarios (queued/sent, update/create, delay logic) and benefits from being in one place
//nolint:maintidx,revive,gocognit // This function coordinates all DM scenarios (queued/sent, update/create, delay logic) and benefits from being in one place
func (c *Coordinator) sendPRNotification(ctx context.Context, req dmNotificationRequest) error {
// Lock per user+PR to prevent concurrent goroutines from sending duplicate DMs
lockKey := req.UserID + ":" + req.PRURL
Expand All @@ -53,12 +53,24 @@ func (c *Coordinator) sendPRNotification(ctx context.Context, req dmNotification
"error", err)
}

// Find any pending DM for this user+PR
var pendingDM *state.PendingDM
// Find ALL pending DMs for this user+PR (there may be multiple if user tagged in multiple channels)
var matchingPendingDMs []*state.PendingDM
for i := range pendingDMs {
if pendingDMs[i].UserID == req.UserID && pendingDMs[i].PRURL == req.PRURL {
pendingDM = &pendingDMs[i]
break
matchingPendingDMs = append(matchingPendingDMs, &pendingDMs[i])
}
}

// Use the first pending DM for decision-making (they should all have same state)
var pendingDM *state.PendingDM
if len(matchingPendingDMs) > 0 {
pendingDM = matchingPendingDMs[0]
// Log if we found duplicates (indicates user was tagged in multiple channels)
if len(matchingPendingDMs) > 1 {
slog.Info("found multiple queued DMs for same user+PR (user tagged in multiple channels)",
"user", req.UserID,
"pr", req.PRURL,
"count", len(matchingPendingDMs))
}
}

Expand All @@ -73,19 +85,22 @@ func (c *Coordinator) sendPRNotification(ctx context.Context, req dmNotification
userStillBlocked = len(req.CheckResult.Analysis.NextAction) > 0
}

// If user no longer blocked, cancel the queued DM
// If user no longer blocked, cancel ALL queued DMs
if !userStillBlocked {
slog.Info("cancelling queued DM - user no longer blocked",
slog.Info("cancelling queued DMs - user no longer blocked",
"user", req.UserID,
"pr", req.PRURL,
"old_state", pendingDM.PRState,
"new_state", prState)
if err := c.stateStore.RemovePendingDM(ctx, pendingDM.ID); err != nil {
slog.Warn("failed to remove pending DM",
"user", req.UserID,
"pr", req.PRURL,
"dm_id", pendingDM.ID,
"error", err)
"new_state", prState,
"count", len(matchingPendingDMs))
for _, dm := range matchingPendingDMs {
if err := c.stateStore.RemovePendingDM(ctx, dm.ID); err != nil {
slog.Warn("failed to remove pending DM",
"user", req.UserID,
"pr", req.PRURL,
"dm_id", dm.ID,
"error", err)
}
}
return nil
}
Expand All @@ -97,23 +112,27 @@ func (c *Coordinator) sendPRNotification(ctx context.Context, req dmNotification
"pr", req.PRURL,
"old_state", pendingDM.PRState,
"new_state", prState,
"scheduled_send", pendingDM.SendAfter)
// Remove old queued DM and queue new one with updated state
if err := c.stateStore.RemovePendingDM(ctx, pendingDM.ID); err != nil {
slog.Warn("failed to remove pending DM for update",
"user", req.UserID,
"pr", req.PRURL,
"dm_id", pendingDM.ID,
"error", err)
// Continue anyway - attempt to queue new DM
"scheduled_send", pendingDM.SendAfter,
"removing_duplicates", len(matchingPendingDMs))
// Remove ALL old queued DMs and queue ONE new one with updated state
for _, dm := range matchingPendingDMs {
if err := c.stateStore.RemovePendingDM(ctx, dm.ID); err != nil {
slog.Warn("failed to remove pending DM for update",
"user", req.UserID,
"pr", req.PRURL,
"dm_id", dm.ID,
"error", err)
}
}
// Queue single new DM with updated state
return c.queueDMForUser(ctx, req, prState, pendingDM.SendAfter)
}
// State unchanged, queued DM is still valid
slog.Debug("DM already queued with same state",
"user", req.UserID,
"pr", req.PRURL,
"state", prState)
"state", prState,
"queued_count", len(matchingPendingDMs))
return nil
}

Expand Down Expand Up @@ -223,10 +242,28 @@ func (c *Coordinator) sendPRNotification(ctx context.Context, req dmNotification
// All updates failed - fall through to send new DM
}

// If we know a DM exists but couldn't find/update it, don't send duplicate
// This prevents duplicate DMs when history search fails or Slack API is slow
if exists {
slog.Warn("DM exists but couldn't be located or updated - skipping to prevent duplicate",
"user", req.UserID,
"pr", req.PRURL,
"last_state", lastNotif.LastState,
"new_state", prState,
"has_channel_id", lastNotif.ChannelID != "",
"has_message_ts", lastNotif.MessageTS != "",
"impact", "user may see stale state temporarily")
return nil
}

// Path 2: Send new DM (check delay logic)
shouldQueue, sendAfter := c.shouldDelayNewDM(ctx, req.UserID, req.ChannelID, req.ChannelName, req.Owner, req.Repo)

if shouldQueue {
// Cancel any existing pending DMs for this user+PR before queueing new one
// This ensures we never have duplicate queued DMs (e.g., from multiple channels)
c.cancelPendingDMs(ctx, req.UserID, req.PRURL)

// Queue for later delivery
slog.Info("queueing DM for delayed delivery",
"user", req.UserID,
Expand Down Expand Up @@ -307,39 +344,37 @@ func (c *Coordinator) findDMInHistory(ctx context.Context, userID, prURL string)

// shouldDelayNewDM determines if a new DM should be queued for later.
// Returns (shouldQueue bool, sendAfter time.Time).
// Simplified version of evaluateDMDelay - removes user presence checking and anti-spam.
// Channel membership is determined by caller - if channelID is non-empty, user was in at least one channel.
func (c *Coordinator) shouldDelayNewDM(
ctx context.Context,
userID, channelID, channelName string,
owner, _ string,
) (bool, time.Time) {
// Get configured delay for this channel (in minutes)
delayMinutes := c.configManager.ReminderDMDelay(owner, channelName)
delay := time.Duration(delayMinutes) * time.Minute

// If delay is 0, feature is disabled - send immediately
if delay == 0 {
return false, time.Time{}
}

// If user wasn't tagged in a channel, send immediately
// If channelID is empty, user wasn't in any channel we notified - send immediately
if channelID == "" {
slog.Debug("user not in any channel, sending DM immediately",
"user", userID)
return false, time.Time{}
}

// Check if user is in the channel where they were tagged
isInChannel := c.slack.IsUserInChannel(ctx, channelID, userID)
// User was in at least one channel - apply configured delay
delayMinutes := c.configManager.ReminderDMDelay(owner, channelName)
delay := time.Duration(delayMinutes) * time.Minute

// If user is NOT in channel, they can't see the tag - send immediately
if !isInChannel {
slog.Debug("user not in channel, sending DM immediately",
// If delay is 0, feature is disabled - send immediately even if user in channel
if delay == 0 {
slog.Debug("DM delay feature disabled, sending immediately",
"user", userID,
"channel", channelID)
"owner", owner)
return false, time.Time{}
}

// User is in channel - queue for delayed delivery
sendAfter := time.Now().Add(delay)
slog.Debug("user in channel, delaying DM",
"user", userID,
"delay_minutes", delayMinutes,
"send_after", sendAfter)
return true, sendAfter
}

Expand Down Expand Up @@ -446,10 +481,10 @@ func getSentAt(info state.DMInfo, exists bool) time.Time {

// sendDMNotificationsToTaggedUsers sends DM notifications to Slack users who were tagged in channels.
// This runs in a separate goroutine to avoid blocking event processing.
// Uses the simplified sendPRNotification() for all DM operations.
// Decides per-user whether to send immediately or delay based on channel membership.
func (c *Coordinator) sendDMNotificationsToTaggedUsers(
ctx context.Context, workspaceID, owner, repo string,
prNumber int, slackUsers map[string]bool,
prNumber int, taggedUsers map[string]UserTagInfo,
event struct {
Action string `json:"action"`
PullRequest struct {
Expand All @@ -468,25 +503,30 @@ func (c *Coordinator) sendDMNotificationsToTaggedUsers(
slog.Info("starting DM notification batch for tagged Slack users",
"pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber),
"workspace", workspaceID,
"user_count", len(slackUsers))
"user_count", len(taggedUsers))

sentCount := 0
failedCount := 0
delayedCount := 0

for slackUserID := range slackUsers {
// Get tag info to determine which channel the user was tagged in
for _, userInfo := range taggedUsers {
// Determine delay based on channel membership from THIS event
// If user is NOT in any channel we notified → send immediately
// If user IS in at least one channel → apply configured delay
var channelID string
if c.notifier != nil && c.notifier.Tracker != nil {
tagInfo := c.notifier.Tracker.LastUserPRChannelTag(workspaceID, slackUserID, owner, repo, prNumber)
channelID = tagInfo.ChannelID
if userInfo.IsInAnyChannel {
// User is in a channel - apply delay logic
// We don't need the specific channel, just any channel ID to trigger delay
// Use a placeholder to signal "user was in a channel"
channelID = "delay"
delayedCount++
}
// If IsInAnyChannel is false, channelID stays empty → immediate send

// ChannelName is not available (no reverse lookup), so pass empty string
// The delay logic will use the default config for the org
err := c.sendPRNotification(ctx, dmNotificationRequest{
UserID: slackUserID,
ChannelID: channelID,
ChannelName: "", // not available
UserID: userInfo.UserID,
ChannelID: channelID, // "delay" or empty based on channel membership
ChannelName: "", // not used
Owner: owner,
Repo: repo,
PRNumber: prNumber,
Expand All @@ -498,7 +538,7 @@ func (c *Coordinator) sendDMNotificationsToTaggedUsers(
if err != nil {
slog.Warn("failed to notify user",
"pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber),
"slack_user", slackUserID,
"slack_user", userInfo.UserID,
"error", err)
failedCount++
} else {
Expand All @@ -510,8 +550,9 @@ func (c *Coordinator) sendDMNotificationsToTaggedUsers(
"pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber),
"workspace", workspaceID,
"sent_count", sentCount,
"delayed_count", delayedCount,
"failed_count", failedCount,
"total_users", len(slackUsers))
"total_users", len(taggedUsers))
}

// sendDMNotificationsToBlockedUsers sends immediate DM notifications to blocked GitHub users.
Expand Down
Loading
Loading