In [None]:
from IPython import display

In [None]:
!pip install timm transformers torchvision

display.clear_output()

In [None]:
#-- Import -----------------------------------------------------------------------------------------------
import cv2
import torch
import timm
from torchvision import transforms
from transformers import CLIPProcessor, CLIPModel
from PIL import Image
from typing import List
import os
#---------------------------------------------------------------------------------------------------------------

In [None]:
#-- Initialize ---------------------------------------------------------------------------------------------------
# Kaggle input and working directory roots.
input_path = '/kaggle/input/'
# The original (misspelled) name is kept as an alias so any cell still using it keeps working.
intput_path = input_path
out_path = '/kaggle/working/'

# Output sub-directory under out_path (presumably for rendered result videos, going by its name).
result_video_dir = f'{out_path}result_videos/'

# Use CUDA when PyTorch can see a GPU, otherwise fall back to the CPU.
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('device:', DEVICE)

# NOTE(review): these thresholds are not referenced by any cell visible here; the names suggest
# box/text confidence cut-offs for a detection model — TODO confirm they are still needed.
DINO_BOX_THRESHOLD = 0.25
DINO_TEXT_THRESHOLD = 0.1
#---------------------------------------------------------------------------------------------------------------

In [None]:
# Create result_video_dir (and any missing parents) up front; exist_ok=True makes re-running
# this cell harmless when the directory already exists.
os.makedirs(result_video_dir, exist_ok=True)

In [None]:
# Text side: CLIP processor and model, both loaded from the same pretrained checkpoint.
clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-large-patch14")
clip_model = CLIPModel.from_pretrained("openai/clip-vit-large-patch14")

# Image side: pretrained DINO ViT from timm, switched to inference mode immediately
# (nn.Module.eval() returns the module itself, so it can be chained).
dino_model = timm.create_model("vit_base_patch16_224_dino", pretrained=True).eval()

# Clear the download / progress output produced while loading the weights.
display.clear_output()


In [None]:
#-- Set labels for ZSOD Models ------------------------------------------------------------------------------------
# Candidate text prompts describing suspicious / burglary-related behaviour. The list is passed to
# process_video(), which embeds each entry as text and ranks the entries against every sampled frame.
# Entries that are commented out are excluded from the ranking; they are kept here so they can be
# re-enabled easily.
# (ZSOD presumably stands for zero-shot object detection — TODO confirm.)
labels = [
    "Person climbing over a fence",
    "Person climbing a wall",
    "Person breaking a lock with tools",
    "Person trying to pick a lock",
    "Person forcing a door open with strength",
    "Person peeking through a window",
    #"Person carrying stolen items",
    "Person sneaking into a building",
    "Person looking around nervously",
    #"Person hiding behind an object",
    "Person running away from a building",
    "Person carrying tools like a crowbar",
    "Person breaking a window with an object",
    "Person tampering with a security camera",
    "Person cutting alarm wires",
    #"Person jumping over a barrier",
    #"Person carrying a large bag suspiciously",
    "Person entering a restricted area",
    #"Person hiding stolen items in a bag",
    "Person fighting with a security guard",
    #"Person avoiding eye contact with others",
    "Person loitering near a building",
    "Person jumping out of a window",
    "Person disabling an alarm system",
    "Person wearing a mask and avoiding detection"
] 
#-----------------------------------------------------------------------------------------------------------------

In [None]:
def preprocess_image(frame):
    """Convert one frame into a normalized, batched tensor for the DINO ViT.

    The frame is resized to 224x224, converted to a float tensor in [0, 1], normalized with the
    ImageNet channel statistics and given a leading batch dimension.

    Args:
        frame: A PIL image (the caller in this notebook passes an RGB image built with
            ``Image.fromarray``). PIL images in another mode are converted to RGB first.

    Returns:
        torch.Tensor: for an RGB PIL input, a tensor of shape ``(1, 3, 224, 224)``.
    """
    # Normalize() below is configured for exactly three channels, so coerce grayscale / RGBA
    # PIL images to RGB instead of failing. Non-PIL inputs (no ``mode`` attribute) pass through.
    if getattr(frame, "mode", "RGB") != "RGB":
        frame = frame.convert("RGB")

    preprocess = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        # The DINO ViT checkpoints were trained with ImageNet mean/std normalization;
        # the previous 0.5/0.5 values fed the model inputs from a different distribution.
        transforms.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    ])
    return preprocess(frame).unsqueeze(0)

In [None]:
@torch.no_grad()
def extract_image_features_dino(frame):
    """Embed a single frame with the DINO model.

    The frame is run through ``preprocess_image`` and ``dino_model.forward_features``; the result
    is averaged over dimension 1 (assumed to be the token axis — TODO confirm for the installed
    timm version). Gradient tracking is disabled for the whole call.

    Args:
        frame: Image accepted by ``preprocess_image``.

    Returns:
        torch.Tensor: the features averaged over dimension 1.
    """
    batch = preprocess_image(frame)
    token_features = dino_model.forward_features(batch)
    pooled = token_features.mean(dim=1)  # average pooling
    return pooled

In [None]:
def extract_text_features_clip(labels: List[str]):
    """Embed text labels with the CLIP text encoder.

    Args:
        labels: Strings to embed; they are tokenized together as one padded batch.

    Returns:
        The output of ``clip_model.get_text_features`` for the batch, computed without
        gradient tracking.
    """
    tokenized = clip_processor(text=labels, return_tensors="pt", padding=True)
    with torch.no_grad():
        return clip_model.get_text_features(**tokenized)

In [None]:
# NOTE: superseded variant of process_video that kept only the single best label per frame.
# It is retained commented out for reference; the active definition is in the following cell.
# def process_video(video_path, labels, frame_skip=30):
#     # Extract the text embeddings
#     text_features = extract_text_features_clip(labels)

