# Neuron 2-projection on-off activation coverage (2) - German Traffic Sign Recognition Benchmark (GTSRB)

For detailed explanation, please refer to the jupyter notebook 1_MNIST_Pytorch_CNN.ipydb

In [None]:
# Put these at the top of every notebook, to get automatic reloading and inline plotting
%reload_ext autoreload
%autoreload 2
%matplotlib inline

In [None]:
import numpy.random
# Fix the number for repeatability (we have also stored the trained model)
numpy.random.seed(42)

import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms


# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

Start by accessing the root folder where internally, subfolders are images with folder name being their classified result.

The dataset should be available at the following site 
http://benchmark.ini.rub.de/?section=gtsrb&subsection=dataset

Here we just pick the 26K smaller training set (the online version), as the training and test set can all be directly loaded using PyTorch included functionalities. 

In [None]:
from torchvision.datasets import ImageFolder
from torchvision.transforms import ToTensor

standard_transform = transforms.Compose([
        transforms.ToTensor(),
        # Change the image to PIL format, such that resize can be done
        transforms.ToPILImage(),
        transforms.Resize((32,32)),
        # Bring it back to tensor
        transforms.ToTensor()
    ])

# Here the data is not be normalized to [-1,1]

# Change the folder based on your specific needs. 
# This one is a smaller (26640 examples) data set (for online training), so the 
data = ImageFolder(root='data/GTSRB-Training_fixed/GTSRB/Training',  transform=standard_transform)


In [None]:
print(data.classes)

- 0 	 Speed limit (20km/h)
- 1 	 Speed limit (30km/h)
- 2 	 Speed limit (50km/h)
- 3 	 Speed limit (60km/h)
- 4 	 Speed limit (70km/h)
- 5 	 Speed limit (80km/h)
- 6 	 End of speed limit (80km/h)
- 7 	 Speed limit (100km/h)
- 8 	 Speed limit (120km/h)
- 9 	 No passing
- 10 	 No passing for vechiles over 3.5 metric tons
- 11 	 Right-of-way at the next intersection
- 12 	 Priority road
- 13 	 Yield
- 14 	 Stop
- 15 	 No vechiles
- 16 	 Vechiles over 3.5 metric tons prohibited
- 17 	 No entry
- 18 	 General caution
- 19 	 Dangerous curve to the left
- 20 	 Dangerous curve to the right
- 21 Double curve
- 22 	 Bumpy road
- 23 	 Slippery road
- 24 	 Road narrows on the right
- 25 	 Road work
- 26 	 Traffic signals
- 27 	 Pedestrians
- 28 	 Children crossing
- 29 	 Bicycles crossing
- 30 	 Beware of ice/snow
- 31 	 Wild animals crossing
- 32 	 End of all speed and passing limits
- 33 	 Turn right ahead
- 34 	 Turn left ahead
- 35 	 Ahead only
- 36 	 Go straight or right
- 37 	 Go straight or left
- 38 	 Keep right
- 39 	 Keep left
- 40 	 Roundabout mandatory
- 41 	 End of no passing
- 42 	 End of no passing by vechiles over 3.5 metric tons



In [None]:
# Hyper-parameters 
num_classes = 43
learning_rate = 0.001
sizeOfNeuronsToMonitor = 84
batch_size = 64
num_epochs = 5

In [None]:
from torch.utils.data import DataLoader

loader = DataLoader(data, batch_size=batch_size, shuffle=True)

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import util

# get some random training images
dataiter = iter(loader)
images, labels = dataiter.next()

# print(images.shape)
util.displayGTSRB(images[0].numpy())


