In [3]:
import os
import shutil
import random

# Define the base path
base_path = '300WLPA_2d'
helen_path = os.path.join(base_path, 'HELEN')
afw_path = os.path.join(base_path, 'AFW')
lfpw_path = os.path.join(base_path, 'LFPW')

# Define train, val, test directories
train_dir = os.path.join(base_path, 'train')
val_dir = os.path.join(base_path, 'val')
test_dir = os.path.join(base_path, 'test')

# Create the directories
os.makedirs(train_dir, exist_ok=True)
os.makedirs(val_dir, exist_ok=True)
os.makedirs(test_dir, exist_ok=True)

# Helper function to split data
def split_data(source_dir, train_dir, val_dir, test_dir, train_ratio=0.8, val_ratio=0.1):
    subfolders = [f.path for f in os.scandir(source_dir) if f.is_dir()]
    for subfolder in subfolders:
        files = [f for f in os.listdir(subfolder) if os.path.isfile(os.path.join(subfolder, f))]
        random.shuffle(files)
        train_split = int(len(files) * train_ratio)
        val_split = int(len(files) * (train_ratio + val_ratio))
        
        train_files = files[:train_split]
        val_files = files[train_split:val_split]
        test_files = files[val_split:]

        for file in train_files:
            shutil.copy(os.path.join(subfolder, file), train_dir)
        
        for file in val_files:
            shutil.copy(os.path.join(subfolder, file), val_dir)

        for file in test_files:
            shutil.copy(os.path.join(subfolder, file), test_dir)

# Split the data
split_data(helen_path, train_dir, val_dir, test_dir)
split_data(afw_path, train_dir, val_dir, test_dir)


In [1]:
import math
import os
import random
import torch
from torch.utils.data import Dataset, DataLoader, Subset
from PIL import Image
from torchvision import transforms, models
from torch import nn, optim

# Constants for class ranges
NUM_CLASSES_PITCH_ROLL = 66
NUM_CLASSES_YAW = 120
TRAIN_SIZE = 10000
VAL_SIZE = 1000
TEST_SIZE = 1000

# Helper function to map continuous values to class indices
def continuous_to_class(value, num_classes, angle_range):
    # Convert radian to degree
    value = math.degrees(value)
    # Normalize to [0, angle_range]
    value = (value + angle_range / 2) % angle_range
    # Calculate bin
    return min(num_classes - 1, max(0, int(value // (angle_range / num_classes))))

# Function to convert class indices to one-hot tensors
def class_to_onehot(class_indices, num_classes):
    if class_indices.dim() == 0:
        class_indices = class_indices.unsqueeze(0)
    batch_size = class_indices.size(0)
    onehot_tensor = torch.zeros(batch_size, num_classes)
    onehot_tensor.scatter_(1, class_indices.view(-1, 1).long(), 1)
    return onehot_tensor

class CustomImageDataset(Dataset):
    def __init__(self, image_dir, transform=None):
        self.image_dir = image_dir
        self.transform = transform
        self.image_paths = [os.path.join(image_dir, f) for f in os.listdir(image_dir) if os.path.isfile(os.path.join(image_dir, f))]

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        image_path = self.image_paths[idx]
        image = Image.open(image_path).convert('RGB')
        
        # Extract yaw, pitch, and roll from the filename
        filename = os.path.basename(image_path)
        parts = filename.split('_')
        # print(parts)
        pitch_value = float(parts[-3])
        roll_value = float(parts[-1].split('.jpg')[0])
        yaw_value = float(parts[-2])
        # print("Filename")
        # print(filename)
        # print()
        # print("Yaw Pitch Roll in Radians")
        # print(yaw_value," ", pitch_value," ", roll_value)

        # Convert values to class indices
        yaw = continuous_to_class(yaw_value, NUM_CLASSES_YAW, 360)
        pitch = continuous_to_class(pitch_value, NUM_CLASSES_PITCH_ROLL, 198)
        roll = continuous_to_class(roll_value, NUM_CLASSES_PITCH_ROLL, 198)
        # print("Yaw Pitch Roll in Degrees")
        # print(yaw," ", pitch," ", roll)
        
        if self.transform:
            image = self.transform(image)

        # Convert class indices to one-hot tensors
        yaw_onehot = class_to_onehot(torch.tensor(yaw), NUM_CLASSES_YAW)
        pitch_onehot = class_to_onehot(torch.tensor(pitch), NUM_CLASSES_PITCH_ROLL)
        roll_onehot = class_to_onehot(torch.tensor(roll), NUM_CLASSES_PITCH_ROLL)

        # print("Yaw Pitch Roll Onehot Shapes")
        # print(yaw_onehot.shape," ",pitch_onehot.shape," ",roll_onehot.shape)
        
        labels = {
            'yaw': yaw_onehot,
            'pitch': pitch_onehot,
            'roll': roll_onehot
        }
        
        return image, labels

In [2]:
import os
import random
import torch
from torch.utils.data import DataLoader, Subset
from torchvision import transforms
from PIL import Image

# Constants for class ranges and dataset sizes
NUM_CLASSES_PITCH_ROLL = 66
NUM_CLASSES_YAW = 120
TRAIN_SIZE = 10000
VAL_SIZE = 1000
TEST_SIZE = 1000

# Define the base path
base_path = '300WLPA_2d'
train_dir = os.path.join(base_path, 'train')
val_dir = os.path.join(base_path, 'val')
test_dir = os.path.join(base_path, 'test')

# Define data transformations
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])

# Create the full datasets
full_train_dataset = CustomImageDataset(train_dir, transform=transform)
full_val_dataset = CustomImageDataset(val_dir, transform=transform)
full_test_dataset = CustomImageDataset(test_dir, transform=transform)

# Generate indices for the datasets
train_indices = list(range(len(full_train_dataset)))
val_indices = list(range(len(full_val_dataset)))
test_indices = list(range(len(full_test_dataset)))

# Shuffle indices
random.seed(42)  # For reproducibility
random.shuffle(train_indices)
random.shuffle(val_indices)
random.shuffle(test_indices)

# Create subsets
train_dataset = Subset(full_train_dataset, train_indices[:TRAIN_SIZE])
val_dataset = Subset(full_val_dataset, val_indices[:VAL_SIZE])
test_dataset = Subset(full_test_dataset, test_indices[:TEST_SIZE])

# Create data loaders
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Print sizes of datasets
print(f"Train dataset size: {len(train_dataset)}")
print(f"Validation dataset size: {len(val_dataset)}")
print(f"Test dataset size: {len(test_dataset)}")
print()

# Example of iterating through the train_loader
for images, labels in train_loader:
    # images: tensor of shape (batch_size, 3, 224, 224)
    # labels['yaw']: tensor of shape (batch_size, NUM_CLASSES_YAW)
    # labels['pitch']: tensor of shape (batch_size, NUM_CLASSES_PITCH_ROLL)
    # labels['roll']: tensor of shape (batch_size, NUM_CLASSES_PITCH_ROLL)
    print(images.shape)
    print(labels['yaw'].shape)
    print(labels['pitch'].shape)
    print(labels['roll'].shape)
    
    # Example: iterating through the batch
    for i in range(images.size(0)):
        image = transforms.ToPILImage()(images[i])
        
        # Get the indices of the maximum value in each one-hot tensor (argmax)
        yaw_label = torch.argmax(labels['yaw'][i]).item()
        pitch_label = torch.argmax(labels['pitch'][i]).item()
        roll_label = torch.argmax(labels['roll'][i]).item()
        
        # Print or use these labels as needed
        print(f"Image: {image}, Yaw Label: {yaw_label}, Pitch Label: {pitch_label}, Roll Label: {roll_label}")
    
    break  # Break after the first batch for demonstration


Train dataset size: 10000
Validation dataset size: 1000
Test dataset size: 1000

torch.Size([32, 3, 224, 224])
torch.Size([32, 1, 120])
torch.Size([32, 1, 66])
torch.Size([32, 1, 66])
Image: <PIL.Image.Image image mode=RGB size=224x224 at 0x25F5F3C7190>, Yaw Label: 82, Pitch Label: 21, Roll Label: 29
Image: <PIL.Image.Image image mode=RGB size=224x224 at 0x25F5BFFAE50>, Yaw Label: 56, Pitch Label: 21, Roll Label: 34
Image: <PIL.Image.Image image mode=RGB size=224x224 at 0x25F5F3C7190>, Yaw Label: 86, Pitch Label: 23, Roll Label: 32
Image: <PIL.Image.Image image mode=RGB size=224x224 at 0x25F5F3C71D0>, Yaw Label: 78, Pitch Label: 26, Roll Label: 33
Image: <PIL.Image.Image image mode=RGB size=224x224 at 0x25F5F3C7190>, Yaw Label: 85, Pitch Label: 34, Roll Label: 28
Image: <PIL.Image.Image image mode=RGB size=224x224 at 0x25F5F3C71D0>, Yaw Label: 74, Pitch Label: 35, Roll Label: 30
Image: <PIL.Image.Image image mode=RGB size=224x224 at 0x25F5F3C7190>, Yaw Label: 46, Pitch Label: 38, Roll 

In [168]:
x = "AFW_2043831280_2_7_-0.613_-1.170_-0.057.jpg"
parts = x.split('_')
print(parts)

print(parts[-1].split('.jpg')[0])

['AFW', '2043831280', '2', '7', '-0.613', '-1.170', '-0.057.jpg']
-0.057


In [3]:
import torch
import torch.nn as nn
import torchvision
from torchvision import models

# Check if GPU is available
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Load the pretrained MobileNetV3 model
model_MNV3 = torchvision.models.mobilenet_v3_small(pretrained=True)

# Define constants
HIDDEN_FEATURES = 1024
OUTPUT_FEATURES = 1280
NUM_CLASSES = 66  # Number of classes for each roll and pitch
NUM_CLASSES_YAW = 120

# Define the custom classifier with three heads
class CustomHead(nn.Module):
    def __init__(self, in_features):
        super(CustomHead, self).__init__()
        self.fc1 = nn.Linear(in_features, HIDDEN_FEATURES)
        self.fc2 = nn.Linear(HIDDEN_FEATURES, OUTPUT_FEATURES)
        self.fc_yaw = nn.Linear(OUTPUT_FEATURES, NUM_CLASSES_YAW)
        self.fc_pitch = nn.Linear(OUTPUT_FEATURES, NUM_CLASSES)
        self.fc_roll = nn.Linear(OUTPUT_FEATURES, NUM_CLASSES)
        self.hardswish = nn.Hardswish()
        self.dropout = nn.Dropout(p=0.2, inplace=True)
    
    def forward(self, x):
        x = self.hardswish(self.fc1(x))
        x = self.dropout(x)
        x = self.hardswish(self.fc2(x))
        x = self.dropout(x)
        yaw = self.fc_yaw(x)
        pitch = self.fc_pitch(x)
        roll = self.fc_roll(x)
        return yaw, pitch, roll

# Modify the MobileNetV3 model to use the custom classifier
class ModifiedMobileNetV3(nn.Module):
    def __init__(self, num_classes_yaw, num_classes_pitch, num_classes_roll):
        super(ModifiedMobileNetV3, self).__init__()
        self.backbone = models.mobilenet_v3_small(pretrained=True).features
        self.pool = nn.AdaptiveAvgPool2d((1, 1))
        self.flatten = nn.Flatten()
        in_features = models.mobilenet_v3_small(pretrained=True).classifier[0].in_features
        self.classifier = CustomHead(in_features)

    def forward(self, x):
        x = self.backbone(x)
        x = self.pool(x)
        x = self.flatten(x)
        yaw, pitch, roll = self.classifier(x)
        return yaw, pitch, roll

# Instantiate the modified model
model_MNV3 = ModifiedMobileNetV3(NUM_CLASSES_YAW, NUM_CLASSES, NUM_CLASSES)

# Move the model to the GPU
model_MNV3 = model_MNV3.to(device)



In [4]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class CustomLoss(nn.Module):
    def __init__(self, alpha, beta):
        super(CustomLoss, self).__init__()
        self.alpha = alpha
        self.beta = beta
        self.regression_loss = nn.MSELoss()
        self.classification_loss = nn.CrossEntropyLoss()

    def forward(self, yaw_pred, pitch_pred, roll_pred, yaw_true, pitch_true, roll_true):
        yaw_true = torch.squeeze(yaw_true, dim=1)
        pitch_true = torch.squeeze(pitch_true, dim=1)
        roll_true = torch.squeeze(roll_true, dim=1)
        
        # Regression losses
        reg_loss_yaw = self.regression_loss(yaw_pred.float(), yaw_true.float())
        reg_loss_pitch = self.regression_loss(pitch_pred.float(), pitch_true.float())
        reg_loss_roll = self.regression_loss(roll_pred.float(), roll_true.float())
        
        # Convert true values to class indices
        # yaw_true = torch.argmax(yaw_true, dim=1)
        # pitch_true = torch.argmax(pitch_true, dim=1)
        # roll_true = torch.argmax(roll_true, dim=1)
        
        # Convert one-hot encoded predictions to logits
        yaw_pred_logits = F.softmax(yaw_pred, dim=1)
        pitch_pred_logits = F.softmax(pitch_pred, dim=1)
        roll_pred_logits = F.softmax(roll_pred, dim=1)

        # print("PRED SHAPE")
        # print(yaw_pred_logits.shape)
        # print("TRUE SHAPE")
        # print(yaw_true.shape)
        # Classification losses (using CrossEntropyLoss)
        cls_loss_yaw = F.cross_entropy(yaw_pred_logits, yaw_true)
        cls_loss_pitch = F.cross_entropy(pitch_pred_logits, pitch_true)
        cls_loss_roll = F.cross_entropy(roll_pred_logits, roll_true)

        # Combined loss
        reg_loss = (reg_loss_yaw*3 + reg_loss_pitch*2 + reg_loss_roll) / 3
        cls_loss = (cls_loss_yaw*3 + cls_loss_pitch*2 + cls_loss_roll) / 3

        loss = self.alpha * reg_loss + self.beta * cls_loss
        return loss


In [5]:
# Instantiate the custom loss with appropriate weights
criterion = CustomLoss(alpha=1.0, beta=2.0)  # Adjust alpha and beta as needed

# Use Adam optimizer
optimizer = optim.Adam(model_MNV3.classifier.parameters(), lr=1e-4, weight_decay=1e-4)

In [6]:
import torch
import os

class EarlyStopper:
    def __init__(self, patience=10, min_delta=0.0):
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.best_loss = float('inf')

    def early_stop(self, current_loss):
        if current_loss < self.best_loss - self.min_delta:
            self.best_loss = current_loss
            self.counter = 0
        else:
            self.counter += 1
            if self.counter >= self.patience:
                return True
        return False

In [7]:
num_epochs = 10
early_stopper = EarlyStopper(patience=20, min_delta=0.5)

def train_model(model, criterion, optimizer, train_loader, val_loader, num_epochs=10, early_stopper=None, save_path='best_model.pth'):
    training_losses = []
    validation_losses = []
    training_accuracies = []  # Placeholder for accuracies, adjust as needed
    validation_accuracies = []  # Placeholder for accuracies, adjust as needed
    best_val_loss = float('inf')  # Initialize with infinity

    for epoch in range(num_epochs):
        print(f'Epoch [{epoch + 1}/{num_epochs}]')
        print('-' * 50)

        # Training loop
        model.train()
        running_loss = 0.0
        processed_samples_train = 0
        total_samples_train = len(train_loader.dataset)

        for batch_idx, (inputs, labels) in enumerate(train_loader):
            inputs = inputs.to(device)
            yaw_true = labels['yaw'].to(device)
            pitch_true = labels['pitch'].to(device)
            roll_true = labels['roll'].to(device)

            optimizer.zero_grad()

            # Forward pass
            yaw_pred, pitch_pred, roll_pred = model(inputs)

            # Compute loss
            loss = criterion(yaw_pred, pitch_pred, roll_pred, yaw_true, pitch_true, roll_true)

            # Backward pass and optimization
            loss.backward()
            optimizer.step()

            running_loss += loss.item() * inputs.size(0)
            processed_samples_train += inputs.size(0)

            # Print progress
            if (batch_idx + 1) % 10 == 0:
                print(f'Training Batch [{batch_idx + 1}/{len(train_loader)}], Loss: {loss.item():.4f}')

        epoch_loss = running_loss / total_samples_train
        training_losses.append(epoch_loss)
        print(f'Training Loss: {epoch_loss:.4f}')
        
        # Validation loop
        model.eval()
        val_running_loss = 0.0
        correct_yaw = 0
        correct_pitch = 0
        correct_roll = 0
        total = 0
        processed_samples_val = 0
        total_samples_val = len(val_loader.dataset)

        with torch.no_grad():
            for batch_idx, (inputs, labels) in enumerate(val_loader):
                inputs = inputs.to(device)
                yaw_true = labels['yaw'].to(device)
                pitch_true = labels['pitch'].to(device)
                roll_true = labels['roll'].to(device)
        
                yaw_pred, pitch_pred, roll_pred = model(inputs)
        
                # Create one-hot encoded tensors on GPU
                predicted_yaw = torch.eye(yaw_pred.shape[1], device=device)[torch.argmax(yaw_pred, dim=1)]
                predicted_pitch = torch.eye(pitch_pred.shape[1], device=device)[torch.argmax(pitch_pred, dim=1)]
                predicted_roll = torch.eye(roll_pred.shape[1], device=device)[torch.argmax(roll_pred, dim=1)]
        
                # Compute loss (assuming criterion accepts one-hot encoded targets)
                loss = criterion(yaw_pred, pitch_pred, roll_pred, predicted_yaw, predicted_pitch, predicted_roll)
                val_running_loss += loss.item() * inputs.size(0)
        
                total += inputs.size(0)  # Update total number of samples processed

                # Print progress
                if (batch_idx + 1) % 10 == 0:
                    print(f'Validation Batch [{batch_idx + 1}/{len(val_loader)}], Loss: {loss.item():.4f}')

            val_loss = val_running_loss / total_samples_val
            validation_losses.append(val_loss)

            # Save model with the lowest validation loss
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                torch.save(model.state_dict(), save_path)
                print(f'New best model saved with validation loss: {val_loss:.4f}')

            # Calculate accuracies
            val_accuracy_yaw = 100 * correct_yaw / total_samples_val
            val_accuracy_pitch = 100 * correct_pitch / total_samples_val
            val_accuracy_roll = 100 * correct_roll / total_samples_val

            validation_accuracies.append((val_accuracy_yaw + val_accuracy_pitch + val_accuracy_roll) / 3)

            print(f'Validation Loss: {val_loss:.4f}, '
                  f'Yaw Accuracy: {val_accuracy_yaw:.2f}%, '
                  f'Pitch Accuracy: {val_accuracy_pitch:.2f}%, '
                  f'Roll Accuracy: {val_accuracy_roll:.2f}%')

        # Check early stopping condition
        if early_stopper is not None and early_stopper.early_stop(val_loss):
            print("Early stopping")
            break

    print('Training complete')

    return model, training_losses, validation_losses, training_accuracies, validation_accuracies

In [182]:
trained_model, training_losses, validation_losses, training_accuracies, validation_accuracies = train_model(
    model_MNV3, criterion, optimizer, train_loader, val_loader, num_epochs, early_stopper)

Epoch [1/10]
--------------------------------------------------
Training Batch [10/313], Loss: 4.3997
Training Batch [20/313], Loss: 4.4001
Training Batch [30/313], Loss: 4.4002
Training Batch [40/313], Loss: 4.3998
Training Batch [50/313], Loss: 4.3998
Training Batch [60/313], Loss: 4.3999
Training Batch [70/313], Loss: 4.3991
Training Batch [80/313], Loss: 4.3998
Training Batch [90/313], Loss: 4.3996
Training Batch [100/313], Loss: 4.3996
Training Batch [110/313], Loss: 4.4000
Training Batch [120/313], Loss: 4.3999
Training Batch [130/313], Loss: 4.4000
Training Batch [140/313], Loss: 4.3998
Training Batch [150/313], Loss: 4.3998
Training Batch [160/313], Loss: 4.3997
Training Batch [170/313], Loss: 4.3997
Training Batch [180/313], Loss: 4.4002
Training Batch [190/313], Loss: 4.3997
Training Batch [200/313], Loss: 4.4002
Training Batch [210/313], Loss: 4.3999
Training Batch [220/313], Loss: 4.3999
Training Batch [230/313], Loss: 4.4000
Training Batch [240/313], Loss: 4.4001
Training 

In [None]:
num_epochs = 100
trained_model, training_losses, validation_losses, training_accuracies, validation_accuracies = train_model(
    model_MNV3, criterion, optimizer, train_loader, val_loader, num_epochs, early_stopper)

In [187]:
import torch
from torchvision import transforms
from PIL import Image

# Load the pretrained MobileNetV3 model
model_MNV3 = torchvision.models.mobilenet_v3_small(pretrained=True)

# Define constants
HIDDEN_FEATURES = 1024
OUTPUT_FEATURES = 1280
NUM_CLASSES = 66  # Number of classes for each roll and pitch
NUM_CLASSES_YAW = 120

# Define the custom classifier with three heads
class CustomHead(nn.Module):
    def __init__(self, in_features):
        super(CustomHead, self).__init__()
        self.fc1 = nn.Linear(in_features, HIDDEN_FEATURES)
        self.fc2 = nn.Linear(HIDDEN_FEATURES, OUTPUT_FEATURES)
        self.fc_yaw = nn.Linear(OUTPUT_FEATURES, NUM_CLASSES_YAW)
        self.fc_pitch = nn.Linear(OUTPUT_FEATURES, NUM_CLASSES)
        self.fc_roll = nn.Linear(OUTPUT_FEATURES, NUM_CLASSES)
        self.hardswish = nn.Hardswish()
        self.dropout = nn.Dropout(p=0.2, inplace=True)
    
    def forward(self, x):
        x = self.hardswish(self.fc1(x))
        x = self.dropout(x)
        x = self.hardswish(self.fc2(x))
        x = self.dropout(x)
        yaw = self.fc_yaw(x)
        pitch = self.fc_pitch(x)
        roll = self.fc_roll(x)
        return yaw, pitch, roll

# Modify the MobileNetV3 model to use the custom classifier
class ModifiedMobileNetV3(nn.Module):
    def __init__(self, num_classes_yaw, num_classes_pitch, num_classes_roll):
        super(ModifiedMobileNetV3, self).__init__()
        self.backbone = models.mobilenet_v3_small(pretrained=True).features
        self.pool = nn.AdaptiveAvgPool2d((1, 1))
        self.flatten = nn.Flatten()
        in_features = models.mobilenet_v3_small(pretrained=True).classifier[0].in_features
        self.classifier = CustomHead(in_features)

    def forward(self, x):
        x = self.backbone(x)
        x = self.pool(x)
        x = self.flatten(x)
        yaw, pitch, roll = self.classifier(x)
        return yaw, pitch, roll

# Instantiate the modified model
model_MNV3 = ModifiedMobileNetV3(NUM_CLASSES_YAW, NUM_CLASSES, NUM_CLASSES)

# Define device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Move the model to the GPU
model_MNV3 = model_MNV3.to(device)

# Define image preprocessing
preprocess = transforms.Compose([
    transforms.Resize((224, 224)),  # Assuming the model expects 224x224 input size
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Adjust mean and std to match your training
])

# Load the image
def load_image(image_path):
    image = Image.open(image_path)
    image = preprocess(image)
    image = image.unsqueeze(0)  # Add batch dimension
    return image

# Load the model
def load_model(model_path):
    model = ModifiedMobileNetV3(NUM_CLASSES_YAW,NUM_CLASSES,NUM_CLASSES)
    model.load_state_dict(torch.load(model_path))
    model.eval()
    return model

# Inference on a single image
def infer_single_image(model, image_tensor):
    with torch.no_grad():
        yaw_pred, pitch_pred, roll_pred = model(image_tensor)
    return yaw_pred, pitch_pred, roll_pred

# Paths
model_path = 'best_model.pth'  # Path to the saved model
image_path = "D:/Users/Seif_Eldin_Sameh/Desktop/grad_project/300WLPA_2d/test/AFW_134212_1_0_-0.492_0.469_-0.150.jpg"  # Path to the image to test

# Load image
image_tensor = load_image(image_path)

# Load model
model = load_model(model_path)

# Perform inference
yaw_pred, pitch_pred, roll_pred = infer_single_image(model, image_tensor)
print(yaw_pred.shape)
# Convert predictions to class labels
yaw_pred_label = torch.argmax(yaw_pred, dim=1).item()
pitch_pred_label = torch.argmax(pitch_pred, dim=1).item()
roll_pred_label = torch.argmax(roll_pred, dim=1).item()

# Print predictions
print(f'Yaw Prediction: {yaw_pred_label}')
print(f'Pitch Prediction: {pitch_pred_label}')
print(f'Roll Prediction: {roll_pred_label}')

torch.Size([1, 120])
Yaw Prediction: 50
Pitch Prediction: 24
Roll Prediction: 33


In [9]:
NUM_CLASSES = 66  # Number of classes for each roll and pitch
NUM_CLASSES_YAW = 120

def load_model(model_path):
    model = ModifiedMobileNetV3(NUM_CLASSES_YAW,NUM_CLASSES,NUM_CLASSES)
    model.load_state_dict(torch.load(model_path))
    model.eval()
    return model


model_path = 'best_model.pth'  # Path to the saved model
model = load_model(model_path)
model = model.to(device)
num_epochs = 100
trained_model, training_losses, validation_losses, training_accuracies, validation_accuracies = train_model(
    model, criterion, optimizer, train_loader, val_loader, num_epochs, early_stopper)

Epoch [1/100]
--------------------------------------------------
Training Batch [10/313], Loss: 17.9725
Training Batch [20/313], Loss: 17.9732
Training Batch [30/313], Loss: 17.9725
Training Batch [40/313], Loss: 17.9733
Training Batch [50/313], Loss: 17.9731
Training Batch [60/313], Loss: 17.9728
Training Batch [70/313], Loss: 17.9724
Training Batch [80/313], Loss: 17.9727
Training Batch [90/313], Loss: 17.9728
Training Batch [100/313], Loss: 17.9732
Training Batch [110/313], Loss: 17.9719
Training Batch [120/313], Loss: 17.9726
Training Batch [130/313], Loss: 17.9729
Training Batch [140/313], Loss: 17.9734
Training Batch [150/313], Loss: 17.9725
Training Batch [160/313], Loss: 17.9729
Training Batch [170/313], Loss: 17.9736
Training Batch [180/313], Loss: 17.9716
Training Batch [190/313], Loss: 17.9724
Training Batch [200/313], Loss: 17.9723
Training Batch [210/313], Loss: 17.9730
Training Batch [220/313], Loss: 17.9727
Training Batch [230/313], Loss: 17.9730
Training Batch [240/313]

# Imports

In [None]:
import numpy as np

import os


os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0'
import time
import torch
import torch.nn as nn
from torch.backends import cudnn
from torchvision import transforms
#import matplotlib
#from matplotlib import pyplot as plt

from torch.utils.tensorboard import SummaryWriter
from tqdm import tqdm
import pandas as pd
import utils

from Dataset import pose_eff_dataset,BIWI

# Setup Tensorboard

In [None]:
writer = SummaryWriter()

# Define Loss Function

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class CustomLoss(nn.Module):
    def __init__(self, alpha, beta):
        super(CustomLoss, self).__init__()
        self.alpha = alpha
        self.beta = beta
        self.regression_loss = nn.MSELoss()
        self.classification_loss = nn.CrossEntropyLoss()

    def forward(self, yaw_pred, pitch_pred, roll_pred, yaw_true, pitch_true, roll_true):
        # Squeeze true labels if necessary (only if they are one-dimensional with singleton dimensions)
        yaw_true = torch.squeeze(yaw_true, dim=1)
        pitch_true = torch.squeeze(pitch_true, dim=1)
        roll_true = torch.squeeze(roll_true, dim=1)

        # Regression losses
        reg_loss_yaw = self.regression_loss(yaw_pred.float(), yaw_true.float())
        reg_loss_pitch = self.regression_loss(pitch_pred.float(), pitch_true.float())
        reg_loss_roll = self.regression_loss(roll_pred.float(), roll_true.float())

        # Classification losses (using CrossEntropyLoss)
        cls_loss_yaw = self.classification_loss(yaw_pred, yaw_true)
        cls_loss_pitch = self.classification_loss(pitch_pred, pitch_true)
        cls_loss_roll = self.classification_loss(roll_pred, roll_true)

        # Combined loss
        reg_loss = (reg_loss_yaw*3 + reg_loss_pitch*2 + reg_loss_roll) / 6
        cls_loss = (cls_loss_yaw*3 + cls_loss_pitch*2 + cls_loss_roll) / 6

        loss = self.alpha * reg_loss + self.beta * cls_loss
        return loss

# Adjust Environment and Save folders

In [None]:
cudnn.enabled = True
snapshot=''
batch_size = 64
gpu = 0
b_scheduler = False
lr = 1e-4

In [None]:
if not os.path.exists('./output/snapshots'):
        os.makedirs('./output/snapshots')

In [None]:
if not os.path.exists('output/snapshots/{}'.format(summary_name)):
        os.makedirs('output/snapshots/{}'.format(summary_name))

# Define Model Class and Load Model

In [None]:
import torch
import torch.nn as nn
import torchvision
from torchvision import models

# Check if GPU is available
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Load the pretrained MobileNetV3 model
model_MNV3 = torchvision.models.mobilenet_v3_small(pretrained=True)

# Define constants
HIDDEN_FEATURES = 1024
OUTPUT_FEATURES = 1280
NUM_CLASSES = 66  # Number of classes for each roll and pitch
NUM_CLASSES_YAW = 120

# Define the custom classifier with three heads
class CustomHead(nn.Module):
    def __init__(self, in_features):
        super(CustomHead, self).__init__()
        self.fc1 = nn.Linear(in_features, HIDDEN_FEATURES)
        self.bn1 = nn.BatchNorm1d(HIDDEN_FEATURES)
        self.fc2 = nn.Linear(HIDDEN_FEATURES, OUTPUT_FEATURES)
        self.bn2 = nn.BatchNorm1d(OUTPUT_FEATURES)
        self.fc_yaw = nn.Linear(OUTPUT_FEATURES, NUM_CLASSES_YAW)
        self.fc_pitch = nn.Linear(OUTPUT_FEATURES, NUM_CLASSES)
        self.fc_roll = nn.Linear(OUTPUT_FEATURES, NUM_CLASSES)
        self.hardswish = nn.Hardswish()
        self.dropout = nn.Dropout(p=0.2, inplace=True)
    
    def forward(self, x):
        x = self.hardswish(self.bn1(self.fc1(x)))
        x = self.dropout(x)
        x = self.hardswish(self.bn2(self.fc2(x)))
        x = self.dropout(x)
        yaw = self.fc_yaw(x)
        pitch = self.fc_pitch(x)
        roll = self.fc_roll(x)
        return yaw, pitch, roll

# Modify the MobileNetV3 model to use the custom classifier
class ModifiedMobileNetV3(nn.Module):
    def __init__(self, num_classes_yaw, num_classes_pitch, num_classes_roll):
        super(ModifiedMobileNetV3, self).__init__()
        self.backbone = models.mobilenet_v3_small(pretrained=True).features
        self.pool = nn.AdaptiveAvgPool2d((1, 1))
        self.flatten = nn.Flatten()
        in_features = models.mobilenet_v3_small(pretrained=True).classifier[0].in_features
        self.classifier = CustomHead(in_features)

    def forward(self, x):
        x = self.backbone(x)
        x = self.pool(x)
        x = self.flatten(x)
        yaw, pitch, roll = self.classifier(x)
        return yaw, pitch, roll

# Instantiate the modified model
model_MNV3 = ModifiedMobileNetV3(NUM_CLASSES_YAW, NUM_CLASSES, NUM_CLASSES)

# Move the model to the GPU
model_MNV3 = model_MNV3.to(device)

In [None]:
if not snapshot == '':
    saved_state_dict = torch.load(snapshot)
    model.load_state_dict(saved_state_dict['model_state_dict'])

# Load from Datasets Folder

In [None]:
print('Loading data.')
pkla=pd.read_pickle("./Datasets/300W_LP/300W_LP/file.pkl")
pkla = pkla.sample(frac=1, random_state=42)
# train=pkla[:int(0.9*len(df_shuffled))]
# test=pkla[int(0.9*len(df_shuffled)):]
# test.reset_index(inplace=True)
# print(test.head())
normalize = transforms.Normalize(
mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])

