Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 6 additions & 21 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,10 @@ func run(ctx context.Context, cancel context.CancelFunc, cfg *config.ServerConfi
homeHandler := slack.NewHomeHandler(slackManager, githubManager, configManager, stateStore, reverseMapping)
slackManager.SetHomeViewHandler(homeHandler.HandleAppHomeOpened)

// Initialize report handler for /r2r report slash command
reportHandler := slack.NewReportHandler(slackManager, githubManager, stateStore, reverseMapping)
slackManager.SetReportHandler(reportHandler.HandleReportCommand)

// Initialize OAuth handler for Slack app installation.
// These credentials are needed for the OAuth flow.
slackClientID := os.Getenv("SLACK_CLIENT_ID")
Expand Down Expand Up @@ -692,10 +696,6 @@ func runBotCoordinators(
lastHealthCheck: time.Now(),
}

// Initialize daily digest scheduler
//nolint:revive // line length acceptable for initialization
dailyDigest := notify.NewDailyDigestScheduler(notifier, github.WrapManager(githubManager), configManager, stateStore, notify.WrapSlackManager(slackManager))

// Start initial coordinators
cm.startCoordinators(ctx)

Expand All @@ -712,19 +712,10 @@ func runBotCoordinators(
defer healthCheckTicker.Stop()

// Poll for PRs every 5 minutes (safety net for missed sprinkler events)
// Daily reports are checked during each poll cycle (6am-11:30am window)
pollTicker := time.NewTicker(5 * time.Minute)
defer pollTicker.Stop()

// Check for daily digest candidates every hour
dailyDigestTicker := time.NewTicker(1 * time.Hour)
defer dailyDigestTicker.Stop()

// Run daily digest check immediately on startup
// (in case server starts during someone's 8-9am window)
go func() {
dailyDigest.CheckAndSend(ctx)
}()

// Setup state cleanup ticker (hourly)
cleanupTicker := time.NewTicker(1 * time.Hour)
defer cleanupTicker.Stop()
Expand Down Expand Up @@ -754,15 +745,9 @@ func runBotCoordinators(
cm.handleRefreshInstallations(ctx)

case <-pollTicker.C:
// Poll all active coordinators
// Poll all active coordinators (includes daily report checks)
cm.handlePolling(ctx)

case <-dailyDigestTicker.C:
// Check for daily digest candidates across all orgs
go func() {
dailyDigest.CheckAndSend(ctx)
}()

case <-cleanupTicker.C:
// Periodic cleanup of old state data
//nolint:contextcheck // Background cleanup should complete even during shutdown
Expand Down
7 changes: 6 additions & 1 deletion pkg/bot/bot.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type Coordinator struct {

// StateStore interface for persistent state - allows dependency injection for testing.
//
//nolint:interfacebloat // 15 methods needed for complete state management
//nolint:interfacebloat // 20 methods needed for complete state management
type StateStore interface {
Thread(ctx context.Context, owner, repo string, number int, channelID string) (cache.ThreadInfo, bool)
SaveThread(ctx context.Context, owner, repo string, number int, channelID string, info cache.ThreadInfo) error
Expand All @@ -85,13 +85,18 @@ type StateStore interface {
DMMessage(ctx context.Context, userID, prURL string) (state.DMInfo, bool)
SaveDMMessage(ctx context.Context, userID, prURL string, info state.DMInfo) error
ListDMUsers(ctx context.Context, prURL string) []string
LastDigest(ctx context.Context, userID, date string) (time.Time, bool)
RecordDigest(ctx context.Context, userID, date string, sentAt time.Time) error
QueuePendingDM(ctx context.Context, dm *state.PendingDM) error
PendingDMs(ctx context.Context, before time.Time) ([]state.PendingDM, error)
RemovePendingDM(ctx context.Context, id string) error
WasProcessed(ctx context.Context, eventKey string) bool
MarkProcessed(ctx context.Context, eventKey string, ttl time.Duration) error
LastNotification(ctx context.Context, prURL string) time.Time
RecordNotification(ctx context.Context, prURL string, notifiedAt time.Time) error
LastReportSent(ctx context.Context, userID string) (time.Time, bool)
RecordReportSent(ctx context.Context, userID string, sentAt time.Time) error
Cleanup(ctx context.Context) error
Close() error
}

Expand Down
55 changes: 55 additions & 0 deletions pkg/bot/coordinator_test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,39 @@ func (m *mockStateStore) RemovePendingDM(ctx context.Context, id string) error {
return nil
}

func (m *mockStateStore) LastDigest(_ context.Context, _ /* userID */, _ /* date */ string) (time.Time, bool) {
m.mu.Lock()
defer m.mu.Unlock()
// Simple mock - always return false
return time.Time{}, false
}

func (m *mockStateStore) RecordDigest(_ context.Context, _ /* userID */, _ /* date */ string, _ /* sentAt */ time.Time) error {
m.mu.Lock()
defer m.mu.Unlock()
// Simple mock - no-op
return nil
}

func (m *mockStateStore) LastReportSent(_ context.Context, _ /* userID */ string) (time.Time, bool) {
m.mu.Lock()
defer m.mu.Unlock()
// Simple mock - always return false (no reports sent)
return time.Time{}, false
}

func (m *mockStateStore) RecordReportSent(_ context.Context, _ /* userID */ string, _ /* sentAt */ time.Time) error {
m.mu.Lock()
defer m.mu.Unlock()
// Simple mock - no-op
return nil
}

func (*mockStateStore) Cleanup(_ context.Context) error {
// Simple mock - no-op
return nil
}

func (*mockStateStore) Close() error {
return nil
}
Expand Down Expand Up @@ -377,6 +410,28 @@ func (m *mockSlackClient) SendDirectMessage(ctx context.Context, userID, text st
return "D" + userID, "1234567890.123456", nil
}

// SendDirectMessageWithBlocks sends a Block Kit DM to a user.
func (*mockSlackClient) SendDirectMessageWithBlocks(
_ context.Context,
userID string,
_ []slack.Block,
) (dmChannelID, messageTS string, err error) {
// Simple mock - just return success
return "D" + userID, "1234567890.123456", nil
}

// IsUserActive checks if a user is currently active.
func (*mockSlackClient) IsUserActive(_ context.Context, _ /* userID */ string) bool {
// Simple mock - always return true (active)
return true
}

// UserTimezone returns the user's IANA timezone.
func (*mockSlackClient) UserTimezone(_ context.Context, _ /* userID */ string) (string, error) {
// Simple mock - return America/New_York
return "America/New_York", nil
}

// IsUserInChannel checks if a user is in a channel.
func (m *mockSlackClient) IsUserInChannel(ctx context.Context, channelID, userID string) bool {
if m.isUserInChannelFunc != nil {
Expand Down
5 changes: 4 additions & 1 deletion pkg/bot/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@ import (
// SlackClient defines Slack operations needed by bot.
// Small interface - only methods we actually call.
//
//nolint:interfacebloat // 13 methods needed for Slack integration
//nolint:interfacebloat // 16 methods needed for Slack integration
type SlackClient interface {
PostThread(ctx context.Context, channelID, text string, attachments []slack.Attachment) (string, error)
UpdateMessage(ctx context.Context, channelID, timestamp, text string) error
UpdateDMMessage(ctx context.Context, userID, prURL, text string) error
SendDirectMessage(ctx context.Context, userID, text string) (dmChannelID, messageTS string, err error)
SendDirectMessageWithBlocks(ctx context.Context, userID string, blocks []slack.Block) (dmChannelID, messageTS string, err error)
IsUserInChannel(ctx context.Context, channelID, userID string) bool
IsUserActive(ctx context.Context, userID string) bool
UserTimezone(ctx context.Context, userID string) (string, error)
FindDMMessagesInHistory(ctx context.Context, userID, prURL string, since time.Time) ([]slackapi.DMLocation, error)
ChannelHistory(ctx context.Context, channelID string, oldest, latest string, limit int) (*slack.GetConversationHistoryResponse, error)
ResolveChannelID(ctx context.Context, channelName string) string
Expand Down
158 changes: 158 additions & 0 deletions pkg/bot/polling.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ import (
"os"
"time"

"github.com/codeGROOVE-dev/slacker/pkg/dailyreport"
"github.com/codeGROOVE-dev/slacker/pkg/github"
"github.com/codeGROOVE-dev/slacker/pkg/home"
"github.com/codeGROOVE-dev/turnclient/pkg/turn"
gogithub "github.com/google/go-github/v50/github"
)

// makePollEventKey creates an event key for poll-based PR processing.
Expand Down Expand Up @@ -192,6 +195,9 @@ func (c *Coordinator) pollAndReconcileWithSearcher(ctx context.Context, searcher
"processed", successCount,
"errors", errorCount,
"next_poll", "5m")

// Check daily reports for users involved in these PRs
c.checkDailyReports(ctx, org, prs)
}

// reconcilePR checks a single PR and sends notifications if needed.
Expand Down Expand Up @@ -439,3 +445,155 @@ func (c *Coordinator) StartupReconciliation(ctx context.Context) {
"skipped", skippedCount,
"errors", errorCount)
}

// extractUniqueGitHubUsers extracts all unique GitHub usernames involved in PRs.
// Currently extracts PR authors. The FetchDashboard call will determine which users
// have incoming PRs to review (as reviewers/assignees).
func extractUniqueGitHubUsers(prs []github.PRSnapshot) map[string]bool {
users := make(map[string]bool)

for i := range prs {
pr := &prs[i]

// Add author
if pr.Author != "" {
users[pr.Author] = true
}
}

return users
}

// checkDailyReports checks if users involved in PRs should receive daily reports.
// Efficiently extracts unique GitHub users from polled PRs instead of iterating all workspace users.
// Reports are sent between 6am-11:30am user local time, with 23+ hour intervals.
func (c *Coordinator) checkDailyReports(ctx context.Context, org string, prs []github.PRSnapshot) {
// Check if daily reports are enabled for this org
cfg, exists := c.configManager.Config(org)
if !exists {
slog.Debug("skipping daily reports - no config found", "org", org)
return
}

if cfg.Global.DisableDailyReport {
slog.Debug("daily reports disabled for org", "org", org)
return
}

// Get domain for user mapping
domain := cfg.Global.EmailDomain

// Extract unique GitHub users from PRs (authors, reviewers, assignees)
githubUsers := extractUniqueGitHubUsers(prs)
if len(githubUsers) == 0 {
slog.Debug("no users involved in PRs, skipping daily reports", "org", org)
return
}

slog.Debug("checking daily reports for PR-involved users",
"org", org,
"unique_github_users", len(githubUsers),
"window", "6am-11:30am local time",
"min_interval", "23 hours")

// Get GitHub client for dashboard fetching
token := c.github.InstallationToken(ctx)
if token == "" {
slog.Warn("skipping daily reports - no GitHub token", "org", org)
return
}

ghClient, ok := c.github.Client().(*gogithub.Client)
if !ok {
slog.Error("skipping daily reports - failed to get GitHub client")
return
}

// Create daily report sender and dashboard fetcher
sender := dailyreport.NewSender(c.stateStore, c.slack)
fetcher := home.NewFetcher(ghClient, c.stateStore, token, "ready-to-review[bot]")

sentCount := 0
skippedCount := 0
errorCount := 0

for githubUsername := range githubUsers {
// Map GitHub user to Slack user ID
slackUserID, err := c.userMapper.SlackHandle(ctx, githubUsername, org, domain)
if err != nil || slackUserID == "" {
slog.Debug("skipping daily report - no Slack mapping",
"github_user", githubUsername,
"error", err)
skippedCount++
continue
}

// Fetch user's dashboard (incoming/outgoing PRs)
dashboard, err := fetcher.FetchDashboard(ctx, githubUsername, []string{org})
if err != nil {
slog.Debug("skipping daily report - dashboard fetch failed",
"github_user", githubUsername,
"slack_user", slackUserID,
"error", err)
errorCount++
continue
}

// Build user blocking info
userInfo := dailyreport.UserBlockingInfo{
GitHubUsername: githubUsername,
SlackUserID: slackUserID,
IncomingPRs: dashboard.IncomingPRs,
OutgoingPRs: dashboard.OutgoingPRs,
}

// Check if should send report
if !sender.ShouldSendReport(ctx, userInfo) {
// Record timestamp even if no report sent (no PRs or outside window)
// This prevents checking this user again for 23 hours
if len(dashboard.IncomingPRs) == 0 && len(dashboard.OutgoingPRs) == 0 {
if err := c.stateStore.RecordReportSent(ctx, slackUserID, time.Now()); err != nil {
slog.Debug("failed to record empty report timestamp",
"github_user", githubUsername,
"slack_user", slackUserID,
"error", err)
}
}
skippedCount++
continue
}

// Send the report
if err := sender.SendReport(ctx, userInfo); err != nil {
slog.Warn("failed to send daily report",
"github_user", githubUsername,
"slack_user", slackUserID,
"error", err)
errorCount++
continue
}

slog.Info("sent daily report",
"github_user", githubUsername,
"slack_user", slackUserID,
"incoming_prs", len(dashboard.IncomingPRs),
"outgoing_prs", len(dashboard.OutgoingPRs))
sentCount++

// Rate limit to avoid overwhelming Slack/GitHub APIs
select {
case <-ctx.Done():
slog.Info("daily report check canceled", "org", org)
return
case <-time.After(500 * time.Millisecond):
}
}

if sentCount > 0 || errorCount > 0 {
slog.Info("daily report check complete",
"org", org,
"sent", sentCount,
"skipped", skippedCount,
"errors", errorCount)
}
}
Loading
Loading