Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/codeGROOVE-dev/gh-mailto v0.0.0-20251019162917-c3412c017b1f
github.com/codeGROOVE-dev/gsm v0.0.0-20251019065141-833fe2363d22
github.com/codeGROOVE-dev/retry v1.2.0
github.com/codeGROOVE-dev/sprinkler v0.0.0-20251019110134-896b678fd945
github.com/codeGROOVE-dev/sprinkler v0.0.0-20251020064313-f606185b6b98
github.com/codeGROOVE-dev/turnclient v0.0.0-20251018202306-7cdc0d51856e
github.com/golang-jwt/jwt/v5 v5.3.0
github.com/google/go-github/v50 v50.2.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ github.com/codeGROOVE-dev/prx v0.0.0-20251016165946-00c6c6e90c29 h1:MSBy3Ywr3ky/
github.com/codeGROOVE-dev/prx v0.0.0-20251016165946-00c6c6e90c29/go.mod h1:7qLbi18baOyS8yO/6/64SBIqtyzSzLFdsDST15NPH3w=
github.com/codeGROOVE-dev/retry v1.2.0 h1:xYpYPX2PQZmdHwuiQAGGzsBm392xIMl4nfMEFApQnu8=
github.com/codeGROOVE-dev/retry v1.2.0/go.mod h1:8OgefgV1XP7lzX2PdKlCXILsYKuz6b4ZpHa/20iLi8E=
github.com/codeGROOVE-dev/sprinkler v0.0.0-20251019110134-896b678fd945 h1:uZhbGjrIEYfs6Bq2PQgbbtag5gAjMp/NQZGAQsL73m4=
github.com/codeGROOVE-dev/sprinkler v0.0.0-20251019110134-896b678fd945/go.mod h1:/kd3ncsRNldD0MUpbtp5ojIzfCkyeXB7JdOrpuqG7Gg=
github.com/codeGROOVE-dev/sprinkler v0.0.0-20251020064313-f606185b6b98 h1:unjiIF1rx/QZfcTEW/n6EJjde1yd3b1ZbjrWee2Afj4=
github.com/codeGROOVE-dev/sprinkler v0.0.0-20251020064313-f606185b6b98/go.mod h1:/kd3ncsRNldD0MUpbtp5ojIzfCkyeXB7JdOrpuqG7Gg=
github.com/codeGROOVE-dev/turnclient v0.0.0-20251018202306-7cdc0d51856e h1:3qoY6h8SgoeNsIYRM7P6PegTXAHPo8OSOapUunVP/Gs=
github.com/codeGROOVE-dev/turnclient v0.0.0-20251018202306-7cdc0d51856e/go.mod h1:fYwtN9Ql6lY8t2WvCfENx+mP5FUwjlqwXCLx9CVLY20=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
Expand Down
163 changes: 161 additions & 2 deletions internal/bot/polling.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"log/slog"
"strings"
"time"

"github.com/codeGROOVE-dev/slacker/internal/github"
Expand Down Expand Up @@ -49,7 +50,7 @@ func (c *Coordinator) PollAndReconcile(ctx context.Context) {
"pr_count", len(prs),
"will_check_each", true)

// Process each PR
// Process each open PR
successCount := 0
errorCount := 0

Expand Down Expand Up @@ -93,9 +94,73 @@ func (c *Coordinator) PollAndReconcile(ctx context.Context) {
}
}

