In [43]:
import cv2
from ultralytics import YOLO
import requests
import json

In [44]:
# --- Reporting endpoint -----------------------------------------------------
# URL handed to object_detection() as the destination for status messages.
FLASK_URL = "http://192.168.137.2:4444/receive_docker_output"

# --- Detection tuning -------------------------------------------------------
# A "person" detection is reported only when its confidence is strictly
# greater than this value.
CONFIDENCE_MIN = 0.4

# --- Message envelope: dictionary keys --------------------------------------
FIELD_MESSAGE_TYPE = "message_type"
FIELD_CONTENT = "content"
FIELD_DATA = "data"

# --- Message envelope: fixed values -----------------------------------------
MST_MSG = "msg"                   # value stored under FIELD_MESSAGE_TYPE
CT_SAFETY = "safety_detection"    # value stored under FIELD_CONTENT
MSG_FOREIGN = "detection_foreign"  # not referenced by the code visible in this notebook

# --- Codes placed in data["result"] -----------------------------------------
RES_FOREIGN = 0  # a person was detected
RES_START = 1    # detection loop started
RES_END = 2      # detection loop finished

In [45]:
# Instantiate the Ultralytics YOLO wrapper from the "yolov8x.pt" weights file,
# configured for the "detect" task. This module-level `model` is what the
# driver cell passes to object_detection().
# The run recorded at the end of this notebook logs roughly one second of
# inference per frame with these weights.
model = YOLO("yolov8x.pt",task="detect")
# NOTE(review): leftover commented-out call below; delete it if it is not needed.
#model.eval()

In [46]:
def send_message(url: str, output: dict, dry_run: bool = True, timeout: float = 10.0):
    """Log a status payload and, unless in dry-run mode, POST it as JSON.

    Args:
        url: Endpoint that receives the payload.
        output: JSON-serialisable message to deliver.
        dry_run: When True (the default) the message is only printed and
            nothing is transmitted. This keeps the behaviour the function had
            before, where an unconditional early ``return`` made the POST code
            unreachable. Pass ``dry_run=False`` to actually send the message.
        timeout: Seconds to wait for the server before ``requests`` gives up.
            Only used when ``dry_run`` is False.

    Returns:
        ``None`` in dry-run mode, otherwise the server reply decoded from JSON.

    Raises:
        requests.RequestException: The request failed or timed out
            (only when ``dry_run`` is False).
        json.JSONDecodeError: The server reply was not valid JSON
            (only when ``dry_run`` is False).
    """
    print(f"Sending message: {output} to {url}")
    if dry_run:
        return None

    # Without an explicit timeout a silent server would block the caller
    # (the per-frame detection loop) indefinitely.
    response = requests.post(
        url,
        headers={"Content-Type": "application/json"},
        data=json.dumps(output),
        timeout=timeout,
    )
    response_text = json.loads(response.text)
    print(f"Server response ({url}): {response_text}")

    return response_text

