A high-performance microservice that provides an asynchronous email processing pipeline with a file-based queuing system, a multipart template rendering engine, and a RESTful API interface. Designed for reliable email dispatch with configurable concurrency, retry semantics, and fault isolation.
- One file per email: Each email is persisted as its own JSON file named `batch_{timestamp_ms}_{uuid8}.json`. This eliminates partial-batch corruption, removes all multi-email lock contention, and makes every queue entry independently atomic.
- Collision-resistant naming: File names combine a millisecond timestamp with a random 8-character UUID suffix, making name collisions extremely unlikely even under high concurrency.
- FIFO ordering: Files are processed in sorted order by name (timestamp-prefixed), giving natural first-in, first-out semantics.
- Thread-safe operations: All queue mutations are guarded by a reentrant lock (`threading.RLock`), ensuring correctness under concurrent access from multiple worker threads.
- Atomic file operations: Writes use temp file → `fsync` → atomic rename, guaranteeing no partial writes survive a crash.
- Dirty-flag index cache: The queue file index is rebuilt from disk only when a mutation has occurred, avoiding redundant `glob` calls on every worker poll.
- Automatic cleanup: Processed files are moved to `data/trash/` with a timestamp and status annotation; permanently failed emails are routed to `data/dead_letter/`.
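The write path described above (temp file → `fsync` → atomic rename) could look roughly like the following. This is a minimal sketch rather than the project's actual code: the function name `enqueue_email` and the `.tmp` suffix are assumptions made for illustration.

```python
import json
import os
import tempfile
import time
import uuid
from pathlib import Path


def enqueue_email(queue_dir: Path, payload: dict) -> Path:
    """Persist one email as its own JSON file (hypothetical helper)."""
    queue_dir.mkdir(parents=True, exist_ok=True)
    # Millisecond timestamp prefix plus an 8-character random suffix,
    # following the batch_{timestamp_ms}_{uuid8}.json pattern.
    name = f"batch_{int(time.time() * 1000)}_{uuid.uuid4().hex[:8]}.json"
    final_path = queue_dir / name

    # The temp file lives in the same directory so the rename never
    # crosses a filesystem boundary.
    fd, tmp_name = tempfile.mkstemp(dir=queue_dir, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as tmp:
            json.dump(payload, tmp)
            tmp.flush()
            os.fsync(tmp.fileno())  # push the bytes to disk before renaming
        os.replace(tmp_name, final_path)  # atomic swap into the final name
    except BaseException:
        # Best-effort cleanup so a failed write leaves no stray temp file.
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise
    return final_path
```

Because readers only ever see the final name after the rename, a crash before `os.replace` leaves at most an ignorable `.tmp` file rather than a truncated queue entry.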
- Retry with exponential backoff: Configurable retry attempts (default: 3) with exponential delay scaling (base: 1 second).
- Thread-safe connection pool: SMTP connections are cached per `(host, port)` key and protected by `threading.RLock`. Pooled connections are validated via `NOOP` before reuse; stale connections are evicted automatically.
- Context manager lifecycle: `EmailSender` implements `__enter__`/`__exit__` for reliable connection cleanup, with no reliance on `__del__`.
- Message-ID header: Every outbound MIME message gets a unique `Message-ID` generated by `email.utils.make_msgid()`, reducing spam filter penalties.
- Multi-format support: Supports plain text, HTML, and multipart/alternative MIME structures.
- Priority classification: Three-tier priority (High, Normal, Low) mapped to integer levels 1 through 3.
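The pooling behaviour listed above (one cached connection per `(host, port)`, `NOOP` validation, eviction of stale connections, context-manager cleanup) might be sketched as follows. The class name `SmtpPool` and the injectable `connect` parameter are hypothetical, and authentication is left out for brevity; the real `EmailSender` may be structured differently.

```python
import smtplib
import threading
from typing import Callable, Dict, Tuple


class SmtpPool:
    """Hypothetical pool: one SMTP connection per (host, port)."""

    def __init__(self, connect: Callable[[str, int], smtplib.SMTP] = smtplib.SMTP_SSL):
        self._connect = connect  # injectable so tests can avoid the network
        self._lock = threading.RLock()
        self._connections: Dict[Tuple[str, int], smtplib.SMTP] = {}

    def get(self, host: str, port: int) -> smtplib.SMTP:
        key = (host, port)
        with self._lock:
            conn = self._connections.get(key)
            if conn is not None and self._is_alive(conn):
                return conn
            # Missing or stale: evict, then open a fresh connection.
            self._connections.pop(key, None)
            conn = self._connect(host, port)
            self._connections[key] = conn
            return conn

    @staticmethod
    def _is_alive(conn) -> bool:
        # NOOP answers 250 on a healthy session; errors mean the socket is gone.
        try:
            code, _ = conn.noop()
            return code == 250
        except (smtplib.SMTPException, OSError):
            return False

    def close(self) -> None:
        with self._lock:
            for conn in self._connections.values():
                try:
                    conn.quit()
                except (smtplib.SMTPException, OSError):
                    pass  # already disconnected; nothing left to clean up
            self._connections.clear()

    def __enter__(self) -> "SmtpPool":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()
```

Using the pool in a `with` block ensures every cached connection is closed on exit, which is the same guarantee the context-manager lifecycle bullet describes.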
- File-backed templates: Templates are stored as plain text files in the `templates/` directory, with a JSON manifest (`templates/manifest.json`) for discovery.
- Variable interpolation: Uses `{{variable}}` double-curly-brace syntax for substitution at render time.
- Automatic multipart generation: When both `.txt` and `.html` variants of a template exist, the engine produces a multipart/alternative MIME message automatically.
- Pre-defined templates: Four default templates (`verify`, `welcome`, `password_reset`, `notification`) are generated on first run.
- RESTful management: Templates are discoverable via the API with optional metadata output.
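As an illustration of the `{{variable}}` substitution and the automatic multipart/alternative behaviour described above, here is a minimal sketch built on the standard library. The function names `render` and `build_message` are assumptions, not the template engine's actual API.

```python
import re
from email.message import EmailMessage
from email.utils import make_msgid
from typing import Mapping, Optional

_PLACEHOLDER = re.compile(r"\{\{(\w+)\}\}")


def render(template: str, variables: Mapping[str, object]) -> str:
    """Replace each {{name}} placeholder; a missing variable raises ValueError."""

    def substitute(match):
        name = match.group(1)
        if name not in variables:
            raise ValueError(f"missing template variable: {name}")
        return str(variables[name])

    return _PLACEHOLDER.sub(substitute, template)


def build_message(subject: str, sender: str, to: str,
                  text_body: str, html_body: Optional[str] = None) -> EmailMessage:
    """Build a text/plain message, or multipart/alternative when HTML is given."""
    msg = EmailMessage()
    msg["Subject"] = subject
    msg["From"] = sender
    msg["To"] = to
    msg["Message-ID"] = make_msgid()  # unique ID per outbound message
    msg.set_content(text_body)
    if html_body is not None:
        # Adding an alternative converts the message to multipart/alternative.
        msg.add_alternative(html_body, subtype="html")
    return msg
```

For example, `render("Your {{app_name}} code is {{code}}", {"app_name": "MyApp", "code": "123456"})` returns `"Your MyApp code is 123456"`.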
The API is implemented with FastAPI and exposes the following endpoints:
| Method | Endpoint | Description |
|---|---|---|
| POST | `/email` | Submits an email to the processing queue |
| GET | `/status` | Returns queue statistics and file metadata |
| GET | `/health` | Performs a health check on all system components |
| GET | `/templates` | Lists available templates (supports `?detailed=true`) |
| GET | `/workers/status` | Reports per-worker metrics and pool state |
| POST | `/process-batch` | Manually triggers processing of the next queued file |
| GET | `/docs` | Interactive API documentation (Swagger UI) |
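For callers that prefer Python over a shell, a request to the `POST /email` endpoint from the table above could be assembled with the standard library alone. This is a sketch under two assumptions: the server listens on `http://localhost:8000`, and it answers with a JSON body. The helper names are hypothetical.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8000"  # assumption: local default address


def build_email_request(payload: dict, base_url: str = BASE_URL) -> urllib.request.Request:
    """Prepare (but do not send) a POST /email request with a JSON body."""
    return urllib.request.Request(
        url=f"{base_url}/email",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def submit_email(payload: dict, base_url: str = BASE_URL, timeout: float = 10.0) -> dict:
    """Send the request and decode the response, assumed here to be JSON."""
    request = build_email_request(payload, base_url)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

Calling `submit_email({"to_email": "user@example.com", "subject": "Hi", "body": "Hello"})` requires the API server to be running.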
- Structured logging: Each module writes to a dedicated log file under `logs/` with automatic rotation (5 MB per file, 3 backup rotations).
- Health monitoring: The `/health` endpoint validates the queue manager, template engine, and SMTP sender connectivity, returning a degraded status if any component is non-functional.
- Per-worker metrics: The `/workers/status` endpoint reports `emails_sent`, `emails_failed`, and `avg_latency_ms` per worker thread in addition to running state and current batch.
- Dead-letter queue: Emails that exhaust their retry limit are persisted to `data/dead_letter/` with the failure reason and original payload for forensic analysis.
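The rotation policy mentioned above (5 MB per file, 3 backups) maps directly onto `logging.handlers.RotatingFileHandler`. The sketch below shows one plausible setup; the function name `setup_logger`, the `{name}.log` file naming, and the log format are assumptions rather than the contents of `logger_engine.py`.

```python
import logging
from logging.handlers import RotatingFileHandler
from pathlib import Path


def setup_logger(name: str, log_dir: Path = Path("logs")) -> logging.Logger:
    """Return a logger writing to <log_dir>/<name>.log with size-based rotation."""
    log_dir.mkdir(parents=True, exist_ok=True)
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    if not logger.handlers:  # avoid stacking duplicate handlers on repeat calls
        handler = RotatingFileHandler(
            log_dir / f"{name}.log",
            maxBytes=5 * 1024 * 1024,  # rotate at 5 MB
            backupCount=3,             # keep three rotated backups
            encoding="utf-8",
        )
        handler.setFormatter(
            logging.Formatter("%(asctime)s %(levelname)s %(name)s %(message)s")
        )
        logger.addHandler(handler)
    return logger
```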
+----------------+ +----------------+ +----------------+ +----------------+
| HTTP Client |--->| API Server |--->| Queue Manager |--->| Worker Pool |
| (External) | | (FastAPI) | | (1 file/email)| | (Thread Pool) |
+----------------+ +----------------+ +----------------+ +--------+-------+
|
+----------------+ +----------------+ +----------------+ |
| Template |<---| Template | | Email Sender |<-----------+
| Files | | Engine | | (SMTP Client) |
+----------------+ +----------------+ +----------------+
The processing pipeline operates as follows:
- Incoming email requests are received by the FastAPI server and converted into `EmailRequest` data objects.
- If a template name is specified, the template engine renders the subject and body using variable substitution before enqueuing.
- The queue manager writes the request to its own JSON file using atomic write semantics (`fsync` + rename).
- Worker threads poll the queue manager for available files, claim one via atomic rename (`.processing` suffix), and dispatch the email via `EmailSender`.
- On success the file moves to `data/trash/`. On failure, `retry_count` is incremented and the file is renamed back to pending. Emails that exceed `max_retries` are routed to `data/dead_letter/`.
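The claim step in the pipeline above relies on rename being atomic: when several workers race for the same file, only one rename succeeds. A minimal sketch of that idea follows, with hypothetical helper names `claim_next` and `release`; the real queue manager also takes its lock and updates `retry_count`, which is omitted here.

```python
import os
from pathlib import Path
from typing import Optional

SUFFIX = ".processing"


def claim_next(queue_dir: Path) -> Optional[Path]:
    """Claim the oldest pending file by renaming it with a .processing suffix."""
    # Sorting by name gives FIFO order because names start with a timestamp.
    for pending in sorted(queue_dir.glob("batch_*.json")):
        claimed = pending.with_name(pending.name + SUFFIX)
        try:
            os.rename(pending, claimed)
        except FileNotFoundError:
            continue  # another worker renamed it first; try the next file
        return claimed
    return None


def release(claimed: Path) -> Path:
    """Return a claimed file to the pending state after a failed send."""
    pending = claimed.with_name(claimed.name[: -len(SUFFIX)])
    os.rename(claimed, pending)
    return pending
```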
- Python 3.8 or later
- Access to an SMTP server with credentials
- `pip` package installer
- Clone the repository and configure environment variables:

cp .env.example .env

- Edit `.env` with your SMTP credentials:

nano .env

- Install dependencies:

pip install fastapi uvicorn pydantic python-dotenv

- Start the API server:

python api_server.py

- Access the API documentation at `http://localhost:8000/docs`.
All configuration is managed through environment variables, read at process start by a Pydantic Settings model in config.py:
SENDER_EMAIL=your-email@example.com
PASSWORD="your-smtp-password"
SMTP_SERVER=smtp.gmail.com
PORT=465
AUTO_PROCESSING_ENABLED=True
NUM_WORKERS=3
WORKER_POLLING_INTERVAL=5

| Variable | Type | Default | Description |
|---|---|---|---|
| `SENDER_EMAIL` | string | -- | SMTP authentication username and From address |
| `PASSWORD` | string | -- | SMTP authentication password |
| `SMTP_SERVER` | string | -- | SMTP server hostname |
| `PORT` | integer | -- | SMTP server port (typically 465 for SSL) |
| `AUTO_PROCESSING_ENABLED` | boolean | True | Enable background worker pool on startup |
| `NUM_WORKERS` | integer | 3 | Number of concurrent worker threads |
| `WORKER_POLLING_INTERVAL` | integer | 5 | Polling interval in seconds for workers |
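To make the variable table concrete, the sketch below loads the same settings with types and defaults applied. It deliberately uses a standard-library dataclass as a stand-in so it runs without extra packages; the project itself uses a Pydantic Settings model in `config.py`, whose field names and validation rules may differ.

```python
import os
from dataclasses import dataclass
from typing import Mapping

REQUIRED = ("SENDER_EMAIL", "PASSWORD", "SMTP_SERVER", "PORT")


def _as_bool(value: str) -> bool:
    # Accept common truthy spellings; anything else counts as False.
    return value.strip().lower() in {"1", "true", "yes", "on"}


@dataclass(frozen=True)
class Settings:
    """Stand-in for the Pydantic model; field names here are assumptions."""
    sender_email: str
    password: str
    smtp_server: str
    port: int
    auto_processing_enabled: bool = True
    num_workers: int = 3
    worker_polling_interval: int = 5

    @classmethod
    def from_env(cls, env: Mapping[str, str] = os.environ) -> "Settings":
        missing = [key for key in REQUIRED if key not in env]
        if missing:
            raise ValueError(f"missing required settings: {', '.join(missing)}")
        return cls(
            sender_email=env["SENDER_EMAIL"],
            password=env["PASSWORD"],
            smtp_server=env["SMTP_SERVER"],
            port=int(env["PORT"]),
            auto_processing_enabled=_as_bool(env.get("AUTO_PROCESSING_ENABLED", "True")),
            num_workers=int(env.get("NUM_WORKERS", "3")),
            worker_polling_interval=int(env.get("WORKER_POLLING_INTERVAL", "5")),
        )
```

Failing fast on the four variables without defaults surfaces a misconfigured `.env` at startup instead of at the first send attempt.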
curl -X POST "http://localhost:8000/email" \
-H "Content-Type: application/json" \
-d '{
"subject": "Welcome Email",
"body": "Welcome to our service.",
"to_email": "user@example.com",
"format": "multipart"
}'

The system includes four pre-defined templates (`verify`, `welcome`, `password_reset`, `notification`). Provide the template name and the required variables.
# Verification email with a confirmation code
curl -X POST "http://localhost:8000/email" \
-H "Content-Type: application/json" \
-d '{
"to_email": "user@example.com",
"template_name": "verify",
"template_vars": {
"code": "123456",
"app_name": "MyApp",
"expiry_minutes": 30
}
}'
# Welcome message for new user registrations
curl -X POST "http://localhost:8000/email" \
-H "Content-Type: application/json" \
-d '{
"to_email": "user@example.com",
"template_name": "welcome",
"template_vars": {
"name": "John Doe",
"username": "johndoe",
"email": "john@example.com",
"signup_date": "2026-05-04",
"app_name": "MyApp"
}
}'
# Password reset confirmation
curl -X POST "http://localhost:8000/email" \
-H "Content-Type: application/json" \
-d '{
"to_email": "user@example.com",
"template_name": "password_reset",
"template_vars": {
"name": "John Doe",
"app_name": "MyApp",
"reset_code": "ABC123",
"expiry_hours": 24
}
}'
# General notification
curl -X POST "http://localhost:8000/email" \
-H "Content-Type: application/json" \
-d '{
"to_email": "user@example.com",
"template_name": "notification",
"template_vars": {
"title": "System Maintenance",
"message": "Our system will undergo maintenance on Saturday from 2-4 AM UTC.",
"app_name": "MyApp"
}
}'

curl "http://localhost:8000/status"

curl "http://localhost:8000/health"

curl "http://localhost:8000/workers/status"

Returns per-worker `emails_sent`, `emails_failed`, `avg_latency_ms`, and current batch.
# Basic listing
curl "http://localhost:8000/templates"
# Detailed metadata including placeholders
curl "http://localhost:8000/templates?detailed=true"

Email_engine/
+-- api_server.py FastAPI REST API server with endpoint definitions
+-- queue_manager.py 1-file-per-email queue with atomic writes and dirty-flag cache
+-- worker_pool.py Thread pool with per-worker metrics
+-- email_sender.py SMTP client with thread-safe pool, context manager, Message-ID
+-- template_engine.py File-backed template rendering with variable substitution
+-- config.py Pydantic-based environment configuration model
+-- logger_engine.py Rotating file logger setup utility
+-- email_engine.py Legacy module maintained for backward compatibility
+-- .env Environment variables (SMTP credentials, runtime config)
+-- LICENSE PolyForm Strictly Noncommercial License 1.0.0
+-- README.md This document
+-- data/
| +-- queue/ Active email files — batch_{ts_ms}_{uuid8}.json (auto-generated)
| +-- dead_letter/ Permanently failed emails with metadata
| +-- trash/ Processed or stale files
+-- templates/ Email template files (auto-generated)
| +-- manifest.json Template metadata and variable definitions
| +-- verify.txt / .html
| +-- welcome.txt / .html
| +-- password_reset.txt / .html
| +-- notification.txt / .html
+-- logs/ Rotating log output (auto-generated)
+-- Tests/ Test suite
+-- test_queue_system.py
+-- test_email_sender.py
+-- test_template_engine.py
+-- test_api_integration.py
+-- integration_test.py
Run tests from the project root with PYTHONPATH set so modules resolve correctly:
PYTHONPATH=. python Tests/test_queue_system.py
PYTHONPATH=. python Tests/test_email_sender.py
PYTHONPATH=. python Tests/test_template_engine.py
PYTHONPATH=. python Tests/test_api_integration.py
PYTHONPATH=. python Tests/integration_test.py

`test_api_integration.py` and the API portion of `integration_test.py` require the server to be running (`python api_server.py`) before execution.
| Metric | Value |
|---|---|
| Queue capacity | Disk-bound (one JSON file per email) |
| Queue granularity | 1 email per file |
| Worker threads | Configurable (default: 3) |
| Retry attempts | Configurable (default: 3) |
| Retry backoff strategy | Exponential (2^n × base delay) |
| Memory footprint | O(1) per file (streamed from disk) |
| SMTP connection pool | Per-(host, port), RLock-protected, NOOP-validated |
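The backoff row in the table above (2^n × base delay) can be worked through in a few lines. This sketch assumes one initial attempt followed by up to `max_retries` retries, with retry `n` (counting from zero) waiting `2^n × base_delay` seconds; the function names and the injectable `sleep` parameter are hypothetical.

```python
import smtplib
import time
from typing import Callable, List


def backoff_delays(max_retries: int = 3, base_delay: float = 1.0) -> List[float]:
    """Delay before each retry: base_delay * 2^n for n = 0 .. max_retries - 1."""
    return [base_delay * (2 ** attempt) for attempt in range(max_retries)]


def send_with_retry(send: Callable[[], None], max_retries: int = 3,
                    base_delay: float = 1.0,
                    sleep: Callable[[float], None] = time.sleep) -> bool:
    """Try send() once, then retry with exponential backoff; True on success."""
    for attempt in range(max_retries + 1):
        try:
            send()
            return True
        except (smtplib.SMTPException, OSError):
            if attempt == max_retries:
                return False  # retries exhausted; caller can dead-letter the email
            sleep(base_delay * (2 ** attempt))
    return False
```

With the defaults, `backoff_delays()` yields waits of 1, 2, and 4 seconds, so a permanently failing send gives up after roughly 7 seconds of waiting.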
- Transient failures: SMTP send failures trigger automatic retry with exponential backoff. Failed connections are evicted from the pool and re-established on the next attempt.
- Permanent failures: Emails that exhaust their retry limit are persisted to `data/dead_letter/` with failure metadata and are not re-queued automatically.
- Template errors: Missing templates or invalid variable references raise `ValueError`, surfaced to the client as HTTP 400.
- Queue corruption: Invalid JSON in a file is detected on read; corrupted files are moved to trash and logged.
- Stale processing files: Files left in `.processing` state for more than one hour are assumed orphaned and moved to trash on the next queue poll.
- API errors: All internal exceptions are caught and returned as structured HTTP error responses with appropriate status codes.
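The stale-file rule above (orphaned `.processing` files moved to trash after one hour) might be implemented along these lines. The sketch measures age from the file's modification time, which assumes the claiming worker refreshes that timestamp when it claims the file; the function name `sweep_stale` and the trash file naming are also assumptions.

```python
import shutil
import time
from pathlib import Path
from typing import List, Optional

STALE_AFTER_SECONDS = 3600  # one hour


def sweep_stale(queue_dir: Path, trash_dir: Path,
                now: Optional[float] = None) -> List[Path]:
    """Move .processing files older than one hour into the trash directory."""
    now = time.time() if now is None else now
    trash_dir.mkdir(parents=True, exist_ok=True)
    moved: List[Path] = []
    for path in queue_dir.glob("*.processing"):
        try:
            # Assumption: mtime was refreshed at claim time, so it marks the claim.
            age = now - path.stat().st_mtime
        except FileNotFoundError:
            continue  # finished or released by a worker in the meantime
        if age > STALE_AFTER_SECONDS:
            # Hypothetical naming: timestamp and status prefix on the original name.
            target = trash_dir / f"{int(now)}_stale_{path.name}"
            shutil.move(str(path), str(target))
            moved.append(target)
    return moved
```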
- Log rotation: Each component writes to a dedicated log file; files are rotated at 5 MB with up to 3 historical backups.
- Queue statistics: The `/status` endpoint reports total file count, pending email count, and per-file metadata.
- Health checks: The `/health` endpoint validates queue manager, template engine, and email sender connectivity.
- Worker metrics: The `/workers/status` endpoint reports `emails_sent`, `emails_failed`, `avg_latency_ms`, and current batch per worker thread.
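The per-worker counters reported by `/workers/status` could be tracked with a small thread-safe accumulator such as the one below. The class name `WorkerMetrics` is hypothetical, and averaging latency over all attempts (successful and failed) is an assumption about how `avg_latency_ms` is defined.

```python
import threading
from dataclasses import dataclass, field


@dataclass
class WorkerMetrics:
    """Hypothetical per-worker counters guarded by a lock."""
    emails_sent: int = 0
    emails_failed: int = 0
    total_latency_ms: float = 0.0
    _lock: threading.Lock = field(default_factory=threading.Lock,
                                  repr=False, compare=False)

    def record(self, success: bool, latency_ms: float) -> None:
        """Record the outcome and duration of one send attempt."""
        with self._lock:
            if success:
                self.emails_sent += 1
            else:
                self.emails_failed += 1
            self.total_latency_ms += latency_ms

    def snapshot(self) -> dict:
        """Return a consistent copy of the metrics for the status endpoint."""
        with self._lock:
            attempts = self.emails_sent + self.emails_failed
            average = self.total_latency_ms / attempts if attempts else 0.0
            return {
                "emails_sent": self.emails_sent,
                "emails_failed": self.emails_failed,
                "avg_latency_ms": round(average, 2),
            }
```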
- Use a process supervisor (systemd, supervisor) to manage the `api_server.py` process with automatic restart.
- Centralize logs to a dedicated monitoring platform (e.g., journald, ELK stack, or equivalent).
- Implement external monitoring for queue depth, processing latency, and failure rates.
- Configure alerting for sustained queue growth or elevated dead-letter rates.
- Schedule regular backups of the `templates/` directory and configuration.
- Increase `NUM_WORKERS` to improve throughput on systems with sufficient CPU and SMTP capacity.
- Monitor disk I/O under sustained load. With one file per email, high ingestion rates produce many small files, so ensure the filesystem handles high inode counts efficiently (ext4 with `dir_index` enabled is fine; avoid FAT32 or similarly limited filesystems).
- For production workloads exceeding 1 million emails, consider migrating from file-backed storage to a database-backed queue.
The legacy send_simple_email function from the original email_engine.py module is preserved:
from email_sender import send_simple_email
send_simple_email(
subject="Test",
body="Hello",
to_email="user@example.com",
format="plain" # Accepts "plain", "html", or "multipart"
)

- No authentication or authorization middleware: The API has no built-in access control. Production deployments must implement authentication at the reverse proxy or application layer.
- File-system-based queue: Queue persistence relies on the local file system and does not provide the transactional guarantees, replication, or durability semantics of a dedicated message broker. Data loss is possible in the event of an unclean shutdown during a write operation (mitigated by `fsync` before rename).
- No TLS termination: The FastAPI server runs over plain HTTP on port 8000. In production, a reverse proxy (e.g., nginx, Caddy, or a cloud load balancer) should terminate TLS.
This project is licensed under the PolyForm Strictly Noncommercial License 1.0.0. See the LICENSE file for the full license text.
Copyright Crellsin. All rights reserved.