In [1]:
import datetime
import os
import pickle
import time
import warnings
from pprint import pprint
from typing import Tuple

import numpy as np
import torch
import yaml
from scipy.sparse import load_npz
from torch import nn
from torch.utils.data import Dataset, DataLoader, random_split

In [2]:
def load_numpy_obj(file_path: str, extension: str, dtype = torch.float32) -> torch.Tensor:
    """Load a numpy object into a torch tensor
    Args:
        file_path: str, path to the file
        extension: str, file format to load. Either ".npy" (dense array saved with numpy)
                   or ".npz" (scipy sparse matrix, densified before conversion)
        dtype: torch.dtype, data type of the returned tensor
    Returns:
        matrix: torch.Tensor, tensor with the data loaded
    Raises:
        ValueError: if extension is neither ".npy" nor ".npz" """
    if extension == ".npy":
        array = np.load(file_path)
    elif extension == ".npz":
        # load_npz returns a scipy sparse matrix; torch.tensor needs a dense array
        array = load_npz(file_path).toarray()
    else:
        # Without this branch the function fell through to an unbound local variable
        raise ValueError(
            f"Unsupported extension {extension!r} for {file_path!r}; expected '.npy' or '.npz'"
        )
    return torch.tensor(array, dtype = dtype)



class GraphDataset(Dataset):
    """Dataset of (adj_matrix, feature_matrix, label) samples stored on disk.

    Both root directories are scanned for one sub-directory per class label. Inside a
    label directory the adjacency files (".npz") and the feature files (".npy") are
    paired by their sorted position. Only the paths are collected up front; the
    matrices are loaded when a sample is requested."""

    # Directory name -> integer class index ("A".."Z" -> 0..25, then the three special signs)
    LABEL_MAPPING = {
        **{letter: index for index, letter in enumerate("ABCDEFGHIJKLMNOPQRSTUVWXYZ")},
        "del": 26,
        "nothing": 27,
        "space": 28,
    }

    def __init__(self, adj_matrix_dir: str, features_dir: str, transform = None):
        """ Initialize the dataset
        Args:
            adj_matrix_dir: str, path to the directory containing the adjacency matrices
            features_dir: str, path to the directory containing the features
            transform: callable taking (adj_matrix, feature_matrix) and returning the
                       transformed pair, or None for no transformation
        Raises:
            KeyError: if a label directory present in both roots is not in LABEL_MAPPING"""

        super().__init__()
        self.adj_matrix_dir = adj_matrix_dir
        self.features_dir = features_dir
        self.transform = transform

        # Collect all paths and labels without loading the data
        self.data = []
        for label in sorted(os.listdir(self.features_dir)):
            if label.startswith("."):
                continue  # hidden entries (.DS_Store, ._*, .ipynb_checkpoints) are never labels
            adj_label_dir = os.path.join(self.adj_matrix_dir, label)
            features_label_dir = os.path.join(self.features_dir, label)
            if not (os.path.isdir(adj_label_dir) and os.path.isdir(features_label_dir)):
                continue
            if label not in self.LABEL_MAPPING:
                raise KeyError(f"Unknown label directory {label!r} in {self.features_dir!r}")

            adj_files = self._list_files(adj_label_dir, ".npz")
            feature_files = self._list_files(features_label_dir, ".npy")
            if len(adj_files) != len(feature_files):
                # zip() below stops at the shorter list; surface that instead of hiding it
                warnings.warn(
                    f"Label {label!r}: {len(adj_files)} adjacency files but "
                    f"{len(feature_files)} feature files; extra files are ignored and "
                    f"the positional pairing may be misaligned",
                    stacklevel=2,
                )

            label_index = self.LABEL_MAPPING[label]
            for adj_matrix_file, feature_file in zip(adj_files, feature_files):
                adj_path = os.path.join(adj_label_dir, adj_matrix_file)
                feature_path = os.path.join(features_label_dir, feature_file)
                self.data.append((adj_path, feature_path, label_index))

    @staticmethod
    def _list_files(directory: str, extension: str) -> list:
        """Return the sorted, non-hidden file names in `directory` ending with `extension`.

        Stray entries would otherwise shift the positional pairing of adjacency and
        feature files and fail (or load the wrong sample) at read time."""
        return sorted(
            name for name in os.listdir(directory)
            if not name.startswith(".") and name.lower().endswith(extension)
        )

    def __len__(self):
        """Number of (adjacency, features, label) samples found on disk."""
        return len(self.data)


    def __getitem__(self, idx):
        """Load and return the sample at position `idx`
        Returns:
            (adj_matrix, feature_matrix, label): two torch.Tensor objects and the
            integer class index"""
        # Get the paths to the adj matrix and the features and the corresponding label
        adj_path, feature_path, label = self.data[idx]

        # Load the data
        adj_matrix = load_numpy_obj(adj_path, ".npz")
        feature_matrix = load_numpy_obj(feature_path, ".npy")

        # Apply the transformation
        if self.transform:
            adj_matrix, feature_matrix = self.transform(adj_matrix, feature_matrix)

        return adj_matrix, feature_matrix, label
    

