In [2]:
import pandas as pd
from datetime import datetime
import numpy as np

import matplotlib.pyplot as plt
from numpy.linalg import inv

import sys
sys.path.append('/Users/cheng/Google Drive/PhD/Research/Portfolio Selection via TBN/codes/')

from module.data_library import data_library
from module.portfolio_generator import portfolio_generator
from module.vectorized_backtesting import vectorized_backtesting, get_portfolio_performance
from module.portfolio_analyser import get_sharpe_ratio, get_turnover

In [3]:
def get_equal_weighted_portfolio(period_df):
    """Build the 1/N portfolio for the assets in ``period_df``.

    Only the column count (``period_df.shape[1]``) is used; the return values
    themselves are ignored.

    Returns:
        A plain Python list holding ``1 / N`` once per column.
    """
    asset_count = period_df.shape[1]
    equal_weight = 1 / asset_count

    return [equal_weight] * asset_count

# data

In [2]:
class data_library:
    '''
    Loader for return tables stored as local CSV files.

    The loader keeps no per-instance state, so ``get_stock_returns`` can be
    called either on the class itself or on an instance.
    '''

    # NOTE(review): absolute, machine-specific locations; consider deriving
    # them from a configurable data directory so the notebook runs elsewhere.
    _FILE_PATH_DICT = {
        ('industry', 'monthly'):
            '/Users/cheng/Google Drive/PhD/Research/Portfolio Selection via TBN/data/Data/49_Industry_Portfolios_Monthly_Value_Weighted.csv',
        ('industry', 'daily'):
            '/Users/cheng/Google Drive/PhD/Research/Portfolio Selection via TBN/data/Data/49_Industry_Portfolios_Daily_Value_Weighted.csv',
    }

    # strptime-style layout of the first (date) column for each frequency.
    _DATE_FORMAT_DICT = {
        'monthly': "%Y%m",
        'daily': "%Y%m%d",
    }

    def __init__(self) -> None:
        pass

    @staticmethod
    def get_stock_returns(
        name: str,
        frequency: str
        ) -> pd.DataFrame:
        '''
        Load a local stock returns csv file.

        Args:
            name: dataset name; together with ``frequency`` it selects the
                file (currently only 'industry').
            frequency: 'monthly' or 'daily'; also selects the date layout
                used to parse the first column.

        Returns:
            DataFrame indexed by the parsed dates of the first CSV column,
            with every column that contains a missing value dropped.

        Raises:
            KeyError: if no file is registered for ``(name, frequency)``.
        '''
        dataset_key = (name, frequency)
        if dataset_key not in data_library._FILE_PATH_DICT:
            raise KeyError('no data file registered for {}; available: {}'.format(
                dataset_key, sorted(data_library._FILE_PATH_DICT)))

        data = pd.read_csv(data_library._FILE_PATH_DICT[dataset_key], index_col=0)

        # Parse the date column explicitly instead of through read_csv's
        # `date_parser`, which is deprecated in pandas 2.x.
        raw_dates = data.index.astype(str).str.strip()
        parsed_dates = pd.to_datetime(raw_dates, format=data_library._DATE_FORMAT_DICT[frequency])
        data.index = parsed_dates.rename(data.index.name)

        return data.dropna(axis=1)

In [3]:
# Use the class object itself (not an instance) as the loader handle, then
# load the table registered under ('industry', 'monthly').
lib = data_library
data = lib.get_stock_returns('industry', 'monthly')

# backtesting

