From a4fbca70a32fe5651f490e80040f58bbd40e9e15 Mon Sep 17 00:00:00 2001 From: Thomas Stromberg Date: Mon, 20 Oct 2025 19:59:34 +0200 Subject: [PATCH] Update states in DMs --- cmd/server/main.go | 30 +++-- internal/bot/bot.go | 167 ++++++++++++++++++++++----- internal/bot/polling.go | 15 ++- internal/notify/notify.go | 16 ++- internal/slack/manager.go | 25 ++++ internal/slack/slack.go | 103 +++++++++++++++-- internal/state/datastore.go | 224 +++++++++++++++++++++++++----------- internal/state/json.go | 41 ++++++- internal/state/store.go | 13 +++ 9 files changed, 516 insertions(+), 118 deletions(-) diff --git a/cmd/server/main.go b/cmd/server/main.go index 0504703..a5bc44d 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -159,12 +159,14 @@ func run(ctx context.Context, cancel context.CancelFunc, cfg *config.ServerConfi // Tokens are fetched from GSM based on team_id from org configs. slackManager := slack.NewManager(cfg.SlackSigningSecret) - // Initialize state store (Datastore + JSON fallback). + // Initialize state store (in-memory + Datastore or JSON for persistence). var stateStore interface { Thread(owner, repo string, number int, channelID string) (state.ThreadInfo, bool) SaveThread(owner, repo string, number int, channelID string, info state.ThreadInfo) error LastDM(userID, prURL string) (time.Time, bool) RecordDM(userID, prURL string, sentAt time.Time) error + DMMessage(userID, prURL string) (state.DMInfo, bool) + SaveDMMessage(userID, prURL string, info state.DMInfo) error LastDigest(userID, date string) (time.Time, bool) RecordDigest(userID, date string, sentAt time.Time) error WasProcessed(eventKey string) bool @@ -196,25 +198,28 @@ func run(ctx context.Context, cancel context.CancelFunc, cfg *config.ServerConfi } if datastoreDB != "" && projectID != "" { - slog.Info("initializing Cloud Datastore for persistent state", + slog.Info("initializing Cloud Datastore for persistent state (with in-memory cache)", "project_id", projectID, - "database", datastoreDB) + "database", datastoreDB, + "cache", "in-memory") var err error stateStore, err = state.NewDatastoreStore(ctx, projectID, datastoreDB) if err != nil { // FATAL: If DATASTORE is explicitly configured, fail startup on initialization errors. - // This prevents silent fallback to JSON-only mode which causes duplicate messages + // This prevents silent fallback to memory-only mode which causes duplicate messages // during rolling deployments (no cross-instance event deduplication). slog.Error("FATAL: failed to initialize Cloud Datastore - DATASTORE variable is set but initialization failed", "project_id", projectID, "database", datastoreDB, - "error", err) + "error", err, + "note", "Set DATASTORE='' to use JSON files instead") cancel() return 1 } - slog.Info("successfully initialized Cloud Datastore", + slog.Info("successfully initialized Cloud Datastore with in-memory cache", "project_id", projectID, - "database", datastoreDB) + "database", datastoreDB, + "mode", "hybrid: in-memory + Datastore") } else { var reason string if datastoreDB == "" { @@ -222,9 +227,10 @@ func run(ctx context.Context, cancel context.CancelFunc, cfg *config.ServerConfi } else { reason = "GCP_PROJECT not set and could not auto-detect" } - slog.Info("using JSON files for state storage", + slog.Info("using JSON files for persistent state (with in-memory cache)", "path", "os.UserCacheDir()/slacker/state", - "reason", reason) + "reason", reason, + "mode", "hybrid: in-memory + JSON files") var err error stateStore, err = state.NewJSONStore() if err != nil { @@ -241,6 +247,10 @@ func run(ctx context.Context, cancel context.CancelFunc, cfg *config.ServerConfi } }() + // Set state store on Slack manager for DM message tracking + slackManager.SetStateStore(stateStore) + slog.Info("configured Slack manager with state store for DM tracking") + // Initialize notification manager for multi-workspace notifications. notifier := notify.New(slackManager, configManager) @@ -672,6 +682,8 @@ func runBotCoordinators( SaveThread(owner, repo string, number int, channelID string, info state.ThreadInfo) error LastDM(userID, prURL string) (time.Time, bool) RecordDM(userID, prURL string, sentAt time.Time) error + DMMessage(userID, prURL string) (state.DMInfo, bool) + SaveDMMessage(userID, prURL string, info state.DMInfo) error LastDigest(userID, date string) (time.Time, bool) RecordDigest(userID, date string, sentAt time.Time) error WasProcessed(eventKey string) bool diff --git a/internal/bot/bot.go b/internal/bot/bot.go index fc27952..62656e6 100644 --- a/internal/bot/bot.go +++ b/internal/bot/bot.go @@ -793,6 +793,86 @@ func (*Coordinator) extractBlockedUsersFromTurnclient(checkResult *turn.CheckRes return blockedUsers } +// updateDMMessagesForPR updates DM messages for all blocked users on a PR. +func (c *Coordinator) updateDMMessagesForPR(ctx context.Context, checkResult *turn.CheckResponse, owner, repo string, prNumber int, title, author, prState, prURL string) { + if checkResult == nil || len(checkResult.Analysis.NextAction) == 0 { + slog.Debug("no blocked users, skipping DM updates", + "pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber)) + return + } + + // Format the DM message (same format as initial send) + prefix := notify.PrefixForState(prState) + var action string + switch prState { + case "tests_broken": + action = "fix tests" + case "awaiting_review": + action = "review" + case "changes_requested": + action = "address feedback" + case "approved": + action = "merge" + default: + action = "attention needed" + } + + message := fmt.Sprintf( + "%s %s <%s|%s#%d> · %s → %s", + prefix, + title, + prURL, + repo, + prNumber, + author, + action, + ) + + // Update DM for each blocked user + updatedCount := 0 + skippedCount := 0 + domain := c.configManager.Domain(owner) + + for githubUser := range checkResult.Analysis.NextAction { + // Skip _system user + if githubUser == "_system" { + continue + } + + // Map GitHub user to Slack user + slackUserID, err := c.userMapper.SlackHandle(ctx, githubUser, owner, domain) + if err != nil || slackUserID == "" { + slog.Debug("no Slack mapping for GitHub user, skipping DM update", + "github_user", githubUser, + "pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber), + "error", err) + skippedCount++ + continue + } + + // Update the DM message + if err := c.slack.UpdateDMMessage(ctx, slackUserID, prURL, message); err != nil { + slog.Debug("failed to update DM message", + "user", slackUserID, + "github_user", githubUser, + "pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber), + "error", err, + "reason", "DM may not exist or too old") + skippedCount++ + } else { + updatedCount++ + } + } + + if updatedCount > 0 { + slog.Info("updated DM messages for PR state change", + "pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber), + "pr_state", prState, + "updated", updatedCount, + "skipped", skippedCount) + } +} + // formatNextActions formats NextAction map into a compact string like "fix tests: user1, user2; review: user3". // It groups users by action kind and formats each action as "action_name: user1, user2". // Multiple actions are separated by semicolons. @@ -888,10 +968,23 @@ func (c *Coordinator) processChannelsInParallel( var validChannels []string for _, channelName := range channels { channelID := c.slack.ResolveChannelID(ctx, channelName) + + // Check if channel resolution failed (returns original name if not found) + if channelID == channelName || (channelName != "" && channelName[0] == '#' && channelID == channelName[1:]) { + slog.Warn("could not resolve channel - may not exist or bot lacks permissions", + "workspace", c.workspaceName, + logFieldPR, fmt.Sprintf(prFormatString, prCtx.Owner, prCtx.Repo, prCtx.Number), + "channel", channelName, + "action_required", "verify channel exists and bot has access") + continue + } + if c.slack.IsBotInChannel(ctx, channelID) { validChannels = append(validChannels, channelName) } else { slog.Warn("skipping channel - bot not a member", + "workspace", c.workspaceName, + logFieldPR, fmt.Sprintf(prFormatString, prCtx.Owner, prCtx.Repo, prCtx.Number), "channel", channelName, "channel_id", channelID, "action_required", "invite bot to channel") @@ -960,6 +1053,16 @@ func (c *Coordinator) processPRForChannel( // Resolve channel name to ID for API calls channelID := c.slack.ResolveChannelID(ctx, channelName) + // Check if channel resolution failed + if channelID == channelName || (channelName != "" && channelName[0] == '#' && channelID == channelName[1:]) { + slog.Warn("could not resolve channel for PR processing", + "workspace", c.workspaceName, + logFieldPR, fmt.Sprintf(prFormatString, owner, repo, prNumber), + "channel", channelName, + "action_required", "verify channel exists and bot has access") + return + } + // For display purposes, show both name and ID var channelDisplay string switch { @@ -1025,30 +1128,28 @@ func (c *Coordinator) processPRForChannel( blockedUsers := c.extractBlockedUsersFromTurnclient(checkResult) domain := c.configManager.Domain(owner) if len(blockedUsers) > 0 { - // Run email lookups in background to avoid blocking message delivery - // SECURITY NOTE: Use detached context to complete email lookups even during shutdown. - // Operations bounded by 15-second timeout. This ensures reasonably fast shutdown while - // completing active lookups for accurate DM delivery (most lookups hit cache instantly, - // but occasional cold lookups can take 10+ seconds). - lookupCtx, lookupCancel := context.WithTimeout(context.WithoutCancel(ctx), 15*time.Second) - go func() { - defer lookupCancel() - for _, githubUser := range blockedUsers { - // Map GitHub username to Slack user ID - slackUserID, err := c.userMapper.SlackHandle(lookupCtx, githubUser, owner, domain) - if err == nil && slackUserID != "" { - // Track with channelID - this will only update on FIRST call per user/PR - c.notifier.Tracker.UpdateUserPRChannelTag(workspaceID, slackUserID, channelID, owner, repo, prNumber) - slog.Debug("tracked user tag in channel (async)", - "workspace", workspaceID, - "github_user", githubUser, - "slack_user", slackUserID, - "channel", channelDisplay, - "channel_id", channelID, - "pr", fmt.Sprintf(prFormatString, owner, repo, prNumber)) - } + // Record tags for blocked users synchronously to prevent race with DM sending + // This must complete BEFORE DM notifications check tag info + // Note: Most lookups hit cache and are instant; occasional cold lookups may delay slightly + // but this is necessary for correct DM delay logic + lookupCtx, lookupCancel := context.WithTimeout(ctx, 5*time.Second) + defer lookupCancel() + + for _, githubUser := range blockedUsers { + // Map GitHub username to Slack user ID + slackUserID, err := c.userMapper.SlackHandle(lookupCtx, githubUser, owner, domain) + if err == nil && slackUserID != "" { + // Track with channelID - this will only update on FIRST call per user/PR + c.notifier.Tracker.UpdateUserPRChannelTag(workspaceID, slackUserID, channelID, owner, repo, prNumber) + slog.Debug("tracked user tag in channel", + "workspace", workspaceID, + "github_user", githubUser, + "slack_user", slackUserID, + "channel", channelDisplay, + "channel_id", channelID, + "pr", fmt.Sprintf(prFormatString, owner, repo, prNumber)) } - }() + } } // Build what the message SHOULD be based on current PR state @@ -1109,6 +1210,9 @@ func (c *Coordinator) processPRForChannel( "channel_id", channelID, "thread_ts", threadTS, "pr_state", prState) + + // Also update DM messages for blocked users + c.updateDMMessagesForPR(ctx, checkResult, owner, repo, prNumber, event.PullRequest.Title, event.PullRequest.User.Login, prState, event.PullRequest.HTMLURL) } } else { slog.Debug("message already matches expected content, no update needed", @@ -1291,10 +1395,21 @@ func (c *Coordinator) createPRThread(ctx context.Context, channel, owner, repo s // Resolve channel name to ID for consistent API calls resolvedChannel := c.slack.ResolveChannelID(ctx, channel) - if resolvedChannel != channel { - slog.Debug("resolved channel for thread creation", "original", channel, "resolved", resolvedChannel) + + // Check if channel resolution failed (returns original name if not found) + if resolvedChannel == channel || (channel != "" && channel[0] == '#' && resolvedChannel == channel[1:]) { + // Only warn if it's not already a channel ID + if channel == "" || channel[0] != 'C' { + slog.Warn("could not resolve channel for thread creation", + "workspace", c.workspaceName, + logFieldPR, fmt.Sprintf(prFormatString, owner, repo, number), + "channel", channel, + "action_required", "verify channel exists and bot has access") + return "", "", fmt.Errorf("could not resolve channel: %s", channel) + } + slog.Debug("channel is already a channel ID", "channel_id", channel) } else { - slog.Debug("channel resolution did not change value", "channel", channel, "might_be_channel_id_already", resolvedChannel[0] == 'C') + slog.Debug("resolved channel for thread creation", "original", channel, "resolved", resolvedChannel) } // Create thread with resolved channel ID - post immediately without waiting for user lookups diff --git a/internal/bot/polling.go b/internal/bot/polling.go index 3bd225c..4267247 100644 --- a/internal/bot/polling.go +++ b/internal/bot/polling.go @@ -267,10 +267,17 @@ func (c *Coordinator) updateClosedPRThread(ctx context.Context, pr *github.PRSna n := 0 for _, ch := range channels { id := c.slack.ResolveChannelID(ctx, ch) - if id == "" { - slog.Debug("could not resolve channel ID for closed PR thread update", - "channel_name", ch, - "pr", prKey) + + // Check if channel resolution failed (returns original name if not found) + if id == ch || (ch != "" && ch[0] == '#' && id == ch[1:]) { + slog.Warn("could not resolve channel for closed PR thread update", + "workspace", c.workspaceName, + "pr", prKey, + "owner", pr.Owner, + "repo", pr.Repo, + "number", pr.Number, + "channel", ch, + "action_required", "verify channel exists and bot has access") continue } diff --git a/internal/notify/notify.go b/internal/notify/notify.go index b8ff410..0451000 100644 --- a/internal/notify/notify.go +++ b/internal/notify/notify.go @@ -297,7 +297,8 @@ func (m *Manager) NotifyUser(ctx context.Context, workspaceID, userID, channelID } // Send DM to user. - if err := slackClient.SendDirectMessage(ctx, userID, message); err != nil { + dmChannelID, messageTS, err := slackClient.SendDirectMessage(ctx, userID, message) + if err != nil { slog.Error("failed to send DM notification", "user", userID, "pr", fmt.Sprintf("%s/%s#%d", pr.Owner, pr.Repo, pr.Number), @@ -309,12 +310,23 @@ func (m *Manager) NotifyUser(ctx context.Context, workspaceID, userID, channelID // Update last DM notification time. m.Tracker.UpdateDMNotification(workspaceID, userID) + // Save DM message info for future updates + if err := slackClient.SaveDMMessageInfo(ctx, userID, pr.HTMLURL, dmChannelID, messageTS, message); err != nil { + slog.Warn("failed to save DM message info", + "user", userID, + "pr", fmt.Sprintf("%s/%s#%d", pr.Owner, pr.Repo, pr.Number), + "error", err, + "impact", "DM won't be updated on state changes") + } + slog.Info("successfully sent DM notification", "user", userID, "pr", fmt.Sprintf("%s/%s#%d", pr.Owner, pr.Repo, pr.Number), "pr_author", pr.Author, "pr_state", pr.State, "action_required", action, - "notification_updated", true) + "notification_updated", true, + "dm_channel_id", dmChannelID, + "message_ts", messageTS) return nil } diff --git a/internal/slack/manager.go b/internal/slack/manager.go index 5de3006..6799d2a 100644 --- a/internal/slack/manager.go +++ b/internal/slack/manager.go @@ -8,6 +8,7 @@ import ( "sync" "github.com/codeGROOVE-dev/gsm" + "github.com/codeGROOVE-dev/slacker/internal/state" ) // WorkspaceMetadata contains metadata about a Slack workspace installation. @@ -18,12 +19,19 @@ type WorkspaceMetadata struct { AccessToken string `json:"-"` // Never serialize token to JSON } +// StateStore interface for DM message tracking. +type StateStore interface { + DMMessage(userID, prURL string) (state.DMInfo, bool) + SaveDMMessage(userID, prURL string, info state.DMInfo) error +} + // Manager manages Slack clients for multiple workspaces. type Manager struct { clients map[string]*Client // team_id -> client metadata map[string]*WorkspaceMetadata signingSecret string homeViewHandler func(ctx context.Context, teamID, userID string) error // Global home view handler + stateStore StateStore // State store for DM message tracking mu sync.RWMutex } @@ -36,6 +44,18 @@ func NewManager(signingSecret string) *Manager { } } +// SetStateStore sets the state store for DM message tracking. +func (m *Manager) SetStateStore(store StateStore) { + m.mu.Lock() + defer m.mu.Unlock() + m.stateStore = store + + // Update existing clients with the state store + for _, client := range m.clients { + client.SetStateStore(store) + } +} + // Client returns a Slack client for the given workspace. // If the client doesn't exist in cache, it fetches the token from GSM. func (m *Manager) Client(ctx context.Context, teamID string) (*Client, error) { @@ -89,6 +109,11 @@ func (m *Manager) Client(ctx context.Context, teamID string) (*Client, error) { client.SetHomeViewHandler(m.homeViewHandler) } + // Set state store if configured + if m.stateStore != nil { + client.SetStateStore(m.stateStore) + } + // Cache it m.mu.Lock() m.clients[teamID] = client diff --git a/internal/slack/slack.go b/internal/slack/slack.go index 2bb6d58..640a404 100644 --- a/internal/slack/slack.go +++ b/internal/slack/slack.go @@ -19,6 +19,7 @@ import ( "time" "github.com/codeGROOVE-dev/retry" + "github.com/codeGROOVE-dev/slacker/internal/state" "github.com/slack-go/slack" "github.com/slack-go/slack/slackevents" ) @@ -51,6 +52,8 @@ type Client struct { teamID string // Workspace team ID homeViewHandler func(ctx context.Context, teamID, userID string) error // Callback for app_home_opened events homeViewHandlerMu sync.RWMutex + stateStore StateStore // State store for DM message tracking + stateStoreMu sync.RWMutex } // set stores a value in the cache with TTL. @@ -130,6 +133,13 @@ func (c *Client) SetTeamID(teamID string) { c.teamID = teamID } +// SetStateStore sets the state store for DM message tracking. +func (c *Client) SetStateStore(store StateStore) { + c.stateStoreMu.Lock() + defer c.stateStoreMu.Unlock() + c.stateStore = store +} + // WorkspaceInfo returns information about the current workspace (cached for 1 hour). func (c *Client) WorkspaceInfo(ctx context.Context) (*slack.TeamInfo, error) { cacheKey := "team_info" @@ -562,13 +572,13 @@ func (c *Client) HasRecentDMAboutPR(ctx context.Context, userID, prURL string) ( } // SendDirectMessage sends a direct message to a user with retry logic. -func (c *Client) SendDirectMessage(ctx context.Context, userID, text string) error { +func (c *Client) SendDirectMessage(ctx context.Context, userID, text string) (dmChannelID, messageTS string, err error) { slog.Info("sending DM to user", "user", userID) var channelID string // First, open conversation with retry - err := retry.Do( + err = retry.Do( func() error { channel, _, _, err := c.api.OpenConversationContext(ctx, &slack.OpenConversationParameters{ Users: []string{userID}, @@ -589,13 +599,14 @@ func (c *Client) SendDirectMessage(ctx context.Context, userID, text string) err retry.Context(ctx), ) if err != nil { - return fmt.Errorf("failed to open conversation after retries: %w", err) + return "", "", fmt.Errorf("failed to open conversation after retries: %w", err) } + var msgTS string // Then send message with retry err = retry.Do( func() error { - _, _, err := c.api.PostMessageContext(ctx, channelID, slack.MsgOptionText(text, false)) + _, ts, err := c.api.PostMessageContext(ctx, channelID, slack.MsgOptionText(text, false)) if err != nil { if isRateLimitError(err) { slog.Warn("rate limited sending DM, backing off", "user", userID) @@ -604,6 +615,7 @@ func (c *Client) SendDirectMessage(ctx context.Context, userID, text string) err slog.Warn("failed to send DM, retrying", "user", userID, "error", err) return err } + msgTS = ts return nil }, retry.Attempts(5), @@ -615,10 +627,85 @@ func (c *Client) SendDirectMessage(ctx context.Context, userID, text string) err retry.Context(ctx), ) if err != nil { - return fmt.Errorf("failed to send DM after retries: %w", err) + return "", "", fmt.Errorf("failed to send DM after retries: %w", err) + } + + slog.Info("successfully sent DM", "user", userID, "channel_id", channelID, "message_ts", msgTS) + return channelID, msgTS, nil +} + +// SaveDMMessageInfo saves DM message info to state store for future updates. +func (c *Client) SaveDMMessageInfo(ctx context.Context, userID, prURL, channelID, messageTS, messageText string) error { + c.stateStoreMu.RLock() + store := c.stateStore + c.stateStoreMu.RUnlock() + + if store == nil { + slog.Debug("no state store configured, skipping DM message save", "user", userID, "pr_url", prURL) + return nil + } + + info := state.DMInfo{ + ChannelID: channelID, + MessageTS: messageTS, + MessageText: messageText, + SentAt: time.Now(), + } + + if err := store.SaveDMMessage(userID, prURL, info); err != nil { + return fmt.Errorf("failed to save DM message info: %w", err) + } + + slog.Debug("saved DM message info for future updates", + "user", userID, + "pr_url", prURL, + "channel_id", channelID, + "message_ts", messageTS) + + return nil +} + +// UpdateDMMessage updates a previously sent DM message. +func (c *Client) UpdateDMMessage(ctx context.Context, userID, prURL, newText string) error { + c.stateStoreMu.RLock() + store := c.stateStore + c.stateStoreMu.RUnlock() + + if store == nil { + slog.Debug("no state store configured, cannot update DM", "user", userID, "pr_url", prURL) + return nil + } + + // Get stored DM message info + info, exists := store.DMMessage(userID, prURL) + if !exists { + slog.Debug("no DM message found to update", + "user", userID, + "pr_url", prURL, + "reason", "never sent or too old") + return nil } - slog.Info("successfully sent DM", "user", userID) + // Update the message using Slack API + if err := c.UpdateMessage(ctx, info.ChannelID, info.MessageTS, newText); err != nil { + return fmt.Errorf("failed to update DM message: %w", err) + } + + // Update stored message text + info.MessageText = newText + if err := store.SaveDMMessage(userID, prURL, info); err != nil { + slog.Warn("failed to update stored DM message text", + "user", userID, + "pr_url", prURL, + "error", err) + } + + slog.Info("updated DM message", + "user", userID, + "pr_url", prURL, + "channel_id", info.ChannelID, + "message_ts", info.MessageTS) + return nil } @@ -1361,7 +1448,9 @@ func (c *Client) ResolveChannelID(ctx context.Context, channelName string) strin } } - slog.Warn("could not resolve channel name to ID", "channel", channelName) + slog.Warn("could not resolve channel name to ID", + "channel", channelName, + "team_id", c.teamID) // Cache the failure for SHORT time (user might create channel or fix typo) c.cache.set(cacheKey, channelName, 45*time.Second) slog.Info("caching channel resolution failure briefly to allow for channel creation", diff --git a/internal/state/datastore.go b/internal/state/datastore.go index 43ca675..49ea17d 100644 --- a/internal/state/datastore.go +++ b/internal/state/datastore.go @@ -3,28 +3,28 @@ package state import ( "context" "errors" - "fmt" "log/slog" "time" "cloud.google.com/go/datastore" ) -// DatastoreStore implements Store using Google Cloud Datastore with JSON fallback. -// Uses hybrid approach: write to both, read from Datastore with JSON fallback. +// DatastoreStore implements Store using Google Cloud Datastore with in-memory cache. +// Uses hybrid approach: in-memory cache for speed, Datastore for persistence. type DatastoreStore struct { ds *datastore.Client - json *JSONStore - disabled bool // If Datastore failed to initialize, use JSON only + memory *MemoryStore + disabled bool // If Datastore failed to initialize, memory-only mode } // Entity types for Datastore. const ( - kindThread = "SlackerThread" - kindDM = "SlackerDM" - kindDigest = "SlackerDigest" - kindEvent = "SlackerEvent" - kindNotify = "SlackerNotification" + kindThread = "SlackerThread" + kindDM = "SlackerDM" + kindDMMessage = "SlackerDMMessage" + kindDigest = "SlackerDigest" + kindEvent = "SlackerEvent" + kindNotify = "SlackerNotification" ) // ErrAlreadyProcessed indicates an event was already processed by another instance. @@ -47,6 +47,15 @@ type dmEntity struct { SentAt time.Time `datastore:"sent_at"` } +// DM message entity for updating messages. +type dmMessageEntity struct { + ChannelID string `datastore:"channel_id"` + MessageTS string `datastore:"message_ts"` + MessageText string `datastore:"message_text,noindex"` + UpdatedAt time.Time `datastore:"updated_at"` + SentAt time.Time `datastore:"sent_at"` +} + // Digest tracking entity. type digestEntity struct { UserID string `datastore:"user_id"` @@ -66,26 +75,22 @@ type notifyEntity struct { NotifiedAt time.Time `datastore:"notified_at"` } -// NewDatastoreStore creates a new Datastore-backed store with JSON fallback. +// NewDatastoreStore creates a new Datastore-backed store with in-memory cache. // The databaseID parameter specifies which Datastore database to use (e.g., "slacker", "(default)"). func NewDatastoreStore(ctx context.Context, projectID, databaseID string) (*DatastoreStore, error) { - // Always create JSON store as fallback - jsonStore, err := NewJSONStore() - if err != nil { - return nil, fmt.Errorf("failed to create JSON fallback store: %w", err) - } + // Always create in-memory cache + memStore := NewMemoryStore() // Create Datastore client with specified database - // Use NewClientWithDatabase to specify which database to use ds, err := datastore.NewClientWithDatabase(ctx, projectID, databaseID) if err != nil { - slog.Warn("failed to create Datastore client, using JSON-only mode", + slog.Warn("failed to create Datastore client, using memory-only mode", "error", err, "project_id", projectID, "database_id", databaseID, - "fallback", "JSON files only") + "fallback", "in-memory only (no persistence)") return &DatastoreStore{ - json: jsonStore, + memory: memStore, disabled: true, }, nil } @@ -98,14 +103,14 @@ func NewDatastoreStore(ctx context.Context, projectID, databaseID string) (*Data } _, err = ds.Put(ctx, testKey, testEntity) if err != nil { - slog.Warn("Datastore connectivity test failed, using JSON-only mode", + slog.Warn("Datastore connectivity test failed, using memory-only mode", "error", err, - "fallback", "JSON files only") + "fallback", "in-memory only (no persistence)") if closeErr := ds.Close(); closeErr != nil { slog.Debug("failed to close Datastore during test", "error", closeErr) } return &DatastoreStore{ - json: jsonStore, + memory: memStore, disabled: true, }, nil } @@ -115,23 +120,24 @@ func NewDatastoreStore(ctx context.Context, projectID, databaseID string) (*Data slog.Debug("failed to delete test entity", "error", err) } - slog.Info("initialized Datastore with JSON fallback", + slog.Info("initialized Datastore with in-memory cache", "project_id", projectID, + "database_id", databaseID, "mode", "hybrid") return &DatastoreStore{ ds: ds, - json: jsonStore, + memory: memStore, disabled: false, }, nil } -// Thread retrieves thread info with Datastore-first, JSON fallback. +// Thread retrieves thread info with memory-first, then Datastore fallback. func (s *DatastoreStore) Thread(owner, repo string, number int, channelID string) (ThreadInfo, bool) { key := threadKey(owner, repo, number, channelID) - // Fast path: Check JSON cache first - info, exists := s.json.Thread(owner, repo, number, channelID) + // Fast path: Check memory cache first + info, exists := s.memory.Thread(owner, repo, number, channelID) if exists { return info, true } @@ -158,7 +164,7 @@ func (s *DatastoreStore) Thread(owner, repo string, number int, channelID string return ThreadInfo{}, false } - // Found in Datastore - update JSON cache and return + // Found in Datastore - update memory cache and return result := ThreadInfo{ ThreadTS: entity.ThreadTS, ChannelID: entity.ChannelID, @@ -167,23 +173,21 @@ func (s *DatastoreStore) Thread(owner, repo string, number int, channelID string LastEventTime: entity.LastEventTime, } - // Async update JSON cache (don't wait) - go func() { - if err := s.json.SaveThread(owner, repo, number, channelID, result); err != nil { - slog.Debug("failed to update JSON cache for thread", "error", err) - } - }() + // Update memory cache (sync - fast) + if err := s.memory.SaveThread(owner, repo, number, channelID, result); err != nil { + slog.Debug("failed to update memory cache for thread", "error", err) + } return result, true } -// SaveThread saves thread info to both Datastore and JSON. +// SaveThread saves thread info to memory and Datastore. func (s *DatastoreStore) SaveThread(owner, repo string, number int, channelID string, info ThreadInfo) error { key := threadKey(owner, repo, number, channelID) - // Always save to JSON (fast, local) - if err := s.json.SaveThread(owner, repo, number, channelID, info); err != nil { - slog.Warn("failed to save thread to JSON", "error", err) + // Always save to memory (fast, local) + if err := s.memory.SaveThread(owner, repo, number, channelID, info); err != nil { + slog.Warn("failed to save thread to memory", "error", err) } // Skip Datastore if disabled @@ -215,10 +219,10 @@ func (s *DatastoreStore) SaveThread(owner, repo string, number int, channelID st return nil } -// LastDM retrieves last DM time with Datastore-first, JSON fallback. +// LastDM retrieves last DM time with Datastore-first, memory fallback. func (s *DatastoreStore) LastDM(userID, prURL string) (time.Time, bool) { - // Check JSON first (fast) - t, exists := s.json.LastDM(userID, prURL) + // Check memory first (fast) + t, exists := s.memory.LastDM(userID, prURL) if exists { return t, true } @@ -241,10 +245,10 @@ func (s *DatastoreStore) LastDM(userID, prURL string) (time.Time, bool) { return time.Time{}, false } - // Update JSON cache async + // Update memory cache async go func() { - if err := s.json.RecordDM(userID, prURL, entity.SentAt); err != nil { - slog.Debug("failed to update JSON cache for DM", "error", err) + if err := s.memory.RecordDM(userID, prURL, entity.SentAt); err != nil { + slog.Debug("failed to update memory cache for DM", "error", err) } }() @@ -253,9 +257,9 @@ func (s *DatastoreStore) LastDM(userID, prURL string) (time.Time, bool) { // RecordDM saves DM timestamp to both stores. func (s *DatastoreStore) RecordDM(userID, prURL string, sentAt time.Time) error { - // Save to JSON - if err := s.json.RecordDM(userID, prURL, sentAt); err != nil { - slog.Warn("failed to record DM in JSON", "error", err) + // Save to memory + if err := s.memory.RecordDM(userID, prURL, sentAt); err != nil { + slog.Warn("failed to record DM in memory", "error", err) } // Skip Datastore if disabled @@ -286,10 +290,92 @@ func (s *DatastoreStore) RecordDM(userID, prURL string, sentAt time.Time) error return nil } +// DMMessage retrieves DM message info with Datastore-first, memory fallback. +func (s *DatastoreStore) DMMessage(userID, prURL string) (DMInfo, bool) { + // Check memory first (fast) + info, exists := s.memory.DMMessage(userID, prURL) + if exists { + return info, true + } + + // Datastore disabled + if s.disabled || s.ds == nil { + return DMInfo{}, false + } + + // Try Datastore + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + + key := dmKey(userID, prURL) + dsKey := datastore.NameKey(kindDMMessage, key, nil) + var entity dmMessageEntity + + err := s.ds.Get(ctx, dsKey, &entity) + if err != nil { + return DMInfo{}, false + } + + // Found in Datastore - update memory cache and return + result := DMInfo{ + ChannelID: entity.ChannelID, + MessageTS: entity.MessageTS, + MessageText: entity.MessageText, + UpdatedAt: entity.UpdatedAt, + SentAt: entity.SentAt, + } + + // Update memory cache async + go func() { + if err := s.memory.SaveDMMessage(userID, prURL, result); err != nil { + slog.Debug("failed to update memory cache for DM message", "error", err) + } + }() + + return result, true +} + +// SaveDMMessage saves DM message info to both stores. +func (s *DatastoreStore) SaveDMMessage(userID, prURL string, info DMInfo) error { + // Always save to memory first (fast, local) + if err := s.memory.SaveDMMessage(userID, prURL, info); err != nil { + slog.Warn("failed to save DM message to memory", "error", err) + } + + // Skip Datastore if disabled + if s.disabled || s.ds == nil { + return nil + } + + // Save to Datastore async + go func() { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + key := dmKey(userID, prURL) + dsKey := datastore.NameKey(kindDMMessage, key, nil) + entity := &dmMessageEntity{ + ChannelID: info.ChannelID, + MessageTS: info.MessageTS, + MessageText: info.MessageText, + UpdatedAt: time.Now(), + SentAt: info.SentAt, + } + + if _, err := s.ds.Put(ctx, dsKey, entity); err != nil { + slog.Warn("failed to save DM message to Datastore", + "user", userID, + "error", err) + } + }() + + return nil +} + // LastDigest retrieves last digest time. func (s *DatastoreStore) LastDigest(userID, date string) (time.Time, bool) { - // Check JSON first - t, exists := s.json.LastDigest(userID, date) + // Check memory first + t, exists := s.memory.LastDigest(userID, date) if exists { return t, true } @@ -314,8 +400,8 @@ func (s *DatastoreStore) LastDigest(userID, date string) (time.Time, bool) { // Update cache go func() { - if err := s.json.RecordDigest(userID, date, entity.SentAt); err != nil { - slog.Debug("failed to update JSON cache for digest", "error", err) + if err := s.memory.RecordDigest(userID, date, entity.SentAt); err != nil { + slog.Debug("failed to update memory cache for digest", "error", err) } }() @@ -324,9 +410,9 @@ func (s *DatastoreStore) LastDigest(userID, date string) (time.Time, bool) { // RecordDigest saves digest timestamp. func (s *DatastoreStore) RecordDigest(userID, date string, sentAt time.Time) error { - // Save to JSON - if err := s.json.RecordDigest(userID, date, sentAt); err != nil { - slog.Warn("failed to record digest in JSON", "error", err) + // Save to memory + if err := s.memory.RecordDigest(userID, date, sentAt); err != nil { + slog.Warn("failed to record digest in memory", "error", err) } // Skip Datastore if disabled @@ -357,8 +443,8 @@ func (s *DatastoreStore) RecordDigest(userID, date string, sentAt time.Time) err // WasProcessed checks if an event was already processed (distributed check). func (s *DatastoreStore) WasProcessed(eventKey string) bool { - // Check JSON first (fast) - if s.json.WasProcessed(eventKey) { + // Check memory first (fast) + if s.memory.WasProcessed(eventKey) { return true } @@ -380,8 +466,8 @@ func (s *DatastoreStore) WasProcessed(eventKey string) bool { if exists { // Update local cache go func() { - if err := s.json.MarkProcessed(eventKey, 24*time.Hour); err != nil { - slog.Debug("failed to update JSON cache for event", "error", err) + if err := s.memory.MarkProcessed(eventKey, 24*time.Hour); err != nil { + slog.Debug("failed to update memory cache for event", "error", err) } }() } @@ -392,9 +478,9 @@ func (s *DatastoreStore) WasProcessed(eventKey string) bool { // MarkProcessed marks an event as processed (distributed coordination). // Returns error if already processed by another instance (enables race detection). func (s *DatastoreStore) MarkProcessed(eventKey string, ttl time.Duration) error { - // Mark in JSON first for fast local lookups - if err := s.json.MarkProcessed(eventKey, ttl); err != nil { - slog.Warn("failed to mark event in JSON", "error", err) + // Mark in memory first for fast local lookups + if err := s.memory.MarkProcessed(eventKey, ttl); err != nil { + slog.Warn("failed to mark event in memory", "error", err) } // Skip Datastore if disabled @@ -496,9 +582,9 @@ func (s *DatastoreStore) RecordNotification(prURL string, notifiedAt time.Time) // Cleanup removes old data from both stores. func (s *DatastoreStore) Cleanup() error { - // Always cleanup JSON - if err := s.json.Cleanup(); err != nil { - slog.Warn("JSON cleanup failed", "error", err) + // Always cleanup memory + if err := s.memory.Cleanup(); err != nil { + slog.Warn("memory cleanup failed", "error", err) } // Skip Datastore if disabled @@ -507,15 +593,15 @@ func (s *DatastoreStore) Cleanup() error { } // Datastore cleanup is done async via TTL or manual queries - // For now, rely on JSON cleanup and Datastore's natural expiration + // For now, rely on memory cleanup and Datastore's natural expiration return nil } // Close releases resources. func (s *DatastoreStore) Close() error { - if s.json != nil { - if err := s.json.Close(); err != nil { - slog.Warn("failed to close JSON store", "error", err) + if s.memory != nil { + if err := s.memory.Close(); err != nil { + slog.Warn("failed to close memory store", "error", err) } } diff --git a/internal/state/json.go b/internal/state/json.go index 7daa773..7479d92 100644 --- a/internal/state/json.go +++ b/internal/state/json.go @@ -20,6 +20,7 @@ type JSONStore struct { // In-memory cache for fast lookups threads map[string]ThreadInfo dms map[string]time.Time + dmMessages map[string]DMInfo // DM message tracking for updates digests map[string]time.Time events map[string]time.Time notifications map[string]time.Time @@ -42,6 +43,7 @@ func NewJSONStore() (*JSONStore, error) { baseDir: baseDir, threads: make(map[string]ThreadInfo), dms: make(map[string]time.Time), + dmMessages: make(map[string]DMInfo), digests: make(map[string]time.Time), events: make(map[string]time.Time), notifications: make(map[string]time.Time), @@ -142,6 +144,26 @@ func (s *JSONStore) RecordDM(userID, prURL string, sentAt time.Time) error { return s.save() } +// DMMessage retrieves DM message information for a user and PR. +func (s *JSONStore) DMMessage(userID, prURL string) (DMInfo, bool) { + s.mu.RLock() + defer s.mu.RUnlock() + key := dmKey(userID, prURL) + info, exists := s.dmMessages[key] + return info, exists +} + +// SaveDMMessage saves DM message information for a user and PR. +func (s *JSONStore) SaveDMMessage(userID, prURL string, info DMInfo) error { + s.mu.Lock() + defer s.mu.Unlock() + key := dmKey(userID, prURL) + info.UpdatedAt = time.Now() + s.dmMessages[key] = info + s.modified = true + return s.save() +} + // LastDigest retrieves the last digest timestamp for a user and date. func (s *JSONStore) LastDigest(userID, date string) (time.Time, bool) { s.mu.RLock() @@ -204,6 +226,7 @@ func (s *JSONStore) Cleanup() error { now := time.Now() cleanedThreads := 0 cleanedDMs := 0 + cleanedDMMessages := 0 cleanedDigests := 0 cleanedEvents := 0 @@ -223,6 +246,14 @@ func (s *JSONStore) Cleanup() error { } } + // Clean up old DM messages (>90 days) + for key, info := range s.dmMessages { + if now.Sub(info.UpdatedAt) > 90*24*time.Hour { + delete(s.dmMessages, key) + cleanedDMMessages++ + } + } + // Clean up old digests (>30 days) for key, t := range s.digests { if now.Sub(t) > 30*24*time.Hour { @@ -239,10 +270,11 @@ func (s *JSONStore) Cleanup() error { } } - if cleanedThreads+cleanedDMs+cleanedDigests+cleanedEvents > 0 { + if cleanedThreads+cleanedDMs+cleanedDMMessages+cleanedDigests+cleanedEvents > 0 { slog.Info("cleaned up old state", "threads", cleanedThreads, "dms", cleanedDMs, + "dm_messages", cleanedDMMessages, "digests", cleanedDigests, "events", cleanedEvents) s.modified = true @@ -266,6 +298,7 @@ func (s *JSONStore) Close() error { type persistentState struct { Threads map[string]ThreadInfo `json:"threads"` DMs map[string]time.Time `json:"dms"` + DMMessages map[string]DMInfo `json:"dm_messages"` Digests map[string]time.Time `json:"digests"` Events map[string]time.Time `json:"events"` Notifications map[string]time.Time `json:"notifications"` @@ -281,6 +314,7 @@ func (s *JSONStore) save() error { state := persistentState{ Threads: s.threads, DMs: s.dms, + DMMessages: s.dmMessages, Digests: s.digests, Events: s.events, Notifications: s.notifications, @@ -329,6 +363,7 @@ func (s *JSONStore) load() error { s.threads = state.Threads s.dms = state.DMs + s.dmMessages = state.DMMessages s.digests = state.Digests s.events = state.Events s.notifications = state.Notifications @@ -339,6 +374,9 @@ func (s *JSONStore) load() error { if s.dms == nil { s.dms = make(map[string]time.Time) } + if s.dmMessages == nil { + s.dmMessages = make(map[string]DMInfo) + } if s.digests == nil { s.digests = make(map[string]time.Time) } @@ -352,6 +390,7 @@ func (s *JSONStore) load() error { slog.Info("loaded state from disk", "threads", len(s.threads), "dms", len(s.dms), + "dm_messages", len(s.dmMessages), "digests", len(s.digests), "events", len(s.events), "notifications", len(s.notifications)) diff --git a/internal/state/store.go b/internal/state/store.go index a67d75a..6a76834 100644 --- a/internal/state/store.go +++ b/internal/state/store.go @@ -15,6 +15,15 @@ type ThreadInfo struct { MessageText string `json:"message_text"` } +// DMInfo stores information about a DM message for a PR. +type DMInfo struct { + ChannelID string `json:"channel_id"` // DM conversation channel ID + MessageTS string `json:"message_ts"` // Message timestamp for updating + MessageText string `json:"message_text"` // Current message text + UpdatedAt time.Time `json:"updated_at"` // When we last updated this message + SentAt time.Time `json:"sent_at"` // When we first sent this message +} + // Store provides persistent storage for bot state. // Implementations must be safe for concurrent use. type Store interface { @@ -26,6 +35,10 @@ type Store interface { LastDM(userID, prURL string) (time.Time, bool) RecordDM(userID, prURL string, sentAt time.Time) error + // DM message tracking - store DM message info for updating + DMMessage(userID, prURL string) (DMInfo, bool) + SaveDMMessage(userID, prURL string, info DMInfo) error + // Daily digest tracking - one per user per day LastDigest(userID, date string) (time.Time, bool) RecordDigest(userID, date string, sentAt time.Time) error