## 1. Vehicle Labelling and Segmentation
This method uses SSD for vehicle detection and U-Net for segmentation. To assign each vehicle an ID, we use StrongSORT, which tracks objects based on their appearance.

In [6]:
# Import the required libraries
import glob
import os
import time
import traceback

import cv2
import matplotlib.pyplot as plt
import numpy as np
import torch
from PIL import Image

In [7]:
# Report the installed PyTorch build and whether CUDA is usable; the SSD
# loading cell uses the same CUDA check to choose between GPU and CPU.
print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")

PyTorch version: 2.7.0+cpu
CUDA available: False


## 2. Object Detection Using SSD

In [None]:
# Load the SSD model from NVIDIA's PyTorch Hub.
# NOTE(review): within the visible part of this notebook the video-processing
# cell runs the OpenCV Caffe network (`net`), not `ssd_model` -- confirm that
# this hub model is still needed.

# Choose the device up front so `device` is defined even when the download
# fails; later cells can then reference it without a NameError.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

ssd_model = None
utils = None
try:
    # Make sure you are connected to the internet for the first download
    ssd_model = torch.hub.load('NVIDIA/DeepLearningExamples:torchhub', 'nvidia_ssd')
    utils = torch.hub.load('NVIDIA/DeepLearningExamples:torchhub', 'nvidia_ssd_processing_utils')
    print("NVIDIA SSD model and processing utilities loaded successfully.")
except Exception as e:
    print(f"Error loading model from PyTorch Hub: {e}")
    print("Please ensure you have an internet connection and the repository/model is accessible.")
    # Reset both names: the model may have loaded before the utilities failed,
    # and a half-initialised pair must not be used downstream.
    ssd_model = None
    utils = None

# Explicit None check: the truth value of an arbitrary model object is not a
# reliable "was it loaded" signal.
if ssd_model is not None:
    ssd_model.to(device)
    ssd_model.eval()  # Set the model to evaluation mode
    print(f"Model moved to: {device}")



Downloading: "https://github.com/NVIDIA/DeepLearningExamples/zipball/torchhub" to C:\Users\ACER/.cache\torch\hub\torchhub.zip




Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to C:\Users\ACER/.cache\torch\hub\checkpoints\resnet50-0676ba61.pth


100.0%
Downloading checkpoint from https://api.ngc.nvidia.com/v2/models/nvidia/ssd_pyt_ckpt_amp/versions/20.06.0/files/nvidia_ssdpyt_amp_200703.pt


In [2]:
# --- Configuration ---
# Caffe network definition and weights read by cv2.dnn.readNetFromCaffe.
PROTOTXT_PATH = "models/mobilenet_ssd/deploy.prototxt"
MODEL_PATH = "models/mobilenet_ssd/mobilenet_iter_73000.caffemodel"
# Ensure this VIDEO_DIR is correct relative to where your .ipynb file is saved.
# If your .ipynb file is in "FINAL PROJECT CV", then "Dataset" should be a subdirectory.
VIDEO_DIR = "Dataset"
CONFIDENCE_THRESHOLD = 0.5  # Minimum probability to filter weak detections

# Class labels: "background" plus the 20 PASCAL VOC categories (this is not the
# 80-class COCO label set). The list is indexed with the class id reported by
# the detector, so its order must not be changed.
CLASSES = ["background", "aeroplane", "bicycle", "bird", "boat",
        "bottle", "bus", "car", "cat", "chair", "cow", "diningtable",
        "dog", "horse", "motorbike", "person", "pottedplant", "sheep",
        "sofa", "train", "tvmonitor"]

# Colors for bounding boxes, given in OpenCV's BGR channel order
CLASS_COLORS = {
    "car": (0, 255, 0),  # Green
    "bus": (255, 0, 0)   # Blue
    # Add other class colors if needed
}

# Target classes for detection
TARGET_CLASSES = ["car", "bus"]

# Fail fast on a typo: a target label that is missing from CLASSES would
# otherwise silently never match any detection.
assert set(TARGET_CLASSES) <= set(CLASSES), "TARGET_CLASSES contains a label missing from CLASSES"

