# **Task 4: Optimize Portfolio Based on Forecast**

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from IPython.display import display
import warnings
warnings.filterwarnings("ignore")

In [2]:
# Defined functions
# -----------------------
def annualize_return_from_price_forecast(last_price, forecast_prices, horizon_days, trading_days=252):
    """
    Compute implied annual expected return from a horizon price forecast.

    The total return from last_price to the final forecast price is compounded
    to a yearly rate: (1 + total_return) ** (trading_days / horizon_days) - 1.

    - last_price: latest observed price (finite, positive scalar)
    - forecast_prices: array-like of forecast prices over horizon; only the last
      value (by position) is used. Lists, NumPy arrays (including column vectors
      of shape (n, 1)) and pandas Series with any index are accepted.
    - horizon_days: number of trading days in forecast horizon (must be > 0)
    - trading_days: number of trading days per year used for annualization
    Returns: annualized expected return (float)
    Raises: RuntimeError if an input is invalid or the computation fails
            (the original exception is attached as the cause).
    """
    try:
        horizon_days = int(horizon_days)
        if horizon_days <= 0:
            raise ValueError("horizon_days must be > 0")

        # Flatten to a 1-D float array so the final value is always taken by
        # position; `series[-1]` on a pandas Series is a label lookup and fails
        # (or relies on a deprecated fallback) depending on the index.
        prices = np.asarray(forecast_prices, dtype=float).ravel()
        if prices.size < 1:
            raise ValueError("forecast_prices must contain at least one value")

        start_price = float(last_price)
        final_price = float(prices[-1])
        if not np.isfinite(start_price) or start_price <= 0:
            raise ValueError("last_price must be a finite positive number")
        if not np.isfinite(final_price) or final_price < 0:
            raise ValueError("final forecast price must be a finite non-negative number")

        total_return = final_price / start_price - 1.0
        # Annualize assuming compounding: (1 + total_return)^(trading_days/horizon) - 1
        annual_return = (1.0 + total_return) ** (trading_days / horizon_days) - 1.0
        return float(annual_return)
    except Exception as e:
        raise RuntimeError(f"Failed to compute annualized return from forecast: {e}") from e

def compute_historical_mean_annual_return(price_series, trading_days=252):
    """
    Compute historical mean daily return and annualize it by compounding.

    - price_series: pd.Series of prices (chronologically ordered); at least two
      valid prices are needed to form one daily return
    - trading_days: number of trading days per year used for annualization
    Returns: annualized mean return (float), i.e. (1 + mean_daily) ** trading_days - 1
    Raises: RuntimeError if no daily return can be computed or the computation
            fails (the original exception is attached as the cause).
    """
    try:
        returns = price_series.pct_change().dropna()
        # Without this guard the mean of an empty series is NaN, which would be
        # returned silently and propagate into every downstream calculation.
        if returns.empty:
            raise ValueError("price_series must contain at least two valid prices")
        mean_daily = returns.mean()
        annualized = (1 + mean_daily) ** trading_days - 1
        return float(annualized)
    except Exception as e:
        raise RuntimeError(f"Failed to compute historical annual return: {e}") from e

def compute_annualized_cov_matrix(price_df, trading_days=252):
    """
    Build the annualized covariance matrix of daily returns.

    - price_df: DataFrame with columns [TSLA, BND, SPY] of daily prices (aligned)
    - trading_days: factor the daily covariance is multiplied by
    Returns: tuple (annualized covariance matrix, daily returns DataFrame)
    Raises: RuntimeError if the returns or covariance cannot be computed
    """
    try:
        # Daily percentage changes; rows containing NaN are dropped
        daily_returns = price_df.pct_change().dropna()
        annual_cov = daily_returns.cov() * trading_days
    except Exception as e:
        raise RuntimeError(f"Failed to compute covariance matrix: {e}")
    return annual_cov, daily_returns

