# Data preparation

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import time
from hmmlearn import hmm
from scipy.stats import skew, kurtosis

# GLOBAL VARIABLES

# time horizon in years
T = 20

# number of time steps: 252 * 7 steps per year
# (presumably 252 trading days x 7 observations per day -- TODO confirm)
N = int(T * 252 * 7)

# length of a single regime, in time steps (half a year)
l_regime = int(0.5 * 252 * 7)

# time interval between two consecutive steps, in years
dt = T / N



# Merton Jump Diffusion model (MJD) parameters:
# mu - Drift
# sigma - Volatility
# lambda - Jump intensity (average number of jumps per year)
# gamma - Mean of the jump size (log-normal jump)
# delta - Standard deviation of the jump size

mjd_par = np.array(
    [[0.05, 0.2, 5, 0.02, 0.0125], # (mu,sigma, lambda, gamma, delta) bull-regime
    [-0.05, 0.4, 10, -0.04, 0.1]]) # (mu,sigma, lambda, gamma, delta) bear-regime

# array of all the timesteps: timestep[k] = k * dt.
# endpoint=False keeps the grid spacing equal to dt; with the default
# endpoint=True the N points would be spaced T / (N - 1) apart, which does
# not match the step used by the simulator.
timestep = np.linspace(0, T, N, endpoint=False)

