# Crime Classification and Description

In [None]:
!pip install torch torchvision torchaudio
!pip install tensorflow==2.12.0
!pip install mediapipe
!pip install opencv-python

In [None]:
# For video loading and decoding
!pip install decord av imageio

In [None]:
# Hugging Face & Transformers
!pip install transformers==4.40.1 timm accelerate
# Optional: FlashAttention kernels for FP16/BF16 inference on a GPU. Building flash-attn needs a CUDA toolchain, so skip this on a CPU-only runtime.
!pip install flash-attn --no-build-isolation

# Libraries

In [None]:
import os
import cv2
import numpy as np
import torch
import tensorflow as tf
import mediapipe as mp
from PIL import Image
from decord import VideoReader, cpu
import torchvision.transforms as T
from torchvision.transforms.functional import InterpolationMode
from transformers import AutoModel, AutoTokenizer
from tensorflow.keras.models import Model
from tensorflow.keras.layers import (Input, LSTM, Dense, Dropout, LayerNormalization, Bidirectional, Lambda, Multiply)
from tensorflow.keras.optimizers import Adam
from google.colab.patches import cv2_imshow
import time

# Classification and Description

In [None]:
# Force CPU usage: pass an empty device list so TensorFlow sees no GPUs and
# the Keras classifier below runs on the CPU.
tf.config.set_visible_devices([], 'GPU')

# ========== 🧠 Load Classifier ==========
# Class labels, looked up by the index of the classifier's highest-scoring
# output (0 -> "Shoplifting", 1 -> "Vandalism").
crime_classes = ["Shoplifting", "Vandalism"]

def create_advanced_lstm_model(input_shape=(100, 225), num_classes=2):
    """Build and compile the BiLSTM + attention-pooling sequence classifier.

    Args:
        input_shape: ``(timesteps, features)`` of a single input sequence.
        num_classes: Width of the final softmax layer.

    Returns:
        A compiled Keras ``Model`` (Adam with learning rate 1e-4, categorical
        cross-entropy loss, accuracy metric).
    """
    sequence_in = Input(shape=input_shape)

    # Recurrent encoder: both LSTMs keep the time axis so the attention step
    # below can weight individual timesteps.
    encoded = Bidirectional(LSTM(128, return_sequences=True))(sequence_in)
    encoded = LayerNormalization()(encoded)
    encoded = Dropout(0.3)(encoded)
    encoded = LSTM(64, return_sequences=True)(encoded)
    encoded = Dropout(0.3)(encoded)

    # Attention pooling: one score per timestep, softmax over the time axis
    # (axis 1), then a score-weighted sum that collapses the time axis.
    attention_hidden = Dense(64, activation='tanh')(encoded)
    attention_logits = Dense(1)(attention_hidden)
    attention_weights = Lambda(lambda t: tf.nn.softmax(t, axis=1))(attention_logits)
    weighted_steps = Multiply()([encoded, attention_weights])
    pooled = Lambda(lambda t: tf.reduce_sum(t, axis=1))(weighted_steps)

    # Classification head.
    head = LayerNormalization()(pooled)
    head = Dense(128, activation="swish")(head)
    head = Dropout(0.4)(head)
    head = Dense(64, activation="swish")(head)
    class_probs = Dense(num_classes, activation="softmax")(head)

    classifier = Model(inputs=sequence_in, outputs=class_probs)
    classifier.compile(
        optimizer=Adam(learning_rate=1e-4),
        loss='categorical_crossentropy',
        metrics=['accuracy'],
    )
    return classifier

# Build the classifier with its default input shape and load its weights from
# the .h5 file on the Colab filesystem.
model = create_advanced_lstm_model()
model.load_weights("/content/crime_detection_model_2class.h5")
# ========== 🧍‍♂️ Pose Extraction ==========
# One module-level MediaPipe Holistic instance; extract_pose_hand_landmarks()
# calls its process() method on every sampled frame.
mp_holistic = mp.solutions.holistic
holistic = mp_holistic.Holistic()

# Landmark counts per Holistic result group; every landmark contributes x, y, z.
_POSE_LANDMARKS = 33
_HAND_LANDMARKS = 21
_FEATURES_PER_FRAME = (_POSE_LANDMARKS + 2 * _HAND_LANDMARKS) * 3  # 225


def _flatten_landmarks(landmark_group, num_landmarks):
    """Flatten one landmark group into ``[x..., y..., z...]`` (zeros if absent)."""
    if not landmark_group:
        return [0.0] * (num_landmarks * 3)
    points = landmark_group.landmark
    # Coordinates are grouped per axis (all x, then all y, then all z), not
    # interleaved per landmark; the classifier input relies on this layout.
    return (
        [lm.x for lm in points]
        + [lm.y for lm in points]
        + [lm.z for lm in points]
    )


