# HMM on Stock Data
Updated: 09/02/2024

## Notes
* How often do we think states transitions in the market?
* need to aggregate x series over some meaningful period for which you think states will endure (eg monthly regimes)
* construct your own index (eg FAANG)
* use a bayesian prior to keep transition matrix regularized and proba's more central

In [None]:
from hmmlearn import hmm
from sklearn.cluster import KMeans
from fredapi import Fred
import pandas as pd
import matplotlib.pyplot as plt
from scipy.stats import gaussian_kde
from hmmlearn.hmm import MultinomialHMM, GaussianHMM
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns
import numpy as np
import os

In [None]:
# mean and standard deviation of each state
LOW_GROWTH_MEAN = 0.01
LOW_VOLATILITY_STD = 0.1
HIGH_GROWTH_MEAN = 0.2
HIGH_VOLATILITY_STD = 0.2

# prior state probabilities
pi = np.array([0.25, 0.25, 0.25, 0.25])

# Simulation Example 2 states

* $n = 2$ states
* $S_1$ is low growth
* $S_2$ is high growth

* Prices $X_t$ are $N(\mu, \sigma^2)$ and we observe $x_t = ln(X_t)$.
* If state is low growth, then $\mu = \mu_{low}$; if state is high growth $\mu = \mu_{high}$
* there is an equal probability of starting in each state

Variant 1
* Transition probabilities are equally likely

Variant 2
* Transition probabilities are sticky

In [None]:
def generate_X(state: int, periods: int) -> np.ndarray:
    """
    Generate a sequence of observations given a state and number of periods
    """
    # low growth
    if state == 1:
        mu = LOW_GROWTH_MEAN
        sigma = HIGH_VOLATILITY_STD
    elif state == 2:
        mu = HIGH_GROWTH_MEAN
        sigma = HIGH_VOLATILITY_STD
    else:
        raise ValueError('Invalid state')
    X = np.random.normal(mu, sigma, periods)
    #return np.exp(X)
    return X

In [None]:
A = np.array([[0.65, 0.35],
                [0.35, 0.65]])
A = np.array([[0.5, 0.5],
                [0.5, 0.5]])

T = 1_000
n_states = 2
state_descriptions = {
    1: "Low growth, high vol",
    2: "High growth, high vol",
}

