Extract Pure Adversarial Perturbation

In [None]:
import torch
import av
import numpy as np
from tqdm import tqdm
import os
import json
import torch.nn.functional as F
from transformers import TimesformerForVideoClassification, AutoImageProcessor
from sklearn.metrics import precision_recall_fscore_support, accuracy_score

def get_video_info(video_path):
    """
    Return the number of frames in the first video stream of ``video_path``.

    The count is taken from the container metadata (``stream.frames``).
    PyAV reports 0 there when the count is not known, so in that case the
    stream is decoded once and the frames are counted directly.

    Args:
        video_path: Path of a video file that PyAV can open.

    Returns:
        Number of frames in the first video stream.
    """
    # The context manager closes the container even if probing raises.
    with av.open(video_path) as container:
        stream = container.streams.video[0]
        total_frames = stream.frames
        if total_frames <= 0:
            # No usable metadata: fall back to counting decoded frames.
            total_frames = sum(1 for _ in container.decode(video=0))
    return total_frames

def load_video_segment(video_path, start_frame, num_frames=8):
    """
    Decode ``num_frames`` consecutive RGB frames starting at ``start_frame``.

    The video is decoded sequentially from its first frame; frames before
    ``start_frame`` are skipped. If the stream ends before ``num_frames``
    frames were collected, the last decoded frame is repeated as padding.

    Args:
        video_path: Path of a video file that PyAV can open.
        start_frame: Zero-based index of the first frame to keep.
        num_frames: Number of frames in the returned segment.

    Returns:
        The collected frames stacked along a new leading axis.

    Raises:
        ValueError: If no frame exists at or after ``start_frame``, so there
            is nothing to return or to pad with.
    """
    frames = []
    # The context manager closes the container even if decoding raises.
    with av.open(video_path) as container:
        for index, frame in enumerate(container.decode(video=0)):
            if index < start_frame:
                continue
            frames.append(frame.to_rgb().to_ndarray())
            if len(frames) == num_frames:
                break

    if not frames:
        # The old fallback indexed the empty list and died with an opaque
        # IndexError; report the actual problem instead.
        raise ValueError(
            f"No frames decoded from {video_path!r} at or after frame {start_frame}"
        )

    # Pad a short tail segment by repeating its last frame.
    frames.extend([frames[-1]] * (num_frames - len(frames)))
    return np.stack(frames)

def fgsm_attack_with_gradient(model, data, epsilon, labels, device):
    """
    Modified FGSM attack that returns both the gradients and perturbed data

    Args:
        model: Callable accepting ``**data`` and returning an object with a
            ``logits`` attribute; must also provide ``zero_grad()``.
        data: Dict-like batch with a ``"pixel_values"`` tensor entry and a
            ``copy()`` method. It is not modified.
        epsilon: Step size applied to the sign of the gradient.
        labels: Target class indices passed to ``F.cross_entropy``.
        device: Unused; kept so existing callers keep working.

    Returns:
        Tuple ``(raw_gradients, data_grad, perturbed_data)``: the loss
        gradient w.r.t. the pixel values, its sign, and a copy of ``data``
        whose ``"pixel_values"`` entry holds the perturbed, detached tensor.
    """
    # Differentiate w.r.t. a private leaf tensor so the caller's tensor is
    # left untouched and repeated calls cannot accumulate into one .grad.
    pixel_values = data["pixel_values"].detach().clone().requires_grad_(True)
    attack_inputs = data.copy()
    attack_inputs["pixel_values"] = pixel_values

    outputs = model(**attack_inputs)
    loss = F.cross_entropy(outputs.logits, labels)
    model.zero_grad()
    loss.backward()

    # Get raw gradients
    raw_gradients = pixel_values.grad.detach().clone()

    # Get gradient sign for FGSM
    data_grad = raw_gradients.sign()

    # Generate perturbed data. Store it as a mapping item: an attribute
    # assignment would not replace the entry that ``model(**batch)`` reads.
    # NOTE(review): the clamp assumes pixel values live in [0, 1]; confirm
    # this against the processor's normalization.
    perturbed_data = data.copy()
    perturbed_data["pixel_values"] = torch.clamp(
        pixel_values.detach() + epsilon * data_grad, 0, 1
    )

    return raw_gradients, data_grad, perturbed_data

