Skip to content

Conversation

alvin-reyes
Copy link
Contributor

@alvin-reyes alvin-reyes commented Jun 1, 2025

feat: Implement Priority Queue System for Job Processing

Summary

This PR introduces a priority queue system that enables preferential processing of jobs from specific worker IDs. The system maintains backward compatibility while providing significant performance improvements for high-priority workers.

As the tee-worker scales, we need a mechanism to ensure that critical workers receive faster job processing. This implementation allows certain worker IDs to have their jobs processed with higher priority while maintaining fair processing for all workers.

Changes

Core Features

  • Dual Queue Architecture: Separate fast and slow queues for priority-based processing
  • Priority Worker Management: Dynamic list of worker IDs that receive priority treatment
  • External Endpoint Integration: Fetch priority lists from configurable external APIs
  • Automatic Refresh: Periodic updates of priority worker list (default: 15 minutes)
  • Queue Monitoring: New /job/queue/stats endpoint for real-time queue statistics
  • Backward Compatibility: Can be disabled to use legacy single-queue mode

Technical Implementation

New Files:

  • internal/jobserver/priority_queue.go - Dual queue implementation with thread-safe operations
  • internal/jobserver/priority_manager.go - Manages priority worker IDs and external sync
  • internal/jobserver/errors.go - Queue-specific error definitions
  • PRIORITY_QUEUE.md - Comprehensive documentation

Modified Files:

  • internal/jobserver/jobserver.go - Integrated priority queue system
  • internal/jobserver/worker.go - Priority-aware job processing logic
  • api/types/job.go - Added GetBool() helper method
  • internal/api/routes.go - Added queue statistics endpoint
  • internal/api/start.go - Registered new API endpoint

Test Files:

  • internal/jobserver/priority_queue_test.go - Unit tests for queue operations
  • internal/jobserver/priority_manager_test.go - Priority manager tests
  • internal/jobserver/priority_integration_test.go - Integration tests

Configuration

Testing

The implementation includes:

  • Comprehensive unit tests for queue operations
  • Integration tests for end-to-end job flow
  • Dummy data for local development/testing
  • Backward compatibility tests

Test Coverage

  • ✅ Queue enqueue/dequeue operations
  • ✅ Priority routing logic
  • ✅ Concurrent operations
  • ✅ Queue statistics
  • ✅ Priority list management
  • ✅ Legacy mode compatibility

Performance Impact

  • Minimal overhead when disabled
  • Improved latency for priority workers
  • Fair processing maintained for regular workers
  • Configurable queue sizes to tune performance

Breaking Changes

None. The feature is enabled by default but can be disabled to maintain exact legacy behavior.

TODO/Blocked

  • replace the dummy worker list with an endpoint call.
  • implementation is blocked by list of worker-ids from endpoint call.

Verification

This assumes that the list of worker-ids are manually set.

Test Results

Priority Queue Routing Tests

✓ should route priority worker jobs to fast queue
✓ should route non-priority worker jobs to slow queue
✓ should process fast queue jobs before slow queue jobs
✓ should fallback to slow queue when fast queue is full

All priority routing tests passed successfully.

Race Condition Tests

✓ should not panic when enqueueing while closing
✓ should handle rapid close/enqueue cycles  
✓ should handle high concurrency without panics

All race condition tests passed successfully.

Queue Close Handling Tests

✓ should return ErrQueueClosed when queue is closed while blocking
✓ should handle enqueue errors after close
✓ should not panic when multiple goroutines dequeue from closed queue

All close handling tests passed successfully.

Build Verification

$ go build -mod=readonly ./internal/api/...
$ go build -mod=readonly ./internal/jobserver/...

No compilation errors.

Manual Verification Example

Priority Worker (Fast Queue)

# Submit job from priority worker ID
curl -X POST /job/add -d '{"worker_id": "worker-001", "type": "telemetry"}'
# Result: Job routed to fast queue ✓

Regular Worker (Slow Queue)

