# Homework Starter — Stage 15: Orchestration & System Design
Complete the sections below. Keep your answers concise and focused on orchestration readiness.

## 1) Project Task Decomposition
List 4–8 tasks. Add more rows as needed.

In [1]:
# High-Frequency Trading Factor Prediction System - Task Decomposition
from pathlib import Path
import pandas as pd

# Define comprehensive task breakdown for the trading factor prediction pipeline
tasks = pd.DataFrame({
    'task': [
        'data_ingestion',
        'data_validation', 
        'feature_engineering',
        'model_training',
        'model_validation',
        'model_deployment',
        'prediction_serving',
        'monitoring_alerts'
    ],
    'inputs': [
        '/data/market/raw_trading_data.csv',
        'data/processed/trading_data_raw.parquet',
        'data/processed/trading_data_validated.parquet', 
        'data/features/trading_factors.parquet',
        'models/trained/regression_model.pkl',
        'models/validated/champion_model.pkl',
        'models/production/live_model.pkl',
        'logs/predictions/*.json'
    ],
    'outputs': [
        'data/processed/trading_data_raw.parquet',
        'data/processed/trading_data_validated.parquet',
        'data/features/trading_factors.parquet',
        'models/trained/regression_model.pkl,models/trained/classification_model.pkl',
        'models/validated/champion_model.pkl,reports/model_validation.json',
        'models/production/live_model.pkl,config/deployment_manifest.yaml',
        'api/predictions/results.json,logs/predictions/prediction_log.json',
        'alerts/model_health.json,reports/monitoring_summary.json'
    ],
    'duration_mins': [5, 2, 10, 30, 15, 10, 1, 5],
    'frequency': ['hourly', 'hourly', 'hourly', 'daily', 'daily', 'weekly', 'realtime', 'continuous'],
    'idempotent': [True, True, True, True, True, True, False, True],
    'criticality': ['high', 'high', 'high', 'medium', 'high', 'high', 'critical', 'high']
})

print("High-Frequency Trading Factor Prediction System - Task Breakdown")
print("=" * 70)
tasks

High-Frequency Trading Factor Prediction System - Task Breakdown


