In [19]:
import pandas as pd
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.colors
import os
import glob
import logging
from pathlib import Path
from typing import Optional, List, Dict

# --- Configuration ---
# Plotly template name passed to every figure's update_layout() call below.
PLOTLY_TEMPLATE = "plotly_dark"
# Directory searched for '*_backtest_*' result folders (relative to the working directory).
BASE_DATA_DIR = os.path.join("data_infra", "data")
RISK_FREE_RATE = 0.02  # Annual risk-free rate for Sharpe/Sortino calculations

# Configure logging: root logger at INFO, lines formatted as "<time> - <level> - <message>".
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- Helper Functions (Retained) ---

def find_latest_backtest_dir(base_dir: str) -> Optional[str]:
    """Returns the backtest directory with the newest modification time.

    Only immediate sub-directories of ``base_dir`` whose name contains
    ``_backtest_`` are candidates.

    Args:
        base_dir: Directory that holds the backtest result folders.

    Returns:
        The selected directory as a string, or ``None`` when ``base_dir`` does
        not exist, contains no matching directory, or any other error occurs.
        Failures are logged and never raised.
    """
    try:
        candidates = []
        for entry in Path(base_dir).iterdir():
            if entry.is_dir() and '_backtest_' in entry.name:
                candidates.append(entry)

        if candidates:
            # "Latest" is decided by filesystem modification time, not by name.
            newest = max(candidates, key=lambda entry: entry.stat().st_mtime)
            return str(newest)

        logging.warning(f"No directories matching '*_backtest_*' found in {base_dir}")
        return None
    except FileNotFoundError:
        logging.error(f"Base data directory not found: {base_dir}")
        return None
    except Exception as e:
        logging.error(f"Error finding latest backtest directory: {e}", exc_info=True)
        return None

def load_csv(file_path: str, index_col=None) -> Optional[pd.DataFrame]:
    """Reads a CSV file into a DataFrame, logging problems instead of raising.

    A column named 'timestamp' is converted to datetimes; if there is no such
    column but the index is named 'timestamp', the index is converted instead.
    Values that cannot be parsed become NaT.

    Args:
        file_path: Location of the CSV file.
        index_col: Forwarded to ``pandas.read_csv``.

    Returns:
        The loaded DataFrame, an empty DataFrame for a zero-byte file, or
        ``None`` when the file is missing or cannot be parsed.
    """
    csv_path = Path(file_path)

    if not csv_path.is_file():
        logging.warning(f"CSV file not found: {file_path}")
        return None

    if csv_path.stat().st_size == 0:
        logging.warning(f"CSV file is empty: {file_path}")
        return pd.DataFrame()

    try:
        frame = pd.read_csv(file_path, index_col=index_col)
        timestamp_is_column = 'timestamp' in frame.columns
        if timestamp_is_column:
            frame['timestamp'] = pd.to_datetime(frame['timestamp'], errors='coerce')
        elif frame.index.name == 'timestamp':
            frame.index = pd.to_datetime(frame.index, errors='coerce')
    except Exception as e:
        logging.error(f"Error loading CSV file {file_path}: {e}", exc_info=True)
        return None
    return frame

# --- Metrics Calculation Engine ---

