# Strategy V4.1

In this iteration, we aim to use Markov Autoregression Models to predict regime.

In [24]:
from pandas import DataFrame, Series
from typing import List, Dict, Tuple
from numpy import ndarray

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from statsmodels.tsa.stattools import adfuller
from statsmodels.tsa.regime_switching.markov_autoregression import MarkovAutoregression
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf

ma_lookback: int = 20

In [25]:
def get_non_stationary_instruments(start_index: int, end_index: int) -> Dict[int, DataFrame]:
	raw_prices: DataFrame = pd.read_csv("../../prices.txt", sep=r"\s+", index_col=None, header=None)
	price_history: ndarray = raw_prices.values[start_index:end_index][:].T
	data: Dict[int, DataFrame] = {}

	for instrument_no in range(0, 50):
		result = adfuller(price_history[instrument_no], autolag="AIC")

		if result[1] > 0.05:
			data[instrument_no] = pd.DataFrame(columns=["price"])
			data[instrument_no]["price"] = price_history[instrument_no] 
			
			# Implement moving average and clip data
			data[instrument_no]["ma"] = (data[instrument_no]["price"].ewm
													   (span=ma_lookback).mean())
			data[instrument_no] = data[instrument_no].iloc[ma_lookback:].reset_index(drop=True)

	return data

def plot_moving_average(data: Dict[int, DataFrame]) -> None:
	for instrument_no in data:
		moving_average: Series = data[instrument_no]["ma"]
		price: Series = data[instrument_no]["price"]
		
		plt.figure(figsize=(16,4))
		plt.plot(price, color="blue", linestyle="--", label="Price")
		plt.plot(moving_average, color="yellow", linestyle="--", label="Moving Average")
		plt.xlabel("Time")
		plt.ylabel("Moving Average $")
		plt.title(f"Instrument {instrument_no}: Moving average ({ma_lookback} day lookback)")
		plt.grid(True)
		plt.legend()
		plt.show()

def implement_returns(data: Dict[int, DataFrame]) -> Dict[int, DataFrame]:
	for instrument_no in data:
		data[instrument_no]["returns"] = data[instrument_no]["ma"].diff().dropna()

		# Clip the data
		data[instrument_no] = data[instrument_no].iloc[1:].reset_index(drop=True)

	return data

def plot_returns(data: Dict[int, DataFrame]) -> None:
	for instrument_no in data:
		returns: Series = data[instrument_no]["returns"]
		price: Series = data[instrument_no]["price"]
		
		fig, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, figsize=(16,4))
		
		ax1.plot(price, color="blue", linestyle="--")
		ax1.set_xlabel("Time")
		ax1.set_ylabel("Price")
		ax1.set_title(f"Instrument {instrument_no} Price")
		ax1.grid(True)
		
		ax2.plot(returns, color="yellow", linestyle="--")
		ax2.set_xlabel("Time")
		ax2.set_ylabel("Returns")
		ax2.set_title(f"Instrument {instrument_no} returns")
		ax2.grid(True)
		
		plt.show()

data: Dict[int, DataFrame] = get_non_stationary_instruments(0, 550)
data = implement_returns(data)

### Fit Markov Autoregression Model

In [26]:
import pickle

def fit_markov_autoregression(data: Dict[int, DataFrame]) -> Tuple[Dict[int, DataFrame],
Dict[int, any]]:
	models: Dict[int, any] = {}

	for instrument_no in data:
		# Fit the model using returns
		model = MarkovAutoregression(data[instrument_no]["returns"], k_regimes=2, order=2,
			switching_ar=False)
		results = model.fit()
		probability_zero = pd.Series(results.filtered_marginal_probabilities[0])
		probability_one = pd.Series(results.filtered_marginal_probabilities[1])
		predicted_regimes = pd.Series(np.argmax(results.filtered_marginal_probabilities , axis=1))		
		
		# Clip data by 1
		data[instrument_no] = data[instrument_no].iloc[1:].reset_index(drop=True)
		data[instrument_no]["predicted_regime"] = predicted_regimes 
		data[instrument_no]["probability_zero"] = probability_zero
		data[instrument_no]["probability_one"] = probability_one
		models[instrument_no] = results
		
		with open(f"models/instrument_{instrument_no}_model.pkl", "wb") as file:
			pickle.dump(results,file)
		
		file.close()

	return data, models


def plot_price_and_regime(data: Dict[int, DataFrame]) -> None:
	for instrument_no in data:
		dates: Series = data[instrument_no].index
		prices: Series = data[instrument_no]["price"]
		regimes: Series = data[instrument_no]["predicted_regime"]
		regime_states: Dict[int, str] = {0: "Uptrend", 1: "Downtrend"}

		# Price plot
		# build a DataFrame to detect regime‐change segments
		df = pd.DataFrame({'price': prices, 'regime': regimes}, index=dates)
		# each time regime != previous, start a new segment
		df['segment'] = (df['regime'] != df['regime'].shift()).cumsum()
		
		plt.figure(figsize=(16,4))

		# plot each segment in its color
		for _, seg in df.groupby('segment'):
			color = "red" if seg['regime'].iloc[0] == 1 else "green"
			plt.plot(seg.index, seg['price'], color=color, linestyle="--", zorder=3)
			
		plt.plot(prices, zorder=1, linestyle="--", color='blue')
		plt.title(f"Instrument {instrument_no} Price and Regime")
		plt.ylabel("Price")
		plt.xlabel("Days")
		plt.grid(alpha=0.3)
		plt.show()

### Identifying which Regime Gives an Uptrend or a Downtrend

