44 changes: 28 additions & 16 deletions cmd/server/main.go
@@ -34,8 +34,8 @@ import (
func detectGCPProjectID(ctx context.Context) string {
// Try metadata service (works on Cloud Run, GCE, GKE, Cloud Functions)
client := &http.Client{Timeout: 2 * time.Second}
req, err := http.NewRequestWithContext(ctx, "GET",
"http://metadata.google.internal/computeMetadata/v1/project/project-id", nil)
req, err := http.NewRequestWithContext(ctx, http.MethodGet,
"http://metadata.google.internal/computeMetadata/v1/project/project-id", http.NoBody)
if err != nil {
return ""
}
@@ -46,7 +46,11 @@ func detectGCPProjectID(ctx context.Context) string {
slog.Debug("metadata service not available (not running on GCP?)", "error", err)
return ""
}
defer resp.Body.Close()
defer func() {
if err := resp.Body.Close(); err != nil {
slog.Debug("failed to close metadata response body", "error", err)
}
}()

if resp.StatusCode != http.StatusOK {
slog.Debug("metadata service returned non-200", "status", resp.StatusCode)
@@ -75,8 +79,11 @@ const (
)

func main() {
// Configure logging with source locations and PID for better debugging
pid := os.Getpid()
// Configure logging with source locations and instance ID for better debugging
hostname, err := os.Hostname()
if err != nil {
hostname = "unknown"
}
logHandler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
AddSource: true,
Level: slog.LevelInfo,
@@ -93,8 +100,10 @@ func main() {
return a
},
})
// Create logger with PID as a default attribute
logger := slog.New(logHandler).With("pid", pid)
// Create logger with hostname as a default attribute
// In Cloud Run, hostname uniquely identifies each instance (e.g., slacker-abc123-xyz789)
// This is critical for disambiguating instances during rolling deployments
logger := slog.New(logHandler).With("instance", hostname)
slog.SetDefault(logger)

// Load configuration from environment.
@@ -189,20 +198,23 @@ func run(ctx context.Context, cancel context.CancelFunc, cfg *config.ServerConfi
if datastoreDB != "" && projectID != "" {
slog.Info("initializing Cloud Datastore for persistent state",
"project_id", projectID,
"database", datastoreDB,
"fallback", "JSON files")
"database", datastoreDB)
var err error
stateStore, err = state.NewDatastoreStore(ctx, projectID, datastoreDB)
if err != nil {
slog.Error("failed to initialize Datastore, using JSON only",
// FATAL: If DATASTORE is explicitly configured, fail startup on initialization errors.
// This prevents silent fallback to JSON-only mode which causes duplicate messages
// during rolling deployments (no cross-instance event deduplication).
slog.Error("FATAL: failed to initialize Cloud Datastore - DATASTORE variable is set but initialization failed",
"project_id", projectID,
"database", datastoreDB,
"error", err)
stateStore, err = state.NewJSONStore()
if err != nil {
slog.Error("failed to initialize JSON store", "error", err)
cancel()
return 1
}
cancel()
return 1
}
slog.Info("successfully initialized Cloud Datastore",
"project_id", projectID,
"database", datastoreDB)
} else {
var reason string
if datastoreDB == "" {
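For reference, a minimal standalone sketch of the metadata lookup that detectGCPProjectID performs. The Metadata-Flavor header and the body read are not visible in the hunk above (the metadata server requires that header, so the real function presumably sets it in the elided lines); treat this as an approximation, not the PR's exact code.

// Sketch only - assumes the standard "context", "io", "net/http", "strings", and "time" imports.
func metadataProjectID(ctx context.Context) string {
	client := &http.Client{Timeout: 2 * time.Second}
	req, err := http.NewRequestWithContext(ctx, http.MethodGet,
		"http://metadata.google.internal/computeMetadata/v1/project/project-id", http.NoBody)
	if err != nil {
		return ""
	}
	// The GCP metadata server rejects requests that lack this header.
	req.Header.Set("Metadata-Flavor", "Google")
	resp, err := client.Do(req)
	if err != nil {
		return "" // not running on GCP
	}
	defer func() { _ = resp.Body.Close() }()
	if resp.StatusCode != http.StatusOK {
		return ""
	}
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return ""
	}
	return strings.TrimSpace(string(body))
}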
49 changes: 26 additions & 23 deletions internal/bot/bot.go
@@ -83,20 +83,16 @@ func (tc *ThreadCache) Cleanup(maxAge time.Duration) {

// Coordinator coordinates between GitHub, Slack, and notifications for a single org.
type Coordinator struct {
slack *slackpkg.Client
github *github.Client
configManager *config.Manager
notifier *notify.Manager
userMapper *usermapping.Service
sprinklerURL string
threadCache *ThreadCache // In-memory cache for fast lookups
stateStore StateStore // Persistent state across restarts
workspaceName string // Track workspace name for better logging
processedEvents map[string]time.Time // In-memory event deduplication: "timestamp:url:type" -> processed time
processedEventMu sync.RWMutex
processingEvents map[string]bool // Track events currently being processed (prevents concurrent duplicates)
processingEventMu sync.Mutex
eventSemaphore chan struct{} // Limits concurrent event processing (prevents overwhelming APIs)
slack *slackpkg.Client
github *github.Client
configManager *config.Manager
notifier *notify.Manager
userMapper *usermapping.Service
sprinklerURL string
threadCache *ThreadCache // In-memory cache for fast lookups
stateStore StateStore // Persistent state across restarts
workspaceName string // Track workspace name for better logging
eventSemaphore chan struct{} // Limits concurrent event processing (prevents overwhelming APIs)
}

// StateStore interface for persistent state - allows dependency injection for testing.
@@ -134,9 +130,7 @@ func New(
prThreads: make(map[string]ThreadInfo),
creating: make(map[string]bool),
},
processedEvents: make(map[string]time.Time),
processingEvents: make(map[string]bool),
eventSemaphore: make(chan struct{}, 10), // Allow 10 concurrent events per org
eventSemaphore: make(chan struct{}, 10), // Allow 10 concurrent events per org
}

// Set GitHub client in config manager for this org.
@@ -621,10 +615,13 @@ func (c *Coordinator) handlePullRequestEventWithData(ctx context.Context, owner,
"warning", "DMs are sent async - if instance crashes before completion, another instance may retry and send duplicates")

// Send DMs asynchronously to avoid blocking event processing
// Use a detached context with timeout to allow graceful completion even if parent context is cancelled
// SECURITY NOTE: Use detached context to allow graceful completion of DM notifications
// even if parent context is cancelled during shutdown. Operations are still bounded by
// explicit 15-second timeout, ensuring reasonably fast shutdown while handling slow API calls.
// This pattern prevents incomplete DM delivery while maintaining shutdown security.
// Note: No panic recovery - we want panics to propagate and restart the service (Cloud Run will handle it)
// A quiet failure is worse than a visible crash that triggers automatic recovery
dmCtx, dmCancel := context.WithTimeout(context.WithoutCancel(ctx), 2*time.Minute)
dmCtx, dmCancel := context.WithTimeout(context.WithoutCancel(ctx), 15*time.Second)
go func() {
defer dmCancel()
c.sendDMNotifications(dmCtx, workspaceID, owner, repo, prNumber, uniqueUsers, event, prState)
@@ -1028,8 +1025,11 @@ func (c *Coordinator) processPRForChannel(
domain := c.configManager.Domain(owner)
if len(blockedUsers) > 0 {
// Run email lookups in background to avoid blocking message delivery
// Use detached context to allow completion even if parent context is cancelled
lookupCtx, lookupCancel := context.WithTimeout(context.WithoutCancel(ctx), 2*time.Minute)
// SECURITY NOTE: Use detached context to complete email lookups even during shutdown.
// Operations bounded by 15-second timeout. This ensures reasonably fast shutdown while
// completing active lookups for accurate DM delivery (most lookups hit cache instantly,
// but occasional cold lookups can take 10+ seconds).
lookupCtx, lookupCancel := context.WithTimeout(context.WithoutCancel(ctx), 15*time.Second)
go func() {
defer lookupCancel()
for _, githubUser := range blockedUsers {
@@ -1310,8 +1310,11 @@ func (c *Coordinator) createPRThread(ctx context.Context, channel, owner, repo s
// This avoids blocking thread creation on slow email lookups (13-20 seconds each)
domain := c.configManager.Domain(owner)
if checkResult != nil && len(checkResult.Analysis.NextAction) > 0 {
// Use detached context to allow completion even if parent context is cancelled
enrichCtx, enrichCancel := context.WithTimeout(context.WithoutCancel(ctx), 2*time.Minute)
// SECURITY NOTE: Use detached context to complete message enrichment even during shutdown.
// Operations bounded by 15-second timeout. This ensures reasonably fast shutdown while
// completing active message updates (most lookups hit cache instantly, but occasional
// cold lookups can take 10+ seconds).
enrichCtx, enrichCancel := context.WithTimeout(context.WithoutCancel(ctx), 15*time.Second)
// Capture variables to avoid data race
capturedThreadTS := threadTS
capturedOwner := owner
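The three hunks above apply one shared pattern: detach the work from the request context, then re-bound it with a short explicit timeout. A minimal sketch of that pattern, assuming Go 1.21+ for context.WithoutCancel (the helper name is illustrative, not from this PR):

// Assumes the standard "context" and "time" imports.
func runDetached(parent context.Context, work func(context.Context)) {
	// WithoutCancel keeps request-scoped values but ignores the parent's cancellation,
	// so shutdown does not abort in-flight notifications or lookups.
	// The explicit timeout still bounds each goroutine, keeping shutdown reasonably fast.
	ctx, cancel := context.WithTimeout(context.WithoutCancel(parent), 15*time.Second)
	go func() {
		defer cancel()
		work(ctx)
	}()
}

A caller would pass the real work as a closure, e.g. runDetached(ctx, func(ctx context.Context) { c.sendDMNotifications(ctx, workspaceID, owner, repo, prNumber, uniqueUsers, event, prState) }).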
114 changes: 32 additions & 82 deletions internal/bot/bot_sprinkler.go
@@ -36,87 +36,35 @@ func parsePRNumberFromURL(url string) (int, error) {
return num, nil
}

// handleSprinklerEvent processes a single event from sprinkler.
func (c *Coordinator) handleSprinklerEvent(ctx context.Context, event client.Event, organization string) {
// Deduplicate events using delivery_id if available, otherwise fall back to timestamp + URL + type
// delivery_id is unique per GitHub webhook and is the same across all instances receiving the event
var eventKey string
// eventKey generates a unique key for event deduplication.
// Uses delivery_id if available (GitHub's unique webhook ID),
// otherwise falls back to timestamp + URL + type.
func eventKey(event client.Event) string {
if event.Raw != nil {
if deliveryID, ok := event.Raw["delivery_id"].(string); ok && deliveryID != "" {
eventKey = deliveryID
if id, ok := event.Raw["delivery_id"].(string); ok && id != "" {
return id
}
}
if eventKey == "" {
// Fallback to timestamp-based key if delivery_id not available
eventKey = fmt.Sprintf("%s:%s:%s", event.Timestamp.Format(time.RFC3339Nano), event.URL, event.Type)
}

// Check persistent state first (survives restarts)
if c.stateStore.WasProcessed(eventKey) {
slog.Info("skipping duplicate event (persistent check)",
"organization", organization,
"type", event.Type,
"url", event.URL,
"timestamp", event.Timestamp,
"event_key", eventKey)
return
}

// Check if this event is currently being processed (prevents concurrent duplicates)
// This is critical when sprinkler delivers the same event twice in quick succession
c.processingEventMu.Lock()
if c.processingEvents[eventKey] {
c.processingEventMu.Unlock()
slog.Info("skipping duplicate event (currently processing)",
"organization", organization,
"type", event.Type,
"url", event.URL,
"timestamp", event.Timestamp,
"event_key", eventKey)
return
}
// Mark as currently processing
c.processingEvents[eventKey] = true
c.processingEventMu.Unlock()

// Ensure we clean up the processing flag when done
defer func() {
c.processingEventMu.Lock()
delete(c.processingEvents, eventKey)
c.processingEventMu.Unlock()
}()
return fmt.Sprintf("%s:%s:%s", event.Timestamp.Format(time.RFC3339Nano), event.URL, event.Type)
}

// Also check in-memory for fast deduplication during normal operation
c.processedEventMu.Lock()
if processedTime, exists := c.processedEvents[eventKey]; exists {
c.processedEventMu.Unlock()
slog.Info("skipping duplicate event (memory check)",
// handleSprinklerEvent processes a single event from sprinkler.
func (c *Coordinator) handleSprinklerEvent(ctx context.Context, event client.Event, organization string) {
// Generate event key using delivery_id if available, otherwise timestamp + URL + type.
// delivery_id is unique per GitHub webhook and is the same across all instances.
eventKey := eventKey(event)

// Try to claim this event atomically using persistent store (Datastore transaction).
// This is the single source of truth for cross-instance deduplication.
if err := c.stateStore.MarkProcessed(eventKey, 24*time.Hour); err != nil {
slog.Info("skipping duplicate event",
"organization", organization,
"type", event.Type,
"url", event.URL,
"timestamp", event.Timestamp,
"first_processed", processedTime,
"event_key", eventKey)
"event_key", eventKey,
"reason", "already_processed")
return
}
c.processedEvents[eventKey] = time.Now()

// Cleanup old in-memory events (older than 1 hour - persistent store handles long-term)
cutoff := time.Now().Add(-1 * time.Hour)
cleanedCount := 0
for key, processedTime := range c.processedEvents {
if processedTime.Before(cutoff) {
delete(c.processedEvents, key)
cleanedCount++
}
}
if cleanedCount > 0 {
slog.Debug("cleaned up old in-memory processed events",
"organization", organization,
"removed_count", cleanedCount,
"remaining_count", len(c.processedEvents))
}
c.processedEventMu.Unlock()

slog.Info("accepted event for async processing",
"organization", organization,
@@ -204,18 +152,17 @@ func (c *Coordinator) handleSprinklerEvent(ctx context.Context, event client.Eve
"type", event.Type,
"url", event.URL,
"repo", repo)
// Don't mark as processed if processing failed - allow retry
// Event already marked as processed before goroutine started.
// Failed processing won't be retried automatically.
// This is intentional - we don't want infinite retries of broken events.
return
}

// Mark event as processed in persistent state (survives restarts)
if err := c.stateStore.MarkProcessed(eventKey, 24*time.Hour); err != nil {
slog.Warn("failed to mark event as processed",
"organization", organization,
"event_key", eventKey,
"error", err)
// Continue anyway - in-memory dedup will prevent immediate duplicates
}
slog.Info("successfully processed sprinkler event",
"organization", organization,
"type", event.Type,
"url", event.URL,
"event_key", eventKey)
}() // Close the goroutine
}

@@ -289,7 +236,10 @@ func (c *Coordinator) RunWithSprinklerClient(ctx context.Context) error {
"organization", organization)
},
OnEvent: func(event client.Event) {
// Use background context for event processing to avoid losing events during shutdown.
// SECURITY NOTE: Use detached context for event processing to prevent webhook
// events from being lost during shutdown. Event processing has internal timeouts
// (30s for turnclient, semaphore limits) to prevent resource exhaustion.
// This ensures all GitHub events are processed reliably while maintaining security.
// Note: No panic recovery - we want panics to propagate and restart the service.
eventCtx := context.WithoutCancel(ctx)
c.handleSprinklerEvent(eventCtx, event, organization)
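Taken together, the new code replaces the old three-layer deduplication (in-memory map, processing map, persistent store) with a single atomic claim. A condensed sketch of the resulting control flow; the worker body is elided and the semaphore/goroutine ordering only approximates the real handleSprinklerEvent:

// Assumes the standard "context", "log/slog", and "time" imports plus the sprinkler client package.
func (c *Coordinator) handleEventSketch(ctx context.Context, event client.Event, organization string) {
	key := eventKey(event)
	// Atomic cross-instance claim: exactly one instance gets a nil error for a given key.
	if err := c.stateStore.MarkProcessed(key, 24*time.Hour); err != nil {
		slog.Info("skipping duplicate event", "organization", organization, "event_key", key)
		return
	}
	go func() {
		c.eventSemaphore <- struct{}{}        // bound concurrent processing per org
		defer func() { <-c.eventSemaphore }() // release the slot when done
		// ... parse the PR URL, fetch data, and update Slack; failures are logged, not retried.
	}()
}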
10 changes: 8 additions & 2 deletions internal/github/github.go
@@ -215,10 +215,16 @@ func (c *Client) authenticate(ctx context.Context) error {
slog.Debug("token validated successfully (repo list check)")
}

// Log minimal token info to reduce exposure in logs (security best practice)
tokenStr := token.GetToken()
tokenSuffix := "..."
if len(tokenStr) >= 4 {
tokenSuffix = "..." + tokenStr[len(tokenStr)-4:]
}
slog.Info("successfully authenticated GitHub App",
"app_id", c.appID,
"token_length", len(token.GetToken()),
"token_prefix", token.GetToken()[:min(10, len(token.GetToken()))]+"...",
"token_length", len(tokenStr),
"token_suffix", tokenSuffix,
"token_expires_at", token.GetExpiresAt())
return nil
}
2 changes: 1 addition & 1 deletion internal/notify/daily.go
@@ -86,4 +86,4 @@ func (d *DailyDigestScheduler) CheckAndSend(ctx context.Context) {
// - User mapping from GitHub to Slack
// - PR analysis with turnclient
// - Timezone-aware message delivery
// - Deduplication with existing DM notifications
// - Deduplication with existing DM notifications.
21 changes: 15 additions & 6 deletions internal/state/datastore.go
@@ -27,6 +27,10 @@ const (
kindNotify = "SlackerNotification"
)

// ErrAlreadyProcessed indicates an event was already processed by another instance.
// This is used for cross-instance deduplication during rolling deployments.
var ErrAlreadyProcessed = errors.New("event already processed by another instance")

// Thread entity for Datastore.
type threadEntity struct {
ThreadTS string `datastore:"thread_ts"`
@@ -386,9 +390,9 @@ func (s *DatastoreStore) WasProcessed(eventKey string) bool {
}

// MarkProcessed marks an event as processed (distributed coordination).
// Returns true if successfully marked, false if already marked by another instance.
// Returns error if already processed by another instance (enables race detection).
func (s *DatastoreStore) MarkProcessed(eventKey string, ttl time.Duration) error {
// Mark in JSON
// Mark in JSON first for fast local lookups
if err := s.json.MarkProcessed(eventKey, ttl); err != nil {
slog.Warn("failed to mark event in JSON", "error", err)
}
@@ -412,7 +416,7 @@ func (s *DatastoreStore) MarkProcessed(eventKey string, ttl time.Duration) error

// Already exists - another instance processed it
if err == nil {
return errors.New("event already processed")
return ErrAlreadyProcessed
}

// Not found - safe to insert
@@ -428,14 +432,19 @@ func (s *DatastoreStore) MarkProcessed(eventKey string, ttl time.Duration) error
// Other error
return err
})

if err != nil && err.Error() != "event already processed" {
// Return the error to caller so they can detect race condition
if err != nil {
if errors.Is(err, ErrAlreadyProcessed) {
// This is expected during rolling deployments - return error to caller
return err
}
// Unexpected error - log but don't fail processing
slog.Warn("failed to mark event in Datastore",
"event", eventKey,
"error", err)
}

return nil
return err
}

// GetLastNotification retrieves when a PR was last notified about.
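The transaction behind MarkProcessed is only partly visible above. A sketch of the insert-if-absent shape it appears to follow, using the cloud.google.com/go/datastore API; the kind name and entity fields here are assumptions, not the actual schema:

// Assumes "context", "errors", "time", and "cloud.google.com/go/datastore" imports.
type processedEntity struct {
	ProcessedAt time.Time `datastore:"processed_at"`
	ExpiresAt   time.Time `datastore:"expires_at"`
}

func claimEvent(ctx context.Context, client *datastore.Client, eventKey string, ttl time.Duration) error {
	key := datastore.NameKey("SlackerProcessedEvent", eventKey, nil) // assumed kind name
	_, err := client.RunInTransaction(ctx, func(tx *datastore.Transaction) error {
		var existing processedEntity
		switch getErr := tx.Get(key, &existing); {
		case getErr == nil:
			return ErrAlreadyProcessed // another instance already inserted the key
		case errors.Is(getErr, datastore.ErrNoSuchEntity):
			_, putErr := tx.Put(key, &processedEntity{
				ProcessedAt: time.Now(),
				ExpiresAt:   time.Now().Add(ttl),
			})
			return putErr
		default:
			return getErr // transient Datastore error; surfaced to the caller
		}
	})
	return err
}

Because the insert happens inside the transaction, two instances racing on the same eventKey cannot both succeed: the losing instance's transaction observes the existing entity and returns ErrAlreadyProcessed, which the caller in bot_sprinkler.go treats as a skip.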