In [14]:
# --- Load SSD Model ---
print("[INFO] Loading SSD model...")
net = None  # stays None unless the Caffe network is read successfully below
try:
    # Both files are looked up relative to the notebook's working directory.
    if os.path.exists(PROTOTXT_PATH) and os.path.exists(MODEL_PATH):
        net = cv2.dnn.readNetFromCaffe(PROTOTXT_PATH, MODEL_PATH)
        print("[INFO] SSD model loaded successfully.")
    elif not os.path.exists(PROTOTXT_PATH):
        # The prototxt is reported first, even when both files are missing.
        print(f"[ERROR] Prototxt file not found at: {os.path.abspath(PROTOTXT_PATH)}")
    else:
        print(f"[ERROR] Model file not found at: {os.path.abspath(MODEL_PATH)}")
except cv2.error as load_error:
    # Files exist but OpenCV could not parse them; show what was attempted.
    print(f"[ERROR] Could not load SSD model.")
    print(f"Attempted Prototxt: {os.path.abspath(PROTOTXT_PATH)}")
    print(f"Attempted Caffemodel: {os.path.abspath(MODEL_PATH)}")
    print(f"OpenCV error: {load_error}")

if net is None:
    print("[WARNING] SSD Model could not be loaded. Video processing will not occur.")

[INFO] Loading SSD model...
[INFO] SSD model loaded successfully.


In [15]:
# --- Placeholder for U-Net Model Loading and Processing ---
# To integrate U-Net, you would typically:
# 1. Load your pre-trained U-Net model here (e.g., TensorFlow, Keras, PyTorch, or OpenCV dnn)
#    unet_model = cv2.dnn.readNetFromTensorflow('path/to/your/unet.pb') # Example
# 2. Define a function to process a frame with U-Net.

def apply_unet_processing(frame_input):
    """
    Stand-in for the U-Net stage of the pipeline; currently a no-op.

    No U-Net model is loaded or run here: the object passed in is handed
    back unchanged (the very same object, not a copy).

    A real implementation is expected to replace this body with roughly:
      1. build the network input from ``frame_input``,
      2. run the U-Net forward pass,
      3. post-process the raw output into a segmentation mask,
      4. overlay that mask on the frame and return the result.

    Args:
        frame_input: the frame to process.

    Returns:
        ``frame_input`` itself.
    """
    unchanged_frame = frame_input
    return unchanged_frame

In [None]:
# --- Process Videos ---
# Runs the Caffe SSD detector on every video in VIDEO_DIR and shows the
# annotated frames in an OpenCV window.  While a window has focus:
#   'q' stops all processing, 'n' skips to the next video.

VIDEO_EXTENSIONS = ('.mp4', '.avi', '.mov', '.mkv')

# Network input preprocessing.  blobFromImage subtracts the mean and then
# multiplies by the scale factor, so these values map 8-bit pixel values to
# roughly [-1, 1].
SSD_INPUT_SIZE = (300, 300)
SSD_SCALE_FACTOR = 0.007843
SSD_MEAN = 127.5


def _find_video_files(video_dir):
    """Return the sorted paths of the video files located directly in video_dir."""
    return sorted(
        os.path.join(video_dir, file_name)
        for file_name in os.listdir(video_dir)
        if file_name.lower().endswith(VIDEO_EXTENSIONS)  # case-insensitive match
    )


