In [None]:
!pip install mediapipe

In [None]:
# Mount Google Drive to access the data
from google.colab import drive
drive.mount('/content/drive')
import torch

Mounted at /content/drive


In [None]:
# Set the device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cuda


In [None]:

# Import necessary libraries
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.applications.inception_v3 import InceptionV3
from tensorflow.keras import layers
from tensorflow.keras.layers import BatchNormalization, Dense, GlobalAveragePooling2D
from tensorflow.keras.layers import Bidirectional, LSTM, Reshape, TimeDistributed, Flatten, Dropout
from tensorflow.keras.models import Model, load_model
import os
import torch
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from torchvision.models import inception_v3
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm
import copy

# Set up paths
train_faces_path = '/content/drive/MyDrive/AUC/F/train_faces.npy'
val_faces_path = '/content/drive/MyDrive/AUC/F/val_faces.npy'
test_faces_path = '/content/drive/MyDrive/AUC/F/test_faces.npy'

train_metadata_path = '/content/drive/MyDrive/AUC/F/train_metadata.csv'
val_metadata_path = '/content/drive/MyDrive/AUC/F/val_metadata.csv'
test_metadata_path = '/content/drive/MyDrive/AUC/F/test_metadata.csv'

fer_model_path = '/content/drive/My Drive/HHHIncpV3BiLSTM_KDEF.h5'

train_dir = '/content/drive/MyDrive/AUC/WV/train'
val_dir = '/content/drive/MyDrive/AUC/WV/val'
test_dir = '/content/drive/MyDrive/AUC/WV/test'

In [None]:

# --- Load the datasets ---
train_faces = np.load(train_faces_path)
val_faces = np.load(val_faces_path)
test_faces = np.load(test_faces_path)
train_metadata_df = pd.read_csv(train_metadata_path)
val_metadata_df = pd.read_csv(val_metadata_path)
test_metadata_df = pd.read_csv(test_metadata_path)

# Extract face detection flags
train_face_detected = train_metadata_df['face_detected'].astype(int).to_numpy()
val_face_detected = val_metadata_df['face_detected'].astype(int).to_numpy()
test_face_detected = test_metadata_df['face_detected'].astype(int).to_numpy()


In [None]:
# Load the pre-trained FER model
fer_base_model = load_model(fer_model_path)

# List all layers in the model
'''for i, layer in enumerate(fer_base_model.layers):
    print(f"Layer {i}: {layer.name}")
'''



'for i, layer in enumerate(fer_base_model.layers):\n    print(f"Layer {i}: {layer.name}")\n'

In [None]:
fer_base_model.summary()

In [None]:
# --- FER Feature Extraction ---
train_fer_features_output_path = '/content/drive/MyDrive/AUC/F5/train_fer_features.npy'
val_fer_features_output_path = '/content/drive/MyDrive/AUC/F5/val_fer_features.npy'
test_fer_features_output_path = '/content/drive/MyDrive/AUC/F5/test_fer_features.npy'

def extract_fer_features_with_flag(extractor, faces, face_detected, save_path):
    features = []
    extracted_count = 0
    zero_vector_count = 0
    for i, face in enumerate(faces):
        if face_detected[i] == 1:
            feature = extractor.predict(np.expand_dims(face, axis=0))[0]
            extracted_count += 1
        else:
            feature = np.zeros(extractor.output.shape[1])  # Use zeros for no detected face
            zero_vector_count += 1
        features.append(feature)
    features = np.array(features)
    np.save(save_path, features)
    print(f"Features extracted: {extracted_count}, Zero vectors used: {zero_vector_count}")
    print(f"Shape of extracted features: {features.shape}")
    return features
# Load or extract and save FER features with flags
if not os.path.exists(train_fer_features_output_path):
    fer_base_model = load_model(fer_model_path)
    feature_extraction_layer = fer_base_model.get_layer("dense_7")
    fer_feature_extractor = Model(inputs=fer_base_model.input, outputs=feature_extraction_layer.output)
    train_fer_features = extract_fer_features_with_flag(fer_feature_extractor, train_faces, train_face_detected, train_fer_features_output_path)
else:
    train_fer_features = np.load(train_fer_features_output_path)

if not os.path.exists(val_fer_features_output_path):
    val_fer_features = extract_fer_features_with_flag(fer_feature_extractor, val_faces, val_face_detected, val_fer_features_output_path)
else:
    val_fer_features = np.load(val_fer_features_output_path)

if not os.path.exists(test_fer_features_output_path):
    test_fer_features = extract_fer_features_with_flag(fer_feature_extractor, test_faces, test_face_detected, test_fer_features_output_path)
else:
    test_fer_features = np.load(test_fer_features_output_path)


In [None]:
def verify_features(features, face_detected, dataset_name):
    num_zero_vectors = np.sum([np.all(f == 0) for f in features])
    num_features = features.shape[0]
    print(f"{dataset_name} - Total: {num_features}, Zero vectors: {num_zero_vectors}, Detected faces: {np.sum(face_detected)}")

# Verify extracted features for each dataset
verify_features(train_fer_features, train_face_detected, "Train set")
verify_features(val_fer_features, val_face_detected, "Validation set")
verify_features(test_fer_features, test_face_detected, "Test set")

Train set - Total: 8934, Zero vectors: 0, Detected faces: 8934
Validation set - Total: 961, Zero vectors: 0, Detected faces: 961
Test set - Total: 1086, Zero vectors: 0, Detected faces: 1086


In [None]:
# --- FER Feature Loading (Load Only) ---
train_fer_features = np.load('/content/drive/MyDrive/AUC/F5/train_fer_features.npy')
val_fer_features = np.load('/content/drive/MyDrive/AUC/F5/val_fer_features.npy')
test_fer_features = np.load('/content/drive/MyDrive/AUC/F5/test_fer_features.npy')


In [None]:
# Print the shape and length of the loaded FER features
print(f"Train FER features shape: {train_fer_features.shape}")
print(f"Validation FER features shape: {val_fer_features.shape}")
print(f"Test FER features shape: {test_fer_features.shape}")

# Print the number of samples for FER features
print(f"Number of training samples (FER features): {len(train_fer_features)}")
print(f"Number of validation samples (FER features): {len(val_fer_features)}")
print(f"Number of test samples (FER features): {len(test_fer_features)}")

# --- Verification and Summary ---
def verify_features(features, face_detected, dataset_name):
    num_zero_vectors = np.sum([np.all(f == 0) for f in features])
    num_features = features.shape[0]
    print(f"{dataset_name} - Total: {num_features}, Zero vectors: {num_zero_vectors}, Detected faces: {np.sum(face_detected)}")

# Verify extracted features for each dataset
verify_features(train_fer_features, train_face_detected, "Train set")
verify_features(val_fer_features, val_face_detected, "Validation set")
verify_features(test_fer_features, test_face_detected, "Test set")

Train FER features shape: (8934, 512)
Validation FER features shape: (961, 512)
Test FER features shape: (1086, 512)
Number of training samples (FER features): 8934
Number of validation samples (FER features): 961
Number of test samples (FER features): 1086
Train set - Total: 8934, Zero vectors: 0, Detected faces: 8934
Validation set - Total: 961, Zero vectors: 0, Detected faces: 961
Test set - Total: 1086, Zero vectors: 0, Detected faces: 1086


In [None]:
# --- Pose Feature Extraction (Save Only) ---

# Define output paths for each subset
train_pose_features_output_path = '/content/drive/MyDrive/AUC/F6/train_pose_features.npy'
val_pose_features_output_path = '/content/drive/MyDrive/AUC/F6/val_pose_features.npy'
test_pose_features_output_path = '/content/drive/MyDrive/AUC/F6/test_pose_features.npy'


In [None]:
####with visibility:
def extract_pose_landmarks(image_path):
    # Open image and convert to uint8
    image = Image.open(image_path).convert('RGB')
    image = np.array(image)
    image_uint8 = image.astype(np.uint8)
    results = pose.process(image_uint8)
    if results.pose_landmarks:
        # Extract the first 23 points (0-22) with x, y, z coordinates and visibility
        landmarks = []
        for landmark in results.pose_landmarks.landmark[:23]:
            landmarks.extend([landmark.x, landmark.y, landmark.z, landmark.visibility])
        return landmarks
    else:
        # Use zeros for no pose landmarks detected (including visibility)
        return [0] * (23 * 4)  # 23 landmarks * 4 features each (x, y, z, visibility)

# Process and save training set pose features
if not os.path.exists(train_pose_features_output_path):
    import mediapipe as mp
    from PIL import Image

    mp_pose = mp.solutions.pose
    pose = mp_pose.Pose()

    train_dataset = datasets.ImageFolder(train_dir)
    train_pose_features = [extract_pose_landmarks(path) for path, _ in train_dataset.imgs]
    np.save(train_pose_features_output_path, train_pose_features)

# Process and save validation set pose features
if not os.path.exists(val_pose_features_output_path):
    val_dataset = datasets.ImageFolder(val_dir)
    val_pose_features = [extract_pose_landmarks(path) for path, _ in val_dataset.imgs]
    np.save(val_pose_features_output_path, val_pose_features)

# Process and save test set pose features
if not os.path.exists(test_pose_features_output_path):
    test_dataset = datasets.ImageFolder(test_dir)
    test_pose_features = [extract_pose_landmarks(path) for path, _ in test_dataset.imgs]
    np.save(test_pose_features_output_path, test_pose_features)




In [None]:
# --- Pose Feature Loading (Load Only) ---
train_pose_features = np.load(train_pose_features_output_path)
val_pose_features = np.load(val_pose_features_output_path)
test_pose_features = np.load(test_pose_features_output_path)

# Print the shape and length of the loaded pose features
print(f"Train Pose features shape: {train_pose_features.shape}")
print(f"Validation Pose features shape: {val_pose_features.shape}")
print(f"Test Pose features shape: {test_pose_features.shape}")

# Print the number of samples for Pose features
print(f"Number of training samples (Pose features): {len(train_pose_features)}")
print(f"Number of validation samples (Pose features): {len(val_pose_features)}")
print(f"Number of test samples (Pose features): {len(test_pose_features)}")


Train Pose features shape: (9538, 92)
Validation Pose features shape: (1017, 92)
Test Pose features shape: (1123, 92)
Number of training samples (Pose features): 9538
Number of validation samples (Pose features): 1017
Number of test samples (Pose features): 1123


In [None]:
# --- Verification and Summary ---
def verify_pose_features(pose_features, dataset_name):
    num_samples = len(pose_features)
    num_detected = sum(1 for feature in pose_features if any(p != 0 for p in feature))
    num_zero_vectors = num_samples - num_detected

    print(f"{dataset_name} - Total: {num_samples}, Detected pose landmarks: {num_detected}, Zero vectors: {num_zero_vectors}")
    print(f"Shape of pose features array: {np.array(pose_features).shape}")

# Verify extracted features for each dataset
verify_pose_features(train_pose_features, "Train set")
verify_pose_features(val_pose_features, "Validation set")
verify_pose_features(test_pose_features, "Test set")


Train set - Total: 9538, Detected pose landmarks: 9406, Zero vectors: 132
Shape of pose features array: (9538, 92)
Validation set - Total: 1017, Detected pose landmarks: 1016, Zero vectors: 1
Shape of pose features array: (1017, 92)
Test set - Total: 1123, Detected pose landmarks: 1123, Zero vectors: 0
Shape of pose features array: (1123, 92)


In [None]:
from collections import Counter

# --- Function to count failed detections per class ---
def count_failed_detections(dataset, pose_features):
    labels = [label for _, label in dataset.imgs]
    failed_indices = [i for i, feature in enumerate(pose_features) if all(p == 0 for p in feature)]
    failed_labels = [labels[i] for i in failed_indices]
    return Counter(failed_labels)

# Load datasets for labels
train_dataset = datasets.ImageFolder(train_dir)
val_dataset = datasets.ImageFolder(val_dir)
test_dataset = datasets.ImageFolder(test_dir)

# Count failures for each subset
train_failures = count_failed_detections(train_dataset, train_pose_features)
val_failures = count_failed_detections(val_dataset, val_pose_features)
test_failures = count_failed_detections(test_dataset, test_pose_features)

# Map class indices to class names
class_names = train_dataset.classes  # Assuming same classes across all sets

# Print results
print("Failed Pose Detection Count per Class:")

print("Train set:")
for cls_idx, count in train_failures.items():
    print(f"  Class '{class_names[cls_idx]}': {count}")

print("Validation set:")
for cls_idx, count in val_failures.items():
    print(f"  Class '{class_names[cls_idx]}': {count}")

print("Test set:")
for cls_idx, count in test_failures.items():
    print(f"  Class '{class_names[cls_idx]}': {count}")


Failed Pose Detection Count per Class:
Train set:
  Class 'c0': 7
  Class 'c1': 7
  Class 'c2': 23
  Class 'c4': 25
  Class 'c6': 27
  Class 'c7': 23
  Class 'c8': 20
Validation set:
  Class 'c7': 1
Test set:


In [None]:
def verify_all_features_present(features, dataset_name):
    num_samples = len(features)
    num_valid = sum(1 for feature in features if any(p != 0 for p in feature))
    num_invalid = num_samples - num_valid

    print(f"{dataset_name} - Total: {num_samples}, Valid features: {num_valid}, Invalid (zero) features: {num_invalid}")
    print(f"Shape of pose features array: {np.array(features).shape}")

# Verify for training set
verify_all_features_present(train_pose_features, "Train set")

# Verify for validation set
verify_all_features_present(val_pose_features, "Validation set")

# Verify for test set
verify_all_features_present(test_pose_features, "Test set")


Train set - Total: 9538, Valid features: 9406, Invalid (zero) features: 132
Shape of pose features array: (9538, 92)
Validation set - Total: 1017, Valid features: 1016, Invalid (zero) features: 1
Shape of pose features array: (1017, 92)
Test set - Total: 1123, Valid features: 1123, Invalid (zero) features: 0
Shape of pose features array: (1123, 92)


In [None]:
import numpy as np

def impute_missing_landmarks_directly(features, detected_flags, class_labels, global_mean=None):
    for cls_idx in np.unique(class_labels):
        # Get indices for the current class
        class_indices = (class_labels == cls_idx)
        detected = detected_flags[class_indices]

        # Determine mean landmarks to use for imputation
        if detected.any():
            mean_landmarks = np.mean(features[class_indices][detected], axis=0)
            print(f"Class {cls_idx}: Using class mean for imputation.")
        else:
            mean_landmarks = global_mean
            print(f"Class {cls_idx}: No valid samples, using global mean for imputation.")

        # Impute missing landmarks (zero vectors) directly in the features array
        for i, index in enumerate(np.where(class_indices)[0]):
            if not detected[i]:  # Only impute where detection flag is False
                print(f"Imputing missing data for index {index} in class {cls_idx}.")
                features[index] = mean_landmarks

        # Confirm changes
        first_zero_vector_idx = next((i for i, f in enumerate(features[class_indices]) if not np.any(f)), None)
        if first_zero_vector_idx is not None:
            print(f"Class {cls_idx} still has zero vector at index {first_zero_vector_idx}.")
        else:
            print(f"Class {cls_idx} all vectors are valid now.")

    return features

# Re-calculate global mean for fallback, excluding zero vectors
global_mean_pose = np.mean(train_pose_features[train_detected_flags], axis=0)

