### Setup

In [1]:
# Colab-only setup: mount Google Drive at /content/drive so files stored on
# Drive are reachable through the local filesystem.
# force_remount=True is passed explicitly so re-running this cell remounts.
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


#### Install PyTorch Geometric Temporal

In [2]:
# Print the installed torch version and its CUDA build, then install
# torch-geometric-temporal with pip.
# NOTE(review): the package version is not pinned, so the resolved dependency
# set may change between runs — consider pinning.
!python -c "import torch; print(torch.__version__)"
!python -c "import torch; print(torch.version.cuda)"
!pip install torch-geometric-temporal

2.0.1+cu118
11.8
Collecting torch-geometric-temporal
  Downloading torch_geometric_temporal-0.54.0.tar.gz (48 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m48.1/48.1 kB[0m [31m1.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting pandas<=1.3.5 (from torch-geometric-temporal)
  Downloading pandas-1.3.5-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (11.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m11.5/11.5 MB[0m [31m37.2 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting torch_sparse (from torch-geometric-temporal)
  Downloading torch_sparse-0.6.18.tar.gz (209 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m210.0/210.0 kB[0m [31m20.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting torch_scatter (from torch-geometric-temporal)
  Downloading torch_scatter-2.1.2.tar.gz (108 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━

### `ASLDatasetLoader` Class

The `ASLDatasetLoader` class loads and processes the ASL dataset. Given a directory, it reads sign language data from JSON files and constructs graph representations suitable for graph-based neural networks. Each frame is converted into a PyTorch Geometric (PyG) `Data` object with `x` (node features), `edge_index` (graph connectivity), and `y` (label) attributes.

**Methods**:

- `_create_sign_to_label_map`: Generates a mapping from sign names to unique labels.

- `_read_file_data`: Reads data from a given JSON file.

- `_augment_data`: Implements data augmentation by applying random rotation, translation, and scaling to landmarks, which can enhance the model's robustness.

- `_create_graph_from_frame`: Constructs a PyG `Data` object from frame data. Node features combine the landmark coordinates, their deltas, an importance weight (hand landmarks are weighted higher than the others), a hand-to-face contact feature (currently a zero placeholder), and a dominant-hand flag. Edges are created between consecutive landmarks and between every pair of hand landmarks (left and right).

- `get_dataset`: Assembles the dataset, optionally incorporating data augmentation. The function outputs a list of PyG `Data` objects ready for graph neural network processing.

In [4]:
import torch
import os
import json
import numpy as np
from torch_geometric.data import Data

class ASLDatasetLoader:
    """Load per-sign JSON files and turn every frame into a PyG ``Data`` graph.

    Each regular file in ``directory_path`` is treated as one sign: the file
    name without its extension is the sign name, and the file content is JSON
    holding a ``"frames"`` list. Every frame provides ``"landmarks"``,
    ``"deltas"`` and ``"landmark_types"`` entries.

    Files are always visited in sorted name order so that label ids and the
    order of the produced samples are reproducible across runs and machines
    (``os.listdir`` returns entries in arbitrary order).
    """

    def __init__(self, directory_path):
        """Remember the dataset directory and build the sign -> label map."""
        self.directory_path = directory_path
        self.sign_to_label = self._create_sign_to_label_map()

    def _list_sign_files(self):
        """Return the names of the regular files in the dataset directory, sorted.

        Sub-directories (for example ``.ipynb_checkpoints``) are skipped: they
        cannot be opened as JSON and must not receive a class label.
        """
        return sorted(
            name
            for name in os.listdir(self.directory_path)
            if os.path.isfile(os.path.join(self.directory_path, name))
        )

    def _create_sign_to_label_map(self):
        """Map each sign name (file name without extension) to an integer label."""
        signs = [os.path.splitext(filename)[0] for filename in self._list_sign_files()]
        return {sign: i for i, sign in enumerate(signs)}

    def _read_file_data(self, file_path):
        """Parse and return the JSON content of ``file_path``."""
        with open(file_path, 'r') as f:
            return json.load(f)

    def _augment_data(self, frame_data, rotation_range=10, translation_range=0.05, scaling_range=0.1):
        """
        Augment the frame data with random rotation, translation, and scaling.

        The input dictionary is not modified; a shallow copy with a new
        ``"landmarks"`` list is returned.

        :param frame_data: Dictionary containing frame landmarks and deltas.
            The landmarks must be 2-D points (shape ``(N, 2)``) because a 2x2
            rotation matrix is applied to them.
        :param rotation_range: Maximum rotation angle in degrees.
        :param translation_range: Maximum translation as a fraction of landmark range.
        :param scaling_range: Maximum scaling factor.
        :return: Augmented frame data.
        """
        landmarks = np.array(frame_data["landmarks"])
        centroid = np.mean(landmarks, axis=0)

        # Random rotation about the centroid. Row vectors are right-multiplied
        # by the matrix, i.e. rotated by -theta; theta is drawn symmetrically
        # around zero, so the sign convention does not affect the distribution.
        theta = np.radians(np.random.uniform(-rotation_range, rotation_range))
        rotation_matrix = np.array([
            [np.cos(theta), -np.sin(theta)],
            [np.sin(theta), np.cos(theta)]
        ])
        landmarks = np.dot(landmarks - centroid, rotation_matrix) + centroid

        # Random translation, drawn independently per axis and bounded by a
        # fraction of the landmark extent along that axis.
        max_translation = translation_range * (landmarks.max(axis=0) - landmarks.min(axis=0))
        translations = np.random.uniform(-max_translation, max_translation)
        landmarks += translations

        # Random scaling about the pre-augmentation centroid.
        scale = np.random.uniform(1 - scaling_range, 1 + scaling_range)
        landmarks = centroid + scale * (landmarks - centroid)

        # Return a copy instead of mutating the caller's dictionary.
        augmented = dict(frame_data)
        augmented["landmarks"] = landmarks.tolist()
        return augmented

    def _calculate_dominant_hand(self, sign_data, motion_threshold=0.5):
        """
        Determine the dominant hand in a sign language data sample.

        For every frame, the norm of each landmark delta is added to the
        motion total of the hand it belongs to (landmark type ``"L"`` or
        ``"R"``), and a "motion event" is counted whenever that norm exceeds
        ``motion_threshold``. Each hand's score is
        ``0.5 * total_motion + 0.5 * motion_events``.

        Parameters
        ----------
        sign_data : dict
            Sign sample with a ``"frames"`` list; each frame must contain
            ``"deltas"`` and ``"landmark_types"`` of matching order.
        motion_threshold : float, optional
            Delta norm above which a movement counts as a motion event.

        Returns
        -------
        str
            ``"left"`` when the left-hand score is strictly greater than the
            right-hand score, otherwise ``"right"`` (ties, including samples
            without any hand landmarks, resolve to ``"right"``).

        Notes
        -----
        The decision criterion is heuristic; the threshold and the 0.5/0.5
        weights may require adjustment based on empirical results.
        """
        left_hand_motion = 0
        right_hand_motion = 0
        left_hand_motion_events = 0
        right_hand_motion_events = 0

        for frame_data in sign_data["frames"]:
            deltas = np.array(frame_data["deltas"])
            landmark_types = frame_data["landmark_types"]

            for delta, ltype in zip(deltas, landmark_types):
                motion_magnitude = np.linalg.norm(delta)

                if ltype == "L":
                    left_hand_motion += motion_magnitude
                    if motion_magnitude > motion_threshold:
                        left_hand_motion_events += 1

                elif ltype == "R":
                    right_hand_motion += motion_magnitude
                    if motion_magnitude > motion_threshold:
                        right_hand_motion_events += 1

        # Combine motion magnitude and motion events to determine the dominant hand
        # Weights (0.5 and 0.5) might need adjustment based on empirical observation
        left_hand_score = 0.5 * left_hand_motion + 0.5 * left_hand_motion_events
        right_hand_score = 0.5 * right_hand_motion + 0.5 * right_hand_motion_events

        return "left" if left_hand_score > right_hand_score else "right"

    def _create_graph_from_frame(self, sign_name, frame_data, sign_data, landmark_types, dominant_hand=None):
        """
        Build one PyG ``Data`` graph (one node per landmark) from a frame.

        Node features are, column-wise: landmark coordinates, deltas, an
        importance weight (2 for hand landmarks, 1 otherwise), a hand-to-face
        contact placeholder (all zeros) and a flag that is 1 for landmarks of
        the dominant hand.

        :param sign_name: Key into ``self.sign_to_label`` giving the graph label.
        :param frame_data: Frame dictionary with ``"landmarks"`` and ``"deltas"``.
        :param sign_data: Whole sign sample; only used to derive the dominant
            hand when ``dominant_hand`` is not supplied.
        :param landmark_types: One type code per landmark (``"L"``/``"R"`` mark
            hand landmarks); must have as many entries as there are landmarks.
        :param dominant_hand: Optional precomputed ``"left"``/``"right"``. Pass
            it when building many frames of the same sign to avoid re-scanning
            every frame of the sign for each graph.
        :return: ``Data(x, edge_index, y)``.
        """
        if dominant_hand is None:
            dominant_hand = self._calculate_dominant_hand(sign_data)

        # Extract landmark and delta information
        landmarks = np.array(frame_data["landmarks"])
        deltas = np.array(frame_data["deltas"])

        # Add dominant hand information to node features
        dominant_hand_feature = [
            1 if ((t == "L" and dominant_hand == "left") or (t == "R" and dominant_hand == "right")) else 0
            for t in landmark_types
        ]
        dominant_hand_feature_2d = np.array(dominant_hand_feature)[:, np.newaxis]

        # Placeholder for hand-to-face contact; replace with a real feature if used.
        hand_to_face_contact = [0] * len(landmark_types)
        hand_to_face_contact_2d = np.array(hand_to_face_contact)[:, np.newaxis]

        # Create weights based on landmark importance
        weights = [2 if t == "L" or t == "R" else 1 for t in landmark_types]
        weights_2d = np.array(weights)[:, np.newaxis]

        # Concatenate landmarks, deltas, importance weights, hand-to-face contact features, and dominant hand feature
        x = torch.tensor(np.hstack((landmarks, deltas, weights_2d, hand_to_face_contact_2d, dominant_hand_feature_2d)), dtype=torch.float)
        y = torch.tensor([self.sign_to_label[sign_name]], dtype=torch.long)

        # Chain of directed edges i -> i + 1 over consecutive landmarks.
        edges = [[i, i + 1] for i in range(len(landmarks) - 1)]

        # Fully connect all hand landmarks (both directions, no self loops).
        # Adjacent hand landmarks therefore also appear in the chain above.
        hand_nodes = [i for i, t in enumerate(landmark_types) if t in ("L", "R")]
        edges.extend([i, j] for i in hand_nodes for j in hand_nodes if i != j)

        # reshape(-1, 2) keeps the (2, num_edges) layout even when there are no edges.
        edge_index = torch.tensor(edges, dtype=torch.long).reshape(-1, 2).t().contiguous()

        return Data(x=x, edge_index=edge_index, y=y)

    def get_dataset(self, augment=False):
        """
        Build the list of per-frame graphs for every sign file.

        :param augment: When True, each frame is passed through
            ``_augment_data`` before the graph is built.
        :return: List of PyG ``Data`` objects, ordered by file name and then
            by frame order within the file.
        """
        dataset = []

        for filename in self._list_sign_files():
            sign_name = os.path.splitext(filename)[0]
            file_path = os.path.join(self.directory_path, filename)
            sign_data = self._read_file_data(file_path)

            frames = sign_data["frames"]
            if not frames:
                continue

            # Both values are per-sign, so compute them once instead of per frame.
            # NOTE(review): graph features use the sign-level "landmark_types"
            # (with this 4-entry default) while dominant-hand detection reads
            # the per-frame "landmark_types" — confirm both agree in the data.
            landmark_types = sign_data.get("landmark_types", ["F", "L", "P", "R"])  # defaulting to all types
            dominant_hand = self._calculate_dominant_hand(sign_data)

            for frame_data in frames:
                if augment:
                    frame_data = self._augment_data(frame_data)
                graph_data = self._create_graph_from_frame(
                    sign_name, frame_data, sign_data, landmark_types, dominant_hand=dominant_hand
                )
                dataset.append(graph_data)

        return dataset

    def number_of_classes(self):
        """Return the number of distinct sign labels."""
        return len(self.sign_to_label)

### `ASLGraphClassifier` Class

The `ASLGraphClassifier` features deeper GCN layers and additional channels, which can help capture more intricate patterns in the data. It takes a PyG `Data` object as input, and its forward pass emits per-class log-probabilities.

**Methods**:

- `forward`: Details the forward pass, accepting a PyG `Data` object. Three GCN layers, each followed by batch normalization, ReLU, and dropout, process the input. After global max-pooling, three linear layers (with ReLU and dropout after the first two) perform the final classification, leading to log-softmax outputs.

In [5]:
import torch.nn.functional as F
from torch_geometric.nn import GCNConv, global_max_pool, global_mean_pool

class ASLGraphClassifier(torch.nn.Module):
    """Graph-level classifier built from three GCN blocks and a three-layer MLP head.

    Node features pass through three ``GCNConv -> BatchNorm1d -> ReLU ->
    Dropout`` stages (512, 1024 and 1024 channels), are reduced to one vector
    per graph with global max pooling, and are classified by linear layers of
    width 512, 256 and ``num_classes``. The output is log-softmax, so it pairs
    with ``F.nll_loss``.
    """

    def __init__(self, num_features, num_classes):
        """Create the layers.

        :param num_features: Number of input features per node.
        :param num_classes: Number of output classes.
        """
        super().__init__()
        nn = torch.nn
        narrow, wide = 512, 1024

        # Layers are created in the same order as before (convolutions, norms,
        # linear head, dropout) so that seeded initialisation is unchanged.
        self.conv1 = GCNConv(num_features, narrow)
        self.conv2 = GCNConv(narrow, wide)
        self.conv3 = GCNConv(wide, wide)
        self.bn1 = nn.BatchNorm1d(narrow)
        self.bn2 = nn.BatchNorm1d(wide)
        self.bn3 = nn.BatchNorm1d(wide)
        self.lin1 = nn.Linear(wide, narrow)
        self.lin2 = nn.Linear(narrow, 256)
        self.lin3 = nn.Linear(256, num_classes)
        self.dropout = nn.Dropout(p=0.2)

    def forward(self, data):
        """Return per-graph log-probabilities for a PyG ``Data``/batch object.

        Reads ``data.x``, ``data.edge_index`` and ``data.batch``.
        """
        hidden = data.x
        graph_stages = (
            (self.conv1, self.bn1),
            (self.conv2, self.bn2),
            (self.conv3, self.bn3),
        )
        for conv, norm in graph_stages:
            hidden = self.dropout(F.relu(norm(conv(hidden, data.edge_index))))

        # Collapse node embeddings into one embedding per graph.
        hidden = global_max_pool(hidden, data.batch)

        for dense in (self.lin1, self.lin2):
            hidden = self.dropout(F.relu(dense(hidden)))

        logits = self.lin3(hidden)
        return F.log_softmax(logits, dim=1)

In [7]:
from sklearn.model_selection import train_test_split
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch_geometric.loader import DataLoader
from collections import Counter
import random

# Upper bound on the number of training epochs; `train` may stop earlier
# through its early-stopping check.
EPOCHS = 200
# Initial learning rate handed to the Adam optimizer in `train`.
LEARNING_RATE = 0.002


def stratified_data_split(data_list, test_size=0.2, random_state=42):
    """
    Split a list of graphs into train and test subsets with matching class ratios.

    The split is delegated to `train_test_split` from `sklearn`, stratified on
    each sample's label (`data.y.item()`), which keeps the class distribution
    similar in both subsets and helps with class imbalance.

    Parameters
    ----------
    data_list : list
        Samples exposing a single-element `y` tensor.
    test_size : float, optional
        Fraction (or absolute count) of samples placed in the test subset.
    random_state : int or None, optional
        Seed forwarded to `train_test_split`; the default of 42 reproduces the
        previously hard-coded split. Pass None for a different split each call.

    Returns
    -------
    tuple
        `(train_data, test_data)` lists.

    Notes
    -----
    `train_test_split` raises `ValueError` when a class has fewer than two
    samples, because such a class cannot be stratified.

    NOTE(review): samples are split individually. If `data_list` holds several
    frames of the same recording, frames of one recording can land in both
    subsets — confirm that this is acceptable for the evaluation.
    """
    # Extract labels from data list
    labels = [data.y.item() for data in data_list]

    # Use sklearn's train_test_split with stratify option
    train_data, test_data = train_test_split(
        data_list, test_size=test_size, stratify=labels, random_state=random_state
    )

    return train_data, test_data


def validate(loader, model, device):
    """
    Compute the classification accuracy of `model` over every batch in `loader`.

    The model is switched to evaluation mode for the duration of the call and
    its previous mode (training or evaluation) is restored before returning,
    so calling this in the middle of a training loop does not silently disable
    dropout and batch-norm updates for the following epochs.

    Parameters
    ----------
    loader : iterable
        Yields batches that support `.to(device)` and expose labels as `.y`;
        must also expose `.dataset` so the sample count is known.
    model : torch.nn.Module
        Model returning per-class scores of shape (batch, classes).
    device : torch.device or str
        Device the batches are moved to.

    Returns
    -------
    float
        Fraction of correctly classified samples; 0.0 for an empty dataset.
    """
    num_samples = len(loader.dataset)
    if num_samples == 0:
        return 0.0

    was_training = model.training
    model.eval()
    correct = 0
    try:
        with torch.no_grad():
            for data in loader:
                data = data.to(device)
                pred = model(data).argmax(dim=1)
                correct += int((pred == data.y).sum())
    finally:
        # Hand the model back in the mode the caller had it in.
        model.train(was_training)
    return correct / num_samples

def train(loader):
    """
    Train an `ASLGraphClassifier` on the dataset produced by `loader`.

    Steps: build the dataset, make a stratified 80/20 split, train with Adam
    (weight decay, gradient clipping, `ReduceLROnPlateau` on accuracy), stop
    early after 20 epochs without improvement, then evaluate on the held-out
    split.

    Parameters
    ----------
    loader : ASLDatasetLoader
        Dataset loader providing `get_dataset()` and `number_of_classes()`
        (not a PyG `DataLoader`).

    Returns
    -------
    tuple
        `(model, all_preds, all_labels, accuracy)` where the predictions and
        labels cover the held-out split and `accuracy` is its accuracy.

    Notes
    -----
    NOTE(review): the held-out split is used both for learning-rate
    scheduling / early stopping and for the final reported accuracy, so the
    reported number is not an unbiased test estimate.
    """

    # Create the entire dataset without augmentation and then perform stratified split
    data_list = loader.get_dataset()
    train_dataset, test_dataset = stratified_data_split(data_list, test_size=0.2)

    # NOTE(review): a second, augmented copy of the *whole* dataset used to be
    # built here (`loader.get_dataset(augment=True)`) but was never used, so
    # training has always run on un-augmented data. The dead call is removed
    # because it re-read every file for nothing. TODO: if augmentation is
    # wanted, apply it to `train_dataset` only so test samples are not leaked.

    num_classes = loader.number_of_classes()

    train_labels = [data.y.item() for data in train_dataset]
    test_labels = [data.y.item() for data in test_dataset]

    print("Training label distribution:", Counter(train_labels))
    print("Test label distribution:", Counter(test_labels))

    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    num_features = train_dataset[0].x.size(1)
    model = ASLGraphClassifier(num_features=num_features, num_classes=num_classes).to(device)

    optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE, weight_decay=5e-4)
    # mode='max' because the monitored quantity is accuracy.
    scheduler = ReduceLROnPlateau(optimizer, mode='max', factor=0.7, patience=5, verbose=True)

    max_epochs_without_improvement = 20
    epochs_without_improvement = 0
    best_val_accuracy = 0

    for epoch in range(EPOCHS):
        # Re-enter training mode every epoch: validation below switches the
        # model to eval mode, which would otherwise leave dropout disabled and
        # batch-norm statistics frozen from the second epoch onwards.
        model.train()
        total_loss = 0
        counted_batches = 0
        for batch in train_loader:
            batch = batch.to(device)
            optimizer.zero_grad()
            out = model(batch)
            loss = F.nll_loss(out, batch.y)

            # Detect a NaN/inf loss *before* it is back-propagated; stepping on
            # it would poison every parameter.
            if not torch.isfinite(loss):
                print("Warning: non-finite loss detected; skipping batch.")
                continue

            loss.backward()

            # Gradient clipping
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1)

            optimizer.step()
            total_loss += loss.item()
            counted_batches += 1

        # Equals total_loss / len(train_loader) whenever no batch was skipped.
        avg_loss = total_loss / max(counted_batches, 1)
        print(f"Epoch {epoch}, Loss: {avg_loss}")

        val_accuracy = validate(test_loader, model, device)
        scheduler.step(val_accuracy)

        if val_accuracy > best_val_accuracy:
            best_val_accuracy = val_accuracy
            epochs_without_improvement = 0
        else:
            epochs_without_improvement += 1

        if epochs_without_improvement >= max_epochs_without_improvement:
            print("Early stopping triggered.")
            break

    # Final evaluation on the held-out split, keeping predictions for reporting.
    model.eval()
    correct = 0
    all_preds = []
    all_labels = []

    for batch in test_loader:
        batch = batch.to(device)
        with torch.no_grad():
            pred = model(batch).max(dim=1)[1]
            all_preds.extend(pred.cpu().numpy())
            all_labels.extend(batch.y.cpu().numpy())
            correct += pred.eq(batch.y).sum().item()

    accuracy = correct / len(test_dataset)

    print(f"Accuracy: {accuracy}")
    print("Sample predictions:", all_preds[:20])
    print("Sample true labels:", all_labels[:20])

    return model, all_preds, all_labels, accuracy


In [None]:
# Location of the per-sign JSON files on the mounted Google Drive.
# NOTE(review): this absolute path only resolves inside the author's Colab
# session with Drive mounted — adjust it when running elsewhere.
directory_path = "/content/drive/MyDrive/Colab Notebooks/DGMD E-14 Project/Datasets/ASL"
loader = ASLDatasetLoader(directory_path)

# Train the classifier; the predictions and labels returned here are the
# inputs of the evaluation cells below.
model, all_preds, all_labels, accuracy = train(loader)

Training label distribution: Counter({190: 438, 145: 430, 88: 430, 93: 420, 208: 402, 121: 400, 209: 363, 76: 346, 56: 338, 92: 338, 180: 332, 34: 330, 16: 324, 172: 323, 182: 316, 31: 310, 24: 309, 195: 308, 183: 307, 192: 306, 148: 304, 215: 298, 10: 297, 150: 297, 130: 296, 29: 295, 113: 295, 151: 295, 79: 294, 143: 293, 149: 292, 44: 290, 64: 289, 106: 288, 89: 286, 210: 282, 1: 279, 147: 279, 169: 278, 107: 278, 191: 278, 52: 277, 114: 277, 13: 276, 77: 275, 98: 274, 4: 274, 19: 274, 170: 273, 205: 273, 81: 271, 206: 270, 154: 270, 199: 270, 46: 270, 212: 269, 65: 269, 32: 268, 2: 266, 211: 265, 55: 265, 36: 265, 152: 264, 7: 263, 14: 263, 35: 263, 60: 262, 73: 262, 139: 262, 119: 262, 204: 261, 133: 261, 94: 261, 43: 260, 216: 258, 68: 258, 66: 257, 178: 256, 125: 255, 30: 255, 138: 255, 179: 254, 157: 254, 59: 253, 33: 253, 181: 252, 47: 251, 53: 250, 126: 250, 120: 250, 186: 250, 129: 249, 15: 249, 142: 247, 219: 247, 40: 247, 62: 247, 164: 247, 41: 247, 162: 246, 102: 246, 213

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, classification_report
import numpy as np

# Convert lists to numpy arrays for compatibility with sklearn
y_true = np.array(all_labels)
y_pred = np.array(all_preds)

# Class ids and names in matching, ascending-id order. Passing the ids as
# `labels` below pins one row/column per known class, so the matrix and the
# report stay aligned with the names even when a class is missing from the
# test split or is never predicted.
classes_by_id = sorted(loader.sign_to_label.items(), key=lambda item: item[1])
ordered_class_names = [name for name, num in classes_by_id]
class_ids = [num for name, num in classes_by_id]

# Calculate the confusion matrix
cm = confusion_matrix(y_true, y_pred, labels=class_ids)

# Visualize the confusion matrix. Per-cell annotations are only drawn for a
# small number of classes; with hundreds of classes they are illegible and
# very slow to render.
MAX_ANNOTATED_CLASSES = 30
plt.figure(figsize=(10, 7))
sns.heatmap(cm, annot=len(class_ids) <= MAX_ANNOTATED_CLASSES, fmt="d", cmap="Blues")
plt.title("Confusion Matrix")
plt.xlabel("Predicted")
plt.ylabel("True")
plt.show()

# Per-Class Accuracy (recall). Classes without any true sample get NaN instead
# of triggering a division-by-zero warning.
support = cm.sum(axis=1)
class_accuracy = np.divide(
    cm.diagonal(),
    support,
    out=np.full(len(support), np.nan),
    where=support > 0,
)
for class_id, class_name, acc in zip(class_ids, ordered_class_names, class_accuracy):
    print(f"Accuracy for class {class_id} ({class_name}): {acc*100:.2f}%")

# Detailed classification report
print("\nClassification Report:\n")
print(classification_report(y_true, y_pred, labels=class_ids, target_names=ordered_class_names, zero_division=1))

In [None]:
from sklearn.metrics import classification_report

def print_top_misclassified_classes(y_true, y_pred, sign_to_label, N=3, zero_division=1):
    """
    Prints the classification report and the top N most misclassified classes.

    The misclassification rate of a class is ``1 - recall``. Classes that have
    no true samples in ``y_true`` are left out of the ranking, because their
    recall is only the ``zero_division`` placeholder.

    Parameters:
    - y_true: Actual labels
    - y_pred: Predicted labels by the model
    - sign_to_label: Dictionary mapping class names to class numbers
    - N: Number of top misclassified classes to print; fewer are printed when
      fewer classes are available
    - zero_division: Parameter for handling zero division in classification_report

    Returns:
    None
    """

    # Class names and ids in matching, ascending-id order. The ids are passed
    # as `labels` so `target_names` always lines up, even when some class is
    # absent from both y_true and y_pred.
    classes_by_id = sorted(sign_to_label.items(), key=lambda item: item[1])
    ordered_class_names = [name for name, num in classes_by_id]
    class_ids = [num for name, num in classes_by_id]

    # Generate and print classification report with class names
    print("\nClassification Report:\n")
    print(classification_report(
        y_true, y_pred, labels=class_ids, target_names=ordered_class_names, zero_division=zero_division
    ))

    # Generate classification report as dict to find misclassified classes;
    # without target_names the per-class keys are the stringified label ids.
    report = classification_report(
        y_true, y_pred, labels=class_ids, output_dict=True, zero_division=zero_division
    )

    # Reverse lookup built once (first name wins if two names share an id).
    label_to_sign = {}
    for name, num in sign_to_label.items():
        label_to_sign.setdefault(num, name)

    # Misclassification rate per class that actually occurs in y_true
    misclassification_rates = {}
    for num in class_ids:
        metrics = report[str(num)]
        if metrics['support'] > 0:
            misclassification_rates[label_to_sign[num]] = 1 - metrics['recall']

    # Sort classes based on misclassification rate
    sorted_classes = sorted(misclassification_rates, key=misclassification_rates.get, reverse=True)

    # Print top N misclassified classes (slicing copes with N > number of classes)
    print(f"\nTop {N} misclassified classes:")
    for rank, class_name in enumerate(sorted_classes[:max(N, 0)], start=1):
        print(f"{rank}. {class_name} - Misclassification rate: {misclassification_rates[class_name]:.2f}")

In [None]:
# Print the classification report and the ten classes with the highest
# misclassification rate (1 - recall).
print_top_misclassified_classes(y_true, y_pred, loader.sign_to_label, N=10, zero_division=1)