// Query closed/merged PRs in last hour to update existing threads
closedPRs, err := gqlClient.ListClosedPRs(ctx, org, 1)
if err != nil {
slog.Warn("failed to poll closed PRs",
"org", org,
"error", err,
"impact", "will retry next poll")
} else {
slog.Info("poll retrieved closed/merged PRs",
"org", org,
"pr_count", len(closedPRs),
"will_update_threads", true)

closedSuccessCount := 0
closedErrorCount := 0

for i := range closedPRs {
pr := &closedPRs[i]

// Create event key for this PR state change
eventKey := fmt.Sprintf("poll_closed:%s:%s:%s", pr.URL, pr.State, pr.UpdatedAt.Format(time.RFC3339))

// Skip if already processed
if c.stateStore.WasProcessed(eventKey) {
slog.Debug("skipping closed PR - already processed",
"pr", fmt.Sprintf("%s/%s#%d", pr.Owner, pr.Repo, pr.Number),
"state", pr.State)
closedSuccessCount++
continue
}

// Update thread for this closed/merged PR
if err := c.updateClosedPRThread(ctx, pr); err != nil {
slog.Warn("failed to update closed PR thread",
"pr", fmt.Sprintf("%s/%s#%d", pr.Owner, pr.Repo, pr.Number),
"state", pr.State,
"error", err)
closedErrorCount++
} else {
// Mark as processed
if err := c.stateStore.MarkProcessed(eventKey, 24*time.Hour); err != nil {
slog.Warn("failed to mark closed PR event as processed",
"pr", fmt.Sprintf("%s/%s#%d", pr.Owner, pr.Repo, pr.Number),
"error", err)
}
closedSuccessCount++
}

// Rate limit
select {
case <-ctx.Done():
slog.Info("polling canceled during closed PR processing", "org", org)
return
case <-time.After(100 * time.Millisecond):
}
}

slog.Info("closed PR processing complete",
"org", org,
"total_closed_prs", len(closedPRs),
"updated", closedSuccessCount,
"errors", closedErrorCount)
}

slog.Info("poll cycle complete",
"org", org,
"total_prs", len(prs),
"total_open_prs", len(prs),
"processed", successCount,
"errors", errorCount,
"next_poll", "5m")
Expand Down Expand Up @@ -180,6 +245,100 @@ func (c *Coordinator) reconcilePR(ctx context.Context, pr *github.PRSnapshot) er
return nil
}

// updateClosedPRThread updates Slack threads for a closed or merged PR.
func (c *Coordinator) updateClosedPRThread(ctx context.Context, pr *github.PRSnapshot) error {
prKey := fmt.Sprintf("%s/%s#%d", pr.Owner, pr.Repo, pr.Number)
slog.Debug("updating thread for closed/merged PR",
"pr", prKey,
"state", pr.State)

channels := c.configManager.ChannelsForRepo(pr.Owner, pr.Repo)
if len(channels) == 0 {
slog.Debug("no channels configured for closed PR",
"pr", prKey,
"owner", pr.Owner,
"repo", pr.Repo)
return nil
}

n := 0
for _, ch := range channels {
id := c.slack.ResolveChannelID(ctx, ch)
if id == "" {
slog.Debug("could not resolve channel ID for closed PR thread update",
"channel_name", ch,
"pr", prKey)
continue
}

info, ok := c.stateStore.GetThread(pr.Owner, pr.Repo, pr.Number, id)
if !ok {
slog.Debug("no thread found for closed PR in channel",
"pr", prKey,
"channel", ch,
"channel_id", id)
continue
}

if err := c.updateThreadForClosedPR(ctx, pr, id, info); err != nil {
slog.Warn("failed to update thread for closed PR",
"pr", prKey,
"channel", ch,
"error", err)
continue
}

n++
slog.Info("updated thread for closed/merged PR",
"pr", prKey,
"state", pr.State,
"channel", ch,
"thread_ts", info.ThreadTS)
}

if n == 0 {
return errors.New("no threads found or updated for closed PR")
}

return nil
}

// updateThreadForClosedPR updates a single thread's message to reflect closed/merged state.
func (c *Coordinator) updateThreadForClosedPR(ctx context.Context, pr *github.PRSnapshot, channelID string, info ThreadInfo) error {
var emoji, msg string
switch pr.State {
case "MERGED":
emoji = ":rocket:"
msg = "This PR was merged"
case "CLOSED":
emoji = ":x:"
msg = "This PR was closed without merging"
default:
return fmt.Errorf("unexpected PR state: %s", pr.State)
}

// Replace emoji prefix in message (format: ":emoji: Title • repo#123 by @user")
text := info.MessageText
if i := strings.Index(text, " "); i == -1 {
text = emoji + " " + text
} else {
text = emoji + text[i:]
}

if err := c.slack.UpdateMessage(ctx, channelID, info.ThreadTS, text); err != nil {
return fmt.Errorf("failed to update message: %w", err)
}

// Post follow-up comment (don't fail if this errors - main update succeeded)
if err := c.slack.PostThreadReply(ctx, channelID, info.ThreadTS, msg); err != nil {
slog.Debug("failed to post follow-up comment for closed PR",
"pr", fmt.Sprintf("%s/%s#%d", pr.Owner, pr.Repo, pr.Number),
"error", err)
}

return nil
}

