In [None]:
import os
import io

import requests
import numpy as np
import matplotlib.pyplot as plt
import cv2
import torch
import torchvision
from PIL import Image
from art.attacks.poisoning import PoisoningAttackBackdoor
from art.attacks.poisoning import (
    BadDetRegionalMisclassificationAttack,
    BadDetGlobalMisclassificationAttack,
    BadDetObjectGenerationAttack,
    BadDetObjectDisappearanceAttack,
)

from torchvision.transforms import transforms
from art.attacks.poisoning.perturbations import insert_image
from art.estimators.object_detection.pytorch_detection_transformer import PyTorchDetectionTransformer
import fiftyone as fo
import fiftyone.zoo as foz

In [None]:
import os

# Folder that collects every figure written by this notebook.
directory_name = "run_images_detrpoisonimages"

# exist_ok=True keeps re-runs harmless while still surfacing real failures
# (permissions, a regular file with the same name), which the previous blanket
# `except OSError: pass` silently discarded.
os.makedirs(directory_name, exist_ok=True)


## Constants and Utility Functions

Some constants and utility functions that will be used in this demo.

In [None]:
from pycocotools.coco import COCO
import json

# Location of the COCO-format annotation file for the validation split.
coco_annotation_file = './coco_resized/validation/labels.json'

# Parse the annotation file with pycocotools.
coco = COCO(coco_annotation_file)

# Load every category record, then reduce each record to its name.
categories = coco.loadCats(coco.getCatIds())
class_names = list(map(lambda record: record['name'], categories))

# Show the category names for inspection.
print(class_names)

In [None]:
# The 80 COCO object names, in order, without placeholder entries.
COCO_INSTANCE_CATEGORY_NAMES_OG = [
    'person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus', 'train', 'truck',
    'boat', 'traffic light', 'fire hydrant', 'stop sign', 'parking meter', 'bench', 'bird', 'cat',
    'dog', 'horse', 'sheep', 'cow', 'elephant', 'bear', 'zebra', 'giraffe',
    'backpack', 'umbrella', 'handbag', 'tie', 'suitcase', 'frisbee', 'skis', 'snowboard',
    'sports ball', 'kite', 'baseball bat', 'baseball glove', 'skateboard', 'surfboard', 'tennis racket', 'bottle',
    'wine glass', 'cup', 'fork', 'knife', 'spoon', 'bowl', 'banana', 'apple',
    'sandwich', 'orange', 'broccoli', 'carrot', 'hot dog', 'pizza', 'donut', 'cake',
    'chair', 'couch', 'potted plant', 'bed', 'dining table', 'toilet', 'tv', 'laptop',
    'mouse', 'remote', 'keyboard', 'cell phone', 'microwave', 'oven', 'toaster', 'sink',
    'refrigerator', 'book', 'clock', 'vase', 'scissors', 'teddy bear', 'hair drier', 'toothbrush',
]

# The same names laid out over 91 slots; 'N/A' marks slots without a class.
# Ten entries per row, so the row comment gives the index of its first entry.
COCO_INSTANCE_CATEGORY_NAMES = [
    # 0
    'N/A', 'person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus', 'train', 'truck', 'boat',
    # 10
    'traffic light', 'fire hydrant', 'N/A', 'stop sign', 'parking meter', 'bench', 'bird', 'cat', 'dog', 'horse',
    # 20
    'sheep', 'cow', 'elephant', 'bear', 'zebra', 'giraffe', 'N/A', 'backpack', 'umbrella', 'N/A',
    # 30
    'N/A', 'handbag', 'tie', 'suitcase', 'frisbee', 'skis', 'snowboard', 'sports ball', 'kite', 'baseball bat',
    # 40
    'baseball glove', 'skateboard', 'surfboard', 'tennis racket', 'bottle', 'N/A', 'wine glass', 'cup', 'fork', 'knife',
    # 50
    'spoon', 'bowl', 'banana', 'apple', 'sandwich', 'orange', 'broccoli', 'carrot', 'hot dog', 'pizza',
    # 60
    'donut', 'cake', 'chair', 'couch', 'potted plant', 'bed', 'N/A', 'dining table', 'N/A', 'N/A',
    # 70
    'toilet', 'N/A', 'tv', 'laptop', 'mouse', 'remote', 'keyboard', 'cell phone', 'microwave', 'oven',
    # 80
    'toaster', 'sink', 'refrigerator', 'N/A', 'book', 'clock', 'vase', 'scissors', 'teddy bear', 'hair drier',
    # 90
    'toothbrush',
]




