In [1]:
!pip install ultralytics



In [5]:
import cv2
import numpy as np
from ultralytics import YOLO
from ultralytics.utils.plotting import Annotator, colors

# Load YOLOv8 model from the local weights file "best.pt"
model = YOLO("best.pt")
# Class-index -> label lookup; indexed with int(cls) when drawing box labels
names = model.model.names

def stream_video(source=0):
    """Run YOLOv8 detection on a live video source and display annotated frames.

    Frames are read from ``source``, passed through the module-level ``model``,
    and every detected box is drawn with its class label (looked up in the
    module-level ``names``). The loop ends when 'q' is pressed in the display
    window, when a frame cannot be read, or when the cell is interrupted.

    Args:
        source: Anything accepted by ``cv2.VideoCapture``. Defaults to ``0``,
            the first webcam, which matches the previous behaviour.

    Returns:
        None.
    """
    # Open a connection to the video source (webcam by default)
    cap = cv2.VideoCapture(source)

    # Check if the source is opened correctly
    if not cap.isOpened():
        print("Error: Could not open video device.")
        cap.release()
        return

    # Set video frame dimensions (optional; the driver may ignore the request)
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

    # try/finally guarantees the device and window are released even when the
    # loop is left through an exception or a kernel interrupt.
    try:
        # Stream video until the user terminates the code
        while True:
            # Capture frame-by-frame
            ret, frame = cap.read()
            if not ret:
                print("Failed to capture frame")
                break

            # Pass the frame to the YOLOv8 model
            results = model(frame)
            detections = results[0].boxes
            boxes = detections.xyxy.cpu().numpy()
            clss = detections.cls.cpu().numpy()

            # Draw one labelled box per detection
            annotator = Annotator(frame, line_width=2)
            for box, cls in zip(boxes, clss):
                class_id = int(cls)
                annotator.box_label(box, color=colors(class_id, True), label=names[class_id])

            # Display the resulting frame with detections
            annotated_frame = annotator.result()
            cv2.imshow('Video Stream with YOLOv8', annotated_frame)

            # Press 'q' to quit the video stream
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Release the capture and close the window however the loop ended
        cap.release()
        cv2.destroyAllWindows()

# Run the webcam detection loop; it returns when 'q' is pressed or a frame cannot be read
stream_video()


0: 480x640 1 NO-Hardhat, 2 NO-Masks, 1 NO-Safety Vest, 1 Person, 1018.9ms
Speed: 3.5ms preprocess, 1018.9ms inference, 0.0ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 NO-Hardhat, 1 NO-Mask, 1 NO-Safety Vest, 1 Person, 1104.8ms
Speed: 4.2ms preprocess, 1104.8ms inference, 1.7ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 NO-Hardhat, 1 NO-Mask, 1 NO-Safety Vest, 1 Person, 974.6ms
Speed: 0.0ms preprocess, 974.6ms inference, 0.0ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 NO-Hardhat, 1 NO-Mask, 1 Person, 1058.0ms
Speed: 3.5ms preprocess, 1058.0ms inference, 1.7ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 NO-Hardhat, 1 NO-Mask, 1 NO-Safety Vest, 1 Person, 973.1ms
Speed: 1.7ms preprocess, 973.1ms inference, 0.0ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 NO-Hardhat, 1 NO-Mask, 1 Person, 996.5ms
Speed: 0.0ms preprocess, 996.5ms inference, 0.0ms postprocess per image at shape (1, 3, 480, 640)

0:

In [4]:
import cv2
import numpy as np
from ultralytics import YOLO

from ultralytics.utils.checks import check_imshow
from ultralytics.utils.plotting import Annotator, colors

from collections import defaultdict

# Recent box-centre positions for every tracked object, keyed by tracker ID
track_history = defaultdict(list)
model = YOLO("best.pt")
names = model.model.names

video_path = "prathik.mp4"
cap = cv2.VideoCapture(video_path)
assert cap.isOpened(), "Error reading video file"

