In [2]:
# Portfolio Risk Analysis Platform
# AI-driven portfolio risk and analytics platform for professional investment managers

import pandas as pd
import numpy as np
import yfinance as yf
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
import warnings
# Silence every warning category for the rest of the kernel session so the
# notebook output stays readable.
# NOTE(review): this also hides deprecation/runtime warnings from pandas,
# numpy and yfinance -- consider narrowing the filter to specific categories.
warnings.filterwarnings('ignore')

# Set up plotting style
# NOTE(review): the 'seaborn-v0_8' style name is presumably only available on
# newer matplotlib releases -- confirm against the pinned version.
plt.style.use('seaborn-v0_8')
# Use seaborn's "husl" colour palette as the default colour cycle.
sns.set_palette("husl")


In [None]:
class PortfolioRiskAnalyzer:
    """
    AI-driven portfolio risk and analytics platform for professional investment managers.
    Provides comprehensive risk analysis, performance metrics, and scenario testing.

    Typical usage is ``load_portfolio_from_inputs`` -> ``fetch_price_data`` ->
    ``calculate_returns``; the resulting ``returns_data`` dict is what the
    metric classes in this notebook consume.

    Attributes:
        portfolio_data: DataFrame of positions read from CSV, or None until a
            portfolio is loaded. The code reads the 'symbol', 'shares' and
            'purchase_date' columns.
        price_data: dict mapping symbol -> price history DataFrame (the
            'Close' column is the only one used here).
        returns_data: dict mapping symbol -> Series of period-over-period
            returns, plus a 'PORTFOLIO' entry with the weighted portfolio
            series once ``calculate_returns`` has run.
        risk_metrics, performance_metrics: empty dicts; not populated by this
            class.
    """
    def __init__(self):
        self.portfolio_data = None
        self.price_data = {}
        self.returns_data = {}
        self.risk_metrics = {}
        self.performance_metrics = {}

    def load_portfolio_from_inputs(self, inputs_path="../inputs/portfolio1/portfolio_data.csv"):
        """
        Read the positions CSV at ``inputs_path`` into ``self.portfolio_data``.

        Returns:
            The loaded DataFrame.
        """
        self.portfolio_data = pd.read_csv(inputs_path)
        print(f"Loaded portfolio with {len(self.portfolio_data)} positions from {inputs_path}")
        return self.portfolio_data

    def fetch_price_data(self, start_date=None, end_date=None):
        """
        Download price history for every distinct symbol in the portfolio.

        Args:
            start_date: First date to request. Defaults to the minimum of the
                portfolio's 'purchase_date' column.
            end_date: Last date to request. Defaults to today (YYYY-MM-DD).

        Returns:
            ``self.price_data`` (symbol -> history DataFrame). Symbols whose
            download fails or comes back empty are reported and left out, so
            later steps never index into an empty frame.

        Raises:
            ValueError: If no portfolio has been loaded.
        """
        if self.portfolio_data is None:
            raise ValueError("No portfolio data loaded. Please load portfolio first.")

        symbols = self.portfolio_data['symbol'].unique()

        if start_date is None:
            start_date = self.portfolio_data['purchase_date'].min()

        if end_date is None:
            end_date = datetime.now().strftime('%Y-%m-%d')

        print(f"Fetching price data from {start_date} to {end_date}")

        for symbol in symbols:
            try:
                hist = yf.Ticker(symbol).history(start=start_date, end=end_date)
            except Exception as e:
                # Best effort: one failing ticker must not abort the whole fetch.
                print(f"Error fetching {symbol}: {e}")
                continue

            if hist.empty:
                # An empty frame would later break `iloc[-1]`; skip it loudly.
                print(f"Error fetching {symbol}: no price data returned")
                continue

            self.price_data[symbol] = hist
            print(f"Fetched data for {symbol}")

        return self.price_data

    def calculate_returns(self):
        """
        Calculate returns for all holdings and portfolio

        Per-symbol returns are the percentage change of the 'Close' column
        with the leading NaN dropped. The weighted portfolio series is stored
        under the 'PORTFOLIO' key.

        Returns:
            ``self.returns_data``.

        Raises:
            ValueError: If no portfolio has been loaded, or the portfolio's
                total market value is zero.
        """
        # Calculate returns for each holding
        for symbol, prices in self.price_data.items():
            self.returns_data[symbol] = prices['Close'].pct_change().dropna()

        # Calculate portfolio returns (weighted by market value)
        self.returns_data['PORTFOLIO'] = self._calculate_portfolio_returns()

        return self.returns_data

    def _calculate_portfolio_returns(self):
        """
        Calculate weighted portfolio returns based on current market values

        Weights are fixed at each holding's latest market value (total shares
        times last 'Close') divided by the total, and applied to the whole
        return history.

        Returns:
            Series of weighted returns covering only the dates on which every
            included holding has a return, so the weights always sum to 1 and
            no NaN leaks into the result. Empty Series when no holding has
            both prices and returns.

        Raises:
            ValueError: If no portfolio has been loaded, or the total market
                value is zero.
        """
        if self.portfolio_data is None:
            raise ValueError("No portfolio data loaded. Please load portfolio first.")

        # A symbol may appear in several rows (multiple lots); sum the shares
        # first so its value is counted once and weights sum to exactly 1.
        shares_by_symbol = self.portfolio_data.groupby('symbol')['shares'].sum()

        current_values = {}
        for symbol, shares in shares_by_symbol.items():
            prices = self.price_data.get(symbol)
            if prices is None or prices.empty or symbol not in self.returns_data:
                continue
            current_values[symbol] = shares * prices['Close'].iloc[-1]

        if not current_values:
            return pd.Series(dtype=float)

        total_value = sum(current_values.values())
        if total_value == 0:
            raise ValueError("Total portfolio market value is zero; cannot compute weights.")

        # Building a DataFrame aligns all holdings on the union of dates;
        # dropping incomplete rows keeps only dates where every holding traded.
        weighted_returns = pd.DataFrame({
            symbol: (value / total_value) * self.returns_data[symbol]
            for symbol, value in current_values.items()
        })

        return weighted_returns.dropna().sum(axis=1)