def calculate_all_metrics(
    df_abs: pd.DataFrame,
    df_trades: pd.DataFrame,
    risk_free_rate: float = RISK_FREE_RATE
) -> Dict[str, float]:
    """Calculates a comprehensive set of annualized performance and risk metrics.

    Args:
        df_abs: Performance time series with a 'timestamp' column and a
            'portfolio_value' column (the equity curve).
        df_trades: Trade log with 'shares', 'fill_price' and 'signal_type'
            columns, or ``None``. It is only read; no column is added to it.
        risk_free_rate: Annual risk-free rate used for the Sharpe and Sortino
            ratios.

    Returns:
        Mapping of metric name to value. The mapping is empty when no equity
        curve can be built (``df_abs`` is ``None``/empty, lacks a required
        column, or every portfolio value is NaN). 'Profit Factor' is present
        only when a non-empty trade log with the required columns is given, and
        'Number of Trades' only when ``df_trades`` is not ``None``.
    """
    metrics: Dict[str, float] = {}
    if df_abs is None or df_abs.empty:
        return metrics
    if 'timestamp' not in df_abs.columns or 'portfolio_value' not in df_abs.columns:
        return metrics

    equity_curve = df_abs.set_index('timestamp')['portfolio_value'].dropna()
    if equity_curve.empty:
        # Every portfolio value was NaN, so there is no first/last equity to compare.
        return metrics

    # NOTE(review): the 252-period annualisation below assumes one row per
    # trading day - confirm against the producer of df_abs.
    daily_returns = equity_curve.pct_change().dropna()

    # --- Profitability ---
    initial_capital = equity_curve.iloc[0]
    final_equity = equity_curve.iloc[-1]
    net_profit = final_equity - initial_capital
    metrics['Net Profit'] = net_profit
    metrics['Cumulative Returns'] = (final_equity / initial_capital) - 1
    total_days = (equity_curve.index[-1] - equity_curve.index[0]).days
    if total_days > 0:
        metrics['Annualised Return'] = (1 + metrics['Cumulative Returns']) ** (365.25 / total_days) - 1
    else:
        metrics['Annualised Return'] = 0

    if df_trades is not None and not df_trades.empty:
        if {'shares', 'fill_price', 'signal_type'}.issubset(df_trades.columns):
            # Signed cash flow per fill (SELL positive, anything else negative),
            # kept in a local Series so the caller's DataFrame is not mutated.
            direction = np.where(df_trades['signal_type'] == 'SELL', 1, -1)
            trade_pnl = df_trades['shares'] * df_trades['fill_price'] * direction
            gross_profit = trade_pnl[trade_pnl > 0].sum()
            gross_loss = abs(trade_pnl[trade_pnl < 0].sum())
            metrics['Profit Factor'] = gross_profit / gross_loss if gross_loss > 0 else np.inf
        else:
            logging.warning("Trade log lacks 'shares', 'fill_price' or 'signal_type'; skipping Profit Factor.")
    metrics['Return on Investment (ROI)'] = net_profit / initial_capital if initial_capital > 0 else 0

    # --- Risk ---
    rolling_max = equity_curve.cummax()
    drawdown = (equity_curve - rolling_max) / rolling_max
    metrics['Maximum Drawdown'] = drawdown.min()
    annual_volatility = daily_returns.std() * np.sqrt(252)
    metrics['Volatility (Annualised)'] = annual_volatility
    # A NaN volatility (fewer than two returns) fails this test and takes the zero branch.
    if annual_volatility > 0:
        daily_rf = (1 + risk_free_rate) ** (1 / 252) - 1
        excess_returns = daily_returns - daily_rf
        mean_excess = excess_returns.mean()
        metrics['Sharpe Ratio (Annualised)'] = (mean_excess / excess_returns.std()) * np.sqrt(252)
        downside_std = excess_returns[excess_returns < 0].std()
        if downside_std > 0:
            metrics['Sortino Ratio (Annualised)'] = (mean_excess / downside_std) * np.sqrt(252)
        else:
            metrics['Sortino Ratio (Annualised)'] = np.inf
        if metrics['Maximum Drawdown'] < 0:
            metrics['Calmar Ratio'] = metrics['Annualised Return'] / abs(metrics['Maximum Drawdown'])
        else:
            metrics['Calmar Ratio'] = np.inf
    else:
        metrics.update({'Sharpe Ratio (Annualised)': 0, 'Sortino Ratio (Annualised)': 0, 'Calmar Ratio': 0})

    # --- Win/loss statistics over the return series ---
    winning_days = daily_returns[daily_returns > 0]
    losing_days = daily_returns[daily_returns < 0]
    metrics['Win Rate (% of Days)'] = len(winning_days) / len(daily_returns) if len(daily_returns) > 0 else 0
    metrics['Average Win per Day'] = winning_days.mean()
    metrics['Average Loss per Day'] = losing_days.mean()
    # The mean of an empty Series is NaN, so emptiness is tested explicitly:
    # comparing that NaN with 0 would always succeed and yield a NaN ratio.
    if winning_days.empty:
        metrics['Payoff Ratio'] = 0.0
    elif losing_days.empty:
        metrics['Payoff Ratio'] = np.inf
    else:
        metrics['Payoff Ratio'] = winning_days.mean() / abs(losing_days.mean())

    if df_trades is not None:
        metrics['Number of Trades'] = len(df_trades)
    return metrics

