In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import random
import warnings
from utils import *

In [None]:
df = pd.read_csv('../HMM_randomportfolio.csv')
df.pricingdate = df.pricingdate.astype('datetime64')
df = df.tail(1500)
df = df.set_index(df.pricingdate).drop('pricingdate', axis=1)
df['predicted'] = np.zeros(len(df.priceclose))
df

In [None]:
plt.plot(df.priceclose)

In [None]:
S = ['B','S'] # buy, hold, sell

In [None]:
n_visible = len(df.signal.unique())
n_hidden = len(S)

In [None]:
def forward(V, A, B, pi):
    T = V.shape[0]
    N = A.shape[0]
    
    alpha = np.zeros((T,N))
    alpha[0] = pi[0] * B[:,V[0]]  
    for t in range(0,T-1): alpha[t+1] = np.array([alpha[t,:].dot(A[:,j]) * (B[j,V[t+1]]) for j in range(N)])
            
    return alpha

In [None]:
def backward(V, A, B):
    T = V.shape[0]
    N = A.shape[0]
    
    beta = np.zeros((T,N))
    beta[T-1] = np.ones((N))
    for t in range(T-2, -1, -1): beta[t] = np.array([(beta[t+1,:] * B[:,V[t+1]]).dot(A[j,:]) for j in range(N)])
            
    return beta

In [None]:
def viterbi(V,A,B,pi):
    T = V.shape[0]
    N = A.shape[0]

    alpha = forward(V, A, B, pi)
    beta = backward(V, A, B)

    xi = np.zeros((T, N, N))
    for t in range(T-1):
        denominator = sum(alpha[t,:].dot(A[:,:]) * B[:,V[t+1]] * beta[t+1,:])
        for i in range(N):
            for j in range(N):
                numerator = alpha[t,i] * A[i,j] * (B[j,V[t+1]]) * beta[t+1,j] 
                xi[t,i,j] = numerator / denominator 

           # note: gamma can be also computed as
    gamma = np.zeros((T,N))
    for t in range(T): gamma[t] = np.array([sum(xi[t,i,:]) for i in range(N)])

    return gamma, xi

In [None]:
def baum_welch(V, A, B, pi, max_iter=100, epsilon=1e-5, print_states=True):
    T = V.shape[0]
    N = A.shape[0]
    
    # initialization: for the first iteration, our estimated parameters will be the initial ones
    A_bar = A
    B_bar = B
    pi_bar = pi
    
    for iteration in range(max_iter):
        gamma, xi = viterbi(V, A_bar, B_bar, pi_bar)

        # expectation-maximization step
        pi_bar = np.array([gamma[0,i] for i in range(N)])

        prev = [A_bar, B_bar, pi_bar]
               
        A_bar = np.zeros((N,N))
        for i in range(N): A_bar[i] = np.array([xi[:-1,i,j].sum() / gamma[:-1,i].sum() for j in range(N)])

        B_bar = np.zeros(B.shape)
        for j in range(B.shape[0]):
            for k in range(B.shape[1]):
                # we need to define a list of times t_bar where V[t] is equal to k
                t_bar = [0,0]
                for i in sorted(V.unique()): t_bar[i] = [t for t in range(T) if V[t]==k]
                B_bar[j,k] = sum([gamma[t,j] for t in t_bar[k]]) / gamma[:,j].sum()
        
        A_delta = delta(A_bar, prev[0])
        B_delta = delta(B_bar, prev[1])
        pi_delta = delta(pi_bar, prev[2])
        
        e = A_delta + B_delta + pi_delta
        
        if print_states == True: print(f'iteration {iteration} - delta {e}')
        if e < epsilon: break

    return {'A_hat': A_bar, 'B_hat': B_bar, 'pi_hat': pi_bar, 'gamma': gamma}

In [None]:
rolling1 = df[0:299]
state = np.empty(299)
plt.plot(rolling1.priceclose)

In [None]:
A_init = np.array([[.9,.1],
                  [.25,.75]])
B_init = np.array([[.8,.2],
                   [.2,.8]])
