In [None]:
!git lfs install

In [None]:
!git clone https://huggingface.co/datasets/sshao0516/CrowdHuman

In [None]:
import zipfile, os

# Archives shipped with the cloned CrowdHuman repository (three train shards + val).
zip_files = [
    "/content/CrowdHuman/CrowdHuman_train01.zip",
    "/content/CrowdHuman/CrowdHuman_train02.zip",
    "/content/CrowdHuman/CrowdHuman_train03.zip",
    "/content/CrowdHuman/CrowdHuman_val.zip"
]
extract_dir = "/content/CrowdHuman/images"
os.makedirs(extract_dir, exist_ok=True)

# Unpack every archive into the same target directory.
for archive_path in zip_files:
    archive = zipfile.ZipFile(archive_path, 'r')
    try:
        archive.extractall(extract_dir)
    finally:
        archive.close()


In [None]:
import json

# STEP 2: Load IDs
def load_image_ids_from_odgt(odgt_path):
    """Return the image IDs listed in an ``.odgt`` annotation file.

    The file is read as JSON Lines: every non-blank line is parsed as one JSON
    object and its ``"ID"`` value is collected, in file order.

    Args:
        odgt_path: Path to the annotation file.

    Returns:
        list: The ``"ID"`` value of each record.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        KeyError: If a record has no ``"ID"`` key.
    """
    ids = []
    with open(odgt_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            # Blank or whitespace-only lines (e.g. a trailing newline) are not records.
            if not line:
                continue
            ids.append(json.loads(line)['ID'])
    return ids

# Collect the image IDs from both CrowdHuman annotation files and report how many each contains.
train_ids_all = load_image_ids_from_odgt("/content/CrowdHuman/annotation_train.odgt")
val_ids_all = load_image_ids_from_odgt("/content/CrowdHuman/annotation_val.odgt")

print("Train IDs (Full):", len(train_ids_all))
print("Val IDs (Full):", len(val_ids_all))

In [None]:
import json, glob
from PIL import Image

import os  # needed below; otherwise only available if an earlier cell imported it

input_ann = "/content/CrowdHuman/annotation_train.odgt"
img_dir = "/content/CrowdHuman/images/Images"
yolo_labels = "/content/CrowdHuman/labels/train"
os.makedirs(yolo_labels, exist_ok=True)

# Convert each annotation record into one YOLO label file
# ("<class> <xc> <yc> <w> <h>", all normalized by the image size).
with open(input_ann, "r") as f:
    for line in f:
        line = line.strip()
        if not line:
            continue  # tolerate blank / trailing lines
        data = json.loads(line)
        img_name = data["ID"] + ".jpg"
        img_path = os.path.join(img_dir, img_name)

        # Records whose image was not extracted are skipped.
        if not os.path.exists(img_path):
            continue

        # Only the size is needed; close the file handle right away.
        with Image.open(img_path) as img:
            W, H = img.size
        label_path = os.path.join(yolo_labels, data["ID"] + ".txt")

        with open(label_path, "w") as lf:
            for gt in data["gtboxes"]:
                if gt["tag"] != "person":
                    continue
                x1, y1, w, h = gt["fbox"]   # full body box as [x, y, width, height]

                # Clip the box to the image so the normalized values stay in [0, 1];
                # a box that pokes outside the frame would otherwise produce
                # out-of-range label coordinates.
                left, top = max(0, x1), max(0, y1)
                right, bottom = min(W, x1 + w), min(H, y1 + h)
                if right <= left or bottom <= top:
                    continue  # nothing of the box is inside the image

                xc = (left + right) / 2 / W
                yc = (top + bottom) / 2 / H
                wf = (right - left) / W
                hf = (bottom - top) / H
                lf.write(f"0 {xc} {yc} {wf} {hf}\n")


In [None]:
import json, glob
from PIL import Image

import os  # needed below; otherwise only available if an earlier cell imported it

input_ann = "/content/CrowdHuman/annotation_val.odgt"
img_dir = "/content/CrowdHuman/images/Images"
yolo_labels = "/content/CrowdHuman/labels/test"
os.makedirs(yolo_labels, exist_ok=True)

# Convert each validation annotation record into one YOLO label file
# ("<class> <xc> <yc> <w> <h>", all normalized by the image size).
with open(input_ann, "r") as f:
    for line in f:
        line = line.strip()
        if not line:
            continue  # tolerate blank / trailing lines
        data = json.loads(line)
        img_name = data["ID"] + ".jpg"
        img_path = os.path.join(img_dir, img_name)

        # Records whose image was not extracted are skipped.
        if not os.path.exists(img_path):
            continue

        # Only the size is needed; close the file handle right away.
        with Image.open(img_path) as img:
            W, H = img.size
        label_path = os.path.join(yolo_labels, data["ID"] + ".txt")

        with open(label_path, "w") as lf:
            for gt in data["gtboxes"]:
                if gt["tag"] != "person":
                    continue
                x1, y1, w, h = gt["fbox"]   # full body box as [x, y, width, height]

                # Clip the box to the image so the normalized values stay in [0, 1];
                # a box that pokes outside the frame would otherwise produce
                # out-of-range label coordinates.
                left, top = max(0, x1), max(0, y1)
                right, bottom = min(W, x1 + w), min(H, y1 + h)
                if right <= left or bottom <= top:
                    continue  # nothing of the box is inside the image

                xc = (left + right) / 2 / W
                yc = (top + bottom) / 2 / H
                wf = (right - left) / W
                hf = (bottom - top) / H
                lf.write(f"0 {xc} {yc} {wf} {hf}\n")

6 min

In [None]:
 !pip install fiftyone
import fiftyone.zoo as foz
import fiftyone as fo
# Load at most 5000 samples of the Open Images V6 "train" split with detection
# labels, requesting only the listed weapon-related classes.
# NOTE(review): confirm that each entry in `classes` is an exact Open Images
# class name; a name that does not exist in the dataset would match no samples.
dataset = foz.load_zoo_dataset(
    "open-images-v6",
    split="train",
    label_types=["detections"],
    classes=["Gun","Knife","Rifle"],
    max_samples=5000
)

In [None]:
import fiftyone as fo

# Export the loaded samples in COCO detection format under /content/openimages_weapon
# (a later cell reads its `data/` folder and `labels.json`).
dataset.export(
    export_dir="/content/openimages_weapon",
    dataset_type=fo.types.COCODetectionDataset,
    label_field="ground_truth"  # sample field whose detections are exported (an earlier revision used "detections")
)

In [None]:
!pip install ultralytics
from ultralytics import YOLO

In [None]:
import os
import shutil
import random
import yaml
import glob

def prepare_dataset(root_images, root_labels, output_dir, split_ratio=0.8):
    """Build a YOLO-style train/val directory layout from a flat image folder.

    Only images that have a matching label file are used. Images without a
    label would otherwise be copied as unlabeled samples, even though they may
    contain objects.

    Args:
        root_images: Folder containing the images (searched non-recursively
            for ``*.jpg``, ``*.png`` and ``*.jpeg``).
        root_labels: Parent folder of the YOLO labels; labels are read from
            its ``train`` subfolder.
        output_dir: Destination root; ``images/{train,val}`` and
            ``labels/{train,val}`` are created inside it.
        split_ratio: Fraction of the labeled images assigned to training; the
            remainder goes to validation.

    Returns:
        tuple: ``(train_img_dir, val_img_dir)`` paths inside ``output_dir``.
    """
    os.makedirs(output_dir, exist_ok=True)
    train_img_dir = os.path.join(output_dir, "images/train")
    val_img_dir   = os.path.join(output_dir, "images/val")
    train_lbl_dir = os.path.join(output_dir, "labels/train")
    val_lbl_dir   = os.path.join(output_dir, "labels/val")

    for d in [train_img_dir, val_img_dir, train_lbl_dir, val_lbl_dir]:
        os.makedirs(d, exist_ok=True)

    # Use only the 'train' labels folder
    labels_folder = os.path.join(root_labels, "train")

    def _label_name(img_name):
        """Label file name that belongs to an image file name."""
        return os.path.splitext(img_name)[0] + ".txt"

    # Collect image file names directly inside root_images (not recursive).
    candidates = []
    for pattern in ("*.jpg", "*.png", "*.jpeg"):
        candidates.extend(glob.glob(os.path.join(root_images, pattern)))

    # Keep only labeled images; sort first so the shuffle starts from a
    # filesystem-independent order.
    all_images = sorted(
        name for name in (os.path.basename(path) for path in candidates)
        if os.path.exists(os.path.join(labels_folder, _label_name(name)))
    )
    random.shuffle(all_images)
    split_index = int(len(all_images)*split_ratio)

    train_images = all_images[:split_index]
    val_images   = all_images[split_index:]

    for img_list, img_dest, lbl_dest in [(train_images, train_img_dir, train_lbl_dir),
                                         (val_images, val_img_dir, val_lbl_dir)]:
        for img_name in img_list:
            lbl_name = _label_name(img_name)
            shutil.copy(os.path.join(root_images, img_name), os.path.join(img_dest, img_name))
            shutil.copy(os.path.join(labels_folder, lbl_name), os.path.join(lbl_dest, lbl_name))

    print(f"Train images: {len(train_images)}, Val images: {len(val_images)}")
    return train_img_dir, val_img_dir

# --------- CrowdHuman: build the split and its dataset YAML ----------
crowdhuman_images = "/content/CrowdHuman/images/Images"
crowdhuman_labels = "/content/CrowdHuman/labels"
crowdhuman_output = "/content/datasets/CrowdHuman"

train_dir, val_dir = prepare_dataset(
    crowdhuman_images,
    crowdhuman_labels,
    crowdhuman_output,
)

# Dataset description consumed by the YOLO trainer (single class: person).
crowdhuman_yaml = dict(
    train=os.path.join(crowdhuman_output, 'images/train'),
    val=os.path.join(crowdhuman_output, 'images/val'),
    nc=1,
    names=['person'],
)

with open("/content/crowdhuman.yaml", "w") as f:
    yaml.dump(crowdhuman_yaml, f)

print("✅ CrowdHuman train/val folders and YAML ready!")

In [None]:
import os, json, shutil, random, yaml
from PIL import Image

# Paths
images_dir = "/content/openimages_weapon/data"
labels_json = "/content/openimages_weapon/labels.json"
output_dir = "/content/datasets/OpenImages"
split_ratio = 0.8

# YOLO classes mapping
class_map = {"Gun":0, "Knife":1, "Rifle":2}

# Make output folders
for d in ["images/train", "images/val", "labels/train", "labels/val"]:
    os.makedirs(os.path.join(output_dir, d), exist_ok=True)

# Load the COCO export
with open(labels_json, "r") as f:
    data = json.load(f)

# Lookups built once: image_id -> file name/size, category_id -> class name
image_map = {img['id']: {'file_name': img['file_name'], 'width': img['width'], 'height': img['height']} for img in data['images']}
category_names = {c['id']: c['name'] for c in data['categories']}

# Gather ALL boxes of an image under one key, so each image yields exactly one
# label file containing every box (one record per annotation would overwrite
# the file and could place the same image in both train and val).
lines_by_file = {}
for ann in data['annotations']:
    info = image_map.get(ann['image_id'])
    if info is None:
        continue

    cls_name = category_names.get(ann['category_id'])
    if cls_name not in class_map:
        continue  # unknown category or a class we do not train on
    class_id = class_map[cls_name]

    filename = info['file_name']
    if not os.path.exists(os.path.join(images_dir, filename)):
        continue

    # COCO bbox: [xmin, ymin, width, height] in pixels -> normalized YOLO xywh
    xmin, ymin, w, h = ann['bbox']
    x_center = (xmin + w/2)/info['width']
    y_center = (ymin + h/2)/info['height']
    w /= info['width']
    h /= info['height']

    lines_by_file.setdefault(filename, []).append(f"{class_id} {x_center} {y_center} {w} {h}")

# One record per IMAGE (not per box)
all_annotations = [
    {"filename": filename, "yolo_lines": lines, "txt_name": os.path.splitext(filename)[0] + ".txt"}
    for filename, lines in lines_by_file.items()
]

# Shuffle and split at image level
random.shuffle(all_annotations)
split_index = int(len(all_annotations)*split_ratio)
train_annotations = all_annotations[:split_index]
val_annotations = all_annotations[split_index:]

# Copy images and create YOLO txts.
# The loop variable is deliberately not called `dataset`, so it does not
# overwrite the notebook-level `dataset` object created by an earlier cell.
for subset_records, subset in [(train_annotations, "train"), (val_annotations, "val")]:
    for ann in subset_records:
        src_img = os.path.join(images_dir, ann['filename'])
        dst_img = os.path.join(output_dir, f"images/{subset}", ann['filename'])
        shutil.copy(src_img, dst_img)

        dst_txt = os.path.join(output_dir, f"labels/{subset}", ann['txt_name'])
        with open(dst_txt, "w") as f:
            f.write("\n".join(ann['yolo_lines']))

# Create YAML
openimages_yaml = {
    'train': os.path.join(output_dir, 'images/train'),
    'val': os.path.join(output_dir, 'images/val'),
    'nc': 3,
    'names': ['Gun','Knife','Rifle']
}

with open("/content/openimages_weapon.yaml", "w") as f:
    yaml.dump(openimages_yaml, f)

print(f"OpenImages train/val folders and YAML ready! Train: {len(train_annotations)}, Val: {len(val_annotations)}")

In [None]:
# Train YOLOv8 on CrowdHuman
# Starts from yolov8n.pt and trains on the dataset described by /content/crowdhuman.yaml;
# the run is stored under /content/yolov8_results/crowdhuman (later cells load weights/best.pt from there).
!yolo task=detect mode=train model=yolov8n.pt \
data=/content/crowdhuman.yaml \
epochs=36 imgsz=640 batch=16 project=/content/yolov8_results name=crowdhuman

In [None]:
# Train YOLOv8 on weapons
# Starts from yolov8n.pt and trains on the dataset described by /content/openimages_weapon.yaml;
# the run is stored under /content/yolov8_results/openimages_weapon (later cells load weights/best.pt from there).
!yolo task=detect mode=train model=yolov8n.pt \
data=/content/openimages_weapon.yaml \
epochs=50 imgsz=640 batch=16 project=/content/yolov8_results name=openimages_weapon

In [None]:
# Test person detector
# NOTE(review): the source image is hard-coded; it must exist in images/val, whose
# contents come from a random split — confirm the file is there after re-running the split.
!yolo task=detect mode=predict model=/content/yolov8_results/crowdhuman/weights/best.pt \
source=/content/datasets/CrowdHuman/images/val/273271,149030004599ea82.jpg save=True

In [None]:
# Test weapon detector
# NOTE(review): the source image is hard-coded; it must exist in images/val, whose
# contents come from a random split — confirm the file is there after re-running the split.
!yolo task=detect mode=predict model=/content/yolov8_results/openimages_weapon/weights/best.pt \
source=/content/datasets/OpenImages/images/val/01a6ef2ebff837d3.jpg save=True


In [None]:
from ultralytics import YOLO
import cv2

# --- Load both models ---
person_model = YOLO("/content/yolov8_results/crowdhuman/weights/best.pt")
weapon_model = YOLO("/content/yolov8_results/openimages_weapon/weights/best.pt")

# --- Input image ---
img_path = "/content/datasets/OpenImages/images/val/016ae9bb1b4cc4be.jpg"   # put your own frame path here
image = cv2.imread(img_path)
if image is None:
    # cv2.imread returns None instead of raising; fail here with a clear message
    # rather than later inside a drawing call.
    raise FileNotFoundError(f"Could not read image: {img_path}")

# --- Run Person Detection ---
person_results = person_model(img_path, verbose=False)[0]

# --- Run Weapon Detection ---
weapon_results = weapon_model(img_path, verbose=False)[0]

# --- Draw Person Boxes ---
for box in person_results.boxes:
    x1, y1, x2, y2 = map(int, box.xyxy[0])
    conf = float(box.conf[0])
    label = f"Person {conf:.2f}"
    cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2)   # Green box for person
    # Clamp the text baseline so labels of boxes touching the top edge stay visible.
    cv2.putText(image, label, (x1, max(10, y1-10)),
                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

# --- Draw Weapon Boxes ---
for box in weapon_results.boxes:
    x1, y1, x2, y2 = map(int, box.xyxy[0])
    conf = float(box.conf[0])
    cls_id = int(box.cls[0])
    label = f"Weapon {weapon_results.names[cls_id]} {conf:.2f}"
    cv2.rectangle(image, (x1, y1), (x2, y2), (0, 0, 255), 2)   # Red box for weapon
    cv2.putText(image, label, (x1, max(10, y1-10)),
                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)

# --- Save Combined Output ---
out_path = "/content/combined_output3.jpg"
if not cv2.imwrite(out_path, image):
    # imwrite signals failure through its return value only.
    raise OSError(f"Could not write image: {out_path}")

print(f"✅ Combined result saved at {out_path}")


In [None]:
from ultralytics import YOLO
import cv2

# --- Load both models ---
person_model = YOLO("/content/yolov8_results/crowdhuman/weights/best.pt")
weapon_model = YOLO("/content/yolov8_results/openimages_weapon/weights/best.pt")

# --- Input image ---
img_path = "/content/datasets/OpenImages/images/val/01a6ef2ebff837d3.jpg"   # put your own frame path here
image = cv2.imread(img_path)
if image is None:
    # cv2.imread returns None instead of raising; fail here with a clear message
    # rather than later inside a drawing call.
    raise FileNotFoundError(f"Could not read image: {img_path}")

# --- Run Person Detection ---
person_results = person_model(img_path, verbose=False)[0]

# --- Run Weapon Detection ---
weapon_results = weapon_model(img_path, verbose=False)[0]

# --- Draw Person Boxes ---
for box in person_results.boxes:
    x1, y1, x2, y2 = map(int, box.xyxy[0])
    conf = float(box.conf[0])
    label = f"Person {conf:.2f}"
    cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2)   # Green box for person
    # Clamp the text baseline so labels of boxes touching the top edge stay visible.
    cv2.putText(image, label, (x1, max(10, y1-10)),
                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

# --- Draw Weapon Boxes ---
for box in weapon_results.boxes:
    x1, y1, x2, y2 = map(int, box.xyxy[0])
    conf = float(box.conf[0])
    cls_id = int(box.cls[0])
    label = f"Weapon {weapon_results.names[cls_id]} {conf:.2f}"
    cv2.rectangle(image, (x1, y1), (x2, y2), (0, 0, 255), 2)   # Red box for weapon
    cv2.putText(image, label, (x1, max(10, y1-10)),
                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)

# --- Save Combined Output ---
out_path = "/content/combined_output7.jpg"
if not cv2.imwrite(out_path, image):
    # imwrite signals failure through its return value only.
    raise OSError(f"Could not write image: {out_path}")

print(f"✅ Combined result saved at {out_path}")


In [None]:
import os, shutil

import random  # local import: this cell's import line only brings in os and shutil

# Source dataset folders
crowdhuman_dir = "/content/CrowdHuman/images/Images"   # path to the CrowdHuman images
openimages_dir = "/content/openimages_weapon/data"     # path to the OpenImages images

# Common frames folder
frames_dir = "/content/frames"
os.makedirs(frames_dir, exist_ok=True)

# os.listdir returns entries in arbitrary order, so slicing it gives a different
# "quarter" on different machines. Sort the listing and draw the quarter with a
# fixed-seed sampler to make the selection reproducible.
_picker = random.Random(42)

# CrowdHuman images copy (¼ only); sub-directories are ignored
ch_files = sorted(f for f in os.listdir(crowdhuman_dir)
                  if os.path.isfile(os.path.join(crowdhuman_dir, f)))
ch_limit = len(ch_files) // 4   # 1/4th count
for i, file in enumerate(_picker.sample(ch_files, ch_limit)):
    src = os.path.join(crowdhuman_dir, file)
    dst = os.path.join(frames_dir, f"ch_{i}.jpg")
    shutil.copy(src, dst)

# OpenImages images copy (¼ only); sub-directories are ignored
oi_files = sorted(f for f in os.listdir(openimages_dir)
                  if os.path.isfile(os.path.join(openimages_dir, f)))
oi_limit = len(oi_files) // 4   # 1/4th count
for i, file in enumerate(_picker.sample(oi_files, oi_limit)):
    src = os.path.join(openimages_dir, file)
    dst = os.path.join(frames_dir, f"oi_{i}.jpg")
    shutil.copy(src, dst)

print("CrowdHuman selected:", ch_limit)
print("OpenImages selected:", oi_limit)
print("Total images in frames folder:", len(os.listdir(frames_dir)))

In [None]:
# --- installs (Colab) ---
!pip -q install ultralytics scikit-image scikit-learn tqdm

import os, cv2, json, random, shutil, time, copy
import numpy as np
import pandas as pd
from PIL import Image
from tqdm.auto import tqdm

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
from torchvision import transforms, models

from ultralytics import YOLO
from skimage.feature import local_binary_pattern
from sklearn.metrics import classification_report, roc_auc_score, confusion_matrix, accuracy_score
from sklearn.model_selection import train_test_split

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device:", device)

# --- paths ---
# Weights produced by the two YOLO training runs.
PERSON_W = "/content/yolov8_results/crowdhuman/weights/best.pt"
WEAPON_W = "/content/yolov8_results/openimages_weapon/weights/best.pt"

FRAMES_DIR = "/content/frames"              # <-- put your input frames/images here
CROPS_DIR  = "/content/cropped_persons"     # person crops will be saved here
os.makedirs(CROPS_DIR, exist_ok=True)

MASTER_CSV = "/content/frames_labels.csv"   # image,label (label: 0=Normal,1=WeaponPresent)

# subset control for speed (None = use all)
MAX_FRAMES_TO_SCAN = 5000     # caps how many frames are scanned, to save detection time
MAX_SAMPLES_PER_CLASS = 4000  # later, training subset balance (None to use all)

# training config (hybrid CNN+LBPH)
IMG_SIZE = 224
BATCH = 64
EPOCHS = 6
LR = 2e-4
WEIGHT_DECAY = 1e-4
VAL_SPLIT = 0.15
TEST_SPLIT = 0.15
SEED = 42
# Seed the Python, NumPy and PyTorch random generators.
random.seed(SEED); np.random.seed(SEED); torch.manual_seed(SEED)

In [None]:
# --- paths ---
# NOTE(review): this cell repeats the path/config assignments of the previous
# cell with the same values; re-running it also re-seeds the random generators.
PERSON_W = "/content/yolov8_results/crowdhuman/weights/best.pt"
WEAPON_W = "/content/yolov8_results/openimages_weapon/weights/best.pt"

FRAMES_DIR = "/content/frames"              # <-- put your input frames/images here
CROPS_DIR  = "/content/cropped_persons"     # person crops will be saved here
os.makedirs(CROPS_DIR, exist_ok=True)

MASTER_CSV = "/content/frames_labels.csv"   # image,label (label: 0=Normal,1=WeaponPresent)

# subset control for speed (None = use all)
MAX_FRAMES_TO_SCAN = 5000     # caps how many frames are scanned, to save detection time
MAX_SAMPLES_PER_CLASS = 4000  # later, training subset balance (None to use all)

# training config (hybrid CNN+LBPH)
IMG_SIZE = 224
BATCH = 64
EPOCHS = 6
LR = 2e-4
WEIGHT_DECAY = 1e-4
VAL_SPLIT = 0.15
TEST_SPLIT = 0.15
SEED = 42
# Seed the Python, NumPy and PyTorch random generators.
random.seed(SEED); np.random.seed(SEED); torch.manual_seed(SEED)


In [None]:
def run_detection_and_build_csv(frames_dir, crops_dir, out_csv,
                                person_w, weapon_w,
                                max_frames=None):
    """Crop every detected person and label the crop by frame-level weapon presence.

    For each image in ``frames_dir`` the weapon detector decides a frame label
    (1 if it returns at least one box, else 0). Each box from the person
    detector is clipped to the image, cropped, written to ``crops_dir`` as
    ``<frame-stem>_<box-index>.jpg`` and recorded with the frame label.

    Args:
        frames_dir: Folder with input images (.jpg/.jpeg/.png/.bmp).
        crops_dir: Folder that receives the person crops (created if missing).
        out_csv: Path of the CSV to write, with columns ``image,label``.
        person_w: Path to the person-detector weights.
        weapon_w: Path to the weapon-detector weights.
        max_frames: If given, only the first ``max_frames`` images (sorted by
            name) are processed.

    Returns:
        pandas.DataFrame: The rows that were written to ``out_csv``.
    """
    person_model = YOLO(person_w)
    weapon_model = YOLO(weapon_w)
    os.makedirs(crops_dir, exist_ok=True)

    exts = {".jpg",".jpeg",".png",".bmp"}
    all_imgs = sorted(f for f in os.listdir(frames_dir)
                      if os.path.splitext(f)[1].lower() in exts)
    if max_frames is not None:
        all_imgs = all_imgs[:max_frames]

    rows = []
    pbar = tqdm(all_imgs, desc="Scanning frames")
    for img_name in pbar:
        img_path = os.path.join(frames_dir, img_name)

        # Check readability BEFORE running the detectors, so an unreadable
        # file is skipped instead of being handed to the models.
        img = cv2.imread(img_path)
        if img is None:
            continue
        H, W = img.shape[:2]

        # weapon detect once per frame
        wres = weapon_model(img_path, verbose=False)[0]
        has_weapon = 1 if len(wres.boxes) > 0 else 0

        # person detect
        pres = person_model(img_path, verbose=False)[0]

        for i, box in enumerate(pres.boxes):
            x1, y1, x2, y2 = map(int, box.xyxy[0])
            # clip to the image and drop degenerate boxes
            x1, y1 = max(0,x1), max(0,y1)
            x2, y2 = min(W,x2), min(H,y2)
            if x2<=x1 or y2<=y1:
                continue

            crop = img[y1:y2, x1:x2]
            crop_name = f"{os.path.splitext(img_name)[0]}_{i}.jpg"
            crop_path = os.path.join(crops_dir, crop_name)
            cv2.imwrite(crop_path, crop)
            rows.append([crop_name, has_weapon])

    df = pd.DataFrame(rows, columns=["image","label"])
    df.to_csv(out_csv, index=False)
    print(f"✅ CSV saved: {out_csv} | samples: {len(df)}")
    return df

# Detection is expensive: reuse the CSV from a previous run when it is present.
if os.path.exists(MASTER_CSV):
    print("CSV already exists:", MASTER_CSV)
else:
    run_detection_and_build_csv(
        FRAMES_DIR,
        CROPS_DIR,
        MASTER_CSV,
        PERSON_W,
        WEAPON_W,
        max_frames=MAX_FRAMES_TO_SCAN,
    )

In [None]:
def clean_and_optionally_balance(csv_path, crops_dir, max_per_class=None, seed=42):
    """Load the label CSV, drop rows without a crop file, optionally cap each class.

    Args:
        csv_path: CSV with at least the columns ``image`` and ``label``.
        crops_dir: Folder in which each ``image`` entry is looked up.
        max_per_class: If not None, each label group is randomly reduced to at
            most this many rows and the result is shuffled.
        seed: Random state used for the per-class sampling and the shuffle.

    Returns:
        pandas.DataFrame: The filtered (and possibly capped) rows with a fresh
        0..n-1 index. The class counts are printed as a side effect.
    """
    table = pd.read_csv(csv_path)

    def _crop_present(name):
        return os.path.exists(os.path.join(crops_dir, name))

    # keep only the rows whose crop file is present on disk
    present = table["image"].apply(_crop_present)
    table = table[present].reset_index(drop=True)

    if max_per_class is not None:
        capped = [
            grp.sample(n=min(max_per_class, len(grp)), random_state=seed)
            for _, grp in table.groupby("label")
        ]
        table = pd.concat(capped).sample(frac=1, random_state=seed).reset_index(drop=True)

    print("Class counts:\n", table["label"].value_counts())
    return table

# Build the working table: rows with an existing crop, at most MAX_SAMPLES_PER_CLASS per label.
df_all = clean_and_optionally_balance(MASTER_CSV, CROPS_DIR, max_per_class=MAX_SAMPLES_PER_CLASS, seed=SEED)
print("Total samples:", len(df_all))


In [None]:
# Step 1: hold out the combined val+test share, stratified by label.
train_df, temp_df = train_test_split(
    df_all,
    test_size=VAL_SPLIT+TEST_SPLIT,
    stratify=df_all["label"],
    random_state=SEED,
)

# Step 2: divide the hold-out into val and test; rel_test is the test share
# of the hold-out (the 1e-9 guards against a zero denominator).
rel_test = TEST_SPLIT / (VAL_SPLIT+TEST_SPLIT + 1e-9)
val_df, test_df = train_test_split(
    temp_df,
    test_size=rel_test,
    stratify=temp_df["label"],
    random_state=SEED,
)

print("Train:", train_df.shape, "| Val:", val_df.shape, "| Test:", test_df.shape)
print("Train dist:\n", train_df["label"].value_counts(), "\nVal dist:\n", val_df["label"].value_counts(), "\nTest dist:\n", test_df["label"].value_counts())

# Persist the three splits
for _split_df, _csv_path in (
    (train_df, "/content/train_labels.csv"),
    (val_df, "/content/val_labels.csv"),
    (test_df, "/content/test_labels.csv"),
):
    _split_df.to_csv(_csv_path, index=False)


In [None]:
# LBPH extractor
def lbph_feature(img_path, radius=1, n_points=8):
    """Return the normalized uniform-LBP histogram of an image file.

    The image is read as grayscale, converted to local-binary-pattern codes
    (``method="uniform"``) and summarized as a float32 histogram with
    ``n_points + 2`` bins that sums to (approximately) 1.

    If the image cannot be read, an all-zero vector of the same length is
    returned instead.
    """
    gray = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
    if gray is None:
        # unreadable file: fall back to an all-zero vector
        return np.zeros(n_points+2, dtype=np.float32)

    codes = local_binary_pattern(gray, n_points, radius, method="uniform")
    bin_edges = np.arange(0, n_points+3)
    counts, _ = np.histogram(codes.ravel(), bins=bin_edges, range=(0, n_points+2))
    counts = counts.astype("float32")
    counts /= (counts.sum() + 1e-6)   # epsilon avoids division by zero
    return counts

class PersonHybridDataset(Dataset):
    """Dataset yielding ``(image, lbph_histogram, label)`` for each person crop.

    Args:
        df: Table with the columns ``image`` (file name inside ``crops_dir``)
            and ``label``; its index is reset on construction.
        crops_dir: Folder that contains the crop images.
        transform: Optional callable applied to the RGB PIL image.
    """

    def __init__(self, df, crops_dir, transform=None):
        self.df = df.reset_index(drop=True)
        self.crops_dir = crops_dir
        self.transform = transform
        self._lbph_cache = {}  # simple memory cache: image path -> LBPH histogram

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        """Return the transformed image, its LBPH tensor and the float32 label."""
        row = self.df.iloc[idx]
        img_path = os.path.join(self.crops_dir, row["image"])

        # convert() returns a new, fully loaded image, so the file handle can
        # be closed immediately instead of lingering until garbage collection.
        with Image.open(img_path) as src:
            img = src.convert("RGB")
        if self.transform:
            img = self.transform(img)

        # The histogram is computed from the file on disk, so it does not
        # depend on the (random) transform and can be cached per path.
        if img_path not in self._lbph_cache:
            self._lbph_cache[img_path] = lbph_feature(img_path)
        lbph = torch.tensor(self._lbph_cache[img_path], dtype=torch.float32)

        label = torch.tensor(int(row["label"]), dtype=torch.float32)
        return img, lbph, label

# Training pipeline: resize + light augmentation + ImageNet normalization.
train_tf = transforms.Compose([
    transforms.Resize(size=(IMG_SIZE, IMG_SIZE)),
    transforms.RandomHorizontalFlip(0.5),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225]),
])
# Evaluation pipeline: same as above without the random augmentation.
eval_tf = transforms.Compose([
    transforms.Resize(size=(IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225]),
])