// StartupReconciliation runs once at startup to catch up on any missed notifications.
// This ensures that if the service was down, we still notify about PRs that need attention.
func (c *Coordinator) StartupReconciliation(ctx context.Context) {
Expand Down
111 changes: 111 additions & 0 deletions internal/github/graphql.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,117 @@ func (c *GraphQLClient) ListOpenPRs(ctx context.Context, org string, updatedSinc
return prs, nil
}

// ListClosedPRs queries all closed/merged PRs for an organization updated in the last N hours.
// This is used to update Slack threads when PRs are closed or merged.
func (c *GraphQLClient) ListClosedPRs(ctx context.Context, org string, updatedSinceHours int) ([]PRSnapshot, error) {
slog.Debug("querying closed/merged PRs via GraphQL",
"org", org,
"updated_since_hours", updatedSinceHours)

since := time.Now().Add(-time.Duration(updatedSinceHours) * time.Hour)

// GraphQL query structure
var query struct {
Search struct {
Nodes []struct {
PullRequest struct {
Number int
Title string
URL string
UpdatedAt time.Time
CreatedAt time.Time
State string
IsDraft bool
Merged bool
Author struct {
Login string
}
Repository struct {
Name string
Owner struct {
Login string
}
}
} `graphql:"... on PullRequest"`
}
PageInfo struct {
EndCursor string
HasNextPage bool
}
} `graphql:"search(query: $searchQuery, type: ISSUE, first: 100, after: $cursor)"`
}

// Build search query: "is:pr is:closed org:X updated:>YYYY-MM-DD"
// This will include both closed-unmerged and merged PRs
searchQuery := fmt.Sprintf("is:pr is:closed org:%s updated:>%s",
org,
since.Format("2006-01-02"))

variables := map[string]any{
"searchQuery": githubv4.String(searchQuery),
"cursor": (*githubv4.String)(nil),
}

var allPRs []PRSnapshot
pageCount := 0
const maxPages = 10

for {
pageCount++
if pageCount > maxPages {
slog.Warn("reached max page limit for closed PR GraphQL query",
"org", org,
"pages", pageCount,
"prs_collected", len(allPRs))
break
}

err := c.client.Query(ctx, &query, variables)
if err != nil {
return nil, fmt.Errorf("GraphQL query failed: %w", err)
}

// Process this page of results
for i := range query.Search.Nodes {
pr := query.Search.Nodes[i].PullRequest

// Determine state: MERGED takes precedence over CLOSED
state := "CLOSED"
if pr.Merged {
state = "MERGED"
}

allPRs = append(allPRs, PRSnapshot{
Owner: pr.Repository.Owner.Login,
Repo: pr.Repository.Name,
Number: pr.Number,
Title: pr.Title,
Author: pr.Author.Login,
URL: pr.URL,
UpdatedAt: pr.UpdatedAt,
CreatedAt: pr.CreatedAt,
State: state,
IsDraft: pr.IsDraft,
})
}

if !query.Search.PageInfo.HasNextPage {
break
}

cursor := githubv4.String(query.Search.PageInfo.EndCursor)
variables["cursor"] = cursor
}

slog.Info("GraphQL query for closed PRs complete",
"org", org,
"total_prs", len(allPRs),
"pages_fetched", pageCount,
"query", searchQuery)

return allPRs, nil
}

// listOpenPRsGraphQL queries using GraphQL for efficiency.
func (c *GraphQLClient) listOpenPRsGraphQL(ctx context.Context, org string, updatedSinceHours int) ([]PRSnapshot, error) {
slog.Debug("querying open PRs via GraphQL",
Expand Down