<a href="https://colab.research.google.com/github/azrankalo123/Quant_corner/blob/main/Backtesting_Mean_Variance.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import yfinance as yf
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import plotly.graph_objects as go
import cvxpy as cp

# Necessary functions

In [None]:
def pull_data(Ticker_names, startDate, endDate, interval):
    """Download close prices from Yahoo Finance and align them by date.

    Args:
        Ticker_names (list): Yahoo Finance ticker symbols to download.
        startDate (datetime): First date of the requested history.
        endDate (datetime): End date passed to Yahoo Finance.
        interval (str): Sampling interval. Valid intervals are:
                       "1m", "2m", "5m", "15m", "30m", "60m", "90m", "1h", "1d", "5d", "1wk", "1mo", "3mo"

    Returns:
        DataFrame: One column per ticker (named after the ticker), indexed by
        date; rows where any ticker has a missing value are dropped.
    """
    def _close_prices(symbol):
        # Fetch the history of one ticker and keep only its close column
        history = yf.Ticker(symbol).history(
            start=startDate, end=endDate, interval=interval
        )
        closes = history[['Close']].rename(columns={'Close': '%s' % symbol})
        # Replace the timestamp index by plain dates
        closes.index = closes.index.date
        return closes

    # Keyed by ticker name, so a repeated ticker yields a single column
    close_by_ticker = {symbol: _close_prices(symbol) for symbol in Ticker_names}

    # Align all tickers side by side and keep only dates present for every one
    return pd.concat(close_by_ticker.values(), axis=1).dropna()


In [None]:
def Log_return(Price_dataframe):
    """Return period-over-period log returns of a price table.

    The first row has no predecessor, so it is compared with itself and its
    log return is therefore 0.

    Args:
        Price_dataframe (DataFrame): Column: ticker name, Index: date

    Returns:
        DataFrame: Log returns with the same index labels and column labels
        as the input.
    """
    current_prices = Price_dataframe.values

    # First row of prices; raises IndexError when the frame has no rows
    opening_prices = Price_dataframe.iloc[0].to_numpy()

    # Previous-period prices: the first row stands in for its own predecessor
    previous_prices = np.vstack([opening_prices, current_prices[:-1]])

    return pd.DataFrame(
        np.log(current_prices / previous_prices),
        index=Price_dataframe.index.tolist(),
        columns=Price_dataframe.columns.tolist(),
    )


In [None]:
def Mean_Cov(Log_return_dataframe, shifting_day):
    """Estimate the scaled mean vector and covariance matrix of asset returns.

    Both the per-row sample mean and the sample covariance are multiplied by
    ``shifting_day``.

    Args:
        Log_return_dataframe (DataFrame): Column: ticker name, Index: date
        shifting_day (int): Factor applied to both the mean and the covariance.

    Returns:
        (Mean, Cov): Mean as a 1-D array (one entry per column of the input),
        Cov as a ``numpy.matrix``.
    """
    # One row per asset, one column per observation (np.cov's default layout)
    asset_returns = Log_return_dataframe.values.T

    scaled_mean = np.asarray(asset_returns.mean(axis=1)) * shifting_day
    scaled_cov = np.asmatrix(np.cov(asset_returns)) * shifting_day

    return scaled_mean, scaled_cov