In [None]:
def data_par(h_1, h_2, n_steps=None):
    '''
    Given the hyper parameters h_1 and h_2 it returns the number of sub-sequences M and the effective number of log-returns that
    are involved in the analysis N_prime.

    The i-th sub-sequence ends at index (h_1 - h_2) * i + h_1, i.e. consecutive
    sub-sequences are shifted by h_1 - h_2 steps. M is the number of
    sub-sequences whose end index does not exceed n_steps - 2.

    Parameters:
    h_1 (int): first hyper parameter (must be larger than h_2)
    h_2 (int): second hyper parameter
    n_steps (int, optional): number of price values available; defaults to the
        global N when omitted

    Returns:
    tuple: (N_prime, M)

    Raises:
    ValueError: if h_1 <= h_2 (the shift between sub-sequences would not be
        positive and the count would never terminate)

    '''
    if n_steps is None:
        n_steps = N

    shift = h_1 - h_2
    if shift <= 0:
        raise ValueError(f"h_1 must be larger than h_2 (got h_1={h_1}, h_2={h_2})")

    # n_steps - 2 (-1: from price to log-return and -1: because the last index is length of the array - 1)
    last_index = n_steps - 2

    # number of i >= 0 such that shift * i + h_1 <= last_index
    M = max(0, (last_index - h_1) // shift + 1)
    N_prime = shift * (M - 1) + h_1 + 1

    return N_prime, M

# hyper parameters forwarded to data_par
h_1, h_2 = 35, 28

# effective number of log-returns and number of sub-sequences
N_prime, M = data_par(h_1, h_2)

# N_prime log-returns need N_prime + 1 prices, hence N_prime + 1 time stamps
t = timestep[: N_prime + 1]

# how many trailing points of the full grid are left out
print(f"price values not included in the analysis = {len(timestep) - len(t)}")

In [None]:
def generate_regimes(N_prime, num_subsequences=10, subseq_length=None, random_state=17):
    '''
    It randomly generates non-overlapping time intervals (regimes) of the same
    length, separated by at least one index.

    Start indices are drawn by rejection sampling from a private
    np.random.RandomState(random_state), so the global NumPy random state is
    left untouched. Sampling may still fail to terminate if earlier draws
    fragment the free space so that no further interval fits.

    Parameters:
    N_prime (int): number of log-returns; prices are indexed 0..N_prime
    num_subsequences (int): number of regimes to generate
    subseq_length (int, optional): number of prices in each regime; defaults
        to the global l_regime when omitted
    random_state (int): seed of the private random generator

    Returns:
    tuple: (subsequences, B, C) where
        subsequences - array of shape (num_subsequences, subseq_length) with
            the price indices of every regime, rows sorted by start index
        B - integer labels of the N_prime log-returns (1 inside a regime)
        C - integer labels of the N_prime + 1 prices (1 inside a regime)

    Raises:
    ValueError: if the arguments are not positive or the regimes (each followed
        by its one-index buffer) cannot fit in the available indices

    '''
    if subseq_length is None:
        subseq_length = l_regime
    if subseq_length < 1 or num_subsequences < 0:
        raise ValueError("subseq_length must be >= 1 and num_subsequences must be >= 0")
    # every regime occupies subseq_length indices plus a one-index buffer,
    # all within the indices 0..N_prime-1
    if num_subsequences * (subseq_length + 1) > N_prime:
        raise ValueError(
            f"{num_subsequences} regimes of length {subseq_length} do not fit in {N_prime} indices")

    A = np.arange(0, N_prime + 1)

    # exclusive upper bound for a start index
    high = len(A) - subseq_length - 1

    rng = np.random.RandomState(random_state)

    # accepted start indices
    starts = []
    while len(starts) < num_subsequences:
        candidate = rng.randint(0, high)
        # two windows [s, s + subseq_length] (regime + buffer) are disjoint
        # exactly when their starts differ by more than subseq_length
        if all(abs(candidate - s) > subseq_length for s in starts):
            starts.append(candidate)

    starts.sort()
    subsequences = np.array([A[s:s + subseq_length] for s in starts], dtype=int)
    subsequences = subsequences.reshape(len(starts), subseq_length)

    # label for the log-returns
    B = np.zeros(N_prime)
    for sub in subsequences:
        B[sub[0]: sub[-1]] = 1
    B = B.astype(int)

    # label for prices
    C = np.zeros(N_prime+1)
    for sub in subsequences:
        C[sub] = 1
    C = C.astype(int)

    return subsequences, B, C

# regime intervals, log-return labels and price labels
subsequences, theo_labels, labels_prices = generate_regimes(N_prime)

# plot of the regimes: one shaded band per regime interval
plt.figure(figsize=(10, 6))
for i in range(10):
    span_lo = subsequences[i][0]
    span_hi = subsequences[i][-1]
    plt.axvspan(timestep[span_lo], timestep[span_hi], color='red', alpha=0.3)
plt.show()

In [None]:
def mjd(S0, mu, sigma, lam, gamma, delta, n, time_step=None):
    """
    Simulates a Merton Jump Diffusion process (MJD).

    Parameters:
    S0 (float): Initial stock price
    mu (float): Drift
    sigma (float): Volatility
    lam (float): Jump intensity (average number of jumps per year)
    gamma (float): Mean of the jump size (log-normal jump)
    delta (float): Standard deviation of the jump size
    n (int): Number of simulated prices (S0 included), i.e. n - 1 time steps
    time_step (float, optional): Length of one time step; defaults to the
        global dt when omitted

    Returns:
    np.ndarray: Simulated stock prices (length n, first entry equal to S0)

    Raises:
    ValueError: if n < 1

    """
    if n < 1:
        raise ValueError(f"n must be at least 1 (got {n})")
    if time_step is None:
        time_step = dt

    # Initialize arrays to store the simulated path
    S = np.zeros(n)
    S[0] = S0

    # NOTE: the three draws below are kept in this order (normal, poisson,
    # normal) so that a given global seed consumes the random stream exactly
    # as before.

    # Simulate Brownian motion for the continuous part
    dW = np.random.normal(0, np.sqrt(time_step), n-1)

    # Simulate Poisson process for the jump part (number of jumps per step)
    dN = np.random.poisson(lam * time_step, n-1)

    # Simulate jump sizes (log-normal distribution for jumps).
    # The sum of dN independent N(gamma, delta^2) log-jumps is
    # N(dN * gamma, dN * delta^2), so steps with several jumps are no longer
    # collapsed into a single jump. For dN == 1 this is exp(gamma + delta * Z).
    Z = np.random.normal(0.0, 1.0, n-1)
    J = np.exp(gamma * dN + delta * np.sqrt(dN) * Z)

    # deterministic part of the log-increment, identical for every step
    drift = (mu - 0.5 * sigma**2) * time_step

    for i in range(1, n):
        # Continuous part (Brownian motion)
        S[i] = S[i-1] * np.exp(drift + sigma * dW[i-1])

        # Jump part (applied only when at least one jump occurred in the step)
        if dN[i-1] > 0:
            S[i] *= J[i-1]  # Apply the aggregated jump of the step

    return S

def mjd_path(N_prime, C, t):
    '''
    Simulate a full MJD price path whose parameters switch with the regime.

    The path is built segment by segment: a segment is a maximal run of
    consecutive indices k >= 1 sharing the same label C[k]. Each segment is
    simulated with mjd using the row of mjd_par selected by that label and
    starts from the last price of the previous segment.

    Parameters:
    N_prime (int): number of log-returns; the path holds N_prime + 1 prices
    C (array of int): regime label of every price, used to index mjd_par
    t (array): time stamps; only the length of its slices is used

    Returns:
    np.ndarray: simulated prices (length N_prime + 1, first price equal to 1)

    '''
    path = np.zeros(N_prime + 1)
    # initial stock price
    path[0] = 1

    def simulate(first, last, s_init, regime):
        # fill path[first..last] with an MJD started at s_init under `regime`
        window = slice(first, last + 1)
        mu, sigma, lam, gamma, delta = mjd_par[regime]
        path[window] = mjd(s_init, mu, sigma, lam, gamma, delta, len(t[window]))

    seg_first, seg_last = 0, 1
    seg_s0 = path[0]

    for k in range(1, N_prime + 1):
        at_end = k == N_prime

        # same regime at the next index: just extend the current segment
        if not at_end and C[k] == C[k + 1]:
            seg_last = k + 1
            continue

        # the segment ends here (regime switch or end of the path)
        simulate(seg_first, seg_last, seg_s0, C[k])

        if not at_end:
            # the next segment restarts from the price just simulated at k
            seg_first, seg_last = k, k + 1
            seg_s0 = path[k]

    return path

# to ensure reproducibility
seed_path = 50
np.random.seed(seed_path)

# relevant time series
prices = mjd_path(N_prime, labels_prices, t)
log_returns = np.diff(np.log(prices))

# it was just a check for the seed...
print(f'mean_path = {np.mean(prices)} \nstd_path = {np.std(prices)}')

# plot price path
plt.figure(figsize=(10, 6))
plt.plot(t, prices)

# shade every regime interval; iterating over the rows avoids hard-coding
# the number of regimes. Only the first band gets a legend entry.
for i, sub in enumerate(subsequences):
    span_kwargs = {'label': 'regime switch'} if i == 0 else {}
    plt.axvspan(t[sub[0]], t[sub[-1]], color='red', alpha=0.3, **span_kwargs)

#plt.title("Merton Jump Diffusion Simulation")
plt.xlabel("time (years)")
plt.ylabel("stock price")
plt.grid()
plt.legend()
plt.show()

# Hidden Markov Models

In [None]:
%%time

# using log returns: the HMM expects a 2-D array, one column per feature
time_series_data = log_returns.reshape(-1, 1)

# seed and stopping criteria of the HMM fit
seed_clustering = 30
max_iter = 100
tol = 1e-2

# Define the HMM
model = hmm.GaussianHMM(
    n_components=2,
    covariance_type='diag',
    random_state=seed_clustering,
    n_iter=max_iter,
    tol=tol,
)

# Fit the HMM to the time series data
model.fit(time_series_data)

# Predict hidden states
hmm_labels = model.predict(time_series_data)
print(f'model converged? {model.monitor_.converged}')

# the most frequent hidden state is taken as the "regime off" state
# (ties keep state 0 as "off")
if (hmm_labels == 1).sum() > (hmm_labels == 0).sum():
    off_regime_index, on_regime_index = 1, 0
else:
    off_regime_index, on_regime_index = 0, 1

# Accuracy scores

In [None]:
%%time
# number of decimals shown
dec = 5

# theoretical labels split the log-returns in "regime off" (0) and "regime on" (1)
off_mask = theo_labels == 0
on_mask = theo_labels == 1

# correctly classified log-returns in each group
off_hits = np.sum(hmm_labels[off_mask] == off_regime_index)
on_hits = np.sum(hmm_labels[on_mask] == on_regime_index)

# regime-off accuracy score (ROFS)
ROFS = off_hits / len(hmm_labels[off_mask])
print(f'ROFS = {round(ROFS, dec)}')

# regime-on accuracy score (RONS)
RONS = on_hits / len(hmm_labels[on_mask])
print(f'RONS = {round(RONS, dec)}')

# total accuracy (TA)
TA = (off_hits + on_hits) / len(hmm_labels)
print(f'TA = {round(TA, dec)}')

## log-returns

In [None]:
# helper that allows a correct way to plot data
def compare_columns(B):
    """
    Return the labels B with 0 and 1 swapped when the global
    off_regime_index is 1; otherwise return B unchanged.

    Values other than 0 and 1 are left as they are.
    """
    if off_regime_index != 1:
        return B

    # first turn the 1s into 0s, then turn the original 0s into 1s
    ones_cleared = np.where(B == 1, 0, B)
    return np.where(B == 0, 1, ones_cleared)

In [None]:
# labels arranged so that 0 -> 'green' and 1 -> 'red' in the colour list below
b = compare_columns(hmm_labels)
color = ['green', 'red']
# set the size of line and marker
m_size = 0.5

# [start_j, end_j] delimits the current run of equal labels
start_j = end_j = 0
last_i = len(log_returns) - 1

plt.figure(figsize=(10, 6))
for i in range(last_i + 1):

    # the run continues at the next index: extend it and keep going
    if i < last_i and b[i] == b[i+1]:
        end_j = i + 1
        continue

    # the run ends at i: draw it with the colour of its label
    segment = slice(start_j, end_j + 1)
    plt.plot(t[segment], log_returns[segment],
             color=color[b[i]], marker='.', linewidth=m_size, markersize=m_size)

    # start a new run unless this was the last log-return
    if i < last_i:
        start_j = end_j = i + 1

# shade the theoretical regimes; only the first band gets a legend entry
for i in range(10):
    span_lo = subsequences[i][0]
    span_hi = subsequences[i][-1]
    span_kwargs = {'label': 'regime switch'} if i == 0 else {}
    plt.axvspan(t[span_lo], t[span_hi], color='red', alpha=0.3, **span_kwargs)

plt.legend()
plt.ylabel('log-returns')
plt.xlabel('time (years)')
plt.show()

## price path

In [None]:
# labels arranged so that 0 -> 'green' and 1 -> 'red' in the colour list below
b = compare_columns(hmm_labels)
color = ['green', 'red']
# set the size of line and marker
m_size = 0.5

# [start_j, end_j] delimits the prices drawn for the current run of equal labels
start_j = end_j = 0
last_i = len(log_returns) - 1

plt.figure(figsize=(10, 6))
for i in range(last_i + 1):

    # the run continues at the next index: extend it and keep going
    # (prices are shifted by one with respect to the log-return index)
    if i < last_i and b[i] == b[i+1]:
        end_j = i + 2
        continue

    # the run ends at i: draw its prices with the colour of its label
    segment = slice(start_j, end_j + 1)
    plt.plot(t[segment], prices[segment],
             color=color[b[i]], marker='.', linewidth=m_size, markersize=m_size)

    # start a new run unless this was the last log-return
    if i < last_i:
        start_j = end_j = i + 2

# shade the theoretical regimes; only the first band gets a legend entry
for i in range(10):
    span_lo = subsequences[i][0]
    span_hi = subsequences[i][-1]
    span_kwargs = {'label': 'regime switch'} if i == 0 else {}
    plt.axvspan(t[span_lo], t[span_hi], color='red', alpha=0.3, **span_kwargs)

plt.legend()
plt.ylabel('price')
plt.xlabel('time (years)')
plt.show()

# CLUSTERING VALIDATION

In [None]:
def clustering_validation(h_1, h_2, n_iter, tol, n_runs):
    """
    Repeat the simulate-and-cluster experiment n_runs times.

    The regime layout is generated once; every run j seeds NumPy with j,
    simulates a new MJD path, fits a 2-state Gaussian HMM on its log-returns
    and scores the predicted states against the theoretical labels.

    Parameters:
    h_1, h_2: hyper parameters forwarded to data_par
    n_iter (int): maximum number of iterations of the HMM fit
    tol (float): convergence tolerance of the HMM fit
    n_runs (int): number of independent runs

    Returns:
    tuple of five arrays of length n_runs:
        rofs - regime-off accuracy score of every run
        rons - regime-on accuracy score of every run
        ta - total accuracy of every run
        convergence_count - 1 where the HMM fit converged, 0 otherwise
        iteration_times - seconds spent on clustering and scoring in every run
    """
    # data layout shared by all the runs
    N_prime, M = data_par(h_1, h_2)
    t = timestep[: N_prime + 1]
    subs, theo_labels, price_labels = generate_regimes(N_prime)

    off_mask = theo_labels == 0
    on_mask = theo_labels == 1

    rofs, rons, ta, convergence_count, iteration_times = (np.zeros(n_runs) for _ in range(5))

    for run in range(n_runs):

        # data preparation (not included in the measured time)
        np.random.seed(run)
        run_prices = mjd_path(N_prime, price_labels, t)
        log_returns = np.diff(np.log(run_prices))

        # clustering
        tic = time.time()

        # using log returns
        time_series_data = log_returns.reshape(-1, 1)

        # Define the HMM, fit it and predict the hidden states
        model = hmm.GaussianHMM(n_components=2, covariance_type='diag', n_iter=n_iter, tol=tol)
        model.fit(time_series_data)
        hmm_labels = model.predict(time_series_data)

        # check for convergence
        convergence_count[run] = 1 if model.monitor_.converged else 0

        # off/on indexes: the most frequent state is the "regime off" one
        if (hmm_labels == 0).sum() < (hmm_labels == 1).sum():
            off_regime_index, on_regime_index = 1, 0
        else:
            off_regime_index, on_regime_index = 0, 1

        off_hits = np.sum(hmm_labels[off_mask] == off_regime_index)
        on_hits = np.sum(hmm_labels[on_mask] == on_regime_index)

        # regime-off accuracy score (ROFS)
        rofs[run] = off_hits / len(hmm_labels[off_mask])

        # regime-on accuracy score (RONS)
        rons[run] = on_hits / len(hmm_labels[on_mask])

        # total accuracy (TA)
        ta[run] = (off_hits + on_hits) / len(hmm_labels)

        iteration_times[run] = time.time() - tic

    return rofs, rons, ta, convergence_count, iteration_times

In [None]:
%%time
# clustering validation parameters
n_runs = 50
max_iter = 100
tol = 1e-3

rofs, rons, ta, convergence_count, iteration_times = clustering_validation(h_1, h_2, max_iter, tol, n_runs)

# mean -+ standard deviation of every score over the runs
dec = 4
for score_name, score in (("ROFS", rofs), ("RONS", rons), ("TA", ta), ("RUN TIME", iteration_times)):
    print(f"{score_name} = {round(np.mean(score), dec)} -+ {round(np.std(score), dec)}")

# fraction of runs in which the HMM fit converged
print(f'\nCONVERGENCE RATE = {np.sum(convergence_count)/n_runs}')
if np.all(convergence_count == 1):
    print('every iteration reaches convergence!')

In [None]:
# write the results to a comma-separated txt file
from pathlib import Path

df = pd.DataFrame({
    'ROFS': rofs,
    'RONS': rons,
    'TA': ta,
    'CONVERGENCE': convergence_count,
    'RUNTIME': iteration_times
})

# to_csv does not create missing folders, so make sure the target exists
results_dir = Path('numerical_results')
results_dir.mkdir(parents=True, exist_ok=True)

df.to_csv(results_dir / f'HMM_h_{h_1}_{h_2}_MJD_n_{n_runs}_ite_{max_iter}_tol_{tol}.txt', index=False)

In [None]:
# read the results
# The path must point at a file, not at the folder: by default it is the file
# name built from the current validation parameters. Edit `results_file` to
# load the results of a different experiment.
results_file = f'numerical_results/HMM_h_{h_1}_{h_2}_MJD_n_{n_runs}_ite_{max_iter}_tol_{tol}.txt'
df = pd.read_csv(results_file)

rofs = df['ROFS'].values
rons = df['RONS'].values
ta = df['TA'].values
convergence_count = df['CONVERGENCE'].values
iteration_times = df['RUNTIME'].values

# mean -+ standard deviation of every score over the stored runs
dec = 4
print(f"ROFS = {round(np.mean(rofs), dec)} -+ {round(np.std(rofs), dec)}")
print(f"RONS = {round(np.mean(rons), dec)} -+ {round(np.std(rons), dec)}")
print(f"TA = {round(np.mean(ta), dec)} -+ {round(np.std(ta), dec)}")
print(f"RUN TIME = {round(np.mean(iteration_times), dec)} -+ {round(np.std(iteration_times), dec)}")

# fraction of stored runs in which the HMM fit converged
print(f'\nCONVERGENCE RATE = {np.sum(convergence_count)/len(convergence_count)}')
if (convergence_count == 1).all():
    print('every iteration reaches convergence!')