# Apply the imputation function directly
train_pose_features = impute_missing_landmarks_directly(train_pose_features, train_detected_flags, train_class_labels, global_mean_pose)
val_pose_features = impute_missing_landmarks_directly(val_pose_features, val_detected_flags, val_class_labels, global_mean_pose)
test_pose_features = impute_missing_landmarks_directly(test_pose_features, test_detected_flags, test_class_labels, global_mean_pose)

# Re-verify the features
verify_all_features_present(train_pose_features, "Train set")
verify_all_features_present(val_pose_features, "Validation set")
verify_all_features_present(test_pose_features, "Test set")


In [None]:

# --- Custom Callback for Plotting ---
class TrainingPlot:
    def __init__(self):
        self.losses = []
        self.acc = []
        self.val_losses = []
        self.val_acc = []

    def on_epoch_end(self, epoch, logs):
        self.losses.append(logs['loss'])
        self.acc.append(logs['accuracy'])
        self.val_losses.append(logs['val_loss'])
        self.val_acc.append(logs['val_accuracy'])

        if len(self.losses) > 1:
            plt.style.use("seaborn")

            plt.figure()
            plt.plot(self.losses, label="Training Loss")
            plt.plot(self.val_losses, label="Validation Loss")
            plt.title("Training and Validation Loss")
            plt.xlabel("Epoch #")
            plt.ylabel("Loss")
            plt.legend()
            plt.show()

            plt.figure()
            plt.plot(self.acc, label="Training Accuracy")
            plt.plot(self.val_acc, label="Validation Accuracy")
            plt.title("Training and Validation Accuracy")
            plt.xlabel("Epoch #")
            plt.ylabel("Accuracy")
            plt.legend()
            plt.show()

plot_losses = TrainingPlot()


In [None]:
import matplotlib.pyplot as plt
num_epochs = 60
# --- Custom Callback for Plotting ---
class TrainingPlot(object):
    def __init__(self, num_epochs):
        self.num_epochs = num_epochs
        self.losses = []
        self.val_losses = []
        self.accuracies = []
        self.val_accuracies = []

    def on_epoch_end(self, epoch, logs={}):
        self.losses.append(logs.get('loss'))
        self.val_losses.append(logs.get('val_loss'))
        self.accuracies.append(logs.get('accuracy'))
        self.val_accuracies.append(logs.get('val_accuracy'))

        # Check if enough data points are available for plotting
        if len(self.losses) > 1:

            num_epochs = len(self.losses) # Use the actual number of epochs
            plt.style.use("seaborn")
            plt.figure()
            plt.plot(range(num_epochs), self.losses, label="Training Loss")
            plt.plot(range(num_epochs), self.val_losses, label="Validation Loss")
            plt.xlabel("Epoch")
            plt.ylabel("Loss")
            plt.legend()
            plt.title("Training and Validation Loss")
            plt.show()

            plt.figure()
            plt.plot(range(num_epochs), self.accuracies, label="Training Accuracy")
            plt.plot(range(num_epochs), self.val_accuracies, label="Validation Accuracy")
            plt.xlabel("Epoch")
            plt.ylabel("Accuracy")
            plt.legend()
            plt.title("Training and Validation Accuracy")
            plt.show()

plot_losses = TrainingPlot(num_epochs)

In [None]:
# --- Constants and Mappings ---
activity_map_AUC = {
    'c0': 'Drive Safe',
    'c1': 'Text Right',
    'c2': 'Talk Right',
    'c3': 'Text Left',
    'c4': 'Talk Left',
    'c5': 'Adjust Radio',
    'c6': 'Drink',
    'c7': 'Reach Behind',
    'c8': 'Hair and Makeup',
    'c9': 'Talk Passenger'
}

IMG_WIDTH, IMG_HEIGHT = 224, 224
BATCH_SIZE = 32 # Updated batch size as specified


# --- Image Data Loading and Transformation ---

