In [235]:
## Hidden Markov Model
import numpy as np
import random
import pandas as pd
import matplotlib.pyplot as plt
from hmmlearn.hmm import MultinomialHMM
from hmmlearn.vhmm import VariationalCategoricalHMM

SEED = 42
df = pd.read_csv('aggregated.csv')
trend_data = df['nvidia'].values
price_data = df['NVDA Monthly'].values
random.seed(SEED)

In [241]:
# Construct assumption Start Matrix

states = ['Happy', 'Sad', 'Neutral']
start_matrix = np.array([0.5, 0.3, 0.2])

## UNIT TESTING
assert np.sum(start_matrix) == 1

In [237]:
n_hidden = 3 # states: happy, sad, neutral
n_obs = 3 # observations: up, down, negligible change
traintest_split = 0.7
threshold = 0.07
start_time, stop_time = 228, 242 # length of data: 242

trend_data = trend_data[start_time:stop_time]
price_data = price_data[start_time:stop_time]

## Trend data: 0 - Happy, 1 - Sad, 2 - Neutral
discrete_trend_data = [0 if x >= threshold else 1 if x <= -threshold else 2 for x in trend_data]
## Price data: 0 - Up, 1 - Down, 2 - Negligible change
discrete_price_data = [0 if x >= threshold else 1 if x <= -threshold else 2 for x in price_data]

# Test and Train data
trainTrend, testTrend = discrete_trend_data[:int(len(trend_data)*traintest_split)], discrete_trend_data[int(len(trend_data)*traintest_split):]
trainPrice, discreteTestPrice = discrete_price_data[:int(len(price_data)*traintest_split)], discrete_price_data[int(len(price_data)*traintest_split):]

In [238]:
# Constructing the Transition Matrix

# row is t, column is t+1
symbolVocabTrend = {0: "Happy", 1: "Sad", 2: "Neutral"}

# Construct count of transitions from one hidden state to another i.e how many times 0 becomes 1, 1 becomes 2 etc.
transition_counts = {}
for i in range(1, len(trainTrend)):
    transition = (symbolVocabTrend[discrete_trend_data[i-1]], symbolVocabTrend[discrete_trend_data[i]])
    if transition not in transition_counts:
        transition_counts[transition] = 0
    transition_counts[transition] += 1

# print(transition_counts)

# Construct the transition matrix from the counts
transition_matrix = np.zeros((n_hidden, n_hidden))
for i in range(n_hidden):
    for j in range(n_hidden):
        transition = (symbolVocabTrend[i], symbolVocabTrend[j])
        if transition in transition_counts:
            transition_matrix[i, j] = transition_counts[transition] / sum([value for key, value in transition_counts.items() if key[0] == symbolVocabTrend[i]])

# Pretty print the transition matrix with row and column labels
print("Transition Matrix:")
print(pd.DataFrame(transition_matrix, index=["Happy", "Sad", "Neutral"], columns=["Happy", "Sad", "Neutral"]))

### UNIT TESTING ###

# Test the transition matrix
assert np.allclose(transition_matrix.sum(axis=1), np.ones(n_hidden)), "Transition matrix rows should sum to 1"


Transition Matrix:
            Happy       Sad   Neutral
Happy    0.000000  0.333333  0.666667
Sad      1.000000  0.000000  0.000000
Neutral  0.333333  0.666667  0.000000


In [239]:
# Building Price Emission Matrix

# row is emitting column
symbolVocabPrice = {0: "Up", 1: "Down", 2: "Negligible Change"}

# Building Price Emission Matrix

price_symbol_counts = {}

# Iterate over the time series data of trend and price data along with the corresponding hidden states
for trendState, priceAction in zip(trainTrend, trainPrice):
    # Update symbol counts for price data
    emission = (symbolVocabTrend[trendState], symbolVocabPrice[priceAction])
    if emission not in price_symbol_counts:
        price_symbol_counts[emission] = 0
    price_symbol_counts[emission] += 1

# print(price_symbol_counts)

