In [None]:
! pip install transformers
! pip install datasets

In [None]:
import os
import re

def extract_sequential_frames(base_path, category, num_frames=10, subcategory=None, overlap=True, skip_x = 1):
    """Build fixed-length windows of consecutive frame paths, one video at a time.

    Frame files are expected to be named ``<video>_x264_<index>.<ext>``. The
    text up to and including ``_x264`` identifies the video a frame belongs to,
    and the integer after the last underscore is its position in that video.
    Directory entries that do not follow this pattern (for example
    ``.DS_Store`` or ``.ipynb_checkpoints``) are skipped instead of raising.

    Args:
        base_path: Root directory of the dataset split.
        category: Directory under ``base_path`` to read.
        num_frames: Number of frames in every returned window.
        subcategory: Optional directory under ``category``; when falsy the
            frames are read directly from ``category``.
        overlap: If true, consecutive windows start ``skip_x`` frames apart
            (and may share frames); otherwise windows are back to back.
        skip_x: Stride between window starts when ``overlap`` is true.

    Returns:
        A list of windows; each window is a list of ``num_frames`` file paths
        from a single video in ascending frame order. Trailing frames that do
        not fill a whole window are dropped.
    """
    if subcategory:
        path = os.path.join(base_path, category, subcategory)
    else:
        path = os.path.join(base_path, category)

    # Parse every entry once, keeping only names that carry both a video
    # prefix and an integer frame index.
    frames = []
    for name in os.listdir(path):
        source_match = re.match(r'^(.*_x264)_', name)
        if source_match is None:
            continue
        try:
            frame_index = int(name.split("_")[-1].split(".")[0])
        except ValueError:
            continue
        frames.append((frame_index, name, source_match.group(1)))

    # Sort by frame index; the file name breaks ties between videos so the
    # result does not depend on the order os.listdir happens to return.
    frames.sort()

    # Group by video source (dicts keep first-seen order, and frames within a
    # group stay in ascending index order).
    grouped_filenames = {}
    for _, name, video_source in frames:
        grouped_filenames.setdefault(video_source, []).append(os.path.join(path, name))

    # The two modes differ only in the distance between window starts.
    step = skip_x if overlap else num_frames
    sequences = []
    for files in grouped_filenames.values():
        for start in range(0, len(files) - num_frames + 1, step):
            sequences.append(files[start:start + num_frames])

    return sequences

# Root of the training split. The calls below read frames from
# <base_path>/AnomalyVideos/<category>/ and from <base_path>/NormalVideos/.
base_path = "dataset/Train/"

# One sub-directory per anomaly category. Sorted so the order of the resulting
# sequences is reproducible; stray files next to the category folders are
# ignored because they cannot be listed as directories.
anomaly_root = os.path.join(base_path, "AnomalyVideos")
anomaly_categories = sorted(
    entry for entry in os.listdir(anomaly_root)
    if os.path.isdir(os.path.join(anomaly_root, entry))
)
anomaly_sequences = []
for category in anomaly_categories:
    anomaly_sequences.extend(extract_sequential_frames(base_path, "AnomalyVideos", 16,  category, overlap=True, skip_x=4))


normal_sequences = extract_sequential_frames(base_path, "NormalVideos", 16, overlap=True, skip_x=4)

# Printing results: only a short preview plus totals, because printing every
# window floods the notebook output.
preview_count = 3

print("Sequences from AnomalyVideos:")
for seq in anomaly_sequences[:preview_count]:
    print(seq)
print(f"... {len(anomaly_sequences)} sequences in total")

print("\nSequences from NormalVideos:")
for seq in normal_sequences[:preview_count]:
    print(seq)
print(f"... {len(normal_sequences)} sequences in total")


In [None]:
# Number of frame windows collected for the anomaly and the normal class.
print(len(anomaly_sequences), len(normal_sequences))

In [None]:
import pickle
from sklearn.model_selection import train_test_split