In [None]:
def Mean_Variance_optimization(return_array, covariance_matrix, require_return, short_sell=None):
    """
    Find the minimum-variance portfolio that meets a required return.

    The problem solved is: minimize ``X' * covariance_matrix * X`` subject to
    the weights summing to 1 and the expected portfolio return being at least
    ``require_return``. Weights are additionally constrained to be
    non-negative unless short selling is enabled.

    Args:
        return_array (array): Array of expected returns for each asset.
        covariance_matrix (array): Covariance matrix of asset returns.
        require_return (float): Required return for the portfolio.
        short_sell (str, optional): Leave as None to forbid short selling;
            any other value (e.g. 'yes') removes the non-negativity
            constraint. Defaults to None.

    Returns:
        tuple: (weights, objective value, portfolio return). When the solver
        status is not 'optimal', the weights are an all-zero array and the
        other two entries are whatever the solver left in ``problem.value``
        and in the return expression's ``value``.
    """
    n_assets = len(return_array)

    # Decision variable: one weight per asset
    X = cp.Variable(n_assets)

    # Objective: portfolio variance as a quadratic form in the weights
    objective_function = cp.Minimize(cp.quad_form(X, covariance_matrix))

    # Expected portfolio return as an expression of the weights
    Return = return_array.T @ X

    # Fully invested and meeting the return target; cp.sum builds a single
    # sum atom instead of a chain of element-wise additions
    constraints = [cp.sum(X) == 1, Return >= require_return]
    if short_sell is None:
        # Long-only portfolio
        constraints.append(X >= 0)

    problem = cp.Problem(objective_function, constraints)
    problem.solve()

    if problem.status != 'optimal':
        # No usable solution: fall back to holding nothing
        optimal_portfolio = np.zeros(n_assets)
    else:
        optimal_portfolio = X.value

    return optimal_portfolio, problem.value, Return.value


In [None]:
def checking_day_trade(Date, Close_price_dataframe):
    """
    Return the given date if it is a trading day, otherwise the next trading day.

    A trading day is a date that appears in the index of
    ``Close_price_dataframe``.

    Args:
        Date (datetime): The input date to check.
        Close_price_dataframe (DataFrame): DataFrame whose index holds the
            trading days.

    Returns:
        datetime: ``Date`` itself or the first later datetime whose date is in
        the index (time-of-day of ``Date`` is preserved).

    Raises:
        ValueError: If the frame has no rows, or if no trading day exists on
            or after ``Date`` (the search would otherwise never terminate).
    """
    trading_days = Close_price_dataframe.index
    if len(trading_days) == 0:
        raise ValueError("Close_price_dataframe has no trading days")

    last_trading_day = trading_days.max()
    if isinstance(last_trading_day, datetime):
        # Timestamp-like labels: compare on the calendar date only
        last_trading_day = last_trading_day.date()

    # Step forward one calendar day at a time until a trading day is hit
    while Date.date() not in trading_days:
        if Date.date() > last_trading_day:
            raise ValueError(
                "No trading day on or after %s (last available: %s)"
                % (Date.date(), last_trading_day)
            )
        Date = Date + timedelta(days=1)

    return Date


# Pull data

In [None]:
# Asset universe and sampling settings
Ticker_names = ['AAPL', 'AMD', 'MSFT']
interval = '1d'

# Requested history window
startDate = datetime(1950, 1, 2)
endDate = datetime(2021, 12, 31)

# Download the close prices of every ticker, aligned on common dates
Close_price_dataframe = pull_data(
    Ticker_names=Ticker_names,
    startDate=startDate,
    endDate=endDate,
    interval=interval,
)

# Show the close-price table
Close_price_dataframe

Unnamed: 0,AAPL,AMD,MSFT
1986-03-13,0.085501,15.875000,0.060274
1986-03-14,0.090250,14.625000,0.062427
1986-03-17,0.089818,15.812500,0.063504
1986-03-18,0.092842,15.687500,0.061889
1986-03-19,0.091546,15.312500,0.060812
...,...,...,...
2021-12-23,174.288620,146.139999,328.668732
2021-12-27,178.292877,154.360001,336.289093
2021-12-28,177.264618,153.149994,335.110687
2021-12-29,177.353592,148.259995,335.798157


