A production-ready microservice that consumes messages from AWS SQS, processes segment relationships using a Neo4j graph database, and stores results in PostgreSQL. Built for high-throughput processing with support for multiple concurrent workers.
- AWS SQS Consumer: Reliable message consumption with automatic retry and error handling
- Multi-Worker Support: Race condition protection for concurrent processing
- Atomic Task Claiming: Prevents duplicate processing across multiple workers
- Dragonfly Cache: High-performance caching for Neo4j alignment queries
- Graph Database Integration: Neo4j for complex relationship traversal (BFS algorithm)
- PostgreSQL Storage: Persistent storage of job status and results
- Structured Logging: Production-ready logging with proper log levels
- Database Migrations: Alembic integration for schema management
- Error Recovery: Automatic retry mechanism with error tracking
```text
┌─────────────────────────────────────────────────┐
│              SQS Consumer Workers               │
│  (Multiple instances supported with atomic      │
│   task claiming to prevent duplicates)          │
├─────────────────────────────────────────────────┤
│  ┌───────────────────────────────────────────┐  │
│  │ 1. Receive message from SQS               │  │
│  │ 2. Atomically claim task (QUEUED →        │  │
│  │    IN_PROGRESS)                           │  │
│  │ 3. Query Neo4j for related segments       │  │
│  │ 4. Store results in PostgreSQL            │  │
│  │ 5. Update job completion status           │  │
│  │ 6. Send completion message to SQS         │  │
│  └───────────────────────────────────────────┘  │
└─────────────────────────────────────────────────┘
      │                            │
      ▼                            ▼
┌──────────┐                 ┌──────────┐
│ AWS SQS  │                 │  Neo4j   │
│  Queue   │                 │ Database │
└──────────┘                 └──────────┘
      │
      ▼
┌──────────┐
│PostgreSQL│
│ Database │
└──────────┘
```
- Python 3.12+
- AWS Account with SQS access
- PostgreSQL database
- Neo4j database (cloud or self-hosted)
- Clone the repository

  ```bash
  git clone <repository-url>
  cd sqs-microservice
  ```

- Create a virtual environment

  ```bash
  python -m venv venv
  source venv/bin/activate  # On Windows: venv\Scripts\activate
  ```

- Install dependencies

  ```bash
  pip install -r requirements.txt
  ```

- Configure the environment

  ```bash
  cp env.example .env
  # Edit .env with your credentials
  ```

- Run database migrations

  ```bash
  alembic upgrade head
  ```

- Start the consumer

  ```bash
  python -m app.main
  ```

Create a .env file with the following variables:
```env
# Neo4j Database
NEO4J_URI=neo4j+s://xxx.databases.neo4j.io
NEO4J_USER=neo4j
NEO4J_PASSWORD=your-password

# PostgreSQL Database
POSTGRES_URL=postgresql://user:password@host:5432/database

# AWS Configuration
AWS_REGION=us-east-1
AWS_ACCESS_KEY_ID=your-access-key
AWS_SECRET_ACCESS_KEY=your-secret-key

# SQS Queues
SQS_QUEUE_URL=https://sqs.region.amazonaws.com/account/queue-name
SQS_COMPLETED_QUEUE_URL=https://sqs.region.amazonaws.com/account/completed-queue

# Dragonfly Cache (optional - improves performance)
DRAGONFLY_URL=redis://default:password@your-instance.dragonflydb.cloud:6379
```

See env.example for a complete template.
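These variables are read through the `get` helper in `app/config.py` (used later in this README). The snippet below is a minimal sketch of what such a helper can look like, assuming python-dotenv is available; the actual implementation in the repository may differ.

```python
# Minimal sketch of a config helper like app/config.py's get(); assumes
# python-dotenv is installed. The real module may behave differently.
import os

from dotenv import load_dotenv

load_dotenv()  # load variables from the project's .env file into the environment


def get(name: str, default: str | None = None) -> str | None:
    """Return a configuration value from the environment (or a default)."""
    return os.environ.get(name, default)
```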
Each message is processed as follows (a minimal polling sketch follows the list):

1. Message Reception: a worker receives a message from SQS containing:

   ```json
   {
     "job_id": "uuid",
     "manifestation_id": "string",
     "segment_id": "string",
     "start": 0,
     "end": 100
   }
   ```

2. Atomic Task Claiming: the worker attempts to claim the task by updating its status from `QUEUED` to `IN_PROGRESS`. If another worker already claimed it, this worker skips processing.

3. Relationship Processing: the worker queries Neo4j to find all related segments using BFS traversal through alignment pairs.

4. Result Storage: the worker stores the related segments in PostgreSQL with the segment task record.

5. Job Completion: when all segments are processed, a completion message is sent to the completed queue.
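The sketch below illustrates this flow with a bare boto3 polling loop. The actual service wraps polling in a `SimpleConsumer` (see `app/main.py`); `process_segment` is a placeholder standing in for the claim / traversal / storage logic in `app/tasks.py`.

```python
# Illustrative polling loop for the flow above; the real service uses a
# SimpleConsumer wrapper (app/main.py). process_segment is a placeholder.
import json

import boto3

sqs = boto3.client("sqs", region_name="us-east-1")
QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/queue-name"  # placeholder


def process_segment(payload: dict) -> None:
    """Placeholder: claim the task, run the BFS in Neo4j, store the result."""
    print(f"processing segment {payload['segment_id']} for job {payload['job_id']}")


def poll_once() -> None:
    """Receive up to one message, process it, and delete it on success."""
    resp = sqs.receive_message(
        QueueUrl=QUEUE_URL, MaxNumberOfMessages=1, WaitTimeSeconds=10
    )
    for msg in resp.get("Messages", []):
        process_segment(json.loads(msg["Body"]))
        # Delete only after successful processing so SQS can redeliver on failure.
        sqs.delete_message(QueueUrl=QUEUE_URL, ReceiptHandle=msg["ReceiptHandle"])
```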
The microservice uses atomic task claiming to prevent duplicate processing:
```sql
-- Only updates if status is currently "QUEUED"
UPDATE segment_task
SET status = 'IN_PROGRESS'
WHERE job_id = ? AND segment_id = ? AND status = 'QUEUED'
```

This ensures that even with multiple workers consuming from the same queue, each segment is processed exactly once.
The PostgreSQL schema uses two tables (a SQLAlchemy sketch of both follows the field lists). `root_job` tracks overall job progress:

- job_id (UUID, PK)
- manifestation_id (String)
- total_segments (Integer)
- completed_segments (Integer)
- status (Enum: QUEUED, IN_PROGRESS, COMPLETED, FAILED)
- created_at (Timestamp)
- updated_at (Timestamp)

`segment_task` tracks the processing of each individual segment:

- task_id (UUID, PK)
- job_id (UUID, FK)
- segment_id (String)
- status (Enum: QUEUED, IN_PROGRESS, COMPLETED, RETRYING, FAILED)
- result_json (JSONB) - Stores related segments
- result_location (String, optional)
- error_message (Text, nullable)
- created_at (Timestamp)
- updated_at (Timestamp)
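The two tables map naturally onto SQLAlchemy models along these lines. This is a sketch only; the definitions in `app/db/models.py` may differ (for example by using proper Enum columns).

```python
# Sketch of SQLAlchemy models for the schema above; statuses are kept as
# plain strings and may not match app/db/models.py exactly.
import uuid
from datetime import datetime

from sqlalchemy import Column, DateTime, ForeignKey, Integer, String, Text
from sqlalchemy.dialects.postgresql import JSONB, UUID
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class RootJob(Base):
    __tablename__ = "root_job"

    job_id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    manifestation_id = Column(String, nullable=False)
    total_segments = Column(Integer, nullable=False)
    completed_segments = Column(Integer, default=0)
    status = Column(String, default="QUEUED")  # QUEUED / IN_PROGRESS / COMPLETED / FAILED
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)


class SegmentTask(Base):
    __tablename__ = "segment_task"

    task_id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    job_id = Column(UUID(as_uuid=True), ForeignKey("root_job.job_id"), nullable=False)
    segment_id = Column(String, nullable=False)
    status = Column(String, default="QUEUED")  # QUEUED / IN_PROGRESS / COMPLETED / RETRYING / FAILED
    result_json = Column(JSONB, nullable=True)      # related segments
    result_location = Column(String, nullable=True)
    error_message = Column(Text, nullable=True)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
```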
To deploy on Render:

- Create a new Background Worker on Render
- Environment: select Python 3
- Build Command: `pip install -r requirements.txt`
- Start Command: `python -m app.main`
- Add environment variables from your `.env` file
- Deploy: Render deploys automatically when you push to the main branch
Important: Create a runtime.txt file to specify the Python version:

```text
python-3.12.8
```
To scale processing, deploy multiple instances:
On Render:
- Increase the number of instances in the service settings
- Each instance will automatically claim tasks atomically
Locally:
```bash
# Terminal 1
python -m app.main

# Terminal 2
python -m app.main

# Terminal 3
python -m app.main
```

All workers will consume from the same queue without duplicating work.
```text
sqs-microservice/
├── app/
│   ├── main.py                     # SQS consumer entry point
│   ├── tasks.py                    # Task processing logic
│   ├── relation.py                 # API endpoints (optional)
│   ├── config.py                   # Configuration management
│   ├── models.py                   # Pydantic models
│   ├── neo4j_database.py           # Neo4j integration
│   ├── neo4j_quries.py             # Cypher queries
│   ├── neo4j_database_validator.py
│   ├── alembic/                    # Database migrations
│   │   └── versions/
│   ├── alembic.ini
│   └── db/
│       ├── postgres.py             # PostgreSQL connection
│       └── models.py               # SQLAlchemy models
├── requirements.txt                # Python dependencies
├── runtime.txt                     # Python version for deployment
├── env.example                     # Environment template
├── purge_sqs.py                    # Utility to purge SQS queue
└── README.md
```
app/main.py is the main consumer that polls SQS and processes messages:

```python
consumer = SimpleConsumer(
    queue_url=queue_url,
    region=get("AWS_REGION"),
    polling_wait_time_ms=50,
)
consumer.start()
```

app/tasks.py handles the business logic:
- Atomic task claiming
- Neo4j relationship queries
- Result storage
- Error handling and retries (see the sketch below)
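The sketch below shows the kind of retry/error bookkeeping this implies, using the status values defined in the schema. The helper name and exact SQL are placeholders, not the functions actually defined in `app/tasks.py`.

```python
# Sketch of error tracking for a failed segment task; RETRYING / FAILED match
# the segment_task status enum, but the helper itself is illustrative.
from sqlalchemy import text
from sqlalchemy.orm import Session


def mark_task_failed(session: Session, job_id: str, segment_id: str,
                     error: Exception, retryable: bool) -> None:
    """Record the error and flip the task to RETRYING or FAILED."""
    session.execute(
        text(
            "UPDATE segment_task "
            "SET status = :status, error_message = :error "
            "WHERE job_id = :job_id AND segment_id = :segment_id"
        ),
        {
            "status": "RETRYING" if retryable else "FAILED",
            "error": str(error),
            "job_id": job_id,
            "segment_id": segment_id,
        },
    )
    session.commit()
```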
app/neo4j_database.py provides the graph database operations (an illustrative traversal sketch follows this list):
- BFS traversal for related segments
- Alignment pair queries
- Segment overlap detection
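As an illustration of the traversal, the sketch below runs a BFS over alignment pairs with the official Neo4j Python driver. The `Segment` label and `ALIGNED_WITH` relationship type are assumptions made for the example; the Cypher actually used lives in `app/neo4j_quries.py`.

```python
# BFS over alignment pairs with the Neo4j Python driver. The Segment label
# and ALIGNED_WITH relationship type are illustrative assumptions.
from collections import deque

from neo4j import GraphDatabase

driver = GraphDatabase.driver(
    "neo4j+s://xxx.databases.neo4j.io", auth=("neo4j", "your-password")
)


def related_segments(start_segment_id: str) -> set[str]:
    """Return every segment reachable from start_segment_id through alignment pairs."""
    seen = {start_segment_id}
    queue = deque([start_segment_id])
    with driver.session() as session:
        while queue:
            current = queue.popleft()
            records = session.run(
                "MATCH (s:Segment {segment_id: $sid})-[:ALIGNED_WITH]-(other:Segment) "
                "RETURN other.segment_id AS sid",
                sid=current,
            )
            for record in records:
                if record["sid"] not in seen:
                    seen.add(record["sid"])
                    queue.append(record["sid"])
    seen.discard(start_segment_id)
    return seen
```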
app/db/models.py defines the SQLAlchemy models for job and task tracking.
Check connectivity first:

```bash
# Check environment variables
python -c "from app.config import get; print(get('SQS_QUEUE_URL'))"

# Test AWS credentials
aws sqs get-queue-attributes --queue-url <YOUR_QUEUE_URL>

# Test PostgreSQL
python -c "from app.db.postgres import engine; engine.connect(); print('PostgreSQL connected')"

# Test Neo4j
python -c "from app.neo4j_database import Neo4JDatabase; db = Neo4JDatabase(); print('Neo4j connected')"
```

If messages are not being processed:

- Check that the SQS queue has messages:

  ```bash
  aws sqs get-queue-attributes --queue-url <URL> --attribute-names ApproximateNumberOfMessages
  ```

- Verify the visibility timeout is appropriate (recommended: 60+ seconds)
- Check worker logs for errors
If you see duplicate processing even with multiple workers:
- Ensure all workers are using the same database
- Check that `_try_claim_segment_task` is being called before processing
- Verify database transactions are properly committed
The application uses structured logging:
```text
2025-11-18 10:30:45 - app.tasks - INFO - Processing segment ABC123 for job 550e8400
2025-11-18 10:30:45 - app.tasks - INFO - Attempting to claim segment task ABC123
2025-11-18 10:30:45 - app.tasks - INFO - Successfully claimed segment task ABC123
2025-11-18 10:30:46 - app.neo4j_database - INFO - Starting BFS traversal from manifestation XYZ
```
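Log lines in that format can be produced with the standard library alone. A minimal configuration sketch (not necessarily the project's exact setup):

```python
# Minimal logging setup that yields the "timestamp - logger - LEVEL - message"
# format shown above; the service's actual configuration may differ.
import logging

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

logger = logging.getLogger("app.tasks")
logger.info("Processing segment ABC123 for job 550e8400")
```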
Query the database to check job progress:
```sql
SELECT
    job_id,
    manifestation_id,
    completed_segments,
    total_segments,
    status,
    updated_at
FROM root_job
WHERE status = 'IN_PROGRESS'
ORDER BY updated_at DESC;
```

Purge the SQS queue (for testing):

```bash
python purge_sqs.py
```

To test manually:

- Send a test message to SQS (see the sketch below)
- Check logs for processing
- Query database for results
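A test message matching the payload described earlier can be pushed with boto3. The queue URL and IDs below are placeholders.

```python
# Send a test message shaped like the payload the worker expects.
# The queue URL, job_id, and segment values are placeholders.
import json

import boto3

sqs = boto3.client("sqs", region_name="us-east-1")
sqs.send_message(
    QueueUrl="https://sqs.us-east-1.amazonaws.com/123456789012/queue-name",
    MessageBody=json.dumps(
        {
            "job_id": "550e8400-e29b-41d4-a716-446655440000",
            "manifestation_id": "XYZ",
            "segment_id": "ABC123",
            "start": 0,
            "end": 100,
        }
    ),
)
```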
- Never commit the `.env` file
- Use IAM roles instead of access keys (when on AWS)
- Rotate credentials regularly
- Use connection pooling for database connections
- Set appropriate SQS visibility timeouts
- Monitor dead-letter queues for failed messages
- Adjust Polling Wait Time: increase `polling_wait_time_ms` to reduce API calls
- Database Connection Pooling: already configured in the PostgreSQL connection (see the sketch below)
- Neo4j Connection Reuse: a singleton driver pattern is implemented
- Multiple Workers: scale horizontally for high throughput
- SQS Batch Size: the consumer handles messages one at a time for reliable processing
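For reference, connection pooling with SQLAlchemy looks roughly like the following; the pool sizes are illustrative and `app/db/postgres.py` may use different values.

```python
# Sketch of a pooled SQLAlchemy engine; values are illustrative, not the
# settings actually used in app/db/postgres.py.
from sqlalchemy import create_engine

engine = create_engine(
    "postgresql://user:password@host:5432/database",
    pool_size=5,         # connections kept open per worker
    max_overflow=10,     # extra connections allowed under burst load
    pool_pre_ping=True,  # validate connections before handing them out
)
```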
- Fork the repository
- Create a feature branch (`git checkout -b feature/amazing-feature`)
- Commit your changes (`git commit -m 'Add amazing feature'`)
- Push to the branch (`git push origin feature/amazing-feature`)
- Open a Pull Request
[Add your license here]
For issues or questions:
- Check the troubleshooting section
- Review application logs
- Check database records for task status
- Open an issue on GitHub
Built with Python, AWS SQS, PostgreSQL, and Neo4j