In [None]:
# !pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121
# !pip install transformers datasets ultralytics

In [None]:
from datasets import load_dataset
import os
import torch
import cv2
import time

# ETL

Convert the `keremberke/hard-hat-detection` dataset into the YOLO layout that YOLOv5 can train on: one image file plus one text label file per example, grouped by split.

In [None]:
# Load the "full" configuration of the hard-hat detection dataset; the result is
# indexed by split name (e.g. dataset['train']) in the cells below.
dataset = load_dataset("keremberke/hard-hat-detection", name="full")

In [None]:
# Peek at the first training example to see the fields the export step relies on
# ('image', 'image_id', 'width', 'height', 'objects').
dataset['train'][0]

In [None]:
# Output layout on disk: dataset/{images,labels}/{train,validation,test}
output_dir = 'dataset'
image_dir, label_dir = (os.path.join(output_dir, kind) for kind in ('images', 'labels'))
splits = ['train', 'validation', 'test']

# Make sure every split has both an image folder and a label folder
# (exist_ok keeps the cell re-runnable).
for split in splits:
    for parent_dir in (image_dir, label_dir):
        os.makedirs(os.path.join(parent_dir, split), exist_ok=True)

def convert_to_yolo_format(bbox, img_width, img_height):
    """Convert a ``[x_min, y_min, width, height]`` pixel box to normalised YOLO form.

    The box is clipped to the image rectangle before normalisation, so every
    returned value lies in ``[0, 1]`` even when the annotation sticks out past
    the image border (YOLO label validation rejects out-of-range coordinates).

    Args:
        bbox: Sequence whose first four items are x_min, y_min, box width and
            box height, in pixels.
        img_width: Image width in pixels; must be positive.
        img_height: Image height in pixels; must be positive.

    Returns:
        Tuple ``(x_center, y_center, width, height)``, each divided by the
        corresponding image dimension.

    Raises:
        ValueError: If ``img_width`` or ``img_height`` is not positive.
    """
    if img_width <= 0 or img_height <= 0:
        raise ValueError(
            f"Image dimensions must be positive, got {img_width}x{img_height}"
        )

    x_min, y_min, box_width, box_height = bbox[0], bbox[1], bbox[2], bbox[3]

    # Clamp both corners into the image so the normalised box stays in [0, 1].
    left = min(max(x_min, 0), img_width)
    top = min(max(y_min, 0), img_height)
    right = min(max(x_min + box_width, 0), img_width)
    bottom = min(max(y_min + box_height, 0), img_height)

    x_center = (left + right) / 2 / img_width
    y_center = (top + bottom) / 2 / img_height
    width = (right - left) / img_width
    height = (bottom - top) / img_height
    return x_center, y_center, width, height

def save_yolo_format(dataset, split):
    """Write one split of ``dataset`` to disk as JPEG images plus YOLO label files.

    For every example in ``dataset[split]`` this saves
    ``<image_dir>/<split>/<image_id>.jpg`` and
    ``<label_dir>/<split>/<image_id>.txt``. The label file holds one
    ``"<category> <x_center> <y_center> <width> <height>"`` line per annotated
    object and is empty when the example has no objects.

    Args:
        dataset: Mapping from split name to an iterable of examples. Each
            example provides 'image' (an object with ``mode``, ``convert`` and
            ``save``), 'image_id', 'width', 'height' and
            'objects' -> {'bbox', 'category'}.
        split: Name of the split to export. The matching sub-directories of the
            module-level ``image_dir`` / ``label_dir`` must already exist.
    """
    split_image_dir = os.path.join(image_dir, split)
    split_label_dir = os.path.join(label_dir, split)

    for example in dataset[split]:
        image_id = example['image_id']

        # Save image. JPEG cannot store alpha or palette images, so anything
        # that is not plain RGB / greyscale is converted first instead of
        # letting the save call fail.
        img = example['image']
        if img.mode not in ('RGB', 'L'):
            img = img.convert('RGB')
        img.save(os.path.join(split_image_dir, f"{image_id}.jpg"))

        # Save labels: one line per object, boxes normalised by the image size.
        objects = example['objects']
        annotations = []
        for bbox, category_id in zip(objects['bbox'], objects['category']):
            x_center, y_center, width, height = convert_to_yolo_format(
                bbox, example['width'], example['height']
            )
            annotations.append(f"{category_id} {x_center} {y_center} {width} {height}")

        with open(os.path.join(split_label_dir, f"{image_id}.txt"), 'w') as f:
            f.write("\n".join(annotations))

# Convert and save the dataset: export every split listed in `splits`
# (images and label files) into the directories created above.
for split in splits:
    save_yolo_format(dataset, split)


# Training

In [None]:
# Download specific flavour: fetch the nano variant ('yolov5n') from the
# ultralytics/yolov5 hub repository with pretrained weights.
# NOTE(review): `model` is reassigned in the Inference section to the
# fine-tuned checkpoint, so this binding only lives until that cell runs.
model = torch.hub.load('ultralytics/yolov5', 'yolov5n', pretrained=True)

