In [None]:
import numpy as np
import torch
import os
import random
from shutil import copyfile
from sklearn.model_selection import train_test_split
import torch.nn as nn
import torchvision
import torch.optim as optim
import torchvision.models as models
from tqdm import tqdm
from torchvision import transforms
from torchvision.datasets import ImageFolder
from torchvision import datasets
from torch.utils.data import random_split
from torch.utils.data import DataLoader
from sklearn.metrics import accuracy_score, confusion_matrix
import matplotlib.pyplot as plt
import torchvision.models as models
from PIL import Image, ImageDraw
import cv2
from torchvision.utils import save_image
from true_classify import *
from Utils import *
from anonymization_methods import *
from datasets import *
from torchvision.transforms.functional import to_pil_image
from collections import Counter
import matplotlib.image as mpimg
import time
import matplotlib.pyplot as plt
import xlrd
import openpyxl

In [None]:
# Run on the GPU when CUDA is available; otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [None]:
# Locations of the input data, the trained model and every output artefact.
# Each string below is a placeholder: replace it with a real path before running.

source_path = 'Path of the Data to be anonimyzed'

# One entry per class sub-folder of source_path. os.listdir() returns entries in
# arbitrary order, and the anonymization loop indexes this list with the integer
# class label, so the list is sorted to make that index -> folder mapping
# deterministic. Alphabetical order is also how torchvision's ImageFolder assigns
# class indices (assumed to be what create_test_loader relies on -- TODO confirm).
class_names = sorted(
    folder
    for folder in os.listdir(source_path)
    if os.path.isdir(os.path.join(source_path, folder))
)
num_classes = len(class_names)

# Raw directory listing of source_path (files and folders alike).
file_list = os.listdir(source_path)
model_dir = 'Path of the trained FR model'

# Root folder under which one "Iteration_<n>" folder per pass is written.
output_path = 'Path to save the anonimyzed images'

# Both values are handed to test_images_classification.
save_roc_dir = 'Path to save the ROC'
excel_file_path = 'Path to save the excel file with the numberical results of accuracy and f1_score'

In [None]:
# Image preprocessing: resize, then convert the PIL image to a tensor.
# No random augmentation is applied here.
transform = transforms.Compose([transforms.Resize(224), transforms.ToTensor()])

In [None]:
# Build a ConvNeXt-Base network and load the trained FR model weights into it.

# load_state_dict() below replaces every parameter, so the network is created
# without first downloading ImageNet weights (this also avoids the deprecated
# `pretrained=` argument).
model = models.convnext_base()
# Swap the last classifier layer so the head emits one logit per class folder.
model.classifier[2] = nn.Linear(1024, num_classes)

# map_location lets a checkpoint that was saved on a GPU load on a CPU-only machine.
model.load_state_dict(torch.load('Path of the trained FR model.pt', map_location=device))
model.to(device)
# The model is only evaluated / differentiated w.r.t. its input in this notebook,
# never trained, so switch off training-only behaviour (ConvNeXt's stochastic
# depth) to keep predictions and input gradients deterministic.
model.eval()

In [None]:
# Loss and optimisation objects. `criterion` is the loss passed to calculate_loss
# while anonymizing the images.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-5)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=1e-3)

In [None]:
# q is the quantile value handed to optimize_gradients; 100 - q is the percentage
# of the image that is affected (that value is also passed to save_image).
q = 99

In [None]:
# The function to save the ROC

import matplotlib.pyplot as plt

def plot_roc_curve(fpr, tpr, roc_auc, dataset_name, save_dir):
    """Plot the micro-average ROC curve and save it as a PNG file.

    Args:
        fpr: mapping whose 'micro' entry holds the false-positive rates.
        tpr: mapping whose 'micro' entry holds the true-positive rates.
        roc_auc: mapping whose 'micro' entry is the area under that curve
            (shown in the legend with two decimals).
        dataset_name: used in the plot title and in the output file name.
        save_dir: directory that receives ``ROC_<dataset_name>.png``; it is
            created when it does not exist yet.
    """
    # savefig() does not create missing directories, so make sure it exists.
    os.makedirs(save_dir, exist_ok=True)

    plt.figure()
    try:
        lw = 2

        # The curve drawn is the micro-average one, so the legend must say so.
        plt.plot(fpr['micro'], tpr['micro'], color='deeppink', linestyle=':', lw=lw,
                 label='Micro-average ROC curve (area = {0:0.2f})'.format(roc_auc['micro']))

        # Diagonal reference line from (0, 0) to (1, 1).
        plt.plot([0, 1], [0, 1], color='navy', lw=lw, linestyle='--')
        plt.xlim([0.0, 1.0])
        plt.ylim([0.0, 1.05])
        plt.xlabel('False Positive Rate')
        plt.ylabel('True Positive Rate')
        plt.title('ROC for ' + dataset_name)
        plt.legend(loc="lower right")

        plt.savefig(os.path.join(save_dir, f'ROC_{dataset_name}.png'))
    finally:
        # Always release the figure, even when plotting or saving fails.
        plt.close()