In [9]:
class PerformanceMetrics:
    """
    Class for calculating comprehensive performance metrics including returns,
    risk-adjusted returns, and market capture ratios.

    ``returns_data`` is a dict mapping a label to a pandas Series of periodic
    returns. If it contains a 'SPY' entry, that series is used as the
    benchmark for the capture ratios.
    """

    # Number of periods per year used to annualize return and volatility.
    TRADING_DAYS_PER_YEAR = 252

    def __init__(self, returns_data):
        self.returns_data = returns_data
        self.performance_metrics = {}

    def calculate_performance_metrics(self):
        """
        Calculate comprehensive performance metrics

        Returns:
            dict mapping each label with a non-empty return series to a dict
            with keys 'Total Return', 'Annualized Return', 'Volatility',
            'Sharpe Ratio', 'Max Drawdown', 'Upside Capture' and
            'Downside Capture'. The result is also stored on
            ``self.performance_metrics``.

        Notes:
            * The Sharpe ratio is annualized return over annualized
              volatility, with no risk-free rate subtracted.
            * Max drawdown is measured against the running peak of cumulative
              wealth *including the initial capital of 1.0*, so a loss on the
              very first period is counted.
            * Capture ratios are 0.0 for 'SPY' itself and whenever no
              benchmark or no overlapping dates are available.
        """
        periods = self.TRADING_DAYS_PER_YEAR
        metrics = {}

        for symbol, returns in self.returns_data.items():
            if len(returns) == 0:
                continue

            # Basic metrics
            total_return = (1 + returns).prod() - 1
            annualized_return = (1 + total_return) ** (periods / len(returns)) - 1
            volatility = returns.std() * np.sqrt(periods)

            # Risk-adjusted return; a NaN or zero volatility falls back to 0.0.
            sharpe_ratio = annualized_return / volatility if volatility > 0 else 0.0

            # Drawdown analysis. The wealth curve implicitly starts at 1.0
            # before the first return, so the peak can never be below 1.0.
            cumulative_returns = (1 + returns).cumprod()
            running_max = cumulative_returns.cummax().clip(lower=1.0)
            max_drawdown = (cumulative_returns / running_max - 1).min()

            upside_capture, downside_capture = self._capture_ratios(symbol, returns)

            metrics[symbol] = {
                'Total Return': total_return,
                'Annualized Return': annualized_return,
                'Volatility': volatility,
                'Sharpe Ratio': sharpe_ratio,
                'Max Drawdown': max_drawdown,
                'Upside Capture': upside_capture,
                'Downside Capture': downside_capture
            }

        self.performance_metrics = metrics
        return metrics

    def _capture_ratios(self, symbol, returns):
        """Return (upside, downside) capture of ``returns`` versus the 'SPY' series."""
        if symbol == 'SPY' or 'SPY' not in self.returns_data:
            return 0.0, 0.0

        sp500_returns = self.returns_data['SPY']
        # Compare only the dates present in both series.
        common_dates = returns.index.intersection(sp500_returns.index)
        if len(common_dates) == 0:
            return 0.0, 0.0

        aligned_returns = returns.loc[common_dates]
        aligned_sp500 = sp500_returns.loc[common_dates]

        up_mask = aligned_sp500 > 0
        down_mask = aligned_sp500 < 0

        # Ratio of mean returns on the benchmark's up (resp. down) periods.
        upside_capture = (
            aligned_returns[up_mask].mean() / aligned_sp500[up_mask].mean()
            if up_mask.any() else 0.0
        )
        downside_capture = (
            aligned_returns[down_mask].mean() / aligned_sp500[down_mask].mean()
            if down_mask.any() else 0.0
        )
        return upside_capture, downside_capture


