In [None]:
# Test Security in YoloV5

# Author: Fabrizio Serpi


# This notebook tests the security of YoloV5. In particular, it uses the Projected Gradient Descent (PGD)
# algorithm, a white-box attack implemented in the Adversarial Robustness Toolbox (ART) from IBM:
#     https://adversarial-robustness-toolbox.readthedocs.io/en/latest/.
# The model under attack has been trained on the Self-Driving-Car-small dataset from: 
#     https://public.roboflow.com/object-detection/self-driving-car,
# and can be found in the same repository this notebook is in.

In [None]:
# IMPORTS
# Standard library
import logging
import os
import random
import shutil
import sys
from datetime import datetime
from distutils.dir_util import copy_tree
from pathlib import Path

# Third-party
import cv2
import ipynbname
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import torch
import tqdm.notebook as tq
import yaml
from IPython.core.interactiveshell import InteractiveShell
from IPython.utils import io
from skimage.io import imread, imsave
from yaml.loader import SafeLoader


# Install YoloV5 as a wheel from pip
import yolov5
from yolov5.utils.loss import ComputeLoss

# Adversarial Robustness Toolbox
from art.estimators.object_detection.pytorch_yolo import PyTorchYolo
from art.attacks.evasion import ProjectedGradientDescent

In [None]:
# Setup logging for Stream and File output
#
# Creates the notebook-wide `logger` with two DEBUG-level handlers: a file
# handler writing to ./logs/<notebook>_<timestamp>.log and a stream handler
# writing to stdout. `logger` and `log_filename` are used by the later cells.
cell_label = '[## LOGGING SETUP] - '

notebook_name = ipynbname.name()


# Retrieve date for log file name
now = datetime.now()
# Date and Time format: dd-mm-YYYY--HH-MM-SS (dashes only, so the value is safe in a file name)
date_time = now.strftime("%d-%m-%Y--%H-%M-%S")

logger = logging.getLogger(notebook_name)
logger.setLevel(logging.DEBUG)

# logging.getLogger() returns the same logger object for the same name, so
# re-running this cell would stack a new pair of handlers on top of the old
# ones and every message would be emitted several times. Detach and close any
# handler left over from a previous run before attaching fresh ones.
for stale_handler in list(logger.handlers):
    logger.removeHandler(stale_handler)
    stale_handler.close()

# Append logs to a file
logs_folder = "./logs"
# exist_ok avoids the check-then-create race of os.path.exists() + os.mkdir()
os.makedirs(logs_folder, exist_ok=True)


log_filename = "{logs_folder}/{notebook_name}_{date_time}.log".format(logs_folder=logs_folder, notebook_name=notebook_name, date_time=date_time)
fhandler = logging.FileHandler(filename=log_filename, mode='a')
file_formatter = logging.Formatter('[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s')
fhandler.setFormatter(file_formatter)
fhandler.setLevel(logging.DEBUG)
logger.addHandler(fhandler)


# Setup logs for output stream (requires `import sys` in the imports cell)
consoleHandler = logging.StreamHandler(sys.stdout)
console_formatter = logging.Formatter('[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s')
consoleHandler.setFormatter(console_formatter)
consoleHandler.setLevel(logging.DEBUG)
logger.addHandler(consoleHandler)


logger.info(cell_label + "Logging Setup Completed. Log file: {}, Notebook: {}".format(log_filename, notebook_name))

In [None]:
# GPU SETTINGS
#
# Selects the GPU through environment variables, logs the name of the device
# torch reports, and defines the global `device` used by the YoloV5 wrapper.
cell_label = '[## GPU SETTINGS] - '

logger.info("[------------------------------------------------------------------------------------]")
logger.info(cell_label + "START")

# Display full output
# (IPython then shows the value of every top-level expression in a cell, not
# only the last one -- presumably why later cells assign unwanted results to `_`.)
InteractiveShell.ast_node_interactivity = "all"

# Set an Nvidia card if you have more than one
# NOTE(review): the card index is hard-coded to 1; adjust it for your machine.
# These variables are set after `import torch`, so they need to be in place
# before the first CUDA call below -- keep this order.
%env CUDA_DEVICE_ORDER=PCI_BUS_ID
%env CUDA_VISIBLE_DEVICES=1

# Check which GPU card is set
# NOTE(review): assumes a CUDA device is available; there is no CPU fallback.
logger.info(cell_label + "Selected Nvidia GPU: {}".format(str(torch.cuda.get_device_name())))


# Set device
device = torch.device("cuda")


logger.info(cell_label + "FINISH")
logger.info("[------------------------------------------------------------------------------------]\n")