train_ds = PersonHybridDataset(train_df, CROPS_DIR, transform=train_tf)
val_ds   = PersonHybridDataset(val_df,   CROPS_DIR, transform=eval_tf)
test_ds  = PersonHybridDataset(test_df,  CROPS_DIR, transform=eval_tf)

# Class imbalance: every sample is weighted by the inverse size of its class,
# and training batches are drawn with replacement according to those weights.
counts = train_df["label"].value_counts().to_dict()
w = {cls: 1.0/max(counts.get(cls,1),1) for cls in (0, 1)}
sample_w = train_df["label"].map(w).values
sampler = WeightedRandomSampler(sample_w, num_samples=len(sample_w), replacement=True)

_loader_opts = dict(batch_size=BATCH, num_workers=0, pin_memory=True)
train_loader = DataLoader(train_ds, sampler=sampler, **_loader_opts)
val_loader   = DataLoader(val_ds,   shuffle=False, **_loader_opts)
test_loader  = DataLoader(test_ds,  shuffle=False, **_loader_opts)
print("✅ DataLoaders ready")

In [None]:
class CNN_LBPH(nn.Module):
    """Two-branch classifier: ResNet-18 image features fused with an LBPH histogram.

    The ResNet-18 head is replaced by an identity so the backbone emits a
    512-d vector; the histogram is projected to 64-d; both are concatenated
    and mapped to ``num_classes`` logits by a small MLP.
    """

    def __init__(self, lbph_dim=10, num_classes=1, freeze_backbone=False):
        super().__init__()
        backbone = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1)
        if freeze_backbone:
            backbone.requires_grad_(False)
        # drop the ImageNet head: the backbone now outputs the 512-d features
        backbone.fc = nn.Identity()
        self.cnn = backbone

        # histogram branch
        self.lbph_fc = nn.Sequential(
            nn.Linear(lbph_dim, 64),
            nn.ReLU(),
            nn.Dropout(0.1),
        )
        # fusion head producing raw logits
        self.classifier = nn.Sequential(
            nn.Linear(512 + 64, 128),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(128, num_classes),
        )

    def forward(self, x_img, x_lbph):
        """Return the logits for an image batch and its LBPH batch (dim 1 squeezed)."""
        fused = torch.cat(
            (self.cnn(x_img), self.lbph_fc(x_lbph)),  # [B,512] and [B,64]
            dim=1,
        )                                             # [B,576]
        return self.classifier(fused).squeeze(1)