def save_gradient_frames(gradients, output_path):
    """
    Save gradient visualization as video frames

    The tensor is min-max normalized to [0, 1] over all frames, each frame
    is averaged across its channel axis and written as a gray RGB image
    into an h264 video at 30 fps.

    Args:
        gradients: 4-D tensor indexed as (frame, channel, height, width).
        output_path: Destination path of the video file.
    """
    gradients = gradients.detach().float().cpu()

    # Normalize gradients to [0, 1] range for visualization
    grad_min = gradients.min()
    value_range = gradients.max() - grad_min
    if value_range > 0:
        normalized_grads = (gradients - grad_min) / value_range
    else:
        # A constant tensor would divide by zero and yield NaN frames;
        # render it as uniformly black instead.
        normalized_grads = torch.zeros_like(gradients)

    # The context manager finalizes and closes the file even on errors.
    with av.open(output_path, mode='w') as container:
        stream = container.add_stream('h264', rate=30)
        stream.width = gradients.shape[3]
        stream.height = gradients.shape[2]

        for frame in normalized_grads:
            # Convert to grayscale-like visualization
            gray = frame.mean(dim=0)  # Average across channels
            rgb = gray.unsqueeze(2).repeat(1, 1, 3).numpy()  # Repeat for RGB channels
            image = (rgb * 255).astype(np.uint8)
            video_frame = av.VideoFrame.from_ndarray(image, format='rgb24')
            container.mux(stream.encode(video_frame))

        # Flush frames still buffered inside the encoder.
        container.mux(stream.encode(None))

def process_single_video(config):
    """
    Write FGSM gradient videos for every 8-frame segment of one video.

    Reads ``model_name``, ``output_directory``, ``video_path`` and
    ``epsilon`` from ``config``. For each complete 8-frame segment two files
    are written into the output directory: one visualizing the raw input
    gradients and one visualizing their sign.
    """
    run_device = 'cuda' if torch.cuda.is_available() else 'cpu'
    model = TimesformerForVideoClassification.from_pretrained(config['model_name']).to(run_device)
    processor = AutoImageProcessor.from_pretrained(config['model_name'])
    model.eval()

    # Make sure the destination exists before any segment is written.
    target_dir = config['output_directory']
    os.makedirs(target_dir, exist_ok=True)

    source_video = config['video_path']
    segment_count = get_video_info(source_video) // 8

    for seg_no in range(segment_count):
        clip = load_video_segment(source_video, seg_no * 8)
        batch = processor(list(clip), return_tensors="pt").to(run_device)

        # Only the gradients matter here, so class 0 serves as a dummy target.
        dummy_label = torch.tensor([0]).to(run_device)

        # The perturbed batch is returned as well but is not needed here.
        raw_grads, sign_grads, _ = fgsm_attack_with_gradient(
            model, batch, config['epsilon'], dummy_label, run_device
        )

        save_gradient_frames(
            raw_grads[0],
            os.path.join(target_dir, f'raw_gradients_segment_{seg_no:04d}.mp4'),
        )
        save_gradient_frames(
            sign_grads[0],
            os.path.join(target_dir, f'sign_gradients_segment_{seg_no:04d}.mp4'),
        )

if __name__ == "__main__":
    # Settings for the single-video gradient extraction run.
    config = dict(
        model_name='facebook/timesformer-base-finetuned-k400',
        video_path='/home/z/Music/st/Kinetics-400/RQ3/testing/4gNhknocfik.mp4',
        output_directory='/home/z/Music/st/Kinetics-400/RQ3/testingFGSM/',  # Specify your output directory
        epsilon=0.3,
    )

    process_single_video(config)

Spatio-Temporal Attack to Videos

