In [3]:
import pathlib

# Root folder of the dataset, kept as a Path so it can be globbed below.
dataset_root_path = pathlib.Path("UCF101_subset")

# Test clips are matched at test/<class folder>/<clip>.avi; the name of each
# clip's parent folder is taken as its class label.
test_video_file_paths = list(dataset_root_path.glob("test/*/*.avi"))
class_labels = sorted(set(clip.parent.name for clip in test_video_file_paths))

# Two-way lookup between class names and contiguous integer ids, numbered in
# sorted label order.
label2id = {name: idx for idx, name in enumerate(class_labels)}
id2label = dict(enumerate(class_labels))
print(f"Unique classes: {list(label2id)}.")

Unique classes: ['ApplyEyeMakeup', 'ApplyLipstick', 'Archery', 'BabyCrawling', 'BalanceBeam', 'BandMarching', 'BaseballPitch', 'Basketball', 'BasketballDunk', 'BenchPress'].


In [4]:
import pytorchvideo.data
import os

from pytorchvideo.transforms import (
    ApplyTransformToKey,
    Normalize,
    UniformTemporalSubsample,
)
from torchvision.transforms import (
    Compose,
    Lambda,
    Resize,
)

# Per-channel statistics handed to Normalize, and the spatial size handed to Resize.
mean = [0.5, 0.5, 0.5]
std = [0.5, 0.5, 0.5]
resize_to = (224, 224)

# Clip length in seconds: num_frames_to_sample * sample_rate source frames,
# assuming the videos play at `fps` frames per second.
num_frames_to_sample = 32
sample_rate = 4
fps = 30
clip_duration = num_frames_to_sample * sample_rate / fps

# Steps applied, in this order, to the "video" entry of every sample.
frame_pipeline = Compose(
    [
        UniformTemporalSubsample(num_frames_to_sample),
        # presumably maps 0-255 pixel values into [0, 1] — TODO confirm decoder output range
        Lambda(lambda frames: frames / 255.0),
        Normalize(mean, std),
        Resize(resize_to, antialias=False),
    ]
)
val_transform = Compose(
    [ApplyTransformToKey(key="video", transform=frame_pipeline)]
)

# "uniform" clip sampling with the duration computed above; audio is not decoded.
uniform_clip_sampler = pytorchvideo.data.make_clip_sampler("uniform", clip_duration)
test_dataset = pytorchvideo.data.Ucf101(
    data_path=os.path.join(dataset_root_path, "test"),
    clip_sampler=uniform_clip_sampler,
    transform=val_transform,
    decode_audio=False,
)



In [5]:
def inference(model, batch, device):
    """Predict one class index per sample of ``batch``.

    Args:
        model: Callable that accepts a ``pixel_values`` keyword argument and
            returns an object exposing a ``logits`` attribute.
        batch: Mapping whose ``'video'`` entry is a tensor; its dimensions 1
            and 2 are swapped before the forward pass.
        device: Device the input tensor is moved to before calling the model.

    Returns:
        Tensor holding the argmax of the logits over their last dimension.
    """
    # Swap dims 1 and 2, e.g. (bs, 3, 32, 224, 224) -> (bs, 32, 3, 224, 224).
    pixel_values = batch['video'].transpose(1, 2).to(device)

    # Evaluation only, so gradient tracking is switched off.
    with torch.no_grad():
        logits = model(pixel_values=pixel_values).logits
        return torch.argmax(logits, dim=-1)

Initialization

In [6]:
import numpy as np
import torch
import evaluate
from torch.utils.data import DataLoader
from transformers import VivitForVideoClassification
from model_encryption import weight_extracting, weight_reloading

# Clear PyTorch's CUDA cache, then run on the GPU when one is available.
torch.cuda.empty_cache()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# The key file is expected to hold a dict (it is indexed with 'ce_key' and
# 'pos_key' further down); .item() unwraps it from the loaded array.
# NOTE(review): allow_pickle=True unpickles the file's contents, so only load
# key files from a trusted source.
key_dict = np.load("key_dicts/key-32-2-16-seed100.npy", allow_pickle=True).item()

model_ckpt = "checkpoints/vivit-b-16x2-kinetics400-finetuned-ucf101-subset—withouImgP/checkpoint-370"
model = VivitForVideoClassification.from_pretrained(
    model_ckpt,
    label2id=label2id,
    id2label=id2label,
    # Left at False: per the transformers docs, mismatched weight shapes then
    # raise instead of being silently re-initialised.
    ignore_mismatched_sizes=False,
).to(device)
model.eval()  # inference mode for the evaluation runs below

testing_loader = DataLoader(dataset=test_dataset, batch_size=5)

Classification with plain videos

In [7]:
# Baseline: accuracy of the model on the unencrypted test clips.
metric = evaluate.load("accuracy")

# The loader is iterated directly; the batch index was never used.
for batch in testing_loader:
    predictions = inference(model, batch, device)
    metric.add_batch(predictions=predictions, references=batch["label"])

acc = metric.compute()
# Show the result as the cell output: `acc` is reassigned by the encrypted-video
# run later on, so the baseline would otherwise never be visible.
acc

Classification with encrypted videos

In [8]:
from model_encryption import cube_embedding_shuffling, pos_embedding_shuffling

# Model encryption: pull the cube (patch) embedding and position embedding
# weights out of the ViViT embeddings module and shuffle each with its key.
# NOTE: re-running this cell shuffles the weights already loaded into the
# model; reload the checkpoint first if a second run is needed.
ce_weight, pos_weight = weight_extracting(model.vivit.embeddings)
shuffled_ce_weight = cube_embedding_shuffling(ce_weight, key_dict['ce_key'])
shuffled_pos_weight = pos_embedding_shuffling(pos_weight, key_dict['pos_key'])

# Reload BOTH shuffled weights. The original passed the unshuffled `pos_weight`
# here, which left `shuffled_pos_weight` unused, so the position-embedding
# shuffle never reached the model even though the encrypted videos are
# permuted with the same 'pos_key'.
model.vivit.embeddings = weight_reloading(
    model.vivit.embeddings, shuffled_ce_weight, shuffled_pos_weight
)

Shuffling weight of Patch embedding...
Shuffling weight of Position embedding...


In [9]:
from video_encryption import *

# Accuracy of the key-shuffled model on test clips encrypted with the same keys.
metric = evaluate.load("accuracy")

# The loader is iterated directly; the batch index was never used.
for batch in testing_loader:
    # Swap dims 1 and 2 before splitting into cubes
    # (assumes the (bs, C, T, H, W) -> (bs, T, C, H, W) layout noted in `inference`).
    video_tensor = batch['video'].transpose(1, 2)

    # Encryption: split into cubes, shuffle within cubes using 'ce_key', shuffle
    # cube positions using 'pos_key', then reassemble the video.
    cube_group = cube_division(video_tensor)
    cube_group = cube_pix_shuffling(cube_group, key_dict['ce_key'])
    cube_group = cube_pos_shuffling(cube_group, key_dict['pos_key'])

    # Swap the dims back, because `inference` performs the transpose itself.
    encrypted_video = cube_integration(cube_group).transpose(1, 2)
    batch['video'] = encrypted_video

    predictions = inference(model, batch, device)
    metric.add_batch(predictions=predictions, references=batch["label"])

acc = metric.compute()
# Show the result as the cell output so it can be compared with the plain-video run.
acc