Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,15 +161,15 @@ func run(ctx context.Context, cancel context.CancelFunc, cfg *config.ServerConfi

// Initialize state store (Datastore + JSON fallback).
var stateStore interface {
GetThread(owner, repo string, number int, channelID string) (state.ThreadInfo, bool)
Thread(owner, repo string, number int, channelID string) (state.ThreadInfo, bool)
SaveThread(owner, repo string, number int, channelID string, info state.ThreadInfo) error
GetLastDM(userID, prURL string) (time.Time, bool)
LastDM(userID, prURL string) (time.Time, bool)
RecordDM(userID, prURL string, sentAt time.Time) error
GetLastDigest(userID, date string) (time.Time, bool)
LastDigest(userID, date string) (time.Time, bool)
RecordDigest(userID, date string, sentAt time.Time) error
WasProcessed(eventKey string) bool
MarkProcessed(eventKey string, ttl time.Duration) error
GetLastNotification(prURL string) time.Time
LastNotification(prURL string) time.Time
RecordNotification(prURL string, notifiedAt time.Time) error
Cleanup() error
Close() error
Expand Down Expand Up @@ -668,15 +668,15 @@ func runBotCoordinators(
configManager *config.Manager,
notifier *notify.Manager,
stateStore interface {
GetThread(owner, repo string, number int, channelID string) (state.ThreadInfo, bool)
Thread(owner, repo string, number int, channelID string) (state.ThreadInfo, bool)
SaveThread(owner, repo string, number int, channelID string, info state.ThreadInfo) error
GetLastDM(userID, prURL string) (time.Time, bool)
LastDM(userID, prURL string) (time.Time, bool)
RecordDM(userID, prURL string, sentAt time.Time) error
GetLastDigest(userID, date string) (time.Time, bool)
LastDigest(userID, date string) (time.Time, bool)
RecordDigest(userID, date string, sentAt time.Time) error
WasProcessed(eventKey string) bool
MarkProcessed(eventKey string, ttl time.Duration) error
GetLastNotification(prURL string) time.Time
LastNotification(prURL string) time.Time
RecordNotification(prURL string, notifiedAt time.Time) error
Cleanup() error
Close() error
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/codeGROOVE-dev/gh-mailto v0.0.0-20251019162917-c3412c017b1f
github.com/codeGROOVE-dev/gsm v0.0.0-20251019065141-833fe2363d22
github.com/codeGROOVE-dev/retry v1.2.0
github.com/codeGROOVE-dev/sprinkler v0.0.0-20251020064313-f606185b6b98
github.com/codeGROOVE-dev/sprinkler v0.0.0-20251020140418-efb533e2ff51
github.com/codeGROOVE-dev/turnclient v0.0.0-20251018202306-7cdc0d51856e
github.com/golang-jwt/jwt/v5 v5.3.0
github.com/google/go-github/v50 v50.2.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ github.com/codeGROOVE-dev/prx v0.0.0-20251016165946-00c6c6e90c29 h1:MSBy3Ywr3ky/
github.com/codeGROOVE-dev/prx v0.0.0-20251016165946-00c6c6e90c29/go.mod h1:7qLbi18baOyS8yO/6/64SBIqtyzSzLFdsDST15NPH3w=
github.com/codeGROOVE-dev/retry v1.2.0 h1:xYpYPX2PQZmdHwuiQAGGzsBm392xIMl4nfMEFApQnu8=
github.com/codeGROOVE-dev/retry v1.2.0/go.mod h1:8OgefgV1XP7lzX2PdKlCXILsYKuz6b4ZpHa/20iLi8E=
github.com/codeGROOVE-dev/sprinkler v0.0.0-20251020064313-f606185b6b98 h1:unjiIF1rx/QZfcTEW/n6EJjde1yd3b1ZbjrWee2Afj4=
github.com/codeGROOVE-dev/sprinkler v0.0.0-20251020064313-f606185b6b98/go.mod h1:/kd3ncsRNldD0MUpbtp5ojIzfCkyeXB7JdOrpuqG7Gg=
github.com/codeGROOVE-dev/sprinkler v0.0.0-20251020140418-efb533e2ff51 h1:oPVbUoZ1jxgmrqybgRCfhwdT8KaXE/hzQ4vAswRybt0=
github.com/codeGROOVE-dev/sprinkler v0.0.0-20251020140418-efb533e2ff51/go.mod h1:/kd3ncsRNldD0MUpbtp5ojIzfCkyeXB7JdOrpuqG7Gg=
github.com/codeGROOVE-dev/turnclient v0.0.0-20251018202306-7cdc0d51856e h1:3qoY6h8SgoeNsIYRM7P6PegTXAHPo8OSOapUunVP/Gs=
github.com/codeGROOVE-dev/turnclient v0.0.0-20251018202306-7cdc0d51856e/go.mod h1:fYwtN9Ql6lY8t2WvCfENx+mP5FUwjlqwXCLx9CVLY20=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
Expand Down
30 changes: 17 additions & 13 deletions internal/bot/bot.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,27 +83,28 @@ func (tc *ThreadCache) Cleanup(maxAge time.Duration) {

// Coordinator coordinates between GitHub, Slack, and notifications for a single org.
type Coordinator struct {
slack *slackpkg.Client
github *github.Client
configManager *config.Manager
notifier *notify.Manager
userMapper *usermapping.Service
sprinklerURL string
threadCache *ThreadCache // In-memory cache for fast lookups
stateStore StateStore // Persistent state across restarts
workspaceName string // Track workspace name for better logging
eventSemaphore chan struct{} // Limits concurrent event processing (prevents overwhelming APIs)
slack *slackpkg.Client
github *github.Client
configManager *config.Manager
notifier *notify.Manager
userMapper *usermapping.Service
sprinklerURL string
threadCache *ThreadCache // In-memory cache for fast lookups
stateStore StateStore // Persistent state across restarts
workspaceName string // Track workspace name for better logging
eventSemaphore chan struct{} // Limits concurrent event processing (prevents overwhelming APIs)
processingEvents sync.WaitGroup // Tracks in-flight event processing for graceful shutdown
}

// StateStore interface for persistent state - allows dependency injection for testing.
type StateStore interface {
GetThread(owner, repo string, number int, channelID string) (ThreadInfo, bool)
Thread(owner, repo string, number int, channelID string) (ThreadInfo, bool)
SaveThread(owner, repo string, number int, channelID string, info ThreadInfo) error
GetLastDM(userID, prURL string) (time.Time, bool)
LastDM(userID, prURL string) (time.Time, bool)
RecordDM(userID, prURL string, sentAt time.Time) error
WasProcessed(eventKey string) bool
MarkProcessed(eventKey string, ttl time.Duration) error
GetLastNotification(prURL string) time.Time
LastNotification(prURL string) time.Time
RecordNotification(prURL string, notifiedAt time.Time) error
Close() error
}
Expand Down Expand Up @@ -1195,6 +1196,9 @@ func (c *Coordinator) handlePullRequestFromSprinkler(
logFieldOwner, owner,
logFieldRepo, repo,
"pr_number", prNumber,
"pr_state", checkResult.PullRequest.State,
"pr_draft", checkResult.PullRequest.Draft,
"pr_merged", checkResult.PullRequest.Merged,
"pr_size", checkResult.Analysis.Size,
"unresolved_comments", checkResult.Analysis.UnresolvedComments,
"checks_state", fmt.Sprintf("%+v", checkResult.Analysis.Checks),
Expand Down
77 changes: 71 additions & 6 deletions internal/bot/bot_sprinkler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"
"time"

"github.com/codeGROOVE-dev/slacker/internal/state"
"github.com/codeGROOVE-dev/sprinkler/pkg/client"
)

Expand Down Expand Up @@ -57,12 +58,23 @@ func (c *Coordinator) handleSprinklerEvent(ctx context.Context, event client.Eve
// Try to claim this event atomically using persistent store (Datastore transaction).
// This is the single source of truth for cross-instance deduplication.
if err := c.stateStore.MarkProcessed(eventKey, 24*time.Hour); err != nil {
slog.Info("skipping duplicate event",
"organization", organization,
"type", event.Type,
"url", event.URL,
"event_key", eventKey,
"reason", "already_processed")
// Check if this is a race condition vs a database error
if errors.Is(err, state.ErrAlreadyProcessed) {
slog.Info("skipping duplicate event - claimed by this or another instance",
"organization", organization,
"type", event.Type,
"url", event.URL,
"event_key", eventKey,
"reason", "deduplication_race")
} else {
slog.Warn("failed to mark event as processed - database error",
"organization", organization,
"type", event.Type,
"url", event.URL,
"event_key", eventKey,
"error", err,
"impact", "will_skip_event")
}
return
}

Expand All @@ -76,7 +88,11 @@ func (c *Coordinator) handleSprinklerEvent(ctx context.Context, event client.Eve
// Process event asynchronously after deduplication checks pass
// This allows the event handler to return immediately and accept the next event
// Semaphore limits concurrent processing to prevent overwhelming APIs
// WaitGroup tracks in-flight events for graceful shutdown
c.processingEvents.Add(1)
go func() {
defer c.processingEvents.Done()

// Acquire semaphore slot (blocks if 10 events already processing)
c.eventSemaphore <- struct{}{}
defer func() { <-c.eventSemaphore }() // Release slot when done
Expand Down Expand Up @@ -166,6 +182,47 @@ func (c *Coordinator) handleSprinklerEvent(ctx context.Context, event client.Eve
}() // Close the goroutine
}

// waitForEventProcessing waits for all in-flight events to complete during shutdown.
// Returns immediately if no events are being processed.
func (c *Coordinator) waitForEventProcessing(organization string, maxWait time.Duration) {
// Check if any events are being processed
queueLen := len(c.eventSemaphore)
if queueLen == 0 {
slog.Info("no events in processing queue, shutdown can proceed immediately",
"organization", organization)
return
}

slog.Warn("waiting for in-flight events to complete before shutdown",
"organization", organization,
"events_in_queue", queueLen,
"max_wait_seconds", maxWait.Seconds())

// Create a channel to signal when all events are done
done := make(chan struct{})
go func() {
c.processingEvents.Wait()
close(done)
}()

// Wait for events to complete or timeout
select {
case <-done:
slog.Info("all in-flight events completed successfully",
"organization", organization,
"graceful_shutdown", true)
case <-time.After(maxWait):
remaining := len(c.eventSemaphore)
slog.Warn("shutdown timeout reached, proceeding with remaining events in queue",
"organization", organization,
"events_still_processing", remaining,
"waited_seconds", maxWait.Seconds(),
"impact", "these events may be incomplete",
"recovery", "polling will catch them in next 5min cycle",
"graceful_shutdown", false)
}
}

// handleAuthError handles authentication errors by refreshing the token and recreating the client.
func (c *Coordinator) handleAuthError(
ctx context.Context,
Expand Down Expand Up @@ -302,6 +359,8 @@ func (c *Coordinator) RunWithSprinklerClient(ctx context.Context) error {
slog.Info("sprinkler client context cancelled, stopping gracefully",
"organization", organization,
"total_attempts", attempts)
// Wait for in-flight events (up to 8 seconds, leaving 2s for HTTP shutdown)
c.waitForEventProcessing(organization, 8*time.Second)
return nil
}

Expand All @@ -311,6 +370,8 @@ func (c *Coordinator) RunWithSprinklerClient(ctx context.Context) error {
slog.Info("context cancelled, stopping sprinkler client",
"organization", organization,
"context_error", ctxErr)
// Wait for in-flight events (up to 8 seconds)
c.waitForEventProcessing(organization, 8*time.Second)
return ctxErr
}

Expand Down Expand Up @@ -347,6 +408,7 @@ func (c *Coordinator) RunWithSprinklerClient(ctx context.Context) error {
"will_retry", true)
select {
case <-ctx.Done():
c.waitForEventProcessing(organization, 8*time.Second)
return ctx.Err()
case <-time.After(retryDelay):
continue
Expand All @@ -371,6 +433,7 @@ func (c *Coordinator) RunWithSprinklerClient(ctx context.Context) error {
select {
case <-ctx.Done():
slog.Info("context cancelled during retry wait", "organization", organization)
c.waitForEventProcessing(organization, 8*time.Second)
return ctx.Err()
case <-time.After(retryDelay):
// Exponential backoff capped at maxRetryDelay
Expand All @@ -391,6 +454,7 @@ func (c *Coordinator) RunWithSprinklerClient(ctx context.Context) error {
if ctxErr := ctx.Err(); ctxErr != nil {
slog.Info("sprinkler client stopped cleanly due to context cancellation",
"organization", organization)
c.waitForEventProcessing(organization, 8*time.Second)
return ctxErr
}

Expand All @@ -407,6 +471,7 @@ func (c *Coordinator) RunWithSprinklerClient(ctx context.Context) error {
// This might be network hiccup or server restart
select {
case <-ctx.Done():
c.waitForEventProcessing(organization, 8*time.Second)
return ctx.Err()
case <-time.After(5 * time.Second):
slog.Info("restarting after unexpected clean disconnect",
Expand Down
16 changes: 12 additions & 4 deletions internal/bot/polling.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,9 @@ func (c *Coordinator) reconcilePR(ctx context.Context, pr *github.PRSnapshot) er

slog.Debug("turnclient analysis complete",
"pr", fmt.Sprintf("%s/%s#%d", pr.Owner, pr.Repo, pr.Number),
"pr_state", checkResult.PullRequest.State,
"pr_draft", checkResult.PullRequest.Draft,
"pr_merged", checkResult.PullRequest.Merged,
"ready_to_merge", checkResult.Analysis.ReadyToMerge,
"approved", checkResult.Analysis.Approved,
"next_action_count", len(checkResult.Analysis.NextAction))
Expand Down Expand Up @@ -271,12 +274,15 @@ func (c *Coordinator) updateClosedPRThread(ctx context.Context, pr *github.PRSna
continue
}

info, ok := c.stateStore.GetThread(pr.Owner, pr.Repo, pr.Number, id)
info, ok := c.stateStore.Thread(pr.Owner, pr.Repo, pr.Number, id)
if !ok {
slog.Debug("no thread found for closed PR in channel",
"pr", prKey,
"channel", ch,
"channel_id", id)
"channel_id", id,
"pr_state", pr.State,
"pr_updated_at", pr.UpdatedAt,
"possible_reason", "PR closed before thread created or thread in different channel")
continue
}

Expand Down Expand Up @@ -351,7 +357,9 @@ func (c *Coordinator) StartupReconciliation(ctx context.Context) {
slog.Info("🔄 STARTUP RECONCILIATION STARTED",
"org", org,
"purpose", "catch up on missed notifications during downtime",
"window", "24h")
"window", "24h",
"scope", "open_prs_only",
"note", "closed PRs handled by polling cycle")

// Get current GitHub token
token := c.github.InstallationToken(ctx)
Expand Down Expand Up @@ -400,7 +408,7 @@ func (c *Coordinator) StartupReconciliation(ctx context.Context) {
}

// Check notification state
lastNotified := c.stateStore.GetLastNotification(pr.URL)
lastNotified := c.stateStore.LastNotification(pr.URL)

// Determine if we should notify
var reason string
Expand Down
32 changes: 6 additions & 26 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,24 +307,7 @@ func (m *Manager) LoadConfig(ctx context.Context, org string) error {

var config RepoConfig
if err := yaml.Unmarshal([]byte(configContent), &config); err != nil {
defaultConfig := &RepoConfig{
Channels: make(map[string]struct {
ReminderDMDelay *int `yaml:"reminder_dm_delay"` // Optional: override global delay for this channel (0 = disabled)
Repos []string `yaml:"repos"`
Mute bool `yaml:"mute"`
}),
Global: struct {
TeamID string `yaml:"team_id"`
EmailDomain string `yaml:"email_domain"`
ReminderDMDelay int `yaml:"reminder_dm_delay"`
DailyReminders bool `yaml:"daily_reminders"`
}{
TeamID: "",
EmailDomain: "",
ReminderDMDelay: defaultReminderDMDelayMinutes,
DailyReminders: true,
},
}
defaultConfig := createDefaultConfig()
m.configs[org] = defaultConfig
m.cache.set(org, defaultConfig)

Expand Down Expand Up @@ -356,26 +339,23 @@ func (m *Manager) LoadConfig(ctx context.Context, org string) error {
mutedChannels++
}
totalRepos += len(channelConfig.Repos)

hasWildcard := false
for _, repo := range channelConfig.Repos {
if repo == "*" {
wildcardChannels++
hasWildcard = true
break
}
}

slog.Debug("channel configuration loaded",
logFieldOrg, org,
"channel", channelName,
"repos_count", len(channelConfig.Repos),
"repos", channelConfig.Repos,
"muted", channelConfig.Mute,
"has_wildcard", func() bool {
for _, r := range channelConfig.Repos {
if r == "*" {
return true
}
}
return false
}())
"has_wildcard", hasWildcard)
}

m.configs[org] = &config
Expand Down
Loading