Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 21 additions & 9 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,14 @@ func run(ctx context.Context, cancel context.CancelFunc, cfg *config.ServerConfi
// Tokens are fetched from GSM based on team_id from org configs.
slackManager := slack.NewManager(cfg.SlackSigningSecret)

// Initialize state store (Datastore + JSON fallback).
// Initialize state store (in-memory + Datastore or JSON for persistence).
var stateStore interface {
Thread(owner, repo string, number int, channelID string) (state.ThreadInfo, bool)
SaveThread(owner, repo string, number int, channelID string, info state.ThreadInfo) error
LastDM(userID, prURL string) (time.Time, bool)
RecordDM(userID, prURL string, sentAt time.Time) error
DMMessage(userID, prURL string) (state.DMInfo, bool)
SaveDMMessage(userID, prURL string, info state.DMInfo) error
LastDigest(userID, date string) (time.Time, bool)
RecordDigest(userID, date string, sentAt time.Time) error
WasProcessed(eventKey string) bool
Expand Down Expand Up @@ -196,35 +198,39 @@ func run(ctx context.Context, cancel context.CancelFunc, cfg *config.ServerConfi
}

if datastoreDB != "" && projectID != "" {
slog.Info("initializing Cloud Datastore for persistent state",
slog.Info("initializing Cloud Datastore for persistent state (with in-memory cache)",
"project_id", projectID,
"database", datastoreDB)
"database", datastoreDB,
"cache", "in-memory")
var err error
stateStore, err = state.NewDatastoreStore(ctx, projectID, datastoreDB)
if err != nil {
// FATAL: If DATASTORE is explicitly configured, fail startup on initialization errors.
// This prevents silent fallback to JSON-only mode which causes duplicate messages
// This prevents silent fallback to memory-only mode which causes duplicate messages
// during rolling deployments (no cross-instance event deduplication).
slog.Error("FATAL: failed to initialize Cloud Datastore - DATASTORE variable is set but initialization failed",
"project_id", projectID,
"database", datastoreDB,
"error", err)
"error", err,
"note", "Set DATASTORE='' to use JSON files instead")
cancel()
return 1
}
slog.Info("successfully initialized Cloud Datastore",
slog.Info("successfully initialized Cloud Datastore with in-memory cache",
"project_id", projectID,
"database", datastoreDB)
"database", datastoreDB,
"mode", "hybrid: in-memory + Datastore")
} else {
var reason string
if datastoreDB == "" {
reason = "DATASTORE not set"
} else {
reason = "GCP_PROJECT not set and could not auto-detect"
}
slog.Info("using JSON files for state storage",
slog.Info("using JSON files for persistent state (with in-memory cache)",
"path", "os.UserCacheDir()/slacker/state",
"reason", reason)
"reason", reason,
"mode", "hybrid: in-memory + JSON files")
var err error
stateStore, err = state.NewJSONStore()
if err != nil {
Expand All @@ -241,6 +247,10 @@ func run(ctx context.Context, cancel context.CancelFunc, cfg *config.ServerConfi
}
}()

// Set state store on Slack manager for DM message tracking
slackManager.SetStateStore(stateStore)
slog.Info("configured Slack manager with state store for DM tracking")

// Initialize notification manager for multi-workspace notifications.
notifier := notify.New(slackManager, configManager)

Expand Down Expand Up @@ -672,6 +682,8 @@ func runBotCoordinators(
SaveThread(owner, repo string, number int, channelID string, info state.ThreadInfo) error
LastDM(userID, prURL string) (time.Time, bool)
RecordDM(userID, prURL string, sentAt time.Time) error
DMMessage(userID, prURL string) (state.DMInfo, bool)
SaveDMMessage(userID, prURL string, info state.DMInfo) error
LastDigest(userID, date string) (time.Time, bool)
RecordDigest(userID, date string, sentAt time.Time) error
WasProcessed(eventKey string) bool
Expand Down
167 changes: 141 additions & 26 deletions internal/bot/bot.go
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,86 @@ func (*Coordinator) extractBlockedUsersFromTurnclient(checkResult *turn.CheckRes
return blockedUsers
}

// updateDMMessagesForPR rewrites the existing DM for every user listed in the
// check result's NextAction map so the message reflects the PR's current state.
//
// The DM text is rebuilt as "<prefix> <title> <link|repo#number> · <author> → <action>",
// where the action phrase is derived from prState (unknown states fall back to
// "attention needed"). The "_system" entry is ignored. Users with no Slack mapping
// and users whose DM update fails are counted as skipped and logged at debug level;
// neither case aborts the loop. A summary is logged only if at least one DM was updated.
func (c *Coordinator) updateDMMessagesForPR(ctx context.Context, checkResult *turn.CheckResponse, owner, repo string, prNumber int, title, author, prState, prURL string) {
	// Human-readable PR identifier shared by every log line below.
	prKey := fmt.Sprintf("%s/%s#%d", owner, repo, prNumber)

	if checkResult == nil || len(checkResult.Analysis.NextAction) == 0 {
		slog.Debug("no blocked users, skipping DM updates",
			"pr", prKey)
		return
	}

	// Translate the PR state into the short call-to-action shown at the end of the DM.
	action := "attention needed"
	switch prState {
	case "tests_broken":
		action = "fix tests"
	case "awaiting_review":
		action = "review"
	case "changes_requested":
		action = "address feedback"
	case "approved":
		action = "merge"
	}

	// Same layout as the DM that was originally sent, so an update replaces it in kind.
	message := fmt.Sprintf(
		"%s %s <%s|%s#%d> · %s → %s",
		notify.PrefixForState(prState),
		title,
		prURL,
		repo,
		prNumber,
		author,
		action,
	)

	domain := c.configManager.Domain(owner)

	var updated, skipped int
	for githubUser := range checkResult.Analysis.NextAction {
		// "_system" is not a real account; it is neither updated nor counted.
		if githubUser == "_system" {
			continue
		}

		// Resolve the GitHub login to a Slack user before touching any DM.
		slackUserID, err := c.userMapper.SlackHandle(ctx, githubUser, owner, domain)
		if err != nil || slackUserID == "" {
			slog.Debug("no Slack mapping for GitHub user, skipping DM update",
				"github_user", githubUser,
				"pr", prKey,
				"error", err)
			skipped++
			continue
		}

		// Best effort: a failed update is recorded as skipped rather than surfaced.
		if updateErr := c.slack.UpdateDMMessage(ctx, slackUserID, prURL, message); updateErr != nil {
			slog.Debug("failed to update DM message",
				"user", slackUserID,
				"github_user", githubUser,
				"pr", prKey,
				"error", updateErr,
				"reason", "DM may not exist or too old")
			skipped++
			continue
		}
		updated++
	}

	// Stay quiet unless something actually changed.
	if updated == 0 {
		return
	}
	slog.Info("updated DM messages for PR state change",
		"pr", prKey,
		"pr_state", prState,
		"updated", updated,
		"skipped", skipped)
}

// formatNextActions formats NextAction map into a compact string like "fix tests: user1, user2; review: user3".
// It groups users by action kind and formats each action as "action_name: user1, user2".
// Multiple actions are separated by semicolons.
Expand Down Expand Up @@ -888,10 +968,23 @@ func (c *Coordinator) processChannelsInParallel(
var validChannels []string
for _, channelName := range channels {
channelID := c.slack.ResolveChannelID(ctx, channelName)

// Check if channel resolution failed (returns original name if not found)
if channelID == channelName || (channelName != "" && channelName[0] == '#' && channelID == channelName[1:]) {
slog.Warn("could not resolve channel - may not exist or bot lacks permissions",
"workspace", c.workspaceName,
logFieldPR, fmt.Sprintf(prFormatString, prCtx.Owner, prCtx.Repo, prCtx.Number),
"channel", channelName,
"action_required", "verify channel exists and bot has access")
continue
}

if c.slack.IsBotInChannel(ctx, channelID) {
validChannels = append(validChannels, channelName)
} else {
slog.Warn("skipping channel - bot not a member",
"workspace", c.workspaceName,
logFieldPR, fmt.Sprintf(prFormatString, prCtx.Owner, prCtx.Repo, prCtx.Number),
"channel", channelName,
"channel_id", channelID,
"action_required", "invite bot to channel")
Expand Down Expand Up @@ -960,6 +1053,16 @@ func (c *Coordinator) processPRForChannel(
// Resolve channel name to ID for API calls
channelID := c.slack.ResolveChannelID(ctx, channelName)

// Check if channel resolution failed
if channelID == channelName || (channelName != "" && channelName[0] == '#' && channelID == channelName[1:]) {
slog.Warn("could not resolve channel for PR processing",
"workspace", c.workspaceName,
logFieldPR, fmt.Sprintf(prFormatString, owner, repo, prNumber),
"channel", channelName,
"action_required", "verify channel exists and bot has access")
return
}

// For display purposes, show both name and ID
var channelDisplay string
switch {
Expand Down Expand Up @@ -1025,30 +1128,28 @@ func (c *Coordinator) processPRForChannel(
blockedUsers := c.extractBlockedUsersFromTurnclient(checkResult)
domain := c.configManager.Domain(owner)
if len(blockedUsers) > 0 {
// Run email lookups in background to avoid blocking message delivery
// SECURITY NOTE: Use detached context to complete email lookups even during shutdown.
// Operations bounded by 15-second timeout. This ensures reasonably fast shutdown while
// completing active lookups for accurate DM delivery (most lookups hit cache instantly,
// but occasional cold lookups can take 10+ seconds).
lookupCtx, lookupCancel := context.WithTimeout(context.WithoutCancel(ctx), 15*time.Second)
go func() {
defer lookupCancel()
for _, githubUser := range blockedUsers {
// Map GitHub username to Slack user ID
slackUserID, err := c.userMapper.SlackHandle(lookupCtx, githubUser, owner, domain)
if err == nil && slackUserID != "" {
// Track with channelID - this will only update on FIRST call per user/PR
c.notifier.Tracker.UpdateUserPRChannelTag(workspaceID, slackUserID, channelID, owner, repo, prNumber)
slog.Debug("tracked user tag in channel (async)",
"workspace", workspaceID,
"github_user", githubUser,
"slack_user", slackUserID,
"channel", channelDisplay,
"channel_id", channelID,
"pr", fmt.Sprintf(prFormatString, owner, repo, prNumber))
}
// Record tags for blocked users synchronously to prevent race with DM sending
// This must complete BEFORE DM notifications check tag info
// Note: Most lookups hit cache and are instant; occasional cold lookups may delay slightly
// but this is necessary for correct DM delay logic
lookupCtx, lookupCancel := context.WithTimeout(ctx, 5*time.Second)
defer lookupCancel()

for _, githubUser := range blockedUsers {
// Map GitHub username to Slack user ID
slackUserID, err := c.userMapper.SlackHandle(lookupCtx, githubUser, owner, domain)
if err == nil && slackUserID != "" {
// Track with channelID - this will only update on FIRST call per user/PR
c.notifier.Tracker.UpdateUserPRChannelTag(workspaceID, slackUserID, channelID, owner, repo, prNumber)
slog.Debug("tracked user tag in channel",
"workspace", workspaceID,
"github_user", githubUser,
"slack_user", slackUserID,
"channel", channelDisplay,
"channel_id", channelID,
"pr", fmt.Sprintf(prFormatString, owner, repo, prNumber))
}
}()
}
}

// Build what the message SHOULD be based on current PR state
Expand Down Expand Up @@ -1109,6 +1210,9 @@ func (c *Coordinator) processPRForChannel(
"channel_id", channelID,
"thread_ts", threadTS,
"pr_state", prState)

// Also update DM messages for blocked users
c.updateDMMessagesForPR(ctx, checkResult, owner, repo, prNumber, event.PullRequest.Title, event.PullRequest.User.Login, prState, event.PullRequest.HTMLURL)
}
} else {
slog.Debug("message already matches expected content, no update needed",
Expand Down Expand Up @@ -1291,10 +1395,21 @@ func (c *Coordinator) createPRThread(ctx context.Context, channel, owner, repo s

// Resolve channel name to ID for consistent API calls
resolvedChannel := c.slack.ResolveChannelID(ctx, channel)
if resolvedChannel != channel {
slog.Debug("resolved channel for thread creation", "original", channel, "resolved", resolvedChannel)

// Check if channel resolution failed (returns original name if not found)
if resolvedChannel == channel || (channel != "" && channel[0] == '#' && resolvedChannel == channel[1:]) {
// Only warn if it's not already a channel ID
if channel == "" || channel[0] != 'C' {
slog.Warn("could not resolve channel for thread creation",
"workspace", c.workspaceName,
logFieldPR, fmt.Sprintf(prFormatString, owner, repo, number),
"channel", channel,
"action_required", "verify channel exists and bot has access")
return "", "", fmt.Errorf("could not resolve channel: %s", channel)
}
slog.Debug("channel is already a channel ID", "channel_id", channel)
} else {
slog.Debug("channel resolution did not change value", "channel", channel, "might_be_channel_id_already", resolvedChannel[0] == 'C')
slog.Debug("resolved channel for thread creation", "original", channel, "resolved", resolvedChannel)
}

// Create thread with resolved channel ID - post immediately without waiting for user lookups
Expand Down
15 changes: 11 additions & 4 deletions internal/bot/polling.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,10 +267,17 @@ func (c *Coordinator) updateClosedPRThread(ctx context.Context, pr *github.PRSna
n := 0
for _, ch := range channels {
id := c.slack.ResolveChannelID(ctx, ch)
if id == "" {
slog.Debug("could not resolve channel ID for closed PR thread update",
"channel_name", ch,
"pr", prKey)

// Check if channel resolution failed (returns original name if not found)
if id == ch || (ch != "" && ch[0] == '#' && id == ch[1:]) {
slog.Warn("could not resolve channel for closed PR thread update",
"workspace", c.workspaceName,
"pr", prKey,
"owner", pr.Owner,
"repo", pr.Repo,
"number", pr.Number,
"channel", ch,
"action_required", "verify channel exists and bot has access")
continue
}

Expand Down
16 changes: 14 additions & 2 deletions internal/notify/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,8 @@ func (m *Manager) NotifyUser(ctx context.Context, workspaceID, userID, channelID
}

// Send DM to user.
if err := slackClient.SendDirectMessage(ctx, userID, message); err != nil {
dmChannelID, messageTS, err := slackClient.SendDirectMessage(ctx, userID, message)
if err != nil {
slog.Error("failed to send DM notification",
"user", userID,
"pr", fmt.Sprintf("%s/%s#%d", pr.Owner, pr.Repo, pr.Number),
Expand All @@ -309,12 +310,23 @@ func (m *Manager) NotifyUser(ctx context.Context, workspaceID, userID, channelID
// Update last DM notification time.
m.Tracker.UpdateDMNotification(workspaceID, userID)

// Save DM message info for future updates
if err := slackClient.SaveDMMessageInfo(ctx, userID, pr.HTMLURL, dmChannelID, messageTS, message); err != nil {
slog.Warn("failed to save DM message info",
"user", userID,
"pr", fmt.Sprintf("%s/%s#%d", pr.Owner, pr.Repo, pr.Number),
"error", err,
"impact", "DM won't be updated on state changes")
}

slog.Info("successfully sent DM notification",
"user", userID,
"pr", fmt.Sprintf("%s/%s#%d", pr.Owner, pr.Repo, pr.Number),
"pr_author", pr.Author,
"pr_state", pr.State,
"action_required", action,
"notification_updated", true)
"notification_updated", true,
"dm_channel_id", dmChannelID,
"message_ts", messageTS)
return nil
}
Loading