In [1]:
pip install ultralytics


Note: you may need to restart the kernel to use updated packages.


In [2]:
import torch
import numpy as np
import cv2
!pip install deep-sort-realtime
from ultralytics import YOLO
from deep_sort_realtime.deepsort_tracker import DeepSort
from torchvision import transforms
from sklearn.preprocessing import StandardScaler
from keras.models import load_model
import os



In [3]:
# Object detector used by detect_objects(); built from the 'yolov8x.pt' weights file.
obj_model = YOLO('yolov8x.pt')

In [4]:
# Pose estimator used by detect_pose_keypoints(); built from the 'yolov8x-pose.pt' weights file.
pose_model = YOLO('yolov8x-pose.pt')

In [5]:
# Optional behavior classifier: loaded only when the weights file is present,
# otherwise `behavior_model` stays None and classify_behavior() returns -1.
behavior_model_path = "behavior_lstm_model.h5"
if not os.path.exists(behavior_model_path):
    print("[WARNING] Behavior model not found. Skipping behavior classification.")
    behavior_model = None
else:
    behavior_model = load_model(behavior_model_path)

# NOTE(review): the scaler is created here but no fit() call is visible in this
# notebook; classify_behavior() calls scaler.transform() — confirm where it is fitted.
scaler = StandardScaler()



In [6]:
# Single shared DeepSort tracker; track_objects() feeds it detections frame by frame,
# so its state persists across every call made in this kernel session.
# NOTE(review): presumably n_init=3 means a track needs several consecutive hits before
# is_confirmed() becomes true, and max_age=30 bounds how long a lost track survives —
# confirm against the deep_sort_realtime documentation.
tracker = DeepSort(max_age=30, n_init=3, nms_max_overlap=1.0)

In [7]:
def fuse_rgb_thermal(rgb_frame, thermal_frame):
    """Blend an RGB frame with a JET-colourised version of the thermal frame.

    The thermal frame is reduced to a single channel, colour-mapped with
    ``cv2.COLORMAP_JET`` and mixed with the RGB frame at 60% RGB / 40% thermal.

    Args:
        rgb_frame: 3-channel image (as returned by ``cv2.imread`` / ``VideoCapture.read``).
        thermal_frame: thermal image, either multi-channel BGR or already single-channel.
            It is resized to the RGB frame's size when the two differ.

    Returns:
        The fused image, same size as ``rgb_frame``.
    """
    # A 2-D array is already grayscale; cvtColor(BGR2GRAY) would reject it.
    if thermal_frame.ndim == 2:
        thermal_gray = thermal_frame
    else:
        thermal_gray = cv2.cvtColor(thermal_frame, cv2.COLOR_BGR2GRAY)

    # addWeighted requires both inputs to have identical dimensions.
    rgb_height, rgb_width = rgb_frame.shape[:2]
    if thermal_gray.shape[:2] != (rgb_height, rgb_width):
        thermal_gray = cv2.resize(thermal_gray, (rgb_width, rgb_height))

    thermal_colored = cv2.applyColorMap(thermal_gray, cv2.COLORMAP_JET)
    return cv2.addWeighted(rgb_frame, 0.6, thermal_colored, 0.4, 0)

In [8]:
def detect_objects(frame):
    """Run the object detector on one frame.

    Returns:
        A ``(boxes, labels, confs)`` tuple of NumPy arrays taken from the first
        result's ``boxes.xyxy``, ``boxes.cls`` and ``boxes.conf`` respectively.
    """
    detected = obj_model(frame)[0].boxes
    # Move each tensor to the CPU and convert it to NumPy, in the order callers unpack.
    return (
        detected.xyxy.cpu().numpy(),
        detected.cls.cpu().numpy(),
        detected.conf.cpu().numpy(),
    )

In [9]:
def track_objects(frame, boxes, confs=None):
    """Update the shared DeepSort tracker with this frame's detections.

    Args:
        frame: image the detections came from (passed to the tracker for embedding).
        boxes: iterable of boxes whose first four values are ``x1, y1, x2, y2``
            (the format produced by ``detect_objects``).
        confs: optional per-box confidences aligned with ``boxes``. When omitted,
            every detection is given confidence 1.0 as before.

    Returns:
        The list of tracks returned by ``tracker.update_tracks``.

    Note:
        Every detection is labelled ``'person'`` regardless of its detector class.
    """
    detections = []
    for index, box in enumerate(boxes):
        x1, y1, x2, y2 = (float(value) for value in box[:4])
        confidence = 1.0 if confs is None else float(confs[index])
        # deep_sort_realtime expects [left, top, width, height], not corner coordinates.
        detections.append(([x1, y1, x2 - x1, y2 - y1], confidence, 'person'))
    return tracker.update_tracks(detections, frame=frame)