# Instantiate the hybrid classifier and its training utilities.
# lbph_dim=10 matches the histogram length produced by lbph_feature with its default n_points=8.
model = CNN_LBPH(lbph_dim=10, num_classes=1, freeze_backbone=False).to(device)
criterion = nn.BCEWithLogitsLoss()  # binary loss applied to the raw logits
optimizer = optim.AdamW(model.parameters(), lr=LR, weight_decay=WEIGHT_DECAY)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=EPOCHS)  # stepped once per epoch by the training loop
scaler = torch.amp.GradScaler('cuda') if torch.cuda.is_available() else None  # None -> plain backward/step without loss scaling

In [None]:
def run_epoch(loader, train=True):
    """Run one pass over ``loader`` with the notebook-level model.

    Uses the notebook globals ``model``, ``criterion``, ``optimizer``,
    ``scaler`` and ``device``.

    Args:
        loader: DataLoader yielding ``(imgs, lbph, labels)`` batches.
        train: When True the model is put in training mode and the optimizer
            is stepped; when False the pass is evaluation-only and no
            gradients are tracked.

    Returns:
        tuple: ``(mean_loss, accuracy, auc)`` over all samples seen. ``auc`` is
        0.0 when ROC-AUC cannot be computed (``roc_auc_score`` raised
        ``ValueError``).
    """
    model.train(train)
    total, correct, loss_sum = 0, 0, 0.0
    y_true, y_prob = [], []

    for imgs, lbph, labels in tqdm(loader, leave=False):
        imgs, lbph, labels = imgs.to(device), lbph.to(device), labels.to(device)

        # Track gradients only while training: evaluation passes then skip
        # building the autograd graph, which saves memory and time.
        with torch.set_grad_enabled(train), \
             torch.amp.autocast('cuda', enabled=torch.cuda.is_available()):
            logits = model(imgs, lbph)
            loss = criterion(logits, labels)

        if train:
            optimizer.zero_grad(set_to_none=True)
            if scaler:
                # mixed precision: scale the loss, then unscale inside step()
                scaler.scale(loss).backward()
                scaler.step(optimizer)
                scaler.update()
            else:
                loss.backward()
                optimizer.step()

        # loss.item() is the batch mean; weight it by the batch size
        loss_sum += loss.item() * imgs.size(0)
        probs = torch.sigmoid(logits.detach())
        preds = (probs > 0.5).float()
        correct += (preds == labels).sum().item()
        total   += imgs.size(0)

        y_true += labels.detach().cpu().numpy().tolist()
        y_prob += probs.cpu().numpy().tolist()

    acc = correct / max(total,1)
    try:
        auc = roc_auc_score(y_true, y_prob)
    except ValueError:
        # roc_auc_score rejected the inputs; report 0.0 so callers can fall
        # back to accuracy instead of crashing.
        auc = 0.0
    return loss_sum/max(total,1), acc, auc

