Trained with keypoints

## Train

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, WeightedRandomSampler, random_split, Dataset
from torchvision import transforms, datasets
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from collections import Counter
from PIL import Image
import os
import pandas as pd
import numpy as np
import cv2
import matplotlib.pyplot as plt

# Step 1: Define the CNN model
class CNN(nn.Module):
    """Two-block convolutional classifier that also records Grad-CAM inputs.

    Each block is a 3x3 convolution (padding 1), a ReLU and a 2x2 max-pool.
    The flattened result feeds a 128-unit hidden layer followed by a
    ``num_classes``-way output layer. Every forward pass stores the output of
    the second block in ``self.activations``; when that tensor takes part in
    autograd, its gradient is stored in ``self.gradients`` during backward.

    Args:
        num_classes: Number of output logits.
    """

    def __init__(self, num_classes):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=16, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        self.conv2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, stride=1, padding=1)
        # LazyLinear infers in_features from the first batch, so the input
        # resolution is still free. Unlike assigning the layer inside
        # forward(), its parameters are registered right here, so an optimizer
        # built from model.parameters() before the first forward pass trains
        # them, and load_state_dict() works on a freshly constructed model.
        self.fc1 = nn.LazyLinear(128)
        self.fc2 = nn.Linear(128, num_classes)
        self.relu = nn.ReLU()

        # Filled in by forward() / backward(); read by the getters below
        self.gradients = None
        self.activations = None

    def activations_hook(self, grad):
        """Tensor hook: keep the gradient flowing into the saved activations."""
        self.gradients = grad

    def forward(self, x):
        """Return the class logits for a batch ``x`` of 3-channel images."""
        x = self.pool(self.relu(self.conv1(x)))
        x = self.pool(self.relu(self.conv2(x)))

        # Drop the gradient of any previous pass so it cannot be mistaken for
        # the gradient of this one
        self.gradients = None
        # Hooks can only be attached to tensors that take part in autograd
        # (this is skipped under torch.no_grad())
        if x.requires_grad:
            x.register_hook(self.activations_hook)
        self.activations = x  # Save activations

        x = x.view(x.size(0), -1)  # Flatten to (batch, features)
        x = self.relu(self.fc1(x))
        x = self.fc2(x)
        return x

    def get_activations_gradient(self):
        """Return the gradient captured by the hook (None before a backward pass)."""
        return self.gradients

    def get_activations(self):
        """Return the feature maps stored by the most recent forward pass."""
        return self.activations


# Step 2: Select the compute device (GPU when one is available, otherwise CPU)
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")

# Step 3: Build the image pipelines for the training data
_INPUT_SIZE = (32, 32)          # every image is resized to 32x32
_NORM_MEAN = (0.5, 0.5, 0.5)    # per-channel mean passed to Normalize
_NORM_STD = (0.5, 0.5, 0.5)     # per-channel std passed to Normalize

# Standard pipeline for the majority class (error): resize -> tensor -> normalize
standard_transform = transforms.Compose([
    transforms.Resize(_INPUT_SIZE),
    transforms.ToTensor(),
    transforms.Normalize(_NORM_MEAN, _NORM_STD),
])

# Augmentation pipeline for the minority class (no-error): the same resize and
# normalization, with random flip / rotation / colour jitter in between
_augmentation_steps = [transforms.Resize(_INPUT_SIZE)]
_augmentation_steps += [
    transforms.RandomHorizontalFlip(p=0.5),                # flip with 50% probability
    transforms.RandomRotation(degrees=10),                 # rotate by up to 10 degrees
    transforms.ColorJitter(brightness=0.2, contrast=0.2),  # vary brightness and contrast
]
_augmentation_steps += [
    transforms.ToTensor(),
    transforms.Normalize(_NORM_MEAN, _NORM_STD),
]
augmentation_transform = transforms.Compose(_augmentation_steps)


# Step 4: Custom dataset class to apply transformations dynamically
class CustomDataset(datasets.ImageFolder):
    """ImageFolder that chooses a transform per sample from its class label.

    Samples whose label equals ``augment_label`` go through
    ``augmentation_transform`` (when one is given); every other sample goes
    through ``standard_transform``. A sample with no applicable transform is
    returned as the RGB PIL image.

    Args:
        root: Directory with one sub-folder per class (ImageFolder layout).
        standard_transform: Callable applied to non-augmented samples, or None.
        augmentation_transform: Callable applied to samples of the augmented
            class, or None to treat every class the same way.
        augment_label: Class index that receives the augmentation (default 1,
            the "no-error" minority class in this script).
    """

    def __init__(self, root, standard_transform=None, augmentation_transform=None,
                 augment_label=1):
        super().__init__(root)
        self.standard_transform = standard_transform
        self.augmentation_transform = augmentation_transform
        self.augment_label = augment_label

    def __getitem__(self, index):
        """Return ``(image, label)`` for the sample at ``index``."""
        path, label = self.samples[index]
        # The context manager closes the file handle once the pixels have been
        # copied into the converted image
        with Image.open(path) as raw:
            image = raw.convert("RGB")  # Ensure image is in RGB format

        if label == self.augment_label and self.augmentation_transform is not None:
            return self.augmentation_transform(image), label

        # standard_transform defaults to None; do not try to call it then
        if self.standard_transform is not None:
            image = self.standard_transform(image)
        return image, label

# Step 5: Define the UnlabeledDataset class for test data
class UnlabeledDataset(Dataset):
    """Flat directory of images without labels.

    Items are ``(image, filename)`` pairs, ordered by sorted file path.

    Args:
        root: Directory that directly contains the image files.
        transform: Optional callable applied to each RGB PIL image.
    """

    # Accepted file extensions; compared case-insensitively
    IMAGE_EXTENSIONS = ('.jpg', '.png', '.jpeg')

    def __init__(self, root, transform=None):
        self.root = root
        self.transform = transform
        # Lower-casing the name keeps files such as "frame.JPG", which a
        # case-sensitive suffix test would silently drop
        self.image_paths = sorted(
            os.path.join(root, fname)
            for fname in os.listdir(root)
            if fname.lower().endswith(self.IMAGE_EXTENSIONS)
        )

    def __len__(self):
        """Return the number of image files found in ``root``."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return the (optionally transformed) image and its base filename."""
        image_path = self.image_paths[idx]
        # Close the file handle as soon as the RGB copy has been made
        with Image.open(image_path) as raw:
            image = raw.convert("RGB")  # Ensure image is in RGB format
        if self.transform:
            image = self.transform(image)
        return image, os.path.basename(image_path)  # Return image and filename

# Step 6: Load the training dataset
train_data_dir = "/content/drive/MyDrive/E-RAU(DB)/MA680/data/Shooting/Processed_Frames/TrainingKeypoints"
full_dataset = CustomDataset(
    root=train_data_dir,
    standard_transform=standard_transform,
    augmentation_transform=augmentation_transform
)

# Debug: Print dataset size
print(f"Total dataset size: {len(full_dataset)}")

# Step 7: Calculate class weights for handling imbalance
# ImageFolder keeps the class index of every sample in `.targets`, so the
# labels can be read without decoding and transforming each image.
targets = full_dataset.targets
class_counts = Counter(targets)
total_samples = len(targets)
class_weight_values = [
    total_samples / class_counts[class_idx]
    for class_idx in range(len(full_dataset.classes))
]
class_weights = torch.tensor(class_weight_values, dtype=torch.float32).to(device)

print(f"Class weights: {class_weights}")

# Step 8: Calculate sample weights for the entire dataset
# Plain Python floats: the sampler builds its own CPU tensor from them, so
# there is no reason to hand it one device tensor per sample.
sample_weights = [class_weight_values[label] for label in targets]
print(f"Sample weights length: {len(sample_weights)}")

# Step 9: Split the dataset into training and validation sets
train_size = int(0.8 * len(full_dataset))
val_size = len(full_dataset) - train_size
print(f"Train size: {train_size}, Validation size: {val_size}")

# A seeded generator makes the train/validation split reproducible across runs
split_generator = torch.Generator().manual_seed(42)
train_dataset, val_split = random_split(
    full_dataset, [train_size, val_size], generator=split_generator
)

