In [3]:
import os
import cv2
import numpy as np
import torch
import yaml
from collections import Counter
from IPython.display import Video
from tensorflow.keras.models import load_model
import tensorflow as tf

output_dir = 'yolov9'

# Paths to your trained custom YOLO model files
yolo_weights = 'yolov9/runs/train/yolov9-e-finetuning/weights/best.pt'  # Update this path if needed
data_yaml = 'yolov9/Construction-Site-Safety-30/data.yaml'  # Update this path if needed

# Load the trained LRCN model
LRCN_model = load_model('best_model.keras')

# Define constants
IMG_SIZE = 224
MAX_SEQ_LENGTH = 20
NUM_FEATURES = 2048
CLASSES_LIST = ['BaseLayer_Preparation', 'Concrete_Placement', 'Mortar_Preparation', 'Plaster','Wall Construction']
OUTPUT_SIZE = (640, 640)  # Higher resolution for output video
SMOOTHING_WINDOW = 5  # Size of the smoothing window

# Check if YOLO files exist
if not os.path.exists(yolo_weights):
    raise FileNotFoundError(f"YOLO weights file not found: {yolo_weights}")
if not os.path.exists(data_yaml):
    raise FileNotFoundError(f"Data config file not found: {data_yaml}")

# Load class names from data.yaml
with open(data_yaml, 'r') as file:
    data_config = yaml.safe_load(file)
classes = data_config['names']

# Load YOLOv9 model using the local repository
model = torch.hub.load(output_dir, 'custom', path=yolo_weights, source='local', force_reload=True)