def extract_pose_hand_landmarks(video_path, max_frames=100):
    """Extract a fixed-length pose + hand landmark sequence from a video.

    Frames are sampled every ``skip`` frames, where
    ``skip = max(total_frames // max_frames, 1)``. Each sampled frame is run
    through the module-level ``holistic`` tracker and turned into a
    225-value vector: pose, left hand, right hand, each laid out as all x,
    all y, all z. A group that was not detected is filled with zeros.

    Args:
        video_path: Path of the video file to read.
        max_frames: Number of feature rows in the returned sequence. Sampling
            stops once this many rows are collected; shorter videos are
            zero-padded at the end.

    Returns:
        A float64 ``numpy.ndarray`` of shape ``(1, max_frames, 225)``. The
        dtype is fixed so it no longer depends on whether any landmark was
        detected.
    """
    cap = cv2.VideoCapture(video_path)
    landmarks_list = []
    try:
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        skip = max(total_frames // max_frames, 1)
        count = 0
        while cap.isOpened() and len(landmarks_list) < max_frames:
            ret, frame = cap.read()
            if not ret:
                break
            if count % skip == 0:
                # OpenCV decodes to BGR; the tracker is fed RGB.
                rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                results = holistic.process(rgb)
                data = (
                    _flatten_landmarks(results.pose_landmarks, _POSE_LANDMARKS)
                    + _flatten_landmarks(results.left_hand_landmarks, _HAND_LANDMARKS)
                    + _flatten_landmarks(results.right_hand_landmarks, _HAND_LANDMARKS)
                )
                # Drop any frame whose vector is not exactly the expected width.
                if len(data) == _FEATURES_PER_FRAME:
                    landmarks_list.append(data)
            count += 1
    finally:
        # Release the capture even if decoding or landmark extraction raises.
        cap.release()

    missing = max_frames - len(landmarks_list)
    if missing > 0:
        landmarks_list.extend([0.0] * _FEATURES_PER_FRAME for _ in range(missing))
    return np.expand_dims(np.array(landmarks_list, dtype=np.float64), axis=0)

# The classifier is the Keras `model` built by create_advanced_lstm_model() and
# loaded from disk earlier in this file. It must not be rebound to a
# placeholder here, otherwise predictions come from untrained random weights.

def predict_crime(video_path):
    """Classify a video and return its label from ``crime_classes``.

    Args:
        video_path: Path of the video file to classify.

    Returns:
        The entry of ``crime_classes`` at the index of the highest class
        probability predicted by the Keras ``model``.
    """
    features = extract_pose_hand_landmarks(video_path).astype(np.float32)
    # predict() returns one row of class probabilities per input sequence;
    # there is exactly one sequence in the batch.
    probabilities = model.predict(features, verbose=0)
    return crime_classes[int(np.argmax(probabilities, axis=1)[0])]

# ========== 📜 InternVL Descriptor ==========
# Read the Hugging Face token from the environment rather than overwriting it
# with a placeholder string: an invalid token would be sent with every Hub
# request. Set HF_TOKEN before running if authentication is needed; with no
# token set, None is passed and the download is attempted anonymously.
hf_token = os.environ.get("HF_TOKEN") or None
model_path = "OpenGVLab/InternVL_2_5_HiCo_R16"

tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True, token=hf_token)
# trust_remote_code is needed for the model as well as the tokenizer: the
# `.chat()` method used below comes from the repository's own modeling code.
intern_model = AutoModel.from_pretrained(
    model_path,
    trust_remote_code=True,
    torch_dtype=torch.float32,
    token=hf_token,
).to("cpu")
# NOTE(review): loading in float32 and casting afterwards keeps the original
# behavior but holds a full float32 copy in RAM first. Consider loading
# with torch_dtype=torch.bfloat16 directly once verified for this model.
intern_model = intern_model.to(torch.bfloat16)

def build_transform(input_size=448):
    """Create the per-image preprocessing pipeline.

    The pipeline converts the image to RGB, resizes it to
    ``input_size`` x ``input_size`` with bicubic interpolation, converts it
    to a tensor, and normalizes each channel with a fixed mean and std.

    Args:
        input_size: Side length, in pixels, of the square output image.

    Returns:
        A ``torchvision.transforms.Compose`` applying the steps in order.
    """
    channel_mean = (0.485, 0.456, 0.406)
    channel_std = (0.229, 0.224, 0.225)
    target_size = (input_size, input_size)

    steps = [
        T.Lambda(lambda img: img.convert("RGB")),
        T.Resize(target_size, interpolation=InterpolationMode.BICUBIC),
        T.ToTensor(),
        T.Normalize(channel_mean, channel_std),
    ]
    return T.Compose(steps)

