Implement background job workers system #12

Open

protoappxyz wants to merge 7 commits into main from
claude/implement-background-workers-011CUsYg2xujqwSAdviCdHph

Conversation

@protoappxyz
Collaborator

Add Redis-backed job queue using Asynq for handling background tasks:

- Job Types & Infrastructure:
  - Email sending (verification, welcome, position updates, rewards, milestones)
  - Position recalculation for waitlist ranking
  - Reward fulfillment with email and webhook delivery
  - Hourly analytics aggregation
  - Fraud detection (self-referral, velocity, fake emails, bots, duplicate IP/device)

- Workers Implementation:
  - EmailWorker: Processes email jobs with template rendering
  - PositionWorker: Recalculates waitlist positions based on referrals
  - RewardWorker: Delivers rewards via email or webhook
  - AnalyticsWorker: Aggregates campaign metrics hourly
  - FraudWorker: Runs fraud checks with configurable detection types

- Store Layer Enhancements:
  - Campaign analytics CRUD operations
  - User activity logging
  - Fraud detection records
  - Helper methods for counting metrics
  - GetAllActiveCampaigns for batch processing

- Worker Server:
  - Standalone worker process (cmd/worker/main.go)
  - Priority-based queue system (high/medium/low)
  - Configurable concurrency (10 high, 5 medium, 2 low workers)
  - Periodic task scheduling for hourly analytics
  - Graceful shutdown handling

- Job Client:
  - Centralized job enqueueing
  - Methods for all job types
  - Integrated with existing infrastructure

Technical Details:
- Uses Asynq for reliable Redis-backed job processing
- Exponential backoff retry strategy (max 5 attempts)
- Template variable substitution for emails
- Position ranking algorithm with verified referral weighting
- Fraud detection with confidence scoring
- Hourly cron schedule for analytics aggregation
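For orientation, a minimal sketch of what this Asynq server wiring might look like; the task type name `email:send`, the handler body, and the total concurrency figure are illustrative (Asynq treats the queue map as priority weights, and its default retry delay is already exponential backoff):

```go
package main

import (
	"context"
	"log"

	"github.com/hibiken/asynq"
)

// handleEmailTask is a placeholder; the real EmailWorker renders a template
// and sends the email described by t.Payload().
func handleEmailTask(ctx context.Context, t *asynq.Task) error {
	return nil
}

func main() {
	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: "localhost:6379"},
		asynq.Config{
			// Priority weights mirroring "10 high, 5 medium, 2 low".
			Queues:      map[string]int{"high": 10, "medium": 5, "low": 2},
			Concurrency: 17, // assumed: sum of the per-queue figures
		},
	)

	mux := asynq.NewServeMux()
	mux.HandleFunc("email:send", handleEmailTask)
	// The enqueue side would pair tasks with asynq.MaxRetry(5) to get the
	// "max 5 attempts" behavior noted above.

	if err := srv.Run(mux); err != nil {
		log.Fatal(err)
	}
}
```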
…rcing

Implement a production-ready Kafka infrastructure alongside existing Asynq workers,
providing durability, replay capability, and event sourcing for critical business events.

## New Infrastructure

### Kafka Core Components (internal/kafka/):
- **Producer**: Durable message producer with configurable compression, batching, and acks (constructed in the sketch after this list)
  - Snappy compression for efficiency
  - Batch size: 100 messages
  - RequiredAcks: -1 (all replicas) for guaranteed durability
  - Auto-generated message IDs and timestamps

- **Consumer**: Robust message consumer with consumer groups and DLQ support
  - Auto-commit with 1s interval
  - Session timeout: 30s with heartbeat every 3s
  - Range partitioner for ordered processing
  - Automatic offset management
  - Built-in dead letter queue handling

- **Topics**: Comprehensive topic architecture with 17 topics:
  - **Job Topics** (7 days retention): Email, Position, Rewards, Fraud, Webhooks, Analytics
  - **Event Sourcing Topics** (1 year retention): User signups, Verifications, Referrals, Rewards
  - **Dead Letter Queue** (1 year retention): Failed messages for debugging
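For reference, a sketch of how the Producer and Consumer settings above could be constructed. The PR does not name the Go Kafka client, so this assumes segmentio/kafka-go; the broker address and package name are placeholders:

```go
package kafkax

import (
	"time"

	"github.com/segmentio/kafka-go"
)

// newProducer mirrors the Producer settings: snappy compression, batches of
// 100, and RequiredAcks=-1 so every in-sync replica must acknowledge.
func newProducer(topic string) *kafka.Writer {
	return &kafka.Writer{
		Addr:         kafka.TCP("localhost:9092"),
		Topic:        topic,
		Balancer:     &kafka.Hash{}, // key-based partitioning for ordering
		BatchSize:    100,
		RequiredAcks: kafka.RequireAll,
		Compression:  kafka.Snappy,
	}
}

// newConsumer mirrors the Consumer settings: 1s auto-commit, 30s session
// timeout with 3s heartbeats, and range-based partition assignment.
func newConsumer(topic, group string) *kafka.Reader {
	return kafka.NewReader(kafka.ReaderConfig{
		Brokers:           []string{"localhost:9092"},
		Topic:             topic,
		GroupID:           group,
		CommitInterval:    time.Second,
		SessionTimeout:    30 * time.Second,
		HeartbeatInterval: 3 * time.Second,
		GroupBalancers:    []kafka.GroupBalancer{kafka.RangeGroupBalancer{}},
	})
}
```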

### Job Client (internal/jobs/):
- **KafkaClient**: Kafka-based job enqueueing with topic routing
  - Per-topic producer management
  - Message key strategies for ordering (campaign_id, user_id, etc.)
  - Headers for metadata and tracing
  - Event sourcing publish method for audit trails
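A hypothetical publish helper showing the key and header strategy (the header names and helper signature are assumptions; imports as in the sketch above):

```go
// publishJob keys each message by an ordering ID (campaign_id, user_id, ...),
// so all messages for that entity land on one partition and stay ordered,
// and attaches metadata headers for routing and tracing.
func publishJob(ctx context.Context, w *kafka.Writer, orderingKey, eventType string, body []byte) error {
	return w.WriteMessages(ctx, kafka.Message{
		Key:   []byte(orderingKey), // same key -> same partition -> in order
		Value: body,                // JSON-encoded job payload
		Headers: []kafka.Header{
			{Key: "event_type", Value: []byte(eventType)},
		},
	})
}
```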

### Kafka Worker Server (cmd/kafka-worker/):
- Standalone Kafka consumer server with multiple consumer groups
- Parallel consumers for all topics with independent scaling
- Periodic hourly analytics aggregation
- Graceful shutdown with errgroup coordination
- Comprehensive error handling and DLQ routing

### Docker Compose (docker-compose.kafka.yml):
- Complete Kafka stack for local development:
  - Zookeeper for cluster coordination
  - Kafka broker with optimized settings
  - Kafka UI at http://localhost:8080
  - Schema Registry (optional, --profile full)
  - Kafka Connect (optional, --profile full)
  - Automatic topic initialization on startup

## Features

### Durability & Reliability:
- ✅ Messages persisted to disk with 3x replication (production)
- ✅ RequiredAcks=-1 ensures all replicas acknowledge writes
- ✅ No message loss even on broker failures
- ✅ Automatic consumer rebalancing on failures

### Replay & Recovery:
- ✅ Can reprocess messages from any offset/timestamp
- ✅ Rebuild analytics from event stream
- ✅ Investigate issues by replaying production events
- ✅ 1-year retention for compliance/audit

### Event Sourcing:
- ✅ Append-only audit log of all business events
- ✅ Full history of user signups, verifications, referrals, rewards
- ✅ Can reconstruct system state from event stream
- ✅ Satisfies compliance and regulatory retention requirements

### Ordering Guarantees:
- ✅ Per-partition message ordering
- ✅ Messages with same key (e.g., user_id) processed in order
- ✅ Critical for position recalculation and fraud detection

### Scalability:
- ✅ 10-20 partitions per high-volume topic
- ✅ Consumer groups for horizontal scaling
- ✅ Millions of messages per second capacity
- ✅ Independent scaling per worker type

### Monitoring:
- ✅ Kafka UI for topic/consumer monitoring
- ✅ Consumer lag tracking
- ✅ Producer/consumer statistics
- ✅ Dead letter queue inspection

## Topic Architecture

**High Priority (10 partitions, 7 days):**
- waitlist.email.verification, welcome, position-update, reward-earned, milestone
- waitlist.reward.delivery (30 days - critical)

**Medium Priority (5 partitions, 7 days):**
- waitlist.position.recalculation
- waitlist.fraud.detection (30 days)
- waitlist.webhook.delivery
- waitlist.analytics.aggregation

**Event Sourcing (10 partitions, 1 year):**
- waitlist.events.user.signup, verified
- waitlist.events.referral.created
- waitlist.events.reward.earned

**Dead Letter Queue (3 partitions, 1 year):**
- waitlist.dlq
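Creating one of these topics programmatically might look like the following, again assuming segmentio/kafka-go; 7 days is 604800000 ms, and the replication factor of 3 applies to production (a single-broker dev stack would use 1):

```go
func createEmailVerificationTopic() error {
	conn, err := kafka.Dial("tcp", "localhost:9092")
	if err != nil {
		return err
	}
	defer conn.Close()

	return conn.CreateTopics(kafka.TopicConfig{
		Topic:             "waitlist.email.verification",
		NumPartitions:     10, // high-priority topics use 10 partitions
		ReplicationFactor: 3,  // production; use 1 locally
		ConfigEntries: []kafka.ConfigEntry{
			{ConfigName: "retention.ms", ConfigValue: "604800000"}, // 7 days
		},
	})
}
```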

## Usage

### Start Kafka:
```bash
docker-compose -f docker-compose.kafka.yml up -d
# View UI at http://localhost:8080
```

### Run Kafka Workers:
```bash
KAFKA_BROKERS=localhost:9092 go run cmd/kafka-worker/main.go
```

### Enqueue Jobs:
```go
jobClient := jobs.NewKafkaClient([]string{"localhost:9092"}, logger)
err := jobClient.EnqueueEmailJob(ctx, jobs.EmailJobPayload{...})
```

### Publish Events:
```go
err := jobClient.PublishEvent(ctx, kafka.TopicUserSignup, userID.String(), event)
```

## Migration Strategy

Both Asynq (cmd/worker) and Kafka (cmd/kafka-worker) systems coexist:
1. Run both worker types in parallel
2. Gradually migrate job enqueueing to Kafka
3. Monitor consumer lag and error rates
4. Decommission Asynq when Kafka is stable

## Documentation

See KAFKA_WORKERS.md for:
- Detailed architecture
- Topic configuration
- Consumer groups
- Error handling & DLQ
- Replay & recovery procedures
- Monitoring & scaling
- Troubleshooting guide
- Migration checklist

## Benefits Over Asynq/Redis

| Feature | Kafka | Asynq/Redis |
|---------|-------|-------------|
| Durability | Disk + replication | In-memory (optional AOF/RDB persistence) |
| Message Loss | No | Possible |
| Replay | Yes | No |
| Event Sourcing | Yes | No |
| Retention | Years | Hours |
| Ordering | Per-partition (by key) | No |
| Throughput | Millions/sec | Hundreds K/sec |

This implementation provides enterprise-grade reliability for critical business
events while maintaining the flexibility to replay, audit, and recover from failures.
@protoappxyz force-pushed the claude/implement-background-workers-011CUsYg2xujqwSAdviCdHph branch from 143061e to fee7644 on November 7, 2025 at 02:07.

This commit refactors the Kafka-based background worker system to align with
the existing webhook delivery Kafka implementation, using shared infrastructure
and following established patterns.

**Architecture Changes:**

1. **Shared Kafka Client Infrastructure**
   - Removed redundant internal/kafka/ directory
   - Now uses internal/clients/kafka/ (shared with webhooks)
   - Single EventMessage schema for consistency

2. **Job Producer** (internal/jobs/producer/)
   - High-level API for enqueueing background jobs
   - Converts job payloads to Kafka EventMessage format
   - Supports all job types: email, position, reward, analytics, fraud

3. **Job Consumer with Worker Pool** (internal/jobs/consumer/)
   - Worker pool architecture (default: 10 concurrent workers)
   - Routes events to appropriate worker based on event type
   - Manual offset commit for reliability

4. **Dual-Mode Workers**
   - All workers now support both Asynq (Redis) and Kafka
   - ProcessXxxTask() methods for Asynq compatibility
   - ProcessXxx() methods for Kafka compatibility
   - Shared core processing logic
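A sketch of the dual-mode shape, with hibiken/asynq on the Redis path; EmailJobPayload, EventMessage, and the mailing internals are simplified stand-ins for the internal packages:

```go
package workers

import (
	"context"
	"encoding/json"

	"github.com/hibiken/asynq"
)

type EmailJobPayload struct {
	To       string `json:"to"`
	Template string `json:"template"`
}

// EventMessage stands in for the shared schema in internal/clients/kafka.
type EventMessage struct {
	EventType string          `json:"event_type"`
	Payload   json.RawMessage `json:"payload"`
}

type EmailWorker struct{ /* mailer, template store, ... */ }

// ProcessEmailTask is the Asynq entry point.
func (w *EmailWorker) ProcessEmailTask(ctx context.Context, t *asynq.Task) error {
	var p EmailJobPayload
	if err := json.Unmarshal(t.Payload(), &p); err != nil {
		return err
	}
	return w.process(ctx, p)
}

// ProcessEmail is the Kafka entry point.
func (w *EmailWorker) ProcessEmail(ctx context.Context, msg EventMessage) error {
	var p EmailJobPayload
	if err := json.Unmarshal(msg.Payload, &p); err != nil {
		return err
	}
	return w.process(ctx, p)
}

// process holds the shared, transport-agnostic core.
func (w *EmailWorker) process(ctx context.Context, p EmailJobPayload) error {
	// render template, send email ...
	return nil
}
```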

**Benefits:**

✅ **Consistency**: Same Kafka pattern as webhook delivery
✅ **Maintainability**: Shared client code, single source of truth
✅ **Flexibility**: Support both Asynq and Kafka concurrently
✅ **Scalability**: Worker pool enables concurrent processing
✅ **Simplicity**: Single topic (job-events) with event-type routing

**Configuration:**

New environment variables:
- KAFKA_JOB_TOPIC: Topic name (default: job-events)
- KAFKA_JOB_CONSUMER_GROUP: Consumer group ID (default: job-workers)

**Event Types:**

- job.email.<type> (verification, welcome, position_update, etc.)
- job.position.recalculate
- job.reward.deliver
- job.fraud.detect
- job.analytics.aggregate
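The consumer pairs this event-type routing with its worker pool and manual offset commits; a sketch under the same kafka-go and EventMessage assumptions (production code would track in-flight offsets rather than committing from each goroutine):

```go
// runPool fetches without auto-commit, bounds concurrency with a semaphore
// (default pool size 10), and commits each message only after processing.
func runPool(ctx context.Context, r *kafka.Reader, size int,
	handle func(context.Context, EventMessage) error) error {

	sem := make(chan struct{}, size)
	for {
		m, err := r.FetchMessage(ctx) // no auto-commit
		if err != nil {
			return err
		}
		sem <- struct{}{}
		go func(m kafka.Message) {
			defer func() { <-sem }()
			var ev EventMessage
			if err := json.Unmarshal(m.Value, &ev); err == nil {
				_ = handle(ctx, ev) // dispatch on ev.EventType, e.g. job.email.*
			}
			_ = r.CommitMessages(ctx, m) // manual commit after processing
		}(m)
	}
}
```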

**Documentation:**

- Removed KAFKA_WORKERS.md (outdated)
- Added docs/BACKGROUND_JOBS.md with comprehensive guide
- Includes architecture, usage, deployment, monitoring, troubleshooting

**Migration:**

Backward compatible with existing Asynq workers. Both systems can run
concurrently for gradual migration.

**Files Changed:**

- cmd/kafka-worker/main.go: Simplified using shared client
- internal/jobs/producer/: New job producer
- internal/jobs/consumer/: New job consumer with worker pool
- internal/jobs/workers/*: Updated to support both Asynq and Kafka
- Deleted: internal/kafka/* (redundant infrastructure)
- Deleted: internal/jobs/kafka_client.go (replaced by producer)
- Deleted: KAFKA_WORKERS.md
- Added: docs/BACKGROUND_JOBS.md

This commit separates background jobs into two complementary patterns based
on their nature, improving efficiency and clarity:

**🔔 Event-Driven Jobs (via Kafka)**
For reactive processing triggered by user actions:
- Email sending (verification, welcome, position updates, rewards)
- Position recalculation (triggered by referral verification)
- Reward delivery (triggered when user earns reward)

**⏰ Scheduled Jobs (via Cron-like Scheduler)**
For proactive processing on time intervals:
- Analytics aggregation (hourly by default)
- Fraud detection scans (every 15 minutes by default)

**Why This Separation?**

Previously, ALL jobs (including analytics and fraud detection) were
event-driven via Kafka. This wasn't optimal because:

1. Analytics should run on schedule (hourly), not per-event
2. Fraud detection should scan batches periodically, not per-user
3. Mixing patterns created confusion about when jobs run
4. Scheduled jobs don't need Kafka's durability (they re-run anyway)

**New Components:**

1. **Scheduler** (`internal/jobs/scheduler/`)
   - Generic job scheduler with configurable intervals
   - Each job implements: Name(), Schedule(), Run() (see the sketch after this list)
   - Runs jobs immediately on startup, then on schedule
   - Graceful shutdown support

2. **Analytics Aggregation Job** (scheduled)
   - Runs hourly (configurable via ANALYTICS_INTERVAL)
   - Aggregates metrics for ALL active campaigns
   - Metrics: signups, verifications, referrals, emails, rewards
   - No manual triggering needed

3. **Fraud Detection Job** (scheduled)
   - Runs every 15 minutes (configurable via FRAUD_DETECTION_INTERVAL)
   - Scans users from the last 2× interval window (the overlap ensures no user is missed between runs)
   - Runs 6 fraud checks per user
   - Flags high/critical risk users
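A minimal sketch of the scheduler in item 1, using the method names listed there (error handling and logging elided):

```go
package scheduler

import (
	"context"
	"time"
)

type Job interface {
	Name() string
	Schedule() time.Duration
	Run(ctx context.Context) error
}

type Scheduler struct{ jobs []Job }

// Start runs each job once immediately, then on its interval, until the
// context is cancelled (graceful shutdown).
func (s *Scheduler) Start(ctx context.Context) {
	for _, j := range s.jobs {
		go func(j Job) {
			_ = j.Run(ctx) // immediate run on startup
			t := time.NewTicker(j.Schedule())
			defer t.Stop()
			for {
				select {
				case <-ctx.Done():
					return
				case <-t.C:
					_ = j.Run(ctx)
				}
			}
		}(j)
	}
}
```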

**Architecture Changes:**

```
BEFORE (all event-driven):
User Action → Kafka → Worker Pool → Process Everything

AFTER (hybrid):
User Action → Kafka → Worker Pool → Email/Position/Reward
Every Hour → Scheduler → Analytics Aggregation
Every 15min → Scheduler → Fraud Detection Scan
```

**Consumer Changes:**

- Removed analytics and fraud workers from event-driven consumer
- Consumer now only handles: emails, position recalc, rewards
- Cleaner separation of concerns

**Main.go Changes:**

- Starts BOTH event-driven consumer AND scheduler
- Uses errgroup for concurrent execution
- Configuration for both patterns:
  - KAFKA_WORKER_POOL_SIZE (event-driven)
  - ANALYTICS_INTERVAL (scheduled)
  - FRAUD_DETECTION_INTERVAL (scheduled)
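A sketch of that composition with golang.org/x/sync/errgroup; runConsumer and runScheduler are placeholders for the components above:

```go
package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"

	"golang.org/x/sync/errgroup"
)

func runConsumer(ctx context.Context) error  { <-ctx.Done(); return nil } // placeholder
func runScheduler(ctx context.Context) error { <-ctx.Done(); return nil } // placeholder

func main() {
	// Cancel the context on SIGINT/SIGTERM for graceful shutdown.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	g, ctx := errgroup.WithContext(ctx)
	g.Go(func() error { return runConsumer(ctx) })  // event-driven worker pool
	g.Go(func() error { return runScheduler(ctx) }) // interval-based jobs
	if err := g.Wait(); err != nil {
		log.Fatal(err)
	}
}
```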

**New Environment Variables:**

```bash
ANALYTICS_INTERVAL=1h                    # Default: 1 hour
FRAUD_DETECTION_INTERVAL=15m             # Default: 15 minutes
KAFKA_WORKER_POOL_SIZE=10                # Event-driven workers
```

**Benefits:**

✅ **Clarity**: Clear separation between reactive and proactive jobs
✅ **Efficiency**: Batch processing for analytics/fraud
✅ **Simplicity**: No need to trigger scheduled jobs manually
✅ **Predictability**: Jobs run at consistent intervals
✅ **Scalability**: Event-driven scales with traffic, scheduled is fixed
✅ **Maintainability**: Each pattern optimized for its use case

**Documentation:**

Updated docs/BACKGROUND_JOBS.md with:
- Clear explanation of both patterns
- When to use each pattern
- Configuration examples
- Separate sections for event-driven and scheduled jobs

**Testing:**

Worker server now logs startup configuration showing both systems:
```
Event-driven jobs: 10 concurrent workers
Analytics aggregation: every 1h0m0s
Fraud detection: every 15m0s
```

**Backward Compatibility:**

- Event-driven jobs continue to work exactly as before
- Asynq workers still supported for gradual migration
- No breaking changes to existing code

This architecture follows the webhook pattern (event-driven) while adding
appropriate scheduling for time-based operations.

This commit simplifies the background job system to focus on the core
event-driven pattern using Kafka, matching the webhook delivery system.

**Removed:**
- Scheduler component (internal/jobs/scheduler/)
- Analytics aggregation job (scheduled)
- Fraud detection job (scheduled)
- Scheduled job database migration and store layer

**Reasoning:**

Focus on getting the event-driven pattern solid first:
- Email sending (verification, welcome, position updates, rewards)
- Position recalculation (triggered by referral verification)
- Reward delivery (triggered when user earns reward)

Scheduled jobs (analytics, fraud detection) can be added in a future PR
with a proper table-based tracking system similar to webhooks.

**What Remains:**

Clean event-driven Kafka implementation:
- Event producer (internal/jobs/producer/)
- Event consumer with worker pool (internal/jobs/consumer/)
- Workers for emails, positions, rewards
- Kafka-based worker server (cmd/kafka-worker/)
- Documentation focused on event-driven pattern

**Pattern:**

User Action → Kafka → Worker Pool (10 workers) → Process Job

Same as webhook delivery:
- Durable (Kafka persistence)
- Scalable (worker pool + horizontal scaling)
- Replayable (Kafka offsets)
- Decoupled (async processing)

**Documentation:**

Updated docs/BACKGROUND_JOBS.md to focus solely on:
- Event-driven jobs
- Kafka setup
- Worker pool architecture
- Production deployment

This provides a solid foundation for the event-driven pattern.
Scheduled jobs will be added later with proper tracking.

Changes:
- Created event publisher for domain events (user.*, referral.*, reward.*, campaign.*)
- Replaced job events (job.email.*, job.position.*) with domain events
- Created separate consumer groups (email-workers, position-workers, reward-workers)
- Each consumer subscribes to relevant domain events independently
- Leverages Kafka's fan-out: single event triggers multiple consumers
- Updated documentation to explain domain events pattern

Benefits:
- Events describe "what happened" not "what to do"
- Decoupled consumers can be scaled independently
- Easy to add new consumers without changing publishers
- Simpler, more maintainable architecture

Example: referral.verified event triggers:
  → email-workers: sends position update email
  → position-workers: recalculates leaderboard positions
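A sketch of the fan-out: two consumer groups on the same topic each track their own offsets, so both receive every event (kafka-go assumed; the topic name is a placeholder):

```go
// One referral.verified event is delivered to both groups independently.
var (
	emailReader = kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"},
		Topic:   "domain-events", // placeholder name
		GroupID: "email-workers", // sends the position update email
	})
	positionReader = kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"},
		Topic:   "domain-events",
		GroupID: "position-workers", // recalculates leaderboard positions
	})
)
```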

Critical Issue:
- Position recalculation has read-modify-write pattern
- Multiple workers processing same campaign would cause race conditions:
  1. Worker 1 reads campaign state
  2. Worker 2 reads campaign state (same snapshot)
  3. Both calculate new positions
  4. Worker 1 writes, Worker 2 writes (overwrites with stale data)

Fixes Applied:
1. Limit position-workers to 1 concurrent worker (not 10)
   - Prevents concurrent processing of position recalculations
   - Email/reward workers remain at 10 (independent operations)

2. Add PostgreSQL advisory locks in recalculateAllPositions() (sketched after this list)
   - pg_advisory_xact_lock(hashtext(campaign_id))
   - Ensures campaign-level serialization even if scaling up later
   - Lock released automatically when transaction commits

3. Events already partitioned by campaign_id (AccountID field)
   - All events for same campaign go to same Kafka partition
   - Enables future horizontal scaling (different instances process different partitions)
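A sketch of fix 2 with database/sql; the function name comes from this PR, while the signature and surrounding queries are assumptions:

```go
func recalculateAllPositions(ctx context.Context, db *sql.DB, campaignID string) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback()

	// Serialize per campaign: hashtext maps the campaign ID into the advisory
	// lock key space; the lock is released automatically on commit/rollback.
	if _, err := tx.ExecContext(ctx,
		"SELECT pg_advisory_xact_lock(hashtext($1))", campaignID); err != nil {
		return err
	}

	// ... read current state, compute new positions, write them back ...

	return tx.Commit()
}
```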

Documentation:
- Added "Concurrency & Race Conditions" section
- Explained worker pool sizing rationale
- Documented scaling strategies for position consumer
- Updated startup logs to show 1 worker for position-workers