Skip to content

feat(cmd): parallel default run with streaming scan#6

Merged
bschellenberger2600 merged 3 commits into
mainfrom
feat/default-stream-parallel-scan
Apr 17, 2026
Merged

feat(cmd): parallel default run with streaming scan#6
bschellenberger2600 merged 3 commits into
mainfrom
feat/default-stream-parallel-scan

Conversation

@bschellenberger2600
Copy link
Copy Markdown
Member

@bschellenberger2600 bschellenberger2600 commented Apr 17, 2026

Summary

Implements the parallel streaming default path for git-rain (no args): filesystem scan runs in the background while discovered repos are upserted into the registry and processed by a worker pool (global.fetch_workers, clamped ≥1), similar to git-fire.

Behavior

  • Streaming pipeline: ScanRepositoriesStream → upsert/filter → repoChan → workers calling runRainOnRepo.
  • Live feedback: 🔍 Scanning… every 2s with truncated path (skipped with --no-scan).
  • Output: Each repo block is built then printed under a mutex; headers use [N/M] reponame (M is ? while scan is in flight).
  • Post-run: If scan still running after workers finish, TTY users get Enter/wait or Ctrl+C; non-interactive (GIT_RAIN_NON_INTERACTIVE, CI) cancels the scan.
  • Unchanged: --dry-run (full scan first), --rain TUI, --no-scan semantics.

Tests

  • Streaming integration tests (TestRunRain_DefaultStream_*).
  • Table-driven coverage for helpers touched by the refactor (fetchFailureMessage, weatherSymbol, outcomeLabel, buildKnownPaths, upsertRepoIntoRegistry, scan-progress truncation).

Verification

make test-race
go test -count=1 -cover ./cmd/...

Note

Medium Risk
Introduces a new concurrent scan+worker execution path for the default CLI run, which can affect ordering, output, and cancellation behavior across many repos. Concurrency/mutex/atomic usage and streaming scan error handling raise moderate risk of deadlocks or missed repos if edge cases exist.

Overview
Default git-rain execution is refactored to stream and parallelize work. The CLI now pipelines ScanRepositoriesStream → registry upsert/filter → a worker pool (sized by global.fetch_workers, clamped to >=1) so fetch/sync begins as soon as repos are discovered, rather than waiting for the full scan to finish.

Output handling is changed to build per-repo contiguous blocks (header + branch lines) and print them under a mutex, and adds periodic 🔍 Scanning… progress lines with width-aware path truncation (suppressed under --no-scan). The per-repo runner is extracted (runRainOnRepo) and branch printing is switched to io.Writer-based writeRainBranchResults.

Tests are expanded substantially to cover the new streaming default path (multi-repo parity, --no-scan registry hydration, empty scans, zero workers) and to add table-driven coverage for the new helpers and refactored logic.

Reviewed by Cursor Bugbot for commit cfec80b. Bugbot is set up for automated code reviews on this repo. Configure here.

Summary by CodeRabbit

  • New Features

    • Processes repositories as they are discovered (parallel, streaming processing) with per-repo atomic output blocks.
    • Live scan progress shows the currently scanned directory and handles empty-scan gracefully ("No git repositories found.").
  • Behavior Improvements

    • Better handling for interactive vs non-interactive/CI environments and improved cancellation/reporting of scan errors.
  • Tests

    • Added unit tests covering streaming behavior, progress formatting, fetch-worker logic, outcome mapping, registry upsert and interactivity checks.

- Replace blocking scan+process with runRainDefaultStream (git-fire-style):
  cancellable scan, upsert/filter pipeline, FetchWorkers pool, atomic totals
- Per-repo output via runRainOnRepo with [N/M] headers and mutex-serialized blocks
- Scan progress ticker (2s), post-run TTY scan-wait or non-interactive cancel
- Tests: streaming integration, helper coverage, fetchFailureMessage/weather/outcome/buildKnownPaths/upsert
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Apr 17, 2026

📝 Walkthrough

Walkthrough

Replaces the previous sequential scan-and-process path with a streaming pipeline: repositories are discovered via a streaming scanner, upserted into the registry, and processed concurrently by a worker pool with serialized per-repo output and periodic scan-progress display.

Changes

Cohort / File(s) Summary
Core streaming implementation
cmd/root.go
Added runRainDefaultStream streaming pipeline using git.ScanRepositoriesStream, an upsert goroutine, worker pool processing via runRainOnRepo(...), refactored printRainBranchResultswriteRainBranchResults(out io.Writer, ...), per-repo atomic output serialization (printMu + strings.Builder), atomic outcome counters, scan-progress ticker and path formatting helpers, and helper funcs (fetchWorkerCount, stdinInteractiveOK).
Tests & helpers
cmd/root_test.go
Added test helpers (makeFetchedRepo, cloneIntoScanRoot) and many unit tests: stream parity across repos, --no-scan behavior, empty scan root message, fetch worker count logic, zero-worker behavior, normalization of fetchFailureMessage, mappings for weatherSymbol and outcomeLabel, progress-path formatting tests, registry upsert/rescan behavior, and interactivity env-var checks.