In [None]:
# Create folder for test results
#
# Ensures the main results folder exists in the current working directory and
# logs whether it had to be created.
cell_label = '[## CREATE FOLDER TEST RESULTS] - '

logger.info("[------------------------------------------------------------------------------------]")
logger.info(cell_label + "START")


test_folder_main = "yolov5_test_security"
test_folder_main_path = os.path.join("./", test_folder_main)

if os.path.isdir(test_folder_main_path):
    # Nothing to do: keep whatever earlier runs stored there
    logger.info(cell_label + "Test main folder: {} already exists".format(test_folder_main_path))
else:
    logger.info(cell_label + "Create test main folder: {}".format(test_folder_main_path))
    os.mkdir(test_folder_main_path)

logger.info(cell_label + "FINISH")
logger.info("[------------------------------------------------------------------------------------]\n")

In [None]:
# Load Test images for model attack
#    In this case we are going to use the test set held out from model training and validation
cell_label = '[## LOAD TEST IMAGES] - '

logger.info("[------------------------------------------------------------------------------------]")
logger.info(cell_label + "START")


yolov5_data_folder = "./data_train_test_small"
test_images_path = "./data_train_test_small/test/images"
test_labels_path = "./data_train_test_small/test/labels"

logger.info(cell_label + "Dataset Partitions")
!tree {yolov5_data_folder} -L 2 | tee -a {log_filename}



list_images = [Path(img).stem for img in os.listdir(test_images_path) if os.path.isfile(os.path.join(test_images_path, img))]

logger.info(cell_label + "Test dataset size: {} images".format(len(list_images)))


logger.info(cell_label + "FINISH")
logger.info("[------------------------------------------------------------------------------------]\n")

In [None]:
# Define constants and functions
cell_label = '[## CONSTANTS and FUNCTIONS] - '

logger.info("[------------------------------------------------------------------------------------]")
logger.info(cell_label + "START")


# Labels for current dataset (one class name per line; order is preserved)
SELF_DRIVING_CAR_CATEGORY_NAMES = [
    'biker',
    'car',
    'pedestrian',
    'trafficLight',
    'trafficLight-Green',
    'trafficLight-GreenLeft',
    'trafficLight-Red',
    'trafficLight-RedLeft',
    'trafficLight-Yellow',
    'trafficLight-YellowLeft',
    'truck',
]

#matplotlib.use("TkAgg")


# Class used to wrap YoloV5 model
class YoloV5(torch.nn.Module):
    """Wrapper giving a YoloV5 model a loss-returning forward pass.

    In training mode ``forward`` returns a dict with the single key
    ``"loss_total"``; in evaluation mode it returns whatever the wrapped
    model returns for the input.

    Relies on the notebook-level global ``device`` for tensor placement.
    """

    def __init__(self, model, hyperparameters):
        """Store the model, attach its hyperparameters and build the loss.

        Args:
            model: loaded YoloV5 model; its ``model.model`` attribute is the
                network handed to ``ComputeLoss``.
            hyperparameters: value assigned to ``model.hyp``
                (presumably read by ``ComputeLoss`` -- TODO confirm).
        """
        super().__init__()
        self.model = model
        self.model.hyp = hyperparameters
        self.compute_loss = ComputeLoss(self.model.model.model)
        print(self.compute_loss)

    def forward(self, x, targets=None):
        """Return the loss dict (training mode) or the model output (eval mode).

        Args:
            x: input batch.
            targets: ground-truth tensor; required in training mode, where
                ``targets.to(device)`` is called on it.
        """
        if not self.training:
            return self.model(x)

        # Training branch: move both tensors to the global device first
        batch = x.to(device)
        labels = targets.to(device)
        predictions = self.model.model.model(batch)
        total_loss, _ = self.compute_loss(predictions, labels)
        return {"loss_total": total_loss}
            