# --- Visualization Suite ---

def display_metrics_table(metrics: dict, title="Backtest Performance Metrics"):
    """Displays the calculated metrics in a three-column Plotly table.

    Metrics are grouped under fixed categories; names missing from ``metrics``
    are skipped. The category label is written on the first row that is
    actually shown for that category, so it stays visible even when the
    first-listed metric is absent.

    Value formatting is chosen from the metric name: percentage for return,
    rate, drawdown, volatility and win/loss metrics; two decimals for ratios
    and factors; dollars for the remaining profit metrics; an integer
    otherwise.

    Args:
        metrics: Mapping of metric name to numeric value.
        title: Figure title (rendered in bold).
    """
    categories = {
        'Profitability': ['Net Profit', 'Cumulative Returns', 'Annualised Return', 'Profit Factor', 'Return on Investment (ROI)'],
        'Risk (Annualised)': ['Maximum Drawdown', 'Volatility (Annualised)', 'Sharpe Ratio (Annualised)', 'Sortino Ratio (Annualised)', 'Calmar Ratio'],
        'Daily Win/Loss': ['Win Rate (% of Days)', 'Average Win per Day', 'Average Loss per Day', 'Payoff Ratio'],
        'Efficiency': ['Number of Trades']
    }
    header = ['Category', 'Metric', 'Value']
    percent_markers = ['Return', 'Rate', 'Drawdown', 'Volatility', 'Win', 'Loss', 'ROI']
    ratio_markers = ['Ratio', 'Factor']

    category_cells, metric_cells, value_cells = [], [], []
    for cat, metric_names in categories.items():
        # Filter first so the label position refers to displayed rows only.
        present_names = [name for name in metric_names if name in metrics]
        for position, name in enumerate(present_names):
            category_cells.append(f"<b>{cat}</b>" if position == 0 else "")
            metric_cells.append(name)
            val = metrics[name]
            # The order of these checks matters: e.g. 'Profit Factor' must be
            # matched as a ratio before the generic 'Profit' dollar format.
            if any(s in name for s in percent_markers):
                formatted_val = f"{val:.2%}"
            elif any(s in name for s in ratio_markers):
                formatted_val = f"{val:.2f}"
            elif 'Profit' in name:
                formatted_val = f"${val:,.2f}"
            else:
                formatted_val = f"{val:,.0f}"
            value_cells.append(formatted_val)

    fig = go.Figure(data=[go.Table(
        header=dict(values=[f"<b>{h}</b>" for h in header], fill_color='#2c3e50', align='left', font=dict(color='white', size=14)),
        cells=dict(values=[category_cells, metric_cells, value_cells], fill_color='#34495e', align='left', font=dict(color='white', size=12), height=30)
    )])
    fig.update_layout(title_text=f"<b>{title}</b>", title_x=0.5, template=PLOTLY_TEMPLATE, height=600)
    fig.show()