In [None]:
import torch.nn as nn
import torch.nn.functional as F


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 40, 5)
        self.conv1_bn = nn.BatchNorm2d(40)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(40, 20, 5)
        self.conv2_bn = nn.BatchNorm2d(20)
        self.fc1 = nn.Linear(20 * 5 * 5, 240)        
        self.fc2 = nn.Linear(240, sizeOfNeuronsToMonitor)
        self.fc3 = nn.Linear(sizeOfNeuronsToMonitor, num_classes)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1_bn(self.conv1(x))))
        x = self.pool(F.relu(self.conv2_bn((self.conv2(x)))))
        # Flatten it to an array of inputs
        x = x.view(-1, 20 * 5 * 5)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x 

    def forwardWithIntermediate(self, x):
        x = self.pool(F.relu(self.conv1_bn(self.conv1(x))))
        x = self.pool(F.relu(self.conv2_bn((self.conv2(x)))))
        # Flatten it to an array of inputs
        x = x.view(-1, 20 * 5 * 5)
        x = F.relu(self.fc1(x))
        intermediateValues = F.relu(self.fc2(x))
        x = self.fc3(intermediateValues)
        return x , intermediateValues


In [None]:
net = Net()

In [None]:
# .. to load pre-trained model:
net.load_state_dict(torch.load('models/3_model_GTSRB_CNN_27k_train99%.ckpt'))

In [None]:
import torch.optim as optim

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(net.parameters(), lr=learning_rate)  

furtherTrain = False

if furtherTrain: 

    # Train the model
    total_step = len(loader)
    for epoch in range(num_epochs):
        for i, (images, labels) in enumerate(loader):  
            # Move tensors to the configured device
            labels = labels.to(device)

            # Forward pass
            outputs = net(images)
            loss = criterion(outputs, labels)

            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            if (i+1) % 100 == 0:
                print ('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}' 
                       .format(epoch+1, num_epochs, i+1, total_step, loss.item()))


    print('Finished Training')

For testing, we should also use the test data, where they are separated into folders based on their classes, followed by shuffling.

In [None]:
from torch.utils.data import DataLoader

testdata = ImageFolder(root='data/GTSRB_Online-Test-Images-Sorted/GTSRB/Online-Test-sort', transform=standard_transform)
testloader = DataLoader(testdata, shuffle=True)

In [None]:
dataiter = iter(testloader)
images, labels = dataiter.next()

util.displayGTSRB(images[0].numpy())

In [None]:
outputs = net(images)

_, predicted = torch.max(outputs, 1)

print(predicted)
print(labels)

In [None]:
class_correct = list(0. for i in range(num_classes))
class_total = list(0. for i in range(num_classes))

with torch.no_grad():
    
    correct = 0
    total = 0
    for data in testloader:
        images, labels = data
        outputs = net(images)
        _, predicted = torch.max(outputs, 1)
        c = (predicted == labels)
        label = labels[0]
        class_correct[label] += c[0].item()
        class_total[label] += 1
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
    
    print('Accuracy of the network over test images: {} %\n\n'.format(100 * correct / total))

for i in range(num_classes):
    print('Accuracy of %5s : %2d %%' % (
        i, 100 * class_correct[i] / class_total[i]))
    

In [None]:
with torch.no_grad():
    
    correct = 0
    total = 0
    for data in loader:
        images, labels = data
        outputs = net(images)
        _, predicted = torch.max(outputs, 1)
        c = (predicted == labels)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
    
    print('Accuracy of the network over train images: {} %\n\n'.format(100 * correct / total))


    

### Trigger 2-projection neuron on-off activation coverage computation


In [None]:
from nndependability.metrics import KProjection

k_Value = 2

metric = KProjection.Neuron_OnOff_KProjection_Metric(k_Value, sizeOfNeuronsToMonitor)

Trigger the function addInputs() to update the k-projection table based on all visited patterns for each batch. 

In [None]:
with torch.no_grad():

    total = 0
    i = 0
    for images, labels in loader:
        
        total = total + (len(labels))
        labels = labels.to(device)
        outputs, intermediateValues = net.forwardWithIntermediate(images)
        
        # Add the batch of neuron activation patterns to the k-projection table
        metric.addInputs(intermediateValues.numpy())
                
        if(i % 50) == 0:
            print('Current input size fed into the metric: '+str(total))
            metric.printMetricQuantity()
            print("\n")
        i = i+1
        

In [None]:
metric.printMetricQuantity()

(Optional) Now, ask the test case generator to derive us a pattern which maximally increases 2-projection coverage

In [None]:
from nndependability.atg.nap import napgen

In [None]:
napgen.proposeNAPcandidate(metric)