Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# **T06 - AUTOMATIC LABELLING OF TRAFFIC LIGHT DETECTION**

### **TEAM MEMBERS**

1. SHIVAM MAHESHWARI
2. DARREN GALLOIS
3. DORIAN LAMOUILLE
4. VENKATA NARAYANA BOMMANABOINA


### PROJECT OWNER - TAMBET MATIISEN

## 1. Connecting to Google Drive and Importing Packages

In [9]:
# If using Google Colab, mount Drive:
# (the project paths configured later in this notebook live under /content/drive)
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [10]:

# Install YOLOv5 and requirements
!git clone https://github.com/ultralytics/yolov5.git
%cd yolov5
!pip install -r requirements.txt
%cd ..

# Install other dependencies
!pip install opencv-python
!pip install torch torchvision torchaudio --extra-index-url https://download.pytorch.org/whl/cu118

!pip install filterpy

# Install required libraries
!pip install opencv-python torch torchvision
!pip install git+https://github.com/facebookresearch/segment-anything.git

# Download SORT tracker
!wget https://raw.githubusercontent.com/abewley/sort/master/sort.py



fatal: destination path 'yolov5' already exists and is not an empty directory.
/content/yolov5
/content
Looking in indexes: https://pypi.org/simple, https://download.pytorch.org/whl/cu118
Collecting git+https://github.com/facebookresearch/segment-anything.git
  Cloning https://github.com/facebookresearch/segment-anything.git to /tmp/pip-req-build-pu92z298
  Running command git clone --filter=blob:none --quiet https://github.com/facebookresearch/segment-anything.git /tmp/pip-req-build-pu92z298
  Resolved https://github.com/facebookresearch/segment-anything.git to commit dca509fe793f601edb92606367a655c15ac00fdf
  Preparing metadata (setup.py) ... [?25l[?25hdone
--2024-12-15 13:19:06--  https://raw.githubusercontent.com/abewley/sort/master/sort.py
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.109.133, 185.199.111.133, 185.199.108.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.109.133|:443... connected.
HTTP request sent,

In [11]:
import os
import random
import shutil
import xml.etree.ElementTree as ET
import cv2
import numpy as np
import torch
from google.colab.patches import cv2_imshow
import cv2

In [12]:
# Patch the downloaded sort.py so it requests matplotlib's 'Agg' backend instead
# of 'TkAgg' (presumably because TkAgg needs a display that Colab lacks — TODO
# confirm), then import the tracker class from the patched module.
sort_file_path = "sort.py"

# Load the current source of the tracker module
with open(sort_file_path, "r") as source_file:
    original_source = source_file.read()

# Swap the backend selection; a file without the 'TkAgg' call is left unchanged
patched_source = original_source.replace(
    "matplotlib.use('TkAgg')",
    "matplotlib.use('Agg')",
)

# Persist the patched source in place
with open(sort_file_path, "w") as target_file:
    target_file.write(patched_source)

from sort import Sort


## 2. Define Classes and Paths


In [13]:

# Class names; the list index of each name is used as its YOLO class id.
classes = ["red", "yellow", "green", "off"]

# Project root on the mounted Drive. Every other path is derived from it, so
# relocating the project only requires changing this one line.
base_dir = "/content/drive/My Drive/traffic_light_detection"
dataset_dir = os.path.join(base_dir, "traffic_light_dataset/traffic_light_dataset")
images_dir = os.path.join(dataset_dir, "JPEGImages")
annotations_dir = os.path.join(dataset_dir, "Annotations")

yolo_dataset_dir = os.path.join(base_dir, "yolo_dataset")

# SAM checkpoint path
sam_checkpoint = os.path.join(base_dir, "sam_vit_h.pth")

# Input video and output video for inference
input_video_path = os.path.join(base_dir, "video-short.mp4")
output_video_path = os.path.join(base_dir, "output_with_sam_blinking2.mp4")

In [14]:
# Sanity check: list the project folder to confirm that base_dir is reachable.
print(os.listdir(base_dir))

['video-short.mp4', 'video.mp4', 'output_traffic_lights.mp4', 'traffic_light.ipynb', 'traffic_light_model.onnx', 'sam_vit_l_0b3195.pth', 'traffic_light_dataset', 'traffic_light_new.ipynb', 'converted_video.mp4', 'yolo_dataset', 'trafficlight_detectionostates.ipynb', 'data.yaml', 'yolo_train2', '.ipynb_checkpoints', 'sam_vit_h.pth', 'ML-Project-FinalPPT.gslides', 'MLPro.ipynb', 'output_with_blinking.mp4', 'blinking_video.mp4']


*************************************************
# **Steps 3 to 5 are for model training only — be cautious when running them!**
*************************************************

## 3. Convert Pascal VOC to YOLO Format

In [None]:
# Create YOLO folder structure: images/{train,val} and labels/{train,val}
for kind in ("images", "labels"):
    for subset in ("train", "val"):
        os.makedirs(os.path.join(yolo_dataset_dir, kind, subset), exist_ok=True)

# os.listdir returns entries in arbitrary order, so sort first and shuffle with a
# fixed seed. A split that changes between runs would copy some images into the
# other subset while the earlier copies stay in place, putting the same image in
# both train and val.
annotation_files = sorted(f for f in os.listdir(annotations_dir) if f.endswith(".xml"))
# A private Random instance keeps the seed from affecting other users of `random`.
random.Random(42).shuffle(annotation_files)


In [None]:
# Split the already-shuffled annotation list: the first 80% go to training,
# the remainder to validation.
train_ratio = 0.8
train_count = int(len(annotation_files) * train_ratio)
train_files, val_files = annotation_files[:train_count], annotation_files[train_count:]

def convert_bbox(size, box):
    """Convert an absolute corner box into a normalised centre/size box.

    Args:
        size: (image_width, image_height).
        box: (xmin, ymin, xmax, ymax) in the same units as ``size``.

    Returns:
        Tuple (x_center, y_center, width, height), each divided by the
        matching image dimension.
    """
    img_w, img_h = size
    left, top, right, bottom = box
    center_x = (left + right) / 2.0
    center_y = (top + bottom) / 2.0
    return center_x / img_w, center_y / img_h, (right - left) / img_w, (bottom - top) / img_h

# Convert every Pascal VOC annotation into a YOLO label file and copy its image
# into the matching train/val folder of yolo_dataset_dir.
train_file_set = set(train_files)  # O(1) membership instead of scanning the list for every file

for xml_file in annotation_files:
    xml_path = os.path.join(annotations_dir, xml_file)
    tree = ET.parse(xml_path)
    root = tree.getroot()

    # Skip annotations that do not name an existing image.
    filename_node = root.find("filename")
    if filename_node is None or not filename_node.text:
        continue
    filename = filename_node.text
    image_path = os.path.join(images_dir, filename)
    if not os.path.isfile(image_path):
        continue

    # The image size is needed to normalise the boxes. Skip files where it is
    # missing or zero instead of crashing halfway through the conversion.
    width_text = root.findtext("size/width")
    height_text = root.findtext("size/height")
    if not width_text or not height_text:
        continue
    width = int(width_text)
    height = int(height_text)
    if width <= 0 or height <= 0:
        continue

    yolo_lines = []
    for obj in root.findall("object"):
        # Objects whose class is not in `classes` (or has no name) are ignored.
        cls_name = (obj.findtext("name") or "").strip().lower()
        if cls_name not in classes:
            continue
        cls_id = classes.index(cls_name)
        bndbox = obj.find("bndbox")
        if bndbox is None:
            continue
        xmin = float(bndbox.find("xmin").text)
        ymin = float(bndbox.find("ymin").text)
        xmax = float(bndbox.find("xmax").text)
        ymax = float(bndbox.find("ymax").text)
        x, y, w, h = convert_bbox((width, height), (xmin, ymin, xmax, ymax))
        yolo_lines.append(f"{cls_id} {x} {y} {w} {h}\n")

    subset = "train" if xml_file in train_file_set else "val"

    shutil.copy(image_path, os.path.join(yolo_dataset_dir, "images", subset, filename))

    # An image without any kept object still gets an (empty) label file.
    label_filename = os.path.splitext(filename)[0] + ".txt"
    with open(os.path.join(yolo_dataset_dir, "labels", subset, label_filename), 'w') as f:
        f.writelines(yolo_lines)

print("VOC to YOLO conversion completed.")

## 4. Create data.yaml for YOLO

In [None]:
# Assemble the YOLOv5 dataset description line by line; the empty entries
# produce the blank lines (including the leading and trailing newline).
data_yaml_content = "\n".join([
    "",
    f"train: {yolo_dataset_dir}/images/train",
    f"val: {yolo_dataset_dir}/images/val",
    "",
    f"nc: {len(classes)}",
    f"names: {classes}",
    "",
])

# Store the file next to the dataset, directly under base_dir
data_yaml_path = os.path.join(base_dir, "data.yaml")
with open(data_yaml_path, 'w') as f:
    f.write(data_yaml_content)

print("data.yaml created.")

data.yaml created.


## 5. Train YOLOv5

In [None]:
# We'll use yolov5s.pt as a starting point.
%cd yolov5
!python train.py --img 640 --batch 16 --epochs 50 --data "{base_dir}/data.yaml" --weights yolov5s.pt --project "{base_dir}" --name "yolo_train"
%cd ..

print("YOLO training completed. Check the runs folder for best.pt weights.")

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
  with torch.cuda.amp.autocast(amp):
  with torch.cuda.amp.autocast(amp):
  with torch.cuda.amp.autocast(amp):
  with torch.cuda.amp.autocast(amp):
  with torch.cuda.amp.autocast(amp):
  with torch.cuda.amp.autocast(amp):
  with torch.cuda.amp.autocast(amp):
  with torch.cuda.amp.autocast(amp):
  with torch.cuda.amp.autocast(amp):
  with torch.cuda.amp.autocast(amp):
  with torch.cuda.amp.autocast(amp):
  with torch.cuda.amp.autocast(amp):
  with torch.cuda.amp.autocast(amp):
  with torch.cuda.amp.autocast(amp):
  with torch.cuda.amp.autocast(amp):
  with torch.cuda.amp.autocast(amp):
  with torch.cuda.amp.autocast(amp):
  with torch.cuda.amp.autocast(amp):
  with torch.cuda.amp.autocast(amp):
  with torch.cuda.amp.autocast(amp):
  with torch.cuda.amp.autocast(amp):
  with torch.cuda.amp.autocast(amp):
  with torch.cuda.amp.autocast(amp):
  with torch.cuda.amp.autocast(amp):
  with torch.cuda.amp.autocast(amp):
  with tor

## 6. Inference with YOLO, SORT and Blinking Detection

In [19]:
# Load the custom-trained YOLOv5 weights through torch.hub
best_weights_path = os.path.join(base_dir, "yolo_train2/weights/best.pt")
model = torch.hub.load(
    'ultralytics/yolov5',
    'custom',
    path=best_weights_path,
    force_reload=True,
)
# Confidence setting applied by the model to its detections
model.conf = 0.25

Downloading: "https://github.com/ultralytics/yolov5/zipball/master" to /root/.cache/torch/hub/master.zip
YOLOv5 🚀 2024-12-15 Python-3.10.12 torch-2.5.1+cpu CPU

Fusing layers... 
Model summary: 157 layers, 7020913 parameters, 0 gradients, 15.8 GFLOPs
Adding AutoShape... 


In [20]:
# Initialize SORT tracker
# NOTE(review): parameter meanings below follow the SORT reference implementation —
# confirm against the downloaded sort.py:
#   max_age       frames a track may stay unmatched before it is dropped
#   min_hits      matches required before a track is reported
#   iou_threshold minimum IoU to associate a detection with an existing track
tracker = Sort(max_age=10, min_hits=3, iou_threshold=0.3)


In [21]:
# Blink detection logic
def is_blinking(state_history, threshold=2):
    """
    Decide whether a traffic light is blinking from its recent states.

    Counts the adjacent pairs in ``state_history`` whose entries differ and
    returns True when that count is at least ``threshold``.
    """
    change_count = sum(
        1
        for idx in range(1, len(state_history))
        if state_history[idx] != state_history[idx - 1]
    )
    return change_count >= threshold

# Recent state labels per track id; filled in by the inference loop
states_history = {}

In [22]:
def compute_iou(boxA, boxB):
    """Return the intersection-over-union of two (x1, y1, x2, y2) boxes.

    Widths and heights are measured inclusively (max - min + 1), both for the
    overlap region and for each box.
    """
    def _inclusive_area(b):
        return (b[2]-b[0]+1)*(b[3]-b[1]+1)

    # Corners of the overlap rectangle
    left = max(boxA[0], boxB[0])
    top = max(boxA[1], boxB[1])
    right = min(boxA[2], boxB[2])
    bottom = min(boxA[3], boxB[3])

    # Clamp each side at zero so disjoint boxes give an empty intersection
    overlap_w = max(0, right - left + 1)
    overlap_h = max(0, bottom - top + 1)
    intersection = overlap_w * overlap_h

    union = _inclusive_area(boxA) + _inclusive_area(boxB) - intersection
    return intersection / float(union)

In [23]:
import torch
from PIL import Image
import cv2
import numpy as np
from sort import Sort
import os

# Run YOLO detection + SORT tracking over a video, label tracks whose detected
# state keeps changing as "BLINKING", and write the annotated video.
#
# NOTE: `tracker` and `states_history` are created in earlier cells and are
# mutated here; re-create them before re-running this cell, otherwise tracks and
# histories from the previous run carry over.

# Video input and output paths
input_path = os.path.join(base_dir, "blinking_video.mp4")
output_path = os.path.join(base_dir, "output_with_blinking.mp4")

# Parameters
history_length = 10  # Number of frames to keep in state history
iou_threshold = 0.3  # Minimum IoU for a YOLO detection to supply a track's class

cap = cv2.VideoCapture(input_path)
if not cap.isOpened():
    cap.release()
    # Raise instead of calling exit(): exit() takes down the notebook kernel,
    # whereas an exception stops only this cell and reports why.
    raise RuntimeError(f"Error opening video file: {input_path}")

# Get video properties
# Keep the frame rate as a float: truncating e.g. 29.97 to 29 makes the output
# drift from the source. Fall back to 30 when the container reports 0.
fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

frame_count = 0
print("Processing video...")

try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        frame_count += 1

        # Convert the frame to a PIL image for YOLOv5
        img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        results = model(img)  # Run YOLO detection

        # Extract detections: one [x_min, y_min, x_max, y_max, conf, cls] row each
        detections = results.xyxy[0].cpu().numpy()

        # Update SORT tracker with [x_min, y_min, x_max, y_max, conf] rows;
        # an empty (0, 5) array is passed when nothing was detected.
        if len(detections):
            tracker_input = detections[:, :5]
        else:
            tracker_input = np.empty((0, 5))
        tracked_objects = tracker.update(tracker_input)

        # Annotate the frame
        annotated_frame = np.array(results.render()[0])
        annotated_frame = cv2.cvtColor(annotated_frame, cv2.COLOR_RGB2BGR)  # Convert back to BGR

        # Process tracked objects
        for trk in tracked_objects:
            x_min, y_min, x_max, y_max, track_id = map(int, trk)
            track_box = (x_min, y_min, x_max, y_max)

            # Match the track to the YOLO detection with the highest IoU above
            # the threshold. The previous inline formula divided the overlap by
            # the track area only and stopped at the first hit, so a large
            # neighbouring detection could donate its class.
            best_class = None
            best_iou = iou_threshold
            for det in detections:
                overlap = compute_iou(track_box, det[:4])
                if overlap > best_iou:
                    best_iou = overlap
                    best_class = classes[int(det[5])]

            # Update state history and check for blinking
            if best_class:
                history = states_history.setdefault(track_id, [])
                history.append(best_class)
                if len(history) > history_length:
                    history.pop(0)  # keep only the most recent history_length states

                # Add "BLINKING" label if detected
                if is_blinking(history):
                    cv2.putText(annotated_frame, "BLINKING", (x_min, y_min - 30), cv2.FONT_HERSHEY_SIMPLEX,
                                1, (0, 0, 255), 2)

        # Write the annotated frame to the output video
        out.write(annotated_frame)
finally:
    # Always release both handles so an error mid-video does not leave them open.
    cap.release()
    out.release()

print(f"Processing complete. Video with blinking detection saved to: {output_path}")


Processing video...


  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with a

Processing complete. Video with blinking detection saved to: /content/drive/My Drive/traffic_light_detection/output_with_blinking.mp4


  with amp.autocast(autocast):