def load_dataloader(adj_matrix_dir: str, features_dir: str, config: dict):
    """ Build the graph dataset, split it randomly and wrap each split in a DataLoader
    Args:
        adj_matrix_dir: str, path to the directory containing the adjacency matrices
        features_dir: str, path to the directory containing the features
        config: dict, configuration dictionary. Reads dataset.split_ratio.train,
                dataset.split_ratio.val, training.batch_size and training.num_workers
                (default 0)
    Returns:
        train_loader: DataLoader, shuffled loader for the training split
        val_loader: DataLoader, loader for the validation split
        test_loader: DataLoader, loader for the test split (whatever train and val leave over)
    Dataset format: (adj_matrix, feature_matrix, label)"""

    dataset = GraphDataset(adj_matrix_dir, features_dir)

    # Fractions of the dataset assigned to the train and validation splits
    train_split = config.get("dataset").get("split_ratio").get("train")
    val_split = config.get("dataset").get("split_ratio").get("val")

    # Sizes are truncated to whole samples; the test split absorbs the remainder
    n_total = len(dataset)
    n_train = int(train_split * n_total)
    n_val = int(val_split * n_total)
    n_test = n_total - n_train - n_val
    train_subset, val_subset, test_subset = random_split(dataset = dataset,
                                                         lengths = [n_train, n_val, n_test])

    info = f"""
{"-*"*10} Data Information {"-*"*10} \n
Total number of samples: {n_total}
Number of training samples: {len(train_subset)}
Number of validation samples: {len(val_subset)}
Number of test samples: {len(test_subset)}
{"-"*50}
"""
    print(info)

    # Settings shared by the three loaders; only the training loader shuffles
    loader_options = dict(
        batch_size = config.get("training").get("batch_size"),
        num_workers = config.get("training").get("num_workers", 0),
    )
    return (
        DataLoader(train_subset, shuffle = True, **loader_options),
        DataLoader(val_subset, shuffle = False, **loader_options),
        DataLoader(test_subset, shuffle = False, **loader_options),
    )

In [7]:
# Preview the feature matrix of a single sample as a labelled table.
# NOTE(review): the path is absolute and points at a local external drive, so this
# cell only runs on a machine with that volume mounted.
import pandas as pd

f = load_numpy_obj(
    "/Volumes/Extreme SSD/American_Sign_Language/data/processed_data/extended_features/A/A1_extended_features.npy",
    ".npy",
)
# Presumably one row per graph node — confirm against the feature-extraction step
df = pd.DataFrame(f.numpy(), columns=["x", "y", "z", "angle"])
df

Unnamed: 0,x,y,z,angle
0,0.0,0.0,0.0,0.0
1,0.252479,-0.165296,-0.073149,0.0
2,0.408549,-0.464397,-0.085439,2.756327
3,0.432858,-0.715742,-0.102616,2.881634
4,0.401987,-0.909585,-0.105177,0.0
5,0.20236,-0.684384,-0.001825,0.0
6,0.291251,-0.831255,-0.142947,1.18618
7,0.304694,-0.612575,-0.224034,2.889807
8,0.30304,-0.422437,-0.245428,0.0
9,0.029805,-0.660269,-0.005085,0.0


