# Markov Chain

Goal:
- learns market states automatically (trained from real price data)
- model transitions between states
- simnulates future paths
- detects market regime changes (bull/bear, high/low, volatility)
- generates probabilitistic forecasts of returns of volatility


This model resembles a Regime-Switching Model (e.g. Hamilton 1989 hidden markov model for macroeconomics), but fully built with *8first principles**.

Markov Chain:
- Set of states $S = {s₁, s₂, ..., sₙ}$
- Transition probabilities P(i → j), such that: $\sum_j P(i \to j) = 1 \quad \forall i$
- Memoryless: Next state depends only on current state.

In finance, “state” could mean:

| State Types       |                   Examples                        |
|-------------------|---------------------------------------------------|
| Price Bins        |  S&P 500 is in [3000-3100], [3100-3200], etc      |
| Return Bins       | Daily returns in ranges (-2%, -1%), (0%, 1%), etc |
| Volatility Levels | High vol, low vol                                 |
| Economic Regime   | Recession, expansion                              |


In [None]:
%pip install numpy pandas sklearn

## Preprocessing 

In [None]:
import numpy as np
import pandas as pd
from sklearn.cluster import KMeans

def load_price_data(filepath):
    df = pd.read_csv(filepath)
    df["Data"] = pd.to_datetime(df["Data"])
    df = df.sort_values("Date")
    df["Return"] = df["Close"].get_change()
    df["LogReturn"] = np.log1p(df["Return"])

    return df.dropna()


## State Modeling

discretize returns into states. 

Example:
- Return < -1% → state 0
- -1% ≤ Return < 0% → state 1
- 0% ≤ Return < 1% → state 2
- Return ≥ 1% → state 3

Explanation:

bins = [-inf, -1%, 0%, +1%, +inf] define 4 states (0,1,2,3).


In [None]:
def assign_states(df, thresholds=[-0.01, 0, 0.01]):
    bins = [-np.inf] + thresholds + [np.inf]
    df['State'] = pd.cut(df['Return'], bins=bins, labels=False)
    
    return df


def cluster_returns(df, n_clusters=4):
    returns = df["Return"].values.reshape(-1, 1)
    kmeans = KMeans(n_clusters=n_clusters, random_state=0).fit(returns)
    df["State"] = kmeans.labels_

    return df


## Transition Matrix Estimation

Estimate the transition 

Note:

Rows must sum to 1 → ∑ P(i→j) = 1

In [None]:
def estimate_transition_matrix(states, num_states):
    matrix = np.zeros((num_states, num_states))

    for (current_state, next_state) in zip(states[:-1], states[1:]):
        matrix[current_state, next_state] += 1

    row_sums = matrix.sum(axis=1)
    transition_matrix = matrix / row_sums[:, None] # normalize rows

    return transition_matrix

def sliding_transition_matrices(states, num_states, window_size=252):
    matrices = []

    for start in range(0, len(states) - window_size):
        window = states[start:start+window_size]
        matrix = estimate_transition_matrix(window, num_states)
        matrices.append(matrix)
        
    return matrices

## Forecasting and Simulaiton

In [None]:
def simulate_markov_chain(transition_matrix, start_state, n_steps):
    states = []
    current_state = start_state
    
    for _ in range(n_steps):
        next_state = np.random.choice(
            a=len(transition_matrix),
            p=transition_matrix[current_state]
        )
        states.append(next_state)
        current_state = next_state

    return states


## Risk Model per State

In [None]:
def compute_state_risk_profile(df):
    grouped = df.groupby('State')['Return']
    state_return_mean = grouped.mean()
    state_return_std = grouped.std()
    
    return state_return_mean, state_return_std

## Hidden Markov Model (HMM)

Introduce latent states: observed returns are noisy emissions; real market regimes are hidden

Concept: 

- Hidden states: “Bull”, “Bear”, “Sideways”, etc. (latent)
- Observed emissions: daily returns.
- Two sets of parameters:
- Transition probabilities between hidden states.
- Emission probabilities (observation model).

This improves realism: markets aren’t purely observable.