import cv2
import matplotlib.pyplot as plt
from pycocotools.coco import COCO

import cv2
import matplotlib.pyplot as plt

def test_plot(img, predictions=None, filename=None, coco_instance_category_names=None):
    """Show an image, optionally annotated with labelled boxes, and optionally save it.

    Args:
        img: Image array. Drawing happens on a copy, so the caller's array is
            left untouched.
        predictions: Optional dict with ``'boxes'`` (rows of ``x, y, w, h``) and
            ``'labels'`` (integer indices into the category-name list).
        filename: If given, the figure is written to this path.
        coco_instance_category_names: Optional sequence mapping a label index to
            its display name. When omitted, the module-level
            ``COCO_INSTANCE_CATEGORY_NAMES`` is used, which is what this
            function always used before the parameter was honoured.
    """
    text_size = 2
    text_th = 3
    rect_th = 2

    if coco_instance_category_names is None:
        coco_instance_category_names = COCO_INSTANCE_CATEGORY_NAMES

    img = img.copy()

    if predictions is not None:
        boxes = predictions['boxes'].astype(int)
        labels = predictions['labels']

        for box, label in zip(boxes, labels):
            # Boxes arrive as (x, y, w, h); OpenCV wants two opposite corners.
            x0, y0 = int(box[0]), int(box[1])
            x1, y1 = x0 + int(box[2]), y0 + int(box[3])
            cv2.rectangle(img, (x0, y0), (x1, y1), color=(0, 255, 0), thickness=rect_th)

            # Fall back to a generic caption instead of raising (or silently
            # wrapping around on a negative index) for an unknown label.
            label = int(label)
            if 0 <= label < len(coco_instance_category_names):
                text = coco_instance_category_names[label]
            else:
                text = f"Label {label}"

            cv2.putText(img, text, (x0, y0), cv2.FONT_HERSHEY_SIMPLEX, text_size, (255, 0, 0), thickness=text_th)

    fig, ax = plt.subplots()
    ax.axis("off")
    ax.imshow(img, interpolation="nearest")
    # Save before showing: once plt.show() has handed the figure to the
    # backend, a later plt.savefig() may write an empty canvas.
    if filename is not None:
        fig.savefig(filename)
    plt.show()
    # Release the figure so loops over many images do not accumulate them.
    plt.close(fig)




def plot_image_with_boxes(img, boxes, pred_cls, title, save=False, filename=None):
    """Draw predicted boxes and class names on an image and display it.

    Args:
        img: Image array. A copy is annotated, so the caller's array is not
            modified.
        boxes: Sequence of ``[(x0, y0), (x1, y1)]`` corner pairs.
        pred_cls: Class-name strings, one per entry of ``boxes``.
        title: Title placed above the figure.
        save: When truthy and ``filename`` is set, the figure is written to disk.
        filename: Destination path used when ``save`` is truthy.
    """
    text_size = 2
    text_th = 2
    rect_th = 2

    img = img.copy()

    for box, cls_name in zip(boxes, pred_cls):
        top_left = (int(box[0][0]), int(box[0][1]))
        bottom_right = (int(box[1][0]), int(box[1][1]))
        cv2.rectangle(img, top_left, bottom_right, color=(0, 255, 0), thickness=rect_th)
        cv2.putText(img, cls_name, top_left, cv2.FONT_HERSHEY_SIMPLEX, text_size,
                    (0, 255, 0), thickness=text_th)

    plt.figure()
    plt.axis("off")
    plt.title(title)
    plt.imshow(img)

    if save and filename is not None:
        plt.savefig(filename)
        # Report only after the file has actually been written.
        print(filename)
        print("image saved")

