In [1]:
import numpy as np

## Load & Convert to Sliding Windows

In [2]:
# Read both arrays from disk and regroup each into blocks of 100 rows:
# 52 trailing columns for the states, 1 for the labels.
states = np.load("data/states.npy").reshape((-1, 100, 52))
labels = np.load("data/labels.npy").reshape((-1, 100, 1))

# Show the resulting shapes, one per line
print(states.shape, labels.shape, sep="\n")

(1000, 100, 52)
(1000, 100, 1)


In [3]:
# Build overlapping windows along axis 1, each paired with the label that
# sits immediately after the window.

original_array = states
original_labels = labels

# Number of consecutive positions along axis 1 that make up one window
window_size = 5

# Number of window start positions that fit along axis 1
num_windows = original_array.shape[1] - window_size + 1

# The very last start position is skipped (hence the -1): its label index,
# start + window_size, would fall one past the end of axis 1.
window_starts = range(num_windows - 1)

# One slice per start position; the label is read at start + window_size
sliding_windows = [
    original_array[:, start : start + window_size, :] for start in window_starts
]
sliding_labels = [
    original_labels[:, start + window_size, :] for start in window_starts
]

# Stack the per-position slices along a new leading axis
sliding_windows_array = np.array(sliding_windows)
sliding_labels = np.array(sliding_labels)

print("Original Array Shape:", original_array.shape)
print("Sliding Windows Array Shape:", sliding_windows_array.shape)
print("Original Labels Shape:", original_labels.shape)
print("Sliding Windows Labels Shape:", sliding_labels.shape)

Original Array Shape: (1000, 100, 52)
Sliding Windows Array Shape: (95, 1000, 5, 52)
Original Labels Shape: (1000, 100, 1)
Sliding Windows Labels Shape: (95, 1000, 1)


In [4]:
# Collapse into a linear dataset of windows and labels.
# The trailing dimensions are taken from the arrays themselves instead of
# being hard-coded, so this cell keeps working if the window size or the
# feature width changes upstream.
states_windows = sliding_windows_array.reshape((-1,) + sliding_windows_array.shape[2:])
labels_windows = sliding_labels.reshape((-1, sliding_labels.shape[-1]))

print(states_windows.shape)
print(labels_windows.shape)

# Optional: persist the flattened arrays (uncomment to write them to disk).
# np.save("data/states_windows.npy", states_windows)
# np.save("data/labels_windows.npy", labels_windows)

(95000, 5, 52)
(95000, 1)


## Training Setup

In [5]:
import torch
from torch.utils.data import Dataset, DataLoader, random_split

In [6]:
# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

In [7]:
# Dataset wrapper around the sliding-window arrays
class LSTMDataset(Dataset):
    """Pairs each entry of `states` with the entry of `labels` at the same index.

    Items are converted to float32 tensors lazily, on access.
    """

    def __init__(self, states, labels):
        self.states, self.labels = states, labels

    def __len__(self):
        # The dataset length is defined by the states container alone
        return len(self.states)

    def __getitem__(self, index):
        window, target = self.states[index], self.labels[index]
        return (
            torch.tensor(window, dtype=torch.float32),
            torch.tensor(target, dtype=torch.float32),
        )

In [8]:
class LSTMModel(torch.nn.Module):
    """Stacked LSTM whose last time step feeds a linear readout and a sigmoid.

    Args:
        input_dim: number of features per time step of the input.
        hidden_dim: size of the LSTM hidden (and cell) state.
        layer_dim: number of stacked LSTM layers.
        output_dim: number of values produced by the readout layer.
    """

    def __init__(self, input_dim, hidden_dim, layer_dim, output_dim):
        super().__init__()
        # Hidden dimensions
        self.hidden_dim = hidden_dim

        # Number of hidden layers
        self.layer_dim = layer_dim

        # batch_first=True: inputs are laid out as (batch, seq, feature)
        self.lstm = torch.nn.LSTM(input_dim, hidden_dim, layer_dim, batch_first=True)

        # Readout layer
        self.fc = torch.nn.Linear(hidden_dim, output_dim)

        # Output Activation
        self.out_act = torch.nn.Sigmoid()

    def forward(self, x):
        """Return sigmoid(readout(last LSTM output)) for a batch `x`.

        Args:
            x: tensor of shape (batch, seq, input_dim).

        Returns:
            Tensor of shape (batch, output_dim) with values in (0, 1).
        """
        # Build the zero initial states on the input's own device and dtype
        # rather than on a notebook-global `device`, so the module works
        # wherever the input lives and does not depend on outside state.
        state_shape = (self.layer_dim, x.size(0), self.hidden_dim)
        h0 = torch.zeros(state_shape, device=x.device, dtype=x.dtype)
        c0 = torch.zeros(state_shape, device=x.device, dtype=x.dtype)

        out, _ = self.lstm(x, (h0, c0))

        # Keep only the last time step: (batch, hidden_dim) -> (batch, output_dim)
        return self.out_act(self.fc(out[:, -1, :]))


In [9]:
# Create Dataset and Dataloader
BATCH_SIZE = 128
SPLIT_SEED = 0  # fixed so the train/eval partition is identical on every run
lstm_dataset = LSTMDataset(states_windows, labels_windows)

