In [1]:
# import dependencies

import pandas as pd
import numpy as np
import datetime as dt
from typing import List, Dict, Tuple, Any

# 2.2: The Return Series

In [2]:
# 2.1: calculating the return series in pure python

def calculate_return_series(prices: List[float]) -> List[float]:
    '''
    Calculates return series as parallel list of returns on prices.

    The result has the same length as a non-empty input. The first entry is
    None because there is no prior price to compare against; every later
    entry is prices[i] / prices[i-1] - 1. An empty input yields [None].
    '''
    # Pair each price with its predecessor instead of indexing by position
    period_returns = [
        (current / previous) - 1
        for previous, current in zip(prices, prices[1:])
    ]
    return [None] + period_returns

In [3]:
# 2.2:  calculating the return series with pandas
def calculate_return_series_pd(series: pd.Series) -> pd.Series:
    '''
    Computes simple period-over-period returns of a series.
    Each value is divided by the value one position earlier, then reduced by one,
    so the first entry is always NaN.
    The result carries the same index as the input series.
    '''
    previous_values = series.shift(1, axis=0)
    growth_factor = series / previous_values
    return growth_factor - 1

In [4]:
# 2.3:  calculating the log return series with pandas
def calculate_log_return_series(series: pd.Series) -> pd.Series:
    '''
    Log-return counterpart of calculate_return_series_pd:
    the natural log of each value divided by the value one position earlier.
    The first entry is NaN and the input index is kept.
    '''
    lagged = series.shift(1, axis=0)
    log_ratio = np.log(series / lagged)
    return pd.Series(log_ratio)

# 2.3: Performance metrics

In [5]:
# 2.4:  calculating annualized volatility
def get_years_past(series: pd.Series) -> float:
    '''
    Number of years spanned by the series index, measured from its first to its
    last entry as whole days divided by 365.25. Used by functions that annualize.
    '''
    first_date, last_date = series.index[0], series.index[-1]
    elapsed = last_date - first_date
    return elapsed.days / 365.25

def calculate_annualized_volatility(return_series: pd.Series) -> float:
    '''
    Annualized volatility of a date-indexed return series.
    The sampling frequency is inferred from the data (entries divided by years
    spanned), so any interval of date-indexed returns can be used.
    '''
    n_entries = return_series.shape[0]
    entries_per_year = n_entries / get_years_past(return_series)
    annualization_factor = np.sqrt(entries_per_year)
    return return_series.std() * annualization_factor

In [6]:
# loading functions from pypm folder to load EOD data
from pypm import data_io, metrics

# df = data_io.load_eod_data('AWU')

# Read the SPY price file, parse the 'date' column and use it as the index
df = pd.read_csv('data/SPY.csv')
df = df.assign(date=pd.to_datetime(df['date'])).set_index('date')

# Log returns of the closing price, computed by the pypm metrics module
return_series = metrics.calculate_log_return_series(df['close'])
df.head()

Unnamed: 0_level_0,open,close,low,high,volume
date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2010-01-04,112.37,113.33,111.51,113.39,118944600
2010-01-05,113.26,113.63,112.85,113.68,111579900
2010-01-06,113.52,113.71,113.43,113.99,116074400
2010-01-07,113.5,114.19,113.18,114.33,131091100
2010-01-08,113.89,114.57,113.66,114.62,126402800


In [7]:
# Summarize the loaded frame: index range, columns, non-null counts and dtypes
df.info()

<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 2516 entries, 2010-01-04 to 2019-12-31
Data columns (total 5 columns):
 #   Column  Non-Null Count  Dtype  
---  ------  --------------  -----  
 0   open    2516 non-null   float64
 1   close   2516 non-null   float64
 2   low     2516 non-null   float64
 3   high    2516 non-null   float64
 4   volume  2516 non-null   int64  
dtypes: float64(4), int64(1)
memory usage: 117.9 KB


In [9]:
# Annualized volatility of the SPY log-return series.
# This calls the pypm metrics module, not the notebook-level function of the same name.
print(metrics.calculate_annualized_volatility(return_series))

0.14768527967136866


## 2.3.2: Sharpe Ratio