In [None]:
# Per-channel normalisation constants handed to the detector as `preprocessing`.
MEAN = [0.485, 0.456, 0.406]
STD = [0.229, 0.224, 0.225]
NUMBER_CHANNELS = 3
IMAGE_HEIGHT = 800
IMAGE_WIDTH = 800
# Channels-last (H, W, C), matching `channels_first=False` below. The previous
# cell assigned INPUT_SHAPE twice; only this channels-last value ever took effect.
INPUT_SHAPE = (IMAGE_HEIGHT, IMAGE_WIDTH, NUMBER_CHANNELS)

transform = transforms.Compose([
        # Resize takes (height, width). Indexing INPUT_SHAPE[1] and INPUT_SHAPE[2]
        # on the channels-last tuple resized every image to 800 x 3.
        transforms.Resize([IMAGE_HEIGHT, IMAGE_WIDTH], interpolation=transforms.InterpolationMode.BICUBIC),
        transforms.ToTensor(),
        # ToTensor produces C x H x W; move channels last so the arrays agree with
        # INPUT_SHAPE and with the H x W x C images used for poisoning.
        # NOTE(review): assumes the estimator expects N x H x W x C input when
        # channels_first=False - confirm against the installed ART version.
        transforms.Lambda(lambda chw: chw.permute(1, 2, 0)),
    ])

detector = PyTorchDetectionTransformer(channels_first=False, preprocessing=(MEAN, STD), input_shape=INPUT_SHAPE, clip_values=(0,1))






In [None]:
def extract_predictions(predictions_, conf_thresh):
    """Select the detections whose score exceeds a confidence threshold.

    Args:
        predictions_: Mapping with ``"labels"`` (indices into
            ``COCO_INSTANCE_CATEGORY_NAMES``), ``"boxes"`` (rows of
            ``x0, y0, x1, y1``) and ``"scores"``, aligned by position.
        conf_thresh: Detections with ``score > conf_thresh`` are kept.

    Returns:
        A tuple ``(classes, boxes, scores)`` of equally long lists in the
        original detection order. ``classes`` holds category names and each box
        is ``[(x0, y0), (x1, y1)]``. Three empty lists are returned when there
        are no detections or none passes the threshold.
    """
    label_ids = list(predictions_["labels"])
    if len(label_ids) < 1:
        return [], [], []

    class_names = [COCO_INSTANCE_CATEGORY_NAMES[i] for i in label_ids]
    corner_boxes = [[(b[0], b[1]), (b[2], b[3])] for b in list(predictions_["boxes"])]
    scores = list(predictions_["scores"])

    # Select by position. Looking positions up with scores.index(value) returned
    # the first match, so two detections sharing a score both resolved to the
    # same entry (and the lookup was quadratic).
    keep = [position for position, score in enumerate(scores) if score > conf_thresh]
    if not keep:
        return [], [], []

    predictions_class = [class_names[i] for i in keep]
    predictions_boxes = [corner_boxes[i] for i in keep]
    predictions_scores = [scores[i] for i in keep]
    return predictions_class, predictions_boxes, predictions_scores

## Load Data

We will be using ten randomly sampled images from the resized COCO validation set for poisoning and a second sample of ten images for evaluating the detector.

We can use either a 4-D numpy array of size `N x H x W x C` or a list of numpy arrays of different sizes. For this demo, the images have already been resized to a common size, so we stack them into a single 4-D array.

In [None]:
name = "poison_131"
name2 = "detect_103"
dataset_dir = "./coco_resized/validation"
dataset_type = fo.types.COCODetectionDataset

# `dataset_2` is persistent, so it survives kernel restarts and a second run of
# this cell would fail because the name is already taken. Drop any leftover
# copies first so the notebook can be re-run without renaming the datasets.
for stale_name in (name, name2):
    if fo.dataset_exists(stale_name):
        fo.delete_dataset(stale_name)

# Two independent imports of the same directory: one to draw the images that
# get poisoned, one to hold the detector's predictions for evaluation.
dataset = fo.Dataset.from_dir(dataset_dir, dataset_type, name=name)
dataset_2 = fo.Dataset.from_dir(dataset_dir, dataset_type, name=name2)

dataset.persistent = False
dataset_2.persistent = True

# Fixed seeds keep both ten-image samples reproducible across runs.
to_poison = dataset.take(10, seed=51)
predict_images = dataset_2.take(10, seed=3)

