In [15]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
import json
from sklearn.metrics import accuracy_score

In [16]:
class KeyboardDataset(Dataset):
    def __init__(self, data, labels):
        self.data = self.pad_data(data)
        self.labels = labels
        
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx]

    def pad_data(self, data):
        new_data_list = []
        average_len = np.mean([len(sublist) for sublist in data])
        for sublist in data:
            curr_len = 0
            new_data = sublist
            while (curr_len < average_len):
                if curr_len > len(sublist):
                    new_data.append([-1.0, -1, -1])
                else: 
                    new_data.append(sublist[curr_len])
                curr_len += 1
            new_data_list.append(new_data)
                
        return new_data_list


In [17]:
class TransformerModel(nn.Module):

    def __init__(self, vocab_size, embedding_size, num_classes, num_layers=1, num_heads=2, hidden_size=64, dropout=0.1):
        super(TransformerModel, self).__init__()
        # self.embedding = nn.Embedding(vocab_size, embedding_size)
        
        # Create an instance of nn.TransformerEncoderLayer
        encoder_layer = nn.TransformerEncoderLayer(embedding_size, num_heads, hidden_size, dropout)
        
        # Pass the encoder layer instance to nn.TransformerEncoder
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers)
        
        self.fc = nn.Linear(embedding_size, num_classes)
        
    def forward(self, x):
        # embedded = self.embedding(x)
        # embedded = embedded.permute(1, 0, 2)  # Change dimensions for transformer
        # output = self.transformer(embedded)
        output = self.transformer(x)
        output = output.mean(dim=0)  # Average across time steps
        output = self.fc(output)
        return output

In [18]:
def train(model, iterator, optimizer, criterion):
    model.train()
    epoch_loss = 0
    for src, trg in iterator:
        optimizer.zero_grad()
        output = model(src, trg)
        output_dim = output.shape[-1]
        output = output[1:].view(-1, output_dim)
        trg = trg[1:].view(-1)
        loss = criterion(output, trg)
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
    return epoch_loss / len(iterator)

def evaluate(model, iterator, criterion):
    model.eval()
    epoch_loss = 0
    with torch.no_grad():
        for src, trg in iterator:
            output = model(src, trg)
            output_dim = output.shape[-1]
            output = output[1:].view(-1, output_dim)
            trg = trg[1:].view(-1)
            loss = criterion(output, trg)
            epoch_loss += loss.item()
    return epoch_loss / len(iterator)



In [19]:
device = (
    "cuda"
    if torch.cuda.is_available()
    else "mps"
    if torch.backends.mps.is_available()
    else "cpu"
)
print(device)

mps


In [20]:
# labels key:
# 0: Aidan
# 1: Srujan
# 2: Eric
# 3: Tony

vocab_size = 113  # Assuming ASCII characters
embedding_size = 3
num_classes = 4
num_layers = 2
num_heads = 1
hidden_size = 128
dropout = 0.1
learning_rate = 0.001
# batch_size = 32
batch_size = 1
epochs = 10
file_prefix = '../'
datapoints_per_person = 3000

fh = open(f'{file_prefix}aidan_final_data_overlapping.json', 'r')
aidan_data = json.load(fh)[:datapoints_per_person]

fh = open(f'{file_prefix}srujan_final_data_overlapping.json', 'r')
srujan_data = json.load(fh)[:datapoints_per_person]

fh = open(f'{file_prefix}eric_final_data_overlapping.json', 'r')
eric_data = json.load(fh)[:datapoints_per_person]

fh = open(f'{file_prefix}tony_final_data_overlapping.json', 'r')
tony_data = json.load(fh)[:datapoints_per_person]

data = aidan_data + srujan_data + eric_data + tony_data
labels = ([0] * datapoints_per_person) + ([1] * datapoints_per_person) + ([2] * datapoints_per_person) + ([3] * datapoints_per_person)

print(f'There are {len(data)} 5-second intervals, {datapoints_per_person} intervals from each person')

train_data, val_data, train_labels, val_labels = train_test_split(data, labels, test_size=0.2, random_state=42)

train_dataset = KeyboardDataset(train_data, train_labels)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

val_dataset = KeyboardDataset(val_data, val_labels)
val_loader = DataLoader(val_dataset, batch_size=batch_size)

model = TransformerModel(vocab_size, embedding_size, num_classes, num_layers, num_heads, hidden_size, dropout)
model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)


