#### Evaluate ETHUSDT price using model built by BTCUSDT

In [1]:
import torch
from torch.utils.data import IterableDataset, DataLoader, Subset
from datetime import datetime as dt, timedelta
import pandas as pd
import os
import random
import numpy as np
import torch.nn as nn
from pandas import DataFrame as df
import mplfinance as mpf
#import mathplotlib.pyplot as plt

In [2]:
# check if CUDA is available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)


cuda


In [3]:
seed = 42  # choose any seed you prefer
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(seed)

In [4]:
class PriceDataset(torch.utils.data.Dataset):
    def __init__(self, item, timespan, start_date_str, end_date_str):
        self.directory = f'csvfiles/{item}'
        self.item = item
        self.timespan = timespan
        start_date = dt.strptime(start_date_str, '%Y-%m-%d').date()
        end_date = dt.strptime(end_date_str, '%Y-%m-%d').date()
        self.dates = [single_date.strftime("%Y-%m-%d") for single_date in self.daterange(start_date, end_date)]
        self.columns = [1, 4]  # Selecting open and close prices
        self.filenames = self.get_filenames()

    def daterange(self, start_date, end_date):
        for n in range(int((end_date - start_date).days) + 1):
            yield start_date + timedelta(n)

    def get_filenames(self):
        filenames = []
        for date in self.dates:
            filename = f"{self.directory}/{self.item}-{self.timespan}-{date}.csv"
            if os.path.exists(filename):
                filenames.append(filename)
        return filenames

    def __len__(self):
        return len(self.filenames)

    def __getitem__(self, idx):
        filename = self.filenames[idx]
        df = pd.read_csv(filename, usecols=self.columns, header=None)
        df = df.diff(axis=1)[1] > 0  # Compute if close price is higher than open
        labels = df.astype(int).values  # Convert to integer labels
        return torch.tensor(labels, dtype=torch.float32)

# Dataset is supposed to be [number of dates used][number of rows in each csv file][6, means the number of columns we are using in each csv file]


In [5]:
# Create the dataset
dataset = PriceDataset('ETHUSDT', '1m', '2021-03-01', '2023-04-30')

In [6]:
def sliding_window_fn(batch):
    windows = []
    for tensor in batch:
        for i in range(tensor.shape[0] - 100 + 1):  # Create windows of 100 rows each
            windows.append(tensor[i:i+100])
    return torch.stack(windows)



# Shuffle the dataset indices
indices = list(range(len(dataset)))
random.shuffle(indices)

# Split the indices into training and test sets
split_idx = int(0.8 * len(indices))
train_indices, test_indices = indices[:split_idx], indices[split_idx:]

# Create data subsets using the indices
train_data = Subset(dataset, train_indices)
test_data = Subset(dataset, test_indices)

# Create the data loaders
train_loader = DataLoader(train_data, batch_size=1, collate_fn=sliding_window_fn, shuffle=False, drop_last=True)
test_loader = DataLoader(test_data, batch_size=1, collate_fn=sliding_window_fn, shuffle=False, drop_last=True)

In [7]:
# To view first batch of the training data loader
for batch_idx, batch in enumerate(train_loader):
    print(f'Batch index: {batch_idx}, Batch tensor shape: {batch.shape}')
    break  # To prevent printing all the data

# To view first batch of the test data loader
for batch_idx, batch in enumerate(test_loader):
    print(f'Batch index: {batch_idx}, Batch tensor shape: {batch.shape}')
    break  # To prevent printing all the data





Batch index: 0, Batch tensor shape: torch.Size([1341, 100])
Batch index: 0, Batch tensor shape: torch.Size([1261, 100])


In [8]:
class LSTM(nn.Module):
    def __init__(self, input_dim, hidden_dim, layer_dim, output_dim):
        super(LSTM, self).__init__()

        # Defining the number of layers and the nodes in each layer
        self.hidden_dim = hidden_dim
        self.layer_dim = layer_dim

        # LSTM layers
        self.lstm = nn.LSTM(input_dim, hidden_dim, layer_dim, batch_first=True)

        # Fully connected layer
        self.fc = nn.Linear(hidden_dim, output_dim)
        
    def forward(self, x):
        # Initialize hidden state with zeros
        h0 = torch.zeros(self.layer_dim, x.size(0), self.hidden_dim).requires_grad_().to(device)
        
        # Initialize cell state
        c0 = torch.zeros(self.layer_dim, x.size(0), self.hidden_dim).requires_grad_().to(device)

        out, (hn, cn) = self.lstm(x, (h0.detach(), c0.detach()))

        # Only take the output from the final timestep
        out = self.fc(out[:, -1, :]) 
        return out


In [13]:
# Model, Loss, and Optimizer
input_dim = 1   # We are now dealing with 1D sequences
hidden_dim = 50   
layer_dim = 1
output_dim = 2  # Now output will be of size 2 (for 'rise' and 'fall')

model = LSTM(input_dim, hidden_dim, layer_dim, output_dim)

# Load model
checkpoint = torch.load('binaryclassification.pth')
model.load_state_dict(checkpoint['model_state_dict'])
model.eval()
model.to(device)

LSTM(
  (lstm): LSTM(1, 50, batch_first=True)
  (fc): Linear(in_features=50, out_features=2, bias=True)
)

In [14]:
def predict(model, data_loader):
    with torch.no_grad():
        correct = 0
        total = 0
        for batch in data_loader:
            features = batch[:, :-1].unsqueeze(-1).to(device)  # Exclude the last label from features
            targets = batch[:, -1].float().to(device)  # Last label is the target

            output = model(features)

            # Only take the last output for each sequence
            last_output = output[:, -1]

            # Use a threshold to determine the class
            predicted = (last_output.data > 0.5).float()
            total += targets.size(0)
            correct += (predicted == targets).sum().item()

        print(f'Accuracy: {correct / total:.2f}')


predict(model, test_loader)

Accuracy: 1.00