In [None]:
import numpy as np
import requests
from PIL import Image
import io

def _image_id_from_path(filepath):
    """Return the file name of ``filepath`` without its extension."""
    return os.path.splitext(os.path.basename(filepath))[0]


def _load_rgb(filepath):
    """Open an image, convert it to RGB and close the underlying file."""
    with Image.open(filepath) as source_image:
        return source_image.convert('RGB')


# (width, height) of each source image, poisoning set first, then the
# evaluation set. The previous code appended `.size` of the numpy array, which
# is the element count rather than the image dimensions.
# NOTE(review): nothing in the visible cells reads this list - confirm the
# intended contents if it is used elsewhere.
image_sizes = []

# Images to poison: H x W x C float32 in [0, 1], stacked into one array
# (this relies on all images sharing one size).
x = []
poison_image_ids = []
for sample in to_poison:
    rgb_image = _load_rgb(sample.filepath)
    image_sizes.append(rgb_image.size)
    poison_image_ids.append(_image_id_from_path(sample.filepath))
    x.append((np.asarray(rgb_image) / 255).astype(np.float32))
x = np.array(x)
print(poison_image_ids)

# Images used to evaluate the detector, passed through `transform`.
image_ids = []
predict = []
for sample in predict_images:
    rgb_image = _load_rgb(sample.filepath)
    image_sizes.append(rgb_image.size)
    image_ids.append(_image_id_from_path(sample.filepath))
    predict.append(transform(rgb_image).numpy())
predict = np.array(predict)

We will be using pre-defined bounding boxes and labels.

In [None]:
from pycocotools.coco import COCO

# Initialize COCO API for the annotations
coco = COCO('./coco_resized/validation/labels.json')

# Per-image ground truth, keyed by the image id without leading zeros.
bboxes_dict = {}        # image id -> list of annotation `bbox` entries
bboxes_catid_dict = {}  # image id -> list of indices into COCO_INSTANCE_CATEGORY_NAMES

# Category id -> name lookup with a few manual overrides.
# NOTE(review): neither this map nor `mapped_bboxes` is read by the visible
# cells; both are kept unchanged in case later cells rely on them.
cats = coco.loadCats(coco.getCatIds())
cat_name_id_map = {cat['id']: cat['name'] for cat in cats}
cat_name_id_map[69] = "microwave"
cat_name_id_map[68] = "cell phone"
cat_name_id_map[12] = "stop sign"
cat_name_id_map[26] = "umbrella"
cat_name_id_map[29] = "suitcase"
cat_name_id_map[45] = "spoon"
cat_name_id_map[30] = "frisbee"
cat_name_id_map[0] = "person"
mapped_bboxes = []

# First position of every name in the model's label list (same result as
# list.index, without rescanning the list for each annotation).
label_index_by_name = {}
for label_index, label_name in enumerate(COCO_INSTANCE_CATEGORY_NAMES):
    label_index_by_name.setdefault(label_name, label_index)

for image_id in poison_image_ids:
    # int() already ignores leading zeros; str(int(...)) yields the same key as
    # lstrip("0") did but also copes with an id made only of zeros.
    numeric_id = int(image_id)
    image_key = str(numeric_id)

    annotations = coco.loadAnns(coco.getAnnIds(imgIds=numeric_id))
    print(f"label list should be: {len(annotations)}")

    # Collect boxes and labels together so they cannot fall out of step: an
    # annotation whose category has no entry in the label list is dropped as a
    # whole. Previously only its label was skipped while the box was kept.
    kept_boxes = []
    kept_labels = []
    for ann in annotations:
        category_name = coco.loadCats(ann['category_id'])[0]['name']
        if category_name not in label_index_by_name:
            continue
        # As before, index 0 (the 'N/A' placeholder) is stored as 1.
        kept_labels.append(max(label_index_by_name[category_name], 1))
        kept_boxes.append(ann['bbox'])

    bboxes_dict[image_key] = kept_boxes
    bboxes_catid_dict[image_key] = kept_labels

In [None]:

# One target dict per image, in the order the images were loaded.
y = []

