This file is for a generic hidden markov chain model
https://www.kaggle.com/fortjohnson/markov-chains-and-hidden-markov-models

In [1]:
from matplotlib import pyplot as plt
from datetime import datetime
from sklearn import preprocessing
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans
from sklearn.covariance import EllipticEnvelope
from pyemma import msm
from sklearn.ensemble import IsolationForest
from sklearn.svm import OneClassSVM
from hmmlearn import hmm

import pandas as pd
import numpy as np
import random as rd
import matplotlib
import matplotlib.dates as md

np.random.seed(42)

In [2]:
# Data cleaning

x_filename = '../data/gemini_BTCUSD_2021_1min.csv'
x_testFile = '../data/gemini_BTCUSD_2020_1min.csv'

def cleanData(fileName = '../data/gemini_BTCUSD_2021_1min.csv', timeRangeInHour=1, valueUsed="open"):
    # Read Data
    df = pd.read_csv(x_filename, skiprows=[0], low_memory=False)
    cdf = df.drop(columns=['Date', 'Symbol', 'Unix Timestamp'], axis=1)
    return cdf

cleandf = cleanData()
testdf = cleanData(x_testFile)

In [3]:
# Get Hidden Markov Chain Model

remodel = hmm.GaussianHMM(n_components=4, covariance_type="full", n_iter=100)
remodel.fit(cleandf)
Z2 = remodel.predict(testdf)

In [4]:
# Create trading Agent

def algoPredict(remodel, testdf, buyInState, buyOutState, num_state=3, valueUsed="Open", stockPerBuy=10, initialMoney=0, allowDebt=True):
    # Note: initialMoney and allowDebt is not used at the moment
    Z2 = remodel.predict(testdf)
    
    money = 0
    inHandStock = 0
    for i in range(len(Z2)):
        if i == (len(Z2) - 1) and inHandStock > 0:
            money += inHandStock * testdf.at[i,valueUsed]
            inHandStock = 0
        elif Z2[i] == buyInState:
            inHandStock += stockPerBuy
            money -= stockPerBuy * testdf.at[i,valueUsed]
        elif Z2[i] == buyOutState and inHandStock > 0:
            inHandStock -= stockPerBuy
            money += stockPerBuy * testdf.at[i,valueUsed]
    return money

In [5]:
# Create trading Agent 2

def algoPredict2(remodel, testdf, buyInState, buyOutState, num_state=3, valueUsed="Open", stockPerBuy=10, initialMoney=0, allowDebt=True):
    # Note: initialMoney and allowDebt is not used at the moment
    Z2 = remodel.predict(testdf)
    
    canBuy = True
    money = 0
    inHandStock = 0
    for i in range(len(Z2)):
        if i == (len(Z2) - 1) and inHandStock > 0:
            money += inHandStock * testdf.at[i,valueUsed]
            inHandStock = 0
        elif Z2[i] == buyInState and canBuy:
            inHandStock += stockPerBuy
            money -= stockPerBuy * testdf.at[i,valueUsed]
            canBuy = False
        elif Z2[i] == buyOutState and inHandStock > 0 and not canBuy:
            inHandStock -= stockPerBuy
            money += stockPerBuy * testdf.at[i,valueUsed]
            canBuy = True
    return [buyInState, buyOutState, money]

In [6]:
# Get best prediction result

def findMarkovBest(remodel, testdf, num_state=3):
    best_result = []
    for i in range(num_state):
        for j in range(num_state):
            if i != j:
                curr_result = algoPredict2(remodel, testdf, i, j, num_state)
                if len(best_result) == 0:
                    best_result = curr_result
                elif curr_result[2] > best_result[2]:
                    best_result = curr_result
#     print(f'best result = {best_result}')
    return best_result

findMarkovBest(remodel, testdf, num_state=3) 

[1, 2, 608183.600000005]

In [7]:
# Get max gain as our top-line