w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# Keep the frame rate as a float: truncating e.g. 29.97 to 29 makes the output
# play at a different speed than the input. Some sources report 0, in which
# case fall back to a nominal 30 fps so the writer gets a usable rate.
fps = cap.get(cv2.CAP_PROP_FPS) or 30.0

# NOTE(review): 'mp4v' inside an .avi container depends on the OpenCV video
# backend accepting that combination - confirm the file plays on the target machine.
result = cv2.VideoWriter("object_tracking.avi",
                       cv2.VideoWriter_fourcc(*'mp4v'),
                       fps,
                       (w, h))

# try/finally so the writer is always finalised and the input released, even
# if tracking or drawing raises or the cell is interrupted.
try:
    while cap.isOpened():
        success, frame = cap.read()
        if not success:
            break

        results = model.track(frame, persist=True, verbose=False)
        detections = results[0].boxes

        # The tracker yields no IDs for frames without confirmed tracks;
        # such frames are written out unannotated.
        if detections.id is not None:

            # Extract prediction results
            boxes = detections.xyxy.cpu()
            clss = detections.cls.cpu().tolist()
            track_ids = detections.id.int().cpu().tolist()

            # Annotator Init
            annotator = Annotator(frame, line_width=2)

            for box, cls, track_id in zip(boxes, clss, track_ids):
                color = colors(int(cls), True)
                annotator.box_label(box, color=color, label=names[int(cls)])

                # Store tracking history (box centre), keeping the last 30 points
                track = track_history[track_id]
                track.append((int((box[0] + box[2]) / 2), int((box[1] + box[3]) / 2)))
                if len(track) > 30:
                    track.pop(0)

                # Plot tracks: a dot at the current centre plus the recent trail
                points = np.array(track, dtype=np.int32).reshape((-1, 1, 2))
                cv2.circle(frame, track[-1], 7, color, -1)
                cv2.polylines(frame, [points], isClosed=False, color=color, thickness=2)

        result.write(frame)
        # NOTE(review): no window is shown in this cell, so this key check
        # probably never fires - confirm whether a preview was intended.
        if cv2.waitKey(1) & 0xFF == ord("q"):
            break
finally:
    result.release()
    cap.release()
    cv2.destroyAllWindows()

In [6]:
#convert avi to mp4 and save in folder 
import cv2

# Input: the tracking output written by the previous step
cap = cv2.VideoCapture('object_tracking.avi')
if not cap.isOpened():
    # Fail loudly instead of silently producing an empty output file
    raise IOError("Error reading video file: object_tracking.avi")

# Re-use the input's frame rate so the converted clip plays at the original
# speed; fall back to 20 fps only when the container does not report one.
fps = cap.get(cv2.CAP_PROP_FPS) or 20.0
frame_size = (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)))

# Define the output filename (e.g., converted_video.mp4)
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter('converted_video.mp4', fourcc, fps, frame_size)

# try/finally so both handles are closed (and the MP4 finalised) on any exit
try:
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # Process or display the frame here (optional)

        out.write(frame)
finally:
    cap.release()
    out.release()
    cv2.destroyAllWindows()

In [7]:
import cv2
 
 
 
# Open the converted clip for playback
# (pass 0 instead of the file name to read from the camera)
cap = cv2.VideoCapture('converted_video.mp4')

# Report a source that could not be opened; the loop below then never runs
if not cap.isOpened():
    print("Error opening video stream or file")

# Show frames one after another until the clip ends or 'q' is pressed
while cap.isOpened():
    ret, frame = cap.read()

    # No frame returned: end of stream or read failure
    if not ret:
        break

    cv2.imshow('Frame', frame)

    # Wait up to 25 ms for a key press; the low byte is compared against 'q'
    if cv2.waitKey(25) & 0xFF == ord('q'):
        break

# Free the capture handle and close the display window
cap.release()
cv2.destroyAllWindows()