transformations = transforms.Compose([transforms.RandomResizedCrop(size=224,scale=(0.8,1)),
                                    transforms.ToTensor(),
                                    normalize])
train_pose_dataset =pose_eff_dataset('./Datasets/300W_LP/300W_LP',
                                pkla,
                                transformations)
# test_pose_dataset =pose_eff_dataset('./Datasets/300W_LP/300W_LP',
#                                 test,
#                                 transformations)
test_pose_dataset =BIWI("E:/HeadPose/Training/Datasets/BIWI_done.npz",
                        transform=transformations,
                        train_mode=False) 
train_effloader = torch.utils.data.DataLoader(
    dataset=train_pose_dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=4)

test_effloader = torch.utils.data.DataLoader(
    dataset=test_pose_dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=4)

# Define Criterion and Optimizer 

In [None]:
# Instantiate the custom loss with appropriate weights
criterion = CustomLoss(alpha=1.0, beta=2.0)  # Adjust alpha and beta as needed

# Use Adam optimizer
optimizer = optim.Adam(model_MNV3.classifier.parameters(), lr=1e-4, weight_decay=1e-4)

# Define EarlyStopper

In [None]:
class EarlyStopper:
    def __init__(self, patience, min_delta):
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.best_score = None
        self.early_stop = False

    def early_stop(self, val_loss):
        if self.best_score is None:
            self.best_score = val_loss
            return False

        if val_loss < self.best_score - self.min_delta:
            self.best_score = val_loss
            self.counter = 0
        else:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        return self.early_stop

# Define Training Function

In [None]:
def train_model(model, criterion, optimizer, train_loader, val_loader, num_epochs=10, early_stopper=None, save_path='best_model.pth'):
    training_losses = []
    validation_losses = []
    best_val_loss = float('inf')  # Initialize with infinity
    lowest_epoch_crossval = float('inf')
    best_model = None

    for epoch in range(num_epochs):
        print(f'Epoch [{epoch + 1}/{num_epochs}]')
        print('-' * 50)

        # Training loop
        model.train()
        running_loss = 0.0
        processed_samples_train = 0
        total_samples_train = len(train_loader.dataset)

        for batch_idx, (inputs, labels) in tqdm(enumerate(train_loader), total=len(train_loader), desc=f"Epoch {epoch+1}/{num_epochs}", unit="batch"):
            inputs = inputs.to(device)
            yaw_true = labels['yaw'].to(device)
            pitch_true = labels['pitch'].to(device)
            roll_true = labels['roll'].to(device)

            optimizer.zero_grad()

            # Forward pass
            yaw_pred, pitch_pred, roll_pred = model(inputs)

            # Compute loss
            loss = criterion(yaw_pred, pitch_pred, roll_pred, yaw_true, pitch_true, roll_true)

            # Backward pass and optimization
            loss.backward()
            optimizer.step()

            running_loss += loss.item() * inputs.size(0)
            processed_samples_train += inputs.size(0)

            # Print progress
            if (batch_idx + 1) % 10 == 0:
                print(f'Training Batch [{batch_idx + 1}/{len(train_loader)}], Loss: {loss.item():.4f}')

        epoch_loss = running_loss / total_samples_train
        training_losses.append(epoch_loss)
        print(f'Training Loss: {epoch_loss:.4f}')
        
        # Validation loop
        model.eval()
        val_running_loss = 0.0
        yaw_error = pitch_error = roll_error = 0.0
        total_samples_val = len(val_loader.dataset)

        with torch.no_grad():
            for batch_idx, (inputs, labels) in tqdm(enumerate(val_loader), total=len(val_loader), desc=f"Validation Epoch {epoch+1}/{num_epochs}", unit="batch"):
                inputs = inputs.to(device)
                yaw_true = labels['yaw'].to(device)
                pitch_true = labels['pitch'].to(device)
                roll_true = labels['roll'].to(device)
        
                yaw_pred, pitch_pred, roll_pred = model(inputs)

                # Calculate MAE for yaw, pitch, and roll
                yaw_error += torch.sum(torch.abs(yaw_true - yaw_pred))
                pitch_error += torch.sum(torch.abs(pitch_true - pitch_pred))
                roll_error += torch.sum(torch.abs(roll_true - roll_pred))
        
                # Compute loss
                loss = criterion(yaw_pred, pitch_pred, roll_pred, yaw_true, pitch_true, roll_true)
                val_running_loss += loss.item() * inputs.size(0)

                # Print progress
                if (batch_idx + 1) % 10 == 0:
                    print(f'Validation Batch [{batch_idx + 1}/{len(val_loader)}], Loss: {loss.item():.4f}')

            val_loss = val_running_loss / total_samples_val
            validation_losses.append(val_loss)

            yaw_mae = yaw_error / total_samples_val
            pitch_mae = pitch_error / total_samples_val
            roll_mae = roll_error / total_samples_val
            mae = (yaw_mae + pitch_mae + roll_mae) / 3

            print(f'Validation Loss: {val_loss:.4f}, MAE: {mae:.4f}, '
                  f'Yaw MAE: {yaw_mae:.4f}, Pitch MAE: {pitch_mae:.4f}, Roll MAE: {roll_mae:.4f}')

            # Save model with the lowest validation loss
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                best_model = model.state_dict()
                torch.save(best_model, save_path)
                print(f'New best model saved with validation loss: {val_loss:.4f}')

            # Update the best model based on MAE
            if mae < lowest_epoch_crossval:
                lowest_epoch_crossval = mae
                lowest_yaw = yaw_mae
                lowest_pitch = pitch_mae
                lowest_roll = roll_mae
                lowest_epoch_no = epoch

        # Check early stopping condition
        if early_stopper is not None and early_stopper.early_stop(val_loss):
            print("Early stopping")
            break

    print('Training complete')
    print(f"Best MAE: {lowest_epoch_crossval:.4f}, Yaw MAE: {lowest_yaw:.4f}, Pitch MAE: {lowest_pitch:.4f}, Roll MAE: {lowest_roll:.4f}, Epoch: {lowest_epoch_no}")

    return model, training_losses, validation_losses, lowest_epoch_crossval

In [None]:
early_stopper = EarlyStopper(patience=20, min_delta=0.5)
trained_model, training_losses, validation_losses, training_accuracies, validation_accuracies = train_model(
    model_MNV3, criterion, optimizer, train_effloader, test_effloader, num_epochs=100, early_stopper)