In [1]:
import pandas_datareader.data as reader
from cvxopt import matrix, solvers
import pandas as pd
import numpy as np
from pandas.tseries.offsets import BusinessDay as Bday
import datetime as dt
import yfinance as yf
import statsmodels.api as sm

In [2]:
class dataprocess():
    """
    Estimate the optimizer inputs for one rebalance date and solve for the portfolio.

    data: price DataFrame, one column per asset; its index must be named 'Date'
          because it is merged with the factor data on 'Date'
    data_f: factor-return DataFrame with columns 'Mkt-RF', 'SMB', 'HML' and 'RF',
            also indexed by 'Date'
    time_seed: rebalance date; every estimation window ends one business day before it
    rdays: number of business days in the window used for estimating returns
    cdays: number of business days in the window used for estimating beta and covariance
    beta: target portfolio beta
    wp: composition of the reference portfolio (multiplied as a cvxopt matrix in
        port(), so it is expected to be an n x 1 cvxopt matrix)
    lamda: small regularization parameter weighting the deviation from wp
    """
    def __init__(self,data,data_f, time_seed,rdays,cdays,beta,wp, lamda):
        self.data = data
        self.data_f = data_f
        self.seed = time_seed
        self.rdays = rdays
        self.cdays = cdays
        self.betaT = beta
        self.wp = wp
        self.lamda = lamda

    def _window(self, ndays):
        """Return the (start, end) labels of the ndays-business-day window ending the day before the seed."""
        return self.seed - ndays*Bday(), self.seed - Bday()

    def Return(self):
        """Return the mean simple return of every asset over the rdays window as a cvxopt matrix."""
        start, end = self._window(self.rdays)
        R = self.data.loc[start:end].pct_change()
        # mean() skips the leading NaN row produced by pct_change
        return matrix(R.mean().values)

    def Cov(self):
        """
        Return (cov, beta) estimated over the cdays window, both as cvxopt matrices.

        cov is a three-factor-model covariance B * var_f * B' + D, where B holds the
        OLS loadings of each asset's excess return on 'Mkt-RF', 'SMB', 'HML' and D is
        the diagonal matrix of squared residual standard deviations.
        beta[i] is cov(asset_i, Mkt) / var(Mkt) with Mkt = 'Mkt-RF' + 'RF'.
        """
        factor_cols = ['Mkt-RF', 'SMB', 'HML']
        start, end = self._window(self.cdays)

        R = self.data.loc[start:end].pct_change()
        # The universe is whatever columns the price data carries, so the asset
        # order here always matches the order of Return().
        tickers = list(R.columns)
        factors = self.data_f.loc[start:end]

        # Skip the first factor row: the matching return row is the NaN from pct_change.
        merged = pd.merge(R, factors.iloc[1:], on='Date')

        # The regressors are the same for every asset, so build them once.
        X = sm.add_constant(merged[factor_cols])
        rows = ['const'] + factor_cols + ['red std']
        estimates = {}
        for t in tickers:
            excess = merged[t] - merged['RF']
            fit = sm.OLS(excess, X).fit()
            estimates[t + '-RF'] = (
                [fit.params['const']]
                + [fit.params[c] for c in factor_cols]
                + [fit.resid.std()]
            )
        para = pd.DataFrame(estimates, index=rows)

        var_f = merged[factor_cols].cov().values            # 3 x 3 factor covariance
        B = para.loc[factor_cols].values.T                  # n x 3 factor loadings
        D = np.diag(para.loc['red std'].values ** 2)        # residual variances
        cov = matrix(B @ var_f @ B.T + D)

        # Total market return, built as a new Series so the caller's factor frame
        # (and the slice taken from it) is never written to.
        market = (factors['Mkt-RF'] + factors['RF']).rename('Mkt')
        joint_cov = pd.merge(R, market.iloc[1:], on='Date').cov()
        # Label-based lookups: computed once, independent of column positions.
        beta = (joint_cov.loc[tickers, 'Mkt'] / joint_cov.loc['Mkt', 'Mkt']).values
        return cov, matrix(beta)

    def port(self):
        """
        Solve the quadratic program and return the optimal weights (cvxopt matrix).

        solvers.qp minimizes (1/2) x'Px + q'x; with P = 2*lamda*cov and
        q = -(mu + 2*lamda*cov*wp) this is, up to a constant, maximizing
        mu'x - lamda * (x - wp)' cov (x - wp), subject to sum(x) = 1,
        beta'x = target beta, and -2 <= x_i <= 2 for every asset.
        The solver status is not checked; the returned x is whatever qp produced.
        """
        mu = self.Return()
        sigma, asset_beta = self.Cov()

        n = len(mu)

        P = sigma*2*self.lamda
        q = mu + P*self.wp

        # Stacked box constraints: -x <= 2 and x <= 2.
        G = matrix(np.concatenate((-np.eye(n,n),np.eye(n,n))))
        h = matrix(np.ones(2*n)*2)
        # Equality constraints: weights sum to one and portfolio beta hits the target.
        A = matrix(np.c_[np.ones(n),asset_beta]).T
        b = matrix(np.c_[np.ones(1),self.betaT]).T

        sol = solvers.qp(P,-q,G=G,h=h,A=A,b=b)
        return sol['x']

In [2]:
# Return a pandas.DataFrame holding the optimized portfolio for every rebalance date of the period.
def port_period(rdays,cdays,beta,lamda):
    """
    Run the backtest and collect the optimized weights of each rebalance date.

    rdays: number of days used to estimate return
    cdays: number of days used to estimate cov and beta
    beta: expected (target) beta
    lamda: regularization parameter passed to the optimizer

    Returns a DataFrame indexed by ticker with one column per rebalance date
    (every 5th trading day from 2007-07-02). Each solution becomes the reference
    portfolio of the next rebalance; the first reference is equally weighted.
    """
    #set dates and investment universe
    tickers = ['DBA', 'EPP', 'EWJ', 'FEZ', 'FXE', 'GLD', 'ILF', 'QQQ','SHV', 'SPY','USO', 'XBI']
    start = dt.date(2007,7,2)
    end = dt.date(2021,10,31)
    # Enough history before the first rebalance for the longest estimation window
    # (never less than the 120 business days that were fetched historically).
    lookback = max(120, rdays, cdays)
    backtest_start = start-lookback*Bday()
    backtest_end = end-Bday()

    #download data
    # One download covers both the look-back history and the backtest period.
    # auto_adjust=False keeps the 'Adj Close' column regardless of the library default.
    backtest_data = yf.download(tickers,backtest_start,end=end,auto_adjust=False)['Adj Close']
    # Pin the column order to `tickers` so the weight rows line up with the result index.
    backtest_data = backtest_data[tickers]
    factors_data  = reader.DataReader('F-F_Research_Data_Factors_daily','famafrench',backtest_start,backtest_end)[0]/100

    # Rebalance on every 5th trading day on or after `start`.
    in_period = backtest_data.index.date >= start
    rebalance_dates = backtest_data.index[in_period][::5]

    #reference portfolio
    wp = matrix(np.ones(len(tickers)))*1/len(tickers)

    # Collect the columns first and build the frame once; inserting hundreds of
    # columns one at a time fragments the DataFrame.
    weights = {}
    for i in rebalance_dates:
        solution = dataprocess(backtest_data,factors_data,i,rdays,cdays,beta,wp,lamda).port()
        # Flatten the n x 1 cvxopt matrix into a plain 1-D float array.
        weights[i] = np.array(solution).ravel()
        wp = matrix(weights[i])

    return pd.DataFrame(weights, index=tickers)