#     # Open the video
#     cap = cv2.VideoCapture(video_path)
#     frame_count = 0
#     results = []

    
    
#     while cap.isOpened():
#         ret, frame = cap.read()
#         if not ret:
#             break


#         # Process every frame_skip-th frame
#         if frame_count % frame_skip == 0:
#             # Convert the frame to PIL format
#             frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
#             frame_pil = Image.fromarray(frame)

#             # Extract the image features
#             image_features = extract_image_features_dino(frame_pil)

#             # Compute the cosine similarity
#             similarities = torch.nn.functional.cosine_similarity(image_features, text_features)
#             top_label_idx = torch.argmax(similarities).item()
#             results.append((frame_count, labels[top_label_idx], similarities[top_label_idx].item()))

#         frame_count += 1

#     cap.release()
#     return results

In [None]:
def process_video(video_path, labels, frame_skip=30, top_k=3):
    """Rank text labels against sampled frames of a video.

    Every ``frame_skip``-th frame (starting with frame 0) is converted from BGR to RGB, embedded
    with ``extract_image_features_dino`` and compared by cosine similarity against the
    ``extract_text_features_clip`` embeddings of ``labels``. The best ``top_k`` labels per
    sampled frame are recorded.

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.
        labels: Non-empty list of candidate label strings.
        frame_skip: Sampling stride in frames; must be >= 1.
        top_k: Number of best-matching labels to keep per frame; clamped to ``len(labels)``.

    Returns:
        list[tuple[int, list[str], list[float]]]: one ``(frame_index, top_labels,
        top_similarities)`` entry per sampled frame, best match first. The list is empty when
        the video cannot be opened or yields no frames.

    Raises:
        ValueError: if ``frame_skip`` or ``top_k`` is smaller than 1, or ``labels`` is empty.
    """
    # Fail fast with a clear message instead of a ZeroDivisionError / topk error deep in the loop.
    if frame_skip < 1:
        raise ValueError(f"frame_skip must be >= 1, got {frame_skip}")
    if top_k < 1:
        raise ValueError(f"top_k must be >= 1, got {top_k}")
    if not labels:
        raise ValueError("labels must contain at least one entry")

    # Text embeddings do not depend on the frame, so compute them once.
    text_features = extract_text_features_clip(labels)

    # torch.topk raises when k exceeds the number of candidates.
    k = min(top_k, len(labels))

    cap = cv2.VideoCapture(video_path)
    frame_count = 0
    results = []

    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # Only every frame_skip-th frame is scored.
            if frame_count % frame_skip == 0:
                # OpenCV decodes to BGR; the image pipeline works on RGB PIL images.
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frame_pil = Image.fromarray(frame_rgb)

                image_features = extract_image_features_dino(frame_pil)

                # NOTE(review): the image features come from the DINO model and the text features
                # from CLIP, which are separately loaded models; unless their embedding spaces
                # were aligned somewhere, these scores may not be semantically meaningful.
                # clip_model.get_image_features may be what is intended — TODO confirm.
                # Assumes image_features is (1, D) and text_features is (len(labels), D), so the
                # result broadcasts to one score per label.
                similarities = torch.nn.functional.cosine_similarity(image_features, text_features)

                # Keep the k best-scoring labels, highest similarity first.
                topk_similarities, topk_indices = torch.topk(similarities, k=k)
                topk_labels = [labels[idx] for idx in topk_indices.tolist()]

                results.append((frame_count, topk_labels, topk_similarities.tolist()))

            frame_count += 1
    finally:
        # Release the capture handle even if feature extraction raises.
        cap.release()

    return results

In [None]:
# Locations of the burglary and normal sample videos inside the attached Kaggle dataset.
burglary_samples_dir = (
    '/kaggle/input/anomalydetectiondatasetucf/'
    'Burglary/'
)
normal_samples_dir = (
    '/kaggle/input/anomalydetectiondatasetucf/'
    'Normal_Videos_for_Event_Recognition/'
    'Normal_Videos_for_Event_Recognition/'
)

In [None]:

# Score one specific burglary clip. The path is built directly instead of scanning the directory,
# so the "Processing ..." log line is only printed for the clip that is actually processed.
video_file = 'Burglary081_x264A.mp4'
video_path = os.path.join(burglary_samples_dir, video_file)

if os.path.isfile(video_path):
    #-- log --
    print(f'Processing {video_file} ==========================================================')

    results = process_video(video_path, labels, frame_skip=30)

    # Show the best-matching labels for every sampled frame.
    for frame_idx, top_labels, similarities in results:
        print(f"Frame {frame_idx}:")
        for label, similarity in zip(top_labels, similarities):
            print(f"    {label} (Similarity = {similarity:.4f})")
else:
    # Make a missing clip visible instead of silently producing no output.
    print(f'Video not found: {video_path}')

In [None]:

# Score one specific normal (non-anomalous) clip. The path is built directly instead of scanning the
# directory, so the "Processing ..." log line is only printed for the clip that is actually processed.
video_file = 'Normal_Videos_129_x264.mp4'
video_path = os.path.join(normal_samples_dir, video_file)

if os.path.isfile(video_path):
    #-- log --
    print(f'Processing {video_file} ==========================================================')

    results = process_video(video_path, labels, frame_skip=30)

    # Show the best-matching labels for every sampled frame.
    for frame_idx, top_labels, similarities in results:
        print(f"Frame {frame_idx}:")
        for label, similarity in zip(top_labels, similarities):
            print(f"    {label} (Similarity = {similarity:.4f})")
else:
    # Make a missing clip visible instead of silently producing no output.
    print(f'Video not found: {video_path}')