In [76]:
def vectorized_backtesting(
    stock_return_df: pd.DataFrame,
    rebalance_option: str,
    portfolo_strategy_matrix: np.ndarray,
    rolling_period: int = 0) -> pd.DataFrame:

    '''
    Compute portfolio returns when weights are reset once per rebalance period.

    The return table is split into calendar periods; every row of a period is
    multiplied by that period's weight vector.

    Args:
        stock_return_df: asset returns, one column per asset, indexed by date.
        rebalance_option: 'month' or 'year'.
        portfolo_strategy_matrix: weights of shape (n_period, n_stock), one
            row per rebalance period that is backtested. (The misspelled
            parameter name is kept for existing keyword callers.)
        rolling_period: number of leading rebalance periods to skip, e.g. the
            estimation window that has no portfolio yet. Defaults to 0.

    Returns:
        DataFrame with a single float column 'portfolio_return', indexed by
        the dates of the backtested rows in chronological order.

    Raises:
        ValueError: on an unknown rebalance option, a negative
            ``rolling_period``, or a weight matrix whose shape does not match
            the number of periods / assets.
    '''
    freq_alias = {'month': 'M', 'year': 'Y'}.get(rebalance_option)
    if freq_alias is None:
        raise ValueError('rebalance option can be either \'month\' or \'year\'')
    if rolling_period < 0:
        raise ValueError('rolling_period must be non-negative')

    # Recent pandas spells the period-end aliases 'ME'/'YE' and deprecates
    # 'M'/'Y'; older releases only know the short form.
    try:
        rebalance_grouper = pd.Grouper(freq=freq_alias + 'E')
    except ValueError:
        rebalance_grouper = pd.Grouper(freq=freq_alias)

    weights = np.asarray(portfolo_strategy_matrix, dtype=float)
    if weights.ndim != 2:
        raise ValueError('portfolio matrix must be 2-dimensional (period x stock)')

    # Keep each period as its own frame so its dates travel with its returns.
    period_frames = [frame for _, frame in stock_return_df.groupby(rebalance_grouper)]
    period_frames = period_frames[rolling_period:]

    n_period = len(period_frames)
    n_stock = stock_return_df.shape[1]

    if n_period != weights.shape[0]:
        raise ValueError(
            'Portfolio matrix period doesn\'t align with stock return period. '
            'Stock return breaks down into {} rebalancing periods. '
            'Portfolio matrix has {} periods.'.format(n_period, weights.shape[0]))
    if n_stock != weights.shape[1]:
        raise ValueError('portfolio matrix stock num doesn\'t align with stock return dimension')

    if n_period == 0:
        return pd.DataFrame(columns=['portfolio_return'], index=stock_return_df.index[:0], dtype=float)

    portfolio_return = np.concatenate([
        frame.to_numpy(dtype=float) @ period_weights
        for frame, period_weights in zip(period_frames, weights)
    ])
    # Build the index from the groups themselves: groupby yields periods in
    # chronological order, which may differ from the caller's row order.
    result_index = period_frames[0].index.append([frame.index for frame in period_frames[1:]])

    return pd.DataFrame(portfolio_return, index=result_index, columns=['portfolio_return'])


In [7]:
# Scratch inputs for checking how `@` combines a block of rows with a weight vector.
a1 = np.full(3, 0.1)
a2 = np.random.randint(0, 5, size=(2, 3))

In [12]:
# (2, 3) matrix times length-3 vector: one weighted sum per row of a2.
a2 @ a1

array([0.8, 0.6])

In [61]:
# Hand-built equal-weight strategy: the same 1/N vector repeated for every period.
n_period = 96
n_stock = 40
portfolio = np.full(n_stock, 1 / n_stock)
portfolio_matrix = np.tile(portfolio, (n_period, 1))

In [75]:
# Backtest the hand-built equal-weight matrix against the loaded returns.
# NOTE(review): the recorded output of this cell is a period-alignment
# exception, so the hard-coded n_period presumably does not match the number
# of monthly groups in `data` — confirm before relying on `test`.
test = vectorized_backtesting(data, 'month', portfolio_matrix)

Exception: portfolio matrix period doesn't align with stock return period

In [74]:
# NOTE(review): the saved output below is an array of 0.025 weights rather than
# a 'portfolio_return' DataFrame — likely stale kernel state; re-run to confirm.
test


array([[0.025, 0.025, 0.025, ..., 0.025, 0.025, 0.025],
       [0.025, 0.025, 0.025, ..., 0.025, 0.025, 0.025],
       [0.025, 0.025, 0.025, ..., 0.025, 0.025, 0.025],
       ...,
       [0.025, 0.025, 0.025, ..., 0.025, 0.025, 0.025],
       [0.025, 0.025, 0.025, ..., 0.025, 0.025, 0.025],
       [0.025, 0.025, 0.025, ..., 0.025, 0.025, 0.025]])

