diff --git a/cmd/prcost/repository.go b/cmd/prcost/repository.go index b8f076a..fdedd68 100644 --- a/cmd/prcost/repository.go +++ b/cmd/prcost/repository.go @@ -2,60 +2,15 @@ package main import ( "context" - "errors" "fmt" "log/slog" "strings" - "sync" "time" "github.com/codeGROOVE-dev/prcost/pkg/cost" "github.com/codeGROOVE-dev/prcost/pkg/github" ) -// countBotPRs counts how many PRs in the list are authored by bots. -// Uses the same bot detection logic as pkg/github/query.go:isBot(). -func countBotPRs(prs []github.PRSummary) int { - count := 0 - for _, pr := range prs { - if isBotAuthor(pr.Author) { - count++ - } - } - return count -} - -// isBotAuthor returns true if the author name indicates a bot account. -// This matches the logic in pkg/github/query.go:isBot(). -func isBotAuthor(author string) bool { - // Check for common bot name patterns - if strings.HasSuffix(author, "[bot]") || strings.Contains(author, "-bot-") { - return true - } - - // Check for specific known bot usernames (case-insensitive) - lowerAuthor := strings.ToLower(author) - knownBots := []string{ - "renovate", - "dependabot", - "github-actions", - "codecov", - "snyk", - "greenkeeper", - "imgbot", - "renovate-bot", - "dependabot-preview", - } - - for _, botName := range knownBots { - if lowerAuthor == botName { - return true - } - } - - return false -} - // analyzeRepository performs repository-wide cost analysis by sampling PRs. // Uses library functions from pkg/github and pkg/cost for fetching, sampling, // and extrapolation - all functionality is available to external clients. @@ -82,7 +37,7 @@ func analyzeRepository(ctx context.Context, owner, repo string, sampleSize, days actualDays, _ := github.CalculateActualTimeWindow(prs, days) // Count bot PRs before sampling - botPRCount := countBotPRs(prs) + botPRCount := github.CountBotPRs(prs) humanPRCount := len(prs) - botPRCount // Sample PRs using time-bucket strategy (includes all PRs) @@ -103,60 +58,37 @@ func analyzeRepository(ctx context.Context, owner, repo string, sampleSize, days len(samples), len(prs), actualDays) } - // Collect breakdowns from each sample using parallel processing - var breakdowns []cost.Breakdown - var mu sync.Mutex - var wg sync.WaitGroup - - // Use a buffered channel for worker pool pattern (same as web server) - concurrency := 8 // Process up to 8 PRs concurrently - semaphore := make(chan struct{}, concurrency) - - for i, pr := range samples { - wg.Add(1) - go func(index int, prSummary github.PRSummary) { - defer wg.Done() - - // Acquire semaphore slot - semaphore <- struct{}{} - defer func() { <-semaphore }() - - prURL := fmt.Sprintf("https://github.com/%s/%s/pull/%d", owner, repo, prSummary.Number) - slog.Info("Processing sample PR", - "repo", fmt.Sprintf("%s/%s", owner, repo), - "number", prSummary.Number, - "progress", fmt.Sprintf("%d/%d", index+1, len(samples))) - - // Fetch full PR data using configured data source - var prData cost.PRData - var err error - if dataSource == "turnserver" { - // Use turnserver with updatedAt for effective caching - prData, err = github.FetchPRDataViaTurnserver(ctx, prURL, token, prSummary.UpdatedAt) - } else { - // Use prx with updatedAt for effective caching - prData, err = github.FetchPRData(ctx, prURL, token, prSummary.UpdatedAt) - } - if err != nil { - slog.Warn("Failed to fetch PR data, skipping", "pr_number", prSummary.Number, "source", dataSource, "error", err) - return - } - - // Calculate cost and accumulate with mutex protection - breakdown := cost.Calculate(prData, cfg) - mu.Lock() - breakdowns = append(breakdowns, breakdown) - mu.Unlock() - }(i, pr) - } - - // Wait for all goroutines to complete - wg.Wait() - - if len(breakdowns) == 0 { - return errors.New("no samples could be processed successfully") + // Convert samples to PRSummaryInfo format + var summaries []cost.PRSummaryInfo + for _, pr := range samples { + summaries = append(summaries, cost.PRSummaryInfo{ + Owner: pr.Owner, + Repo: pr.Repo, + Number: pr.Number, + UpdatedAt: pr.UpdatedAt, + }) + } + + // Create fetcher + fetcher := &github.SimpleFetcher{ + Token: token, + DataSource: dataSource, } + // Analyze PRs using shared code path + result, err := cost.AnalyzePRs(ctx, &cost.AnalysisRequest{ + Samples: summaries, + Logger: slog.Default(), + Fetcher: fetcher, + Concurrency: 8, // Process up to 8 PRs concurrently + Config: cfg, + }) + if err != nil { + return err + } + + breakdowns := result.Breakdowns + // Count unique authors across all PRs (not just samples) totalAuthors := github.CountUniqueAuthors(prs) @@ -204,7 +136,7 @@ func analyzeOrganization(ctx context.Context, org string, sampleSize, days int, actualDays, _ := github.CalculateActualTimeWindow(prs, days) // Count bot PRs before sampling - botPRCount := countBotPRs(prs) + botPRCount := github.CountBotPRs(prs) humanPRCount := len(prs) - botPRCount // Sample PRs using time-bucket strategy (includes all PRs) @@ -225,60 +157,37 @@ func analyzeOrganization(ctx context.Context, org string, sampleSize, days int, len(samples), len(prs), org, actualDays) } - // Collect breakdowns from each sample using parallel processing - var breakdowns []cost.Breakdown - var mu sync.Mutex - var wg sync.WaitGroup - - // Use a buffered channel for worker pool pattern (same as web server) - concurrency := 8 // Process up to 8 PRs concurrently - semaphore := make(chan struct{}, concurrency) - - for i, pr := range samples { - wg.Add(1) - go func(index int, prSummary github.PRSummary) { - defer wg.Done() - - // Acquire semaphore slot - semaphore <- struct{}{} - defer func() { <-semaphore }() - - prURL := fmt.Sprintf("https://github.com/%s/%s/pull/%d", prSummary.Owner, prSummary.Repo, prSummary.Number) - slog.Info("Processing sample PR", - "repo", fmt.Sprintf("%s/%s", prSummary.Owner, prSummary.Repo), - "number", prSummary.Number, - "progress", fmt.Sprintf("%d/%d", index+1, len(samples))) - - // Fetch full PR data using configured data source - var prData cost.PRData - var err error - if dataSource == "turnserver" { - // Use turnserver with updatedAt for effective caching - prData, err = github.FetchPRDataViaTurnserver(ctx, prURL, token, prSummary.UpdatedAt) - } else { - // Use prx with updatedAt for effective caching - prData, err = github.FetchPRData(ctx, prURL, token, prSummary.UpdatedAt) - } - if err != nil { - slog.Warn("Failed to fetch PR data, skipping", "pr_number", prSummary.Number, "source", dataSource, "error", err) - return - } - - // Calculate cost and accumulate with mutex protection - breakdown := cost.Calculate(prData, cfg) - mu.Lock() - breakdowns = append(breakdowns, breakdown) - mu.Unlock() - }(i, pr) - } - - // Wait for all goroutines to complete - wg.Wait() - - if len(breakdowns) == 0 { - return errors.New("no samples could be processed successfully") + // Convert samples to PRSummaryInfo format + var summaries []cost.PRSummaryInfo + for _, pr := range samples { + summaries = append(summaries, cost.PRSummaryInfo{ + Owner: pr.Owner, + Repo: pr.Repo, + Number: pr.Number, + UpdatedAt: pr.UpdatedAt, + }) + } + + // Create fetcher + fetcher := &github.SimpleFetcher{ + Token: token, + DataSource: dataSource, } + // Analyze PRs using shared code path + result, err := cost.AnalyzePRs(ctx, &cost.AnalysisRequest{ + Samples: summaries, + Logger: slog.Default(), + Fetcher: fetcher, + Concurrency: 8, // Process up to 8 PRs concurrently + Config: cfg, + }) + if err != nil { + return err + } + + breakdowns := result.Breakdowns + // Count unique authors across all PRs (not just samples) totalAuthors := github.CountUniqueAuthors(prs) diff --git a/pkg/cost/analyze.go b/pkg/cost/analyze.go new file mode 100644 index 0000000..c109a9a --- /dev/null +++ b/pkg/cost/analyze.go @@ -0,0 +1,146 @@ +package cost + +import ( + "context" + "errors" + "fmt" + "log/slog" + "sync" + "time" +) + +var ( + errNoSamples = errors.New("no samples provided") + errNoFetcher = errors.New("fetcher is required") +) + +// PRFetcher is an interface for fetching PR data. +// This allows different implementations (with/without caching, different data sources). +type PRFetcher interface { + // FetchPRData fetches full PR data for analysis. + FetchPRData(ctx context.Context, prURL string, updatedAt time.Time) (PRData, error) +} + +// AnalysisRequest contains parameters for analyzing a set of PRs. +type AnalysisRequest struct { + Samples []PRSummaryInfo // PRs to analyze + Logger *slog.Logger // Optional logger for progress + Fetcher PRFetcher // PR data fetcher + Concurrency int // Number of concurrent fetches (0 = sequential) + Config Config // Cost calculation configuration +} + +// PRSummaryInfo contains basic PR information needed for fetching. +type PRSummaryInfo struct { + UpdatedAt time.Time + Owner string + Repo string + Number int +} + +// AnalysisResult contains the breakdowns from analyzed PRs. +type AnalysisResult struct { + Breakdowns []Breakdown + Skipped int // Number of PRs that failed to fetch +} + +// AnalyzePRs processes a set of PRs and returns their cost breakdowns. +// This is the shared code path used by both CLI and server. +func AnalyzePRs(ctx context.Context, req *AnalysisRequest) (*AnalysisResult, error) { + if len(req.Samples) == 0 { + return nil, errNoSamples + } + + if req.Fetcher == nil { + return nil, errNoFetcher + } + + // Default to sequential processing if concurrency not specified + concurrency := req.Concurrency + if concurrency <= 0 { + concurrency = 1 + } + + var breakdowns []Breakdown + var mu sync.Mutex + var skipped int + + // Sequential processing + if concurrency == 1 { + for i, pr := range req.Samples { + prURL := fmt.Sprintf("https://github.com/%s/%s/pull/%d", pr.Owner, pr.Repo, pr.Number) + + if req.Logger != nil { + req.Logger.InfoContext(ctx, "Processing sample PR", + "repo", fmt.Sprintf("%s/%s", pr.Owner, pr.Repo), + "number", pr.Number, + "progress", fmt.Sprintf("%d/%d", i+1, len(req.Samples))) + } + + prData, err := req.Fetcher.FetchPRData(ctx, prURL, pr.UpdatedAt) + if err != nil { + if req.Logger != nil { + req.Logger.WarnContext(ctx, "Failed to fetch PR data, skipping", + "pr_number", pr.Number, "error", err) + } + skipped++ + continue + } + + breakdown := Calculate(prData, req.Config) + breakdowns = append(breakdowns, breakdown) + } + } else { + // Parallel processing with semaphore + var wg sync.WaitGroup + semaphore := make(chan struct{}, concurrency) + + for i, pr := range req.Samples { + wg.Add(1) + go func(index int, prInfo PRSummaryInfo) { + defer wg.Done() + + // Acquire semaphore slot + semaphore <- struct{}{} + defer func() { <-semaphore }() + + prURL := fmt.Sprintf("https://github.com/%s/%s/pull/%d", prInfo.Owner, prInfo.Repo, prInfo.Number) + + if req.Logger != nil { + req.Logger.InfoContext(ctx, "Processing sample PR", + "repo", fmt.Sprintf("%s/%s", prInfo.Owner, prInfo.Repo), + "number", prInfo.Number, + "progress", fmt.Sprintf("%d/%d", index+1, len(req.Samples))) + } + + prData, err := req.Fetcher.FetchPRData(ctx, prURL, prInfo.UpdatedAt) + if err != nil { + if req.Logger != nil { + req.Logger.WarnContext(ctx, "Failed to fetch PR data, skipping", + "pr_number", prInfo.Number, "error", err) + } + mu.Lock() + skipped++ + mu.Unlock() + return + } + + breakdown := Calculate(prData, req.Config) + mu.Lock() + breakdowns = append(breakdowns, breakdown) + mu.Unlock() + }(i, pr) + } + + wg.Wait() + } + + if len(breakdowns) == 0 { + return nil, fmt.Errorf("no samples could be processed successfully (%d skipped)", skipped) + } + + return &AnalysisResult{ + Breakdowns: breakdowns, + Skipped: skipped, + }, nil +} diff --git a/pkg/github/fetcher.go b/pkg/github/fetcher.go new file mode 100644 index 0000000..be03198 --- /dev/null +++ b/pkg/github/fetcher.go @@ -0,0 +1,23 @@ +package github + +import ( + "context" + "time" + + "github.com/codeGROOVE-dev/prcost/pkg/cost" +) + +// SimpleFetcher is a PRFetcher that fetches PR data without caching. +// It uses either prx or turnserver based on configuration. +type SimpleFetcher struct { + Token string + DataSource string // "prx" or "turnserver" +} + +// FetchPRData implements the PRFetcher interface from pkg/cost. +func (f *SimpleFetcher) FetchPRData(ctx context.Context, prURL string, updatedAt time.Time) (cost.PRData, error) { + if f.DataSource == "turnserver" { + return FetchPRDataViaTurnserver(ctx, prURL, f.Token, updatedAt) + } + return FetchPRData(ctx, prURL, f.Token, updatedAt) +} diff --git a/pkg/github/query.go b/pkg/github/query.go index 24eb366..5ce6b9a 100644 --- a/pkg/github/query.go +++ b/pkg/github/query.go @@ -555,8 +555,8 @@ func deduplicatePRsByOwnerRepoNumber(prs []PRSummary) []PRSummary { return unique } -// isBot returns true if the author name indicates a bot account. -func isBot(author string) bool { +// IsBot returns true if the author name indicates a bot account. +func IsBot(author string) bool { // Check for common bot name patterns if strings.HasSuffix(author, "[bot]") || strings.Contains(author, "-bot-") { return true @@ -585,6 +585,17 @@ func isBot(author string) bool { return false } +// CountBotPRs counts how many PRs in the list are authored by bots. +func CountBotPRs(prs []PRSummary) int { + count := 0 + for _, pr := range prs { + if IsBot(pr.Author) { + count++ + } + } + return count +} + // SamplePRs uses a time-bucket strategy to evenly sample PRs across the time range. // This ensures samples are distributed throughout the period rather than clustered. // Bot-authored PRs are excluded from sampling. @@ -692,7 +703,7 @@ func SamplePRs(prs []PRSummary, sampleSize int) []PRSummary { func CountUniqueAuthors(prs []PRSummary) int { uniqueAuthors := make(map[string]bool) for _, pr := range prs { - if !isBot(pr.Author) { + if !IsBot(pr.Author) { uniqueAuthors[pr.Author] = true } }