# **Step 1: Extract the frames of the video**

This part of the code was heavily inspired by https://www.youtube.com/watch?v=oOuswkbsBCU

In [None]:
!pip install opencv-python



In [None]:
import cv2
import os
from tqdm import tqdm

In [None]:
#open the source video; the later cells of Step 1 read its frame rate, frame count and frames from this capture object
og_video = cv2.VideoCapture("/content/drive/MyDrive/Colab Notebooks/assets/immunized_video.mp4") #get the video you want

In [None]:
fps = og_video.get(cv2.CAP_PROP_FPS) #frame rate reported by OpenCV for the source video; the extraction loop compares the desired rate against it
fps #last expression of the cell, so the value is shown as the cell output

30.0

In [None]:
#NOTE(review): the name output_directory is assigned again in Step 3 with a different folder, so re-running the extraction cell after Step 3 would write frames there instead
output_directory = "/content/drive/MyDrive/Colab Notebooks/assets/Video_Frames" #define the directory where you want the frames saved to

In [None]:
n = 0 #index of the frame currently being read (0-based)
i = 1 #number used in the file name of the next saved frame
r = 30 #desired frame rate, adjust according to how many frames you want extracted

if not og_video.isOpened():
    raise IOError("Could not open the input video; check the path passed to cv2.VideoCapture")

#cv2.imwrite does not create missing folders, it only returns False, so make sure the folder exists
os.makedirs(output_directory, exist_ok=True)

all_frames = int(og_video.get(cv2.CAP_PROP_FRAME_COUNT))

#frames cannot be invented, so never sample faster than the source;
#if the source rate is unknown (reported as 0) every frame is kept
sample_rate = min(r, fps) if fps > 0 else None

#the bar counts frames READ (its total is the frame count of the video), not frames saved
progress_bar = tqdm(total = all_frames, desc = "Extracting Frames", unit = "frames")

try:
    while True:
        #ret -> true if frame was read, false if frame wasn't read
        ret, frame = og_video.read()

        if not ret or frame is None:
            break

        #Frame n is kept whenever the running count of frames that should have been
        #emitted, int(n * sample_rate / fps), moves on to the next integer. Unlike
        #`(r * n) % fps == 0` this also works when r does not divide fps (e.g. 20 out
        #of 30) and when fps is fractional (e.g. 29.97).
        keep = (
            sample_rate is None
            or n == 0
            or int(n * sample_rate / fps) > int((n - 1) * sample_rate / fps)
        )

        if keep:
            file_path = os.path.join(output_directory, "{}.jpg".format(i))
            if not cv2.imwrite(file_path, frame):
                raise IOError("Could not write frame to {}".format(file_path))
            i+=1

        n+=1
        progress_bar.update(1)
finally:
    #clean up even if reading or writing failed half-way
    #(no OpenCV windows are opened in this cell, so there is nothing to destroy)
    og_video.release()
    progress_bar.close()

Extracting Frames: 100%|██████████| 10/10 [00:00<00:00, 70.72frames/s]


# **Step 2: Immunize frames**

This section of the code was heavily inspired by https://github.com/MadryLab/photoguard.git

2.1 Prerequisites

In [None]:
from PIL import Image, ImageOps
import requests
import numpy as np
import torch
import torchvision.transforms as T

In [None]:
!huggingface-cli login

In [None]:
!pip install diffusers

In [None]:
from diffusers import StableDiffusionImg2ImgPipeline

In [None]:
#reusable tensor -> PIL image converter; used after the attack to turn the adversarial tensor back into an image
to_pil = T.ToPILImage()

2.2 Collect images in a batch

Batch processing code was inspired by https://www.dzyla.com/blog/post34/

In [None]:
#folder with the frames extracted in Step 1 (same path as output_directory there)
input_folder = "/content/drive/MyDrive/Colab Notebooks/assets/Video_Frames"