With an Markov AutoRegression Model, it can be seen that it outputs 2 states and doesn't 
automatically identify which ones are uptrends and which are downtrends.

We need to build a function to be able to differentiate these two states.

In [27]:
is_zero_uptrend: Dict[int, bool] = {
	0: True,
	1: True,
	2: True,
	3: False,
	4: True,
	5: True,
	6: True,
	7: False,
	8: True,
	9: True,
	10: False,
	11: False,
	12: False,
	13: False,
	14: False,
	15: False,
	16: True,
	17: False,
	19: True,
	20: True,
	21: False,
	22: False,
	23: False,
	24: False,
	25: True,
	27: True,
	28: False,
	29: False,
	30:True,
	31: False,
	32: False,
	34: False,
	35: True,
	36: False,
	38: False,
	39: False,
	40: False,
	41: False,
	42: False,
	43: True,
	44: False,
	45: False,
	46: True,
	47: True,
	48: False,
	49: False,
}

def generate_signals(data: Dict[int, DataFrame]) -> Dict[int, DataFrame]:
	for instrument_no in data:
		regimes: Series = data[instrument_no]["predicted_regime"]

		# Uptrend
		if is_zero_uptrend[instrument_no]: 
			data[instrument_no]["signal"] = np.where(regimes == 0, 1, -1)
		# Downtrend
		else:
			data[instrument_no]["signal"] = np.where(regimes == 1, 1, -1)

	return data



### Backtesting and Measuring Performance

In [28]:
def get_strategy_results(data: Dict[int, DataFrame]) -> Dict[int, DataFrame]:
	for instrument_no in data:
		data[instrument_no]["log_returns"] = np.log(data[instrument_no]["price"].shift(-1))
		
		# Get Strategy Return
		data[instrument_no]["strategy_return"] = (data[instrument_no]["signal"]
												  * data[instrument_no]["log_returns"])

		# Get Position changes
		position_change: ndarray = data[instrument_no]["signal"].diff().abs()

		# Apply the commission fee
		data[instrument_no]["strategy_return"] -= position_change * 0.0005

	return data


def show_performance_metrics(data: Dict[int, DataFrame]) -> None:
	performance_metrics: Dict[str, List[int | float]] = {}
	performance_metrics["Instrument No."] = list(data.keys())
	performance_metrics["Profit Factor"] = []
	performance_metrics["Sharpe Ratio"] = []

	for instrument_no in data:
		# Get Returns
		returns: Series = data[instrument_no]["strategy_return"]

		# Compute performance metrics
		profit_factor = returns[returns > 0].sum() / returns[returns < 0].abs().sum()
		sharpe = (returns.mean() / returns.std()) * (252 ** 0.5)

		performance_metrics["Profit Factor"].append(profit_factor)
		performance_metrics["Sharpe Ratio"].append(sharpe)

	performance_metrics_df: DataFrame = pd.DataFrame(performance_metrics)
	print(performance_metrics_df.to_string(index=False))

In [None]:
sample_data: Dict[int, DataFrame] = get_non_stationary_instruments(0, 550)
sample_data = implement_returns(sample_data)
sample_data, models = fit_markov_autoregression(sample_data)
sample_data = generate_signals(sample_data)
sample_data = get_strategy_results(sample_data)
plot_price_and_regime(sample_data)

### Testing out of sample performance

In [30]:
def get_out_of_sample_data(data: Dict[int, DataFrame], start_index, end_index, models: Dict[int,
	any]) -> Dict[int, DataFrame]:
	out_of_sample_data: Dict[int, DataFrame] = {}
	raw_prices: DataFrame = pd.read_csv("../../prices.txt", sep=r"\s+", index_col=None, header=None)
	price_history: ndarray = raw_prices.values[start_index:end_index][:].T

	for instrument_no in data:
		out_of_sample_data[instrument_no] = pd.DataFrame(columns=["price"])
		out_of_sample_data[instrument_no]["price"] = price_history[instrument_no]
		out_of_sample_data[instrument_no]["returns"] = (out_of_sample_data[instrument_no]["price"]
														.diff().dropna())

		# Clip data
		out_of_sample_data[instrument_no] = (out_of_sample_data[instrument_no].iloc[1:].reset_index
												 (drop=True))

		res = models[instrument_no]

		full_returns = pd.concat([data[instrument_no]["returns"],
								  out_of_sample_data[instrument_no]["returns"]], ignore_index=True)

		# 2. Re-create the same model spec on the full data
		model_full = MarkovAutoregression(
			full_returns,
			k_regimes=res.model.k_regimes,
			order=res.model.order,
			switching_ar=res.model.switching_ar,
			switching_variance=res.model.switching_variance
		)

		filt_res = model_full.filter(res.params)
		n_train = len(sample_data[instrument_no])
		probs = filt_res.filtered_marginal_probabilities.iloc[n_train:]	
		
		# Clip data again
		out_of_sample_data[instrument_no] = (out_of_sample_data[instrument_no].iloc[1:].reset_index
													 (drop=True))
		
		out_of_sample_data[instrument_no]["predicted_regime"] = pd.Series(
				np.argmax(probs.values, axis=1),
				index=out_of_sample_data[instrument_no]["returns"].index
		)
		
	return out_of_sample_data


out_of_sample_data: Dict[int, DataFrame] = get_out_of_sample_data(sample_data, 550, 750, models)
out_of_sample_data = generate_signals(out_of_sample_data)
out_of_sample_data = get_strategy_results(out_of_sample_data)
show_performance_metrics(out_of_sample_data)

  return ufunc.reduce(obj, axis, dtype, out, **passkwargs)


ValueError: Length of values (197) does not match length of index (198)