In [None]:
import torch
import av
import numpy as np
from tqdm import tqdm
import os
import json
import torch.nn.functional as F
from transformers import TimesformerForVideoClassification, AutoImageProcessor
from sklearn.metrics import precision_recall_fscore_support, accuracy_score

def get_video_info(video_path):
    """
    Return the number of frames in the first video stream of ``video_path``.

    The count is taken from the container metadata (``stream.frames``).
    PyAV reports 0 there when the count is not known, so in that case the
    stream is decoded once and the frames are counted directly.

    Args:
        video_path: Path of a video file that PyAV can open.

    Returns:
        Number of frames in the first video stream.
    """
    # The context manager closes the container even if probing raises.
    with av.open(video_path) as container:
        stream = container.streams.video[0]
        total_frames = stream.frames
        if total_frames <= 0:
            # No usable metadata: fall back to counting decoded frames.
            total_frames = sum(1 for _ in container.decode(video=0))
    return total_frames

def load_video_segment(video_path, start_frame, num_frames=8):
    """
    Decode ``num_frames`` consecutive RGB frames starting at ``start_frame``.

    The video is decoded sequentially from its first frame; frames before
    ``start_frame`` are skipped. If the stream ends before ``num_frames``
    frames were collected, the last decoded frame is repeated as padding.

    Args:
        video_path: Path of a video file that PyAV can open.
        start_frame: Zero-based index of the first frame to keep.
        num_frames: Number of frames in the returned segment.

    Returns:
        The collected frames stacked along a new leading axis.

    Raises:
        ValueError: If no frame exists at or after ``start_frame``, so there
            is nothing to return or to pad with.
    """
    frames = []
    # The context manager closes the container even if decoding raises.
    with av.open(video_path) as container:
        for index, frame in enumerate(container.decode(video=0)):
            if index < start_frame:
                continue
            frames.append(frame.to_rgb().to_ndarray())
            if len(frames) == num_frames:
                break

    if not frames:
        # The old fallback indexed the empty list and died with an opaque
        # IndexError; report the actual problem instead.
        raise ValueError(
            f"No frames decoded from {video_path!r} at or after frame {start_frame}"
        )

    # Pad a short tail segment by repeating its last frame.
    frames.extend([frames[-1]] * (num_frames - len(frames)))
    return np.stack(frames)

def load_labels(label_file):
    """
    Parse a ``<video file> <integer label>`` list into a name-to-label dict.

    Each non-blank line contributes one entry. The label is the last
    whitespace-separated field; everything before it is the file name, so
    names containing spaces are accepted. Keys are the file name up to its
    first ``.``.

    Args:
        label_file: Path of the text file to read.

    Returns:
        Dict mapping the shortened video name to its integer label.

    Raises:
        ValueError: If a non-blank line has fewer than two fields or its
            label is not an integer.
    """
    labels = {}
    with open(label_file, 'r') as f:
        for line in f:
            line = line.strip()
            if not line:
                # Blank or trailing empty lines carry no entry.
                continue
            # Split from the right so only the final field is the label.
            video_name, label = line.rsplit(None, 1)
            labels[video_name.split('.')[0]] = int(label)
    return labels

def save_segment_info(segment_info, output_path):
    """
    Serialize ``segment_info`` as JSON into the file at ``output_path``.

    The file is created, or overwritten if it already exists.
    """
    with open(output_path, 'w') as handle:
        handle.write(json.dumps(segment_info))

