In [258]:
import torch
import torch.nn as nn
from torchvision import datasets, transforms
from torch.utils.data import Dataset, DataLoader, random_split
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import math

In [259]:
# Seed both random number libraries from a single value for reproducibility
SEED = 7
np.random.seed(SEED)     # NumPy global generator
torch.manual_seed(SEED)  # PyTorch global generator

In [260]:
# Creates CoinDataSet
class CoinDataSet(Dataset):
    """Dataset read from a comma-separated file of numeric values.

    Column 0 of every row is used as the integer class label and the
    remaining columns as the features, which are standardised per column
    with a ``StandardScaler`` (kept on ``self.sc``).

    NOTE(review): the scaler is fitted on every row of the file, i.e. before
    any train/test split performed by the caller, so held-out rows contribute
    to the normalisation statistics.

    Args:
        csv_path: File to load. Defaults to ``'./test.csv'``, the path that
            was previously hard-coded.
    """

    def __init__(self, csv_path='./test.csv'):
        # ndmin=2 keeps a one-row file two-dimensional, so the column
        # slicing below works for any number of rows
        xy = np.loadtxt(csv_path, delimiter=',', dtype=np.float32, ndmin=2)
        self.n_samples = xy.shape[0]

        # Labels: first column, converted to int64 (required by CrossEntropyLoss)
        self.y = torch.from_numpy(xy[:, 0]).long()

        # Features: remaining columns, standardised and stored as float32.
        # The scaler is given the NumPy array directly instead of a tensor.
        self.sc = StandardScaler()
        self.x = torch.from_numpy(self.sc.fit_transform(xy[:, 1:])).float()

    def __getitem__(self, index):
        """Return the ``(features, label)`` pair stored at ``index``."""
        return self.x[index], self.y[index]

    def __len__(self):
        """Return the number of rows that were loaded."""
        return self.n_samples

In [261]:
# Hyperparameters and runtime configuration

# Input geometry: each sample is viewed as 6008 steps of 128 features
# (128 x 6008 = 769024 values per sample)
input_size = 128
sequence_length = 6008

# Network architecture
hidden_size = 64
num_layers = 2
num_classes = 7

# Optimisation schedule
num_epochs = 10
batch_size = 4
learning_rate = 1e-3

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

In [262]:
# Load the full dataset
dataset = CoinDataSet()

# 80/20 split: the training share is truncated to an integer,
# the test set receives whatever is left
n_total = len(dataset)
train_size = int(0.8 * n_total)
test_size = n_total - train_size
train_data, test_data = random_split(dataset, [train_size, test_size])

# Batch both subsets; only the training loader shuffles
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False)

# Pull one training batch for ad-hoc inspection
# (e.g. samples.shape, labels.shape, samples[0, 0].dtype, labels[0].dtype)
examples = iter(train_loader)
samples, labels = next(examples)

In [263]:
# Create Model

# Model 
class LSTM(nn.Module):
    """Stacked LSTM followed by a linear layer applied to the last time step.

    Args:
        input_size: Number of features per time step.
        hidden_size: Size of the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
        num_classes: Number of output values produced per sample.
    """

    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super().__init__()
        self.num_layers = num_layers
        self.hidden_size = hidden_size

        # batch_first=True -> x is (batch_size, seq_length, input_size)
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)

        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Run the sequence through the LSTM and classify its final step.

        Args:
            x: Tensor of shape (batch_size, seq_length, input_size).

        Returns:
            Tensor of shape (batch_size, num_classes).
        """
        # Zero initial states created on the same device and with the same
        # dtype as the input, so the module does not rely on a global
        # `device` variable and works wherever the input lives.
        h0 = x.new_zeros(self.num_layers, x.size(0), self.hidden_size)
        c0 = x.new_zeros(self.num_layers, x.size(0), self.hidden_size)

        # out: (batch_size, seq_length, hidden_size)
        out, _ = self.lstm(x, (h0, c0))

        # Keep only the last time step -> (batch_size, hidden_size)
        out = out[:, -1, :]

        # -> (batch_size, num_classes)
        return self.fc(out)

In [264]:
# Build the network and move it to the selected device
model = LSTM(
    input_size=input_size,
    hidden_size=hidden_size,
    num_layers=num_layers,
    num_classes=num_classes,
)
model = model.to(device)

# Cross-entropy loss, optimised with Adam
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=learning_rate)

In [None]:
# Training loop

n_total_steps = len(train_loader)
model.train()  # make sure the model is in training mode before back-propagating
for epoch in range(num_epochs):
    # Reset for every epoch so the average below covers this epoch only
    # (previously the sum kept growing across epochs)
    total_loss = 0.0
    for i, (inputs, labels) in enumerate(train_loader):

        # View each sample as (sequence_length, input_size)
        inputs = inputs.reshape(-1, sequence_length, input_size).to(device)
        labels = labels.to(device)

        # Forward
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Backward
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Accumulate loss
        total_loss += loss.item()

        if (i+1) % 2 == 0:
            print(f'epoch {epoch+1} / {num_epochs}, step {i+1}/{n_total_steps}, loss = {loss.item():.4f}')

    # Mean batch loss of this epoch
    avg_train_loss = total_loss / n_total_steps
    print(f'epoch {epoch+1} / {num_epochs}, average loss = {avg_train_loss:.4f}')

# Evaluate on the test split.
# Switch to eval mode for the forward passes and restore the previous mode
# afterwards, so the training cell can still be re-run on the same model.
was_training = model.training
model.eval()
with torch.no_grad():
    n_correct = 0
    n_samples = 0
    for inputs, labels in test_loader:
        inputs = inputs.reshape(-1, sequence_length, input_size).to(device)
        labels = labels.to(device)
        outputs = model(inputs)

        # Predicted class = index of the largest output per sample
        predictions = outputs.argmax(dim=1)
        n_samples += labels.shape[0]
        n_correct += (predictions == labels).sum().item()

    acc = 100 * n_correct / n_samples
    print(f'accuracy = {acc}')
model.train(was_training)

epoch 1 / 10, step 2/14, loss = 1.8854
epoch 1 / 10, step 4/14, loss = 1.9039
epoch 1 / 10, step 6/14, loss = 1.9494
epoch 1 / 10, step 8/14, loss = 1.9141
epoch 1 / 10, step 10/14, loss = 1.8950
epoch 1 / 10, step 12/14, loss = 1.8987
epoch 1 / 10, step 14/14, loss = 1.9026
epoch 2 / 10, step 2/14, loss = 1.7766
epoch 2 / 10, step 4/14, loss = 1.8239
epoch 2 / 10, step 6/14, loss = 1.9051
epoch 2 / 10, step 8/14, loss = 1.8854
epoch 2 / 10, step 10/14, loss = 1.8966
epoch 2 / 10, step 12/14, loss = 1.8492
epoch 2 / 10, step 14/14, loss = 1.7232
epoch 3 / 10, step 2/14, loss = 1.7235
epoch 3 / 10, step 4/14, loss = 1.6139
epoch 3 / 10, step 6/14, loss = 1.8802
epoch 3 / 10, step 8/14, loss = 1.8391
epoch 3 / 10, step 10/14, loss = 1.7534
epoch 3 / 10, step 12/14, loss = 1.4936
epoch 3 / 10, step 14/14, loss = 1.5716
epoch 4 / 10, step 2/14, loss = 1.7947
epoch 4 / 10, step 4/14, loss = 1.1510
epoch 4 / 10, step 6/14, loss = 1.9946
epoch 4 / 10, step 8/14, loss = 1.7209
epoch 4 / 10, st