<a href="https://colab.research.google.com/github/KamilPiatkowski1997/Lane-Line-Detection/blob/master/Traffic_signs.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#Download german traffic signs dataset for model training
!git clone https://bitbucket.org/jadslim/german-traffic-signs

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam
from keras.utils.np_utils import to_categorical
from keras.layers import Dropout, Flatten
from keras.layers.convolutional import Conv2D, MaxPooling2D
from keras.preprocessing.image import ImageDataGenerator
import pickle
import pandas as pd
import random
import cv2
np.random.seed(0)

In [None]:
#opening pickle files and creating variables for testing, training and validation data
with open('german-traffic-signs/train.p','rb') as f:    #rb means read binary format.
    train_data = pickle.load(f)                                    #f is pointer
with open('german-traffic-signs/test.p','rb') as f:
    test_data = pickle.load(f)    
with open('german-traffic-signs/valid.p','rb') as f:
    valid_data = pickle.load(f) 
    
print(type(train_data))

X_train, y_train = train_data['features'], train_data['labels']
X_val, y_val = valid_data['features'], valid_data['labels']
X_test, y_test = test_data['features'], test_data['labels']

print(X_train.shape)
print(X_val.shape)
print(X_test.shape)

assert(X_train.shape[0] == y_train.shape[0]), 'The number of images is not equal to the number of labels'
assert(X_val.shape[0] == y_val.shape[0]), 'The number of images is not equal to the number of labels'
assert(X_test.shape[0] == y_test.shape[0]), 'The number of images is not equal to the number of labels'

assert(X_train.shape[1:] == (32,32,3)), "The dimensions of the images are not 32x32x3"
assert(X_val.shape[1:] == (32,32,3)), "The dimensions of the images are not 32x32x3"
assert(X_test.shape[1:] == (32,32,3)), "The dimensions of the images are not 32x32x3"

data = pd.read_csv('german-traffic-signs/signnames.csv')
print(data)


In [None]:
#Display 5 random images for each label class. If throwing error run all again.
num_of_samples = []
list_signs = []
cols = 5
num_classes = 43
fig, axs = plt.subplots(nrows=num_classes, ncols=cols, figsize=(5,50))
fig.tight_layout()

for i in range(cols):
    for j, row in data.iterrows():
        x_selected = X_train[y_train == j]
        axs[j][i].imshow(x_selected[random.randint(0, len(x_selected - 1)), :, :], cmap = plt.get_cmap("gray"))
        axs[j][i].axis("off")
        if i == 2:
            axs[j][i].set_title(str(j) + "-" + row["SignName"])
            list_signs.append(row["SignName"])
            num_of_samples.append(len(x_selected))

In [None]:
#Display dataset distribiution
print(num_of_samples)
plt.figure(figsize=(12,4))
plt.bar(range(0, num_classes), num_of_samples)
plt.title("Distribiution of the training dataset")
plt.xlabel("Class number")
plt.ylabel("Number of images")

In [None]:
#converting image into gray scale so that neural network can learn the pattern easily
def gray(img):
    img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    return img

def equalize(img):
    img = cv2.equalizeHist(img)
    return img
  #equalize histogram extract reigon of interest very correctly

def preprocess(img):
    img = gray(img)
    img = equalize(img)
    img = img/255 #normalizing of images
    return img

In [None]:
#Map preprocessed images 
X_train = np.array(list(map(preprocess, X_train)))
X_val = np.array(list(map(preprocess, X_val)))
X_test = np.array(list(map(preprocess, X_test)))

In [None]:
#Reshape mapped and preprocessed images
X_train = X_train.reshape(34799, 32, 32, 1)
X_test = X_test.reshape(12630, 32, 32, 1)
X_val = X_val.reshape(4410, 32, 32, 1)

img_rows, img_cols, channels = 32, 32, 1

#Display dataset shape
print(X_train.shape)
print(X_val.shape)
print(X_test.shape)