def fgsm_attack(model, data, epsilon, labels, device):
    """
    Apply a one-step FGSM perturbation to the pixel values of ``data``.

    Args:
        model: Callable accepting ``**data`` and returning an object with a
            ``logits`` attribute; must also provide ``zero_grad()``.
        data: Dict-like batch with a ``"pixel_values"`` tensor entry and a
            ``copy()`` method. It is not modified.
        epsilon: Step size applied to the sign of the gradient.
        labels: Target class indices passed to ``F.cross_entropy``.
        device: Unused; kept so existing callers keep working.

    Returns:
        A copy of ``data`` whose ``"pixel_values"`` entry holds the
        perturbed, detached tensor.
    """
    # Differentiate w.r.t. a private leaf tensor so the caller's tensor is
    # left untouched and repeated calls cannot accumulate into one .grad.
    pixel_values = data["pixel_values"].detach().clone().requires_grad_(True)
    attack_inputs = data.copy()
    attack_inputs["pixel_values"] = pixel_values

    outputs = model(**attack_inputs)
    loss = F.cross_entropy(outputs.logits, labels)
    model.zero_grad()
    loss.backward()

    data_grad = pixel_values.grad.detach().sign()

    # Store the result as a mapping item: an attribute assignment would not
    # replace the entry that ``model(**perturbed_data)`` reads.
    # NOTE(review): the clamp assumes pixel values live in [0, 1]; confirm
    # this against the processor's normalization.
    perturbed_data = data.copy()
    perturbed_data["pixel_values"] = torch.clamp(
        pixel_values.detach() + epsilon * data_grad, 0, 1
    )

    return perturbed_data

def save_video_frames(frames, output_path, fps=30):
    """
    Encode a stack of frames as an h264 video.

    Args:
        frames: 4-D tensor indexed as (frame, channel, height, width).
            Values are scaled by 255 before the 8-bit conversion.
            NOTE(review): presumably three channels with values in [0, 1];
            confirm against the caller.
        output_path: Destination path of the video file.
        fps: Frame rate of the written stream.
    """
    frames = frames.detach().cpu()

    # The context manager finalizes and closes the file even on errors.
    with av.open(output_path, mode='w') as container:
        stream = container.add_stream('h264', rate=fps)
        stream.width = frames.shape[3]
        stream.height = frames.shape[2]

        for frame in frames:
            image = frame.permute(1, 2, 0).numpy()
            # Clip before the cast so out-of-range values saturate instead
            # of wrapping around in uint8.
            image = np.clip(image * 255, 0, 255).astype(np.uint8)
            video_frame = av.VideoFrame.from_ndarray(
                np.ascontiguousarray(image), format='rgb24'
            )
            container.mux(stream.encode(video_frame))

        # Flush frames still buffered inside the encoder.
        container.mux(stream.encode(None))

def _majority_vote(predictions):
    """Return the most frequent prediction; ties go to the one seen first."""
    from collections import Counter

    return Counter(predictions).most_common(1)[0][0]


def _weighted_metrics(labels, preds):
    """Return accuracy and weighted precision/recall/F1 as plain floats."""
    precision, recall, f1, _ = precision_recall_fscore_support(
        labels, preds, average='weighted'
    )
    return {
        'accuracy': float(accuracy_score(labels, preds)),
        'precision': float(precision),
        'recall': float(recall),
        'f1': float(f1),
    }