train_transforms = transforms.Compose([
    transforms.Resize((IMG_WIDTH, IMG_HEIGHT)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

val_transforms = transforms.Compose([
    transforms.Resize((IMG_WIDTH, IMG_HEIGHT)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

test_transforms = transforms.Compose([
    transforms.Resize((IMG_WIDTH, IMG_HEIGHT)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Define dataset paths
train_dir = '/content/drive/MyDrive/AUC/WV/train'
val_dir = '/content/drive/MyDrive/AUC/WV/val'
test_dir = '/content/drive/MyDrive/AUC/WV/test'

# Load image datasets
train_dataset = datasets.ImageFolder(train_dir, transform=train_transforms)
val_dataset = datasets.ImageFolder(val_dir, transform=val_transforms)
test_dataset = datasets.ImageFolder(test_dir, transform=test_transforms)

# --- DataLoader Creation ---

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, num_workers=2, pin_memory=True, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, num_workers=2, pin_memory=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, num_workers=2, pin_memory=True)

# --- Dataset Verification ---
print(f"Number of training samples: {len(train_dataset)}")
print(f"Number of validation samples: {len(val_dataset)}")
print(f"Number of test samples: {len(test_dataset)}")

# Check a batch from the train_loader to verify data loading
train_features, train_labels = next(iter(train_loader))
print(f"Batch feature shape (train): {train_features.size()}")
print(f"Batch label shape (train): {train_labels.size()}")


Number of training samples: 9538
Number of validation samples: 1017
Number of test samples: 1123


  self.pid = os.fork()


Batch feature shape (train): torch.Size([32, 3, 224, 224])
Batch label shape (train): torch.Size([32])


# **MODLE DD**

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision.models import inception_v3

# --- Model Definition for Driver Distraction Detection (As in the Paper) ---
class InceptionV3BiLSTM(nn.Module):
    def __init__(self, num_classes=10):
        super(InceptionV3BiLSTM, self).__init__()
        # Load pretrained InceptionV3 model
        self.base_model = inception_v3(pretrained=True, aux_logits=True)
        self.base_model.aux_logits = False
        self.base_model.fc = nn.Identity()  # Remove final fully connected layer
        self.dropout = nn.Dropout(0.1)
        # First BiLSTM layer to capture temporal features from spatial feature maps
        self.bilstm1 = nn.LSTM(input_size=16384, hidden_size=14, num_layers=1, batch_first=True, bidirectional=True)

        # Second BiLSTM layer
        self.bilstm2 = nn.LSTM(input_size=28, hidden_size=14, num_layers=1, batch_first=True, bidirectional=True)

        # Final classification layer
        self.fc = nn.Linear(28, num_classes)  # 256 because BiLSTM is bidirectional with 128 hidden units

    def forward(self, x):
        # Extract features from the base InceptionV3 model
        for name, module in self.base_model.named_children():
            if name == 'AuxLogits' and not self.base_model.aux_logits:
                continue  # Skip the auxiliary logits if aux_logits is False
            x = module(x)
            if name == 'Mixed_7c':  # Stop at Mixed_7c layer
                break

        # Reshape: Flatten spatial dimensions (8x8) into the sequence dimension (8x8 = 64 timesteps)
        x = x.view(x.size(0), -1, 16384)
        x = self.dropout(x)

        # Pass through the first BiLSTM layer
        x, _ = self.bilstm1(x)  # Output shape: (batch_size, 64, 256)

        # Pass through the second BiLSTM layer
        x, _ = self.bilstm2(x)  # Output shape: (batch_size, 64, 256)

        # Use the last time step for classification
        x = x[:, -1, :]  # (shape: batch_size, 256)

        # Final classification layer
        x = self.fc(x)
        return x

# Instantiate and move the model to the GPU
model_dd = InceptionV3BiLSTM(num_classes=len(activity_map_AUC))
model_dd = model_dd.to(device)

# Define loss function and optimizer
criterion_dd = nn.CrossEntropyLoss()
optimizer_dd = optim.Adam(model_dd.parameters(), lr=0.0001)

# Training and validation code remains the same as in your implementation




In [None]:
# --- Training and Validation Loop ---
num_epochs = 60
best_val_loss = float('inf')
early_stopping_patience = 10
early_stopping_counter = 0
best_model_wts_dd = copy.deepcopy(model_dd.state_dict())

for epoch in range(num_epochs):
    model_dd.train()
    running_loss, correct, total = 0.0, 0, 0
    train_progress = tqdm(enumerate(train_loader), total=len(train_loader), desc=f"Epoch {epoch+1}/{num_epochs}")

    for batch_idx, (data, target) in train_progress:
        data, target = data.to(device), target.to(device)
        optimizer_dd.zero_grad()
        outputs = model_dd(data)
        loss = criterion_dd(outputs, target)
        loss.backward()
        optimizer_dd.step()

        running_loss += loss.item()
        _, predicted = torch.max(outputs.data, 1)
        total += target.size(0)
        correct += (predicted == target).sum().item()
        train_progress.set_postfix(loss=running_loss/(batch_idx+1), accuracy=correct/total)

    # Validation phase
    model_dd.eval()
    val_loss, val_correct, val_total = 0.0, 0, 0
    with torch.no_grad():
        for data, target in val_loader:
            data, target = data.to(device), target.to(device)
            outputs = model_dd(data)
            loss = criterion_dd(outputs, target)
            val_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            val_total += target.size(0)
            val_correct += (predicted == target).sum().item()

    epoch_loss = running_loss / len(train_loader)
    epoch_acc = correct / total
    epoch_val_loss = val_loss / len(val_loader)
    epoch_val_acc = val_correct / val_total

    print(f"Epoch [{epoch+1}/{num_epochs}] Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f} Val Loss: {epoch_val_loss:.4f} Val Acc: {epoch_val_acc:.4f}")

    # Early stopping
    if epoch_val_loss < best_val_loss:
        best_val_loss = epoch_val_loss
        best_model_wts_dd = copy.deepcopy(model_dd.state_dict())
        early_stopping_counter = 0
    else:
        early_stopping_counter += 1
        if early_stopping_counter >= early_stopping_patience:
            print("Early stopping triggered.")
            break

# Restore best model weights
model_dd.load_state_dict(best_model_wts_dd)


In [None]:
# --- Evaluation and Reporting ---
import sklearn.metrics as metrics
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report, confusion_matrix, ConfusionMatrixDisplay
from tabulate import tabulate
import pandas as pd
import matplotlib.pyplot as plt

# Set the model to evaluation mode
model_dd.eval()
y_true = []
y_pred = []

with torch.no_grad():
    for inputs, labels in tqdm(test_loader, desc="Evaluating"):
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model_dd(inputs)
        _, predicted = torch.max(outputs.data, 1)
        y_true.extend(labels.cpu().numpy())
        y_pred.extend(predicted.cpu().numpy())

# Convert lists to numpy arrays for metric calculation
y_true = np.array(y_true)
y_pred = np.array(y_pred)

# Calculate and print metrics
accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred, average='macro')
recall = recall_score(y_true, y_pred, average='macro')
f1 = f1_score(y_true, y_pred, average='macro')

print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")

# Classification report
class_names = list(activity_map_AUC.values())
report = classification_report(y_true, y_pred, target_names=class_names, output_dict=True)
report_df = pd.DataFrame(report).transpose()
report_df_filtered = report_df.drop('accuracy', errors='ignore')
print(tabulate(report_df_filtered, headers='keys', tablefmt='fancy_grid'))

# Confusion matrix
conf_matrix = confusion_matrix(y_true, y_pred)
ConfusionMatrixDisplay(confusion_matrix=conf_matrix, display_labels=class_names).plot(cmap='OrRd')
plt.title('Confusion Matrix')
plt.show()


### **MODEL DD+FER**

In [None]:
# Example batch input for Driver Distraction (DD) task
inputs = torch.randn(8, 3, 299, 299).to(device)  # Random images for DD task

# Example of extracted FER features
fer_features = torch.randn(8, 512).to(device)  # Randomly generated FER features


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
batch_size = 8
fer_feature_size = 512
num_classes = 10
# --- Model Definition for DD + FER (Driver Distraction + Facial Emotion Recognition) Task ---
class InceptionV3BiLSTM_FER(nn.Module):
    def __init__(self, num_classes=10, fer_feature_size=512):
        super(InceptionV3BiLSTM_FER, self).__init__()
        self.base_model = inception_v3(weights='IMAGENET1K_V1', aux_logits=True)
        self.base_model.aux_logits = False
        self.base_model.fc = nn.Identity()  # Remove the final fully connected layer
        self.bilstm = nn.LSTM(input_size=2048, hidden_size=128, num_layers=1, batch_first=True, bidirectional=True)
        self.dropout = nn.Dropout(0.2)

        # Additional layers for FER features
        self.fc_fer = nn.Linear(fer_feature_size, 128)
        self.fc_combined = nn.Linear(256 + 128, num_classes)  # Fully connected layer for classification

    def forward(self, x, fer_features):
        # Extract features from base model (DD task)
        for name, module in self.base_model.named_children():
            if name == 'AuxLogits' and not self.base_model.aux_logits:
                continue  # Skip auxiliary logits if aux_logits is False
            x = module(x)
            if name == 'Mixed_7c':  # Extract feature map after Mixed_7c
                break

        x = x.view(x.size(0), -1, 2048)  # Reshape for BiLSTM (batch_size, 64, 2048)
        x, _ = self.bilstm(x)  # Pass through BiLSTM
        x = x[:, -1, :]  # Take output of the last time step

        # Process FER features
        fer_x = torch.relu(self.fc_fer(fer_features))

        # Combine DD and FER features
        combined_x = torch.cat((x, fer_x), dim=1)
        combined_x = self.fc_combined(combined_x)
        return combined_x

# Instantiate and move the model to the GPU
model_dd_fer = InceptionV3BiLSTM_FER(num_classes=len(activity_map_AUC), fer_feature_size=train_fer_features.shape[1])
model_dd_fer = model_dd_fer.to(device)

# Define loss function and optimizer
criterion_dd_fer = nn.CrossEntropyLoss()
optimizer_dd_fer = optim.Adam(model_dd_fer.parameters(), lr=0.00001, weight_decay=0.01)

# Define inputs (DD and FER features)
inputs = torch.randn(batch_size, 3, 299, 299).to(device)  # Example DD input
fer_features = torch.randn(batch_size, fer_feature_size).to(device)  # Example FER features

# Handle missing FER features
if fer_features is None:
    fer_features = torch.zeros(batch_size, fer_feature_size).to(device)

# Forward pass with DD and FER features
combined_output = model_dd_fer(inputs, fer_features)

# Compute loss (example labels for classification task)
labels = torch.randint(0, num_classes, (batch_size,)).to(device)
loss = criterion_dd_fer(combined_output, labels)

# Backward pass and optimization
loss.backward()
optimizer_dd_fer.step()


In [None]:
# --- Training and Validation Loop for DD + FER Model ---
num_epochs = 60
best_val_loss = float('inf')
early_stopping_patience = 10
early_stopping_counter = 0
best_model_wts_dd_fer = copy.deepcopy(model_dd_fer.state_dict())

for epoch in range(num_epochs):
    model_dd_fer.train()
    running_loss, correct, total = 0.0, 0, 0
    train_progress = tqdm(enumerate(train_loader), total=len(train_loader), desc=f"Epoch {epoch+1}/{num_epochs}")

    for batch_idx, (data, target) in train_progress:
        data, target = data.to(device), target.to(device)
        optimizer_dd_fer.zero_grad()

        # Extract FER features for the current batch
        batch_indices = list(range(batch_idx * train_loader.batch_size, min((batch_idx + 1) * train_loader.batch_size, len(train_fer_features))))
        fer_batch_features = train_fer_features[batch_indices]

        # Handle missing FER features by replacing with zero vectors
        zero_vector = np.zeros(fer_batch_features.shape[1])
        fer_batch_features = np.array([fer_batch_features[i] if train_face_detected[batch_indices[i]] == 1 else zero_vector for i in range(len(batch_indices))])

        # Convert FER features to tensor and send to device
        fer_batch_features = torch.tensor(fer_batch_features, dtype=torch.float32).to(device)

        # Debug: Print shapes
        print(f"Batch {batch_idx+1}: Data shape {data.shape}, FER features shape {fer_batch_features.shape}")

        # Check for mismatched batch sizes and skip if necessary
        if len(fer_batch_features) != data.size(0):
            print(f"Skipping batch {batch_idx+1} due to mismatched FER features.")
            continue

        # Forward pass
        outputs = model_dd_fer(data, fer_batch_features)
        loss = criterion_dd_fer(outputs, target)
        loss.backward()
        optimizer_dd_fer.step()

        running_loss += loss.item()
        _, predicted = torch.max(outputs.data, 1)
        total += target.size(0)
        correct += (predicted == target).sum().item()
        train_progress.set_postfix(loss=running_loss/(batch_idx+1), accuracy=correct/total)


    # Validation phase
    model_dd_fer.eval()
    val_loss, val_correct, val_total = 0.0, 0, 0
    with torch.no_grad():
        for batch_idx, (data, target) in enumerate(val_loader):
            data, target = data.to(device), target.to(device)

            # Extract FER features for the current batch
            batch_indices = list(range(batch_idx * val_loader.batch_size, min((batch_idx + 1) * val_loader.batch_size, len(val_fer_features))))
            fer_batch_features = val_fer_features[batch_indices]

            # Handle missing FER features by replacing with zero vectors
            fer_batch_features = np.array([fer_batch_features[i] if val_face_detected[batch_indices[i]] == 1 else zero_vector for i in range(len(batch_indices))])

            # Convert FER features to tensor and send to device
            fer_batch_features = torch.tensor(fer_batch_features, dtype=torch.float32).to(device)

            # Ensure batch sizes match
            if len(fer_batch_features) != data.size(0):
                print(f"Skipping validation batch {batch_idx+1} due to mismatched FER features.")
                continue

            outputs = model_dd_fer(data, fer_batch_features)
            loss = criterion_dd_fer(outputs, target)
            val_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            val_total += target.size(0)
            val_correct += (predicted == target).sum().item()

    epoch_loss = running_loss / len(train_loader)
    epoch_acc = correct / total
    epoch_val_loss = val_loss / len(val_loader)
    epoch_val_acc = val_correct / val_total

    print(f"Epoch [{epoch+1}/{num_epochs}] Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f} Val Loss: {epoch_val_loss:.4f} Val Acc: {epoch_val_acc:.4f}")

    # Early stopping
    if epoch_val_loss < best_val_loss:
        best_val_loss = epoch_val_loss
        best_model_wts_dd_fer = copy.deepcopy(model_dd_fer.state_dict())
        early_stopping_counter = 0
    else:
        early_stopping_counter += 1
        if early_stopping_counter >= early_stopping_patience:
            print("Early stopping triggered.")
            break

# Restore best model weights
model_dd_fer.load_state_dict(best_model_wts_dd_fer)


In [None]:
# --- Evaluation and Reporting for DD + FER ---
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report, confusion_matrix, ConfusionMatrixDisplay
from tabulate import tabulate
import pandas as pd


# Set the model to evaluation mode
model_dd_fer.eval()

# Initialize lists to store true labels and predictions
y_true = []
y_pred = []

# Disable gradient computation for inference
with torch.no_grad():
    for data, target in tqdm(test_loader, desc="Evaluating"):
        data, target = data.to(device), target.to(device)
        # Get FER features for the batch
        fer_batch_features = torch.tensor(test_fer_features[:len(data)]).to(device)
        outputs = model_dd_fer(data, fer_batch_features)
        _, predicted = torch.max(outputs.data, 1)
        y_true.extend(target.cpu().numpy())
        y_pred.extend(predicted.cpu().numpy())

# Convert lists to numpy arrays for metric calculation
y_true = np.array(y_true)
y_pred = np.array(y_pred)

# Calculate evaluation metrics
accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred, average='macro')
recall = recall_score(y_true, y_pred, average='macro')
f1 = f1_score(y_true, y_pred, average='macro')

# Print metrics
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")

# Generate classification report
class_names = list(activity_map_AUC.values())
report = classification_report(y_true, y_pred, target_names=class_names, output_dict=True)
report_df = pd.DataFrame(report).transpose()
report_df_filtered = report_df.drop('accuracy', errors='ignore')
print(tabulate(report_df_filtered, headers='keys', tablefmt='fancy_grid'))

# Confusion matrix
conf_matrix = confusion_matrix(y_true, y_pred)

# Plot confusion matrix
display_c_m = ConfusionMatrixDisplay(confusion_matrix=conf_matrix, display_labels=class_names)
plt.figure(figsize=(10, 9))
display_c_m.plot(cmap='OrRd', xticks_rotation=25)
plt.xticks(fontsize=10)
plt.yticks(fontsize=10)
plt.title('Confusion Matrix - DD + FER', fontsize=24)
plt.show()


### **DD+FER.Attention Fusion**

In [None]:
# --- Model Definition for DD + FER Attention ---
class InceptionV3BiLSTM_FER_Attention(nn.Module):
    def __init__(self, num_classes=10, fer_feature_size=512):
        super(InceptionV3BiLSTM_FER_Attention, self).__init__()
        self.base_model = inception_v3(weights='IMAGENET1K_V1', aux_logits=True)
        self.base_model.aux_logits = False
        self.base_model.fc = nn.Identity()  # Remove the final fully connected layer
        self.bilstm = nn.LSTM(input_size=2048, hidden_size=128, num_layers=1, batch_first=True, bidirectional=True)
        self.attention = nn.Linear(256, 1)  # Attention mechanism on CNN+LSTM features
        self.fc_fer = nn.Linear(fer_feature_size, 128)
        self.fc_combined = nn.Linear(256 + 128, 128)  # Fully connected layer for combined features
        self.attention_combined = nn.Linear(128, 1)  # Attention mechanism after combining FER features
        self.fc_final = nn.Linear(128, num_classes)

    def forward(self, x, fer_features, fer_flags):
        # Extract features from base model
        for name, module in self.base_model.named_children():
            if name == 'AuxLogits' and not self.base_model.aux_logits:
                continue  # Skip the auxiliary logits if aux_logits is False
            x = module(x)
            if name == 'Mixed_7c':  # Extract feature map after Mixed_7c
                break

        x = x.view(x.size(0), -1, 2048)  # Reshape for BiLSTM (batch_size, sequence_length, input_size)
        lstm_out, _ = self.bilstm(x)  # Pass through BiLSTM

        # Apply attention on CNN+LSTM features
        attn_weights = torch.softmax(self.attention(lstm_out), dim=1)  # Compute attention weights
        attn_applied = torch.sum(attn_weights * lstm_out, dim=1)  # Apply attention weights

        # Process FER features with flagging
        fer_x = torch.zeros(attn_applied.size(0), 128, device=x.device)  # Initialize with zeros
        valid_fer_indices = (fer_flags == 1).nonzero(as_tuple=True)[0]  # Get indices where FER features are valid
        if valid_fer_indices.numel() > 0:
            valid_fer_x = torch.relu(self.fc_fer(fer_features[valid_fer_indices]))
            fer_x[valid_fer_indices] = valid_fer_x

        # Combine DD and FER features
        combined_x = torch.cat((attn_applied, fer_x), dim=1)
        combined_x = torch.relu(self.fc_combined(combined_x))

        # Apply attention after combining features
        attn_combined_weights = torch.softmax(self.attention_combined(combined_x), dim=1)
        attn_combined_applied = attn_combined_weights * combined_x

        output = self.fc_final(attn_combined_applied)
        return output

# Instantiate and move the model to the GPU
model_attention = InceptionV3BiLSTM_FER_Attention(num_classes=len(activity_map_AUC), fer_feature_size=train_fer_features.shape[1])
model_attention = model_attention.to(device)

# Define loss function and optimizer
criterion_attention = nn.CrossEntropyLoss()
optimizer_attention = optim.Adam(model_attention.parameters(), lr=0.0001)


In [None]:
# --- Training and Validation Loop for DD + FER Attention Model ---
num_epochs = 60
best_val_loss = float('inf')
early_stopping_patience = 10
early_stopping_counter = 0
best_model_wts_attention = copy.deepcopy(model_attention.state_dict())

for epoch in range(num_epochs):
    model_attention.train()
    running_loss, correct, total = 0.0, 0, 0
    train_progress = tqdm(enumerate(train_loader), total=len(train_loader), desc=f"Epoch {epoch+1}/{num_epochs}")

    for batch_idx, (data, target) in train_progress:
        data, target = data.to(device), target.to(device)
        optimizer_attention.zero_grad()

        # Extract FER features for the current batch
        batch_indices = list(range(batch_idx * train_loader.batch_size, min((batch_idx + 1) * train_loader.batch_size, len(train_fer_features))))
        fer_batch_features = train_fer_features[batch_indices]
        fer_flags = train_face_detected[batch_indices]

        # Handle missing FER features by replacing with zero vectors
        zero_vector = np.zeros(fer_batch_features.shape[1])
        fer_batch_features = np.array([fer_batch_features[i] if train_face_detected[batch_indices[i]] == 1 else zero_vector for i in range(len(batch_indices))])

        # Convert FER features and flags to tensors and send to device
        fer_batch_features = torch.tensor(fer_batch_features, dtype=torch.float32).to(device)
        fer_flags = torch.tensor(fer_flags, dtype=torch.float32).to(device)

        # Debug: Print shapes
        print(f"Batch {batch_idx+1}: Data shape {data.shape}, FER features shape {fer_batch_features.shape}")

        # Check for mismatched batch sizes and skip if necessary
        if len(fer_batch_features) != data.size(0):
            print(f"Skipping batch {batch_idx+1} due to mismatched FER features.")
            continue

        # Forward pass
        outputs = model_attention(data, fer_batch_features, fer_flags)
        loss = criterion_attention(outputs, target)
        loss.backward()
        optimizer_attention.step()

        running_loss += loss.item()
        _, predicted = torch.max(outputs.data, 1)
        total += target.size(0)
        correct += (predicted == target).sum().item()
        train_progress.set_postfix(loss=running_loss/(batch_idx+1), accuracy=correct/total)

    # Validation phase
    model_attention.eval()
    val_loss, val_correct, val_total = 0.0, 0, 0
    with torch.no_grad():
        for batch_idx, (data, target) in enumerate(val_loader):
            data, target = data.to(device), target.to(device)

            # Extract FER features for the current batch
            batch_indices = list(range(batch_idx * val_loader.batch_size, min((batch_idx + 1) * val_loader.batch_size, len(val_fer_features))))
            fer_batch_features = val_fer_features[batch_indices]
            fer_flags = val_face_detected[batch_indices]

            # Handle missing FER features by replacing with zero vectors
            fer_batch_features = np.array([fer_batch_features[i] if val_face_detected[batch_indices[i]] == 1 else zero_vector for i in range(len(batch_indices))])

            # Convert FER features and flags to tensors and send to device
            fer_batch_features = torch.tensor(fer_batch_features, dtype=torch.float32).to(device)
            fer_flags = torch.tensor(fer_flags, dtype=torch.float32).to(device)

            # Ensure batch sizes match
            if len(fer_batch_features) != data.size(0):
                print(f"Skipping validation batch {batch_idx+1} due to mismatched FER features.")
                continue

            outputs = model_attention(data, fer_batch_features, fer_flags)
            loss = criterion_attention(outputs, target)
            val_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            val_total += target.size(0)
            val_correct += (predicted == target).sum().item()

    epoch_loss = running_loss / len(train_loader)
    epoch_acc = correct / total
    epoch_val_loss = val_loss / len(val_loader)
    epoch_val_acc = val_correct / val_total

    print(f"Epoch [{epoch+1}/{num_epochs}] Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f} Val Loss: {epoch_val_loss:.4f} Val Acc: {epoch_val_acc:.4f}")

    # Early stopping
    if epoch_val_loss < best_val_loss:
        best_val_loss = epoch_val_loss
        best_model_wts_attention = copy.deepcopy(model_attention.state_dict())
        early_stopping_counter = 0
    else:
        early_stopping_counter += 1
        if early_stopping_counter >= early_stopping_patience:
            print("Early stopping triggered.")
            break

# Restore best model weights
model_attention.load_state_dict(best_model_wts_attention)

In [None]:
# --- Evaluation and Reporting for Attention Mechanism ---
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report, confusion_matrix, ConfusionMatrixDisplay
from tabulate import tabulate
import pandas as pd
from tqdm import tqdm

# Set the model to evaluation mode
model_attention.eval()

# Initialize lists to store true labels and predictions
y_true = []
y_pred = []

# Disable gradient computation for inference
with torch.no_grad():
    for batch_idx, (data, target) in tqdm(enumerate(test_loader), desc="Evaluating"):
        data, target = data.to(device), target.to(device)

        # Get FER features and flags for the batch
        fer_batch_features = torch.tensor(test_fer_features[batch_idx * test_loader.batch_size:(batch_idx + 1) * test_loader.batch_size]).to(device)
        fer_flags = torch.tensor(test_face_detected[batch_idx * test_loader.batch_size:(batch_idx + 1) * test_loader.batch_size]).to(device)

        # Ensure batch sizes match
        if len(fer_batch_features) != data.size(0):
            print(f"Skipping test batch {batch_idx+1} due to mismatched FER features.")
            continue

        # Forward pass
        outputs = model_attention(data, fer_batch_features, fer_flags)
        _, predicted = torch.max(outputs.data, 1)
        y_true.extend(target.cpu().numpy())
        y_pred.extend(predicted.cpu().numpy())

# Convert lists to numpy arrays for metric calculation
y_true = np.array(y_true)
y_pred = np.array(y_pred)

# Calculate evaluation metrics
accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred, average='macro')
recall = recall_score(y_true, y_pred, average='macro')
f1 = f1_score(y_true, y_pred, average='macro')

# Print metrics
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")

# Generate classification report
class_names = list(activity_map_AUC.values())
report = classification_report(y_true, y_pred, target_names=class_names, output_dict=True)
report_df = pd.DataFrame(report).transpose()
report_df_filtered = report_df.drop('accuracy', errors='ignore')
print(tabulate(report_df_filtered, headers='keys', tablefmt='fancy_grid'))

# Confusion matrix
conf_matrix = confusion_matrix(y_true, y_pred)

# Plot confusion matrix
display_c_m = ConfusionMatrixDisplay(confusion_matrix=conf_matrix, display_labels=class_names)
plt.figure(figsize=(10, 9))
display_c_m.plot(cmap='OrRd', xticks_rotation=25)
plt.xticks(fontsize=10)
plt.yticks(fontsize=10)
plt.title('Confusion Matrix - DD + FER Attention', fontsize=24)
plt.show()


### **DD+FER.Self-attention Fusion**



In [None]:
class InceptionV3BiLSTM_FER_SelfAttention(nn.Module):
    def __init__(self, num_classes=10, fer_feature_size=512):
        super(InceptionV3BiLSTM_FER_SelfAttention, self).__init__()
        self.base_model = inception_v3(weights='IMAGENET1K_V1', aux_logits=True)
        self.base_model.aux_logits = False
        self.base_model.fc = nn.Identity()  # Remove the final fully connected layer
        self.bilstm = nn.LSTM(input_size=10240, hidden_size=128, num_layers=1, batch_first=True, bidirectional=True)
        self.self_attention = nn.MultiheadAttention(embed_dim=256, num_heads=8)  # Self-Attention mechanism
        self.fc_fer = nn.Linear(fer_feature_size, 128)
        self.fc_combined = nn.Linear(256 + 128, num_classes)  # Fully connected layer for classification

    def forward(self, x, fer_features):
        # Extract features from base model
        for name, module in self.base_model.named_children():
            if name == 'AuxLogits' and not self.base_model.aux_logits:
                continue  # Skip the auxiliary logits if aux_logits is False
            x = module(x)
            if name == 'Mixed_7c':  # Extract feature map after Mixed_7c
                break

        x = x.view(x.size(0), -1, 10240)  # Reshape for BiLSTM (batch_size, sequence_length, input_size)
        lstm_out, _ = self.bilstm(x)  # Pass through BiLSTM

        # Apply self-attention
        lstm_out = lstm_out.transpose(0, 1)  # Transpose for self-attention (sequence_length, batch_size, feature_dim)
        self_attn_out, _ = self.self_attention(lstm_out, lstm_out, lstm_out)  # Self-attention

        # Average pooling after self-attention
        self_attn_out = self_attn_out.mean(dim=0)  # (batch_size, feature_dim)

        # Process FER features
        fer_x = torch.relu(self.fc_fer(fer_features))

        # Combine DD and FER features
        combined_x = torch.cat((self_attn_out, fer_x), dim=1)
        combined_x = self.fc_combined(combined_x)
        return combined_x


In [None]:
def forward(self, x, fer_features):
    # Extract features from base model
    for name, module in self.base_model.named_children():
        if name == 'AuxLogits' and not self.base_model.aux_logits:
            continue  # Skip the auxiliary logits if aux_logits is False
        x = module(x)
        if name == 'Mixed_7c':  # Extract feature map after Mixed_7c
            break

    x = x.view(x.size(0), -1, 2048)  # Reshape for BiLSTM (batch_size, sequence_length, input_size)
    lstm_out, _ = self.bilstm(x)  # Pass through BiLSTM

    # Apply self-attention
    self_attn_out, _ = self.self_attention(lstm_out, lstm_out, lstm_out)

    # Handle missing FER features
    if fer_features.size(0) != x.size(0):
        # Create a zero tensor with the same size as fer_features except for the batch dimension
        missing_fer_features = torch.zeros(x.size(0), fer_features.size(1)).to(x.device)
        # Update the missing FER features for the current batch
        missing_fer_features[:fer_features.size(0)] = fer_features
        fer_features = missing_fer_features

    fer_x = torch.relu(self.fc_fer(fer_features))

    # Combine DD and FER features
    combined_x = torch.cat((self_attn_out, fer_x), dim=1)
    combined_x = self.fc_combined(combined_x)
    return combined_x


In [None]:
# Instantiate and move the model to the GPU
model_self_attention = InceptionV3BiLSTM_FER_SelfAttention(num_classes=len(activity_map_AUC), fer_feature_size=train_fer_features.shape[1])
model_self_attention = model_self_attention.to(device)

# Define loss function and optimizer
criterion_self_attention = nn.CrossEntropyLoss()
optimizer_self_attention = optim.Adam(model_self_attention.parameters(), lr=0.0001)


In [None]:
# --- Training and Validation Loop for DD + FER Self-Attention Model ---
num_epochs = 60
best_val_loss = float('inf')
early_stopping_patience = 10
early_stopping_counter = 0
best_model_wts_self_attention = copy.deepcopy(model_self_attention.state_dict())

for epoch in range(num_epochs):
    model_self_attention.train()
    running_loss, correct, total = 0.0, 0, 0
    train_progress = tqdm(enumerate(train_loader), total=len(train_loader), desc=f"Epoch {epoch+1}/{num_epochs}")

    for batch_idx, (data, target) in train_progress:
        data, target = data.to(device), target.to(device)
        optimizer_self_attention.zero_grad()

        # Extract FER features for the current batch
        batch_indices = list(range(batch_idx * train_loader.batch_size, min((batch_idx + 1) * train_loader.batch_size, len(train_fer_features))))
        fer_batch_features = train_fer_features[batch_indices]

        # Handle missing FER features by replacing with zero vectors
        zero_vector = np.zeros(fer_batch_features.shape[1])
        fer_batch_features = np.array([fer_batch_features[i] if train_face_detected[batch_indices[i]] == 1 else zero_vector for i in range(len(batch_indices))])

        # Convert FER features to tensor and send to device
        fer_batch_features = torch.tensor(fer_batch_features, dtype=torch.float32).to(device)

        # Debug: Print shapes
        print(f"Batch {batch_idx+1}: Data shape {data.shape}, FER features shape {fer_batch_features.shape}")

        # Check for mismatched batch sizes and skip if necessary
        if len(fer_batch_features) != data.size(0):
            print(f"Skipping batch {batch_idx+1} due to mismatched FER features.")
            continue

        # Forward pass
        outputs = model_self_attention(data, fer_batch_features)
        loss = criterion_self_attention(outputs, target)
        loss.backward()
        optimizer_self_attention.step()

        running_loss += loss.item()
        _, predicted = torch.max(outputs.data, 1)
        total += target.size(0)
        correct += (predicted == target).sum().item()
        train_progress.set_postfix(loss=running_loss/(batch_idx+1), accuracy=correct/total)

    # Validation phase
    model_self_attention.eval()
    val_loss, val_correct, val_total = 0.0, 0, 0
    with torch.no_grad():
        for batch_idx, (data, target) in enumerate(val_loader):
            data, target = data.to(device), target.to(device)

            # Extract FER features for the current batch
            batch_indices = list(range(batch_idx * val_loader.batch_size, min((batch_idx + 1) * val_loader.batch_size, len(val_fer_features))))
            fer_batch_features = val_fer_features[batch_indices]

            # Handle missing FER features by replacing with zero vectors
            fer_batch_features = np.array([fer_batch_features[i] if val_face_detected[batch_indices[i]] == 1 else zero_vector for i in range(len(batch_indices))])

            # Convert FER features to tensor and send to device
            fer_batch_features = torch.tensor(fer_batch_features, dtype=torch.float32).to(device)

            # Ensure batch sizes match
            if len(fer_batch_features) != data.size(0):
                print(f"Skipping validation batch {batch_idx+1} due to mismatched FER features.")
                continue

            outputs = model_self_attention(data, fer_batch_features)
            loss = criterion_self_attention(outputs, target)
            val_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            val_total += target.size(0)
            val_correct += (predicted == target).sum().item()

    epoch_loss = running_loss / len(train_loader)
    epoch_acc = correct / total
    epoch_val_loss = val_loss / len(val_loader)
    epoch_val_acc = val_correct / val_total

    print(f"Epoch [{epoch+1}/{num_epochs}] Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f} Val Loss: {epoch_val_loss:.4f} Val Acc: {epoch_val_acc:.4f}")

    # Early stopping
    if epoch_val_loss < best_val_loss:
        best_val_loss = epoch_val_loss
        best_model_wts_self_attention = copy.deepcopy(model_self_attention.state_dict())
        early_stopping_counter = 0
    else:
        early_stopping_counter += 1
        if early_stopping_counter >= early_stopping_patience:
            print("Early stopping triggered.")
            break

# Restore best model weights
model_self_attention.load_state_dict(best_model_wts_self_attention)


In [None]:
# --- Evaluation and Reporting for Self-Attention Mechanism ---
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report, confusion_matrix, ConfusionMatrixDisplay
from tabulate import tabulate
import pandas as pd

# Set the model to evaluation mode
model_self_attention.eval()

# Initialize lists to store true labels and predictions
y_true = []
y_pred = []

# Disable gradient computation for inference
with torch.no_grad():
    for data, target in tqdm(test_loader, desc="Evaluating"):
        data, target = data.to(device), target.to(device)
        # Get FER features for the batch
        fer_batch_features = torch.tensor(test_fer_features[:len(data)]).to(device)
        outputs = model_self_attention(data, fer_batch_features)
        _, predicted = torch.max(outputs.data, 1)
        y_true.extend(target.cpu().numpy())
        y_pred.extend(predicted.cpu().numpy())

# Convert lists to numpy arrays for metric calculation
y_true = np.array(y_true)
y_pred = np.array(y_pred)

# Calculate evaluation metrics
accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred, average='macro')
recall = recall_score(y_true, y_pred, average='macro')
f1 = f1_score(y_true, y_pred, average='macro')

# Print metrics
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")

# Generate classification report
class_names = list(activity_map_AUC.values())
report = classification_report(y_true, y_pred, target_names=class_names, output_dict=True)
report_df = pd.DataFrame(report).transpose()
report_df_filtered = report_df.drop('accuracy', errors='ignore')
print(tabulate(report_df_filtered, headers='keys', tablefmt='fancy_grid'))

# Confusion matrix
conf_matrix = confusion_matrix(y_true, y_pred)

# Plot confusion matrix
display_c_m = ConfusionMatrixDisplay(confusion_matrix=conf_matrix, display_labels=class_names)
plt.figure(figsize=(10, 9))
display_c_m.plot(cmap='OrRd', xticks_rotation=25)
plt.xticks(fontsize=10)
plt.yticks(fontsize=10)
plt.title('Confusion Matrix - Self-Attention Model', fontsize=24)
plt.show()


### **DD+FER.SA2**

In [None]:
import torch
import torch.nn as nn
from torchvision.models import inception_v3

class InceptionV3SelfAttentionFusion(nn.Module):
    def __init__(self, num_classes=10, fer_feature_size=512):
        super(InceptionV3SelfAttentionFusion, self).__init__()
        self.base_model = inception_v3(weights='IMAGENET1K_V1', aux_logits=True)
        self.base_model.aux_logits = False
        self.base_model.fc = nn.Identity()  # Remove the final fully connected layer

        # Linear layer to project FER features into the same dimension as the DD features
        self.fc_fer = nn.Linear(fer_feature_size, 10240)

        # Self-attention layer
        self.self_attention = nn.MultiheadAttention(embed_dim=10240, num_heads=8)

        # Final classification layer
        self.fc_combined = nn.Linear(10240, num_classes)

    def forward(self, x, fer_features):
        # Extract DD features from the base model
        for name, module in self.base_model.named_children():
            if name == 'AuxLogits' and not self.base_model.aux_logits:
                continue  # Skip the auxiliary logits if aux_logits is False
            x = module(x)
            if name == 'Mixed_7c':  # Extract feature map after Mixed_7c
                break

        # Flatten the spatial dimensions of DD features
        x = x.view(x.size(0), -1, 10240)  # Reshape for self-attention (batch_size, sequence_length, feature_dim)

        # Project FER features into the same dimension as DD features and add them to the sequence
        fer_features = self.fc_fer(fer_features).unsqueeze(1)  # (batch_size, 1, 10240)
        combined_features = torch.cat((x, fer_features), dim=1)  # (batch_size, sequence_length + 1, 10240)

        # Apply self-attention to the combined sequence
        combined_features = combined_features.transpose(0, 1)  # Transpose for self-attention (sequence_length, batch_size, feature_dim)
        attn_output, _ = self.self_attention(combined_features, combined_features, combined_features)
        attn_output = attn_output.transpose(0, 1)  # (batch_size, sequence_length + 1, 10240)

        # Pooling to aggregate the sequence into a single vector
        pooled_output = attn_output.mean(dim=1)  # Mean pooling over the sequence

        # Final classification
        output = self.fc_combined(pooled_output)

        return output


In [None]:
import torch.optim as optim
import torch.nn as nn

# Define the loss function (criterion)
criterion = nn.CrossEntropyLoss()

# Define the optimizer (you can choose other optimizers like AdamW or SGD if preferred)
optimizer = optim.Adam(model_dd.parameters(), lr=0.0001)


In [None]:
from tqdm import tqdm
import copy

# --- Training and Validation Loop ---
num_epochs = 60
best_val_loss = float('inf')
early_stopping_patience = 10
early_stopping_counter = 0
best_model_wts_dd = copy.deepcopy(model_dd.state_dict())

for epoch in range(num_epochs):
    model_dd.train()
    running_loss, correct, total = 0.0, 0, 0
    train_progress = tqdm(enumerate(train_loader), total=len(train_loader), desc=f"Epoch {epoch+1}/{num_epochs}")

    for batch_idx, (data, target) in train_progress:
        data, target = data.to(device), target.to(device)
        optimizer_dd.zero_grad()
        outputs = model_dd(data, fer_features)  # Assuming FER features are provided in your DataLoader
        loss = criterion(outputs, target)
        loss.backward()
        optimizer_dd.step()

        running_loss += loss.item()
        _, predicted = torch.max(outputs.data, 1)
        total += target.size(0)
        correct += (predicted == target).sum().item()
        train_progress.set_postfix(loss=running_loss/(batch_idx+1), accuracy=correct/total)

    # Validation phase
    model_dd.eval()
    val_loss, val_correct, val_total = 0.0, 0, 0
    with torch.no_grad():
        for data, target in val_loader:
            data, target = data.to(device), target.to(device)
            outputs = model_dd(data, fer_features)  # Assuming FER features are provided in your DataLoader
            loss = criterion(outputs, target)
            val_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            val_total += target.size(0)
            val_correct += (predicted == target).sum().item()

    epoch_loss = running_loss / len(train_loader)
    epoch_acc = correct / total
    epoch_val_loss = val_loss / len(val_loader)
    epoch_val_acc = val_correct / val_total

    print(f"Epoch [{epoch+1}/{num_epochs}] "
          f"Loss: {epoch_loss:.4f} "
          f"Acc: {epoch_acc:.4f} "
          f"Val Loss: {epoch_val_loss:.4f} "
          f"Val Acc: {epoch_val_acc:.4f}")

    # Early stopping with restore best weights check
    if epoch_val_loss < best_val_loss:
        best_val_loss = epoch_val_loss
        best_model_wts_dd = copy.deepcopy(model_dd.state_dict())
        early_stopping_counter = 0
    else:
        early_stopping_counter += 1

    if early_stopping_counter >= early_stopping_patience:
        print(f"Early stopping at epoch {epoch+1}")
        break

# Load the best model weights
model_dd.load_state_dict(best_model_wts_dd)


### **DD+FER.A+SA**

In [None]:
class InceptionV3BiLSTM_FER_BothAttention(nn.Module):
    def __init__(self, num_classes=10, fer_feature_size=512):
        super(InceptionV3BiLSTM_FER_BothAttention, self).__init__()
        self.base_model = inception_v3(weights='IMAGENET1K_V1', aux_logits=True)
        self.base_model.aux_logits = False
        self.base_model.fc = nn.Identity()  # Remove the final fully connected layer
        self.bilstm = nn.LSTM(input_size=10240, hidden_size=128, num_layers=1, batch_first=True, bidirectional=True)
        self.attention = nn.Linear(256, 1)  # Attention mechanism on CNN+LSTM features
        self.fc_fer = nn.Linear(fer_feature_size, 128)
        self.fc_combined = nn.Linear(256 + 128, 256)  # Fully connected layer for combined features
        self.self_attention = nn.MultiheadAttention(embed_dim=256, num_heads=8)  # Self-Attention mechanism after combining features
        self.fc_final = nn.Linear(256, num_classes)  # Final classification layer

    def forward(self, x, fer_features, fer_flags):
        # Extract features from base model
        for name, module in self.base_model.named_children():
            if name == 'AuxLogits' and not self.base_model.aux_logits:
                continue  # Skip the auxiliary logits if aux_logits is False
            x = module(x)
            if name == 'Mixed_7c':  # Extract feature map after Mixed_7c
                break

        x = x.view(x.size(0), -1, 10240)  # Reshape for BiLSTM (batch_size, sequence_length, input_size)
        lstm_out, _ = self.bilstm(x)  # Pass through BiLSTM

        # Apply attention on CNN+LSTM features
        attn_weights = torch.softmax(self.attention(lstm_out), dim=1)  # Compute attention weights
        attn_applied = torch.sum(attn_weights * lstm_out, dim=1)  # Apply attention weights

        # Process FER features with flagging
        fer_x = torch.zeros(attn_applied.size(0), 128, device=x.device)  # Initialize with zeros
        valid_fer_indices = (fer_flags == 1).nonzero(as_tuple=True)[0]  # Get indices where FER features are valid
        if valid_fer_indices.numel() > 0:
            valid_fer_x = torch.relu(self.fc_fer(fer_features[valid_fer_indices]))
            fer_x[valid_fer_indices] = valid_fer_x

        # Combine DD and FER features
        combined_x = torch.cat((attn_applied, fer_x), dim=1)
        combined_x = torch.relu(self.fc_combined(combined_x))

        # Apply self-attention after combining features
        combined_x = combined_x.unsqueeze(0)  # Add sequence dimension for self-attention
        self_attn_out, _ = self.self_attention(combined_x, combined_x, combined_x)
        self_attn_out = self_attn_out.squeeze(0)  # Remove sequence dimension

        output = self.fc_final(self_attn_out)
        return output


In [None]:
# Instantiate and move the model to the GPU
model_both_attention = InceptionV3BiLSTM_FER_BothAttention(num_classes=len(activity_map_AUC), fer_feature_size=train_fer_features.shape[1])
model_both_attention = model_both_attention.to(device)

# Define loss function and optimizer
criterion_both_attention = nn.CrossEntropyLoss()
optimizer_both_attention = optim.Adam(model_both_attention.parameters(), lr=0.0001)


In [None]:
# --- Training and Validation Loop for DD + FER Both Attention and Self-Attention Model ---
num_epochs = 60
best_val_loss = float('inf')
early_stopping_patience = 10
early_stopping_counter = 0
best_model_wts_both_attention = copy.deepcopy(model_both_attention.state_dict())

for epoch in range(num_epochs):
    model_both_attention.train()
    running_loss, correct, total = 0.0, 0, 0
    train_progress = tqdm(enumerate(train_loader), total=len(train_loader), desc=f"Epoch {epoch+1}/{num_epochs}")

    for batch_idx, (data, target) in train_progress:
        data, target = data.to(device), target.to(device)
        optimizer_both_attention.zero_grad()

        # Extract FER features for the current batch
        batch_indices = list(range(batch_idx * train_loader.batch_size, min((batch_idx + 1) * train_loader.batch_size, len(train_fer_features))))
        fer_batch_features = train_fer_features[batch_indices]
        fer_flags = train_face_detected[batch_indices]

        # Handle missing FER features by replacing with zero vectors
        zero_vector = np.zeros(fer_batch_features.shape[1])
        fer_batch_features = np.array([fer_batch_features[i] if train_face_detected[batch_indices[i]] == 1 else zero_vector for i in range(len(batch_indices))])

        # Convert FER features and flags to tensors and send to device
        fer_batch_features = torch.tensor(fer_batch_features, dtype=torch.float32).to(device)
        fer_flags = torch.tensor(fer_flags, dtype=torch.float32).to(device)

        # Debug: Print shapes
        print(f"Batch {batch_idx+1}: Data shape {data.shape}, FER features shape {fer_batch_features.shape}")

        # Check for mismatched batch sizes and skip if necessary
        if len(fer_batch_features) != data.size(0):
            print(f"Skipping batch {batch_idx+1} due to mismatched FER features.")
            continue

        # Forward pass
        outputs = model_both_attention(data, fer_batch_features, fer_flags)
        loss = criterion_both_attention(outputs, target)
        loss.backward()
        optimizer_both_attention.step()

        running_loss += loss.item()
        _, predicted = torch.max(outputs.data, 1)
        total += target.size(0)
        correct += (predicted == target).sum().item()
        train_progress.set_postfix(loss=running_loss/(batch_idx+1), accuracy=correct/total)

    # Validation phase
    model_both_attention.eval()
    val_loss, val_correct, val_total = 0.0, 0, 0
    with torch.no_grad():
        for batch_idx, (data, target) in enumerate(val_loader):
            data, target = data.to(device), target.to(device)

            # Extract FER features for the current batch
            batch_indices = list(range(batch_idx * val_loader.batch_size, min((batch_idx + 1) * val_loader.batch_size, len(val_fer_features))))
            fer_batch_features = val_fer_features[batch_indices]
            fer_flags = val_face_detected[batch_indices]

            # Handle missing FER features by replacing with zero vectors
            fer_batch_features = np.array([fer_batch_features[i] if val_face_detected[batch_indices[i]] == 1 else zero_vector for i in range(len(batch_indices))])

            # Convert FER features and flags to tensors and send to device
            fer_batch_features = torch.tensor(fer_batch_features, dtype=torch.float32).to(device)
            fer_flags = torch.tensor(fer_flags, dtype=torch.float32).to(device)

            # Ensure batch sizes match
            if len(fer_batch_features) != data.size(0):
                print(f"Skipping validation batch {batch_idx+1} due to mismatched FER features.")
                continue

            outputs = model_both_attention(data, fer_batch_features, fer_flags)
            loss = criterion_both_attention(outputs, target)
            val_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            val_total += target.size(0)
            val_correct += (predicted == target).sum().item()

    epoch_loss = running_loss / len(train_loader)
    epoch_acc = correct / total
    epoch_val_loss = val_loss / len(val_loader)
    epoch_val_acc = val_correct / val_total

    print(f"Epoch [{epoch+1}/{num_epochs}] Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f} Val Loss: {epoch_val_loss:.4f} Val Acc: {epoch_val_acc:.4f}")

    # Early stopping
    if epoch_val_loss < best_val_loss:
        best_val_loss = epoch_val_loss
        best_model_wts_both_attention = copy.deepcopy(model_both_attention.state_dict())
        early_stopping_counter = 0
    else:
        early_stopping_counter += 1
        if early_stopping_counter >= early_stopping_patience:
            print("Early stopping triggered.")
            break

# Restore best model weights
model_both_attention.load_state_dict(best_model_wts_both_attention)


In [None]:
# --- Evaluation and Reporting for Both Attention and Self-Attention Mechanism ---
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report, confusion_matrix, ConfusionMatrixDisplay
from tabulate import tabulate
import pandas as pd
from tqdm import tqdm

# Set the model to evaluation mode
model_both_attention.eval()

# Initialize lists to store true labels and predictions
y_true = []
y_pred = []

# Disable gradient computation for inference
with torch.no_grad():
    for batch_idx, (data, target) in tqdm(enumerate(test_loader), desc="Evaluating"):
        data, target = data.to(device), target.to(device)

        # Get FER features and flags for the batch
        fer_batch_features = torch.tensor(test_fer_features[batch_idx * test_loader.batch_size:(batch_idx + 1) * test_loader.batch_size]).to(device)
        fer_flags = torch.tensor(test_face_detected[batch_idx * test_loader.batch_size:(batch_idx + 1) * test_loader.batch_size]).to(device)

        # Ensure batch sizes match
        if len(fer_batch_features) != data.size(0):
            print(f"Skipping test batch {batch_idx+1} due to mismatched FER features.")
            continue

        # Forward pass
        outputs = model_both_attention(data, fer_batch_features, fer_flags)
        _, predicted = torch.max(outputs.data, 1)
        y_true.extend(target.cpu().numpy())
        y_pred.extend(predicted.cpu().numpy())

# Convert lists to numpy arrays for metric calculation
y_true = np.array(y_true)
y_pred = np.array(y_pred)

# Calculate evaluation metrics
accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred, average='macro')
recall = recall_score(y_true, y_pred, average='macro')
f1 = f1_score(y_true, y_pred, average='macro')

# Print metrics
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")

# Generate classification report
class_names = list(activity_map_AUC.values())
report = classification_report(y_true, y_pred, target_names=class_names, output_dict=True)
report_df = pd.DataFrame(report).transpose()
report_df_filtered = report_df.drop('accuracy', errors='ignore')
print(tabulate(report_df_filtered, headers='keys', tablefmt='fancy_grid'))

# Confusion matrix
conf_matrix = confusion_matrix(y_true, y_pred)

# Plot confusion matrix
display_c_m = ConfusionMatrixDisplay(confusion_matrix=conf_matrix, display_labels=class_names)
plt.figure(figsize=(10, 9))
display_c_m.plot(cmap='OrRd', xticks_rotation=25)
plt.xticks(fontsize=10)
plt.yticks(fontsize=10)
plt.title('Confusion Matrix - DD + FER Both Attention and Self-Attention', fontsize=24)
plt.show()


### **DD+POSE**

In [None]:
import torch
import torch.nn as nn
import torchvision.models as models

class InceptionV3BiLSTMWithPose(nn.Module):
    def __init__(self, num_classes=10):
        super(InceptionV3BiLSTMWithPose, self).__init__()
        # Access Inception_V3_Weights using torchvision.models
        self.base_model = models.inception_v3(weights=models.Inception_V3_Weights.IMAGENET1K_V1, aux_logits=True)
        # Set aux_logits to False to make all layers trainable
        self.base_model.aux_logits = False
        # Remove the final fully connected layer
        self.base_model.fc = nn.Identity()

        # Define the BiLSTM with input size matching InceptionV3 output size
        self.bilstm = nn.LSTM(input_size=2048, hidden_size=128, num_layers=1, batch_first=True, bidirectional=True)

        # Linear layer to process pose landmarks
        self.pose_fc = nn.Linear(23 * 3, 128)  # 23 landmarks * 3 features each (x, y, z)

        # Combine features from BiLSTM and pose
        self.fc_combined = nn.Linear(256 + 128, num_classes)  # 256 from BiLSTM + 128 from pose landmarks

    def forward(self, x, pose_landmarks):
        # Extract features from base model
        for name, module in self.base_model.named_children():
            if name == 'AuxLogits' and not self.base_model.aux_logits:
                continue  # Skip the auxiliary logits if aux_logits is False
            x = module(x)
            if name == 'Mixed_7c':  # Extract feature map after Mixed_7c
                break

        # Flatten the feature map to (batch_size, num_features)
        batch_size = x.size(0)
        x = x.view(batch_size, -1, 2048)  # Reshape for LSTM (batch_size, seq_len=1, input_size=2048)

        # BiLSTM expects input shape (batch_size, seq_len, input_size)
        x, _ = self.bilstm(x)
        x = x[:, -1, :]  # Use the last hidden state

        pose_features = torch.relu(self.pose_fc(pose_landmarks))
        combined_features = torch.cat((x, pose_features), dim=1)

        output = self.fc_combined(combined_features)
        return output

# Instantiate and move the model to the device
model = InceptionV3BiLSTMWithPose(num_classes=10)
model.to(device)


criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)


In [None]:
# --- Training and Validation Loop ---
best_model_wts = copy.deepcopy(model.state_dict())
best_val_loss = float('inf')
early_stopping_counter = 0
early_stopping_patience = 5
num_epochs = 60

for epoch in range(num_epochs):
    model.train()
    running_loss, correct, total = 0.0, 0, 0
    train_progress = tqdm(enumerate(train_loader), total=len(train_loader), desc=f"Epoch {epoch+1}/{num_epochs}")

    for batch_idx, (data, target) in train_progress:
        data, target = data.to(device), target.to(device)
        pose_data = train_pose_features[batch_idx*BATCH_SIZE:(batch_idx+1)*BATCH_SIZE]
        pose_data = torch.tensor(pose_data, dtype=torch.float32).to(device)  # Ensure pose_data is float32

        optimizer.zero_grad()
        outputs = model(data, pose_data)
        loss = criterion(outputs, target)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        _, predicted = torch.max(outputs.data, 1)
        total += target.size(0)
        correct += (predicted == target).sum().item()
        train_progress.set_postfix(loss=running_loss/(batch_idx+1), accuracy=correct/total)

    # Validation phase
    model.eval()
    val_loss, val_correct, val_total = 0.0, 0, 0
    with torch.no_grad():
        for batch_idx, (data, target) in enumerate(val_loader):
            data, target = data.to(device), target.to(device)
            pose_data = val_pose_features[batch_idx*BATCH_SIZE:(batch_idx+1)*BATCH_SIZE]
            pose_data = torch.tensor(pose_data, dtype=torch.float32).to(device)  # Ensure pose_data is float32

            outputs = model(data, pose_data)
            loss = criterion(outputs, target)
            val_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            val_total += target.size(0)
            val_correct += (predicted == target).sum().item()

    epoch_loss = running_loss / len(train_loader)
    epoch_acc = correct / total
    epoch_val_loss = val_loss / len(val_loader)
    epoch_val_acc = val_correct / val_total

    print(f"Epoch [{epoch+1}/{num_epochs}] Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f} Val Loss: {epoch_val_loss:.4f} Val Acc: {epoch_val_acc:.4f}")

    # Early stopping
    if epoch_val_loss < best_val_loss:
        best_val_loss = epoch_val_loss
        best_model_wts = copy.deepcopy(model.state_dict())
        early_stopping_counter = 0
    else:
        early_stopping_counter += 1
        if early_stopping_counter >= early_stopping_patience:
            print("Early stopping triggered.")
            break

# Restore best model weights
model.load_state_dict(best_model_wts)


In [None]:
import seaborn as sns
def predict(model, test_loader, test_pose_features):
    model.eval()
    all_predictions = []
    all_labels = []

    with torch.no_grad():
        for batch_idx, (data, target) in enumerate(test_loader):
            data, target = data.to(device), target.to(device)
            # Extract pose landmarks for the current batch
            pose_landmarks = test_pose_features[batch_idx * BATCH_SIZE:(batch_idx + 1) * BATCH_SIZE]
            pose_landmarks = torch.tensor(pose_landmarks, dtype=torch.float32).to(device)

            outputs = model(data, pose_landmarks)
            _, predicted = torch.max(outputs.data, 1)
            all_predictions.extend(predicted.cpu().numpy())
            all_labels.extend(target.cpu().numpy())

    return all_predictions, all_labels

# Assuming 'test_pose_features' is available, similar to train and val pose features
predictions, labels = predict(model, test_loader, test_pose_features)

# Calculate accuracy
accuracy = accuracy_score(labels, predictions)
print(f"Accuracy: {accuracy:.4f}")


# Generate classification report
report = classification_report(labels, predictions)
print(report)

# Generate confusion matrix
cm = confusion_matrix(labels, predictions)

# Plot confusion matrix
plt.figure(figsize=(10, 7))
sns.heatmap(cm, annot=True, fmt="d")
plt.xlabel("Predicted")
plt.ylabel("True Label")
plt.show()

In [None]:

# Evaluation and Reporting
model.eval()
y_true = []
y_pred = []



with torch.no_grad():
    for batch_idx, (data, target) in enumerate(test_loader):
        data, target = data.to(device), target.to(device)
        pose_data = test_pose_features[batch_idx * BATCH_SIZE:(batch_idx + 1) * BATCH_SIZE]
        pose_data = torch.tensor(pose_data, dtype=torch.float32).to(device)

        # Print shapes for debugging
        print(f"Data shape: {data.shape}, Pose data shape: {pose_data.shape}")

        outputs = model_dd_pose(data, pose_data)
        _, predicted = torch.max(outputs.data, 1)
        y_true.extend(target.cpu().numpy())
        y_pred.extend(predicted.cpu().numpy())

        # Print output shape
        print(f"Outputs shape: {outputs.shape}")

y_true = np.array(y_true)
y_pred = np.array(y_pred)

# Calculate evaluation metrics
accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred, average='macro')
recall = recall_score(y_true, y_pred, average='macro')
f1 = f1_score(y_true, y_pred, average='macro')

print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")

# Generate classification report
report = classification_report(y_true, y_pred, target_names=class_names, output_dict=True)
report_df = pd.DataFrame(report).transpose()
report_df_filtered = report_df.drop('accuracy', errors='ignore')
print(tabulate(report_df_filtered, headers='keys', tablefmt='fancy_grid'))

# Confusion matrix
conf_matrix = confusion_matrix(y_true, y_pred)
ConfusionMatrixDisplay(confusion_matrix=conf_matrix, display_labels=class_names).plot(cmap='OrRd', xticks_rotation=25)
plt.title('Confusion Matrix - DD + Pose', fontsize=24)
plt.show()


### **DD+FER+Pose**

In [None]:
import torch
import torch.nn as nn
import torchvision.models as models

class InceptionV3BiLSTM_FER_Pose(nn.Module):
    def __init__(self, num_classes=10, fer_feature_size=512):
        super(InceptionV3BiLSTM_FER_Pose, self).__init__()
        self.base_model = models.inception_v3(weights=models.Inception_V3_Weights.IMAGENET1K_V1, aux_logits=True)
        self.base_model.aux_logits = False
        self.base_model.fc = nn.Identity()

        # BiLSTM for temporal features
        self.bilstm = nn.LSTM(input_size=2048, hidden_size=128, num_layers=1, batch_first=True, bidirectional=True)

        # Fully connected layers for FER and Pose features
        self.fc_fer = nn.Linear(fer_feature_size, 128)
        self.fc_pose = nn.Linear(23 * 4, 128)  # 23 landmarks * 3 features each (x, y, z)

        # Final fully connected layer combining all features
        self.fc_combined = nn.Linear(256 + 128 + 128, num_classes)  # 256 from BiLSTM, 128 from FER, 128 from Pose

    def forward(self, x, fer_features, pose_landmarks):
        # Extract features from the base model (InceptionV3)
        for name, module in self.base_model.named_children():
            if name == 'AuxLogits' and not self.base_model.aux_logits:
                continue
            x = module(x)
            if name == 'Mixed_7c':  # Stop at Mixed_7c layer
                break

        # Flatten and pass through BiLSTM
        x = x.view(x.size(0), -1, 2048)
        x, _ = self.bilstm(x)
        x = x[:, -1, :]  # Last time step

        # Process FER features
        fer_x = torch.relu(self.fc_fer(fer_features))

        # Process Pose features
        pose_x = torch.relu(self.fc_pose(pose_landmarks))

        # Combine all features
        combined_x = torch.cat((x, fer_x, pose_x), dim=1)
        output = self.fc_combined(combined_x)
        return output

# Instantiate and move the model to the device
model = InceptionV3BiLSTM_FER_Pose(num_classes=10, fer_feature_size=train_fer_features.shape[1])
model.to(device)

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)