# Construct the emission matrix from the counts
emission_matrix = np.zeros((n_hidden, n_obs))
for i in range(n_hidden):
    for j in range(n_obs):
        emission = (symbolVocabTrend[i], symbolVocabPrice[j])
        if emission in price_symbol_counts:
            emission_matrix[i, j] = price_symbol_counts[emission] / sum([value for key, value in price_symbol_counts.items() if key[0] == symbolVocabTrend[i]])

# Pretty print the emission matrix with row and column labels
print("Emission Matrix:")
print(pd.DataFrame(emission_matrix, index=["Happy", "Sad", "Neutral"], columns=["Up", "Down", "Negligible Change"]))

### UNIT TESTING ###

# Test the emission matrix
assert np.allclose(emission_matrix.sum(axis=1), np.ones(n_hidden)), "Emission matrix rows should sum to 1"

Emission Matrix:
               Up  Down  Negligible Change
Happy    0.666667   0.0           0.333333
Sad      0.666667   0.0           0.333333
Neutral  0.666667   0.0           0.333333


## Hidden Markov Model
Using Viterbi and Forward-Backward to decide states, we draft our plan as such: \



1. q_state = 3 { Happy, Sad, Neutral }

2. obs_states = { Up, Down, Negligible Change }

3. transition matrix is defined as $P(q_{t+1}|q_t)$: \
    Row: Hidden States [$q$], Column: Hidden States [$q$]
4. emission matrix is defined as $P(S_j|q_i)$: \
    Row: Hidden States [$q$], Column: Observable States [$S$]

#### Matrix plan v1
Threshold variability

#### Matrix plan v2
Vary backtest period. Last 6 months? Last decade?

#### Matrix plan v3
Assume hidden states as a distribution

#### Notes:
1. Accuracy does not lead to better returns.
2. Attempted Variational Inference model but couldn't get it to work reliably. I predict it could peprform better due to Bayesian Inference.

#### Housekeeping
I have:
1. Created the transition matrix based on categorising trend data as "Happy, Sad, Neutral".

2. Created the emission matrix based on "Happy emitting Up", etc.

3. Set start matrix as arbitrary.

4. Made a bunch of models to test parameters and versions

5. Run simulation against test price data and keep track of profit as return %

6. Create logger for results

7. Results ran through certain parameters

I need to:

3. Structure document




In [243]:
## Model v1 - Forward-Backward Algorithm

# Initialize the HMM model
MultiMapModel = MultinomialHMM(n_components=n_hidden,init_params='',algorithm='map', random_state=SEED)

# Set the model parameters
MultiMapModel.n_features = n_obs
MultiMapModel.startprob_ = start_matrix
MultiMapModel.transmat_ = transition_matrix
MultiMapModel.emissionprob_ = emission_matrix

# Fit the model to the data
X = np.tile(trainPrice, (3, 1)).T
MultiMapModel.fit(X)
logprob, received = MultiMapModel.decode(np.tile(trainPrice, (3, 1)).T)

# Accuracy of most likely sequence of hidden states with respect to the trend data
print("Accuracy: ", sum(1 for received_item, trainTrend_item in zip(received, trainTrend) if received_item == trainTrend_item)/len(received))

# Pretty print the learned transition matrix
print("Learned Transition Matrix:")
print(pd.DataFrame(MultiMapModel.transmat_, index=["Happy", "Sad", "Neutral"], columns=["Happy", "Sad", "Neutral"]))

print("Learned Emission Matrix:")
print(pd.DataFrame(MultiMapModel.emissionprob_, index=["Happy", "Sad", "Neutral"], columns=["Up", "Down", "Negligible Change"]))




## Model v1 - Viterbi

# Initialize the HMM model
MultiVitModel = MultinomialHMM(n_components=n_hidden,init_params='',algorithm='viterbi', random_state=SEED)

# Set the model parameters
MultiVitModel.n_features = n_obs
MultiVitModel.startprob_ = start_matrix
MultiVitModel.transmat_ = transition_matrix
MultiVitModel.emissionprob_ = emission_matrix

# Fit the model to the data
X = np.tile(trainPrice, (3, 1)).T
MultiVitModel.fit(X)
logprob, received = MultiVitModel.decode(np.tile(trainPrice, (3, 1)).T)

