From 4574295c7b07408e212f1a2a15f8aac4c50ff755 Mon Sep 17 00:00:00 2001 From: Thomas Stromberg Date: Tue, 21 Oct 2025 08:05:50 +0200 Subject: [PATCH] update status of DM'd PRs --- cmd/server/main.go | 2 + internal/bot/bot.go | 118 ++++++++++++++++++++++++++---------- internal/state/datastore.go | 62 ++++++++++++++++--- internal/state/json.go | 21 +++++++ internal/state/store.go | 1 + 5 files changed, 166 insertions(+), 38 deletions(-) diff --git a/cmd/server/main.go b/cmd/server/main.go index 4ff284d..36b3372 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -167,6 +167,7 @@ func run(ctx context.Context, cancel context.CancelFunc, cfg *config.ServerConfi RecordDM(userID, prURL string, sentAt time.Time) error DMMessage(userID, prURL string) (state.DMInfo, bool) SaveDMMessage(userID, prURL string, info state.DMInfo) error + ListDMUsers(prURL string) []string LastDigest(userID, date string) (time.Time, bool) RecordDigest(userID, date string, sentAt time.Time) error WasProcessed(eventKey string) bool @@ -684,6 +685,7 @@ func runBotCoordinators( RecordDM(userID, prURL string, sentAt time.Time) error DMMessage(userID, prURL string) (state.DMInfo, bool) SaveDMMessage(userID, prURL string, info state.DMInfo) error + ListDMUsers(prURL string) []string LastDigest(userID, date string) (time.Time, bool) RecordDigest(userID, date string, sentAt time.Time) error WasProcessed(eventKey string) bool diff --git a/internal/bot/bot.go b/internal/bot/bot.go index 62656e6..e349614 100644 --- a/internal/bot/bot.go +++ b/internal/bot/bot.go @@ -102,6 +102,7 @@ type StateStore interface { SaveThread(owner, repo string, number int, channelID string, info ThreadInfo) error LastDM(userID, prURL string) (time.Time, bool) RecordDM(userID, prURL string, sentAt time.Time) error + ListDMUsers(prURL string) []string WasProcessed(eventKey string) bool MarkProcessed(eventKey string, ttl time.Duration) error LastNotification(prURL string) time.Time @@ -793,18 +794,82 @@ func (*Coordinator) extractBlockedUsersFromTurnclient(checkResult *turn.CheckRes return blockedUsers } -// updateDMMessagesForPR updates DM messages for all blocked users on a PR. -func (c *Coordinator) updateDMMessagesForPR(ctx context.Context, checkResult *turn.CheckResponse, owner, repo string, prNumber int, title, author, prState, prURL string) { - if checkResult == nil || len(checkResult.Analysis.NextAction) == 0 { - slog.Debug("no blocked users, skipping DM updates", - "pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber)) - return +// prUpdateInfo groups PR information for DM updates. +type prUpdateInfo struct { + checkRes *turn.CheckResponse + owner string + repo string + title string + author string + state string + url string + number int +} + +// updateDMMessagesForPR updates DM messages for all relevant users on a PR. +// For merged/closed PRs, updates all users who previously received DMs. +// For other states, updates users in NextAction (currently blocked). +func (c *Coordinator) updateDMMessagesForPR(ctx context.Context, pr prUpdateInfo) { + owner, repo, prNumber := pr.owner, pr.repo, pr.number + prState, prURL := pr.state, pr.url + checkResult := pr.checkRes + // Determine which users to update based on PR state + var slackUserIDs []string + + // For terminal states (merged/closed), update all users who received DMs + if prState == "merged" || prState == "closed" { + slackUserIDs = c.stateStore.ListDMUsers(prURL) + if len(slackUserIDs) == 0 { + slog.Debug("no DM recipients found for merged/closed PR", + "pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber), + "pr_state", prState) + return + } + slog.Info("updating DMs for merged/closed PR", + "pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber), + "pr_state", prState, + "dm_recipients", len(slackUserIDs)) + } else { + // For other states, update only users who are currently blocked + if checkResult == nil || len(checkResult.Analysis.NextAction) == 0 { + slog.Debug("no blocked users, skipping DM updates", + "pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber)) + return + } + + // Map GitHub users to Slack users + domain := c.configManager.Domain(owner) + for githubUser := range checkResult.Analysis.NextAction { + if githubUser == "_system" { + continue + } + + slackUserID, err := c.userMapper.SlackHandle(ctx, githubUser, owner, domain) + if err != nil || slackUserID == "" { + slog.Debug("no Slack mapping for GitHub user, skipping", + "github_user", githubUser, + "pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber), + "error", err) + continue + } + slackUserIDs = append(slackUserIDs, slackUserID) + } + + if len(slackUserIDs) == 0 { + slog.Debug("no Slack users found for blocked GitHub users", + "pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber)) + return + } } // Format the DM message (same format as initial send) prefix := notify.PrefixForState(prState) var action string switch prState { + case "merged": + action = "merged" + case "closed": + action = "closed" case "tests_broken": action = "fix tests" case "awaiting_review": @@ -820,41 +885,22 @@ func (c *Coordinator) updateDMMessagesForPR(ctx context.Context, checkResult *tu message := fmt.Sprintf( "%s %s <%s|%s#%d> · %s → %s", prefix, - title, + pr.title, prURL, repo, prNumber, - author, + pr.author, action, ) - // Update DM for each blocked user + // Update DM for each user updatedCount := 0 skippedCount := 0 - domain := c.configManager.Domain(owner) - - for githubUser := range checkResult.Analysis.NextAction { - // Skip _system user - if githubUser == "_system" { - continue - } - - // Map GitHub user to Slack user - slackUserID, err := c.userMapper.SlackHandle(ctx, githubUser, owner, domain) - if err != nil || slackUserID == "" { - slog.Debug("no Slack mapping for GitHub user, skipping DM update", - "github_user", githubUser, - "pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber), - "error", err) - skippedCount++ - continue - } - // Update the DM message + for _, slackUserID := range slackUserIDs { if err := c.slack.UpdateDMMessage(ctx, slackUserID, prURL, message); err != nil { slog.Debug("failed to update DM message", "user", slackUserID, - "github_user", githubUser, "pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber), "error", err, "reason", "DM may not exist or too old") @@ -869,7 +915,8 @@ func (c *Coordinator) updateDMMessagesForPR(ctx context.Context, checkResult *tu "pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber), "pr_state", prState, "updated", updatedCount, - "skipped", skippedCount) + "skipped", skippedCount, + "total_recipients", len(slackUserIDs)) } } @@ -1212,7 +1259,16 @@ func (c *Coordinator) processPRForChannel( "pr_state", prState) // Also update DM messages for blocked users - c.updateDMMessagesForPR(ctx, checkResult, owner, repo, prNumber, event.PullRequest.Title, event.PullRequest.User.Login, prState, event.PullRequest.HTMLURL) + c.updateDMMessagesForPR(ctx, prUpdateInfo{ + owner: owner, + repo: repo, + number: prNumber, + title: event.PullRequest.Title, + author: event.PullRequest.User.Login, + state: prState, + url: event.PullRequest.HTMLURL, + checkRes: checkResult, + }) } } else { slog.Debug("message already matches expected content, no update needed", diff --git a/internal/state/datastore.go b/internal/state/datastore.go index 49ea17d..36abf8e 100644 --- a/internal/state/datastore.go +++ b/internal/state/datastore.go @@ -4,6 +4,7 @@ import ( "context" "errors" "log/slog" + "strings" "time" "cloud.google.com/go/datastore" @@ -317,13 +318,7 @@ func (s *DatastoreStore) DMMessage(userID, prURL string) (DMInfo, bool) { } // Found in Datastore - update memory cache and return - result := DMInfo{ - ChannelID: entity.ChannelID, - MessageTS: entity.MessageTS, - MessageText: entity.MessageText, - UpdatedAt: entity.UpdatedAt, - SentAt: entity.SentAt, - } + result := DMInfo(entity) // Update memory cache async go func() { @@ -372,6 +367,59 @@ func (s *DatastoreStore) SaveDMMessage(userID, prURL string, info DMInfo) error return nil } +// ListDMUsers returns all user IDs who have received DMs for a given PR. +// Queries both memory cache and Datastore to ensure data persists across restarts. +func (s *DatastoreStore) ListDMUsers(prURL string) []string { + // Check memory cache first (fast path) + users := s.memory.ListDMUsers(prURL) + if len(users) > 0 || s.disabled || s.ds == nil { + return users + } + + // PERFORMANCE NOTE: Datastore queries are expensive and lack substring/regex filtering. + // We must fetch all DM message keys and filter client-side. This is acceptable because: + // 1. Uses KeysOnly() to minimize data transfer (no entity bodies) + // 2. Bounded by reasonable limit (1000 - PRs rarely have >100 participants) + // 3. Only runs on cache miss (typically at startup) + // 4. Results populate memory cache for future fast lookups + // + // Alternative considered: Ancestor queries require schema change (breaking existing data) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + query := datastore.NewQuery(kindDMMessage).KeysOnly().Limit(1000) + keys, err := s.ds.GetAll(ctx, query, nil) + if err != nil { + slog.Warn("failed to query Datastore for DM users", + "pr_url", prURL, + "error", err, + "fallback", "returning empty list") + return nil + } + + // Filter keys matching this PR URL + // Key format: "dm:{userID}:{prURL}" + suffix := ":" + prURL + filtered := make([]string, 0, min(len(keys), 50)) // Most PRs have <50 DM recipients + + for _, key := range keys { + if strings.HasSuffix(key.Name, suffix) { + // Extract userID from key + parts := strings.SplitN(key.Name, ":", 3) + if len(parts) == 3 { + filtered = append(filtered, parts[1]) + } + } + } + + slog.Debug("queried Datastore for DM users", + "pr_url", prURL, + "users_found", len(filtered), + "keys_scanned", len(keys)) + + return filtered +} + // LastDigest retrieves last digest time. func (s *DatastoreStore) LastDigest(userID, date string) (time.Time, bool) { // Check memory first diff --git a/internal/state/json.go b/internal/state/json.go index 7479d92..be582c9 100644 --- a/internal/state/json.go +++ b/internal/state/json.go @@ -164,6 +164,27 @@ func (s *JSONStore) SaveDMMessage(userID, prURL string, info DMInfo) error { return s.save() } +// ListDMUsers returns all user IDs who have received DMs for a given PR. +func (s *JSONStore) ListDMUsers(prURL string) []string { + s.mu.RLock() + defer s.mu.RUnlock() + + var users []string + suffix := ":" + prURL + + for key := range s.dmMessages { + if strings.HasSuffix(key, suffix) { + // Extract userID from key format "dm:{userID}:{prURL}" + parts := strings.SplitN(key, ":", 3) + if len(parts) == 3 { + users = append(users, parts[1]) + } + } + } + + return users +} + // LastDigest retrieves the last digest timestamp for a user and date. func (s *JSONStore) LastDigest(userID, date string) (time.Time, bool) { s.mu.RLock() diff --git a/internal/state/store.go b/internal/state/store.go index 6a76834..6c85275 100644 --- a/internal/state/store.go +++ b/internal/state/store.go @@ -38,6 +38,7 @@ type Store interface { // DM message tracking - store DM message info for updating DMMessage(userID, prURL string) (DMInfo, bool) SaveDMMessage(userID, prURL string, info DMInfo) error + ListDMUsers(prURL string) []string // Daily digest tracking - one per user per day LastDigest(userID, date string) (time.Time, bool)