In [None]:

# --- Training and Validation Loop ---
best_model_wts = copy.deepcopy(model.state_dict())
best_val_loss = float('inf')
early_stopping_counter = 0
early_stopping_patience = 10
num_epochs = 60

for epoch in range(num_epochs):
    model.train()
    running_loss, correct, total = 0.0, 0, 0
    train_progress = tqdm(enumerate(train_loader), total=len(train_loader), desc=f"Epoch {epoch+1}/{num_epochs}")

    for batch_idx, (data, target) in train_progress:
        data, target = data.to(device), target.to(device)

        # Handle FER features
        batch_indices = list(range(batch_idx * train_loader.batch_size, min((batch_idx + 1) * train_loader.batch_size, len(train_fer_features))))
        fer_batch_features = train_fer_features[batch_indices]
        zero_vector = np.zeros(fer_batch_features.shape[1])
        fer_batch_features = np.array([
            fer_batch_features[i] if train_face_detected[batch_indices[i]] == 1 else zero_vector
            for i in range(len(batch_indices))
        ])
        fer_batch_features = torch.tensor(fer_batch_features, dtype=torch.float32).to(device)

        # Handle Pose features
        pose_data = train_pose_features[batch_idx*BATCH_SIZE:(batch_idx+1)*BATCH_SIZE]
        pose_data = torch.tensor(pose_data, dtype=torch.float32).to(device)

        # Ensure FER features and data batch sizes match
        if len(fer_batch_features) != data.size(0):
            print(f"Skipping batch {batch_idx+1} due to mismatched FER features. Data shape: {data.shape}, FER shape: {fer_batch_features.shape}")
            continue

        optimizer.zero_grad()
        outputs = model(data, fer_batch_features, pose_data)
        loss = criterion(outputs, target)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        _, predicted = torch.max(outputs.data, 1)
        total += target.size(0)
        correct += (predicted == target).sum().item()
        train_progress.set_postfix(loss=running_loss/(batch_idx+1), accuracy=correct/total)

    # Validation phase
    model.eval()
    val_loss, val_correct, val_total = 0.0, 0, 0
    with torch.no_grad():
        for batch_idx, (data, target) in enumerate(val_loader):
            data, target = data.to(device), target.to(device)

            # Handle FER features
            batch_indices = list(range(batch_idx * val_loader.batch_size, min((batch_idx + 1) * val_loader.batch_size, len(val_fer_features))))
            fer_batch_features = val_fer_features[batch_indices]
            fer_batch_features = np.array([
                fer_batch_features[i] if val_face_detected[batch_indices[i]] == 1 else zero_vector
                for i in range(len(batch_indices))
            ])
            fer_batch_features = torch.tensor(fer_batch_features, dtype=torch.float32).to(device)

            # Handle Pose features
            pose_data = val_pose_features[batch_idx*BATCH_SIZE:(batch_idx+1)*BATCH_SIZE]
            pose_data = torch.tensor(pose_data, dtype=torch.float32).to(device)

            if len(fer_batch_features) != data.size(0):
                print(f"Skipping validation batch {batch_idx+1} due to mismatched FER features. Data shape: {data.shape}, FER shape: {fer_batch_features.shape}")
                continue

            outputs = model(data, fer_batch_features, pose_data)
            loss = criterion(outputs, target)
            val_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            val_total += target.size(0)
            val_correct += (predicted == target).sum().item()

    epoch_loss = running_loss / len(train_loader)
    epoch_acc = correct / total
    epoch_val_loss = val_loss / len(val_loader)
    epoch_val_acc = val_correct / val_total

    print(f"Epoch [{epoch+1}/{num_epochs}] Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f} Val Loss: {epoch_val_loss:.4f} Val Acc: {epoch_val_acc:.4f}")

    # Early stopping
    if epoch_val_loss < best_val_loss:
        best_val_loss = epoch_val_loss
        best_model_wts = copy.deepcopy(model.state_dict())
        early_stopping_counter = 0
    else:
        early_stopping_counter += 1
        if early_stopping_counter >= early_stopping_patience:
            print("Early stopping triggered.")
            break