def evaluate_and_generate_adversarial(config):
    """
    Attack every labelled video with FGSM and compare clean/adversarial scores.

    Each ``.mp4`` in ``config['video_directory']`` that has an entry in the
    label file is cut into complete 8-frame segments. Every segment is
    classified, attacked with FGSM using the true label, classified again,
    and the perturbed pixel values are saved as
    ``FGSM/<video name>/segment_XXXX.mp4`` next to the video directory. The
    per-video prediction is the majority vote over its segments. A
    ``fgsm_segment_info.json`` file is written next to the video directory.

    Args:
        config: Dict with the keys ``model_name``, ``video_directory``,
            ``label_file`` and ``epsilon``.

    Returns:
        Dict with the keys ``'clean'`` and ``'adversarial'``, each holding
        ``accuracy``, ``precision``, ``recall`` and ``f1`` as floats.
    """
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    model = TimesformerForVideoClassification.from_pretrained(config['model_name']).to(device)
    processor = AutoImageProcessor.from_pretrained(config['model_name'])
    model.eval()

    video_labels = load_labels(config['label_file'])

    output_root = os.path.dirname(config['video_directory'])
    adv_video_dir = os.path.join(output_root, 'FGSM')
    os.makedirs(adv_video_dir, exist_ok=True)

    clean_preds = []
    adv_preds = []
    all_labels = []

    segment_info = {}

    video_files = [f for f in os.listdir(config['video_directory']) if f.endswith('.mp4')]

    for video_file in tqdm(video_files, desc="Processing videos"):
        video_name = video_file.split('.')[0]

        if video_name not in video_labels:
            continue

        true_label = video_labels[video_name]
        video_path = os.path.join(config['video_directory'], video_file)

        try:
            total_frames = get_video_info(video_path)
            num_segments = total_frames // 8

            segment_info[video_name] = [1] * num_segments

            if num_segments == 0:
                # Without a segment there is nothing to vote on; fail with
                # a message that names the cause.
                raise ValueError(
                    f"video has {total_frames} frames, fewer than one 8-frame segment"
                )

            # Invariant for all segments of this video.
            segment_dir = os.path.join(adv_video_dir, video_name)
            os.makedirs(segment_dir, exist_ok=True)
            labels = torch.tensor([true_label]).to(device)

            clean_pred_segments = []
            adv_pred_segments = []

            for segment_idx in range(num_segments):
                start_frame = segment_idx * 8

                frames = load_video_segment(video_path, start_frame)
                inputs = processor(list(frames), return_tensors="pt").to(device)

                with torch.no_grad():
                    clean_logits = model(**inputs).logits
                clean_pred_segments.append(int(clean_logits.argmax(-1)[0]))

                perturbed_inputs = fgsm_attack(model, inputs, config['epsilon'], labels, device)
                adv_pixel_values = perturbed_inputs.pixel_values.detach()

                # Pass the perturbed tensor explicitly rather than unpacking
                # `perturbed_inputs`: `**` reads mapping items, which can
                # still hold the clean tensor if the attack stored its
                # result as an attribute.
                adv_inputs = dict(inputs)
                adv_inputs['pixel_values'] = adv_pixel_values
                with torch.no_grad():
                    adv_logits = model(**adv_inputs).logits
                adv_pred_segments.append(int(adv_logits.argmax(-1)[0]))

                segment_path = os.path.join(segment_dir, f'segment_{segment_idx:04d}.mp4')
                save_video_frames(adv_pixel_values[0].cpu(), segment_path)

            clean_pred_final = _majority_vote(clean_pred_segments)
            adv_pred_final = _majority_vote(adv_pred_segments)

            # Append together so the three lists always stay aligned.
            clean_preds.append(clean_pred_final)
            adv_preds.append(adv_pred_final)
            all_labels.append(true_label)

        except Exception as e:
            # Best effort: one broken video must not abort the whole run.
            print(f"Error processing {video_file}: {str(e)}")
            continue

    segment_info_path = os.path.join(output_root, 'fgsm_segment_info.json')
    save_segment_info(segment_info, segment_info_path)

    results = {
        'clean': _weighted_metrics(all_labels, clean_preds),
        'adversarial': _weighted_metrics(all_labels, adv_preds),
    }

    for title, key in (("Clean", 'clean'), ("Adversarial", 'adversarial')):
        scores = results[key]
        print(f"\n{title} Performance Metrics:")
        print(f"Accuracy: {scores['accuracy']:.4f}")
        print(f"Precision: {scores['precision']:.4f}")
        print(f"Recall: {scores['recall']:.4f}")
        print(f"F1 Score: {scores['f1']:.4f}")

    return results

if __name__ == "__main__":
    # Settings for the dataset-wide FGSM evaluation run.
    config = dict(
        model_name='facebook/timesformer-base-finetuned-k400',
        video_directory='/home/z/Music/st/Kinetics-400/RQ3/example/select/selected',
        label_file='/home/z/Music/st/Kinetics-400/RQ1/kinetics400_val_list_videos.txt',
        epsilon=0.1,
    )

    results = evaluate_and_generate_adversarial(config)