# 📊 KSA Transition Risk Analysis — Cement Sector Model

**Author:** Saveeza Aziz  
**Version:** 2.0 (Production-Ready)  
**Objective:** This notebook performs a quantitative stress test on the Saudi Arabian cement sector to model the financial impact of carbon pricing. It serves as the analytical engine for the findings presented in the main article.

### Pipeline Stages:
1. **Setup & Configuration:** Loads all dependencies, establishes a robust project path, and loads the master `config.yaml` file.
2. **Data Loading & Validation:** Ingests and validates all source data, ensuring integrity before processing.
3. **Analysis Functions:** Defines the core vectorized functions for the stress test.
4. **Main Execution:** Orchestrates the full pipeline from loading to saving results.
5. **Self-Testing:** Includes a simple smoke test to ensure the pipeline runs without error.

---

## 1. Setup & Preliminaries

This cell handles all initial setup, including importing libraries, configuring the professional logging framework, and robustly locating the project's root directory and configuration file.

In [None]:
# --- Dependency Guard ---
try:
    import pandas as pd
    import numpy as np
    from pathlib import Path
    import sys
    import os
    import logging
    from datetime import datetime, timezone
    import yaml
    from typing import Dict, Optional, Tuple
except ImportError as e:
    print(f"FATAL ERROR: A critical library is missing: {e}. Please install requirements.")
    sys.exit(1)

# --- Conditional Import for Display ---
try:
    from IPython.display import display
    IS_JUPYTER = True
except ImportError:
    IS_JUPYTER = False
    def display(df: pd.DataFrame):
        """Fallback display function for non-Jupyter environments."""
        print(df.head(20).to_string())

# --- Centralized Constants for Magic Strings ---
class ConfigKeys:
    PATHS = 'paths'
    DATA_DIR = 'data_dir'
    RESULTS_DIR = 'results_dir'
    FILES = 'files'
    ARCHETYPES_FILE = 'archetypes_file'
    ASSUMPTIONS_FILE = 'assumptions_file'
    CONSTANTS = 'constants'
    SECTORS = 'sectors'
    CEMENT_SECTOR = 'cement'
    SHEET_NAME = 'sheet_name'
    REQUIRED_COLS = 'required_columns'
    METADATA = 'metadata'
    VERSION = 'version'

# --- Logging Configuration ---
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
if not logger.handlers:
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s')
    stream_handler = logging.StreamHandler(sys.stdout)
    stream_handler.setFormatter(formatter)
    logger.addHandler(stream_handler)

# --- Robust Path & Configuration Management ---
def find_project_root(anchor: str = '.git') -> Path:
    """Finds project root by searching upwards for an anchor, with fallbacks."""
    if 'PROJECT_ROOT' in os.environ:
        return Path(os.environ['PROJECT_ROOT'])
    path = Path.cwd()
    while path.parent != path:
        if (path / anchor).exists(): return path
        path = path.parent
    if (Path.cwd() / 'config.yaml').exists():
        return Path.cwd()
    raise FileNotFoundError("Project root not found. Set PROJECT_ROOT env var or run from repo root.")

try:
    BASE_DIR = find_project_root()
    with open(BASE_DIR / 'config.yaml', 'r') as f:
        config = yaml.safe_load(f)
        if not isinstance(config, dict):
            raise ValueError("config.yaml is empty or malformed.")

    RESULTS_DIR = BASE_DIR / config[ConfigKeys.PATHS][ConfigKeys.RESULTS_DIR]
    RESULTS_DIR.mkdir(parents=True, exist_ok=True)
    LOG_FILE_PATH = RESULTS_DIR / f"analysis_log_{datetime.now(timezone.utc).strftime('%Y%m%d')}.log"
    
    file_handler = logging.FileHandler(LOG_FILE_PATH, encoding='utf-8')
    file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s'))
    logger.addHandler(file_handler)
    
    CONSTANTS = {k: int(v) for k, v in config.get(ConfigKeys.CONSTANTS, {}).items()}
    REQUIRED_CONSTANTS = ['tons_per_megaton', 'sar_per_billion']
    missing_consts = [c for c in REQUIRED_CONSTANTS if c not in CONSTANTS]
    if missing_consts:
        raise ValueError(f"Missing required constants in config.yaml: {missing_consts}")
    
    logger.info(f"Project root found and configured: {BASE_DIR}")