# Restore best model weights
model.load_state_dict(best_model_wts)



In [None]:
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report, confusion_matrix, ConfusionMatrixDisplay
from tabulate import tabulate
import pandas as pd

# --- Evaluation for DD + FER + Pose ---
model.eval()
y_true, y_pred = [], []
with torch.no_grad():
    for batch_idx, (data, target) in enumerate(test_loader):
        data, target = data.to(device), target.to(device)
        fer_batch_features = torch.tensor(test_fer_features[:len(data)], dtype=torch.float32).to(device)
        pose_data = torch.tensor(test_pose_features[batch_idx*BATCH_SIZE:(batch_idx+1)*BATCH_SIZE], dtype=torch.float32).to(device)

        outputs = model(data, fer_batch_features, pose_data)
        _, predicted = torch.max(outputs.data, 1)
        y_true.extend(target.cpu().numpy())
        y_pred.extend(predicted.cpu().numpy())

y_true = np.array(y_true)
y_pred = np.array(y_pred)

# Calculate evaluation metrics
accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred, average='macro')
recall = recall_score(y_true, y_pred, average='macro')
f1 = f1_score(y_true, y_pred, average='macro')

print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")

# Generate classification report
report = classification_report(y_true, y_pred, target_names=class_names, output_dict=True)
report_df = pd.DataFrame(report).transpose()
report_df_filtered = report_df.drop('accuracy', errors='ignore')
print(tabulate(report_df_filtered, headers='keys', tablefmt='fancy_grid'))

