Based on the papers:

@article{wood2021trading,
  title={Trading with the Momentum Transformer: An Intelligent and Interpretable Architecture},
  author={Wood, Kieran and Giegerich, Sven and Roberts, Stephen and Zohren, Stefan},
  journal={arXiv preprint arXiv:2112.08534},
  year={2021}
}

@article {Wood111,
	author = {Wood, Kieran and Roberts, Stephen and Zohren, Stefan},
	title = {Slow Momentum with Fast Reversion: A Trading Strategy Using Deep Learning and Changepoint Detection},
	volume = {4},
	number = {1},
	pages = {111--129},
	year = {2022},
	doi = {10.3905/jfds.2021.1.081},
	publisher = {Institutional Investor Journals Umbrella},
	issn = {2640-3943},
	URL = {https://jfds.pm-research.com/content/4/1/111},
	eprint = {https://jfds.pm-research.com/content/4/1/111.full.pdf},
	journal = {The Journal of Financial Data Science}
}

In [None]:
!pip install empyrical-reloaded

In [37]:
from empyrical import (sharpe_ratio, max_drawdown, downside_risk, annual_return, annual_volatility,)
from typing import Dict, List, Optional, Tuple, Union
import pandas as pd
import numpy as np
import os

import warnings
warnings.filterwarnings('ignore')

In [2]:
def calc_performance_metrics(data: pd.DataFrame, metric_suffix="", num_identifiers = None):
    """Summarise the performance of the "captured_returns" column of `data`.

    Rows containing any NaN are discarded, "captured_returns" is summed per
    index value (``groupby(level=0)``) and divided by the number of
    identifiers; every metric is computed on that series.

    Args:
        data: frame with a "captured_returns" column, and a "ticker" column
            when `num_identifiers` is not supplied.
        metric_suffix: text appended to every key of the returned dict.
        num_identifiers: divisor applied to the per-index sums. When falsy,
            the number of unique "ticker" values among the NaN-free rows is used.

    Returns:
        dict mapping ``<metric><metric_suffix>`` to its value for the empyrical
        metrics annual_return, annual_volatility, sharpe_ratio, downside_risk
        and the negated max_drawdown, plus the fraction of strictly positive
        returns and the mean gain divided by the mean absolute loss.
    """
    clean = data.dropna()
    if not num_identifiers:
        # Might just make this ticker or permno.
        num_identifiers = len(clean["ticker"].unique())

    srs = clean.groupby(level=0)["captured_returns"].sum() / num_identifiers
    gains = srs[srs > 0.0]
    losses = srs[srs < 0.0]

    metrics = {
        "annual_return": annual_return(srs),
        "annual_volatility": annual_volatility(srs),
        "sharpe_ratio": sharpe_ratio(srs),
        "downside_risk": downside_risk(srs),
        # Sign flipped relative to what max_drawdown returns.
        "max_drawdown": -max_drawdown(srs),
        "perc_pos_return": len(gains) / len(srs),
        "profit_loss_ratio": np.mean(gains) / np.mean(np.abs(losses)),
    }
    return {f"{name}{metric_suffix}": value for name, value in metrics.items()}

In [None]:
# The function calc_net_returns computes volatility-adjusted returns, which we may not need here.
# It also computes the captured returns net of transaction costs, which could be useful.
# Not implemented in this notebook for now; it lives in the classical_strategies file.


In [3]:
def calc_sharpe_by_year(data: pd.DataFrame, suffix=""):
    """Compute one Sharpe ratio per calendar year of "captured_returns".

    Works on a copy of `data`. The year is read from the index (which must
    expose ``.year``), rows with any NaN are dropped, "captured_returns" is
    averaged per index value and the result is grouped by year before
    empyrical's ``sharpe_ratio`` is applied to each group.

    Args:
        data: frame indexed by date with a "captured_returns" column.
        suffix: text appended to every key of the returned dict.

    Returns:
        dict mapping ``sharpe_ratio_<year><suffix>`` to that year's Sharpe ratio.
    """
    frame = data.copy()
    frame["year"] = frame.index.year

    # Average across rows sharing the same index value; "year" is averaged
    # along with the returns, hence the int conversion when building the keys.
    per_date = frame.dropna()[["year", "captured_returns"]].groupby(level=0).mean()
    by_year = per_date.groupby("year").apply(
        lambda grp: sharpe_ratio(grp["captured_returns"])
    )

    by_year.index = "sharpe_ratio_" + by_year.index.map(int).map(str) + suffix
    return by_year.to_dict()

In [4]:
def calc_returns(srs: pd.Series, day_offset: int = 1):
    """Return the simple return of `srs` over `day_offset` rows.

    Each value is divided by the value `day_offset` rows earlier, minus one;
    the first `day_offset` entries are therefore NaN.
    """
    lagged = srs.shift(day_offset)
    return srs / lagged - 1

In [5]:
def calc_daily_vol(daily_returns, span: int = 60):
    """Exponentially weighted standard deviation of `daily_returns`.

    Args:
        daily_returns: pandas Series (or DataFrame) of returns.
        span: span of the exponential window; also used as ``min_periods``,
            so the first ``span - 1`` raw values are NaN. Defaults to 60.

    Returns:
        The EWM standard deviation with its leading NaNs back-filled from the
        first available value. An input shorter than `span` yields all NaN.
    """
    ewm_std = daily_returns.ewm(span=span, min_periods=span).std()
    # .bfill() replaces fillna(method="bfill"), whose `method` argument is
    # deprecated in pandas; the result is the same.
    return ewm_std.bfill()

In [6]:
def calc_vol_scaled_returns(daily_returns, daily_vol=None):
    """Scale daily returns by the previous row's annualised volatility.

    Args:
        daily_returns: pandas Series of daily returns.
        daily_vol: optional Series of daily volatilities. When omitted, None
            or empty, it is computed with ``calc_daily_vol(daily_returns)``.

    Returns:
        ``daily_returns / (daily_vol * sqrt(252)).shift(1)``; the first entry
        is NaN because of the one-row shift. No target-volatility factor is
        applied.
    """
    # A None sentinel avoids building a Series at definition time; an empty
    # Series passed explicitly is still treated as "not provided".
    if daily_vol is None or not len(daily_vol):
        daily_vol = calc_daily_vol(daily_returns)
    annualized_vol = daily_vol * np.sqrt(252)
    # Had multiplication by target vol but don't care about that.
    return daily_returns / annualized_vol.shift(1)

In [7]:
class MACDStrat:
    """MACD trend signal built from pairs of (short, long) timescales."""

    def __init__(self, trend_combinations: Optional[List[Tuple[float, float]]] = None):
        """Store the (short, long) timescale pairs.

        Args:
            trend_combinations: list of (short_timescale, long_timescale)
                pairs; defaults to [(8, 24), (16, 48), (32, 96)] when None.
        """
        if trend_combinations is None:
            self.trend_combinations = [(8, 24), (16, 48), (32, 96)]
        else:
            self.trend_combinations = trend_combinations

    @staticmethod
    def calc_signal(prices: pd.Series, short_timescale: int, long_timescale: int):
        """Compute the normalised MACD signal for one timescale pair.

        The difference between the short and long exponentially weighted means
        of `prices` is divided by the 63-row rolling standard deviation of the
        prices, then by the 252-row rolling standard deviation of that ratio.
        Leading NaNs of both rolling windows are back-filled.

        Args:
            prices: price series.
            short_timescale: timescale of the fast average.
            long_timescale: timescale of the slow average.

        Returns:
            Series of the normalised signal, aligned with `prices`.
        """

        def calc_halflife(timescale):
            # Half-life h such that (1 - 1/timescale) ** h == 0.5.
            return np.log(0.5) / np.log(1 - 1 / timescale)

        macd = (
            prices.ewm(halflife=calc_halflife(short_timescale)).mean()
            - prices.ewm(halflife=calc_halflife(long_timescale)).mean()
        )

        # .bfill() replaces the deprecated fillna(method="bfill").
        q = macd / prices.rolling(63).std().bfill()  # Standardize MACD with volatility
        return q / q.rolling(252).std().bfill()


In [None]:
def read_changepoint_file(file_path: str, lookback_window_length: int):
    """Load one changepoint CSV and add a normalised changepoint location.

    The first CSV column becomes the (date-parsed) index. Missing values are
    forward-filled, rows that still contain NaN are dropped, and
    ``cp_location_norm = (t - cp_location) / lookback_window_length`` is added.

    Args:
        file_path: path of the CSV; it must contain "t" and "cp_location" columns.
        lookback_window_length: divisor used for the normalisation.

    Returns:
        DataFrame with the CSV columns plus "cp_location_norm".
    """
    return (
        pd.read_csv(file_path, index_col=0, parse_dates=True)
        # .ffill() replaces the deprecated fillna(method="ffill").
        .ffill()
        .dropna()
        .assign(
            # assign() hands the whole frame to the callable, not a single row.
            cp_location_norm=lambda frame: (frame["t"] - frame["cp_location"])
            / lookback_window_length
        )
    )

In [44]:
def prepare_cpd_features(folder_path: str, lookback_window_length: int):
    """Load every changepoint CSV in a folder into one DataFrame.

    Each file is read with ``read_changepoint_file`` and tagged with a
    "ticker" column equal to its file name without the extension (for
    example "googl_lbw21" for "googl_lbw21.csv").

    Args:
        folder_path: directory containing the changepoint CSV files.
        lookback_window_length: forwarded to ``read_changepoint_file``.

    Returns:
        The per-file frames concatenated in file-name order.

    Raises:
        ValueError: if the folder contains no ``.csv`` file.
    """
    # os.listdir returns entries in arbitrary order and also lists
    # sub-directories and unrelated files, so keep regular .csv files only
    # and sort them to make the result deterministic.
    csv_names = sorted(
        name
        for name in os.listdir(folder_path)
        if name.lower().endswith(".csv")
        and os.path.isfile(os.path.join(folder_path, name))
    )
    if not csv_names:
        raise ValueError(f"No changepoint CSV files found in {folder_path!r}")

    return pd.concat(
        [
            read_changepoint_file(
                os.path.join(folder_path, name), lookback_window_length
            ).assign(ticker=os.path.splitext(name)[0])
            for name in csv_names
        ]
    )

In [34]:
def deep_momentum_features(df_asset:pd.DataFrame):
    """Build the momentum feature set for a single asset.

    Note that `df_asset` is modified in place (columns are added to it); the
    returned frame is a NaN-free copy of it.

    Columns added, in order:
        * "srs": "close" clipped to the EWM (halflife 252) mean +/- 5 std.
        * "daily_returns", "daily_vol": from calc_returns / calc_daily_vol.
        * "target_returns": volatility-scaled return of the following row.
        * "norm_*_return": returns over 1, 21, 63, 126 and 252 rows divided by
          the daily volatility and by the square root of the horizon.
        * "macd_<short>_<long>": MACDStrat.calc_signal for three timescale pairs.
        * date parts read from the index ("day_of_week", "day_of_month",
          "week_of_year", "month_of_year", "year") and "date".

    Args:
        df_asset: frame with a "close" column; the index must expose datetime
            attributes (dayofweek, day, isocalendar, month, year).

    Returns:
        `df_asset` with every row containing a NaN removed.
    """
    # Clip outliers of the price series against its own exponential band.
    df_asset["srs"] = df_asset["close"]
    smoothed = df_asset["srs"].ewm(halflife=252)
    centre = smoothed.mean()
    spread = smoothed.std()
    upper_band = centre + 5 * spread
    lower_band = centre - 5 * spread
    df_asset["srs"] = np.maximum(np.minimum(df_asset["srs"], upper_band), lower_band)

    df_asset["daily_returns"] = calc_returns(df_asset["srs"])
    df_asset["daily_vol"] = calc_daily_vol(df_asset["daily_returns"])

    # shift(-1) stores the following row's scaled return on the current row.
    scaled = calc_vol_scaled_returns(df_asset["daily_returns"], df_asset["daily_vol"])
    df_asset["target_returns"] = scaled.shift(-1)

    # Returns over several horizons, normalised by volatility and sqrt(horizon).
    horizons = [
        ("norm_daily_return", 1),
        ("norm_monthly_return", 21),
        ("norm_quarterly_return", 63),
        ("norm_biannual_return", 126),
        ("norm_annual_return", 252),
    ]
    for column, offset in horizons:
        raw_return = calc_returns(df_asset["srs"], offset)
        df_asset[column] = raw_return / df_asset["daily_vol"] / np.sqrt(offset)

    for short_window, long_window in [(8, 24), (16, 48), (32, 96)]:
        signal = MACDStrat.calc_signal(df_asset["srs"], short_window, long_window)
        df_asset[f"macd_{short_window}_{long_window}"] = signal

    # date features
    if len(df_asset) == 0:
        # Empty input: create the columns without touching the index accessors.
        for column in (
            "day_of_week",
            "day_of_month",
            "week_of_year",
            "month_of_year",
            "year",
            "date",
        ):
            df_asset[column] = []
    else:
        df_asset["day_of_week"] = df_asset.index.dayofweek
        df_asset["day_of_month"] = df_asset.index.day
        df_asset["week_of_year"] = df_asset.index.isocalendar().week
        df_asset["month_of_year"] = df_asset.index.month
        df_asset["year"] = df_asset.index.year
        df_asset["date"] = df_asset.index

    return df_asset.dropna()

In [11]:
def include_changepoint_features(features: pd.DataFrame, cpd_folder_name: str, lookback_window_length: int):
    """Merge changepoint columns into `features` on ("date", "ticker").

    The changepoint data is loaded with ``prepare_cpd_features``; its
    "cp_location_norm" and "cp_score" columns are renamed to
    ``cp_rl_<lookback_window_length>`` and ``cp_score_<lookback_window_length>``.

    Args:
        features: frame with "date" and "ticker" columns.
        cpd_folder_name: folder holding the changepoint CSV files.
        lookback_window_length: lookback window used for the files and for
            the names of the new columns.

    Returns:
        The merged frame (default merge, so only matching rows are kept),
        indexed by its "date" column.
    """
    new_names = {
        "cp_location_norm": f"cp_rl_{lookback_window_length}",
        "cp_score": f"cp_score_{lookback_window_length}",
    }

    cpd = prepare_cpd_features(cpd_folder_name, lookback_window_length)
    cpd = cpd[["ticker", "cp_location_norm", "cp_score"]].rename(columns=new_names)
    # reset_index() turns the changepoint index into a column for the merge.
    cpd = cpd.reset_index()

    merged = features.merge(cpd, on=["date", "ticker"])
    merged.index = merged["date"]
    return merged

In [None]:
# Use the WRDS data (date and price) as the input DataFrame for deep_momentum_features().
# Then pass the resulting features DataFrame to include_changepoint_features(), together with the CPD folder and the lookback window length, so that the matching changepoint file is found and merged into the features.


# Finally, save the features to a CSV for import into the backtest (this needs to be done for each company).


In [71]:
import wrds


# Open the WRDS connection used to run the query below.
conn = wrds.Connection()

# Ticker whose daily prices are pulled; it is interpolated into the SQL text.
ticker = 'GOOGL'

# Select date, ticker and prc / cfacpr (aliased "close") from crsp.dsf joined to
# crsp.dsenames on permno, restricted to the ticker and date range, ordered by date.
# NOTE(review): the join has no date condition on dsenames, so presumably every
# dsf row of any permno that ever carried this ticker is returned — confirm.
# NOTE(review): CRSP documentation reportedly uses negative prc values for
# bid/ask averages — confirm whether abs(prc) is needed here.
query = f"""
SELECT DISTINCT
    d.date,
    n.ticker,
    d.prc / d.cfacpr as close
FROM
    crsp.dsf as d
JOIN 
    crsp.dsenames as n on d.permno = n.permno
WHERE
    n.ticker = '{ticker}' 
    and date BETWEEN '2016-01-01' and '2023-12-31'
ORDER BY
    date
"""

# Run the query on the WRDS connection; the result is stored in df.
df = conn.raw_sql(query)


WRDS recommends setting up a .pgpass file.
You can create this file yourself at any time with the create_pgpass_file() function.
Loading library list...
Done


In [None]:
# Guarded so the cell can be re-run: once 'date' has become the index it is no
# longer a column, and converting it again would raise KeyError: 'date'.
if 'date' in df.columns:
    df['date'] = pd.to_datetime(df['date'])
    df = df.set_index('date')

# deep_momentum_features adds columns to the frame it receives, so hand it a copy.
features = deep_momentum_features(df.copy())
# The features keep a "date" column, so the date index can be dropped here.
features = features.reset_index(drop=True)

In [66]:
# Merge the changepoint columns for a 21-row lookback window into the features.
# NOTE(review): the folder is an absolute local path, and re-running this cell
# merges into the already-merged `features`.
features = include_changepoint_features(
    features=features,
    cpd_folder_name="C:/Users/Maxim/Desktop/DDMIF/Changepoint files/",
    lookback_window_length=21,
)

C:/Users/Maxim/Desktop/DDMIF/Changepoint files/ADM.csv
C:/Users/Maxim/Desktop/DDMIF/Changepoint files/alb_lbw21.csv
C:/Users/Maxim/Desktop/DDMIF/Changepoint files/alco_lbw21.csv
C:/Users/Maxim/Desktop/DDMIF/Changepoint files/bg_lbw21.csv
C:/Users/Maxim/Desktop/DDMIF/Changepoint files/bkng_lbw21.csv
C:/Users/Maxim/Desktop/DDMIF/Changepoint files/cah_lbw21.csv
C:/Users/Maxim/Desktop/DDMIF/Changepoint files/cmcsa_lbw21.csv
C:/Users/Maxim/Desktop/DDMIF/Changepoint files/dis_lbw21.csv
C:/Users/Maxim/Desktop/DDMIF/Changepoint files/duk_lbw21.csv
C:/Users/Maxim/Desktop/DDMIF/Changepoint files/fcx_lbw21.csv
C:/Users/Maxim/Desktop/DDMIF/Changepoint files/fdp_lbw21.csv
C:/Users/Maxim/Desktop/DDMIF/Changepoint files/googl_lbw21.csv
C:/Users/Maxim/Desktop/DDMIF/Changepoint files/gww_lbw21.csv
C:/Users/Maxim/Desktop/DDMIF/Changepoint files/hsic_lbw21.csv
C:/Users/Maxim/Desktop/DDMIF/Changepoint files/lmnr_lbw21.csv
C:/Users/Maxim/Desktop/DDMIF/Changepoint files/mck_lbw21.csv
C:/Users/Maxim/Desktop/