# Function to crop the center square of the frame
def crop_center_square(frame):
    y, x = frame.shape[0:2]
    min_dim = min(y, x)
    start_x = (x // 2) - (min_dim // 2)
    start_y = (y // 2) - (min_dim // 2)
    return frame[start_y : start_y + min_dim, start_x : start_x + min_dim]

# Function to load video frames
def load_video(path, resize=(IMG_SIZE, IMG_SIZE)):
    if not os.path.exists(path):
        print(f"Video file {path} does not exist.")
        return np.array([])
    
    cap = cv2.VideoCapture(path)
    if not cap.isOpened():
        print(f"Error: Cannot open video file {path}")
        return np.array([])

    frames = []
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame = crop_center_square(frame)
            frame = cv2.resize(frame, resize)
            frame = frame[:, :, [2, 1, 0]]  # Convert BGR to RGB
            frames.append(frame)
    finally:
        cap.release()
    
    if len(frames) == 0:
        print(f"No frames found for video {path}")
    
    return np.array(frames)

# Build feature extractor using InceptionV3
def build_feature_extractor():
    feature_extractor = tf.keras.applications.InceptionV3(
        weights="imagenet",
        include_top=False,
        pooling="avg",
        input_shape=(IMG_SIZE, IMG_SIZE, 3),
    )
    preprocess_input = tf.keras.applications.inception_v3.preprocess_input

    inputs = tf.keras.Input((IMG_SIZE, IMG_SIZE, 3))
    preprocessed = preprocess_input(inputs)
    outputs = feature_extractor(preprocessed)
    return tf.keras.Model(inputs, outputs, name="feature_extractor")

# Load the feature extractor
feature_extractor = build_feature_extractor()

# Function to extract features from frames
def extract_features(frames, feature_extractor):
    num_frames = frames.shape[0]
    temp_frame_features = np.zeros((num_frames, NUM_FEATURES), dtype="float32")
    for i in range(num_frames):
        frame = frames[i]
        features = feature_extractor.predict(frame[None, ...])
        temp_frame_features[i] = features
    return temp_frame_features

# Function to predict activities for video segments
def predict_activities(frames, feature_extractor, model, segment_length=MAX_SEQ_LENGTH):
    num_frames = len(frames)
    predictions = []
    confidences = []
    
    for start in range(0, num_frames, segment_length):
        end = start + segment_length
        segment_frames = frames[start:end]
        if len(segment_frames) < segment_length:
            segment_frames = np.pad(segment_frames, ((0, segment_length - len(segment_frames)), (0, 0), (0, 0), (0, 0)), mode='constant')
        
        features = extract_features(segment_frames, feature_extractor)
        features = np.expand_dims(features, axis=0)  # Add batch dimension
        prediction = model.predict(features)
        predicted_class = CLASSES_LIST[np.argmax(prediction)]
        confidence = np.max(prediction)
        predictions.extend([predicted_class] * len(segment_frames[:end - start]))
        confidences.extend([confidence] * len(segment_frames[:end - start]))
        print(f"Segment {start}-{end} predicted as {predicted_class} with confidence {confidence:.2f}")

    return predictions, confidences

# Function to apply smoothing to predictions
def smooth_predictions(predictions, window_size=SMOOTHING_WINDOW):
    smoothed_predictions = []
    for i in range(len(predictions)):
        start = max(0, i - window_size // 2)
        end = min(len(predictions), i + window_size // 2 + 1)
        window_predictions = predictions[start:end]
        most_common_prediction = Counter(window_predictions).most_common(1)[0][0]
        smoothed_predictions.append(most_common_prediction)
    return smoothed_predictions

# Load the new video and preprocess it

video_title = 't1'  # Replace with your actual video title
test_videos = 'test_videos'

# Get the input video file path
new_video_path = f'{test_videos}/{video_title}.mp4'

frames = load_video(new_video_path)

if frames.shape[0] == 0:
    print("No frames to process. Exiting.")
else:
    # Predict activities for video segments
    predictions, confidences = predict_activities(frames, feature_extractor, LRCN_model)
    
    # Apply smoothing to the predictions
    smoothed_predictions = smooth_predictions(predictions)
    
    # Annotate and save the video
    cap = cv2.VideoCapture(new_video_path)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    print("VideoWriter settings:", width, height, fps)
    video_writer = cv2.VideoWriter('annotated_new_video.mp4', fourcc, fps, (width, height))

    if not video_writer.isOpened():
        print("Error: VideoWriter not opened.")
    else:
        print("VideoWriter opened successfully.")

    frame_index = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        original_frame = frame.copy()  # Save the original frame for YOLO
        
        # YOLOv9 object detection
        results = model(original_frame)
        
        # Iterate through detection results
        for result in results.xyxy[0]:  # results.xyxy[0] gives the detected boxes
            # Convert map to list and handle tensors correctly
            x1, y1, x2, y2 = map(int, result[:4])
            conf, cls = result[4:6]
            confidence = conf.item()  # Convert tensor to scalar
            
            label = classes[int(cls)]
            color = (0, 255, 0)
            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
            cv2.putText(frame, f"{label} {confidence:.2f}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
        
        # Annotate the frame with the predicted class and confidence
        if frame_index < len(smoothed_predictions):
            predicted_class = smoothed_predictions[frame_index]
            confidence = confidences[frame_index] * 100  # Convert to percentage
            text = f"{predicted_class} ({confidence:.2f}%)"
        else:
            text = "Unknown"

        font_scale = 1
        font = cv2.FONT_HERSHEY_SIMPLEX
        thickness = 2
        text_size, _ = cv2.getTextSize(text, font, font_scale, thickness)
        text_x = 10
        text_y = 30
        box_coords = ((text_x - 5, text_y + 5), (text_x + text_size[0] + 5, text_y - text_size[1] - 5))
        
        # Add bounding box
        cv2.rectangle(frame, box_coords[0], box_coords[1], (0, 0, 0), cv2.FILLED)
        cv2.putText(frame, text, (text_x, text_y), font, font_scale, (0, 255, 0), thickness, cv2.LINE_AA)
        
        # Debugging: Print frame type and dimensions before writing
        print(f"Writing frame {frame_index} of type {type(frame)} with shape {frame.shape}")

        # Write the frame to the output video
        video_writer.write(frame)
        frame_index += 1

    cap.release()
    video_writer.release()
    print("Annotated video saved as 'annotated_new_video.mp4'")

# Ensure the video is written before trying to read it back
import time
time.sleep(5)

# Display the video in Jupyter notebook
Video("annotated_new_video.mp4")


YOLO 🚀 v0.1-104-g5b1ea9a Python-3.10.10 torch-2.2.1+cu121 CUDA:0 (NVIDIA L4, 22700MiB)

Fusing layers... 
yolov9-e summary: 839 layers, 68584822 parameters, 0 gradients, 240.9 GFLOPs
Adding AutoShape... 


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 16ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 15ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 16ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 15ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 15ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 14ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 14ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 14ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 14ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 14ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 14ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 15ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 15ms