In [None]:
%env CUDA_VISIBLE_DEVICES=1

In [None]:
from AudioKeystrokeDataset.AudioKeystrokeDataset import AudioKeystrokeDataset
from CoatNet.CoatNet import CoAtNet
from CoatNet.Trainer import Trainer

import os

from torch.utils.data import random_split, DataLoader

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as transforms
from torch.optim.lr_scheduler import ReduceLROnPlateau

## Utils

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

In [None]:
import json

with open('config.json', 'r') as f:
    config = json.load(f)

DATASET_PATH = config['DATASET_PATH']['all']

## 1. Create Dataset fot Training

In [None]:
dataset = AudioKeystrokeDataset(DATASET_PATH, full_dataset=True)
print(f"Dataset contains {len(dataset)} keystroke samples.")

In [None]:
dataset_size = len(dataset)
train_size = int(0.8 * dataset_size)
val_size = dataset_size - train_size
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

val_dataset_size = len(val_dataset)
val_size = int(0.5 * val_dataset_size)
test_size = val_dataset_size - val_size
val_dataset, test_dataset = random_split(val_dataset, [val_size, test_size])

print(f"Training dataset size: {len(train_dataset)}")
print(f"Validation dataset size: {len(val_dataset)}")
print(f"Testing dataset size: {len(test_dataset)}")

In [None]:
num_classes = set(dataset.get_labels())
num_classes

In [None]:
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

## 2. Create Model

In [None]:
model = CoAtNet(num_classes=len(dataset.label2idx), in_channels=1)
model = model.to(device)

In [None]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-5)
scheduler = ReduceLROnPlateau(optimizer, mode='max', factor=0.1, patience=5, verbose=True)

In [None]:
import torch
import os

class Trainer:
    def __init__(self, model, train_loader, val_loader, criterion, optimizer, device, scheduler=None, early_stopping_patience=10):
        self.model = model.to(device)
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.criterion = criterion
        self.optimizer = optimizer
        self.device = device
        self.scheduler = scheduler  
        self.early_stopping_patience = early_stopping_patience

    def train_epoch(self):
        self.model.train()
        running_loss = 0.0
        correct = 0
        total = 0
        for data, targets in self.train_loader:
            data, targets = data.to(self.device), targets.to(self.device)
            if len(data.shape) == 3: 
                data = data.unsqueeze(1)
            self.optimizer.zero_grad()
            outputs = self.model(data)
            loss = self.criterion(outputs, targets)
            loss.backward()
            self.optimizer.step()
            running_loss += loss.item() * data.size(0)
            _, predicted = torch.max(outputs, 1)
            total += targets.size(0)
            correct += (predicted == targets).sum().item()
        epoch_loss = running_loss / total
        epoch_acc = correct / total
        return epoch_loss, epoch_acc

    def validate(self):
        self.model.eval()
        running_loss = 0.0
        correct = 0
        total = 0
        with torch.no_grad():
            for data, targets in self.val_loader:
                data, targets = data.to(self.device), targets.to(self.device)
                if len(data.shape) == 3:
                    data = data.unsqueeze(1)
                outputs = self.model(data)
                loss = self.criterion(outputs, targets)
                running_loss += loss.item() * data.size(0)
                _, predicted = torch.max(outputs, 1)
                total += targets.size(0)
                correct += (predicted == targets).sum().item()
        epoch_loss = running_loss / total
        epoch_acc = correct / total
        return epoch_loss, epoch_acc

    def train(self, num_epochs, save_path=None, resume=False, load_path=None):
        start_epoch = 0
        best_val_acc = 0.0
        patience_counter = 0
        
        if resume and load_path and os.path.exists(load_path):
            checkpoint = torch.load(load_path)
            self.model.load_state_dict(checkpoint['model_state_dict'])
            start_epoch = checkpoint.get('epoch', 0)
            best_val_acc = checkpoint.get('best_val_acc', 0.0)
            print(f"Resuming training from epoch {start_epoch}")
            
        for epoch in range(start_epoch, start_epoch + num_epochs):
            train_loss, train_acc = self.train_epoch()
            val_loss, val_acc = self.validate()
            print(f"Epoch {epoch+1}/{start_epoch + num_epochs} | "
                  f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f} | "
                  f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")
            
            if self.scheduler:
                self.scheduler.step(val_acc)
            
            if val_acc > best_val_acc:
                best_val_acc = val_acc
                patience_counter = 0
                if save_path:
                    torch.save({
                        'epoch': epoch + 1,
                        'model_state_dict': self.model.state_dict(),
                        'best_val_acc': best_val_acc
                    }, save_path)
                    print(f"Saved best model at epoch {epoch+1} with Val Acc: {best_val_acc:.4f}")
            else:
                patience_counter += 1
                if patience_counter >= self.early_stopping_patience:
                    print(f"Early stopping triggered at epoch {epoch+1}. No improvement in validation accuracy for {self.early_stopping_patience} epochs.")
                    break

