Mount Drive

In [1]:
# Mount Google Drive into the Colab filesystem so the project files
# (model checkpoint, alert icon) under /content/drive are reachable.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
# Move into the top level of the mounted Drive.
%cd /content/drive/MyDrive/

/content/drive/MyDrive


Go to root project directory

In [22]:
%cd pentograph_project/

/content/drive/MyDrive/pentograph_project


In [23]:
# Optional sanity check: uncomment the next line to confirm a GPU runtime is attached.
# !nvidia-smi

In [24]:
import os
# Remember the current working directory (the project root after the %cd
# cells above) and print it so the reader can verify the location.
HOME = os.getcwd()
print(HOME)

/content/drive/MyDrive/pentograph_project


In [25]:
%%capture
!pip install -q git+https://github.com/THU-MIG/yolov10.git
!pip install -q supervision roboflow
!pip install gradio
!pip install -q roboflow

In [34]:
# All notebook imports in one place, each listed exactly once
# (the original cell repeated cv2 / supervision / YOLOv10 / Image / os).

# Standard library
import csv
import os
import random
import tempfile
from datetime import timedelta

# Third-party: vision / detection
import cv2
import numpy as np
import supervision as sv
from IPython.display import Image
from roboflow import Roboflow
from ultralytics import YOLOv10

# Third-party: UI
import gradio as gr

# Colab-specific
from google.colab import userdata


### Launch our UI

Once you run the cell below, the printed output will contain a line like:

Running on public URL: (a gradio.live link)

Click that URL; it opens the UI in your browser.

In [40]:
# Load the YOLOv10 detector.
# Paste the absolute path to the trained best.pt checkpoint file here.
weights_path = '/content/drive/MyDrive/pentograph_project/best.pt'
model = YOLOv10(weights_path)

# Annotators shared by every processed frame: one draws the boxes,
# the other draws the class labels.
box_annotator, label_annotator = sv.BoxAnnotator(), sv.LabelAnnotator()

def _load_alert_overlay(icon_path, size=(100, 100)):
    """Read the alert icon and split it into colour and alpha planes.

    Returns a ``(bgr, alpha)`` pair of float arrays (``alpha`` scaled to
    0..1 with a trailing singleton channel axis), or ``None`` when the icon
    cannot be read or has an unsupported channel count, so the caller can
    skip the overlay instead of crashing.
    """
    icon = cv2.imread(icon_path, cv2.IMREAD_UNCHANGED)
    if icon is None:
        return None
    icon = cv2.resize(icon, size)
    if icon.ndim == 2:
        # Single-channel icon: expand so it can be blended into a colour frame.
        icon = cv2.cvtColor(icon, cv2.COLOR_GRAY2BGR)
    channels = icon.shape[2]
    if channels == 4:
        alpha = icon[:, :, 3:4] / 255.0
    elif channels == 3:
        # No alpha channel: treat the icon as fully opaque.
        alpha = np.ones(icon.shape[:2] + (1,), dtype=np.float64)
    else:
        return None
    return icon[:, :, :3].astype(np.float64), alpha


def _overlay_alert_symbol(frame, overlay, margin=10):
    """Alpha-blend the alert icon into the bottom-right corner of ``frame`` in place."""
    bgr, alpha = overlay
    icon_h, icon_w = bgr.shape[:2]
    y0 = frame.shape[0] - icon_h - margin
    x0 = frame.shape[1] - icon_w - margin
    if y0 < 0 or x0 < 0:
        # Frame is too small to hold the icon; a negative slice start would
        # select the wrong region and fail on a shape mismatch.
        return
    roi = frame[y0:y0 + icon_h, x0:x0 + icon_w]
    blended = bgr * alpha + roi * (1.0 - alpha)
    frame[y0:y0 + icon_h, x0:x0 + icon_w] = blended.astype(frame.dtype)


