From 97b05fe7b9d5f4194b3647e01085746f6ed572aa Mon Sep 17 00:00:00 2001 From: Thomas Stromberg Date: Wed, 22 Oct 2025 09:40:52 +0200 Subject: [PATCH] Daily reminder feature --- cmd/registrar/main.go | 4 - cmd/server/main.go | 6 +- internal/bot/bot.go | 26 ++- internal/notify/daily.go | 420 +++++++++++++++++++++++++++++++++++- internal/state/datastore.go | 58 ++--- internal/state/json.go | 46 +++- 6 files changed, 498 insertions(+), 62 deletions(-) diff --git a/cmd/registrar/main.go b/cmd/registrar/main.go index 05c50d8..b5b990b 100644 --- a/cmd/registrar/main.go +++ b/cmd/registrar/main.go @@ -86,10 +86,6 @@ func main() { router.Handle("/install", rateLimitMiddleware(oauthLimiter)(http.HandlerFunc(oauthHandler.HandleInstall))).Methods("GET") router.Handle("/oauth/callback", rateLimitMiddleware(oauthLimiter)(http.HandlerFunc(oauthHandler.HandleCallback))).Methods("GET") - // Debug endpoint - DO NOT EXPOSE IN PRODUCTION - // Remove this endpoint entirely or protect with strong authentication - // router.HandleFunc("/debug", oauthHandler.HandleDebug).Methods("GET") - // Determine port port := os.Getenv("PORT") if port == "" { diff --git a/cmd/server/main.go b/cmd/server/main.go index 36b3372..d898a55 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -314,10 +314,6 @@ func run(ctx context.Context, cancel context.CancelFunc, cfg *config.ServerConfi router.Handle("/slack/install", rateLimitMiddleware(oauthLimiter)(http.HandlerFunc(oauthHandler.HandleInstall))).Methods("GET") router.Handle("/slack/oauth/callback", rateLimitMiddleware(oauthLimiter)(http.HandlerFunc(oauthHandler.HandleCallback))).Methods("GET") - // Debug endpoint - DO NOT EXPOSE IN PRODUCTION - // Remove this endpoint entirely or protect with strong authentication - // router.HandleFunc("/slack/debug", oauthHandler.HandleDebug).Methods("GET") - slog.Info("registered OAuth endpoints with rate limiting", "install_url", "/slack/install", "callback_url", "/slack/oauth/callback", @@ -710,7 +706,7 @@ func runBotCoordinators( } // Initialize daily digest scheduler - dailyDigest := notify.NewDailyDigestScheduler(notifier, githubManager, configManager, stateStore) + dailyDigest := notify.NewDailyDigestScheduler(notifier, githubManager, configManager, stateStore, slackManager) // Start initial coordinators cm.startCoordinators(ctx) diff --git a/internal/bot/bot.go b/internal/bot/bot.go index e349614..a376020 100644 --- a/internal/bot/bot.go +++ b/internal/bot/bot.go @@ -796,14 +796,14 @@ func (*Coordinator) extractBlockedUsersFromTurnclient(checkResult *turn.CheckRes // prUpdateInfo groups PR information for DM updates. type prUpdateInfo struct { - checkRes *turn.CheckResponse - owner string - repo string - title string - author string - state string - url string - number int + checkRes *turn.CheckResponse + owner string + repo string + title string + author string + state string + url string + number int } // updateDMMessagesForPR updates DM messages for all relevant users on a PR. @@ -1409,14 +1409,18 @@ func (c *Coordinator) handlePullRequestFromSprinkler( } // handlePullRequestReviewFromSprinkler handles PR review events from sprinkler. -func (*Coordinator) handlePullRequestReviewFromSprinkler(ctx context.Context, owner, repo string, prNumber int, sprinklerURL string, _ time.Time) { +// Reviews update PR state (approved, changes requested, etc), so we treat them +// like regular pull_request events and let turnclient analyze the current state. +func (c *Coordinator) handlePullRequestReviewFromSprinkler(ctx context.Context, owner, repo string, prNumber int, sprinklerURL string, eventTimestamp time.Time) { slog.Info("handling PR review event from sprinkler", logFieldOwner, owner, logFieldRepo, repo, "pr_number", prNumber, "sprinkler_url", sprinklerURL, - "note", "review events not fully implemented yet") - // TODO: Implement review event handling if needed + "note", "treating as pull_request event - turnclient will analyze review state") + + // Reviews change PR state, so handle like any other PR update + c.handlePullRequestFromSprinkler(ctx, owner, repo, prNumber, sprinklerURL, eventTimestamp) } // createPRThread creates a new thread in Slack for a PR. diff --git a/internal/notify/daily.go b/internal/notify/daily.go index 9c2e66c..17c5b9c 100644 --- a/internal/notify/daily.go +++ b/internal/notify/daily.go @@ -2,16 +2,27 @@ package notify import ( "context" + "fmt" "log/slog" + "sort" + "strings" "time" + "github.com/codeGROOVE-dev/retry" + "github.com/codeGROOVE-dev/slacker/internal/config" "github.com/codeGROOVE-dev/slacker/internal/github" + slackpkg "github.com/codeGROOVE-dev/slacker/internal/slack" + "github.com/codeGROOVE-dev/slacker/internal/usermapping" + "github.com/codeGROOVE-dev/slacker/pkg/home" + "github.com/codeGROOVE-dev/turnclient/pkg/turn" + gh "github.com/google/go-github/v50/github" ) // ConfigProvider provides configuration for daily digests. type ConfigProvider interface { DailyRemindersEnabled(org string) bool Domain(org string) string + Config(org string) (*config.RepoConfig, bool) } // StateProvider provides state storage for daily digests. @@ -21,12 +32,18 @@ type StateProvider interface { LastDM(userID, prURL string) (time.Time, bool) } +// SlackManager provides Slack client operations across workspaces. +type SlackManager interface { + Client(ctx context.Context, teamID string) (*slackpkg.Client, error) +} + // DailyDigestScheduler handles sending daily digest DMs to users blocking PRs. type DailyDigestScheduler struct { notifier *Manager githubManager *github.Manager configManager ConfigProvider stateStore StateProvider + slackManager SlackManager } // NewDailyDigestScheduler creates a new daily digest scheduler. @@ -35,12 +52,14 @@ func NewDailyDigestScheduler( githubManager *github.Manager, configManager ConfigProvider, stateStore StateProvider, + slackManager SlackManager, ) *DailyDigestScheduler { return &DailyDigestScheduler{ notifier: notifier, githubManager: githubManager, configManager: configManager, stateStore: stateStore, + slackManager: slackManager, } } @@ -64,14 +83,13 @@ func (d *DailyDigestScheduler) CheckAndSend(ctx context.Context) { // Check if daily reminders are enabled for this org if !d.configManager.DailyRemindersEnabled(org) { slog.Debug("daily reminders disabled for org", "org", org) + totalSkipped++ continue } - // TODO: Implement full daily digest logic - // For now, log that we would process this org - slog.Debug("daily digest check for org (full implementation pending)", - "org", org) - totalSkipped++ + sent, errors := d.processOrgDigests(ctx, org) + totalSent += sent + totalErrors += errors } slog.Info("daily digest check complete", @@ -81,9 +99,389 @@ func (d *DailyDigestScheduler) CheckAndSend(ctx context.Context) { "errors", totalErrors) } -// TODO: Implement full daily digest functionality. -// The remaining implementation will require: -// - User mapping from GitHub to Slack -// - PR analysis with turnclient -// - Timezone-aware message delivery -// - Deduplication with existing DM notifications. +// processOrgDigests processes daily digests for all users in an organization. +func (d *DailyDigestScheduler) processOrgDigests(ctx context.Context, org string) (sent, errors int) { + // Get GitHub client for this org + githubClient, ok := d.githubManager.ClientForOrg(org) + if !ok { + slog.Warn("no GitHub client for org", "org", org) + return 0, 1 + } + + // Get team ID from config (needed for Slack client) + cfg, exists := d.configManager.Config(org) + if !exists { + slog.Warn("no config for org", "org", org) + return 0, 1 + } + teamID := cfg.Global.TeamID + + // Get Slack client for this workspace + slackClient, err := d.slackManager.Client(ctx, teamID) + if err != nil { + slog.Warn("failed to get Slack client", "org", org, "team_id", teamID, "error", err) + return 0, 1 + } + + // Create user mapper for this org + userMapper := usermapping.New(slackClient.API(), githubClient.InstallationToken(ctx)) + + // Get all open PRs for this org + prs, err := d.fetchOrgPRs(ctx, githubClient, org) + if err != nil { + slog.Error("failed to fetch PRs for org", "org", org, "error", err) + return 0, 1 + } + + if len(prs) == 0 { + slog.Debug("no open PRs for org", "org", org) + return 0, 0 + } + + // Analyze each PR with turnclient to find blocked users + blockedUsers := make(map[string][]home.PR) // githubUsername -> PRs they're blocking + + for i := range prs { + pr := &prs[i] + // Skip PRs older than 90 days (stale) + if time.Since(pr.UpdatedAt) > 90*24*time.Hour { + continue + } + + // Skip if PR was updated very recently (already sent DMs) + if time.Since(pr.UpdatedAt) < 8*time.Hour { + continue + } + + // Analyze with turnclient + checkResult, err := d.analyzePR(ctx, githubClient, org, *pr) + if err != nil { + slog.Debug("failed to analyze PR", "org", org, "pr", pr.Number, "error", err) + continue + } + + // Find users who are blocking this PR + for githubUser, action := range checkResult.Analysis.NextAction { + if githubUser == "_system" { + continue + } + + // Enrich PR with turnclient data + enrichedPR := d.enrichPR(*pr, checkResult, githubUser, action) + blockedUsers[githubUser] = append(blockedUsers[githubUser], enrichedPR) + } + } + + slog.Debug("analyzed PRs for daily digest", + "org", org, + "total_prs", len(prs), + "blocked_users", len(blockedUsers)) + + // Send digests to users who are in their 8-9am window + domain := d.configManager.Domain(org) + for githubUser, userPRs := range blockedUsers { + if d.shouldSendDigest(ctx, userMapper, slackClient, githubUser, org, domain, userPRs) { + if err := d.sendDigest(ctx, userMapper, slackClient, githubUser, org, domain, userPRs); err != nil { + slog.Error("failed to send daily digest", + "org", org, + "github_user", githubUser, + "pr_count", len(userPRs), + "error", err) + errors++ + } else { + sent++ + } + } + } + + return sent, errors +} + +// fetchOrgPRs fetches all open PRs for an organization. +func (d *DailyDigestScheduler) fetchOrgPRs(ctx context.Context, githubClient *github.Client, org string) ([]home.PR, error) { + client := githubClient.Client() + + // Search for all open PRs in this org + query := fmt.Sprintf("is:pr is:open org:%s", org) + opts := &gh.SearchOptions{ + ListOptions: gh.ListOptions{PerPage: 100}, + } + + var allPRs []home.PR + + for { + var result *gh.IssuesSearchResult + var resp *gh.Response + + // Retry GitHub API call with exponential backoff + err := retry.Do( + func() error { + var searchErr error + result, resp, searchErr = client.Search.Issues(ctx, query, opts) + return searchErr + }, + retry.Attempts(5), + retry.Delay(time.Second), + retry.MaxDelay(2*time.Minute), + retry.DelayType(retry.BackOffDelay), + retry.OnRetry(func(n uint, err error) { + slog.Warn("retrying GitHub search after failure", + "org", org, + "attempt", n+1, + "error", err) + }), + ) + if err != nil { + return nil, fmt.Errorf("failed to search PRs after retries: %w", err) + } + + for _, issue := range result.Issues { + if issue.PullRequestLinks == nil { + continue // Skip non-PRs + } + + // Extract repo from URL + parts := strings.Split(*issue.RepositoryURL, "/") + if len(parts) < 2 { + continue + } + repo := parts[len(parts)-1] + + allPRs = append(allPRs, home.PR{ + Number: *issue.Number, + Title: *issue.Title, + Author: *issue.User.Login, + Repository: fmt.Sprintf("%s/%s", org, repo), + URL: *issue.HTMLURL, + UpdatedAt: issue.UpdatedAt.Time, + }) + } + + if resp.NextPage == 0 { + break + } + opts.Page = resp.NextPage + } + + return allPRs, nil +} + +// analyzePR analyzes a PR with turnclient. +func (d *DailyDigestScheduler) analyzePR(ctx context.Context, githubClient *github.Client, org string, pr home.PR) (*turn.CheckResponse, error) { + turnClient, err := turn.NewDefaultClient() + if err != nil { + return nil, fmt.Errorf("failed to create turn client: %w", err) + } + + turnClient.SetAuthToken(githubClient.InstallationToken(ctx)) + + checkCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + // Retry turnclient call with exponential backoff + var result *turn.CheckResponse + err = retry.Do( + func() error { + var checkErr error + result, checkErr = turnClient.Check(checkCtx, pr.URL, pr.Author, pr.UpdatedAt) + return checkErr + }, + retry.Attempts(5), + retry.Delay(time.Second), + retry.MaxDelay(2*time.Minute), + retry.DelayType(retry.BackOffDelay), + retry.OnRetry(func(n uint, err error) { + slog.Warn("retrying turnclient check after failure", + "pr", pr.URL, + "attempt", n+1, + "error", err) + }), + ) + if err != nil { + return nil, fmt.Errorf("failed to check PR after retries: %w", err) + } + + return result, nil +} + +// enrichPR enriches a PR with turnclient analysis results. +func (d *DailyDigestScheduler) enrichPR(pr home.PR, checkResult *turn.CheckResponse, githubUser string, action turn.Action) home.PR { + pr.ActionKind = string(action.Kind) + pr.ActionReason = action.Reason + pr.NeedsReview = action.Kind == "review" || action.Kind == "approve" + pr.IsBlocked = true // They're in NextAction, so they're blocking + + return pr +} + +// shouldSendDigest determines if a digest should be sent to a user now. +func (d *DailyDigestScheduler) shouldSendDigest(ctx context.Context, userMapper *usermapping.Service, slackClient *slackpkg.Client, githubUser, org, domain string, prs []home.PR) bool { + // Map to Slack user + slackUserID, err := userMapper.SlackHandle(ctx, githubUser, org, domain) + if err != nil || slackUserID == "" { + slog.Debug("no Slack mapping for GitHub user", + "github_user", githubUser, + "org", org) + return false + } + + // Get user's timezone + tzName, err := slackClient.UserTimezone(ctx, slackUserID) + if err != nil { + slog.Debug("failed to get user timezone", + "slack_user", slackUserID, + "github_user", githubUser, + "error", err) + return false + } + + // Parse timezone + loc, err := time.LoadLocation(tzName) + if err != nil { + slog.Debug("invalid timezone", + "slack_user", slackUserID, + "timezone", tzName, + "error", err) + return false + } + + // Check if it's 8-9am in user's timezone + now := time.Now().In(loc) + hour := now.Hour() + + if hour < 8 || hour >= 9 { + return false // Not in the 8-9am window + } + + // Check if we already sent a digest today + today := now.Format("2006-01-02") + if lastDigest, exists := d.stateStore.LastDigest(slackUserID, today); exists { + slog.Debug("already sent digest today", + "slack_user", slackUserID, + "github_user", githubUser, + "today", today, + "last_sent", lastDigest) + return false + } + + return true +} + +// sendDigest sends a daily digest to a user. +func (d *DailyDigestScheduler) sendDigest(ctx context.Context, userMapper *usermapping.Service, slackClient *slackpkg.Client, githubUser, org, domain string, prs []home.PR) error { + // Map to Slack user + slackUserID, err := userMapper.SlackHandle(ctx, githubUser, org, domain) + if err != nil { + return fmt.Errorf("failed to map user: %w", err) + } + + // Separate incoming (need to review) vs outgoing (user is author) + var incoming, outgoing []home.PR + for i := range prs { + if prs[i].Author == githubUser { + outgoing = append(outgoing, prs[i]) + } else { + incoming = append(incoming, prs[i]) + } + } + + // Sort both lists by most recently updated first + sort.Slice(incoming, func(i, j int) bool { + return incoming[i].UpdatedAt.After(incoming[j].UpdatedAt) + }) + sort.Slice(outgoing, func(i, j int) bool { + return outgoing[i].UpdatedAt.After(outgoing[j].UpdatedAt) + }) + + // Format digest message with separated sections + message := d.formatDigestMessage(incoming, outgoing) + + // Send DM + _, _, err = slackClient.SendDirectMessage(ctx, slackUserID, message) + if err != nil { + return fmt.Errorf("failed to send DM: %w", err) + } + + // Record that we sent a digest today (memory + best-effort persistence) + tzName, tzErr := slackClient.UserTimezone(ctx, slackUserID) + if tzErr != nil { + slog.Debug("failed to get user timezone, using UTC", "user", slackUserID, "error", tzErr) + tzName = "UTC" + } + loc, err := time.LoadLocation(tzName) + if err != nil { + slog.Debug("failed to load timezone location, using UTC", "timezone", tzName, "error", err) + loc = time.UTC + } + today := time.Now().In(loc).Format("2006-01-02") + + // RecordDigest always succeeds (memory) and attempts persistence (best-effort) + if err := d.stateStore.RecordDigest(slackUserID, today, time.Now()); err != nil { + slog.Debug("state store returned error for RecordDigest", "error", err) + } + + slog.Info("sent daily digest", + "slack_user", slackUserID, + "github_user", githubUser, + "incoming_count", len(incoming), + "outgoing_count", len(outgoing)) + + return nil +} + +// formatDigestMessage formats a daily digest message with friendly, varied greetings. +func (d *DailyDigestScheduler) formatDigestMessage(incoming, outgoing []home.PR) string { + return d.formatDigestMessageAt(incoming, outgoing, time.Now()) +} + +// formatDigestMessageAt formats a daily digest message at a specific time (for testing). +func (d *DailyDigestScheduler) formatDigestMessageAt(incoming, outgoing []home.PR, now time.Time) string { + var sb strings.Builder + + // Friendly, happy greetings - keep it chill and inviting + greetings := []string{ + "โ˜€๏ธ *Good morning!*", + "๐Ÿ‘‹ *Hey there!*", + "โ˜• *Coffee's ready!*", + "๐ŸŒˆ *Happy morning!*", + "๐ŸŽจ *Time to create!*", + "๐ŸŒป *Hello sunshine!*", + "๐ŸŽต *Morning vibes!*", + "โœจ *Hey friend!*", + "๐ŸŒธ *Beautiful day!*", + "๐Ÿ’ซ *Greetings!*", + } + + // Pick greeting based on time for variety + greetingIdx := (now.Hour()*60 + now.Minute()) % len(greetings) + sb.WriteString(greetings[greetingIdx]) + sb.WriteString("\n\n") + + // Show incoming PRs (user needs to review) + if len(incoming) > 0 { + sb.WriteString("*To Review:*\n") + for i := range incoming { + sb.WriteString(fmt.Sprintf(":hourglass: <%s|%s> ยท %s\n", + incoming[i].URL, + incoming[i].Title, + incoming[i].ActionKind)) + } + sb.WriteString("\n") + } + + // Show outgoing PRs (user is author, needs to address feedback) + if len(outgoing) > 0 { + sb.WriteString("*Your PRs:*\n") + for i := range outgoing { + sb.WriteString(fmt.Sprintf(":hourglass: <%s|%s> ยท %s\n", + outgoing[i].URL, + outgoing[i].Title, + outgoing[i].ActionKind)) + } + sb.WriteString("\n") + } + + sb.WriteString("_Your daily digest from Ready to Review_") + + return sb.String() +} diff --git a/internal/state/datastore.go b/internal/state/datastore.go index 36abf8e..522319a 100644 --- a/internal/state/datastore.go +++ b/internal/state/datastore.go @@ -211,7 +211,7 @@ func (s *DatastoreStore) SaveThread(owner, repo string, number int, channelID st } if _, err := s.ds.Put(ctx, dsKey, entity); err != nil { - slog.Warn("failed to save thread to Datastore", + slog.Error("failed to save thread to Datastore", "key", key, "error", err) } @@ -282,7 +282,7 @@ func (s *DatastoreStore) RecordDM(userID, prURL string, sentAt time.Time) error } if _, err := s.ds.Put(ctx, dsKey, entity); err != nil { - slog.Warn("failed to record DM in Datastore", + slog.Error("failed to record DM in Datastore", "user", userID, "error", err) } @@ -358,7 +358,7 @@ func (s *DatastoreStore) SaveDMMessage(userID, prURL string, info DMInfo) error } if _, err := s.ds.Put(ctx, dsKey, entity); err != nil { - slog.Warn("failed to save DM message to Datastore", + slog.Error("failed to save DM message to Datastore", "user", userID, "error", err) } @@ -456,9 +456,11 @@ func (s *DatastoreStore) LastDigest(userID, date string) (time.Time, bool) { return entity.SentAt, true } -// RecordDigest saves digest timestamp. +// RecordDigest saves digest timestamp to memory and attempts persistence to Datastore. +// Memory is always updated (primary storage for runtime). Datastore is best-effort for restart recovery. +// Degrades gracefully: logs errors but continues operating if Datastore unavailable. func (s *DatastoreStore) RecordDigest(userID, date string, sentAt time.Time) error { - // Save to memory + // Always save to memory first (primary storage, must succeed) if err := s.memory.RecordDigest(userID, date, sentAt); err != nil { slog.Warn("failed to record digest in memory", "error", err) } @@ -468,23 +470,27 @@ func (s *DatastoreStore) RecordDigest(userID, date string, sentAt time.Time) err return nil } - // Save to Datastore async - go func() { - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() + // Best-effort persistence to Datastore for restart recovery + // Synchronous write for maximum reliability, but don't fail operation if it doesn't work + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() - key := digestKey(userID, date) - dsKey := datastore.NameKey(kindDigest, key, nil) - entity := &digestEntity{ - UserID: userID, - Date: date, - SentAt: sentAt, - } + key := digestKey(userID, date) + dsKey := datastore.NameKey(kindDigest, key, nil) + entity := &digestEntity{ + UserID: userID, + Date: date, + SentAt: sentAt, + } - if _, err := s.ds.Put(ctx, dsKey, entity); err != nil { - slog.Warn("failed to record digest in Datastore", "error", err) - } - }() + if _, err := s.ds.Put(ctx, dsKey, entity); err != nil { + slog.Error("failed to persist digest to Datastore - may send duplicate after restart", + "user", userID, + "date", date, + "error", err) + // Graceful degradation: log error but don't fail the operation + // System continues running even if external persistence unavailable + } return nil } @@ -566,19 +572,21 @@ func (s *DatastoreStore) MarkProcessed(eventKey string, ttl time.Duration) error // Other error return err }) - // Return the error to caller so they can detect race condition + // Return error only for race conditions (ErrAlreadyProcessed) + // For other errors, degrade gracefully if err != nil { if errors.Is(err, ErrAlreadyProcessed) { // This is expected during rolling deployments - return error to caller return err } - // Unexpected error - log but don't fail processing - slog.Warn("failed to mark event in Datastore", + // Unexpected error - log but don't fail processing (graceful degradation) + slog.Error("failed to mark event in Datastore - continuing with memory-only tracking", "event", eventKey, "error", err) + return nil } - return err + return nil } // LastNotification retrieves when a PR was last notified about. @@ -621,7 +629,7 @@ func (s *DatastoreStore) RecordNotification(prURL string, notifiedAt time.Time) } if _, err := s.ds.Put(ctx, dsKey, entity); err != nil { - slog.Warn("failed to record notification in Datastore", "error", err) + slog.Error("failed to record notification in Datastore", "error", err) } }() diff --git a/internal/state/json.go b/internal/state/json.go index be582c9..868b7e4 100644 --- a/internal/state/json.go +++ b/internal/state/json.go @@ -120,9 +120,14 @@ func (s *JSONStore) SaveThread(owner, repo string, number int, channelID string, defer s.mu.Unlock() key := threadKey(owner, repo, number, channelID) info.UpdatedAt = time.Now() + // Always save to memory (primary storage) s.threads[key] = info s.modified = true - return s.save() + // Best-effort persistence to JSON file for restart recovery + if err := s.save(); err != nil { + slog.Error("failed to persist thread to JSON file", "key", key, "error", err) + } + return nil } // LastDM retrieves the last DM timestamp for a user and PR. @@ -139,9 +144,14 @@ func (s *JSONStore) RecordDM(userID, prURL string, sentAt time.Time) error { s.mu.Lock() defer s.mu.Unlock() key := dmKey(userID, prURL) + // Always save to memory (primary storage) s.dms[key] = sentAt s.modified = true - return s.save() + // Best-effort persistence to JSON file for restart recovery + if err := s.save(); err != nil { + slog.Error("failed to persist DM record to JSON file", "user", userID, "error", err) + } + return nil } // DMMessage retrieves DM message information for a user and PR. @@ -159,9 +169,14 @@ func (s *JSONStore) SaveDMMessage(userID, prURL string, info DMInfo) error { defer s.mu.Unlock() key := dmKey(userID, prURL) info.UpdatedAt = time.Now() + // Always save to memory (primary storage) s.dmMessages[key] = info s.modified = true - return s.save() + // Best-effort persistence to JSON file for restart recovery + if err := s.save(); err != nil { + slog.Error("failed to persist DM message to JSON file", "user", userID, "error", err) + } + return nil } // ListDMUsers returns all user IDs who have received DMs for a given PR. @@ -199,9 +214,18 @@ func (s *JSONStore) RecordDigest(userID, date string, sentAt time.Time) error { s.mu.Lock() defer s.mu.Unlock() key := digestKey(userID, date) + // Always save to memory (primary storage) s.digests[key] = sentAt s.modified = true - return s.save() + // Best-effort persistence to JSON file for restart recovery + if err := s.save(); err != nil { + slog.Error("failed to persist digest to JSON file - may send duplicate after restart", + "user", userID, + "date", date, + "error", err) + // Graceful degradation: log error but don't fail the operation + } + return nil } // WasProcessed checks if an event was already processed. @@ -218,9 +242,14 @@ func (s *JSONStore) WasProcessed(eventKey string) bool { func (s *JSONStore) MarkProcessed(eventKey string, _ time.Duration) error { s.mu.Lock() defer s.mu.Unlock() + // Always save to memory (primary storage) s.events[eventKey] = time.Now() s.modified = true - return s.save() + // Best-effort persistence to JSON file for restart recovery + if err := s.save(); err != nil { + slog.Error("failed to persist event record to JSON file", "event", eventKey, "error", err) + } + return nil } // LastNotification retrieves the last notification timestamp for a PR. @@ -234,9 +263,14 @@ func (s *JSONStore) LastNotification(prURL string) time.Time { func (s *JSONStore) RecordNotification(prURL string, notifiedAt time.Time) error { s.mu.Lock() defer s.mu.Unlock() + // Always save to memory (primary storage) s.notifications[prURL] = notifiedAt s.modified = true - return s.save() + // Best-effort persistence to JSON file for restart recovery + if err := s.save(); err != nil { + slog.Error("failed to persist notification record to JSON file", "pr_url", prURL, "error", err) + } + return nil } // Cleanup removes old data from all maps.