# Portfolio generator

In [67]:
class portfolio_generator:
    '''
    Rolling-window portfolio builder.

    The weighting rule is supplied through ``get_portfolio``: either override
    it in a subclass or assign a callable taking ``period_df`` to the instance.
    '''

    def __init__(self) -> None:
        pass

    def get_portfolio_matrix(
        self,
        stock_return_df: pd.DataFrame,
        rebalance_option: str,
        rolling_period: int) -> np.ndarray:
        '''
        Build one portfolio per rolling window of rebalance periods.

        The return table is split into calendar periods (months or years).
        Window ``k`` holds every row of periods ``k .. k + rolling_period - 1``
        and is passed to ``get_portfolio``; the result is the portfolio for
        period ``k + rolling_period``.

        Args:
            stock_return_df: asset returns, one column per asset, indexed by
                date. Rows are put in chronological order before windowing.
            rebalance_option: 'month' or 'year'.
            rolling_period: number of rebalance periods in each window.

        Return:
            Array of shape (n_period - rolling_period, n_stock), one row per
            window. This is the (period, stock) layout that
            vectorized_backtesting and get_turnover in this notebook consume,
            so the result is no longer transposed. The array is empty when
            there are not more periods than ``rolling_period``.

        Raises:
            ValueError: on an unknown rebalance option or a negative
                ``rolling_period``.
        '''
        freq_alias = {'month': 'M', 'year': 'Y'}.get(rebalance_option)
        if freq_alias is None:
            raise ValueError('rebalance option can be either \'month\' or \'year\'')
        if rolling_period < 0:
            raise ValueError('rolling_period must be non-negative')

        # Recent pandas spells the period-end aliases 'ME'/'YE' and deprecates
        # 'M'/'Y'; older releases only know the short form.
        try:
            rebalance_grouper = pd.Grouper(freq=freq_alias + 'E')
        except ValueError:
            rebalance_grouper = pd.Grouper(freq=freq_alias)

        sorted_df = stock_return_df.sort_index()

        # Row offsets at which each period starts/ends. Slicing by these
        # offsets keeps a window aligned to whole periods even when a period
        # spans several rows (e.g. daily data with monthly rebalancing).
        period_sizes = sorted_df.groupby(rebalance_grouper).size().to_numpy()
        period_bounds = np.concatenate(([0], np.cumsum(period_sizes))).astype(int)

        n_window = max(len(period_sizes) - rolling_period, 0)
        portfolio_matrix = [
            self.get_portfolio(
                sorted_df.iloc[period_bounds[first]:period_bounds[first + rolling_period]])
            for first in range(n_window)
        ]

        return np.array(portfolio_matrix)

    def get_portfolio(self, period_df):
        '''
        Weighting rule placeholder; must be replaced before
        get_portfolio_matrix() is used.
        '''
        raise NotImplementedError('Please override get_portfolio() function!')

In [68]:
# Instantiate the base generator; its get_portfolio() must be supplied before use.
agent = portfolio_generator()

In [71]:
def get_portfolio(period_df):
    """Equal-weight rule: give each column of ``period_df`` a weight of 1/N.

    Returns a Python list with one entry per column; the data values are not
    inspected.
    """
    width = period_df.shape[1]

    return width * [1 / width]

In [72]:
# Attach the strategy as an instance attribute. A plain function stored on an
# instance is not bound, so it is called with `period_df` only (no `self`).
agent.get_portfolio = get_portfolio

In [3]:
# stock_return_df.rolling('D').count()

# Performance analyser

## turnover
$$
\operatorname{Turn}_{t}=\sum_{i=1}^{N}\left|w_{i, t}-\frac{w_{i, t-1} R_{i, t-1}}{\sum_{i=1}^{N} w_{i, t-1} R_{i, t-1}}\right|
$$

Use the following example as a test. \
Test the 1/N portfolio on the 'Book to market (25)' monthly data. \
The rolling period is 120 months. \
The testing period is from '1927-01-01' to '2018-12-01'.

In [42]:
def get_gmv(period_df: pd.DataFrame) -> np.ndarray:
    """Global minimum-variance weights from the sample covariance of a window.

    Computes ``w = H^{-1} 1 / (1' H^{-1} 1)`` where ``H`` is
    ``period_df.cov()``. Rescaling ``H`` by a constant (e.g. multiplying by 12
    to annualise) cancels between numerator and denominator, so no
    annualisation factor is needed for the weights.

    Args:
        period_df: asset returns for the estimation window, one column per
            asset.

    Returns:
        1-D array of weights, one per column, summing to 1.

    Raises:
        numpy.linalg.LinAlgError: if the sample covariance matrix is singular.
    """
    cov_matrix = period_df.cov().to_numpy()
    ones = np.ones(cov_matrix.shape[0])

    # Solve H x = 1 rather than forming H^{-1} explicitly: one factorisation
    # instead of a full inverse, and better conditioned numerically.
    unscaled_weights = np.linalg.solve(cov_matrix, ones)

    # 1' H^{-1} 1 is simply the sum of the solved vector.
    return unscaled_weights / unscaled_weights.sum()

In [6]:
def get_equal_weighted_portfolio(period_df):
    """Return the 1/N weight list for the columns of ``period_df``.

    NOTE(review): an identical function is defined near the top of this
    notebook; this later definition shadows it.
    """
    column_total = period_df.shape[1]
    share = 1 / column_total

    return [share for _ in range(column_total)]

In [20]:
def get_turnover(
    rebalance_option: str,
    stock_return_df: pd.DataFrame,
    portfolio_matrix: np.ndarray,
    rolling_period: 'int | None' = None
    ) -> float:
    '''
    Average portfolio turnover across rebalance dates.

    For consecutive periods the turnover is
    ``sum_i | w_{i,t} - w_{i,t-1} R_{i,t-1} / sum_j w_{j,t-1} R_{j,t-1} |``
    where ``R`` is the gross asset return over the whole rebalance period
    (compounded over all rows that fall in the period).

    Args:
        rebalance_option: 'month' or 'year'.
        stock_return_df: simple asset returns, one column per asset, indexed
            by date. The frame is not modified.
        portfolio_matrix: weights of shape (n_period, n_stock), one row per
            rebalance period that has a portfolio.
        rolling_period: number of leading rebalance periods without a
            portfolio (the estimation window). When omitted it is inferred as
            the number of periods in ``stock_return_df`` minus the number of
            rows of ``portfolio_matrix``.

    Returns:
        Mean turnover over all consecutive period pairs; NaN when fewer than
        two periods are available.

    Raises:
        ValueError: on an unknown rebalance option or when the weight matrix
            cannot be aligned with the return periods.
    '''
    freq_alias = {'month': 'M', 'year': 'Y'}.get(rebalance_option)
    if freq_alias is None:
        raise ValueError('rebalance option can be either \'month\' or \'year\'')

    # Recent pandas spells the period-end aliases 'ME'/'YE' and deprecates
    # 'M'/'Y'; older releases only know the short form.
    try:
        rebalance_grouper = pd.Grouper(freq=freq_alias + 'E')
    except ValueError:
        rebalance_grouper = pd.Grouper(freq=freq_alias)

    weights = np.asarray(portfolio_matrix, dtype=float)
    if weights.ndim != 2:
        raise ValueError('portfolio matrix must be 2-dimensional (period x stock)')

    # `+ 1` builds a new frame of gross returns; the caller's data is left
    # untouched. The product compounds every row inside a period (an empty
    # period compounds to 1).
    gross_by_period = (stock_return_df + 1).groupby(rebalance_grouper).prod().to_numpy(dtype=float)

    if rolling_period is None:
        rolling_period = gross_by_period.shape[0] - weights.shape[0]
    if rolling_period < 0:
        raise ValueError('portfolio matrix has more periods than the stock returns')

    gross_by_period = gross_by_period[rolling_period:]
    if gross_by_period.shape != weights.shape:
        raise ValueError(
            'portfolio matrix shape {} doesn\'t align with stock return periods {}'.format(
                weights.shape, gross_by_period.shape))

    # Weights after they have drifted with the period's returns, renormalised.
    drifted_weights = weights * gross_by_period
    drifted_share = drifted_weights / drifted_weights.sum(axis=1, keepdims=True)

    turnover_vector = np.abs(weights[1:] - drifted_share[:-1]).sum(axis=1)
    if turnover_vector.size == 0:
        return float('nan')

    return float(turnover_vector.mean())