In [None]:
#Manipulate data within the batches for better model recognition
datagen = ImageDataGenerator(width_shift_range=0.1,
                            height_shift_range=0.1,
                            zoom_range=0.2,
                            shear_range=0.1,
                            rotation_range=10.)
datagen.fit(X_train)
# for X_batch, y_batch in
 
batches = datagen.flow(X_train, y_train, batch_size = 15)
X_batch, y_batch = next(batches)

fig, axs = plt.subplots(1, 15, figsize=(20, 5))
fig.tight_layout()

#Display batch of random 15 images
for i in range(15):
    axs[i].imshow(X_batch[i].reshape(32, 32))
    axs[i].axis("off")

print(X_batch.shape)

In [None]:
#Categorise the images 
y_train = to_categorical(y_train, 43)
y_test = to_categorical(y_test, 43)
y_val = to_categorical(y_val, 43)

In [None]:
#Define the 4 layers model for sign prediction
def modified_model(): 
    model = Sequential()
    model.add(Conv2D(60,(5,5),input_shape=(32,32,1),activation='relu'))
    model.add(Conv2D(60,(5,5),activation='relu'))
    model.add(MaxPooling2D(pool_size=(2,2)))
    
    
    model.add(Conv2D(30,(3,3),activation='relu'))
    model.add(Conv2D(30,(3,3),activation='relu'))
    model.add(MaxPooling2D(pool_size=(2,2)))
#     model.add(Dropout(0.4))

    
    model.add(Flatten())
    model.add(Dense(500,activation='relu')) 
    model.add(Dropout(0.4))
    model.add(Dense(num_classes ,activation='softmax'))
    #Compile model
    model.compile(Adam(learning_rate = 0.001), loss = 'categorical_crossentropy', metrics = ['accuracy'])
    return model

In [None]:
#Display model parameters
model = modified_model()
defence_model = modified_model()
print(model.summary())

In [None]:
#Train model
history = model.fit(datagen.flow(X_train,y_train, batch_size=50), steps_per_epoch = X_train.shape[0]/50, epochs = 10, validation_data= (X_val, y_val), shuffle = 1)
# history = model.fit(X_train, y_train, epochs = 10, validation_data = (X_val, y_val), batch_size = 400, verbose = 1, shuffle = 1)

In [None]:
#Display Loss agains epoch graph
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.legend(['loss', 'val_loss'])
plt.title('loss')
plt.xlabel('epoch')

In [None]:
#Display accuracy agains epoch graph
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.legend(['accuracy', 'val_accuracy'])
plt.title('Accuracy')
plt.xlabel('epoch')

In [None]:
#Print model score and accuracy
score = model.evaluate(X_test, y_test, verbose = 0)
print(type(score))
print('Test score:', score[0])
print('Test accuracy:', score[1])

In [None]:
#fetch image
 
import requests
from PIL import Image
url = 'https://c8.alamy.com/comp/G667W0/road-sign-speed-limit-30-kmh-zone-passau-bavaria-germany-G667W0.jpg'
# url = 'https://c8.alamy.com/comp/A0RX23/cars-and-automobiles-must-turn-left-ahead-sign-A0RX23.jpg'
# url = 'https://previews.123rf.com/images/bwylezich/bwylezich1608/bwylezich160800375/64914157-german-road-sign-slippery-road.jpg'
# url = 'https://previews.123rf.com/images/pejo/pejo0907/pejo090700003/5155701-german-traffic-sign-no-205-give-way.jpg'
# url = 'https://kids.kiddle.co/images/thumb/f/f9/STOP_sign.jpg/300px-STOP_sign.jpg'
# url = 'https://s3.eu-west-1.amazonaws.com/cdn.webfactore.co.uk/sr_279629_large.jpg'

#Preprocess image from url
r = requests.get(url, stream=True)
img = Image.open(r.raw)
plt.imshow(img, cmap=plt.get_cmap('gray'))
plt.axis("off")

