From f070ef4a7d539225cb37601a64e11f74618f7ed4 Mon Sep 17 00:00:00 2001 From: Thomas Stromberg Date: Mon, 20 Oct 2025 16:48:01 +0200 Subject: [PATCH] Include today in closed PR check, lint --- cmd/server/main.go | 16 ++++---- go.mod | 2 +- go.sum | 4 +- internal/bot/bot.go | 30 ++++++++------ internal/bot/bot_sprinkler.go | 77 ++++++++++++++++++++++++++++++++--- internal/bot/polling.go | 16 ++++++-- internal/config/config.go | 32 +++------------ internal/github/graphql.go | 24 +++++++---- internal/notify/daily.go | 4 +- internal/state/datastore.go | 22 +++++----- internal/state/json.go | 16 ++++---- internal/state/store.go | 8 ++-- pkg/home/fetcher.go | 2 +- 13 files changed, 159 insertions(+), 94 deletions(-) diff --git a/cmd/server/main.go b/cmd/server/main.go index ad1c0a6..66b2244 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -161,15 +161,15 @@ func run(ctx context.Context, cancel context.CancelFunc, cfg *config.ServerConfi // Initialize state store (Datastore + JSON fallback). var stateStore interface { - GetThread(owner, repo string, number int, channelID string) (state.ThreadInfo, bool) + Thread(owner, repo string, number int, channelID string) (state.ThreadInfo, bool) SaveThread(owner, repo string, number int, channelID string, info state.ThreadInfo) error - GetLastDM(userID, prURL string) (time.Time, bool) + LastDM(userID, prURL string) (time.Time, bool) RecordDM(userID, prURL string, sentAt time.Time) error - GetLastDigest(userID, date string) (time.Time, bool) + LastDigest(userID, date string) (time.Time, bool) RecordDigest(userID, date string, sentAt time.Time) error WasProcessed(eventKey string) bool MarkProcessed(eventKey string, ttl time.Duration) error - GetLastNotification(prURL string) time.Time + LastNotification(prURL string) time.Time RecordNotification(prURL string, notifiedAt time.Time) error Cleanup() error Close() error @@ -668,15 +668,15 @@ func runBotCoordinators( configManager *config.Manager, notifier *notify.Manager, stateStore interface { - GetThread(owner, repo string, number int, channelID string) (state.ThreadInfo, bool) + Thread(owner, repo string, number int, channelID string) (state.ThreadInfo, bool) SaveThread(owner, repo string, number int, channelID string, info state.ThreadInfo) error - GetLastDM(userID, prURL string) (time.Time, bool) + LastDM(userID, prURL string) (time.Time, bool) RecordDM(userID, prURL string, sentAt time.Time) error - GetLastDigest(userID, date string) (time.Time, bool) + LastDigest(userID, date string) (time.Time, bool) RecordDigest(userID, date string, sentAt time.Time) error WasProcessed(eventKey string) bool MarkProcessed(eventKey string, ttl time.Duration) error - GetLastNotification(prURL string) time.Time + LastNotification(prURL string) time.Time RecordNotification(prURL string, notifiedAt time.Time) error Cleanup() error Close() error diff --git a/go.mod b/go.mod index d853404..fb52c78 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/codeGROOVE-dev/gh-mailto v0.0.0-20251019162917-c3412c017b1f github.com/codeGROOVE-dev/gsm v0.0.0-20251019065141-833fe2363d22 github.com/codeGROOVE-dev/retry v1.2.0 - github.com/codeGROOVE-dev/sprinkler v0.0.0-20251020064313-f606185b6b98 + github.com/codeGROOVE-dev/sprinkler v0.0.0-20251020140418-efb533e2ff51 github.com/codeGROOVE-dev/turnclient v0.0.0-20251018202306-7cdc0d51856e github.com/golang-jwt/jwt/v5 v5.3.0 github.com/google/go-github/v50 v50.2.0 diff --git a/go.sum b/go.sum index 80fafad..1a79fc5 100644 --- a/go.sum +++ b/go.sum @@ -22,8 +22,8 @@ github.com/codeGROOVE-dev/prx v0.0.0-20251016165946-00c6c6e90c29 h1:MSBy3Ywr3ky/ github.com/codeGROOVE-dev/prx v0.0.0-20251016165946-00c6c6e90c29/go.mod h1:7qLbi18baOyS8yO/6/64SBIqtyzSzLFdsDST15NPH3w= github.com/codeGROOVE-dev/retry v1.2.0 h1:xYpYPX2PQZmdHwuiQAGGzsBm392xIMl4nfMEFApQnu8= github.com/codeGROOVE-dev/retry v1.2.0/go.mod h1:8OgefgV1XP7lzX2PdKlCXILsYKuz6b4ZpHa/20iLi8E= -github.com/codeGROOVE-dev/sprinkler v0.0.0-20251020064313-f606185b6b98 h1:unjiIF1rx/QZfcTEW/n6EJjde1yd3b1ZbjrWee2Afj4= -github.com/codeGROOVE-dev/sprinkler v0.0.0-20251020064313-f606185b6b98/go.mod h1:/kd3ncsRNldD0MUpbtp5ojIzfCkyeXB7JdOrpuqG7Gg= +github.com/codeGROOVE-dev/sprinkler v0.0.0-20251020140418-efb533e2ff51 h1:oPVbUoZ1jxgmrqybgRCfhwdT8KaXE/hzQ4vAswRybt0= +github.com/codeGROOVE-dev/sprinkler v0.0.0-20251020140418-efb533e2ff51/go.mod h1:/kd3ncsRNldD0MUpbtp5ojIzfCkyeXB7JdOrpuqG7Gg= github.com/codeGROOVE-dev/turnclient v0.0.0-20251018202306-7cdc0d51856e h1:3qoY6h8SgoeNsIYRM7P6PegTXAHPo8OSOapUunVP/Gs= github.com/codeGROOVE-dev/turnclient v0.0.0-20251018202306-7cdc0d51856e/go.mod h1:fYwtN9Ql6lY8t2WvCfENx+mP5FUwjlqwXCLx9CVLY20= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= diff --git a/internal/bot/bot.go b/internal/bot/bot.go index b9422ab..fc27952 100644 --- a/internal/bot/bot.go +++ b/internal/bot/bot.go @@ -83,27 +83,28 @@ func (tc *ThreadCache) Cleanup(maxAge time.Duration) { // Coordinator coordinates between GitHub, Slack, and notifications for a single org. type Coordinator struct { - slack *slackpkg.Client - github *github.Client - configManager *config.Manager - notifier *notify.Manager - userMapper *usermapping.Service - sprinklerURL string - threadCache *ThreadCache // In-memory cache for fast lookups - stateStore StateStore // Persistent state across restarts - workspaceName string // Track workspace name for better logging - eventSemaphore chan struct{} // Limits concurrent event processing (prevents overwhelming APIs) + slack *slackpkg.Client + github *github.Client + configManager *config.Manager + notifier *notify.Manager + userMapper *usermapping.Service + sprinklerURL string + threadCache *ThreadCache // In-memory cache for fast lookups + stateStore StateStore // Persistent state across restarts + workspaceName string // Track workspace name for better logging + eventSemaphore chan struct{} // Limits concurrent event processing (prevents overwhelming APIs) + processingEvents sync.WaitGroup // Tracks in-flight event processing for graceful shutdown } // StateStore interface for persistent state - allows dependency injection for testing. type StateStore interface { - GetThread(owner, repo string, number int, channelID string) (ThreadInfo, bool) + Thread(owner, repo string, number int, channelID string) (ThreadInfo, bool) SaveThread(owner, repo string, number int, channelID string, info ThreadInfo) error - GetLastDM(userID, prURL string) (time.Time, bool) + LastDM(userID, prURL string) (time.Time, bool) RecordDM(userID, prURL string, sentAt time.Time) error WasProcessed(eventKey string) bool MarkProcessed(eventKey string, ttl time.Duration) error - GetLastNotification(prURL string) time.Time + LastNotification(prURL string) time.Time RecordNotification(prURL string, notifiedAt time.Time) error Close() error } @@ -1195,6 +1196,9 @@ func (c *Coordinator) handlePullRequestFromSprinkler( logFieldOwner, owner, logFieldRepo, repo, "pr_number", prNumber, + "pr_state", checkResult.PullRequest.State, + "pr_draft", checkResult.PullRequest.Draft, + "pr_merged", checkResult.PullRequest.Merged, "pr_size", checkResult.Analysis.Size, "unresolved_comments", checkResult.Analysis.UnresolvedComments, "checks_state", fmt.Sprintf("%+v", checkResult.Analysis.Checks), diff --git a/internal/bot/bot_sprinkler.go b/internal/bot/bot_sprinkler.go index b6deed0..6b06179 100644 --- a/internal/bot/bot_sprinkler.go +++ b/internal/bot/bot_sprinkler.go @@ -9,6 +9,7 @@ import ( "strings" "time" + "github.com/codeGROOVE-dev/slacker/internal/state" "github.com/codeGROOVE-dev/sprinkler/pkg/client" ) @@ -57,12 +58,23 @@ func (c *Coordinator) handleSprinklerEvent(ctx context.Context, event client.Eve // Try to claim this event atomically using persistent store (Datastore transaction). // This is the single source of truth for cross-instance deduplication. if err := c.stateStore.MarkProcessed(eventKey, 24*time.Hour); err != nil { - slog.Info("skipping duplicate event", - "organization", organization, - "type", event.Type, - "url", event.URL, - "event_key", eventKey, - "reason", "already_processed") + // Check if this is a race condition vs a database error + if errors.Is(err, state.ErrAlreadyProcessed) { + slog.Info("skipping duplicate event - claimed by this or another instance", + "organization", organization, + "type", event.Type, + "url", event.URL, + "event_key", eventKey, + "reason", "deduplication_race") + } else { + slog.Warn("failed to mark event as processed - database error", + "organization", organization, + "type", event.Type, + "url", event.URL, + "event_key", eventKey, + "error", err, + "impact", "will_skip_event") + } return } @@ -76,7 +88,11 @@ func (c *Coordinator) handleSprinklerEvent(ctx context.Context, event client.Eve // Process event asynchronously after deduplication checks pass // This allows the event handler to return immediately and accept the next event // Semaphore limits concurrent processing to prevent overwhelming APIs + // WaitGroup tracks in-flight events for graceful shutdown + c.processingEvents.Add(1) go func() { + defer c.processingEvents.Done() + // Acquire semaphore slot (blocks if 10 events already processing) c.eventSemaphore <- struct{}{} defer func() { <-c.eventSemaphore }() // Release slot when done @@ -166,6 +182,47 @@ func (c *Coordinator) handleSprinklerEvent(ctx context.Context, event client.Eve }() // Close the goroutine } +// waitForEventProcessing waits for all in-flight events to complete during shutdown. +// Returns immediately if no events are being processed. +func (c *Coordinator) waitForEventProcessing(organization string, maxWait time.Duration) { + // Check if any events are being processed + queueLen := len(c.eventSemaphore) + if queueLen == 0 { + slog.Info("no events in processing queue, shutdown can proceed immediately", + "organization", organization) + return + } + + slog.Warn("waiting for in-flight events to complete before shutdown", + "organization", organization, + "events_in_queue", queueLen, + "max_wait_seconds", maxWait.Seconds()) + + // Create a channel to signal when all events are done + done := make(chan struct{}) + go func() { + c.processingEvents.Wait() + close(done) + }() + + // Wait for events to complete or timeout + select { + case <-done: + slog.Info("all in-flight events completed successfully", + "organization", organization, + "graceful_shutdown", true) + case <-time.After(maxWait): + remaining := len(c.eventSemaphore) + slog.Warn("shutdown timeout reached, proceeding with remaining events in queue", + "organization", organization, + "events_still_processing", remaining, + "waited_seconds", maxWait.Seconds(), + "impact", "these events may be incomplete", + "recovery", "polling will catch them in next 5min cycle", + "graceful_shutdown", false) + } +} + // handleAuthError handles authentication errors by refreshing the token and recreating the client. func (c *Coordinator) handleAuthError( ctx context.Context, @@ -302,6 +359,8 @@ func (c *Coordinator) RunWithSprinklerClient(ctx context.Context) error { slog.Info("sprinkler client context cancelled, stopping gracefully", "organization", organization, "total_attempts", attempts) + // Wait for in-flight events (up to 8 seconds, leaving 2s for HTTP shutdown) + c.waitForEventProcessing(organization, 8*time.Second) return nil } @@ -311,6 +370,8 @@ func (c *Coordinator) RunWithSprinklerClient(ctx context.Context) error { slog.Info("context cancelled, stopping sprinkler client", "organization", organization, "context_error", ctxErr) + // Wait for in-flight events (up to 8 seconds) + c.waitForEventProcessing(organization, 8*time.Second) return ctxErr } @@ -347,6 +408,7 @@ func (c *Coordinator) RunWithSprinklerClient(ctx context.Context) error { "will_retry", true) select { case <-ctx.Done(): + c.waitForEventProcessing(organization, 8*time.Second) return ctx.Err() case <-time.After(retryDelay): continue @@ -371,6 +433,7 @@ func (c *Coordinator) RunWithSprinklerClient(ctx context.Context) error { select { case <-ctx.Done(): slog.Info("context cancelled during retry wait", "organization", organization) + c.waitForEventProcessing(organization, 8*time.Second) return ctx.Err() case <-time.After(retryDelay): // Exponential backoff capped at maxRetryDelay @@ -391,6 +454,7 @@ func (c *Coordinator) RunWithSprinklerClient(ctx context.Context) error { if ctxErr := ctx.Err(); ctxErr != nil { slog.Info("sprinkler client stopped cleanly due to context cancellation", "organization", organization) + c.waitForEventProcessing(organization, 8*time.Second) return ctxErr } @@ -407,6 +471,7 @@ func (c *Coordinator) RunWithSprinklerClient(ctx context.Context) error { // This might be network hiccup or server restart select { case <-ctx.Done(): + c.waitForEventProcessing(organization, 8*time.Second) return ctx.Err() case <-time.After(5 * time.Second): slog.Info("restarting after unexpected clean disconnect", diff --git a/internal/bot/polling.go b/internal/bot/polling.go index 7e924ca..6ca6e27 100644 --- a/internal/bot/polling.go +++ b/internal/bot/polling.go @@ -198,6 +198,9 @@ func (c *Coordinator) reconcilePR(ctx context.Context, pr *github.PRSnapshot) er slog.Debug("turnclient analysis complete", "pr", fmt.Sprintf("%s/%s#%d", pr.Owner, pr.Repo, pr.Number), + "pr_state", checkResult.PullRequest.State, + "pr_draft", checkResult.PullRequest.Draft, + "pr_merged", checkResult.PullRequest.Merged, "ready_to_merge", checkResult.Analysis.ReadyToMerge, "approved", checkResult.Analysis.Approved, "next_action_count", len(checkResult.Analysis.NextAction)) @@ -271,12 +274,15 @@ func (c *Coordinator) updateClosedPRThread(ctx context.Context, pr *github.PRSna continue } - info, ok := c.stateStore.GetThread(pr.Owner, pr.Repo, pr.Number, id) + info, ok := c.stateStore.Thread(pr.Owner, pr.Repo, pr.Number, id) if !ok { slog.Debug("no thread found for closed PR in channel", "pr", prKey, "channel", ch, - "channel_id", id) + "channel_id", id, + "pr_state", pr.State, + "pr_updated_at", pr.UpdatedAt, + "possible_reason", "PR closed before thread created or thread in different channel") continue } @@ -351,7 +357,9 @@ func (c *Coordinator) StartupReconciliation(ctx context.Context) { slog.Info("🔄 STARTUP RECONCILIATION STARTED", "org", org, "purpose", "catch up on missed notifications during downtime", - "window", "24h") + "window", "24h", + "scope", "open_prs_only", + "note", "closed PRs handled by polling cycle") // Get current GitHub token token := c.github.InstallationToken(ctx) @@ -400,7 +408,7 @@ func (c *Coordinator) StartupReconciliation(ctx context.Context) { } // Check notification state - lastNotified := c.stateStore.GetLastNotification(pr.URL) + lastNotified := c.stateStore.LastNotification(pr.URL) // Determine if we should notify var reason string diff --git a/internal/config/config.go b/internal/config/config.go index 6697a61..dddb4c6 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -307,24 +307,7 @@ func (m *Manager) LoadConfig(ctx context.Context, org string) error { var config RepoConfig if err := yaml.Unmarshal([]byte(configContent), &config); err != nil { - defaultConfig := &RepoConfig{ - Channels: make(map[string]struct { - ReminderDMDelay *int `yaml:"reminder_dm_delay"` // Optional: override global delay for this channel (0 = disabled) - Repos []string `yaml:"repos"` - Mute bool `yaml:"mute"` - }), - Global: struct { - TeamID string `yaml:"team_id"` - EmailDomain string `yaml:"email_domain"` - ReminderDMDelay int `yaml:"reminder_dm_delay"` - DailyReminders bool `yaml:"daily_reminders"` - }{ - TeamID: "", - EmailDomain: "", - ReminderDMDelay: defaultReminderDMDelayMinutes, - DailyReminders: true, - }, - } + defaultConfig := createDefaultConfig() m.configs[org] = defaultConfig m.cache.set(org, defaultConfig) @@ -356,26 +339,23 @@ func (m *Manager) LoadConfig(ctx context.Context, org string) error { mutedChannels++ } totalRepos += len(channelConfig.Repos) + + hasWildcard := false for _, repo := range channelConfig.Repos { if repo == "*" { wildcardChannels++ + hasWildcard = true break } } + slog.Debug("channel configuration loaded", logFieldOrg, org, "channel", channelName, "repos_count", len(channelConfig.Repos), "repos", channelConfig.Repos, "muted", channelConfig.Mute, - "has_wildcard", func() bool { - for _, r := range channelConfig.Repos { - if r == "*" { - return true - } - } - return false - }()) + "has_wildcard", hasWildcard) } m.configs[org] = &config diff --git a/internal/github/graphql.go b/internal/github/graphql.go index 2eeec7d..a2dd4b6 100644 --- a/internal/github/graphql.go +++ b/internal/github/graphql.go @@ -64,10 +64,6 @@ func (c *GraphQLClient) ListOpenPRs(ctx context.Context, org string, updatedSinc // ListClosedPRs queries all closed/merged PRs for an organization updated in the last N hours. // This is used to update Slack threads when PRs are closed or merged. func (c *GraphQLClient) ListClosedPRs(ctx context.Context, org string, updatedSinceHours int) ([]PRSnapshot, error) { - slog.Debug("querying closed/merged PRs via GraphQL", - "org", org, - "updated_since_hours", updatedSinceHours) - since := time.Now().Add(-time.Duration(updatedSinceHours) * time.Hour) // GraphQL query structure @@ -101,9 +97,10 @@ func (c *GraphQLClient) ListClosedPRs(ctx context.Context, org string, updatedSi } `graphql:"search(query: $searchQuery, type: ISSUE, first: 100, after: $cursor)"` } - // Build search query: "is:pr is:closed org:X updated:>YYYY-MM-DD" - // This will include both closed-unmerged and merged PRs - searchQuery := fmt.Sprintf("is:pr is:closed org:%s updated:>%s", + // Build search query: "is:pr is:closed org:X updated:>=YYYY-MM-DD" + // Use >= instead of > to include PRs closed/merged on the since date + // Note: GitHub search uses date-only granularity, so we need >= to catch PRs from today + searchQuery := fmt.Sprintf("is:pr is:closed org:%s updated:>=%s", org, since.Format("2006-01-02")) @@ -135,6 +132,16 @@ func (c *GraphQLClient) ListClosedPRs(ctx context.Context, org string, updatedSi for i := range query.Search.Nodes { pr := query.Search.Nodes[i].PullRequest + // Filter by UpdatedAt since GitHub search only has date granularity + if pr.UpdatedAt.Before(since) { + slog.Debug("filtered out closed PR - updated before window", + "pr", fmt.Sprintf("%s/%s#%d", pr.Repository.Owner.Login, pr.Repository.Name, pr.Number), + "pr_updated_at", pr.UpdatedAt, + "window_start", since, + "reason", "outside_time_window") + continue + } + // Determine state: MERGED takes precedence over CLOSED state := "CLOSED" if pr.Merged { @@ -167,7 +174,8 @@ func (c *GraphQLClient) ListClosedPRs(ctx context.Context, org string, updatedSi "org", org, "total_prs", len(allPRs), "pages_fetched", pageCount, - "query", searchQuery) + "query", searchQuery, + "time_window_start", since.Format(time.RFC3339)) return allPRs, nil } diff --git a/internal/notify/daily.go b/internal/notify/daily.go index ca30d7e..9c2e66c 100644 --- a/internal/notify/daily.go +++ b/internal/notify/daily.go @@ -16,9 +16,9 @@ type ConfigProvider interface { // StateProvider provides state storage for daily digests. type StateProvider interface { - GetLastDigest(userID, date string) (time.Time, bool) + LastDigest(userID, date string) (time.Time, bool) RecordDigest(userID, date string, sentAt time.Time) error - GetLastDM(userID, prURL string) (time.Time, bool) + LastDM(userID, prURL string) (time.Time, bool) } // DailyDigestScheduler handles sending daily digest DMs to users blocking PRs. diff --git a/internal/state/datastore.go b/internal/state/datastore.go index 0a3ec80..43ca675 100644 --- a/internal/state/datastore.go +++ b/internal/state/datastore.go @@ -126,12 +126,12 @@ func NewDatastoreStore(ctx context.Context, projectID, databaseID string) (*Data }, nil } -// GetThread retrieves thread info with Datastore-first, JSON fallback. -func (s *DatastoreStore) GetThread(owner, repo string, number int, channelID string) (ThreadInfo, bool) { +// Thread retrieves thread info with Datastore-first, JSON fallback. +func (s *DatastoreStore) Thread(owner, repo string, number int, channelID string) (ThreadInfo, bool) { key := threadKey(owner, repo, number, channelID) // Fast path: Check JSON cache first - info, exists := s.json.GetThread(owner, repo, number, channelID) + info, exists := s.json.Thread(owner, repo, number, channelID) if exists { return info, true } @@ -215,10 +215,10 @@ func (s *DatastoreStore) SaveThread(owner, repo string, number int, channelID st return nil } -// GetLastDM retrieves last DM time with Datastore-first, JSON fallback. -func (s *DatastoreStore) GetLastDM(userID, prURL string) (time.Time, bool) { +// LastDM retrieves last DM time with Datastore-first, JSON fallback. +func (s *DatastoreStore) LastDM(userID, prURL string) (time.Time, bool) { // Check JSON first (fast) - t, exists := s.json.GetLastDM(userID, prURL) + t, exists := s.json.LastDM(userID, prURL) if exists { return t, true } @@ -286,10 +286,10 @@ func (s *DatastoreStore) RecordDM(userID, prURL string, sentAt time.Time) error return nil } -// GetLastDigest retrieves last digest time. -func (s *DatastoreStore) GetLastDigest(userID, date string) (time.Time, bool) { +// LastDigest retrieves last digest time. +func (s *DatastoreStore) LastDigest(userID, date string) (time.Time, bool) { // Check JSON first - t, exists := s.json.GetLastDigest(userID, date) + t, exists := s.json.LastDigest(userID, date) if exists { return t, true } @@ -447,8 +447,8 @@ func (s *DatastoreStore) MarkProcessed(eventKey string, ttl time.Duration) error return err } -// GetLastNotification retrieves when a PR was last notified about. -func (s *DatastoreStore) GetLastNotification(prURL string) time.Time { +// LastNotification retrieves when a PR was last notified about. +func (s *DatastoreStore) LastNotification(prURL string) time.Time { // Datastore disabled if s.disabled || s.ds == nil { return time.Time{} diff --git a/internal/state/json.go b/internal/state/json.go index 8a60fe4..7daa773 100644 --- a/internal/state/json.go +++ b/internal/state/json.go @@ -103,8 +103,8 @@ func digestKey(userID, date string) string { return fmt.Sprintf("digest:%s:%s", userID, date) } -// GetThread retrieves thread information for a PR. -func (s *JSONStore) GetThread(owner, repo string, number int, channelID string) (ThreadInfo, bool) { +// Thread retrieves thread information for a PR. +func (s *JSONStore) Thread(owner, repo string, number int, channelID string) (ThreadInfo, bool) { s.mu.RLock() defer s.mu.RUnlock() key := threadKey(owner, repo, number, channelID) @@ -123,8 +123,8 @@ func (s *JSONStore) SaveThread(owner, repo string, number int, channelID string, return s.save() } -// GetLastDM retrieves the last DM timestamp for a user and PR. -func (s *JSONStore) GetLastDM(userID, prURL string) (time.Time, bool) { +// LastDM retrieves the last DM timestamp for a user and PR. +func (s *JSONStore) LastDM(userID, prURL string) (time.Time, bool) { s.mu.RLock() defer s.mu.RUnlock() key := dmKey(userID, prURL) @@ -142,8 +142,8 @@ func (s *JSONStore) RecordDM(userID, prURL string, sentAt time.Time) error { return s.save() } -// GetLastDigest retrieves the last digest timestamp for a user and date. -func (s *JSONStore) GetLastDigest(userID, date string) (time.Time, bool) { +// LastDigest retrieves the last digest timestamp for a user and date. +func (s *JSONStore) LastDigest(userID, date string) (time.Time, bool) { s.mu.RLock() defer s.mu.RUnlock() key := digestKey(userID, date) @@ -180,8 +180,8 @@ func (s *JSONStore) MarkProcessed(eventKey string, _ time.Duration) error { return s.save() } -// GetLastNotification retrieves the last notification timestamp for a PR. -func (s *JSONStore) GetLastNotification(prURL string) time.Time { +// LastNotification retrieves the last notification timestamp for a PR. +func (s *JSONStore) LastNotification(prURL string) time.Time { s.mu.RLock() defer s.mu.RUnlock() return s.notifications[prURL] diff --git a/internal/state/store.go b/internal/state/store.go index 2f7772e..a67d75a 100644 --- a/internal/state/store.go +++ b/internal/state/store.go @@ -19,15 +19,15 @@ type ThreadInfo struct { // Implementations must be safe for concurrent use. type Store interface { // Thread operations - map PR to Slack thread - GetThread(owner, repo string, number int, channelID string) (ThreadInfo, bool) + Thread(owner, repo string, number int, channelID string) (ThreadInfo, bool) SaveThread(owner, repo string, number int, channelID string, info ThreadInfo) error // DM tracking - prevent duplicate notifications - GetLastDM(userID, prURL string) (time.Time, bool) + LastDM(userID, prURL string) (time.Time, bool) RecordDM(userID, prURL string, sentAt time.Time) error // Daily digest tracking - one per user per day - GetLastDigest(userID, date string) (time.Time, bool) + LastDigest(userID, date string) (time.Time, bool) RecordDigest(userID, date string, sentAt time.Time) error // Event deduplication - prevent processing same event twice @@ -35,7 +35,7 @@ type Store interface { MarkProcessed(eventKey string, ttl time.Duration) error // Notification tracking - track when we last notified about a PR - GetLastNotification(prURL string) time.Time + LastNotification(prURL string) time.Time RecordNotification(prURL string, notifiedAt time.Time) error // Cleanup old data diff --git a/pkg/home/fetcher.go b/pkg/home/fetcher.go index 90a61de..06e89d5 100644 --- a/pkg/home/fetcher.go +++ b/pkg/home/fetcher.go @@ -195,7 +195,7 @@ func (f *Fetcher) searchPRs(ctx context.Context, query string) ([]PR, error) { continue // Skip malformed repo } owner, repoName := repoParts[0], repoParts[1] - if threadInfo, exists := f.stateStore.GetThread(owner, repoName, pr.Number, ""); exists { + if threadInfo, exists := f.stateStore.Thread(owner, repoName, pr.Number, ""); exists { pr.LastEventTime = threadInfo.LastEventTime }