There are 12000 5-second intervals, 3000 intervals from each person




In [22]:
# Training loop
epochs = 100
train_losses = []
val_losses = []
train_accuracies = []
val_accuracies = []

for epoch in range(epochs):
    model.train()
    correct_train = 0  
    total_train = 0   
    for inputs, labels in train_loader:
        optimizer.zero_grad()
        labels = torch.tensor(labels)
        
        # re-format the data
        # batch length list, where each list is length 96
        reformatted_data = [[] for i in range(batch_size)]
        for group in inputs:
            j = 0
            times = group[0]  # len batch_size
            characters = group[1]  # len batch_size
            updown = group[2]  # len batch_size
            while j < batch_size:
                reformatted_data[j].append([float(times[j]), float(characters[j]), float(updown[j])])
                j = j + 1
        
        reformatted_data = torch.tensor(reformatted_data)[0]

        outputs = model(reformatted_data.to(device))
        # print('outputs.shape: ', outputs.shape)
        loss = criterion(outputs.to(device), labels[0].to(device))
        train_losses.append(loss.item())
        loss.backward()
        optimizer.step()
       
        _, predicted_train = torch.max(outputs.data, dim = 0)
        total_train += labels.size(0)
        correct_train += (predicted_train == labels[0].to(device)).sum().item()

    train_accuracy = correct_train / total_train
    train_accuracies.append(train_accuracy)
    print(f'Epoch [{epoch+1}/{epochs}], Training Accuracy: {train_accuracy:.4f}')
    
    # Validation
    model.eval()
    val_predictions = []
    val_targets = []
    with torch.no_grad():
        for inputs, labels in val_loader:
            labels = torch.tensor(labels)
            reformatted_data = [[] for i in range(batch_size)]
            for group in inputs:
                j = 0
                times = group[0]  # len batch_size
                characters = group[1]  # len batch_size
                updown = group[2]  # len batch_size
                while j < batch_size:
                    reformatted_data[j].append([float(times[j]), float(characters[j]), float(updown[j])])
                    j = j + 1
            
            reformatted_data = torch.tensor(reformatted_data)[0]
            # print(reformatted_data.shape)

            outputs = model(reformatted_data.to(device))
            loss = criterion(outputs.to(device), labels[0].to(device))
            val_losses.append(loss)
          
            predicted = torch.argmax(outputs)
            val_predictions.append(predicted.cpu().numpy())
            val_targets.append(labels.cpu().numpy())
    
    # print(f'val targets (shape {len(val_targets)}): {val_targets}')
    # print(f'val predictions (shape {len(val_predictions)}): {val_predictions}')
    val_accuracy = accuracy_score(val_targets, val_predictions)
    val_accuracies.append(val_accuracy)
    print(f'Epoch [{epoch+1}/{epochs}], Validation Accuracy: {val_accuracy:.4f}')


  labels = torch.tensor(labels)


x.shape:  torch.Size([90, 3])
x.shape:  torch.Size([54, 3])
x.shape:  torch.Size([45, 3])
x.shape:  torch.Size([41, 3])
x.shape:  torch.Size([53, 3])
x.shape:  torch.Size([91, 3])
x.shape:  torch.Size([136, 3])
x.shape:  torch.Size([87, 3])
x.shape:  torch.Size([99, 3])
x.shape:  torch.Size([100, 3])
x.shape:  torch.Size([87, 3])
x.shape:  torch.Size([60, 3])
x.shape:  torch.Size([72, 3])
x.shape:  torch.Size([82, 3])
x.shape:  torch.Size([66, 3])
x.shape:  torch.Size([84, 3])
x.shape:  torch.Size([114, 3])
x.shape:  torch.Size([112, 3])
x.shape:  torch.Size([51, 3])
x.shape:  torch.Size([54, 3])
x.shape:  torch.Size([45, 3])
x.shape:  torch.Size([99, 3])
x.shape:  torch.Size([48, 3])
x.shape:  torch.Size([71, 3])
x.shape:  torch.Size([52, 3])
x.shape:  torch.Size([64, 3])
x.shape:  torch.Size([104, 3])
x.shape:  torch.Size([92, 3])
x.shape:  torch.Size([74, 3])
x.shape:  torch.Size([111, 3])
x.shape:  torch.Size([48, 3])
x.shape:  torch.Size([122, 3])
x.shape:  torch.Size([49, 3])
x.s

KeyboardInterrupt: 