In [None]:
# Download the Repo
!git clone https://github.com/ultralytics/yolov5
!pip install -r yolov5/requirements.txt

In [None]:
# Baseline run: apply the stock yolov5n weights to the exported test images
# (inference size 640, confidence threshold 0.25). This cell comes before the
# training cell, so it does not use the fine-tuned checkpoint.
!python yolov5/detect.py --weights yolov5n.pt --img 640 --conf 0.25 --source dataset/images/test
# !python yolov5/detect.py --weights yolov5s.pt --img 640 --conf 0.25 --source dataset/images/test

In [None]:
# Fine-tune yolov5n for 10 epochs (batch size 16, image size 640). The run is
# named nano_experiment; later cells read its checkpoint from
# runs/train/nano_experiment/weights/best.pt.
# NOTE(review): dataset.yaml is not created anywhere in this notebook - it
# presumably has to exist in the working directory and describe the exported
# dataset/ folders and class names; confirm before a fresh run.
!python yolov5/train.py --img 640 --batch 16 --epochs 10 --data dataset.yaml --weights yolov5n.pt --name nano_experiment
# !python yolov5/train.py --img 640 --batch 16 --epochs 1 --data dataset.yaml --weights yolov5s.pt --name experiment

In [None]:
# Validate the best checkpoint of the nano_experiment run with val.py, using
# the same dataset.yaml and image size as the training cell.
!python yolov5/val.py --weights runs/train/nano_experiment/weights/best.pt --data dataset.yaml --img 640
# !python yolov5/val.py --weights runs/train/experiment/weights/best.pt --data dataset.yaml --img 640

# Inference

In [None]:
# Load the YOLO model
# `model` is the checkpoint produced by the nano_experiment training run, loaded
# through the hub's 'custom' entry point (this rebinds the `model` name defined
# in the Training section).
# model = torch.hub.load('ultralytics/yolov5', 'custom', path='runs/train/experiment/weights/best.pt')
model = torch.hub.load('ultralytics/yolov5', 'custom', path='runs/train/nano_experiment/weights/best.pt')
# `untrained_model` is the stock 'yolov5n' hub model, i.e. not fine-tuned on the
# hard-hat data; the video loop keeps only its class-0 detections.
untrained_model = torch.hub.load('ultralytics/yolov5', 'yolov5n')

In [None]:
def resize_frame(frame, width=None, height=None):
    """Resize ``frame``, keeping the aspect ratio when only one side is given.

    Args:
        frame: Image array whose ``shape[:2]`` is ``(height, width)``.
        width: Target width in pixels. ``None`` or ``0`` means "not specified".
        height: Target height in pixels. ``None`` or ``0`` means "not specified".

    Returns:
        ``frame`` itself when neither side is specified; otherwise the result of
        ``cv2.resize``. With both sides given the frame is resized to exactly
        ``(width, height)``; with one side given the other is scaled by the
        same ratio (truncated to an int, but never below 1 pixel).
    """
    # The branches below test truthiness, so 0 must be treated like None here
    # as well; otherwise e.g. width=0, height=None matches no branch at all and
    # the function fails with an unbound local instead of returning a frame.
    if not width and not height:
        return frame

    h, w = frame.shape[:2]
    if width and height:
        # Both width and height are specified
        target_size = (width, height)
    elif width:
        # Only width is specified, calculate height to maintain aspect ratio
        ratio = width / float(w)
        target_size = (width, max(1, int(h * ratio)))
    else:
        # Only height is specified, calculate width to maintain aspect ratio
        ratio = height / float(h)
        target_size = (max(1, int(w * ratio)), height)

    return cv2.resize(frame, target_size)