img = np.asarray(img)
img = cv2.resize(img, (32, 32))
img = preprocess(img)
img = img.reshape(1, 32, 32, 1)

In [None]:
#Test image
prediction=np.argmax(model.predict(img), axis=-1)
print((prediction[0], (list_signs[prediction[0]])))

In [None]:
# Function to create adversarial pattern
def adversarial_pattern(image, label):
    image = tf.cast(image, tf.float32)
    
    with tf.GradientTape() as tape:
        tape.watch(image)
        prediction = model(image)
        loss = tf.keras.losses.MSE(label, prediction)
    gradient = tape.gradient(loss, image)
    signed_grad = tf.sign(gradient)
    return signed_grad

In [None]:
# Create a signle example of 20km/h before and after creating adversarial pattern
image = X_train[10000]
image_label = y_train[10000]
perturbations = adversarial_pattern(image.reshape((1, img_rows, img_cols, channels)), image_label).numpy()
adversarial = image + perturbations * 0.1

print("Model prediction == ",list_signs[model.predict(image.reshape((1, img_rows, img_cols, channels))).argmax()])
print("Prediction with Intrusion== ", list_signs[model.predict(adversarial).argmax()])

if channels == 1:
    plt.imshow(adversarial.reshape((img_rows, img_cols)))
else:
    plt.imshow(adversarial.reshape((img_rows, img_cols, channels)))

plt.show()

In [None]:
# Adversarial data generator
def generate_adversarials(batch_size):
    while True:
        x = []
        y = []
        for batch in range(batch_size):
            # N = random.randint(0, 34799)
            N = random.randint(0, 12630)
            label = y_train[N]
            image = X_train[N]
            perturbations = adversarial_pattern(image.reshape((1, img_rows, img_cols, channels)), label).numpy()
            epsilon = 0.1
            adversarial = image + perturbations * epsilon
            x.append(adversarial)
            y.append(y_train[N])
        x = np.asarray(x).reshape((batch_size, img_rows, img_cols, channels))
        y = np.asarray(y)
        
        yield x, y


In [None]:
# # Generate and visualize 12 adversarial images
# adversarials, correct_labels = next(generate_adversarials(12))
# for adversarial, correct_label in zip(adversarials, correct_labels):
#     print('Intrusion Prediction:', list_signs[model.predict(adversarial.reshape((1, img_rows, img_cols, channels))).argmax()], 'Truth:', list_signs[correct_label.argmax()])
#     if channels == 1:
#         plt.imshow(adversarial.reshape(img_rows, img_cols))
#     else:
#         plt.imshow(adversarial)
#     plt.show()

In [None]:
# Generate adversarial data
x_adversarial_train, y_adversarial_train = next(generate_adversarials(20000))
x_adversarial_test, y_adversarial_test = next(generate_adversarials(10000))

In [None]:
# Assess base model on adversarial data
print("Accuracy based on the first model with intrusion images:", model.evaluate(x=x_adversarial_test, y=y_adversarial_test, verbose=0))

In [None]:
#Train new defence model with the generated adversarial data
history_intrusion = defence_model.fit(datagen.flow(x_adversarial_train,y_adversarial_train, batch_size=50), epochs = 10, validation_data= (X_test, y_test), shuffle = 1)
# history_intrusion = defence_model.fit(datagen.flow(x_adversarial_train,y_adversarial_train, batch_size=50), epochs = 10, validation_data= (X_val, y_val), shuffle = 1)

In [None]:
# Assess defended model on adversarial data
print("Defended accuracy based on the new model with intrusion images:", defence_model.evaluate(x=x_adversarial_test, y=y_adversarial_test, verbose=0))

# Assess defended model on regular data
print("Defended accuracy based on the new model with regular images:", defence_model.evaluate(x=X_test, y=y_test, verbose=0))