def save_to_pkl(data, filename):
    """Pickle ``data`` into the file at ``filename``.

    The file is opened in binary write mode, so an existing file with the same
    name is replaced. Nothing is returned.
    """
    with open(filename, mode='wb') as pkl_file:
        pickler = pickle.Pickler(pkl_file)
        pickler.dump(data)

# Split the sequences into train and test sets
import random

subset_size = 1000
subset_size = min(subset_size, len(anomaly_sequences), len(normal_sequences))

# Draw the subset at random with a fixed seed. The sequence lists are built
# video by video, so slicing off the first `subset_size` entries would only
# ever cover the first few videos (and, for anomalies, the first category).
subset_rng = random.Random(42)
anomaly_subset = subset_rng.sample(anomaly_sequences, subset_size)
normal_subset = subset_rng.sample(normal_sequences, subset_size)

# NOTE(review): the windows were extracted with overlap, so windows that share
# frames of the same video can land on both sides of this split. Splitting by
# video would avoid that leakage; confirm whether it matters for the evaluation.
anomaly_train, anomaly_test = train_test_split(anomaly_subset, test_size=0.2, random_state=42)
normal_train, normal_test = train_test_split(normal_subset, test_size=0.2, random_state=42)

# Save lists to .pkl files
save_to_pkl(anomaly_train, 'anomaly_train.pkl')
save_to_pkl(anomaly_test, 'anomaly_test.pkl')
save_to_pkl(normal_train, 'normal_train.pkl')
save_to_pkl(normal_test, 'normal_test.pkl')

In [None]:
from transformers import AutoImageProcessor, SwinModel
import torch
# The image processor and the model are both resolved from the same identifier.
# NOTE(review): the identifier has no organisation prefix, so it is presumably
# a local directory next to the notebook; confirm.
swin_checkpoint = "swin-base-patch4-window7-224"
image_processor = AutoImageProcessor.from_pretrained(swin_checkpoint)
model = SwinModel.from_pretrained(swin_checkpoint)

In [None]:
from PIL import Image

def save_swin_features(image_sequence, image_processor, model, data_fname):
    """Encode a sequence of frames with ``model`` and save the result to disk.

    Args:
        image_sequence: Iterable of image file paths, one per frame.
        image_processor: Callable that turns a list of PIL images into model
            inputs; it is called with ``return_tensors="pt"``.
        model: Model called as ``model(**inputs)``; its output must expose
            ``last_hidden_state``.
        data_fname: Destination passed to ``torch.save``. When it is a path,
            missing parent directories are created first.

    Returns:
        The ``last_hidden_state`` tensor that was written to ``data_fname``.
    """
    img_list = []
    for img_path in image_sequence:
        # Image.open is lazy: convert() forces the pixel data to load and
        # yields a 3-channel image, and the `with` block then releases the
        # file handle instead of leaving one open per frame.
        with Image.open(img_path) as img:
            img_list.append(img.convert("RGB"))

    inputs = image_processor(img_list, return_tensors="pt")

    # Inference only, so no autograd graph is needed.
    with torch.no_grad():
        outputs = model(**inputs)

    last_hidden_states = outputs.last_hidden_state

    # torch.save does not create directories; do it here so a fresh checkout
    # does not fail on the first write. Non-path destinations are left alone.
    if isinstance(data_fname, (str, os.PathLike)):
        out_dir = os.path.dirname(data_fname)
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)

    torch.save(last_hidden_states, data_fname)
    return last_hidden_states


# (output path template, sequences) pairs. Directory "1" / prefix "1_" is used
# for the anomaly lists and "0" / "0_" for the normal lists; the jobs run in
# the order listed here.
embedding_jobs = (
    ("swin_embeddings_mini/train/1/1_{}.pt", anomaly_train),
    ("swin_embeddings_mini/test/1/1_{}.pt", anomaly_test),
    ("swin_embeddings_mini/train/0/0_{}.pt", normal_train),
    ("swin_embeddings_mini/test/0/0_{}.pt", normal_test),
)