# Confusion matrix
conf_matrix = confusion_matrix(y_true, y_pred)

# Plot confusion matrix
display_c_m = ConfusionMatrixDisplay(confusion_matrix=conf_matrix, display_labels=class_names)
plt.figure(figsize=(10, 9))
display_c_m.plot(cmap='OrRd', xticks_rotation=25)
plt.xticks(fontsize=10)
plt.yticks(fontsize=10)
plt.title('Confusion Matrix - DD + FER + Pose', fontsize=24)
plt.show()


### **DD+FER+POSE(SA)**

In [None]:
import torch
import torch.nn as nn
import torchvision.models as models

class InceptionV3BiLSTM_FER_Pose_SelfAttention(nn.Module):
    def __init__(self, num_classes=10, fer_feature_size=512, num_heads=8, attn_drop=0.1):
        super(InceptionV3BiLSTM_FER_Pose_SelfAttention, self).__init__()
        self.base_model = models.inception_v3(weights=models.Inception_V3_Weights.IMAGENET1K_V1, aux_logits=True)
        self.base_model.aux_logits = False
        self.base_model.fc = nn.Identity()

        # BiLSTM for temporal features
        self.bilstm = nn.LSTM(input_size=2048, hidden_size=128, num_layers=1, batch_first=True, bidirectional=True)

        # Fully connected layers for FER and Pose features
        self.fc_fer = nn.Linear(fer_feature_size, 128)
        self.fc_pose = nn.Linear(23 * 4, 128)

        # Self-attention layer for fusion
        self.self_attention = nn.MultiheadAttention(embed_dim=256 + 128 + 128, num_heads=num_heads, dropout=attn_drop)

        # Final fully connected layer
        self.fc_combined = nn.Linear(256 + 128 + 128, num_classes)

    def forward(self, x, fer_features, pose_landmarks):
        for name, module in self.base_model.named_children():
            if name == 'AuxLogits' and not self.base_model.aux_logits:
                continue
            x = module(x)
            if name == 'Mixed_7c':
                break

        # BiLSTM for temporal features
        x = x.view(x.size(0), -1, 2048)
        x, _ = self.bilstm(x)
        x = x[:, -1, :]

        # Process FER and Pose features
        fer_x = torch.relu(self.fc_fer(fer_features))
        pose_x = torch.relu(self.fc_pose(pose_landmarks))

        # Concatenate all features and apply self-attention
        combined_x = torch.cat((x, fer_x, pose_x), dim=1).unsqueeze(1)
        combined_x = combined_x.permute(1, 0, 2)  # (seq_len, batch, embedding)
        attn_output, _ = self.self_attention(combined_x, combined_x, combined_x)
        attn_output = attn_output.squeeze(0)  # Remove sequence length dimension

        output = self.fc_combined(attn_output)
        return output

# Instantiate and move the model to the device
model = InceptionV3BiLSTM_FER_Pose_SelfAttention(num_classes=10, fer_feature_size=train_fer_features.shape[1])
model.to(device)



In [None]:
# --- Training and Validation Loop ---
import torch
import copy
import numpy as np
from tqdm import tqdm
import torch.optim as optim
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, classification_report, ConfusionMatrixDisplay
import matplotlib.pyplot as plt
import pandas as pd
from tabulate import tabulate

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)

# Early stopping parameters
best_model_wts = copy.deepcopy(model.state_dict())
best_val_loss = float('inf')
early_stopping_counter = 0
early_stopping_patience = 5
num_epochs = 60

for epoch in range(num_epochs):
    model.train()
    running_loss, correct, total = 0.0, 0, 0
    train_progress = tqdm(enumerate(train_loader), total=len(train_loader), desc=f"Epoch {epoch+1}/{num_epochs}")

    for batch_idx, (data, target) in train_progress:
        data, target = data.to(device), target.to(device)

        # Handle FER features
        batch_indices = list(range(batch_idx * train_loader.batch_size, min((batch_idx + 1) * train_loader.batch_size, len(train_fer_features))))
        fer_batch_features = train_fer_features[batch_indices]
        zero_vector = np.zeros(fer_batch_features.shape[1])
        fer_batch_features = np.array([
            fer_batch_features[i] if train_face_detected[batch_indices[i]] == 1 else zero_vector
            for i in range(len(batch_indices))
        ])
        fer_batch_features = torch.tensor(fer_batch_features, dtype=torch.float32).to(device)

        # Handle Pose features
        pose_data = train_pose_features[batch_idx*BATCH_SIZE:(batch_idx+1)*BATCH_SIZE]
        pose_data = torch.tensor(pose_data, dtype=torch.float32).to(device)

        # Ensure FER features and data batch sizes match
        if len(fer_batch_features) != data.size(0):
            print(f"Skipping batch {batch_idx+1} due to mismatched FER features. Data shape: {data.shape}, FER shape: {fer_batch_features.shape}")
            continue

        optimizer.zero_grad()
        outputs = model(data, fer_batch_features, pose_data)
        loss = criterion(outputs, target)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        _, predicted = torch.max(outputs.data, 1)
        total += target.size(0)
        correct += (predicted == target).sum().item()
        train_progress.set_postfix(loss=running_loss/(batch_idx+1), accuracy=correct/total)

    # Validation phase
    model.eval()
    val_loss, val_correct, val_total = 0.0, 0, 0
    with torch.no_grad():
        for batch_idx, (data, target) in enumerate(val_loader):
            data, target = data.to(device), target.to(device)

            # Handle FER features
            batch_indices = list(range(batch_idx * val_loader.batch_size, min((batch_idx + 1) * val_loader.batch_size, len(val_fer_features))))
            fer_batch_features = val_fer_features[batch_indices]
            fer_batch_features = np.array([
                fer_batch_features[i] if val_face_detected[batch_indices[i]] == 1 else zero_vector
                for i in range(len(batch_indices))
            ])
            fer_batch_features = torch.tensor(fer_batch_features, dtype=torch.float32).to(device)

            # Handle Pose features
            pose_data = val_pose_features[batch_idx*BATCH_SIZE:(batch_idx+1)*BATCH_SIZE]
            pose_data = torch.tensor(pose_data, dtype=torch.float32).to(device)

            if len(fer_batch_features) != data.size(0):
                print(f"Skipping validation batch {batch_idx+1} due to mismatched FER features. Data shape: {data.shape}, FER shape: {fer_batch_features.shape}")
                continue

            outputs = model(data, fer_batch_features, pose_data)
            loss = criterion(outputs, target)
            val_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            val_total += target.size(0)
            val_correct += (predicted == target).sum().item()

    epoch_loss = running_loss / len(train_loader)
    epoch_acc = correct / total
    epoch_val_loss = val_loss / len(val_loader)
    epoch_val_acc = val_correct / val_total

    print(f"Epoch [{epoch+1}/{num_epochs}] Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f} Val Loss: {epoch_val_loss:.4f} Val Acc: {epoch_val_acc:.4f}")

    # Early stopping
    if epoch_val_loss < best_val_loss:
        best_val_loss = epoch_val_loss
        best_model_wts = copy.deepcopy(model.state_dict())
        early_stopping_counter = 0
    else:
        early_stopping_counter += 1
        if early_stopping_counter >= early_stopping_patience:
            print("Early stopping triggered.")
            break

# Restore best model weights
model.load_state_dict(best_model_wts)


In [None]:
# --- Evaluation and Reporting ---
# Set the model to evaluation mode
model.eval()

# Initialize lists to store true labels and predictions
y_true, y_pred = [], []

# Disable gradient computation for inference
with torch.no_grad():
    for batch_idx, (data, target) in tqdm(enumerate(test_loader), desc="Evaluating"):
        data, target = data.to(device), target.to(device)

        # Extract FER features for the batch
        batch_size = data.size(0)
        batch_indices = list(range(batch_idx * batch_size, min((batch_idx + 1) * batch_size, len(test_fer_features))))
        fer_batch_features = test_fer_features[batch_indices]
        zero_vector = np.zeros(fer_batch_features.shape[1])
        fer_batch_features = np.array([
            fer_batch_features[i] if test_face_detected[batch_indices[i]] == 1 else zero_vector
            for i in range(len(batch_indices))
        ])
        fer_batch_features = torch.tensor(fer_batch_features, dtype=torch.float32).to(device)

        # Handle Pose features
        pose_data = test_pose_features[batch_idx * batch_size:(batch_idx + 1) * batch_size]
        pose_data = torch.tensor(pose_data, dtype=torch.float32).to(device)

        # Ensure all feature sizes match the batch size
        if len(fer_batch_features) != batch_size or len(pose_data) != batch_size:
            print(f"Skipping batch {batch_idx+1} due to mismatched feature sizes.")
            continue

        # Forward pass
        outputs = model(data, fer_batch_features, pose_data)
        _, predicted = torch.max(outputs.data, 1)
        y_true.extend(target.cpu().numpy())
        y_pred.extend(predicted.cpu().numpy())

# Convert lists to numpy arrays for metric calculation
y_true = np.array(y_true)
y_pred = np.array(y_pred)

# Calculate evaluation metrics
accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred, average='macro')
recall = recall_score(y_true, y_pred, average='macro')
f1 = f1_score(y_true, y_pred, average='macro')

# Print metrics
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")

# Generate classification report
class_names = list(activity_map_AUC.values())
report = classification_report(y_true, y_pred, target_names=class_names, output_dict=True)
report_df = pd.DataFrame(report).transpose()
report_df_filtered = report_df.drop('accuracy', errors='ignore')
print(tabulate(report_df_filtered, headers='keys', tablefmt='fancy_grid'))

# Confusion matrix
conf_matrix = confusion_matrix(y_true, y_pred)

# Plot confusion matrix
display_c_m = ConfusionMatrixDisplay(confusion_matrix=conf_matrix, display_labels=class_names)
plt.figure(figsize=(10, 9))
display_c_m.plot(cmap='OrRd', xticks_rotation=25)
plt.xticks(fontsize=10)
plt.yticks(fontsize=10)
plt.title('Confusion Matrix', fontsize=24)
plt.show()


### **DD+POSE.A**

In [None]:
import torch
import torch.nn as nn
import torchvision.models as models

class InceptionV3BiLSTMWithPoseAttention(nn.Module):
    def __init__(self, num_classes=10):
        super(InceptionV3BiLSTMWithPoseAttention, self).__init__()
        # Access Inception_V3_Weights using torchvision.models
        self.base_model = models.inception_v3(weights=models.Inception_V3_Weights.IMAGENET1K_V1, aux_logits=True)
        self.base_model.aux_logits = False
        self.base_model.fc = nn.Identity()  # Remove the final fully connected layer

        # Define the BiLSTM with input size matching InceptionV3 output size
        self.bilstm = nn.LSTM(input_size=2048, hidden_size=128, num_layers=1, batch_first=True, bidirectional=True)

        # Linear layer to process pose landmarks
        self.pose_fc = nn.Linear(23 * 3, 128)  # 23 landmarks * 3 features each (x, y, z)

        # Linear layers to transform both feature sets to a common space for attention
        self.bi_lstm_fc = nn.Linear(256, 128)  # Project BiLSTM features to 128 dimensions
        self.pose_fc_transformed = nn.Linear(128, 128)  # Project pose features to 128 dimensions

        # Attention mechanism to fuse BiLSTM and pose features
        self.attention = nn.Linear(128, 1)  # Attention mechanism for intermediate fusion

        # Final classification layer
        self.fc_final = nn.Linear(128, num_classes)

    def forward(self, x, pose_landmarks):
        # Extract features from base model
        for name, module in self.base_model.named_children():
            if name == 'AuxLogits' and not self.base_model.aux_logits:
                continue  # Skip the auxiliary logits if aux_logits is False
            x = module(x)
            if name == 'Mixed_7c':  # Extract feature map after Mixed_7c
                break

        # Flatten the feature map to (batch_size, num_features)
        batch_size = x.size(0)
        x = x.view(batch_size, -1, 2048)  # Reshape for LSTM (batch_size, seq_len=1, input_size=2048)

        # BiLSTM expects input shape (batch_size, seq_len, input_size)
        x, _ = self.bilstm(x)
        x = x[:, -1, :]  # Use the last hidden state
        x = torch.relu(self.bi_lstm_fc(x))  # Project to a common feature space

        # Process pose landmarks and project to the same space
        pose_features = torch.relu(self.pose_fc(pose_landmarks))
        pose_features = torch.relu(self.pose_fc_transformed(pose_features))

        # Apply attention mechanism for intermediate fusion
        combined_features = torch.stack((x, pose_features), dim=1)  # Stack along a new dimension
        attn_weights = torch.softmax(self.attention(combined_features), dim=1)  # Compute attention weights
        attn_applied = torch.sum(attn_weights * combined_features, dim=1)  # Apply attention weights

        # Final output
        output = self.fc_final(attn_applied)
        return output

# Instantiate and move the model to the device
model_attention_fusion = InceptionV3BiLSTMWithPoseAttention(num_classes=10)
model_attention_fusion.to(device)

# Define loss function and optimizer
criterion_attention_fusion = nn.CrossEntropyLoss()
optimizer_attention_fusion = optim.Adam(model_attention_fusion.parameters(), lr=0.0001)



In [None]:
# --- Training and Validation Loop ---
import torch
import copy
import numpy as np
from tqdm import tqdm
import torch.optim as optim
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, classification_report, ConfusionMatrixDisplay
import matplotlib.pyplot as plt
import pandas as pd
from tabulate import tabulate
# --- Training and Validation Loop ---
best_model_wts_attention_fusion = copy.deepcopy(model_attention_fusion.state_dict())
best_val_loss = float('inf')
early_stopping_counter = 0
early_stopping_patience = 5
num_epochs = 60

for epoch in range(num_epochs):
    model_attention_fusion.train()
    running_loss, correct, total = 0.0, 0, 0
    train_progress = tqdm(enumerate(train_loader), total=len(train_loader), desc=f"Epoch {epoch+1}/{num_epochs}")

    for batch_idx, (data, target) in train_progress:
        data, target = data.to(device), target.to(device)
        pose_data = train_pose_features[batch_idx*BATCH_SIZE:(batch_idx+1)*BATCH_SIZE]
        pose_data = torch.tensor(pose_data, dtype=torch.float32).to(device)  # Ensure pose_data is float32

        optimizer_attention_fusion.zero_grad()
        outputs = model_attention_fusion(data, pose_data)
        loss = criterion_attention_fusion(outputs, target)
        loss.backward()
        optimizer_attention_fusion.step()

        running_loss += loss.item()
        _, predicted = torch.max(outputs.data, 1)
        total += target.size(0)
        correct += (predicted == target).sum().item()
        train_progress.set_postfix(loss=running_loss/(batch_idx+1), accuracy=correct/total)

    # Validation phase
    model_attention_fusion.eval()
    val_loss, val_correct, val_total = 0.0, 0, 0
    with torch.no_grad():
        for batch_idx, (data, target) in enumerate(val_loader):
            data, target = data.to(device), target.to(device)
            pose_data = val_pose_features[batch_idx*BATCH_SIZE:(batch_idx+1)*BATCH_SIZE]
            pose_data = torch.tensor(pose_data, dtype=torch.float32).to(device)  # Ensure pose_data is float32

            outputs = model_attention_fusion(data, pose_data)
            loss = criterion_attention_fusion(outputs, target)
            val_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            val_total += target.size(0)
            val_correct += (predicted == target).sum().item()

    epoch_loss = running_loss / len(train_loader)
    epoch_acc = correct / total
    epoch_val_loss = val_loss / len(val_loader)
    epoch_val_acc = val_correct / val_total

    print(f"Epoch [{epoch+1}/{num_epochs}] Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f} Val Loss: {epoch_val_loss:.4f} Val Acc: {epoch_val_acc:.4f}")

    # Early stopping
    if epoch_val_loss < best_val_loss:
        best_val_loss = epoch_val_loss
        best_model_wts_attention_fusion = copy.deepcopy(model_attention_fusion.state_dict())
        early_stopping_counter = 0
    else:
        early_stopping_counter += 1
        if early_stopping_counter >= early_stopping_patience:
            print("Early stopping triggered.")
            break

# Restore best model weights
model_attention_fusion.load_state_dict(best_model_wts_attention_fusion)

In [None]:
# --- Evaluation and Reporting ---
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report, confusion_matrix, ConfusionMatrixDisplay
import pandas as pd
from tabulate import tabulate
from tqdm import tqdm

# Set the model to evaluation mode
model_attention_fusion.eval()

# Initialize lists to store true labels and predictions
y_true, y_pred = [], []

# Disable gradient computation for inference
with torch.no_grad():
    for batch_idx, (data, target) in tqdm(enumerate(test_loader), desc="Evaluating"):
        data, target = data.to(device), target.to(device)

        # Extract FER features for the batch
        batch_size = data.size(0)
        batch_indices = list(range(batch_idx * batch_size, min((batch_idx + 1) * batch_size, len(test_fer_features))))
        fer_batch_features = test_fer_features[batch_indices]
        zero_vector = np.zeros(fer_batch_features.shape[1])
        fer_batch_features = np.array([
            fer_batch_features[i] if test_face_detected[batch_indices[i]] == 1 else zero_vector
            for i in range(len(batch_indices))
        ])
        fer_batch_features = torch.tensor(fer_batch_features, dtype=torch.float32).to(device)

        # Handle Pose features
        pose_data = test_pose_features[batch_idx * batch_size:(batch_idx + 1) * batch_size]
        pose_data = torch.tensor(pose_data, dtype=torch.float32).to(device)

        # Ensure all feature sizes match the batch size
        if len(fer_batch_features) != batch_size or len(pose_data) != batch_size:
            print(f"Skipping batch {batch_idx+1} due to mismatched feature sizes.")
            continue

        # Forward pass
        outputs = model_attention_fusion(data, pose_data)
        _, predicted = torch.max(outputs.data, 1)
        y_true.extend(target.cpu().numpy())
        y_pred.extend(predicted.cpu().numpy())

# Convert lists to numpy arrays for metric calculation
y_true = np.array(y_true)
y_pred = np.array(y_pred)

# Calculate evaluation metrics
accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred, average='macro')
recall = recall_score(y_true, y_pred, average='macro')
f1 = f1_score(y_true, y_pred, average='macro')

# Print metrics
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")

# Generate classification report
class_names = list(activity_map_AUC.values())
report = classification_report(y_true, y_pred, target_names=class_names, output_dict=True)
report_df = pd.DataFrame(report).transpose()
report_df_filtered = report_df.drop('accuracy', errors='ignore')
print(tabulate(report_df_filtered, headers='keys', tablefmt='fancy_grid'))

# Confusion matrix
conf_matrix = confusion_matrix(y_true, y_pred)

# Plot confusion matrix
display_c_m = ConfusionMatrixDisplay(confusion_matrix=conf_matrix, display_labels=class_names)
plt.figure(figsize=(10, 9))
display_c_m.plot(cmap='OrRd', xticks_rotation=25)
plt.xticks(fontsize=10)
plt.yticks(fontsize=10)
plt.title('Confusion Matrix - DD + FER + Pose Attention Fusion', fontsize=24)
plt.show()


# DD.SSoftmax

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision.models import inception_v3

# --- Model Definition for DD (Driver Distraction) Task ---
class InceptionV3BiLSTM(nn.Module):
    def __init__(self, num_classes=10):
        super(InceptionV3BiLSTM, self).__init__()
        # Load pretrained weights with aux_logits=True for compatibility
        self.base_model = inception_v3(pretrained=True, aux_logits=True)

        # Set aux_logits to False to make all layers trainable
        self.base_model.aux_logits = False

        # Remove the final fully connected layer
        self.base_model.fc = nn.Identity()

        # Bidirectional LSTM
        self.bilstm = nn.LSTM(input_size=10240, hidden_size=128, num_layers=1, batch_first=True, bidirectional=True)
        self.fc = nn.Linear(256, num_classes)  # Fully connected layer for classification
        self.scoresoftmax = ScoreSoftmax(num_classes)  # Custom ScoreSoftmax layer

    def forward(self, x):
        # Extract features from base model
        for name, module in self.base_model.named_children():
            if name == 'AuxLogits' and not self.base_model.aux_logits:
                continue  # Skip the auxiliary logits if aux_logits is False
            x = module(x)
            if name == 'Mixed_7c':  # Extract feature map after Mixed_7c
                break

        # Flatten the feature map and prepare for BiLSTM
        x = x.view(x.size(0), 5, 10240)
        x, _ = self.bilstm(x)  # Pass through BiLSTM
        x = self.fc(x[:, -1, :])  # Pass through final fully connected layer
        x = self.scoresoftmax(x)  # Apply ScoreSoftmax
        return x

# Define the ScoreSoftmax layer
class ScoreSoftmax(nn.Module):
    def __init__(self, num_classes):
        super(ScoreSoftmax, self).__init__()
        self.num_classes = num_classes
        self.score_layer = nn.Linear(num_classes, num_classes)  # Custom scoring layer

    def forward(self, x):
        scores = self.score_layer(x)  # Calculate scores for each class
        return nn.functional.softmax(scores, dim=-1)  # Apply softmax to the scores

# Instantiate and move the model to the GPU
model_dd = InceptionV3BiLSTM(num_classes=len(activity_map_AUC))
model_dd = model_dd.to(device)

# Define loss function and optimizer
criterion_dd = nn.CrossEntropyLoss()
optimizer_dd = optim.Adam(model_dd.parameters(), lr=0.0001)


In [None]:
# --- Training and Validation Loop ---
best_model_wts = copy.deepcopy(model_dd.state_dict())
best_val_loss = float('inf')
early_stopping_counter = 0
early_stopping_patience = 5
num_epochs = 60

train_losses, val_losses = [], []
train_accuracies, val_accuracies = [], []