def _write_csv(path, headers, rows):
    """Write ``headers`` followed by ``rows`` to the CSV file at ``path``."""
    with open(path, 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(headers)
        writer.writerows(rows)


def process_video(video_path, conf_threshold, distance_threshold):
    """Run detection on every frame of a video and flag large centroid/contact gaps.

    For each frame the module-level ``model`` is run, the frame is annotated
    with boxes and labels, and the distance between the centre of the class-0
    box ("centroid") and the centre of the class-1 box ("contact_point") is
    measured in bounding-box coordinate units. When that distance exceeds
    ``distance_threshold`` the frame is marked as an alert (red border,
    "ALERT" text and, if available, the alert icon).

    Args:
        video_path: Path of the input video file.
        conf_threshold: Confidence threshold passed to the model.
        distance_threshold: Distance above which a frame is an alert.

    Returns:
        A 5-tuple ``(output_video_path, alert_logs, all_logs,
        alerts_csv_path, all_csv_path)``. Each log row is
        ``[frame, timestamp, distance, avg_confidence, alert]``.

    Raises:
        ValueError: If the video cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        raise ValueError(f"Could not open video: {video_path!r}")

    out = None
    alert_logs = []
    all_logs = []
    csv_headers = ['frame', 'timestamp', 'distance', 'avg_confidence', 'alert']

    try:
        # Keep the frame rate as a float: truncating (e.g. 29.97 -> 29) skews
        # timestamps and playback speed, and a reported rate of 0 would cause
        # a division by zero below. Fall back to 30 fps when it is unusable.
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not np.isfinite(fps) or fps <= 0:
            fps = 30.0
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Create temporary files for the output video and CSVs. They are
        # created with delete=False so the paths stay valid after closing.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as temp_output, \
             tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as temp_csv_alerts, \
             tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as temp_csv_all:
            temp_output_path = temp_output.name
            temp_csv_alerts_path = temp_csv_alerts.name
            temp_csv_all_path = temp_csv_all.name

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(temp_output_path, fourcc, fps, (width, height))

        # Load the alert symbol once (paste the absolute path to the alert png icon).
        # `overlay` is None when the icon is missing; alerts are then still
        # drawn with text and border only.
        overlay = _load_alert_overlay('/content/drive/MyDrive/pentograph_project/alert_icon.png')

        frame_number = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            results = model(frame, conf=conf_threshold)[0]
            detections = sv.Detections.from_ultralytics(results)

            centroid = None
            contact_point = None
            # If a class is detected more than once, the last box wins.
            for class_id, bbox in zip(detections.class_id, detections.xyxy):
                center = ((bbox[0] + bbox[2]) / 2, (bbox[1] + bbox[3]) / 2)
                if int(class_id) == 0:  # centroid
                    centroid = center
                elif int(class_id) == 1:  # contact_point
                    contact_point = center

            # Annotate the frame with bounding boxes and labels
            annotated_frame = box_annotator.annotate(scene=frame.copy(), detections=detections)
            annotated_frame = label_annotator.annotate(scene=annotated_frame, detections=detections)

            timestamp = str(timedelta(seconds=frame_number / fps))
            alert = 'No'
            distance = 0
            avg_confidence = 0

            if centroid is not None and contact_point is not None:
                distance = np.hypot(centroid[0] - contact_point[0], centroid[1] - contact_point[1])
                relevant = np.isin(detections.class_id, [0, 1])
                avg_confidence = np.mean(detections.confidence[relevant])

                if distance > distance_threshold:
                    alert = 'Yes'
                    cv2.putText(annotated_frame, "ALERT", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
                    cv2.rectangle(annotated_frame, (0, 0),
                                  (annotated_frame.shape[1] - 1, annotated_frame.shape[0] - 1),
                                  (0, 0, 255), 10)
                    if overlay is not None:
                        _overlay_alert_symbol(annotated_frame, overlay)

            log_entry = [frame_number, timestamp, f"{distance:.2f}", f"{avg_confidence:.2f}", alert]
            if alert == 'Yes':
                alert_logs.append(log_entry)
            all_logs.append(log_entry)

            out.write(annotated_frame)
            frame_number += 1
    finally:
        # Always release the capture and writer, even if inference fails
        # part-way through, so file handles are not leaked.
        cap.release()
        if out is not None:
            out.release()

    # One CSV with alert frames only, one with every frame.
    _write_csv(temp_csv_alerts_path, csv_headers, alert_logs)
    _write_csv(temp_csv_all_path, csv_headers, all_logs)

    #coded by abhijit das
    return temp_output_path, alert_logs, all_logs, temp_csv_alerts_path, temp_csv_all_path

def gradio_video_inference(video, conf_threshold, distance_threshold):
    """Gradio callback: process the uploaded video and fan results out to the UI.

    Delegates to ``process_video`` and returns its five results plus the
    processed video path a second time, so the same file feeds both the
    video player and the download component.
    """
    processed = process_video(video, conf_threshold, distance_threshold)
    annotated_video, alert_rows, all_rows, alerts_csv, all_csv = processed
    return (
        annotated_video,
        alert_rows,
        all_rows,
        alerts_csv,
        all_csv,
        annotated_video,
    )

# Gradio interface
# Column names shared by both log tables.
log_columns = ["frame", "timestamp", "distance", "avg_confidence", "alert"]

with gr.Blocks() as demo:
    gr.Markdown("# Pentograph Alert Generation Demo")

    # Input video on the left, annotated result on the right.
    with gr.Row():
        video_input = gr.Video(label="Upload Video for Processing")
        video_output = gr.Video(label="Processed Video")

    # Tunable thresholds forwarded to the processing callback.
    with gr.Row():
        conf_threshold = gr.Slider(
            label="Confidence Threshold",
            minimum=0.1,
            maximum=1.0,
            step=0.05,
            value=0.15,
        )
        distance_threshold = gr.Slider(
            label="Distance Threshold",
            minimum=10,
            maximum=100,
            step=5,
            value=50,
        )

    # Per-frame logs: alerts only, and every prediction.
    with gr.Tab("Alert Logs"):
        alert_output = gr.Dataframe(headers=list(log_columns), label="Alert Logs", wrap=True)

    with gr.Tab("All Predictions"):
        all_predictions_output = gr.Dataframe(headers=list(log_columns), label="All Predictions", wrap=True)

    # Downloadable artefacts.
    with gr.Row():
        csv_alerts_output = gr.File(label="Download Alert Logs (CSV)")
        csv_all_output = gr.File(label="Download All Predictions (CSV)")
        video_download = gr.File(label="Download Processed Video")

    # Processing starts as soon as a video is uploaded.
    callback_inputs = [video_input, conf_threshold, distance_threshold]
    callback_outputs = [
        video_output,
        alert_output,
        all_predictions_output,
        csv_alerts_output,
        csv_all_output,
        video_download,
    ]
    video_input.upload(
        fn=gradio_video_inference,
        inputs=callback_inputs,
        outputs=callback_outputs,
    )

demo.launch(share=True, inbrowser=True)

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
Running on public URL: https://817790a42d96220d8d.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from Terminal to deploy to Spaces (https://huggingface.co/spaces)