In [None]:
trainer = Trainer(model, train_loader, val_loader, criterion, optimizer, device, scheduler, early_stopping_patience=200)

In [None]:
trainer.train(num_epochs=1100, save_path='best_model.pth', resume=True, load_path='best_model.pth')

In [None]:
model_loaded = CoAtNet(num_classes=len(dataset.label2idx), in_channels=1)
model_loaded = model_loaded.to(device)

checkpoint = torch.load('best_model.pth')
model_loaded.load_state_dict(checkpoint['model_state_dict'])

model_loaded.eval()
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
correct = 0
total = 0
with torch.no_grad():
    for data, targets in test_loader:
        data, targets = data.to(device), targets.to(device)
        if len(data.shape) == 3:
            data = data.unsqueeze(1)
        outputs = model_loaded(data)
        _, predicted = torch.max(outputs, 1)
        total += targets.size(0)
        correct += (predicted == targets).sum().item()
test_accuracy = correct / total
print(f"Test Accuracy: {test_accuracy:.4f}")

## 3. Evaluate

In [None]:
model.eval()
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
correct = 0
total = 0
with torch.no_grad():
    for data, targets in test_loader:
        data, targets = data.to(device), targets.to(device)
        if len(data.shape) == 3:
            data = data.unsqueeze(1)
        outputs = model(data)
        _, predicted = torch.max(outputs, 1)
        total += targets.size(0)
        correct += (predicted == targets).sum().item()
test_accuracy = correct / total
print(f"Test Accuracy: {test_accuracy:.4f}")

In [None]:
import torch
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
import numpy as np

# Set model to evaluation mode
model.eval()

# Lists to store true labels and predictions
all_targets = []
all_predictions = []

# Disable gradient calculations for inference
with torch.no_grad():
    for data, targets in test_loader:
        data, targets = data.to(device), targets.to(device)

        # Ensure correct input dimensions
        if len(data.shape) == 3:
            data = data.unsqueeze(1)

        # Forward pass
        outputs = model(data)
        _, predicted = torch.max(outputs, 1)

        # Store results
        all_targets.extend(targets.cpu().numpy())
        all_predictions.extend(predicted.cpu().numpy())

# Convert lists to numpy arrays
all_targets = np.array(all_targets)
all_predictions = np.array(all_predictions)

# Get unique class labels
class_labels = np.unique(all_targets)

# Compute confusion matrix
cm = confusion_matrix(all_targets, all_predictions, labels=class_labels)

# Plot confusion matrix
plt.figure(figsize=(12, 10))
sns.heatmap(cm, annot=False, fmt="d", cmap="Blues", xticklabels=class_labels, yticklabels=class_labels)
plt.xlabel("Predicted Label")
plt.ylabel("True Label")
plt.title("Confusion Matrix")
# Remove x and y axis ticks completely
ax.set_xticks([])
ax.set_yticks([])
plt.show()


In [None]:
# Convert lists to numpy arrays
all_targets = np.array(all_targets)
all_predictions = np.array(all_predictions)

# Compute confusion matrix
cm = confusion_matrix(all_targets, all_predictions)

# Plot confusion matrix without ticks
plt.figure(figsize=(12, 10))
ax = sns.heatmap(cm, annot=False, fmt="d", cmap="Blues")

# Remove x and y axis ticks completely
ax.set_xticks([])
ax.set_yticks([])

plt.xlabel("Predicted Label")
plt.ylabel("True Label")
plt.title("Confusion Matrix")
plt.show()

In [None]:
!pip install einops

In [None]:
from coatnet import CoAtNet

num_blocks = [2, 2, 3, 5, 2]            # L
channels = [64, 96, 192, 384, 768]      # D
block_types=['C', 'T', 'T', 'T']        # 'C' for MBConv, 'T' for Transformer

net = CoAtNet((224, 224), 3, num_blocks, channels, block_types=block_types)

In [None]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(net.parameters(), lr=1e-3, weight_decay=1e-5)
scheduler = ReduceLROnPlateau(optimizer, mode='max', factor=0.1, patience=5, verbose=True)

trainer = Trainer(net, train_loader, val_loader, criterion, optimizer, device, scheduler, early_stopping_patience=20)