def plot_performance_and_drawdown(df_abs: pd.DataFrame, df_trades: Optional[pd.DataFrame], title_suffix=""):
    """Creates a combined plot showing the equity curve and drawdown series.

    The top panel shows portfolio value (left axis) and return relative to the
    first value (right axis), with buy/sell markers placed on the return line.
    The bottom panel shows the drawdown from the running maximum.

    Args:
        df_abs: Performance time series with 'timestamp' and 'portfolio_value'
            columns. When it is ``None``, empty or lacks either column, a
            warning is logged and nothing is plotted.
        df_trades: Optional trade log with 'timestamp' and 'signal_type'
            columns; rows whose signal is 'BUY' or 'SELL' are marked.
        title_suffix: Text appended to the figure title.
    """
    # Validate before building the figure so unusable input costs nothing.
    if (
        df_abs is None
        or df_abs.empty
        or 'portfolio_value' not in df_abs.columns
        or 'timestamp' not in df_abs.columns
    ):
        logging.warning("Portfolio value data is missing or empty. Skipping performance and drawdown plot.")
        return

    fig = make_subplots(
        rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.05, row_heights=[0.7, 0.3],
        specs=[[{"secondary_y": True}], [{"secondary_y": False}]]
    )
    equity_curve = df_abs.set_index('timestamp')['portfolio_value']
    pct_returns = (equity_curve / equity_curve.iloc[0] - 1)
    fig.add_trace(
        go.Scatter(x=equity_curve.index, y=equity_curve, mode='lines', name='Portfolio Value ($)',
                   line=dict(color='#3498db', width=2.5)),
        row=1, col=1, secondary_y=False
    )
    fig.add_trace(
        go.Scatter(x=pct_returns.index, y=pct_returns, mode='lines', name='Return (%)',
                   line=dict(color='#f1c40f', width=1.5, dash='dash')),
        row=1, col=1, secondary_y=True
    )

    has_trades = df_trades is not None and not df_trades.empty
    if has_trades and {'signal_type', 'timestamp'}.issubset(df_trades.columns):
        buys = df_trades[df_trades['signal_type'] == 'BUY']
        sells = df_trades[df_trades['signal_type'] == 'SELL']
        # method='pad' places each marker at the most recent return value at or
        # before the trade time (reindex with a fill method needs a sorted index).
        fig.add_trace(
            go.Scatter(x=buys['timestamp'], y=pct_returns.reindex(buys['timestamp'], method='pad'),
                       mode='markers', name='Buy',
                       marker=dict(color='#2ecc71', size=8, symbol='triangle-up')),
            row=1, col=1, secondary_y=True
        )
        fig.add_trace(
            go.Scatter(x=sells['timestamp'], y=pct_returns.reindex(sells['timestamp'], method='pad'),
                       mode='markers', name='Sell',
                       marker=dict(color='#e74c3c', size=8, symbol='triangle-down')),
            row=1, col=1, secondary_y=True
        )
    elif has_trades:
        logging.warning("Trade log lacks 'signal_type' or 'timestamp'; trade markers are omitted.")

    rolling_max = equity_curve.cummax()
    drawdown = (equity_curve - rolling_max) / rolling_max
    fig.add_trace(
        go.Scatter(x=drawdown.index, y=drawdown, fill='tozeroy', mode='lines', name='Drawdown',
                   line=dict(color='#e74c3c', width=1)),
        row=2, col=1
    )

    fig.update_layout(
        title_text=f"<b>Performance and Drawdown Analysis{title_suffix}</b>", title_x=0.5,
        template=PLOTLY_TEMPLATE,
        legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1),
        hovermode="x unified", height=700
    )
    fig.update_yaxes(title_text="Portfolio Value ($)", row=1, col=1, secondary_y=False)
    fig.update_yaxes(title_text="Return (%)", row=1, col=1, secondary_y=True, tickformat=".2%")
    fig.update_yaxes(title_text="Drawdown", row=2, col=1, secondary_y=False, tickformat=".2%")
    fig.show()

def plot_stacked_portfolio_composition(df_abs: pd.DataFrame, title_suffix=""):
    """Draws the portfolio's per-column values as a stacked area chart.

    Every column of ``df_abs`` other than 'timestamp', 'portfolio_value' and
    'pnl_pct' is treated as one ticker's value series and stacked over time.

    Args:
        df_abs: Performance time series holding a 'timestamp' column plus one
            value column per ticker.
        title_suffix: Text appended to the figure title.
    """
    fig = go.Figure()
    excluded = ('timestamp', 'portfolio_value', 'pnl_pct')
    ticker_cols = [col for col in df_abs.columns if col not in excluded]

    # NOTE(review): a freshly constructed Template presumably carries no
    # colorway, so the qualitative Plotly palette is what normally applies - confirm.
    palette = go.layout.Template().layout.colorway
    if palette is None:
        palette = plotly.colors.qualitative.Plotly

    for position, col in enumerate(ticker_cols):
        area_trace = go.Scatter(
            x=df_abs['timestamp'],
            y=df_abs[col],
            mode='lines',
            name=f'{col} Value',
            stackgroup='one',
            line=dict(width=0.5, color=palette[position % len(palette)]),
            hovertemplate=f"<b>{col}</b><br>Value: $%{{y:,.2f}}<br>Date: %{{x}}<extra></extra>",
        )
        fig.add_trace(area_trace)

    fig.update_layout(
        title_text=f"<b>Portfolio Composition Over Time{title_suffix}</b>",
        title_x=0.5,
        xaxis_title="Timestamp",
        yaxis_title="Total Portfolio Value ($)",
        template=PLOTLY_TEMPLATE,
        hovermode="x unified",
        legend_title_text='Tickers',
    )
    fig.show()
    