def get_image_with_predictions(image, results, threshold, with_scores):
    """Draw the detections whose confidence reaches ``threshold`` onto ``image``.

    Args:
        image: image array handed to ``cv2.rectangle`` / ``cv2.putText``; it is
            drawn on directly, so pass a copy to keep the original untouched.
        results: detection results exposing ``pandas().xyxy[0]``, a table with
            the columns ``xmin``, ``ymin``, ``xmax``, ``ymax``, ``confidence``
            and ``name``.
        threshold: minimum confidence (inclusive) for a box to be drawn.
        with_scores: when true, the label is "<name> <confidence>" with the
            confidence formatted to two decimals; otherwise only the name.

    Returns:
        The same ``image`` object that was passed in.
    """
    text_size = 0.8
    text_th = 2
    rect_th = 1
    box_color = (10, 255, 255)

    # Convert the results once; the original re-ran results.pandas() for every
    # single field access inside the loop.
    detections = results.pandas().xyxy[0]

    # Iterate the columns positionally so the loop does not depend on the
    # table's index labels.
    rows = zip(
        detections["xmin"],
        detections["ymin"],
        detections["xmax"],
        detections["ymax"],
        detections["confidence"],
        detections["name"],
    )

    for xmin, ymin, xmax, ymax, confidence, name in rows:
        if not (confidence >= threshold):
            continue

        top_left = (int(xmin), int(ymin))
        bottom_right = (int(xmax), int(ymax))

        cv2.rectangle(
            image,
            top_left,
            bottom_right,
            color=box_color,
            thickness=rect_th,
        )

        if with_scores:
            box_header = name + " " + str("%.2f" % round(confidence, 2))
        else:
            box_header = name

        # The label is anchored at the box's top-left corner
        cv2.putText(
            image,
            box_header,
            top_left,
            cv2.FONT_HERSHEY_SIMPLEX,
            text_size,
            box_color,
            thickness=text_th,
        )

    return image





# Close the log section opened at the top of this cell
logger.info(cell_label + "FINISH")
logger.info("[------------------------------------------------------------------------------------]\n")

In [None]:
# Model Retrieval and Setting
#
# Loads the trained weights and the evolved hyperparameters, wraps the model in
# the YoloV5 class and builds the ART PyTorchYolo estimator used by the attack.
cell_label = '[## MODEL RETRIEVAL and SETTING] - '

logger.info("[------------------------------------------------------------------------------------]")
logger.info(cell_label + "START")

# Put here your model folder
model_name = ""
logger.info(cell_label + "Load Model: {}".format(model_name))
model_path = "./yolov5_train_detection/{}/weights/best.pt".format(model_name)

model_yolov5 = yolov5.load(model_path)
# The device of the first parameter is reported as the model's device
deviceModel = next(model_yolov5.parameters()).device
logger.info(cell_label + "Device the model is set: {}".format(deviceModel))


# Put here your hyperparameters file folder
hyperparameters_name = ""
logger.info(cell_label + "Retrieve hyperparameters for the model at hand: {}".format(hyperparameters_name))
hyperparameters_path = "./yolov5_train_detection/{}/hyp_evolve.yaml".format(hyperparameters_name)

# Read the YAML content with the safe loader
with open(hyperparameters_path) as f:
    hyperparams_values = yaml.safe_load(f)
logger.info(cell_label + "Retrieved Hyperparameters: {}".format(hyperparams_values))


modelV5 = YoloV5(model_yolov5, hyperparams_values)

# Settings of the ART estimator
device_type = "gpu"
input_shape = (3, 512, 512)
clip_values = (0, 255)
attack_losses = ("loss_total",)

logger.info(cell_label + "Parameters for PyTorchYolo ART detector - device_type: {}, input_shape: {}, clip_values: {}, attack_losses: {}".format(device_type, input_shape, clip_values, attack_losses))

yolov5_art_detector = PyTorchYolo(
    model=modelV5,
    device_type=device_type,
    input_shape=input_shape,
    clip_values=clip_values,
    attack_losses=attack_losses,
)


logger.info(cell_label + "FINISH")
logger.info("[------------------------------------------------------------------------------------]\n")

In [None]:
# Get a random image
#
# Picks one file from the test images folder, shows it, and builds `x`, the
# float32 batch passed to the attack in the next cell.
cell_label = '[## GET RANDOM TEST IMAGE] - '
%matplotlib widget

logger.info("[------------------------------------------------------------------------------------]")
logger.info(cell_label + "START")


# Regular files only, extensions kept (this overwrites the earlier `list_images`)
list_images = [img for img in os.listdir(test_images_path) if os.path.isfile(os.path.join(test_images_path, img))]

# random.sample(..., 1) returns a one-element list, hence image_name[0] below.
# NOTE(review): no random seed is set, so the chosen image differs between runs.
image_name = random.sample(list_images, 1)

logger.info(cell_label + "Chosen image: {}".format(image_name))

original_image = imread(os.path.join(test_images_path, image_name[0]))

_ = plt.axis("off")
_ = plt.title("Original Image")
_ = plt.imshow(original_image)
plt.show()

# Move the last axis first -- assumes imread returned an (H, W, C) array; TODO confirm
img_reshape = original_image.transpose((2, 0, 1))
# Stacking a single-element list adds a leading batch axis of size 1
image = np.stack([img_reshape], axis=0).astype(np.float32)
x = image.copy()

