In [18]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import transforms
from PIL import Image
import os
import random
import pandas as pd 
import matplotlib.pyplot as plt
from torchvision.models import resnet50,resnet152


class block(nn.Module):
    def __init__(
        self, in_channels, out_channels, identity_downsample=None, stride=1):
        super().__init__()
        self.expansion = 4
        self.conv1 = nn.Conv2d(in_channels,out_channels,kernel_size=1,stride=1,padding=0,bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels,out_channels,kernel_size=3,stride=stride,padding=1,bias=False,)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.conv3 = nn.Conv2d(out_channels,out_channels * self.expansion,kernel_size=1,stride=1,padding=0,bias=False,)
        self.bn3 = nn.BatchNorm2d(out_channels * self.expansion)
        self.relu = nn.ReLU()
        self.identity_downsample = identity_downsample
        self.stride = stride

    def forward(self, x):
        identity = x.clone()

        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu(x)
        x = self.conv3(x)
        x = self.bn3(x)

        if self.identity_downsample is not None:
            identity = self.identity_downsample(identity)

        x += identity
        x = self.relu(x)
        return x
    
class ResNet(nn.Module):
    def __init__(self, block, layers, image_channels, num_classes):
        super(ResNet, self).__init__()
        self.in_channels = 64
        self.conv1 = nn.Conv2d(image_channels, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU()
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Essentially the entire ResNet architecture are in these 4 lines below
        self.layer1 = self._make_layer(block, layers[0], out_channels=64, stride=1)
        self.layer2 = self._make_layer(block, layers[1], out_channels=128, stride=2)
        self.layer3 = self._make_layer(block, layers[2], out_channels=256, stride=2)
        self.layer4 = self._make_layer(block, layers[3], out_channels=512, stride=2)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * 4, num_classes)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = x.reshape(x.shape[0], -1)
        x = self.fc(x)

        return x

    def _make_layer(self, block, num_residual_blocks, out_channels, stride):
        identity_downsample = None
        layers = []

        # Either if we half the input space for ex, 56x56 -> 28x28 (stride=2), or channels changes
        # we need to adapt the Identity (skip connection) so it will be able to be added
        # to the layer that's ahead
        if stride != 1 or self.in_channels != out_channels * 4:
            identity_downsample = nn.Sequential(nn.Conv2d(self.in_channels,out_channels * 4,kernel_size=1,stride=stride,bias=False)
                                                ,nn.BatchNorm2d(out_channels * 4))

        layers.append(block(self.in_channels, out_channels, identity_downsample, stride))

        # The expansion size is always 4 for ResNet 50,101,152
        self.in_channels = out_channels * 4

        # For example for first resnet layer: 256 will be mapped to 64 as intermediate layer,
        # then finally back to 256. Hence no identity downsample is needed, since stride = 1,
        # and also same amount of channels.
        for i in range(num_residual_blocks - 1):
            layers.append(block(self.in_channels, out_channels))

        return nn.Sequential(*layers)
    
def ResNet50(img_channel=3, num_classes=1000):
    return ResNet(block, [3, 4, 6, 3], img_channel, num_classes)


def ResNet101(img_channel=3, num_classes=1000):
    return ResNet(block, [3, 4, 23, 3], img_channel, num_classes)


def ResNet152(img_channel=3, num_classes=1000):
    return ResNet(block, [3, 8, 36, 3], img_channel, num_classes)


def load_model(model_path, num_classes=43, device="cpu"):
    model = resnet50(weights=None, num_classes=43).to(device)
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()  # Set to evaluation mode
    print("Loading Model-----------------Status: Done")
    return model

def process_image(image_path):
    # Load the image
    image = Image.open(image_path)

    # Define transformations
    transform = transforms.Compose([
        transforms.Resize((225, 225)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
    ])

    # Apply transformations
    image_tensor = transform(image).unsqueeze(0)
    return image_tensor, image

def predict(image_path, model, device="cpu"):
    # No need to reload the model every time you predict. Use the passed model.
    # model = load_model(model_path, device=device)
    
    image_tensor, image = process_image(image_path)  # Assume process_image returns (tensor, image)
    image_tensor = image_tensor.to(device)

    with torch.no_grad():
        outputs = model(image_tensor)
        _, predicted_class_index = torch.max(outputs, 1)

    return predicted_class_index.item(), image

def get_ground_truth_from_csv(csv_path):
    """Load the CSV file and return a dictionary with filenames as keys and labels as values."""
    data = pd.read_csv(csv_path, delimiter='\t')  # Use the tab delimiter
    print(data.columns)  # Print the columns to check
    # Use os.path.basename() to extract just the image filename
    filenames = data['Path'].apply(os.path.basename).tolist()
    truth_dict = dict(zip(filenames, data['ClassId']))
    return truth_dict

def predict_on_directory(directory_path, model_path, num_images_to_test=None, device="cpu"):
    model = load_model(model_path, device=device)
    
    class_names = pd.read_csv('gtsrb-german-traffic-sign/labels.csv')['Name'].tolist()
    
    ground_truth = get_ground_truth_from_csv('gtsrb-german-traffic-sign/Test.csv')
    print(ground_truth[:10])
    filenames = os.listdir(directory_path)
    random.shuffle(filenames)
    
    if num_images_to_test:
        filenames = filenames[:num_images_to_test]

    correct_count = 0
    total_count = 0
    print("Starting Prediction....")
    for filename in filenames:
        image_path = os.path.join(directory_path, filename)
        
        if image_path.lower().endswith('.ppm') or image_path.lower().endswith('.png'):
            predicted_class_index, image = predict(image_path, model, device)
            predicted_class_name = class_names[predicted_class_index]
            
            # Compare with ground truth
            true_class_index = ground_truth.get(filename, None)
            if true_class_index is not None:
                total_count += 1
                if predicted_class_index == true_class_index:
                    correct_count += 1
            
#             print(f"\nImage: {filename}")
#             print(f"Predicted Class ID: {predicted_class_index}")
#             print(f"Predicted Class Name: {predicted_class_name}")
#             if true_class_index is not None:
#                 print(f"True Class ID: {true_class_index}")
#                 print(f"True Class Name: {class_names[true_class_index]}")

#             plt.figure(figsize=(6, 6))
#             plt.imshow(image)
#             plt.title(f"Predicted: {predicted_class_name}")
#             plt.axis('off')
#             plt.show()

    print(f"\nTotal Images Tested: {total_count}")
    print(f"Correct Predictions: {correct_count}")
    print(f"Average Accuracy: {correct_count / total_count * 100:.2f}%")

# Assuming the ground truth CSV is named 'GT-final_test.csv' and is located inside the 'test' directory
test_directory = 'gtsrb-german-traffic-sign/Test'
model_path = "resnet50_epochs30_backdoored.pt"
mydevice = torch.device("cuda" if torch.cuda.is_available() else "cpu")
predict_on_directory(test_directory, model_path, num_images_to_test=20, device=mydevice)


Loading Model-----------------Status: Done
Index(['Width,Height,Roi.X1,Roi.Y1,Roi.X2,Roi.Y2,ClassId,Path'], dtype='object')


KeyError: 'Path'