# Train for EPOCHS epochs and keep a copy of the weights with the best validation score.
best_state, best_metric = None, -1
for ep in range(1, EPOCHS+1):
    tr_loss, tr_acc, tr_auc = run_epoch(train_loader, train=True)
    val_loss, val_acc, val_auc = run_epoch(val_loader, train=False)
    scheduler.step()  # one learning-rate scheduler step per epoch

    print(f"[{ep}/{EPOCHS}] "
          f"train loss {tr_loss:.4f} acc {tr_acc:.3f} auc {tr_auc:.3f} | "
          f"val loss {val_loss:.4f} acc {val_acc:.3f} auc {val_auc:.3f}")

    # Model selection: validation AUC, or accuracy when AUC is reported as 0
    # (run_epoch returns 0.0 when AUC could not be computed).
    score = val_auc if val_auc>0 else val_acc
    if score > best_metric:
        best_metric = score
        # deep copy, so later optimizer steps do not modify the snapshot
        best_state = copy.deepcopy(model.state_dict())

# save best
os.makedirs("/content/checkpoints", exist_ok=True)
# restore the best snapshot before writing the checkpoint
if best_state: model.load_state_dict(best_state)
torch.save(model.state_dict(), "/content/checkpoints/cnn_lbph_best.pt")
print("✅ Saved:", "/content/checkpoints/cnn_lbph_best.pt")