In [10]:
def detect_pose_keypoints(frame):
    """Run the pose model on one frame and return its keypoints as NumPy data.

    Returns:
        The first result's ``keypoints`` object moved to CPU and converted with
        ``.numpy()``, or ``None`` when the result carries no keypoints (callers
        already test for ``None``).
    """
    results = pose_model(frame)[0]
    keypoints = results.keypoints
    # Guard so callers receive None instead of an AttributeError on `.cpu()`.
    if keypoints is None:
        return None
    return keypoints.cpu().numpy()

In [11]:
def classify_behavior(keypoints_seq):
    """Classify a sequence of pose keypoints with the behavior model.

    Args:
        keypoints_seq: array reshapeable to ``(50, 34)`` — 50 time steps of 34
            flattened keypoint coordinates.

    Returns:
        The index of the highest-scoring class as an ``int``, or ``-1`` when no
        classification can be made (behavior model missing, or scaler not fitted).
    """
    import warnings

    if behavior_model is None:
        return -1  # No model available

    # StandardScaler only gains `scale_` after fit(); transform() on an unfitted
    # scaler raises NotFittedError, which would abort the whole inference loop.
    if not hasattr(scaler, "scale_"):
        warnings.warn(
            "Scaler has not been fitted; skipping behavior classification.",
            RuntimeWarning,
        )
        return -1

    scaled = scaler.transform(np.asarray(keypoints_seq).reshape(-1, 34))
    model_input = scaled.reshape(1, 50, 34)
    preds = behavior_model.predict(model_input)
    return int(np.argmax(preds))

In [12]:
def detect_fence_tampering(prev_frame, current_frame, diff_threshold=20,
                           min_contours=5, blur_kernel=(5, 5)):
    """Flag tampering when two consecutive frames differ in many separate regions.

    The absolute difference of the frames is converted to grayscale, blurred,
    thresholded to a binary mask, and the external contours of that mask are counted.

    Args:
        prev_frame: previous BGR frame.
        current_frame: current BGR frame, same size as ``prev_frame``.
        diff_threshold: pixel-difference value above which a pixel counts as changed.
        min_contours: tampering is reported when MORE than this many contours are found.
        blur_kernel: Gaussian blur kernel size applied before thresholding.

    Returns:
        ``True`` when the number of changed regions exceeds ``min_contours``.
    """
    diff = cv2.absdiff(prev_frame, current_frame)
    gray = cv2.cvtColor(diff, cv2.COLOR_BGR2GRAY)
    blur = cv2.GaussianBlur(gray, blur_kernel, 0)
    _, thresh = cv2.threshold(blur, diff_threshold, 255, cv2.THRESH_BINARY)
    contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    return len(contours) > min_contours