logger.info(cell_label + "Image shape: {}".format(x.shape))


logger.info(cell_label + "FINISH")
logger.info("[------------------------------------------------------------------------------------]\n")

In [None]:
# Projected Gradient Descent - Attack on a sample image
#
# Runs PGD on the batch `x` built in the previous cell, shows the original and
# adversarial images side by side, then shows the detections on both.
cell_label = '[## ATTACK on a SAMPLE IMAGE] - '
%matplotlib widget


logger.info("[------------------------------------------------------------------------------------]")
logger.info(cell_label + "START")



# PGD Evasion Attack Parameters
# (these globals are reused by the cell that attacks the whole test set)
# NOTE(review): eps / eps_step are presumably on the 0-255 pixel scale, matching clip_values=(0, 255) -- confirm
eps = 32
eps_step = 2
max_iter = 20

logger.info(cell_label + "PGD attack parameters - eps: {}, eps_step: {}, max_iter: {}".format(eps, eps_step, max_iter))


attack = ProjectedGradientDescent(estimator=yolov5_art_detector, eps=eps, eps_step=eps_step, max_iter=max_iter, verbose=False)
# y=None: no ground-truth labels are supplied to the attack
image_adv = attack.generate(x=x, y=None)

# Back to channels-last; the uint8 cast truncates any fractional pixel value
adversarial_image = image_adv[0].transpose(1, 2, 0).astype(np.uint8)


fig, axarr = plt.subplots(1,2, figsize=(9, 9));
fig.tight_layout(pad=1.0)

_ = axarr[0].imshow(original_image);
_ = axarr[0].set_title('Original Image', size=10);

_ = axarr[1].imshow(adversarial_image);
_ = axarr[1].set_title('Adversarial Image', size=10);
plt.show()



# Detection using torch hub
# NOTE(review): a second model instance is loaded from the same weights via
# torch.hub (which may need network access) instead of reusing model_yolov5.
WEIGHTS_MODEL = model_path
yolov5_torch_hub = torch.hub.load('ultralytics/yolov5', 'custom', path=WEIGHTS_MODEL)



# Boxes are drawn on copies so the displayed originals stay untouched;
# 0.8 is the confidence threshold and False omits the scores from the labels.
results_orig = yolov5_torch_hub(original_image)
orig_image_preds = get_image_with_predictions(original_image.copy(), results_orig, 0.8, False)


results_adv = yolov5_torch_hub(adversarial_image)
adv_image_preds = get_image_with_predictions(adversarial_image.copy(), results_adv, 0.8, False)






fig2, axarr2 = plt.subplots(1,2, figsize=(9, 9));
fig2.tight_layout(pad=2.0)

_ = axarr2[0].imshow(orig_image_preds);
_ = axarr2[0].set_title('Detection on the Original Image', size=10);

_ = axarr2[1].imshow(adv_image_preds);
_ = axarr2[1].set_title('Detection on the Adversarial Image', size=10);
plt.show()





logger.info(cell_label + "FINISH")
logger.info("[------------------------------------------------------------------------------------]\n")

In [None]:
# Create folder for PGD Attack on test images
#
# Builds ./attack_test_data/grad_desc_mod_images/{images,labels}, creating each
# level only when it is missing.
cell_label = '[## Create Folder for PGD Attack] - '

logger.info("[------------------------------------------------------------------------------------]")
logger.info(cell_label + "START")


test_mod_images_main_folder = "attack_test_data"
mod_images_folder = "grad_desc_mod_images"
test_folder_modified_imgs = os.path.join(".", test_mod_images_main_folder, mod_images_folder)
test_folder_modified_imgs_images = os.path.join(test_folder_modified_imgs, "images")
test_folder_modified_imgs_labels = os.path.join(test_folder_modified_imgs, "labels")

# Parents come before children so each os.mkdir finds its parent in place
folders_to_create = (
    os.path.join(".", test_mod_images_main_folder),
    test_folder_modified_imgs,
    test_folder_modified_imgs_images,
    test_folder_modified_imgs_labels,
)

for folder_path in folders_to_create:
    if not os.path.isdir(folder_path):
        os.mkdir(folder_path)


logger.info(cell_label + "FINISH")
logger.info("[------------------------------------------------------------------------------------]\n")

In [None]:
# Projected Gradient Descent Attack on all the test images
#
# For every file in the test images folder: run PGD with the parameters set in
# the sample-attack cell and save the adversarial image under the same file
# name. The label files are then copied over unchanged.
cell_label = '[## PGD Attack on Test Images] - '

logger.info("[------------------------------------------------------------------------------------]")
logger.info(cell_label + "START")