In [None]:
def Backtesting_Mean_Variance(len_of_trianing_data, trading_day, period, number_of_trading, require_return, Close_price_dataframe, short_sell = None):
    """
    Backtest a Mean-Variance portfolio strategy over multiple trading periods.

    On each trading day the weights are re-estimated from the price window
    that ends the day before, then held until the next trading day. Months
    are approximated as 30 calendar days throughout.

    Args:
        len_of_trianing_data (int): Length of the training data period (in months).
        trading_day (datetime): Initial trading day to start the backtesting;
            moved forward to the next trading day if needed.
        period (int): Number of months for each trading period.
        number_of_trading (int): Number of trading periods to backtest.
        require_return (float): Required return for the portfolio in each trading period.
        Close_price_dataframe (DataFrame): DataFrame with close prices for each trading day.
        short_sell (str, optional): Forwarded to Mean_Variance_optimization;
            None forbids short selling. Defaults to None.

    Returns:
        tuple: ``(R, portfolio_proportions_dict, day_trading)`` where
            R (dict): keys 1..number_of_trading; the weighted sum of the
                assets' log price ratios between consecutive trading days,
                using the weights chosen at the start of the period.
            portfolio_proportions_dict (dict): keys 0..number_of_trading;
                portfolio weights chosen on each trading day.
            day_trading (dict): keys 0..number_of_trading; the trading days
                (datetime) themselves.
    """
    trading_day = checking_day_trade(trading_day, Close_price_dataframe)
    day_trading = dict()
    portfolio_proportions_dict = dict()

    # Estimate weights on every trading day, including the final one, whose
    # weights are reported but not used for any return
    for i in range(number_of_trading + 1):
        window_start = (trading_day - len_of_trianing_data * timedelta(days=30)).date()
        # Training data stops the day before trading to avoid look-ahead
        window_end = (trading_day - timedelta(days=1)).date()
        training_prices = Close_price_dataframe.loc[window_start:window_end]
        day_trading[i] = trading_day

        # NOTE(review): the per-row moments are scaled by calendar days
        # (period * 30), not by the number of trading days — confirm intent.
        mean, cov = Mean_Cov(Log_return(training_prices), period * 30)
        portfolio_proportions_dict[i], _, _ = Mean_Variance_optimization(
            mean, cov, require_return, short_sell=short_sell
        )

        trading_day = checking_day_trade(trading_day + timedelta(days=30 * period), Close_price_dataframe)

    R = dict()
    # Realised return of each holding period
    for i in range(number_of_trading):
        entry_prices = Close_price_dataframe.loc[day_trading[i].date()].to_numpy()
        exit_prices = Close_price_dataframe.loc[day_trading[i + 1].date()].to_numpy()
        R[i + 1] = (portfolio_proportions_dict[i] * np.log(exit_prices / entry_prices)).sum()

    return R, portfolio_proportions_dict, day_trading


In [None]:
# Backtest input variables
len_of_trianing_data = 36  # training window length, unit: month
trading_day = datetime(1990, 1, 20)  # first candidate trading day (datetime format)
period = 1  # length of one trading period, unit: month
number_of_trading = 300  # number of trading periods
# Annual targets of 5%, 10% and 20%, each divided by 12
require_return = [annual_target / 12 for annual_target in (0.05, 0.1, 0.2)]
Close_price_dataframe

Unnamed: 0,AAPL,AMD,MSFT
1986-03-13,0.085500,15.875000,0.060274
1986-03-14,0.090250,14.625000,0.062427
1986-03-17,0.089818,15.812500,0.063504
1986-03-18,0.092842,15.687500,0.061889
1986-03-19,0.091546,15.312500,0.060812
...,...,...,...
2021-12-23,174.288620,146.139999,328.668732
2021-12-27,178.292877,154.360001,336.289154
2021-12-28,177.264633,153.149994,335.110718
2021-12-29,177.353607,148.259995,335.798126


In [None]:
# Results of each backtest, keyed by the required return it was run with
Return = {}
portfolio_proportions = {}
day_trading = {}

# Run one long-only backtest per required return
for i in require_return:
    Return[i], portfolio_proportions[i], day_trading[i] = Backtesting_Mean_Variance(
        len_of_trianing_data=len_of_trianing_data,
        trading_day=trading_day,
        period=period,
        number_of_trading=number_of_trading,
        require_return=i,
        Close_price_dataframe=Close_price_dataframe,
        short_sell=None,
    )