def theoretical_max_gain_no_overlap(data,  trade_factor = 1, price_type = 'Open'):
    labels = list(filter(lambda label: label != price_type, ['High', 'Low', 'Close', 'Volume']))
    diff = data.drop(columns=labels, axis=1).diff()
    diff = diff.drop(index=diff.index[0])
    diff = diff[diff[price_type] > 0.0]
    return (diff.sum() * trade_factor)[0]

theoretical_max_gain_no_overlap(testdf, 10)

def theoretical_max_loss_no_overlap(data,  trade_factor = 1, price_type = 'Open'):
    labels = list(filter(lambda label: label != price_type, ['High', 'Low', 'Close', 'Volume']))
    data.apply(lambda num: -1 * num)
    diff = data.drop(columns=labels, axis=1).diff()
    diff = diff.drop(index=diff.index[0])
    diff = diff[diff[price_type] < 0.0]
    return (diff.sum() * trade_factor)[0]

theoretical_max_loss_no_overlap(testdf, 10)

-55107352.49999999

In [8]:
# Get random gain as our base-line

def random(bit_num, data, cleandf):
    open_col = cleandf[data]
    num_choice = rd.randint(0, len(open_col))
    random_choice = rd.choices(open_col, k = num_choice)
    result = 0
    for i in range(num_choice-1):
        result += (bit_num*(random_choice[i+1]-random_choice[i]) )
        i += 1
    return result


# Get an average random gain for better accuracy

def getRandomAvg(bit_num, data, cleandf, iterations = 30):
    # we repeat getting random for iterations times and get an average random
    sum = 0
    for i in range(iterations):
        sum += random(bit_num, data, cleandf)
        
    return sum / iterations
        
getRandomAvg(10, "Open", cleandf)

47910.5500000005

In [9]:
# Get a collection of learn results from all the algorithms we have implemented

def getLearnResult(inputData, factor=10, col_name = "Open"):
    # Returns an array of values in order: "Random, HMM(state = 3), HMM(state = 4), HMM(state = 5), HMM(state = 6), Max Gain" for the given inputData (testdf)
    result_array = []
    result_array.append(theoretical_max_loss_no_overlap(inputData, factor, col_name))
    result_array.append(0) # 0 gain comparison
    result_array.append(getRandomAvg(factor, col_name, inputData))
    result_array.append(findMarkovBest(remodel, inputData, 2)[2])
    result_array.append(findMarkovBest(remodel, inputData, 3)[2])
    result_array.append(findMarkovBest(remodel, inputData, 4)[2])
    result_array.append(findMarkovBest(remodel, inputData, 5)[2])
    result_array.append(findMarkovBest(remodel, inputData, 6)[2])
    result_array.append(theoretical_max_gain_no_overlap(inputData, factor, col_name))
    
    return result_array

inputResult = getLearnResult(testdf)
print(inputResult)

[-55107352.49999999, 0, 7676.913333332946, 13376.09999999986, 608183.600000005, 948299.8999999949, 948299.8999999949, 948299.8999999949, 55032469.99999999]


In [10]:
# Get a collection of score based on the learn results we got from all the algorithms we have implemented

def getScore(inputResult):
    # Returns an array of scores calculated by the total gain from random, HMM(state = 2), HMM(state = 3), HMM(state = 4), HMM(state = 5), HMM(state = 6)..., Max Gain 
    # where the first of inputResult will always be max loss, second is random and the last will always be max
    # score = (predict_result-random_result)/(max_result-random_result)
    
    score_array = []
    score_length = len(inputResult)
    max_loss = inputResult[0]
    random_gain = inputResult[1]
    max_gain = inputResult[-1]
    
    for i in range(0, len(inputResult)):
        score_array.append((inputResult[i]-max_loss)/(max_gain-max_loss))
    
    return score_array

getScore(inputResult)

[0.0,
 0.5003399428939519,
 0.5004096444166081,
 0.5004613894306938,
 0.5058618657207297,
 0.5089499068331983,
 0.5089499068331983,
 0.5089499068331983,
 1.0]

In [11]:
# We make a more realistic prediction by findind the daily, monthly, and yearly gains

chunkSize = testdf.size/365

