diff --git a/.githooks/pre-commit b/.githooks/pre-commit
new file mode 100644
index 00000000..e3aefc1b
--- /dev/null
+++ b/.githooks/pre-commit
@@ -0,0 +1,29 @@
+#!/usr/bin/env bash
+# Markdown lint + autofix staged Markdown files
+set -euo pipefail
+
+mapfile -d '' -t _staged < <(git diff --cached --name-only --diff-filter=ACM -z)
+md_files=()
+for f in "${_staged[@]}"; do
+  [[ "$f" =~ \.(md|MD|mdx|MDX)$ ]] && md_files+=("$f")
+done
+if [[ ${#md_files[@]} -eq 0 ]]; then
+  exit 0
+fi
+
+echo "[pre-commit] markdownlint-cli2 --fix on staged Markdown files"
+if command -v npx >/dev/null 2>&1; then
+  # Run with --fix so minor issues are auto-corrected
+  npx -y markdownlint-cli2 --fix "${md_files[@]}" || true
+  # Re-stage any modified files
+  git add -- "${md_files[@]}" || true
+  # Verify no errors remain; block commit if they do
+  if ! npx -y markdownlint-cli2 "${md_files[@]}"; then
+    echo "Markdownlint errors remain after autofix. Aborting commit." >&2
+    exit 1
+  fi
+else
+  echo "npx not found. Skipping markdownlint autofix. Install Node.js to enable autofix." >&2
+fi
+
+exit 0
diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS
new file mode 100644
index 00000000..5d00624d
--- /dev/null
+++ b/.github/CODEOWNERS
@@ -0,0 +1,18 @@
+# CODEOWNERS maps file patterns to required reviewers.
+# Owners listed for a matching pattern are automatically requested for review.
+# See: https://docs.github.com/repositories/managing-your-repositorys-settings-and-features/customizing-your-repository/about-code-owners
+
+# Default owner for entire repository
+* @flyingrobots
+
+# CI/CD and workflows
+.github/** @flyingrobots
+
+# Helm chart and deployment assets
+deploy/** @flyingrobots
+
+# Go source
+cmd/** @flyingrobots
+internal/** @flyingrobots
+test/** @flyingrobots
+
diff --git a/.github/workflows/changelog.yml b/.github/workflows/changelog.yml
new file mode 100644
index 00000000..aaffef3f
--- /dev/null
+++ b/.github/workflows/changelog.yml
@@ -0,0 +1,40 @@
+name: Update Changelog
+
+on:
+  workflow_dispatch:
+  push:
+    tags:
+      - 'v*'
+
+permissions:
+  contents: write
+
+jobs:
+  changelog:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v4
+        with:
+          fetch-depth: 0
+          fetch-tags: true
+          ref: ${{ github.event.repository.default_branch }}
+      - name: Setup Go
+        uses: actions/setup-go@v5
+        with:
+          go-version: '1.25.x'
+      - name: Install git-chglog
+        run: go install github.com/git-chglog/git-chglog/cmd/git-chglog@v0.15.4
+      - name: Generate CHANGELOG.md
+        run: |
+          if ! $(go env GOPATH)/bin/git-chglog -o CHANGELOG.md; then
+            echo "git-chglog not configured; skipping update"; exit 0
+          fi
+      - name: Commit changes
+        run: |
+          git config user.name "github-actions"
+          git config user.email "github-actions@github.com"
+          git add CHANGELOG.md
+          git diff --cached --quiet && echo "no changes" || git commit -m "chore(changelog): update CHANGELOG for ${GITHUB_REF_NAME}"
+      - name: Push changes
+        run: |
+          git push origin HEAD:${{ github.event.repository.default_branch }} || echo "no push"
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 762649de..283fc76a 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -24,7 +24,15 @@ jobs:
       - name: Setup Go
         uses: actions/setup-go@v5
         with:
-          go-version: '1.24.x'
+          go-version: '1.25.x'
+          cache: true
+      - name: Download modules
+        run: go mod download
+      - name: Govulncheck
+        uses: golang/govulncheck-action@v1
+        with:
+          go-version-input: '1.25.x'
+          vulncheck-version: 'v1.1.3'
       - name: Tidy
         run: go mod tidy
       - name: Vet
@@ -35,6 +43,15 @@ jobs:
         env:
           E2E_REDIS_ADDR: localhost:6379
         run: go test ./... -race -count=1
+      - name: E2E determinism (5x)
+        env:
+          E2E_REDIS_ADDR: 127.0.0.1:6379
+        run: |
+          set -euo pipefail
+          for i in {1..5}; do
+            echo "Run #$i"
+            go test ./test/e2e -run TestE2E_WorkerCompletesJobWithRealRedis -race -count=1
+          done
       - name: Govulncheck
         run: |
           go install golang.org/x/vuln/cmd/govulncheck@latest
diff --git a/.github/workflows/goreleaser.yml b/.github/workflows/goreleaser.yml
new file mode 100644
index 00000000..bf953c9e
--- /dev/null
+++ b/.github/workflows/goreleaser.yml
@@ -0,0 +1,41 @@
+name: GoReleaser
+
+on:
+  push:
+    tags:
+      - 'v*'
+  workflow_dispatch: {}
+
+permissions:
+  contents: write
+  packages: write
+
+jobs:
+  release:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v4
+        with:
+          fetch-depth: 0
+          fetch-tags: true
+      - uses: actions/setup-go@v5
+        with:
+          go-version: '1.25.x'
+      - name: Login to GHCR
+        uses: docker/login-action@v3
+        with:
+          registry: ghcr.io
+          username: ${{ github.actor }}
+          password: ${{ secrets.GITHUB_TOKEN }}
+      - name: Set up QEMU
+        uses: docker/setup-qemu-action@v3
+      - name: Set up Buildx
+        uses: docker/setup-buildx-action@v3
+      - name: Run GoReleaser
+        uses: goreleaser/goreleaser-action@v6
+        with:
+          distribution: goreleaser
+          version: latest
+          args: release --clean
+        env:
+          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
diff --git a/.github/workflows/markdownlint.yml b/.github/workflows/markdownlint.yml
new file mode 100644
index 00000000..0767992a
--- /dev/null
+++ b/.github/workflows/markdownlint.yml
@@ -0,0 +1,21 @@
+name: Markdown Lint
+
+on:
+  pull_request:
+  push:
+    branches: [ main ]
+
+jobs:
+  lint:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v4
+        with:
+          fetch-depth: 0
+      - name: Run markdownlint-cli2
+        uses: DavidAnson/markdownlint-cli2-action@v17
+        with:
+          config: .markdownlint.yaml
+          globs: |
+            **/*.md
+            !**/node_modules/**
diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
index 5d4076f4..e1f16363 100644
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -17,7 +17,7 @@ jobs:
       - name: Setup Go
         uses: actions/setup-go@v5
         with:
-          go-version: '1.24.x'
+          go-version: '1.25.x'
       - name: Build binary
         run: |
           make tidy
@@ -39,4 +39,3 @@ jobs:
         uses: softprops/action-gh-release@v2
         with:
           generate_release_notes: true
-
diff --git a/.gitignore b/.gitignore
index 79ceca3e..590cb9d8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -24,10 +24,17 @@ go.work
 # IDE/Editor
 .idea/
-.vscode/
+# VS Code: ignore all by default, allow key shared files
+.vscode/*
+!.vscode/extensions.json
+!.vscode/settings.json
+!.vscode/launch.json
 *.swp
 *.swo
 
+# Obsidian
+.obsidian/
+
 # Logs
 *.log
 logs/
diff --git a/.goreleaser.yaml b/.goreleaser.yaml
new file mode 100644
index 00000000..ac9f60d9
--- /dev/null
+++ b/.goreleaser.yaml
@@ -0,0 +1,47 @@
+version: 2
+
+project_name: job-queue-system
+
+builds:
+  - id: job-queue-system
+    main: ./cmd/job-queue-system
+    env:
+      - CGO_ENABLED=0
+    goos: [linux, darwin, windows]
+    goarch: [amd64, arm64]
+    ldflags:
+      - -s -w -X main.version={{.Version}}
+
+archives:
+  - id: binaries
+    builds: [job-queue-system]
+    format: tar.gz
+    name_template: "{{ .ProjectName }}_{{ .Version }}_{{ .Os }}_{{ .Arch }}"
+    format_overrides:
+      - goos: windows
+        format: zip
+checksum:
+  name_template: "checksums_{{ .Version }}.txt"
+
+changelog:
+  sort: desc
+  use: github
+
+dockers:
+  - image_templates:
+      - ghcr.io/{{ .RepoOwner }}/{{ .RepoName }}:{{ .Version }}
+      - ghcr.io/{{ .RepoOwner }}/{{ .RepoName }}:latest
+    dockerfile: Dockerfile
+    build_flag_templates:
+      - "--pull"
+
+release:
+  github:
+    owner: '{{.Env.GITHUB_REPOSITORY_OWNER}}'
+    name: '{{.Env.GITHUB_REPOSITORY_NAME}}'
+  draft: false
+  prerelease: auto
+
+snapshot:
+  name_template: SNAPSHOT-{{ .ShortCommit }}
+
diff --git a/.markdownlint.yaml b/.markdownlint.yaml
new file mode 100644
index 00000000..e9c1be79
--- /dev/null
+++ b/.markdownlint.yaml
@@ -0,0 +1,28 @@
+default: true
+
+# Allow long lines in docs where URLs and code blocks occur
+MD013: false
+
+# Permit duplicate headings at different levels/siblings only
+MD024:
+  siblings_only: true
+
+# Heading punctuation rules (common sentence punctuation)
+MD026:
+  punctuation: ".,;:!"
+
+# Allow first line not to be a top-level heading (some docs start with metadata)
+MD041: false
+
+# Code block style
+MD046:
+  style: fenced
+
+# Spaces inside code spans: not enforced (intentional in some docs)
+MD038: false
+
+# Ordered list item prefix flexible (allow any numbering)
+MD029: false
+
+# Allow inline HTML (used in some docs)
+MD033: false
diff --git a/.vscode/extensions.json b/.vscode/extensions.json
new file mode 100644
index 00000000..f8b756db
--- /dev/null
+++ b/.vscode/extensions.json
@@ -0,0 +1,6 @@
+{
+  "recommendations": [
+    "golang.go"
+  ],
+  "unwantedRecommendations": []
+}
diff --git a/.vscode/settings.json b/.vscode/settings.json
new file mode 100644
index 00000000..d835b8de
--- /dev/null
+++ b/.vscode/settings.json
@@ -0,0 +1,15 @@
+{
+  "editor.formatOnSave": false,
+  "[go]": {
+    "editor.formatOnSave": true,
+    "editor.codeActionsOnSave": {
+      "source.organizeImports": "explicit"
+    }
+  },
+  "gopls": {
+    "ui.semanticTokens": true,
+    "build.experimentalWorkspaceModule": true
+  },
+  "go.testFlags": ["-race", "-count=1"],
+  "go.toolsManagement.autoUpdate": true
+}
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2d8c1da3..ff3e7c5a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,7 @@ All notable changes to this project will be documented in this file.
 
 ## [Unreleased]
+
 - Admin CLI: stats, peek, purge-dlq
 - Health/readiness endpoints
 - Queue length gauges updater
@@ -12,4 +13,3 @@
 - Worker active gauge
 - E2E tests with Redis service in CI
 - Govulncheck in CI
-
diff --git a/Makefile b/Makefile
index 47340875..9c03f1ee 100644
--- a/Makefile
+++ b/Makefile
@@ -23,3 +23,17 @@ tidy:
 
 version:
 	@echo $(VERSION)
+
+.PHONY: hooks
+hooks:
+	@git config core.hooksPath .githooks
+	@chmod +x .githooks/pre-commit
+	@echo "Git hooks enabled (pre-commit markdownlint autofix)."
+
+.PHONY: mdlint
+mdlint:
+	@if ! command -v npx >/dev/null 2>&1; then \
+		echo "npx not found. Please install Node.js to run markdownlint."; \
+		exit 1; \
+	fi
+	@npx -y markdownlint-cli2 "**/*.md" "!**/node_modules/**"
diff --git a/README.md b/README.md
index 57b86354..53ac89b9 100644
--- a/README.md
+++ b/README.md
@@ -17,25 +17,66 @@ See docs/ for the Product Requirements Document (PRD) and detailed design. A sam
 
 ### Build and run
 
-- Copy config/config.example.yaml to config/config.yaml and adjust as needed
-- Build (Go 1.24+): make build
-- Run all-in-one: ./bin/job-queue-system --role=all --config=config/config.yaml
-- Or run producer only: ./bin/job-queue-system --role=producer --config=config/config.yaml
-- Or run worker only: ./bin/job-queue-system --role=worker --config=config/config.yaml
-- Admin commands:
-  - Stats: ./bin/job-queue-system --role=admin --admin-cmd=stats --config=config/config.yaml
-  - Peek: ./bin/job-queue-system --role=admin --admin-cmd=peek --queue=low --n=10 --config=config/config.yaml
-  - Purge DLQ: ./bin/job-queue-system --role=admin --admin-cmd=purge-dlq --yes --config=config/config.yaml
-  - Version: ./bin/job-queue-system --version
+- Copy example config
+
+```bash
+cp config/config.example.yaml config/config.yaml
+```
+
+- Build (Go 1.25+)
+
+```bash
+make build
+```
+
+- Run all-in-one
+
+```bash
+./bin/job-queue-system --role=all --config=config/config.yaml
+```
+
+- Run producer only
+
+```bash
+./bin/job-queue-system --role=producer --config=config/config.yaml
+```
+
+- Run worker only
+
+```bash
+./bin/job-queue-system --role=worker --config=config/config.yaml
+```
+
+- Admin commands
+
+```bash
+# Stats
+./bin/job-queue-system --role=admin --admin-cmd=stats --config=config/config.yaml
+
+# Peek
+./bin/job-queue-system --role=admin --admin-cmd=peek --queue=low --n=10 --config=config/config.yaml
+
+# Purge DLQ
+./bin/job-queue-system --role=admin --admin-cmd=purge-dlq --yes --config=config/config.yaml
+
+# Purge all (test keys)
+./bin/job-queue-system --role=admin --admin-cmd=purge-all --yes --config=config/config.yaml
+
+# Stats (keys)
+./bin/job-queue-system --role=admin --admin-cmd=stats-keys --config=config/config.yaml
+
+# Version
+./bin/job-queue-system --version
+```
 
 ### Metrics
 
-- Prometheus metrics exposed at http://localhost:9090/metrics by default
+- Prometheus metrics exposed at <http://localhost:9090/metrics> by default
 
 ### Health and Readiness
 
-- Liveness: http://localhost:9090/healthz returns 200 when the process is up
-- Readiness: http://localhost:9090/readyz returns 200 only when Redis is reachable
+- Liveness: <http://localhost:9090/healthz> returns 200 when the process is up
+- Readiness: <http://localhost:9090/readyz> returns 200 only when Redis is reachable
 
 ### Priority Fetching
 
@@ -47,10 +88,57 @@
 
 ### Docker
 
-- Build: docker build -t job-queue-system:latest .
-- Run: docker run --rm -p 9090:9090 --env-file env.list job-queue-system:latest --role=all
-- Compose: see deploy/docker-compose.yml for multi-service setup (redis + worker/producer/all-in-one)
+- Build
+
+```bash
+docker build -t job-queue-system:latest .
+``` + +- Run + +```bash +docker run --rm -p 9090:9090 --env-file env.list job-queue-system:latest --role=all +``` + +- Compose + +```bash +docker compose -f deploy/docker-compose.yml up --build +``` ## Status -Scaffolding in place. Implementation, PRD, tests, and CI are coming next per plan. +Release branch open for v0.4.0-alpha: see PR + +Promotion gates and confidence summary (details in docs/15_promotion_checklists.md): + +- Alpha → Beta: overall confidence ~0.85 (functional/observability/CI strong; perf and coverage improvements planned) +- Beta → RC: overall confidence ~0.70 (needs controlled perf run, chaos tests, soak) +- RC → GA: overall confidence ~0.70 (release flow ready; soak and rollback rehearsal pending) + +Evidence artifacts (docs/evidence/): + +- ci_run.json (CI URL), bench.json (throughput/latency), metrics_before/after.txt, config.alpha.yaml + +To reproduce evidence locally, see docs/evidence/README.md. + +## Testing + +See docs/testing-guide.md for a package-by-package overview and copy/paste commands to run individual tests or the full suite with the race detector. + +## Contributing / Docs Linting + +- Enable Git hooks (auto-fix Markdown on commit): + +```bash +make hooks +``` + +- Run Markdown lint locally (optional): + +```bash +# Using Node (autofix staged files happens on commit via hook) +npx -y markdownlint-cli2 "**/*.md" "!**/node_modules/**" +``` + +- CI runs markdownlint on every PR and on pushes to `main`. diff --git a/cmd/job-queue-system/main.go b/cmd/job-queue-system/main.go index eee0654e..db1f6d5e 100644 --- a/cmd/job-queue-system/main.go +++ b/cmd/job-queue-system/main.go @@ -1,3 +1,4 @@ +// Copyright 2025 James Ross package main import ( @@ -38,7 +39,7 @@ func main() { fs := flag.NewFlagSet(os.Args[0], flag.ExitOnError) fs.StringVar(&role, "role", "all", "Role to run: producer|worker|all|admin") fs.StringVar(&configPath, "config", "config/config.yaml", "Path to YAML config") - fs.StringVar(&adminCmd, "admin-cmd", "", "Admin command: stats|peek|purge-dlq") + fs.StringVar(&adminCmd, "admin-cmd", "", "Admin command: stats|peek|purge-dlq|purge-all|bench|stats-keys") fs.StringVar(&adminQueue, "queue", "", "Queue alias or full key for admin peek (high|low|completed|dead_letter|jobqueue:...)") fs.IntVar(&adminN, "n", 10, "Number of items for admin peek") fs.BoolVar(&adminYes, "yes", false, "Automatic yes to prompts (dangerous operations)") @@ -165,11 +166,21 @@ func runAdmin(ctx context.Context, cfg *config.Config, rdb *redis.Client, logger if !yes { logger.Fatal("refusing to purge without --yes") } if err := admin.PurgeDLQ(ctx, cfg, rdb); err != nil { logger.Fatal("admin purge-dlq error", obs.Err(err)) } fmt.Println("dead letter queue purged") + case "purge-all": + if !yes { logger.Fatal("refusing to purge without --yes") } + n, err := admin.PurgeAll(ctx, cfg, rdb) + if err != nil { logger.Fatal("admin purge-all error", obs.Err(err)) } + fmt.Printf("purged %d keys\n", n) case "bench": res, err := admin.Bench(ctx, cfg, rdb, benchPriority, benchCount, benchRate, benchTimeout) if err != nil { logger.Fatal("admin bench error", obs.Err(err)) } b, _ := json.MarshalIndent(res, "", " ") fmt.Println(string(b)) + case "stats-keys": + res, err := admin.StatsKeys(ctx, cfg, rdb) + if err != nil { logger.Fatal("admin stats-keys error", obs.Err(err)) } + b, _ := json.MarshalIndent(res, "", " ") + fmt.Println(string(b)) default: logger.Fatal("unknown admin command", obs.String("cmd", cmd)) } diff --git a/deploy/grafana/dashboards/work-queue.json 
b/deploy/grafana/dashboards/work-queue.json new file mode 100644 index 00000000..f2e59259 --- /dev/null +++ b/deploy/grafana/dashboards/work-queue.json @@ -0,0 +1,37 @@ +{ + "title": "Go Redis Work Queue", + "schemaVersion": 38, + "panels": [ + { + "type": "timeseries", + "title": "Job Processing Duration (p95)", + "targets": [{"expr": "histogram_quantile(0.95, sum(rate(job_processing_duration_seconds_bucket[5m])) by (le))"}] + }, + { + "type": "timeseries", + "title": "Jobs Completed / Failed / Retried", + "targets": [ + {"expr": "rate(jobs_completed_total[5m])"}, + {"expr": "rate(jobs_failed_total[5m])"}, + {"expr": "rate(jobs_retried_total[5m])"} + ] + }, + { + "type": "stat", + "title": "Circuit Breaker State", + "targets": [{"expr": "circuit_breaker_state"}], + "options": {"reduceOptions": {"calcs": ["last"], "fields": ""}} + }, + { + "type": "timeseries", + "title": "Queue Lengths", + "targets": [{"expr": "sum by (queue) (queue_length)"}] + }, + { + "type": "stat", + "title": "Active Workers", + "targets": [{"expr": "worker_active"}], + "options": {"reduceOptions": {"calcs": ["last"], "fields": ""}} + } + ] +} diff --git a/docs/00_assessment.md b/docs/00_assessment.md index 51d1eb13..b0998c2d 100644 --- a/docs/00_assessment.md +++ b/docs/00_assessment.md @@ -3,9 +3,11 @@ - Last updated: 2025-09-12 ## Executive Summary + The project has a working foundation: a single Go binary with producer, worker, reaper, circuit breaker, configuration, logging, metrics, optional tracing, tests, and CI. To reach a production-grade v1.0.0, we need to harden prioritization semantics, improve observability depth, finalize graceful recovery edge cases, add health endpoints, enhance documentation, and conduct performance and failure-mode testing. ## Table of Contents + - [Current Implementation](#current-implementation) - [What’s Working](#whats-working) - [Gaps vs. Spec](#gaps-vs-spec) @@ -13,6 +15,7 @@ The project has a working foundation: a single Go binary with producer, worker, - [Immediate Priorities](#immediate-priorities) ## Current Implementation + - Modes: `--role=producer|worker|all` with YAML config and env overrides. - Redis: go-redis v8 with dynamic pool, retries, timeouts. - Queues: priority lists (`high`, `low`), per-worker processing list, completed and dead-letter lists. @@ -24,12 +27,14 @@ The project has a working foundation: a single Go binary with producer, worker, - CI: GitHub Actions build + race tests. ## What’s Working + - End-to-end enqueue → consume → complete/ retry/ DLQ → requeue on orphaned processing. - Graceful shutdown using contexts with signal handling. - Configurable backoff and retry behavior. - Baseline observability metrics and structured logs. ## Gaps vs. Spec + - Prioritized blocking dequeue across multiple queues: current approach loops BRPOPLPUSH per-queue with small timeouts. Spec implies multi-queue blocking pop with atomic move. Redis lacks native multi-source BRPOPLPUSH; we will document and validate current approach, and optionally add a Lua-assisted non-blocking RPOPLPUSH sweep to reduce latency. - Queue length gauges: not yet periodically updated. - Health/readiness endpoint: missing. @@ -40,15 +45,16 @@ The project has a working foundation: a single Go binary with producer, worker, - Performance validation: load tests and tuning for pool sizes and timeouts remain. ## Technical Debt + - Emulated priority fetch could be improved or justified formally. 
- Reaper scans by processing lists; ensure worst-case behavior with many workers is efficient (SCAN pacing and limits). - Simulated processing; provide pluggable processor interface. - Configurable metrics cardinality controls and labels. ## Immediate Priorities + 1. Add health/readiness probe and queue length updater. 2. Use TraceID/SpanID to start spans and enrich logs. 3. Strengthen rate limiter timing and jitter; document guarantees. 4. Add config validation and error reporting. 5. Write e2e tests with real Redis (service container) and performance benchmarks. - diff --git a/docs/01_product_roadmap.md b/docs/01_product_roadmap.md index 7c725c90..55d949dc 100644 --- a/docs/01_product_roadmap.md +++ b/docs/01_product_roadmap.md @@ -3,9 +3,11 @@ - Last updated: 2025-09-12 ## Executive Summary + This roadmap sequences the remaining work to reach v1.0.0 and beyond. Priorities are reliability, observability, and operational readiness. We map initiatives quarterly with explicit dependencies and a standing monthly review cadence. ## Table of Contents + - [Objectives](#objectives) - [Quarterly Roadmap](#quarterly-roadmap) - [Initiative Dependencies](#initiative-dependencies) @@ -13,6 +15,7 @@ This roadmap sequences the remaining work to reach v1.0.0 and beyond. Priorities - [Review and Update Cadence](#review-and-update-cadence) ## Objectives + - Ship v1.0.0 by 2025-11-07 with production readiness. - Support sustained throughput of 1k jobs/min per 4 vCPU node. - Provide actionable metrics, health endpoints, and robust recovery. @@ -20,6 +23,7 @@ This roadmap sequences the remaining work to reach v1.0.0 and beyond. Priorities ## Quarterly Roadmap ### Q3 2025 (Sep) + - Prioritized dequeue strategy finalized and documented - Health/readiness endpoints - Queue length gauges and config validation @@ -27,6 +31,7 @@ This roadmap sequences the remaining work to reach v1.0.0 and beyond. Priorities - Alpha release v0.4.0 (2025-09-26) ### Q4 2025 (Oct–Dec) + - Beta hardening, admin tooling (peek/purge/list) - Performance tuning and load tests; pool sizing guidance - E2E tests with real Redis; chaos testing scenarios @@ -34,16 +39,19 @@ This roadmap sequences the remaining work to reach v1.0.0 and beyond. Priorities - Post-1.0: Helm chart and Docker Compose examples (Dec) ### Q1 2026 + - Horizontal sharding guidance and queue-partitioning patterns - Optional Redis Streams backend as an alternative - Advanced observability: exemplars, RED metrics, SLO dashboards ## Initiative Dependencies + - Tracing propagation depends on finalized Job struct and processor API. - Reaper improvements depend on reliable heartbeat semantics. - Performance tuning depends on priority dequeue semantics and metrics completeness. ## Business Priorities + 1) Reliability and data safety (DLQ, retries, reaper) — P0 2) Operational visibility (metrics, health, tracing) — P0 3) Performance and scale guidance — P1 @@ -51,6 +59,6 @@ This roadmap sequences the remaining work to reach v1.0.0 and beyond. Priorities 5) Packaging and deployment assets (Docker/Helm) — P2 ## Review and Update Cadence + - Monthly roadmap review on first business day of each month. - Sprint reviews bi-weekly; adjust scope based on findings. - diff --git a/docs/02_release_plan.md b/docs/02_release_plan.md index 11d3182f..0656ba4e 100644 --- a/docs/02_release_plan.md +++ b/docs/02_release_plan.md @@ -3,15 +3,18 @@ - Last updated: 2025-09-12 ## Executive Summary + Three pre-GA releases (alpha, beta, RC) precede v1.0.0. 
Each release has specific feature scope, risks with mitigations, and acceptance criteria. Target GA date: 2025-11-07. ## Table of Contents + - [Release Cadence](#release-cadence) - [Release Scope](#release-scope) - [Risks and Mitigations](#risks-and-mitigations) - [Acceptance Criteria](#acceptance-criteria) ## Release Cadence + - v0.4.0-alpha — 2025-09-26 - v0.7.0-beta — 2025-10-10 - v0.9.0-rc — 2025-10-24 @@ -20,6 +23,7 @@ Three pre-GA releases (alpha, beta, RC) precede v1.0.0. Each release has specifi ## Release Scope ### v0.4.0-alpha (2025-09-26) + - Health/readiness endpoints - Queue length gauges; periodic updater - Tracing propagation (TraceID/SpanID) @@ -27,18 +31,21 @@ Three pre-GA releases (alpha, beta, RC) precede v1.0.0. Each release has specifi - Document prioritized dequeue strategy and guarantees ### v0.7.0-beta (2025-10-10) + - Admin CLI subcommands: `stats`, `peek`, `purge-dlq` - Rate limiter improvements (jitter, precise window sleep) - E2E tests with real Redis via service container - Initial performance baseline doc and tuning guidance ### v0.9.0-rc (2025-10-24) + - Hardening fixes from beta feedback - Chaos tests (Redis unavailability, slow network) - Security checks (govulncheck) wired in CI - Documentation complete: ops runbook, dashboards guidance ### v1.0.0 (2025-11-07) + - Final polish, issue triage zero, CHANGELOG, version pinning - Docker image published; example deployment assets @@ -53,9 +60,9 @@ Three pre-GA releases (alpha, beta, RC) precede v1.0.0. Each release has specifi | Tracing exporter instability | Low | 2 | Make tracing optional; timeouts | Disable tracing by config | ## Acceptance Criteria + - All acceptance tests for the release pass in CI (unit, integration, race). - Metrics present: job counters, histograms, queue length gauges, breaker state. - Health endpoints return HTTP 200 and surface readiness correctly. - On chaos tests: automatic recovery without manual intervention; no lost jobs. - Documentation updated (README, PRD, ops guide) for each release. - diff --git a/docs/03_milestones.md b/docs/03_milestones.md index 18a4e718..f0051b99 100644 --- a/docs/03_milestones.md +++ b/docs/03_milestones.md @@ -3,9 +3,11 @@ - Last updated: 2025-09-12 ## Executive Summary + Milestones define concrete deliverables with success criteria, dependencies, and decision gates. These map to the alpha, beta, RC, and GA releases. ## Table of Contents + - [Milestone List](#milestone-list) - [Dependencies](#dependencies) - [Go/No-Go Decision Gates](#gono-go-decision-gates) @@ -35,14 +37,15 @@ Milestones define concrete deliverables with success criteria, dependencies, and - Success: zero P0/P1 issues; CI green across matrix ## Dependencies + - 2 depends on 1 (observability requires validated config and endpoints) - 5 depends on 4 (perf needs stabilized limiter and tools) - 6 depends on 5 (chaos built on validated e2e) - 7 depends on 6 (GA after RC validation) ## Go/No-Go Decision Gates + - Alpha (2025-09-26): must meet milestone 1–3 success criteria. - Beta (2025-10-10): limiter behavior validated, admin tools safe; perf doc drafted. - RC (2025-10-24): chaos tests pass; no critical security findings. - GA (2025-11-07): p95 < 2s small files; SLOs defined; docs complete. - diff --git a/docs/04_sprint_plans.md b/docs/04_sprint_plans.md index b65e93b4..b6ba396c 100644 --- a/docs/04_sprint_plans.md +++ b/docs/04_sprint_plans.md @@ -3,9 +3,11 @@ - Last updated: 2025-09-12 ## Executive Summary + Four bi-weekly sprints lead to v1.0.0. 
Each sprint contains user stories with acceptance criteria, tasks, and estimates. ## Table of Contents + - [Sprint 1 (2025-09-12 → 2025-09-25)](#sprint-1-2025-09-12--2025-09-25) - [Sprint 2 (2025-09-26 → 2025-10-09)](#sprint-2-2025-09-26--2025-10-09) - [Sprint 3 (2025-10-10 → 2025-10-23)](#sprint-3-2025-10-10--2025-10-23) @@ -14,6 +16,7 @@ Four bi-weekly sprints lead to v1.0.0. Each sprint contains user stories with ac ## Sprint 1 (2025-09-12 → 2025-09-25) Stories (points): + 1) As an operator, I need `/healthz` and `/readyz` so I can probe liveness/readiness. (5) - Acceptance: `/healthz`=200 always after start; `/readyz`=200 only when Redis reachable and metrics server running. - Tasks: add handlers, wire checks, tests, docs. @@ -32,6 +35,7 @@ Stories (points): ## Sprint 2 (2025-09-26 → 2025-10-09) Stories (points): + 1) As an operator, I can run admin commands to inspect and manage queues. (8) - Acceptance: `stats`, `peek`, `purge-dlq` subcommands; safe operations with confirmation; tests. 2) As a producer, my rate limiter smooths bursts with jitter and precise sleep. (5) @@ -44,6 +48,7 @@ Stories (points): ## Sprint 3 (2025-10-10 → 2025-10-23) Stories (points): + 1) As a platform team, I need performance guidance and validated numbers. (8) - Acceptance: doc with 1k jobs/min baseline and tuning steps; reproducible script. 2) As an SRE, the system recovers from injected failures automatically. (8) @@ -56,10 +61,10 @@ Stories (points): ## Sprint 4 (2025-10-24 → 2025-11-06) Stories (points): + 1) As a user, I want GA-quality docs and samples. (5) - Acceptance: comprehensive README, PRD, ops runbook, examples. 2) As a release manager, I want RC stabilization and GA. (5) - Acceptance: all blocking bugs fixed; v1.0.0 tagged and released. 3) As DevOps, I need rollback procedures validated. (3) - Acceptance: rollback SOP tested; documented step-by-step. - diff --git a/docs/05_architecture.md b/docs/05_architecture.md index e83d1400..96a79044 100644 --- a/docs/05_architecture.md +++ b/docs/05_architecture.md @@ -3,9 +3,11 @@ - Last updated: 2025-09-12 ## Executive Summary + Single Go binary operates as producer, worker, or both. Redis provides prioritized lists, per-worker processing lists, and heartbeats. A reaper rescues orphaned jobs. Circuit breaker protects Redis during failure spikes. Observability includes Prometheus metrics and optional OTEL tracing. ## Table of Contents + - [System Diagram](#system-diagram) - [Components](#components) - [Data Flows](#data-flows) @@ -52,6 +54,7 @@ flowchart LR ``` ## Components + - Producer: scans directories, prioritizes files, rate-limits enqueue. - Worker: prioritized fetch via short-timeout BRPOPLPUSH per queue; heartbeat and cleanup. - Reaper: rescues jobs from processing lists when heartbeats expire. @@ -59,6 +62,7 @@ flowchart LR - Observability: metrics server, structured logging, optional OTEL tracing. ## Data Flows + 1) Produce: file -> Job JSON -> LPUSH to `high` or `low`. 2) Consume: BRPOPLPUSH from `high` then `low` (short timeout) -> processing list. 3) Heartbeat: SET `processing:worker:` with EX=ttl, value=payload. @@ -67,6 +71,7 @@ flowchart LR 6) Reap: if heartbeat missing, RPOP processing list items back to originating priority queue. ## Technology Stack + - Language: Go 1.21+ - Redis Client: go-redis v8 - Metrics: Prometheus client_golang @@ -76,12 +81,13 @@ flowchart LR - Container: Distroless base image ## Scaling Strategy + - Horizontal scale workers across nodes; each worker uses `PoolSize=10*NumCPU`. 
- Tune `MinIdleConns`, timeouts, and backoff per environment. - Shard queues by prefix if needed: e.g., `jobqueue:high:0..N` with consistent hashing. ## Performance Targets + - Throughput: ≥ 1k jobs/min per 4 vCPU worker node (small files <1MB) - Latency: p95 < 2s for small files end-to-end under normal load - Recovery: < 10s to drain orphaned processing lists after crash of a single worker node - diff --git a/docs/06_technical_spec.md b/docs/06_technical_spec.md index 806e0337..571dd131 100644 --- a/docs/06_technical_spec.md +++ b/docs/06_technical_spec.md @@ -3,9 +3,11 @@ - Last updated: 2025-09-12 ## Executive Summary + This document details implementation approaches, contracts, schemas, and algorithms underpinning producer, worker, reaper, and observability subsystems. ## Table of Contents + - [Configuration Schema](#configuration-schema) - [CLI and Process Contracts](#cli-and-process-contracts) - [Job Schema](#job-schema) @@ -14,6 +16,7 @@ This document details implementation approaches, contracts, schemas, and algorit - [Logging and Tracing](#logging-and-tracing) ## Configuration Schema + YAML keys (with env overrides using upper snake case): ```yaml @@ -61,12 +64,14 @@ observability: ``` Validation rules: + - `worker.count >= 1` - `worker.priorities` non-empty; each has entry in `worker.queues` - `worker.heartbeat_ttl >= 5s`, `brpoplpush_timeout <= heartbeat_ttl/2` - `producer.rate_limit_per_sec >= 0` ## CLI and Process Contracts + - `--role={producer|worker|all}` selects the operational role. - `--config=PATH` points to YAML. Missing file is allowed (defaults), invalid values are not. - Process exits non-zero on fatal config errors or unrecoverable subsystem init. @@ -89,33 +94,40 @@ Validation rules: ## Algorithms ### Prioritized Fetch + - Loop priorities (e.g., high then low), executing `BRPOPLPUSH src -> processing` with short timeout (e.g., 1s). - Guarantees: atomic move per-queue; priority preference within timeout granularity; no job loss between queues and processing list. - Tradeoffs: small added latency for lower-priority items; documented in README. ### Heartbeat + - On fetch, `SET heartbeatKey payload EX=heartbeat_ttl`. - Heartbeat refreshed on each loop iteration for the active job (optional enhancement) or kept static (current). ### Completion + - Success: `LPUSH completed payload`; `LREM processing 1 payload`; `DEL heartbeatKey`. - Failure: increment `Retries`; exponential backoff `min(base*2^(n-1), max)`; requeue or DLQ after `max_retries`. ### Reaper + - Periodically SCAN `jobqueue:worker:*:processing`. For each list: - Compose heartbeat key from worker id; if missing, `RPOP` items and `LPUSH` back to original priority queue inferred from payload. - Bounded per-scan to avoid long stalls; sleep between SCAN pages. ### Circuit Breaker + - Sliding window of recent results; failure rate `fails/total` compared to `failure_threshold`. - States: Closed → Open on threshold; Open → HalfOpen on cooldown; HalfOpen → Closed on success, else Open. ## Metrics + - `jobs_produced_total`, `jobs_consumed_total`, `jobs_completed_total`, `jobs_failed_total`, `jobs_retried_total`, `jobs_dead_letter_total` (counters) - `job_processing_duration_seconds` (histogram) - `queue_length{queue}` (gauge) - `circuit_breaker_state` (gauge: 0 Closed, 1 HalfOpen, 2 Open) ## Logging and Tracing + - Logs are JSON with keys: `level`, `ts`, `msg`, `trace_id`, `span_id`, `job_id`, `queue`, `worker_id`. 
- Tracing: create a span per job processing; propagate `trace_id/span_id` if present; otherwise create new. diff --git a/docs/07_test_plan.md b/docs/07_test_plan.md index f99a8b6f..65e5c827 100644 --- a/docs/07_test_plan.md +++ b/docs/07_test_plan.md @@ -3,9 +3,11 @@ - Last updated: 2025-09-12 ## Executive Summary + This plan defines coverage goals, scenarios, performance benchmarks, and security testing to ensure production readiness by v1.0.0. ## Table of Contents + - [Coverage Goals](#coverage-goals) - [Test Types](#test-types) - [Critical Path Scenarios](#critical-path-scenarios) @@ -13,17 +15,20 @@ This plan defines coverage goals, scenarios, performance benchmarks, and securit - [Security Testing](#security-testing) ## Coverage Goals + - Unit: ≥ 80% on core packages (config, worker, reaper, breaker, producer) - Integration: end-to-end flows with Redis service container - Race detector: enabled in CI for all tests ## Test Types + - Unit: algorithms (breaker), backoff, job marshal/unmarshal, rate limiter math, config validation. - Integration: produce→consume→complete/retry/DLQ; reaper resurrection; graceful shutdown. - E2E: GitHub Actions job with Redis service container; real network timings. - Chaos: Redis unavailability, latency injection, connection resets (where feasible in CI). ## Critical Path Scenarios + 1) Single worker: consume success path; completed recorded; heartbeat deleted. 2) Retry then requeue: failure increments retry, backoff, LPUSH back; processing cleaned. 3) DLQ after threshold: job moved to DLQ; counters updated. @@ -33,12 +38,13 @@ This plan defines coverage goals, scenarios, performance benchmarks, and securit 7) Graceful shutdown: no lost jobs; in-flight completes or is requeued. ## Performance Benchmarks + - Baseline: 1k jobs/min per 4 vCPU node; p95 < 2s for small files. - Method: synthetic job generation via producer; worker-only mode on dedicated runner; capture metrics. - Reporting: include CPU, memory, Redis CPU/latency, queue depths. ## Security Testing + - `govulncheck` in CI; fail on critical CVEs. - Static checks: `go vet` and `golangci-lint` (optional) for code issues. - Secrets: ensure no secrets in logs; validate config does not dump secrets. - diff --git a/docs/08_deployment.md b/docs/08_deployment.md index 1a484775..75a3ceb0 100644 --- a/docs/08_deployment.md +++ b/docs/08_deployment.md @@ -3,16 +3,20 @@ - Last updated: 2025-09-12 ## Executive Summary + Defines CI/CD stages, environments, rollback procedures, monitoring metrics, and alert thresholds for safe operations. ## Table of Contents + - [CI/CD Pipeline](#cicd-pipeline) - [Environments](#environments) - [Rollback Procedures](#rollback-procedures) - [Monitoring and Alerts](#monitoring-and-alerts) ## CI/CD Pipeline + Stages (GitHub Actions): + 1) Lint & Vet: run `go vet` (optionally `golangci-lint`) 2) Build: compile all packages 3) Unit + Race: `go test -race ./...` @@ -22,13 +26,18 @@ Stages (GitHub Actions): 7) Release: tag + changelog on releases ## Environments + - Dev: local machine or CI; Redis via Docker `redis:latest`. - Prod: managed Redis or self-hosted cluster; binary orchestrated via systemd/K8s. Config overrides via env vars. Example: -`WORKER_COUNT=32 REDIS_ADDR=redis:6379 ./job-queue-system --role=worker --config=config.yaml` + +```bash +WORKER_COUNT=32 REDIS_ADDR=redis:6379 ./job-queue-system --role=worker --config=config.yaml +``` ## Rollback Procedures + 1) Identify the target rollback version (last known-good tag). 
2) Redeploy binary or container with the previous version. 3) Verify `/healthz` and `/readyz` return 200. @@ -37,9 +46,9 @@ Config overrides via env vars. Example: 6) Document incident and root cause. ## Monitoring and Alerts + - Alerts (suggested thresholds): - `circuit_breaker_state > 0` for > 60s → WARN - `rate(jobs_failed_total[5m]) > 0.1 * rate(jobs_consumed_total[5m])` → CRITICAL - `queue_length{queue="jobqueue:dead_letter"} > 100` → WARN - `/readyz` non-200 for > 30s → CRITICAL - diff --git a/docs/09_requirements.md b/docs/09_requirements.md index 9daf4ca1..9a56af4d 100644 --- a/docs/09_requirements.md +++ b/docs/09_requirements.md @@ -3,9 +3,11 @@ - Last updated: 2025-09-12 ## Executive Summary + Functional and non-functional requirements with user stories, acceptance criteria, and definition of done for a production-ready release. ## Table of Contents + - [Functional Requirements](#functional-requirements) - [Non-Functional Requirements](#non-functional-requirements) - [User Stories](#user-stories) @@ -13,6 +15,7 @@ Functional and non-functional requirements with user stories, acceptance criteri - [Definition of Done](#definition-of-done) ## Functional Requirements + - Producer scans directory, prioritizes by extension, and enqueues JSON jobs. (complexity: ~80 LoC; O(1) `LPUSH` per job) - Worker pool consumes jobs by priority; atomic move to processing list; heartbeat; retries with backoff; DLQ. (~200 LoC; O(1) list ops) - Reaper detects missing heartbeats and requeues abandoned jobs. (~100 LoC) @@ -23,12 +26,14 @@ Functional and non-functional requirements with user stories, acceptance criteri - Admin tooling: `stats`, `peek`, `purge-dlq`. (~120 LoC) ## Non-Functional Requirements + - Performance: ≥1k jobs/min per 4 vCPU node; p95 < 2s small files. - Reliability: no job loss; DLQ for failures; reaper recovers within 10s. - Security: no secrets in logs; dependencies free of critical CVEs in CI. - Usability: single binary; clear CLI; documented examples and configs. ## User Stories + - As a producer operator, I can limit enqueue rate to prevent flooding Redis. - As a worker operator, I can scale worker count and observe breaker state. - As an SRE, I can monitor queue lengths and processing latencies. @@ -36,13 +41,14 @@ Functional and non-functional requirements with user stories, acceptance criteri - As a platform engineer, I can purge DLQ safely after exporting. ## Acceptance Criteria + - Each requirement has unit and/or integration tests. - Metrics appear in `/metrics` with expected names and types. - Health endpoints return correct status based on readiness. - Admin commands operate without data loss and require confirmation when destructive. ## Definition of Done + - Code merged with passing CI (unit, race, integration). - Documentation updated (README, PRD, relevant docs under `docs/`). - Version bumped and CHANGELOG updated for releases. - diff --git a/docs/10_risk_register.md b/docs/10_risk_register.md index 858fc3e5..b4560a1b 100644 --- a/docs/10_risk_register.md +++ b/docs/10_risk_register.md @@ -3,9 +3,11 @@ - Last updated: 2025-09-12 ## Executive Summary + Top project risks with probability, impact, mitigation strategies, and contingency plans. 
## Table of Contents + - [Risk Matrix](#risk-matrix) ## Risk Matrix @@ -22,4 +24,3 @@ Top project risks with probability, impact, mitigation strategies, and contingen | 8 | CI flakiness (timing-sensitive tests) | Medium | 3 | Retries; timeouts; use service containers | Quarantine and fix tests | | 9 | Security vulnerabilities in deps | Medium | 4 | `govulncheck` in CI | Pin versions; patch on release | | 10 | Operator error on DLQ purge | Low | 4 | Confirmation prompts; export-first guidance | Restore from backup/export | - diff --git a/docs/11_project_charter.md b/docs/11_project_charter.md index 289206fc..c88db786 100644 --- a/docs/11_project_charter.md +++ b/docs/11_project_charter.md @@ -3,14 +3,17 @@ - Last updated: 2025-09-12 ## Executive Summary + Deliver a production-ready, Go-based Redis work queue with strong reliability and observability by 2025-11-07. ## Table of Contents + - [Goals and Objectives](#goals-and-objectives) - [Stakeholders and RACI](#stakeholders-and-raci) - [Success Metrics](#success-metrics) ## Goals and Objectives + - GA v1.0.0 by 2025-11-07. - Reliable processing: retries, DLQ, reaper, graceful shutdown. - Operational visibility: metrics, health, tracing. @@ -29,9 +32,9 @@ Deliver a production-ready, Go-based Redis work queue with strong reliability an Legend: R=Responsible, A=Accountable, C=Consulted, I=Informed ## Success Metrics + - Availability: Ready > 99.9% over rolling 30 days. - Reliability: Zero lost jobs; DLQ rate < 0.5% of consumed jobs. - Performance: p95 processing duration < 2s for small files. - Observability: 100% of operations emit metrics; tracing coverage ≥ 80% of job processing. - Quality: Unit coverage ≥ 80% for core packages; CI green on main. - diff --git a/docs/12_performance_baseline.md b/docs/12_performance_baseline.md index 3ce06618..ed5534a9 100644 --- a/docs/12_performance_baseline.md +++ b/docs/12_performance_baseline.md @@ -3,31 +3,51 @@ - Last updated: 2025-09-12 ## Executive Summary + This guide provides a reproducible method to measure throughput and latency, and offers tuning steps to hit the target 1k jobs/min per 4 vCPU node with p95 < 2s for small files. ## Table of Contents + - [Methodology](#methodology) - [Baseline Procedure](#baseline-procedure) - [Tuning Levers](#tuning-levers) - [Expected Results](#expected-results) ## Methodology + - Use the built-in admin bench command to enqueue N jobs at a target rate to a priority queue. - Run worker-only mode on the same host (or separate host) and measure completion latency based on job creation_time vs. wall clock. - Metrics are exposed at `/metrics`; verify `jobs_completed_total`, `job_processing_duration_seconds`, and `queue_length{queue}`. 
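+
+For example, the p95 figure can be read directly from the duration histogram with the same PromQL expression the bundled Grafana dashboard uses (a sketch; widen the rate window if your scrape interval is coarse):
+
+```promql
+histogram_quantile(0.95, sum(rate(job_processing_duration_seconds_bucket[5m])) by (le))
+```
+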
 ## Baseline Procedure
-1) Start Redis (e.g., docker run -p 6379:6379 redis:7-alpine)
+
+1) Start Redis
+
+```bash
+docker run --rm -d --name jobq-redis -p 6379:6379 redis:7-alpine
+```
+
 2) Copy `config/config.example.yaml` to `config/config.yaml` and set:
    - `worker.count`: 16 on a 4 vCPU node (adjust as needed)
    - `redis.addr`: "localhost:6379"
-3) In one shell, run the worker:
-   - `./bin/job-queue-system --role=worker --config=config/config.yaml`
-4) In another shell, run the bench (enqueue and wait for completion):
-   - `./bin/job-queue-system --role=admin --admin-cmd=bench --bench-count=2000 --bench-rate=1000 --bench-priority=low --bench-timeout=60s`
+3) In one shell, run the worker
+
+```bash
+./bin/job-queue-system --role=worker --config=config/config.yaml
+```
+
+4) In another shell, run the bench (enqueue and wait for completion)
+
+```bash
+./bin/job-queue-system --role=admin --admin-cmd=bench \
+  --bench-count=2000 --bench-rate=1000 \
+  --bench-priority=low --bench-timeout=60s
+```
+
 5) Record the JSON result and capture Prometheus metrics (if scraping locally, curl /metrics).
 
 ## Tuning Levers
+
 - Redis pool: `redis.pool_size_multiplier` (default 10*NumCPU). Increase for higher concurrency; monitor Redis CPU.
 - Timeouts: `redis.read_timeout`/`write_timeout` (default 3s). Too low yields errors under load; too high slows failure detection.
 - Worker concurrency: `worker.count`. Increase up to CPU saturation; watch goroutine scheduling and Redis ops.
@@ -35,6 +55,6 @@
 - Priority timeout: `worker.brpoplpush_timeout` (default 1s). Smaller values reduce low-priority latency but add Redis ops.
 
 ## Expected Results
+
 - On a 4 vCPU node, `bench-count=2000`, `bench-rate=1000` should achieve ≥1k jobs/min throughput, with p95 latency < 2s for small files (<1MB).
 - If results fall short, see tuning levers and ensure host/Redis are not CPU or I/O bound.
-
diff --git a/docs/13_release_versioning.md b/docs/13_release_versioning.md
index 27d105e0..59b9486c 100644
--- a/docs/13_release_versioning.md
+++ b/docs/13_release_versioning.md
@@ -3,22 +3,32 @@
 
 ## Executive Summary
+
 Defines our versioning scheme, changelog process, and release checklist for alpha → beta → RC → GA.
 
 ## Versioning
+
 - Semantic Versioning (SemVer): MAJOR.MINOR.PATCH
 - Pre-releases: `-alpha`, `-beta`, `-rc`
 - Branch: main is protected; release tags from main.
 
 ## Changelog
+
 - Keep `CHANGELOG.md` in the repo.
 - Use conventional commit messages; sections: Features, Fixes, Docs, CI, Refactor, Tests, Chore.
 - On each release, summarize notable changes since last tag.
 
 ## Release Checklist
+
 1) Ensure CI green; govulncheck passes; tests (unit/race/e2e) pass.
 2) Update docs (README, PRD, performance baseline) if needed.
 3) Bump version in `CHANGELOG.md` with date and summary.
-4) Tag release: `git tag vX.Y.Z[-pre] && git push --tags`.
-5) Publish GitHub Release notes, attach Docker image reference.
+4) Tag release
+
+```bash
+git tag vX.Y.Z[-pre]
+git push --tags
+```
+
+5) Publish GitHub Release notes, attach Docker image reference.
diff --git a/docs/14_ops_runbook.md b/docs/14_ops_runbook.md
index a74930ab..9caad27b 100644
--- a/docs/14_ops_runbook.md
+++ b/docs/14_ops_runbook.md
@@ -3,9 +3,11 @@
 
 ## Executive Summary
+
 Day-2 operations guide: deploy, scale, monitor, recover, and release/rollback procedures for the Go Redis Work Queue.
 ## Table of Contents
+
 - [Deployment](#deployment)
 - [Configuration](#configuration)
 - [Health and Monitoring](#health-and-monitoring)
@@ -15,37 +17,65 @@ Day-2 operations guide: deploy, scale, monitor, recover, and release/rollback pr
 - [Scaling](#scaling)
 - [Common Operations](#common-operations)
 - [Troubleshooting](#troubleshooting)
 - [Release and Rollback](#release-and-rollback)
 
 ## Deployment
-- Docker: `docker build -t job-queue-system:local .` then run with flags.
-- docker-compose: see `deploy/docker-compose.yml` (services: redis, app-all, app-worker, app-producer).
-- Container image: `ghcr.io/<owner>/<repo>:<tag>` published on git tags (see release workflow).
+
+- Docker build
+
+```bash
+docker build -t job-queue-system:local .
+```
+
+- docker-compose: see `deploy/docker-compose.yml` (services: redis, app-all, app-worker, app-producer)
+- Container image: `ghcr.io/<owner>/<repo>:<tag>` published on git tags (see release workflow)
 
 ## Configuration
+
 - Primary: `config/config.yaml` (see `config/config.example.yaml`).
 - Overrides: environment vars (upper snake case replaces dots, e.g., `WORKER_COUNT=32`).
 - Validate: service fails to start with descriptive errors on invalid configs.
 
 ## Health and Monitoring
+
 - Liveness: `/healthz` returns 200 when the process is up.
 - Readiness: `/readyz` returns 200 when Redis is reachable.
 - Metrics: `/metrics` exposes Prometheus counters/gauges/histograms:
   - jobs_* counters, job_processing_duration_seconds, queue_length{queue}, circuit_breaker_state, worker_active.
 
 ## Scaling
+
 - Horizontal: run more worker instances; each instance can run N workers (`worker.count`).
 - Redis: ensure adequate CPU and memory; monitor latency and ops/sec.
 - Pooling: tune `redis.pool_size_multiplier`, `min_idle_conns` for throughput and latency.
 
 ## Common Operations
-- Inspect stats:
-  `./job-queue-system --role=admin --admin-cmd=stats --config=config.yaml`
-- Peek queue items:
-  `./job-queue-system --role=admin --admin-cmd=peek --queue=high --n=20 --config=config.yaml`
-- Purge dead-letter queue:
-  `./job-queue-system --role=admin --admin-cmd=purge-dlq --yes --config=config.yaml`
-- Benchmark throughput/latency:
-  `./job-queue-system --role=admin --admin-cmd=bench --bench-count=2000 --bench-rate=1000 --bench-priority=low --bench-timeout=60s`
+
+- Inspect stats
+
+```bash
+./job-queue-system --role=admin --admin-cmd=stats --config=config.yaml
+```
+
+- Peek queue items
+
+```bash
+./job-queue-system --role=admin --admin-cmd=peek --queue=high --n=20 --config=config.yaml
+```
+
+- Purge dead-letter queue
+
+```bash
+./job-queue-system --role=admin --admin-cmd=purge-dlq --yes --config=config.yaml
+```
+
+- Benchmark throughput/latency
+
+```bash
+./job-queue-system --role=admin --admin-cmd=bench \
+  --bench-count=2000 --bench-rate=1000 \
+  --bench-priority=low --bench-timeout=60s
+```
 
 ## Troubleshooting
+
 - High failures / breaker open:
   - Check Redis latency and CPU; verify timeouts.
   - Inspect logs for job-specific errors; consider reducing worker.count temporarily.
@@ -57,6 +87,7 @@
   - Check Redis availability and credentials; verify network and firewall.
 
 ## Release and Rollback
+
 - Versioning: SemVer; `--version` prints build version.
 - Release: push tag `vX.Y.Z` to trigger release workflow; image published to GHCR.
 - Rollback:
 3) Verify `/healthz` and `/readyz` return 200.
 4) Check metrics: `circuit_breaker_state=0`, `jobs_failed_total` steady, `queue_length` normalized.
 5) If DLQ large, export/inspect before purge.
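+
+For step 5, the DLQ can be exported with plain redis-cli before purging (a sketch; the key name matches the default config, adjust host/port/auth as needed):
+
+```bash
+# Dump every dead-letter payload (one JSON job per line) to a file for inspection
+redis-cli LRANGE jobqueue:dead_letter 0 -1 > dlq-export.jsonl
+```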
- diff --git a/docs/15_promotion_checklists.md b/docs/15_promotion_checklists.md new file mode 100644 index 00000000..bbeb5fa3 --- /dev/null +++ b/docs/15_promotion_checklists.md @@ -0,0 +1,80 @@ +# Promotion Checklists + +- Last updated: 2025-09-12 + +## Alpha → Beta Checklist + +- [x] Functional completeness: producer, worker, all-in-one, reaper, breaker, admin CLI +- [x] Observability: /metrics, /healthz, /readyz live and correct +- [x] CI green on main (build, vet, race, unit, integration, e2e) +- [ ] Unit coverage ≥ 80% core packages (attach coverage report) +- [ ] E2E passes deterministically (≥ 5 runs) +- [x] govulncheck: no Critical/High in code paths/stdlib +- [ ] Performance baseline: 1k jobs at 500/s complete; limiter ±10%/60s +- [x] Docs: README, PRD, test plan, deployment, runbook updated +- Evidence links: + - CI run URL: … + - Bench JSON: … + - Metrics snapshot(s): … + - Issues list: … + +### Confidence Scores (Alpha → Beta) + +| Criterion | Confidence | Rationale | How to improve | Status | +|---|---:|---|---|---| +| Functional completeness | 0.90 | All core roles implemented and tested; admin CLI present | Add more E2E admin flows; edge-case docs | Done | +| Observability endpoints | 0.95 | /metrics, /healthz, /readyz live; used in CI/e2e | Add probes to examples; alert rules | Done | +| CI health | 0.90 | CI green (build, vet, race, e2e, govulncheck) | Add matrix (OS/Go), flaky-test detector | Done | +| Coverage ≥ 80% | 0.75 | Gaps in admin/obs packages | Add tests for admin + HTTP handlers | In progress | +| E2E determinism | 0.80 | CI runs e2e 5×; generally stable | Gate on 5× passing and record | In progress | +| Security (govulncheck) | 0.95 | Go 1.25 stdlib; no critical findings | Image scanning; pin base digest | Done | +| Performance baseline | 0.70 | Harness present; prelim results only | Use Prom histograms; 4 vCPU run | In progress | +| Documentation completeness | 0.90 | PRD, runbook, deploy, perf, checklists | Add alert rules + Helm usage | Done | + +## Beta → RC Checklist + +- [ ] Throughput ≥ 1k jobs/min for ≥ 10m; p95 < 2s (<1MB files) +- [ ] Chaos tests: Redis outage/latency/worker crash → no lost jobs; breaker transitions +- [ ] Admin CLI validated against live instance +- [ ] Queue gauges and breaker metric accurate under load +- [ ] 24–48h soak: error rate < 0.5%, no leaks +- [ ] govulncheck clean; deps pinned +- [ ] Docs: performance report and tuning +- [ ] No P0/P1; ≤ 3 P2s w/ workarounds +- Evidence links as above + +### Confidence Scores (Beta → RC) + +| Criterion | Confidence | Rationale | How to improve | Status | +|---|---:|---|---|---| +| ≥1k jobs/min for ≥10m | 0.60 | Not yet run on dedicated 4 vCPU node | Schedule controlled benchmark; record env + metrics | In progress | +| p95 < 2s (<1MB) | 0.60 | Coarse sampling currently | Use Prom histograms; sustained run | In progress | +| Chaos (outage/latency/crash) | 0.70 | Recovery logic exists; partial tests | Add chaos e2e (stop Redis; add latency) | In progress | +| Admin validation | 0.85 | Admin commands + tests | Add e2e assertions for outputs | Ready for review | +| Gauges/breaker accuracy | 0.85 | Metrics wired; observed | Add metric assertions; dashboards | Ready for review | +| 24–48h soak | 0.50 | Not yet performed | Stage soak; capture dashboards | Not started | +| Security and deps | 0.90 | govulncheck green; deps pinned | Add Renovate/Dependabot; image scan | Done | +| Issue hygiene | 0.90 | No open P0/P1 | Enforce labels/triage automation | Done | + +## RC → GA Checklist + +- [ ] 
Code freeze; only showstopper fixes +- [ ] 0 P0/P1; ≤ 2 P2s with workarounds; no flakey tests across 10 runs +- [ ] Release workflow proven; rollback rehearsal complete +- [ ] Config/backcompat validated or migration guide +- [ ] Docs complete; README examples validated +- [ ] govulncheck clean; image scan no Critical +- [ ] 7-day RC soak: readiness > 99.9%, DLQ < 0.5% +- Evidence links as above + +### Confidence Scores (RC → GA) + +| Criterion | Confidence | Rationale | How to improve | Status | +|---|---:|---|---|---| +| Code freeze discipline | 0.80 | Process defined; branch protection enabled | Require 1 review + passing checks; CODEOWNERS | Ready for review | +| Zero P0/P1; ≤2 P2 | 0.85 | Backlog clean at present | Maintain triage; add SLOs | Ready for review | +| Release workflow | 0.90 | GoReleaser + GHCR configured | Dry-run snapshot + pre-release tag | Done (configured) | +| Rollback rehearsal | 0.60 | Runbook documented | Execute rehearsal in staging | In progress | +| Backward compatibility | 0.80 | Config stable; validation added | Versioned schema + migration notes | Ready for review | +| Docs completeness | 0.90 | Extensive docs + examples | Add alert rules + dashboards | Done | +| 7-day soak | 0.50 | Not yet executed | Run RC soak; attach dashboards | Not started | diff --git a/docs/PRD.md b/docs/PRD.md index 0b2f3599..31ea92ce 100644 --- a/docs/PRD.md +++ b/docs/PRD.md @@ -1,35 +1,42 @@ # Product Requirements Document (PRD) ## Title + Go Redis Work Queue — Production-Ready File Processing System ## Summary + A single, multi-role Go binary that implements a robust file-processing job queue backed by Redis. It supports producer, worker, and all-in-one modes; priority queues; reliable processing with retries and a dead-letter queue; graceful shutdown; a reaper for stuck jobs; a circuit breaker; and comprehensive observability (metrics, logs, tracing). All behavior is driven by a YAML configuration with environment variable overrides. ## Goals + - Reliable and fault-tolerant job ingestion and processing - Horizontally scalable worker pool with dynamic Redis connection pooling - Strong operational visibility with Prometheus metrics and structured logs - Minimal operational footprint: single binary, simple config, Docker-ready ## Non-goals + - Workflow orchestration or multi-step DAGs - Persisting job payloads outside Redis - UI dashboard (metrics only) ## User Stories + - As an operator, I can run the same binary in producer, worker, or all-in-one mode using a flag. - As a developer, I can configure the system via a YAML file and environment overrides. - As an SRE, I can observe queue depth, processing latency, throughput, failures, retries, and circuit breaker state via Prometheus. - As a platform engineer, I can deploy the service with Docker/Kubernetes easily. ## Roles and Execution Modes + - role=producer: scans a directory and enqueues jobs with priority, rate-limited via Redis. - role=worker: runs N worker goroutines consuming jobs by priority, with processing lists and heartbeats. - role=all: runs both producer and worker in one process for development or small deployments. - role=admin: provides operational commands: `stats` (print queue/processing/heartbeat counts), `peek` (inspect queue tail items), and `purge-dlq` (clear dead-letter queue with `--yes`). ## Configuration + All parameters are set via YAML with env var overrides. 
Example:

```yaml
@@ -88,6 +95,7 @@ observability:
 Environment overrides use upper snake case with dots replaced by underscores, e.g., WORKER_COUNT, REDIS_ADDR.
 
 ## Data Model
+
 Job payload JSON:
 
 ```json
@@ -104,6 +112,7 @@ Job payload JSON:
 ```
 
 ## Redis Keys and Structures
+
 - Queues: jobqueue:high_priority, jobqueue:low_priority (List)
 - Processing list per worker: jobqueue:worker:<workerID>:processing (List)
 - Heartbeat per worker: jobqueue:processing:worker:<workerID> (String with EX)
@@ -114,35 +123,42 @@ Job payload JSON:
 ## Core Algorithms
 
 ### Producer
+
 - Scan the directory recursively using include/exclude globs.
 - Determine priority by extension list.
 - Rate limiting: INCR rate_limit_key; on the first increment, set EX=1; if the value exceeds rate_limit_per_sec, sleep precisely until the window resets (`TTL`-based, with jitter) before enqueueing more.
 - LPUSH job JSON to the priority queue.
 
 ### Worker Fetch
+
 - Unique worker ID: "hostname-PID-idx" for each goroutine.
 - Prioritized fetch: loop priorities in order (e.g., high then low) and call `BRPOPLPUSH` per-queue with a short timeout (default 1s). Guarantees atomic move per-queue, priority preference within timeout granularity, and no job loss. Tradeoff: lower-priority jobs may wait up to the timeout when higher-priority queues are empty.
 - On receipt, SET the heartbeat key to the job JSON with EX=heartbeat_ttl.
 
 ### Processing
+
 - Create a span (if tracing is enabled) using job trace/span IDs when present; log with IDs.
 - Execute user-defined processing (stub initially: simulate processing with duration proportional to filesize; placeholder to plug in real logic).
 - On success: LPUSH completed_list job JSON; LREM processing_list 1 job; DEL heartbeat key.
 - On failure: increment Retries in the payload and apply exponential backoff; if retries <= max_retries, LPUSH back to the original priority queue; else LPUSH to dead_letter_list; in both cases LREM from processing_list and DEL heartbeat.
 
 ### Graceful Shutdown
+
 - Catch SIGINT/SIGTERM; cancel the context; stop accepting new jobs; allow in-flight jobs to finish; ensure heartbeat and processing-list cleanup as part of the success/failure paths.
 
 ### Reaper
+
 - Periodically scan all heartbeat keys matching the pattern. For each missing/expired heartbeat, recover jobs lingering in processing lists:
 - For each worker processing list, if the list has elements and the corresponding heartbeat key is absent, pop jobs (LPOP) one by one, inspect the priority within the payload, and LPUSH back to the appropriate priority queue.
 
 ### Circuit Breaker
+
 - Closed: normal operation; track success/failure counts in a rolling window.
 - Open: if failure_rate >= threshold and samples >= min_samples, stop fetching jobs for cooldown_period.
 - HalfOpen: probe with a single job; on success -> Closed; on failure -> Open.
 
 ## Observability
+
 - HTTP server exposes `/metrics`, `/healthz`, and `/readyz`. Key metrics:
 - Counter: jobs_produced_total, jobs_consumed_total, jobs_completed_total, jobs_failed_total, jobs_retried_total, jobs_dead_letter_total
 - Histogram: job_processing_duration_seconds
@@ -151,34 +167,41 @@ Job payload JSON:
 - Tracing (OpenTelemetry): optional OTLP exporter, spans for produce/consume/process. Job `trace_id`/`span_id` are propagated as remote parent when present.
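+An illustrative Prometheus alerting rule on top of these metrics (a sketch only; the group name, threshold duration, and severity label are assumptions rather than shipped configuration):
+
+```yaml
+groups:
+  - name: jobqueue
+    rules:
+      - alert: CircuitBreakerOpen
+        # circuit_breaker_state encodes 0 Closed, 1 HalfOpen, 2 Open
+        expr: circuit_breaker_state == 2
+        for: 5m
+        labels:
+          severity: warning
+        annotations:
+          summary: "Job queue circuit breaker has been Open for 5 minutes"
+```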
## CLI
+
 - --role=producer|worker|all|admin
 - --config=path/to/config.yaml
 - --version
 
 ## Performance & Capacity Targets
+
 - Baseline throughput: O(1k) jobs/minute on modest 4 vCPU nodes
 - End-to-end p95 latency under 2s for small files (<1MB)
 - Sustained stability under brief Redis outages via retries and backoff
 
 ## Security & Reliability
+
 - Redis credentials via config or environment
 - Avoid plaintext logs of secrets
 - Resilient to worker crashes with reaper resurrection
 
 ## Deployment
+
 - Dockerfile builds single static binary
 - Health probes: the HTTP server serves /healthz (liveness) and /readyz (readiness) alongside /metrics
 
 ## Testing Strategy
+
 - Unit tests: job serialization, rate limiter, circuit breaker, worker loop logic
 - Integration tests with Redis: enqueue/dequeue paths, retries, reaper behavior
 - Race detector in CI; coverage target ≥80% for core packages
 
 ## Risks & Mitigations
+
 - Multi-queue atomic move: emulate priority by sequential BRPOPLPUSH timeouts
 - Large queues: monitor queue_length gauges; consider sharding per priority if needed
 
 ## Milestones
+
 1) Scaffolding and PRD (this doc)
 2) Core implementation (config, producer, worker, reaper, breaker, observability)
 3) Tests and CI/CD (GitHub Actions)
diff --git a/docs/did-not-agree/README.md b/docs/did-not-agree/README.md
new file mode 100644
index 00000000..59cae5b8
--- /dev/null
+++ b/docs/did-not-agree/README.md
@@ -0,0 +1,11 @@
+# Design Decisions: Deferred or Alternative Implementations
+
+This folder documents instances where we intentionally deviated from a reviewer recommendation, with rationale, tradeoffs, and revisit criteria.
+
+Contents
+
+- rate-limiter-fixed-window.md
+- priority-fetching-brpoplpush.md
+- reaper-scan-vs-registry.md
+- metrics-scope.md
+- go-toolchain-version.md
diff --git a/docs/did-not-agree/go-toolchain-version.md b/docs/did-not-agree/go-toolchain-version.md
new file mode 100644
index 00000000..7f756e7e
--- /dev/null
+++ b/docs/did-not-agree/go-toolchain-version.md
@@ -0,0 +1,22 @@
+# Go Toolchain Version: 1.25.x
+
+Decision
+
+- Use Go 1.25.x across CI and release workflows; go.mod set to 1.25.0.
+
+Rationale
+
+- Addresses govulncheck-reported stdlib CVEs affecting earlier versions.
+- CI and local builds succeed with 1.25.x in this environment; evidence (`go_info`) confirms the runtime version.
+
+Tradeoffs
+
+- Requires builders to have Go 1.25 installed; older toolchains will not match CI.
+
+Revisit Criteria
+
+- If compatibility issues arise for consumers on earlier Go versions.
+
+Future Work
+
+- Consider matrix-testing across supported Go versions once stability goals are met.
diff --git a/docs/did-not-agree/metrics-scope.md b/docs/did-not-agree/metrics-scope.md
new file mode 100644
index 00000000..467dceb5
--- /dev/null
+++ b/docs/did-not-agree/metrics-scope.md
@@ -0,0 +1,23 @@
+# Metrics Scope: What We Deferred
+
+Decision
+
+- Added essential metrics (job counters, duration histogram, queue length, breaker state, trips, reaper recovered, worker_active). Defer additional metrics such as worker restart count and Redis pool internals for now.
+
+Rationale
+
+- Worker restarts are process-level events better captured by the orchestrator; tracking them inside the binary can be misleading without a supervisor.
+- Redis pool internals vary by client/runtime; better to surface them via the existing client/exporter when needed.
+
+Tradeoffs
+
+- Less granular visibility into certain failure modes without external instrumentation.
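+
+If this decision is revisited, a minimal sketch of in-binary pool gauges could look like the following (assumes go-redis v8's `PoolStats()`; the function and gauge names are hypothetical, and none of this is wired into the binary today):
+
+```go
+package obs
+
+import (
+	"context"
+	"time"
+
+	"github.com/go-redis/redis/v8"
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+// PollPoolStats periodically copies go-redis connection-pool counters into
+// Prometheus gauges. Sketch only, for the revisit scenario described above.
+func PollPoolStats(ctx context.Context, rdb *redis.Client, totalConns, idleConns prometheus.Gauge) {
+	t := time.NewTicker(15 * time.Second)
+	defer t.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-t.C:
+			s := rdb.PoolStats() // Hits, Misses, Timeouts, TotalConns, IdleConns, StaleConns
+			totalConns.Set(float64(s.TotalConns))
+			idleConns.Set(float64(s.IdleConns))
+		}
+	}
+}
+```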
+ +Revisit Criteria + +- If operators need in-binary restart counts for specific environments without orchestration. +- If visibility gaps are identified during soak/chaos tests. + +Future Work + +- Integrate with process metrics (e.g., kube-state-metrics) and Redis exporter for pool stats. diff --git a/docs/did-not-agree/priority-fetching-brpoplpush.md b/docs/did-not-agree/priority-fetching-brpoplpush.md new file mode 100644 index 00000000..b3bdb895 --- /dev/null +++ b/docs/did-not-agree/priority-fetching-brpoplpush.md @@ -0,0 +1,23 @@ +# Priority Fetching and BRPOPLPUSH Semantics + +Decision + +- Use per-queue BRPOPLPUSH with short timeout to emulate multi-queue priority rather than a single command returning queue name and value. + +Rationale + +- Redis does not provide multi-source BRPOPLPUSH. Looping priorities with a short timeout preserves atomic move semantics per queue and delivers predictable prioritization. +- go-redis returns only the value for BRPopLPush. We record the source queue implicitly by the loop order and use the known `srcQueue` when processing. + +Tradeoffs + +- Lower-priority jobs may incur up to the per-queue timeout in latency when higher-priority queues are empty. +- We do not rely on returned queue name; this is documented and tested. + +Revisit Criteria + +- If sub-second latency for low priority becomes unacceptable or we need multi-queue fairness beyond simple priority preference. + +Future Work + +- Explore a Lua-assisted sweep to pick the first non-empty queue without waiting the full timeout per queue in sequence. diff --git a/docs/did-not-agree/rate-limiter-fixed-window.md b/docs/did-not-agree/rate-limiter-fixed-window.md new file mode 100644 index 00000000..4dd39276 --- /dev/null +++ b/docs/did-not-agree/rate-limiter-fixed-window.md @@ -0,0 +1,25 @@ +# Rate Limiter: Fixed Window vs Token Bucket + +Decision + +- Keep fixed-window limiter (INCR + 1s EXPIRE + TTL-based sleep with jitter) for v0.4.0-alpha. + +Rationale + +- Simplicity and predictable behavior for batch-like producers. +- TTL-based sleep avoids busy waiting; jitter reduces thundering herd. +- Meets initial SLOs with fewer moving parts; easier to operate/debug. + +Tradeoffs + +- Boundary bursts possible vs sliding window/token bucket. +- At very high RPS, token bucket better smooths flow. + +Revisit Criteria + +- If sustained RPS saturation shows boundary spikes causing Redis latency or worker oscillations. +- If customer use-cases require fine-grained smoothing or multiple producers coordinating tightly. + +Future Work + +- Evaluate token bucket in Redis using LUA script to atomically debit tokens per producer key. diff --git a/docs/did-not-agree/reaper-scan-vs-registry.md b/docs/did-not-agree/reaper-scan-vs-registry.md new file mode 100644 index 00000000..f320e57c --- /dev/null +++ b/docs/did-not-agree/reaper-scan-vs-registry.md @@ -0,0 +1,23 @@ +# Reaper: SCAN-Based Recovery vs Worker Registry + +Decision + +- Keep SCAN-based discovery of processing lists for v0.4.0-alpha, instead of maintaining a registry of active workers or relying on keyspace notifications. + +Rationale + +- Simplicity and robustness: SCAN requires no extra moving parts or configuration and tolerates sudden worker exits. +- Predictable load: bounded SCAN page sizes and periodic cadence maintain manageable overhead. + +Tradeoffs + +- SCAN is O(keys); at very large worker fleets, registry/notifications can reduce overhead. 
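+
+For context, the discovery pass is essentially the following bounded loop (a condensed sketch of what `internal/reaper` does, assuming a go-redis v8 client; error handling and the requeue step are elided):
+
+```go
+// Sketch: one SCAN pass over per-worker processing lists.
+func scanProcessingLists(ctx context.Context, rdb *redis.Client) error {
+	var cursor uint64
+	for {
+		keys, next, err := rdb.Scan(ctx, cursor, "jobqueue:worker:*:processing", 500).Result()
+		if err != nil {
+			return err
+		}
+		for _, k := range keys {
+			_ = k // if the matching heartbeat key is absent, requeue jobs from k
+		}
+		cursor = next
+		if cursor == 0 {
+			return nil
+		}
+	}
+}
+```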
+ +Revisit Criteria + +- If reaper CPU or Redis time spent on SCAN becomes material (observed via profiling/metrics) under expected fleet sizes. + +Future Work + +- Add optional worker registry with TTL stored in Redis; reaper would iterate registry members and target per-worker keys directly. +- Consider Redis keyspace notifications where operationally acceptable. diff --git a/docs/evidence/README.md b/docs/evidence/README.md new file mode 100644 index 00000000..248ba703 --- /dev/null +++ b/docs/evidence/README.md @@ -0,0 +1,75 @@ +# Evidence for v0.4.0-alpha Promotion + +- CI run: see `ci_run.json` (contains URL to the successful workflow run) +- Bench JSON: `bench.json` (admin bench with 1000 jobs at 500 rps) +- Config used: `config.alpha.yaml` +- Metrics snapshots: `metrics_before.txt`, `metrics_after.txt` + +Reproduce locally + +1) Ensure Redis is running on localhost:6379 + +```bash +docker run -p 6379:6379 --rm --name jobq-redis redis:7-alpine +``` + +2) Build the binary + +```bash +make build +``` + +3) Start worker + +```bash +./bin/job-queue-system --role=worker --config=docs/evidence/config.alpha.yaml +``` + +4) In another terminal, run bench + +```bash +./bin/job-queue-system --role=admin --config=docs/evidence/config.alpha.yaml \ + --admin-cmd=bench --bench-count=1000 --bench-rate=500 \ + --bench-priority=low --bench-timeout=60s +``` + +5) Capture metrics + +```bash +curl -sS localhost:9191/metrics | head -n 200 > docs/evidence/metrics_after.txt +``` + +Important notes + +- The admin `bench` command enqueues jobs directly (it does LPUSH), so `jobs_produced_total` will remain 0 in this harness; use `jobs_consumed_total`/`jobs_completed_total` and queue lengths to assess throughput and progress. +- To avoid stale backlog affecting evidence, clear test keys before running a bench: + +```bash +redis-cli DEL jobqueue:high_priority jobqueue:low_priority jobqueue:completed jobqueue:dead_letter +redis-cli KEYS 'jobqueue:worker:*:processing' | xargs -n 50 redis-cli DEL +``` + +- The metrics port in this harness is `9191` (see `observability.metrics_port` in config.alpha.yaml). Ensure your curl commands match this port. + +Notes + +- The simple latency reported in `bench.json` is measured by comparing current time to each job's creation_time after completion sampling and is a coarse approximation. For precise latency distributions, prefer Prometheus histogram `job_processing_duration_seconds` and compute quantiles there. + +Automated harness + +- A convenience script `docs/evidence/run_bench.sh` automates the above steps. +- Default params: COUNT=1000, RATE=500, PRIORITY=low, CONFIG=docs/evidence/config.alpha.yaml, PURGE=1. 
+- Example: + +```bash +bash docs/evidence/run_bench.sh +# or override +COUNT=2000 RATE=1000 PRIORITY=low bash docs/evidence/run_bench.sh +``` + +Outputs are written under `docs/evidence/run_YYYYmmdd_HHMMSS/`: + +- worker.log, worker.pid +- metrics_before.txt, metrics_after.txt +- stats_before.json, stats_after.json +- bench.json diff --git a/docs/evidence/bench.json b/docs/evidence/bench.json new file mode 100644 index 00000000..b8d8a4b8 --- /dev/null +++ b/docs/evidence/bench.json @@ -0,0 +1,7 @@ +{ + "count": 1000, + "duration": 62125591875, + "throughput_jobs_per_sec": 16.096426123586127, + "p50_latency": 252446328000, + "p95_latency": 252878289000 +} diff --git a/docs/evidence/ci_run.json b/docs/evidence/ci_run.json new file mode 100644 index 00000000..1058d1fc --- /dev/null +++ b/docs/evidence/ci_run.json @@ -0,0 +1 @@ +[{"conclusion":"success","displayTitle":"chore(release): add Helm chart, Grafana dashboard, GoReleaser + workflows, changelog automation, promotion checklists, evidence harness","headBranch":"release/alpha-v0.4.0","status":"completed","url":"https://github.com/flyingrobots/go-redis-work-queue/actions/runs/17684747392"}] diff --git a/docs/evidence/config.alpha.yaml b/docs/evidence/config.alpha.yaml new file mode 100644 index 00000000..728ba956 --- /dev/null +++ b/docs/evidence/config.alpha.yaml @@ -0,0 +1,47 @@ +redis: + addr: "localhost:6379" + pool_size_multiplier: 10 + min_idle_conns: 5 + dial_timeout: 5s + read_timeout: 3s + write_timeout: 3s + max_retries: 3 + +worker: + count: 8 + heartbeat_ttl: 30s + max_retries: 3 + backoff: + base: 100ms + max: 2s + priorities: ["high", "low"] + queues: + high: "jobqueue:high_priority" + low: "jobqueue:low_priority" + processing_list_pattern: "jobqueue:worker:%s:processing" + heartbeat_key_pattern: "jobqueue:processing:worker:%s" + completed_list: "jobqueue:completed" + dead_letter_list: "jobqueue:dead_letter" + brpoplpush_timeout: 1s + +producer: + scan_dir: "./data" + include_globs: ["**/*"] + exclude_globs: ["**/*.tmp", "**/.DS_Store"] + default_priority: "low" + high_priority_exts: [".pdf", ".docx", ".xlsx", ".zip"] + rate_limit_per_sec: 1000 + rate_limit_key: "jobqueue:rate_limit:producer" + +circuit_breaker: + failure_threshold: 0.5 + window: 1m + cooldown_period: 30s + min_samples: 20 + +observability: + metrics_port: 9191 + log_level: "info" + tracing: + enabled: false + endpoint: "" diff --git a/docs/evidence/config.used.yaml b/docs/evidence/config.used.yaml new file mode 100644 index 00000000..f4352b2a --- /dev/null +++ b/docs/evidence/config.used.yaml @@ -0,0 +1,48 @@ +redis: + addr: "localhost:6379" + pool_size_multiplier: 10 + min_idle_conns: 5 + dial_timeout: 5s + read_timeout: 3s + write_timeout: 3s + max_retries: 3 + +worker: + count: 8 + heartbeat_ttl: 30s + max_retries: 3 + backoff: + base: 100ms + max: 2s + priorities: ["high", "low"] + queues: + high: "jobqueue:high_priority" + low: "jobqueue:low_priority" + processing_list_pattern: "jobqueue:worker:%s:processing" + heartbeat_key_pattern: "jobqueue:processing:worker:%s" + completed_list: "jobqueue:completed" + dead_letter_list: "jobqueue:dead_letter" + brpoplpush_timeout: 1s + +producer: + scan_dir: "./data" + include_globs: ["**/*"] + exclude_globs: ["**/*.tmp", "**/.DS_Store"] + default_priority: "low" + high_priority_exts: [".pdf", ".docx", ".xlsx", ".zip"] + rate_limit_per_sec: 1000 + rate_limit_key: "jobqueue:rate_limit:producer" + +circuit_breaker: + failure_threshold: 0.5 + window: 1m + cooldown_period: 30s + min_samples: 20 + +observability: + 
metrics_port: 9191 + log_level: "info" + queue_sample_interval: 2s + tracing: + enabled: false + endpoint: "" diff --git a/docs/evidence/issues.json b/docs/evidence/issues.json new file mode 100644 index 00000000..fe51488c --- /dev/null +++ b/docs/evidence/issues.json @@ -0,0 +1 @@ +[] diff --git a/docs/evidence/metrics_after.txt b/docs/evidence/metrics_after.txt new file mode 100644 index 00000000..0290b2b3 --- /dev/null +++ b/docs/evidence/metrics_after.txt @@ -0,0 +1,141 @@ +# HELP circuit_breaker_state 0 Closed, 1 HalfOpen, 2 Open +# TYPE circuit_breaker_state gauge +circuit_breaker_state 0 +# HELP go_gc_duration_seconds A summary of the pause duration of garbage collection cycles. +# TYPE go_gc_duration_seconds summary +go_gc_duration_seconds{quantile="0"} 0.000119708 +go_gc_duration_seconds{quantile="0.25"} 0.000119708 +go_gc_duration_seconds{quantile="0.5"} 0.000524542 +go_gc_duration_seconds{quantile="0.75"} 0.000524542 +go_gc_duration_seconds{quantile="1"} 0.000524542 +go_gc_duration_seconds_sum 0.00064425 +go_gc_duration_seconds_count 2 +# HELP go_goroutines Number of goroutines that currently exist. +# TYPE go_goroutines gauge +go_goroutines 29 +# HELP go_info Information about the Go environment. +# TYPE go_info gauge +go_info{version="go1.25.0"} 1 +# HELP go_memstats_alloc_bytes Number of bytes allocated and still in use. +# TYPE go_memstats_alloc_bytes gauge +go_memstats_alloc_bytes 1.539776e+06 +# HELP go_memstats_alloc_bytes_total Total number of bytes allocated, even if freed. +# TYPE go_memstats_alloc_bytes_total counter +go_memstats_alloc_bytes_total 3.460832e+06 +# HELP go_memstats_buck_hash_sys_bytes Number of bytes used by the profiling bucket hash table. +# TYPE go_memstats_buck_hash_sys_bytes gauge +go_memstats_buck_hash_sys_bytes 4993 +# HELP go_memstats_frees_total Total number of frees. +# TYPE go_memstats_frees_total counter +go_memstats_frees_total 31713 +# HELP go_memstats_gc_sys_bytes Number of bytes used for garbage collection system metadata. +# TYPE go_memstats_gc_sys_bytes gauge +go_memstats_gc_sys_bytes 2.75432e+06 +# HELP go_memstats_heap_alloc_bytes Number of heap bytes allocated and still in use. +# TYPE go_memstats_heap_alloc_bytes gauge +go_memstats_heap_alloc_bytes 1.539776e+06 +# HELP go_memstats_heap_idle_bytes Number of heap bytes waiting to be used. +# TYPE go_memstats_heap_idle_bytes gauge +go_memstats_heap_idle_bytes 3.211264e+06 +# HELP go_memstats_heap_inuse_bytes Number of heap bytes that are in use. +# TYPE go_memstats_heap_inuse_bytes gauge +go_memstats_heap_inuse_bytes 4.096e+06 +# HELP go_memstats_heap_objects Number of allocated objects. +# TYPE go_memstats_heap_objects gauge +go_memstats_heap_objects 5979 +# HELP go_memstats_heap_released_bytes Number of heap bytes released to OS. +# TYPE go_memstats_heap_released_bytes gauge +go_memstats_heap_released_bytes 2.113536e+06 +# HELP go_memstats_heap_sys_bytes Number of heap bytes obtained from system. +# TYPE go_memstats_heap_sys_bytes gauge +go_memstats_heap_sys_bytes 7.307264e+06 +# HELP go_memstats_last_gc_time_seconds Number of seconds since 1970 of last garbage collection. +# TYPE go_memstats_last_gc_time_seconds gauge +go_memstats_last_gc_time_seconds 1.7577071901524212e+09 +# HELP go_memstats_lookups_total Total number of pointer lookups. +# TYPE go_memstats_lookups_total counter +go_memstats_lookups_total 0 +# HELP go_memstats_mallocs_total Total number of mallocs. 
+# TYPE go_memstats_mallocs_total counter +go_memstats_mallocs_total 37692 +# HELP go_memstats_mcache_inuse_bytes Number of bytes in use by mcache structures. +# TYPE go_memstats_mcache_inuse_bytes gauge +go_memstats_mcache_inuse_bytes 12080 +# HELP go_memstats_mcache_sys_bytes Number of bytes used for mcache structures obtained from system. +# TYPE go_memstats_mcache_sys_bytes gauge +go_memstats_mcache_sys_bytes 15704 +# HELP go_memstats_mspan_inuse_bytes Number of bytes in use by mspan structures. +# TYPE go_memstats_mspan_inuse_bytes gauge +go_memstats_mspan_inuse_bytes 119360 +# HELP go_memstats_mspan_sys_bytes Number of bytes used for mspan structures obtained from system. +# TYPE go_memstats_mspan_sys_bytes gauge +go_memstats_mspan_sys_bytes 130560 +# HELP go_memstats_next_gc_bytes Number of heap bytes when next garbage collection will take place. +# TYPE go_memstats_next_gc_bytes gauge +go_memstats_next_gc_bytes 4.194304e+06 +# HELP go_memstats_other_sys_bytes Number of bytes used for other system allocations. +# TYPE go_memstats_other_sys_bytes gauge +go_memstats_other_sys_bytes 2.357799e+06 +# HELP go_memstats_stack_inuse_bytes Number of bytes in use by the stack allocator. +# TYPE go_memstats_stack_inuse_bytes gauge +go_memstats_stack_inuse_bytes 1.081344e+06 +# HELP go_memstats_stack_sys_bytes Number of bytes obtained from system for stack allocator. +# TYPE go_memstats_stack_sys_bytes gauge +go_memstats_stack_sys_bytes 1.081344e+06 +# HELP go_memstats_sys_bytes Number of bytes obtained from system. +# TYPE go_memstats_sys_bytes gauge +go_memstats_sys_bytes 1.3651984e+07 +# HELP go_threads Number of OS threads created. +# TYPE go_threads gauge +go_threads 15 +# HELP job_processing_duration_seconds Histogram of job processing durations +# TYPE job_processing_duration_seconds histogram +job_processing_duration_seconds_bucket{le="0.005"} 480 +job_processing_duration_seconds_bucket{le="0.01"} 480 +job_processing_duration_seconds_bucket{le="0.025"} 480 +job_processing_duration_seconds_bucket{le="0.05"} 480 +job_processing_duration_seconds_bucket{le="0.1"} 480 +job_processing_duration_seconds_bucket{le="0.25"} 480 +job_processing_duration_seconds_bucket{le="0.5"} 480 +job_processing_duration_seconds_bucket{le="1"} 480 +job_processing_duration_seconds_bucket{le="2.5"} 480 +job_processing_duration_seconds_bucket{le="5"} 480 +job_processing_duration_seconds_bucket{le="10"} 480 +job_processing_duration_seconds_bucket{le="+Inf"} 480 +job_processing_duration_seconds_sum 1.2178256330000006 +job_processing_duration_seconds_count 480 +# HELP jobs_completed_total Total number of successfully completed jobs +# TYPE jobs_completed_total counter +jobs_completed_total 480 +# HELP jobs_consumed_total Total number of jobs consumed by workers +# TYPE jobs_consumed_total counter +jobs_consumed_total 480 +# HELP jobs_dead_letter_total Total number of jobs moved to dead letter queue +# TYPE jobs_dead_letter_total counter +jobs_dead_letter_total 0 +# HELP jobs_failed_total Total number of failed jobs +# TYPE jobs_failed_total counter +jobs_failed_total 0 +# HELP jobs_produced_total Total number of jobs produced +# TYPE jobs_produced_total counter +jobs_produced_total 0 +# HELP jobs_retried_total Total number of job retries +# TYPE jobs_retried_total counter +jobs_retried_total 0 +# HELP promhttp_metric_handler_requests_in_flight Current number of scrapes being served. 
+# TYPE promhttp_metric_handler_requests_in_flight gauge +promhttp_metric_handler_requests_in_flight 1 +# HELP promhttp_metric_handler_requests_total Total number of scrapes by HTTP status code. +# TYPE promhttp_metric_handler_requests_total counter +promhttp_metric_handler_requests_total{code="200"} 1 +promhttp_metric_handler_requests_total{code="500"} 0 +promhttp_metric_handler_requests_total{code="503"} 0 +# HELP queue_length Current length of Redis queues +# TYPE queue_length gauge +queue_length{queue="jobqueue:completed"} 480 +queue_length{queue="jobqueue:dead_letter"} 0 +queue_length{queue="jobqueue:high_priority"} 0 +queue_length{queue="jobqueue:low_priority"} 2020 +# HELP worker_active Number of active worker goroutines +# TYPE worker_active gauge +worker_active 8 diff --git a/docs/evidence/metrics_before.txt b/docs/evidence/metrics_before.txt new file mode 100644 index 00000000..f887a21a --- /dev/null +++ b/docs/evidence/metrics_before.txt @@ -0,0 +1,135 @@ +# HELP circuit_breaker_state 0 Closed, 1 HalfOpen, 2 Open +# TYPE circuit_breaker_state gauge +circuit_breaker_state 0 +# HELP go_gc_duration_seconds A summary of the pause duration of garbage collection cycles. +# TYPE go_gc_duration_seconds summary +go_gc_duration_seconds{quantile="0"} 0 +go_gc_duration_seconds{quantile="0.25"} 0 +go_gc_duration_seconds{quantile="0.5"} 0 +go_gc_duration_seconds{quantile="0.75"} 0 +go_gc_duration_seconds{quantile="1"} 0 +go_gc_duration_seconds_sum 0 +go_gc_duration_seconds_count 0 +# HELP go_goroutines Number of goroutines that currently exist. +# TYPE go_goroutines gauge +go_goroutines 29 +# HELP go_info Information about the Go environment. +# TYPE go_info gauge +go_info{version="go1.25.0"} 1 +# HELP go_memstats_alloc_bytes Number of bytes allocated and still in use. +# TYPE go_memstats_alloc_bytes gauge +go_memstats_alloc_bytes 853664 +# HELP go_memstats_alloc_bytes_total Total number of bytes allocated, even if freed. +# TYPE go_memstats_alloc_bytes_total counter +go_memstats_alloc_bytes_total 853664 +# HELP go_memstats_buck_hash_sys_bytes Number of bytes used by the profiling bucket hash table. +# TYPE go_memstats_buck_hash_sys_bytes gauge +go_memstats_buck_hash_sys_bytes 4993 +# HELP go_memstats_frees_total Total number of frees. +# TYPE go_memstats_frees_total counter +go_memstats_frees_total 269 +# HELP go_memstats_gc_sys_bytes Number of bytes used for garbage collection system metadata. +# TYPE go_memstats_gc_sys_bytes gauge +go_memstats_gc_sys_bytes 1.58952e+06 +# HELP go_memstats_heap_alloc_bytes Number of heap bytes allocated and still in use. +# TYPE go_memstats_heap_alloc_bytes gauge +go_memstats_heap_alloc_bytes 853664 +# HELP go_memstats_heap_idle_bytes Number of heap bytes waiting to be used. +# TYPE go_memstats_heap_idle_bytes gauge +go_memstats_heap_idle_bytes 3.538944e+06 +# HELP go_memstats_heap_inuse_bytes Number of heap bytes that are in use. +# TYPE go_memstats_heap_inuse_bytes gauge +go_memstats_heap_inuse_bytes 3.833856e+06 +# HELP go_memstats_heap_objects Number of allocated objects. +# TYPE go_memstats_heap_objects gauge +go_memstats_heap_objects 3161 +# HELP go_memstats_heap_released_bytes Number of heap bytes released to OS. +# TYPE go_memstats_heap_released_bytes gauge +go_memstats_heap_released_bytes 3.538944e+06 +# HELP go_memstats_heap_sys_bytes Number of heap bytes obtained from system. 
+# TYPE go_memstats_heap_sys_bytes gauge +go_memstats_heap_sys_bytes 7.3728e+06 +# HELP go_memstats_last_gc_time_seconds Number of seconds since 1970 of last garbage collection. +# TYPE go_memstats_last_gc_time_seconds gauge +go_memstats_last_gc_time_seconds 0 +# HELP go_memstats_lookups_total Total number of pointer lookups. +# TYPE go_memstats_lookups_total counter +go_memstats_lookups_total 0 +# HELP go_memstats_mallocs_total Total number of mallocs. +# TYPE go_memstats_mallocs_total counter +go_memstats_mallocs_total 3430 +# HELP go_memstats_mcache_inuse_bytes Number of bytes in use by mcache structures. +# TYPE go_memstats_mcache_inuse_bytes gauge +go_memstats_mcache_inuse_bytes 12080 +# HELP go_memstats_mcache_sys_bytes Number of bytes used for mcache structures obtained from system. +# TYPE go_memstats_mcache_sys_bytes gauge +go_memstats_mcache_sys_bytes 15704 +# HELP go_memstats_mspan_inuse_bytes Number of bytes in use by mspan structures. +# TYPE go_memstats_mspan_inuse_bytes gauge +go_memstats_mspan_inuse_bytes 116000 +# HELP go_memstats_mspan_sys_bytes Number of bytes used for mspan structures obtained from system. +# TYPE go_memstats_mspan_sys_bytes gauge +go_memstats_mspan_sys_bytes 130560 +# HELP go_memstats_next_gc_bytes Number of heap bytes when next garbage collection will take place. +# TYPE go_memstats_next_gc_bytes gauge +go_memstats_next_gc_bytes 4.194304e+06 +# HELP go_memstats_other_sys_bytes Number of bytes used for other system allocations. +# TYPE go_memstats_other_sys_bytes gauge +go_memstats_other_sys_bytes 2.605095e+06 +# HELP go_memstats_stack_inuse_bytes Number of bytes in use by the stack allocator. +# TYPE go_memstats_stack_inuse_bytes gauge +go_memstats_stack_inuse_bytes 1.015808e+06 +# HELP go_memstats_stack_sys_bytes Number of bytes obtained from system for stack allocator. +# TYPE go_memstats_stack_sys_bytes gauge +go_memstats_stack_sys_bytes 1.015808e+06 +# HELP go_memstats_sys_bytes Number of bytes obtained from system. +# TYPE go_memstats_sys_bytes gauge +go_memstats_sys_bytes 1.273448e+07 +# HELP go_threads Number of OS threads created. 
+# TYPE go_threads gauge +go_threads 15 +# HELP job_processing_duration_seconds Histogram of job processing durations +# TYPE job_processing_duration_seconds histogram +job_processing_duration_seconds_bucket{le="0.005"} 0 +job_processing_duration_seconds_bucket{le="0.01"} 0 +job_processing_duration_seconds_bucket{le="0.025"} 0 +job_processing_duration_seconds_bucket{le="0.05"} 0 +job_processing_duration_seconds_bucket{le="0.1"} 0 +job_processing_duration_seconds_bucket{le="0.25"} 0 +job_processing_duration_seconds_bucket{le="0.5"} 0 +job_processing_duration_seconds_bucket{le="1"} 0 +job_processing_duration_seconds_bucket{le="2.5"} 0 +job_processing_duration_seconds_bucket{le="5"} 0 +job_processing_duration_seconds_bucket{le="10"} 0 +job_processing_duration_seconds_bucket{le="+Inf"} 0 +job_processing_duration_seconds_sum 0 +job_processing_duration_seconds_count 0 +# HELP jobs_completed_total Total number of successfully completed jobs +# TYPE jobs_completed_total counter +jobs_completed_total 0 +# HELP jobs_consumed_total Total number of jobs consumed by workers +# TYPE jobs_consumed_total counter +jobs_consumed_total 0 +# HELP jobs_dead_letter_total Total number of jobs moved to dead letter queue +# TYPE jobs_dead_letter_total counter +jobs_dead_letter_total 0 +# HELP jobs_failed_total Total number of failed jobs +# TYPE jobs_failed_total counter +jobs_failed_total 0 +# HELP jobs_produced_total Total number of jobs produced +# TYPE jobs_produced_total counter +jobs_produced_total 0 +# HELP jobs_retried_total Total number of job retries +# TYPE jobs_retried_total counter +jobs_retried_total 0 +# HELP promhttp_metric_handler_requests_in_flight Current number of scrapes being served. +# TYPE promhttp_metric_handler_requests_in_flight gauge +promhttp_metric_handler_requests_in_flight 1 +# HELP promhttp_metric_handler_requests_total Total number of scrapes by HTTP status code. +# TYPE promhttp_metric_handler_requests_total counter +promhttp_metric_handler_requests_total{code="200"} 0 +promhttp_metric_handler_requests_total{code="500"} 0 +promhttp_metric_handler_requests_total{code="503"} 0 +# HELP worker_active Number of active worker goroutines +# TYPE worker_active gauge +worker_active 8 diff --git a/docs/evidence/run_bench.sh b/docs/evidence/run_bench.sh new file mode 100644 index 00000000..5c3e9f3d --- /dev/null +++ b/docs/evidence/run_bench.sh @@ -0,0 +1,76 @@ +#!/usr/bin/env bash +set -euo pipefail + +# Simple, reproducible evidence harness +# - Starts a worker with the provided config +# - Optionally purges test keys +# - Captures stats-keys and /metrics before/after +# - Runs an admin bench (enqueue + wait) +# - Writes outputs into a timestamped directory under docs/evidence + +COUNT=${COUNT:-1000} +RATE=${RATE:-500} +PRIORITY=${PRIORITY:-low} +CONFIG=${CONFIG:-docs/evidence/config.alpha.yaml} +BIN=${BIN:-./bin/job-queue-system} +OUTDIR=${OUTDIR:-docs/evidence/run_$(date +%Y%m%d_%H%M%S)} +PURGE=${PURGE:-1} + +mkdir -p "$OUTDIR" + +if [[ ! -x "$BIN" ]] && [[ -f "$BIN" ]]; then + chmod +x "$BIN" || true +fi + +if [[ ! -f "$BIN" ]]; then + echo "Building binary..." + make build +fi + +# Extract metrics port (fallback 9191) +PORT=$(awk '/metrics_port:/ {gsub(":" , " "); print $2; exit}' "$CONFIG" || true) +PORT=${PORT:-9191} + +echo "Writing outputs to: $OUTDIR" + +echo "Stats (keys) BEFORE..." +"$BIN" --role=admin --config="$CONFIG" --admin-cmd=stats-keys > "$OUTDIR/stats_before.json" + +if [[ "$PURGE" == "1" ]]; then + echo "Purging test keys..." 
+ "$BIN" --role=admin --config="$CONFIG" --admin-cmd=purge-all --yes >/dev/null +fi + +echo "Starting worker (background)..." +"$BIN" --role=worker --config="$CONFIG" > "$OUTDIR/worker.log" 2>&1 & +WPID=$! +echo $WPID > "$OUTDIR/worker.pid" + +echo "Waiting for readiness on port $PORT..." +for i in {1..100}; do + if curl -fsS "http://localhost:$PORT/readyz" >/dev/null; then + break + fi + sleep 0.1 +done + +echo "Metrics BEFORE..." +curl -fsS "http://localhost:$PORT/metrics" | head -n 200 > "$OUTDIR/metrics_before.txt" || true + +echo "Running bench: count=$COUNT rate=$RATE priority=$PRIORITY..." +"$BIN" --role=admin --config="$CONFIG" --admin-cmd=bench \ + --bench-count="$COUNT" --bench-rate="$RATE" --bench-priority="$PRIORITY" --bench-timeout=60s \ + | tee "$OUTDIR/bench.json" + +echo "Metrics AFTER..." +curl -fsS "http://localhost:$PORT/metrics" | head -n 200 > "$OUTDIR/metrics_after.txt" || true + +echo "Stats (keys) AFTER..." +"$BIN" --role=admin --config="$CONFIG" --admin-cmd=stats-keys > "$OUTDIR/stats_after.json" + +echo "Stopping worker..." +kill "$WPID" || true +sleep 0.2 + +echo "Done. Outputs in: $OUTDIR" + diff --git a/docs/testing-guide.md b/docs/testing-guide.md new file mode 100644 index 00000000..fa3de608 --- /dev/null +++ b/docs/testing-guide.md @@ -0,0 +1,173 @@ +# Testing Guide + +This guide describes how to build and run the test suites, what each test verifies, and copy/paste commands to run any test in isolation. + +## Prerequisites + +- Go 1.25+ +- Git +- Docker (only for the e2e test that talks to a real Redis) + +## Quick build + +```bash +make build +``` + +## Run all tests (race detector) + +```bash +go test ./... -race -count=1 +``` + +Notes + +- Use `-count=1` to avoid cached results during iteration. +- Add `-v` for verbose output. 
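+
+To chase an intermittent failure locally, loop a suite the same way CI loops the e2e test (the package and run count below are just examples):
+
+```bash
+for i in {1..5}; do
+  echo "Run #$i"
+  go test ./internal/worker -race -count=1 || break
+done
+```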
+ +## Package-by-package suites + +### internal/config + +- Tests: configuration defaults and validation logic + - `TestLoadDefaults` — Ensures reasonable defaults load without a file + - `TestValidateFails` — Asserts invalid configs produce descriptive errors + +Run the whole package: + +```bash +go test ./internal/config -race -count=1 -v +``` + +Run a single test: + +```bash +go test ./internal/config -run '^TestLoadDefaults$' -race -count=1 -v +``` + +### internal/breaker + +- Tests: circuit breaker state machine and HalfOpen semantics + - `TestBreakerTransitions` — Closed → Open; HalfOpen probe; Close + - `TestBreakerHalfOpenSingleProbeUnderLoad` — Under heavy concurrency, HalfOpen admits one probe only + +Run the whole package: + +```bash +go test ./internal/breaker -race -count=1 -v +``` + +Run a single test: + +```bash +go test ./internal/breaker -run '^TestBreakerHalfOpenSingleProbeUnderLoad$' -race -count=1 -v +``` + +### internal/queue + +- Tests: job serialization round-trip + - `TestMarshalUnmarshal` — JSON encode/decode preserves fields + +Run: + +```bash +go test ./internal/queue -race -count=1 -v +``` + +### internal/producer + +- Tests: priority mapping and rate limiter behavior + - `TestPriorityForExt` — `.pdf` → high, others → default (low) + - `TestRateLimit` — Exceeding the fixed-window cap sleeps until TTL expiry + +Run: + +```bash +go test ./internal/producer -race -count=1 -v +``` + +Run a single test: + +```bash +go test ./internal/producer -run '^TestRateLimit$' -race -count=1 -v +``` + +### internal/reaper + +- Tests: requeue without heartbeat using miniredis + - `TestReaperRequeuesWithoutHeartbeat` — Orphans in processing list are moved back to source queue + +Run: + +```bash +go test ./internal/reaper -race -count=1 -v +``` + +### internal/worker + +- Tests: backoff, success/retry/DLQ paths, and breaker integration + - `TestBackoffCaps` — Exponential backoff caps at configured max + - `TestProcessJobSuccess` — Happy-path processing + - `TestProcessJobRetryThenDLQ` — Retry then move to DLQ after threshold + - `TestWorkerBreakerTripsAndPausesConsumption` — Failures trip breaker; consumption pauses while Open + +Run the whole package: + +```bash +go test ./internal/worker -race -count=1 -v +``` + +Run a single test: + +```bash +go test ./internal/worker -run '^TestWorkerBreakerTripsAndPausesConsumption$' -race -count=1 -v +``` + +### test/e2e (real Redis) + +End-to-end test that talks to a real Redis server. Start Redis locally with Docker, set an env var, then run. + +Start Redis: + +```bash +docker run --rm -d --name jobq-redis -p 6379:6379 redis:7-alpine +``` + +Run the e2e suite: + +```bash +E2E_REDIS_ADDR=localhost:6379 go test ./test/e2e -race -count=1 -v +``` + +Run the single e2e test: + +```bash +E2E_REDIS_ADDR=localhost:6379 go test ./test/e2e -run '^TestE2E_WorkerCompletesJobWithRealRedis$' -race -count=1 -v +``` + +Stop Redis: + +```bash +docker rm -f jobq-redis +``` + +## Common flags and tips + +- `-race` — enable the race detector (recommended) +- `-v` — verbose output +- `-run '^TestName$'` — run a single test by name (regex) +- `-count=1` — disable caching +- `-coverprofile=coverage.out` — generate coverage report + +Coverage example: + +```bash +go test ./... 
-coverprofile=coverage.out +go tool cover -func=coverage.out | sort -k3 -r +``` + +## Troubleshooting + +- “connection refused” in e2e: ensure Redis is running and `E2E_REDIS_ADDR` points to it +- Flaky timings: add `-v` and rerun with `-count=1`; CI also runs the e2e test 5× to catch flakiness +- Missing Go tools: ensure `go version` reports 1.25+ diff --git a/go.mod b/go.mod index 5cc45b83..2bef9e3f 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/flyingrobots/go-redis-work-queue -go 1.24.0 +go 1.25.0 require ( github.com/alicebob/miniredis/v2 v2.35.0 diff --git a/internal/admin/admin.go b/internal/admin/admin.go index e541a958..06b5ce6c 100644 --- a/internal/admin/admin.go +++ b/internal/admin/admin.go @@ -1,3 +1,4 @@ +// Copyright 2025 James Ross package admin import ( @@ -153,3 +154,110 @@ func Bench(ctx context.Context, cfg *config.Config, rdb *redis.Client, priority } return res, nil } + +// KeysStats summarizes managed Redis keys and queue lengths. +type KeysStats struct { + QueueLengths map[string]int64 `json:"queue_lengths"` + ProcessingLists int64 `json:"processing_lists"` + ProcessingItems int64 `json:"processing_items"` + Heartbeats int64 `json:"heartbeats"` + RateLimitKey string `json:"rate_limit_key"` + RateLimitTTL string `json:"rate_limit_ttl,omitempty"` +} + +// StatsKeys scans for managed keys and returns counts and lengths. +func StatsKeys(ctx context.Context, cfg *config.Config, rdb *redis.Client) (KeysStats, error) { + out := KeysStats{QueueLengths: map[string]int64{}} + // Known queues + qset := map[string]string{ + "high": cfg.Worker.Queues["high"], + "low": cfg.Worker.Queues["low"], + "completed": cfg.Worker.CompletedList, + "dead_letter": cfg.Worker.DeadLetterList, + } + for name, key := range qset { + if key == "" { continue } + n, err := rdb.LLen(ctx, key).Result() + if err != nil && err != redis.Nil { return out, err } + out.QueueLengths[name+"("+key+")"] = n + } + // Processing lists + var cursor uint64 + for { + keys, cur, err := rdb.Scan(ctx, cursor, "jobqueue:worker:*:processing", 500).Result() + if err != nil { return out, err } + cursor = cur + out.ProcessingLists += int64(len(keys)) + for _, k := range keys { + n, _ := rdb.LLen(ctx, k).Result() + out.ProcessingItems += n + } + if cursor == 0 { break } + } + // Heartbeats + cursor = 0 + for { + keys, cur, err := rdb.Scan(ctx, cursor, "jobqueue:processing:worker:*", 1000).Result() + if err != nil { return out, err } + cursor = cur + out.Heartbeats += int64(len(keys)) + if cursor == 0 { break } + } + // Rate limiter + if cfg.Producer.RateLimitKey != "" { + out.RateLimitKey = cfg.Producer.RateLimitKey + if ttl, err := rdb.TTL(ctx, cfg.Producer.RateLimitKey).Result(); err == nil && ttl > 0 { + out.RateLimitTTL = ttl.String() + } + } + return out, nil +} + +// PurgeAll deletes common test keys used by this system, including +// priority queues, completed/dead_letter, rate limiter key, and +// per-worker processing lists and heartbeats. Returns number of keys deleted. 
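+// NOTE: intended for test and bench environments only; deletion is irreversible.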
+func PurgeAll(ctx context.Context, cfg *config.Config, rdb *redis.Client) (int64, error) {
+	var deleted int64
+	// Explicit keys
+	keys := []string{
+		cfg.Worker.Queues["high"], cfg.Worker.Queues["low"],
+		cfg.Worker.CompletedList, cfg.Worker.DeadLetterList,
+	}
+	if cfg.Producer.RateLimitKey != "" {
+		keys = append(keys, cfg.Producer.RateLimitKey)
+	}
+	// Dedup
+	uniq := map[string]struct{}{}
+	ek := make([]string, 0, len(keys))
+	for _, k := range keys {
+		if k == "" { continue }
+		if _, ok := uniq[k]; ok { continue }
+		uniq[k] = struct{}{}
+		ek = append(ek, k)
+	}
+	if len(ek) > 0 {
+		n, err := rdb.Del(ctx, ek...).Result()
+		if err != nil { return deleted, err }
+		deleted += n
+	}
+	// Patterns: processing lists and heartbeats
+	patterns := []string{
+		"jobqueue:worker:*:processing",
+		"jobqueue:processing:worker:*",
+	}
+	for _, pat := range patterns {
+		var cursor uint64
+		for {
+			keys, cur, err := rdb.Scan(ctx, cursor, pat, 500).Result()
+			if err != nil { return deleted, err }
+			cursor = cur
+			if len(keys) > 0 {
+				n, err := rdb.Del(ctx, keys...).Result()
+				if err != nil { return deleted, err }
+				deleted += n
+			}
+			if cursor == 0 { break }
+		}
+	}
+	return deleted, nil
+}
diff --git a/internal/breaker/breaker.go b/internal/breaker/breaker.go
index a148d9a0..65e1931a 100644
--- a/internal/breaker/breaker.go
+++ b/internal/breaker/breaker.go
@@ -1,3 +1,4 @@
+// Copyright 2025 James Ross
 package breaker
 
 import (
@@ -28,6 +29,7 @@ type CircuitBreaker struct {
 	minSamples     int
 	lastTransition time.Time
 	results        []result
+	halfOpenInFlight bool
 }
 
 func New(window time.Duration, cooldown time.Duration, failureThresh float64, minSamples int) *CircuitBreaker {
@@ -46,11 +48,16 @@
 		if time.Since(cb.lastTransition) >= cb.cooldown {
 			cb.state = HalfOpen
 			cb.lastTransition = time.Now()
-			return true // allow a probe
+			// entering HalfOpen: admit exactly one probe immediately
+			cb.halfOpenInFlight = true
+			return true
 		}
 		return false
 	case HalfOpen:
-		// allow one probe at a time; simplistic approach
+		if cb.halfOpenInFlight {
+			return false
+		}
+		cb.halfOpenInFlight = true
 		return true
 	default:
 		return true
@@ -101,9 +108,10 @@
 		} else {
 			cb.state = Open
 		}
+		// the single probe completed; allow a future probe after cooldown or next Allow
+		cb.halfOpenInFlight = false
 		cb.lastTransition = now
 	case Open:
 		// handled in Allow()
 	}
 }
-
diff --git a/internal/breaker/breaker_load_test.go b/internal/breaker/breaker_load_test.go
new file mode 100644
index 00000000..febadc8b
--- /dev/null
+++ b/internal/breaker/breaker_load_test.go
@@ -0,0 +1,66 @@
+// Copyright 2025 James Ross
+package breaker
+
+import (
+	"sync"
+	"testing"
+	"time"
+)
+
+// Test that in HalfOpen under concurrent load, only a single probe is allowed at a time.
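+// It pins down the exactly-one-probe invariant that Allow() enforces via halfOpenInFlight.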
+func TestBreakerHalfOpenSingleProbeUnderLoad(t *testing.T) { + cb := New(20*time.Millisecond, 50*time.Millisecond, 0.5, 2) + if cb.State() != Closed { t.Fatal("expected closed") } + cb.Record(false) + cb.Record(false) + if cb.State() != Open { t.Fatal("expected open after 2 failures") } + + // Wait for cooldown to enter HalfOpen + time.Sleep(60 * time.Millisecond) + + // Concurrently call Allow; only one should be allowed + const N = 100 + var wg sync.WaitGroup + wg.Add(N) + trues := 0 + var mu sync.Mutex + for i := 0; i < N; i++ { + go func() { + defer wg.Done() + if cb.Allow() { + mu.Lock(); trues++; mu.Unlock() + } + }() + } + wg.Wait() + if trues != 1 { + t.Fatalf("expected exactly 1 allowed probe, got %d", trues) + } + + // Fail the probe to remain Open + cb.Record(false) + if cb.State() != Open { t.Fatalf("expected open after failed probe, got %v", cb.State()) } + + // Wait again to HalfOpen and check single probe again + time.Sleep(60 * time.Millisecond) + trues = 0 + wg.Add(N) + for i := 0; i < N; i++ { + go func() { + defer wg.Done() + if cb.Allow() { + mu.Lock(); trues++; mu.Unlock() + } + }() + } + wg.Wait() + if trues != 1 { + t.Fatalf("expected exactly 1 allowed probe in second cycle, got %d", trues) + } + + // Succeed the probe to close + cb.Record(true) + if cb.State() != Closed { t.Fatalf("expected closed after successful probe, got %v", cb.State()) } +} + diff --git a/internal/breaker/breaker_test.go b/internal/breaker/breaker_test.go index 281c3329..fb661cb1 100644 --- a/internal/breaker/breaker_test.go +++ b/internal/breaker/breaker_test.go @@ -1,3 +1,4 @@ +// Copyright 2025 James Ross package breaker import ( @@ -14,8 +15,9 @@ func TestBreakerTransitions(t *testing.T) { if cb.State() != Open { t.Fatal("expected open") } if cb.Allow() != false { t.Fatal("should not allow until cooldown") } time.Sleep(250 * time.Millisecond) + // Half-open should allow exactly one probe if cb.Allow() != true { t.Fatal("should allow probe in half-open") } + if cb.Allow() != false { t.Fatal("should block additional probes in half-open") } cb.Record(true) if cb.State() != Closed { t.Fatal("expected closed after probe success") } } - diff --git a/internal/config/config.go b/internal/config/config.go index b8970c97..2f01a37e 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -1,3 +1,4 @@ +// Copyright 2025 James Ross package config import ( @@ -39,6 +40,7 @@ type Worker struct { CompletedList string `mapstructure:"completed_list"` DeadLetterList string `mapstructure:"dead_letter_list"` BRPopLPushTimeout time.Duration `mapstructure:"brpoplpush_timeout"` + BreakerPause time.Duration `mapstructure:"breaker_pause"` } type Producer struct { @@ -101,6 +103,7 @@ func defaultConfig() *Config { CompletedList: "jobqueue:completed", DeadLetterList: "jobqueue:dead_letter", BRPopLPushTimeout: 1 * time.Second, + BreakerPause: 100 * time.Millisecond, }, Producer: Producer{ ScanDir: "./data", @@ -156,6 +159,7 @@ func Load(path string) (*Config, error) { v.SetDefault("worker.completed_list", def.Worker.CompletedList) v.SetDefault("worker.dead_letter_list", def.Worker.DeadLetterList) v.SetDefault("worker.brpoplpush_timeout", def.Worker.BRPopLPushTimeout) + v.SetDefault("worker.breaker_pause", def.Worker.BreakerPause) v.SetDefault("producer.scan_dir", def.Producer.ScanDir) v.SetDefault("producer.include_globs", def.Producer.IncludeGlobs) diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 5145c55d..bb7eb37f 100644 --- a/internal/config/config_test.go +++ 
b/internal/config/config_test.go @@ -1,3 +1,4 @@ +// Copyright 2025 James Ross package config import ( diff --git a/internal/obs/http.go b/internal/obs/http.go index 5e891645..64192b0b 100644 --- a/internal/obs/http.go +++ b/internal/obs/http.go @@ -1,3 +1,4 @@ +// Copyright 2025 James Ross package obs import ( @@ -36,4 +37,3 @@ func StartHTTPServer(cfg *config.Config, readiness func(context.Context) error) go func() { _ = srv.ListenAndServe() }() return srv } - diff --git a/internal/obs/logger.go b/internal/obs/logger.go index 8784e37f..0a3a547d 100644 --- a/internal/obs/logger.go +++ b/internal/obs/logger.go @@ -1,3 +1,4 @@ +// Copyright 2025 James Ross package obs import ( @@ -28,4 +29,3 @@ func String(k, v string) zap.Field { return zap.String(k, v) } func Int(k string, v int) zap.Field { return zap.Int(k, v) } func Bool(k string, v bool) zap.Field { return zap.Bool(k, v) } func Err(err error) zap.Field { return zap.Error(err) } - diff --git a/internal/obs/metrics.go b/internal/obs/metrics.go index 0255ef1c..39124289 100644 --- a/internal/obs/metrics.go +++ b/internal/obs/metrics.go @@ -1,3 +1,4 @@ +// Copyright 2025 James Ross package obs import ( @@ -47,6 +48,14 @@ var ( Name: "circuit_breaker_state", Help: "0 Closed, 1 HalfOpen, 2 Open", }) + CircuitBreakerTrips = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "circuit_breaker_trips_total", + Help: "Count of times the circuit breaker transitioned to Open", + }) + ReaperRecovered = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "reaper_recovered_total", + Help: "Total number of jobs recovered by the reaper from processing lists", + }) WorkerActive = prometheus.NewGauge(prometheus.GaugeOpts{ Name: "worker_active", Help: "Number of active worker goroutines", @@ -54,7 +63,7 @@ var ( ) func init() { - prometheus.MustRegister(JobsProduced, JobsConsumed, JobsCompleted, JobsFailed, JobsRetried, JobsDeadLetter, JobProcessingDuration, QueueLength, CircuitBreakerState, WorkerActive) + prometheus.MustRegister(JobsProduced, JobsConsumed, JobsCompleted, JobsFailed, JobsRetried, JobsDeadLetter, JobProcessingDuration, QueueLength, CircuitBreakerState, CircuitBreakerTrips, ReaperRecovered, WorkerActive) } // StartMetricsServer exposes /metrics and returns a server for controlled shutdown. 
diff --git a/internal/obs/queue_length.go b/internal/obs/queue_length.go index a17e13b0..b9bfeadc 100644 --- a/internal/obs/queue_length.go +++ b/internal/obs/queue_length.go @@ -1,3 +1,4 @@ +// Copyright 2025 James Ross package obs import ( @@ -38,4 +39,3 @@ func StartQueueLengthUpdater(ctx context.Context, cfg *config.Config, rdb *redis } }() } - diff --git a/internal/obs/tracing.go b/internal/obs/tracing.go index 5d9387cb..67a956cd 100644 --- a/internal/obs/tracing.go +++ b/internal/obs/tracing.go @@ -1,3 +1,4 @@ +// Copyright 2025 James Ross package obs import ( diff --git a/internal/producer/producer.go b/internal/producer/producer.go index 9dc3ebf5..13b51c0b 100644 --- a/internal/producer/producer.go +++ b/internal/producer/producer.go @@ -1,3 +1,4 @@ +// Copyright 2025 James Ross package producer import ( @@ -29,39 +30,42 @@ func New(cfg *config.Config, rdb *redis.Client, log *zap.Logger) *Producer { func (p *Producer) Run(ctx context.Context) error { root := p.cfg.Producer.ScanDir + absRoot, errAbs := filepath.Abs(root) + if errAbs != nil { + return errAbs + } include := p.cfg.Producer.IncludeGlobs exclude := p.cfg.Producer.ExcludeGlobs - var files []string err := filepath.WalkDir(root, func(path string, d os.DirEntry, err error) error { if err != nil { return err } if d.IsDir() { return nil } + // Safety: ensure file under root + abs, err2 := filepath.Abs(path) + if err2 != nil { return nil } + if !strings.HasPrefix(abs, absRoot+string(os.PathSeparator)) && abs != absRoot { + return nil + } rel, _ := filepath.Rel(root, path) // include check incMatch := len(include) == 0 for _, g := range include { if ok, _ := doublestar.PathMatch(g, rel); ok { incMatch = true; break } } if !incMatch { return nil } for _, g := range exclude { if ok, _ := doublestar.PathMatch(g, rel); ok { return nil } } - files = append(files, path) - return nil - }) - if err != nil { - return err - } - for _, f := range files { + // Per-file enqueue (streaming) select { case <-ctx.Done(): return ctx.Err() default: } if err := p.rateLimit(ctx); err != nil { return err } - fi, err := os.Stat(f) - if err != nil { continue } - prio := p.priorityForExt(filepath.Ext(f)) + fi, err := os.Stat(path) + if err != nil { return nil } + prio := p.priorityForExt(filepath.Ext(path)) id := randID() traceID, spanID := randTraceAndSpan() - j := queue.NewJob(id, f, fi.Size(), prio, traceID, spanID) + j := queue.NewJob(id, abs, fi.Size(), prio, traceID, spanID) payload, _ := j.Marshal() key := p.cfg.Worker.Queues[prio] if key == "" { key = p.cfg.Worker.Queues[p.cfg.Producer.DefaultPriority] } @@ -70,6 +74,10 @@ func (p *Producer) Run(ctx context.Context) error { } obs.JobsProduced.Inc() p.log.Info("enqueued job", obs.String("id", j.ID), obs.String("queue", key), obs.String("trace_id", j.TraceID), obs.String("span_id", j.SpanID)) + return nil + }) + if err != nil { + return err } return nil } diff --git a/internal/producer/producer_test.go b/internal/producer/producer_test.go index 164e2586..b9ed9089 100644 --- a/internal/producer/producer_test.go +++ b/internal/producer/producer_test.go @@ -1,3 +1,4 @@ +// Copyright 2025 James Ross package producer import ( diff --git a/internal/queue/job.go b/internal/queue/job.go index 7cbf8e3b..0fd1b9a2 100644 --- a/internal/queue/job.go +++ b/internal/queue/job.go @@ -1,3 +1,4 @@ +// Copyright 2025 James Ross package queue import ( @@ -42,4 +43,3 @@ func UnmarshalJob(s string) (Job, error) { err := json.Unmarshal([]byte(s), &j) return j, err } - diff --git a/internal/queue/job_test.go 
b/internal/queue/job_test.go index bbb0ebeb..65790535 100644 --- a/internal/queue/job_test.go +++ b/internal/queue/job_test.go @@ -1,3 +1,4 @@ +// Copyright 2025 James Ross package queue import "testing" @@ -12,4 +13,3 @@ func TestMarshalUnmarshal(t *testing.T) { t.Fatalf("roundtrip mismatch: %#v vs %#v", j, j2) } } - diff --git a/internal/reaper/reaper.go b/internal/reaper/reaper.go index def39f88..37e11f4d 100644 --- a/internal/reaper/reaper.go +++ b/internal/reaper/reaper.go @@ -1,3 +1,4 @@ +// Copyright 2025 James Ross package reaper import ( @@ -65,8 +66,12 @@ func (r *Reaper) scanOnce(ctx context.Context) { prio := job.Priority dest := r.cfg.Worker.Queues[prio] if dest == "" { dest = r.cfg.Worker.Queues[r.cfg.Producer.DefaultPriority] } - _ = r.rdb.LPush(ctx, dest, payload).Err() - r.log.Warn("requeued abandoned job", obs.String("id", job.ID), obs.String("to", dest), obs.String("trace_id", job.TraceID), obs.String("span_id", job.SpanID)) + if err := r.rdb.LPush(ctx, dest, payload).Err(); err != nil { + r.log.Error("requeue failed", obs.Err(err)) + } else { + obs.ReaperRecovered.Inc() + r.log.Warn("requeued abandoned job", obs.String("id", job.ID), obs.String("to", dest), obs.String("trace_id", job.TraceID), obs.String("span_id", job.SpanID)) + } } } if cursor == 0 { break } diff --git a/internal/reaper/reaper_test.go b/internal/reaper/reaper_test.go index e05810bd..100ddf9b 100644 --- a/internal/reaper/reaper_test.go +++ b/internal/reaper/reaper_test.go @@ -1,3 +1,4 @@ +// Copyright 2025 James Ross package reaper import ( diff --git a/internal/redisclient/client.go b/internal/redisclient/client.go index 75526d27..433a923d 100644 --- a/internal/redisclient/client.go +++ b/internal/redisclient/client.go @@ -1,3 +1,4 @@ +// Copyright 2025 James Ross package redisclient import ( @@ -28,4 +29,3 @@ func New(cfg *config.Config) *redis.Client { IdleTimeout: 5 * time.Minute, }) } - diff --git a/internal/worker/worker.go b/internal/worker/worker.go index cd8fac59..ce9fbff9 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -1,3 +1,4 @@ +// Copyright 2025 James Ross package worker import ( @@ -21,20 +22,24 @@ type Worker struct { rdb *redis.Client log *zap.Logger cb *breaker.CircuitBreaker + baseID string } func New(cfg *config.Config, rdb *redis.Client, log *zap.Logger) *Worker { cb := breaker.New(cfg.CircuitBreaker.Window, cfg.CircuitBreaker.CooldownPeriod, cfg.CircuitBreaker.FailureThreshold, cfg.CircuitBreaker.MinSamples) - return &Worker{cfg: cfg, rdb: rdb, log: log, cb: cb} + host, _ := os.Hostname() + pid := os.Getpid() + now := time.Now().UnixNano() + randSfx := fmt.Sprintf("%04x", time.Now().UnixNano()&0xffff) + base := fmt.Sprintf("%s-%d-%d-%s", host, pid, now, randSfx) + return &Worker{cfg: cfg, rdb: rdb, log: log, cb: cb, baseID: base} } func (w *Worker) Run(ctx context.Context) error { var wg sync.WaitGroup - host, _ := os.Hostname() - pid := os.Getpid() for i := 0; i < w.cfg.Worker.Count; i++ { wg.Add(1) - id := fmt.Sprintf("%s-%d-%d", host, pid, i) + id := fmt.Sprintf("%s-%d", w.baseID, i) go func(workerID string) { defer wg.Done() obs.WorkerActive.Inc() @@ -74,7 +79,7 @@ func (w *Worker) runOne(ctx context.Context, workerID string) { for ctx.Err() == nil { if !w.cb.Allow() { - time.Sleep(100 * time.Millisecond) + time.Sleep(w.cfg.Worker.BreakerPause) continue } @@ -106,11 +111,17 @@ func (w *Worker) runOne(ctx context.Context, workerID string) { // heartbeat set _ = w.rdb.Set(ctx, hbKey, payload, w.cfg.Worker.HeartbeatTTL).Err() + // measure state 
transition around Record() to count trips
 		start := time.Now()
 		// process job
 		ok := w.processJob(ctx, workerID, srcQueue, procList, hbKey, payload)
 		obs.JobProcessingDuration.Observe(time.Since(start).Seconds())
+		prev := w.cb.State()
 		w.cb.Record(ok)
+		curr := w.cb.State()
+		if prev != curr && curr == breaker.Open {
+			obs.CircuitBreakerTrips.Inc()
+		}
 	}
 }
 
@@ -127,13 +138,23 @@ func (w *Worker) processJob(ctx context.Context, workerID, srcQueue, procList, h
 	ctx, span := obs.ContextWithJobSpan(ctx, job)
 	defer span.End()
 
-	// Simulated processing: sleep based on filesize
+	// Simulated processing: sleep based on filesize with cancellable timer
 	dur := time.Duration(min64(job.FileSize/1024, 1000)) * time.Millisecond
 	canceled := false
-	select {
-	case <-ctx.Done():
-		canceled = true
-	case <-time.After(dur):
+	if dur > 0 {
+		timer := time.NewTimer(dur)
+		select {
+		case <-ctx.Done():
+			canceled = true
+			if !timer.Stop() { <-timer.C } // drain only if the timer fired concurrently; a blind drain after receiving would block
+		case <-timer.C:
+		}
+	} else {
+		select {
+		case <-ctx.Done():
+			canceled = true
+		default:
+		}
 	}
 
 	// For demonstration, consider processing success unless canceled or filename contains "fail"
@@ -141,9 +162,15 @@ func (w *Worker) processJob(ctx context.Context, workerID, srcQueue, procList, h
 	if success {
 		// complete
-		_ = w.rdb.LPush(ctx, w.cfg.Worker.CompletedList, payload).Err()
-		_ = w.rdb.LRem(ctx, procList, 1, payload).Err()
-		_ = w.rdb.Del(ctx, hbKey).Err()
+		if err := w.rdb.LPush(ctx, w.cfg.Worker.CompletedList, payload).Err(); err != nil {
+			w.log.Error("LPUSH completed failed", obs.Err(err))
+		}
+		if err := w.rdb.LRem(ctx, procList, 1, payload).Err(); err != nil {
+			w.log.Error("LREM processing failed", obs.Err(err))
+		}
+		if err := w.rdb.Del(ctx, hbKey).Err(); err != nil {
+			w.log.Error("DEL heartbeat failed", obs.Err(err))
+		}
 		obs.JobsCompleted.Inc()
 		w.log.Info("job completed", obs.String("id", job.ID), obs.String("trace_id", job.TraceID), obs.String("span_id", job.SpanID), obs.String("worker_id", workerID))
 		return true
@@ -162,17 +189,29 @@ func (w *Worker) processJob(ctx context.Context, workerID, srcQueue, procList, h
 	if job.Retries <= w.cfg.Worker.MaxRetries {
 		obs.JobsRetried.Inc()
 		payload2, _ := job.Marshal()
-		_ = w.rdb.LPush(ctx, srcQueue, payload2).Err()
-		_ = w.rdb.LRem(ctx, procList, 1, payload).Err()
-		_ = w.rdb.Del(ctx, hbKey).Err()
+		if err := w.rdb.LPush(ctx, srcQueue, payload2).Err(); err != nil {
+			w.log.Error("LPUSH retry failed", obs.Err(err))
+		}
+		if err := w.rdb.LRem(ctx, procList, 1, payload).Err(); err != nil {
+			w.log.Error("LREM processing failed", obs.Err(err))
+		}
+		if err := w.rdb.Del(ctx, hbKey).Err(); err != nil {
+			w.log.Error("DEL heartbeat failed", obs.Err(err))
+		}
 		w.log.Warn("job retried", obs.String("id", job.ID), obs.Int("retries", job.Retries), obs.String("trace_id", job.TraceID), obs.String("span_id", job.SpanID), obs.String("worker_id", workerID))
 		return false
 	}
 	// dead letter
-	_ = w.rdb.LPush(ctx, w.cfg.Worker.DeadLetterList, payload).Err()
-	_ = w.rdb.LRem(ctx, procList, 1, payload).Err()
-	_ = w.rdb.Del(ctx, hbKey).Err()
+	if err := w.rdb.LPush(ctx, w.cfg.Worker.DeadLetterList, payload).Err(); err != nil {
+		w.log.Error("LPUSH DLQ failed", obs.Err(err))
+	}
+	if err := w.rdb.LRem(ctx, procList, 1, payload).Err(); err != nil {
+		w.log.Error("LREM processing failed", obs.Err(err))
+	}
+	if err := w.rdb.Del(ctx, hbKey).Err(); err != nil {
+		w.log.Error("DEL heartbeat failed", obs.Err(err))
+	}
 	obs.JobsDeadLetter.Inc()
 	w.log.Error("job dead-lettered", obs.String("id", job.ID),
obs.String("trace_id", job.TraceID), obs.String("span_id", job.SpanID), obs.String("worker_id", workerID)) return false diff --git a/internal/worker/worker_breaker_integration_test.go b/internal/worker/worker_breaker_integration_test.go new file mode 100644 index 00000000..582fbe7f --- /dev/null +++ b/internal/worker/worker_breaker_integration_test.go @@ -0,0 +1,87 @@ +// Copyright 2025 James Ross +package worker + +import ( + "context" + "testing" + "time" + + "github.com/alicebob/miniredis/v2" + "github.com/flyingrobots/go-redis-work-queue/internal/config" + "github.com/flyingrobots/go-redis-work-queue/internal/queue" + "github.com/go-redis/redis/v8" + "go.uber.org/zap" +) + +// Test that repeated failures trip the breaker, and while Open the worker +// does not drain the queue until cooldown elapses. +func TestWorkerBreakerTripsAndPausesConsumption(t *testing.T) { + mr, _ := miniredis.Run() + defer mr.Close() + + cfg, _ := config.Load("nonexistent.yaml") + cfg.Redis.Addr = mr.Addr() + cfg.Worker.Count = 1 + // Make retries immediate and short + cfg.Worker.Backoff.Base = 1 * time.Millisecond + cfg.Worker.Backoff.Max = 2 * time.Millisecond + cfg.Worker.BRPopLPushTimeout = 5 * time.Millisecond + // Breaker tuned for quick transition + cfg.CircuitBreaker.Window = 20 * time.Millisecond + cfg.CircuitBreaker.CooldownPeriod = 100 * time.Millisecond + cfg.CircuitBreaker.FailureThreshold = 0.5 + cfg.CircuitBreaker.MinSamples = 1 + cfg.Worker.BreakerPause = 5 * time.Millisecond + + rdb := redis.NewClient(&redis.Options{Addr: mr.Addr()}) + defer rdb.Close() + + // Enqueue failing jobs (filename contains "fail") + for i := 0; i < 5; i++ { + j := queue.NewJob( + "id-fail-", + "/tmp/fail-test.txt", // contains "fail" to force failure + 1, + "low", + "", + "", + ) + payload, _ := j.Marshal() + if err := rdb.LPush(context.Background(), cfg.Worker.Queues["low"], payload).Err(); err != nil { + t.Fatalf("lpush: %v", err) + } + } + + log, _ := zap.NewDevelopment() + w := New(cfg, rdb, log) + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { defer close(done); _ = w.Run(ctx) }() + + // Wait up to 2s for breaker to open + deadline := time.Now().Add(2 * time.Second) + opened := false + for time.Now().Before(deadline) { + if w.cb.State() == 2 { // Open + opened = true + break + } + time.Sleep(5 * time.Millisecond) + } + if !opened { + cancel(); <-done + t.Fatalf("breaker did not open under failures") + } + + // While breaker is Open (cooldown 100ms), queue length should not decrease + n1, _ := rdb.LLen(context.Background(), cfg.Worker.Queues["low"]).Result() + time.Sleep(50 * time.Millisecond) // less than cooldown + n2, _ := rdb.LLen(context.Background(), cfg.Worker.Queues["low"]).Result() + if n2 < n1 { + cancel(); <-done + t.Fatalf("queue drained during breaker open: before=%d after=%d", n1, n2) + } + + cancel() + <-done +} diff --git a/internal/worker/worker_process_test.go b/internal/worker/worker_process_test.go index e249e8b4..99b91886 100644 --- a/internal/worker/worker_process_test.go +++ b/internal/worker/worker_process_test.go @@ -1,3 +1,4 @@ +// Copyright 2025 James Ross package worker import ( @@ -63,4 +64,3 @@ func TestProcessJobRetryThenDLQ(t *testing.T) { if ok2 { t.Fatalf("expected failure to DLQ") } if n, _ := rdb.LLen(ctx, cfg.Worker.DeadLetterList).Result(); n != 1 { t.Fatalf("expected DLQ 1, got %d", n) } } - diff --git a/internal/worker/worker_test.go b/internal/worker/worker_test.go index 50a3800b..a9fc6337 100644 --- 
diff --git a/internal/worker/worker_test.go b/internal/worker/worker_test.go
index 50a3800b..a9fc6337 100644
--- a/internal/worker/worker_test.go
+++ b/internal/worker/worker_test.go
@@ -1,3 +1,4 @@
+// Copyright 2025 James Ross
 package worker
 
 import (
@@ -9,4 +10,3 @@ func TestBackoffCaps(t *testing.T) {
 	b := backoff(10, 100*time.Millisecond, 1*time.Second)
 	if b != 1*time.Second { t.Fatalf("expected cap at 1s, got %v", b) }
 }
-
diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go
index ca8eeb8e..49a05fd8 100644
--- a/test/e2e/e2e_test.go
+++ b/test/e2e/e2e_test.go
@@ -1,3 +1,4 @@
+// Copyright 2025 James Ross
 package e2e
 
 import (
@@ -59,4 +60,3 @@ func TestE2E_WorkerCompletesJobWithRealRedis(t *testing.T) {
 		t.Fatalf("expected completed 1, got %d", n)
 	}
 }
-
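TestBackoffCaps above pins the contract of backoff(retries, base, max): exponential growth from base, clamped at max, so backoff(10, 100ms, 1s) returns exactly 1s. A sketch consistent with that contract (the actual implementation in worker.go may differ):

package worker

import "time"

// backoff doubles base once per retry and clamps the result at max.
func backoff(retries int, base, max time.Duration) time.Duration {
	d := base
	for i := 0; i < retries; i++ {
		d *= 2
		if d >= max {
			return max
		}
	}
	return d
}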