In [13]:
def run_inference(rgb_path, thermal_path, output_path=None, show=True):
    """Run the surveillance pipeline over a pair of videos read in lock-step.

    For each frame pair this fuses RGB and thermal, detects and tracks objects on
    the fused frame, compares consecutive RGB frames for fence tampering, and
    feeds the first detected person's keypoints to the behavior classifier once
    50 frames of pose history are available. Confirmed tracks are drawn on the
    fused frame.

    Args:
        rgb_path: path of the RGB video.
        thermal_path: path of the thermal video.
        output_path: optional path; when given, annotated fused frames are also
            written there as an XVID video.
        show: when True (default) frames are displayed with ``cv2.imshow`` and
            'q' stops the run. If the GUI call raises ``cv2.error`` the display is
            disabled and processing continues.
            NOTE(review): some headless environments may abort instead of raising;
            pass ``show=False`` there.
    """
    seq_len, n_features = 50, 34  # must agree with the reshape in classify_behavior

    cap_rgb = cv2.VideoCapture(rgb_path)
    cap_th = cv2.VideoCapture(thermal_path)
    writer = None
    prev_rgb = None
    pose_history = []

    try:
        if not (cap_rgb.isOpened() and cap_th.isOpened()):
            print("[ERROR] Could not open one or both input videos.")
            return

        while True:
            ret_rgb, frame_rgb = cap_rgb.read()
            ret_th, frame_th = cap_th.read()
            if not ret_rgb or not ret_th:
                break

            fused = fuse_rgb_thermal(frame_rgb, frame_th)
            boxes, labels, confs = detect_objects(fused)
            tracks = track_objects(fused, boxes)

            # Fence tampering: needs a previous frame to compare against.
            tampering = False
            if prev_rgb is not None:
                tampering = detect_fence_tampering(prev_rgb, frame_rgb)
            prev_rgb = frame_rgb.copy()

            # Behavior detection on the first detected person only.
            keypoints = detect_pose_keypoints(frame_rgb)
            if keypoints is not None and len(keypoints) > 0:
                pose_vector = np.asarray(keypoints.xy[0]).flatten()
                # Skip malformed vectors so the history always stacks to (50, 34).
                if pose_vector.size == n_features:
                    pose_history.append(pose_vector)
                    # Keep only the window the classifier consumes (bounded memory).
                    del pose_history[:-seq_len]
                    if len(pose_history) == seq_len:
                        label = classify_behavior(np.array(pose_history))
                        if label != -1:
                            print(f"Suspicious Behavior: {label}")

            if tampering:
                print("Fence tampering detected!")

            # Draw confirmed tracks on the fused frame.
            for track in tracks:
                if not track.is_confirmed():
                    continue
                track_id = track.track_id
                ltrb = track.to_ltrb()
                cv2.rectangle(fused, (int(ltrb[0]), int(ltrb[1])), (int(ltrb[2]), int(ltrb[3])), (0, 255, 0), 2)
                cv2.putText(fused, f"ID: {track_id}", (int(ltrb[0]), int(ltrb[1]-10)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 2)

            if output_path is not None:
                if writer is None:
                    # Fall back to a nominal rate when the container reports 0 fps.
                    fps = cap_rgb.get(cv2.CAP_PROP_FPS) or 25.0
                    frame_height, frame_width = fused.shape[:2]
                    writer = cv2.VideoWriter(
                        output_path,
                        cv2.VideoWriter_fourcc(*'XVID'),
                        fps,
                        (frame_width, frame_height),
                    )
                writer.write(fused)

            if show:
                try:
                    cv2.imshow("Fused Surveillance", fused)
                    if cv2.waitKey(1) & 0xFF == ord('q'):
                        break
                except cv2.error:
                    print("[WARNING] Display unavailable; continuing without a preview window.")
                    show = False
    finally:
        # Always free the captures/writer, even when a stage above raises.
        cap_rgb.release()
        cap_th.release()
        if writer is not None:
            writer.release()
        try:
            cv2.destroyAllWindows()
        except cv2.error:
            # OpenCV builds without a GUI backend raise here; nothing to clean up.
            pass


In [14]:
pip install ultralytics opencv-python numpy torch torchvision keras joblib deep_sort_realtime


Note: you may need to restart the kernel to use updated packages.


import os
import cv2

dataset_path = "path_to_FLIR_dataset"

# Check some sample images
rgb_sample = os.path.join(dataset_path, "RGB", "sample_rgb.jpg")
thermal_sample = os.path.join(dataset_path, "Thermal", "sample_thermal.jpg")

rgb_image = cv2.imread(rgb_sample)
thermal_image = cv2.imread(thermal_sample)

# cv2.imread returns None instead of raising when a file cannot be read;
# passing None to imshow would fail, so report the problem and skip the preview.
if rgb_image is None or thermal_image is None:
    print("⚠️ Error: Could not read images.")
else:
    cv2.imshow("RGB Image", rgb_image)
    cv2.imshow("Thermal Image", thermal_image)
    cv2.waitKey(0)
    cv2.destroyAllWindows()


import os

# Recursively list every file under the Kaggle working directory, one full path per line.
for root, dirs, files in os.walk("/kaggle/working"):
    for name in files:
        # Join with `root` so the printed path includes the containing directory.
        print(os.path.join(root, name))


def test_single_pair(rgb_image_path, thermal_image_path, output_path=None):
    """Smoke-test the pipeline on one RGB/thermal image pair and visualise it.

    Runs fusion, object detection, tracking and pose detection once, draws the
    results on the fused image, then shows it (and optionally saves it).

    Args:
        rgb_image_path: path of the RGB image.
        thermal_image_path: path of the thermal image.
        output_path: optional path; when given, the annotated image is written
            there with ``cv2.imwrite``.

    Note:
        This calls ``track_objects``, so it also updates the shared tracker state.
    """
    frame_rgb = cv2.imread(rgb_image_path)
    frame_th = cv2.imread(thermal_image_path)

    if frame_rgb is None or frame_th is None:
        print("⚠️ Error: Could not read images.")
        return

    fused = fuse_rgb_thermal(frame_rgb, frame_th)
    boxes, labels, confs = detect_objects(fused)
    tracks = track_objects(fused, boxes)
    keypoints = detect_pose_keypoints(frame_rgb)

    pose_count = 0 if keypoints is None else len(keypoints)
    print(f"Detections: {len(boxes)}, pose instances: {pose_count}")

    confirmed_tracks = [track for track in tracks if track.is_confirmed()]
    if confirmed_tracks:
        for track in confirmed_tracks:
            track_id = track.track_id
            ltrb = track.to_ltrb()
            cv2.rectangle(fused, (int(ltrb[0]), int(ltrb[1])), (int(ltrb[2]), int(ltrb[3])), (0, 255, 0), 2)
            cv2.putText(fused, f"ID: {track_id}", (int(ltrb[0]), int(ltrb[1] - 10)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)
    else:
        # The tracker is built with n_init=3, so a lone image presumably never yields
        # a confirmed track; draw the raw detections so the output is not empty.
        for box, conf in zip(boxes, confs):
            x1, y1, x2, y2 = (int(value) for value in box[:4])
            cv2.rectangle(fused, (x1, y1), (x2, y2), (0, 255, 0), 2)
            cv2.putText(fused, f"{float(conf):.2f}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)

    if output_path is not None:
        cv2.imwrite(output_path, fused)

    try:
        cv2.imshow("Fused Single Image", fused)
        cv2.waitKey(0)
        cv2.destroyAllWindows()
    except cv2.error:
        print("[WARNING] Display unavailable; pass output_path to save the result instead.")


import os
import cv2

dataset_dir = "/kaggle/input/flir-thermal-rgb-dataset/Thermal-RGB-Images-Dataset/valid/images"

# Directory listing, in name order
image_files = sorted(os.listdir(dataset_dir))

# Split the listing by the modality tag embedded in each file name
rgb_images = list(filter(lambda fname: "RGB" in fname, image_files))
thermal_images = list(filter(lambda fname: "Thermal" in fname, image_files))

# Report the counts and the first few names of each modality for a visual pairing check
print(f"Total RGB images: {len(rgb_images)}, Total Thermal images: {len(thermal_images)}")
print(f"Sample RGB: {rgb_images[:5]}")
print(f"Sample Thermal: {thermal_images[:5]}")


In [15]:
import cv2

# Input and output paths
input_path = '/kaggle/input/sherbrooke-video/sherbrooke_video.avi'
output_path = 'sherbrooke_thermal.avi'

# Open the input video
cap = cv2.VideoCapture(input_path)

# Get video properties
fps = cap.get(cv2.CAP_PROP_FPS)
width  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

# Define video writer with same resolution and fps
fourcc = cv2.VideoWriter_fourcc(*'XVID')
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

# Pseudo-thermal conversion: grayscale each frame, then apply the JET colormap.
frames_written = 0
try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Convert to grayscale
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # Apply thermal colormap
        thermal = cv2.applyColorMap(gray, cv2.COLORMAP_JET)

        # Write thermal frame
        out.write(thermal)
        frames_written += 1
finally:
    # Release everything, even if a frame fails to convert
    cap.release()
    out.release()

# Only report success when at least one frame was actually written; an unreadable
# input otherwise falls straight through the loop and leaves no usable video.
if frames_written:
    print("✅ Thermal video generated:", output_path)
else:
    print("⚠️ No frames written; could not read input video:", input_path)


✅ Thermal video generated: sherbrooke_thermal.avi


In [None]:
# Run the pipeline on the Sherbrooke video, using the colormapped copy written by the
# previous cell ('sherbrooke_thermal.avi') as the thermal stream.
run_inference("/kaggle/input/sherbrooke-video/sherbrooke_video.avi", "sherbrooke_thermal.avi")



0: 480x640 4 persons, 7 cars, 2 traffic lights, 2255.1ms
Speed: 3.9ms preprocess, 2255.1ms inference, 2.1ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 (no detections), 2186.9ms
Speed: 3.9ms preprocess, 2186.9ms inference, 0.8ms postprocess per image at shape (1, 3, 480, 640)