def plot_rolling_volatility(results_dir: str, title_suffix=""):
    """Plots rolling volatility for multiple time windows for the portfolio and tickers.

    Each ``*D_Rolling.csv`` file in ``results_dir`` contributes one window. The
    window label is the file name without the ``_Rolling.csv`` suffix. The
    portfolio series is read from ``portfolio_pct_ret_vol_<window>`` and every
    other column ending in ``_vol_<window>`` is added as a ticker series that
    is hidden until selected in the legend. Files that cannot be loaded, are
    empty, or have no 'timestamp' column are skipped.

    Args:
        results_dir: Backtest result directory to search.
        title_suffix: Text appended to the figure title.
    """
    all_rolling_files = sorted(glob.glob(os.path.join(results_dir, "*D_Rolling.csv")))
    if not all_rolling_files:
        logging.warning(f"No '*D_Rolling.csv' files found in {results_dir}. Skipping rolling volatility plot.")
        return

    fig = go.Figure()
    colors = plotly.colors.qualitative.Vivid
    # The colour follows the file's position in the sorted list, so a skipped
    # file does not shift the colours of the windows after it.
    for i, file_path in enumerate(all_rolling_files):
        df_rolling = load_csv(file_path)
        if df_rolling is None or df_rolling.empty:
            continue
        if 'timestamp' not in df_rolling.columns:
            logging.warning(f"Rolling file has no 'timestamp' column, skipping: {file_path}")
            continue

        # str.split always returns at least one element, so no exception
        # handling is needed to extract the window label.
        window = os.path.basename(file_path).split('_Rolling.csv')[0]
        window_suffix = f'_vol_{window.lower()}'
        color = colors[i % len(colors)]

        portfolio_vol_col = f'portfolio_pct_ret_vol_{window.lower()}'
        if portfolio_vol_col in df_rolling.columns:
            fig.add_trace(go.Scatter(
                x=df_rolling['timestamp'], y=df_rolling[portfolio_vol_col], mode='lines',
                name=f'Portfolio Vol ({window})', line=dict(color=color, width=2.5)
            ))

        ticker_vol_cols = [c for c in df_rolling.columns if c.endswith(window_suffix) and c != portfolio_vol_col]
        for col in ticker_vol_cols:
            ticker_name = col.split('_pct_ret_vol_')[0]
            fig.add_trace(go.Scatter(
                x=df_rolling['timestamp'], y=df_rolling[col], mode='lines',
                name=f'{ticker_name} Vol ({window})', line=dict(color=color, width=1, dash='dot'),
                visible='legendonly'
            ))

    if not fig.data:
        logging.warning("No rolling volatility series found. Skipping rolling volatility plot.")
        return

    fig.update_layout(
        title_text=f"<b>Rolling Volatility Comparison{title_suffix}</b>", title_x=0.5,
        xaxis_title="Timestamp", yaxis_title="Volatility (Std. Dev of Daily Returns)",
        template=PLOTLY_TEMPLATE, hovermode="x unified", legend_title_text='Time Window'
    )
    fig.show()

def plot_var_analysis(df_abs: pd.DataFrame, title_suffix="", window=252, quantile=0.05):
    """Plots portfolio returns as markers together with a rolling VaR line.

    Returns are the percentage changes of 'portfolio_value' indexed by
    'timestamp'; the VaR line is the rolling ``quantile`` of those returns over
    ``window`` observations. Nothing is drawn when ``df_abs`` is ``None`` or empty.

    Args:
        df_abs: Performance time series with 'timestamp' and 'portfolio_value'.
        title_suffix: Text appended to the figure title.
        window: Number of observations in the rolling window.
        quantile: Lower-tail quantile used as the VaR level.
    """
    if df_abs is None or df_abs.empty:
        return

    portfolio_series = df_abs.set_index('timestamp')['portfolio_value']
    daily_returns = portfolio_series.pct_change().dropna()
    rolling_var = daily_returns.rolling(window=window).quantile(quantile)

    var_trace = go.Scatter(
        x=rolling_var.index,
        y=rolling_var,
        mode='lines',
        name=f'Rolling VaR {1-quantile:.0%} ({window}-Day)',
        line=dict(color='red', width=2, dash='dash'),
    )
    returns_trace = go.Scatter(
        x=daily_returns.index,
        y=daily_returns,
        mode='markers',
        name='Daily Returns',
        marker=dict(color='rgba(52, 152, 219, 0.5)', size=5),
    )

    fig = go.Figure()
    fig.add_trace(var_trace)
    fig.add_trace(returns_trace)
    fig.update_layout(
        title_text=f"<b>Value at Risk (VaR) Analysis{title_suffix}</b>",
        title_x=0.5,
        xaxis_title="Timestamp",
        yaxis_title="Daily Return",
        template=PLOTLY_TEMPLATE,
        legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1),
        yaxis=dict(tickformat=".2%"),
    )
    fig.show()