x1 = generate_X(1, T//n_states)
x2 = generate_X(2, T//n_states)
x = np.concatenate([x1, x2,])
arrays = [x1, x2,]

# Calculate statistics for each state
returns = []
sharpe_ratios = []
for x in arrays:
    cumulative_return = (x[-1] / x[0]) - 1
    sharpe_ratio = cumulative_return / np.std(x)
    returns.append(cumulative_return)
    sharpe_ratios.append(sharpe_ratio)
summary_stats = []
for i, y in enumerate(arrays):
    summary_stats.append({
        "state": state_descriptions[i+1],
        "mean": np.mean(y),
        "stdev": np.std(y),
        'z-score': (np.mean(y) - 0.01) / np.std(y),
        "return": returns[i],
        "sharpe_ratio": sharpe_ratios[i]
    })

df = pd.DataFrame(summary_stats)
print(df.sort_values('return').to_markdown(index=False))
    
# Line plot with shaded rectanglur regions for each state
k = T // n_states
x = np.concatenate([x1, x2,])
plt.figure(figsize=(10, 6))
plt.plot(x)
for i in range(n_states):
    plt.axvspan(i * k, (i + 1) * k, color='gray', alpha=0.2 * (i + 1))
plt.title("Simulated time series data")
plt.show()

# Plot each density estimate
colors = ['blue', 'green', 'red', 'purple']
plt.figure(figsize=(10, 6))

for i, x in enumerate(arrays):
    density = gaussian_kde(x)
    xs = np.linspace(min(x) * 0.9, max(x) * 1.1, 200)
    ys = density(xs)
    plt.plot(xs, ys, color=colors[i])
    
    # Find the peak of the density curve
    peak_idx = np.argmax(ys)
    peak_x = xs[peak_idx]
    peak_y = ys[peak_idx]
    
    # Add text label near the peak of the density curve
    plt.text(peak_x, peak_y, f"State {state_descriptions[i+1]}", 
             color=colors[i], fontsize=6, ha='left', va='bottom')

plt.title("Density estimates of each state")
plt.grid(True)
plt.show()

In [None]:
x = np.concatenate([x1, x2,])
x.shape

columns = ['time', 'x', 'state', 'state_description']
df1 = pd.DataFrame(x1, columns=['x'])
df1['state'] = 1
df1['state_description'] = state_descriptions[1]
df2 = pd.DataFrame(x2, columns=['x'])
df2['state'] = 2
df2['state_description'] = state_descriptions[2]
df = pd.concat([df1, df2])
df['time'] = range(len(df))
df = df[columns]
df.set_index('time', inplace=True)
df

In [None]:
x = df['x'].values
X = x.reshape(-1, 1)

kmeans = KMeans(n_clusters=n_states,).fit(X)
initial_means = kmeans.cluster_centers_

model = GaussianHMMWithLog(n_components=n_states, covariance_type="full", init_params='s', tol=0.01)
model.transmat_ = A
model.means_ = initial_means
model.covars_ = np.tile(np.cov(X.T), (n_states, 1, 1))  # Uniform covariance initialization
model.fit(X)

log_likelihood = model.score(X)
aic = 2 * k - 2 * log_likelihood
bic = np.log(X.shape[0]) * k - 2 * log_likelihood
print(aic, bic)

In [None]:
x = df['x'].values
X = x.reshape(-1, 1)
kmeans = KMeans(n_clusters=n_states,).fit(X)
initial_means = kmeans.cluster_centers_
min_aic = np.inf
for i in range(1_000):  
    try:
        model = GaussianHMMWithLog(n_components=n_states, covariance_type="full", init_params='s', tol=0.01)
        model.transmat_ = A
        model.means_ = initial_means
        model.covars_ = np.tile(np.cov(X.T), (n_states, 1, 1))  # Uniform covariance initialization
        model.fit(X)
        log_likelihood = model.score(X)
        aic = 2 * k - 2 * log_likelihood
        bic = np.log(X.shape[0]) * k - 2 * log_likelihood
        print(i, aic, bic)
        if aic < min_aic: 
            min_aic = aic
            best_model = model
    except:
        continue

model = best_model
# Extract the transition matrix
transition_matrix = model.transmat_
print("Estimated Transition Matrix A:")
print(np.round(transition_matrix,3))

hidden_states = model.predict(X)
df['predicted_state'] = hidden_states+1
df[['state', 'predicted_state']].corr()
print(classification_report(df['state'], df['predicted_state']))

cm = confusion_matrix(df['state'], df['predicted_state'])
plt.figure(figsize=(6, 4))
sns.heatmap(cm, annot=True, cmap='Blues', fmt='g')
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.title('Confusion Matrix')
plt.show()

In [None]:
df[['state', 'predicted_state']].corr()

In [None]:
# Line plot with shaded rectanglur regions for each predicted state

## random switching

In [None]:
pi = [0.5, 0.5]
A = np.array([[0.55, 0.45],
                [0.45, 0.55]])

k = 100 # number of regime changes
def generate_states():
    states = []
    current_state = np.random.choice(n_states, size=1, p=[0.5, 0.5])[0]
    print(f"Initial state: {current_state}")
    states.append(current_state)
    for i in range(1,k):
        pi_new = A[current_state]
        new_state = np.random.choice(n_states, size=1, p=pi_new)[0]
        states.append(new_state)
        current_state = new_state    
    return np.array(states)

while True:
    states = generate_states()
    if len(set(states)) == n_states:
        break

time = list(range(k))
plt.bar(height=states+1, x=time)
plt.show()

# For each state, generate a sequence of observations with duration d drawn from a lognormal distribution
# with mean and standard deviation for each state

df_records = []
for i, state in enumerate(states):
    state += 1
    d = int(np.random.lognormal(np.log(20), np.log(1.5)))
    #print(f"{i}\tState: {state}, Duration: {d}")
    x = generate_X(state, periods=d)
    _df = pd.DataFrame(x, columns=['x'])
    _df['state'] = state
    df_records.append(_df)
df = pd.concat(df_records)
df['time'] = range(len(df))
df = df[['time', 'x', 'state']]
df.set_index('time', inplace=True)
df

In [None]:
T=250
x = df['x'].iloc[:T].values
states = df['state'].iloc[:T].values
X = x.reshape(-1, 1)
# plot x values with state transitions
plt.figure(figsize=(10, 6))
plt.axvline(0, color='lightblue', linestyle='-')
# label
plt.text(0, 0, f"State {states[i]}", rotation=90, fontsize=6)
for i in range(1, len(states)):
    if states[i] != states[i-1]:
        plt.axvline(i, color='lightblue', linestyle='-')
        # label
        plt.text(i, 0, f"State {states[i]}", rotation=90, fontsize=6)
plt.plot(x)        
plt.title("Simulated time series data")
plt.show()

In [None]:
T = 250
epsilon = 0.2
X = df.copy()
X = X.iloc[:T]
x_values = X['x'].values.reshape(-1, 1)
model = GaussianHMM(n_states, n_iter=200, tol=1e-4, init_params='mc')
model.startprob_ = np.array([0.5, 0.5])
model.transmat_ = np.array([[0.5, 0.5],
                            [0.5, 0.5]])
model.fit(x_values)
print(LOW_GROWTH_MEAN, HIGH_GROWTH_MEAN)
print(model.means_.flatten())
score = model.score(x_values)
print(model.aic(x_values), model.bic(x_values), score)
print(model.transmat_)
X['predicted_state'] = model.predict(X)
X['predicted_state'] = X['predicted_state'] + 1

cm = confusion_matrix(X['state'], X['predicted_state'])
plt.figure(figsize=(6, 4))
sns.heatmap(cm, annot=True, cmap='Blues', fmt='g')
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.title('Confusion Matrix')
plt.show()

In [None]:
probs = np.round(model.predict_proba(x_values), 4)
X[['p1','p2']] = pd.DataFrame(probs, columns=[f"State {i}" for i in range(1, n_states+1)])

x = X['x'].values
states = X['state'].values

# plot x values with state transitions
plt.figure(figsize=(10, 6))
plt.plot(X['p1'], label='State 1', color='green')
plt.plot(X['p2'], label='State 2', color='yellow')

plt.axvline(0, color='lightblue', linestyle='-')
plt.text(0, 0, f"State {states[i]}", rotation=90, fontsize=6)
for i in range(1, len(states)):
    if states[i] != states[i-1]:
        plt.axvline(i, color='lightblue', linestyle='-')
        plt.text(i, 0, f"State {states[i]}", rotation=90, fontsize=6)
#plt.plot(x)
plt.plot((x-np.min(x))/(np.max(x)-np.min(x)), color='blue', linestyle='--', linewidth=.5)
plt.title("Simulated time series data")
plt.show()

# Simulation Example 4 states

* $n = 4$ states
* $S_1$ is (low growth, low volatility)
* $S_2$ is (high growth, low volatility)
* $S_3$ is (high growth, high volatility)
* $S_4$ is (low growth, high volatility)
* Prices $X_t$ are $N(\mu, \sigma^2)$ and we observe $x_t = ln(X_t)$.
* If state is low growth, then $\mu = \mu_{low}$; if state is high growth $\mu = \mu_{high}$
* If state is low volatility, then $\sigma^2 = \sigma^2_{low}$; if state is high volatility $\sigma^2 = \sigma^2_{high}$
* there is an equal probability of starting in each state

Variant 1
* Transition probabilities are equally likely

In [None]:
def generate_X(state: int, periods: int) -> np.ndarray:
    """
    Generate a sequence of observations given a state and number of periods
    """
    # low growth
    if state == 1:
        mu = LOW_GROWTH_MEAN
        sigma = LOW_VOLATILITY_STD
    elif state == 2:
        mu = HIGH_GROWTH_MEAN
        sigma = LOW_VOLATILITY_STD
    elif state == 3:
        mu = HIGH_GROWTH_MEAN
        sigma = HIGH_VOLATILITY_STD
    elif state == 4:
        mu = LOW_GROWTH_MEAN
        sigma = HIGH_VOLATILITY_STD
    else:
        raise ValueError('Invalid state')
    X = np.random.normal(mu, sigma, periods)
    #return np.exp(X)
    return X

## deterministic switching

In [None]:
T = 1_000
n_states = 4

# Parameters of observed data

# state transition probabilities
# Transition probabilities are such that the most likely transition is from $S_i$ to $S_i$, that is no change; all other transitions equally likely
A = np.array([[0.4, 0.2, 0.2, 0.2],
                [0.2, 0.4, 0.2, 0.2],
                [0.2, 0.2, 0.4, 0.2],
                [0.2, 0.2, 0.2, 0.4]])

# A = np.array([[0.25, 0.25, 0.25, 0.25],
#                 [0.25, 0.25, 0.25, 0.25],
#                 [0.25, 0.25, 0.25, 0.25],
#                 [0.25, 0.25, 0.25, 0.25]])
print(f"Transition matrix A: {A}")

x1 = generate_X(1, T//n_states)
x2 = generate_X(2, T//n_states)
x3 = generate_X(3, T//n_states)
x4 = generate_X(4, T//n_states)
x = np.concatenate([x1, x2, x3, x4])
arrays = [x1, x2, x3, x4]
state_descriptions = {1: "Low growth, low vol",
                     2: "High growth, low vol",
                     3: "High growth, high vol",
                     4: "Low growth, high vol"}

returns = []
sharpe_ratios = []
for x in arrays:
    cumulative_return = (x[-1] / x[0]) - 1
    sharpe_ratio = cumulative_return / np.std(x)
    returns.append(cumulative_return)
    sharpe_ratios.append(sharpe_ratio)

summary_stats = []

# Calculate statistics for each state
for i, y in enumerate(arrays):
    summary_stats.append({
        "state": state_descriptions[i+1],
        "mean": np.mean(y),
        "stdev": np.std(y),
        'z-score': (np.mean(y) - 0.01) / np.std(y),
        "return": returns[i],
        "sharpe_ratio": sharpe_ratios[i]
    })

df = pd.DataFrame(summary_stats)
print(df.sort_values('return').to_markdown(index=False))
    
# plot with shaded rectanglur regions for each state
k = T // n_states
x = np.concatenate([x1, x2, x3, x4])
plt.figure(figsize=(10, 6))
plt.plot(x)
for i in range(n_states):
    plt.axvspan(i * k, (i + 1) * k, color='gray', alpha=0.2 * (i + 1))
plt.title("Simulated time series data")
plt.show()

# Plot each density estimate
arrays = [x1, x2, x3, x4]
colors = ['blue', 'green', 'red', 'purple']
plt.figure(figsize=(10, 6))

for i, x in enumerate(arrays):
    density = gaussian_kde(x)
    xs = np.linspace(min(x) * 0.9, max(x) * 1.1, 200)
    ys = density(xs)
    plt.plot(xs, ys, color=colors[i])
    
    # Find the peak of the density curve
    peak_idx = np.argmax(ys)
    peak_x = xs[peak_idx]
    peak_y = ys[peak_idx]
    
    # Add text label near the peak of the density curve
    plt.text(peak_x, peak_y, f"State {state_descriptions[i+1]}", 
             color=colors[i], fontsize=6, ha='left', va='bottom')

plt.title("Density estimates of each state")
plt.grid(True)
plt.show()

## random switching

In [None]:
pi = [0.25, 0.25, 0.25, 0.25]
A = np.array([[0.6, .4/3, .4/3, .4/3],
                [.4/3, 0.6, .4/3, .4/3],
                [.4/3, .4/3, 0.6, .4/3],
                [.4/3, .4/3, .4/3, 0.6]])
n_states = 4
k = 100 # number of regime changes
def generate_states():
    states = []
    current_state = np.random.choice(n_states, size=1, p=pi)[0]
    print(f"Initial state: {current_state}")
    states.append(current_state)
    for i in range(1,k):
        pi_new = A[current_state]
        new_state = np.random.choice(n_states, size=1, p=pi_new)[0]
        states.append(new_state)
        current_state = new_state    
    return np.array(states)

while True:
    states = generate_states()
    if len(set(states)) == n_states:
        break

time = list(range(k))
plt.bar(height=states+1, x=time)
plt.show()

# For each state, generate a sequence of observations with duration d drawn from a lognormal distribution
# with mean and standard deviation for each state

df_records = []
for i, state in enumerate(states):
    state += 1
    d = int(np.random.lognormal(np.log(20), np.log(1.5)))
    #print(f"{i}\tState: {state}, Duration: {d}")
    x = generate_X(state, periods=d)
    _df = pd.DataFrame(x, columns=['x'])
    _df['state'] = state
    df_records.append(_df)
df = pd.concat(df_records)
df['time'] = range(len(df))
df = df[['time', 'x', 'state']]
df.set_index('time', inplace=True)
df

In [None]:
z = df[['state', 'x']].groupby('state').mean().join(
    df[['state', 'x']].groupby('state').std(), rsuffix='_std'
).rename(columns={'x': 'mean', 'x_std': 'std'}).reset_index()
z['z_score'] = z['mean'] / z['std']
z

In [None]:
T=1_000
x = df['x'].iloc[:T].values
states = df['state'].iloc[:T].values
X = x.reshape(-1, 1)
# plot x values with state transitions
plt.figure(figsize=(10, 6))
plt.axvline(0, color='lightblue', linestyle='-')
# label
plt.text(0, 0, f"State {states[i]}", rotation=90, fontsize=6)
for i in range(1, len(states)):
    if states[i] != states[i-1]:
        plt.axvline(i, color='lightblue', linestyle='-')
        # label
        plt.text(i, 0, f"State {states[i]}", rotation=90, fontsize=6)
plt.plot(x)        
plt.title("Simulated time series data")
plt.show()

In [None]:
#T = 250
X = df.copy()
#X = X.iloc[:T]
x_values = X['x'].values.reshape(-1, 1)

model = GaussianHMM(n_states, n_iter=1000, tol=1e-4, init_params='mc')
model.startprob_ = np.array(pi)
model.transmat_ = np.array([[.25, .25, .25, .25],
                            [.25, .25, .25, .25],
                            [.25, .25, .25, .25],
                            [.25, .25, .25, .25]])
model.fit(x_values)

X['predicted_state'] = model.predict(X)
X['predicted_state'] = X['predicted_state'] + 1

score = model.score(x_values)
aic = model.aic(x_values)
bic = model.bic(x_values)

print(LOW_GROWTH_MEAN, HIGH_GROWTH_MEAN)
print(np.round(model.means_.flatten(),2))
print(f"AIC: {aic}, BIC: {bic}, Score: {score}")
print(np.round(model.transmat_,2))

cm = confusion_matrix(X['state'], X['predicted_state'])
plt.figure(figsize=(6, 4))
sns.heatmap(cm, annot=True, cmap='Blues', fmt='g')
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.title('Confusion Matrix')
plt.show()

## Estimate
assuming $n=4$ states, identify and estimate $(A, \mu, \sigma)$ in each

In [None]:
X = df.copy() 
model = hmm.GaussianHMM(n_components=n_states, covariance_type='full', n_iter=10, init_params='smc')
model.transmat_ = A
model.fit(X['x'].values.reshape(-1, 1))
transition_matrix = model.transmat_
print(A)
np.round(transition_matrix,1)


In [None]:
T = 1_000
n_states = 4
k = 10  # number of state transitions/regime switches

# Parameters of observed data
# mean and standard deviation of each state
LOW_GROWTH_MEAN = 0.01
LOW_VOLATILITY_STD = 0.1
HIGH_GROWTH_MEAN = 0.2
HIGH_VOLATILITY_STD = 0.2

# prior state probabilities
pi = np.array([0.25, 0.25, 0.25, 0.25])

# state transition probabilities
A = np.array([[0.4, 0.2, 0.2, 0.2],
                [0.2, 0.4, 0.2, 0.2],
                [0.2, 0.2, 0.4, 0.2],
                [0.2, 0.2, 0.2, 0.4]])

x = np.concatenate([x1, x2, x3, x4])

states = []
while len(set(states)) < n_states:
    states = []
    state = 1+np.random.choice(n_states, size=1, p=pi)[0]
    states.append(state)
    for _ in range(1,k):
        pi_new = A[state-1]
        state = 1+np.random.choice(n_states, size=1, p=pi_new)[0]
        states.append(state)
    print("Not all states present, trying again...")
print(f"States: {states}")
print(len(states))

# Generate the time series X and corresponding state labels
X = []
for state in states:  
    # duration of each state is uniform over [.5T//k, 2T//k]
    duration = np.random.randint(T//(2*k), 2*T//k)
    _df = pd.DataFrame(generate_X(state, duration), columns=['x'])
    _df['state'] = state
    _df['state_description'] = state_descriptions[state]
    X.append(_df)
df = pd.concat(X)
print(df.shape)

df['time'] = range(1, len(df) + 1)
df = df[['time', 'state', 'state_description', 'x']]
df.set_index('time', inplace=True)

plt.bar(height=df['state'], x=df.index)
plt.show()
df['state_description'].value_counts()


In [None]:
class GaussianHMMWithLog(hmm.GaussianHMM):
    def fit(self, X, lengths=None):
        self.log_likelihoods_ = []
        self._X = X  # Store the dataset
        super().fit(X, lengths)
        return self

    def _do_mstep(self, stats):
        """Perform the M-step of EM algorithm and track the log-likelihood."""
        super()._do_mstep(stats)
        log_likelihood = self.score(self._X)
        self.log_likelihoods_.append(log_likelihood)

## simulate using GaussianHMM

In [None]:
model = GaussianHMM(n_components=4, covariance_type="full", n_iter=100)
model.n_features = 4
model.startprob_ = np.array([1/4., 1/4., 1/4., 1/4.])
model.transmat_ = np.array([[0.3, 0.4, 0.2, 0.1],
                            [0.1, 0.2, 0.3, 0.4],
                            [0.5, 0.2, 0.1, 0.2],
                            [0.25, 0.25, 0.25, 0.25]])
model.means_ = np.array([[-2.5], [0], [2.5], [5.]])
model.covars_ = np.sqrt([[0.25], [0.25], [0.25], [0.25]])

X, _ = model.sample(1000, )
lengths = [X.shape[0]]
X.shape

In [None]:
plt.plot(X)
plt.show()

# Unknown number of hidden states

# Real S&P 500 data

In [None]:
if os.name == 'nt':
    API_KEY = open("C:/Users/regin/Dropbox/API_KEYS/FRED-API-KEY","r").readlines()[0].strip()
else:
    # if not Windows
    API_KEY = open("/home/reggie/Dropbox/API_KEYS/FRED-API-KEY","r").readlines()[0].strip()

# TODO: move to utils.py
def lineplot(df, x_col, y_col, title, xlabel, ylabel, size=(10,6)):
    plt.figure(figsize=size)
    plt.plot(df[x_col], df[y_col])
    plt.title(title)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.show()
    
#lineplot(df=sp500_df, x_col='date', y_col='sp500', title='S&P 500', xlabel='Date', ylabel='S&P 500 Index Value', size=(20,8))

fred = Fred(api_key=API_KEY)
sp500_df = fred.get_series('SP500', observation_start='2000-01-01')
sp500_df = pd.DataFrame(sp500_df).reset_index()
sp500_df.columns=['date', 'sp500']
sp500_df['date'] = pd.to_datetime(sp500_df['date'])
sp500_df = sp500_df.iloc[1:,]
sp500_df["return"] = sp500_df["sp500"].pct_change()
sp500_df = sp500_df.dropna()
sp500_df.head()

In [None]:
X = sp500_df.copy()
x_values = X['return'].values.reshape(-1, 1)
n_states = 2
def run(n_states):
    print(f"Running with {n_states} states")
    model = GaussianHMM(n_states, n_iter=1000, tol=1e-3, init_params='mc')
    model.startprob_ = np.array(np.array([1]*n_states)/n_states)
    model.transmat_ = np.ones((n_states, n_states))/n_states
    model.fit(x_values)

    X['predicted_state'] = model.predict(x_values)
    X['predicted_state'] = X['predicted_state'] + 1

    score = model.score(x_values)
    aic = model.aic(x_values)
    bic = model.bic(x_values)

    # print to 2 decimal places
    print(f"AIC: {aic:.2f}, BIC: {bic:.2f}, Score: {score:.2f}")

    return {'model': model,  'aic': aic, 'bic': bic, 'score': score}
model = run(n_states=2)
model['aic']

min_aic = np.inf
best_model = run(n_states=2)
for n_states in range(3, 10):
    model = run(n_states)
    
    if model['aic'] < min_aic:
        print(f"States: {n_states}, AIC: {model['aic']}, BIC: {model['bic']}, Score: {model['score']}")
        min_aic = model['aic']
        best_model = model
model = best_model['model']



In [None]:
print(np.round(model.transmat_,2))

In [None]:
X[['p0', 'p1', 'p2']] = np.round(model.predict_proba(x_values),2)
X[['p0', 'p1', 'p2']] = X[['p0', 'p1', 'p2']].apply(lambda x: (x + 1/model.n_components)/2)
X[['p0', 'p1', 'p2']] = X[['p0', 'p1', 'p2']].round(2)
p0 = X['p0'].values
p1 = X['p1'].values
p2 = X['p2'].values
plt.figure(figsize=(10, 6))
plt.ylim(0, 1)
plt.plot(p0, label='State 0', color='green')
plt.plot(p1, label='State 1', color='yellow')
plt.plot(p2, label='State 2', color='red')
plt.title("State Probabilities")
plt.legend()
plt.show()

In [None]:
# sliding window of 10 days
X['p0_mean'] = X['p0'].rolling(window=10).mean()
X['p1_mean'] = X['p1'].rolling(window=10).mean()
X['p2_mean'] = X['p2'].rolling(window=10).mean()
X[['p0_mean', 'p1_mean', 'p2_mean']] = X[['p0_mean', 'p1_mean', 'p2_mean']].round(1)
X.iloc[10:,]

In [None]:
mean_return_by_state = X[['predicted_state', 'return']].groupby('predicted_state').mean()
std_return_by_state = X[['predicted_state', 'return']].groupby('predicted_state').std()
mean_return_by_state.columns = ['mean_return']
std_return_by_state.columns = ['std_return']

return_summary_by_state = mean_return_by_state.join(std_return_by_state).reset_index()
return_summary_by_state['z_score'] = return_summary_by_state['mean_return'] / return_summary_by_state['std_return']
return_summary_by_state

In [None]:
# TODO: test if transition matrix is ergodic

In [None]:
# plot x values with state transitions
plt.figure(figsize=(10, 6))
plt.plot(X['return'])
plt.axvline(0, color='lightblue', linestyle='-')
plt.text(0, 0, f"State {X['predicted_state'].iloc[0]}", rotation=90, fontsize=6)
for i in range(1, len(X)):
    if X['predicted_state'].iloc[i] != X['predicted_state'].iloc[i-1]:
        plt.axvline(i, color='lightblue', linestyle='-')
        plt.text(i, 0, f"State {X['predicted_state'].iloc[i]}", rotation=90, fontsize=6)
plt.title("S&P 500 Returns")
plt.show()


In [None]:
# Function to bucket returns
def bucket_return(x, n_buckets=10):
    # Create quantile buckets
    quantiles = np.linspace(0, 1, n_buckets + 1)
    bins = np.quantile(x.dropna(), quantiles)
    bins[0] = bins[0] - 1e-8  # Ensure the minimum value is included
    return np.digitize(x, bins) - 1

# Function to predict prices
def predict_prices(hmm, test_data, bucket_func, is_multinomial=True):
    if is_multinomial:
        test_buckets = bucket_func(test_data['return'])
        hidden_states = hmm.predict(test_buckets.reshape(-1, 1))
    else:
        hidden_states = hmm.predict(test_data['return'].values.reshape(-1, 1))
    
    # Reverse the transformation
    if is_multinomial:
        means = [train_data[train_data['bucket'] == i]['return'].mean() for i in range(10)]
        predicted_returns = [means[state] for state in hidden_states]
    else:
        predicted_returns = hmm.means_[hidden_states].flatten()
    
    # Compute the predicted prices
    predicted_prices = [test_data['sp500'].values[0]]
    for r in predicted_returns:
        predicted_prices.append(predicted_prices[-1] * (1 + r))
    
    return predicted_prices[1:]


# Separate the training and test data
X = sp500_df[sp500_df['date'] < '2020-03-01']
train_data = X[X['date'] < '2019-01-01']
print(train_data.columns)
test_data = X[X['date'] >= '2019-01-01']

# Apply bucketing to training data
train_data['bucket'] = bucket_return(train_data['return'])

# Prepare the data for HMM
X_train = train_data['bucket'].values.reshape(-1, 1)
X_train_gaussian = train_data['return'].values.reshape(-1, 1)

# Train MultinomialHMM
n_states = 6  # You can adjust the number of states
multi_hmm = MultinomialHMM(n_components=n_states, n_iter=1000, init_params='ste')
multi_hmm.fit(X_train)

# Ensure the probabilities are properly normalized
if not np.allclose(multi_hmm.startprob_.sum(), 1):
    multi_hmm.startprob_ = np.full(n_states, 1.0 / n_states)
if not np.allclose(multi_hmm.transmat_.sum(axis=1), 1):
    multi_hmm.transmat_ = np.full((n_states, n_states), 1.0 / n_states)

# Train GaussianHMM
gaussian_hmm = GaussianHMM(n_components=n_states, covariance_type="full", n_iter=1000, init_params='ste')
gaussian_hmm.fit(X_train_gaussian)

# Ensure the probabilities are properly normalized
if not np.allclose(gaussian_hmm.startprob_.sum(), 1):
    gaussian_hmm.startprob_ = np.full(n_states, 1.0 / n_states)
if not np.allclose(gaussian_hmm.transmat_.sum(axis=1), 1):
    gaussian_hmm.transmat_ = np.full((n_states, n_states), 1.0 / n_states)

# Predict prices using both models
multi_predicted_prices = predict_prices(multi_hmm, test_data, bucket_return, is_multinomial=True)
gaussian_predicted_prices = predict_prices(gaussian_hmm, test_data, bucket_return, is_multinomial=False)

# Add the predictions to the test data
test_data['multi_predicted'] = multi_predicted_prices
test_data['gaussian_predicted'] = gaussian_predicted_prices

# Plot the hidden states for MultinomialHMM
def plot_hidden_states(df, hidden_states, title):
    plt.figure(figsize=(14, 8))
    for state in np.unique(hidden_states):
        state_mask = hidden_states == state
        plt.plot(df['date'][state_mask], df['return'][state_mask], '.', label=f'State {state}')
    
    plt.title(title)
    plt.xlabel('Date')
    plt.ylabel('Return')
    plt.legend()
    plt.show()

# Visualize hidden states for MultinomialHMM
hidden_states_multi = multi_hmm.predict(X_train)
plot_hidden_states(train_data, hidden_states_multi, 'Hidden States (MultinomialHMM)')

# Visualize hidden states for GaussianHMM
hidden_states_gaussian = gaussian_hmm.predict(X_train_gaussian)
plot_hidden_states(train_data, hidden_states_gaussian, 'Hidden States (GaussianHMM)')

# Plot the actual vs predicted prices
plt.figure(figsize=(14, 8))
plt.plot(test_data['date'], test_data['sp500'], label='Actual Price')
plt.plot(test_data['date'], test_data['multi_predicted'], label='Predicted Price (MultinomialHMM)')
plt.plot(test_data['date'], test_data['gaussian_predicted'], label='Predicted Price (GaussianHMM)')
plt.title('S&P 500 Actual vs Predicted Prices')
plt.xlabel('Date')
plt.ylabel('Price')
plt.legend()
plt.show()


In [None]:
test_data["combined"] = (test_data["multi_predicted"] + test_data["gaussian_predicted"])/2
plt.figure(figsize=(14, 8))
plt.plot(test_data['date'], test_data['sp500'], label='Actual Price')
plt.plot(test_data['date'], test_data['multi_predicted'], label='Predicted Price (MultinomialHMM)')
plt.plot(test_data['date'], test_data['gaussian_predicted'], label='Predicted Price (GaussianHMM)')
plt.plot(test_data['date'], test_data['combined'], label='Predicted Price (Combined)')
plt.title('S&P 500 Actual vs Predicted Prices')
plt.xlabel('Date')
plt.ylabel('Price')
plt.legend()
plt.show()


# Rabiner Example

In [None]:
import numpy as np
# Basic HMM from Rabiner 1989

# Symbols
V = ["rain", "cloudy", "sun",]
S = [1, 2, 3]

# number of observations
T = 8

# State distribution
pi = [0.5, 0.5]

# Transition matrix
A = [[0.7, 0.3],
     [0.3, 0.7]]
#_A = [0.4 0.3 0.3 0.2 0.6 0.2 0.1 0.1 0.8]
_A = [0.4, 0.3, 0.3, 0.2, 0.6, 0.2, 0.1, 0.1, 0.8]
A = np.array(_A).reshape(3,3)
print(A)

q_states = [3] # "sun"
for t in range(1, T):
    # Transition state
    current_state = q_states[t-1]
    transition_probs = A[current_state-1]
    print(t, transition_probs)
    state = np.random.choice(S, p=transition_probs)
    q_states.append(state)
q_states

In [None]:
# get probability of a given sequence `y`
A = [[0.4, 0.3, 0.3], [0.2, 0.6, 0.2], [0.1, 0.1, 0.8]]
y = [3-1,3-1,3-1,1-1,1-1,3-1,2-1,3-1]
def sequence_prob(A, y, verbose=False):
    T = len(y)
    for t in range(T):
        if t==0:
            current_state = y[t]
            next_state = y[t+1]
            p = 1
            if verbose:
                print(f"t={t+1}, current_state={current_state+1}, next_state={next_state+1}, p={p}")
        else:
            next_state = y[t]
            p *= A[current_state][next_state]
            if verbose:
                print(f"t={t+1}, current_state={current_state+1}, next_state={next_state+1}, p={p}")
            current_state = next_state
    return p
sequence_prob(A, y, verbose=True)

In [None]:
t, current_state, next_state