for path_template, sequences in embedding_jobs:
    for i, sequence in enumerate(sequences):
        # The position of the sequence within its list becomes the file index.
        save_swin_features(sequence, image_processor, model, path_template.format(i))

In [None]:
!pip install timm

In [None]:
import os
import glob
import torch
import torch.nn as nn
from timm.models.layers import DropPath, to_2tuple, trunc_normal_
from PIL import Image
import torchvision.transforms as transforms
import h5py
from swin_functions_and_classes import *

In [None]:
# Run every frame under Test/AnomalyVideos/<category>/ through a patch
# embedding followed by a regular and a shifted Swin block, and store each
# frame's feature map in its own HDF5 file.
output_dir = "output_features/Anomaly_Swin"
image_path = "Test/AnomalyVideos"

# h5py does not create missing directories.
os.makedirs(output_dir, exist_ok=True)

# The preprocessing and the layers do not depend on the image, so build them
# once instead of once per frame.
transform = transforms.Compose([transforms.Resize((224, 224)), transforms.ToTensor()])

# No checkpoint is loaded in this cell, so the layers presumably start from
# randomly initialised weights (confirm against swin_functions_and_classes).
# Re-creating them for every image would encode each frame with different
# weights. They are built once under a fixed seed so every frame, and every
# re-run, uses the same weights. fork_rng restores the global CPU RNG state.
with torch.random.fork_rng(devices=[]):
    torch.manual_seed(0)
    patch_embed = PatchEmbed(img_size=224, patch_size=4, in_chans=3, embed_dim=96)
    block_1 = SwinTransformerBlock(dim=96, input_resolution=(56, 56), num_heads=4, window_size=7, shift_size=0, mlp_ratio=4.0, qkv_bias=True, qk_scale=None, drop=0.0, attn_drop=0.0, drop_path=0.0, act_layer=nn.GELU, norm_layer=nn.LayerNorm)
    block_1_shf = SwinTransformerBlock(dim=96, input_resolution=(56, 56), num_heads=4, window_size=7, shift_size=2, mlp_ratio=4.0, qkv_bias=True, qk_scale=None, drop=0.0, attn_drop=0.0, drop_path=0.0, act_layer=nn.GELU, norm_layer=nn.LayerNorm)

# NOTE(review): the previous version also ran a BasicLayer and a PatchMerging
# layer on every frame but never used their outputs, so they are dropped here.
# If the merged stage-1 output was the intended feature, wire it in below.

dataset_name = "output_data"
for subfile in sorted(os.listdir(image_path)):
    subfile_path = os.path.join(image_path, subfile)
    for subsubfile in sorted(os.listdir(subfile_path)):
        subsubfile_path = os.path.join(subfile_path, subsubfile)

        # convert("RGB") guarantees the 3 channels PatchEmbed is configured
        # for; the `with` block closes the image file once it has been read.
        with Image.open(subsubfile_path) as image:
            tensor_image = transform(image.convert("RGB")).unsqueeze(0)

        # Inference only: skip building the autograd graph.
        with torch.no_grad():
            embedding = patch_embed(tensor_image)
            output = block_1(embedding)
            output_shf = block_1_shf(output)

        output_tensor = output_shf.detach().cpu()
        output_array = output_tensor.numpy()

        # Open the file only once the features exist, and let the context
        # manager close it even if the write fails.
        hdf5_filename = os.path.join(output_dir, f"features_{subfile}_{subsubfile}.h5")
        with h5py.File(hdf5_filename, "w") as hdf5_file:
            hdf5_file.create_dataset(dataset_name, data=output_array)
print("All spatial features saved to HDF5.")

In [None]:
import h5py
# Read one stored feature file back and show its contents and shape.
# NOTE(review): the anomaly extraction cell writes files named
# "features_<subfile>_<subsubfile>.h5"; confirm that this path exists.
hdf5_filename = "output_features/Anomaly_Swin/features_Anomaly_Swin.h5"