In [4]:
class GCNLayer(nn.Module):
    """Graph-convolution layer with a residual connection followed by LeakyReLU."""

    def __init__(self, input_dim: int, output_dim: int, *args, **kwargs):
        """ Set up the layer's weight, activation and residual projection
        Args:
            input_dim: int, number of input features per node
            output_dim: int, number of output features per node"""
        super().__init__(*args, **kwargs)
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.activation = nn.LeakyReLU(negative_slope=0.2, inplace=True)

        # Learnable weight matrix, Xavier-uniform initialised
        self.W = nn.Parameter(torch.empty(input_dim, output_dim))
        nn.init.xavier_uniform_(self.W)

        # The residual needs a projection only when the feature size changes
        if input_dim != output_dim:
            self.residual_transform = nn.Linear(input_dim, output_dim)
        else:
            self.residual_transform = None

    def calculate_degree_matrix(self, A: torch.Tensor):
        """ Add self connections to A and build the inverse-square-root degree matrix
        Args:
            A: torch.Tensor, adjacency matrix
        Returns:
            A_hat: torch.Tensor, adjacency matrix with the identity added
            D_neg_sqrt: torch.Tensor, diagonal matrix holding degree^(-1/2)"""
        identity = torch.eye(A.size(1), device=A.device)
        A_hat = A + identity  # self connections

        # Degrees are the sums of A_hat along dim 1
        inv_sqrt_degrees = A_hat.sum(dim=1).pow(-0.5)
        return A_hat, torch.diag_embed(inv_sqrt_degrees)

    def forward(self, X: torch.Tensor, A: torch.Tensor):
        """ Apply the graph convolution, add the residual and activate
        Args:
            X (torch.Tensor): Input feature matrix of shape (N, F)
            A (torch.Tensor): Adjacency matrix of shape (N, N)
        Returns:
            torch.Tensor: Output feature matrix of shape (N, output_dim)"""
        A_hat, D_neg_sqrt = self.calculate_degree_matrix(A)

        # D^{-1/2} A_hat D^{-1/2}, then propagate the linearly transformed features
        normalised_adj = D_neg_sqrt @ (A_hat @ D_neg_sqrt)
        output = normalised_adj @ (X @ self.W)

        # Residual branch: identity when the sizes match, linear projection otherwise
        residual = X if self.residual_transform is None else self.residual_transform(X)
        output += residual

        return self.activation(output)
    


class GCN(nn.Module):
    """Stack of GCNLayer blocks (each followed by dropout) with a linear classification head.

    A BatchNorm1d is placed right after the second GCN layer when there are at least
    two hidden layers."""

    def __init__(self, input_dim: int, hidden_dims: list, num_classes: int, *args,
                 dropout: float = 0.5, num_nodes: int = None, **kwargs) -> None:
        """ Initialize the GCN model
        Args:
            input_dim: int, input dimension of the model
            hidden_dims: list, list of hidden dimensions
            num_classes: int, number of classes in the dataset
            dropout: float, dropout probability applied after every GCN layer
            num_nodes: int or None. When given, the head classifies the flattened node
                       embeddings (num_nodes * hidden_dims[-1] inputs), so every graph
                       must have exactly num_nodes nodes. When None, batched inputs
                       are mean-pooled over the node axis before the head."""
        # dropout / num_nodes are consumed here and not forwarded: nn.Module.__init__
        # rejects unknown keyword arguments.
        super().__init__(*args, **kwargs)
        self.input_dim: int = input_dim
        self.hidden_dims: list = list(hidden_dims)
        self.num_classes: int = num_classes
        self.num_nodes = num_nodes

        # Extra layers
        self.dropout = nn.Dropout(dropout)
        # Batch norm sits after the 2nd GCN layer, so it only exists with >= 2 hidden layers
        self.batch_norm = nn.BatchNorm1d(self.hidden_dims[1]) if len(self.hidden_dims) >= 2 else None

        # Initialize the GCN layers
        self.layers = nn.ModuleList()
        for hidden_dim in self.hidden_dims:
            self.layers.append(GCNLayer(input_dim, hidden_dim))
            self.layers.append(self.dropout)
            input_dim = hidden_dim

        # Layout is [gcn, dropout, gcn, dropout, ...]; index 3 is right after the 2nd gcn
        if self.batch_norm is not None:
            self.layers.insert(3, self.batch_norm)

        # Output layer. It must be created here (not inside forward) so that its weights
        # are registered, moved by .to(device), seen by the optimiser and saved in state_dict.
        head_in_features = input_dim if num_nodes is None else num_nodes * input_dim
        self.output_layer = nn.Linear(head_in_features, num_classes)

    def forward(self, X: torch.Tensor, A: torch.Tensor) -> torch.Tensor:
        """ Forward pass of the GCN model
        Args:
            X (torch.Tensor): Input feature matrix, (batch, N, F) or (N, F)
            A (torch.Tensor): Adjacency matrix, (batch, N, N) or (N, N)
        Returns:
            torch.Tensor: class scores. For batched input: (batch, num_classes).
            For a 2-D input with num_nodes unset: one row of scores per node,
            (N, num_classes)."""
        for layer in self.layers:

            # if layer is dropout or batchnorm, don't pass A
            if isinstance(layer, nn.Dropout):
                X = layer(X)
            elif isinstance(layer, nn.BatchNorm1d):
                original_shape = X.shape
                # BatchNorm1d expects (rows x num_features): fold every leading dim into rows
                X = layer(X.reshape(-1, X.size(-1))).reshape(original_shape)
            else:
                X = layer(X, A)

        if self.num_nodes is not None:
            # Flatten the node embeddings of every graph: (batch, N * hidden)
            X = X.reshape(X.size(0), -1)
        elif X.dim() == 3:
            # Size-independent readout: average the node embeddings of every graph
            X = X.mean(dim=1)

        return self.output_layer(X)