def simulate_random_portfolios(expected_returns, cov_matrix, n_portfolios=50000, trading_days=252, rf=0.0, seed=42):
    """
    Monte Carlo simulation of random long-only portfolios.

    - expected_returns: array-like annual expected returns (length N)
    - cov_matrix: annual covariance matrix (N x N)
    - n_portfolios: number of random portfolios to draw
    - trading_days: unused; kept for backward compatibility with existing callers
    - rf: risk-free rate subtracted from the return in the Sharpe ratio
    - seed: seed for the private random generator (None for a random seed)
    Returns: DataFrame with one row per portfolio and columns
             ['ret', 'vol', 'sharpe', 'w_0', ..., 'w_{N-1}']
    Raises: ValueError if cov_matrix is not N x N
    """
    exp = np.asarray(expected_returns, dtype=float).ravel()
    cov = np.asarray(cov_matrix, dtype=float)
    n_assets = exp.size
    if cov.shape != (n_assets, n_assets):
        raise ValueError(
            f"cov_matrix must have shape ({n_assets}, {n_assets}), got {cov.shape}"
        )

    # A private legacy RandomState yields the same stream np.random.seed(seed)
    # would, but leaves the caller's global NumPy random state untouched.
    rng = np.random.RandomState(seed)

    # Random weights with sum 1 and no shorting: one row per portfolio.
    weights = rng.random_sample((n_portfolios, n_assets))
    weights /= weights.sum(axis=1, keepdims=True)

    port_ret = weights @ exp
    # Row-wise quadratic form w' * cov * w for every portfolio at once.
    port_vol = np.sqrt(((weights @ cov) * weights).sum(axis=1))
    # Sharpe ratio is defined as 0.0 where the volatility is exactly zero.
    sharpe = np.divide(port_ret - rf, port_vol,
                       out=np.zeros_like(port_vol), where=port_vol != 0)

    results = np.column_stack([port_ret, port_vol, sharpe, weights])
    cols = ['ret', 'vol', 'sharpe'] + [f'w_{i}' for i in range(n_assets)]
    return pd.DataFrame(results, columns=cols)

def find_optimal_portfolios(sim_df, asset_names):
    """
    Pick the maximum-Sharpe and minimum-volatility portfolios from a simulation.

    - sim_df: simulation DataFrame with columns 'ret', 'vol', 'sharpe' and one
      'w_<i>' weight column per asset
    - asset_names: asset labels; the i-th label is paired with column 'w_<i>'
    Returns: tuple (max_sharpe, min_vol); each is a dict with keys
             'ret', 'vol', 'sharpe' and 'weights' (asset name -> weight)
    Raises: RuntimeError if the rows or their values cannot be extracted
    """
    def _summarise(portfolio):
        # Map each positional weight column onto its asset label
        allocation = {}
        for position, asset in enumerate(asset_names):
            allocation[asset] = float(portfolio[f'w_{position}'])
        summary = {metric: float(portfolio[metric]) for metric in ('ret', 'vol', 'sharpe')}
        summary['weights'] = allocation
        return summary

    try:
        best_sharpe_row = sim_df.loc[sim_df['sharpe'].idxmax()]
        lowest_vol_row = sim_df.loc[sim_df['vol'].idxmin()]
        return _summarise(best_sharpe_row), _summarise(lowest_vol_row)
    except Exception as e:
        raise RuntimeError(f"Failed to extract optimal portfolios: {e}")

def plot_efficient_frontier(sim_df, asset_names, max_sharpe_port, min_vol_port):
    """
    Scatter-plot the simulated portfolios and highlight the two optimal ones.

    - sim_df: simulation DataFrame; columns 'vol', 'ret' and 'sharpe' are read
    - asset_names: not used by the plot
    - max_sharpe_port: mapping with 'vol' and 'ret' for the max-Sharpe portfolio
    - min_vol_port: mapping with 'vol' and 'ret' for the min-volatility portfolio
    Returns: None (the figure is shown with plt.show())
    """
    fig, ax = plt.subplots(figsize=(10, 7))

    # Simulated portfolios, colored by their Sharpe ratio
    cloud = ax.scatter(sim_df['vol'], sim_df['ret'], c=sim_df['sharpe'], cmap='viridis', alpha=0.4)
    fig.colorbar(cloud, label='Sharpe Ratio')

    # (portfolio, marker, color, size, legend label) for each highlighted point
    highlights = (
        (max_sharpe_port, '*', 'r', 250, 'Max Sharpe'),
        (min_vol_port, 'o', 'b', 150, 'Min Volatility'),
    )
    for portfolio, marker, color, size, label in highlights:
        ax.scatter(portfolio['vol'], portfolio['ret'], marker=marker, color=color, s=size, label=label)

    ax.set_xlabel('Annualized Volatility')
    ax.set_ylabel('Annualized Return')
    ax.set_title('Efficient Frontier (Monte Carlo simulated portfolios)')
    ax.legend()
    ax.grid(True)
    plt.show()

