Market regime prediction with Hidden Markov Models, alongside an LSTM trend classifier, on hourly BTC data from December 2019 to December 2023.

In [1]:
import torch
# Pick the compute device: Apple's Metal backend (MPS) when PyTorch reports
# it as available, otherwise the CPU.
use_mps = torch.backends.mps.is_available()
device = torch.device("mps" if use_mps else "cpu")
print(
    "MPS is available and in use!"
    if use_mps
    else "MPS is not available. Using CPU instead."
)

MPS is available and in use!


In [2]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import MinMaxScaler

# Load the candle data; the 'datetime' column is parsed as timestamps and
# becomes the frame's index.
data_main = pd.read_csv(
    '/Users/tejasmacipad/Desktop/Final_inter_IIT_submission/BTC/BTC_2019_2023_1h.csv',
    index_col='datetime',
    parse_dates=['datetime'],
)

# Split by position without shuffling: the first 80% of rows are used for
# training (`df`), the remaining 20% are held out (`test_data`).
train_size = int(0.8 * len(data_main))
df, test_data = data_main.iloc[:train_size], data_main.iloc[train_size:]


In [3]:
# Window size in rows. It is used both as the look-ahead horizon of the
# 'trend' label (close shifted by -freq) and as the default length of the
# input sequences fed to the LSTM.
freq = 6

In [4]:
features = ['open', 'high', 'low', 'close', 'volume']
scaler = MinMaxScaler(feature_range=(0, 1))

# Rebuild the training frame from the raw split every time this cell runs.
# Scaling an already-scaled `df` in place would make the cell non-idempotent
# (a second run would re-fit the scaler on scaled values).
df = data_main.iloc[:train_size].copy()

# Fit the scaler on the training rows only; the same fitted scaler is applied
# to the hold-out rows later with `scaler.transform`.
df[features] = scaler.fit_transform(df[features])

# Binary label: 1 when the close `freq` rows ahead is above the current
# close, otherwise 0.
future_move = df['close'].shift(-freq) - df['close']
df['trend'] = np.where(future_move > 0, 1, 0)

# The last `freq` rows have no future close: the difference is NaN there and
# np.where turns that into a 0 label. Those rows are kept in `df` but excluded
# from the training matrix so the model is not fit on fabricated labels.
n_labelled = max(len(df) - freq, 0)
data = df[features + ['trend']].iloc[:n_labelled].values

def create_sequences(data, seq_length=freq):
    """Slice a 2-D array into sliding input windows and their targets.

    Every column of `data` except the last is treated as an input feature;
    the last column is the target.

    Args:
        data: 2-D array indexed as ``data[row, column]``.
        seq_length: Number of consecutive rows in each input window.

    Returns:
        A tuple ``(X, y)`` of NumPy arrays. ``X[k]`` holds rows
        ``k .. k + seq_length - 1`` without the last column, and ``y[k]`` is
        the last column of row ``k + seq_length``. Both are empty when
        ``len(data) <= seq_length``.
    """
    n_windows = len(data) - seq_length
    windows = [
        data[start:start + seq_length, :-1] for start in range(n_windows)
    ]
    targets = [data[start + seq_length, -1] for start in range(n_windows)]
    return np.array(windows), np.array(targets)

# Build the supervised windows with a sequence length equal to `freq`.
seq_length = freq
X, y = create_sequences(data, seq_length=seq_length)

# Convert to float32 tensors. The targets get a trailing dimension of size 1
# so they line up with the model's (batch, 1) output.
X = torch.tensor(X, dtype=torch.float32)
y = torch.tensor(y, dtype=torch.float32).reshape(-1, 1)

