Markowitz mean-variance portfolio

In [None]:
def ptf_weights(ret, negative=True, type="MVP", target=0, rf=0, gamma=1, wanted_weight="tangency", max_weight = 999):
    """
    This function calculates the weights of the Minimum Variance Portfolio (MVP) or the Global Minimum Variance Portfolio (GMV).

    Parameters:
    ret (DataFrame): A DataFrame containing the returns of the assets.
    negative (bool): A boolean indicating whether short-selling is allowed or not. Default is True.
    type (str): A string indicating the type of portfolio to be calculated. It can be either "MVP" or "GMV". Default is "MVP".
    target (float): A float indicating the target return of the portfolio. Default is 0.
    rf (float): A float indicating the risk-free rate. Default is 0.
    gamma (float): A float indicating the risk aversion parameter. Default is 1.
    wanted_weight (str): A string indicating the weight to be returned. It can be either "risk-free", "tangency", or "mvp". Default is "tangency".

    Returns:
    w (array): An array containing the weights of the assets in the portfolio. 

    """

    # Calculate the covariance matrix
    V = np.array(ret.cov())

    # Calculate the inverse of the covariance matrix
    Vinverse = np.linalg.inv(V)

    # Computing the vector of ones
    ones = np.ones((len(V),1))

    # Calculate the expected returns
    mu = np.array(ret.mean()).reshape(-1,1)

    # if return_weights == 'exponential':
    #     w = np.exp(-np.log(2) / 2 * np.arange(len(ret))[::-1])
    #     w = w / w.sum()
    #     for stock in ret.columns:
    #         mu[stock] = (ret[stock] * w).sum()

    # Adding the risk-free rate
    Rf = rf

    # First for the MVP
    if type == "MVP":

        # Splitting the calculation into two cases, negative=True and negative=False

        # If we allow for short-selling (negative=True)
        if negative:

            # Calculate the weights
            w_mvp = (1/gamma) * Vinverse @ (mu - Rf * ones)

            # Calculate the weight of the risk-free asset
            w_riskfree = 1 - ones.T @ w_mvp

            # Calculate the weight of the tangency portfolio
            w_tangency = w_mvp / (ones.T @ w_mvp)

            # Return the weights
            if wanted_weight == "risk-free":
                w = w_riskfree
            elif wanted_weight == "tangency":
                w = w_tangency
            else:
                w = w_mvp

        # If we do not allow for short-selling (negative=False)
        else:

            # Define the objective function
            def objective(weights):

                # Calculate the portfolio return
                portfolio_return = weights.T @ (mu - Rf * ones)

                # Calculate the portfolio risk
                portfolio_risk = weights.T @ V @ weights

                # Return the objective function
                return (gamma/2) * portfolio_risk - portfolio_return
            
            # Initialize the initial guess (equally weighted portfolio)
            init_guess = np.ones(len(mu)) / len(mu)

            # Define the bounds (0 <= weight <= 999)
            bounds = tuple((0, max_weight) for _ in range(len(ret.columns)))

            # Minimize the objective function
            results = minimize(objective, init_guess, bounds=bounds, method='SLSQP')

            # Extract the optimal weights
            optimal_weights = results.x

            # Calculate the weight of the risk-free asset
            w_riskfree = 1 - ones.T @ optimal_weights

            # Calculate the weight of the tangency portfolio
            w_tangency = optimal_weights / (ones.T @ optimal_weights)  
            
            # Return the weights
            if wanted_weight == "risk-free":
                w = w_riskfree
            elif wanted_weight == "tangency":
                w = w_tangency
            else:
                w = optimal_weights.reshape(-1)
    
    # If we are looking for the GMV
    if type == "GMV":

        # Splitting the calculation into two cases, negative=True and negative=False
        # If we allow for short-selling (negative=True)
        if negative:

            # Calculate the weights
            w = (Vinverse @ ones) / (ones.T @ Vinverse @ ones)

        # If we do not allow for short-selling (negative=False)
        else:

            # Define the objective function
            def objective(weights, V):

                # Calculate the portfolio variance
                portfolio_variance = weights.T @ V @ weights

                # Return the objective function
                return portfolio_variance
            
            # Define the constraints (sum of weights equals 1)
            constraints = ({'type': 'eq', 'fun': lambda x: np.sum(x) - 1})

            # Initialize the initial guess (equally weighted portfolio)
            intial_guess = np.ones(len(V)) / len(V)

            # Define the bounds (0 <= weight <= 999)
            bounds = tuple((0, max_weight) for _ in range(len(V)))

            # Minimize the objective function
            results = minimize(objective, intial_guess, args=(V,), constraints=constraints, bounds=bounds, method='SLSQP')

            # Extract the optimal weights
            w = results.x
    
    # Return the weights as a numpy array
    return w.reshape(-1)