Sequence Diagram

sequenceDiagram
    participant User
    participant Main as Main Flow
    participant Scanner as Git Scanner
    participant Registry as Registry
    participant Pool as Worker Pool
    participant Repo as Per-Repo Processor
    participant Output as Stdout

    User->>Main: start run (default)
    Main->>Scanner: start ScanRepositoriesStream
    Main->>Main: start scan-progress ticker

    par scanning and enqueueing
        Scanner-->>Main: discovered repo (stream)
        Main->>Registry: upsertRepoIntoRegistry(repo)
        Main->>Pool: send repo to repoChan
    and processing repos concurrently
        Pool->>Repo: runRainOnRepo(repo)
        Repo->>Repo: fetch & analyze branches
        Repo->>Output: write per-repo block (mutex-serialized)
        Repo-->>Pool: return rainTotals delta (atomic accumulate)
    end

    Scanner-->>Main: scan complete
    Pool-->>Main: all workers finished
    Main->>Output: print final summary (from atomic totals)
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Poem

🐰 I hopped through folders, sniffing streams so wide,

Pipelines fed the workers, all working side by side.
I wrote each repo’s story, then printed it in one light,
Tickers told the journey while the workers took flight,
Hooray — concurrent carrots, output neat and bright!

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'feat(cmd): parallel default run with streaming scan' clearly and concisely summarizes the main change: implementing parallel execution with streaming scan in the default command path.
Docstring Coverage ✅ Passed Docstring coverage is 84.62% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/default-stream-parallel-scan

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (2)
cmd/root.go (1)

680-681: Nit: repoChan buffer sized off opts.Workers (scan workers), not the fetch pool size.

Both scanChan and repoChan use opts.Workers (scan worker count). For the fetch pipeline repoChan buffer would more naturally scale with fetchWorkerCount(cfg.Global.FetchWorkers) to avoid the upsert producer stalling when the fetch pool is larger than the scan pool. Not a correctness issue; purely a throughput tweak.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/root.go` around lines 680 - 681, The repoChan buffer is currently sized
with opts.Workers (scan worker count) which can cause the fetch/upsert pipeline
to stall when the fetch pool is larger; change the repoChan initialization to
use fetchWorkerCount(cfg.Global.FetchWorkers) (or the same fetch pool sizing
helper used elsewhere) instead of opts.Workers so repoChan capacity matches the
fetch worker pool (leave scanChan as-is using opts.Workers). Ensure you update
the repoChan make(...) call that references repoChan, scanChan, opts.Workers,
fetchWorkerCount, and cfg.Global.FetchWorkers.
cmd/root_test.go (1)

723-740: TestStdinInteractiveOK could use a couple more cases.

Both sub-tests exercise only the "returns false" direction. Consider adding:

  • GITHUB_ACTIONS=1 to cover that arm of the env check.
  • A positive case when a TTY is piped, if practical in this suite (otherwise note the limitation).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/root_test.go` around lines 723 - 740, TestStdinInteractiveOK only asserts