# Accuracy of most likely sequence of hidden states with respect to the trend data
print("Accuracy: ", sum(1 for received_item, trainTrend_item in zip(received, trainTrend) if received_item == trainTrend_item)/len(received))

# Pretty print the learned transition matrix
print("Learned Transition Matrix:")
print(pd.DataFrame(MultiVitModel.transmat_, index=["Happy", "Sad", "Neutral"], columns=["Happy", "Sad", "Neutral"]))

print("Learned Emission Matrix:")
print(pd.DataFrame(MultiVitModel.emissionprob_, index=["Happy", "Sad", "Neutral"], columns=["Up", "Down", "Negligible Change"]))




## Model Empty

MultiNullModel = MultinomialHMM(n_components=n_hidden,init_params='ste', random_state=SEED)

X = np.tile(trainPrice, (3, 1)).T
MultiNullModel.fit(X)
logprob, received = MultiNullModel.decode(np.tile(trainPrice, (3, 1)).T)

# Accuracy of most likely sequence of hidden states with respect to the trend data
print("Accuracy: ", sum(1 for received_item, trainTrend_item in zip(received, trainTrend) if received_item == trainTrend_item)/len(received))

# Pretty print the learned transition matrix
print("Learned Transition Matrix:")
print(pd.DataFrame(MultiNullModel.transmat_, index=["Happy", "Sad", "Neutral"], columns=["Happy", "Sad", "Neutral"]))


print("Learned Emission Matrix:")
print(pd.DataFrame(MultiNullModel.emissionprob_, index=["Happy", "Sad", "Neutral"], columns=["Up", "Down", "Negligible Change"]))