# Histogram

In [None]:
size = 0.01  # histogram bin width
# Per-period backtest returns for each of the three required returns
rr_0_01 = np.array(list(Return[require_return[0]].values()))
rr_0_05 = np.array(list(Return[require_return[1]].values()))
rr_0_1 = np.array(list(Return[require_return[2]].values()))

# One histogram trace per required return, all sharing the same bin width.
# Bin edges are snapped to multiples of `size` so the traces line up.
hist = go.Figure()
for period_returns, target in zip((rr_0_01, rr_0_05, rr_0_1), require_return):
    hist.add_trace(go.Histogram(
        x=period_returns,
        xbins=dict(
            start=period_returns.min() // size * size,
            end=period_returns.max() // size * size + size,
            size=size,
        ),
        # Four decimals: the targets are small enough that two decimals
        # would print them as 0.00, 0.01 and 0.02
        name='Require Return is %.4f' % target,
    ))

hist.update_layout(
    xaxis_title_text='Return',  # x-axis label
    yaxis_title_text='Count',  # y-axis label
    bargroupgap=0.1,  # gap between bars of the same location coordinates
    barmode='overlay',  # draw the histograms on top of one another
)
# Semi-transparent bars so overlapping histograms stay visible
hist.update_traces(opacity=0.5)

# Show the overlaid histogram
hist.show()

# Line chart

In [None]:
W = 100000  # initial wealth

# Gross per-period returns, rebuilt from the backtest results so that
# re-running this cell does not exponentiate already-exponentiated values
rr_0_01 = np.exp(np.array(list(Return[require_return[0]].values())))
rr_0_05 = np.exp(np.array(list(Return[require_return[1]].values())))
rr_0_1 = np.exp(np.array(list(Return[require_return[2]].values())))

# Wealth path: initial wealth followed by the running product of gross returns
W_invest_0_01 = [W] + list(W * np.cumprod(rr_0_01))
W_invest_0_05 = [W] + list(W * np.cumprod(rr_0_05))
W_invest_0_1 = [W] + list(W * np.cumprod(rr_0_1))

# One wealth curve per required return, plotted against its trading days
fig = go.Figure()
for target, wealth_path in zip(require_return, (W_invest_0_01, W_invest_0_05, W_invest_0_1)):
    fig.add_trace(go.Scatter(
        x=list(day_trading[target].values()),
        y=wealth_path,
        # Four decimals: two would print the targets as 0.00, 0.01 and 0.02
        name='Require Return is %.4f' % target,
    ))

fig.update_layout(
    title_text='Return of each require return', # title of plot
    xaxis_title_text='Day trade', # xaxis label
    yaxis_title_text='Wealth', # yaxis label
)


fig.show()

In [None]:
# @title
Ticker_names = ['^GSPC']
interval = '1d'

# Trading days of the first backtest, as plain dates
index = [day.date() for day in day_trading[require_return[0]].values()]

# Download the S&P 500 close prices over the same date range and keep only
# the rows that fall on those trading days
SP_500_dataframe = pull_data(Ticker_names, startDate, endDate, interval).loc[index]

In [None]:
# @title
log_return_sp_500 = Log_return(SP_500_dataframe)
# After exponentiation this holds gross returns (price ratios), despite its
# name; the first entry is 1 because the first log return is 0
log_return_sp_500 = np.exp(log_return_sp_500)

# Wealth path of the index: running product of the gross returns
W_invest_SP = list(W * np.cumprod(log_return_sp_500['^GSPC'].to_numpy()))

fig = go.Figure(data=go.Scatter(x=index, y=W_invest_SP,name='S&P 500'))
fig.update_layout(
    title_text='Return of S&P 500', # title of plot
    xaxis_title_text='Day trade', # xaxis label
    yaxis_title_text='Wealth', # yaxis label
)
fig.show()