for image_id, bboxes in bboxes_dict.items():
    # reshape(-1, 4) turns an empty annotation list into a (0, 4) array and
    # raises on malformed rows. The previous bare `except` swallowed such
    # errors and kept an entry that had labels but no boxes.
    boxes = np.asarray(bboxes, dtype=np.float32).reshape(-1, 4)
    labels = np.asarray(bboxes_catid_dict[image_id], dtype=np.int64)
    print(labels)

    # Boxes are stored exactly as they appear in the annotations' `bbox`
    # entries, which `test_plot` reads as (x, y, w, h).
    # NOTE(review): ART's object-detection estimators and BadDet attacks
    # document boxes as [x1, y1, x2, y2] - confirm whether a conversion is
    # required before `attack.poison` / `detector.fit`.
    y.append({'boxes': boxes, 'labels': labels})

## Insert Backdoor

### Backdoor Trigger

We will be using the HTBD backdoor trigger.

In [None]:
# Absolute, cluster-specific location of the trigger image; adjust when running
# the notebook elsewhere.
trigger_path = '/scratch/project_2008539/htbd.png'

# Read the pixels inside a context manager so the file handle is closed.
with Image.open(trigger_path) as trigger_image:
    trigger = np.asarray(trigger_image, dtype=np.float32)

#test_plot(trigger)

In [None]:
def poison_func(x):
    """Insert the trigger image into ``x`` through ART's ``insert_image``.

    The trigger is read from ``trigger_path``, sized 29 x 29, blended with a
    factor of 0.8 and inserted with zero shift (``random=False``).
    """
    trigger_options = dict(
        backdoor_path=trigger_path,
        size=(29, 29),
        mode='RGB',
        blend=0.8,
        random=False,
        x_shift=0,
        y_shift=0,
    )
    return insert_image(x, **trigger_options)


# Backdoor object shared by the BadDet attacks below.
backdoor = PoisoningAttackBackdoor(poison_func)

In [None]:
# Plot every clean image with its ground-truth boxes. Iterating over the data
# avoids repeating the sample count as a literal; the `+10` offset keeps the
# original file names (clean10, clean11, ...).
for i, (clean_image, clean_target) in enumerate(zip(x, y)):
    test_plot(clean_image, clean_target, filename=f"{directory_name}/clean{i+10}")

In [None]:
# Apply the plain backdoor to each image as a batch of one. `x_poisoned` is
# rebound on every pass, so only the last result remains after the loop.
for x_i in x:
    single_image_batch = x_i[np.newaxis]
    x_poisoned, _ = backdoor.poison(single_image_batch, [])

Using this backdoor object, we can perform the four BadDet poisoning attacks.

### BadDet Regional Misclassification Attack

The BadDet Regional Misclassification Attack (RMA) will insert the trigger into the bounding box of the source class and change the classification label to the target class. We will use person as the source class and bicycle as the target class; the numeric class ids are indices into `COCO_INSTANCE_CATEGORY_NAMES`, where index 0 is the `N/A` placeholder.

In [None]:
# Labels in `y` are indices into COCO_INSTANCE_CATEGORY_NAMES, where index 0 is
# the 'N/A' placeholder, person is 1 and bicycle is 2. The previous
# class_source=0 / class_target=1 therefore matched no annotation at all.
PERSON_LABEL = COCO_INSTANCE_CATEGORY_NAMES.index('person')
BICYCLE_LABEL = COCO_INSTANCE_CATEGORY_NAMES.index('bicycle')

attack = BadDetRegionalMisclassificationAttack(
    backdoor,
    class_source=PERSON_LABEL,
    class_target=BICYCLE_LABEL,
    percent_poison=1.0,
)

x_poisoned, y_poisoned = attack.poison(x, y)

# Poisoned images with their (relabelled) boxes.
for i, (poisoned_image, poisoned_target) in enumerate(zip(x_poisoned, y_poisoned)):
    test_plot(poisoned_image, poisoned_target, filename=f"{directory_name}/testplot_rma{i}")

# The untouched originals, for side-by-side comparison.
for i, (clean_image, clean_target) in enumerate(zip(x, y)):
    test_plot(clean_image, clean_target, filename=f"{directory_name}/clean{i}")


### BadDet Global Misclassification Attack