# Submit job from non-priority worker ID
curl -X POST /job/add -d '{"worker_id": "worker-999", "type": "telemetry"}'
# Result: Job routed to slow queue ✓

Queue Statistics

curl http://localhost:8080/job/queue/stats
{
  "enabled": true,
  "fast_queue_depth": 0,
  "slow_queue_depth": 0,
  "fast_processed": 156,   # Jobs from priority workers
  "slow_processed": 892,   # Jobs from regular workers
  "last_update": "2024-12-15T10:30:45Z"
}

Summary

  • ✅ All priority queue tests pass
  • ✅ Priority routing works correctly
  • ✅ No race conditions detected
  • ✅ API builds successfully
  • ✅ Concurrent operations handled safely

@alvin-reyes alvin-reyes self-assigned this Jun 1, 2025
@alvin-reyes alvin-reyes requested a review from Copilot June 1, 2025 23:34
Copy link
Contributor

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR implements a priority queue system for job processing, allowing designated worker IDs to have their jobs handled preferentially.

  • Introduces a dual-queue (fast/slow) architecture with thread-safe operations and statistics tracking.
  • Adds a PriorityManager for fetching and refreshing priority worker lists (including external endpoint support).
  • Exposes a /job/queue/stats endpoint and integrates priority routing in JobServer.

Reviewed Changes

Copilot reviewed 12 out of 12 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
internal/jobserver/priority_queue.go Implements dual fast/slow queue and stats tracking
internal/jobserver/priority_manager.go Manages dynamic priority worker list with refresh
internal/jobserver/worker.go Routes worker loops to priority or legacy implementations
internal/jobserver/jobserver.go Integrates priority queue and routes new jobs
internal/api/routes.go Adds /job/queue/stats endpoint
api/types/job.go Adds GetBool helper for configuration
Comments suppressed due to low confidence (4)

internal/jobserver/priority_manager.go:5

  • Remove the blank import of "encoding/json"; this import is unused and can be omitted for clarity.
_ "encoding/json"

internal/jobserver/priority_manager.go:23

  • Rename externalWorkerIdPriorityEndpoint to externalWorkerIDPriorityEndpoint to follow Go's conventional capitalization of acronyms.
externalWorkerIdPriorityEndpoint string

internal/jobserver/jobserver.go:168

  • [nitpick] Using defer to launch a goroutine for enqueuing in AddJob can obscure control flow; consider launching the goroutine directly instead of deferring.