except (FileNotFoundError, yaml.YAMLError, KeyError, ValueError) as e:
    logging.critical(f"FATAL ERROR during initial setup: {e}")
    raise SystemExit(1)

## 2. Data Loading & Validation Functions

These functions handle the ingestion of source files (`.xlsx`, `.csv`) and perform rigorous validation to ensure data integrity before any analysis is run. This includes checking for missing files, columns, and parameters.

In [None]:
def load_and_validate_data() -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Loads and validates all data sources based on the global config."""
    logger.info("Starting data loading and validation...")
    
    data_dir = BASE_DIR / config[ConfigKeys.PATHS][ConfigKeys.DATA_DIR]
    excel_path = data_dir / config[ConfigKeys.FILES][ConfigKeys.ARCHETYPES_FILE]
    csv_path = data_dir / config[ConfigKeys.FILES][ConfigKeys.ASSUMPTIONS_FILE]['path']
    
    if not excel_path.exists() or not csv_path.exists():
        raise FileNotFoundError("Input file(s) not found.")

    assumptions_df = pd.read_csv(csv_path, index_col='Parameter', encoding='utf-8-sig')
    if 'Value' not in assumptions_df.columns:
        raise KeyError("Column 'Value' missing in assumptions file.")
    if assumptions_df.index.duplicated().any():
        raise ValueError(f"Duplicate parameters found: {assumptions_df.index[assumptions_df.index.duplicated()].unique().tolist()}")

    xls = pd.ExcelFile(excel_path, engine='openpyxl')
    sheet_name = config[ConfigKeys.SECTORS][ConfigKeys.CEMENT_SECTOR][ConfigKeys.SHEET_NAME]
    if sheet_name not in xls.sheet_names:
        raise ValueError(f"Sheet '{sheet_name}' not found in Excel file.")
    cement_df = pd.read_excel(xls, sheet_name=sheet_name)
    
    required_cols = config[ConfigKeys.SECTORS][ConfigKeys.CEMENT_SECTOR][ConfigKeys.REQUIRED_COLS]
    if not all(col in cement_df.columns for col in required_cols):
        raise ValueError(f"Required columns missing: {set(required_cols) - set(cement_df.columns)}")
        
    logger.info("✅ Data files loaded successfully.")
    return cement_df, assumptions_df

def process_and_validate_inputs(cement_df: pd.DataFrame, assumptions_df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Cleans and validates dataframes before analysis."""
    logger.info("Processing and validating inputs...")
    
    assumptions_df['Value'] = pd.to_numeric(assumptions_df['Value'], errors='coerce')
    
    required_params = ['FX_Rate_USD_SAR', 'Carbon_Cost_Pass_Through_Rate', 'CCUS_CAPEX_Cost']
    missing_params = [p for p in required_params if p not in assumptions_df.index or pd.isna(assumptions_df.at[p, 'Value'])]
    if missing_params:
        raise ValueError(f"Missing required assumptions: {missing_params}")
    
    cement_df['Debt_to_EBITDA_Ratio'] = pd.to_numeric(cement_df['Debt_to_EBITDA_Ratio'].astype(str).replace('x', '', regex=False), errors='coerce')
    n_debt_nan = cement_df['Debt_to_EBITDA_Ratio'].isna().sum()
    if n_debt_nan > 0:
        logger.warning(f"{n_debt_nan} 'Debt_to_EBITDA_Ratio' values could not be coerced to numeric and were set to NaN.")
        if n_debt_nan / len(cement_df) > 0.3:
            raise ValueError("Over 30% of Debt_to_EBITDA_Ratio values are invalid, halting analysis.")

    cement_df['Total_Debt_SAR_bn'] = cement_df['Reported_EBITDA_SAR_bn'] * cement_df['Debt_to_EBITDA_Ratio']
    
    logger.info("✅ Inputs processed and validated successfully.")
    return cement_df, assumptions_df

## 3. Analysis & Export Functions

This section contains the core analytical engine. The `run_analysis` function performs the vectorized stress test based on the validated inputs and scenarios. The `save_results` function handles the clean export of the final dataframe.