In [14]:
# 2.3.2.1:  calculating Compounded Annual Growth Rate - CAGR
def calculate_cagr(series: pd.Series) -> float:
    '''
    Compounded annual growth rate of a date-indexed series:
    (last value / first value) raised to 1 / years spanned, minus one.
    '''
    first_value = series.iloc[0]
    last_value = series.iloc[-1]
    growth_multiple = last_value / first_value
    annual_exponent = 1 / get_years_past(series)
    return (growth_multiple ** annual_exponent) - 1

In [15]:
# Calculate CAGR on the SPY closing prices.
# This calls the pypm metrics module, not the notebook-level calculate_cagr.
print(metrics.calculate_cagr(df['close']))

0.11016653428843459


In [16]:
# 2.3.2.3:  calculating the sharpe ratio
def calculate_sharpe_ratio(price_series: pd.Series, benchmark_rate: float=0) -> float:
    '''
    Calculate the sharpe ratio given a date-indexed price series.

    The ratio is (CAGR - benchmark_rate) / annualized volatility of the
    simple returns. benchmark_rate defaults to zero.
    '''
    cagr = calculate_cagr(price_series)
    # Use the pandas return series: the volatility helper needs a date-indexed
    # pd.Series, which the pure-python list version does not provide.
    return_series = calculate_return_series_pd(price_series)
    volatility = calculate_annualized_volatility(return_series)
    return (cagr - benchmark_rate) / volatility

In [17]:
# Calculate the sharpe ratio on the SPY closing prices (benchmark rate left at its default).
# This calls the pypm metrics module, not the notebook-level calculate_sharpe_ratio.
print(metrics.calculate_sharpe_ratio(df['close']))

0.7472551217837079


In [18]:
# 2.3.3  Sortino Ratio
def calculate_annualized_downside_deviation(return_series: pd.Series, benchmark_rate: float=0) -> float:
    '''
    Annualized downside deviation, as used by the sortino ratio.

    benchmark_rate is treated as an annual rate and converted to a per-period
    rate using the number of entries per year observed in the data. Only the
    periods whose return falls below that per-period rate contribute.
    '''
    # Sampling frequency: needed to de-annualize the benchmark and to
    # annualize the final result
    n_entries = return_series.shape[0]
    entries_per_year = n_entries / get_years_past(return_series)

    per_period_benchmark = ((1 + benchmark_rate) ** (1/entries_per_year)) - 1

    # Positive values are shortfalls below the benchmark
    shortfall = per_period_benchmark - return_series
    below_benchmark = shortfall[shortfall > 0]
    squared_shortfall_total = (below_benchmark ** 2).sum()

    downside_deviation = np.sqrt(squared_shortfall_total / (n_entries - 1))
    return downside_deviation * np.sqrt(entries_per_year)

def calculate_sortino_ratio(price_series: pd.Series, benchmark_rate: float=0) -> float:
    '''
    Calculates the sortino ratio of a date-indexed price series.

    The ratio is (CAGR - benchmark_rate) / annualized downside deviation,
    with the downside deviation measured against the same benchmark_rate.
    benchmark_rate is an annual rate and defaults to zero.
    '''
    cagr = calculate_cagr(price_series)
    # Use the pandas return series: the downside-deviation helper needs a
    # date-indexed pd.Series, which the pure-python list version does not provide.
    return_series = calculate_return_series_pd(price_series)
    # Forward the benchmark so numerator and denominator use the same target
    downside_deviation = calculate_annualized_downside_deviation(return_series, benchmark_rate)

    return (cagr - benchmark_rate) / downside_deviation

In [19]:
# Calculate the sortino ratio on the SPY closing prices (benchmark rate left at its default).
# This calls the pypm metrics module, not the notebook-level calculate_sortino_ratio.
print(metrics.calculate_sortino_ratio(df['close']))

1.0445291425749788


## 2.3.4  Maximum Drawdown Statistics

In [20]:
from typing import Dict, Any, Callable

In [21]:
# 2.10 computing maximum drawdown

def _dollar_drawdown(price, peak):
    '''Drawdown as the absolute difference between peak and price'''
    return peak - price


def _percent_drawdown(price, peak):
    '''Drawdown as the fractional decline of price from peak'''
    return -((price / peak) -1)