defer func() {

internal/jobserver/worker.go:11

  • [nitpick] The method worker could be renamed to a more descriptive name (e.g., startWorker) to avoid confusion with the worker interface and improve readability.
// worker is the main entry point for job processing goroutines.

@alvin-reyes alvin-reyes requested a review from Copilot June 2, 2025 00:12
Copy link
Contributor

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR implements a priority queue system for tee-worker, allowing jobs from certain worker IDs to be processed faster while preserving backward compatibility with the legacy channel-based processing.

  • Introduces PriorityQueue (fast/slow queues) and PriorityManager (dynamic priority list with external sync)
  • Integrates priority routing into JobServer.AddJob, worker loops, and adds a new /job/queue/stats endpoint
  • Provides comprehensive unit and integration tests, plus documentation in PRIORITY_QUEUE.md

Reviewed Changes

Copilot reviewed 12 out of 12 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
internal/jobserver/priority_queue.go Dual-queue implementation with non-blocking & blocking dequeue and stats
internal/jobserver/priority_manager.go Manages priority worker IDs, dummy data, and external refresh
internal/jobserver/jobserver.go Wires in priority queue/manager, routes jobs, adds stats APIs
internal/api/routes.go Adds /job/queue/stats endpoint and JSON response schema
api/types/job.go Adds GetBool helper and strengthens type assertions
Comments suppressed due to low confidence (4)

internal/jobserver/priority_manager.go:5

  • The blank import of encoding/json is unused—consider removing it to keep imports clean.
_ "encoding/json"

internal/jobserver/priority_manager.go:25

  • [nitpick] The field name 'externalWorkerIdPriorityEndpoint' mixes 'Id'—consider using 'ID' (externalWorkerIDPriorityEndpoint) for consistency with Go naming conventions.
externalWorkerIdPriorityEndpoint string

internal/jobserver/jobserver.go:187

  • [nitpick] Using defer to launch a goroutine for the legacy job channel is confusing; it's clearer to launch the goroutine directly without defer.
defer func() {

internal/jobserver/jobserver.go:174

  • [nitpick] There’s no unit test covering the fallback path when the fast queue is full and jobs are enqueued to the slow queue. Consider adding a test for that scenario.
logrus.Warnf("Failed to enqueue to fast queue: %v, trying slow queue", err)

@alvin-reyes alvin-reyes requested a review from Copilot June 2, 2025 00:29
Copy link
Contributor

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR implements a dual-queue priority system for job processing to expedite jobs from priority worker IDs while maintaining support for legacy behavior. Key changes include the introduction of a fast/slow queue mechanism, integration with a priority manager for dynamic updates, and comprehensive tests and documentation updates to support the new design.

Reviewed Changes

Copilot reviewed 13 out of 13 changed files in this pull request and generated 1 comment.

Show a summary per file
File Description
internal/jobserver/worker.go Introduces priorityQueueWorker and legacyWorker with clear routing logic
internal/jobserver/priority_queue.go Implements dual queue operations with thread-safe statistics and error handling
internal/jobserver/priority_manager.go Implements priority worker management with dummy and external fetch logic
internal/jobserver/jobserver.go Integrates the priority queue system into the JobServer with configuration support
Other test and API files Add or update tests and endpoints to reflect new priority system functionality

@alvin-reyes alvin-reyes requested a review from Copilot June 2, 2025 00:34
Copy link
Contributor

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR implements a dual-queue priority system that routes jobs from high‐priority workers to a fast processing queue while ensuring backward compatibility with the legacy FIFO channel. Key changes include introducing new priority queue and priority manager components, extensive tests for both queue and manager behaviors, and API endpoint integration for monitoring queue statistics.

Reviewed Changes

Copilot reviewed 13 out of 13 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
internal/jobserver/worker.go Adds worker goroutines to switch between legacy and priority queue modes
internal/jobserver/priority_queue.go Implements the dual-queue (fast/slow) system with blocking and non-blocking dequeue operations
internal/jobserver/priority_queue_test.go Contains unit tests covering enqueue, dequeue, blocking behavior, and concurrent operations
internal/jobserver/priority_queue_close_test.go Tests the graceful shutdown and error handling of the priority queue
internal/jobserver/priority_manager.go Introduces a manager for maintaining and periodically refreshing the list of priority workers
internal/jobserver/priority_manager_test.go Validates dummy data, update routines, and external endpoint integration for the priority manager
internal/jobserver/priority_integration_test.go End-to-end integration tests verifying job routing between fast and slow queues and legacy mode
internal/jobserver/jobserver.go Integrates the new priority queue system into the job server (job submission, processing, and shutdown)
internal/api/start.go & internal/api/routes.go Adds a new API endpoint for reporting queue statistics
api/types/job.go Adds helper methods (e.g. GetBool) to support job configuration parameters
PRIORITY_QUEUE.md Provides comprehensive documentation for the priority queue system, its configuration, and usage

@alvin-reyes alvin-reyes requested a review from Copilot June 2, 2025 00:44
Copy link
Contributor

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR adds a priority queue system to jobserver, enabling fast-path processing for designated worker IDs while maintaining legacy behavior and exposing real-time queue metrics.

  • Introduce dual fast/slow queues with blocking and non-blocking dequeue operations
  • Add dynamic priority-worker list with optional external sync and periodic refresh
  • Integrate priority routing in JobServer and expose /job/queue/stats endpoint

Reviewed Changes

Copilot reviewed 13 out of 13 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
internal/jobserver/worker.go Switch between legacy and priority queue processing
internal/jobserver/priority_queue.go Dual-queue implementation with stats tracking
internal/jobserver/priority_manager.go Priority worker list management and refresh logic
internal/jobserver/jobserver.go Initialize priority system and route jobs accordingly
internal/api/routes.go Added /job/queue/stats handler
api/types/job.go Added GetBool helper for config parsing
PRIORITY_QUEUE.md Documentation for priority queue system

@alvin-reyes alvin-reyes requested a review from Copilot June 2, 2025 01:24
Copy link
Contributor

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR adds a priority queue system to the job server, enabling fast and slow queues with dynamic priority worker management, plus an endpoint for real-time queue statistics.

  • Introduces dual fast/slow queues with blocking and non-blocking dequeue operations
  • Integrates a PriorityManager to fetch and refresh priority worker IDs
  • Adds /job/queue/stats endpoint and helper methods for configuration

Reviewed Changes

Copilot reviewed 13 out of 13 changed files in this pull request and generated no comments.

Show a summary per file
File Description
internal/jobserver/worker.go Splits worker loop into legacyWorker and priorityQueueWorker
internal/jobserver/jobserver.go Routes jobs to fast/slow queues in AddJob and exposes stats
internal/jobserver/priority_queue.go Implements PriorityQueue, blocking dequeue, and stats
internal/api/routes.go Defines queueStats handler for /job/queue/stats
api/types/job.go Adds GetBool helper to JobConfiguration
Comments suppressed due to low confidence (3)

internal/jobserver/jobserver.go:166

  • Missing import for the UUID package, which will cause a compile error. Add the appropriate import (e.g., "github.com/google/uuid").
j.UUID = uuid.New().String()

internal/jobserver/worker.go:92

  • [nitpick] The interface name worker conflicts with the worker method and can be ambiguous. Consider renaming it to JobWorker or WorkerHandler for clarity.
type worker interface {

internal/api/start.go:114

  • No tests cover the new /job/queue/stats route. Add an integration test to verify the endpoint returns the expected JSON schema for both enabled and disabled states.
job.GET("/queue/stats", queueStats(jobServer))

@alvin-reyes alvin-reyes requested a review from Copilot June 2, 2025 01:27
Copy link
Contributor

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR implements a priority queue system for processing jobs, allowing high-priority workers’ tasks to be routed through a fast queue while maintaining backward compatibility with the legacy single-channel mode.

  • Introduce a dual-queue (fastQueue/slowQueue) with blocking and non-blocking operations
  • Add PriorityManager to fetch and maintain dynamic priority worker lists
  • Integrate priority routing in JobServer and expose /job/queue/stats endpoint

Reviewed Changes

Copilot reviewed 13 out of 13 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
internal/jobserver/worker.go Refactor worker entry point and add priorityQueueWorker logic
internal/jobserver/priority_queue.go New dual-queue implementation with stats tracking
internal/jobserver/priority_queue_test.go Unit tests for enqueue/dequeue, blocking behavior, stats
internal/jobserver/priority_queue_close_test.go Tests for close handling and panic prevention
internal/jobserver/priority_manager.go New PriorityManager for dynamic worker-ID lists
internal/jobserver/priority_manager_test.go Unit tests for priority manager functions
internal/jobserver/priority_integration_test.go End-to-end integration tests for priority routing
internal/jobserver/jobserver.go Integrate priority queue into JobServer, add AddJob, stats
internal/jobserver/errors.go Define queue-related error constants
internal/api/start.go Register new /job/queue/stats endpoint
internal/api/routes.go Implement JSON handler for queue statistics
api/types/job.go Add GetBool helper to configuration parsing
PRIORITY_QUEUE.md Documentation for priority queue feature

@mudler mudler mentioned this pull request Jun 3, 2025
7 tasks
@mudler mudler added the blocked label Jun 9, 2025
@mudler
Copy link
Contributor

mudler commented Jun 9, 2025

blocked by gopher-lab/subnet-42#109

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants