In [1]:
# TODO: Profile this notebook with line_profiler (the extension loaded below provides %lprun); background: https://stackoverflow.com/questions/45893768/how-do-i-find-out-what-parts-of-my-code-are-inefficient-in-python
%load_ext line_profiler

In [2]:
import datetime
import os
import pprint
from datetime import timedelta
from enum import Enum, auto
from typing import List

import holidays
import matplotlib.pyplot as plt
import pandas as pd
from pandas.tseries.offsets import BDay

pp = pprint.PrettyPrinter(indent=4)

In [None]:
# Credentials must not be committed with the notebook, so the key is read from
# the environment (None when the variable is not set).
# NOTE(review): an API key used to be hardcoded in this cell; treat it as leaked
# and rotate it. The variable name SHARADAR_API_KEY is a convention chosen here.
API_KEY = os.environ.get('SHARADAR_API_KEY')

# DEVELOPMENT selects which pair of local CSV files is loaded below.
DEVELOPMENT = False

if DEVELOPMENT:
    daily_metrics = pd.read_csv('SHARADAR-DAILY.csv')
    daily_prices = pd.read_csv('SHARADAR-SEP.csv')
else:
    daily_metrics = pd.read_csv('SHARADAR_DAILY_3_9ffd00fad4f19bbdec75c6e670d3df83.csv')
    daily_prices = pd.read_csv('SHARADAR_SEP_2_0bd2000858d1d8d1f48d4cdea5f8c9e2.csv')

In [None]:
# Combine the daily metrics with the adjusted close price of the same
# (date, ticker) pair; the price column is exposed as `price`.
d1 = daily_metrics.copy()
# Select and rename in a single expression so that `d2` is a brand-new frame.
# Renaming a column slice of `daily_prices` in place triggers pandas'
# SettingWithCopyWarning.
d2 = daily_prices[['ticker', 'date', 'closeadj']].rename(columns={'closeadj': 'price'})

# TODO: How much data do we lose via the merge?
# Inner join: rows present in only one of the two frames are dropped.
daily_data = d1.merge(d2, on=['date', 'ticker'], how='inner')
daily_data

In [None]:
class StockUniverse(Enum):
    """Market-capitalisation bucket used to restrict the candidate stocks.

    The dollar ranges below are the author's intent; the actual cut-offs are
    applied in filter_stocks_by_universe.
    """

    SMALL = 1  # < $1B
    MID = 2  # $1B - $10B
    LARGE = 3  # > $10B (filter_stocks_by_universe cuts at 10, not 100)
    
class EvaluationMetric(Enum):
    """Valuation metric used to rank stocks; str() gives a display label."""

    EV_EBIT = auto()
    P_E = auto()
    P_B = auto()
    DIV_YIELD = auto()

    def __str__(self):
        """Return the human-readable label of the metric.

        Raises:
            ValueError: if the member has no label.
        """
        # Members are matched by value, as in the rest of the notebook. The
        # table is built inside the method because a dict assigned in the class
        # body of an Enum would itself be turned into a member.
        labels = {
            EvaluationMetric.EV_EBIT.value: 'EV/EBIT',
            EvaluationMetric.P_E.value: 'P/E',
            EvaluationMetric.P_B.value: 'P/B',
            EvaluationMetric.DIV_YIELD.value: '% Div Yield',
        }
        label = labels.get(self.value)
        if label is None:
            # Use repr here: formatting `self` with str() would re-enter this
            # method and recurse forever.
            raise ValueError(f'Unsupported evaluation metric {self!r}')
        return label
    
def get_closest_previous_work_day(
    check_day: datetime.datetime,
    holidays=None
) -> datetime.datetime:
    """Return `check_day` if it is a working day, else the closest earlier one.

    A working day is Monday-Friday and not contained in `holidays`.

    Args:
        check_day: day to start from.
        holidays: any container supporting `day in holidays`. Defaults to a
            fresh US holiday calendar (`holidays.US()`), created per call
            instead of once at definition time.

    Returns:
        The closest day at or before `check_day` that is a working day.
    """
    if holidays is None:
        # The parameter shadows the `holidays` package inside this function,
        # so the package is imported under an alias.
        import holidays as holidays_pkg
        holidays = holidays_pkg.US()

    one_day = datetime.timedelta(days=1)
    candidate = check_day
    # Step back one day at a time past weekends (weekday 5/6) and holidays.
    # The original fallback called an undefined `previous_working_day`.
    while candidate.weekday() > 4 or candidate in holidays:
        candidate -= one_day
    return candidate

def get_rebalance_dates(
    start_date: datetime.datetime,
    end_date: datetime.datetime,
    period_length: datetime.timedelta
) -> List[datetime.datetime]:
    """Build the list of rebalance dates between `start_date` and `end_date`.

    Starting at `start_date`, a date is generated every `period_length` while
    it is strictly before `end_date`; each one is moved back to the closest
    working day via get_closest_previous_work_day.

    Args:
        start_date: first candidate date (inclusive).
        end_date: upper bound (exclusive).
        period_length: spacing between candidate dates; must be positive.

    Returns:
        The adjusted dates, in chronological order of their candidates.

    Raises:
        ValueError: if `period_length` is zero or negative, which would
            otherwise make the loop below run forever.
    """
    if period_length <= datetime.timedelta(0):
        raise ValueError(f'period_length must be positive, got {period_length}')

    dates = []
    curr_date = start_date
    while curr_date < end_date:
        dates.append(get_closest_previous_work_day(curr_date))
        curr_date += period_length
    return dates

# Assumes data is already filtered by date
def filter_stocks_by_universe(
    df: pd.DataFrame,
    stocks_universe: StockUniverse
) -> pd.DataFrame:
    """Keep only the rows whose `marketcap` falls into the requested bucket.

    Buckets are disjoint half-open ranges: SMALL is `< 1`, MID is `[1, 10)`
    and LARGE is `>= 10`. Previously a value of exactly 10 matched both MID
    and LARGE.

    NOTE(review): the thresholds 1 and 10 only mean $1B / $10B if `marketcap`
    is expressed in billions of USD. Confirm the unit of the column.

    Args:
        df: frame with a `marketcap` column.
        stocks_universe: bucket to keep.

    Returns:
        The matching rows of `df`.

    Raises:
        Exception: if `stocks_universe` is not a known bucket.
    """
    universe_id = stocks_universe.value
    if universe_id == StockUniverse.SMALL.value:
        return df[df['marketcap'] < 1]
    if universe_id == StockUniverse.MID.value:
        return df[(df['marketcap'] >= 1) & (df['marketcap'] < 10)]
    if universe_id == StockUniverse.LARGE.value:
        return df[df['marketcap'] >= 10]
    raise Exception(f'Unsupported stock universe {stocks_universe}')

def sort_df_by_metric(
    df: pd.DataFrame,
    metric: EvaluationMetric
) -> pd.DataFrame:
    """Return `df` sorted ascending by the column backing `metric`.

    EV_EBIT sorts by `evebit`, P_E by `pe` and P_B by `pb`.

    Raises:
        Exception: for DIV_YIELD (not implemented) and for unknown metrics.
    """
    metric_id = metric.value
    if metric_id == EvaluationMetric.DIV_YIELD.value:
        raise Exception('EvaluationMetric.DIV_YIELD not yet supported.')

    # Metric value -> name of the column to sort on.
    sort_column = {
        EvaluationMetric.EV_EBIT.value: 'evebit',
        EvaluationMetric.P_E.value: 'pe',
        EvaluationMetric.P_B.value: 'pb',
    }
    if metric_id not in sort_column:
        raise Exception(f'Unsupported evaluation metric {metric}')
    return df.sort_values(by=sort_column[metric_id])

# Assumes df is sorted appropriately ahead of time.
def get_top_n_stocks_by_metric(
    df: pd.DataFrame,
    n: int,
    metric: EvaluationMetric
) -> List[str]:
    """Return the tickers of the first `n` rows with a positive metric value.

    Rows are taken in the order they appear in `df`. For EV_EBIT both
    `evebit` and `ev` must be positive; for P_E and P_B only `pe` / `pb`.

    Raises:
        Exception: for DIV_YIELD (not implemented) and for unknown metrics.
    """
    metric_id = metric.value
    if metric_id == EvaluationMetric.EV_EBIT.value:
        keep = (df['evebit'] > 0) & (df['ev'] > 0)
    elif metric_id == EvaluationMetric.P_E.value:
        keep = df['pe'] > 0
    elif metric_id == EvaluationMetric.P_B.value:
        keep = df['pb'] > 0
    elif metric_id == EvaluationMetric.DIV_YIELD.value:
        raise Exception('EvaluationMetric.DIV_YIELD not yet supported.')
    else:
        raise Exception(f'Unsupported evaluation metric {metric}')

    eligible = df[keep]
    top_rows = eligible[:n]
    return list(top_rows['ticker'])

def filter_df_by_date(
    df: pd.DataFrame,
    date: datetime.datetime
) -> pd.DataFrame:
    """Return the rows of `df` whose `date` column equals `date` as 'YYYY-MM-DD'."""
    day_key = date.strftime('%Y-%m-%d')
    on_that_day = df.date.eq(day_key)
    return df.loc[on_that_day]

def get_stock_basket_price(
    df: pd.DataFrame,
    tickers: List[str]
) -> float:
    """Return the summed `price` of the rows of `df` whose ticker is in `tickers`.

    Tickers without a row in `df` contribute nothing to the sum; they are
    reported on stdout together with the date(s) found in `df`.

    Args:
        df: frame with `ticker` and `price` columns (and normally `date`).
        tickers: tickers making up the basket.

    Returns:
        The sum of the matching prices, rounded to 2 decimals.

    Raises:
        AssertionError: if `df` holds more matching rows than there are tickers.
    """
    stocks_of_interest = df.loc[df['ticker'].isin(tickers)]
    assert len(tickers) >= len(stocks_of_interest)
    if len(stocks_of_interest) != len(tickers):
        missing = set(tickers) - set(stocks_of_interest['ticker'])
        # Take the date from the frame itself. The original printed a global
        # `date` that only exists once the backtest loop has started.
        if 'date' in df.columns:
            frame_dates = ', '.join(str(day) for day in df['date'].unique())
        else:
            frame_dates = 'unknown date'
        print(frame_dates, ' Stocks closed: ', missing)
    return round(stocks_of_interest['price'].sum(), 2)

In [None]:
# Sanity check: show the display label that EvaluationMetric.__str__ returns.
str(EvaluationMetric.EV_EBIT)

In [None]:
# Prepare Inputs for Base + Test

INITIAL_PORTFOLIO_VALUE = 10000
PORTFOLIO_SIZE = 30
# Spacing between two consecutive rebalance dates.
REBALANCE_PERIOD = timedelta(days=90)

base_metric = EvaluationMetric.EV_EBIT
test_metric = EvaluationMetric.P_B
stocks_universe = StockUniverse.LARGE

# The `date` column holds 'YYYY-MM-DD' strings, so the earliest/latest string is
# also the earliest/latest day. Series.min()/max() are vectorised, unlike the
# Python built-ins, which iterate over every row.
start_date = datetime.datetime.strptime(daily_data['date'].min(), '%Y-%m-%d')
end_date = datetime.datetime.strptime(daily_data['date'].max(), '%Y-%m-%d')
rebalance_dates = get_rebalance_dates(start_date, end_date, REBALANCE_PERIOD)

# Optimization: Sorting every time would take too long
# Sort the full data set once per metric; filtering by date later keeps the order.
base_sorted_daily_data = sort_df_by_metric(daily_data, base_metric)
test_sorted_daily_data = sort_df_by_metric(daily_data, test_metric)

In [None]:
# Compute Results for Base + Test
#
# Backtest of two strategies (base metric vs test metric). On every rebalance
# date the baskets held so far are priced on that date, each portfolio value is
# scaled by the ratio new basket price / previous basket price, and new baskets
# are then selected for the next period.

base_portfolio_value = INITIAL_PORTFOLIO_VALUE
test_portfolio_value = INITIAL_PORTFOLIO_VALUE
portfolio_size = PORTFOLIO_SIZE

# Rebinds `start_date` (first set in the input-preparation cell) to the first
# rebalance date, which has already been moved back to a working day.
start_date = rebalance_dates[0]

# Initial selection: restrict the pre-sorted data to the start date ...
base_sorted_df = filter_df_by_date(base_sorted_daily_data, start_date)
test_sorted_df = filter_df_by_date(test_sorted_daily_data, start_date)

# ... then to the chosen market-cap universe ...
base_sorted_df = filter_stocks_by_universe(base_sorted_df, stocks_universe)
test_sorted_df = filter_stocks_by_universe(test_sorted_df, stocks_universe)

# ... and take the first `portfolio_size` tickers with a positive metric.
base_portfolio = get_top_n_stocks_by_metric(base_sorted_df, portfolio_size, base_metric)
test_portfolio = get_top_n_stocks_by_metric(test_sorted_df, portfolio_size, test_metric)

# Basket price = sum of the `price` of every ticker in the basket.
base_price = get_stock_basket_price(base_sorted_df, base_portfolio)
test_price = get_stock_basket_price(test_sorted_df, test_portfolio)

# Results keyed by date. The loop below starts at rebalance_dates[0] as well,
# so this first entry is written again on the first iteration.
res = {}
res[start_date] = {
    'base_basket_price': base_price,
    'base_portfolio_value': base_portfolio_value,
    'test_basket_price': test_price,
    'test_portfolio_value': test_portfolio_value,
}

for date in rebalance_dates:
    print(date)

    # Basket prices as of the previous rebalance (set at the end of the
    # previous iteration, or by the initial selection above).
    prev_base_price = base_price
    prev_test_price = test_price

    # Price the baskets picked on the previous rebalance using today's data.
    # Only the date filter is applied here, not the universe filter, so a held
    # stock is still priced even if it no longer belongs to the universe.
    base_sorted_df = filter_df_by_date(base_sorted_daily_data, date)
    test_sorted_df = filter_df_by_date(test_sorted_daily_data, date)

    # A held ticker with no row on `date` drops out of the sum, which lowers
    # the basket price.
    base_price = get_stock_basket_price(base_sorted_df, base_portfolio)
    test_price = get_stock_basket_price(test_sorted_df, test_portfolio)

    # Growth factor of each basket over the period that just ended.
    base_change = base_price / prev_base_price
    test_change = test_price / prev_test_price

    base_portfolio_value = round(base_portfolio_value * base_change, 2)
    test_portfolio_value = round(test_portfolio_value * test_change, 2)

    # Recorded before rebalancing: basket prices here belong to the old baskets.
    res[date] = {
        'base_basket_price': base_price,
        'base_portfolio_value': base_portfolio_value,
        'test_basket_price': test_price,
        'test_portfolio_value': test_portfolio_value,
    }

    # Rebalance: select the new baskets from today's universe ...
    base_sorted_df = filter_stocks_by_universe(base_sorted_df, stocks_universe)
    test_sorted_df = filter_stocks_by_universe(test_sorted_df, stocks_universe)

    base_portfolio = get_top_n_stocks_by_metric(base_sorted_df, portfolio_size, base_metric)
    test_portfolio = get_top_n_stocks_by_metric(test_sorted_df, portfolio_size, test_metric)

    # ... and price them today; these become the `prev_*` prices next iteration.
    base_price = get_stock_basket_price(base_sorted_df, base_portfolio)
    test_price = get_stock_basket_price(test_sorted_df, test_portfolio)


# One row per rebalance date, one column per recorded value.
df_res = pd.DataFrame.from_dict(res, orient='index')
df_res

In [None]:
# Plot both portfolio values over the rebalance dates, titled with the two metric labels.
value_columns = ['base_portfolio_value', 'test_portfolio_value']
df_to_plot = df_res[value_columns]
df_to_plot.plot(
    title=f'{str(base_metric)} (base) vs {str(test_metric)} (test)'
)

In [None]:
# Spot check: price history of a single ticker from the merged data.
(
    daily_data
    .loc[daily_data.ticker == 'AAPL', ['date', 'price']]
    .set_index('date')
    .sort_values(by=['date'])
    .plot()
)

In [None]:
# Spot check: all merged columns for a single ticker, ordered by date.
(
    daily_data
    .loc[daily_data.ticker == 'AAPL']
    .set_index('date')
    .sort_values(by=['date'])
)

In [None]:
# NOTE(review): the commented-out lines below are leftover debugging scratch and
# are stale. They call get_stock_basket_price with three arguments although it
# takes two, and they use a `tickers` variable that is not defined at notebook
# level. If re-enabled, `d1`/`d2` would also overwrite the frames of the same
# name created in the merge cell. Consider deleting them.

# d1 = datetime.datetime.strptime('2011-07-01', '%Y-%m-%d')
# d2 = datetime.datetime.strptime('2011-09-30', '%Y-%m-%d')

# get_stock_basket_price(base_sorted_df, d1, tickers)
# get_stock_basket_price(base_sorted_df, d2, tickers)

# date_data = base_sorted_df[base_sorted_df.date == d1.strftime('%Y-%m-%d')]
# stocks_of_interest = date_data.loc[date_data['ticker'].isin(tickers)]
# stocks_of_interest

# date_data = base_sorted_df[base_sorted_df.date == d2.strftime('%Y-%m-%d')]
# stocks_of_interest = date_data.loc[date_data['ticker'].isin(tickers)]
# stocks_of_interest

# Display the backtest results table again.
df_res