In [10]:
class RiskMetrics:
    """
    Class for calculating comprehensive risk metrics including volatility,
    Value at Risk (VaR), Conditional VaR, beta, and correlation analysis.

    ``returns_data`` is a dict mapping a label to a pandas Series of periodic
    returns. If it contains a 'SPY' entry, that series is used as the market
    for beta and correlation.
    """

    # Number of periods per year used to annualize volatility.
    TRADING_DAYS_PER_YEAR = 252

    def __init__(self, returns_data):
        self.returns_data = returns_data
        self.risk_metrics = {}

    def calculate_risk_metrics(self):
        """
        Calculate comprehensive risk metrics and decomposition

        Returns:
            dict mapping each label with a non-empty return series to a dict
            with keys 'Volatility', 'VaR 95%', 'VaR 99%', 'CVaR 95%', 'Beta'
            and 'Correlation with S&P 500'. The result is also stored on
            ``self.risk_metrics``.

        Raises:
            ValueError: If ``returns_data`` is empty.

        Notes:
            * VaR figures are the empirical 5th / 1st percentiles of the
              return series (i.e. negative numbers for losses).
            * CVaR is the mean of the returns at or below the 95% VaR.
            * Beta and correlation are 0.0 for 'SPY' itself, when no 'SPY'
              series exists, or when fewer than two dates overlap.
        """
        if not self.returns_data:
            raise ValueError("No returns data available. Please calculate returns first.")

        risk_metrics = {}

        for symbol, returns in self.returns_data.items():
            if len(returns) == 0:
                continue

            # Historical risk metrics
            volatility = returns.std() * np.sqrt(self.TRADING_DAYS_PER_YEAR)
            var_95 = np.percentile(returns, 5)  # 95% VaR
            var_99 = np.percentile(returns, 1)  # 99% VaR
            cvar_95 = returns[returns <= var_95].mean()  # Conditional VaR

            beta, correlation = self._market_sensitivity(symbol, returns)

            risk_metrics[symbol] = {
                'Volatility': volatility,
                'VaR 95%': var_95,
                'VaR 99%': var_99,
                'CVaR 95%': cvar_95,
                'Beta': beta,
                'Correlation with S&P 500': correlation
            }

        self.risk_metrics = risk_metrics
        return risk_metrics

    def _market_sensitivity(self, symbol, returns):
        """Return (beta, correlation) of ``returns`` versus the 'SPY' series."""
        if symbol == 'SPY' or 'SPY' not in self.returns_data:
            return 0.0, 0.0

        sp500_returns = self.returns_data['SPY']
        common_dates = returns.index.intersection(sp500_returns.index)
        # Covariance and correlation are undefined for fewer than two points.
        if len(common_dates) < 2:
            return 0.0, 0.0

        aligned_returns = returns.loc[common_dates]
        aligned_sp500 = sp500_returns.loc[common_dates]

        # Take covariance and market variance from the same matrix so both
        # use the same normalisation; mixing np.cov (ddof=1) with np.var
        # (ddof=0) would inflate beta by n / (n - 1).
        cov_matrix = np.cov(aligned_returns, aligned_sp500)
        sp500_variance = cov_matrix[1, 1]
        beta = cov_matrix[0, 1] / sp500_variance if sp500_variance > 0 else 0.0

        correlation = np.corrcoef(aligned_returns, aligned_sp500)[0, 1]
        return beta, correlation