The BadDet Global Misclassification Attack (GMA) will insert the trigger onto the image and change all classification labels to the target class, regardless of the source class. We will use class 1 (bicycle) as the target class.

In [None]:
#attack = BadDetGlobalMisclassificationAttack(backdoor, class_target=1, percent_poison=1.0)
#x_poisoned, y_poisoned = attack.poison(x, y)

#for i in range(50):
    #test_plot(x_poisoned[i], y_poisoned[i], filename=f"{directory_name}/testplot_gma{i}")

### BadDet Object Generation Attack

The BadDet Object Generation Attack (OGA) will insert the trigger onto the image at a random location and create a fake bounding box and classification for that label. We will use class 1 (bicycle) as the target class and a bounding box of height 200 and width 150.

In [None]:
#attack = BadDetObjectGenerationAttack(backdoor, bbox_height=200, bbox_width=150, class_target=1, percent_poison=1.0)
#x_poisoned, y_poisoned = attack.poison(x, y)
#for i in range(50):
#    test_plot(x_poisoned[i], y_poisoned[i], filename=f"{directory_name}/testplot_oga{i}")

### BadDet Object Disappearance Attack

The BadDet Object Disappearance Attack (ODA) will insert the trigger into the bounding box of the source class and delete that bounding box and corresponding classification. We will poison class 0 (person) as the source class.

In [None]:
#attack = BadDetObjectDisappearanceAttack(backdoor, class_source=0, percent_poison=1.0)
#x_poisoned, y_poisoned = attack.poison(x, y)

#for i in range(50):
 #   test_plot(x_poisoned[i], y_poisoned[i], filename=f"{directory_name}/testplot_oda{i}")

### Load ART Model

For this demo, we use the detection transformer wrapped by ART's `PyTorchDetectionTransformer` (the `detector` created above) with `800 x 800` inputs. To use the YOLO v3 or YOLO v5 estimators instead, you must resize all the images to `416 x 416`.

## Calculate mAP before poisoning

In [None]:
from fiftyone import ViewField as F


In [None]:
_y = detector.predict(predict)

# Dimensions used to express boxes relative to the image size.
# NOTE(review): assumes the detector reports boxes in pixels of an 800 x 800
# input - confirm.
image_width = 800
image_height = 800

for i in range(len(_y)):
    # Keep only detections scoring above 0.5.
    preds = extract_predictions(_y[i], 0.5)
    labels, boxes, scores = preds

    detections = []
    for label, box, score in zip(labels, boxes, scores):
        (x_min, y_min), (x_max, y_max) = box
        # Relative [top-left-x, top-left-y, width, height], stored as plain
        # Python floats rather than torch tensors so the values serialise as
        # ordinary numbers.
        x0 = float(x_min) / image_width
        y0 = float(y_min) / image_height
        w = float(x_max) / image_width - x0
        h = float(y_max) / image_height - y0
        detections.append(
            fo.Detection(
                label=label,
                bounding_box=[x0, y0, w, h],
                confidence=float(score),
            )
        )
    #plot_image_with_boxes(img=predict[i].copy(), boxes=preds[1], pred_cls=preds[0], title="", save=True, filename=f"{directory_name}/benign_model{i}")

    # Look the sample up by the absolute path derived from `dataset_dir`
    # instead of a hard-coded, machine-specific prefix.
    sample_path = os.path.abspath(os.path.join(dataset_dir, "data", f"{image_ids[i]}.jpg"))
    sample = dataset_2[sample_path]
    sample["nopoison"] = fo.Detections(detections=detections)
    sample.save()

In [None]:
# View over the evaluation images whose "nopoison" detections are filtered to
# confidence > 0.75.
high_conf_view = predict_images.filter_labels(
    "nopoison",
    F("confidence") > 0.75,
    only_matches=False,
)

In [None]:
# Show a summary of the filtered view.
print(high_conf_view)

In [None]:
# Print a prediction from the view to verify that its confidence is > 0.75
sample = high_conf_view.first()
print(sample.nopoison)

In [None]:
# Evaluate the predictions stored in the `nopoison` field of `predict_images`
# against the ground-truth objects in the `detections` field, including mAP.
results = predict_images.evaluate_detections(
    "nopoison",
    gt_field="detections",
    eval_key="nopoison_eval",
    compute_mAP=True,
)


