In [None]:
# Mount Google Drive inside the Colab VM at /content/drive so the rest of the
# notebook can read and write files under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Install the packages listed in requirements.txt (path is relative to the
# current working directory), asking pip to use its legacy dependency resolver.
!pip install -r requirements.txt --use-deprecated=legacy-resolver

### Zip MyDrive

In [None]:
# Archive MyDrive into /content/my_drive.zip, excluding the dataset folders
# (root_cifar10, root_cifar10_test, root_mnist, root_mnist_test) and the
# train_results / test_results output folders.
!zip -r /content/my_drive.zip /content/drive/MyDrive -x "/content/drive/MyDrive/root_cifar10/*" "/content/drive/MyDrive/root_cifar10_test/*" "/content/drive/MyDrive/root_mnist/*" "/content/drive/MyDrive/root_mnist_test/*" "/content/drive/MyDrive/test_results/*" "/content/drive/MyDrive/train_results/*"

### necessary packages

In [None]:
# Install the extra third-party packages (unpinned, so the versions installed
# depend on when the cell is run).

!pip install einops
!pip install torchgeometry
!pip install kornia
!pip install pytorch_msssim

import os
import re
import sys
import tensorflow as tf
import numpy as np
from PIL import Image

# Make modules stored in the Drive folder `python_files` importable by name.
sys.path.append("/content/drive/MyDrive/python_files")

# Reload imported modules automatically before each cell execution, so edits to
# the .py files on Drive are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

### Load dataset (run if data not available)

In [None]:
# Build the dataset folders by running create_data.py from the Drive checkout.
# To load a different dataset, presumably the corresponding commented-out lines
# inside create_data.py must be uncommented first (TODO confirm against that script).
# Also check that the script's root folder points inside the Drive path.
!python3 /content/drive/MyDrive/Cold-Diffusion-Models/create_data.py

#### Check dataset size

In [None]:
# Check dataset size
import os


def count_png_files(directory):
    """Count the PNG files below ``directory``, including all sub-folders.

    Only names ending in ``.png`` (case-insensitive) are counted, so stray
    files such as logs or hidden metadata do not inflate the dataset size.

    Args:
        directory: Path of the folder to scan.

    Returns:
        int: Number of PNG files found; 0 when the folder is empty or missing
        (``os.walk`` yields nothing for a missing path).
    """
    return sum(
        1
        for _root, _dirs, files in os.walk(directory)
        for name in files
        if name.lower().endswith(".png")
    )


base_directory = "/content/drive/MyDrive/root_cifar10_adv"

# Report a readable message instead of a misleading 0 when the folder is absent.
if os.path.exists(base_directory):
    total_png_files = count_png_files(base_directory)
else:
    total_png_files = "Base directory does not exist."

total_png_files

## Cold Diffusion

### Snowification


Train


In [None]:
# Run the snowification training script on the images in root_cifar10_adv with
# 50 time steps and 200 train steps, passing --resume_training True; output goes
# to train_results/snow_cifar10_50ts_09_01.
# NOTE(review): --save_and_sample_every 99999 exceeds --train_steps 200, so
# periodic saving/sampling presumably never triggers during the run - confirm
# how train.py persists the final model.
!python3 '/content/drive/MyDrive/Cold-Diffusion-Models/snowification/train.py' --resume_training True --model 'UnetResNet' --dataset 'cifar10' --save_and_sample_every 99999 --time_steps 50  --train_steps 200 --sampling_routine x0_step_down --snow_level 1 --random_snow --save_folder '/content/drive/MyDrive/train_results/snow_cifar10_50ts_09_01' --dataset_folder '/content/drive/MyDrive/root_cifar10_adv' --exp_name 'trial' --forward_process_type 'Snow'

Test

In [None]:
# Run the snowification test script on the images in fgsm_root_cifar10_adv_test
# with 200 time steps; results go to
# test_results/snow_cifar10_3_255_epsilon_200_time_step_18_12_clamp (the
# 'test_adv' value of --extra_path is presumably a sub-folder name - confirm).
# NOTE(review): --save_folder_train points at a different run folder, and uses a
# different --time_steps value, than the training cell in this notebook
# (snow_cifar10_50ts_09_01, 50 steps). Confirm the intended checkpoint.
!python3 '/content/drive/MyDrive/Cold-Diffusion-Models/snowification/test.py' --model 'UnetResNet' --dataset 'cifar10' --time_steps 200 --test_type 'test_data' --extra_path 'test_adv' --save_folder_train '/content/drive/MyDrive/train_results/snow_cifar10_3_255_epsilon_200_time_step_18_12_clamp' --save_folder_test '/content/drive/MyDrive/test_results/snow_cifar10_3_255_epsilon_200_time_step_18_12_clamp' --dataset_folder '/content/drive/MyDrive/fgsm_root_cifar10_adv_test' --exp_name 'trial' --forward_process_type 'Snow'

### Decolorization

Train

In [None]:
# Run the decolorization training script on root_cifar10 for 100 train steps
# with the 'Linear' decolor routine; output goes to
# train_results/decolor_cifar10_train_results.
!python3 '/content/drive/MyDrive/Cold-Diffusion-Models/decolor-diffusion/train.py' --train_steps 100  --forward_process_type 'Decolorization' --decolor_total_remove --decolor_routine 'Linear' --save_folder '/content/drive/MyDrive/train_results/decolor_cifar10_train_results' --dataset_folder '/content/drive/MyDrive/root_cifar10' --exp_name 'trial'

Test

In [None]:
# Run the decolorization test script; results go to
# test_results/decolor_test_results.
# NOTE(review): --save_folder_train is 'decolor_train_results', whereas the
# decolorization training cell writes to 'decolor_cifar10_train_results', and
# --train_steps is 1000 here versus 100 there. --dataset_folder is also the
# training folder root_cifar10 rather than root_cifar10_test. Confirm all three
# are intended.
!python3 '/content/drive/MyDrive/Cold-Diffusion-Models/decolor-diffusion/test.py' --train_steps 1000  --forward_process_type 'Decolorization' --decolor_total_remove --decolor_routine 'Linear' --test_type 'test_data' --save_folder_train '/content/drive/MyDrive/train_results/decolor_train_results' --save_folder_test '/content/drive/MyDrive/test_results/decolor_test_results' --dataset_folder '/content/drive/MyDrive/root_cifar10' --exp_name 'trial' --order_seed 1

### Resolution

Train

In [None]:
# Run the MNIST training script of the resolution diffusion model on root_mnist
# with the 'Incremental' resolution routine; output goes to
# train_results/resolution_mnist_train_results.
# --time_steps must stay below the image size (27 here, presumably for
# 28-pixel MNIST images - confirm).
!python3 '/content/drive/MyDrive/Cold-Diffusion-Models/resolution-diffusion-pytorch/mnist_train.py' --time_steps 27 --train_steps 41 --resolution_routine 'Incremental' --save_folder '/content/drive/MyDrive/train_results/resolution_mnist_train_results' --data_path '/content/drive/MyDrive/root_mnist'

Test

In [None]:
# Run the MNIST test script of the resolution diffusion model on root_mnist_test,
# loading model.pt from the folder the resolution training cell writes to
# (train_results/resolution_mnist_train_results) and using the same
# --time_steps (27) and --resolution_routine ('Incremental') as that cell.
# Results go to test_results/resolution_mnist_test_results.
!python3 '/content/drive/MyDrive/Cold-Diffusion-Models/resolution-diffusion-pytorch/mnist_test.py' --data_path '/content/drive/MyDrive/root_mnist_test' --time_steps 27 --train_routine 'Final' --sampling_routine 'x0_step_down' --resolution_routine 'Incremental' --load_path '/content/drive/MyDrive/train_results/resolution_mnist_train_results/model.pt' --save_folder  '/content/drive/MyDrive/test_results/resolution_mnist_test_results' --test_type 'test_data'

## Adversarial Attack

In [None]:
# Project-local helper class; presumably it lives in the Drive folder
# `python_files` that was appended to sys.path earlier (TODO confirm).
from adversarial_attack import AdversarialAttack

# Create the attack object and load its model (which model is loaded is decided
# inside AdversarialAttack.load_model and is not visible from this notebook).
adversarial_attack = AdversarialAttack()
adversarial_attack.load_model()
# Scale factor applied to the generated perturbation before it is added to the image.
epsilon = 0.075

# Class names; a name's position in this list is its integer label.
label_names = [
    'airplane', 'automobile', 'bird', 'cat', 'deer',
    'dog', 'frog', 'horse', 'ship', 'truck',
]

# Folder that holds the custom test images processed below.
image_directory = "/content/drive/MyDrive/custom_cifar10_test"


def get_label_index(filename):
    """Map a file name to the index of the class name it starts with.

    The labels are tried in ``label_names`` order and the first one whose
    pattern matches at the beginning of ``filename`` wins.

    Args:
        filename: Base name of an image file, e.g. ``"cat_12.png"``.

    Returns:
        The integer index into ``label_names``, or ``None`` when the name does
        not start with any known label.
    """
    matching_positions = (
        position
        for position, label in enumerate(label_names)
        if re.match(f"{label}[^a-zA-Z]*", filename)
    )
    return next(matching_positions, None)

# Create an adversarial copy ("adv_<name>") of every labelled PNG in image_directory.
for filename in os.listdir(image_directory):
    if not filename.endswith(".png"):
        continue

    label = get_label_index(filename)
    # Compare with None explicitly: index 0 ('airplane') is a valid label but is
    # falsy, so a plain truthiness test would skip every airplane image.
    if label is None:
        print("no label")
        continue

    image_path = os.path.join(image_directory, filename)
    one_hot_label = tf.keras.utils.to_categorical(label, 10)

    # Best effort per file: report the failure and carry on with the next image.
    try:
        # The context manager releases the file handle; convert() returns an
        # independent in-memory image, so it stays usable afterwards.
        with Image.open(image_path) as source_image:
            # Resize the image using LANCZOS
            img = source_image.convert('RGB').resize((32, 32), Image.LANCZOS)

        # Convert the PIL Image to a float32 NumPy array scaled to [0, 1]
        img_np = (np.array(img) / 255.0).astype(np.float32)

        perturbations = adversarial_attack.generate_adversary(img_np, one_hot_label).numpy()

        # Drop the leading axis and move the first remaining axis to the end.
        # Assumes generate_adversary returns shape (1, C, H, W) - TODO confirm.
        perturbations = np.squeeze(perturbations, axis=0).transpose(1, 2, 0)

        # Add the scaled perturbation and keep pixel values inside [0, 1].
        adversarial = np.clip(img_np + (perturbations * epsilon), 0, 1)
        adversarial_image = Image.fromarray((adversarial * 255).astype(np.uint8))
        adversarial_image.save(os.path.join(image_directory, 'adv_' + filename))
    except Exception as e:
        print(f"Error processing {filename}: {e}")



## ResNet Classification for Cifar-10

In [None]:
import torch
import torchvision
import torchvision.transforms as transforms
import torchvision.models as models
from torch.utils.data import DataLoader
from torchvision.datasets import ImageFolder
from torch import nn, optim

In [None]:
# Preprocessing applied to every image before it reaches the classifier.
_RESNET_INPUT_SIZE = (224, 224)          # size that ResNet expects
_IMAGENET_MEAN = [0.485, 0.456, 0.406]   # ImageNet per-channel mean
_IMAGENET_STD = [0.229, 0.224, 0.225]    # ImageNet per-channel std

_preprocessing_steps = [
    transforms.Resize(_RESNET_INPUT_SIZE),
    transforms.ToTensor(),
    transforms.Normalize(mean=_IMAGENET_MEAN, std=_IMAGENET_STD),
]
transform = transforms.Compose(_preprocessing_steps)


In [None]:
# skip if working with custom dataset
# Both loaders use the same batch size and worker count; only shuffling differs.
_loader_options = dict(batch_size=64, num_workers=2)

# ImageFolder derives the class labels from the sub-folder names under each root.
trainset = ImageFolder(root='/content/drive/MyDrive/root_cifar10/', transform=transform)
testset = ImageFolder(root='/content/drive/MyDrive/root_cifar10_test/', transform=transform)

# Shuffle the training data only; keep the test order fixed.
trainloader = torch.utils.data.DataLoader(trainset, shuffle=True, **_loader_options)
testloader = torch.utils.data.DataLoader(testset, shuffle=False, **_loader_options)

In [None]:
# Build an untrained ResNet-18 and replace its classifier head with a 10-way layer.
resnet = models.resnet18(pretrained=False)
num_ftrs = resnet.fc.in_features
resnet.fc = nn.Linear(num_ftrs, 10)  # Adjusting the final layer for CIFAR-10

# Load the weights from the checkpoint on Drive (the same path the training cell
# saves to), mapping tensors to the CPU first.
state_dict = torch.load('/content/drive/MyDrive/cifar10_resnet18.pth',map_location=torch.device('cpu'))
resnet.load_state_dict(state_dict)

# Mark the final layer's parameters as trainable.
# NOTE(review): nothing in this cell sets requires_grad = False on the other
# layers, so no layer is actually frozen here - confirm whether freezing the
# backbone was intended.
for param in resnet.fc.parameters():
  param.requires_grad = True

# Move the model to the first CUDA device when one is available, else keep it on the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
resnet = resnet.to(device)


In [None]:
# Loss for the 10-class classification task.
criterion = nn.CrossEntropyLoss()

# SGD with momentum over every parameter of the network.
optimizer = optim.SGD(
    params=resnet.parameters(),
    lr=0.001,
    momentum=0.9,
)

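Skip the next cell if a trained model was already loaded above: it retrains the network and overwrites the checkpoint at `/content/drive/MyDrive/cifar10_resnet18.pth`.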

In [None]:
# don't train/save if a model is loaded
num_epochs = 3

# Run num_epochs passes over trainloader and report the mean batch loss of each pass.
for epoch in range(num_epochs):
    resnet.train()  # training mode
    running_loss = 0.0

    for batch_images, batch_targets in trainloader:
        batch_images = batch_images.to(device)
        batch_targets = batch_targets.to(device)

        # Clear gradients left over from the previous batch.
        optimizer.zero_grad()

        # Forward pass, loss, backward pass, parameter update.
        batch_loss = criterion(resnet(batch_images), batch_targets)
        batch_loss.backward()
        optimizer.step()

        running_loss += batch_loss.item()

    print(f'Epoch {epoch + 1}, Loss: {running_loss/len(trainloader)}')

# Persist the weights to the same path the model-loading cell reads from.
torch.save(resnet.state_dict(), '/content/drive/MyDrive/cifar10_resnet18.pth')
print('Finished Training')


In [None]:
# Overall Model Accuracy Evaluation
# Count correct top-1 predictions over the whole test loader.
resnet.eval()
correct = 0
total = 0
with torch.no_grad():  # no gradients are needed for evaluation
    for inputs, labels in testloader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = resnet(inputs)
        # Index of the largest logit per row is the predicted class.
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

# Guard against an empty loader, which would otherwise divide by zero.
if total:
    print(f'Accuracy of the network on the test images: {100 * correct // total}%')
else:
    print('No test images were evaluated; accuracy is undefined.')


In [None]:
from PIL import Image
import torch.nn.functional as F

# Class names in label order; a prediction index maps to cifar10_classes[index].
cifar10_classes = ['airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']


def _load_image_batch(image_path, model):
    """Load one image as a single-item batch placed on the model's device.

    The image is forced to RGB so that grayscale or RGBA files still match the
    3-channel normalisation in ``transform``.
    """
    # convert() returns an independent image, so the file can be closed right away.
    with Image.open(image_path) as source_image:
        rgb_image = source_image.convert("RGB")
    batch = transform(rgb_image).unsqueeze(0)  # add the batch dimension
    try:
        # Follow the model's own device rather than guessing from CUDA availability.
        target_device = next(model.parameters()).device
    except StopIteration:
        # Parameter-free model: fall back to the previous device choice.
        target_device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    return batch.to(target_device)


def _class_probabilities(image_path, model):
    """Return the softmax class probabilities (shape ``(1, num_classes)``) for one image."""
    model.eval()
    with torch.no_grad():
        return F.softmax(model(_load_image_batch(image_path, model)), dim=1)


def predict(image_path, model):
    """Classify one image file.

    Args:
        image_path: Path of the image to classify.
        model: Classifier that maps an image batch to class logits.

    Returns:
        tuple: ``(predicted_class_index, probability_of_that_class)``.
    """
    probabilities = _class_probabilities(image_path, model)
    class_probabilities, predicted = torch.max(probabilities, 1)
    return predicted.item(), class_probabilities.item()


def get_probabilty_of_class(index, image_path, model):
    """Return the probability the model assigns to class ``index`` for one image.

    Args:
        index: Class index to look up.
        image_path: Path of the image to classify.
        model: Classifier that maps an image batch to class logits.

    Returns:
        float: Softmax probability of class ``index``.
    """
    return _class_probabilities(image_path, model)[0, index].item()


### confidence change calculations

In [None]:
import re
import os

# Result folders of the 50- and 100-time-step snowification test runs.
# NOTE(review): the 50-step adversarial folder belongs to a different run folder
# than directory_path_50 - confirm the two hold matching image numbers.
directory_path_50 = '/content/drive/MyDrive/test_results/snow_cifar10_50ts_09_01/trial/test_not_adv'
directory_adversial_path_50 = '/content/drive/MyDrive/test_results/snow_cifar10_3_255_epsilon_50_time_step_18_12_clamp/trial/test_adv'
directory_path_100 = '/content/drive/MyDrive/test_results/snow_cifar10_100ts_09_01/trial/test_not_adv'
directory_adversial_path_100 = '/content/drive/MyDrive/test_results/snow_cifar10_100ts_09_01/trial/test_adv'


def _collect_confidence_results(clean_dir, adversarial_dir, last_step, keys, title):
    """Classify every sample of one test run and collect class confidences.

    For each ``sample-original-<n>-xt.png`` in ``clean_dir`` the classifier's
    predicted class on the original image is taken as the reference class. Its
    probability is then measured on the adversarial original, on
    ``sample-0-<n>-xt.png`` and on ``sample-<last_step>-<n>-xt.png``. Images for
    which any of these files is missing are skipped.

    Args:
        clean_dir: Folder with the non-adversarial run outputs.
        adversarial_dir: Folder with the adversarial originals.
        last_step: Step number of the final sample (49 or 99 in this notebook).
        keys: Dict key names, in the order (original, adversarial, snow, cleared).
        title: Heading printed before the list of prediction changes.

    Returns:
        tuple: ``(results, org_counts, adv_counts, changed)`` where ``results``
        is a list of dicts, the counts map class index -> number of images
        predicted as that class, and ``changed`` is how many images changed
        predicted class under attack.
    """
    original_key, adversarial_key, snow_key, cleared_key = keys
    results = []
    org_counts = {i: 0 for i in range(10)}
    adv_counts = {i: 0 for i in range(10)}
    changed = 0

    print(title)
    for file_name in os.listdir(clean_dir):
        match = re.match(r"sample-original-(\d+)-xt\.png", file_name)
        if not match:
            continue
        image_number = match.group(1)

        original_path = os.path.join(clean_dir, f"sample-original-{image_number}-xt.png")
        adversial_path = os.path.join(adversarial_dir, f"sample-original-{image_number}-xt.png")
        sample_0_path = os.path.join(clean_dir, f"sample-0-{image_number}-xt.png")
        sample_last_path = os.path.join(clean_dir, f"sample-{last_step}-{image_number}-xt.png")

        # Skip incomplete image sets instead of failing half-way through the run.
        required_paths = (sample_0_path, sample_last_path, adversial_path)
        if not all(os.path.exists(path) for path in required_paths):
            continue

        original_index, original_probability = predict(original_path, resnet)
        adversarial_index, _ = predict(adversial_path, resnet)
        if original_index != adversarial_index:
            changed += 1
            print(f"Count: {changed} Image Number:{image_number} Original: {cifar10_classes[original_index]} Adversial: {cifar10_classes[adversarial_index]}")
        org_counts[original_index] += 1
        adv_counts[adversarial_index] += 1

        # Every probability refers to the SAME reference class, so differences
        # between the entries are comparable within and across runs.
        results.append({
            original_key: original_probability,
            adversarial_key: get_probabilty_of_class(original_index, adversial_path, resnet),
            snow_key: get_probabilty_of_class(original_index, sample_0_path, resnet),
            cleared_key: get_probabilty_of_class(original_index, sample_last_path, resnet),
        })

    return results, org_counts, adv_counts, changed


# 50 timestep
results_50, org_50_counts, adv_50_counts, wrong_count_50 = _collect_confidence_results(
    directory_path_50,
    directory_adversial_path_50,
    49,
    ("Original Probability", "Adverserial Probability", "Snow Probability", "Cleared Snow Probability"),
    "50 Timesteps: predictions changed by the attack",
)

# 100 timestep (wrong_count keeps holding the 100-step value, as before)
results_100, org_100_counts, adv_100_counts, wrong_count = _collect_confidence_results(
    directory_path_100,
    directory_adversial_path_100,
    99,
    ("org_prob", "adv_prob", "snow_prob", "clear_snow_prob"),
    "100 Timesteps: predictions changed by the attack",
)



In [None]:
import numpy as np

def _summarize_confidence(results, keys, heading):
    """Print mean probabilities and mean pairwise differences for one run.

    All values are printed as percentages rounded to two decimals, so the
    reports of different runs can be compared line by line.

    Args:
        results: List of per-image dicts produced by the collection cell.
        keys: Dict key names, in the order (original, adversarial, snow, cleared).
        heading: Text printed before the report.

    Returns:
        tuple: ``(means, differences)``; ``means`` maps each result key to its
        mean value and ``differences`` maps a label to the list of per-image
        differences. Both are empty when ``results`` is empty.
    """
    original_key, adversarial_key, snow_key, cleared_key = keys
    print(heading)

    # Nothing to average: say so instead of printing NaN with runtime warnings.
    if not results:
        print("No results collected.\n")
        return {}, {}

    means = {}
    for key in results[0]:
        means[key] = sum(result[key] for result in results) / len(results)
        print(f"{key}: {round((means[key] * 100),2)}%")

    differences = {
        "Original - Snow": [r[original_key] - r[snow_key] for r in results],
        "Original - Cleaned": [r[original_key] - r[cleared_key] for r in results],
        "Snow - Cleaned": [r[snow_key] - r[cleared_key] for r in results],
        "Original - Adversial": [r[original_key] - r[adversarial_key] for r in results],
        "Adversial - Cleaned": [r[adversarial_key] - r[cleared_key] for r in results],
    }

    print("\n")
    for label, values in differences.items():
        print(f"{label} mean: {(np.mean(values) * 100).round(2)}%")
    print("\n")

    return means, differences


print("Test Results \n")

_means_50, _differences_50 = _summarize_confidence(
    results_50,
    ("Original Probability", "Adverserial Probability", "Snow Probability", "Cleared Snow Probability"),
    "50 Timesteps \n",
)

res_dict, _differences_100 = _summarize_confidence(
    results_100,
    ("org_prob", "adv_prob", "snow_prob", "clear_snow_prob"),
    "\n100 Timesteps",
)

# Keep the names this cell has always defined, holding the 100-step values.
original_snow_dif_list = _differences_100.get("Original - Snow", [])
original_cleaned_dif_list = _differences_100.get("Original - Cleaned", [])
snow_cleaned_dif_list = _differences_100.get("Snow - Cleaned", [])
org_adv_dif_list = _differences_100.get("Original - Adversial", [])
adv_cleaned_dif_list = _differences_100.get("Adversial - Cleaned", [])



In [None]:
# Class names in label order, used to turn count keys into readable names.
cifar10_classes = ['airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']
tot = 0  # assigned but not read in this cell

# Print the per-class sample counts of the 50-step run: originals first, then adversarial.
_count_tables = (
    ("Originial Counts\n", org_50_counts),
    ("\nAdversial Counts\n", adv_50_counts),
)
for _heading, _counts in _count_tables:
    print(_heading)
    for k, v in _counts.items():
        print(f"Class: {cifar10_classes[k]}, Sample Count: {v}")

In [None]:
# show images with adv versions
from PIL import Image
import os
from io import BytesIO

# Folders holding the clean and the adversarial outputs of the 50-step run.
path_org = '/content/drive/MyDrive/test_results/snow_cifar10_50ts_09_01/trial/test_not_adv'
path_adv = '/content/drive/MyDrive/test_results/snow_cifar10_3_255_epsilon_50_time_step_18_12_clamp/trial/test_adv'
image_number = 66  # which sample to display
paths = [path_org, path_adv]

# Number of entries in each folder.
for folder in paths:
    print(len(os.listdir(folder)))

# Show the original image of the chosen sample from each folder.
original_file_name = f"sample-original-{image_number}-xt.png"
for path in paths:
    display(Image.open(os.path.join(path, original_file_name)))

Write the per-image probabilities of all test images to text files

In [None]:
# Class names in label order, used to turn a result's 'index' into a name.
cifar10_classes = ['airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']

# Accepted key spellings for each probability, in lookup order. The notebook
# stores the 50-step results under the long names and the 100-step results
# under the short ones.
_PROBABILITY_KEYS = {
    'org': ('org_prob', 'Original Probability'),
    'adv': ('adv_prob', 'Adverserial Probability'),
    'snow': ('snow_prob', 'Snow Probability'),
    'clear': ('clear_snow_prob', 'Cleared Snow Probability'),
}


def _first_present(result, names, default='n/a'):
    """Return the value of the first key in ``names`` found in ``result``, else ``default``."""
    for name in names:
        if name in result:
            return result[name]
    return default


def print_accuracy_to_file(results, filename):
    """Write one text block of probabilities per result dict to ``filename``.

    Args:
        results: Iterable of dicts. Each may carry an optional ``'index'``
            (class index into ``cifar10_classes``) and the four probabilities
            under either the short or the long key names.
        filename: Path of the text file to create or overwrite.

    Missing entries are written as ``unknown`` (class) or ``n/a`` (probability)
    instead of raising ``KeyError``.
    """
    with open(filename, 'w') as file:
        for i, result in enumerate(results):
            class_index = result.get('index')
            class_name = cifar10_classes[class_index] if class_index is not None else 'unknown'
            file.write(f"Image: {i}            Class: {class_name}\n")
            file.write(f"Original Probabilty:     {_first_present(result, _PROBABILITY_KEYS['org'])}\n")
            file.write(f"Attacked Probabilty:     {_first_present(result, _PROBABILITY_KEYS['adv'])}\n")
            file.write(f"Snowified Probabilty:    {_first_present(result, _PROBABILITY_KEYS['snow'])}\n")
            file.write(f"Cleared snow Probabilty: {_first_present(result, _PROBABILITY_KEYS['clear'])}\n\n")

# Write one report per run; the relative file names place both files in the
# current working directory.
print_accuracy_to_file(results_50,"result_50.txt")
print_accuracy_to_file(results_100,"result_100.txt")


In [None]:
# Spot-check three samples: for each image number print the (class index,
# probability) prediction for its 'original', '0' and '49' files, with a blank
# line between image numbers.
_spot_check_image_numbers = (324, 344, 349)
_spot_check_stages = ("original", "0", "49")

for _position, _image_number in enumerate(_spot_check_image_numbers):
    if _position:
        print()
    for _stage in _spot_check_stages:
        print(predict(f"/content/drive/MyDrive/sample-{_stage}-{_image_number}-xt.png", resnet))