for epoch in range(num_epochs):
    model_dd.train()
    running_loss, correct, total = 0.0, 0, 0
    for batch_idx, (data, target) in tqdm(enumerate(train_loader), total=len(train_loader), desc=f"Epoch {epoch+1}/{num_epochs} [Training]"):
        data, target = data.to(device), target.to(device)

        # Forward pass
        optimizer_dd.zero_grad()
        outputs = model_dd(data)
        loss = criterion_dd(outputs, target)
        loss.backward()
        optimizer_dd.step()

        running_loss += loss.item()
        _, predicted = torch.max(outputs.data, 1)
        total += target.size(0)
        correct += (predicted == target).sum().item()

    epoch_loss = running_loss / len(train_loader)
    epoch_acc = correct / total
    train_losses.append(epoch_loss)
    train_accuracies.append(epoch_acc)

    # Validation phase
    model_dd.eval()
    val_running_loss, val_correct, val_total = 0.0, 0, 0
    with torch.no_grad():
        for data, target in tqdm(val_loader, desc="Validation"):
            data, target = data.to(device), target.to(device)

            outputs = model_dd(data)
            val_loss = criterion_dd(outputs, target)
            val_running_loss += val_loss.item()

            _, predicted = torch.max(outputs.data, 1)
            val_total += target.size(0)
            val_correct += (predicted == target).sum().item()

    epoch_val_loss = val_running_loss / len(val_loader)
    epoch_val_acc = val_correct / val_total
    val_losses.append(epoch_val_loss)
    val_accuracies.append(epoch_val_acc)

    print(f"Epoch [{epoch+1}/{num_epochs}] Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f} Val Loss: {epoch_val_loss:.4f} Val Acc: {epoch_val_acc:.4f}")

    # Early stopping and saving the best model
    if epoch_val_loss < best_val_loss:
        best_val_loss = epoch_val_loss
        best_model_wts = copy.deepcopy(model_dd.state_dict())
        early_stopping_counter = 0
    else:
        early_stopping_counter += 1
        if early_stopping_counter >= early_stopping_patience:
            print("Early stopping triggered.")
            break

# Restore best model weights
model_dd.load_state_dict(best_model_wts)

# --- Evaluation on Test Data ---
# (Evaluation code as provided previously)


In [None]:
# --- Evaluation and Reporting ---
# Set the model to evaluation mode
model_dd.eval()

# Initialize lists to store true labels and predictions
y_true, y_pred = [], []

# Disable gradient computation for inference
with torch.no_grad():
    for batch_idx, (data, target) in tqdm(enumerate(test_loader), desc="Evaluating"):
        data, target = data.to(device), target.to(device)

        # Forward pass
        outputs = model_dd(data)
        _, predicted = torch.max(outputs.data, 1)
        y_true.extend(target.cpu().numpy())
        y_pred.extend(predicted.cpu().numpy())

# Convert lists to numpy arrays for metric calculation
y_true = np.array(y_true)
y_pred = np.array(y_pred)

# Calculate evaluation metrics
accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred, average='macro')
recall = recall_score(y_true, y_pred, average='macro')
f1 = f1_score(y_true, y_pred, average='macro')

# Print metrics
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")

# Generate classification report
class_names = list(activity_map_AUC.values())
report = classification_report(y_true, y_pred, target_names=class_names, output_dict=True)
report_df = pd.DataFrame(report).transpose()
report_df_filtered = report_df.drop('accuracy', errors='ignore')
print(tabulate(report_df_filtered, headers='keys', tablefmt='fancy_grid'))

# Confusion matrix
conf_matrix = confusion_matrix(y_true, y_pred)

# Plot confusion matrix
display_c_m = ConfusionMatrixDisplay(confusion_matrix=conf_matrix, display_labels=class_names)
plt.figure(figsize=(10, 9))
display_c_m.plot(cmap='OrRd', xticks_rotation=25)
plt.xticks(fontsize=10)
plt.yticks(fontsize=10)
plt.title('Confusion Matrix', fontsize=24)
plt.show()


### **ViT-DD-FER-Pose**

In [None]:
!pip install timm

In [None]:
import torch
import torch.nn as nn
from timm.models import create_model
from timm.models.vision_transformer import VisionTransformer

class ViTDD_FER_Pose(nn.Module):
    def __init__(self, model_name='deit3_small_patch16_224', num_classes=10, fer_feature_size=512, pose_feature_size=69, drop_rate=0.05, drop_path_rate=0.11, img_size=224):
        super(ViTDD_FER_Pose, self).__init__()
        # Initialize the Vision Transformer model
        self.backbone: VisionTransformer = create_model(
            model_name,
            pretrained=True,
            num_classes=num_classes,  # The final classification head will be redefined
            drop_rate=drop_rate,
            drop_path_rate=drop_path_rate,
            img_size=img_size
        )
        # Remove the original classifier head as it will be replaced
        self.backbone.head = nn.Identity()

        self.embed_dim = self.backbone.embed_dim  # Dimension of the transformer embeddings

        # Linear layers for additional features
        self.fc_fer = nn.Linear(fer_feature_size, 128)
        self.fc_pose = nn.Linear(pose_feature_size, 128)

        # Final classification layer
        self.fc_combined = nn.Linear(self.embed_dim + 128 + 128, num_classes)  # Combine all features

    def forward(self, x, fer_features, pose_features):
        # Extract features from the Vision Transformer (ViT) model
        x = self.backbone(x)  # Extract embeddings

        # Check if x has an expected shape
        if x.ndim == 3:  # The correct output from transformer should have shape (batch_size, num_tokens, embed_dim)
            cls_token = x[:, 0, :]  # Shape: (batch_size, embed_dim)
        elif x.ndim == 2:  # Case where the output is already flattened
            cls_token = x
        else:
            raise ValueError(f"Unexpected tensor shape for x: {x.shape}")

        # Process FER features
        fer_features = torch.relu(self.fc_fer(fer_features))  # Shape: (batch_size, 128)

        # Process Pose features
        pose_features = torch.relu(self.fc_pose(pose_features))  # Shape: (batch_size, 128)

        # Debugging print statements
        print(f"Shape of class token (cls_token): {cls_token.shape}")
        print(f"Shape of FER features (fer_features): {fer_features.shape}")
        print(f"Shape of Pose features (pose_features): {pose_features.shape}")

        # Concatenate all features along the last dimension
        combined_features = torch.cat([cls_token, fer_features, pose_features], dim=1)  # Shape: (batch_size, combined_dim)

        # Debugging print statement to check shape after concatenation
        print(f"Shape of combined features: {combined_features.shape}")

        # Final classification
        output = self.fc_combined(combined_features)

        return output


# Instantiate and move the model to the device
num_classes = len(activity_map_AUC)
fer_feature_size = train_fer_features.shape[1]
pose_feature_size = train_pose_features.shape[1]
model = ViTDD_FER_Pose(num_classes=num_classes, fer_feature_size=fer_feature_size, pose_feature_size=pose_feature_size)
model = model.to(device)


In [None]:
# Define optimizer and loss function
optimizer = optim.AdamW(model.parameters(), lr=1e-5)
criterion = nn.CrossEntropyLoss()


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import classification_report
from tqdm import tqdm
import numpy as np
import copy
import matplotlib.pyplot as plt
# Assuming ViTDD_FER_Pose model and other necessary setups are defined

# Initialize the model, loss function, and optimizer
num_classes = len(activity_map_AUC)
fer_feature_size = train_fer_features.shape[1]
pose_feature_size = train_pose_features.shape[1]
model = ViTDD_FER_Pose(num_classes=num_classes, fer_feature_size=fer_feature_size, pose_feature_size=pose_feature_size)
model = model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=1e-4)

# --- Training and Validation Loop ---
best_model_wts = copy.deepcopy(model.state_dict())
best_val_loss = float('inf')
early_stopping_counter = 0
early_stopping_patience = 15
num_epochs = 60

# Zero vectors for cases where FER or Pose features are missing
zero_vector_fer = torch.zeros((fer_feature_size,), dtype=torch.float32, device=device)
zero_vector_pose = torch.zeros((pose_feature_size,), dtype=torch.float32, device=device)

for epoch in range(num_epochs):
    model.train()
    running_loss, correct, total = 0.0, 0, 0
    train_progress = tqdm(enumerate(train_loader), total=len(train_loader), desc=f"Epoch {epoch+1}/{num_epochs}")

    for batch_idx, (data, target) in train_progress:
        data, target = data.to(device), target.to(device)

        # Handle FER features
        batch_indices = list(range(batch_idx * train_loader.batch_size, min((batch_idx + 1) * train_loader.batch_size, len(train_fer_features))))
        if len(batch_indices) == 0:
            continue
        fer_batch_features = torch.tensor(train_fer_features[batch_indices], dtype=torch.float32, device=device)
        fer_batch_features = torch.stack([
            fer_batch_features[i] if train_face_detected[batch_indices[i]] == 1 else zero_vector_fer
            for i in range(len(batch_indices))
        ])

        # Handle Pose features
        pose_batch_indices = list(range(batch_idx * train_loader.batch_size, min((batch_idx + 1) * train_loader.batch_size, len(train_pose_features))))
        if len(pose_batch_indices) == 0:
            continue
        pose_data = torch.tensor([
            train_pose_features[i] if i < len(train_pose_features) else zero_vector_pose
            for i in pose_batch_indices
        ], dtype=torch.float32).to(device)

        # Check for mismatched batch sizes
        if len(fer_batch_features) != data.size(0) or len(pose_data) != data.size(0):
           # print(f"Skipping batch {batch_idx+1} due to mismatched features.")
            continue

        optimizer.zero_grad()
        outputs = model(data, fer_batch_features, pose_data)
        loss = criterion(outputs, target)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        _, predicted = torch.max(outputs.data, 1)
        total += target.size(0)
        correct += (predicted == target).sum().item()
        train_progress.set_postfix(loss=running_loss / (batch_idx + 1), accuracy=correct / total)

    # Validation phase
    model.eval()
    val_loss, val_correct, val_total = 0.0, 0, 0
    with torch.no_grad():
        for batch_idx, (data, target) in enumerate(val_loader):
            data, target = data.to(device), target.to(device)

            # Handle FER features
            batch_indices = list(range(batch_idx * val_loader.batch_size, min((batch_idx + 1) * val_loader.batch_size, len(val_fer_features))))
            if len(batch_indices) == 0:
                continue
            fer_batch_features = torch.tensor(val_fer_features[batch_indices], dtype=torch.float32, device=device)
            fer_batch_features = torch.stack([
                fer_batch_features[i] if val_face_detected[batch_indices[i]] == 1 else zero_vector_fer
                for i in range(len(batch_indices))
            ])

            # Handle Pose features
            pose_batch_indices = list(range(batch_idx * val_loader.batch_size, min((batch_idx + 1) * val_loader.batch_size, len(val_pose_features))))
            if len(pose_batch_indices) == 0:
                continue
            pose_data = torch.tensor([
                val_pose_features[i] if i < len(val_pose_features) else zero_vector_pose
                for i in pose_batch_indices
            ], dtype=torch.float32).to(device)

            # Check for mismatched batch sizes
            if len(fer_batch_features) != data.size(0) or len(pose_data) != data.size(0):
                #print(f"Skipping batch {batch_idx+1} during validation due to mismatched features.")
                continue

            outputs = model(data, fer_batch_features, pose_data)
            loss = criterion(outputs, target)
            val_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            val_total += target.size(0)
            val_correct += (predicted == target).sum().item()

    epoch_loss = running_loss / len(train_loader)
    epoch_acc = correct / total
    epoch_val_loss = val_loss / len(val_loader)
    epoch_val_acc = val_correct / val_total

    print(f"Epoch [{epoch+1}/{num_epochs}] Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f} Val Loss: {epoch_val_loss:.4f} Val Acc: {epoch_val_acc:.4f}")
    # Create logs dictionary to store metrics
    logs = {'loss': epoch_loss, 'accuracy': epoch_acc,
            'val_loss': epoch_val_loss, 'val_accuracy': epoch_val_acc}

    # Call the custom plotting callback
    plot_losses.on_epoch_end(epoch, logs)
    # Early stopping
    if epoch_val_loss < best_val_loss:
        best_val_loss = epoch_val_loss
        best_model_wts = copy.deepcopy(model.state_dict())
        early_stopping_counter = 0
    else:
        early_stopping_counter += 1
        if early_stopping_counter >= early_stopping_patience:
            print("Early stopping triggered.")
            break

# Restore best model weights
model.load_state_dict(best_model_wts)



In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report, confusion_matrix, ConfusionMatrixDisplay
import pandas as pd
import matplotlib.pyplot as plt
from tabulate import tabulate

# Prediction and report generation
model.eval()
all_preds, all_targets = [], []
with torch.no_grad():
    for batch_idx, (data, target) in enumerate(val_loader):
        data, target = data.to(device), target.to(device)

        # Handle FER features
        batch_indices = list(range(batch_idx * val_loader.batch_size, min((batch_idx + 1) * val_loader.batch_size, len(val_fer_features))))
        if len(batch_indices) == 0:
            continue
        fer_batch_features = torch.tensor(val_fer_features[batch_indices], dtype=torch.float32, device=device)
        fer_batch_features = torch.stack([
            fer_batch_features[i] if val_face_detected[batch_indices[i]] == 1 else zero_vector_fer
            for i in range(len(batch_indices))
        ])

        # Handle Pose features
        pose_batch_indices = list(range(batch_idx * val_loader.batch_size, min((batch_idx + 1) * val_loader.batch_size, len(val_pose_features))))
        if len(pose_batch_indices) == 0:
            continue
        pose_data = torch.tensor([
            val_pose_features[i] if i < len(val_pose_features) else zero_vector_pose
            for i in pose_batch_indices
        ], dtype=torch.float32).to(device)

        # Check for mismatched batch sizes
        if len(fer_batch_features) != data.size(0) or len(pose_data) != data.size(0):
            print(f"Skipping batch {batch_idx+1} during prediction due to mismatched features.")
            continue

        outputs = model(data, fer_batch_features, pose_data)
        preds = torch.argmax(outputs, dim=1)
        all_preds.extend(preds.cpu().numpy())
        all_targets.extend(target.cpu().numpy())

# Check if predictions and targets were collected
if len(all_preds) == 0 or len(all_targets) == 0:
    print("No predictions or targets collected. Skipping evaluation.")
else:
    # Calculate evaluation metrics
    accuracy = accuracy_score(all_targets, all_preds)
    precision = precision_score(all_targets, all_preds, average='macro')
    recall = recall_score(all_targets, all_preds, average='macro')
    f1 = f1_score(all_targets, all_preds, average='macro')

    # Print metrics
    print(f"Accuracy: {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1 Score: {f1:.4f}")

    # Generate classification report
    class_names = list(activity_map_AUC.values())
    report = classification_report(all_targets, all_preds, target_names=class_names, output_dict=True)
    report_df = pd.DataFrame(report).transpose()
    report_df_filtered = report_df.drop('accuracy', errors='ignore')
    print(tabulate(report_df_filtered, headers='keys', tablefmt='fancy_grid'))

    # Confusion matrix
    conf_matrix = confusion_matrix(all_targets, all_preds)

    # Plot confusion matrix
    display_c_m = ConfusionMatrixDisplay(confusion_matrix=conf_matrix, display_labels=class_names)
    plt.figure(figsize=(10, 9))
    display_c_m.plot(cmap='OrRd', xticks_rotation=25)
    plt.xticks(fontsize=10)
    plt.yticks(fontsize=10)
    plt.title('Confusion Matrix', fontsize=24)
    plt.show()
