# Portfolio Optimization Methods

## HRP

In [None]:
import numpy as np
import pandas as pd
from scipy.cluster.hierarchy import linkage, dendrogram
from scipy.spatial.distance import squareform
import matplotlib.pyplot as plt

class HRP:
    def __init__(self, returns, clustering_method='single', risk_measure='variance'):
        self.returns = returns
        self.clustering_method = clustering_method
        self.risk_measure = risk_measure
        self.weights = None

    def _get_cluster_var(self, cov, cluster):
        """
        Compute cluster variance
        """
        cov_slice = cov.loc[cluster, cluster]
        return cov_slice.values.sum()

    def _get_quasi_diag(self, link):
        # Sort clustered items by distance
        num_items = link.shape[0] + 1
        sort_ix = pd.Series([link[-1, 0], link[-1, 1]], dtype=np.float64)
        sort_ix.index = [0, 1]
        while sort_ix.max() >= num_items:
            sort_ix.index = range(sort_ix.shape[0])
            df0 = sort_ix[sort_ix >= num_items]
            i = df0.index
            j = df0.values - num_items
            sort_ix[i] = link[j.astype(int), 0]
            df0 = pd.Series(link[j.astype(int), 1], index=i + 1, dtype=np.float64)
            sort_ix = pd.concat([sort_ix, df0])
            sort_ix = sort_ix.sort_index()
            sort_ix.index = range(sort_ix.shape[0])
        return sort_ix.astype(int).values

    def fit(self):
        if self.risk_measure == 'variance':
            risk_matrix = self.returns.cov()
            # Ensure the covariance matrix is symmetric
            risk_matrix = (risk_matrix + risk_matrix.T) / 2
        elif self.risk_measure == 'mad':
            risk_matrix = self.returns.mad().to_frame().dot(self.returns.mad().to_frame().T)
        else:
            raise ValueError("Unsupported risk measure")

        # Check if risk_matrix is positive semi-definite
        eigenvalues = np.linalg.eigvals(risk_matrix)
        if not np.all(eigenvalues >= -1e-8):  # allow for small numerical errors
            raise ValueError("Risk matrix is not positive semi-definite")

        corr = self.returns.corr()
        dist = np.sqrt(np.clip((1 - corr) / 2., 0, 1))  # Ensure distance is non-negative
        link = linkage(squareform(dist), method=self.clustering_method)
        sorted_index = self._get_quasi_diag(link)

        sorted_columns = self.returns.columns[sorted_index]
        risk_matrix = risk_matrix.loc[sorted_columns, sorted_columns]
        
        weights = pd.Series(1.0, index=sorted_columns, dtype=np.float64)
        clusters = [sorted_columns]

        while len(clusters) > 0:
            clusters = [cluster[start:end] for cluster in clusters
                        for start, end in ((0, len(cluster) // 2),
                                           (len(cluster) // 2, len(cluster)))
                        if len(cluster) > 1]
            for i in range(0, len(clusters), 2):
                if i + 1 >= len(clusters):
                    break
                cluster0 = clusters[i]
                cluster1 = clusters[i + 1]
                
                cluster0_var = self._get_cluster_var(risk_matrix, cluster0)
                cluster1_var = self._get_cluster_var(risk_matrix, cluster1)
                alpha = cluster1_var / (cluster0_var + cluster1_var)
                
                weights.loc[cluster0] *= alpha
                weights.loc[cluster1] *= 1 - alpha

        self.weights = weights / weights.sum()
        return self.weights

    def plot_dendrogram(self):
        corr = self.returns.corr()
        dist = np.sqrt((1 - corr) / 2.)
        link = linkage(squareform(dist), method=self.clustering_method)
        plt.figure(figsize=(10, 7))
        dendrogram(link, labels=self.returns.columns)
        plt.title('Hierarchical Clustering Dendrogram')
        plt.xlabel('Asset')
        plt.ylabel('Distance')
        plt.show()

    def get_weights(self):
        if self.weights is None:
            raise ValueError("You need to run the fit() method first")
        return self.weights


In [None]:
import yfinance as yf
def fetch_stock_data(tickers, start_date, end_date):
    data = yf.download(tickers, start=start_date, end=end_date)['Adj Close']
    return data

def calculate_returns(prices):
    return prices.pct_change().dropna()


tickers = ['AAPL', 'MSFT', 'AMZN', 'GOOGL', 'TSLA', 'NVDA', 'JPM', 'JNJ', 'V']

start_date = '2020-01-01'
end_date = '2023-12-31'
prices = fetch_stock_data(tickers, start_date, end_date)

returns = calculate_returns(prices)
hrp = HRP(returns, clustering_method='ward', risk_measure='variance')
hrp.plot_dendrogram()
hrp.fit()

## HERC

In [None]:
import numpy as np
import pandas as pd
from scipy.cluster.hierarchy import linkage, fcluster
from scipy.spatial.distance import pdist
from scipy.optimize import minimize

class HERC:
    def __init__(self, returns):
        """
        Initialize the HERC class with returns data.
        
        Parameters:
        returns (pd.DataFrame): Asset returns data.
        """
        self.returns = returns
        self.cov_matrix = returns.cov()
        self.cor_matrix = returns.corr()
    
    def get_linkage_matrix(self):
        """
        Get the linkage matrix using the correlation matrix.
        
        Returns:
        Z (ndarray): Linkage matrix.
        """
        corr_dist = np.sqrt((1 - self.cor_matrix) / 2)
        dist_matrix = pdist(corr_dist)
        Z = linkage(dist_matrix, 'ward')
        return Z
    
    def get_clusters(self, Z, max_clusters=None):
        """
        Get clusters from the linkage matrix.
        
        Parameters:
        Z (ndarray): Linkage matrix.
        max_clusters (int, optional): Maximum number of clusters. If None, a sensible value is chosen.
        
        Returns:
        clusters (list): List of clusters.
        """
        if max_clusters is None:
            max_clusters = int(np.ceil(np.log(len(self.cor_matrix))))
        clusters = fcluster(Z, max_clusters, criterion='maxclust')
        return clusters
    
    def calculate_cluster_var(self, cluster):
        """
        Calculate the variance of a cluster.
        
        Parameters:
        cluster (list): List of asset indices in the cluster.
        
        Returns:
        float: Cluster variance.
        """
        cluster_cov = self.cov_matrix.iloc[cluster, cluster]
        cluster_weights = np.ones(len(cluster)) / len(cluster)
        cluster_var = cluster_weights.T @ cluster_cov @ cluster_weights
        return cluster_var
    
    def get_cluster_var_contrib(self, weights, clusters):
        """
        Get the variance contribution of each cluster.
        
        Parameters:
        weights (ndarray): Portfolio weights.
        clusters (list): List of clusters.
        
        Returns:
        cluster_var_contrib (ndarray): Cluster variance contributions.
        """
        cluster_var_contrib = np.zeros(np.max(clusters))
        for i in range(1, np.max(clusters) + 1):
            cluster_indices = np.where(clusters == i)[0]
            cluster_weights = weights[cluster_indices]
            cluster_cov = self.cov_matrix.iloc[cluster_indices, cluster_indices]
            cluster_var = cluster_weights.T @ cluster_cov @ cluster_weights
            cluster_var_contrib[i-1] = cluster_var
        return cluster_var_contrib
    
    def objective(self, weights, clusters):
        """
        Objective function to minimize.
        
        Parameters:
        weights (ndarray): Portfolio weights.
        clusters (list): List of clusters.
        
        Returns:
        float: Objective value.
        """
        cluster_var_contrib = self.get_cluster_var_contrib(weights, clusters)
        total_var = weights.T @ self.cov_matrix.values @ weights
        equal_contrib = np.ones(len(cluster_var_contrib)) / len(cluster_var_contrib)
        return np.sum((cluster_var_contrib / total_var - equal_contrib) ** 2)
    
    def get_weights(self):
        """
        Get the portfolio weights using the HERC method.
        
        Returns:
        weights (pd.Series): Portfolio weights.
        """
        Z = self.get_linkage_matrix()
        clusters = self.get_clusters(Z)
        
        bounds = [(0, 1) for _ in range(len(self.returns.columns))]
        constraints = [{'type': 'eq', 'fun': lambda weights: np.sum(weights) - 1}]
        initial_weights = np.ones(len(self.returns.columns)) / len(self.returns.columns)
        
        result = minimize(self.objective, initial_weights, args=(clusters,), bounds=bounds, constraints=constraints)
        
        if result.success:
            return pd.Series(result.x, index=self.returns.columns)
        else:
            raise ValueError("Optimization failed")


In [None]:
import yfinance as yf
# Define the tickers and the period for which we want to download the data
tickers = ['AAPL', 'MSFT', 'AMZN', 'GOOGL', 'TSLA', 'NVDA', 'JPM', 'JNJ', 'V']
period = '1y'

# Download the historical data from Yahoo Finance
data = yf.download(tickers, period=period)['Adj Close']

# Calculate the daily returns
returns = data.pct_change().dropna()

# Initialize the HERC optimizer with the returns data
herc = HERC(returns)

# Get the optimized weights
weights = herc.get_weights()

print(weights)

## Black-Litterman

In [None]:
import numpy as np
import pandas as pd
from cvxopt import matrix, solvers

class BlackLitterman:
    def __init__(self, market_weights, cov_matrix, risk_free_rate, tau, P=None, Q=None, omega=None):
        """
        Initializes the Black-Litterman model.
        
        :param market_weights: np.array, Market capitalization weights of the assets.
        :param cov_matrix: np.array, Covariance matrix of the asset returns.
        :param risk_free_rate: float, Risk-free rate.
        :param tau: float, Scalar for the uncertainty of the prior.
        :param P: np.array, Pick matrix representing the assets involved in the views.
        :param Q: np.array, Expected returns vector corresponding to the views.
        :param omega: np.array, Diagonal matrix representing the uncertainty of the views.
        """
        self.market_weights = market_weights
        self.cov_matrix = cov_matrix
        self.risk_free_rate = risk_free_rate
        self.tau = tau
        self.P = P
        self.Q = Q
        self.omega = omega
    
    def implied_equilibrium_returns(self):
        """
        Compute the implied equilibrium returns (pi).
        
        :return: np.array, Implied equilibrium returns.
        """
        pi = self.tau * np.dot(self.cov_matrix, self.market_weights)
        return pi
    
    def adjusted_returns(self):
        """
        Compute the adjusted returns incorporating the investor's views.
        
        :return: np.array, Adjusted returns.
        """
        pi = self.implied_equilibrium_returns()
        
        if self.P is not None and self.Q is not None and self.omega is not None:
            M_inverse = np.linalg.inv(np.dot(self.P.T, np.dot(np.linalg.inv(self.omega), self.P)) + 
                                      np.linalg.inv(self.tau * self.cov_matrix))
            adjusted_mean = M_inverse.dot(np.dot(np.linalg.inv(self.tau * self.cov_matrix), pi) + 
                                          np.dot(self.P.T, np.dot(np.linalg.inv(self.omega), self.Q)))
        else:
            adjusted_mean = pi
            
        return adjusted_mean
    
    def min_variance_allocation(self):
        """
        Compute the minimum variance allocation.
        
        :return: np.array, Portfolio weights for the minimum variance portfolio.
        """
        n = len(self.cov_matrix)
        
        # Convert to cvxopt matrices
        P = matrix(self.cov_matrix)
        q = matrix(np.zeros(n))
        
        # Constraints Gx <= h
        G = matrix(np.diag([-1.0]*n))
        h = matrix(np.zeros(n))
        
        # Constraints Ax = b
        A = matrix(1.0, (1, n))
        b = matrix(1.0)
        
        sol = solvers.qp(P, q, G, h, A, b)
        return np.array(sol['x']).flatten()



In [None]:
import yfinance as yf
tickers = ['AAPL', 'MSFT', 'AMZN', 'GOOGL', 'TSLA', 'NVDA', 'JPM', 'JNJ', 'V']
period = '1y'

# Download the historical data from Yahoo Finance
data = yf.download(tickers, period=period)['Adj Close']

# Calculate the daily returns
returns = data.pct_change().dropna()

# Calculate the covariance matrix of returns
cov_matrix = returns.cov().values

# Example market weights (could be based on market capitalization)
market_weights = np.array([0.15, 0.15, 0.15, 0.1, 0.1, 0.1, 0.1, 0.1, 0.05])

# Arbitrary investor views
P = np.array([[1, -1, 0, 0, 0, 0, 0, 0, 0], 
              [0, 0, 1, -1, 0, 0, 0, 0, 0]])
Q = np.array([0.01, 0.02])
omega = np.diag([0.0001, 0.0001])

# Initialize the Black-Litterman model
tau = 0.05
bl = BlackLitterman(market_weights, cov_matrix, tau, P, Q, omega)

# Calculate the implied and adjusted returns
implied_returns = bl.implied_equilibrium_returns()
adjusted_returns = bl.adjusted_returns()

# Find the minimum variance portfolio
min_variance_weights = bl.min_variance_allocation()

print("Implied Equilibrium Returns:", implied_returns)
print("Adjusted Returns:", adjusted_returns)
print("Minimum Variance Portfolio Weights:", min_variance_weights)

## Markowitz

In [None]:
import numpy as np
import pandas as pd
from cvxopt import matrix, solvers

class Markowitz:
    def __init__(self, returns):
        """
        Initializes the Markowitz model with the returns data.
        
        :param returns: pd.DataFrame, Historical returns data for the assets.
        """
        self.returns = returns
        self.cov_matrix = returns.cov().values
        self.n_assets = len(returns.columns)
    
    def min_variance_allocation(self):
        """
        Compute the minimum variance allocation.
        
        :return: np.array, Portfolio weights for the minimum variance portfolio.
        """
        n = self.n_assets
        
        # Convert to cvxopt matrices
        P = matrix(self.cov_matrix)
        q = matrix(np.zeros(n))
        
        # Constraints Gx <= h
        G = matrix(np.diag([-1.0]*n))
        h = matrix(np.zeros(n))
        
        # Constraints Ax = b
        A = matrix(1.0, (1, n))
        b = matrix(1.0)
        
        sol = solvers.qp(P, q, G, h, A, b)
        return np.array(sol['x']).flatten()

In [None]:
import yfinance as yf
tickers = ['AAPL', 'MSFT', 'AMZN', 'GOOGL', 'TSLA', 'NVDA', 'JPM', 'JNJ', 'V']
period = '1y'

# Download the historical data from Yahoo Finance
data = yf.download(tickers, period=period)['Adj Close']

# Calculate the daily returns
returns = data.pct_change().dropna()

# Initialize the Markowitz model with the returns data
markowitz = Markowitz(returns)

# Get the minimum variance portfolio weights
min_variance_weights = markowitz.min_variance_allocation()

print("Minimum Variance Portfolio Weights:", min_variance_weights)

## Copula-Based Approach

In [None]:
import numpy as np
import pandas as pd
from scipy.optimize import minimize
from copulas.multivariate import VineCopula
from copulas.visualization import scatter_2d

class CopulaOptimizer:
    def __init__(self, returns, n_simulations=100):
        self.returns = returns
        self.n_simulations = n_simulations
        self.n_assets = returns.shape[1]
        self.copula = VineCopula('center')

    def fit_copula(self):
        self.copula.fit(self.returns)
    
    def simulate_returns(self):
        simulated_data = self.copula.sample(self.n_simulations)
        simulated_returns = pd.DataFrame(simulated_data, columns=self.returns.columns)
        return simulated_returns
    
    def portfolio_return(self, weights, returns):
        return np.dot(weights, returns.mean())
    
    def portfolio_volatility(self, weights, returns):
        return np.sqrt(np.dot(weights.T, np.dot(returns.cov(), weights)))
    
    def negative_sharpe_ratio(self, weights, returns, risk_free_rate=0):
        p_return = self.portfolio_return(weights, returns)
        p_volatility = self.portfolio_volatility(weights, returns)
        return -(p_return - risk_free_rate) / p_volatility
    
    def optimize_portfolio(self, risk_free_rate=0):
        constraints = ({'type': 'eq', 'fun': lambda x: np.sum(x) - 1})
        bounds = tuple((0, 1) for _ in range(self.n_assets))
        initial_guess = self.n_assets * [1. / self.n_assets]
        
        simulated_returns = self.simulate_returns()
        
        result = minimize(
            self.negative_sharpe_ratio,
            initial_guess,
            args=(simulated_returns, risk_free_rate),
            method='SLSQP',
            bounds=bounds,
            constraints=constraints
        )
        
        return result.x

In [None]:
import yfinance as yf
# Define the tickers and the period for which we want to download the data
tickers = ['AAPL', 'MSFT', 'AMZN', 'GOOGL', 'TSLA', 'NVDA', 'JPM', 'JNJ', 'V']
period = '1y'

# Download the historical data from Yahoo Finance
data = yf.download(tickers, period=period)['Adj Close']

# Calculate the daily returns
returns = data.pct_change().dropna()

optimizer = CopulaOptimizer(returns)
optimizer.fit_copula()
optimized_weights = optimizer.optimize_portfolio()
print("Optimized Portfolio Weights:", optimized_weights)

## MST

In [16]:
import numpy as np
import pandas as pd
from scipy.spatial.distance import squareform
from scipy.spatial.distance import pdist
import networkx as nx
import matplotlib.pyplot as plt

class MinimumSpanningTree:
    def __init__(self, returns: pd.DataFrame):
        """
        Initialize with stock returns.
        
        Parameters:
        returns (pd.DataFrame): DataFrame where each column represents a stock and each row represents a return for a time period.
        """
        self.returns = returns
        self.distance_matrix = None
        self.mst = None

    def calculate_distance_matrix(self):
        """
        Calculate the distance matrix using Pearson correlation.
        """
        correlation_matrix = self.returns.corr()
        distance_matrix = np.sqrt(2 * (1 - correlation_matrix))
        self.distance_matrix = pd.DataFrame(distance_matrix, index=correlation_matrix.index, columns=correlation_matrix.columns)

    def compute_mst(self):
        """
        Compute the Minimum Spanning Tree (MST) using Kruskal's algorithm.
        """
        if self.distance_matrix is None:
            self.calculate_distance_matrix()

        # Create a graph from the distance matrix
        dist_array = pdist(self.distance_matrix)
        graph = nx.Graph()

        stocks = self.distance_matrix.columns
        for i in range(len(stocks)):
            for j in range(i + 1, len(stocks)):
                graph.add_edge(stocks[i], stocks[j], weight=squareform(dist_array)[i, j])

        # Compute MST using Kruskal's algorithm
        self.mst = nx.minimum_spanning_tree(graph)

    def plot_mst(self):
        """
        Plot the Minimum Spanning Tree.
        """
        if self.mst is None:
            self.compute_mst()

        pos = nx.spring_layout(self.mst)
        plt.figure(figsize=(10, 10))
        nx.draw(self.mst, pos, with_labels=True, node_size=3000, node_color='skyblue', font_size=15, font_weight='bold')
        edge_labels = nx.get_edge_attributes(self.mst, 'weight')
        nx.draw_networkx_edge_labels(self.mst, pos, edge_labels=edge_labels, font_size=10)
        plt.title('Minimum Spanning Tree of Stock Returns')
        plt.show()


In [None]:
import yfinance as yf
# Define the tickers and the period for which we want to download the data
tickers = ['AAPL', 'MSFT', 'AMZN', 'GOOGL', 'TSLA', 'NVDA', 'JPM', 'JNJ', 'V']
period = '1y'

# Download the historical data from Yahoo Finance
data = yf.download(tickers, period=period)['Adj Close']

# Calculate the daily returns
returns = data.pct_change().dropna()
mst_calculator = MinimumSpanningTree(returns)
mst_calculator.plot_mst()