In [None]:
def run_analysis(cement_df: pd.DataFrame, assumptions_df: pd.DataFrame) -> pd.DataFrame:
    """Runs the vectorized stress test and CAPEX analysis."""
    logger.info("Running core analysis...")
    results_df = cement_df.copy()
    
    def get_param(param_name: str, required: bool = True):
        if param_name not in assumptions_df.index or pd.isna(assumptions_df.at[param_name, 'Value']):
            if required: raise ValueError(f"Required assumption '{param_name}' is missing or invalid.")
            return None
        return assumptions_df.at[param_name, 'Value']

    fx_rate = get_param('FX_Rate_USD_SAR')
    pass_through = get_param('Carbon_Cost_Pass_Through_Rate')
    ccus_cost = get_param('CCUS_CAPEX_Cost')
    
    carbon_df = assumptions_df.filter(like='Carbon_Price_')
    if carbon_df.empty:
        raise ValueError("No 'Carbon_Price_' scenarios found in assumptions file.")
    scenarios = carbon_df['Value'].to_dict()

    with np.errstate(divide='ignore', invalid='ignore'):
        for param_name, carbon_price in scenarios.items():
            scenario_name = param_name.replace('Carbon_Price_', '').replace('_Scenario', '')
            total_cost = (results_df['Scope_1_2_Emissions_MtCO2'] * CONSTANTS['tons_per_megaton'] * carbon_price * fx_rate) / CONSTANTS['sar_per_billion']
            net_cost = total_cost * (1 - pass_through)
            
            stressed_col = f'Stressed_EBITDA_{scenario_name}'
            results_df[stressed_col] = results_df['Reported_EBITDA_SAR_bn'] - net_cost
            
            leverage = (results_df['Total_Debt_SAR_bn'] / results_df[stressed_col]).replace([np.inf, -np.inf], np.nan)
            results_df.loc[:, f'Stressed_Leverage_{scenario_name}'] = leverage

    logger.info("✅ Analysis complete.")
    return results_df

def save_results(df: pd.DataFrame, config: Dict):
    """Saves the results dataframe to a versioned and timestamped CSV."""
    version = config.get(ConfigKeys.METADATA, {}).get(ConfigKeys.VERSION, '0.0.0')
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    filename = f'cement_stress_test_results_v{version}_{timestamp}.csv'
    output_path = RESULTS_DIR / filename
    df.to_csv(output_path, index=False, encoding='utf-8-sig')
    logger.info(f"Results successfully exported to: {output_path}")

## 4. Main Execution Script

This section contains the `main()` function which orchestrates the entire pipeline, calling the validation, processing, and analysis functions in the correct order. It also handles top-level error catching.

In [None]:
def main() -> Optional[pd.DataFrame]:
    """
    Orchestrates the entire analysis pipeline from data loading to saving results.
    
    Returns:
        Optional[pd.DataFrame]: Final results dataframe if successful, else None.
    """
    try:
        df_raw, assumptions_raw = load_and_validate_data()
        df_clean, assumptions_clean = process_and_validate_inputs(df_raw, assumptions_raw)
        final_results = run_analysis(df_clean, assumptions_clean)
        save_results(final_results, config)
        
        if IS_JUPYTER:
            logger.info("Displaying first 20 rows of results in Jupyter.")
            display(final_results.head(20).fillna('Insolvent'))
        
        logger.info("--- Pipeline finished successfully. ---")
        return final_results
        
    except Exception:
        logger.exception("FATAL ERROR: Pipeline failed.")
        return None

## 5. Script Execution & Self-Testing

The final block executes the `main()` function. It also contains a simple 'smoke test' function that can be uncommented to verify the pipeline's integrity.

In [None]:
def test_pipeline_execution():
    """Simple 'smoke test' to ensure the main pipeline runs without crashing."""
    logger.info("\n--- RUNNING SMOKE TEST ---")
    result = main()
    if result is None or not isinstance(result, pd.DataFrame) or result.empty:
        raise AssertionError("Smoke Test FAILED: Pipeline did not return a valid DataFrame.")
    logger.info("✅ SMOKE TEST PASSED: Pipeline executed and returned a valid DataFrame.")

# This standard Python construct ensures main() is called only when the script is executed directly.
if __name__ == "__main__":
    # To run the main analysis:
    main_results = main()
    
    # To run the self-test (uncomment the line below and comment out the line above):
    # try:
    #     test_pipeline_execution()
    # except AssertionError as e:
    #     logger.error(e)
    #     sys.exit(1)
        
    if main_results is None:
        # Exit with a non-zero status code to indicate failure in automated environments.
        sys.exit(1)