In [11]:
class ScenarioAnalysis:
    """
    Scenario analysis and stress testing over a dict of return series.

    Reads the 'PORTFOLIO' and 'SPY' entries of ``returns_data`` and, when
    supplied, the 'Beta' value stored under 'PORTFOLIO' in ``risk_metrics``.
    Covers historical extremes, a market crash, a volatility spike and
    placeholder entries for the 2008 crisis and a rate shock.
    """

    def __init__(self, returns_data, risk_metrics=None):
        self.returns_data = returns_data
        # Any falsy value (None, empty dict) is replaced by a fresh dict.
        self.risk_metrics = risk_metrics if risk_metrics else {}
        self.scenarios = {}

    def scenario_analysis(self):
        """
        Run every scenario and return the results keyed by scenario name.

        Returns an empty dict (without touching ``self.scenarios``) when there
        is no non-empty 'PORTFOLIO' series.

        Raises:
            ValueError: If ``returns_data`` is empty.
        """
        if not self.returns_data:
            raise ValueError("No returns data available. Please calculate returns first.")

        portfolio_series = self.returns_data.get('PORTFOLIO', pd.Series())
        if len(portfolio_series) == 0:
            return {}

        # Historical figures first, then the stress tests, in display order.
        outcome = {
            'Historical Worst Day': portfolio_series.min(),
            'Historical Best Day': portfolio_series.max(),
            '2008 Crisis Period': self._analyze_2008_crisis(),
            'Market Crash (-15%)': self._stress_test_market_crash(),
            'Volatility Spike (+15%)': self._stress_test_volatility_spike(),
            'Interest Rate Shock (+1%)': self._stress_test_rate_shock(),
        }

        self.scenarios = outcome
        return outcome

    def _analyze_2008_crisis(self):
        """Placeholder: needs price history covering 2008-2009."""
        return "Requires historical data from 2008-2009 period"

    def _stress_test_market_crash(self):
        """Estimate the portfolio move for a -15% market move, scaled by beta."""
        if 'SPY' not in self.returns_data:
            return "No market data available"

        market_shock = -0.15
        # Without a measured portfolio beta, assume it moves one-for-one.
        beta = self.risk_metrics.get('PORTFOLIO', {}).get('Beta', 1.0)
        return f"Estimated portfolio impact: {market_shock * beta:.2%}"

    def _stress_test_volatility_spike(self):
        """Report annualized portfolio volatility and the same figure raised by 15%."""
        if 'PORTFOLIO' not in self.returns_data:
            return "No portfolio data available"

        current_vol = self.returns_data['PORTFOLIO'].std() * np.sqrt(252)
        stressed_vol = current_vol * 1.15  # 15% increase
        return f"Current volatility: {current_vol:.2%}, Stressed: {stressed_vol:.2%}"

    def _stress_test_rate_shock(self):
        """Placeholder: needs an analysis of fixed income holdings."""
        return "Requires fixed income holdings analysis"


In [12]:

# Initialize the main portfolio analyzer
analyzer = PortfolioRiskAnalyzer()

# Load portfolio data
portfolio_data = analyzer.load_portfolio_from_inputs()

# Fetch price data
price_data = analyzer.fetch_price_data()