class LSTMModel(nn.Module):
    """Single-layer LSTM followed by a linear layer and a sigmoid.

    The network reads a batch-first sequence, keeps only the LSTM output of
    the final time step, and maps it to one value in (0, 1).
    """

    def __init__(self, input_size, hidden_size=64):
        """Create the layers.

        Args:
            input_size: Number of features per time step.
            hidden_size: Size of the LSTM hidden state.
        """
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Return one sigmoid output per sequence in the batch.

        Args:
            x: Tensor laid out as (batch, time step, feature), as required by
                the batch-first LSTM.

        Returns:
            Tensor of shape (batch, 1).
        """
        sequence_output, _state = self.lstm(x)
        # Only the output at the last time step feeds the classifier head.
        final_step = sequence_output[:, -1, :]
        logits = self.fc(final_step)
        return self.sigmoid(logits)

# Model, loss and optimiser: binary cross-entropy on the sigmoid output,
# optimised with Adam.
input_size = len(features)
model = LSTMModel(input_size)
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

epochs = 15
batch_size = 16
n_samples = X.size(0)

for epoch in range(epochs):
    # A fresh random ordering of the samples for every epoch.
    shuffled = torch.randperm(n_samples)
    epoch_loss = 0

    for start in range(0, n_samples, batch_size):
        batch_idx = shuffled[start:start + batch_size]
        batch_X = X[batch_idx]
        batch_y = y[batch_idx]

        optimizer.zero_grad()
        loss = criterion(model(batch_X), batch_y)
        loss.backward()
        optimizer.step()

        # Accumulates the per-batch losses; the printed figure is therefore
        # a sum over all batches in the epoch, not a mean.
        epoch_loss += loss.item()

    print(f"Epoch {epoch+1}/{epochs}, Loss: {epoch_loss:.4f}")

Epoch 1/15, Loss: 1309.5800
Epoch 2/15, Loss: 1309.4416
Epoch 3/15, Loss: 1309.4144
Epoch 4/15, Loss: 1309.4140
Epoch 5/15, Loss: 1309.3433
Epoch 6/15, Loss: 1309.3308
Epoch 7/15, Loss: 1309.2611
Epoch 8/15, Loss: 1309.2900
Epoch 9/15, Loss: 1309.1793
Epoch 10/15, Loss: 1309.2122
Epoch 11/15, Loss: 1309.2000
Epoch 12/15, Loss: 1308.9975
Epoch 13/15, Loss: 1309.1461
Epoch 14/15, Loss: 1308.9029
Epoch 15/15, Loss: 1308.9299


In [5]:
# Preview the first 20 values of the 0/1 'trend' label column.
df["trend"].head(20)

datetime
2019-09-08 17:00:00    1
2019-09-08 18:00:00    1
2019-09-08 19:00:00    0
2019-09-08 20:00:00    0
2019-09-08 21:00:00    0
2019-09-08 22:00:00    0
2019-09-08 23:00:00    0
2019-09-09 00:00:00    0
2019-09-09 01:00:00    0
2019-09-09 02:00:00    0
2019-09-09 03:00:00    0
2019-09-09 04:00:00    1
2019-09-09 05:00:00    1
2019-09-09 06:00:00    1
2019-09-09 07:00:00    1
2019-09-09 08:00:00    1
2019-09-09 09:00:00    1
2019-09-09 10:00:00    1
2019-09-09 11:00:00    0
2019-09-09 12:00:00    0
Name: trend, dtype: int64

In [6]:
def simulate_trading(test_data, test_dates, output_file="trading_results"):
    """Run rolling-window predictions over `test_data` and save them to disk.

    For each row from index `seq_length` onward, the most recent `seq_length`
    rows (including the current one) are passed to the global `model`, with
    the last column stripped. An output above 0.5 is recorded as "Bullish",
    anything else as "Bearish". The first `seq_length` rows do not get a
    prediction and are recorded as "NULL".

    The function reads the module-level names `model` and `seq_length`.

    Args:
        test_data: 2-D array indexed as ``test_data[row, column]``; every
            column except the last is used as a model input.
        test_dates: Sequence of labels, indexable by row position, written
            to the "Date" column.
        output_file: Path stem; results go to ``<output_file>.csv`` and
            ``<output_file>.xlsx``.

    Returns:
        None. The results are written to the two files and a confirmation
        line is printed.
    """
    results = []

    # Inference only: disabling gradient tracking avoids building an autograd
    # graph for every single-window forward pass.
    with torch.no_grad():
        for i in range(len(test_data)):
            date = test_dates[i]

            if i < seq_length:
                # Not enough history yet for a full window.
                results.append([date, "NULL"])
                continue

            # Rows i - seq_length + 1 .. i, inputs only (last column dropped).
            # Slicing the array directly replaces the per-step list rebuild.
            window = test_data[i - seq_length + 1:i + 1, :-1]
            input_data = torch.tensor(window, dtype=torch.float32).unsqueeze(0)

            prediction = model(input_data).item()
            trend = "Bullish" if prediction > 0.5 else "Bearish"

            results.append([date, trend])

    results_df = pd.DataFrame(results, columns=["Date", "Market Trend"])

    results_df.to_csv(f"{output_file}.csv", index=False)
    results_df.to_excel(f"{output_file}.xlsx", index=False)

    print(f"Results saved to {output_file}.csv and {output_file}.xlsx")

In [None]:
# Rebuild the hold-out frame from the raw split on every run, so re-executing
# this cell does not apply the scaler to already-scaled values.
test_data = data_main.iloc[train_size:].copy()

# Label with the same look-ahead horizon (`freq`) that was used for the
# training labels, not an unrelated hard-coded offset.
future_move = test_data['close'].shift(-freq) - test_data['close']
test_data['trend'] = np.where(future_move > 0, 1, 0)

# Apply the scaler fitted on the training rows; it is not re-fitted here.
test_data[features] = scaler.transform(test_data[features])

# The label stays as the last column because simulate_trading() drops the
# last column before building model inputs.
test_data_output = test_data[features + ['trend']].values

simulate_trading(test_data_output, test_data.index)

In [None]:
import numpy as np
# CategoricalHMM lives in the `hmmlearn.hmm` submodule, not on the top-level
# `hmmlearn` package.
from hmmlearn import hmm

# The market is modelled with 3 hidden states (0 = bullish, 1 = bearish,
# 2 = neutral) and 3 observation symbols using the same coding.
n_states = 3
n_observations = 3

# NOTE(review): this rebinds the global `model`, which simulate_trading()
# also reads. Re-running the LSTM simulation after this cell would call the
# HMM instead - consider a distinct name once no other cell relies on it.
model = hmm.CategoricalHMM(n_components=n_states)
# Defining start probabilities
model.startprob_ = np.array([0.4, 0.3, 0.3])
# Defining state transition probabilities (row = from-state, each row sums to 1)
model.transmat_ = np.array([
    [0.5, 0.3, 0.2],
    [0.4, 0.4, 0.2],
    [0.5, 0.4, 0.1],
])
# Defining emission probabilities (row = hidden state, column = observed symbol)
model.emissionprob_ = np.array([
    [0.8, 0.1, 0.1],
    [0.1, 0.8, 0.1],
    [0.1, 0.1, 0.8],
])
assert model.emissionprob_.shape == (n_states, n_observations)

# Encode each row as one categorical symbol: 0 when the bullish flag is set,
# 1 when the bearish flag is set, 2 otherwise. The previous expression mapped
# both bullish and bearish rows to 1, so the symbols were indistinguishable.
# NOTE(review): the three flag columns below are not created in the cells
# visible here - confirm they exist (and their exact spelling) before running.
df['Market_state'] = np.select(
    [df['Bullish_yesterday'] == 1, df['Bearish_Yesterday'] == 1],
    [0, 1],
    default=2,
)

# hmmlearn expects a 2-D integer array of shape (n_samples, 1).
observations = df['Market_state'].to_numpy(dtype=int).reshape(-1, 1)

# predict() returns a NumPy array with one state per row; store it as a
# column so later cells can read df['hidden_states'].
hidden_states = model.predict(observations)
df['hidden_states'] = hidden_states







# Trading based on HMM
# Hidden state 0 -> long, 1 -> short, 2 -> no trade. The rules are applied to
# every row at once; the earlier version indexed with an undefined `i` and
# had unindented `if` bodies, so it could not run.
# NOTE(review): 'ATR' is not computed in the cells visible here - confirm it
# is added to `df` before this cell runs.
required_columns = {'hidden_states', 'close', 'ATR'}
missing_columns = required_columns - set(df.columns)
if missing_columns:
    raise KeyError(f"df is missing columns needed for HMM trading: {sorted(missing_columns)}")

signal_by_state = {0: 1, 1: -1, 2: 0}
trade_type_by_state = {0: "long", 1: "short", 2: "neutral"}

df['Signal'] = df['hidden_states'].map(signal_by_state)
df['trade'] = df['Signal']
df['trade_type'] = df['hidden_states'].map(trade_type_by_state)

# Stop-loss sits 1.5 ATR against the position and take-profit 3 ATR in its
# favour. For shorts that means SL above and TP below the close; the earlier
# version reused the long-side levels for shorts. Neutral rows keep the
# long-side levels as before; they are unused while trade == 0.
is_short = df['hidden_states'] == 1
stop_distance = 1.5 * df['ATR']
target_distance = 3 * df['ATR']
df['SL'] = np.where(is_short, df['close'] + stop_distance, df['close'] - stop_distance)
df['TP'] = np.where(is_short, df['close'] - target_distance, df['close'] + target_distance)