Production-ready monitoring system for semiconductor wafer fabrication with separated environments.
- ✅ Real-time Event Ingestion - High-throughput event processing with local spooling for resilience
- ✅ Time-Series Storage - TimescaleDB with 72h hot storage + 10-year S3 archival
- ✅ Multi-Site Aggregation - Centralized monitoring across multiple fabrication sites
- ✅ Performance Metrics - CPU, memory, duration tracking with automatic collection
- ✅ Interactive Dashboards - Real-time Streamlit dashboards with charts and analytics
- 🔥 Structured Logging - JSON logging with structured data using structlog
- 🔥 Distributed Tracing - OpenTelemetry integration for request tracking across services
- 🔥 Prometheus Metrics - Comprehensive metrics collection with /metrics endpoints
- 🔥 Smart Alerting - Configurable alert rules with Slack/Webhook/Email notifications
- 🔥 Error Handling - Automatic retries with exponential backoff
- 🔥 Configuration Management - Pydantic-based config with validation
- 🔥 Performance Optimized - Connection pooling, caching, batch processing
- 🔥 Comprehensive Tests - Unit, integration, and performance test suites
- 🔥 Multi-Integration Support - Send events to multiple backends (Zabbix, ELK, CSV, JSON, Webhooks)
- ☁️ AWS Cloud Integration - Monitor EC2, ECS, and Lambda jobs with CloudWatch & X-Ray
- 🗄️ TimescaleDB Optimization - Advanced time-series features, compression, retention policies
```
┌────────────────────────────────────────────────────────┐
│                  CENTRAL NODE (Env C)                  │
│  ┌──────────────────┐         ┌──────────────────┐     │
│  │  Central Web UI  │◄───────►│   Central API    │     │
│  │   (Streamlit)    │         │   (Aggregator)   │     │
│  └──────────────────┘         └────────┬─────────┘     │
│                                        │               │
└────────────────────────────────────────┼───────────────┘
                                         │
                  ┌──────────────────────┼──────────────────────┐
                  │                      │                      │
         ┌────────▼─────────┐   ┌────────▼─────────┐   ┌────────▼─────────┐
         │   SITE 1 (Fab1)  │   │   SITE 2 (Fab2)  │   │   SITE N (FabN)  │
         └──────────────────┘   └──────────────────┘   └──────────────────┘
```
Each Site has 3 environments:
```
┌──────────────────────────────────────────────────────────────────┐
│                       ENV A: BUSINESS NODE                       │
│           ┌────────────────────────────────────────┐             │
│           │   Sidecar Agent (Forwarding + Spool)   │             │
│           └───────────────────┬────────────────────┘             │
└───────────────────────────────┼──────────────────────────────────┘
                                │ HTTP
┌───────────────────────────────┼──────────────────────────────────┐
│                     ENV B: PLANT DATA PLANE                      │
│  ┌────────────┐    ┌──────────────┐    ┌─────────────┐           │
│  │ Local API  │───▶│ TimescaleDB  │◀───│  Archiver   │───▶ S3    │
│  │ (Ingest +  │    │  (72h hot)   │    │  (Parquet)  │           │
│  │  Query)    │    └──────────────┘    └─────────────┘           │
│  └────────────┘                                                  │
└──────────────────────────────────────────────────────────────────┘
         │
         │ Read-only
┌────────┼─────────────────────────────────────────────────────────┐
│                       ENV C: OPERATOR HMI                        │
│          ┌────────────────────────────────────┐                  │
│          │      Local Web UI (Streamlit)      │                  │
│          └────────────────────────────────────┘                  │
└──────────────────────────────────────────────────────────────────┘
```
Prerequisites:

- Python 3.10+
- PostgreSQL 14+ with the TimescaleDB extension
- (Optional) S3-compatible storage for archival
- (Optional) OpenTelemetry collector for tracing
```bash
# Clone repository
git clone <repo-url>
cd wafer-monitor-v2

# Install dependencies
pip install -e .

# For development
pip install -e ".[dev]"
```
Create `.env` files or set environment variables:

Sidecar Agent (Env A):

```bash
LOCAL_API_BASE=http://localhost:18000
SPOOL_DIR=/tmp/sidecar-spool
LOG_LEVEL=INFO
ENABLE_TRACING=true
OTLP_ENDPOINT=http://localhost:4317
```

Local API (Env B):

```bash
DATABASE_URL=postgresql://postgres:postgres@localhost:5432/monitor
DB_POOL_MIN_SIZE=2
DB_POOL_MAX_SIZE=10
LOG_LEVEL=INFO
ENABLE_TRACING=true
```

Central API:

```bash
SITES=fab1=http://site1:18000,fab2=http://site2:18000
LOG_LEVEL=INFO
```
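These variables feed the Pydantic-based configuration mentioned in the feature list. A minimal sketch of how the Local API settings could be modeled, assuming `pydantic-settings` (the real `shared_utils/config.py` may differ):

```python
# Illustrative only: field names mirror the environment variables above,
# but the actual shared_utils/config.py may be structured differently.
from pydantic_settings import BaseSettings

class LocalApiSettings(BaseSettings):
    database_url: str = "postgresql://postgres:postgres@localhost:5432/monitor"
    db_pool_min_size: int = 2
    db_pool_max_size: int = 10
    log_level: str = "INFO"
    enable_tracing: bool = True

# Reads DATABASE_URL, DB_POOL_MIN_SIZE, ... from the environment with validation
settings = LocalApiSettings()
```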
```bash
# Start Local API
cd apps/local_api
python main.py

# Start Sidecar Agent
cd apps/sidecar_agent
python main.py

# Start Central API
cd apps/central_api
python main.py

# Start Archiver
cd apps/archiver
python main.py

# Start Web UI (Local)
cd apps/web_local
streamlit run streamlit_app.py --server.port 8501

# Start Web UI (Central)
cd apps/web_central
streamlit run streamlit_app.py --server.port 8502
```
```bash
# Local site deployment
docker-compose -f deploy/docker/compose.local-data.yml up -d
docker-compose -f deploy/docker/compose.local-web.yml up -d

# Central deployment
docker-compose -f deploy/docker/compose.central.yml up -d

# Business node
docker-compose -f deploy/docker/compose.business.yml up -d
```

```bash
# See deploy/podman/ for pod scripts
cd deploy/podman/local-data
./up.sh
```
```python
from uuid import uuid4

from monitoring_sdk import Monitored, AppRef, SidecarEmitter

# Create app reference
app = AppRef(
    app_id=uuid4(),
    name='wafer-process',
    version='2.1.0',
)

# Monitor a job
with Monitored(
    site_id='fab1',
    app=app,
    entity_type='job',
    business_key='batch-12345',
):
    # Your processing code here
    process_wafer_batch()
```
```python
# Monitor parent job with subjobs
with Monitored(
    site_id='fab1',
    app=app,
    entity_type='job',
    business_key='batch-12345',
    metadata={'priority': 'high', 'customer': 'ACME'},
) as job:
    # Process multiple subjobs
    for wafer_id in wafer_ids:
        with Monitored(
            site_id='fab1',
            app=app,
            entity_type='subjob',
            parent_id=job.entity_id,
            sub_key=f'wafer-{wafer_id}',
        ) as subjob:
            process_wafer(wafer_id)
            # Report progress
            subjob.tick(extra_meta={'progress': 0.5})
```
```python
from monitoring_sdk import SidecarEmitter

# Custom emitter with retry configuration
emitter = SidecarEmitter(
    base_url='http://sidecar:8000',
    timeout=10.0,
    max_retries=5,
    enable_retries=True,
)

with Monitored(
    site_id='fab1',
    app=app,
    entity_type='job',
    emitter=emitter,
):
    process_data()
```
Sidecar Agent:

- `POST /v1/ingest/events` - Ingest single event
- `POST /v1/ingest/events:batch` - Ingest batch of events
- `GET /v1/healthz` - Health check
- `GET /metrics` - Prometheus metrics

Local API:

- `POST /v1/ingest/events` - Ingest single event (from sidecar)
- `POST /v1/ingest/events:batch` - Ingest batch of events
- `GET /v1/jobs` - Query jobs with filters
- `GET /v1/subjobs` - Query subjobs with filters
- `GET /v1/stream` - Real-time event stream (SSE)
- `GET /v1/healthz` - Health check
- `GET /metrics` - Prometheus metrics

Central API:

- `GET /v1/jobs?site=<site_id>` - Query jobs from a specific site
- `GET /v1/subjobs?site=<site_id>` - Query subjobs from a specific site
- `GET /v1/sites` - List configured sites
- `GET /v1/healthz` - Health check with site status
- `GET /metrics` - Prometheus metrics
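As an illustration, querying the Central API from Python (paths as listed above; host and port taken from the troubleshooting examples further below):

```python
# Minimal client sketch against the Central API; assumes it runs on
# localhost:19000 as in the health-check examples in this README.
import requests

CENTRAL = "http://localhost:19000"

# List configured sites, then query jobs from one of them via ?site=<site_id>
sites = requests.get(f"{CENTRAL}/v1/sites", timeout=5).json()
fab1_jobs = requests.get(f"{CENTRAL}/v1/jobs", params={"site": "fab1"}, timeout=5).json()
```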
All services expose `/metrics` endpoints with comprehensive metrics:

HTTP Metrics:

- `http_requests_total` - Total HTTP requests by method, endpoint, status
- `http_request_duration_seconds` - Request latency histogram

Database Metrics:

- `db_operations_total` - Total DB operations by type, table, status
- `db_operation_duration_seconds` - DB operation latency
- `db_pool_size` - Connection pool size
- `db_pool_available` - Available connections

Event Processing:

- `events_processed_total` - Total events by type and status
- `events_in_spool` - Current spool directory size

Job Metrics:

- `jobs_total` - Total jobs by app and status
- `job_duration_seconds` - Job duration histogram
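For reference, metrics like these are conventionally declared with `prometheus_client`; the metric names below come from the list above, while the exact label sets are assumptions:

```python
# Sketch only: label sets follow the descriptions above but are not
# guaranteed to match shared_utils/metrics.py exactly.
from prometheus_client import Counter, Histogram

http_requests_total = Counter(
    "http_requests_total",
    "Total HTTP requests",
    ["method", "endpoint", "status"],
)
http_request_duration_seconds = Histogram(
    "http_request_duration_seconds",
    "HTTP request latency in seconds",
    ["method", "endpoint"],
)
```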
Enable OpenTelemetry tracing by setting:

```bash
ENABLE_TRACING=true
OTLP_ENDPOINT=http://your-collector:4317
```
Traces include:
- Request flows across services
- Database operations
- Event forwarding
- Query execution
View traces in Jaeger, Tempo, or any OTLP-compatible backend.
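For reference, a minimal OTLP tracer setup with the OpenTelemetry SDK looks like the sketch below; `shared_utils/tracing.py` presumably does something similar when `ENABLE_TRACING=true` (the span name here is an example, not the project's):

```python
# Illustrative OTLP/gRPC tracer setup; requires opentelemetry-sdk and
# opentelemetry-exporter-otlp.
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

provider = TracerProvider()
provider.add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317"))
)
trace.set_tracer_provider(provider)

tracer = trace.get_tracer("local-api")
with tracer.start_as_current_span("ingest_event"):
    pass  # handler body
```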
All services emit structured JSON logs:

```json
{
  "event": "event_ingested",
  "timestamp": "2025-10-19T12:34:56.789Z",
  "level": "info",
  "service": "local-api",
  "event_kind": "finished",
  "entity_type": "job",
  "site_id": "fab1",
  "duration_s": 0.0234
}
```
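A minimal structlog configuration that yields records in this shape (a sketch; the project's `shared_utils/logging.py` may configure more processors):

```python
# Emits JSON lines with event/timestamp/level keys like the sample above.
import structlog

structlog.configure(
    processors=[
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso", key="timestamp"),
        structlog.processors.JSONRenderer(),
    ]
)

log = structlog.get_logger(service="local-api")
log.info(
    "event_ingested",
    event_kind="finished",
    entity_type="job",
    site_id="fab1",
    duration_s=0.0234,
)
```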
The system includes built-in alert rules:
- High Failure Rate - >10% jobs failing
- Long Running Jobs - Jobs running >1 hour
- High Memory Usage - >8GB memory usage
- No Jobs Received - No activity when expected
- Ingestion Lag - >100 events in spool
- Database Issues - Connection failures
Set environment variables:

```bash
# Webhook alerts
ALERT_WEBHOOK_URL=https://your-webhook-endpoint

# Slack alerts
SLACK_WEBHOOK_URL=https://hooks.slack.com/services/YOUR/WEBHOOK/URL

# Email alerts (requires email API)
EMAIL_API_URL=https://your-email-api
```
```python
from shared_utils import get_alert_manager, AlertRule, AlertSeverity

alert_mgr = get_alert_manager()

# Add custom rule
alert_mgr.add_rule(AlertRule(
    name='custom_metric_threshold',
    condition=lambda m: m.get('custom_metric', 0) > 1000,
    severity=AlertSeverity.WARNING,
    message_template='Custom metric exceeded: {custom_metric}',
    cooldown_minutes=10,
))
```
```bash
# Unit tests
pytest tests/unit/ -v

# Integration tests (requires running services)
pytest tests/integration/ -v

# Performance tests
pytest tests/performance/ -v -s -m performance

# All tests with coverage
pytest tests/ --cov=apps --cov-report=html
```
Expected performance (adjust based on hardware):
- Single Event Latency: <500ms average, <1s P95
- Batch Throughput: >50 events/second
- Concurrent Load: >100 events/second with 20 concurrent clients
- Query Latency: <200ms average for 100 jobs
```
wafer-monitor-v2/
├── apps/
│   ├── archiver/                  # S3 archival service
│   ├── central_api/               # Central aggregation API
│   ├── local_api/                 # Local site API
│   ├── monitoring_sdk/            # Client SDK
│   │   └── aws_helpers.py         # AWS platform helpers
│   ├── sidecar_agent/             # Event forwarding agent
│   ├── shared_utils/              # Shared utilities
│   │   ├── alerts.py              # Alerting system
│   │   ├── config.py              # Configuration
│   │   ├── logging.py             # Structured logging
│   │   ├── metrics.py             # Prometheus metrics
│   │   ├── tracing.py             # OpenTelemetry tracing
│   │   ├── integrations/          # Multi-backend integrations
│   │   │   ├── local_api.py       # Local API integration
│   │   │   ├── zabbix.py          # Zabbix monitoring
│   │   │   ├── elk.py             # Elasticsearch/Logstash
│   │   │   ├── csv_export.py      # CSV file export
│   │   │   ├── json_export.py     # JSON file export
│   │   │   ├── webhook.py         # Generic webhooks
│   │   │   ├── aws_cloudwatch.py  # AWS CloudWatch
│   │   │   └── aws_xray.py        # AWS X-Ray tracing
│   │   └── container.py           # DI container
│   ├── web_central/               # Central dashboard
│   └── web_local/                 # Local site dashboard
├── deploy/
│   ├── docker/                    # Docker Compose configs
│   └── podman/                    # Podman pod scripts
├── docs/                          # Documentation
│   ├── API.md                     # API reference
│   ├── DEPLOYMENT.md              # Deployment guide
│   ├── INTEGRATIONS.md            # Integration docs
│   ├── MULTI_INTEGRATION_GUIDE.md
│   ├── AWS_INTEGRATION.md         # AWS cloud guide
│   └── TIMESCALEDB_OPTIMIZATION.md
├── examples/
│   ├── integrations/              # Integration configs
│   └── aws/                       # AWS examples
│       ├── lambda_handler.py
│       ├── ec2_job.py
│       ├── ecs_task.py
│       ├── Dockerfile.lambda
│       ├── Dockerfile.ec2
│       ├── task-definition.json
│       └── IAM-policies.json
├── ops/
│   ├── sql/
│   │   ├── schema.sql             # Database schema
│   │   ├── timescaledb_enhancements.sql
│   │   └── timescaledb_config.sql
│   └── scripts/
│       ├── monitor_timescaledb.py
│       └── maintenance.sh
├── tests/
│   ├── unit/                      # Unit tests
│   ├── integration/               # Integration tests
│   └── performance/               # Performance tests
├── pyproject.toml                 # Dependencies
└── README.md                      # This file
```
Tables:

- `app` - Application registry
- `job` - Job records (hypertable, 72h retention)
- `subjob` - Subjob records (hypertable, 72h retention)
- `event` - Raw events (hypertable, 72h retention)

Indexes:

- Time-based indexes for efficient queries
- Status indexes for filtering
- Unique constraint on idempotency keys
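For instance, a time-bounded, status-filtered query of the kind these indexes are meant to serve (a hypothetical sketch: the `status` and `ts` column names are assumptions; `ops/sql/schema.sql` is authoritative):

```python
# Hypothetical query sketch against the 'job' hypertable.
import asyncpg

async def recent_failed_jobs(conn: asyncpg.Connection) -> list[asyncpg.Record]:
    return await conn.fetch(
        """
        SELECT *
        FROM job
        WHERE status = 'failed'
          AND ts > now() - INTERVAL '24 hours'
        ORDER BY ts DESC
        LIMIT 100
        """
    )
```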
- Connection Pooling - Async connection pools with configurable sizes
- Query Optimization - Indexed queries with CTEs for deduplication
- Batch Processing - Batch event ingestion support
- Caching - Dashboard caching with configurable TTL
- Retry Logic - Automatic retries with exponential backoff
- Spooling - Local event spooling for resilience
```bash
# Database pool sizing
DB_POOL_MIN_SIZE=5
DB_POOL_MAX_SIZE=20

# Query limits
QUERY_DEFAULT_LIMIT=1000
QUERY_MAX_LIMIT=10000

# Timeouts
REQUEST_TIMEOUT_S=5.0
DRAIN_INTERVAL_S=2.0
```
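A sketch of how the pool-size settings above might be applied with asyncpg (illustrative; the apps may wire this up differently):

```python
# Reads the tuning variables above; defaults mirror the example values.
import os

import asyncpg

async def make_pool() -> asyncpg.Pool:
    return await asyncpg.create_pool(
        dsn=os.environ["DATABASE_URL"],
        min_size=int(os.environ.get("DB_POOL_MIN_SIZE", "5")),
        max_size=int(os.environ.get("DB_POOL_MAX_SIZE", "20")),
    )
```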
```bash
# Sidecar Agent
curl http://localhost:8000/v1/healthz

# Local API
curl http://localhost:18000/v1/healthz

# Central API
curl http://localhost:19000/v1/healthz

# View Prometheus metrics
curl http://localhost:8000/metrics

# View spooled events (when Local API is unavailable)
ls -l /tmp/sidecar-spool/

# Connect to database
psql $DATABASE_URL
```

```sql
-- Check table sizes
SELECT schemaname,
       tablename,
       pg_size_pretty(pg_total_relation_size(schemaname || '.' || tablename))
FROM pg_tables
WHERE schemaname = 'public';
```
Monitor near-real-time compute jobs on AWS EC2, ECS, and Lambda with CloudWatch and X-Ray integration!
```python
from uuid import uuid4

from monitoring_sdk import AppRef, Monitored
from monitoring_sdk.aws_helpers import create_aws_emitter, get_aws_metadata

app = AppRef(app_id=uuid4(), name="my-aws-job", version="1.0.0")
emitter = create_aws_emitter()  # Auto-detects EC2/ECS/Lambda
metadata = get_aws_metadata()

with Monitored(
    site_id='site1',
    app=app,
    entity_type='job',
    business_key='daily-batch',
    emitter=emitter,
    metadata=metadata,
):
    # Your job logic - metrics sent to CloudWatch & X-Ray
    process_data()
```
```python
from monitoring_sdk.aws_helpers import monitored_lambda_handler

@monitored_lambda_handler('site1', app_ref)
def lambda_handler(event, context):
    # Automatically monitored!
    return {'statusCode': 200}
```

See AWS_INTEGRATION.md for the complete guide.
Send monitoring events to multiple backends simultaneously:
- Local API - TimescaleDB storage
- Zabbix - Enterprise monitoring
- ELK Stack - Elasticsearch for search & analysis
- CSV/JSON Export - File-based backups
- Webhooks - Generic HTTP endpoints
- AWS CloudWatch - Cloud metrics & logs
- AWS X-Ray - Distributed tracing
See INTEGRATIONS.md and MULTI_INTEGRATION_GUIDE.md for details.
Advanced time-series database features:
- Continuous Aggregates - Pre-computed rollups (1h, 1d, 1w, 1mo)
- Compression - Automatic compression after 3 days
- Retention Policies - Auto-delete data after 90 days
- Stored Procedures - Analytics & alerting functions
- Monitoring Views - Health & performance metrics
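As a sketch, the compression and retention bullets above correspond to standard TimescaleDB policy calls, shown here via asyncpg for illustration (the actual policies live in the `ops/sql/` files):

```python
# Illustrative: 3-day compression and 90-day retention on the 'event'
# hypertable, matching the bullets above; table name from the schema section.
import asyncpg

async def apply_policies(conn: asyncpg.Connection) -> None:
    # Compression must be enabled on the hypertable before adding a policy
    await conn.execute("ALTER TABLE event SET (timescaledb.compress)")
    await conn.execute(
        "SELECT add_compression_policy('event', INTERVAL '3 days', if_not_exists => TRUE)"
    )
    await conn.execute(
        "SELECT add_retention_policy('event', INTERVAL '90 days', if_not_exists => TRUE)"
    )
```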
See TIMESCALEDB_OPTIMIZATION.md for the complete guide.
Documentation:

- QUICKSTART.md - 10-minute setup guide
- API.md - Complete API reference
- DEPLOYMENT.md - Production deployment guide
- INTEGRATIONS.md - Integration backends
- AWS_INTEGRATION.md - AWS cloud monitoring
- TIMESCALEDB_OPTIMIZATION.md - Database optimization
- CHANGELOG.md - Version history
[Your License Here]
[Contribution guidelines here]
[Support information here]