In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import random

In [2]:
df = pd.read_csv('../HMM_randomportfolio.csv')
df = df.tail(1000)
#df = df.set_index(df.pricingdate).drop('pricingdate', axis=1)
df = df.reset_index().drop('index', axis=1)
df

Unnamed: 0,pricingdate,priceclose,r1,signal
0,2018-06-12,1293.170013,0.002628,1
1,2018-06-13,1296.170003,0.002320,1
2,2018-06-14,1303.909986,0.005971,1
3,2018-06-15,1308.450022,0.003482,1
4,2018-06-18,1299.859995,-0.006565,0
...,...,...,...,...
995,2022-05-24,1871.590002,0.002630,1
996,2022-05-25,1913.690022,0.022494,1
997,2022-05-26,1923.819968,0.005293,1
998,2022-05-27,1967.990009,0.022960,1


In [3]:
def initial_matrices(n_hidden, n_visible, random_state = 2452):
    random.seed(random_state)
    A = np.zeros((n_hidden, n_hidden))
    for i in range(A.shape[0]):
        A[i] = np.array([(1/n_hidden)+np.random.normal(0,0.2),(1/n_hidden)+np.random.normal(0,0.2), 0])
        A[i,A.shape[0]-1] = 1 - sum(A[i])
    B = np.zeros((n_hidden,n_visible))
    for i in range(B.shape[0]):
        B[i] = np.array([(1/n_visible)+np.random.normal(0,0.2), 0])
        B[i, B.shape[1]-1] = 1 - sum(B[i])
    pi = np.zeros(n_hidden)
    for i in range(pi.shape[0]):
        pi[i] = np.mean(A[:,i])
    return A, B, pi

In [4]:
S = ['B','H','S'] # buy, hold, sell

In [5]:
n_visible = len(df.signal.unique())
n_hidden = len(S)

In [6]:
A, B, pi = initial_matrices(n_hidden, n_visible)

In [7]:
def forward(V, A, B, pi):
    T = V.shape[0]
    N = B.shape[0]
    
    alpha = np.zeros((T,N))
    alpha[0] = pi[0] * B[:,V[0]]
        
    for t in range(0,T-1):
        for j in range(N):
            alpha[t+1,j] = alpha[t,:].dot(A[:,j]) * (B[j,V[t+1]])
            
    return alpha

In [8]:
def backward(V, A, B):
    T = V.shape[0]
    N = B.shape[0]
    
    beta = np.zeros((T,N))
    beta[T-1] = np.ones((N))
    
    for t in range(T-2, -1, -1):
        for j in range(N):
            beta[t,j] = (beta[t+1,:] * B[:,V[t+1]]).dot(A[j,:])
            
    return beta

In [9]:
def viterbi(V, A, B, pi):
    T = V.shape[0]
    N = A.shape[0]
    
    alpha = forward(V, A, B, pi)
    beta = backward(V, A, B)
    
    gamma = np.zeros((T, N))
    for i in range(N):
        for t in range(T):
            gamma[t,i] = (alpha[t,i] * beta[t,i]) / sum(alpha[t]*beta[t])
    
    return gamma

In [10]:
def baum_welch(V, A, B, pi, n_iter=100):
    T = V.shape[0]
    N = A.shape[0]
 
    for n in range(n_iter):
        alpha = forward(V, A,B,pi)
        beta = backward(V, A,B)
 
        xi = np.zeros((N, N, T - 1))
        for t in range(T - 1):
            denominator = np.dot(np.dot(alpha[t, :].T, A) * B[:, V[t + 1]].T, beta[t + 1, :])
            for i in range(N):
                numerator = alpha[t, i] * A[i, :] * B[:, V[t + 1]].T * beta[t + 1, :].T
                xi[i, :, t] = numerator / denominator
 
        gamma = np.sum(xi, axis=1)
        A = np.sum(xi, 2) / np.sum(gamma, axis=1).reshape((-1, 1))
 
        # Add additional T'th element in gamma
        gamma = np.hstack((gamma, np.sum(xi[:, :, T - 2], axis=0).reshape((-1, 1))))
 
        K = B.shape[1]
        denominator = np.sum(gamma, axis=1)
        for l in range(K):
            B[:, l] = np.sum(gamma[:, V == l], axis=1)
 
        B = np.divide(B, denominator.reshape((-1, 1)))
 
    return (A, B)
 
 

In [11]:
t = baum_welch(df.signal, A, B, pi, n_iter=)

In [12]:
t

(array([[0.42871683, 0.2193267 , 0.35195646],
        [0.28687645, 0.22500404, 0.48811951],
        [0.50184183, 0.33918807, 0.1589701 ]]),
 array([[0.63171634, 0.36828366],
        [0.63759113, 0.36240887],
        [0.08287226, 0.91712774]]))

In [None]:
gamma = viterbi(df.signal,t[0],t[1],pi)
h = np.zeros((gamma.shape[0]))
for i in range(gamma.shape[0]):
    h[i] = np.argmax(gamma[i])

h

In [None]:
pd.DataFrame(h).value_counts()

In [None]:
def viterbi(V, a, b, initial_distribution):
    T = V.shape[0]
    M = a.shape[0]
 
    omega = np.zeros((T, M))
    omega[0, :] = np.log(initial_distribution * b[:, V[0]])
 
    prev = np.zeros((T - 1, M))
 
    for t in range(1, T):
        for j in range(M):
            # Same as Forward Probability
            probability = omega[t - 1] + np.log(a[:, j]) + np.log(b[j, V[t]])
 
            # This is our most probable state given previous state at time t (1)
            prev[t - 1, j] = np.argmax(probability)
 
            # This is the probability of the most probable state (2)
            omega[t, j] = np.max(probability)
 
    # Path Array
    S = np.zeros(T)
 
    # Find the most probable last hidden state
    last_state = np.argmax(omega[T - 1, :])
 
    S[0] = last_state
 
    backtrack_index = 1
    for i in range(T - 2, -1, -1):
        S[backtrack_index] = prev[i, int(last_state)]
        last_state = prev[i, int(last_state)]
        backtrack_index += 1
 
    # Flip the path array since we were backtracking
    S = np.flip(S, axis=0)
 
    # Convert numeric values to actual hidden states
    result = []
    for s in S:
        if s == 0:
            result.append("A")
        else:
            result.append("B")
 
    return result

In [None]:
v = viterbi(df.signal, t[0], t[1], pi)

In [None]:
v