Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ func run(ctx context.Context, cancel context.CancelFunc, cfg *config.ServerConfi
RecordDM(userID, prURL string, sentAt time.Time) error
DMMessage(userID, prURL string) (state.DMInfo, bool)
SaveDMMessage(userID, prURL string, info state.DMInfo) error
ListDMUsers(prURL string) []string
LastDigest(userID, date string) (time.Time, bool)
RecordDigest(userID, date string, sentAt time.Time) error
WasProcessed(eventKey string) bool
Expand Down Expand Up @@ -684,6 +685,7 @@ func runBotCoordinators(
RecordDM(userID, prURL string, sentAt time.Time) error
DMMessage(userID, prURL string) (state.DMInfo, bool)
SaveDMMessage(userID, prURL string, info state.DMInfo) error
ListDMUsers(prURL string) []string
LastDigest(userID, date string) (time.Time, bool)
RecordDigest(userID, date string, sentAt time.Time) error
WasProcessed(eventKey string) bool
Expand Down
118 changes: 87 additions & 31 deletions internal/bot/bot.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ type StateStore interface {
SaveThread(owner, repo string, number int, channelID string, info ThreadInfo) error
LastDM(userID, prURL string) (time.Time, bool)
RecordDM(userID, prURL string, sentAt time.Time) error
ListDMUsers(prURL string) []string
WasProcessed(eventKey string) bool
MarkProcessed(eventKey string, ttl time.Duration) error
LastNotification(prURL string) time.Time
Expand Down Expand Up @@ -793,18 +794,82 @@ func (*Coordinator) extractBlockedUsersFromTurnclient(checkResult *turn.CheckRes
return blockedUsers
}

// updateDMMessagesForPR updates DM messages for all blocked users on a PR.
func (c *Coordinator) updateDMMessagesForPR(ctx context.Context, checkResult *turn.CheckResponse, owner, repo string, prNumber int, title, author, prState, prURL string) {
if checkResult == nil || len(checkResult.Analysis.NextAction) == 0 {
slog.Debug("no blocked users, skipping DM updates",
"pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber))
return
// prUpdateInfo groups PR information for DM updates.
type prUpdateInfo struct {
checkRes *turn.CheckResponse
owner string
repo string
title string
author string
state string
url string
number int
}

// updateDMMessagesForPR updates DM messages for all relevant users on a PR.
// For merged/closed PRs, updates all users who previously received DMs.
// For other states, updates users in NextAction (currently blocked).
func (c *Coordinator) updateDMMessagesForPR(ctx context.Context, pr prUpdateInfo) {
owner, repo, prNumber := pr.owner, pr.repo, pr.number
prState, prURL := pr.state, pr.url
checkResult := pr.checkRes
// Determine which users to update based on PR state
var slackUserIDs []string

// For terminal states (merged/closed), update all users who received DMs
if prState == "merged" || prState == "closed" {
slackUserIDs = c.stateStore.ListDMUsers(prURL)
if len(slackUserIDs) == 0 {
slog.Debug("no DM recipients found for merged/closed PR",
"pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber),
"pr_state", prState)
return
}
slog.Info("updating DMs for merged/closed PR",
"pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber),
"pr_state", prState,
"dm_recipients", len(slackUserIDs))
} else {
// For other states, update only users who are currently blocked
if checkResult == nil || len(checkResult.Analysis.NextAction) == 0 {
slog.Debug("no blocked users, skipping DM updates",
"pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber))
return
}

// Map GitHub users to Slack users
domain := c.configManager.Domain(owner)
for githubUser := range checkResult.Analysis.NextAction {
if githubUser == "_system" {
continue
}

slackUserID, err := c.userMapper.SlackHandle(ctx, githubUser, owner, domain)
if err != nil || slackUserID == "" {
slog.Debug("no Slack mapping for GitHub user, skipping",
"github_user", githubUser,
"pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber),
"error", err)
continue
}
slackUserIDs = append(slackUserIDs, slackUserID)
}

if len(slackUserIDs) == 0 {
slog.Debug("no Slack users found for blocked GitHub users",
"pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber))
return
}
}

// Format the DM message (same format as initial send)
prefix := notify.PrefixForState(prState)
var action string
switch prState {
case "merged":
action = "merged"
case "closed":
action = "closed"
case "tests_broken":
action = "fix tests"
case "awaiting_review":
Expand All @@ -820,41 +885,22 @@ func (c *Coordinator) updateDMMessagesForPR(ctx context.Context, checkResult *tu
message := fmt.Sprintf(
"%s %s <%s|%s#%d> · %s → %s",
prefix,
title,
pr.title,
prURL,
repo,
prNumber,
author,
pr.author,
action,
)

// Update DM for each blocked user
// Update DM for each user
updatedCount := 0
skippedCount := 0
domain := c.configManager.Domain(owner)

for githubUser := range checkResult.Analysis.NextAction {
// Skip _system user
if githubUser == "_system" {
continue
}

// Map GitHub user to Slack user
slackUserID, err := c.userMapper.SlackHandle(ctx, githubUser, owner, domain)
if err != nil || slackUserID == "" {
slog.Debug("no Slack mapping for GitHub user, skipping DM update",
"github_user", githubUser,
"pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber),
"error", err)
skippedCount++
continue
}

// Update the DM message
for _, slackUserID := range slackUserIDs {
if err := c.slack.UpdateDMMessage(ctx, slackUserID, prURL, message); err != nil {
slog.Debug("failed to update DM message",
"user", slackUserID,
"github_user", githubUser,
"pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber),
"error", err,
"reason", "DM may not exist or too old")
Expand All @@ -869,7 +915,8 @@ func (c *Coordinator) updateDMMessagesForPR(ctx context.Context, checkResult *tu
"pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber),
"pr_state", prState,
"updated", updatedCount,
"skipped", skippedCount)
"skipped", skippedCount,
"total_recipients", len(slackUserIDs))
}
}

Expand Down Expand Up @@ -1212,7 +1259,16 @@ func (c *Coordinator) processPRForChannel(
"pr_state", prState)

// Also update DM messages for blocked users
c.updateDMMessagesForPR(ctx, checkResult, owner, repo, prNumber, event.PullRequest.Title, event.PullRequest.User.Login, prState, event.PullRequest.HTMLURL)
c.updateDMMessagesForPR(ctx, prUpdateInfo{
owner: owner,
repo: repo,
number: prNumber,
title: event.PullRequest.Title,
author: event.PullRequest.User.Login,
state: prState,
url: event.PullRequest.HTMLURL,
checkRes: checkResult,
})
}
} else {
slog.Debug("message already matches expected content, no update needed",
Expand Down
62 changes: 55 additions & 7 deletions internal/state/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"log/slog"
"strings"
"time"

"cloud.google.com/go/datastore"
Expand Down Expand Up @@ -317,13 +318,7 @@ func (s *DatastoreStore) DMMessage(userID, prURL string) (DMInfo, bool) {
}

// Found in Datastore - update memory cache and return
result := DMInfo{
ChannelID: entity.ChannelID,
MessageTS: entity.MessageTS,
MessageText: entity.MessageText,
UpdatedAt: entity.UpdatedAt,
SentAt: entity.SentAt,
}
result := DMInfo(entity)

// Update memory cache async
go func() {
Expand Down Expand Up @@ -372,6 +367,59 @@ func (s *DatastoreStore) SaveDMMessage(userID, prURL string, info DMInfo) error
return nil
}

// ListDMUsers returns all user IDs who have received DMs for a given PR.
// Queries both memory cache and Datastore to ensure data persists across restarts.
func (s *DatastoreStore) ListDMUsers(prURL string) []string {
// Check memory cache first (fast path)
users := s.memory.ListDMUsers(prURL)
if len(users) > 0 || s.disabled || s.ds == nil {
return users
}

// PERFORMANCE NOTE: Datastore queries are expensive and lack substring/regex filtering.
// We must fetch all DM message keys and filter client-side. This is acceptable because:
// 1. Uses KeysOnly() to minimize data transfer (no entity bodies)
// 2. Bounded by reasonable limit (1000 - PRs rarely have >100 participants)
// 3. Only runs on cache miss (typically at startup)
// 4. Results populate memory cache for future fast lookups
//
// Alternative considered: Ancestor queries require schema change (breaking existing data)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

query := datastore.NewQuery(kindDMMessage).KeysOnly().Limit(1000)
keys, err := s.ds.GetAll(ctx, query, nil)
if err != nil {
slog.Warn("failed to query Datastore for DM users",
"pr_url", prURL,
"error", err,
"fallback", "returning empty list")
return nil
}

// Filter keys matching this PR URL
// Key format: "dm:{userID}:{prURL}"
suffix := ":" + prURL
filtered := make([]string, 0, min(len(keys), 50)) // Most PRs have <50 DM recipients

for _, key := range keys {
if strings.HasSuffix(key.Name, suffix) {
// Extract userID from key
parts := strings.SplitN(key.Name, ":", 3)
if len(parts) == 3 {
filtered = append(filtered, parts[1])
}
}
}

slog.Debug("queried Datastore for DM users",
"pr_url", prURL,
"users_found", len(filtered),
"keys_scanned", len(keys))

return filtered
}

// LastDigest retrieves last digest time.
func (s *DatastoreStore) LastDigest(userID, date string) (time.Time, bool) {
// Check memory first
Expand Down
21 changes: 21 additions & 0 deletions internal/state/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,27 @@ func (s *JSONStore) SaveDMMessage(userID, prURL string, info DMInfo) error {
return s.save()
}

// ListDMUsers returns all user IDs who have received DMs for a given PR.
func (s *JSONStore) ListDMUsers(prURL string) []string {
s.mu.RLock()
defer s.mu.RUnlock()

var users []string
suffix := ":" + prURL

for key := range s.dmMessages {
if strings.HasSuffix(key, suffix) {
// Extract userID from key format "dm:{userID}:{prURL}"
parts := strings.SplitN(key, ":", 3)
if len(parts) == 3 {
users = append(users, parts[1])
}
}
}

return users
}

// LastDigest retrieves the last digest timestamp for a user and date.
func (s *JSONStore) LastDigest(userID, date string) (time.Time, bool) {
s.mu.RLock()
Expand Down
1 change: 1 addition & 0 deletions internal/state/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type Store interface {
// DM message tracking - store DM message info for updating
DMMessage(userID, prURL string) (DMInfo, bool)
SaveDMMessage(userID, prURL string, info DMInfo) error
ListDMUsers(prURL string) []string

// Daily digest tracking - one per user per day
LastDigest(userID, date string) (time.Time, bool)
Expand Down