In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import yfinance as yf
from scipy import stats
from scipy.optimize import minimize
# from pypfopt import efficient_frontier, plotting, objective_functions
# from sklearn import linear_model
# import statsmodels.api as sm
from typing import Union

In [None]:
def formatFloat(n): return "{:.6f}".format(n) # format to 6 decimal points

In [None]:
class Portfolio:
  """Portfolio Class
  
  Params:
   - tickers: the list of stocks to generate the portfolio
   - market: ticker of the market reference i.e. S&P500 as '^GSPC'
   - start: start date to retrieve historical data
   - end: end date to retrieve historical data
   - bounds: constraint on the weight of each stock where 1 == 100% and negative means shorting
  """
  def __init__(self, tickers: list = [], market: str = '^GSPC', start: str = "2014-01-01", end: Union[str, None] = None, bounds: Union[tuple, list, None] = None) -> None:
    self.tickers = tickers
    self.market = market
    self.start = start
    self.end = end
    self.bounds = bounds
    
  def retrieve_data(self):
    self.retrieve_stock_data()
    self.retrieve_market_data()
  
  def retrieve_stock_data(self):
    self.stock_data = yf.download(self.tickers, start = self.start, end = self.end, auto_adjust=True)
    self.stock_data.dropna(axis=1, inplace=True)
    self.tickers = self.stock_data['Close'].columns.tolist()
    returns_data = self.stock_data['Close'].pct_change()
    returns_data.columns = pd.MultiIndex.from_product([['Return'], returns_data.columns])
    self.stock_data = self.stock_data.join(returns_data)
  
  def retrieve_market_data(self):
    self.market_data = yf.download(self.market, start = self.start, end = self.end, auto_adjust=True)
    self.market_data.dropna(inplace=True)
    self.market_data['Return'] = self.market_data['Close'].pct_change()
  
  def calc_stats(self):
    self.calc_expected_returns()
    self.calc_risk_free_rate()
    self.calc_covariance_matrix()
    self.calc_market_stats()
  
  def calc_expected_returns(self):
    self.expected_returns = [np.mean(self.stock_data['Return'][ticker.upper()]) for ticker in self.tickers]

  def calc_risk_free_rate(self):
    self.risk_free_rate = np.mean(self.market_data['Return'])
  
  def calc_covariance_matrix(self):
    stock_cov_data = [self.stock_data['Return'][ticker.upper()][1:] for ticker in self.tickers]
    self.cov_matrix = np.cov(stock_cov_data, bias=True)
    
  def calc_market_stats(self):
    self.market_stats = {
      "Return": self.risk_free_rate,
      "Volatility": np.std(self.market_data['Return']),
    }
  
  def pf_variance(self, weights):
    """Calculate portfolio standard deviation give stock weights"""
    return np.dot(weights.T, np.dot(self.cov_matrix, weights))
  
  def pf_volatility(self, weights):
    """Calculate portfolio standard deviation give stock weights"""
    return np.sqrt(self.pf_variance(weights))
  
  def pf_return(self, weights):
    """Calculate portfolio returns give stock weights"""
    return np.dot(weights.T, self.expected_returns)
  
  def pf_return_negate(self, weights):
    """Calculate portfolio returns give stock weights"""
    return -np.dot(weights.T, self.expected_returns)
  
  def pf_sharpe(self, weights, risk_free_rate):
      return -(self.pf_return(weights) - risk_free_rate) / self.pf_volatility(weights)
    
  def pf_utility(self, weights, risk_aversion):
      return -(self.pf_return(weights) - (1 / 2) * risk_aversion * self.pf_volatility(weights))
  
  def minimum_variance(self, expected_return=None):
    init_guess = np.ones(len(self.expected_returns)) / len(self.expected_returns)

    constraints = [{"type": "eq", "fun": lambda x: np.sum(x) - 1}]
    if not (expected_return is None):
      constraints.append({"type": "eq", "fun": lambda x: self.pf_return(x) - expected_return})

    result = minimize(fun=self.pf_variance,
                      x0=init_guess,
                      args=(),
                      method="SLSQP",
                      constraints=constraints,
                      bounds=self.bounds)

    self.stats = {
      "Weights": result.x,
      "Returns": np.sum(self.expected_returns * result.x),
      "Volatility": np.sqrt(result.fun)
    }
    
  def maximum_return(self, expected_volatility):
    init_guess = np.ones(len(self.expected_returns)) / len(self.expected_returns)

    constraints = [{"type": "eq", "fun": lambda x: np.sum(x) - 1},
                   {"type": "eq", "fun": lambda x: self.pf_volatility(x) - expected_volatility}]

    result = minimize(fun=self.pf_return_negate,
                      x0=init_guess,
                      args=(),
                      method="SLSQP",
                      constraints=constraints,
                      bounds=self.bounds)

    self.stats = {
      "Weights": result.x,
      "Returns": -result.fun,
      "Volatility": self.pf_volatility(result.x)
    }
  
  def maximum_sharpe(self):
    init_guess = np.ones(len(self.expected_returns)) / len(self.expected_returns)

    constraints = [{"type": "eq", "fun": lambda x: np.sum(x) - 1}]

    result = minimize(fun=self.pf_sharpe,
                      x0=init_guess,
                      args=(self.risk_free_rate),
                      method="SLSQP",
                      constraints=constraints,
                      bounds=self.bounds)

    self.stats = {
      "Weights": result.x,
      "Returns": np.sum(self.expected_returns * result.x),
      "Volatility": self.pf_volatility(result.x),
      "Sharpe": -result.fun
    }
    
  def maximum_utility(self, risk_aversion=10):
    init_guess = np.ones(len(self.expected_returns)) / len(self.expected_returns)    

    constraints = [{"type": "eq", "fun": lambda x: np.sum(x) - 1}]

    result = minimize(fun=self.pf_utility,
                      x0=init_guess,
                      args=(risk_aversion),
                      method="SLSQP",
                      constraints=constraints,
                      bounds=self.bounds)

    self.stats = {
      "Weights": result.x,
      "Returns": np.sum(self.expected_returns * result.x),
      "Volatility": self.pf_volatility(result.x),
      "Utility": -result.fun
    }
    
  def plot(self):
    market_trace = [1000]
    for ret in self.market_data['Return'][1:]:
      market_trace.append(market_trace[-1] * (1+ret))

    portfolio_trace = self.stock_data["Return"].copy()
    portfolio_trace.iloc[0] = self.stats["Weights"]
    portfolio_trace.iloc[0] *= 1000

    for idx in portfolio_trace.index[1:]:
      portfolio_trace.loc[idx] = portfolio_trace.shift(1).loc[idx] * (1+self.stock_data["Return"].loc[idx])
      
    portfolio_trace['total'] = portfolio_trace.sum(axis=1)
    
    plt.plot(self.market_data.index, market_trace)
    plt.plot(portfolio_trace.index, portfolio_trace.total)
    plt.title('S&P Index over xx-xx')
    plt.xlabel('Dates')
    plt.ylabel('Index Level')
    plt.show()
    
  def plot_MVO(self):
    returns = np.linspace(0, 0.005, 51)
    vol = []
    for ret in returns:
      self.minimum_variance(ret)
      vol.append(self.stats["Volatility"])
    plt.plot(vol, returns)
    plt.title('Mean Variance Frontier')
    plt.xlabel('Volatility')
    plt.ylabel('Return')
    plt.show()

In [None]:
# daw_jones = ['MSFT', 'HON', 'KO', 'IBM', 'AMGN', 'CHV', 'AAPL', 'PG', 'CAT', 'BA', 'JNJ', 'MMM', 'MRK', 'DIS', 'MCD', 'CMB', 'WMT', 'NKE', 'AXP', 'INTC', 'SPC', 'HD', 'CSCO', 'GS', 'UNH']
portfolio = Portfolio(['aapl', 'googl', 'msft', 'nflx', 'tsla'], '^GSPC', "2014-01-01", None, None)
# portfolio = Portfolio(daw_jones, '^GSPC', "2000-01-01", None, None)
portfolio.retrieve_data()
portfolio.calc_stats()

In [None]:
portfolio.plot_MVO()

In [None]:
portfolio.maximum_sharpe()
portfolio.stats

In [None]:
portfolio.market_stats