def _log_drawdown(price, peak):
    '''Drawdown as the difference of the natural logs of peak and price'''
    return np.log(peak) - np.log(price)


# Maps a drawdown method name to a function of (price, peak)
DRAWDOWN_EVALUATORS: Dict[str, Callable] = {
    'dollar': _dollar_drawdown,
    'percent': _percent_drawdown,
    'log': _log_drawdown,
}

In [22]:
def calculate_drawdown_series(series: pd.Series, method: str='log') -> pd.Series:
    '''
    Drawdown of each entry relative to the running maximum of the series,
    measured with the evaluator registered under method in DRAWDOWN_EVALUATORS.
    '''
    assert method in DRAWDOWN_EVALUATORS, \
        f'Method "{method}" must be one of {list(DRAWDOWN_EVALUATORS.keys())}'

    running_peak = series.cummax()
    return DRAWDOWN_EVALUATORS[method](series, running_peak)

def calculate_max_drawdown(series: pd.Series, method: str='log') -> float:
    '''
    Largest value of the drawdown series for the given method
    '''
    drawdowns = calculate_drawdown_series(series, method)
    return drawdowns.max()

In [23]:
# Maximum drawdown of the SPY closing prices using the 'dollar' method.
# This calls the pypm metrics module, not the notebook-level calculate_max_drawdown.
print(metrics.calculate_max_drawdown(df['close'], method='dollar'))

59.23999999999998


In [24]:
# 2.11 calculating max drawdown with metadata

def calculate_max_drawdown_with_metadata(series: pd.Series, method: str='log') -> Dict[str, Any]:
    '''
    Calculates max_drawdown and stores metadata about when and where. Returns a dictionary of the form
        {
        'max_drawdown': float,
        'peak_date': pd.Timestamp,
        'peak_price': float,
        'trough_date': pd.Timestamp,
        'trough_price': float,
        }

    method selects the evaluator from DRAWDOWN_EVALUATORS. The dates and prices
    are taken from the series index and values. If the series never falls below
    its running peak, max_drawdown is 0 and both peak and trough describe the
    first entry.
    '''
    assert method in DRAWDOWN_EVALUATORS, \
        f'Method "{method}" must be one of {list(DRAWDOWN_EVALUATORS.keys())}'

    evaluator = DRAWDOWN_EVALUATORS[method]

    max_drawdown = 0
    local_peak_date = peak_date = trough_date = series.index[0]
    local_peak_price = peak_price = trough_price = series.iloc[0]

    # Series.items() replaces iteritems(), which was removed in pandas 2.0
    for date, price in series.items():

        # Keep track of the rolling max
        if price > local_peak_price:
            local_peak_date = date
            local_peak_price = price

        # Compute the drawdown relative to the most recent peak
        drawdown = evaluator(price, local_peak_price)

        # Store new max drawdown values, together with the peak it fell from
        if drawdown > max_drawdown:
            max_drawdown = drawdown

            peak_date = local_peak_date
            peak_price = local_peak_price

            trough_date = date
            trough_price = price

    return {
        'max_drawdown': max_drawdown,
        'peak_date': peak_date,
        'peak_price': peak_price,
        'trough_date': trough_date,
        'trough_price': trough_price
    }

In [25]:
# Max drawdown of the SPY closing prices with peak/trough metadata (default method).
# This calls the pypm metrics module, not the notebook-level function of the same name.
print(metrics.calculate_max_drawdown_with_metadata(df['close']))

{'max_drawdown': 0.22537712228907747, 'peak_date': Timestamp('2018-09-20 00:00:00'), 'peak_price': 293.58, 'trough_date': Timestamp('2018-12-24 00:00:00'), 'trough_price': 234.34}


#### 2.3.4.2  Log Max Drawdown Ratio

In [26]:
# 2.12:  Computing log max drawdown ratio
def calculate_log_max_drawdown_ratio(series: pd.Series) -> float:
    '''
    Log return from the first to the last entry of the series,
    minus the maximum drawdown computed with the 'log' method
    '''
    total_log_return = np.log(series.iloc[-1]) - np.log(series.iloc[0])
    worst_log_drawdown = calculate_max_drawdown(series, method='log')

    return total_log_return - worst_log_drawdown