def plot_correlation_heatmap(results_dir: str, title_suffix=""):
    """Renders the ticker return correlation matrix as an annotated heatmap.

    The matrix is read from ``ticker_return_correlations.csv`` inside
    ``results_dir``, using the first CSV column as the row labels. When the
    file is missing or empty a warning is logged and nothing is drawn.

    Args:
        results_dir: Backtest result directory containing the correlation file.
        title_suffix: Text appended to the figure title.
    """
    df_corr = load_csv(
        os.path.join(results_dir, "ticker_return_correlations.csv"),
        index_col=0,
    )
    if df_corr is None or df_corr.empty:
        logging.warning("Correlation file not found or is empty. Skipping correlation heatmap.")
        return

    corr_values = df_corr.values
    heatmap = go.Heatmap(
        z=corr_values,
        x=df_corr.columns,
        y=df_corr.index,
        colorscale='RdBu',
        zmid=0,  # zero correlation sits at the middle of the colour scale
        text=np.around(corr_values, 2),
        texttemplate="%{text}",
        hoverongaps=False,
    )

    fig = go.Figure(data=heatmap)
    fig.update_layout(
        title_text=f"<b>Ticker Return Correlation Matrix{title_suffix}</b>",
        title_x=0.5,
        template=PLOTLY_TEMPLATE,
        yaxis_autorange='reversed',
    )
    fig.show()

def plot_intraday_volatility(df_abs: pd.DataFrame, title_suffix=""):
    """Calculates and plots the volatility of interval returns for each hour of the day.

    Interval returns are the percentage changes of every numeric value column
    of ``df_abs`` except 'pnl_pct'. Volatility is the standard deviation of
    those returns grouped by the hour of their timestamp. The portfolio bar
    series is shown; each remaining column is added hidden (legend only).

    Nothing is drawn (a warning is logged) when ``df_abs`` is ``None`` or
    empty, has no 'timestamp' column, has non-datetime timestamps, or has no
    usable numeric value column.

    Args:
        df_abs: Performance time series with a datetime 'timestamp' column,
            'portfolio_value' and per-ticker value columns.
        title_suffix: Text appended to the figure title.
    """
    if df_abs is None or df_abs.empty:
        logging.warning("Absolute performance data is empty. Skipping intraday volatility plot.")
        return
    if 'timestamp' not in df_abs.columns:
        logging.warning("Absolute performance data has no 'timestamp' column. Skipping intraday volatility plot.")
        return

    df_intra = df_abs.set_index('timestamp')
    if not isinstance(df_intra.index, pd.DatetimeIndex):
        logging.warning("Timestamps are not datetimes. Skipping intraday volatility plot.")
        return

    # 'pnl_pct' is excluded here as it is in the composition chart: it is not a
    # value series, so its percentage change is not a return.
    value_cols = [col for col in df_intra.select_dtypes(include='number').columns if col != 'pnl_pct']
    if not value_cols:
        logging.warning("No numeric value columns found. Skipping intraday volatility plot.")
        return

    # Calculate interval returns for all value columns. Infinite returns (a
    # change from exactly zero) are treated as missing so they cannot distort
    # the standard deviation.
    returns_intra = df_intra[value_cols].pct_change().replace([np.inf, -np.inf], np.nan)
    # Drop only rows with no return at all (e.g. the first row). A gap in one
    # column must not discard the other columns' observations; std() below
    # skips missing values column by column.
    returns_intra = returns_intra.dropna(how='all')

    # Get hour of the day
    returns_intra['hour'] = returns_intra.index.hour

    # Calculate hourly volatility (std dev of returns)
    hourly_vol = returns_intra.groupby('hour').std()

    fig = go.Figure()

    # Plot Portfolio Hourly Volatility
    if 'portfolio_value' in hourly_vol.columns:
        fig.add_trace(go.Bar(
            x=hourly_vol.index,
            y=hourly_vol['portfolio_value'],
            name='Portfolio Volatility'
        ))

    # Plot Ticker Hourly Volatility (hidden by default)
    ticker_cols = [col for col in hourly_vol.columns if col not in ['portfolio_value', 'hour']]
    for col in ticker_cols:
        fig.add_trace(go.Bar(
            x=hourly_vol.index,
            y=hourly_vol[col],
            name=f'{col} Volatility',
            visible='legendonly'
        ))

    fig.update_layout(
        title_text=f"<b>Average Intraday Volatility by Hour{title_suffix}</b>", title_x=0.5,
        xaxis_title="Hour of the Day",
        yaxis_title="Volatility (Std. Dev of Interval Returns)",
        template=PLOTLY_TEMPLATE,
        xaxis=dict(tickmode='linear', dtick=1),
        legend_title_text='Series'
    )
    fig.show()