def dynamic_preprocess(image, image_size=448):
    """Resize ``image`` to a square and return it as a one-element tile list.

    Args:
        image: Object exposing ``resize(size)`` (a PIL image in this file).
        image_size: Side length, in pixels, of the square tile.

    Returns:
        A list containing the single resized tile.
    """
    square = (image_size, image_size)
    tile = image.resize(square)
    return [tile]

def load_video_frames(video_path, input_size=448, num_segments=64):
    """Sample evenly spaced frames from a video and preprocess them.

    Args:
        video_path: Path of the video file to decode.
        input_size: Side length passed to the resize/transform steps.
        num_segments: Number of frame indices to sample, spread linearly
            between the first and the last frame.

    Returns:
        A ``(pixel_values, num_patches_list)`` tuple: the bfloat16 tensor of
        all tiles concatenated along dim 0, and the tile count of each
        sampled frame.
    """
    reader = VideoReader(video_path, ctx=cpu(0))
    last_index = len(reader) - 1
    sample_positions = np.linspace(0, last_index, num_segments).astype(int)
    to_tensor = build_transform(input_size)

    frame_batches = []
    for position in sample_positions:
        frame_image = Image.fromarray(reader[position].asnumpy())
        tiles = dynamic_preprocess(frame_image, input_size)
        frame_batches.append(torch.stack([to_tensor(tile) for tile in tiles]))

    num_patches_list = [batch.shape[0] for batch in frame_batches]
    pixel_values = torch.cat(frame_batches).to(torch.bfloat16)
    return pixel_values, num_patches_list

def generate_video_description(video_path, predicted_label):
    """Ask the vision-language model to describe the video for a given label.

    Args:
        video_path: Path of the video file to describe.
        predicted_label: Crime class used to select the prompt; must be
            ``"Shoplifting"`` or ``"Vandalism"``.

    Returns:
        The first element of the pair returned by ``intern_model.chat``.

    Raises:
        KeyError: If ``predicted_label`` has no prompt.
    """
    label_prompts = {
        "Shoplifting": "Describe the suspicious activity in the video where a person is stealing an item.",
        "Vandalism": "Describe the destructive activity where a person damages property.",
    }
    generation_config = {"do_sample": False, "temperature": 0.0, "max_new_tokens": 1024}

    pixel_values, num_patches_list = load_video_frames(video_path)

    # One "FrameN: <image>" line per sampled frame, followed by the prompt.
    frame_tags = [
        f"Frame{frame_no}: <image>\n"
        for frame_no in range(1, len(num_patches_list) + 1)
    ]
    question = "".join(frame_tags) + label_prompts[predicted_label]

    with torch.no_grad():
        response, _history = intern_model.chat(
            tokenizer,
            pixel_values,
            question,
            generation_config,
            num_patches_list=num_patches_list,
            return_history=True,
        )
    return response

# ========== 🎬 Run Full Pipeline ==========
def analyze_video(video_path):
    """Classify a video, describe it, and preview it with the label drawn on.

    Prints the predicted crime class and the generated description, then
    displays every 10th frame with the label overlaid.

    Args:
        video_path: Path of the video file to analyze.
    """
    predicted_label = predict_crime(video_path)
    print(f"🔹 Predicted Crime Class: {predicted_label}")
    description = generate_video_description(video_path, predicted_label)
    print(f"📝 Description: {description}")

    overlay_text = f"Crime: {predicted_label}"
    cap = cv2.VideoCapture(video_path)
    try:
        frame_count = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            if frame_count % 10 == 0:  # Show every 10th frame
                # Only frames that are displayed get the overlay; the rest
                # are discarded, so drawing on them would be wasted work.
                cv2.putText(frame, overlay_text, (20, 50), cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 0, 255), 3)
                cv2_imshow(frame)
                time.sleep(0.05)
            frame_count += 1
    finally:
        # Release the capture even if drawing or display raises.
        cap.release()

# ========== 🔍 Run on Sample Video ==========
# When executed as the main module, run the full pipeline on a sample video
# at a fixed path on the Colab filesystem.
if __name__ == "__main__":
    analyze_video("/content/test_video_1.mp4")