Simpel sharpe ratio

In [None]:
def sharpes_ratio(returns, frequency=12):
    """
    This function calculates the Sharpe's ratio of a given return series.

    Parameters:
    returns (Series): A Series containing the returns of the asset.
    frequency (int): An integer indicating the frequency of the returns. Default is 12.

    Returns:
    sharpe_ratio (float): A float indicating the Sharpe's ratio of the asset.

    """

    # Return the Sharpe's ratio
    return returns.mean() / returns.std() * np.sqrt(frequency)

Stationary Bootstrap

In [None]:
# from numba import njit
# import numpy as np

# @njit
# def stationary_bootstrap(data: np.ndarray, m: float, sample_length: int) -> np.ndarray:
#     """
#     This function performs stationary bootstrap resampling on the input data.

#     Parameters:
#     data (np.ndarray): The input data array to be resampled.
#     m (float): The average block length.
#     sample_length (int): The length of the resampled output.

#     Returns:
#     np.ndarray: The resampled data array.
#     """

#     # Calculate the acceptance probability
#     accept = 1 / m 
#     data_length = data.shape[0]

#     # Initialize the starting index for the sample
#     sample_index = np.random.randint(0, high=data_length, size=1)[0]
#     sample = np.zeros((sample_length,))

#     # Loop through each sample point
#     for i_sample in range(sample_length):
#         # Decide whether to continue the current block or start a new one
#         if np.random.uniform(0, 1, 1) >= accept:
#             sample_index += 1
#             # Wrap around if the index exceeds the data length
#             if sample_index >= data_length:
#                 sample_index = 0        
#         else:
#             # Start a new block with a random index
#             sample_index = np.random.randint(0, high=data_length, size=1)[0]

#         # Assign the selected data point to the sample
#         sample[i_sample] = data[sample_index]

#     return sample

In [1]:
from numba import njit
import numpy as np

@njit
def stationary_bootstrap(data: np.ndarray, m: float, sample_length: int) -> np.ndarray:
    """
    This function performs stationary bootstrap resampling on an Nx2 array of return series.

    Parameters:
    data (np.ndarray): The input data array with shape (N, 2), where N is the number of time points
                       and each column is a return series.
    m (float): The average block length.
    sample_length (int): The length of the resampled output.

    Returns:
    np.ndarray: The resampled data array with shape (sample_length, 2).
    """

    # Calculate the acceptance probability
    accept = 1 / m
    data_length = data.shape[0]

    # Initialize the starting index for the sample
    sample_index = np.random.randint(0, data_length)
    sample = np.zeros((sample_length, 2))

    # Loop through each sample point
    for i_sample in range(sample_length):
        # Decide whether to continue the current block or start a new one
        if np.random.uniform(0, 1) >= accept:
            sample_index += 1
            # Wrap around if the index exceeds the data length
            if sample_index >= data_length:
                sample_index = 0
        else:
            # Start a new block with a random index
            sample_index = np.random.randint(0, data_length)

        # Assign the selected row (both series) to the sample
        sample[i_sample, :] = data[sample_index, :]

    return sample