# Calculate returns
returns_data = analyzer.calculate_returns()

# Initialize the metric classes that only need the return series
performance_metrics = PerformanceMetrics(returns_data)
risk_metrics = RiskMetrics(returns_data)

# Calculate performance and risk metrics
perf_results = performance_metrics.calculate_performance_metrics()
risk_results = risk_metrics.calculate_risk_metrics()

# Build the scenario analysis only once the risk metrics exist, so the
# market-crash scenario uses the measured portfolio beta on its first (and
# only) run instead of being run with the default beta and then re-run.
scenario_analysis = ScenarioAnalysis(returns_data, risk_metrics=risk_results)
scenario_results = scenario_analysis.scenario_analysis()




Loaded portfolio with 16 positions from ../inputs/portfolio1/portfolio_data.csv
Fetching price data from 2023-01-01 to 2025-10-18
✓ Fetched data for AAPL
✓ Fetched data for MSFT
✓ Fetched data for GOOGL
✓ Fetched data for SPY
✓ Fetched data for QQQ
✓ Fetched data for VTI
✓ Fetched data for TSLA
✓ Fetched data for NVDA
✓ Fetched data for JPM
✓ Fetched data for BAC
✓ Fetched data for VEA
✓ Fetched data for VWO
✓ Fetched data for JNJ
✓ Fetched data for PFE
✓ Fetched data for KO
✓ Fetched data for PG


In [13]:
# Print all results
banner = "=" * 60

# The performance and risk reports share one layout: a banner, then one
# indented block per symbol with floats shown to four decimals.
for position, (heading, table) in enumerate([
    ("PORTFOLIO PERFORMANCE METRICS", perf_results),
    ("PORTFOLIO RISK METRICS", risk_results),
]):
    # Every banner after the first is preceded by a blank line.
    print(banner if position == 0 else "\n" + banner)
    print(heading)
    print(banner)
    for ticker, row in table.items():
        print(f"\n{ticker}:")
        for label, number in row.items():
            shown = f"{number:.4f}" if isinstance(number, float) else f"{number}"
            print(f"  {label}: {shown}")

# Scenario results are printed as-is (a mix of numbers and message strings).
print("\n" + banner)
print("SCENARIO ANALYSIS")
print(banner)
for scenario_label, outcome in scenario_results.items():
    print(f"{scenario_label}: {outcome}")

# Counts of what was analysed.
print("\n" + banner)
print("SUMMARY STATISTICS")
print(banner)
print(f"Total symbols analyzed: {len(perf_results)}")
print(f"Risk metrics calculated for: {len(risk_results)} symbols")
print(f"Scenarios tested: {len(scenario_results)}")

PORTFOLIO PERFORMANCE METRICS

AAPL:
  Total Return: 1.0456
  Annualized Return: 0.2939
  Volatility: 0.2615
  Sharpe Ratio: 1.1239
  Max Drawdown: -0.3336
  Upside Capture: 1.1261
  Downside Capture: 1.0714

MSFT:
  Total Return: 1.1908
  Annualized Return: 0.3262
  Volatility: 0.2344
  Sharpe Ratio: 1.3919
  Max Drawdown: -0.2373
  Upside Capture: 1.0954
  Downside Capture: 1.0049

GOOGL:
  Total Return: 1.8618
  Annualized Return: 0.4601
  Volatility: 0.3022
  Sharpe Ratio: 1.5228
  Max Drawdown: -0.2981
  Upside Capture: 1.2741
  Downside Capture: 1.0892

SPY:
  Total Return: 0.8096
  Annualized Return: 0.2380
  Volatility: 0.1558
  Sharpe Ratio: 1.5278
  Max Drawdown: -0.1876
  Upside Capture: 0
  Downside Capture: 0

QQQ:
  Total Return: 1.3216
  Annualized Return: 0.3542
  Volatility: 0.2018
  Sharpe Ratio: 1.7553
  Max Drawdown: -0.2277
  Upside Capture: 1.2878
  Downside Capture: 1.2408

VTI:
  Total Return: 0.7862
  Annualized Return: 0.2322
  Volatility: 0.1576
  Sharpe Rati