Skip to content
Merged

lint #52

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/registrar/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func main() {
logHandler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
AddSource: true,
Level: slog.LevelInfo,
ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr {
ReplaceAttr: func(_ []string, a slog.Attr) slog.Attr {
// Shorten source paths to relative paths for cleaner logs
if a.Key == slog.SourceKey {
if source, ok := a.Value.Any().(*slog.Source); ok {
Expand Down
11 changes: 8 additions & 3 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,16 @@ import (
// Returns empty string if not running on GCP or detection fails.
func detectGCPProjectID(ctx context.Context) string {
// Try metadata service (works on Cloud Run, GCE, GKE, Cloud Functions)
client := &http.Client{Timeout: 2 * time.Second}
httpClient := &http.Client{Timeout: 2 * time.Second}
//nolint:revive // GCP metadata service is internal and always accessed via HTTP
req, err := http.NewRequestWithContext(ctx, http.MethodGet,
"http://metadata.google.internal/computeMetadata/v1/project/project-id", http.NoBody)
if err != nil {
return ""
}
req.Header.Set("Metadata-Flavor", "Google")

resp, err := client.Do(req)
resp, err := httpClient.Do(req)
if err != nil {
slog.Debug("metadata service not available (not running on GCP?)", "error", err)
return ""
Expand Down Expand Up @@ -87,7 +88,7 @@ func main() {
logHandler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
AddSource: true,
Level: slog.LevelInfo,
ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr {
ReplaceAttr: func(_ []string, a slog.Attr) slog.Attr {
// Shorten source paths to relative paths for cleaner logs
if a.Key == slog.SourceKey {
if source, ok := a.Value.Any().(*slog.Source); ok {
Expand Down Expand Up @@ -121,6 +122,7 @@ func main() {
os.Exit(exitCode)
}

//nolint:revive,maintidx // Complex initialization requires length for clarity and maintainability
func run(ctx context.Context, cancel context.CancelFunc, cfg *config.ServerConfig) int {
// Handle graceful shutdown.
sigChan := make(chan os.Signal, 1)
Expand Down Expand Up @@ -160,6 +162,7 @@ func run(ctx context.Context, cancel context.CancelFunc, cfg *config.ServerConfi
slackManager := slack.NewManager(cfg.SlackSigningSecret)

// Initialize state store (in-memory + Datastore or JSON for persistence).
//nolint:interfacebloat // Interface mirrors state.Store for local type safety
var stateStore interface {
Thread(owner, repo string, number int, channelID string) (state.ThreadInfo, bool)
SaveThread(owner, repo string, number int, channelID string, info state.ThreadInfo) error
Expand Down Expand Up @@ -668,6 +671,8 @@ func (cm *coordinatorManager) handleRefreshInstallations(ctx context.Context) {
// runBotCoordinators manages bot coordinators for all GitHub installations.
// It spawns one coordinator per org and refreshes the list every 5 minutes.
// Failed coordinators are automatically restarted every minute.
//
//nolint:interfacebloat // Interface mirrors state.Store for local type safety
func runBotCoordinators(
ctx context.Context,
slackManager *slack.Manager,
Expand Down
118 changes: 79 additions & 39 deletions internal/bot/bot.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@ type prContext struct {
}

// ThreadCache manages PR thread IDs for a workspace.
//
//nolint:govet // Field order optimized for logical grouping over memory alignment
type ThreadCache struct {
prThreads map[string]ThreadInfo // "owner/repo#123" -> thread info
mu sync.RWMutex
creationLock sync.Mutex // Prevents concurrent creation of the same PR thread
creating map[string]bool // Track PRs currently being created
creationLock sync.Mutex // Prevents concurrent creation of the same PR thread
prThreads map[string]ThreadInfo // "owner/repo#123" -> thread info
creating map[string]bool // Track PRs currently being created
}

// ThreadInfo is an alias to state.ThreadInfo to avoid duplication.
Expand Down Expand Up @@ -82,18 +84,20 @@ func (tc *ThreadCache) Cleanup(maxAge time.Duration) {
}

// Coordinator coordinates between GitHub, Slack, and notifications for a single org.
//
//nolint:govet // Field order optimized for logical grouping over memory alignment
type Coordinator struct {
processingEvents sync.WaitGroup // Tracks in-flight event processing for graceful shutdown
stateStore StateStore // Persistent state across restarts
sprinklerURL string
workspaceName string // Track workspace name for better logging
slack *slackpkg.Client
github *github.Client
configManager *config.Manager
notifier *notify.Manager
userMapper *usermapping.Service
sprinklerURL string
threadCache *ThreadCache // In-memory cache for fast lookups
stateStore StateStore // Persistent state across restarts
workspaceName string // Track workspace name for better logging
eventSemaphore chan struct{} // Limits concurrent event processing (prevents overwhelming APIs)
processingEvents sync.WaitGroup // Tracks in-flight event processing for graceful shutdown
threadCache *ThreadCache // In-memory cache for fast lookups
eventSemaphore chan struct{} // Limits concurrent event processing (prevents overwhelming APIs)
}

// StateStore interface for persistent state - allows dependency injection for testing.
Expand Down Expand Up @@ -157,16 +161,40 @@ func New(
return c
}

// saveThread persists thread info to both cache and persistent storage.
// This ensures threads survive restarts and are available for closed PR updates.
func (c *Coordinator) saveThread(owner, repo string, number int, channelID string, info ThreadInfo) {
// Save to in-memory cache for fast lookups
key := fmt.Sprintf("%s/%s#%d:%s", owner, repo, number, channelID)
c.threadCache.Set(key, info)

// Persist to state store for cross-instance sharing and restart recovery
if err := c.stateStore.SaveThread(owner, repo, number, channelID, info); err != nil {
slog.Warn("failed to persist thread to state store",
"pr", fmt.Sprintf("%s/%s#%d", owner, repo, number),
"channel_id", channelID,
"error", err,
"impact", "thread updates may fail after restart")
} else {
slog.Debug("persisted thread to state store",
"pr", fmt.Sprintf("%s/%s#%d", owner, repo, number),
"channel_id", channelID,
"thread_ts", info.ThreadTS)
}
}

// findOrCreatePRThread finds an existing thread or creates a new one for a PR.
// Returns (threadTS, wasNewlyCreated, error).
// Returns (threadTS, wasNewlyCreated, currentMessageText, error).
//
//nolint:revive // Four return values needed to track thread state and creation status
func (c *Coordinator) findOrCreatePRThread(ctx context.Context, channelID, owner, repo string, prNumber int, prState string, pullRequest struct {
User struct {
CreatedAt time.Time `json:"created_at"`
User struct {
Login string `json:"login"`
} `json:"user"`
HTMLURL string `json:"html_url"`
Title string `json:"title"`
Number int `json:"number"`
CreatedAt time.Time `json:"created_at"`
HTMLURL string `json:"html_url"`
Title string `json:"title"`
Number int `json:"number"`
}, checkResult *turn.CheckResponse,
) (threadTS string, wasNewlyCreated bool, currentMessageText string, err error) {
// Use cache key that includes channel ID to support multiple channels per PR
Expand Down Expand Up @@ -212,8 +240,8 @@ func (c *Coordinator) findOrCreatePRThread(ctx context.Context, channelID, owner
logFieldChannel, channelID,
"current_message_preview", initialSearchText[:min(100, len(initialSearchText))])

// Cache the found thread with its current message text
c.threadCache.Set(cacheKey, ThreadInfo{
// Save the found thread (cache + persist)
c.saveThread(owner, repo, prNumber, channelID, ThreadInfo{
ThreadTS: initialSearchTS,
ChannelID: channelID,
LastState: prState,
Expand Down Expand Up @@ -289,8 +317,8 @@ func (c *Coordinator) findOrCreatePRThread(ctx context.Context, channelID, owner
"current_message_preview", crossInstanceText[:min(100, len(crossInstanceText))],
"note", "this prevented duplicate thread creation during rolling deployment")

// Cache it and return
c.threadCache.Set(cacheKey, ThreadInfo{
// Save it and return (cache + persist)
c.saveThread(owner, repo, prNumber, channelID, ThreadInfo{
ThreadTS: crossInstanceCheckTS,
ChannelID: channelID,
LastState: prState,
Expand All @@ -312,8 +340,8 @@ func (c *Coordinator) findOrCreatePRThread(ctx context.Context, channelID, owner
return "", false, "", fmt.Errorf("failed to create PR thread: %w", err)
}

// Cache the new thread with its message text
c.threadCache.Set(cacheKey, ThreadInfo{
// Save the new thread (cache + persist)
c.saveThread(owner, repo, prNumber, channelID, ThreadInfo{
ThreadTS: newThreadTS,
ChannelID: channelID,
LastState: prState,
Expand All @@ -337,7 +365,7 @@ func (c *Coordinator) findOrCreatePRThread(ctx context.Context, channelID, owner
// Note: This is more expensive than search API but works reliably with basic bot permissions.
// Results are cached by the calling code to minimize API calls.
// Returns (threadTS, currentMessageText) - both empty if not found.
func (c *Coordinator) searchForPRThread(ctx context.Context, channelID, prURL string, prCreatedAt time.Time) (string, string) {
func (c *Coordinator) searchForPRThread(ctx context.Context, channelID, prURL string, prCreatedAt time.Time) (threadTS string, messageText string) {
slog.Info("searching for existing PR thread using channel history",
logFieldChannel, channelID,
"pr_url", prURL)
Expand Down Expand Up @@ -789,6 +817,10 @@ func (*Coordinator) extractStateFromTurnclient(checkResult *turn.CheckResponse)
func (*Coordinator) extractBlockedUsersFromTurnclient(checkResult *turn.CheckResponse) []string {
var blockedUsers []string
for user := range checkResult.Analysis.NextAction {
// Skip _system sentinel value - it indicates processing state, not an actual user
if user == "_system" {
continue
}
blockedUsers = append(blockedUsers, user)
}
return blockedUsers
Expand Down Expand Up @@ -899,10 +931,11 @@ func (c *Coordinator) updateDMMessagesForPR(ctx context.Context, pr prUpdateInfo

for _, slackUserID := range slackUserIDs {
if err := c.slack.UpdateDMMessage(ctx, slackUserID, prURL, message); err != nil {
slog.Debug("failed to update DM message",
slog.Warn("failed to update DM message",
"user", slackUserID,
"pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber),
"error", err,
"impact", "user sees stale PR state in DM",
"reason", "DM may not exist or too old")
skippedCount++
} else {
Expand Down Expand Up @@ -1140,28 +1173,32 @@ func (c *Coordinator) processPRForChannel(
// Find or create thread for this PR in this channel
// Convert to the expected struct format
pullRequestStruct := struct {
User struct {
CreatedAt time.Time `json:"created_at"`
User struct {
Login string `json:"login"`
} `json:"user"`
HTMLURL string `json:"html_url"`
Title string `json:"title"`
Number int `json:"number"`
CreatedAt time.Time `json:"created_at"`
HTMLURL string `json:"html_url"`
Title string `json:"title"`
Number int `json:"number"`
}{
User: event.PullRequest.User,
HTMLURL: event.PullRequest.HTMLURL,
Title: event.PullRequest.Title,
Number: event.PullRequest.Number,
CreatedAt: event.PullRequest.CreatedAt,
}
threadTS, wasNewlyCreated, currentText, err := c.findOrCreatePRThread(ctx, channelID, owner, repo, prNumber, prState, pullRequestStruct, checkResult)
threadTS, wasNewlyCreated, currentText, err := c.findOrCreatePRThread(
ctx, channelID, owner, repo, prNumber, prState, pullRequestStruct, checkResult,
)
if err != nil {
slog.Error("failed to find or create PR thread",
"workspace", c.workspaceName,
logFieldPR, fmt.Sprintf(prFormatString, owner, repo, prNumber),
"channel", channelDisplay,
"channel_id", channelID,
"error", err,
"impact", "channel_update_skipped_will_retry_via_polling",
"next_poll_in", "5m",
"will_continue_with_next_channel", true)
return
}
Expand Down Expand Up @@ -1240,11 +1277,12 @@ func (c *Coordinator) processPRForChannel(
"channel", channelDisplay,
"channel_id", channelID,
"thread_ts", threadTS,
"error", err)
"error", err,
"impact", "message_update_skipped_will_retry_via_polling",
"next_poll_in", "5m")
} else {
// Update cache with new message text
cacheKey := fmt.Sprintf("%s/%s#%d:%s", owner, repo, prNumber, channelID)
c.threadCache.Set(cacheKey, ThreadInfo{
// Save updated thread info (cache + persist)
c.saveThread(owner, repo, prNumber, channelID, ThreadInfo{
ThreadTS: threadTS,
ChannelID: channelID,
LastState: prState,
Expand Down Expand Up @@ -1411,7 +1449,9 @@ func (c *Coordinator) handlePullRequestFromSprinkler(
// handlePullRequestReviewFromSprinkler handles PR review events from sprinkler.
// Reviews update PR state (approved, changes requested, etc), so we treat them
// like regular pull_request events and let turnclient analyze the current state.
func (c *Coordinator) handlePullRequestReviewFromSprinkler(ctx context.Context, owner, repo string, prNumber int, sprinklerURL string, eventTimestamp time.Time) {
func (c *Coordinator) handlePullRequestReviewFromSprinkler(
ctx context.Context, owner, repo string, prNumber int, sprinklerURL string, eventTimestamp time.Time,
) {
slog.Info("handling PR review event from sprinkler",
logFieldOwner, owner,
logFieldRepo, repo,
Expand All @@ -1427,13 +1467,13 @@ func (c *Coordinator) handlePullRequestReviewFromSprinkler(ctx context.Context,
// Critical performance optimization: Posts thread immediately WITHOUT user mentions,
// then updates asynchronously once email lookups complete (which take 13-20 seconds each).
func (c *Coordinator) createPRThread(ctx context.Context, channel, owner, repo string, number int, prState string, pr struct {
User struct {
CreatedAt time.Time `json:"created_at"`
User struct {
Login string `json:"login"`
} `json:"user"`
HTMLURL string `json:"html_url"`
Title string `json:"title"`
Number int `json:"number"`
CreatedAt time.Time `json:"created_at"`
HTMLURL string `json:"html_url"`
Title string `json:"title"`
Number int `json:"number"`
}, checkResult *turn.CheckResponse,
) (threadTS string, messageText string, err error) {
// Get state-based prefix
Expand Down
23 changes: 18 additions & 5 deletions internal/bot/bot_sprinkler.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,14 @@ func (c *Coordinator) handleSprinklerEvent(ctx context.Context, event client.Eve
"event_key", eventKey,
"reason", "deduplication_race")
} else {
slog.Warn("failed to mark event as processed - database error",
slog.Error("failed to mark event as processed - database error",
"organization", organization,
"type", event.Type,
"url", event.URL,
"event_key", eventKey,
"error", err,
"impact", "will_skip_event")
"impact", "event_dropped_will_retry_via_polling",
"next_poll_in", "5m")
}
return
}
Expand Down Expand Up @@ -161,16 +162,24 @@ func (c *Coordinator) handleSprinklerEvent(ctx context.Context, event client.Eve
Timestamp: event.Timestamp,
}

if err := c.processEvent(ctx, msg); err != nil {
// Add timeout to prevent hanging on external API failures
processCtx, processCancel := context.WithTimeout(ctx, 30*time.Second)
defer processCancel()

if err := c.processEvent(processCtx, msg); err != nil {
timedOut := errors.Is(err, context.DeadlineExceeded)
slog.Error("error processing event",
"organization", organization,
"error", err,
"type", event.Type,
"url", event.URL,
"repo", repo)
"repo", repo,
"timed_out", timedOut,
"impact", "event_dropped_will_retry_via_polling",
"next_poll_in", "5m")
// Event already marked as processed before goroutine started.
// Failed processing won't be retried automatically.
// This is intentional - we don't want infinite retries of broken events.
// Polling will catch this within 5 minutes for open PRs.
return
}

Expand All @@ -184,6 +193,8 @@ func (c *Coordinator) handleSprinklerEvent(ctx context.Context, event client.Eve

// waitForEventProcessing waits for all in-flight events to complete during shutdown.
// Returns immediately if no events are being processed.
//
//nolint:unparam // maxWait parameter provides flexibility for different shutdown scenarios
func (c *Coordinator) waitForEventProcessing(organization string, maxWait time.Duration) {
// Check if any events are being processed
queueLen := len(c.eventSemaphore)
Expand Down Expand Up @@ -252,6 +263,8 @@ func (c *Coordinator) handleAuthError(
}

// RunWithSprinklerClient runs the bot using the official sprinkler client library.
//
//nolint:revive,maintidx // Complex retry/reconnection logic requires length for robustness
func (c *Coordinator) RunWithSprinklerClient(ctx context.Context) error {
slog.Info("starting bot coordinator with sprinkler client")

Expand Down
4 changes: 2 additions & 2 deletions internal/bot/dedup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func TestConcurrentEventDeduplicationStress(t *testing.T) {
var wg sync.WaitGroup
wg.Add(numConcurrentEvents)

handleEvent := func(id int) {
handleEvent := func(_ int) {
defer wg.Done()

// Check if currently being processed
Expand Down Expand Up @@ -299,7 +299,7 @@ func TestConcurrentEventDeduplicationStress(t *testing.T) {
}

// Launch all goroutines simultaneously
for i := 0; i < numConcurrentEvents; i++ {
for i := range numConcurrentEvents {
go handleEvent(i)
}

Expand Down
Loading