In [None]:
import os
import logging
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import shap
from sklearn.ensemble import RandomForestRegressor
from joblib import Parallel, delayed

# Constants
OUTPUT_DIR = "output"
EXCEL_RESULTS_FILE = os.path.join(OUTPUT_DIR, "fpna_results.xlsx")
SHAP_PLOT_FILE = os.path.join(OUTPUT_DIR, "shap_summary.png")

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler(os.path.join(OUTPUT_DIR, "fpna_analysis.log")),
        logging.StreamHandler()
    ]
)

def ensure_output_dir():
    """
    Ensure the output directory exists.
    """
    if not os.path.exists(OUTPUT_DIR):
        os.makedirs(OUTPUT_DIR)
        logging.info(f"Created output directory at {OUTPUT_DIR}")
    else:
        logging.info(f"Output directory exists at {OUTPUT_DIR}")

def load_dummy_data(seed: int = 42) -> pd.DataFrame:
    """
    Generate dummy FP&A data for testing.  
    Args:
        seed (int): Random seed for reproducibility.
    Returns:
        pd.DataFrame: Generated dummy data.
    """
    np.random.seed(seed)
    n = 100  # number of records
    
    data = {
        "Country": np.random.choice(["USA", "Germany", "Brazil", "India"], size=n),
        "Department": np.random.choice(["Sales", "R&D", "Finance", "Operations"], size=n),
        "Account": np.random.choice(["Revenue", "Cost", "Profit", "Investment"], size=n),
        "Budget": np.random.uniform(10000, 50000, size=n),
        "Actual": np.random.uniform(9000, 55000, size=n),
        "Month": np.random.choice(pd.date_range("2024-01-01", periods=12, freq='M')_
                                  .strftime('%Y-%m'), size=n),
        "Historical_Variance": np.random.normal(0, 0.05, size=n),  # +/- 5% variance
        "Market_Index": np.random.uniform(0.8, 1.2, size=n),
        "Operational_Efficiency": np.random.uniform(0.7, 1.1, size=n)
    }
    
    df = pd.DataFrame(data)
    logging.info(f"Dummy data loaded with shape {df.shape}")
    return df


def calculate_rule_scores(df: pd.DataFrame) -> pd.DataFrame:
    """
    Calculate rule-based anomaly scores for each record.
    
    Args:
        df (pd.DataFrame): Input FP&A data.
    
    Returns:
        pd.DataFrame: DataFrame with added anomaly score columns.
    """
    logging.info("Calculating rule-based anomaly scores.")
    df = df.copy()
    
    # Avoid division by zero
    df['Budget'] = df['Budget'].replace(0, np.nan)
    
    # Rule 1: Variance percentage
    df['Variance_Pct'] = (df['Actual'] - df['Budget']) / df['Budget']
    
    # Rule 2: Historical variance deviation
    df['Historical_Adj'] = df['Variance_Pct'] - df['Historical_Variance']
    
    # Rule 3: Market impact adjustment
    df['Market_Adj'] = df['Variance_Pct'] / df['Market_Index']
    
    # Rule 4: Operational efficiency adjustment
    df['OpEff_Adj'] = df['Variance_Pct'] / df['Operational_Efficiency']
    
    # Combine rules into a composite anomaly score (weighted sum)
    weights = {
        'Variance_Pct': 0.4,
        'Historical_Adj': 0.2,
        'Market_Adj': 0.2,
        'OpEff_Adj': 0.2
    }
    
    df['Anomaly_Score'] = (
        weights['Variance_Pct'] * df['Variance_Pct'].abs() +
        weights['Historical_Adj'] * df['Historical_Adj'].abs() +
        weights['Market_Adj'] * df['Market_Adj'].abs() +
        weights['OpEff_Adj'] * df['OpEff_Adj'].abs()
    )
    
    # Normalize anomaly score between 0 and 1
    max_score = df['Anomaly_Score'].max()
    if max_score > 0:
        df['Anomaly_Score'] = df['Anomaly_Score'] / max_score
    else:
        df['Anomaly_Score'] = 0
    
    logging.info("Rule-based anomaly scores calculated.")
    return df

