CPU Adversarial Perturbation

In [5]:
import os
import cv2
import numpy as np
from PIL import Image
from tqdm import tqdm
from skimage.util import random_noise
from skimage.filters import gaussian
from skimage.transform import rescale, AffineTransform, warp
from scipy.ndimage import zoom as scizoom

def add_gaussian_noise(image, mean=0, var=0.01):
    """Return a copy of ``image`` corrupted with additive Gaussian noise.

    Args:
        image: Image array (e.g. a ``uint8`` BGR frame from OpenCV).
        mean: Mean of the Gaussian noise, expressed on the [0, 1] intensity
            scale that ``random_noise`` works in.
        var: Variance of the Gaussian noise on the same [0, 1] scale.

    Returns:
        ``uint8`` array with the same shape as ``image``.
    """
    # random_noise returns a floating-point image; scale it back to 0-255.
    noisy_image = random_noise(image, mode='gaussian', mean=mean, var=var)
    return (noisy_image * 255).astype(np.uint8)

def add_motion_blur(image, size=5):
    """Apply a horizontal motion blur to ``image``.

    Args:
        image: Image array accepted by ``cv2.filter2D``.
        size: Side length of the square blur kernel, in pixels. Larger values
            produce a longer horizontal smear. Must be at least 1.

    Returns:
        The filtered image, with the same depth as the input.

    Raises:
        ValueError: If ``size`` is smaller than 1.
    """
    size = int(size)
    if size < 1:
        raise ValueError(f"size must be >= 1, got {size}")

    # Only the middle row is non-zero, so pixels are averaged horizontally.
    kernel_motion_blur = np.zeros((size, size))
    kernel_motion_blur[(size - 1) // 2, :] = 1.0 / size
    return cv2.filter2D(image, -1, kernel_motion_blur)

def add_shot_noise(image):
    """Corrupt ``image`` with Poisson (shot) noise.

    Each pixel intensity on the 0-255 scale is used as the rate of a Poisson
    draw; the sampled counts are clipped back into range.

    Args:
        image: Image array with intensities on the 0-255 scale.

    Returns:
        ``uint8`` array with the same shape as ``image``.
    """
    normalized = image / 255.0
    # Poisson rate per pixel is the intensity on the 0-255 scale.
    sampled_counts = np.random.poisson(normalized * 255)
    clipped = np.clip(sampled_counts / 255.0, 0, 1)
    return (clipped * 255).astype(np.uint8)

def add_zoom_blur(image, zoom_factor=2):
    """Zoom into the centre of ``image`` and crop back to the original size.

    NOTE: despite the name, this performs a single interpolated zoom followed
    by a centre crop; it does not average several zoom levels.

    Args:
        image: Image array of shape ``(H, W)`` or ``(H, W, C)``.
        zoom_factor: Magnification applied to the two spatial axes. Must be
            at least 1 so that the zoomed image covers the original frame.

    Returns:
        ``uint8`` array with the same shape as ``image``.

    Raises:
        ValueError: If ``zoom_factor`` is smaller than 1.
    """
    if zoom_factor < 1:
        raise ValueError(f"zoom_factor must be >= 1, got {zoom_factor}")

    h, w = image.shape[:2]
    # Scale only the spatial axes; any trailing axes (channels) keep factor 1.
    # Building the tuple from ndim lets 2-D grayscale frames work as well.
    per_axis_zoom = (zoom_factor, zoom_factor) + (1,) * (image.ndim - 2)
    zoomed = scizoom(image, per_axis_zoom)

    # Centre-crop the enlarged image back to the original height and width.
    start_h = (zoomed.shape[0] - h) // 2
    start_w = (zoomed.shape[1] - w) // 2
    zoomed_cropped = zoomed[start_h:start_h + h, start_w:start_w + w]
    return zoomed_cropped.astype(np.uint8)

def process_videos(input_folder, output_folder, perturbation_type='gaussian_noise'):
    """Apply a perturbation to every frame of every ``.mp4`` under ``input_folder``.

    The input folder is walked recursively. Each perturbed video is written
    to ``output_folder`` under its original file name, so videos that share a
    name in different sub-directories overwrite one another.

    Args:
        input_folder: Directory searched (recursively) for ``.mp4`` files.
        output_folder: Directory receiving the perturbed videos; created if
            it does not exist.
        perturbation_type: One of ``'gaussian_noise'``, ``'motion_blur'``,
            ``'shot_noise'`` or ``'zoom_blur'``.

    Raises:
        ValueError: If ``perturbation_type`` is not a supported name.
        RuntimeError: If an output video file cannot be opened for writing.
    """
    perturbations = {
        'gaussian_noise': add_gaussian_noise,
        'motion_blur': add_motion_blur,
        'shot_noise': add_shot_noise,
        'zoom_blur': add_zoom_blur,
    }
    # Fail fast on a typo instead of silently re-encoding unmodified videos.
    if perturbation_type not in perturbations:
        raise ValueError(
            f"Unknown perturbation_type {perturbation_type!r}; "
            f"expected one of {sorted(perturbations)}"
        )
    perturb = perturbations[perturbation_type]

    os.makedirs(output_folder, exist_ok=True)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')

    for dirname, _, filenames in os.walk(input_folder):
        for filename in tqdm(filenames):
            if not filename.endswith('.mp4'):
                continue

            in_path = os.path.join(dirname, filename)
            cap = cv2.VideoCapture(in_path)
            out = None
            try:
                if not cap.isOpened():
                    # Skip unreadable inputs rather than writing an empty file.
                    tqdm.write(f"Skipping unreadable video: {in_path}")
                    continue

                # Get properties from input to use on output
                frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
                fps = cap.get(cv2.CAP_PROP_FPS)

                out_path = os.path.join(output_folder, filename)
                out = cv2.VideoWriter(out_path, fourcc, fps, (frame_width, frame_height))
                if not out.isOpened():
                    raise RuntimeError(f"Could not open video writer for {out_path}")

                while True:
                    ret, frame = cap.read()
                    if not ret:
                        break
                    out.write(perturb(frame))
            finally:
                # Release handles even on errors or a manual interrupt so the
                # partially written file is finalised and nothing is leaked.
                cap.release()
                if out is not None:
                    out.release()

# Example usage: perturb every validation video with Gaussian noise.
# Supported perturbation_type values for this CPU pipeline:
# 'gaussian_noise', 'shot_noise', 'motion_blur', 'zoom_blur'.
input_folder = 'archive/videos_val'
output_folder = 'archive/gaussian_noise_perturbated'
perturbation_type = 'gaussian_noise'

process_videos(
    input_folder,
    output_folder,
    perturbation_type=perturbation_type,
)


  0%|          | 0/19796 [00:00<?, ?it/s]

  0%|          | 20/19796 [01:24<23:13:07,  4.23s/it]


KeyboardInterrupt: 

GPU Adversarial Perturbation

In [2]:
import os
import cv2
import numpy as np
from PIL import Image
import torch
import torchvision.transforms.functional as TF
from tqdm import tqdm
from skimage.util import random_noise
from skimage.filters import gaussian
from scipy.ndimage import zoom as scizoom

# Select the compute device: CUDA when PyTorch can see a GPU, otherwise the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Add Gaussian noise
def add_gaussian_noise(image, std=0.1):
    """Add zero-mean Gaussian noise to a float image tensor.

    Args:
        image: Floating-point tensor with values on the [0, 1] scale.
        std: Standard deviation of the noise on the same scale.

    Returns:
        Tensor of the same shape as ``image``, clamped to [0, 1].
    """
    # randn_like draws the noise on the image's own device and dtype, so the
    # function does not depend on a module-level device matching the input.
    noise = torch.randn_like(image) * std
    noisy_image = image + noise
    return noisy_image.clamp(0, 1)

# Add shot (Poisson) noise
def add_shot_noise(image, scale=50):
    """Corrupt an image tensor with Poisson (shot) noise.

    Args:
        image: Tensor of non-negative intensities.
        scale: Factor applied to the intensities to obtain the Poisson rates;
            the sampled counts are divided by the same factor afterwards.

    Returns:
        Tensor of the same shape as ``image``, clamped to [0, 1].
    """
    expected_counts = image * scale
    sampled_counts = torch.poisson(expected_counts)
    return torch.clamp(sampled_counts / scale, min=0, max=1)

# Add motion blur
def add_motion_blur(image, kernel_size=5):
    """Apply a horizontal motion blur to a ``(C, H, W)`` float image tensor.

    Every output pixel is the mean of ``kernel_size`` horizontally adjacent
    input pixels, with the left and right borders extended by replication.

    Args:
        image: Floating-point tensor of shape ``(C, H, W)``; any channel
            count is supported.
        kernel_size: Width of the averaging window in pixels. Must be at
            least 1. Even sizes are allowed and keep the output size equal
            to the input size.

    Returns:
        Blurred tensor with the same shape, dtype and device as ``image``.

    Raises:
        ValueError: If ``kernel_size`` is smaller than 1.
    """
    kernel_size = int(kernel_size)
    if kernel_size < 1:
        raise ValueError(f"kernel_size must be >= 1, got {kernel_size}")

    channels = image.shape[0]
    # A 1 x k averaging kernel per channel. This equals a k x k kernel whose
    # only non-zero row is the middle one, without multiplying by zeros.
    kernel_motion_blur = torch.full(
        (channels, 1, 1, kernel_size),
        1.0 / kernel_size,
        device=image.device,
        dtype=image.dtype,
    )

    # Asymmetric padding keeps the width unchanged for even kernel sizes too;
    # for odd sizes both sides receive kernel_size // 2 columns.
    pad_left = (kernel_size - 1) // 2
    pad_right = kernel_size // 2
    padded_image = torch.nn.functional.pad(
        image.unsqueeze(0), (pad_left, pad_right, 0, 0), mode='replicate'
    )

    # groups=channels filters each channel independently.
    image_blurred = torch.nn.functional.conv2d(
        padded_image, kernel_motion_blur, groups=channels
    )
    return image_blurred.squeeze(0)

# Add zoom blur
def add_zoom_blur(image, zoom_factor=1.1):
    """Zoom into the centre of a ``(C, H, W)`` float image tensor.

    The image is enlarged by ``zoom_factor`` along both spatial axes with
    bilinear interpolation and then centre-cropped back to ``(H, W)``.

    NOTE: despite the name, this performs a single zoom followed by a crop;
    it does not average several zoom levels.

    Args:
        image: Floating-point tensor of shape ``(C, H, W)``.
        zoom_factor: Magnification applied to height and width. Must be at
            least 1 so that the zoomed image covers the original frame.

    Returns:
        Tensor with the same shape as ``image``.

    Raises:
        ValueError: If ``zoom_factor`` is smaller than 1.
    """
    if zoom_factor < 1:
        raise ValueError(f"zoom_factor must be >= 1, got {zoom_factor}")

    h, w = image.shape[-2:]
    # Compute both target dimensions explicitly. Resizing to a single integer
    # matches the *smaller* edge, which over-zooms non-square frames when that
    # integer is derived from the larger edge.
    zoomed_h = max(h, int(round(h * zoom_factor)))
    zoomed_w = max(w, int(round(w * zoom_factor)))
    image_zoomed = torch.nn.functional.interpolate(
        image.unsqueeze(0),
        size=(zoomed_h, zoomed_w),
        mode='bilinear',
        align_corners=False,
    ).squeeze(0)

    # Centre-crop back to the original height and width.
    top = (zoomed_h - h) // 2
    left = (zoomed_w - w) // 2
    return image_zoomed[:, top:top + h, left:left + w]

# Video processing function
def process_videos(input_folder, output_folder, perturbation_type='gaussian_noise', param=0.1):
    """Apply a tensor-based perturbation to every ``.mp4`` directly inside ``input_folder``.

    Each frame is converted to a float tensor on ``device``, perturbed,
    converted back to an 8-bit image and written to a video of the same name
    in ``output_folder``.

    Args:
        input_folder: Directory whose ``.mp4`` files are processed
            (not recursive).
        output_folder: Directory receiving the perturbed videos; created if
            it does not exist.
        perturbation_type: One of ``'gaussian_noise'``, ``'shot_noise'``,
            ``'motion_blur'`` or ``'zoom_blur'``.
        param: Strength parameter forwarded to the perturbation: ``std`` for
            Gaussian noise, ``scale`` for shot noise, ``kernel_size``
            (truncated to ``int``) for motion blur, ``zoom_factor`` for zoom
            blur. The default of 0.1 only suits ``'gaussian_noise'``; pass an
            explicit value for the other types (e.g. it truncates to a
            kernel size of 0 for motion blur).

    Raises:
        ValueError: If ``perturbation_type`` is not a supported name.
        RuntimeError: If an output video file cannot be opened for writing.
    """
    perturbations = {
        'gaussian_noise': lambda tensor: add_gaussian_noise(tensor, std=param),
        'shot_noise': lambda tensor: add_shot_noise(tensor, scale=param),
        'motion_blur': lambda tensor: add_motion_blur(tensor, kernel_size=int(param)),
        'zoom_blur': lambda tensor: add_zoom_blur(tensor, zoom_factor=param),
    }
    # Fail fast on a typo instead of silently re-encoding unmodified videos.
    if perturbation_type not in perturbations:
        raise ValueError(
            f"Unknown perturbation_type {perturbation_type!r}; "
            f"expected one of {sorted(perturbations)}"
        )
    perturb = perturbations[perturbation_type]

    os.makedirs(output_folder, exist_ok=True)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')

    for filename in tqdm(os.listdir(input_folder)):
        if not filename.endswith('.mp4'):
            continue

        in_path = os.path.join(input_folder, filename)
        cap = cv2.VideoCapture(in_path)
        out = None
        try:
            if not cap.isOpened():
                # Skip unreadable inputs rather than writing an empty file.
                tqdm.write(f"Skipping unreadable video: {in_path}")
                continue

            frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            fps = cap.get(cv2.CAP_PROP_FPS)

            out_path = os.path.join(output_folder, filename)
            out = cv2.VideoWriter(out_path, fourcc, fps, (frame_width, frame_height))
            if not out.isOpened():
                raise RuntimeError(f"Could not open video writer for {out_path}")

            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                # Array frame -> float tensor on the selected device.
                frame = TF.to_tensor(frame).to(device)
                frame = perturb(frame)

                # Back to an 8-bit array for the writer; the channel order of
                # the decoded frame is preserved through the round trip.
                frame = np.array(TF.to_pil_image(frame.cpu()))
                out.write(frame)
        finally:
            # Release handles even on errors or a manual interrupt so the
            # partially written file is finalised and nothing is leaked.
            cap.release()
            if out is not None:
                out.release()

# Example usage: zoom-perturb every validation video.
# perturbation_type can be 'gaussian_noise', 'shot_noise', 'motion_blur' or 'zoom_blur'.
# Example param values: 0.1 for gaussian_noise, 3000 for shot_noise,
# 5 for motion_blur, 1.1 for zoom_blur.
input_folder = 'archive/videos_val'
output_folder = 'archive/zoom_blur'
perturbation_type = 'zoom_blur'
param = 1.1

process_videos(
    input_folder,
    output_folder,
    perturbation_type=perturbation_type,
    param=param,
)


  0%|          | 0/19796 [00:00<?, ?it/s]

100%|██████████| 19796/19796 [8:04:16<00:00,  1.47s/it]  