In [None]:
class HiddenMarkovModel:
    def __init__(self, num_hidden_states, num_observation_bins):
        self.num_hidden_states = num_hidden_states
        self.num_observation_bins = num_observation_bins
        self.transition_probs = np.full((num_hidden_states, num_hidden_states), 1/num_hidden_states)
        self.emission_probs = np.full((num_hidden_states, num_observation_bins), 1/num_observation_bins)
        self.initial_probs = np.full(num_hidden_states, 1/num_hidden_states)

    def initialize_random(self):
        self.transition_probs = np.random.dirichlet(np.ones(self.num_hidden_states), size=self.num_hidden_states)
        self.emission_probs = np.random.dirichlet(np.ones(self.num_observation_bins), size=self.num_hidden_states)
        self.initial_probs = np.random.dirichlet(np.ones(self.num_hidden_states))

    def forward(self, observations):
        T = len(observations)
        alpha = np.zeros((T, self.num_hidden_states))

        alpha[0] = self.initial_probs * self.emission_probs[:, observations[0]]

        for t in range(1, T):
            for j in range(self.num_hidden_states):
                alpha[t, j] = np.sum(alpha[t-1] * self.transition_probs[:, j]) * self.emission_probs[j, observations[t]]

        return alpha

    def backward(self, observations):
        T = len(observations)
        beta = np.zeros((T, self.num_hidden_states))

        beta[-1] = np.ones(self.num_hidden_states)

        for t in reversed(range(T-1)):
            for i in range(self.num_hidden_states):
                beta[t, i] = np.sum(self.transition_probs[i, :] * self.emission_probs[:, observations[t+1]] * beta[t+1, :])

        return beta

    def baum_welch_train(self, observations, n_iter=10):
        T = len(observations)

        for _ in range(n_iter):
            alpha = self.forward(observations)
            beta = self.backward(observations)

            xi = np.zeros((T-1, self.num_hidden_states, self.num_hidden_states))
            for t in range(T-1):
                denom = np.dot(np.dot(alpha[t, :], self.transition_probs) * self.emission_probs[:, observations[t+1]], beta[t+1, :])
                for i in range(self.num_hidden_states):
                    numer = alpha[t, i] * self.transition_probs[i, :] * self.emission_probs[:, observations[t+1]] * beta[t+1, :]
                    xi[t, i, :] = numer / denom

            gamma = np.sum(xi, axis=2)
            self.initial_probs = gamma[0]

            self.transition_probs = np.sum(xi, axis=0)
            self.transition_probs /= np.sum(self.transition_probs, axis=1, keepdims=True)

            gamma = np.vstack((gamma, np.sum(xi[-1, :, :], axis=0)))

            for l in range(self.num_observation_bins):
                mask = observations == l
                self.emission_probs[:, l] = np.sum(gamma[mask], axis=0)
            self.emission_probs /= np.sum(self.emission_probs, axis=1, keepdims=True)


## Trading Strategy, Risk Management & Backtesting

In [None]:
class RiskManager:
    def __init__(self, max_position=1.0, vol_target=0.02, stop_loss=-0.05):
        self.max_position = max_position
        self.vol_target = vol_target
        self.stop_loss = stop_loss

    def size_position(self, realized_vol):
        size = min(self.vol_target / (realized_vol + 1e-8), self.max_position)
        return size

    def check_stop_loss(self, pnl):
        return pnl < self.stop_loss

In [None]:
class MarkovTradingStrategy:
    def __init__(self, transition_matrix, hmm_model, state_mean_returns):
        self.transition_matrix = transition_matrix
        self.hmm_model = hmm_model
        self.state_mean_returns = state_mean_returns

    def predict_next_state(self, current_state):
        next_probs = self.transition_matrix[current_state]
        predicted_state = np.argmax(next_probs)
        return predicted_state

    def infer_current_hidden_state(self, observations):
        alpha = self.hmm_model.forward(observations)
        hidden_state = np.argmax(alpha[-1])
        return hidden_state

    def decide_action(self, predicted_state):
        if self.state_mean_returns[predicted_state] > 0:
            return "BUY"
        elif self.state_mean_returns[predicted_state] < 0:
            return "SELL"
        else:
            return "HOLD"

    def live_forecast(self, current_state, observations):
        inferred_hidden = self.infer_current_hidden_state(observations)
        predicted_state = self.predict_next_state(current_state)
        action = self.decide_action(predicted_state)
        return {
            "Inferred Hidden State": inferred_hidden,
            "Predicted Next State": predicted_state,
            "Action": action
        }
    

In [None]:
class BacktestEngine:
    def __init__(self, df, strategy, risk_manager):
        self.df = df
        self.strategy = strategy
        self.risk_manager = risk_manager
        self.equity_curve = []

    def run(self):
        capital = 1.0
        position = 0.0

        for i in range(1, len(self.df)):
            current_state = self.df.iloc[i-1]['State']
            observations = self.df.iloc[:i]['State'].values
            forecast = self.strategy.live_forecast(current_state, observations)
            action = forecast['Action']

            realized_vol = self.df.iloc[max(0, i-21):i]['Return'].std()
            position_size = self.risk_manager.size_position(realized_vol)

            ret = self.df.iloc[i]['Return']

            if action == "BUY":
                pnl = position_size * ret
                position = position_size
            elif action == "SELL":
                pnl = -position_size * ret
                position = -position_size
            else:  # HOLD
                pnl = position * ret

            if self.risk_manager.check_stop_loss(pnl):
                pnl = 0
                position = 0

            capital = capital * (1 + pnl)
            self.equity_curve.append(capital)

        return pd.Series(self.equity_curve)


## Utilities


In [None]:
def compute_stationary_distribution(P):
    eigvals, eigvecs = np.linalg.eig(P.T)
    stat_dist = np.real(eigvecs[:, np.isclose(eigvals, 1)])
    stat_dist = stat_dist[:, 0]
    stat_dist = stat_dist / stat_dist.sum()
    return stat_dist


# Pipeline

In [None]:
df = load_price_data("your_prices.csv")
df = cluster_returns(df, n_clusters=4)
num_states = df['State'].nunique()

transition_matrix = estimate_transition_matrix(df['State'].values, num_states)
mean_returns, std_returns = compute_state_risk_profile(df)

observations = df['State'].values
hmm = HiddenMarkovModel(num_hidden_states=3, num_observation_bins=num_states)
hmm.initialize_random()
hmm.baum_welch_train(observations, n_iter=50)

strategy = MarkovTradingStrategy(transition_matrix, hmm, mean_returns)
risk_manager = RiskManager()
backtester = BacktestEngine(df, strategy, risk_manager)
equity_curve = backtester.run()

print(equity_curve.tail())