MultinomialHMM has undergone major changes. The previous version was implementing a CategoricalHMM (a special case of MultinomialHMM). This new implementation follows the standard definition for a Multinomial distribution (e.g. as in https://en.wikipedia.org/wiki/Multinomial_distribution). See these issues for details:
https://github.com/hmmlearn/hmmlearn/issues/335
https://github.com/hmmlearn/hmmlearn/issues/340
  a -= a_lse
MultinomialHMM has undergone major changes. The previous version was implementing a CategoricalHMM (a special case of MultinomialHMM). This new implementation follows the standard definition for a Multinomial distribution (e.g. as in https://en.wikipedia.org/wiki/Multinomial_distribution). See these issues for details:
https://github.com/hmmlearn/hmmlearn/issues/335
https://github.com/hmmlearn/hmmlearn/issues/340


Learned Transition Matrix:
         Happy  Sad  Neutral
Happy      NaN  NaN      NaN
Sad        NaN  NaN      NaN
Neutral    NaN  NaN      NaN
Learned Emission Matrix:
         Up  Down  Negligible Change
Happy   NaN   NaN                NaN
Sad     NaN   NaN                NaN
Neutral NaN   NaN                NaN


  a -= a_lse


ValueError: startprob_ must sum to 1 (got nan)

In [None]:
# ## Model v2 - Variational Inference Forward-Backward Algorithm

# # Initialize the HMM model
# VariMapModel = VariationalCategoricalHMM(n_components=n_hidden,init_params='',algorithm='map', random_state=SEED)

# # Set the model parameters
# VariMapModel.n_features = n_obs
# VariMapModel.startprob_prior = start_matrix
# VariMapModel.transmat_prior = transition_matrix
# VariMapModel.emissionprob_prior = emission_matrix

# # Fit the model to the data
# X = np.tile(trainPrice, (1, 1))
# VariMapModel.fit(X)
# logprob, received = VariMapModel.decode(X)

# # Accuracy of most likely sequence of hidden states with respect to the trend data
# print("Accuracy: ", sum(1 for received_item, trainTrend_item in zip(received, trainTrend) if received_item == trainTrend_item)/len(received))

# # Pretty print the learned transition matrix
# print("Learned Transition Matrix:")
# print(pd.DataFrame(VariMapModel.transmat_, index=["Happy", "Sad", "Neutral"], columns=["Happy", "Sad", "Neutral"]))

# print("Learned Emission Matrix:")
# print(pd.DataFrame(VariMapModel.emissionprob_, index=["Happy", "Sad", "Neutral"], columns=["Up", "Down", "Negligible Change"]))


# ## Model v2 - Variational Inference Viterbi

# # Initialize the HMM model
# VariVitModel = VariationalCategoricalHMM(n_components=n_hidden,init_params='',algorithm='viterbi', random_state=SEED)

# # Set the model parameters
# VariVitModel.n_features = n_obs
# VariVitModel.startprob_prior_ = start_matrix
# VariVitModel.transmat_prior_ = transition_matrix
# VariVitModel.emissionprob_prior_ = emission_matrix

# # Fit the model to the data
# X = np.array(trainPrice).reshape(-1, 1)
# VariVitModel.fit(X)
# logprob, received = VariVitModel.decode(X)

# # Accuracy of most likely sequence of hidden states with respect to the trend data
# print("Accuracy: ", sum(1 for received_item, trainTrend_item in zip(received, trainTrend) if received_item == trainTrend_item)/len(received))

# # Pretty print the learned transition matrix
# print("Learned Transition Matrix:")
# print(pd.DataFrame(VariVitModel.transmat_, index=["Happy", "Sad", "Neutral"], columns=["Happy", "Sad", "Neutral"]))

# print("Learned Emission Matrix:")
# print(pd.DataFrame(VariVitModel.emissionprob_, index=["Happy", "Sad", "Neutral"], columns=["Up", "Down", "Negligible Change"]))


# ## Model v2 - Variational Inference Null

# # Initialize the HMM model
# VariNullModel = VariationalCategoricalHMM(n_components=n_hidden,init_params='ste', random_state=SEED)

# # Fit the model to the data
# X = np.array(trainPrice).reshape(-1, 1)
# VariNullModel.fit(X)
# logprob, received = VariNullModel.decode(X)

# # Accuracy of most likely sequence of hidden states with respect to the trend data
# print("Accuracy: ", sum(1 for received_item, trainTrend_item in zip(received, trainTrend) if received_item == trainTrend_item)/len(received))

# # Pretty print the learned transition matrix
# print("Learned Transition Matrix:")
# print(pd.DataFrame(VariNullModel.transmat_, index=["Happy", "Sad", "Neutral"], columns=["Happy", "Sad", "Neutral"]))

# print("Learned Emission Matrix:")
# print(pd.DataFrame(VariNullModel.emissionprob_, index=["Happy", "Sad", "Neutral"], columns=["Up", "Down", "Negligible Change"]))

In [None]:
## Predict the hidden states of the test data

X_test = np.tile(discreteTestPrice, (3, 1)).T

# Decode Multinomial models on Test data
logprob, MultiMapModelreceived = MultiMapModel.decode(X_test)
logprob, MultiVitModelreceived = MultiVitModel.decode(X_test)
logprob, MultiNullmodelreceived = MultiNullModel.decode(X_test)

# # Decode Variational models on Test data
# logprob, VariMapModelreceived = VariMapModel.decode(X_test)
# logprob, VariVitModelreceived = VariVitModel.decode(X_test)
# logprob, VariNullModelreceived = VariNullModel.decode(X_test)

test = pd.read_csv('OpenClosePrice.csv')
testPrice = (test['Close'].values+test['Open'].values)/2
testPrice = testPrice[start_time:stop_time]
testPrice = testPrice[int(len(testPrice)*traintest_split):]

# Buy or Sell or Hold based on the hidden states, calculate PnL based on the test price data
MultiMap = [1000, 0] # cash in hand, position
MultiVit = [1000, 0]
MultiNull = [1000, 0]
# VariMap = [1000, 0]
# VariVit = [1000, 0]
# VariNull = [1000, 0]
BaseModel = [1000, 0]

for i in range(len(testPrice)):
    if i == 0:
        BaseModel[0] -= testPrice[i]
        BaseModel[1] += 1

    # Multinomial Forward Backward Model
    if MultiMapModelreceived[i] == 0:
        MultiMap[0] -= testPrice[i]
        MultiMap[1] += 1
    elif MultiMapModelreceived[i] == 1:
        MultiMap[0] += testPrice[i]
        MultiMap[1] -= 1

    # Multinomial Viterbi Model
    if MultiVitModelreceived[i] == 0:
        MultiVit[0] -= testPrice[i]
        MultiVit[1] += 1
    elif MultiVitModelreceived[i] == 1:
        MultiVit[0] += testPrice[i]
        MultiVit[1] -= 1

        # Multinomial Null Model
    if MultiNullmodelreceived[i] == 0:
        MultiNull[0] -= testPrice[i]
        MultiNull[1] += 1
    elif MultiNullmodelreceived[i] == 1:
        MultiNull[0] += testPrice[i]
        MultiNull[1] -= 1


    # # Variational Inference Forward Backward Model
    # if VariMapModelreceived[i] == 0:
    #     VariMap[0] -= testPrice[i]
    #     VariMap[1] += 1
    # elif VariMapModelreceived[i] == 1:
    #     VariMap[0] += testPrice[i]
    #     VariMap[1] -= 1

    # # Variational Inference Viterbi Model
    # if VariVitModelreceived[i] == 0:
    #     VariVit[0] -= testPrice[i]
    #     VariVit[1] += 1
    # elif VariVitModelreceived[i] == 1:
    #     VariVit[0] += testPrice[i]
    #     VariVit[1] -= 1

    # # Variational Inference Null Model
    # if VariNullModelreceived[i] == 0:
    #     VariNull[0] -= testPrice[i]
    #     VariNull[1] += 1
    # elif VariNullModelreceived[i] == 1:
    #     VariNull[0] += testPrice[i]
    #     VariNull[1] -= 1


    if i == len(testPrice): # Exit position at the end
        MultiMap[0] += MultiMap[1]*testPrice[i]
        MultiMap[1] = 0
        MultiVit[0] += MultiVit[1]*testPrice[i]
        MultiVit[1] = 0
        MultiNull[0] += MultiNull[1]*testPrice[i]
        MultiNull[1] = 0
        # VariMap[0] += VariMap[1]*testPrice[i]
        # VariMap[1] = 0
        # VariVit[0] += VariVit[1]*testPrice[i]
        # VariVit[1] = 0
        # VariNull[0] += VariNull[1]*testPrice[i]
        # VariNull[1] = 0
        BaseModel[0] += BaseModel[1]*testPrice[i]
        BaseModel[1] = 0


print("Base Model Return on Investment: ", (BaseModel[0]-1000)/10)
print("Model v1 - Forward Backward Return on Investment: ", (MultiMap[0]-1000)/10)
print("Model v2 - Viterbi Return on Investment: ", (MultiVit[0]-1000)/10)
print("Multi Null Model Return on Investment: ", (MultiNull[0]-1000)/10)
# print("Model v3 - Variational Inference Return on Investment: ", (VariMap[0]-1000)/10)
# print("Model v4 - Variational Inference Viterbi Return on Investment: ", (VariVit[0]-1000)/10)
# print("Vari Null Model Return on Investment: ", (VariNull[0]-1000)/10)


# output is a list of return on investment calculations using lambda function
output = [(x[0]-1000)/10 for x in [BaseModel, MultiMap, MultiVit, MultiNull]]





Base Model Return on Investment:  -16.621427971616846
Model v1 - Forward Backward Return on Investment:  16.621427971616846
Model v2 - Viterbi Return on Investment:  16.621427971616846
Multi Null Model Return on Investment:  44.364562413428644


In [None]:
## Logger

with open('logs.csv', 'w') as f:
    f.write("Model,Parameters,Return on Investment,\n") # Parameters contain backtest period, threshold
    f.write("Base Strategy,{} months | {},{},\n".format(stop_time-start_time,threshold,output[0]))
    f.write("Model v1,Forward-Backward Algorithm | {} months | {},{},\n".format(stop_time-start_time,threshold,output[1]))
    f.write("Model v1,Viterbi Algorithm | {} months | {},{},\n".format(stop_time-start_time,threshold,output[2]))
    f.write("Model v1,Null Model | {} months | {},{},\n".format(stop_time-start_time,threshold,output[3]))