def getChunkedResults(testdf, num_of_chunks=365):
    # Returns a 2D array of the learn results of each chunk of data
    # The purpose of this function is to estimate the amount gain and loss during a daily, weekly, or yearly trading session
    
    result_array_2d = []
    
    intervals = np.array_split(testdf, num_of_chunks)
    for i in range(len(intervals)):
        result_array_2d.append(getLearnResult(intervals[i].reset_index(drop=True)))
        
    return result_array_2d

learn_results_daily = getChunkedResults(testdf)

In [12]:
# We will use the learn results to get a 2D array of scores based on the learn results

def getChunkedResults(learn_results_in_chuncks):
    # Returns a 2D array of the learn results of each chunk of data
    # The purpose of this function is to estimate the amount gain and loss during a daily, weekly, or yearly trading session
    
    score_array_2d = [] 
    
    for result in learn_results_in_chuncks:
        score_array_2d.append(getScore(result))
    
    return score_array_2d

scores_daily_2d = getChunkedResults(learn_results_daily)


def outputFinalResultScoreDfsAndCsv(learn_result_2d, scores_2d):
    learn_result_df = pd.DataFrame(learn_result_2d, columns=["Learn Result: Max Loss","Zero Loss", "Random", "HMM(2)", "HMM(3)", "HMM(4)", "HMM(5)", "HMM(6)", "Max Gain"])
    scores_daily_df = pd.DataFrame(scores_2d, columns=["Score: Max Loss", "Zero Line", "Random", "HMM(2)", "HMM(3)", "HMM(4)", "HMM(5)", "HMM(6)", "Max Gain"])
    
    learn_result_df.to_csv('dailyLearnResults.csv')
    scores_daily_df.to_csv('dailyScores.csv')
    return [learn_result_df, scores_daily_df]

outputFinalResultScoreDfsAndCsv(learn_results_daily, scores_daily_2d)

[     Learn Result: Max Loss  Zero Loss       Random  HMM(2)   HMM(3)   HMM(4)  \
 0                 -129485.7          0  -952.753333  7993.4  10847.8  10847.8   
 1                 -119410.8          0  -516.976667     0.0   1611.6   1611.6   
 2                 -118403.1          0 -1659.670000     0.0      0.0      0.0   
 3                 -148177.5          0    37.076667     0.0      0.0      0.0   
 4                 -124005.4          0   588.980000     0.0  10786.7  10786.7   
 ..                      ...        ...          ...     ...      ...      ...   
 360               -143263.2          0 -1014.823333     0.0      0.0  13877.1   
 361               -131658.1          0   568.320000     0.0      0.0      0.0   
 362                -49212.3          0  -808.926667     0.0      0.0      0.0   
 363                -64978.0          0  -434.153333     0.0     35.0     35.0   
 364                -61600.6          0  -326.096667     0.0     99.6     99.6   
 
       HMM(5) 

In [20]:
# We find the answer to : Does our algorithm garantees a gain and is it always better than the random algorithm?

def getYearlySuccessRate(dailyResultData_2d_array):
    # counters
    aboveZero = 0
    aboveRandom = 0
    garanteeBetter = 0
    
    for row in dailyResultData_2d_array: 
        if row[-2] >= row[1]:
            aboveZero += 1
        if row[-2] >= row[2]:
            aboveRandom += 1
        if row[-2] >= row[1] and row[-2] >= row[2]:
            garanteeBetter += 1

    return [aboveZero / len(dailyResultData_2d_array), aboveRandom / len(dailyResultData_2d_array), garanteeBetter / len(dailyResultData_2d_array)]

percentage_gain = getYearlySuccessRate(learn_results_daily)

print(f'Percentage of no loss (1 = 100%) = {percentage_gain[0]}')
print(f'Percentage of better than random (1 = 100%) = {percentage_gain[1]}')
print(f'Percentage of garantee better (1 = 100%) = {percentage_gain[2]}')

Percentage of no loss (1 = 100%) = 1.0
Percentage of better than random (1 = 100%) = 0.8657534246575342
Percentage of garantee better (1 = 100%) = 0.8657534246575342