In [None]:
def _run_epoch(model: nn.Module, loader: torch.utils.data.DataLoader,
               criterion: torch.nn.modules.loss._Loss, device,
               optimiser: torch.optim.Optimizer = None) -> float:
    """ Run one full pass over `loader` and return the mean per-batch loss.
    With an optimiser the model is put in train mode and updated after every batch;
    without one the model is put in eval mode and gradients are disabled."""
    is_training = optimiser is not None
    model.train(is_training)
    running_loss = 0.0

    with torch.set_grad_enabled(is_training):
        for adj_matrix, features, labels in loader:
            features, adj_matrix, labels = features.to(device), adj_matrix.to(device), labels.to(device)

            if is_training:
                optimiser.zero_grad() # Zero out the gradients
            output = model(features, adj_matrix) # Forward pass
            loss = criterion(output, labels) # Calculate the loss
            if is_training:
                loss.backward() # Backward pass
                optimiser.step() # Update the weights

            running_loss += loss.item()

    return running_loss / len(loader)


def train(model: nn.Module, optimiser: torch.optim.Optimizer,
          criterion: torch.nn.modules.loss._Loss, 
          train_loader: torch.utils.data.DataLoader, val_loader: torch.utils.data.DataLoader, 
          device = torch.device("cuda" if torch.cuda.is_available() else "cpu"), 
          config: dict = None, **kwargs) -> Tuple[nn.Module, dict]:
    """ Train a model with early stopping, checkpointing the best validation epoch
    Args:
        model (nn.Module): PyTorch model to train; called as model(features, adj_matrix)
        optimiser (torch.optim.Optimizer): Optimiser to use for training
        criterion (torch.nn.modules.loss._Loss): Loss function to use for training
        train_loader (torch.utils.data.DataLoader): DataLoader for training data,
            yielding (adj_matrix, features, labels) batches
        val_loader (torch.utils.data.DataLoader): DataLoader for validation data
        device: device the model and the batches are moved to
        config (dict): Configuration dictionary (default: None). Reads training.epochs,
            training.early_stopping.patience, logging.checkpoint_path and logging.log_dir
        **kwargs: used only when config is empty: epochs, early_stopping_patience,
            model_save_dir, log_dir
    Returns:
        model: the model with the weights of the LAST epoch run (the best weights are
            stored in the checkpoint file)
        results: dict with the session description, per-epoch train/val losses,
            best_epoch (0-based, None if the validation loss never improved),
            best_val_loss and model_path
    Raises:
        ValueError: if either loader yields no batches"""
    
    # Set up the hyperparameters
    if config:
        training_cfg = config.get("training") or {}
        logging_cfg = config.get("logging") or {}
        NUM_EPOCHS = training_cfg.get("epochs", 100)
        EARLY_STOPPING_PATIENCE = (training_cfg.get("early_stopping") or {}).get("patience", 15)
        MODEL_SAVE_DIR = logging_cfg.get("checkpoint_path", "Models") 
        LOG_DIR = logging_cfg.get("log_dir", "logs")
    else:
        NUM_EPOCHS = kwargs.get("epochs", 100)  
        EARLY_STOPPING_PATIENCE = kwargs.get("early_stopping_patience", 15)
        MODEL_SAVE_DIR = kwargs.get("model_save_dir", "Models")
        LOG_DIR = kwargs.get("log_dir", "logs")

    # Fail before training starts rather than with a ZeroDivisionError after an epoch
    if len(train_loader) == 0 or len(val_loader) == 0:
        raise ValueError("train_loader and val_loader must each yield at least one batch")

    WEIGHT_DECAY = optimiser.param_groups[0].get("weight_decay", 0)
    LEARNING_RATE = optimiser.param_groups[0].get("lr", 0.01)
    N_PARAMS = sum(p.numel() for p in model.parameters() if p.requires_grad)

    os.makedirs(MODEL_SAVE_DIR, exist_ok=True)
    os.makedirs(LOG_DIR, exist_ok=True)

    # One timestamp for both strings, so the displayed start time and the session id agree
    started_at = time.localtime()
    start_time = time.strftime("%Y-%m-%d, %H:%M:%S", started_at)
    session_id = time.strftime("%Y%m%d_%H%M%S", started_at)
    model_save_path = os.path.join(MODEL_SAVE_DIR, f"{session_id}.pth")
    logs_save_path = os.path.join(LOG_DIR, f"{session_id}.pkl")


    training_info = {
        "📅 Training Start Time": start_time,
        "📈 Total Number of Epochs": NUM_EPOCHS,
        "💻 Device Used for Training": device,
        "🆔 Session ID": session_id,

        "🔢 Number of Trainable Parameters": N_PARAMS,
        "🚦 Early Stopping Patience": EARLY_STOPPING_PATIENCE,
        "📉 Weight Decay": WEIGHT_DECAY,
        "📈 Initial Learning Rate": LEARNING_RATE,

        "📂 Model Save Path": model_save_path,
        "📄 Logs Save Path": logs_save_path
    }

    print("\n\n════════════════════════════════════════════")
    print("TRAINING SESSION START")
    print("════════════════════════════════════════════")
    pprint(training_info)
    print("════════════════════════════════════════════")


    train_losses, val_losses = [], [] # Lists to store the training and validation losses
    best_val_loss = float("inf") # Variable to store the best validation loss
    best_epoch = None # Stays None if the validation loss never improves (e.g. NaN losses)
    patience = 0 # Number of consecutive epochs without improvement
    
    model.to(device) # Move the model to the device

    # The progress bar is optional: fall back to plain printing when tqdm is unavailable
    try:
        from tqdm.auto import tqdm
    except ImportError:
        tqdm = None
    if tqdm is not None:
        loop = tqdm(range(NUM_EPOCHS), desc="Training", position=0, leave=True)
    else:
        loop = range(NUM_EPOCHS)

    for epoch in loop:
        avg_epoch_loss = _run_epoch(model, train_loader, criterion, device, optimiser)
        train_losses.append(avg_epoch_loss)

        avg_val_epoch_loss = _run_epoch(model, val_loader, criterion, device)
        val_losses.append(avg_val_epoch_loss)

        status = f"Epoch: {epoch+1}/{NUM_EPOCHS}, Train Loss: {avg_epoch_loss:.4f}, Val Loss: {avg_val_epoch_loss:.4f}, Patience: {patience}"
        if hasattr(loop, "set_description"):
            loop.set_description(status)
        else:
            print(status)

        if avg_val_epoch_loss < best_val_loss:
            best_val_loss = avg_val_epoch_loss
            best_epoch = epoch
            torch.save(model.state_dict(), model_save_path)
            patience = 0
        else:
            patience += 1
            if patience > EARLY_STOPPING_PATIENCE:
                print(f"Early stopping at epoch {epoch}")
                break

    end_time = time.strftime("%Y-%m-%d, %H:%M:%S")
    training_info = {
        "Training Session Summary": {
            "📅 Training Start Time": start_time,
            "⏰ Estimated Training End": end_time,
            "🆔 Session ID": session_id
        },
        "Training Configuration": {
            # Epochs actually run, which is fewer than NUM_EPOCHS after early stopping
            "📈 Number of Epochs Trained": len(train_losses),
            "🔢 Number of Trainable Parameters": N_PARAMS,
            "💻 Device Used for Training": device,
            "🚦 Early Stopping Patience": EARLY_STOPPING_PATIENCE,
            "📉 Weight Decay": WEIGHT_DECAY,
            "📈 Learning Rate": LEARNING_RATE
        },
        "File Paths": {
            "📂 Model Saved at": model_save_path,
            "📄 Logs Saved at": logs_save_path
        },
        "Best Performance Metrics": {
            "🔍 Best Validation Loss": round(best_val_loss, 4),
            "🔍 Best Training Loss": round(min(train_losses), 4) if train_losses else None,
            "🏆 Best Epoch": best_epoch
        }
    }
        
    print("\n\n════════════════════════════════════════════")
    print("TRAINING SESSION END")
    print("════════════════════════════════════════════")
    pprint(training_info, width=150)
    print("════════════════════════════════════════════")


    results = {
        "description": training_info,
        "train_losses": train_losses,
        "val_losses": val_losses,
        "best_epoch": best_epoch,
        "best_val_loss": best_val_loss, 
        "model_path": model_save_path, 
    }

    # Save the results dictionary in logs 
    with open(logs_save_path, "wb") as file:
        pickle.dump(results, file)

    
    return model, results