In [21]:
# Average turnover for the current `ret` / `portfolio_matrix`.
# NOTE(review): `ret` is first assigned in a later cell (In [43]), so this call
# depends on out-of-order execution.
get_turnover(rebalance_option='month', stock_return_df=ret, portfolio_matrix=portfolio_matrix)

0.7555461988243944

## net portfolio return
The portfolio return net of proportional transaction costs is calculated as
$$
p_{t}=\left(1+\tilde{p}_{t}\right)\left(1-c \times \text { Turnover }_{t-1}\right)-1
$$

In [43]:
# Run configuration: dataset, sample window, rebalance frequency, weighting
# rule and length (in rebalance periods) of the rolling estimation window.
data_frequncy = 'monthly'
data_name = 'industry'
start='1969-07-01'
end='2018-12-01'
rebalance_frequncy= 'month'
get_portfolio = get_gmv
rolling_period = 120

# data
# NOTE(review): the data_library class defined in this notebook has neither
# `start`/`end` arguments nor get_risk_free_rates(); presumably this relies on
# the version imported from module.data_library — confirm.
lib = data_library
ret = lib.get_stock_returns(data_name, data_frequncy, start=start, end=end)
rf = lib.get_risk_free_rates(frequency=data_frequncy)

# portfolio matrix
# The rule is attached as an instance attribute, so it is called with the
# window frame only (no `self`).
portfolio_manager = portfolio_generator()
portfolio_manager.get_portfolio = get_portfolio
portfolio_matrix = portfolio_manager.get_portfolio_matrix(
    stock_return_df=ret,
    rebalance_option=rebalance_frequncy,
    rolling_period=rolling_period
)

# portfolio return
portfolio_return = vectorized_backtesting(
                            stock_return_df=ret,
                            rebalance_option=rebalance_frequncy,
                            portfolo_strategy_matrix=portfolio_matrix,
                            rolling_period=rolling_period
                        )

# analysis
# NOTE(review): the result is unpacked into two values, while the get_turnover
# defined in this notebook returns a single float; presumably this targets the
# version imported from module.portfolio_analyser — confirm.
sr = get_sharpe_ratio(portfolio_return,rf)
turnover, turnover_vector = get_turnover(
                        stock_return_df=ret,
                        portfolio_matrix=portfolio_matrix,
                        rebalance_option=rebalance_frequncy,
                        rolling_period=rolling_period
)
#performance = pd.DataFrame([sr, turnover], index=['Sharpe ratio', 'Turnover'], columns=[data_name])

In [47]:
def get_portfolio_net_return(
    portfolio_return:pd.DataFrame,
    turnover_vector:np.array,
    c: float
    ) -> pd.DataFrame:

    '''
    Portfolio returns after proportional transaction costs.

    The first return is passed through unchanged. Every later return ``r`` is
    paired, in order, with one entry ``t`` of ``turnover_vector`` and becomes
    ``(1 + r) * (1 - c * t) - 1``.

    Args:
        portfolio_return: portfolio returns; its values are flattened to 1-D
            and its index is reused for the result.
        turnover_vector: turnover values, one per return after the first.
        c: proportional cost charged per unit of turnover.

    Returns:
        DataFrame with one column, 'portfolio net return', on the input index.
    '''

    gross_returns = np.ravel(portfolio_return.values)

    cost_factor = 1 - c * turnover_vector
    later_net_returns = (1 + gross_returns[1:]) * cost_factor - 1

    # Prepend the untouched first return to the cost-adjusted remainder.
    net_returns = np.concatenate((np.ravel(gross_returns[0]), np.ravel(later_net_returns)))

    return pd.DataFrame(net_returns, index=portfolio_return.index, columns=['portfolio net return'])


In [49]:
# Charge a proportional cost of c = 0.002 per unit of turnover, then recompute
# the Sharpe ratio on the net returns.
net_ret = get_portfolio_net_return(portfolio_return, turnover_vector, 0.002)
get_sharpe_ratio(net_ret, rf)

0.08241516066341377