pi_init = np.array([1.15/2,.85/2])

In [None]:
est= baum_welch(rolling1.signal, A_init, B_init, pi_init)

In [None]:
gennaro = est['gamma']
gennaro

In [None]:
gennaro2 = viterbi(df.signal, est['A_hat'], est['B_hat'], est['pi_hat'])
gennaro2

In [None]:
for t in range(299): state[t] = np.argmax(est['gamma'][t])
rolling1['state'] = state[:299]

In [None]:
plt.figure(figsize=(15,10))
plt.plot(rolling1.priceclose, color='black',alpha=.8)
plt.fill_between(rolling1.index,rolling1.priceclose,800,where=rolling1.state==0, color='red', alpha=.7)
plt.fill_between(rolling1.index,rolling1.priceclose,800,where=rolling1.state==1, color='green', alpha=.7)

In [None]:
rolling1.r1[rolling1.state==1].mean()

In [None]:
 # conditional distribution of r1 given the state
(rolling1.r1[rolling1.state==0]*100).plot.density()
(rolling1.r1[rolling1.state==1]*100).plot.density()
plt.legend(['Sell','Buy'])
plt.xlabel('% of increasing return')

In [None]:
prediction = rolling1[-2:-1].priceclose + (rolling1.r1[rolling1.state==rolling1[-2:-1].state[0]].mean())*rolling1[-2:-1].priceclose
df.predicted[298] = prediction[0]
prediction

In [None]:
actual = rolling1[-1:].priceclose
actual

In [None]:
loss = (prediction[0] - actual[0])**2
loss

In [None]:
loss = []
width = 50

warnings.filterwarnings("ignore")
for n in range(width,100):
    rolling = df[(n-width):n]
    state = np.empty(width)
    est = baum_welch(rolling.signal, A_init, B_init, pi_init, print_states = False)
    
    for t in range(width-1): state[t] = np.argmax(est['gamma'][t])
    rolling['state'] = state[:width]
    
    prediction = rolling[-2:-1].priceclose + (rolling.r1[rolling.state==rolling[-2:-1].state[0]].mean())*rolling[-2:-1].priceclose
    df.predicted[n-1] = prediction[0]
    actual = rolling[-1:].priceclose
    loss.append((prediction[0] - actual[0])**2)
    print(f'iteration:{n}')
    
loss = np.array(loss)
total_loss = loss.sum()
mse = loss.mean()

In [None]:
mse

In [None]:
plt.figure(figsize=(15,10))
plt.plot(df.priceclose)
plt.plot(df.predicted)
plt.xlim((17700,17900))
plt.ylim((1150,1450))
plt.legend(('Actual', 'Predicted'))

In [None]:
mse

In [None]:
'''
def viterbi(V, a, b, initial_distribution):
    T = V.shape[0]
    M = a.shape[0]
 
    omega = np.zeros((T, M))
    omega[0, :] = np.log(initial_distribution * b[:, V[0]])
 
    prev = np.zeros((T - 1, M))
 
    for t in range(1, T):
        for j in range(M):
            # Same as Forward Probability
            probability = omega[t - 1] + np.log(a[:, j]) + np.log(b[j, V[t]])
 
            # This is our most probable state given previous state at time t (1)
            prev[t - 1, j] = np.argmax(probability)
 
            # This is the probability of the most probable state (2)
            omega[t, j] = np.max(probability)
 
    # Path Array
    S = np.zeros(T)
 
    # Find the most probable last hidden state
    last_state = np.argmax(omega[T - 1, :])
 
    S[0] = last_state
 
    backtrack_index = 1
    for i in range(T - 2, -1, -1):
        S[backtrack_index] = prev[i, int(last_state)]
        last_state = prev[i, int(last_state)]
        backtrack_index += 1
 
    # Flip the path array since we were backtracking
    S = np.flip(S, axis=0)
 
    # Convert numeric values to actual hidden states
    result = []
    for s in S:
        if s == 0:
            result.append("A")
        else:
            result.append("B")
 
    return result
'''