def parallel_rule_scores(df: pd.DataFrame) -> pd.DataFrame:
    """
    Parallelize rule score calculation for large datasets.
    
    Args:
        df (pd.DataFrame): Input FP&A data.
    
    Returns:
        pd.DataFrame: DataFrame with anomaly scores.
    """
    logging.info("Starting parallel anomaly score calculation.")
    n_jobs = min(4, os.cpu_count() or 1)
    splits = np.array_split(df, n_jobs)
    
    results = Parallel(n_jobs=n_jobs)(
        delayed(calculate_rule_scores)(split) for split in splits
    )
    
    combined_df = pd.concat(results, axis=0).reset_index(drop=True)
    logging.info("Parallel anomaly score calculation completed.")
    return combined_df

def explain_with_shap(df: pd.DataFrame, feature_cols: 
                      list, target_col: str = "Anomaly_Score")
                        -> None:
    """
    Generate SHAP explanations and save summary plot.
    
    Args:
        df (pd.DataFrame): DataFrame containing features and target.
        feature_cols (list): List of feature column names.
        target_col (str): Name of the target column for explanation.
    """
    logging.info("Starting SHAP explainability analysis.")
    try:
        X = df[feature_cols]
        y = df[target_col]
        
        model = RandomForestRegressor(n_estimators=100, random_state=42)
        model.fit(X, y)
        
        explainer = shap.TreeExplainer(model)
        shap_values = explainer.shap_values(X)
        
        plt.figure(figsize=(10, 6))
        shap.summary_plot(shap_values, X, feature_names=feature_cols, show=False)
        plt.tight_layout()
        plt.savefig(SHAP_PLOT_FILE, dpi=300)
        plt.close()
        
        logging.info(f"SHAP summary plot saved to {SHAP_PLOT_FILE}")
    except Exception as e:
        logging.error(f"SHAP explanation failed: {e}", exc_info=True)

def save_to_excel(df: pd.DataFrame, filename: str) -> None:
    """
    Save DataFrame to Excel with conditional formatting and summary.
    
    Args:
        df (pd.DataFrame): DataFrame to save.
        filename (str): Output Excel file path.
    """
    logging.info(f"Saving results to Excel file {filename}")
    try:
        with pd.ExcelWriter(filename, engine='xlsxwriter') as writer:
            df.to_excel(writer, index=False, sheet_name='Results')
            
            workbook = writer.book
            worksheet = writer.sheets['Results']
            
            # Apply conditional formatting on Anomaly_Score column (assumed column J)
            anomaly_col_idx = df.columns.get_loc('Anomaly_Score')
            col_letter = chr(ord('A') + anomaly_col_idx)
            data_range = f"{col_letter}2:{col_letter}{len(df)+1}"
            
            worksheet.conditional_format(data_range, {
                'type': '3_color_scale',
                'min_color': "#63BE7B",
                'mid_color': "#FFEB84",
                'max_color': "#F8696B"
            })
            
            # Write summary statistics in a new sheet
            summary = df.describe()
            summary.to_excel(writer, sheet_name='Summary')
            
        logging.info(f"Excel file saved successfully: {filename}")
    except Exception as e:
        logging.error(f"Failed to save Excel file: {e}", exc_info=True)

def run_analysis() -> pd.DataFrame:
    """
    Main analysis workflow: load data, calculate anomaly scores,
    explain, and save results.
    
    Returns:
        pd.DataFrame: Final DataFrame with anomaly scores.
    """
    try:
        ensure_output_dir()
        
        # Load or generate FP&A data
        df = load_dummy_data()
        
        # Calculate anomaly scores (parallelized)
        df_scored = parallel_rule_scores(df)
        
        # Features used in scoring for SHAP explanation
        feature_cols = ['Variance_Pct', 'Historical_Adj', 'Market_Adj', 'OpEff_Adj']
        
        # Generate SHAP explanations
        explain_with_shap(df_scored, feature_cols)
        
        # Save results to Excel
        save_to_excel(df_scored, EXCEL_RESULTS_FILE)
        
        logging.info("FP&A analysis workflow completed successfully.")
        return df_scored
    except Exception as e:
        logging.error(f"FP&A analysis workflow failed: {e}", exc_info=True)
        return None

def main():
    """
    Command-line entry point for the FP&A explainable AI analysis.
    """
    logging.info("Starting FP&A Explainable AI analysis.")
    result_df = run_analysis()
    if result_df is not None:
        print("\nTop 10 anomalies detected:")
        print(result_df[['Country', 'Department', 'Account', 'Month', 'Anomaly_Score']]_
              .sort_values(
            by='Anomaly_Score', ascending=False).head(10))
    else:
        print("Analysis failed. Please check the logs for details.")

if __name__ == "__main__":
    main()