In [None]:
# Get the 10 most common classes in the dataset
counts = dataset.count_values("detections.detections.label")
classes_top10 = sorted(counts, key=counts.get, reverse=True)[:10]

# Print a classification report for the top-10 classes
results.print_report(classes=classes_top10)

In [None]:
# mAP of the "nopoison" predictions evaluated above.
print(results.mAP())

In [None]:
# Keep the pre-poisoning mAP for the before/after report written at the end.
result_before = results.mAP()

In [None]:
# Train the detector for one epoch on the poisoned images and labels.
detector.fit(x_poisoned, y_poisoned, nb_epochs=1)
print("Model retrained")

## Lastly, calculate mAP after poisoning

In [None]:
_y = detector.predict(predict)

# Dimensions used to express boxes relative to the image size.
# NOTE(review): assumes the detector reports boxes in pixels of an 800 x 800
# input - confirm.
image_width = 800
image_height = 800

for i in range(len(_y)):
    # Keep only detections scoring above 0.5.
    preds = extract_predictions(_y[i], 0.5)
    labels, boxes, scores = preds

    detections = []
    for label, box, score in zip(labels, boxes, scores):
        (x_min, y_min), (x_max, y_max) = box
        # Relative [top-left-x, top-left-y, width, height], stored as plain
        # Python floats rather than torch tensors so the values serialise as
        # ordinary numbers.
        x0 = float(x_min) / image_width
        y0 = float(y_min) / image_height
        w = float(x_max) / image_width - x0
        h = float(y_max) / image_height - y0
        detections.append(
            fo.Detection(
                label=label,
                bounding_box=[x0, y0, w, h],
                confidence=float(score),
            )
        )

    plot_image_with_boxes(img=predict[i].copy(), boxes=preds[1], pred_cls=preds[0], title="", save=True, filename=f"{directory_name}/after_poison{i}")

    # Look the sample up by the absolute path derived from `dataset_dir`
    # instead of a hard-coded, machine-specific prefix.
    sample_path = os.path.abspath(os.path.join(dataset_dir, "data", f"{image_ids[i]}.jpg"))
    sample = dataset_2[sample_path]
    sample["afterpoison"] = fo.Detections(detections=detections)
    sample.save()

In [None]:
# View over the evaluation images whose "afterpoison" detections are filtered
# to confidence > 0.75.
high_conf_view = predict_images.filter_labels(
    "afterpoison",
    F("confidence") > 0.75,
    only_matches=False,
)

In [None]:
# Show a summary of the filtered view.
print(high_conf_view)

In [None]:
# Print a prediction from the view to verify that its confidence is > 0.75
sample = high_conf_view.first()
print(sample.afterpoison)

In [None]:
# Evaluate the predictions stored in the `afterpoison` field of `predict_images`
# against the ground-truth objects in the `detections` field, including mAP.
results = predict_images.evaluate_detections(
    "afterpoison",
    gt_field="detections",
    eval_key="afterpoison_eval",
    compute_mAP=True,
)


In [None]:
# Count how often each ground-truth label occurs and keep the ten most frequent.
counts = dataset.count_values("detections.detections.label")
ranked_labels = sorted(counts.items(), key=lambda item: item[1], reverse=True)
classes_top10 = [label for label, _ in ranked_labels[:10]]

# Print a classification report restricted to those ten classes.
results.print_report(classes=classes_top10)

In [None]:
# mAP of the "afterpoison" predictions, kept for the before/after report.
result_after = results.mAP()
print(result_after)

In [None]:
# Write the before/after mAP values to "<name>.txt".
report_path = f"{name}.txt"

# Convert the metrics to strings for writing.
result_before = str(result_before)
result_after = str(result_after)

# The context manager closes the file even if the write fails. Both lines now
# use the same "label: value" layout; the second one previously had no
# separator between the label and the number.
with open(report_path, "w") as file:
    file.write("before poisoning: " + result_before + "\n" + "after poisoning: " + result_after + "\n")

# Report the path rather than the repr of the file object.
print(f"saved as: {report_path}")

print("Success!")