# --- Main Execution Block ---

# Script entry point: locate the newest backtest folder under BASE_DATA_DIR,
# load its CSV outputs and show each figure in turn. The variables assigned
# here are module-level globals.
if __name__ == "__main__":
    logging.info("--- Starting Comprehensive Backtest Visualization Script ---")
    target_dir = find_latest_backtest_dir(BASE_DATA_DIR)

    if not target_dir:
        logging.error("No valid backtest directory found. Exiting.")
    else:
        logging.info(f"Visualizing results from: {target_dir}")

        # load_csv returns None for a missing/unreadable file and an empty
        # DataFrame for a zero-byte file.
        df_abs = load_csv(os.path.join(target_dir, "performance_timeseries_absolute.csv"))
        df_trades = load_csv(os.path.join(target_dir, "trade_log.csv"))
        # Loaded for the monthly heatmap in step 8, which is currently commented out.
        df_monthly = load_csv(os.path.join(target_dir, "monthly_returns.csv"))

        if df_abs is None:
            logging.error("Could not load essential performance data (performance_timeseries_absolute.csv). Exiting.")
        else:
            # The text after '_backtest_' in the folder name labels every figure title.
            portfolio_id = os.path.basename(target_dir).split('_backtest_')[-1]
            title_suffix = f" (Portfolio: {portfolio_id})"

            # 1. Display Full Metrics Table
            metrics = calculate_all_metrics(df_abs, df_trades)
            display_metrics_table(metrics, title=f"Comprehensive Performance Metrics{title_suffix}")

            # 2. Display Combined Performance & Drawdown Plot
            plot_performance_and_drawdown(df_abs, df_trades, title_suffix)

            # 3. Display Stacked Portfolio Composition Plot
            plot_stacked_portfolio_composition(df_abs, title_suffix)

            # 4. Display Rolling Volatility Plot
            plot_rolling_volatility(target_dir, title_suffix)

            # 5. Display Value at Risk (VaR) Plot
            plot_var_analysis(df_abs, title_suffix)

            # 6. Display Ticker Correlation Heatmap
            plot_correlation_heatmap(target_dir, title_suffix)

            # 7. Display Intraday Volatility Plot
            plot_intraday_volatility(df_abs, title_suffix)

            # 8. Display Monthly Returns Heatmap (if data exists)
            # Disabled: plot_monthly_heatmap is not defined in the visible part of this file.
            # if df_monthly is not None:
            #     plot_monthly_heatmap(df_monthly, title=f"Monthly Returns (%){title_suffix}")

    # Logged on every path, including the early-exit error paths above.
    logging.info("--- Visualization Script Finished ---")

2025-06-22 00:28:25,420 - INFO - --- Starting Comprehensive Backtest Visualization Script ---
2025-06-22 00:28:25,421 - INFO - Visualizing results from: data_infra/data/20250622_002158_backtest_1


2025-06-22 00:28:26,832 - INFO - --- Visualization Script Finished ---