# -----------------------
# Main function
# -----------------------
def optimize_portfolio_from_forecast(price_df,
                                     tsla_forecast_prices=None, tsla_horizon_days=None, tsla_expected_annual_return=None,
                                     trading_days=252, rf=0.0, n_portfolios=50000, seed=42):
    """
    Build and display a recommended TSLA/BND/SPY portfolio from a TSLA forecast.

    TSLA's expected return comes from the forecast (or is given directly); BND and
    SPY use their annualized historical mean return. Random portfolios are then
    simulated, the max-Sharpe and min-volatility ones are extracted, the frontier
    is plotted and a summary is displayed.

    - price_df: pd.DataFrame containing columns ['TSLA','BND','SPY'] of daily prices
      covering the historical period; column order does not matter and any other
      columns are ignored
    Provide either:
      - tsla_expected_annual_return (float), OR
      - tsla_forecast_prices (iterable of forecast prices) together with tsla_horizon_days (int)
    - trading_days: trading days per year used for annualization
    - rf: risk-free rate used in the Sharpe ratio
    - n_portfolios: number of random portfolios to simulate
    - seed: random seed passed to the simulation
    Returns:
      A dict containing simulation DataFrame, max_sharpe_port, min_vol_port and a recommended portfolio summary.
    Raises:
      RuntimeError if validation or any step fails (original exception attached as the cause).
    """
    try:
        # Basic checks
        asset_names = ['TSLA', 'BND', 'SPY']
        if not isinstance(price_df, pd.DataFrame):
            raise TypeError(f"price_df must be a pandas DataFrame, got {type(price_df).__name__}")
        if not all(c in price_df.columns for c in asset_names):
            raise ValueError(f"price_df must contain columns: {asset_names}")

        # Restrict to the three assets in a fixed order so the covariance matrix
        # lines up with the expected-returns vector built below, whatever the
        # column order (or extra columns) of the frame that was passed in.
        prices = price_df[asset_names]

        # Compute cov matrix (the daily returns are not needed here)
        cov_annual, _ = compute_annualized_cov_matrix(prices, trading_days=trading_days)

        # Compute expected returns vector
        if tsla_expected_annual_return is None:
            if tsla_forecast_prices is None or tsla_horizon_days is None:
                raise ValueError("Either tsla_expected_annual_return OR (tsla_forecast_prices and tsla_horizon_days) must be provided.")
            # Use the most recent non-missing TSLA price as the forecast base
            last_price = prices['TSLA'].dropna().iloc[-1]
            tsla_expected_annual_return = annualize_return_from_price_forecast(last_price, tsla_forecast_prices, tsla_horizon_days, trading_days=trading_days)

        # Historical expected returns (annualized) for BND & SPY
        bnd_annual = compute_historical_mean_annual_return(prices['BND'], trading_days=trading_days)
        spy_annual = compute_historical_mean_annual_return(prices['SPY'], trading_days=trading_days)

        # Same order as asset_names / the covariance matrix
        expected_returns = np.array([tsla_expected_annual_return, bnd_annual, spy_annual])

        display("Expected returns (annualized):")
        display(dict(zip(asset_names, np.round(expected_returns,6))))
        display("Annualized covariance matrix (first 3x3):")
        display(cov_annual.round(6))

        # Simulate portfolios
        sim_df = simulate_random_portfolios(expected_returns, cov_annual, n_portfolios=n_portfolios, trading_days=trading_days, rf=rf, seed=seed)

        # Find opt portfolios
        max_sharpe_port, min_vol_port = find_optimal_portfolios(sim_df, asset_names)

        # Plot
        plot_efficient_frontier(sim_df, asset_names, max_sharpe_port, min_vol_port)

        # Prepare a recommended portfolio:
        # both candidates are returned; max_sharpe is recommended by default
        recommended = max_sharpe_port

        summary = {
            'recommended_choice': 'Max Sharpe (Tangency) by default',
            'recommended_weights': recommended['weights'],
            'expected_annual_return': recommended['ret'],
            'annual_volatility': recommended['vol'],
            'sharpe_ratio': recommended['sharpe'],
            'max_sharpe_portfolio': max_sharpe_port,
            'min_vol_portfolio': min_vol_port,
            'simulation_df': sim_df
        }

        # Display concise summary
        display("Recommended portfolio (concise):")
        display({
            'choice': summary['recommended_choice'],
            'weights': {k: round(v,4) for k,v in recommended['weights'].items()},
            'expected_annual_return': round(recommended['ret'],6),
            'annual_volatility': round(recommended['vol'],6),
            'sharpe_ratio': round(recommended['sharpe'],6)
        })

        return summary

    except Exception as e:
        raise RuntimeError(f"Portfolio optimization failed: {e}") from e

In [3]:
# Run the optimization with a 126-trading-day TSLA forecast horizon.
# NOTE(review): `test` and `lstm_pred_index` are not defined in this notebook
# (the recorded output is a NameError); they presumably come from an earlier
# forecasting notebook - confirm and load them first. Also, price_df is
# documented as a DataFrame with TSLA/BND/SPY price columns, whereas
# `test.index` looks like an index object - TODO confirm the intended argument.
summary = optimize_portfolio_from_forecast(
    price_df=test.index,
    tsla_forecast_prices=lstm_pred_index,
    tsla_horizon_days=126,
    trading_days=252,
    rf=0.0,
    n_portfolios=20000,
)

NameError: name 'test' is not defined