In [None]:
# Helper function to draw detections
def draw_detections(frame, detections, color, threshold=0.3):
    """Draw every detection scoring above ``threshold`` and return the drawn boxes.

    Args:
        frame: Image passed to ``cv2.rectangle`` / ``cv2.putText``.
        detections: Table with 'xmin', 'ymin', 'xmax', 'ymax', 'confidence' and
            'name' columns, iterated row by row via ``iterrows()``.
        color: Colour tuple used for both the rectangle and its caption.
        threshold: A row is drawn only when its confidence is strictly greater
            than this value.

    Returns:
        List of ``(x1, y1, x2, y2)`` integer tuples, one per drawn detection,
        in row order.
    """
    kept_boxes = []
    for _, row in detections.iterrows():
        corners = tuple(int(row[key]) for key in ('xmin', 'ymin', 'xmax', 'ymax'))
        score = row['confidence']
        if not score > threshold:
            continue

        left, top = corners[:2]
        caption = f"{row['name']}: {score:.2f}"
        frame = cv2.rectangle(frame, corners[:2], corners[2:], color, 2)
        # Caption sits 10 px above the top-left corner of the box.
        cv2.putText(frame, caption, (left, top - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
        kept_boxes.append(corners)
    return kept_boxes

# Helper function to check bbox overlap
def bbox_overlap(bbox1, bbox2):
    """Return True when two ``(x1, y1, x2, y2)`` boxes intersect or touch.

    The boxes are disjoint only if one lies strictly to the left of, or
    strictly above, the other; shared edges and corners count as overlap.
    """
    left_a, top_a, right_a, bottom_a = bbox1
    left_b, top_b, right_b, bottom_b = bbox2

    apart_horizontally = right_a < left_b or right_b < left_a
    apart_vertically = bottom_a < top_b or bottom_b < top_a
    return not (apart_horizontally or apart_vertically)

# Run both models on every frame of the video, overlay detections, a
# Safe/Unsafe verdict and timing figures, and show the result in an OpenCV window.
video_path = 'day_at_work.mp4'
cap = cv2.VideoCapture(video_path)

if not cap.isOpened():
    cap.release()
    # Raise instead of calling exit(): inside a notebook exit() ends the
    # kernel session (dropping the loaded models) rather than just this cell.
    raise RuntimeError(f"Error: Could not open video: {video_path}")

# Initialize accumulators for timing
total_frames = 0
total_trained_model_time = 0.0
total_untrained_model_time = 0.0
total_pipeline_time = 0.0

# try/finally so the capture and the window are released even when the loop is
# interrupted or a frame raises.
try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        frame = resize_frame(frame, width=640, height=320)

        # Measure the start time of the overall pipeline (resizing is excluded).
        # perf_counter is a monotonic clock intended for measuring intervals.
        start_time = time.perf_counter()

        # Time the fine-tuned model
        start_trained_model = time.perf_counter()
        trained_detections = model(frame)
        trained_model_time = time.perf_counter() - start_trained_model

        # Time the stock model
        start_untrained_model = time.perf_counter()
        untrained_detections = untrained_model(frame)
        untrained_model_time = time.perf_counter() - start_untrained_model

        # Fine-tuned model: keep every detection, whatever its class.
        # NOTE(review): all classes emitted by the fine-tuned model are treated
        # as hard hats below - confirm its class list contains nothing else.
        trained_detections = trained_detections.pandas().xyxy[0]

        # Stock model: keep only class 0 (used as the "person" detections).
        untrained_detections = untrained_detections.pandas().xyxy[0]
        untrained_detections = untrained_detections[(untrained_detections['class'] == 0)]

        # Draw detections and collect bounding boxes
        hardhat_bboxes = draw_detections(frame, trained_detections, (0, 255, 0))
        person_bboxes = draw_detections(frame, untrained_detections, (255, 0, 0), threshold=0.5)

        # Check for safety status: "Safe" requires at least one person box, at
        # least one hard-hat box, and every person box to overlap some hard-hat
        # box. A frame with no person boxes is therefore reported as "Unsafe".
        safe = False
        if person_bboxes and hardhat_bboxes:
            safe = all(
                any(bbox_overlap(person_bbox, hardhat_bbox) for hardhat_bbox in hardhat_bboxes)
                for person_bbox in person_bboxes
            )

        # Display safety status
        status_text = "Safe" if safe else "Unsafe"
        status_color = (0, 255, 0) if safe else (0, 0, 255)
        cv2.putText(frame, status_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, status_color, 2)

        # Measure the end time of the overall pipeline
        total_pipeline_time_frame = time.perf_counter() - start_time

        # Update accumulators
        total_frames += 1
        total_trained_model_time += trained_model_time
        total_untrained_model_time += untrained_model_time
        total_pipeline_time += total_pipeline_time_frame

        # Calculate averages
        avg_trained_model_time = total_trained_model_time / total_frames
        avg_untrained_model_time = total_untrained_model_time / total_frames
        avg_pipeline_time = total_pipeline_time / total_frames

        # Display benchmark results on the frame: one line every 20 px from y=60
        benchmark_lines = [
            f'Model 1 time: {trained_model_time:.2f}s',
            f'Model 2 time: {untrained_model_time:.2f}s',
            f'Total pipeline time: {total_pipeline_time_frame:.2f}s',
            f'Avg Model 1 time: {avg_trained_model_time:.2f}s',
            f'Avg Model 2 time: {avg_untrained_model_time:.2f}s',
            f'Avg pipeline time: {avg_pipeline_time:.2f}s',
        ]
        for line_index, benchmark_text in enumerate(benchmark_lines):
            cv2.putText(frame, benchmark_text, (10, 60 + 20 * line_index),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)

        # Display the frame
        cv2.imshow('YOLO Object Detection', frame)

        # Press 'q' to exit the video
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    cap.release()
    cv2.destroyAllWindows()


# Export

Export the fine-tuned model to TensorFlow Lite, then run detection with the exported weights.

In [None]:
# Export the best checkpoint of the nano_experiment run with export.py,
# requesting the TFLite format.
!python yolov5/export.py --weights runs/train/nano_experiment/weights/best.pt --include tflite

In [None]:
# Run detect.py with the exported TFLite weights.
# NOTE(review): unlike the earlier detect.py call, no --source is passed here, so
# detect.py falls back to its own default input - confirm that is intended.
# NOTE(review): presumably best-fp16.tflite is the file the export cell writes
# next to best.pt - verify the name after exporting.
!python yolov5/detect.py --weights runs/train/nano_experiment/weights/best-fp16.tflite