In [None]:
# Evaluate the current `model` on the held-out test split.
model.eval()
y_true, y_pred, y_prob = [], [], []
with torch.no_grad():
    for imgs, lbph, labels in tqdm(test_loader):
        imgs, lbph = imgs.to(device), lbph.to(device)
        logits = model(imgs, lbph)
        probs  = torch.sigmoid(logits).cpu().numpy()
        preds  = (probs > 0.5).astype(int)  # fixed 0.5 decision threshold
        y_true.extend(labels.numpy().tolist())
        y_pred.extend(preds.tolist())
        y_prob.extend(probs.tolist())

# Per-class precision/recall/F1, ROC-AUC on the probabilities, and the confusion matrix.
print(classification_report(y_true, y_pred, target_names=["Normal","Weapon/Suspicious"]))
print("Test AUC:", roc_auc_score(y_true, y_prob))
print("Confusion Matrix:\n", confusion_matrix(y_true, y_pred))

In [None]:
def annotate_and_alert(image_path, person_w, weapon_w, clf_model, crops_tmp="/content/tmp_crops"):
    """Annotate one frame with detections and decide whether to raise an alert.

    Weapon boxes are drawn in red. Every detected person is cropped,
    classified by ``clf_model`` (image tensor + LBPH histogram) and drawn in
    green ("Normal") or yellow ("Suspicious <prob>"). The frame is stamped
    "INTRUDER ALERT" when a weapon box exists or any person is classified as
    suspicious, otherwise "CLEAR".

    Args:
        image_path: Path of the frame to analyse.
        person_w: Path to the person-detector weights.
        weapon_w: Path to the weapon-detector weights.
        clf_model: Classifier called as ``clf_model(image_tensor, lbph_tensor)``
            and returning one logit.
        crops_tmp: Folder for the temporary crop files (created if missing).

    Returns:
        tuple: ``(annotated_image, intruder_alert)``, or ``(None, None)`` when
        the image cannot be read.
    """
    os.makedirs(crops_tmp, exist_ok=True)
    img = cv2.imread(image_path)
    if img is None:
        print("Could not read:", image_path)
        return None, None

    # Crops are taken from this untouched copy: `img` is drawn on below, and
    # cropping from it would feed rectangles and text of earlier detections
    # into the classifier.
    clean = img.copy()

    H,W = img.shape[:2]
    person_model = YOLO(person_w)
    weapon_model = YOLO(weapon_w)

    pres = person_model(image_path, verbose=False)[0]
    wres = weapon_model(image_path, verbose=False)[0]

    has_weapon = len(wres.boxes) > 0

    # draw weapon boxes (red)
    for b in wres.boxes:
        x1,y1,x2,y2 = map(int, b.xyxy[0])
        cv2.rectangle(img, (x1,y1), (x2,y2), (0,0,255), 2)
        cv2.putText(img, f"Weapon {float(b.conf[0]):.2f}", (x1, max(10,y1-5)),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,0,255), 2)

    # Same resize/normalize steps as the notebook's evaluation transform,
    # built once instead of once per person box.
    preprocess = transforms.Compose([
        transforms.Resize((IMG_SIZE, IMG_SIZE)),
        transforms.ToTensor(),
        transforms.Normalize([0.485,0.456,0.406],[0.229,0.224,0.225]),
    ])

    suspicious_any = False
    # per-person classify (green = normal, yellow = suspicious)
    for i, b in enumerate(pres.boxes):
        x1,y1,x2,y2 = map(int, b.xyxy[0])
        x1,y1 = max(0,x1), max(0,y1)
        x2,y2 = min(W,x2), min(H,y2)
        if x2<=x1 or y2<=y1: continue

        # The crop goes through a JPEG file because lbph_feature reads from a path.
        crop = clean[y1:y2, x1:x2]
        tmp_path = os.path.join(crops_tmp, f"tmp_{i}.jpg")
        cv2.imwrite(tmp_path, crop)

        # make tensor + lbph
        with Image.open(tmp_path) as src:
            pil = src.convert("RGB")
        tensor = preprocess(pil).unsqueeze(0).to(device)

        lbph = torch.tensor(lbph_feature(tmp_path), dtype=torch.float32).unsqueeze(0).to(device)

        with torch.no_grad(), torch.amp.autocast('cuda', enabled=torch.cuda.is_available()):
            logit = clf_model(tensor, lbph)
            prob = torch.sigmoid(logit).item()
        is_susp = prob > 0.5
        suspicious_any = suspicious_any or is_susp

        color = (0,255,0) if not is_susp else (0,255,255)
        tag   = "Normal" if not is_susp else f"Suspicious {prob:.2f}"
        cv2.rectangle(img, (x1,y1), (x2,y2), color, 2)
        cv2.putText(img, tag, (x1, max(10,y1-5)),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

    intruder_alert = has_weapon or suspicious_any
    status = "INTRUDER ALERT" if intruder_alert else "CLEAR"
    cv2.putText(img, status, (10,30), cv2.FONT_HERSHEY_SIMPLEX, 1.0,
                (0,0,255) if intruder_alert else (0,255,0), 3)

    return img, intruder_alert

# --- load trained hybrid model and test on a sample image ---
# Rebuild the architecture with the same arguments used for training, then load the saved weights.
clf = CNN_LBPH(lbph_dim=10, num_classes=1, freeze_backbone=False).to(device)
clf.load_state_dict(torch.load("/content/checkpoints/cnn_lbph_best.pt", map_location=device))
clf.eval()

# First entry of FRAMES_DIR in sorted order (raises IndexError if the folder is empty).
test_image = os.path.join(FRAMES_DIR, sorted(os.listdir(FRAMES_DIR))[0])  # pick any frame
out_img, alert = annotate_and_alert(test_image, PERSON_W, WEAPON_W, clf)
out_path = "/content/annotated_result1.jpg"
# annotate_and_alert returns (None, None) when the frame could not be read.
if out_img is not None:
    cv2.imwrite(out_path, out_img)
    print(f"✅ Saved annotated image: {out_path} | Alert = {alert}")

In [None]:
import os, cv2
from ultralytics import YOLO
import pandas as pd
from tqdm import tqdm
import random

# Paths
person_model_path = "/content/yolov8_results/crowdhuman/weights/best.pt"
weapon_model_path = "/content/yolov8_results/openimages_weapon/weights/best.pt"
frames_dir = "/content/frames"
output_dir = "/content/cropped_persons1"
os.makedirs(output_dir, exist_ok=True)

# Load models
person_model = YOLO(person_model_path)
weapon_model = YOLO(weapon_model_path)

# Get images (subset of 5000 first)
all_images = os.listdir(frames_dir)
random.shuffle(all_images)
subset_images = all_images[:5000]   # ⚡ change 5000 → 21731 for full dataset

labels = []

# Iterate with progress bar
for img_name in tqdm(subset_images):
    img_path = os.path.join(frames_dir, img_name)
    img = cv2.imread(img_path)
    if img is None:
        # not an image / unreadable: skip instead of crashing on the crop below
        continue
    H, W = img.shape[:2]

    # Detect persons
    person_results = person_model(img_path, verbose=False)[0]

    # Detect weapons
    weapon_results = weapon_model(img_path, verbose=False)[0]

    # Label (1 if weapon detected, else 0)
    label = 1 if len(weapon_results.boxes) > 0 else 0

    # Crop persons
    for i, box in enumerate(person_results.boxes):
        x1, y1, x2, y2 = map(int, box.xyxy[0])
        # Clip to the image: a negative coordinate would otherwise be read by
        # numpy as an index from the end, and an empty crop cannot be written.
        x1, y1 = max(0, x1), max(0, y1)
        x2, y2 = min(W, x2), min(H, y2)
        if x2 <= x1 or y2 <= y1:
            continue

        cropped = img[y1:y2, x1:x2]
        crop_name = f"{os.path.splitext(img_name)[0]}_{i}.jpg"
        crop_path = os.path.join(output_dir, crop_name)
        cv2.imwrite(crop_path, cropped)

        # Store row for CSV
        labels.append([crop_name, label])

# Save CSV once
# NOTE(review): the path is relative to the working directory; if that is
# /content this overwrites /content/frames_labels.csv — confirm that is intended.
df = pd.DataFrame(labels, columns=["image", "label"])
df.to_csv("frames_labels.csv", index=False)
print("✅ Done. Total samples:", len(df))

In [None]:
import pandas as pd
import shutil, os
from sklearn.model_selection import train_test_split

# Read the label table; the file's own header row is skipped and the column names are given explicitly.
df = pd.read_csv("frames_labels.csv", skiprows=1, names=["image", "label"])

# 70 % train, then the remaining 30 % halved into val and test (all stratified by label)
train_df, temp_df = train_test_split(
    df,
    test_size=0.3,
    stratify=df["label"],
    random_state=42,
)
val_df, test_df = train_test_split(
    temp_df,
    test_size=0.5,
    stratify=temp_df["label"],
    random_state=42,
)

# Write each split next to the source CSV
for _split_df, _csv_name in (
    (train_df, "train_labels.csv"),
    (val_df, "val_labels.csv"),
    (test_df, "test_labels.csv"),
):
    _split_df.to_csv(_csv_name, index=False)

print("Train:", len(train_df), "Val:", len(val_df), "Test:", len(test_df))

In [None]:
# Show the label counts of every split.
for _split_name, _split_df in (("Train", train_df), ("Val", val_df), ("Test", test_df)):
    print(f"{_split_name} distribution:\n", _split_df["label"].value_counts())


In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split

# Load full CSV (the file's header row is skipped; names are given explicitly)
df = pd.read_csv("frames_labels.csv", skiprows=1, names=["image","label"])

# Separate classes
df_normal = df[df["label"]==0]
df_weapon = df[df["label"]==1]

# Decide subset size: both classes are cut down to the size of the smaller
# one. Sampling without replacement raises when asked for more rows than the
# frame holds, so the size must not exceed either class.
n_per_class = min(len(df_normal), len(df_weapon))
if n_per_class == 0:
    raise ValueError(
        "Cannot build a balanced subset: need at least one sample of each "
        f"class (normal={len(df_normal)}, weapon={len(df_weapon)})"
    )
n_weapon = n_per_class             # weapon images kept
n_normal = n_per_class             # same number of normal images for balance

df_normal_sub = df_normal.sample(n=n_normal, random_state=42)
df_weapon_sub = df_weapon.sample(n=n_weapon, random_state=42)

# Combine and shuffle so the two classes are interleaved
df_sub = pd.concat([df_normal_sub, df_weapon_sub]).sample(frac=1, random_state=42).reset_index(drop=True)

# Split train/val/test (70 / 15 / 15, stratified on the label)
train_df, temp_df = train_test_split(df_sub, test_size=0.3, stratify=df_sub["label"], random_state=42)
val_df, test_df = train_test_split(temp_df, test_size=0.5, stratify=temp_df["label"], random_state=42)

# Save CSVs
train_df.to_csv("train_labels_subset.csv", index=False)
val_df.to_csv("val_labels_subset.csv", index=False)
test_df.to_csv("test_labels_subset.csv", index=False)

print("Subset Train:", len(train_df))
print("Subset Val:", len(val_df))
print("Subset Test:", len(test_df))

print("\nTrain distribution:\n", train_df["label"].value_counts())
print("Val distribution:\n", val_df["label"].value_counts())
print("Test distribution:\n", test_df["label"].value_counts())


In [None]:
import torch
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
from torchvision import transforms
from PIL import Image

# ---------- Transforms ----------
# Training pipeline: resize to 224x224, random horizontal flip and mild colour
# jitter as augmentation, then tensor conversion and per-channel normalisation.
train_tf = transforms.Compose([
    transforms.Resize(size=(224, 224)),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    ),
])

