In [1]:
%pip install --user av ultralytics==8.0.196 opencv-python

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.0 -> 24.1.2
[notice] To update, run: python.exe -m pip install --upgrade pip


### This code uses `av` timestamps: (output -> `timestamp`)

In [1]:
import os
import json
import av
import cv2
from ultralytics import YOLO

def detect_logos_in_video(video_path, model_path, confidence=0.35):
    """Detect Pepsi / Coca-Cola logos in a video and record when they appear.

    Every frame decoded by PyAV is passed through the YOLO model loaded from
    ``model_path``. Detections of the classes ``PEPSI`` and ``COCACOLA``
    (class names are upper-cased before comparison) whose score is strictly
    greater than ``confidence`` are drawn on the frame, and the frame's
    presentation timestamp is recorded once per logo per frame.

    Annotated frames are shown in an OpenCV window titled "Logo Detection";
    pressing ``q`` in that window stops processing early.

    Args:
        video_path: Path of the video file to analyse.
        model_path: Path of the YOLO weights file.
        confidence: Score threshold; only detections above it are kept.

    Returns:
        dict with the keys ``"Pepsi_pts"`` and ``"CocaCols_pts"``, each a list
        of timestamps in seconds rounded to three decimals.

    Raises:
        FileNotFoundError: If ``video_path`` does not exist.
    """
    if not os.path.exists(video_path):
        raise FileNotFoundError(f"Video file {video_path} not found")

    model = YOLO(model_path)

    # The "CocaCols_pts" spelling is part of the returned/saved schema and is
    # therefore left untouched.
    logo_timestamps = {
        "Pepsi_pts": [],
        "CocaCols_pts": []
    }

    # Upper-cased model class name -> key in ``logo_timestamps``.
    index = {
        "PEPSI": "Pepsi_pts",
        "COCACOLA": "CocaCols_pts"
    }

    container = av.open(video_path)
    try:
        stream = container.streams.video[0]

        for frame in container.decode(video=0):
            img = frame.to_ndarray(format='bgr24')

            results = model(img)

            # A frame can come without a presentation timestamp; it is still
            # annotated and displayed, but it cannot be placed on the timeline.
            if frame.pts is None:
                rounded_timestamp = None
            else:
                rounded_timestamp = round(float(frame.pts * stream.time_base), 3)

            # A set, so that several boxes of the same logo in one frame
            # produce a single timestamp entry.
            logos_detected = set()

            for box in results[0].boxes:
                score = box.conf.item()
                if score <= confidence:
                    continue

                logo_name = model.names[int(box.cls.item())].upper()
                if logo_name not in index:
                    continue

                logos_detected.add(logo_name)

                x1, y1, x2, y2 = map(int, box.xyxy[0])
                cv2.rectangle(img, (x1, y1), (x2, y2), (0, 255, 0), 2)
                cv2.putText(img, f"{logo_name} {score:.2f}", (x1, y1 - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

            if rounded_timestamp is not None:
                for logo in logos_detected:
                    logo_timestamps[index[logo]].append(rounded_timestamp)

            cv2.imshow("Logo Detection", img)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Release the container and the preview window even when decoding or
        # inference raises.
        container.close()
        cv2.destroyAllWindows()

    return logo_timestamps

def save_results_to_json(results, output_path):
    """Serialise ``results`` as JSON into the file at ``output_path``.

    The file is opened in text write mode, so an existing file at that path
    is overwritten. The JSON is indented by four spaces.
    """
    destination = open(output_path, mode='w')
    with destination:
        json.dump(results, fp=destination, indent=4)

# Input/output locations. os.path.join picks the separator of the current
# platform, so the notebook is not tied to Windows-style backslashes.
video_path = os.path.join("test", "both.mp4")
model_path = os.path.join("trained_models", "200_best_small.pt")
output_json_path = os.path.join("test", "logo_detection_results.json")

# Run the detector and persist the collected timestamps.
logo_timestamps = detect_logos_in_video(video_path, model_path)
save_results_to_json(logo_timestamps, output_json_path)

print(f"Results saved to {output_json_path}")


0: 384x640 (no detections), 140.0ms
Speed: 13.0ms preprocess, 140.0ms inference, 8.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 50.0ms
Speed: 2.0ms preprocess, 50.0ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 36.0ms
Speed: 2.0ms preprocess, 36.0ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 38.0ms
Speed: 2.0ms preprocess, 38.0ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 35.0ms
Speed: 2.0ms preprocess, 35.0ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 34.0ms
Speed: 2.0ms preprocess, 34.0ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 37.0ms
Speed: 2.0ms preprocess, 37.0ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 37.0ms
Speed: 2.0ms preprocess, 37.0m

Results saved to test\logo_detection_results.json


### This code uses frame counting: (output -> `second.frame`)

In [11]:
import os
import json
import cv2
from ultralytics import YOLO

def detect_logos_in_video(video_path, model_path, confidence=0.35, decimal_places=2):
    """Detect Pepsi / Coca-Cola logos in a video using frame counting.

    Frames are read with ``cv2.VideoCapture`` and passed through the YOLO
    model loaded from ``model_path``. Detections of the classes ``PEPSI`` and
    ``COCACOLA`` (class names are upper-cased before comparison) whose score
    is strictly greater than ``confidence`` are drawn on the frame.

    The position of a frame is encoded as ``second.frame``: the whole seconds
    elapsed (``frame_count // fps``) plus the frame number within that second
    divided by 100. Frames are counted starting at 1.

    Annotated frames are shown in an OpenCV window titled "Logo Detection";
    pressing ``q`` in that window stops processing early.

    Args:
        video_path: Path of the video file to analyse.
        model_path: Path of the YOLO weights file.
        confidence: Score threshold; only detections above it are kept.
        decimal_places: Number of decimals used to round and format each
            timestamp string.

    Returns:
        dict with the keys ``"Pepsi_pts"`` and ``"CocaCols_pts"``, each a list
        of timestamp strings.

    Raises:
        FileNotFoundError: If ``video_path`` does not exist.
        IOError: If OpenCV cannot open the video.
        ValueError: If the video does not report a positive frame rate.
    """
    if not os.path.exists(video_path):
        raise FileNotFoundError(f"Video file {video_path} not found")

    model = YOLO(model_path)

    # The "CocaCols_pts" spelling is part of the returned/saved schema and is
    # therefore left untouched.
    logo_timestamps = {
        "Pepsi_pts": [],
        "CocaCols_pts": []
    }

    # Upper-cased model class name -> key in ``logo_timestamps``.
    index = {
        "PEPSI": "Pepsi_pts",
        "COCACOLA": "CocaCols_pts"
    }

    cap = cv2.VideoCapture(video_path)
    try:
        # An existing file that OpenCV cannot decode would otherwise produce
        # an empty result that looks like "no logos found".
        if not cap.isOpened():
            raise IOError(f"Could not open video file {video_path}")

        fps = cap.get(cv2.CAP_PROP_FPS)
        # ``not fps > 0`` also rejects NaN, which ``fps <= 0`` would let through.
        if not fps > 0:
            raise ValueError(f"Video file {video_path} reports an invalid frame rate: {fps}")

        frame_count = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                print("Video ended during processing")
                break

            frame_count += 1

            results = model(frame)

            # A set, so that several boxes of the same logo in one frame
            # produce a single timestamp entry.
            logos_detected = set()

            for box in results[0].boxes:
                score = box.conf.item()
                if score <= confidence:
                    continue

                logo_name = model.names[int(box.cls.item())].upper()
                if logo_name not in index:
                    continue

                logos_detected.add(logo_name)

                x1, y1, x2, y2 = map(int, box.xyxy[0])
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                cv2.putText(frame, f"{logo_name} {score:.2f}", (x1, y1 - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

            # "second.frame" encoding. The fixed divisor of 100 only keeps the
            # frame part below one whole second while fps < 100.
            seconds = frame_count // fps
            frame_in_second = frame_count % fps
            timestamp = round(seconds + frame_in_second / 100, decimal_places)

            formatted_timestamp = f"{timestamp:.{decimal_places}f}"

            for logo in logos_detected:
                logo_timestamps[index[logo]].append(formatted_timestamp)

            cv2.imshow("Logo Detection", frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Release the capture and the preview window even when reading or
        # inference raises.
        cap.release()
        cv2.destroyAllWindows()

    return logo_timestamps

def save_results_to_json(results, output_path):
    """Write ``results`` to ``output_path`` as JSON with a four-space indent.

    The target is opened in text write mode, replacing any existing file at
    that path.
    """
    json_file = open(output_path, mode='w')
    with json_file:
        json.dump(results, fp=json_file, indent=4)

# Input/output locations. os.path.join picks the separator of the current
# platform, so the notebook is not tied to Windows-style backslashes.
video_path = os.path.join("test", "both.mp4")
model_path = os.path.join("trained_models", "200_best_small.pt")
output_json_path = os.path.join("test", "logo_detection_results.json")

# Run the detector and persist the collected timestamps.
logo_timestamps = detect_logos_in_video(video_path, model_path)
save_results_to_json(logo_timestamps, output_json_path)

print(f"Results saved to {output_json_path}")


0: 384x640 (no detections), 70.0ms
Speed: 3.0ms preprocess, 70.0ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 74.0ms
Speed: 2.0ms preprocess, 74.0ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 56.0ms
Speed: 2.0ms preprocess, 56.0ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 63.0ms
Speed: 1.0ms preprocess, 63.0ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 62.0ms
Speed: 2.0ms preprocess, 62.0ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 32.0ms
Speed: 2.0ms preprocess, 32.0ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 33.0ms
Speed: 1.0ms preprocess, 33.0ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 36.0ms
Speed: 1.0ms preprocess, 36.0ms i

Results saved to test\logo_detection_results.json


### This is the final pipeline for video processing:

In [6]:
import os
import json
import av
import cv2
import numpy as np
from ultralytics import YOLO

def _annotate_logos(model, boxes, image, confidence, wanted):
    """Draw every wanted logo detection on ``image`` and return the names found.

    A detection is kept when its score is strictly greater than ``confidence``
    and its upper-cased class name is contained in ``wanted``. The returned
    set holds each logo name at most once, however many boxes matched.
    """
    found = set()
    for box in boxes:
        score = box.conf.item()
        if score <= confidence:
            continue

        logo_name = model.names[int(box.cls.item())].upper()
        if logo_name not in wanted:
            continue

        found.add(logo_name)
        x1, y1, x2, y2 = map(int, box.xyxy[0])
        cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2)
        cv2.putText(image, f"{logo_name} {score:.2f}", (x1, y1 - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
    return found


def _open_writer(path, fps, image):
    """Create an 'mp4v' video writer at ``path`` sized like ``image``."""
    height, width = image.shape[:2]
    return cv2.VideoWriter(path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height))


def _show_frame(image):
    """Show ``image`` in the preview window; return True when 'q' was pressed."""
    cv2.imshow("Logo Detection", image)
    return cv2.waitKey(1) & 0xFF == ord('q')


def detect_logos_in_video(video_path, model_path, confidence=0.5, isTimestamp=True):
    """Detect Pepsi / Coca-Cola logos in a video and save the results.

    Every frame is passed through the YOLO model loaded from ``model_path``.
    Detections of the classes ``PEPSI`` and ``COCACOLA`` with a score strictly
    greater than ``confidence`` are drawn on the frame. Two files are written
    next to the input video:

    * ``<name>_out.mp4`` - the annotated video,
    * ``<name>_detections.json`` - the timestamps per logo under the keys
      ``"Pepsi_pts"`` and ``"CocaCols_pts"``.

    Annotated frames are also shown in an OpenCV window titled
    "Logo Detection"; pressing ``q`` stops processing early, and the results
    collected so far are still saved.

    Args:
        video_path: Path of the video file to analyse.
        model_path: Path of the YOLO weights file.
        confidence: Score threshold; only detections above it are kept.
        isTimestamp: When True, frames are decoded with PyAV and the JSON
            holds presentation timestamps in seconds as rounded floats. When
            False, frames are read with OpenCV and the JSON holds
            ``second.frame`` strings derived from a frame counter that starts
            at 1.

    Raises:
        FileNotFoundError: If ``video_path`` does not exist.
        IOError: If OpenCV cannot open the video (``isTimestamp=False``).
        ValueError: If no positive frame rate can be determined.
    """
    if not os.path.exists(video_path):
        raise FileNotFoundError(f"Video file {video_path} not found")

    folder, file_name = os.path.split(video_path)
    base_name, _ = os.path.splitext(file_name)

    video_path_out = os.path.join(folder, f"{base_name}_out.mp4")
    output_json_path = os.path.join(folder, f"{base_name}_detections.json")

    model = YOLO(model_path)

    # The "CocaCols_pts" spelling is part of the saved schema and is therefore
    # left untouched.
    logo_timestamps = {
        "Pepsi_pts": [],
        "CocaCols_pts": []
    }

    # Upper-cased model class name -> key in ``logo_timestamps``.
    index = {
        "PEPSI": "Pepsi_pts",
        "COCACOLA": "CocaCols_pts"
    }

    out = None  # created lazily once the size of the first frame is known
    try:
        if isTimestamp:
            container = av.open(video_path)
            try:
                stream = container.streams.video[0]
                if not stream.average_rate:
                    raise ValueError(f"Could not determine the frame rate of {video_path}")

                # Keep the fractional rate (e.g. 29.97) so the annotated video
                # plays at the speed of the source.
                fps = float(stream.average_rate)
                # Number of decimals = number of digits of the integer frame rate.
                decimal_places = len(str(int(fps)))

                for frame_number, frame in enumerate(container.decode(video=0)):
                    img = frame.to_ndarray(format='bgr24')
                    results = model(img)

                    # A frame can come without a presentation timestamp; its
                    # position is then estimated from the frame index.
                    if frame.pts is None:
                        timestamp = frame_number / fps
                    else:
                        timestamp = float(frame.pts * stream.time_base)
                    rounded_timestamp = round(timestamp, decimal_places)

                    logos_detected = _annotate_logos(
                        model, results[0].boxes, img, confidence, index)

                    if out is None:
                        out = _open_writer(video_path_out, fps, img)

                    for logo in logos_detected:
                        logo_timestamps[index[logo]].append(rounded_timestamp)

                    out.write(img)
                    if _show_frame(img):
                        break
            finally:
                container.close()

        else:
            cap = cv2.VideoCapture(video_path)
            try:
                # An existing file that OpenCV cannot decode would otherwise
                # produce an empty result that looks like "no logos found".
                if not cap.isOpened():
                    raise IOError(f"Could not open video file {video_path}")

                fps = cap.get(cv2.CAP_PROP_FPS)
                # ``not fps > 0`` also rejects NaN.
                if not fps > 0:
                    raise ValueError(
                        f"Video file {video_path} reports an invalid frame rate: {fps}")

                # Number of decimals = number of digits of the integer frame
                # rate, so the frame part always fits behind the decimal point.
                decimal_places = len(str(int(fps)))
                frame_count = 0

                while cap.isOpened():
                    ret, frame = cap.read()
                    if not ret:
                        print("Video ended during processing")
                        break

                    if out is None:
                        out = _open_writer(video_path_out, fps, frame)

                    frame_count += 1
                    results = model(frame)

                    logos_detected = _annotate_logos(
                        model, results[0].boxes, frame, confidence, index)

                    # "second.frame" encoding, computed once per frame.
                    seconds = frame_count // fps
                    frame_in_second = frame_count % fps
                    timestamp = round(
                        seconds + frame_in_second / (10 ** decimal_places), decimal_places)
                    formatted_timestamp = f"{timestamp:.{decimal_places}f}"

                    for logo in logos_detected:
                        logo_timestamps[index[logo]].append(formatted_timestamp)

                    out.write(frame)
                    if _show_frame(frame):
                        break
            finally:
                cap.release()
    finally:
        # Finalise the output video and close the preview window even when
        # decoding or inference raises.
        if out is not None:
            out.release()
        cv2.destroyAllWindows()

    with open(output_json_path, 'w') as f:
        json.dump(logo_timestamps, f, indent=4)

    print(f"Results saved to {output_json_path}")

# Input locations. os.path.join picks the separator of the current platform,
# so the notebook is not tied to Windows-style backslashes.
video_path = os.path.join("test", "both.mp4")
model_path = os.path.join("trained_models", "200_best_small.pt")

# isTimestamp=False selects the frame-counting branch of the pipeline.
detect_logos_in_video(video_path, model_path, isTimestamp=False)


0: 384x640 (no detections), 66.0ms
Speed: 2.0ms preprocess, 66.0ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 60.0ms
Speed: 3.0ms preprocess, 60.0ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 51.0ms
Speed: 2.0ms preprocess, 51.0ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 64.0ms
Speed: 1.0ms preprocess, 64.0ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 62.0ms
Speed: 1.0ms preprocess, 62.0ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 47.0ms
Speed: 1.0ms preprocess, 47.0ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 61.3ms
Speed: 2.0ms preprocess, 61.3ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 61.0ms
Speed: 1.0ms preprocess, 61.0ms i

Results saved to test\both1_detections.json


Depending on the `isTimestamp` flag, the pipeline derives the timestamps either from `av` presentation timestamps (`isTimestamp=True`) or from frame counting (`isTimestamp=False`).