def getFrames(input_folder):
    """Load every file in ``input_folder`` as an RGB PIL image.

    Files are visited in natural (numeric-aware) file name order, so
    ``2.jpg`` comes before ``10.jpg``. ``os.listdir`` alone returns entries in
    arbitrary order, which would renumber the frames out of sequence further
    down the pipeline.

    Args:
        input_folder: path of the folder to read. Sub-folders are skipped.

    Returns:
        list of PIL images converted to mode ``'RGB'``, one per file.
    """
    import re  # local import: only needed for the natural sort key below

    def natural_key(name):
        # re.split with a capturing group alternates text / digits / text ...,
        # so odd positions are always digit runs and can be compared as ints.
        parts = re.split(r"(\d+)", name)
        return [int(tok) if idx % 2 else tok.lower() for idx, tok in enumerate(parts)]

    frames = []

    for filename in sorted(os.listdir(input_folder), key=natural_key):
        frame_path = os.path.join(input_folder, filename) #creates a full path for each frame
        #https://docs.python.org/3/library/os.html#module-os citation

        if os.path.isfile(frame_path):
            # convert() returns an independent copy, so the file can be closed right away
            with Image.open(frame_path) as opened:
                frames.append(opened.convert('RGB'))

    return frames

2.3 Crop and Resize the images

In [None]:
#adjust the cropping depending on your video
def resizeFrames(frames, input_folder, output_folder):
    """Resize each frame, keep its right-most 512 pixel columns and save it as a JPEG.

    Every frame is passed through ``Resize(512)`` and then cropped to the box
    ``(width - 512, 0, width, height)``. The result is written to
    ``output_folder`` as ``cropped1.jpg``, ``cropped2.jpg``, ... following the
    order of ``frames``.

    Args:
        frames: iterable of PIL images.
        input_folder: not used by this function; kept so existing calls keep working.
        output_folder: destination folder; created if it does not exist yet.

    Returns:
        None. The cropped frames are written to disk.
    """
    # save() does not create missing folders and would raise FileNotFoundError
    os.makedirs(output_folder, exist_ok=True)

    resize = T.Resize(512)

    for i, init_image in enumerate(tqdm(frames, desc="Processing Images"), start=1):

        init_image_resized = resize(init_image)
        width, height = init_image_resized.size

        # NOTE(review): the crop keeps the full height, so the output is only
        # 512x512 when the resized frame is exactly 512 pixels tall - presumably
        # true for landscape input; confirm for portrait videos.
        right_crop = init_image_resized.crop((width - 512, 0, width, height))

        output_path = os.path.join(output_folder, f"cropped{i}.jpg")

        right_crop.save(output_path)



In [None]:
#run sections 2.2 and 2.3: load the extracted frames and write their cropped versions
input_folder = "/content/drive/MyDrive/Colab Notebooks/assets/Video_Frames"
output_folder = "/content/drive/MyDrive/Colab Notebooks/assets/Cropped_Frames" #also read later by the attack loop as its input folder
images = getFrames(input_folder) #all frames are held in memory as PIL images
resizeFrames(images, input_folder, output_folder) #the second argument is accepted but not used inside resizeFrames

Processing Images: 100%|██████████| 49/49 [00:09<00:00,  5.03it/s]


2.4 Preprocess images