In [None]:
# Draw a filled circle on the image at every pixel that is set in the mask.
def draw_points_on_image(image, important_pixels_mask, point_size, point_color):
    """Return a copy of ``image`` with a dot drawn on every flagged pixel.

    Args:
        image: image tensor; it is squeezed and converted to a PIL image, so it
            must be something ``transforms.ToPILImage`` accepts after squeezing.
        important_pixels_mask: 2-D tensor; every non-zero entry at (row, col)
            gets a circle centred on pixel (x=col, y=row).
        point_size: radius of each circle, in pixels (may be fractional).
        point_color: fill/outline colour of the circles, in any form Pillow
            accepts for the image's mode. ``None`` falls back to black.

    Returns:
        The edited image as a CPU tensor produced by ``transforms.ToTensor``
        with a leading batch dimension of size 1.
    """
    color = 'black' if point_color is None else point_color

    # PIL drawing runs on the CPU, so the tensors are brought there once instead
    # of being pushed to the compute device and pulled back again.
    image_pil = transforms.ToPILImage()(image.detach().squeeze().cpu())
    draw = ImageDraw.Draw(image_pil)

    # Row / column indices of the flagged pixels, converted to plain Python
    # numbers in one go rather than one .item() call per pixel.
    y_indices, x_indices = torch.nonzero(important_pixels_mask.detach().cpu(), as_tuple=True)

    for y, x in zip(y_indices.tolist(), x_indices.tolist()):
        bbox = [(x - point_size, y - point_size), (x + point_size, y + point_size)]
        draw.ellipse(bbox, fill=color, outline=color)

    return transforms.ToTensor()(image_pil).unsqueeze(0)


In [None]:
# Circle-size value that is forwarded to save_image() for every image written by
# the anonymization loop.
# NOTE(review): the loop calls draw_points_on_image with point_size=0.05, not with
# this value -- confirm which of the two is the intended circle size.
pixel_size = 20

In [None]:
# Baseline run: evaluate the model on the untouched source images and time it.
# The examples, labels and logits returned here are what the anonymization loop
# iterates over on its first pass.

start_time = time.time()

new_batch_size = 1
new_test_path = source_path
our_test_loader = create_test_loader(new_test_path, new_batch_size)

(final_acc, correct_examples, labels, logits) = test_images_classification(
    model, device, our_test_loader, excel_file_path, save_roc_dir
)
prev_acc = final_acc
print(prev_acc*100)

end_time = time.time()
acc_time = end_time - start_time

print("Acc time is", acc_time)

In [None]:
# Anonymization loop: repeatedly mask the most influential pixels of every image
# the model still classifies correctly, re-test on the result, and stop once the
# accuracy on the anonymized images falls below 1 %.
start_time = time.time()

# The iteration cap is deliberately huge: the loop is expected to end through the
# accuracy check at the bottom, not by exhausting the range.
for itera in range(0,2000000):
    # With nothing to process, no output folder would be written for this pass and
    # the re-test below would have nothing to load, so stop cleanly instead.
    if len(correct_examples) == 0:
        print("\nNo correctly classified images left to anonymize.")
        break

    # Output directory for this pass (the same for every image of the pass).
    iteration_out_path = f"{output_path}/Iteration_{itera}/"

    # Iterate through all correct examples
    for i in tqdm(range(len(correct_examples))):
        x, correct_label = correct_examples[i], labels[i]

        # Target label for the loss: the value returned by get_second_largest for
        # this image's logits.
        y = get_second_largest(logits[i])
        y = torch.tensor([y]).to(device)

        # Images are written into a sub-folder named after their class.
        class_subfolder = class_names[correct_label]
        class_output_path = os.path.join(iteration_out_path, class_subfolder)
        os.makedirs(class_output_path, exist_ok=True)

        # Clone the original image and enable gradient computation on the copy
        annonymized_image = x.clone()
        annonymized_image.requires_grad = True

        # Calculate the loss based on the model, image, and criterion
        output, loss = calculate_loss(model, annonymized_image, y, criterion)

        # Only images the model still classifies correctly need to be anonymized;
        # anything else is saved unchanged by the shared code below.
        if output.item() == correct_label.item():
            model.zero_grad()
            loss.backward()
            img_grad = annonymized_image.grad.data

            # Optimize the gradients using the quantile value (q)
            optimized_gradients = optimize_gradients(img_grad, q)
            optimized_gradients = optimized_gradients.to(device)

            # Draw a dot on every pixel whose value in the first channel of the
            # optimized gradients is >= 1.
            # NOTE(review): point_size is hard-coded to 0.05 here while pixel_size
            # is what gets passed to save_image -- confirm this is intended.
            annonymized_image = draw_points_on_image(annonymized_image, optimized_gradients[0, 0, :, :] >= 1, point_size=0.05,
                                                     point_color=(0, 0, 0))

        # Save the (possibly anonymized) image to the output directory:
        # CHW float tensor -> HWC array, channel order swapped, scaled by 255.
        # Presumably the swap is RGB -> BGR for an OpenCV writer inside
        # save_image -- TODO confirm.
        numpy_image = annonymized_image.cpu().detach().numpy().squeeze()
        cv2_image = np.transpose(numpy_image, (1, 2, 0))
        cv2_image = cv2.cvtColor(cv2_image, cv2.COLOR_BGR2RGB)
        cv2_image = cv2_image*255
        save_image(cv2_image, i, correct_label, class_output_path, pixel_size, 100-q)

    # test the model performance on the anonymized data at the end of each iteration;
    # the returned examples / labels / logits become the input of the next pass.
    our_test_loader = create_test_loader(iteration_out_path, batch_size=1)
    accuracy, correct_examples, labels, logits = test_images_classification(model, device, our_test_loader, excel_file_path, save_roc_dir)
    print(f"\nAccuracy: {accuracy*100} %")

    # Stop once the accuracy on the anonymized images is below 1 %.
    if(accuracy*100 < 1):
        break
end_time = time.time()
Anon_execution_time = end_time - start_time
print(f"\nAnon Time: {Anon_execution_time} seconds")
print(f"All images processed for iteration number {itera}.")