In [27]:
# Log max drawdown ratio of the SPY closing prices.
# This calls the pypm metrics module, not the notebook-level function of the same name.
print(metrics.calculate_log_max_drawdown_ratio(df['close']))

0.8184356293564141


#### 2.3.4.3  Calmar Ratio

In [28]:
# Calmar Ratio - the percentage max drawdown ratio computed over a three year trailing period

def calculate_calmar_ratio(series: pd.Series, years_past: int=3) -> float:
    '''
    Calmar Ratio: CAGR divided by the percent max drawdown, both measured over
    the trailing window of years_past years (three by default) that ends at the
    last entry of the series.
    '''

    # Keep only entries strictly after the start of the trailing window
    window_start = series.index[-1] - pd.Timedelta(days=years_past*365.25)
    trailing = series[series.index > window_start]

    # Annualized growth over the window, scaled by its worst percent drawdown
    max_percent_drawdown = calculate_max_drawdown(trailing, method='percent')
    cagr = calculate_cagr(trailing)

    return cagr / max_percent_drawdown

In [29]:
# Calmar ratio of the SPY closing prices (default trailing window).
# This calls the pypm metrics module, not the notebook-level calculate_calmar_ratio.
print(metrics.calculate_calmar_ratio(df['close']))

0.6284491537903574


### 2.3.5  Regression-based Statistics

#### 2.3.5.1  Pure Profit Score (PPS)

In [32]:
# The PPS scales the annualized portfolio return against the linearity of the equity curve.

from sklearn.linear_model import LinearRegression

def calculate_pure_profit_score(price_series: pd.Series) -> float:
    '''
    Calculates the pure profit score: the CAGR of the price series multiplied
    by the r-squared of a linear regression of price on the entry position
    (0, 1, 2, ...).
    '''
    cagr = calculate_cagr(price_series)

    # Build a single column for a predictor, t; scikit-learn expects a 2-D array
    t: np.ndarray = np.arange(0, price_series.shape[0]).reshape(-1, 1)

    # Fit the regression
    regression = LinearRegression().fit(t, price_series)

    # Get the r-squared value
    r_squared = regression.score(t, price_series)

    return cagr * r_squared

In [33]:
# Pure profit score of the SPY closing prices.
# This calls the pypm metrics module, not the notebook-level calculate_pure_profit_score.
print(metrics.calculate_pure_profit_score(df['close']))

0.10652027614035113


#### 2.3.5.2  Jensen's Alpha

In [34]:
# Jensen's Alpha is the intercept a (alpha) of the regression: r_asset = a + b * r_benchmark + e

# 2.15:  Calculating Jensen's Alpha

def calculate_jensens_alpha(return_series: pd.Series, benchmark_return_series: pd.Series) -> float:
    '''
    Calculates jensens alpha: the intercept of a linear regression of
    return_series on benchmark_return_series.

    Prefers input series have the same index. Rows where either series is NA
    are dropped before fitting. The two series may share a name or be unnamed.
    '''

    # Join series along the index and purge NAs
    joined = pd.concat([return_series, benchmark_return_series], sort=True, axis=1)
    joined = joined.dropna()

    # Select by position, not by name: when both series carry the same name,
    # a label lookup returns both columns and the regression then yields one
    # intercept per column instead of a single alpha.
    clean_returns = joined.iloc[:, 0].to_numpy()
    # scikit-learn expects a 2-D predictor, hence the list-style column selector
    clean_benchmarks = joined.iloc[:, [1]].to_numpy()

    # Fit a linear regression and return the alpha
    regression = LinearRegression().fit(clean_benchmarks, y=clean_returns)

    return regression.intercept_

In [37]:
# In practice SPY would most likely be the benchmark index.
# Since it is the only DataFrame loaded here, it is compared against itself.
# NOTE: df['close'] is a price series, while the function's parameters are named as return series.
# This calls the pypm metrics module, not the notebook-level calculate_jensens_alpha.

print(metrics.calculate_jensens_alpha(df['close'], df['close']))

[-5.68434189e-14 -5.68434189e-14]
