211 changes: 60 additions & 151 deletions cmd/prcost/repository.go
@@ -2,60 +2,15 @@ package main
 
 import (
 	"context"
-	"errors"
 	"fmt"
 	"log/slog"
-	"strings"
-	"sync"
 	"time"
 
 	"github.com/codeGROOVE-dev/prcost/pkg/cost"
 	"github.com/codeGROOVE-dev/prcost/pkg/github"
 )
 
-// countBotPRs counts how many PRs in the list are authored by bots.
-// Uses the same bot detection logic as pkg/github/query.go:isBot().
-func countBotPRs(prs []github.PRSummary) int {
-	count := 0
-	for _, pr := range prs {
-		if isBotAuthor(pr.Author) {
-			count++
-		}
-	}
-	return count
-}
-
-// isBotAuthor returns true if the author name indicates a bot account.
-// This matches the logic in pkg/github/query.go:isBot().
-func isBotAuthor(author string) bool {
-	// Check for common bot name patterns
-	if strings.HasSuffix(author, "[bot]") || strings.Contains(author, "-bot-") {
-		return true
-	}
-
-	// Check for specific known bot usernames (case-insensitive)
-	lowerAuthor := strings.ToLower(author)
-	knownBots := []string{
-		"renovate",
-		"dependabot",
-		"github-actions",
-		"codecov",
-		"snyk",
-		"greenkeeper",
-		"imgbot",
-		"renovate-bot",
-		"dependabot-preview",
-	}
-
-	for _, botName := range knownBots {
-		if lowerAuthor == botName {
-			return true
-		}
-	}
-
-	return false
-}
-
 // analyzeRepository performs repository-wide cost analysis by sampling PRs.
 // Uses library functions from pkg/github and pkg/cost for fetching, sampling,
 // and extrapolation - all functionality is available to external clients.
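
Note: the two helpers deleted above duplicated pkg/github/query.go:isBot(). Their consolidated home in pkg/github presumably looks like the sketch below; only the exported name CountBotPRs is confirmed by the call sites in this diff, while the isBot name comes from the old comments and its body is assumed to match the removed code.

```go
// Sketch of the consolidated bot detection in pkg/github (assumed; only
// CountBotPRs is confirmed by the call sites in this diff).
package github

import "strings"

// CountBotPRs counts how many PRs in the list are authored by bots.
func CountBotPRs(prs []PRSummary) int {
	count := 0
	for _, pr := range prs {
		if isBot(pr.Author) {
			count++
		}
	}
	return count
}

// isBot reports whether the author name indicates a bot account.
func isBot(author string) bool {
	// Common bot name patterns.
	if strings.HasSuffix(author, "[bot]") || strings.Contains(author, "-bot-") {
		return true
	}
	// Specific known bot usernames (case-insensitive).
	switch strings.ToLower(author) {
	case "renovate", "dependabot", "github-actions", "codecov", "snyk",
		"greenkeeper", "imgbot", "renovate-bot", "dependabot-preview":
		return true
	}
	return false
}
```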
@@ -82,7 +37,7 @@ func analyzeRepository(ctx context.Context, owner, repo string, sampleSize, days
 	actualDays, _ := github.CalculateActualTimeWindow(prs, days)
 
 	// Count bot PRs before sampling
-	botPRCount := countBotPRs(prs)
+	botPRCount := github.CountBotPRs(prs)
 	humanPRCount := len(prs) - botPRCount
 
 	// Sample PRs using time-bucket strategy (includes all PRs)
@@ -103,60 +58,37 @@
 			len(samples), len(prs), actualDays)
 	}
 
-	// Collect breakdowns from each sample using parallel processing
-	var breakdowns []cost.Breakdown
-	var mu sync.Mutex
-	var wg sync.WaitGroup
-
-	// Use a buffered channel for worker pool pattern (same as web server)
-	concurrency := 8 // Process up to 8 PRs concurrently
-	semaphore := make(chan struct{}, concurrency)
-
-	for i, pr := range samples {
-		wg.Add(1)
-		go func(index int, prSummary github.PRSummary) {
-			defer wg.Done()
-
-			// Acquire semaphore slot
-			semaphore <- struct{}{}
-			defer func() { <-semaphore }()
-
-			prURL := fmt.Sprintf("https://github.com/%s/%s/pull/%d", owner, repo, prSummary.Number)
-			slog.Info("Processing sample PR",
-				"repo", fmt.Sprintf("%s/%s", owner, repo),
-				"number", prSummary.Number,
-				"progress", fmt.Sprintf("%d/%d", index+1, len(samples)))
-
-			// Fetch full PR data using configured data source
-			var prData cost.PRData
-			var err error
-			if dataSource == "turnserver" {
-				// Use turnserver with updatedAt for effective caching
-				prData, err = github.FetchPRDataViaTurnserver(ctx, prURL, token, prSummary.UpdatedAt)
-			} else {
-				// Use prx with updatedAt for effective caching
-				prData, err = github.FetchPRData(ctx, prURL, token, prSummary.UpdatedAt)
-			}
-			if err != nil {
-				slog.Warn("Failed to fetch PR data, skipping", "pr_number", prSummary.Number, "source", dataSource, "error", err)
-				return
-			}
-
-			// Calculate cost and accumulate with mutex protection
-			breakdown := cost.Calculate(prData, cfg)
-			mu.Lock()
-			breakdowns = append(breakdowns, breakdown)
-			mu.Unlock()
-		}(i, pr)
-	}
-
-	// Wait for all goroutines to complete
-	wg.Wait()
-
-	if len(breakdowns) == 0 {
-		return errors.New("no samples could be processed successfully")
+	// Convert samples to PRSummaryInfo format
+	var summaries []cost.PRSummaryInfo
+	for _, pr := range samples {
+		summaries = append(summaries, cost.PRSummaryInfo{
+			Owner:     pr.Owner,
+			Repo:      pr.Repo,
+			Number:    pr.Number,
+			UpdatedAt: pr.UpdatedAt,
+		})
+	}
+
+	// Create fetcher
+	fetcher := &github.SimpleFetcher{
+		Token:      token,
+		DataSource: dataSource,
+	}
+
+	// Analyze PRs using shared code path
+	result, err := cost.AnalyzePRs(ctx, &cost.AnalysisRequest{
+		Samples:     summaries,
+		Logger:      slog.Default(),
+		Fetcher:     fetcher,
+		Concurrency: 8, // Process up to 8 PRs concurrently
+		Config:      cfg,
+	})
+	if err != nil {
+		return err
 	}
 
+	breakdowns := result.Breakdowns
+
 	// Count unique authors across all PRs (not just samples)
 	totalAuthors := github.CountUniqueAuthors(prs)
 
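
The worker pool deleted above now presumably lives behind cost.AnalyzePRs. Below is a minimal sketch of that shared path, reconstructed from the removed inline code and the AnalysisRequest fields used in this diff; the PRFetcher interface name and the Result struct shape are assumptions, since only the request fields and result.Breakdowns are confirmed here.

```go
// Sketch of the shared analysis path in pkg/cost, inferred from the
// removed inline pool. PRFetcher and Result are assumed names; PRData,
// Breakdown, Config, and Calculate exist in pkg/cost per this diff.
package cost

import (
	"context"
	"errors"
	"log/slog"
	"sync"
	"time"
)

// PRSummaryInfo identifies one sampled PR (field names confirmed by the
// diff; types assumed).
type PRSummaryInfo struct {
	Owner     string
	Repo      string
	Number    int
	UpdatedAt time.Time
}

// PRFetcher abstracts the data source behind a single method (assumed).
type PRFetcher interface {
	Fetch(ctx context.Context, pr PRSummaryInfo) (PRData, error)
}

// AnalysisRequest carries everything AnalyzePRs needs (fields confirmed).
type AnalysisRequest struct {
	Samples     []PRSummaryInfo
	Logger      *slog.Logger
	Fetcher     PRFetcher
	Concurrency int
	Config      Config
}

// Result holds the per-PR breakdowns (only Breakdowns is confirmed).
type Result struct {
	Breakdowns []Breakdown
}

// AnalyzePRs fetches each sample concurrently, bounded by a semaphore of
// size Concurrency, skips samples that fail to fetch, and errors only
// when no sample succeeds - mirroring the deleted inline logic.
func AnalyzePRs(ctx context.Context, req *AnalysisRequest) (*Result, error) {
	var (
		mu         sync.Mutex
		wg         sync.WaitGroup
		breakdowns []Breakdown
	)
	semaphore := make(chan struct{}, req.Concurrency)
	for _, pr := range req.Samples {
		wg.Add(1)
		go func(pr PRSummaryInfo) {
			defer wg.Done()
			semaphore <- struct{}{}
			defer func() { <-semaphore }()

			data, err := req.Fetcher.Fetch(ctx, pr)
			if err != nil {
				req.Logger.Warn("Failed to fetch PR data, skipping",
					"pr_number", pr.Number, "error", err)
				return
			}
			breakdown := Calculate(data, req.Config)
			mu.Lock()
			breakdowns = append(breakdowns, breakdown)
			mu.Unlock()
		}(pr)
	}
	wg.Wait()

	if len(breakdowns) == 0 {
		return nil, errors.New("no samples could be processed successfully")
	}
	return &Result{Breakdowns: breakdowns}, nil
}
```

Centralizing the pool means the CLI and the web server (which the old comment notes used the same pattern) share one concurrency and error-handling policy instead of two copies drifting apart.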
@@ -204,7 +136,7 @@ func analyzeOrganization(ctx context.Context, org string, sampleSize, days int,
 	actualDays, _ := github.CalculateActualTimeWindow(prs, days)
 
 	// Count bot PRs before sampling
-	botPRCount := countBotPRs(prs)
+	botPRCount := github.CountBotPRs(prs)
 	humanPRCount := len(prs) - botPRCount
 
 	// Sample PRs using time-bucket strategy (includes all PRs)
@@ -225,60 +157,37 @@
 			len(samples), len(prs), org, actualDays)
 	}
 
-	// Collect breakdowns from each sample using parallel processing
-	var breakdowns []cost.Breakdown
-	var mu sync.Mutex
-	var wg sync.WaitGroup
-
-	// Use a buffered channel for worker pool pattern (same as web server)
-	concurrency := 8 // Process up to 8 PRs concurrently
-	semaphore := make(chan struct{}, concurrency)
-
-	for i, pr := range samples {
-		wg.Add(1)
-		go func(index int, prSummary github.PRSummary) {
-			defer wg.Done()
-
-			// Acquire semaphore slot
-			semaphore <- struct{}{}
-			defer func() { <-semaphore }()
-
-			prURL := fmt.Sprintf("https://github.com/%s/%s/pull/%d", prSummary.Owner, prSummary.Repo, prSummary.Number)
-			slog.Info("Processing sample PR",
-				"repo", fmt.Sprintf("%s/%s", prSummary.Owner, prSummary.Repo),
-				"number", prSummary.Number,
-				"progress", fmt.Sprintf("%d/%d", index+1, len(samples)))
-
-			// Fetch full PR data using configured data source
-			var prData cost.PRData
-			var err error
-			if dataSource == "turnserver" {
-				// Use turnserver with updatedAt for effective caching
-				prData, err = github.FetchPRDataViaTurnserver(ctx, prURL, token, prSummary.UpdatedAt)
-			} else {
-				// Use prx with updatedAt for effective caching
-				prData, err = github.FetchPRData(ctx, prURL, token, prSummary.UpdatedAt)
-			}
-			if err != nil {
-				slog.Warn("Failed to fetch PR data, skipping", "pr_number", prSummary.Number, "source", dataSource, "error", err)
-				return
-			}
-
-			// Calculate cost and accumulate with mutex protection
-			breakdown := cost.Calculate(prData, cfg)
-			mu.Lock()
-			breakdowns = append(breakdowns, breakdown)
-			mu.Unlock()
-		}(i, pr)
-	}
-
-	// Wait for all goroutines to complete
-	wg.Wait()
-
-	if len(breakdowns) == 0 {
-		return errors.New("no samples could be processed successfully")
+	// Convert samples to PRSummaryInfo format
+	var summaries []cost.PRSummaryInfo
+	for _, pr := range samples {
+		summaries = append(summaries, cost.PRSummaryInfo{
+			Owner:     pr.Owner,
+			Repo:      pr.Repo,
+			Number:    pr.Number,
+			UpdatedAt: pr.UpdatedAt,
+		})
+	}
+
+	// Create fetcher
+	fetcher := &github.SimpleFetcher{
+		Token:      token,
+		DataSource: dataSource,
+	}
+
+	// Analyze PRs using shared code path
+	result, err := cost.AnalyzePRs(ctx, &cost.AnalysisRequest{
+		Samples:     summaries,
+		Logger:      slog.Default(),
+		Fetcher:     fetcher,
+		Concurrency: 8, // Process up to 8 PRs concurrently
+		Config:      cfg,
+	})
+	if err != nil {
+		return err
 	}
 
+	breakdowns := result.Breakdowns
+
 	// Count unique authors across all PRs (not just samples)
 	totalAuthors := github.CountUniqueAuthors(prs)
 
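Both deleted blocks shared the same dataSource branching; SimpleFetcher presumably wraps it behind the interface AnalyzePRs expects. A sketch under that assumption - the Token and DataSource fields and both Fetch* functions are confirmed by this diff, but the method name and signature are not:

```go
// Sketch of SimpleFetcher in pkg/github (method name and signature
// assumed; fields and the two Fetch* functions appear in this diff).
package github

import (
	"context"
	"fmt"

	"github.com/codeGROOVE-dev/prcost/pkg/cost"
)

// SimpleFetcher selects between the turnserver and prx data sources.
type SimpleFetcher struct {
	Token      string
	DataSource string
}

// Fetch builds the PR URL and routes to the configured data source,
// passing UpdatedAt through so each backend can cache effectively.
func (f *SimpleFetcher) Fetch(ctx context.Context, pr cost.PRSummaryInfo) (cost.PRData, error) {
	prURL := fmt.Sprintf("https://github.com/%s/%s/pull/%d", pr.Owner, pr.Repo, pr.Number)
	if f.DataSource == "turnserver" {
		return FetchPRDataViaTurnserver(ctx, prURL, f.Token, pr.UpdatedAt)
	}
	return FetchPRData(ctx, prURL, f.Token, pr.UpdatedAt)
}
```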