# 80 % of the windows for training, the remainder for evaluation
train_size = int(0.8 * len(lstm_dataset))
eval_size = len(lstm_dataset) - train_size

# A dedicated, seeded generator makes the split reproducible without
# touching the global RNG state.
# NOTE(review): the windows overlap (consecutive start positions, several
# steps per window), so a random split over individual windows can put
# near-duplicate windows in both sets and inflate the evaluation accuracy.
# Consider splitting by source sequence instead -- TODO confirm.
train_dataset, eval_dataset = random_split(
    lstm_dataset,
    [train_size, eval_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

# Only the training batches are reshuffled every epoch
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
eval_loader = DataLoader(eval_dataset, batch_size=BATCH_SIZE, shuffle=False)

In [10]:
# Hyperparameters of the LSTM classifier
INPUT_DIM = 52
HIDDEN_DIM = 100
LAYER_DIM = 2
OUT_DIM = 1
LEARNING_RATE = 1e-3

# Build the network, then place it on the selected device
lstm_model = LSTMModel(
    input_dim=INPUT_DIM,
    hidden_dim=HIDDEN_DIM,
    layer_dim=LAYER_DIM,
    output_dim=OUT_DIM,
)
lstm_model = lstm_model.to(device)

# Loss Function: binary cross-entropy applied to the model's sigmoid outputs
loss_fn = torch.nn.BCELoss()

# Optimizer: Adam over every parameter of the model
optimizer = torch.optim.Adam(params=lstm_model.parameters(), lr=LEARNING_RATE)

## Training Stage

In [11]:
NUM_EPOCHS = 5

# Put the model in training mode explicitly: the evaluation cell switches it
# to eval mode, and re-running this cell afterwards would otherwise train in
# eval mode (on CUDA, cuDNN refuses RNN backward passes in eval mode).
lstm_model.train()

# Running count of optimizer steps across all epochs. Named `global_step`
# rather than `iter` so the builtin is not shadowed.
global_step = 0
for epoch in range(NUM_EPOCHS):
    # Batch names are distinct from the notebook-level `states` / `labels`
    # arrays so that earlier cells can still be re-run after training.
    for batch_states, batch_labels in train_loader:
        batch_states = batch_states.to(device)
        batch_labels = batch_labels.to(device)

        preds = lstm_model(batch_states)
        loss = loss_fn(preds, batch_labels)

        # Standard step: clear stale gradients, backpropagate, update weights
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        global_step += 1
        if global_step % 100 == 0:
            # Report the loss of the current batch every 100 steps
            print(f'Epoch: {epoch + 1}/{NUM_EPOCHS}\t Iteration: {global_step}\t Loss: {loss.item()}')

Epoch: 1/5	 Iteration: 100	 Loss: 0.045596010982990265
Epoch: 1/5	 Iteration: 200	 Loss: 0.031482890248298645
Epoch: 1/5	 Iteration: 300	 Loss: 0.008816036395728588
Epoch: 1/5	 Iteration: 400	 Loss: 0.0010661588748916984
Epoch: 1/5	 Iteration: 500	 Loss: 0.008033250458538532
Epoch: 2/5	 Iteration: 600	 Loss: 0.044230662286281586
Epoch: 2/5	 Iteration: 700	 Loss: 0.0016415275167673826
Epoch: 2/5	 Iteration: 800	 Loss: 0.0001380309258820489
Epoch: 2/5	 Iteration: 900	 Loss: 0.04252222925424576
Epoch: 2/5	 Iteration: 1000	 Loss: 0.00023315039288718253
Epoch: 2/5	 Iteration: 1100	 Loss: 0.006197980605065823
Epoch: 3/5	 Iteration: 1200	 Loss: 0.0028245483990758657
Epoch: 3/5	 Iteration: 1300	 Loss: 0.000746329955291003
Epoch: 3/5	 Iteration: 1400	 Loss: 0.0005931768100708723
Epoch: 3/5	 Iteration: 1500	 Loss: 6.634011515416205e-05
Epoch: 3/5	 Iteration: 1600	 Loss: 0.00014222874597180635
Epoch: 3/5	 Iteration: 1700	 Loss: 0.0005361746298149228
Epoch: 4/5	 Iteration: 1800	 Loss: 0.0018591762

## Evaluation

In [12]:
import tqdm

# Switch to inference mode and disable gradient tracking for the whole pass
lstm_model.eval()
with torch.no_grad():
    correct = 0
    total = 0

    # Iterate through the held-out evaluation set. Batch names are distinct
    # from the notebook-level `states` / `labels` arrays so those are not
    # overwritten by this cell.
    for batch_states, batch_labels in eval_loader:
        batch_states = batch_states.to(device)
        batch_labels = batch_labels.to(device)

        outputs = lstm_model(batch_states)
        # Threshold the sigmoid outputs at 0.5 to get hard 0/1 predictions
        predicted = (outputs > 0.5).float()

        # Total number of labels
        total += batch_labels.size(0)

        # Total correct predictions
        correct += (predicted == batch_labels).sum().item()

    # Accuracy as a percentage of evaluated labels
    accuracy = 100 * correct / total

    # Print Accuracy
    print(f'Accuracy: {accuracy}')

Accuracy: 99.9578947368421