Unnamed: 0,task,inputs,outputs,duration_mins,frequency,idempotent,criticality
0,data_ingestion,/data/market/raw_trading_data.csv,data/processed/trading_data_raw.parquet,5,hourly,True,high
1,data_validation,data/processed/trading_data_raw.parquet,data/processed/trading_data_validated.parquet,2,hourly,True,high
2,feature_engineering,data/processed/trading_data_validated.parquet,data/features/trading_factors.parquet,10,hourly,True,high
3,model_training,data/features/trading_factors.parquet,"models/trained/regression_model.pkl,models/tra...",30,daily,True,medium
4,model_validation,models/trained/regression_model.pkl,"models/validated/champion_model.pkl,reports/mo...",15,daily,True,high
5,model_deployment,models/validated/champion_model.pkl,"models/production/live_model.pkl,config/deploy...",10,weekly,True,high
6,prediction_serving,models/production/live_model.pkl,"api/predictions/results.json,logs/predictions/...",1,realtime,False,critical
7,monitoring_alerts,logs/predictions/*.json,"alerts/model_health.json,reports/monitoring_su...",5,continuous,True,high


## 2) Dependencies (DAG)
Describe dependencies and paste a small diagram if you have one.

In [2]:
# Define complex DAG dependencies for high-frequency trading system
dag = {
    'data_ingestion': [],  # No dependencies - starts the pipeline
    'data_validation': ['data_ingestion'],  # Depends on raw data being ingested
    'feature_engineering': ['data_validation'],  # Requires validated data
    'model_training': ['feature_engineering'],  # Needs engineered features
    'model_validation': ['model_training'],  # Validates trained models
    'model_deployment': ['model_validation'],  # Deploy only validated models
    'prediction_serving': ['model_deployment'],  # Serve predictions with deployed models
    'monitoring_alerts': ['prediction_serving', 'model_deployment']  # Monitor both serving and deployment
}

print("Trading Factor Prediction Pipeline - Dependency Graph")
print("=" * 60)

# Visualize the DAG structure
for task, dependencies in dag.items():
    if dependencies:
        deps_str = " + ".join(dependencies)
        print(f"✓ {task:<20} ← depends on: {deps_str}")
    else:
        print(f"🏁 {task:<20} ← entry point (no dependencies)")

print("\n🔄 Pipeline Flow Visualization:")
print("""
┌─────────────────┐
│ data_ingestion  │ (hourly)
└─────────┬───────┘
          │
┌─────────▼───────┐
│ data_validation │ (hourly) 
└─────────┬───────┘
          │
┌─────────▼───────┐
│feature_engineer │ (hourly)
└─────────┬───────┘
          │
┌─────────▼───────┐
│ model_training  │ (daily)
└─────────┬───────┘
          │
┌─────────▼───────┐
│model_validation │ (daily)
└─────────┬───────┘
          │
┌─────────▼───────┐
│model_deployment │ (weekly)
└─────────┬───────┘
          │
┌─────────▼───────┐     ┌──────────────────┐
│prediction_serve │────▶│monitoring_alerts │
│   (realtime)    │     │   (continuous)   │
└─────────────────┘     └──────────────────┘
""")

dag

Trading Factor Prediction Pipeline - Dependency Graph
🏁 data_ingestion       ← entry point (no dependencies)
✓ data_validation      ← depends on: data_ingestion
✓ feature_engineering  ← depends on: data_validation
✓ model_training       ← depends on: feature_engineering
✓ model_validation     ← depends on: model_training
✓ model_deployment     ← depends on: model_validation
✓ prediction_serving   ← depends on: model_deployment
✓ monitoring_alerts    ← depends on: prediction_serving + model_deployment

🔄 Pipeline Flow Visualization:

┌─────────────────┐
│ data_ingestion  │ (hourly)
└─────────┬───────┘
          │
┌─────────▼───────┐
│ data_validation │ (hourly) 
└─────────┬───────┘
          │
┌─────────▼───────┐
│feature_engineer │ (hourly)
└─────────┬───────┘
          │
┌─────────▼───────┐
│ model_training  │ (daily)
└─────────┬───────┘
          │
┌─────────▼───────┐
│model_validation │ (daily)
└─────────┬───────┘
          │
┌─────────▼───────┐
│model_deployment │ (weekly)
└───────

{'data_ingestion': [],
 'data_validation': ['data_ingestion'],
 'feature_engineering': ['data_validation'],
 'model_training': ['feature_engineering'],
 'model_validation': ['model_training'],
 'model_deployment': ['model_validation'],
 'prediction_serving': ['model_deployment'],
 'monitoring_alerts': ['prediction_serving', 'model_deployment']}

## 3) Logging & Checkpoints Plan
Specify what you will log and where you will checkpoint for each task.

In [3]:
# Comprehensive logging and checkpoint strategy for production trading system
logging_plan = pd.DataFrame({
    'task': [
        'data_ingestion',
        'data_validation', 
        'feature_engineering',
        'model_training',
        'model_validation',
        'model_deployment',
        'prediction_serving',
        'monitoring_alerts'
    ],
    'log_messages': [
        'start/end timestamps, rows ingested, data source health, market session info',
        'start/end, data quality scores, null rates, schema validation results, outlier counts',
        'start/end, feature counts, correlation matrix hash, drift detection scores, processing time',
        'hyperparameters, training metrics (R², RMSE, accuracy), feature importance, convergence status',
        'model performance scores, validation dataset stats, A/B test results, business metrics',
        'deployment status, model version, rollback capabilities, health check results, traffic routing',
        'prediction requests/sec, latency metrics, model inference time, confidence scores, error rates',
        'alert triggers, model drift indicators, performance degradation, SLA violations, escalation status'
    ],
    'checkpoint_artifact': [
        'data/processed/trading_data_raw.parquet + metadata.json',
        'data/processed/trading_data_validated.parquet + quality_report.json',
        'data/features/trading_factors.parquet + feature_metadata.json',
        'models/trained/model.pkl + training_metrics.json + hyperparams.yaml',
        'models/validated/champion_model.pkl + validation_report.json + performance_metrics.json',
        'models/production/live_model.pkl + deployment_manifest.yaml + rollback_config.json',
        'predictions/batch_results.json + inference_logs.json + latency_metrics.json',
        'alerts/triggered_alerts.json + monitoring_dashboard.json + sla_compliance.json'
    ],
    'retention_days': [30, 30, 90, 365, 365, 180, 7, 90],
    'backup_location': [
        's3://trading-data/raw/',
        's3://trading-data/validated/',
        's3://trading-data/features/',
        's3://trading-models/trained/',
        's3://trading-models/validated/',
        's3://trading-models/production/',
        's3://trading-predictions/',
        's3://trading-monitoring/'
    ]
})

print("Comprehensive Logging & Checkpoint Strategy")
print("=" * 55)
print("Key Design Principles:")
print("• Structured JSON logging for all critical events")
print("• Automatic artifact versioning with timestamps")
print("• Distributed backup storage for disaster recovery")
print("• Configurable retention policies based on criticality")
print("• Real-time monitoring integration")
print()

logging_plan

Comprehensive Logging & Checkpoint Strategy
Key Design Principles:
• Structured JSON logging for all critical events
• Automatic artifact versioning with timestamps
• Distributed backup storage for disaster recovery
• Configurable retention policies based on criticality
• Real-time monitoring integration



Unnamed: 0,task,log_messages,checkpoint_artifact,retention_days,backup_location
0,data_ingestion,"start/end timestamps, rows ingested, data sour...",data/processed/trading_data_raw.parquet + meta...,30,s3://trading-data/raw/
1,data_validation,"start/end, data quality scores, null rates, sc...",data/processed/trading_data_validated.parquet ...,30,s3://trading-data/validated/
2,feature_engineering,"start/end, feature counts, correlation matrix ...",data/features/trading_factors.parquet + featur...,90,s3://trading-data/features/
3,model_training,"hyperparameters, training metrics (R², RMSE, a...",models/trained/model.pkl + training_metrics.js...,365,s3://trading-models/trained/
4,model_validation,"model performance scores, validation dataset s...",models/validated/champion_model.pkl + validati...,365,s3://trading-models/validated/
5,model_deployment,"deployment status, model version, rollback cap...",models/production/live_model.pkl + deployment_...,180,s3://trading-models/production/
6,prediction_serving,"prediction requests/sec, latency metrics, mode...",predictions/batch_results.json + inference_log...,7,s3://trading-predictions/
7,monitoring_alerts,"alert triggers, model drift indicators, perfor...",alerts/triggered_alerts.json + monitoring_dash...,90,s3://trading-monitoring/


## 4) Right-Sizing Automation
Which parts will you automate now? Which stay manual? Why?

## Automation Strategy for High-Frequency Trading Factor System

### **Fully Automated Components**

**Data Ingestion & Validation (Hourly)**
- **Rationale**: High-frequency trading requires consistent, timely data updates. Manual intervention would introduce unacceptable latency and human error risk.
- **Implementation**: Automated schedulers (Airflow/Prefect) with built-in retry mechanisms and data quality checks.

**Feature Engineering (Hourly)**
- **Rationale**: Feature calculations are deterministic and computational. Automation ensures consistency and enables rapid response to market changes.
- **Implementation**: Containerized feature processing with automatic scaling based on data volume.

**Model Inference/Prediction Serving (Real-time)**
- **Rationale**: Trading decisions require sub-second response times. Human intervention is impossible at this frequency.
- **Implementation**: Auto-scaling API infrastructure with circuit breakers and fallback mechanisms.

**Monitoring & Alerting (Continuous)**
- **Rationale**: System health monitoring must be continuous to catch issues before they impact trading performance.
- **Implementation**: Automated monitoring with tiered alert escalation and self-healing capabilities.

### **Semi-Automated with Human Oversight**

**Model Training (Daily)**
- **Automation**: Automated triggering, hyperparameter tuning, and metric calculation
- **Human Oversight**: Review training results, approve hyperparameter changes, validate business logic
- **Rationale**: While training can be automated, market regime changes require human expertise to interpret results and adjust strategies.

**Model Validation (Daily)**
- **Automation**: Statistical validation tests, performance metric calculation, A/B testing setup
- **Human Oversight**: Business impact assessment, risk evaluation, deployment approval
- **Rationale**: Model performance validation involves both statistical and business considerations that require human judgment.

### **Manual with Automation Support**

**Model Deployment (Weekly/On-Demand)**
- **Automation**: Deployment infrastructure, health checks, rollback capabilities
- **Manual Control**: Deployment timing, traffic routing decisions, risk assessment
- **Rationale**: Model deployment to production has significant financial impact. Human approval ensures proper risk management and business alignment.

**Incident Response & Model Rollbacks**
- **Automation**: Incident detection, initial diagnostics, automated rollback triggers
- **Manual Control**: Root cause analysis, fix implementation, go/no-go decisions
- **Rationale**: Financial systems require human oversight for critical decisions, especially during incidents.

### **Right-Sizing Justification**

| **Automation Level** | **Tasks** | **Business Impact** | **Risk Level** | **Decision Driver** |
|----------------------|-----------|-------------------|----------------|-------------------|
| **Full Auto** | Data pipeline, serving | High availability | Low | Speed & consistency required |
| **Semi-Auto** | Training, validation | Model quality | Medium | Expertise + efficiency balance |
| **Manual** | Deployment, incidents | Financial safety | High | Risk management priority |

### **Success Metrics**

- **Data Pipeline**: 99.9% uptime, < 30s processing latency
- **Model Training**: 95% automated success rate, human review within 2 hours
- **Deployment**: Zero failed deployments, 100% human approval rate
- **Incident Response**: < 5 minute detection, < 30 minute human response

This strategy balances the need for high-frequency automation with appropriate human oversight for financial risk management.

## 5) (Stretch) Refactor One Task into a Function + CLI
Use the templates below.

In [4]:
import argparse
import json
import logging
import sys
import time
import pandas as pd
import numpy as np
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Any, Optional

# === FEATURE ENGINEERING TASK ===
def feature_engineering_task(input_path: str, output_path: str, config_path: Optional[str] = None) -> None:
    """
    Transform validated trading data into engineered features for model training.
    
    Args:
        input_path: Path to validated trading data (parquet)
        output_path: Path to save engineered features (parquet)
        config_path: Optional path to feature engineering configuration
    """
    logging.info('[feature_engineering] Starting feature engineering task')
    start_time = time.time()
    
    try:
        # Load validated trading data
        if not Path(input_path).exists():
            raise FileNotFoundError(f"Input file not found: {input_path}")
            
        df = pd.read_parquet(input_path)
        logging.info(f'[feature_engineering] Loaded {len(df)} records from {input_path}')
        
        # Load configuration if provided
        config = {}
        if config_path and Path(config_path).exists():
            with open(config_path, 'r') as f:
                config = json.load(f)
        
        # Core feature engineering logic
        features_df = df.copy()
        
        # 1. Calculate initiative buy/sell rate ratios
        features_df['buy_sell_ratio'] = (
            features_df['S_LI_INITIATIVEBUYRATE'] / 
            (features_df['S_LI_INITIATIVESELLRATE'] + 1e-8)
        )
        
        # 2. Large order flow indicators
        features_df['large_order_imbalance'] = (
            features_df['S_LI_LARGEBUYRATE'] - features_df['S_LI_LARGESELLRATE']
        )
        
        # 3. Rolling window features (if timestamp column exists)
        if 'timestamp' in features_df.columns:
            features_df = features_df.sort_values('timestamp')
            # 5-minute rolling averages
            for col in ['S_LI_INITIATIVEBUYRATE', 'S_LI_INITIATIVESELLRATE']:
                features_df[f'{col}_5min_ma'] = features_df[col].rolling(window=5).mean()
        
        # 4. Market regime indicators
        features_df['high_activity'] = (
            (features_df['S_LI_INITIATIVEBUYRATE'] + features_df['S_LI_INITIATIVESELLRATE']) > 0.7
        ).astype(int)
        
        # Create output directory
        Path(output_path).parent.mkdir(parents=True, exist_ok=True)
        
        # Save engineered features
        features_df.to_parquet(output_path, index=False)
        
        # Generate metadata
        metadata = {
            'run_at': datetime.utcnow().isoformat(),
            'input_path': input_path,
            'output_path': output_path,
            'input_records': len(df),
            'output_records': len(features_df),
            'features_created': list(set(features_df.columns) - set(df.columns)),
            'processing_time_seconds': time.time() - start_time,
            'config_used': config,
            'data_quality': {
                'null_rates': features_df.isnull().mean().to_dict(),
                'feature_correlations': features_df.corr()['S_LI_INITIATIVESELLRATE'].to_dict()
            }
        }
        
        # Save metadata
        metadata_path = output_path.replace('.parquet', '_metadata.json')
        with open(metadata_path, 'w') as f:
            json.dump(metadata, f, indent=2, default=str)
        
        logging.info(f'[feature_engineering] Created {len(features_df.columns)} features')
        logging.info(f'[feature_engineering] Saved features to {output_path}')
        logging.info(f'[feature_engineering] Saved metadata to {metadata_path}')
        logging.info(f'[feature_engineering] Completed in {time.time() - start_time:.2f} seconds')
        
    except Exception as e:
        logging.error(f'[feature_engineering] Error: {str(e)}')
        raise

# === MODEL TRAINING TASK ===
def model_training_task(input_path: str, output_path: str, model_type: str = 'regression') -> None:
    """
    Train machine learning models on engineered features.
    
    Args:
        input_path: Path to engineered features (parquet)
        output_path: Path to save trained model (pkl)
        model_type: Type of model to train ('regression' or 'classification')
    """
    logging.info(f'[model_training] Starting {model_type} model training')
    start_time = time.time()
    
    try:
        from sklearn.linear_model import LinearRegression, LogisticRegression
        from sklearn.model_selection import train_test_split
        from sklearn.metrics import r2_score, mean_squared_error, accuracy_score, classification_report
        from sklearn.preprocessing import StandardScaler
        from sklearn.pipeline import Pipeline
        import pickle
        
        # Load features
        df = pd.read_parquet(input_path)
        logging.info(f'[model_training] Loaded {len(df)} records for training')
        
        # Prepare features and target
        feature_cols = [
            'S_LI_INITIATIVEBUYRATE',
            'S_LI_LARGEBUYRATE', 
            'S_LI_LARGESELLRATE',
            'buy_sell_ratio',
            'large_order_imbalance'
        ]
        
        # Filter to available columns
        available_features = [col for col in feature_cols if col in df.columns]
        X = df[available_features].fillna(0)
        y = df['S_LI_INITIATIVESELLRATE'].fillna(0)
        
        # Train-test split
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )
        
        # Train model based on type
        if model_type == 'regression':
            model = LinearRegression()
            model.fit(X_train, y_train)
            y_pred = model.predict(X_test)
            
            metrics = {
                'r2_score': float(r2_score(y_test, y_pred)),
                'rmse': float(np.sqrt(mean_squared_error(y_test, y_pred))),
                'model_type': 'regression'
            }
            
        else:  # classification
            # Convert to binary classification
            y_binary = (y > y.median()).astype(int)
            y_train_clf = y_binary[X_train.index]
            y_test_clf = y_binary[X_test.index]
            
            model = Pipeline([
                ('scaler', StandardScaler()),
                ('classifier', LogisticRegression(random_state=42))
            ])
            
            model.fit(X_train, y_train_clf)
            y_pred = model.predict(X_test)
            
            metrics = {
                'accuracy': float(accuracy_score(y_test_clf, y_pred)),
                'classification_report': classification_report(y_test_clf, y_pred, output_dict=True),
                'model_type': 'classification',
                'threshold': float(y.median())
            }
        
        # Create output directory
        Path(output_path).parent.mkdir(parents=True, exist_ok=True)
        
        # Save model
        model_data = {
            'model': model,
            'feature_names': available_features,
            'metrics': metrics,
            'training_metadata': {
                'trained_at': datetime.utcnow().isoformat(),
                'training_samples': len(X_train),
                'test_samples': len(X_test),
                'input_features': available_features
            }
        }
        
        with open(output_path, 'wb') as f:
            pickle.dump(model_data, f)
        
        # Save training report
        report_path = output_path.replace('.pkl', '_training_report.json')
        with open(report_path, 'w') as f:
            json.dump({
                'metrics': metrics,
                'training_metadata': model_data['training_metadata'],
                'processing_time_seconds': time.time() - start_time
            }, f, indent=2, default=str)
        
        logging.info(f'[model_training] Model performance: {metrics}')
        logging.info(f'[model_training] Saved model to {output_path}')
        logging.info(f'[model_training] Saved report to {report_path}')
        logging.info(f'[model_training] Completed in {time.time() - start_time:.2f} seconds')
        
    except Exception as e:
        logging.error(f'[model_training] Error: {str(e)}')
        raise

# === CLI INTERFACE ===
def main(argv=None):
    parser = argparse.ArgumentParser(description='High-Frequency Trading Factor Pipeline Tasks')
    parser.add_argument('task', choices=['feature_engineering', 'model_training'], 
                       help='Task to execute')
    parser.add_argument('--input', required=True, help='Input file path')
    parser.add_argument('--output', required=True, help='Output file path')
    parser.add_argument('--config', help='Configuration file path')
    parser.add_argument('--model-type', default='regression', 
                       choices=['regression', 'classification'],
                       help='Model type for training task')
    parser.add_argument('--log-level', default='INFO', 
                       choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'],
                       help='Logging level')
    
    args = parser.parse_args(argv)
    
    # Configure logging
    logging.basicConfig(
        level=getattr(logging, args.log_level),
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[logging.StreamHandler(sys.stdout)]
    )
    
    # Execute task
    try:
        if args.task == 'feature_engineering':
            feature_engineering_task(args.input, args.output, args.config)
        elif args.task == 'model_training':
            model_training_task(args.input, args.output, args.model_type)
            
        logging.info(f'[main] Task {args.task} completed successfully')
        
    except Exception as e:
        logging.error(f'[main] Task {args.task} failed: {str(e)}')
        sys.exit(1)

# === EXAMPLE USAGE ===
if __name__ == '__main__':
    # Simulate feature engineering task
    print("🔧 Example: Feature Engineering Task")
    print("=" * 50)
    
    # Create sample input data
    sample_input = 'data/sample_validated_data.parquet'
    Path(sample_input).parent.mkdir(parents=True, exist_ok=True)
    
    # Generate sample trading data
    np.random.seed(42)
    sample_data = pd.DataFrame({
        'S_LI_INITIATIVEBUYRATE': np.random.uniform(0.3, 0.7, 100),
        'S_LI_INITIATIVESELLRATE': np.random.uniform(0.3, 0.7, 100),
        'S_LI_LARGEBUYRATE': np.random.uniform(0.1, 0.3, 100),
        'S_LI_LARGESELLRATE': np.random.uniform(0.1, 0.3, 100),
        'timestamp': pd.date_range('2023-01-01', periods=100, freq='1min')
    })
    sample_data.to_parquet(sample_input, index=False)
    
    # Run feature engineering
    main(['feature_engineering', '--input', sample_input, '--output', 'data/features.parquet'])
    
    print("\n🤖 Example: Model Training Task")
    print("=" * 50)
    
    # Run model training
    main(['model_training', '--input', 'data/features.parquet', '--output', 'models/trained_model.pkl'])

🔧 Example: Feature Engineering Task
2025-08-28 11:48:18,239 - INFO - [feature_engineering] Starting feature engineering task
2025-08-28 11:48:18,256 - INFO - [feature_engineering] Loaded 100 records from data/sample_validated_data.parquet
2025-08-28 11:48:18,259 - INFO - [feature_engineering] Created 10 features
2025-08-28 11:48:18,259 - INFO - [feature_engineering] Saved features to data/features.parquet
2025-08-28 11:48:18,259 - INFO - [feature_engineering] Saved metadata to data/features_metadata.json
2025-08-28 11:48:18,259 - INFO - [feature_engineering] Completed in 0.02 seconds
2025-08-28 11:48:18,260 - INFO - [main] Task feature_engineering completed successfully

🤖 Example: Model Training Task
2025-08-28 11:48:18,260 - INFO - [model_training] Starting regression model training
2025-08-28 11:48:18,808 - INFO - [model_training] Loaded 100 records for training
2025-08-28 11:48:18,810 - INFO - [model_training] Model performance: {'r2_score': 0.8992968676832939, 'rmse': 0.0387905983

### Optional: Simple Retry Wrapper (fill in)
Add a small retry with linear backoff to harden a task.

In [5]:
import time
import random
import logging
from functools import wraps
from typing import Callable, Any, Type, Tuple

def retry(n_tries: int = 3, 
          delay: float = 0.2, 
          backoff: float = 2.0, 
          jitter: bool = True,
          exceptions: Tuple[Type[Exception], ...] = (Exception,)):
    """
    Advanced retry decorator with exponential backoff and jitter for production trading systems.
    
    Args:
        n_tries: Number of retry attempts
        delay: Initial delay between retries (seconds)
        backoff: Exponential backoff multiplier
        jitter: Add random jitter to prevent thundering herd
        exceptions: Tuple of exceptions to catch and retry
    """
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            last_exception = None
            
            for attempt in range(n_tries):
                try:
                    return func(*args, **kwargs)
                    
                except exceptions as e:
                    last_exception = e
                    
                    if attempt == n_tries - 1:  # Last attempt
                        logging.error(f'[retry] {func.__name__} failed after {n_tries} attempts: {str(e)}')
                        raise e
                    
                    # Calculate delay with exponential backoff
                    current_delay = delay * (backoff ** attempt)
                    
                    # Add jitter to prevent thundering herd
                    if jitter:
                        current_delay *= (0.5 + random.random())
                    
                    logging.warning(f'[retry] {func.__name__} attempt {attempt + 1}/{n_tries} failed: {str(e)}. '
                                  f'Retrying in {current_delay:.2f} seconds...')
                    
                    time.sleep(current_delay)
                    
                except Exception as e:
                    # Non-retryable exception
                    logging.error(f'[retry] {func.__name__} failed with non-retryable exception: {str(e)}')
                    raise e
            
            # Should never reach here, but just in case
            raise last_exception
            
        return wrapper
    return decorator

# === CIRCUIT BREAKER PATTERN ===
class CircuitBreaker:
    """
    Circuit breaker pattern for high-frequency trading system resilience.
    Prevents cascading failures by failing fast when error rates are high.
    """
    
    def __init__(self, 
                 failure_threshold: int = 5,
                 recovery_timeout: float = 60.0,
                 expected_exception: Type[Exception] = Exception):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        
        # State tracking
        self.failure_count = 0
        self.last_failure_time = None
        self.state = 'CLOSED'  # CLOSED, OPEN, HALF_OPEN
        
    def call(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function with circuit breaker protection"""
        
        if self.state == 'OPEN':
            if self._should_attempt_reset():
                self.state = 'HALF_OPEN'
                logging.info(f'[circuit_breaker] Attempting to reset circuit for {func.__name__}')
            else:
                raise Exception(f'Circuit breaker OPEN for {func.__name__}. Last failure: {self.last_failure_time}')
        
        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
            
        except self.expected_exception as e:
            self._on_failure()
            raise e
    
    def _should_attempt_reset(self) -> bool:
        """Check if enough time has passed to attempt reset"""
        return (time.time() - self.last_failure_time) >= self.recovery_timeout
    
    def _on_success(self):
        """Handle successful execution"""
        self.failure_count = 0
        self.state = 'CLOSED'
        
    def _on_failure(self):
        """Handle failed execution"""
        self.failure_count += 1
        self.last_failure_time = time.time()
        
        if self.failure_count >= self.failure_threshold:
            self.state = 'OPEN'
            logging.warning(f'[circuit_breaker] Circuit opened after {self.failure_count} failures')

# === EXAMPLE USAGE WITH TRADING TASKS ===

# Create circuit breaker for external API calls
api_circuit_breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30.0)

@retry(n_tries=3, delay=0.5, exceptions=(ConnectionError, TimeoutError))
def fetch_market_data(source_url: str) -> dict:
    """
    Simulate fetching market data with retry and circuit breaker protection.
    """
    logging.info(f'[fetch_market_data] Fetching data from {source_url}')
    
    # Simulate network issues
    if random.random() < 0.3:  # 30% chance of failure
        raise ConnectionError(f"Failed to connect to {source_url}")
    
    # Simulate successful data fetch
    return {
        'timestamp': time.time(),
        'data': f'market_data_from_{source_url}',
        'status': 'success'
    }

def safe_fetch_market_data(source_url: str) -> dict:
    """Market data fetch with circuit breaker protection"""
    return api_circuit_breaker.call(fetch_market_data, source_url)

@retry(n_tries=2, delay=1.0, exceptions=(ValueError, RuntimeError))
def process_trading_signals(data: dict) -> dict:
    """
    Process trading signals with retry for transient failures.
    """
    logging.info('[process_trading_signals] Processing trading signals')
    
    if not data or 'data' not in data:
        raise ValueError("Invalid data format")
    
    # Simulate processing
    if random.random() < 0.2:  # 20% chance of processing error
        raise RuntimeError("Signal processing error")
    
    return {
        'signals': ['BUY', 'HOLD', 'SELL'],
        'confidence': random.uniform(0.7, 0.95),
        'processed_at': time.time()
    }

# === DEMONSTRATION ===
print("🔄 Testing Retry Mechanisms for Trading System")
print("=" * 60)

# Test retry with success after failures
try:
    result = safe_fetch_market_data("https://api.trading-data.com/live")
    print(f"✅ Market data fetch successful: {result['status']}")
except Exception as e:
    print(f"❌ Market data fetch failed: {str(e)}")

# Test processing with retry
try:
    sample_data = {'data': 'trading_signals', 'timestamp': time.time()}
    signals = process_trading_signals(sample_data)
    print(f"✅ Signal processing successful: {len(signals['signals'])} signals generated")
except Exception as e:
    print(f"❌ Signal processing failed: {str(e)}")

print("\n Circuit Breaker Status:")
print(f"State: {api_circuit_breaker.state}")
print(f"Failure Count: {api_circuit_breaker.failure_count}")
print(f"Last Failure: {api_circuit_breaker.last_failure_time}")

print("\n Production Considerations:")
print("• Retry mechanisms prevent transient failures from disrupting trading")
print("• Circuit breakers protect against cascading failures during market stress")
print("• Exponential backoff with jitter prevents thundering herd effects")
print("• Structured logging enables rapid incident response")
print("• Configuration allows tuning for different criticality levels")

🔄 Testing Retry Mechanisms for Trading System
2025-08-28 11:48:18,822 - INFO - [fetch_market_data] Fetching data from https://api.trading-data.com/live
2025-08-28 11:48:19,267 - INFO - [fetch_market_data] Fetching data from https://api.trading-data.com/live
2025-08-28 11:48:20,221 - INFO - [fetch_market_data] Fetching data from https://api.trading-data.com/live
2025-08-28 11:48:20,221 - ERROR - [retry] fetch_market_data failed after 3 attempts: Failed to connect to https://api.trading-data.com/live
❌ Market data fetch failed: Failed to connect to https://api.trading-data.com/live
2025-08-28 11:48:20,222 - INFO - [process_trading_signals] Processing trading signals
✅ Signal processing successful: 3 signals generated

 Circuit Breaker Status:
State: CLOSED
Failure Count: 1
Last Failure: 1756396100.2222478

 Production Considerations:
• Retry mechanisms prevent transient failures from disrupting trading
• Circuit breakers protect against cascading failures during market stress
• Exponenti