In [47]:
def object_detection(model: YOLO, show: bool = True, url: "str | None" = None, camera_index: int = 1):
    """Run person detection on a live camera feed and report safety events.

    Each frame is read from the camera, cropped to columns 160-519 and passed
    to ``model``. Messages are handed to ``send_message``:

    * "Safety detection started" after the first frame is read successfully;
    * "Foreign object detected" for every detection named "person" whose
      confidence is strictly greater than ``CONFIDENCE_MIN`` (one message per
      such detection, per frame);
    * "No safety camera connected" when a frame cannot be read;
    * "Safety detection finished" when the loop ends normally.

    The loop ends when a frame cannot be read or when 'q' is pressed.

    Args:
        model: Callable detector; ``model(frame, agnostic_nms=True)[0]`` must
            expose ``boxes`` (with ``cls``, ``conf``, ``xyxy``) and ``names``.
        show: When True, draw boxes and labels and display the frame in a
            window titled 'YOLOv8 Detection'.
        url: Endpoint forwarded to ``send_message``. Falls back to
            ``FLASK_URL`` when omitted.
        camera_index: Device index given to ``cv2.VideoCapture``. Defaults to
            1, the value that was previously hard-coded.

    Returns:
        None.
    """
    if url is None:
        url = FLASK_URL

    colors = [(0, 255, 0), (0, 0, 255), (255, 0, 0)]
    crop_left, crop_right = 160, 520  # horizontal band of the frame that is analysed

    output = {
        FIELD_MESSAGE_TYPE: MST_MSG,
        FIELD_CONTENT: CT_SAFETY,
        FIELD_DATA: {}
    }

    cap = cv2.VideoCapture(camera_index)
    try:
        started = False
        while True:
            # Read a frame from the video capture
            ret, frame = cap.read()

            # Test `ret` before touching `frame`: on a failed read there is no
            # usable image, and slicing it first would raise instead of
            # reporting the problem.
            if not ret:
                output[FIELD_DATA] = {
                    "message": "No safety camera connected"
                }
                # Send it now; the "finished" payload below replaces FIELD_DATA.
                send_message(url, output)
                break

            frame = frame[:, crop_left:crop_right]

            if not started:
                output[FIELD_DATA] = {
                    "result": RES_START,
                    "message": "Safety detection started"
                }
                send_message(url, output)
                started = True

            # Perform inference using the YOLOv8 model
            results = model(frame, agnostic_nms=True)[0]

            # A frame without detections simply yields nothing to iterate. It
            # must still reach the display and key handling below, otherwise
            # the window freezes and 'q' is ignored while the scene is empty.
            boxes = results.boxes
            if boxes is None:
                detections = ()
            else:
                detections = zip(boxes.cls.tolist(), boxes.conf.tolist(), boxes.xyxy.tolist())

            for cls_value, confidence, xyxy in detections:
                cls = int(cls_value)
                name = results.names[cls]

                if name == "person" and confidence > CONFIDENCE_MIN:
                    output[FIELD_DATA] = {
                        "result": RES_FOREIGN,
                        "object": {
                            "name": name,
                            "confidence": confidence
                        },
                        "message": f"Foreign object detected: {name}, {round(confidence*100,1)}%"
                    }
                    send_message(url, output)

                if show:
                    # Draw bounding box and label on the frame
                    x1, y1, x2, y2 = (int(v) for v in xyxy)
                    color = colors[cls % len(colors)]
                    label = f'{name}: {confidence:.2f}'
                    cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                    cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, color, 2)

            # Display the resulting frame
            if show:
                cv2.imshow('YOLOv8 Detection', frame)

            # Break the loop if 'q' key is pressed (OpenCV only delivers key
            # presses while one of its windows is open and focused).
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

        output[FIELD_DATA] = {
            "result": RES_END,
            "message": "Safety detection finished"
        }
        send_message(url, output)
    finally:
        # Release the camera and close all windows even if inference, drawing
        # or message delivery raised, so the device is not left locked.
        cap.release()
        cv2.destroyAllWindows()

In [48]:
# Start the live detection loop with the preview window enabled.
object_detection(model, show=True, url=FLASK_URL)

Sending message: {'message_type': 'msg', 'content': 'safety_detection', 'data': {'result': 1, 'message': 'Safety detection started'}} to http://192.168.137.2:4444/receive_docker_output



0: 640x480 1 cell phone, 991.7ms
Speed: 7.6ms preprocess, 991.7ms inference, 0.0ms postprocess per image at shape (1, 3, 640, 480)

0: 640x480 1 cell phone, 1034.3ms
Speed: 1.3ms preprocess, 1034.3ms inference, 0.0ms postprocess per image at shape (1, 3, 640, 480)

0: 640x480 1 cell phone, 1013.5ms
Speed: 0.0ms preprocess, 1013.5ms inference, 0.0ms postprocess per image at shape (1, 3, 640, 480)

0: 640x480 1 cell phone, 950.3ms
Speed: 2.8ms preprocess, 950.3ms inference, 0.0ms postprocess per image at shape (1, 3, 640, 480)

0: 640x480 1 cell phone, 1043.5ms
Speed: 0.0ms preprocess, 1043.5ms inference, 0.0ms postprocess per image at shape (1, 3, 640, 480)

0: 640x480 1 cell phone, 969.1ms
Speed: 2.3ms preprocess, 969.1ms inference, 0.0ms postprocess per image at shape (1, 3, 640, 480)

0: 640x480 1 cell phone, 995.5ms
Speed: 0.5ms preprocess, 995.5ms inference, 0.0ms postprocess per image at shape (1, 3, 640, 480)

0: 640x480 1 cell phone, 995.1ms
Speed: 3.5ms preprocess, 995.1ms inf

Sending message: {'message_type': 'msg', 'content': 'safety_detection', 'data': {'result': 2, 'message': 'Safety detection finished'}} to http://192.168.137.2:4444/receive_docker_output
