diff --git a/PRIORITY_QUEUE.md b/PRIORITY_QUEUE.md new file mode 100644 index 00000000..62e2b22c --- /dev/null +++ b/PRIORITY_QUEUE.md @@ -0,0 +1,193 @@ +# Priority Queue System Documentation + +## Overview + +The tee-worker now implements a priority queue system that enables preferential processing of jobs from specific worker IDs. This system ensures that high-priority workers get their jobs processed faster while maintaining fair processing for all workers. + +## Architecture + +### Components + +1. **Dual Queue System** + - **Fast Queue**: For jobs from priority worker IDs + - **Slow Queue**: For jobs from regular worker IDs + +2. **Priority Manager** + - Maintains a list of priority worker IDs + - Fetches updates from an external endpoint + - Refreshes the list periodically (default: 15 minutes) + +3. **Job Router** + - Routes incoming jobs to appropriate queues based on worker ID + - Falls back to slow queue if fast queue is full + +4. **Worker Processing** + - Workers always check fast queue first + - Only process slow queue jobs when fast queue is empty + +## Configuration + +Configure the priority queue system using these environment variables: + +```bash +# Enable/disable priority queue system (default: true) +ENABLE_PRIORITY_QUEUE=true + +# Queue sizes +FAST_QUEUE_SIZE=1000 # Max jobs in fast queue (default: 1000) +SLOW_QUEUE_SIZE=5000 # Max jobs in slow queue (default: 5000) + +# External endpoint for priority worker list +EXTERNAL_WORKER_ID_PRIORITY_ENDPOINT=https://api.example.com/priority-workers + +# Refresh interval in seconds (default: 900 = 15 minutes) +PRIORITY_REFRESH_INTERVAL_SECONDS=900 +``` + +## External Endpoint Format + +The external endpoint should return JSON in this format: + +```json +{ + "workers": [ + "https://217.28.137.141:50035", + "https://20.245.90.64:50001", + "https://40.76.123.136:50042", + "https://172.214.189.153:18080" + ] +} +``` + +**Note**: The system currently uses the full URL as the worker ID. When submitting jobs, use the complete URL as the worker_id to match against the priority list. + +## Job Flow + +``` +1. Job arrives from a submitter with their worker_id +2. System checks if the submitter's worker_id is in priority list +3. If priority submitter → Route to fast queue +4. If regular submitter → Route to slow queue +5. Tee-worker processes fast queue first, then slow queue +``` + +**Important**: The priority is based on the job submitter's worker ID, not the tee-worker's own ID. This allows certain job submitters to have their requests processed faster. + +## API Endpoints + +### Queue Statistics +```bash +GET /job/queue/stats +``` + +Response: +```json +{ + "fast_queue_depth": 10, + "slow_queue_depth": 45, + "fast_processed": 1234, + "slow_processed": 5678, + "last_update": "2024-01-15T10:30:00Z" +} +``` + +## Development & Testing + +### Using Real Endpoint + +To use the actual TEE workers endpoint: +```bash +export EXTERNAL_WORKER_ID_PRIORITY_ENDPOINT="https://tee-api.masa.ai/list-tee-workers" +``` + +### Using Dummy Data + +When no external endpoint is configured or if the endpoint fails, the system falls back to dummy priority worker IDs: +- `worker-001`, `worker-002`, `worker-005` +- `worker-priority-1`, `worker-priority-2` +- `worker-vip-1` +- `worker-high-priority-3` +- `worker-fast-lane-1` + +### Disable Priority Queue + +To run in legacy mode (single queue): +```bash +ENABLE_PRIORITY_QUEUE=false +``` + +## Implementation Details + +### Files Added/Modified + +1. 
**New Files**: + - `internal/jobserver/priority_queue.go` - Dual queue implementation + - `internal/jobserver/priority_manager.go` - Priority worker list management + - `internal/jobserver/errors.go` - Error definitions + +2. **Modified Files**: + - `internal/jobserver/jobserver.go` - Integration with priority system + - `internal/jobserver/worker.go` - Priority-aware job processing + - `api/types/job.go` - Added GetBool helper method + - `internal/api/routes.go` - Added queue stats endpoint + - `internal/api/start.go` - Registered new endpoint + +### Key Features + +- **Non-breaking**: Falls back to legacy mode when disabled +- **Resilient**: Uses dummy data if external endpoint fails +- **Observable**: Queue statistics endpoint for monitoring +- **Configurable**: All parameters can be tuned via environment +- **Concurrent**: Thread-safe operations with proper locking + +## Example Usage + +### Start with Priority Queue +```bash +export ENABLE_PRIORITY_QUEUE=true +export EXTERNAL_WORKER_ID_PRIORITY_ENDPOINT="https://your-api.com/priority-workers" +export FAST_QUEUE_SIZE=2000 +export SLOW_QUEUE_SIZE=10000 +export PRIORITY_REFRESH_INTERVAL_SECONDS=300 # 5 minutes + +./tee-worker +``` + +### Monitor Queue Performance +```bash +# Check queue statistics +curl http://localhost:8080/job/queue/stats + +# Response shows queue depths and processing counts +{ + "fast_queue_depth": 5, + "slow_queue_depth": 23, + "fast_processed": 1523, + "slow_processed": 4821, + "last_update": "2024-01-15T14:22:31Z" +} +``` + +## Endpoint Integration Details + +### Automatic Refresh +The priority list is automatically refreshed from the external endpoint: +- Initial fetch on startup +- Periodic refresh every 15 minutes (configurable) +- Continues using last known good list if refresh fails +- All errors are logged but don't stop the service + +### Monitoring Endpoint Status +Check logs for endpoint status: +``` +INFO[0000] Fetching initial priority list from external endpoint: https://tee-api.masa.ai/list-tee-workers +INFO[0000] Priority list updated with 179 workers from external endpoint +``` + +## Future Enhancements + +1. **Dynamic Queue Sizing**: Adjust queue sizes based on load +2. **Priority Levels**: Multiple priority tiers (not just fast/slow) +3. **Metrics Export**: Prometheus/Grafana integration +4. **Queue Persistence**: Survive restarts without losing jobs +5. 
**Worker ID Extraction**: Extract worker ID from URL if needed (currently uses full URL) \ No newline at end of file diff --git a/api/types/job.go b/api/types/job.go index b0c4f4a7..d836e0ca 100644 --- a/api/types/job.go +++ b/api/types/job.go @@ -149,7 +149,18 @@ func (jc JobConfiguration) GetDuration(key string, defSecs int) time.Duration { func (jc JobConfiguration) GetString(key string, def string) string { if v, ok := jc[key]; ok { - return v.(string) + if val, ok := v.(string); ok { + return val + } + } + return def +} + +func (jc JobConfiguration) GetBool(key string, def bool) bool { + if v, ok := jc[key]; ok { + if val, ok := v.(bool); ok { + return val + } } return def } diff --git a/internal/api/routes.go b/internal/api/routes.go index f401e01b..2f4a1556 100644 --- a/internal/api/routes.go +++ b/internal/api/routes.go @@ -3,6 +3,7 @@ package api import ( "fmt" "net/http" + "time" "github.com/labstack/echo/v4" "github.com/masa-finance/tee-worker/api/types" @@ -121,3 +122,56 @@ func setKey(dataDir string) func(c echo.Context) error { return c.JSON(http.StatusOK, types.KeyResponse{Status: "Key set"}) } } + +// queueStats returns the current queue statistics for monitoring the priority queue system. +// +// GET /job/queue/stats +// +// Response format: +// { +// "enabled": true, // Whether priority queue is enabled +// "fast_queue_depth": 10, // Current number of jobs waiting in fast queue +// "slow_queue_depth": 45, // Current number of jobs waiting in slow queue +// "fast_processed": 1234, // Total jobs processed from fast queue +// "slow_processed": 5678, // Total jobs processed from slow queue +// "last_update": "2024-01-15T10:30:00Z" // ISO8601 timestamp or null +// } +// +// The response always includes all fields for consistent client parsing. 
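+// When the priority queue is disabled, "enabled" is false, the numeric fields are
+// zero, and "last_update" is null, matching the fallback response built below.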
+// +// This endpoint is useful for: +// - Monitoring queue health and performance +// - Detecting queue backlogs +// - Verifying priority routing is working correctly +// - Calculating processing rates +func queueStats(jobServer *jobserver.JobServer) func(c echo.Context) error { + return func(c echo.Context) error { + stats := jobServer.GetQueueStats() + if stats == nil { + // Return consistent schema even when disabled + return c.JSON(http.StatusOK, map[string]interface{}{ + "enabled": false, + "fast_queue_depth": 0, + "slow_queue_depth": 0, + "fast_processed": 0, + "slow_processed": 0, + "last_update": nil, // Use nil for JSON null + }) + } + + // Format timestamp as ISO8601 string or null if zero + var lastUpdate interface{} = nil + if !stats.LastUpdateTime.IsZero() { + lastUpdate = stats.LastUpdateTime.Format(time.RFC3339) + } + + return c.JSON(http.StatusOK, map[string]interface{}{ + "enabled": true, + "fast_queue_depth": stats.FastQueueDepth, + "slow_queue_depth": stats.SlowQueueDepth, + "fast_processed": stats.FastProcessed, + "slow_processed": stats.SlowProcessed, + "last_update": lastUpdate, + }) + } +} diff --git a/internal/api/start.go b/internal/api/start.go index a8bd183b..c7b42b6d 100644 --- a/internal/api/start.go +++ b/internal/api/start.go @@ -104,12 +104,14 @@ func Start(ctx context.Context, listenAddress, dataDIR string, standalone bool, - POST /job/add: Add a job to the queue - GET /job/status/:job_id: Get the status of a job - POST /job/result: Get the result of a job, decrypt it and return it + - GET /job/queue/stats: Get queue statistics (if priority queue enabled) */ job := e.Group("/job") job.POST("/generate", generate) job.POST("/add", add(jobServer)) job.GET("/status/:job_id", status(jobServer)) job.POST("/result", result) + job.GET("/queue/stats", queueStats(jobServer)) go func() { <-ctx.Done() diff --git a/internal/jobserver/errors.go b/internal/jobserver/errors.go new file mode 100644 index 00000000..f3945f7c --- /dev/null +++ b/internal/jobserver/errors.go @@ -0,0 +1,25 @@ +// Package jobserver provides job queue management and processing functionality. +// This file defines common errors used throughout the jobserver package. +package jobserver + +import "errors" + +// Common errors returned by jobserver operations. +// These errors help distinguish between different failure scenarios +// and allow callers to handle specific error conditions appropriately. 
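+//
+// For example, the job server falls back to the slow queue when EnqueueFast returns
+// ErrQueueFull, and a worker stops when DequeueBlocking returns ErrQueueClosed.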
+var ( + // ErrQueueClosed is returned when attempting to use a closed queue + ErrQueueClosed = errors.New("queue is closed") + + // ErrQueueFull is returned when attempting to enqueue to a full queue + ErrQueueFull = errors.New("queue is full") + + // ErrQueueEmpty is returned when attempting to dequeue from empty queues + ErrQueueEmpty = errors.New("all queues are empty") + + // ErrJobNotFound is returned when a job is not found + ErrJobNotFound = errors.New("job not found") + + // ErrInvalidJobType is returned when job type is invalid + ErrInvalidJobType = errors.New("invalid job type") +) \ No newline at end of file diff --git a/internal/jobserver/jobserver.go b/internal/jobserver/jobserver.go index 502557a4..28cf3068 100644 --- a/internal/jobserver/jobserver.go +++ b/internal/jobserver/jobserver.go @@ -3,6 +3,7 @@ package jobserver import ( "context" "sync" + "time" "github.com/sirupsen/logrus" @@ -15,13 +16,18 @@ import ( type JobServer struct { sync.Mutex - jobChan chan types.Job + jobChan chan types.Job // Legacy channel for backward compatibility workers int results *ResultCache jobConfiguration types.JobConfiguration jobWorkers map[string]*jobWorkerEntry + + // Priority queue system + priorityQueue *PriorityQueue + priorityManager *PriorityManager + usePriorityQueue bool } type jobWorkerEntry struct { @@ -89,6 +95,34 @@ func NewJobServer(workers int, jc types.JobConfiguration) *JobServer { logrus.Info("Job workers setup completed.") + // Initialize priority queue system if enabled + var priorityQueue *PriorityQueue + var priorityManager *PriorityManager + usePriorityQueue := jc.GetBool("enable_priority_queue", true) + + if usePriorityQueue { + logrus.Info("Initializing priority queue system...") + + // Create priority queue with configurable sizes + fastQueueSize := jc.GetInt("fast_queue_size", 1000) + slowQueueSize := jc.GetInt("slow_queue_size", 5000) + priorityQueue = NewPriorityQueue(fastQueueSize, slowQueueSize) + + // Create priority manager + externalWorkerIDPriorityEndpoint := jc.GetString("external_worker_id_priority_endpoint", "") + // Default to 15 minutes (900 seconds) if not specified + refreshIntervalSecs := jc.GetInt("priority_refresh_interval_seconds", 900) + refreshInterval := time.Duration(refreshIntervalSecs) * time.Second + priorityManager = NewPriorityManager(externalWorkerIDPriorityEndpoint, refreshInterval) + + logrus.Infof("Priority queue initialized (fast: %d, slow: %d)", fastQueueSize, slowQueueSize) + if externalWorkerIDPriorityEndpoint != "" { + logrus.Infof("External worker ID priority endpoint: %s (refresh every %v)", externalWorkerIDPriorityEndpoint, refreshInterval) + } else { + logrus.Info("Using dummy priority list (no external worker ID priority endpoint configured)") + } + } + // Return the JobServer instance logrus.Info("JobServer initialization complete.") return &JobServer{ @@ -98,6 +132,9 @@ func NewJobServer(workers int, jc types.JobConfiguration) *JobServer { workers: workers, jobConfiguration: jc, jobWorkers: jobworkers, + priorityQueue: priorityQueue, + priorityManager: priorityManager, + usePriorityQueue: usePriorityQueue, } } @@ -109,18 +146,110 @@ func (js *JobServer) Run(ctx context.Context) { <-ctx.Done() } +// AddJob submits a new job to the job server for processing. +// +// This method: +// 1. Assigns a unique UUID to the job +// 2. Sets the job timeout from configuration +// 3. 
Routes the job to the appropriate queue based on the job's WorkerID: +// - If priority queue is enabled AND the job's WorkerID is in the priority list → fast queue +// - Otherwise → slow queue (or legacy channel if priority queue is disabled) +// +// The job's WorkerID represents the ID of the worker that submitted this job, +// NOT the ID of this tee-worker instance. +// +// Returns the assigned UUID for tracking the job status. +// The actual job processing happens asynchronously. func (js *JobServer) AddJob(j types.Job) string { // TODO The default should come from config.go, but during tests the config is not necessarily read j.Timeout = js.jobConfiguration.GetDuration("job_timeout_seconds", 300) j.UUID = uuid.New().String() - defer func() { + + if js.usePriorityQueue && js.priorityQueue != nil { + // Route job based on worker ID priority + go func() { + jobCopy := j // Create a copy to avoid data races + if js.priorityManager.IsPriorityWorker(jobCopy.WorkerID) { + if err := js.priorityQueue.EnqueueFast(&jobCopy); err != nil { + logrus.Warnf("Failed to enqueue to fast queue: %v, trying slow queue", err) + if err := js.priorityQueue.EnqueueSlow(&jobCopy); err != nil { + logrus.Errorf("Failed to enqueue job %s: %v", jobCopy.UUID, err) + } + } + } else { + if err := js.priorityQueue.EnqueueSlow(&jobCopy); err != nil { + logrus.Errorf("Failed to enqueue job %s to slow queue: %v", jobCopy.UUID, err) + } + } + }() + } else { + // Use legacy channel-based approach go func() { js.jobChan <- j }() - }() + } + return j.UUID } func (js *JobServer) GetJobResult(uuid string) (types.JobResult, bool) { return js.results.Get(uuid) } + +// GetQueueStats returns real-time statistics about the priority queue system. +// +// Returns: +// - Queue depths (number of pending jobs in each queue) +// - Processing counts (total jobs processed from each queue) +// - Last update timestamp +// +// Returns nil if the priority queue system is disabled. +// +// This method is useful for monitoring system performance and queue health. +// It can be called frequently without significant performance impact. +func (js *JobServer) GetQueueStats() *QueueStats { + if !js.usePriorityQueue || js.priorityQueue == nil { + return nil + } + stats := js.priorityQueue.GetStats() + return &stats +} + +// GetPriorityWorkers returns the current list of worker IDs that have priority status. +// +// These are the worker IDs whose jobs will be routed to the fast queue +// for expedited processing. +// +// Returns an empty slice if: +// - Priority queue system is disabled +// - No priority workers are configured +// +// The returned list reflects the most recent update from the external endpoint +// or the default dummy data if no endpoint is configured. +func (js *JobServer) GetPriorityWorkers() []string { + if !js.usePriorityQueue || js.priorityManager == nil { + return []string{} + } + return js.priorityManager.GetPriorityWorkers() +} + +// Shutdown performs a graceful shutdown of the job server. +// +// This method: +// 1. Stops the priority manager's background refresh goroutine +// 2. Closes the priority queues (preventing new job submissions) +// 3. Allows existing jobs to complete processing +// +// This method should be called when the application is shutting down +// to ensure proper cleanup of resources. +// +// Note: This does not cancel running jobs or wait for them to complete. +// Use context cancellation in Run() for immediate shutdown. 
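+//
+// The typical call order, as exercised in the integration tests, is to cancel the
+// context passed to Run and then call Shutdown.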
+func (js *JobServer) Shutdown() { + if js.priorityManager != nil { + js.priorityManager.Stop() + } + if js.priorityQueue != nil { + js.priorityQueue.Close() + } +} diff --git a/internal/jobserver/priority_integration_test.go b/internal/jobserver/priority_integration_test.go new file mode 100644 index 00000000..68a92af1 --- /dev/null +++ b/internal/jobserver/priority_integration_test.go @@ -0,0 +1,245 @@ +package jobserver_test + +import ( + "context" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/masa-finance/tee-worker/api/types" + "github.com/masa-finance/tee-worker/internal/jobserver" +) + +var _ = Describe("Priority Queue Integration", func() { + var ( + js *jobserver.JobServer + ctx context.Context + cancel context.CancelFunc + config types.JobConfiguration + ) + + BeforeEach(func() { + ctx, cancel = context.WithCancel(context.Background()) + + // Create job configuration with priority queue enabled + config = types.JobConfiguration{ + "enable_priority_queue": true, + "fast_queue_size": 100, + "slow_queue_size": 500, + "external_worker_id_priority_endpoint": "", // Use dummy data + "priority_refresh_interval_seconds": 5, + "result_cache_max_size": 100, + "result_cache_max_age_seconds": 60, + "job_timeout_seconds": 30, + "stats_buf_size": uint(128), + "worker_id": "test-worker", + } + + // Create job server with 5 workers + js = jobserver.NewJobServer(5, config) + go js.Run(ctx) + + // Give workers time to start + time.Sleep(100 * time.Millisecond) + }) + + AfterEach(func() { + cancel() + js.Shutdown() + time.Sleep(100 * time.Millisecond) + }) + + Describe("Job Routing", func() { + It("should route priority worker jobs to fast queue", func() { + // Add job from priority worker + priorityJob := types.Job{ + Type: "telemetry", + WorkerID: "worker-001", // This is in the priority list + Arguments: types.JobArguments{ + "test": "priority", + }, + } + + uuid := js.AddJob(priorityJob) + Expect(uuid).NotTo(BeEmpty()) + + // Wait for job to be processed + Eventually(func() bool { + result, exists := js.GetJobResult(uuid) + return exists && result.Error == "" + }, 5*time.Second, 100*time.Millisecond).Should(BeTrue()) + + // Check queue stats + stats := js.GetQueueStats() + Expect(stats).NotTo(BeNil()) + Expect(stats.FastProcessed).To(BeNumerically(">=", 1)) + }) + + It("should route non-priority worker jobs to slow queue", func() { + // Add job from non-priority worker + regularJob := types.Job{ + Type: "telemetry", + WorkerID: "worker-999", // Not in the priority list + Arguments: types.JobArguments{ + "test": "regular", + }, + } + + uuid := js.AddJob(regularJob) + Expect(uuid).NotTo(BeEmpty()) + + // Wait for job to be processed + Eventually(func() bool { + result, exists := js.GetJobResult(uuid) + return exists && result.Error == "" + }, 5*time.Second, 100*time.Millisecond).Should(BeTrue()) + + // Check queue stats + stats := js.GetQueueStats() + Expect(stats).NotTo(BeNil()) + Expect(stats.SlowProcessed).To(BeNumerically(">=", 1)) + }) + + It("should process fast queue jobs before slow queue jobs", func() { + // Add multiple slow jobs first + slowUUIDs := make([]string, 5) + for i := 0; i < 5; i++ { + slowJob := types.Job{ + Type: "telemetry", + WorkerID: "regular-worker", + Arguments: types.JobArguments{ + "index": i, + "type": "slow", + }, + } + slowUUIDs[i] = js.AddJob(slowJob) + } + + // Add fast jobs after slow jobs + fastUUIDs := make([]string, 3) + for i := 0; i < 3; i++ { + fastJob := types.Job{ + Type: "telemetry", + WorkerID: 
"worker-priority-1", + Arguments: types.JobArguments{ + "index": i, + "type": "fast", + }, + } + fastUUIDs[i] = js.AddJob(fastJob) + } + + // Wait for all jobs to complete + time.Sleep(2 * time.Second) + + // Check stats - fast jobs should have been processed + stats := js.GetQueueStats() + Expect(stats.FastProcessed).To(Equal(int64(3))) + Expect(stats.SlowProcessed).To(BeNumerically(">=", 0)) + Expect(stats.SlowProcessed).To(BeNumerically("<=", 5)) + }) + + It("should fallback to slow queue when fast queue is full", func() { + // Create job server with very small fast queue + smallQueueConfig := types.JobConfiguration{ + "enable_priority_queue": true, + "fast_queue_size": 2, // Very small fast queue + "slow_queue_size": 100, + "external_worker_id_priority_endpoint": "", + "priority_refresh_interval_seconds": 5, + "result_cache_max_size": 100, + "result_cache_max_age_seconds": 60, + "job_timeout_seconds": 30, + "stats_buf_size": uint(128), + "worker_id": "test-worker", + } + + smallJS := jobserver.NewJobServer(0, smallQueueConfig) // No workers initially + defer smallJS.Shutdown() + + // Add multiple priority jobs to fill fast queue + // The fast queue can only hold 2, so others should go to slow queue + jobUUIDs := make([]string, 5) + for i := 0; i < 5; i++ { + priorityJob := types.Job{ + Type: "telemetry", + WorkerID: "worker-001", // Priority worker + Arguments: types.JobArguments{ + "index": i, + }, + } + jobUUIDs[i] = smallJS.AddJob(priorityJob) + } + + // Check stats immediately - jobs should be in queues since no workers are running + stats := smallJS.GetQueueStats() + Expect(stats).NotTo(BeNil()) + + // Fast queue should be at capacity (2) and slow queue should have the overflow + // Note: Due to the async nature of AddJob, we check that jobs were distributed + Eventually(func() int { + s := smallJS.GetQueueStats() + return s.FastQueueDepth + s.SlowQueueDepth + }, 1*time.Second, 100*time.Millisecond).Should(Equal(5)) + }) + }) + + Describe("Priority Worker Management", func() { + It("should return current priority workers", func() { + workers := js.GetPriorityWorkers() + Expect(workers).To(ContainElements( + "worker-001", + "worker-002", + "worker-005", + "worker-priority-1", + "worker-priority-2", + "worker-vip-1", + )) + }) + }) + + Describe("Legacy Mode", func() { + It("should work with priority queue disabled", func() { + // Create job server with priority queue disabled + legacyConfig := types.JobConfiguration{ + "enable_priority_queue": false, + "result_cache_max_size": 100, + "result_cache_max_age_seconds": 60, + "job_timeout_seconds": 30, + "stats_buf_size": uint(128), + "worker_id": "test-worker", + } + + legacyJS := jobserver.NewJobServer(2, legacyConfig) + legacyCtx, legacyCancel := context.WithCancel(context.Background()) + defer legacyCancel() + defer legacyJS.Shutdown() + + go legacyJS.Run(legacyCtx) + time.Sleep(100 * time.Millisecond) + + // Add job + job := types.Job{ + Type: "telemetry", + WorkerID: "any-worker", + Arguments: types.JobArguments{ + "test": "legacy", + }, + } + + uuid := legacyJS.AddJob(job) + Expect(uuid).NotTo(BeEmpty()) + + // Wait for job to be processed + Eventually(func() bool { + result, exists := legacyJS.GetJobResult(uuid) + return exists && result.Error == "" + }, 5*time.Second, 100*time.Millisecond).Should(BeTrue()) + + // Stats should be nil in legacy mode + stats := legacyJS.GetQueueStats() + Expect(stats).To(BeNil()) + }) + }) +}) diff --git a/internal/jobserver/priority_manager.go b/internal/jobserver/priority_manager.go new file mode 
100644 index 00000000..3d2ecf6b --- /dev/null +++ b/internal/jobserver/priority_manager.go @@ -0,0 +1,275 @@ +package jobserver + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "sync" + "time" + + "github.com/sirupsen/logrus" +) + +// PriorityManager manages the list of worker IDs that receive priority processing. +// It supports both static configuration and dynamic updates from an external endpoint. +// +// Key features: +// - Maintains an in-memory set of priority worker IDs +// - Optionally fetches updates from an external API endpoint +// - Refreshes the priority list periodically (configurable interval) +// - Thread-safe for concurrent access +type PriorityManager struct { + mu sync.RWMutex + priorityWorkers map[string]bool + externalWorkerIDPriorityEndpoint string + refreshInterval time.Duration + httpClient *http.Client + ctx context.Context + cancel context.CancelFunc +} + +// PriorityWorkerList represents the expected JSON response format from the external priority endpoint. +// This structure should match the API response from the external service that provides +// the list of worker IDs that should receive priority processing. +type PriorityWorkerList struct { + Workers []string `json:"workers"` +} + +// NewPriorityManager creates and initializes a new priority manager. +// +// Parameters: +// - externalWorkerIdPriorityEndpoint: URL of the external API to fetch priority worker IDs. +// If empty, only uses the built-in dummy data. +// - refreshInterval: How often to refresh the priority list from the external endpoint. +// If <= 0, defaults to 15 minutes. +// +// The manager will: +// 1. Initialize with dummy data for testing +// 2. Immediately fetch from the external endpoint (if configured) +// 3. Start a background goroutine to refresh the list periodically +// +// Returns a fully initialized PriorityManager ready for use. +func NewPriorityManager(externalWorkerIDPriorityEndpoint string, refreshInterval time.Duration) *PriorityManager { + ctx, cancel := context.WithCancel(context.Background()) + + // Default to 15 minutes if not specified + if refreshInterval <= 0 { + refreshInterval = 15 * time.Minute + } + + pm := &PriorityManager{ + priorityWorkers: make(map[string]bool), + externalWorkerIDPriorityEndpoint: externalWorkerIDPriorityEndpoint, + refreshInterval: refreshInterval, + httpClient: &http.Client{ + Timeout: 10 * time.Second, + }, + ctx: ctx, + cancel: cancel, + } + + // Fetch initial priority list from external endpoint + if externalWorkerIDPriorityEndpoint != "" { + logrus.Infof("Fetching initial priority list from external endpoint: %s", externalWorkerIDPriorityEndpoint) + if err := pm.fetchPriorityList(); err != nil { + logrus.Warnf("Failed to fetch initial priority list: %v", err) + // Initialize with dummy data as fallback + pm.initializeDummyData() + } + + // Start background refresh + go pm.startBackgroundRefresh() + } else { + logrus.Info("No external worker ID priority endpoint configured, using dummy priority list") + // Initialize with dummy data for testing + pm.initializeDummyData() + } + + return pm +} + +// initializeDummyData populates the priority manager with test data. +// This is useful for local development and testing without an external endpoint. +// +// The dummy data includes various worker ID patterns that can be used +// to test the priority queue behavior. +// +// TODO: This method will be removed once real external endpoint integration is complete. 
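+//
+// With this data loaded, IsPriorityWorker("worker-001") reports true, while an
+// unknown ID such as "worker-003" reports false (see priority_manager_test.go).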
+func (pm *PriorityManager) initializeDummyData() {
+	pm.mu.Lock()
+	defer pm.mu.Unlock()
+
+	// Dummy priority worker IDs
+	dummyWorkers := []string{
+		"worker-001",
+		"worker-002",
+		"worker-005",
+		"worker-priority-1",
+		"worker-priority-2",
+		"worker-vip-1",
+	}
+
+	for _, workerID := range dummyWorkers {
+		pm.priorityWorkers[workerID] = true
+	}
+}
+
+// IsPriorityWorker checks if a given worker ID should receive priority processing.
+//
+// Parameters:
+// - workerID: The worker ID to check
+//
+// Returns true if the worker ID is in the priority list, false otherwise.
+//
+// This method is designed to be called frequently (on every job submission)
+// and is optimized for performance with O(1) lookup time.
+// Thread-safe: Can be called concurrently from multiple goroutines.
+func (pm *PriorityManager) IsPriorityWorker(workerID string) bool {
+	pm.mu.RLock()
+	defer pm.mu.RUnlock()
+	return pm.priorityWorkers[workerID]
+}
+
+// GetPriorityWorkers returns a snapshot of all worker IDs currently in the priority list.
+//
+// Returns a slice containing all priority worker IDs. The order is not guaranteed.
+// The returned slice is a copy, so modifications won't affect the internal state.
+//
+// This method is useful for monitoring and debugging purposes.
+// Thread-safe: Can be called concurrently from multiple goroutines.
+func (pm *PriorityManager) GetPriorityWorkers() []string {
+	pm.mu.RLock()
+	defer pm.mu.RUnlock()
+
+	workers := make([]string, 0, len(pm.priorityWorkers))
+	for workerID := range pm.priorityWorkers {
+		workers = append(workers, workerID)
+	}
+	return workers
+}
+
+// UpdatePriorityWorkers replaces the entire priority worker list with a new set.
+//
+// Parameters:
+// - workerIDs: The new complete list of worker IDs that should have priority
+//
+// This method completely replaces the existing priority list. Any worker IDs
+// not in the new list will lose their priority status.
+//
+// Thread-safe: Can be called concurrently with other methods.
+func (pm *PriorityManager) UpdatePriorityWorkers(workerIDs []string) {
+	pm.mu.Lock()
+	defer pm.mu.Unlock()
+
+	// Clear existing map
+	pm.priorityWorkers = make(map[string]bool)
+
+	// Add new worker IDs
+	for _, workerID := range workerIDs {
+		pm.priorityWorkers[workerID] = true
+	}
+}
+
+// fetchPriorityList retrieves the latest priority worker list from the external endpoint.
+//
+// This method:
+// 1. Makes an HTTP GET request to the configured endpoint
+// 2. Parses the JSON response into PriorityWorkerList format
+// 3. Updates the internal priority list with the new data
+//
+// Returns an error if:
+// - No external endpoint is configured
+// - The HTTP request fails
+// - The response cannot be parsed
+//
+// Note: The fetched entries are currently used verbatim as worker IDs (they are
+// full worker URLs); see the inline comment below for where worker ID extraction
+// from the URL could be added.
+func (pm *PriorityManager) fetchPriorityList() error { + if pm.externalWorkerIDPriorityEndpoint == "" { + return fmt.Errorf("no external worker ID priority endpoint configured") + } + + // Create HTTP request with context + req, err := http.NewRequestWithContext(pm.ctx, http.MethodGet, pm.externalWorkerIDPriorityEndpoint, nil) + if err != nil { + return fmt.Errorf("failed to create request: %w", err) + } + + // Execute request + resp, err := pm.httpClient.Do(req) + if err != nil { + return fmt.Errorf("failed to fetch priority list: %w", err) + } + defer resp.Body.Close() + + // Check response status + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("unexpected status code: %d", resp.StatusCode) + } + + // Parse JSON response + var workerList PriorityWorkerList + if err := json.NewDecoder(resp.Body).Decode(&workerList); err != nil { + return fmt.Errorf("failed to decode response: %w", err) + } + + // Extract worker IDs from URLs + // The response contains full URLs like "https://20.245.90.64:50001" + // We need to extract just the worker IDs if they're embedded in the URLs + // For now, we'll use the full URLs as the worker IDs + workerIDs := make([]string, 0, len(workerList.Workers)) + for _, workerURL := range workerList.Workers { + // You can add logic here to extract worker ID from URL if needed + // For example, if the worker ID is the host:port combination + workerIDs = append(workerIDs, workerURL) + } + + pm.UpdatePriorityWorkers(workerIDs) + + // Log the update for debugging + logrus.Infof("Priority list updated with %d workers from external endpoint", len(workerIDs)) + + return nil +} + +// startBackgroundRefresh runs a background goroutine that periodically fetches +// updates from the external endpoint. +// +// This method: +// - Runs indefinitely until Stop() is called +// - Refreshes at the interval specified during initialization +// - Logs errors but continues running if a refresh fails +// +// This method should be called as a goroutine and is started automatically +// by NewPriorityManager when an external endpoint is configured. +func (pm *PriorityManager) startBackgroundRefresh() { + ticker := time.NewTicker(pm.refreshInterval) + defer ticker.Stop() + + for { + select { + case <-pm.ctx.Done(): + return + case <-ticker.C: + if err := pm.fetchPriorityList(); err != nil { + // Log error but continue running + logrus.Errorf("Error refreshing priority list: %v", err) + } + } + } +} + +// Stop gracefully shuts down the priority manager. +// +// This method: +// - Cancels the background refresh goroutine +// - Ensures all resources are properly cleaned up +// +// After calling Stop, the manager can still be queried but will no longer +// update from the external endpoint. +// +// This method is idempotent and can be called multiple times safely. +func (pm *PriorityManager) Stop() { + pm.cancel() +} diff --git a/internal/jobserver/priority_manager_test.go b/internal/jobserver/priority_manager_test.go new file mode 100644 index 00000000..9b14622f --- /dev/null +++ b/internal/jobserver/priority_manager_test.go @@ -0,0 +1,103 @@ +package jobserver_test + +import ( + "time" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + + "github.com/masa-finance/tee-worker/internal/jobserver" +) + +var _ = Describe("PriorityManager", func() { + var pm *jobserver.PriorityManager + + BeforeEach(func() { + // Create priority manager without external endpoint (uses dummy data) + pm = jobserver.NewPriorityManager("", 5*time.Minute) + }) + + AfterEach(func() { + pm.Stop() + }) + + Describe("Worker Priority Check", func() { + It("should identify priority workers from dummy list", func() { + // These workers are in the dummy list + Expect(pm.IsPriorityWorker("worker-001")).To(BeTrue()) + Expect(pm.IsPriorityWorker("worker-002")).To(BeTrue()) + Expect(pm.IsPriorityWorker("worker-005")).To(BeTrue()) + Expect(pm.IsPriorityWorker("worker-priority-1")).To(BeTrue()) + Expect(pm.IsPriorityWorker("worker-priority-2")).To(BeTrue()) + Expect(pm.IsPriorityWorker("worker-vip-1")).To(BeTrue()) + + // These workers are not in the dummy list + Expect(pm.IsPriorityWorker("worker-003")).To(BeFalse()) + Expect(pm.IsPriorityWorker("worker-004")).To(BeFalse()) + Expect(pm.IsPriorityWorker("random-worker")).To(BeFalse()) + }) + }) + + Describe("Get Priority Workers", func() { + It("should return the list of priority workers", func() { + workers := pm.GetPriorityWorkers() + Expect(workers).To(ContainElements( + "worker-001", + "worker-002", + "worker-005", + "worker-priority-1", + "worker-priority-2", + "worker-vip-1", + )) + Expect(len(workers)).To(BeNumerically(">=", 6)) + }) + }) + + Describe("Update Priority Workers", func() { + It("should update the priority workers list", func() { + newWorkers := []string{"new-worker-1", "new-worker-2", "new-worker-3"} + pm.UpdatePriorityWorkers(newWorkers) + + // Old workers should no longer be priority + Expect(pm.IsPriorityWorker("worker-001")).To(BeFalse()) + Expect(pm.IsPriorityWorker("worker-002")).To(BeFalse()) + + // New workers should be priority + Expect(pm.IsPriorityWorker("new-worker-1")).To(BeTrue()) + Expect(pm.IsPriorityWorker("new-worker-2")).To(BeTrue()) + Expect(pm.IsPriorityWorker("new-worker-3")).To(BeTrue()) + + // Verify the list + workers := pm.GetPriorityWorkers() + Expect(workers).To(ConsistOf("new-worker-1", "new-worker-2", "new-worker-3")) + }) + }) + + Describe("With External Endpoint", func() { + It("should fall back to dummy data when endpoint fails", func() { + // Create manager with non-existent external endpoint + // This will fail to fetch and fall back to dummy data + pmWithEndpoint := jobserver.NewPriorityManager("https://api.example.com/priority-workers", 5*time.Minute) + defer pmWithEndpoint.Stop() + + // Should have dummy workers after failed fetch + workers := pmWithEndpoint.GetPriorityWorkers() + Expect(len(workers)).To(BeNumerically(">", 0)) + + // Check for expected dummy workers + Expect(pmWithEndpoint.IsPriorityWorker("worker-001")).To(BeTrue()) + Expect(pmWithEndpoint.IsPriorityWorker("worker-priority-1")).To(BeTrue()) + Expect(pmWithEndpoint.IsPriorityWorker("worker-vip-1")).To(BeTrue()) + }) + }) + + Describe("Refresh Interval", func() { + It("should use default refresh interval when not specified", func() { + pmDefault := jobserver.NewPriorityManager("", 0) + defer pmDefault.Stop() + + // Should still have dummy workers + Expect(pmDefault.IsPriorityWorker("worker-001")).To(BeTrue()) + }) + }) +}) \ No newline at end of file diff --git a/internal/jobserver/priority_queue.go b/internal/jobserver/priority_queue.go new file mode 100644 index 00000000..f88659d9 --- /dev/null +++ b/internal/jobserver/priority_queue.go @@ -0,0 +1,259 @@ +// 
Package jobserver provides job queue management and processing functionality +// for the tee-worker, including priority-based job routing. +package jobserver + +import ( + "sync" + "sync/atomic" + "time" + + "github.com/masa-finance/tee-worker/api/types" +) + +// PriorityQueue implements a dual-queue system for job prioritization. +// It maintains two separate queues: +// - Fast queue: For high-priority jobs from workers in the priority list +// - Slow queue: For regular jobs from all other workers +// +// The queue ensures thread-safe operations and provides statistics tracking. +type PriorityQueue struct { + fastQueue chan *types.Job + slowQueue chan *types.Job + mu sync.RWMutex + closed bool + stats *QueueStats +} + +// QueueStats provides real-time metrics about queue performance. +// Queue depths are calculated dynamically in GetStats() to avoid update overhead. +// Processing counters use atomic operations for lock-free updates under high concurrency. +type QueueStats struct { + FastQueueDepth int // Calculated dynamically, not stored + SlowQueueDepth int // Calculated dynamically, not stored + FastProcessed int64 // Total jobs processed from fast queue (atomic) + SlowProcessed int64 // Total jobs processed from slow queue (atomic) + LastUpdateTime time.Time // Calculated from lastUpdateNano in GetStats() + lastUpdateNano int64 // Atomic storage for LastUpdateTime as UnixNano +} + +// NewPriorityQueue creates a new priority queue with specified buffer sizes. +// +// Parameters: +// - fastQueueSize: Maximum number of jobs that can be buffered in the fast queue +// - slowQueueSize: Maximum number of jobs that can be buffered in the slow queue +// +// Returns a ready-to-use PriorityQueue instance with statistics tracking enabled. +func NewPriorityQueue(fastQueueSize, slowQueueSize int) *PriorityQueue { + return &PriorityQueue{ + fastQueue: make(chan *types.Job, fastQueueSize), + slowQueue: make(chan *types.Job, slowQueueSize), + stats: &QueueStats{ + lastUpdateNano: time.Now().UnixNano(), + }, + } +} + +// EnqueueFast adds a job to the fast (high-priority) queue. +// +// This method is non-blocking and will return immediately. +// Returns ErrQueueFull if the fast queue is at capacity. +// Returns ErrQueueClosed if the queue has been closed. +// +// Thread-safe: Can be called concurrently from multiple goroutines. +func (pq *PriorityQueue) EnqueueFast(job *types.Job) error { + pq.mu.RLock() + defer pq.mu.RUnlock() + + if pq.closed { + return ErrQueueClosed + } + + select { + case pq.fastQueue <- job: + pq.updateStats(true, false) + return nil + default: + return ErrQueueFull + } +} + +// EnqueueSlow adds a job to the slow (regular-priority) queue. +// +// This method is non-blocking and will return immediately. +// Returns ErrQueueFull if the slow queue is at capacity. +// Returns ErrQueueClosed if the queue has been closed. +// +// Thread-safe: Can be called concurrently from multiple goroutines. +func (pq *PriorityQueue) EnqueueSlow(job *types.Job) error { + pq.mu.RLock() + defer pq.mu.RUnlock() + + if pq.closed { + return ErrQueueClosed + } + + select { + case pq.slowQueue <- job: + pq.updateStats(false, false) + return nil + default: + return ErrQueueFull + } +} + +// Dequeue retrieves a job from the queue system. +// +// Priority order: +// 1. Always checks fast queue first +// 2. Only checks slow queue if fast queue is empty +// +// This method is non-blocking and returns immediately. +// Returns ErrQueueEmpty if both queues are empty. 
+// Returns ErrQueueClosed if the queue has been closed.
+//
+// Thread-safe: Can be called concurrently from multiple goroutines.
+func (pq *PriorityQueue) Dequeue() (*types.Job, error) {
+	pq.mu.RLock()
+	if pq.closed {
+		pq.mu.RUnlock()
+		return nil, ErrQueueClosed
+	}
+	pq.mu.RUnlock()
+
+	// Always try fast queue first
+	select {
+	case job := <-pq.fastQueue:
+		pq.updateStats(true, true)
+		return job, nil
+	default:
+		// Fast queue is empty, try slow queue
+		select {
+		case job := <-pq.slowQueue:
+			pq.updateStats(false, true)
+			return job, nil
+		default:
+			return nil, ErrQueueEmpty
+		}
+	}
+}
+
+// DequeueBlocking retrieves a job from the queue system, blocking until one is available.
+//
+// This method implements a priority-aware blocking dequeue that:
+// 1. Always checks the fast queue first
+// 2. Only processes slow queue jobs when fast queue is empty
+// 3. Blocks efficiently when both queues are empty
+//
+// The implementation performs a non-blocking check of the fast queue, then blocks
+// on both queues; a short ticker periodically restarts the loop so the fast queue
+// is re-checked first even while blocked on the slow queue.
+//
+// Returns ErrQueueClosed if the queue has been closed.
+// Thread-safe: Can be called concurrently from multiple goroutines.
+func (pq *PriorityQueue) DequeueBlocking() (*types.Job, error) {
+	// Create a ticker to periodically check fast queue
+	ticker := time.NewTicker(10 * time.Millisecond)
+	defer ticker.Stop()
+
+	for {
+		// Check if queue is closed
+		pq.mu.RLock()
+		if pq.closed {
+			pq.mu.RUnlock()
+			return nil, ErrQueueClosed
+		}
+		pq.mu.RUnlock()
+
+		// First, always try fast queue non-blocking
+		select {
+		case job := <-pq.fastQueue:
+			pq.updateStats(true, true)
+			return job, nil
+		default:
+			// Fast queue is empty, continue to blocking select
+		}
+
+		// Blocking select on both queues with periodic fast queue re-check
+		select {
+		case job := <-pq.fastQueue:
+			pq.updateStats(true, true)
+			return job, nil
+		case job := <-pq.slowQueue:
+			pq.updateStats(false, true)
+			return job, nil
+		case <-ticker.C:
+			// Periodically loop back to check fast queue first
+			// This ensures we don't miss fast queue jobs while blocked on slow queue
+			continue
+		}
+	}
+}
+
+// Close gracefully shuts down the priority queue.
+//
+// After calling Close:
+// - No new jobs can be enqueued (EnqueueFast and EnqueueSlow return ErrQueueClosed)
+// - Dequeue and DequeueBlocking return ErrQueueClosed immediately, even if jobs
+//   remain buffered in the queues
+//
+// This method is idempotent and can be called multiple times safely.
+//
+// Note: This implementation does not close the channels to avoid potential
+// panics from concurrent sends. The channels will be garbage collected
+// when no longer referenced.
+func (pq *PriorityQueue) Close() {
+	pq.mu.Lock()
+	defer pq.mu.Unlock()
+
+	if !pq.closed {
+		pq.closed = true
+		// Do not close channels to avoid panic from concurrent sends
+		// Channels will be garbage collected when no longer referenced
+	}
+}
+
+// GetStats returns a snapshot of current queue statistics.
+//
+// The returned statistics include:
+// - Current depth of both fast and slow queues (calculated dynamically)
+// - Total number of jobs processed from each queue
+// - Timestamp of last statistics update
+//
+// This method is lightweight and can be called frequently for monitoring.
+// Thread-safe: Can be called concurrently from multiple goroutines.
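+//
+// A typical consumer is the /job/queue/stats handler in internal/api/routes.go,
+// which calls JobServer.GetQueueStats (and therefore this method) on each request.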
+func (pq *PriorityQueue) GetStats() QueueStats { + // Use atomic loads for lock-free reading + fast := atomic.LoadInt64(&pq.stats.FastProcessed) + slow := atomic.LoadInt64(&pq.stats.SlowProcessed) + lastNano := atomic.LoadInt64(&pq.stats.lastUpdateNano) + + return QueueStats{ + FastQueueDepth: len(pq.fastQueue), + SlowQueueDepth: len(pq.slowQueue), + FastProcessed: fast, + SlowProcessed: slow, + LastUpdateTime: time.Unix(0, lastNano), + } +} + +// updateStats updates internal queue statistics. +// +// Parameters: +// - isFast: true if the operation was on the fast queue, false for slow queue +// - isDequeue: true if this was a dequeue operation, false for enqueue +// +// This method is called internally after each queue operation to maintain accurate statistics. +// Uses atomic operations for lock-free updates under high concurrency. +// Note: Timestamp updates are rate-limited to reduce overhead under high throughput. +func (pq *PriorityQueue) updateStats(isFast bool, isDequeue bool) { + if isDequeue { + if isFast { + atomic.AddInt64(&pq.stats.FastProcessed, 1) + } else { + atomic.AddInt64(&pq.stats.SlowProcessed, 1) + } + + // Update timestamp only on dequeue operations to reduce overhead + // This provides a good balance between accuracy and performance + atomic.StoreInt64(&pq.stats.lastUpdateNano, time.Now().UnixNano()) + } +} \ No newline at end of file diff --git a/internal/jobserver/priority_queue_close_test.go b/internal/jobserver/priority_queue_close_test.go new file mode 100644 index 00000000..38255ae8 --- /dev/null +++ b/internal/jobserver/priority_queue_close_test.go @@ -0,0 +1,99 @@ +package jobserver_test + +import ( + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/masa-finance/tee-worker/api/types" + "github.com/masa-finance/tee-worker/internal/jobserver" +) + +var _ = Describe("PriorityQueue Close Handling", func() { + var pq *jobserver.PriorityQueue + + BeforeEach(func() { + pq = jobserver.NewPriorityQueue(10, 10) + }) + + Describe("DequeueBlocking with closed queues", func() { + It("should return ErrQueueClosed when queue is closed while blocking", func() { + // Start a goroutine that will block on dequeue + done := make(chan struct{}) + var err error + + go func() { + defer close(done) + _, err = pq.DequeueBlocking() + }() + + // Give the goroutine time to start blocking + time.Sleep(100 * time.Millisecond) + + // Close the queue + pq.Close() + + // Wait for the goroutine to finish + Eventually(done, 1*time.Second).Should(BeClosed()) + + // Should have received ErrQueueClosed + Expect(err).To(Equal(jobserver.ErrQueueClosed)) + }) + + It("should handle enqueue errors after close", func() { + // Add a job before closing + job1 := &types.Job{UUID: "job-1", WorkerID: "worker-1"} + err := pq.EnqueueFast(job1) + Expect(err).NotTo(HaveOccurred()) + + // Close the queue + pq.Close() + + // Try to enqueue after close - should fail + job2 := &types.Job{UUID: "job-2", WorkerID: "worker-2"} + err = pq.EnqueueFast(job2) + Expect(err).To(Equal(jobserver.ErrQueueClosed)) + + err = pq.EnqueueSlow(job2) + Expect(err).To(Equal(jobserver.ErrQueueClosed)) + + // Non-blocking dequeue should return ErrQueueClosed after close + _, err = pq.Dequeue() + Expect(err).To(Equal(jobserver.ErrQueueClosed)) + + // Blocking dequeue should also return ErrQueueClosed + _, err = pq.DequeueBlocking() + Expect(err).To(Equal(jobserver.ErrQueueClosed)) + }) + + It("should not panic when multiple goroutines dequeue from closed queue", func() { + numWorkers := 5 + errors 
:= make(chan error, numWorkers) + + // Start multiple dequeuers + for i := 0; i < numWorkers; i++ { + go func() { + _, err := pq.DequeueBlocking() + errors <- err + }() + } + + // Give them time to start blocking + time.Sleep(100 * time.Millisecond) + + // Close the queue + pq.Close() + + // All workers should receive ErrQueueClosed + for i := 0; i < numWorkers; i++ { + select { + case err := <-errors: + Expect(err).To(Equal(jobserver.ErrQueueClosed)) + case <-time.After(1 * time.Second): + Fail("Worker did not return after queue close") + } + } + }) + }) +}) \ No newline at end of file diff --git a/internal/jobserver/priority_queue_test.go b/internal/jobserver/priority_queue_test.go new file mode 100644 index 00000000..c791c2e5 --- /dev/null +++ b/internal/jobserver/priority_queue_test.go @@ -0,0 +1,215 @@ +package jobserver_test + +import ( + "fmt" + "sync" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/masa-finance/tee-worker/api/types" + "github.com/masa-finance/tee-worker/internal/jobserver" +) + +var _ = Describe("PriorityQueue", func() { + var pq *jobserver.PriorityQueue + + BeforeEach(func() { + pq = jobserver.NewPriorityQueue(10, 10) + }) + + AfterEach(func() { + pq.Close() + }) + + Describe("Enqueue and Dequeue", func() { + It("should enqueue and dequeue from fast queue", func() { + job := &types.Job{ + Type: "test", + UUID: "test-123", + WorkerID: "worker-001", + } + + err := pq.EnqueueFast(job) + Expect(err).NotTo(HaveOccurred()) + + dequeuedJob, err := pq.Dequeue() + Expect(err).NotTo(HaveOccurred()) + Expect(dequeuedJob.UUID).To(Equal("test-123")) + }) + + It("should enqueue and dequeue from slow queue", func() { + job := &types.Job{ + Type: "test", + UUID: "test-456", + WorkerID: "worker-002", + } + + err := pq.EnqueueSlow(job) + Expect(err).NotTo(HaveOccurred()) + + dequeuedJob, err := pq.Dequeue() + Expect(err).NotTo(HaveOccurred()) + Expect(dequeuedJob.UUID).To(Equal("test-456")) + }) + + It("should prioritize fast queue over slow queue", func() { + slowJob := &types.Job{ + Type: "test", + UUID: "slow-1", + WorkerID: "worker-slow", + } + fastJob := &types.Job{ + Type: "test", + UUID: "fast-1", + WorkerID: "worker-fast", + } + + // Add slow job first + err := pq.EnqueueSlow(slowJob) + Expect(err).NotTo(HaveOccurred()) + + // Add fast job second + err = pq.EnqueueFast(fastJob) + Expect(err).NotTo(HaveOccurred()) + + // Fast job should be dequeued first + dequeuedJob, err := pq.Dequeue() + Expect(err).NotTo(HaveOccurred()) + Expect(dequeuedJob.UUID).To(Equal("fast-1")) + + // Slow job should be dequeued second + dequeuedJob, err = pq.Dequeue() + Expect(err).NotTo(HaveOccurred()) + Expect(dequeuedJob.UUID).To(Equal("slow-1")) + }) + + It("should return error when queues are empty", func() { + _, err := pq.Dequeue() + Expect(err).To(Equal(jobserver.ErrQueueEmpty)) + }) + + It("should handle queue full scenario", func() { + // Fill the fast queue + smallQueue := jobserver.NewPriorityQueue(2, 2) + defer smallQueue.Close() + + for i := 0; i < 2; i++ { + err := smallQueue.EnqueueFast(&types.Job{UUID: fmt.Sprintf("job-%d", i)}) + Expect(err).NotTo(HaveOccurred()) + } + + // Try to add one more + err := smallQueue.EnqueueFast(&types.Job{UUID: "overflow"}) + Expect(err).To(Equal(jobserver.ErrQueueFull)) + }) + }) + + Describe("Blocking Dequeue", func() { + It("should block until job is available", func() { + var wg sync.WaitGroup + var dequeuedJob *types.Job + var dequeueErr error + + wg.Add(1) + go func() { + defer wg.Done() + dequeuedJob, 
dequeueErr = pq.DequeueBlocking() + }() + + // Give the goroutine time to start blocking + time.Sleep(100 * time.Millisecond) + + // Add a job + job := &types.Job{UUID: "blocking-test"} + err := pq.EnqueueFast(job) + Expect(err).NotTo(HaveOccurred()) + + // Wait for dequeue to complete + wg.Wait() + + Expect(dequeueErr).NotTo(HaveOccurred()) + Expect(dequeuedJob.UUID).To(Equal("blocking-test")) + }) + }) + + Describe("Statistics", func() { + It("should track queue statistics", func() { + // Add jobs to both queues + for i := 0; i < 3; i++ { + err := pq.EnqueueFast(&types.Job{UUID: fmt.Sprintf("fast-%d", i)}) + Expect(err).NotTo(HaveOccurred()) + } + for i := 0; i < 5; i++ { + err := pq.EnqueueSlow(&types.Job{UUID: fmt.Sprintf("slow-%d", i)}) + Expect(err).NotTo(HaveOccurred()) + } + + stats := pq.GetStats() + Expect(stats.FastQueueDepth).To(Equal(3)) + Expect(stats.SlowQueueDepth).To(Equal(5)) + + // Dequeue some jobs + for i := 0; i < 4; i++ { + _, err := pq.Dequeue() + Expect(err).NotTo(HaveOccurred()) + } + + stats = pq.GetStats() + Expect(stats.FastProcessed).To(Equal(int64(3))) + Expect(stats.SlowProcessed).To(Equal(int64(1))) + }) + }) + + Describe("Concurrent Operations", func() { + It("should handle concurrent enqueue and dequeue operations", func() { + // Create a larger queue for this test + concurrentPQ := jobserver.NewPriorityQueue(500, 500) + defer concurrentPQ.Close() + + var wg sync.WaitGroup + numJobs := 100 + processedJobs := make([]string, 0, numJobs) + var mutex sync.Mutex + + // Enqueue jobs concurrently + for i := 0; i < numJobs; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + job := &types.Job{ + UUID: fmt.Sprintf("job-%d", id), + WorkerID: fmt.Sprintf("worker-%d", id), + } + if id%2 == 0 { + err := concurrentPQ.EnqueueFast(job) + Expect(err).NotTo(HaveOccurred()) + } else { + err := concurrentPQ.EnqueueSlow(job) + Expect(err).NotTo(HaveOccurred()) + } + }(i) + } + + // Wait for all enqueues to complete + wg.Wait() + + // Dequeue all jobs + for i := 0; i < numJobs; i++ { + job, err := concurrentPQ.Dequeue() + Expect(err).NotTo(HaveOccurred()) + mutex.Lock() + processedJobs = append(processedJobs, job.UUID) + mutex.Unlock() + } + + // Verify all jobs were processed + Expect(len(processedJobs)).To(Equal(numJobs)) + + // Verify no more jobs in queue + _, err := concurrentPQ.Dequeue() + Expect(err).To(Equal(jobserver.ErrQueueEmpty)) + }) + }) +}) \ No newline at end of file diff --git a/internal/jobserver/worker.go b/internal/jobserver/worker.go index acb8d4eb..7c158a87 100644 --- a/internal/jobserver/worker.go +++ b/internal/jobserver/worker.go @@ -8,7 +8,31 @@ import ( "github.com/sirupsen/logrus" ) +// worker is the main entry point for job processing goroutines. +// It automatically selects the appropriate worker implementation based on +// whether the priority queue system is enabled. +// +// This method is called by Run() to start worker goroutines. func (js *JobServer) worker(c context.Context) { + if js.usePriorityQueue && js.priorityQueue != nil { + // Use priority queue mode + js.priorityQueueWorker(c) + } else { + // Use legacy channel mode + js.legacyWorker(c) + } +} + +// legacyWorker implements the original channel-based worker logic. +// This is used when the priority queue system is disabled. +// +// The worker: +// - Listens on a single shared channel for all jobs +// - Processes jobs in FIFO order +// - Continues until the context is cancelled +// +// This maintains backward compatibility with the original implementation. 
+func (js *JobServer) legacyWorker(c context.Context) { for { select { case <-c.Done(): @@ -24,10 +48,64 @@ func (js *JobServer) worker(c context.Context) { } } +// priorityQueueWorker implements the priority queue-based worker logic. +// This is used when the priority queue system is enabled. +// +// The worker: +// - Always checks the fast queue first for priority jobs +// - Falls back to the slow queue when fast queue is empty +// - Uses blocking dequeue to wait for jobs efficiently +// - Continues until the context is cancelled or queues are closed +// +// This ensures priority jobs are processed before regular jobs, +// improving response times for important workloads. +func (js *JobServer) priorityQueueWorker(c context.Context) { + for { + select { + case <-c.Done(): + logrus.Info("Worker shutting down: context done") + return + default: + // Try to get a job from the priority queue + job, err := js.priorityQueue.DequeueBlocking() + if err != nil { + if err == ErrQueueClosed { + logrus.Info("Worker shutting down: queue closed") + return + } + // For other errors, log and continue + logrus.Debugf("Dequeue error: %v", err) + continue + } + + if job != nil { + logrus.Debugf("Job received from priority queue: %s (type: %s, worker: %s)", + job.UUID, job.Type, job.WorkerID) + if err := js.doWork(*job); err != nil { + logrus.Errorf("Error while executing job %s: %v", job.UUID, err) + } + } + } + } +} + type worker interface { ExecuteJob(j types.Job) (types.JobResult, error) } +// doWork executes a single job by delegating to the appropriate job type handler. +// +// This method: +// 1. Looks up the handler for the job type +// 2. Locks the handler to prevent concurrent execution of the same type +// 3. Executes the job +// 4. Stores the result in the cache +// +// Returns an error if the job type is unknown or execution fails. +// The error is also stored in the job result for client retrieval. +// +// Note: The mutex locking per job type is a current limitation that +// prevents parallel execution of jobs of the same type. func (js *JobServer) doWork(j types.Job) error { // TODO: Add the job to the cache with the status set to Running w, exists := js.jobWorkers[j.Type]