# Evaluation pipeline: same resize and normalisation, no random augmentation.
eval_tf = transforms.Compose([
    transforms.Resize(size=(224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    ),
])

# ---------- Dataset ----------
class FrameCSV(Dataset):
    """Image-classification dataset backed by a CSV with ``image`` and ``label`` columns.

    Args:
        csv_path: Path of the CSV file to read.
        transform: Optional callable applied to the loaded RGB PIL image.
        root_dir: Optional directory prepended to every ``image`` entry.
            The cropping cell stores bare file names in the CSV, so pass the
            crop folder here unless the working directory already is that
            folder. ``None`` (default) keeps the entries as they are.
    """

    def __init__(self, csv_path, transform=None, *, root_dir=None):
        import pandas as pd
        self.df = pd.read_csv(csv_path)
        self.transform = transform
        self.root_dir = root_dir

    def __len__(self):
        """Number of rows (samples) in the CSV."""
        return len(self.df)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for row ``idx``; the label is a Python int."""
        import os

        row = self.df.iloc[idx]
        img_path = row["image"]
        if self.root_dir is not None:
            img_path = os.path.join(self.root_dir, img_path)
        # convert() returns an independent, fully loaded image, so the source
        # file can be closed as soon as the with-block ends.
        with Image.open(img_path) as src:
            img = src.convert("RGB")
        if self.transform:
            img = self.transform(img)
        label = int(row["label"])
        return img, label

# ---------- Load subset datasets ----------
# One FrameCSV per split; only the training split uses the augmenting transform.
train_ds, val_ds, test_ds = (
    FrameCSV(split_csv, transform=split_tf)
    for split_csv, split_tf in (
        ("train_labels_subset.csv", train_tf),
        ("val_labels_subset.csv", eval_tf),
        ("test_labels_subset.csv", eval_tf),
    )
)

# ---------- Weighted sampler for train ----------
# Each sample is drawn with probability proportional to the inverse of its
# class frequency in the training CSV.
labels = train_ds.df["label"].values
class_sample_count = torch.bincount(torch.tensor(labels))
weights = 1.0 / class_sample_count
sample_weights = weights[labels]          # look up each row's class weight
sampler = WeightedRandomSampler(
    weights=sample_weights,
    num_samples=len(sample_weights),
    replacement=True,
)

# ---------- DataLoaders ----------
BATCH = 32
train_loader = DataLoader(
    train_ds,
    batch_size=BATCH,
    sampler=sampler,
    num_workers=2,
    pin_memory=True,
)
# Validation and test are read in CSV order (no shuffling, no sampler).
val_loader, test_loader = (
    DataLoader(eval_ds, batch_size=BATCH, shuffle=False, num_workers=2, pin_memory=True)
    for eval_ds in (val_ds, test_ds)
)

print("✅ DataLoaders ready with subset!")


In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from PIL import Image
import pandas as pd
from tqdm.auto import tqdm
import copy
from sklearn.metrics import roc_auc_score, classification_report, confusion_matrix

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print("Device:", device)

# ---------- Dataset ----------
class FrameDataset(Dataset):
    """Dataset of ``(image, label)`` pairs listed in a CSV file.

    The CSV must provide an ``image`` column (path or file name) and a
    ``label`` column (integer class id).

    Args:
        csv_file: Path of the CSV file to read.
        transform: Optional callable applied to the loaded RGB PIL image.
        root_dir: Optional folder joined in front of each ``image`` entry.
            The crop CSVs written by this notebook contain bare file names,
            so without it images only resolve when the working directory is
            the crop folder. Keyword-only; ``None`` (default) leaves the
            entries untouched.
    """

    def __init__(self, csv_file, transform=None, *, root_dir=None):
        self.df = pd.read_csv(csv_file)
        self.transform = transform
        self.root_dir = root_dir

    def __len__(self):
        """Number of rows (samples) in the CSV."""
        return len(self.df)

    def __getitem__(self, idx):
        """Load row ``idx`` and return ``(image, label)`` with an int label."""
        import os

        row = self.df.iloc[idx]
        img_path = row['image']
        if self.root_dir is not None:
            img_path = os.path.join(self.root_dir, img_path)
        # The converted copy is fully loaded, so the file handle is released
        # when the with-block exits.
        with Image.open(img_path) as src:
            img = src.convert('RGB')
        if self.transform:
            img = self.transform(img)
        label = int(row['label'])
        return img, label

# ---------- Transforms ----------
# Training: resize, random horizontal flip, tensor conversion, normalisation.
train_tf = transforms.Compose([
    transforms.Resize(size=(224, 224)),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    ),
])

# Evaluation: deterministic resize + normalisation only.
eval_tf = transforms.Compose([
    transforms.Resize(size=(224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    ),
])

# ---------- Datasets ----------
# One dataset per subset CSV; augmentation is applied to the training split only.
train_ds, val_ds, test_ds = (
    FrameDataset(split_csv, transform=split_tf)
    for split_csv, split_tf in (
        ("train_labels_subset.csv", train_tf),
        ("val_labels_subset.csv", eval_tf),
        ("test_labels_subset.csv", eval_tf),
    )
)

# Training batches are reshuffled every epoch; val/test keep the CSV order.
train_loader = DataLoader(
    train_ds,
    batch_size=32,
    shuffle=True,
    num_workers=2,
    pin_memory=True,
)
val_loader, test_loader = (
    DataLoader(eval_ds, batch_size=32, shuffle=False, num_workers=2, pin_memory=True)
    for eval_ds in (val_ds, test_ds)
)

# ---------- Model ----------
# ImageNet-pretrained ResNet-18 whose final layer is replaced by a 2-way head.
model = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1)
model.fc = nn.Linear(in_features=model.fc.in_features, out_features=2)
model = model.to(device)

# Cross-entropy over the two logits, AdamW, and a cosine learning-rate
# schedule with a period of 10 scheduler steps.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(
    model.parameters(),
    lr=2e-4,
    weight_decay=1e-4,
)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=10)


In [None]:
from torch import amp

def run_epoch(loader, train=True):
    """Run one full pass over ``loader`` with the notebook-level ``model``.

    Uses the module-level ``model``, ``criterion``, ``optimizer`` and
    ``device``. Mixed precision is enabled only when CUDA is available.

    Args:
        loader: Iterable yielding ``(imgs, labels)`` batches.
        train: When True the model is put in training mode and the optimizer
            is stepped on every batch; when False the model is put in eval
            mode and no gradients are computed.

    Returns:
        Tuple ``(mean_loss, accuracy, auc)`` over all samples seen. ``auc`` is
        0.0 when ROC AUC cannot be computed (``roc_auc_score`` raises
        ``ValueError``). All three values are 0.0 for an empty loader.
    """
    model.train(train)
    loss_sum, correct, n = 0.0, 0, 0
    all_probs, all_labels = [], []

    use_amp = torch.cuda.is_available()
    scaler = amp.GradScaler(enabled=use_amp)

    for imgs, labels in tqdm(loader, leave=False):
        imgs, labels = imgs.to(device), labels.to(device)

        # Gradients are tracked only while training; during evaluation this
        # avoids building an autograd graph that is never back-propagated.
        with torch.set_grad_enabled(train), \
                amp.autocast(device_type='cuda', enabled=use_amp):
            logits = model(imgs)
            loss = criterion(logits, labels)

        if train:
            optimizer.zero_grad(set_to_none=True)
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

        # loss is a batch mean, so weight it by batch size before summing.
        loss_sum += loss.item() * imgs.size(0)
        preds = logits.argmax(1)
        correct += (preds == labels).sum().item()
        n += imgs.size(0)

        # Probability of class 1, kept for the ROC AUC below.
        all_probs += torch.softmax(logits,1)[:,1].detach().cpu().tolist()
        all_labels += labels.detach().cpu().tolist()

    if n == 0:
        # Empty loader: nothing to average, avoid dividing by zero.
        return 0.0, 0.0, 0.0

    try:
        auc = roc_auc_score(all_labels, all_probs)
    except ValueError:
        # Raised by roc_auc_score when the score is undefined for these labels.
        auc = 0.0
    return loss_sum/n, correct/n, auc


In [None]:
import os, copy, pandas as pd, numpy as np
from PIL import Image
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score
import torch, torch.nn as nn
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
from torchvision import transforms, models
from tqdm.auto import tqdm

# ---------------- Device ----------------
# Run on the GPU when CUDA is available, on the CPU otherwise.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print("Device:", device)

# ---------------- Dataset ----------------
class FrameDataset(Dataset):
    """CSV-driven image dataset whose files live under a common folder.

    Args:
        csv_file: CSV with an ``image`` column (file name relative to
            ``root_dir``) and a ``label`` column.
        root_dir: Folder that is joined in front of every ``image`` entry.
        transform: Optional callable applied to the loaded RGB PIL image.
    """

    def __init__(self, csv_file, root_dir, transform=None):
        self.transform = transform
        self.root_dir = root_dir
        self.df = pd.read_csv(csv_file)

    def __len__(self):
        """Number of rows in the CSV."""
        return self.df.shape[0]

    def __getitem__(self, idx):
        """Return ``(image, label)`` for row ``idx``; the label is a Python int."""
        record = self.df.iloc[idx]
        # The CSV holds bare names, so resolve them against root_dir.
        full_path = os.path.join(self.root_dir, record['image'])
        image = Image.open(full_path).convert('RGB')
        if self.transform:
            image = self.transform(image)
        return image, int(record['label'])

# ---------------- Transforms ----------------
# Training: resize, random flip and mild colour jitter, then tensor conversion
# and per-channel normalisation.
train_tf = transforms.Compose([
    transforms.Resize(size=(224, 224)),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    ),
])
# Evaluation: no random augmentation.
eval_tf = transforms.Compose([
    transforms.Resize(size=(224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    ),
])

# ---------------- Paths ----------------
# NOTE(review): root_dir must be the folder the person crops were written to
# by the cropping cell — confirm it matches that cell's output directory.
root_dir = "/content/cropped_persons1"  # cropped persons folder
train_csv = "train_labels_subset.csv"
val_csv   = "val_labels_subset.csv"
test_csv  = "test_labels_subset.csv"

# One dataset per split; only the training split is augmented.
train_ds, val_ds, test_ds = (
    FrameDataset(split_csv, root_dir, transform=split_tf)
    for split_csv, split_tf in (
        (train_csv, train_tf),
        (val_csv, eval_tf),
        (test_csv, eval_tf),
    )
)

# ---------------- Weighted Sampler for imbalance ----------------
# Every training row is weighted by the inverse frequency of its class.
labels = pd.read_csv(train_csv)["label"].values
class_sample_count = np.bincount(labels)
weights = 1.0 / class_sample_count
sample_weights = weights[labels]          # per-row weight via class-id lookup
sampler = WeightedRandomSampler(
    weights=sample_weights,
    num_samples=len(sample_weights),
    replacement=True,
)

# ---------------- DataLoaders ----------------
BATCH = 32
train_loader = DataLoader(
    train_ds,
    batch_size=BATCH,
    sampler=sampler,
    num_workers=2,
    pin_memory=True,
)
# Validation and test are iterated in CSV order.
val_loader, test_loader = (
    DataLoader(eval_ds, batch_size=BATCH, shuffle=False, num_workers=2, pin_memory=True)
    for eval_ds in (val_ds, test_ds)
)

# ---------------- Model ----------------
# ImageNet-pretrained ResNet-18 with its classifier swapped for a 2-class head.
model = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1)
model.fc = nn.Linear(in_features=model.fc.in_features, out_features=2)  # 2-class
model = model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(
    model.parameters(),
    lr=2e-4,
    weight_decay=1e-4,
)
# NOTE(review): T_max is 10 while the training loop in this cell runs
# EPOCHS = 8, so the cosine schedule ends before reaching its minimum —
# confirm this is intended.
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=10)

# ---------------- Train/Eval loop ----------------
def run_epoch(loader, train=True):
    """Run one full pass over ``loader`` with the notebook-level ``model``.

    Uses the module-level ``model``, ``criterion``, ``optimizer`` and
    ``device``. Mixed precision is enabled only when ``device`` is CUDA.

    Args:
        loader: Iterable yielding ``(imgs, labels)`` batches.
        train: When True the model is put in training mode and the optimizer
            is stepped on every batch; when False the model is put in eval
            mode and no gradients are computed.

    Returns:
        Tuple ``(mean_loss, accuracy, auc)`` over all samples seen. ``auc`` is
        0.0 when ROC AUC cannot be computed (``roc_auc_score`` raises
        ``ValueError``). All three values are 0.0 for an empty loader.
    """
    model.train(train)
    loss_sum, correct, n = 0.0, 0, 0
    all_probs, all_labels = [], []

    use_amp = device.type == "cuda"
    # torch.amp replaces the deprecated torch.cuda.amp entry points.
    scaler = torch.amp.GradScaler("cuda", enabled=use_amp)
    for imgs, labels in tqdm(loader, leave=False):
        imgs, labels = imgs.to(device), labels.to(device)
        # Gradients are tracked only while training; during evaluation this
        # avoids building an autograd graph that is never back-propagated.
        with torch.set_grad_enabled(train), \
                torch.amp.autocast(device_type="cuda", enabled=use_amp):
            logits = model(imgs)
            loss = criterion(logits, labels)

        if train:
            optimizer.zero_grad(set_to_none=True)
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

        # loss is a batch mean, so weight it by batch size before summing.
        loss_sum += loss.item() * imgs.size(0)
        preds = logits.argmax(1)
        correct += (preds == labels).sum().item()
        n += imgs.size(0)
        # Probability of class 1, kept for the ROC AUC below.
        all_probs += torch.softmax(logits,1)[:,1].detach().cpu().tolist()
        all_labels += labels.detach().cpu().tolist()

    if n == 0:
        # Empty loader: nothing to average, avoid dividing by zero.
        return 0.0, 0.0, 0.0

    try:
        auc = roc_auc_score(all_labels, all_probs)
    except ValueError:
        # Raised by roc_auc_score when the score is undefined for these labels.
        auc = 0.0
    return loss_sum/n, correct/n, auc

# ---------------- Training ----------------
# Train for EPOCHS epochs and remember the weights of the epoch with the best
# validation score seen so far.
EPOCHS = 8
best_wts = None
best_val = 0.0
for ep in range(1, EPOCHS+1):
    tr_loss, tr_acc, tr_auc = run_epoch(train_loader, train=True)
    val_loss, val_acc, val_auc = run_epoch(val_loader, train=False)
    scheduler.step()
    print(f"[{ep}/{EPOCHS}] train loss {tr_loss:.4f} acc {tr_acc:.3f} auc {tr_auc:.3f} | "
          f"val loss {val_loss:.4f} acc {val_acc:.3f} auc {val_auc:.3f}")

    # Selection metric: validation AUC, or validation accuracy when the AUC
    # is not positive (run_epoch reports 0.0 when it could not compute it).
    if val_auc > 0:
        score = val_auc
    else:
        score = val_acc

    if score > best_val:
        # Deep copy: state_dict() returns references to the live parameters.
        best_wts = copy.deepcopy(model.state_dict())
        best_val = score

# Load best weights (skipped when no epoch ever beat the initial 0.0)
if best_wts:
    model.load_state_dict(best_wts)

os.makedirs("/content/checkpoints", exist_ok=True)
torch.save(
    model.state_dict(),
    "/content/checkpoints/resnet18_subset_best.pt",
)
print("✅ Saved -> /content/checkpoints/resnet18_subset_best.pt")

# ---------------- Test Metrics ----------------
# Score the held-out test split with the weights currently loaded in `model`.
# NOTE(review): the loop variable `labels` rebinds the notebook-level `labels`
# array that was built for the sampler earlier in this cell.
model.eval()
y_true, y_pred, y_prob = [], [], []
with torch.no_grad():
    for imgs, labels in tqdm(test_loader):
        imgs, labels = imgs.to(device), labels.to(device)
        logits = model(imgs)
        y_true.extend(labels.cpu().tolist())
        y_pred.extend(logits.argmax(dim=1).cpu().tolist())
        # Column 1 of the softmax is the probability of the "Weapon" class.
        y_prob.extend(torch.softmax(logits, dim=1)[:, 1].cpu().tolist())

print(classification_report(y_true, y_pred, target_names=["Normal","Weapon"]))
print("Test AUC:", roc_auc_score(y_true, y_prob))
print("Confusion Matrix:\n", confusion_matrix(y_true, y_pred))