# Regular files only, extensions kept
list_images = []
for candidate in os.listdir(test_images_path):
    if os.path.isfile(os.path.join(test_images_path, candidate)):
        list_images.append(candidate)

for image_name in tq.tqdm(list_images):

    source_path = os.path.join(test_images_path, image_name)
    original_image = imread(source_path)

    # Move the last axis first and add a batch axis of size 1
    img_reshape = original_image.transpose((2, 0, 1))
    image = np.stack([img_reshape], axis=0).astype(np.float32)
    x = image.copy()

    attack = ProjectedGradientDescent(
        estimator=yolov5_art_detector,
        eps=eps,
        eps_step=eps_step,
        max_iter=max_iter,
        verbose=False,
    )
    # Swallow whatever the attack prints so the progress bar stays readable
    with io.capture_output() as captured:
        image_adv = attack.generate(x=x, y=None, verbose=False)

    adversarial_pixels = image_adv[0].transpose(1, 2, 0).astype(np.uint8)
    target_path = os.path.join(test_folder_modified_imgs_images, image_name)
    _ = imsave(target_path, adversarial_pixels)


# Copy all labels
_ = copy_tree(test_labels_path, test_folder_modified_imgs_labels)


logger.info(cell_label + "FINISH")
logger.info("[------------------------------------------------------------------------------------]\n")

In [None]:
# Create data YAML file
#
# Writes the dataset description that the validation cell passes to YoloV5.
# Only the 'test' split is filled in; it points at the adversarial images folder.
cell_label = '[## CREATE YAML file for TEST DATA] - '

logger.info("[------------------------------------------------------------------------------------]")
logger.info(cell_label + "START")


# Reuse the folder path built when the attack folders were created, rather than
# re-typing "grad_desc_mod_images", so the two cells cannot drift apart.
test_folder_modified_imgs_full_path = os.path.abspath(test_folder_modified_imgs)

# Class names and their count come from the single notebook-level constant, so
# 'nc' always agrees with 'names'. list() hands yaml.dump a fresh copy.
data_yaml_content_mod = {'path': '..',
                         'train': '',
                         'val': '',
                         'test': test_folder_modified_imgs_full_path,
                         'nc': len(SELF_DRIVING_CAR_CATEGORY_NAMES),
                         'names': list(SELF_DRIVING_CAR_CATEGORY_NAMES)}


data_yaml_file = os.path.join(test_folder_modified_imgs_full_path, "data_mod_{}.yaml".format("PGD"))

if os.path.isdir(test_folder_modified_imgs_full_path):
    with open(data_yaml_file, "w+") as file:
        yaml.dump(data_yaml_content_mod, file, default_flow_style=False)
    logger.info(cell_label + "Data YAML file written: {}".format(data_yaml_file))
else:
    # Previously skipped silently, which only surfaced later as a confusing
    # failure in the validation cell.
    logger.warning(cell_label + "Folder {} not found: data YAML file NOT written".format(test_folder_modified_imgs_full_path))


logger.info(cell_label + "FINISH")
logger.info("[------------------------------------------------------------------------------------]\n")

In [None]:
# Validation on ADVERSARIAL TEST DATA
#
# Runs YoloV5's validation on the 'test' split described by the data YAML
# file written in the previous cell, storing the results under `project`/`name`.
cell_label = '[## Validation on ADVERSARIAL TEST DATA] - '

logger.info("[------------------------------------------------------------------------------------]")
logger.info(cell_label + "START")


from yolov5 import val

# Put here your model folder name
model = ""
weights = "./yolov5_train_detection/{}/weights/best.pt".format(model)
data = data_yaml_file

# Run settings
batch_size = 64
imgsz = 512
task = 'test'
project = 'yolov5_test_security'
workers = 4

# Flags
exist_ok = True
save_txt = True
augment = True
save_conf = True
verbose = True


# Timestamped run name, so each validation run gets its own results folder
date_time = datetime.now().strftime("%d-%m-%Y--%H-%M-%S")
RUN_NAME = f"{date_time}-test_PGD"
name = RUN_NAME

# Gather every argument in one mapping and pass them all by keyword
val_run_arguments = dict(
    data=data,
    weights=weights,
    batch_size=batch_size,
    imgsz=imgsz,
    task=task,
    workers=workers,
    exist_ok=exist_ok,
    save_txt=save_txt,
    augment=augment,
    save_conf=save_conf,
    verbose=verbose,
    project=project,
    name=name,
)
_ = val.run(**val_run_arguments)


logger.info(cell_label + "FINISH")
logger.info("[------------------------------------------------------------------------------------]\n")