# Curve Formation Pipeline Orchestrator

This notebook orchestrates the end-to-end curve formation pipeline, including:
1. Market data ingestion and validation
2. Curve calculation for different asset classes
3. Data quality checks and monitoring
4. Output persistence and notifications

## Parameters
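- `env`: Environment (dev/staging/prod); defaults to `dev`
- `trade_date`: Trade date for curve construction (YYYY-MM-DD); defaults to today's date
- `asset_classes`: Comma-separated list of asset classes to process; defaults to `IR,FX,CREDIT`
- `notification_email`: Email address to notify when the pipeline completes; leave empty to skip the notification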

In [None]:
# Curve Formation Pipeline Orchestrator
import os
import sys
from datetime import datetime, timedelta
from typing import Any
from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.functions as F

# Add project root to Python path
# `__file__` is only defined when this code runs as a regular Python file;
# inside a notebook cell it raises NameError, so fall back to the current
# working directory there.
# NOTE(review): the fallback assumes the working directory is the notebook's
# own directory — confirm for the target runtime.
try:
    _notebook_dir = os.path.dirname(__file__)
except NameError:
    _notebook_dir = os.getcwd()

# The project root is taken to be two levels above this notebook's directory.
project_root = os.path.abspath(os.path.join(_notebook_dir, '..', '..'))
if project_root not in sys.path:
    sys.path.append(project_root)

from src.curve_formation.core import processor, writer
from src.utils import config_manager
from src.utils.data_quality import validator
from src.utils.exception_handler import handle_exception
from src.utils.logging_utils import get_logger
from src.utils.spark_utils import create_spark_session

In [None]:
# Bootstrap the Databricks helpers (`spark` and `dbutils`) for this session
try:
    from pyspark.dbutils import DBUtils
    from pyspark.sql import SparkSession

    # Only build a session when no `spark` name is bound yet
    spark_missing = 'spark' not in locals()
    if spark_missing:
        spark = SparkSession.builder.getOrCreate()

    # Likewise, only wrap the session in DBUtils when `dbutils` is not bound
    dbutils_missing = 'dbutils' not in locals()
    if dbutils_missing:
        dbutils = DBUtils(spark)
except Exception as init_error:
    # Report the failure on stdout, then let the original error propagate
    print(f"Failed to initialize Databricks utilities: {str(init_error)}")
    raise

In [None]:
# Initialize logging and configuration
logger = get_logger(__name__)

try:
    # Get pipeline parameters using Databricks widgets
    dbutils.widgets.text("env", "dev", "Environment")
    dbutils.widgets.text("trade_date", datetime.now().strftime("%Y-%m-%d"), "Trade Date")
    dbutils.widgets.text("asset_classes", "IR,FX,CREDIT", "Asset Classes")
    dbutils.widgets.text("notification_email", "", "Notification Email")

    # Only *read* the raw widget values here; parsing happens below so that a
    # malformed value is not mistaken for "widgets are unavailable".
    env = dbutils.widgets.get("env")
    _trade_date_raw = dbutils.widgets.get("trade_date")
    _asset_classes_raw = dbutils.widgets.get("asset_classes")
    notification_email = dbutils.widgets.get("notification_email")
except Exception as e:
    logger.warning(f"Could not initialize Databricks widgets: {str(e)}")
    # Use default values if widgets are not available
    env = "dev"
    _trade_date_raw = datetime.now().strftime("%Y-%m-%d")
    _asset_classes_raw = "IR,FX,CREDIT"
    notification_email = ""

# Parse outside the try block: an invalid trade date must fail loudly
# (ValueError) instead of silently switching the run to today's date on the
# "dev" configuration. Both the widget path and the fallback path go through
# the same parse, so `trade_date` is always a midnight datetime.
trade_date = datetime.strptime(_trade_date_raw.strip(), "%Y-%m-%d")

# Tolerate whitespace around the commas ("IR, FX") and drop empty entries
# ("IR,,FX"); the names are later used verbatim as configuration keys.
asset_classes = [name.strip() for name in _asset_classes_raw.split(",") if name.strip()]
if not asset_classes:
    raise ValueError(
        "No asset classes specified; expected a comma-separated list such as 'IR,FX,CREDIT'"
    )

# Load configuration
config = config_manager.load_config(env)
logger.info(f"Loaded configuration for environment: {env}")

# Initialize Spark session
spark = create_spark_session(f"Curve Formation - {env}")
logger.info("Initialized Spark session")

In [None]:
# Load and validate market data
@handle_exception("Error loading market data")
def load_market_data(trade_date: datetime, asset_classes: list[str], config: dict[str, Any]) -> dict[str, DataFrame]:
    """Load market data for specified asset classes and validate

    Reads from the module-level ``spark`` session. For each asset class the
    ``market_data`` table is filtered on ``trade_date`` and ``asset_class``
    and the result is checked with ``validator.validate_market_data`` using
    the rules found under ``config['data_quality'][asset_class]``.

    Args:
        trade_date: Date for which to load market data
        asset_classes: List of asset classes to process
        config: Configuration dictionary; must provide ``catalog``,
            ``schemas['market_data']`` and a ``data_quality`` entry per
            asset class

    Returns:
        Dictionary mapping asset class to market data DataFrame

    Raises:
        ValueError: If data quality validation fails
        KeyError: If a required configuration entry is missing

    Note:
        How these exceptions reach the caller depends on the
        ``handle_exception`` decorator, which is defined outside this file.
    """
    logger.info(f"Loading market data for {trade_date} - Asset Classes: {asset_classes}")

    # Loop-invariant: one table reference and one formatted date for all
    # asset classes.
    source_table = spark.table(f"{config['catalog']}.{config['schemas']['market_data']}.market_data")
    trade_date_str = trade_date.strftime('%Y-%m-%d')

    market_data = {}

    for asset_class in asset_classes:
        # Filter with Column expressions rather than interpolated SQL text so
        # that the asset class (which originates from a notebook widget) is
        # always treated as a literal value and never parsed as SQL.
        df = source_table.filter(
            (F.col('trade_date') == F.lit(trade_date_str))
            & (F.col('asset_class') == F.lit(asset_class))
        )

        # Validate data quality
        validation_config = config['data_quality'][asset_class]
        if not validator.validate_market_data(df, validation_config):
            raise ValueError(f"Data quality validation failed for {asset_class}")

        market_data[asset_class] = df
        logger.info(f"Successfully loaded and validated market data for {asset_class}")

    return market_data

# Load market data for processing
market_data = load_market_data(trade_date, asset_classes, config)
logger.info("Market data loading complete")

In [None]:
# Process curves for each asset class
@handle_exception("Error processing curves")
def process_curves(spark: SparkSession, market_data: dict[str, DataFrame], trade_date: datetime, config: dict[str, Any]) -> dict[str, dict[str, DataFrame]]:
    """Build, validate and persist the curves of every asset class.

    Each asset class is handled on its own: its market data is passed to
    ``processor.process_all_curves``, every resulting curve is checked with
    ``validator.validate_market_data``, and only then are the curves written
    out through ``writer.write_output``.

    Args:
        spark: Spark session
        market_data: Dictionary mapping asset class to market data DataFrame
        trade_date: Date for curve construction; used as the ``YYYYMMDD``
            suffix of each output table name
        config: Configuration dictionary

    Returns:
        Nested dictionary mapping asset class to curve name to curve DataFrame

    Raises:
        ValueError: If curve validation fails
    """
    logger.info("Starting curve construction process")

    results_by_asset_class = {}

    for asset_class in market_data:
        logger.info(f"Processing curves for {asset_class}")

        # The processor receives a single-entry mapping for this asset class
        single_class_input = {asset_class: market_data[asset_class]}
        built_curves = processor.process_all_curves(spark, single_class_input, config)

        # First pass: every curve of this asset class must pass validation
        # before any of them is written
        curve_rules = config['data_quality'][f'{asset_class}_curves']
        for name, frame in built_curves.items():
            is_valid = validator.validate_market_data(frame, curve_rules)
            if not is_valid:
                raise ValueError(f"Curve validation failed for {name}")

        # Second pass: persist each curve to its own date-suffixed table
        for name, frame in built_curves.items():
            catalog = config['catalog']
            curves_schema = config['schemas']['curves']
            date_suffix = trade_date.strftime('%Y%m%d')
            target_table = f"{catalog}.{curves_schema}.{name}_{date_suffix}"
            writer.write_output(spark, frame, target_table)

        results_by_asset_class[asset_class] = built_curves
        logger.info(f"Successfully processed and stored curves for {asset_class}")

    return results_by_asset_class

# Process curves
processed_curves = process_curves(spark, market_data, trade_date, config)
logger.info("Curve processing complete")

In [None]:
# Generate monitoring metrics and send notifications
@handle_exception("Error generating monitoring metrics")
def generate_monitoring_metrics(spark: SparkSession, processed_curves: dict[str, dict[str, DataFrame]], trade_date: datetime, config: dict[str, Any]) -> None:
    """Generate monitoring metrics and send notifications
    
    Args:
        spark: Spark session
        processed_curves: Nested dictionary mapping asset class to curve name to curve DataFrame
        trade_date: Date of curve construction
        config: Configuration dictionary
    """
    logger.info("Generating monitoring metrics")
    
    def calculate_curve_metrics(curve_df: DataFrame) -> dict[str, float]:
        """Pure function to calculate metrics for a single curve
        
        Args:
            curve_df: DataFrame containing curve data
            
        Returns:
            Dictionary containing calculated metrics
        """
        stats = curve_df.agg(
            F.min('tenor').alias('min_tenor'),
            F.max('tenor').alias('max_tenor'),
            F.min('value').alias('min_value'),
            F.max('value').alias('max_value'),
            F.count('*').alias('points_count')
        ).collect()[0]
        
        return {
            'points_count': stats['points_count'],
            'min_tenor': float(stats['min_tenor']),
            'max_tenor': float(stats['max_tenor']),
            'min_value': float(stats['min_value']),
            'max_value': float(stats['max_value'])
        }
    
    metrics = {}
    for asset_class, curves in processed_curves.items():
        metrics[asset_class] = {
            'curve_count': len(curves),
            'timestamp': datetime.now().isoformat(),
            'trade_date': trade_date.strftime('%Y-%m-%d')
        }
        
        # Calculate curve-specific metrics using pure function
        metrics[asset_class].update({
            curve_name: calculate_curve_metrics(curve_data)
            for curve_name, curve_data in curves.items()
        })
    
    # Write metrics to monitoring table
    metrics_df = spark.createDataFrame([metrics])
    metrics_df.write.format('delta').mode('append').saveAsTable(
        f"{config['catalog']}.{config['schemas']['monitoring']}.curve_metrics"
    )
    
    # Send notification if configured
    if notification_email:
        send_notification(
            notification_email,
            f"Curve Formation Pipeline Complete - {trade_date.strftime('%Y-%m-%d')}",
            f"Processed curves for asset classes: {', '.join(processed_curves.keys())}\n"
            f"Metrics: {metrics}"
        )
    
    logger.info("Monitoring metrics generated and notifications sent")

# Generate metrics and send notifications
generate_monitoring_metrics(spark, processed_curves, trade_date, config)
logger.info("Pipeline execution complete")

In [None]:
def send_notification(email: str, subject: str, body: str) -> None:
    """Send email notification
    
    Args:
        email: Recipient email address
        subject: Email subject
        body: Email body text
    """
    try:
        # Here you would implement your email sending logic
        # For example using AWS SES, SMTP, or other email service
        logger.info(f"Sending notification to {email}")
        logger.info(f"Subject: {subject}")
        logger.info(f"Body: {body}")
        # TODO: Implement actual email sending
        pass
    except Exception as e:
        logger.error(f"Failed to send notification: {str(e)}")