the "false" branches; add a subtest that sets GITHUB_ACTIONS=1 and expects
stdinInteractiveOK() to return false, and add a positive subtest that asserts
stdinInteractiveOK() returns true when stdin is a TTY—implement the TTY case by
either creating a subtest that injects or mocks the terminal-check used by
stdinInteractiveOK (e.g., replace or wrap the isTerminal/isTTY call used inside
stdinInteractiveOK) so you can simulate a TTY, or if that injection isn’t
practical in the test harness, add a skipped test that documents the limitation;
update tests referencing stdinInteractiveOK to include these cases.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@cmd/root.go`:
- Around line 696-725: The ticker goroutine writes the "🔍 Scanning…" line to
stdout without acquiring the global printMu, so its output can interleave with
per-repo blocks; move the declaration of printMu so it exists before the
folderProgress/ticker goroutines are started (so both goroutines can access it)
and wrap the fmt.Printf call in the ticker goroutine with
printMu.Lock()/Unlock(); keep the lastFolder, scanDone, scanPrefix,
truncateScanProgressPath and scanProgressPathMaxLen usage the same but ensure
the ticker acquires printMu before printing to preserve the atomic per-repo
block invariant.

---

Nitpick comments:
In `@cmd/root_test.go`:
- Around line 723-740: TestStdinInteractiveOK only asserts the "false" branches;
add a subtest that sets GITHUB_ACTIONS=1 and expects stdinInteractiveOK() to
return false, and add a positive subtest that asserts stdinInteractiveOK()
returns true when stdin is a TTY—implement the TTY case by either creating a
subtest that injects or mocks the terminal-check used by stdinInteractiveOK
(e.g., replace or wrap the isTerminal/isTTY call used inside stdinInteractiveOK)
so you can simulate a TTY, or if that injection isn’t practical in the test
harness, add a skipped test that documents the limitation; update tests
referencing stdinInteractiveOK to include these cases.

In `@cmd/root.go`:
- Around line 680-681: The repoChan buffer is currently sized with opts.Workers
(scan worker count) which can cause the fetch/upsert pipeline to stall when the
fetch pool is larger; change the repoChan initialization to use
fetchWorkerCount(cfg.Global.FetchWorkers) (or the same fetch pool sizing helper
used elsewhere) instead of opts.Workers so repoChan capacity matches the fetch
worker pool (leave scanChan as-is using opts.Workers). Ensure you update the
repoChan make(...) call that references repoChan, scanChan, opts.Workers,
fetchWorkerCount, and cfg.Global.FetchWorkers.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 66889b4b-8dbe-4adf-a1ff-38af24165807

📥 Commits

Reviewing files that changed from the base of the PR and between 3bec360 and b541365.

📒 Files selected for processing (2)
  • cmd/root.go
  • cmd/root_test.go

Comment thread cmd/root.go
Comment thread cmd/root.go Outdated
Comment thread cmd/root.go
Drop unreachable post-worker scan prompt (scan always finishes before
repoChan closes). Serialize folder-progress lines with printMu like
worker output. Await scanDone before reading scanErr to avoid a race.

Co-authored-by: Ben Schellenberger <bschellenberger2600@users.noreply.github.com>
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
cmd/root.go (1)

767-773: Nit: the total < current fallback is unreachable.

totalFound is incremented before the send on repoChan (line 743) and a channel receive synchronizes-with that send, so every worker observes totalFound >= current after its own AddInt64(&seq, 1). The comment above even says total can only equal or lead current. The guard is harmless but misleading — consider dropping it or rewording the comment to call it a defensive no-op, to prevent future readers from assuming a real race exists here.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/root.go` around lines 767 - 773, The fallback branch that sets total = 0
when total < current is unreachable and misleading; update the block around
atomic.AddInt64(&seq, 1) and atomic.LoadInt64(&totalFound) (used with seq,
totalFound and repoChan synchronization) by either removing the if total <
current guard entirely or changing the comment to explicitly mark it as a
defensive no-op (e.g., "defensive check retained for readability; cannot occur
due to synchronization via repoChan"), so readers won't assume a real race
exists.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@cmd/root.go`:
- Around line 767-773: The fallback branch that sets total = 0 when total <
current is unreachable and misleading; update the block around
atomic.AddInt64(&seq, 1) and atomic.LoadInt64(&totalFound) (used with seq,
totalFound and repoChan synchronization) by either removing the if total <
current guard entirely or changing the comment to explicitly mark it as a
defensive no-op (e.g., "defensive check retained for readability; cannot occur
due to synchronization via repoChan"), so readers won't assume a real race
exists.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 778cb6f6-372d-4beb-b2c1-e552d736dbd6

📥 Commits

Reviewing files that changed from the base of the PR and between b541365 and cfec80b.

📒 Files selected for processing (1)
  • cmd/root.go

@bschellenberger2600 bschellenberger2600 merged commit 5ea771c into main Apr 17, 2026
9 checks passed
Copy link
Copy Markdown

@cursor cursor Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 2 potential issues.

Fix All in Cursor

Bugbot Autofix prepared fixes for both issues found in the latest run.

  • ✅ Fixed: Unused production function only called from tests
    • Removed stdinInteractiveOK, its test, and the unused go-isatty direct dependency from go.mod.
  • ✅ Fixed: Summary output may interleave with ticker goroutine
    • Held printMu for all stdout summary lines after workers and scan complete so they cannot race the ticker’s locked writes.

You can send follow-ups to the cloud agent here.

Reviewed by Cursor Bugbot for commit cfec80b. Configure here.

Comment thread cmd/root.go
}
fd := os.Stdin.Fd()
return isatty.IsTerminal(fd) || isatty.IsCygwinTerminal(fd)
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused production function only called from tests

Low Severity

stdinInteractiveOK is defined in production code but never called from any production code path. A grep across the entire repository confirms it is only referenced in cmd/root_test.go. The PR description mentions post-run interactive handling for TTY users, but the implementation in runRainDefaultStream doesn't call this function anywhere. This is dead production code that adds unused dependencies (go-isatty).

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit cfec80b. Configure here.

Comment thread cmd/root.go
return nil
}
fmt.Printf("🌧 rain delivered — %d updated, %d skipped\n", totals.updated, totals.skipped)
return nil
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Summary output may interleave with ticker goroutine

Low Severity

After workersWG.Wait() and <-scanDone, the summary lines are printed directly to stdout via fmt.Println without acquiring printMu. The ticker goroutine could still be mid-print under printMu (between its Lock and Unlock) on a final tick that fired just before scanDone was closed. This creates a narrow window where scan-progress and summary output could interleave.

Additional Locations (1)
Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit cfec80b. Configure here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants