In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import torchvision.transforms as T
from torchvision.models import resnet18
import numpy as np
import shap
import cv2
import json
import pandas as pd
from tqdm import tqdm
import matplotlib.pyplot as plt
import segmentation_models_pytorch as smp


In [2]:
%matplotlib inline


In [3]:

# --------------------------
# Step 1: Configuration
# --------------------------
class Config:
    IMAGE_DIR = "./BDDD100K/train/images"
    LABEL_FILE = "./BDDD100K/train/annotations/bdd100k_labels_images_train.json"
    SEG_LABEL_DIR = "bdd100k/labels/segmentation"
    NUM_CLASSES = 9  # [brake, steer_left, steer_right, accelerate, lane_change_left, lane_change_right, maintain_lane, stop_completely, overtake]
    INPUT_SIZE = (224, 224)
    BATCH_SIZE = 32
    EPOCHS = 0
    DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

    @classmethod
    def print_cuda_info(cls):
        print("\nCUDA Information:")
        print(f"CUDA available: {torch.cuda.is_available()}")
        if torch.cuda.is_available():
            print(f"Current device: {torch.cuda.current_device()}")
            print(f"Device name: {torch.cuda.get_device_name()}")
            print(f"Device count: {torch.cuda.device_count()}")
            print(f"Memory allocated: {torch.cuda.memory_allocated(0) / 1024**2:.2f} MB")
            print(f"Memory cached: {torch.cuda.memory_reserved(0) / 1024**2:.2f} MB")

config = Config()



In [4]:
# --------------------------
# Step 2: Dataset Preparation
# --------------------------
class BDD100KHMI(Dataset):
    def __init__(self, split='train', transform=None):
        if split == 'train':
            self.image_dir = config.IMAGE_DIR
            label_file = config.LABEL_FILE
        elif split == 'val':
            self.image_dir = config.IMAGE_DIR
            label_file = config.LABEL_FILE
        elif split == 'test':
            self.image_dir = config.IMAGE_DIR
            self.data = []
            return

        with open(label_file, 'r') as f:
            self.data = json.load(f)

        self.transform = transform or T.Compose([
            T.ToPILImage(),
            T.Resize(config.INPUT_SIZE),
            T.ToTensor(),
            T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        entry = self.data[idx]
        img_path = f"{self.image_dir}/{entry['name']}"
        image = cv2.cvtColor(cv2.imread(img_path), cv2.COLOR_BGR2RGB)

        if self.transform:
            image = self.transform(image)

        if not hasattr(self, 'data') or not self.data:
            return image

        label = 6  # Default: Maintain Lane
        for obj in entry.get('labels', []):
            if obj['category'] == 'pedestrian':
                label = 0  # Brake
            elif obj['category'] == 'car':
                label = 1  # Steer Left
            elif obj['category'] == 'traffic light' and obj.get('attributes', {}).get('trafficLightColor') == 'red':
                label = 2  # Steer Right
            elif obj['category'] == 'bicycle':
                label = 3  # Accelerate
            elif obj['category'] == 'lane_marking' and obj.get('attributes', {}).get('change') == 'left':
                label = 4  # Lane Change Left
            elif obj['category'] == 'lane_marking' and obj.get('attributes', {}).get('change') == 'right':
                label = 5  # Lane Change Right
            elif obj['category'] == 'stop_sign':
                label = 7  # Stop Completely
            elif obj['category'] == 'slow_vehicle':
                label = 8  # Overtake

        return image, label
    
    def get_all_labels(self):
        return [self.__getitem__(i)[1] for i in range(len(self))]

In [5]:
# --------------------------
# Step 3: Model Definition
# --------------------------
def modify_relu_inplace(model):
    for module in model.modules():
        if isinstance(module, nn.ReLU):
            module.inplace = False
    return model

def build_models():
    # Object Detection Model (YOLOv5)
    obj_detector = torch.hub.load('ultralytics/yolov5', 'yolov5s', pretrained=True)

    # Segmentation Model (U-Net)
    seg_model = smp.Unet(encoder_name="resnet18", encoder_weights="imagenet", in_channels=3, classes=1)

    # Decision Model
    decision_model = resnet18(pretrained=True)
    decision_model.fc = nn.Linear(decision_model.fc.in_features, config.NUM_CLASSES)

    # Disable in-place operations
    decision_model = modify_relu_inplace(decision_model)

    return obj_detector, seg_model.to(config.DEVICE), decision_model.to(config.DEVICE)



In [6]:
# --------------------------
# Step 4: Model Training
# --------------------------
def train_model(model, dataloader):
    print("\nStarting Training...")
    model.train()
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    for epoch in range(config.EPOCHS):
        running_loss = 0.0
        correct = 0
        total = 0

        for images, labels in tqdm(dataloader, desc=f"Epoch {epoch+1}/{config.EPOCHS}"):
            images, labels = images.to(config.DEVICE), labels.to(config.DEVICE)

            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        epoch_loss = running_loss / len(dataloader)
        accuracy = 100 * correct / total
        print(f"Epoch {epoch+1}: Loss = {epoch_loss:.4f}, Accuracy = {accuracy:.2f}%")
    print("Training Completed.\n")

In [None]:
# --------------------------
# Step 5: Explainability with SHAP
# --------------------------
# def explain_with_shap(decision_model, dataset):
#     print("\nGenerating SHAP explanations...")
#     decision_model.eval()
#     background = torch.stack([dataset[i][0] for i in range(50)]).to(config.DEVICE)
#     explainer = shap.GradientExplainer(decision_model, background)

#     test_images = torch.stack([dataset[i][0] for i in range(5)]).to(config.DEVICE)
#     test_images = test_images.clone().detach().requires_grad_(True)

#     shap_values = explainer.shap_values(test_images)
#     for i in range(5):
#         plt.figure(figsize=(5, 5))
#         image = np.transpose(test_images[i].cpu().detach().numpy(), (1, 2, 0))
#         image = np.clip(image, 0, 1)  # Clip to valid range [0, 1]
#         shap.image_plot(shap_values, image)
#         plt.show()


In [None]:
# # --------------------------
# # Step 5: Explainability with SHAP
# # --------------------------
def explain_with_shap(decision_model, dataset):
    print("\nGenerating SHAP explanations...")
    decision_model.eval()
    
    # 1. Prepare background and test data
    background = torch.stack([dataset[i][0] for i in range(50)]).to(config.DEVICE)
    test_images = torch.stack([dataset[i][0] for i in range(5)]).to(config.DEVICE)
    
    # 2. Initialize SHAP explainer
    explainer = shap.GradientExplainer(decision_model, background)
    
    # 3. Compute SHAP values
    shap_values = explainer.shap_values(test_images)
    
    # 4. Preprocess images for visualization
    test_images_vis = test_images.cpu().detach().numpy()
    
    # 5. Fix shape and normalization issues
    for i in range(5):
        # Transpose from (C, H, W) to (H, W, C)
        image = np.transpose(test_images_vis[i], (1, 2, 0))
        
        # If normalized during preprocessing, denormalize:
        # image = (image * dataset.std) + dataset.mean  # Replace with actual mean/std
        
        # Clip to [0, 1] range
        image = np.clip(image, 0, 1)
        
        # Plot SHAP explanations
        plt.figure(figsize=(5, 5))
        plt.imshow(image)
        plt.title("Original Image")
        plt.axis('off')
        plt.show()
        
        # Plot SHAP heatmap
        shap.image_plot(
            [shap_values[i]],  # SHAP values for this image
            image[np.newaxis, ...],  # Add batch dimension
            show=False
        )
        plt.title("SHAP Explanation")
        plt.axis('off')
        plt.show(block=True)
        # plt.close()

In [None]:
import torch
import shap
import numpy as np
import matplotlib.pyplot as plt

def model_wrapper(model, device, input_shape):
    """Wraps the PyTorch model to accept flattened NumPy inputs and return predictions."""
    def predict_fn(input_array):
        input_tensor = torch.tensor(input_array, dtype=torch.float32).to(device)
        input_tensor = input_tensor.view(-1, *input_shape)  # Reshape back to (batch, C, H, W)
        
        with torch.no_grad():
            output_tensor = model(input_tensor)
        return output_tensor.cpu().numpy()
    
    return predict_fn

def explain_with_shap(decision_model, dataset):
    print("\nGenerating SHAP explanations...")
    
    decision_model.eval()
    
    # Move model to CPU if GPU memory is limited
    device = "cuda" if torch.cuda.is_available() and torch.cuda.get_device_properties(0).total_memory >= 5000 else "cpu"
    decision_model.to(device)
    
    # Reduce background data size for efficiency
    background = torch.stack([dataset[i][0] for i in range(10)]).to(device)
    test_images = torch.stack([dataset[i][0] for i in range(3)]).to(device)
    
    # Convert tensors to NumPy (SHAP needs NumPy)
    background_np = background.cpu().detach().numpy()
    test_images_np = test_images.cpu().detach().numpy()
    
    # Flatten images for SHAP (Convert 4D tensor → 2D array)
    background_flat = background_np.reshape(background_np.shape[0], -1)  # (batch, C*H*W)
    test_images_flat = test_images_np.reshape(test_images_np.shape[0], -1)  # (batch, C*H*W)
    
    # Wrap model with reshaping logic
    input_shape = background_np.shape[1:]  # Get (C, H, W)
    predict_fn = model_wrapper(decision_model, device, input_shape)
    
    # Use Kernel SHAP (works with any model)
    explainer = shap.KernelExplainer(predict_fn, background_flat)
    shap_values = explainer.shap_values(test_images_flat)
    
    # Reshape SHAP values back to image format (batch, C, H, W)
    shap_values_reshaped = np.array(shap_values).reshape(-1, *input_shape)  # (batch, C, H, W)
    
    # Fix image shape for visualization (Convert to (batch, H, W, C))
    test_images_vis = np.transpose(test_images_np, (0, 2, 3, 1))  # (batch, H, W, C)
    shap_values_vis = np.transpose(shap_values_reshaped, (0, 2, 3, 1))  # (batch, H, W, C)
    
    # Plot SHAP explanations
    for i in range(len(test_images_vis)):
        plt.figure(figsize=(5, 5))
        plt.imshow(test_images_vis[i])
        plt.title("Original Image")
        plt.axis('off')
        plt.show()

        shap.image_plot([shap_values_vis[i]], [test_images_vis[i]])  # Ensure correct shape
        plt.show(block=True)  # Force rendering


In [10]:
# --------------------------
# Step 6: Main Pipeline
# --------------------------
if __name__ == "__main__":
    print("\n=== Starting BDD100K HMI Model Pipeline ===\n")

    # Add this right at the start
    Config.print_cuda_info()

    # Force CUDA memory clearance if available
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    print("1. Initializing Datasets...")
    train_dataset = BDD100KHMI(split='train')
    print(f"   - Train dataset size: {len(train_dataset)} samples")
    val_dataset = BDD100KHMI(split='val')
    print(f"   - Validation dataset size: {len(val_dataset)} samples")
    print("✓ Datasets initialized successfully\n")

    print("2. Creating DataLoaders...")
    train_dataloader = DataLoader(train_dataset, batch_size=config.BATCH_SIZE, shuffle=True)
    print(f"   - Train batches: {len(train_dataloader)}")
    val_dataloader = DataLoader(val_dataset, batch_size=config.BATCH_SIZE, shuffle=False)
    print(f"   - Validation batches: {len(val_dataloader)}")
    print("✓ DataLoaders created successfully\n")

    print("3. Building Models...")
    print("   - Loading YOLOv5...")
    obj_detector, seg_model, decision_model = build_models()
    print("   - Loading Segmentation Model...")
    print("   - Loading Decision Model...")
    print(f"✓ All models loaded successfully (using {config.DEVICE})\n")

    print("4. Starting Model Training...")
    train_model(decision_model, train_dataloader)
    print("✓ Training completed\n")

    print("5. Generating SHAP Explanations...")
    explain_with_shap(decision_model, train_dataset)
    print("✓ SHAP analysis completed\n")

    print("6. Collecting Decision Labels...")
    # decision_labels = train_dataset.get_all_labels()
    # print("   Decision Labels Distribution:")
    # unique_labels, counts = np.unique(decision_labels, return_counts=True)
    # for label, count in zip(unique_labels, counts):
    #     print(f"   - Class {label}: {count} samples")
    print("✓ Label collection completed\n")

    print("=== Pipeline Completed Successfully ===")



=== Starting BDD100K HMI Model Pipeline ===


CUDA Information:
CUDA available: True
Current device: 0
Device name: NVIDIA GeForce GTX 1650
Device count: 1
Memory allocated: 116.02 MB
Memory cached: 180.00 MB
1. Initializing Datasets...
   - Train dataset size: 69863 samples
   - Validation dataset size: 69863 samples
✓ Datasets initialized successfully

2. Creating DataLoaders...
   - Train batches: 2184
   - Validation batches: 2184
✓ DataLoaders created successfully

3. Building Models...
   - Loading YOLOv5...


Using cache found in C:\Users\VICTUS/.cache\torch\hub\ultralytics_yolov5_master
YOLOv5  2025-2-2 Python-3.12.8 torch-2.6.0+cu118 CUDA:0 (NVIDIA GeForce GTX 1650, 4096MiB)

Fusing layers... 
YOLOv5s summary: 213 layers, 7225885 parameters, 0 gradients, 16.4 GFLOPs
Adding AutoShape... 


   - Loading Segmentation Model...
   - Loading Decision Model...
✓ All models loaded successfully (using cuda)

4. Starting Model Training...

Starting Training...
Training Completed.

✓ Training completed

5. Generating SHAP Explanations...

Generating SHAP explanations...


ValueError: axes don't match array

In [None]:
import torch
import shap
import numpy as np
import matplotlib.pyplot as plt

def explain_with_shap(decision_model, dataset):
    print("\nGenerating SHAP explanations...")
    
    decision_model.eval()
    
    # Move model to CPU if GPU memory is limited
    device = "cuda" if torch.cuda.is_available() and torch.cuda.get_device_properties(0).total_memory >= 5000 else "cpu"
    decision_model.to(device)
    
    # Reduce background data size for efficiency (Limit to 5 samples)
    background = torch.stack([dataset[i][0] for i in range(5)]).to(device)
    test_images = torch.stack([dataset[i][0] for i in range(2)]).to(device)  # Test on only 2 images
    
    # Use SHAP GradientExplainer (more memory-efficient for CNNs)
    explainer = shap.GradientExplainer(decision_model, background)
    shap_values = explainer.shap_values(test_images)
    
    # Convert tensors to NumPy for visualization
    test_images_vis = test_images.cpu().detach().numpy()
    
    # Ensure proper shape (batch, H, W, C) for visualization
    test_images_vis = np.transpose(test_images_vis, (0, 2, 3, 1))  # (batch, H, W, C)
    
    # Plot SHAP explanations
    for i in range(len(test_images_vis)):
        plt.figure(figsize=(5, 5))
        plt.imshow(test_images_vis[i])
        plt.title("Original Image")
        plt.axis('off')
        plt.show()

        shap.image_plot([shap_values[i]], [test_images_vis[i]])  # Ensure correct shape
        plt.show(block=True)  # Force rendering


In [None]:
import torch
import shap
import numpy as np
import matplotlib.pyplot as plt
import os

def explain_with_shap(decision_model, dataset, save_dir="shap_outputs"):
    print("\nGenerating SHAP explanations...")
    
    decision_model.eval()
    
    # Ensure output directory exists
    os.makedirs(save_dir, exist_ok=True)
    
    # Move model to CPU if GPU memory is limited
    device = "cuda" if torch.cuda.is_available() and torch.cuda.get_device_properties(0).total_memory >= 5000 else "cpu"
    decision_model.to(device)
    
    # Reduce background data size for efficiency
    background = torch.stack([dataset[i][0] for i in range(5)]).to(device)
    test_images = torch.stack([dataset[i][0] for i in range(2)]).to(device)  # Test on 2 images
    
    # Use SHAP GradientExplainer
    explainer = shap.GradientExplainer(decision_model, background)
    shap_values = explainer.shap_values(test_images)
    
    # Convert tensors to NumPy for visualization
    test_images_vis = test_images.cpu().detach().numpy()
    
    # Ensure proper shape (batch, H, W, C) for visualization
    test_images_vis = np.transpose(test_images_vis, (0, 2, 3, 1))  # (batch, H, W, C)
    
    # Plot & Save SHAP explanations
    for i in range(len(test_images_vis)):
        # Save original image
        plt.figure(figsize=(5, 5))
        plt.imshow(test_images_vis[i])
        plt.title(f"Original Image {i+1}")
        plt.axis('off')
        img_path = os.path.join(save_dir, f"original_image_{i+1}.png")
        plt.savefig(img_path, bbox_inches='tight', dpi=300)
        plt.show()

        # Save SHAP heatmap
        shap_fig, shap_ax = plt.subplots(figsize=(5, 5))
        shap.image_plot([shap_values[i]], [test_images_vis[i]], show=False)
        shap_img_path = os.path.join(save_dir, f"shap_explanation_{i+1}.png")
        shap_fig.savefig(shap_img_path, bbox_inches='tight', dpi=300)
        plt.show()
        
        print(f"✅ Saved: {img_path} & {shap_img_path}")


In [9]:
import torch
import shap
import numpy as np
import matplotlib.pyplot as plt
import os

def explain_with_shap(decision_model, dataset, save_dir="shap_outputs", 
                      mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]):
    print("\nGenerating SHAP explanations...")
    
    decision_model.eval()
    os.makedirs(save_dir, exist_ok=True)
    
    # Use CPU if CUDA memory < 5GB
    device = "cuda" if torch.cuda.is_available() and torch.cuda.get_device_properties(0).total_memory >= 5e9 else "cpu"
    decision_model.to(device)
    
    # Reduce background/test images (smaller batch)
    background = torch.stack([dataset[i][0] for i in range(2)]).to(device)
    test_images = torch.stack([dataset[i][0] for i in range(2)]).to(device)

    # SHAP GradientExplainer
    explainer = shap.GradientExplainer(decision_model, background)
    shap_values = explainer.shap_values(test_images)  # Returns list of arrays (one per class)
    
    # Convert tensors to NumPy and denormalize images
    test_images_vis = test_images.cpu().numpy()
    
    # Proper denormalization: (x_normalized * std) + mean
    test_images_vis = test_images_vis * np.array(std)[None, :, None, None] + np.array(mean)[None, :, None, None]
    test_images_vis = np.clip(test_images_vis, 0, 1)  # Ensures proper visualization
    test_images_vis = np.transpose(test_images_vis, (0, 2, 3, 1))  # (batch, H, W, C)

    # Handle SHAP values shape
    if isinstance(shap_values, list):
        shap_values = np.array(shap_values)  # Convert list to array
        shap_values = np.mean(shap_values, axis=0)  # Aggregate across output classes
    
    # Ensure SHAP values have correct shape (batch, H, W, C)
    shap_values_vis = np.transpose(shap_values, (0, 2, 3, 1))  # (batch, H, W, C)
    
    # Plot and save images
    for i in range(len(test_images_vis)):
        # Save original image
        fig, ax = plt.subplots(figsize=(5, 5))
        ax.imshow(test_images_vis[i])
        ax.axis('off')
        img_path = os.path.join(save_dir, f"original_{i}.png")
        plt.savefig(img_path, bbox_inches='tight', dpi=300)
        plt.close()
        
        # Save SHAP explanation
        shap_fig = plt.figure(figsize=(5, 5))
        shap.image_plot(
            [shap_values_vis[i].mean(axis=-1, keepdims=True)],  # Take mean SHAP values
            test_images_vis[i][np.newaxis, ...], 
            show=False
        )
        shap_path = os.path.join(save_dir, f"shap_{i}.png")
        shap_fig.savefig(shap_path, bbox_inches='tight', dpi=300)
        plt.close()
        
        print(f"✅ Saved: {img_path} | {shap_path}")

    return test_images_vis, shap_values_vis