# Validation must see un-augmented images. Both subsets of random_split share
# the augmenting dataset, so the validation indices are re-pointed at a second
# view of the same folder that only applies the standard transform.
eval_dataset = CustomDataset(root=train_data_dir, standard_transform=standard_transform)
val_dataset = torch.utils.data.Subset(eval_dataset, val_split.indices)

# Step 10: Create a sampler for the training subset
train_indices = train_dataset.indices
train_sample_weights = [sample_weights[i] for i in train_indices]
sampler = WeightedRandomSampler(train_sample_weights, num_samples=len(train_dataset), replacement=True)

# Step 11: Create DataLoader for the training set with the sampler
train_loader = DataLoader(train_dataset, batch_size=32, sampler=sampler)

# Step 12: Create DataLoader for the validation set
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

# Step 13: Define the loss function with class weights
# NOTE(review): the training loader already rebalances classes through its
# weighted sampler; weighting the loss as well compensates for the imbalance
# twice - confirm that both are intended.
criterion = nn.CrossEntropyLoss(weight=class_weights)

# Step 14: Define the optimizer and learning rate scheduler
model = CNN(num_classes=2).to(device)  # 2 classes: 0.Error and 1.NoError

# CNN sizes its fc1 layer from the first input it sees. Push one sample
# through before building the optimizer so that every parameter exists when
# model.parameters() is collected; otherwise fc1 would never be updated.
with torch.no_grad():
    warmup_image, _ = train_dataset[0]
    model(warmup_image.unsqueeze(0).to(device))

optimizer = optim.Adam(model.parameters(), lr=0.001)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)  # Reduce LR every 5 epochs

# Step 15: Training loop with early stopping
num_epochs = 10
best_val_loss = float('inf')
patience = 3          # epochs without validation improvement before stopping
trigger_times = 0     # consecutive epochs without improvement so far
best_model_path = "best_model.pth"

for epoch in range(num_epochs):
    # ---- training pass ----
    model.train()
    running_loss = 0.0
    for images, labels in train_loader:
        images = images.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()

        # Gradient clipping
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

        optimizer.step()
        running_loss += loss.item()

    # Learning rate scheduling (once per epoch)
    scheduler.step()

    # Print training loss (mean over batches)
    print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {running_loss / len(train_loader):.4f}")

    # ---- validation pass ----
    model.eval()
    val_loss = 0.0
    all_preds = []
    all_labels = []
    with torch.no_grad():
        for images, labels in val_loader:
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            val_loss += loss.item()
            _, preds = torch.max(outputs, 1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    # Compute validation metrics (class 1 is treated as the positive class).
    # zero_division=0 reports 0 without a warning when a score is undefined,
    # e.g. when no sample was predicted as class 1 in an early epoch.
    val_loss /= len(val_loader)
    val_accuracy = accuracy_score(all_labels, all_preds)
    val_precision = precision_score(all_labels, all_preds, average='binary', pos_label=1, zero_division=0)
    val_recall = recall_score(all_labels, all_preds, average='binary', pos_label=1, zero_division=0)
    val_f1 = f1_score(all_labels, all_preds, average='binary', pos_label=1, zero_division=0)
    print(f"Validation Loss: {val_loss:.4f}, "
          f"Accuracy: {val_accuracy:.4f}, Precision: {val_precision:.4f}, "
          f"Recall: {val_recall:.4f}, F1: {val_f1:.4f}")

    # Early stopping: checkpoint on improvement, otherwise count towards patience
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        trigger_times = 0
        torch.save(model.state_dict(), best_model_path)  # Save the best model
    else:
        trigger_times += 1
        if trigger_times >= patience:
            print("Early stopping!")
            break

# The weights in memory come from the last epoch that ran, which is not
# necessarily the best one. Reload the checkpoint so that whatever uses `model`
# next sees the best weights. Skipped when no epoch ever produced a checkpoint.
if best_val_loss < float('inf'):
    model.load_state_dict(torch.load(best_model_path, map_location=device))

print("Training complete!")

# Step 16: Grad-CAM Implementation
def grad_cam(model, image, target_class=None):
    """
    Generate a Grad-CAM heatmap for a given image and model.

    Args:
        model: Module that exposes ``get_activations()`` and
            ``get_activations_gradient()`` for the layer to explain. It is
            switched to eval mode.
        image: Single image tensor without a batch dimension.
        target_class: Index of the logit to explain; defaults to the class
            the model predicts for this image.

    Returns:
        2-D float32 numpy array with the spatial size of the stored
        activations, scaled to [0, 1]. A map with no spatial variation (for
        example when nothing supports the target class) is returned as zeros.

    Raises:
        RuntimeError: If the model captured no gradient for its activations.
    """
    model.eval()

    # Run on whatever device the model lives on instead of relying on a
    # module-level variable; a parameter-less model keeps the image's device.
    first_param = next(model.parameters(), None)
    run_device = image.device if first_param is None else first_param.device
    batch = image.unsqueeze(0).to(run_device).detach()  # Add batch dimension
    batch.requires_grad_(True)  # Enable gradient computation

    # Forward pass
    output = model(batch)
    if target_class is None:
        target_class = output.argmax(dim=1).item()  # Use predicted class

    # Backward pass from the chosen logit fills the model's gradient hook
    model.zero_grad()
    output[0, target_class].backward()

    captured = model.get_activations_gradient()
    if captured is None:
        raise RuntimeError("Model did not record gradients for its activations.")
    activations = model.get_activations().detach().cpu().numpy()
    gradients = captured.detach().cpu().numpy()

    # Channel weights: global average pooling of the gradients over the two
    # trailing (spatial) axes
    weights = gradients.mean(axis=(2, 3))

    # Weighted sum of the feature maps of the (single) batch element
    heatmap = (weights[0][:, None, None] * activations[0]).sum(axis=0)

    # Keep only positive evidence for the target class
    heatmap = np.maximum(heatmap, 0).astype(np.float32)

    # Min-max normalize; a constant map would otherwise divide by zero and
    # turn the whole heatmap into NaN
    low, high = heatmap.min(), heatmap.max()
    if high > low:
        heatmap = (heatmap - low) / (high - low)
    else:
        heatmap = np.zeros_like(heatmap)

    return heatmap

def visualize_gradcam(image, heatmap, alpha=0.5, mean=0.5, std=0.5):
    """
    Overlay the Grad-CAM heatmap on the original image with a color bar.

    Args:
        image: Original image, either a (C, H, W) PyTorch tensor that was
            normalized as ``(x - mean) / std`` or an RGB PIL image / (H, W, 3)
            uint8 array.
        heatmap: Grad-CAM heatmap (2-D numpy array).
        alpha: Transparency of the heatmap.
        mean: Normalization mean to undo for tensor input (scalar or one value
            per channel). The default matches the 0.5 used by this file's
            transforms.
        std: Normalization std to undo for tensor input (scalar or one value
            per channel).

    Returns:
        None (displays the visualization).
    """
    if isinstance(image, torch.Tensor):
        image = image.detach().cpu().numpy()
        image = np.transpose(image, (1, 2, 0))  # (C, H, W) -> (H, W, C)
        # Undo the normalization first: a normalized tensor contains negative
        # values, which wrap around when cast straight to uint8
        image = image * np.asarray(std, dtype=np.float32) + np.asarray(mean, dtype=np.float32)
        image = (np.clip(image, 0.0, 1.0) * 255).astype(np.uint8)
    elif isinstance(image, Image.Image):
        image = np.asarray(image.convert("RGB"))
    else:
        image = np.asarray(image)
    image = np.ascontiguousarray(image)

    # Upsample the heatmap to match the original image size
    heatmap = np.nan_to_num(np.asarray(heatmap, dtype=np.float32))
    heatmap = cv2.resize(heatmap, (image.shape[1], image.shape[0]), interpolation=cv2.INTER_LINEAR)

    # Normalize the heatmap to [0, 1]; a constant map becomes all zeros
    # instead of dividing by zero
    low, high = heatmap.min(), heatmap.max()
    if high > low:
        heatmap = (heatmap - low) / (high - low)
    else:
        heatmap = np.zeros_like(heatmap)

    # Convert heatmap to 0-255 range and apply colormap. OpenCV returns the
    # colours in BGR order while the image (and matplotlib) use RGB, so swap
    # the channels before blending.
    colored = cv2.applyColorMap(np.uint8(255 * heatmap), cv2.COLORMAP_JET)
    colored = cv2.cvtColor(colored, cv2.COLOR_BGR2RGB)

    # Overlay the heatmap on the image
    superimposed_img = cv2.addWeighted(image, 1 - alpha, colored, alpha, 0)

    # Display the result with a color bar
    fig, ax = plt.subplots()
    ax.imshow(superimposed_img)
    ax.axis('off')

    # An RGB image carries no scalar scale, so the color bar is driven by an
    # explicit jet mapping over the heatmap's [0, 1] range
    importance_scale = plt.cm.ScalarMappable(norm=plt.Normalize(vmin=0.0, vmax=1.0), cmap="jet")
    cbar = fig.colorbar(importance_scale, ax=ax, fraction=0.046, pad=0.04)
    cbar.set_label("Importance", rotation=270, labelpad=15)

    plt.show()

# Step 17: Run Grad-CAM on the first sample of train_dataset and display the
# overlay (swap in any other (image, label) dataset to inspect different data)
first_sample = train_dataset[0]
sample_image, sample_label = first_sample
heatmap = grad_cam(model, sample_image, target_class=None)
visualize_gradcam(sample_image, heatmap, alpha=0.5)

## Evaluate

In [None]:
# Define the test dataset
test_data_dir = "/content/drive/MyDrive/E-RAU(DB)/MA680/data/Shooting/Processed_Frames/X.Test/Frames"
test_dataset = UnlabeledDataset(root=test_data_dir, transform=standard_transform)

# Create a DataLoader for the test dataset (unshuffled, so predictions stay in
# dataset order)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Load the best model (if not already loaded). map_location lets a checkpoint
# that was saved on a GPU load on a CPU-only runtime as well.
model.load_state_dict(torch.load("best_model.pth", map_location=device))
model.eval()

# Initialize lists to store predictions and filenames
all_preds = []
all_filenames = []

# Run the model on the test dataset (0 indicates error, 1 indicates no error)
with torch.no_grad():
    for images, filenames in test_loader:
        images = images.to(device)
        outputs = model(images)
        preds = outputs.argmax(dim=1)  # index of the largest logit per image
        all_preds.extend(preds.cpu().tolist())
        all_filenames.extend(filenames)

# Save predictions to a CSV file
results = pd.DataFrame({"Filename": all_filenames, "Prediction": all_preds})
results.to_csv("test_predictions.csv", index=False)
print("Test predictions saved to test_predictions.csv")

# Load ground truth labels (if available)
# Replace None with a list of labels (0 = error, 1 = no error), one per test
# image, in the same order as the predictions. None is used as the "not
# provided" marker because a literal `[...]` is a one-element list holding
# Ellipsis, which would be scored as a label if the test set had one image.
ground_truth_labels = None

# Compute evaluation metrics
if ground_truth_labels is not None and len(ground_truth_labels) == len(all_preds):
    accuracy = accuracy_score(ground_truth_labels, all_preds)
    precision = precision_score(ground_truth_labels, all_preds, average='binary', pos_label=1)
    recall = recall_score(ground_truth_labels, all_preds, average='binary', pos_label=1)
    f1 = f1_score(ground_truth_labels, all_preds, average='binary', pos_label=1)

    print(f"Test Accuracy: {accuracy:.4f}")
    print(f"Test Precision: {precision:.4f}")
    print(f"Test Recall: {recall:.4f}")
    print(f"Test F1-Score: {f1:.4f}")
else:
    print("Ground truth labels not provided or do not match predictions.")

# Visualize Grad-CAM for a few test images
num_samples = 20  # Maximum number of test images to visualize
# Never index past the end of a test set that has fewer images than requested
num_to_show = min(num_samples, len(test_dataset))
for i in range(num_to_show):
    sample_image, sample_filename = test_dataset[i]
    heatmap = grad_cam(model, sample_image)
    visualize_gradcam(sample_image, heatmap)
    print(f"Filename: {sample_filename}, Prediction: {all_preds[i]}")