In [None]:
def preprocess(image):
    """Turn a PIL image into a batched float tensor scaled to [-1, 1].

    Steps:
      1. Round width and height down to the nearest multiple of 32 and
         resize with the Lanczos filter.
      2. Convert to a float32 numpy array and divide by 255
         (maps to [0, 1] assuming 8-bit channels).
      3. Add a leading batch axis and move the channel axis forward:
         (H, W, C) -> (1, C, H, W).
      4. Wrap in a torch tensor and rescale with ``2 * x - 1``.

    Args:
        image: PIL image whose array form has three axes (e.g. mode 'RGB').

    Returns:
        torch.FloatTensor of shape (1, C, H', W').
    """
    # snap each side down to a multiple of 32
    snapped_size = tuple((side // 32) * 32 for side in image.size)
    resized = image.resize(snapped_size, resample=Image.LANCZOS)

    # float32 pixel values divided by 255
    pixels = np.array(resized).astype(np.float32) / 255.0

    # prepend the batch axis, then reorder to channels-first
    batch = np.expand_dims(pixels, axis=0).transpose(0, 3, 1, 2)

    tensor = torch.from_numpy(batch)
    return 2.0 * tensor - 1.0

2.5 PGD Attack

In [None]:
#initialize the model

device = "cuda" #hard-coded: this cell and the attack loop below require a CUDA GPU runtime
model_id_or_path = "runwayml/stable-diffusion-v1-5" #model identifier or local path handed to from_pretrained
pipe_img2img = StableDiffusionImg2ImgPipeline.from_pretrained(model_id_or_path, torch_dtype=torch.float16) #request float16 weights
pipe_img2img = pipe_img2img.to(device) #move the pipeline to the GPU; its VAE encoder is the model attacked in the loop below

In [None]:
def pgd(X, model, eps=0.1, step_size=0.015, iters=40, clamp_min=0, clamp_max=1):
    """Projected gradient descent on the norm of the encoder's latent mean.

    Starting from ``X`` plus uniform noise in ``[-eps, eps]``, each iteration
    takes a signed-gradient step that DEcreases
    ``model(X_adv).latent_dist.mean.norm()``, projects the result back into
    the ``eps``-ball around ``X`` and clamps it to ``[clamp_min, clamp_max]``.
    The step size decays linearly from ``step_size`` towards ``step_size / 100``.

    Args:
        X: input tensor; it is not modified.
        model: callable whose return value exposes ``.latent_dist.mean``.
        eps: maximum absolute per-element deviation from ``X``.
        step_size: step size of the first iteration.
        iters: number of iterations.
        clamp_min, clamp_max: valid value range of the result (defaults 0 and 1).

    Returns:
        The perturbed tensor on the same device as ``X``, detached from the
        autograd graph.
    """
    # Random start inside the eps-ball. The noise is created directly on X's
    # device instead of calling .cuda(), so the function also runs on CPU.
    # torch.rand yields float32 here, exactly like the previous
    # torch.rand(...).cuda(), so type promotion of the sum is unchanged.
    noise = torch.rand(X.shape, device=X.device) * 2 * eps - eps
    X_adv = X.clone().detach() + noise

    pbar = tqdm(range(iters))
    for i in pbar:
        actual_step_size = step_size - (step_size - step_size / 100) / iters * i #decreases step size dynamically to reduce perturbation magnitude

        X_adv.requires_grad_(True) #gradients w.r.t. the current adversarial input are needed below

        loss = (model(X_adv).latent_dist.mean).norm()

        pbar.set_description(f"[Running attack]: Loss {loss.item():.5f} | step size: {actual_step_size:.4}") #progress bar

        grad, = torch.autograd.grad(loss, [X_adv])

        # The update itself must not be recorded by autograd: otherwise every
        # iteration stays chained to the previous one and the graph (with its
        # saved tensors) keeps growing for the whole run.
        with torch.no_grad():
            # subtracting the signed gradient moves X_adv in the direction that lowers the loss
            X_adv = X_adv - grad.sign() * actual_step_size
            # projection: stay within eps of the original input
            X_adv = torch.minimum(torch.maximum(X_adv, X - eps), X + eps)
            # keep every element inside the valid value range
            X_adv = torch.clamp(X_adv, min=clamp_min, max=clamp_max)

    return X_adv

In [None]:
#folder that receives the immunized frames; Step 3 reads the frames back from this same path
immunized_output_folder = "/content/drive/MyDrive/Colab Notebooks/assets/Immunized_Frames"

2.6 Execute the attack

In [None]:
import re #local import: only needed for the natural sort of the file names below

#PIL's save() does not create missing folders
os.makedirs(immunized_output_folder, exist_ok=True)

#os.listdir returns names in arbitrary order; sort them numerically (cropped2 before cropped10)
#so that immunized_<i>.jpg keeps the frame order of the video. Sub-folders are skipped.
cropped_names = sorted(
    (name for name in os.listdir(output_folder)
     if os.path.isfile(os.path.join(output_folder, name))),
    #re.split with a capturing group alternates text / digits, so odd positions are digit runs
    key=lambda name: [int(tok) if idx % 2 else tok.lower()
                      for idx, tok in enumerate(re.split(r"(\d+)", name))],
)

for i, filename in enumerate(cropped_names, start=1):
    frame_path = os.path.join(output_folder, filename)

    #Load and preprocess the image
    init_image = Image.open(frame_path).convert('RGB')
    with torch.autocast('cuda'):
        X = preprocess(init_image).half().cuda()

        #Execute PGD attack
        #NOTE(review): with step_size=0.000001 and iters=500 the steps can move a value by at most
        #500 * 0.000001 = 0.0005 in total, far less than the random start of up to +/-eps = 0.2,
        #so the result is dominated by the initial noise - confirm these values are intended.
        adv_X = pgd(X,
                    model=pipe_img2img.vae.encode,
                    clamp_min=-1,
                    clamp_max=1,
                    eps=0.2,  #Adjust the perturbation range for more noticeable disruptions
                    step_size=0.000001,
                    iters=500,
                   )

        # Convert pixels back to [0, 1] range (detached: no gradients are needed from here on)
        adv_X = (adv_X.detach() / 2 + 0.5).clamp(0, 1)

    adv_image = to_pil(adv_X[0]).convert("RGB")

    #output path
    immunized_frame_path = os.path.join(immunized_output_folder, f"immunized_{i}.jpg")

    #save the new image
    adv_image.save(immunized_frame_path)


[Running attack]: Loss 679.84784 | step size: 1.198e-08: 100%|██████████| 500/500 [00:51<00:00,  9.68it/s]
[Running attack]: Loss 678.29138 | step size: 1.198e-08: 100%|██████████| 500/500 [00:49<00:00, 10.07it/s]
[Running attack]: Loss 679.31622 | step size: 1.198e-08: 100%|██████████| 500/500 [00:49<00:00, 10.10it/s]
[Running attack]: Loss 680.53479 | step size: 1.198e-08: 100%|██████████| 500/500 [00:49<00:00, 10.11it/s]
[Running attack]: Loss 679.71991 | step size: 1.198e-08: 100%|██████████| 500/500 [00:49<00:00, 10.12it/s]
[Running attack]: Loss 680.01361 | step size: 1.198e-08: 100%|██████████| 500/500 [00:49<00:00, 10.13it/s]
[Running attack]: Loss 680.99133 | step size: 1.198e-08: 100%|██████████| 500/500 [00:49<00:00, 10.14it/s]
[Running attack]: Loss 679.99615 | step size: 1.198e-08: 100%|██████████| 500/500 [00:49<00:00, 10.08it/s]
[Running attack]: Loss 679.27246 | step size: 1.198e-08: 100%|██████████| 500/500 [00:49<00:00, 10.12it/s]
[Running attack]: Loss 679.92041 | st

# **Step 3: Reassemble frames**

This part of the code was heavily inspired by https://www.youtube.com/watch?v=ZcqodhMuv4o

In [None]:
#folder holding the immunized frames that get stitched together
input_directory = "/content/drive/MyDrive/Colab Notebooks/assets/Immunized_Frames"

#where the finished video goes: folder and file name
output_directory = "/content/drive/MyDrive/Colab Notebooks/assets/"
output_name = "immunized_video.mp4"

#full path of the output file
#NOTE(review): this is the same path as the video opened for reading in Step 1, so writing the
#result replaces the source video - confirm that this is intended
combined = os.path.join(output_directory, output_name)

In [None]:
fps = 30 #specify the frame rate, should be the same as the original video

fourcc = cv2.VideoWriter_fourcc(*"mp4v") #desired video codec (named fourcc so the built-in format() is not shadowed)
size = (512, 512) #size of the video (width, height)

immunized_video = cv2.VideoWriter(combined, fourcc, fps, size)

#the constructor does not raise when the file or codec cannot be opened, so check explicitly
if not immunized_video.isOpened():
    raise IOError("Could not open the video writer for {}".format(combined))

In [None]:
i = 1 #frame number

#count only the files this loop can actually read (immunized_<n>.jpg), so the bar total is meaningful
all_frames = sum(
    1 for name in os.listdir(input_directory)
    if name.startswith("immunized_") and name.endswith(".jpg")
)

#iterate through the frames and append them
progress_bar = tqdm(total = all_frames, desc = "Processing Frames", unit = "frames")

try:
    while True:
        file_path = os.path.join(input_directory, "immunized_{}.jpg".format(i))

        #frames are numbered consecutively from 1; the first missing number ends the video
        if not os.path.exists(file_path):
            break

        frame = cv2.imread(file_path)

        #imread returns None instead of raising when a file cannot be decoded
        if frame is None:
            raise IOError("Could not read frame {}".format(file_path))

        #written frames must have the size the writer was opened with (width, height)
        if (frame.shape[1], frame.shape[0]) != tuple(size):
            frame = cv2.resize(frame, tuple(size))

        immunized_video.write(frame)

        progress_bar.update(1)  # Update the progress bar
        progress_bar.set_postfix({"Frame": i})

        i+=1
finally:
    #finalize the file and the progress bar even if a frame failed
    #(no OpenCV windows are opened in this cell, so there is nothing to destroy)
    immunized_video.release()
    progress_bar.close()

#the end result has no audio: it is assembled from still images only, so the sound of the original video is not carried over

Processing Frames: 100%|██████████| 10/10 [00:00<00:00, 38.38frames/s, Frame=10]