# The context manager closes the file even if the dataset is missing.
with h5py.File(hdf5_filename, "r") as hdf5_file:
    output_data = hdf5_file["output_data"]
    # Slicing copies the dataset into an in-memory array, which stays usable
    # after the file has been closed.
    output_array = output_data[:]

print(output_array)
print(output_array.shape)

In [None]:
# Run every frame directly under Test/NormalVideos/ through a patch embedding
# followed by a regular and a shifted Swin block, and store each frame's
# feature map in its own HDF5 file.
output_dir = "output_features/Normal_Swin"
image_path = "Test/NormalVideos"

# h5py does not create missing directories.
os.makedirs(output_dir, exist_ok=True)

# The preprocessing and the layers do not depend on the image, so build them
# once instead of once per frame.
transform = transforms.Compose([transforms.Resize((224, 224)), transforms.ToTensor()])

# No checkpoint is loaded in this cell, so the layers presumably start from
# randomly initialised weights (confirm against swin_functions_and_classes).
# Re-creating them for every image would encode each frame with different
# weights. They are built once under a fixed seed so every frame, and every
# re-run, uses the same weights. fork_rng restores the global CPU RNG state.
with torch.random.fork_rng(devices=[]):
    torch.manual_seed(0)
    patch_embed = PatchEmbed(img_size=224, patch_size=4, in_chans=3, embed_dim=96)
    block_1 = SwinTransformerBlock(dim=96, input_resolution=(56, 56), num_heads=4, window_size=7, shift_size=0, mlp_ratio=4.0, qkv_bias=True, qk_scale=None, drop=0.0, attn_drop=0.0, drop_path=0.0, act_layer=nn.GELU, norm_layer=nn.LayerNorm)
    block_1_shf = SwinTransformerBlock(dim=96, input_resolution=(56, 56), num_heads=4, window_size=7, shift_size=2, mlp_ratio=4.0, qkv_bias=True, qk_scale=None, drop=0.0, attn_drop=0.0, drop_path=0.0, act_layer=nn.GELU, norm_layer=nn.LayerNorm)

# NOTE(review): the previous version also ran a BasicLayer and a PatchMerging
# layer on every frame but never used their outputs, so they are dropped here.
# If the merged stage-1 output was the intended feature, wire it in below.

dataset_name = "output_data"
for subfile in sorted(os.listdir(image_path)):
    # Normal frames sit directly in image_path, so each entry is an image file.
    subfile_path = os.path.join(image_path, subfile)

    # convert("RGB") guarantees the 3 channels PatchEmbed is configured for;
    # the `with` block closes the image file once it has been read.
    with Image.open(subfile_path) as image:
        tensor_image = transform(image.convert("RGB")).unsqueeze(0)

    # Inference only: skip building the autograd graph.
    with torch.no_grad():
        embedding = patch_embed(tensor_image)
        output = block_1(embedding)
        output_shf = block_1_shf(output)

    output_tensor = output_shf.detach().cpu()
    output_array = output_tensor.numpy()

    # Open the file only once the features exist, and let the context manager
    # close it even if the write fails.
    hdf5_filename = os.path.join(output_dir, f"features_{subfile}.h5")
    with h5py.File(hdf5_filename, "w") as hdf5_file:
        hdf5_file.create_dataset(dataset_name, data=output_array)
print("All spatial features saved to HDF5.")

In [None]:
# Walk the two directory levels under the anomaly test set and print the name
# of every entry found in each sub-directory, one per line.
image_path_testing = "Test/AnomalyVideos"
for subfile in os.listdir(image_path_testing):
    subfile_path_testing = os.path.join(image_path_testing, subfile)
    entries_in_subfile = os.listdir(subfile_path_testing)
    for subsubfile in entries_in_subfile:
        print(subsubfile)

In [None]:
# Close whichever HDF5 handle the name `hdf5_file` currently refers to; several
# cells above assign it.
hdf5_file.close()