def _draw_target_detections(frame, detections):
    """Return a copy of frame with a box and label for each confident TARGET_CLASSES detection."""
    (h, w) = frame.shape[:2]
    annotated = frame.copy()  # never draw on the caller's frame

    for i in range(detections.shape[2]):
        confidence = detections[0, 0, i, 2]
        if confidence <= CONFIDENCE_THRESHOLD:
            continue

        idx = int(detections[0, 0, i, 1])
        if not 0 <= idx < len(CLASSES):
            print(f"[WARNING] Detected class index {idx} out of bounds for CLASSES list.")
            continue

        detected_class_name = CLASSES[idx]
        if detected_class_name not in TARGET_CLASSES:
            continue

        # The detector reports box corners as fractions of the frame size.
        box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
        # Plain Python ints: some OpenCV builds reject numpy scalars as point coordinates.
        startX, startY, endX, endY = (int(v) for v in box.astype("int"))

        color = CLASS_COLORS.get(detected_class_name, (0, 0, 255))  # Default to red (BGR)
        label = f"{detected_class_name}: {confidence:.2f}"

        cv2.rectangle(annotated, (startX, startY), (endX, endY), color, 2)
        # Put the label above the box, or inside it when the box touches the top edge.
        y_label_pos = startY - 15 if startY - 15 > 15 else startY + 20
        cv2.putText(annotated, label, (startX, y_label_pos),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

    return annotated


def _close_window(window_name):
    """Destroy the named OpenCV window if it is still visible."""
    try:
        if cv2.getWindowProperty(window_name, cv2.WND_PROP_VISIBLE) >= 1:
            cv2.destroyWindow(window_name)
    except cv2.error:
        # Some GUI backends raise instead of returning -1 when the window is
        # already gone.  There is nothing left to close, and this must not
        # abort the remaining videos.
        pass


def _process_video(video_path):
    """Detect and display one video.

    Returns:
        True when the user pressed 'q' (stop all processing), False when the
        video ended, was skipped with 'n', or could not be opened.
    """
    print(f"\n[INFO] Processing video: {video_path}")
    base_video_name = os.path.basename(video_path)
    window_name = f"Output - {base_video_name}"
    window_shown = False

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            print(f"[ERROR] Could not open video file: {video_path}")
            return False

        frame_count = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                print(f"[INFO] End of video: {base_video_name}")
                return False

            frame_count += 1
            if frame is None:  # Should be caught by 'not ret' but as a safeguard
                print(f"[WARNING] Read an empty frame from {base_video_name} (frame {frame_count})")
                continue

            # --- SSD Detection ---
            blob = cv2.dnn.blobFromImage(cv2.resize(frame, SSD_INPUT_SIZE),
                                         SSD_SCALE_FACTOR, SSD_INPUT_SIZE, SSD_MEAN)
            net.setInput(blob)
            detections = net.forward()
            detected_objects_frame = _draw_target_detections(frame, detections)

            # --- U-Net Processing (Placeholder) ---
            final_processed_frame = apply_unet_processing(detected_objects_frame)

            # --- Display the output frame ---
            cv2.imshow(window_name, final_processed_frame)
            window_shown = True
            # waitKey(1) is required for the window to repaint and receive key events.
            key = cv2.waitKey(1) & 0xFF

            if key == ord("q"):
                print("[INFO] 'q' pressed, quitting all processing...")
                return True
            if key == ord("n"):
                print(f"[INFO] 'n' pressed, skipping to next video (from {base_video_name})...")
                return False
    finally:
        # Always release the capture, even when a frame raised an exception.
        cap.release()
        # Only touch the window if it was actually created for this video.
        if window_shown:
            _close_window(window_name)


# Defined on every path so later cells can rely on the name after a fresh kernel run.
video_files = []

if net is None:
    print("[ERROR] SSD Model not loaded. Cannot proceed with video processing.")
    print("[INFO] Model was not loaded, so no processing was attempted.")
elif not os.path.isdir(VIDEO_DIR):
    print(f"[ERROR] Video directory not found: {os.path.abspath(VIDEO_DIR)}")
else:
    video_files = _find_video_files(VIDEO_DIR)
    if not video_files:
        print(f"[INFO] No video files found in {os.path.abspath(VIDEO_DIR)}")
        print("[INFO] Video processing loop finished or no videos were available to process.")
        cv2.destroyAllWindows()  # Ensure cleanup if no videos run but model was loaded
    else:
        print(f"[INFO] Found video files: {video_files}")
        try:
            for video_path in video_files:
                if _process_video(video_path):
                    break  # user pressed 'q'
        except Exception as e:
            print(f"[ERROR] An unexpected error occurred during video processing: {e}")
            traceback.print_exc()  # Print full traceback for debugging
        finally:
            # Runs whether the loop completed normally or an exception occurred.
            print("[INFO] Cleaning up any remaining OpenCV windows...")
            cv2.destroyAllWindows()
            # In Jupyter the GUI event loop needs a few more ticks to actually
            # process the window-close events.
            for _ in range(4):
                cv2.waitKey(1)

print("--- End of Notebook Cell Execution ---")

[INFO] Found video files: ['Dataset\\vid1.mp4', 'Dataset\\vid2.mp4', 'Dataset\\vid3.mp4']

[INFO] Processing video: Dataset\vid1.mp4
[INFO] End of video: vid1.mp4

[INFO] Processing video: Dataset\vid2.mp4
[INFO] End of video: vid2.mp4

[INFO] Processing video: Dataset\vid3.mp4
[INFO] End of video: vid3.mp4
[INFO] Cleaning up any remaining OpenCV windows...
--- End of Notebook Cell Execution ---