In [None]:
# Taking example of 20km/h before and after creating new defence model
image = X_train[10000]
image_label = y_train[10000]
perturbations = adversarial_pattern(image.reshape((1, img_rows, img_cols, channels)), image_label).numpy()
adversarial = image + perturbations * 0.1

print("Model Prediction on original image = ",list_signs[model.predict(image.reshape((1, img_rows, img_cols, channels))).argmax()])
print("Defence Model Prediction on intrusion image = ", list_signs[defence_model.predict(adversarial).argmax()])

if channels == 1:
    plt.imshow(adversarial.reshape((img_rows, img_cols)))
else:
    plt.imshow(adversarial.reshape((img_rows, img_cols, channels)))

plt.show()

In [None]:
# Create test sample of 5 regular images and their labels mixed with single random attacked image and spot intrusion
test_set = datagen.flow(X_train, y_train, batch_size = 5)
X_test_set, y_test_set = next(test_set)
random_num = random.choice(range(1, 5))
i = 1
print(random_num)
for X_test_set, y_test_set in zip(X_test_set, y_test_set):
    if i == (random_num):
        perturbations = adversarial_pattern(X_test_set.reshape((1, img_rows, img_cols, channels)), y_test_set).numpy()
        adversarial = X_test_set + perturbations * 0.1
        Model_Prediction = list_signs[model.predict(adversarial.reshape((1, img_rows, img_cols, channels))).argmax()]
        Truth_label = list_signs[y_test_set.argmax()]
        print('Model Prediction:', Model_Prediction,",",  'Truth label:', Truth_label) 
        if channels == 1:
            plt.imshow(adversarial.reshape(img_rows, img_cols))
        else:
            plt.imshow(adversarial)
        plt.show()

    else:
        Model_Prediction = list_signs[model.predict(X_test_set.reshape((1, img_rows, img_cols, channels))).argmax()]
        Truth_label = list_signs[y_test_set.argmax()]
        print('Model Prediction:', Model_Prediction,",",  'Truth label:', Truth_label)
        if channels == 1:
            plt.imshow(X_test_set.reshape(img_rows, img_cols))
        else:
            plt.imshow(X_test_set)
        plt.show()
    i = i + 1
    if Model_Prediction != Truth_label:
      Defence_Model = list_signs[defence_model.predict(adversarial.reshape((1, img_rows, img_cols, channels))).argmax()]
      print("Image was attacked")
      if Defence_Model == Truth_label:
        print("Defence Model prediction:", Defence_Model)
      else:
        print("Can not detect correctly")

In [None]:
# Create test sample with 10 only attacked image and predict right label
test_set = datagen.flow(X_train, y_train, batch_size = 10)
X_test_set, y_test_set = next(test_set)
d2 = 0
nd2 = 0
for X_test_set, y_test_set in zip(X_test_set, y_test_set):
        perturbations = adversarial_pattern(X_test_set.reshape((1, img_rows, img_cols, channels)), y_test_set).numpy()
        adversarial = X_test_set + perturbations * 0.1
        Model_Prediction = list_signs[model.predict(adversarial.reshape((1, img_rows, img_cols, channels))).argmax()]
        Truth_label = list_signs[y_test_set.argmax()]
        print('Model Prediction:', Model_Prediction,",", 'Truth label:', Truth_label) 
        if channels == 1:
            plt.imshow(X_test_set.reshape(img_rows, img_cols))
        else:
            plt.imshow(X_test_set)
        plt.show()

        if Model_Prediction != Truth_label:
          Defence_Model = list_signs[defence_model.predict(adversarial.reshape((1, img_rows, img_cols, channels))).argmax()]
          print("Image was attacked")
          if Defence_Model == Truth_label:
            print("Defence Model prediction:", Defence_Model)
            d2=d2+1
          else:
            print("Can not detect correctly")
            nd2=nd2+1

In [None]:
#Basing on 10 attacks
print("Number of correct defence predictions",d2)
print("Number of not detected predictions",nd2)