# Stock Price Prediction
Using Hidden Markov Model, Kalman Filter, and Dynamic Bayesian Networks\
By: Peter

In [188]:
import yfinance as yf
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm

In [189]:
# Define constants 5, 10, 20, 30, 50, 100, 200, 300, 500, 
# START_DATE = "2018-12-30"
START_DATE = "1927-12-30"
END_DATE = "2025-03-31"
TICKER = "^GSPC"
WINDOW_SIZES = [1000]

In [190]:
# Download S&P 500 data using yfinance
def download_data():
    df = yf.download(TICKER, start=START_DATE, end=END_DATE, auto_adjust=False)
    df = df[["Adj Close"]].dropna()
    df.reset_index(inplace=True)
    return df

# Compute Mean Absolute Percentage Error
def mape(actual, predicted):
    return np.abs((actual - predicted) / actual) * 100

# Base Class
class BaseModel:
    def fit(self, series):
        pass

    def predict(self):
        pass

# HMM
from hmmlearn.hmm import GaussianHMM

class HMMModel(BaseModel):
    def __init__(self, n_components=2):
        self.model = GaussianHMM(n_components=n_components)

    def fit(self, series):
        self.series = np.array(series).reshape(-1, 1)
        self.model.fit(self.series)

    def predict(self):
        logprob, hidden_states = self.model.decode(self.series)
        last_state = hidden_states[-1]
        return self.model.means_[last_state][0]

# Kalman Filter
from pykalman import KalmanFilter

class KalmanModel(BaseModel):
    def __init__(self):
        self.kf = KalmanFilter(transition_matrices=[1],
                               observation_matrices=[1],
                               initial_state_mean=0,
                               initial_state_covariance=1,
                               observation_covariance=1,
                               transition_covariance=0.0001)

    def fit(self, series):
        self.series = np.array(series)
        self.kf = self.kf.em(self.series, n_iter=5)
        self.filtered_state_means, _ = self.kf.filter(self.series)

    def predict(self):
        next_state = self.kf.transition_matrices[0] * self.filtered_state_means[-1]
        return next_state[0]

# Dynamic Bayesian Network
import pymc as pm

class DBNModel(BaseModel):
    def fit(self, series):
        self.series = np.array(series)
        y_t = self.series[1:]
        y_tm1 = self.series[:-1]

        with pm.Model() as self.model:
            alpha = pm.Normal("alpha", mu=0, sigma=10)
            beta = pm.Normal("beta", mu=0, sigma=1)
            sigma = pm.HalfNormal("sigma", sigma=1)

            mu = alpha + beta * y_tm1
            y_obs = pm.Normal("y_obs", mu=mu, sigma=sigma, observed=y_t)

            map_estimate = pm.find_MAP(progressbar=False)
            self.alpha = map_estimate["alpha"]
            self.beta = map_estimate["beta"]

    def predict(self):
        last = self.series[-1]
        return self.alpha + self.beta * last


# Evaluate model for a given window size
from joblib import Parallel, delayed

def evaluate_model(model_class, series, n, stride=10, parallel=True):
    def process_window(i):
        window_data = series[i - n:i]
        actual = series[i]
        model = model_class()
        try:
            model.fit(window_data)
            pred = model.predict()
            error = mape(actual, pred) if not np.isnan(pred) else None
        except Exception as e:
            error = None  # Skip window on failure
        if i % 1000 == 0:
            print(f"[{i}] Completed")
        return error

    indices = range(n, len(series) - 1, stride)
    
    if parallel:
        errors = Parallel(n_jobs=-1)(
            delayed(process_window)(i) for i in indices
        )
    else:
        errors = [process_window(i) for i in indices]

    errors = [e for e in errors if e is not None]
    return np.mean(errors) if errors else np.nan


In [191]:
df = download_data()
prices = df["Adj Close"].values

[*********************100%***********************]  1 of 1 completed


In [192]:
results = {"n": WINDOW_SIZES}
models = {
    # "HMM": HMMModel,
    # "Kalman": KalmanModel,
    "DBN": DBNModel
}

In [193]:
print(f"Series length: {len(prices)}")

for name, ModelClass in models.items():
    avg_mapes = []
    print(f"\nEvaluating {name}...")
    for n in WINDOW_SIZES:
        avg_mape = evaluate_model(ModelClass, prices, n)
        avg_mapes.append(avg_mape)
    results[name] = avg_mapes

# Plotting
plt.figure(figsize=(10, 6))
for name in models:
    plt.plot(WINDOW_SIZES, results[name], label=name)
plt.xlabel("Window Size (n)")
plt.ylabel("Average MAPE (%)")
plt.title("Prediction Accuracy Comparison")
plt.legend()
plt.grid(True)
plt.show()


Series length: 24426

Evaluating DBN...
[1000] Completed
[2000] Completed
[3000] Completed
[4000] Completed
[5000] Completed
[6000] Completed
[7000] Completed
[8000] Completed
[9000] Completed
[10000] Completed
[11000] Completed


KeyboardInterrupt: 

In [181]:
results

{'n': [1000], 'HMM': [np.float64(11.902312333230364)]}