In [1]:
import cv2
import yt_dlp

from ultralytics import solutions
from ultralytics import YOLO

video_url = "https://www.youtube.com/watch?v=0Pg3S6s76IE"  # Youtube URL northbound

model = YOLO("yolo11n.pt")

In [2]:
def get_stream_url(youtube_url):
    ydl_opts = {
        'quiet': True,
        'skip_download': True,
        'no_warnings': True,
        'force_generic_extractor': False,
        'format': 'best[ext=mp4][protocol^=http]/best'
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(youtube_url, download=False)
        formats = info.get('formats', [info])
        # prefer video-only best mp4 OR best overall
        for f in reversed(formats):
            if f.get('vcodec', '') != 'none' and f.get('acodec', '') == 'none' and 'url' in f:
                return f['url']
        # fallback: best available format
        return formats[-1]['url']

In [3]:
stream_url = get_stream_url(video_url)
print("Stream URL:", stream_url)

cap = cv2.VideoCapture(stream_url)
assert cap.isOpened(), "Error reading video file"

Stream URL: https://manifest.googlevideo.com/api/manifest/hls_playlist/expire/1750304866/ei/ATRTaP6RPIT4u_kPkYzfGA/ip/152.122.255.5/id/0Pg3S6s76IE.2/itag/270/source/yt_live_broadcast/requiressl/yes/ratebypass/yes/live/1/sgovp/gir%3Dyes%3Bitag%3D137/rqh/1/hls_chunk_host/rr1---sn-vgqsknsk.googlevideo.com/xpc/EgVo2aDSNQ%3D%3D/playlist_duration/30/manifest_duration/30/bui/AY1jyLNnRxDEd5f3daPZ_GSYXJqwwIa9nn_TnrzACZT5Fx91yGAOPRMCpDQw2N73U7Vf-oUovepu9RZD/spc/l3OVKWU1T5MiKKbgyqc5H14rEbUdwn01hByq2u9LuucdPxq7vHw_8_3u8f7Ztw/vprv/1/playlist_type/DVR/initcwndbps/3863750/met/1750283266,/mh/sg/mm/44/mn/sn-vgqsknsk/ms/lva/mv/m/mvi/1/pl/16/rms/lva,lva/dover/13/pacing/0/short_key/1/keepalive/yes/fexp/51466697/mt/1750282844/sparams/expire,ei,ip,id,itag,source,requiressl,ratebypass,live,sgovp,rqh,xpc,playlist_duration,manifest_duration,bui,spc,vprv,playlist_type/sig/AJfQdSswRgIhAPUtzsQj5OHqI7Qb2SMvp3oe8kGKgXLtLdTL7vHe2hjIAiEA0BWnuRp07xH7P-qv55-MnH5A1txu1FlZNe7yKJsVFTY%3D/lsparams/hls_chunk_host,initcwndbp

In [4]:
# Video writer
w, h, fps = (int(cap.get(x)) for x in (cv2.CAP_PROP_FRAME_WIDTH, cv2.CAP_PROP_FRAME_HEIGHT, cv2.CAP_PROP_FPS))
video_writer = cv2.VideoWriter("speed_management.avi", cv2.VideoWriter_fourcc(*"mp4v"), fps, (w, h))

# the line after which we want to calculate speed
line_y_cross = [(550, 500), (1250,500)]  # Red line position

# Pass region as dictionary
region_points = [(550, 50), (1250, 50), (550, 950), (1250, 950)]

# Initialize speed estimation object
speedestimator = solutions.SpeedEstimator(
    show=True,  # display the output
    region = region_points,
    model="yolo11n.pt",  # path to the YOLO11 model file.
    fps=fps,  # adjust speed based on frame per second
    # max_speed=120,  # cap speed to a max value (km/h) to avoid outliers
    # max_hist=5,  # minimum frames object tracked before computing speed
    # meter_per_pixel=0.05,  # highly depends on the camera configuration
    classes=[2, 3, 5, 6, 7],  # estimate speed of specific classes.
    line_width=1  # adjust the line width for bounding boxes
)


Ultralytics Solutions:  {'source': None, 'model': 'yolo11n.pt', 'classes': [2, 3, 5, 6, 7], 'show_conf': True, 'show_labels': True, 'region': [(550, 50), (1250, 50), (550, 950), (1250, 950)], 'colormap': 21, 'show_in': True, 'show_out': True, 'up_angle': 145.0, 'down_angle': 90, 'kpts': [6, 8, 10], 'analytics_type': 'line', 'figsize': (12.8, 7.2), 'blur_ratio': 0.5, 'vision_point': (20, 20), 'crop_dir': 'cropped-detections', 'json_file': None, 'line_width': 1, 'records': 5, 'fps': 30, 'max_hist': 5, 'meter_per_pixel': 0.05, 'max_speed': 120, 'show': True, 'iou': 0.7, 'conf': 0.25, 'device': None, 'max_det': 300, 'half': False, 'tracker': 'botsort.yaml', 'verbose': True, 'data': 'images'}


In [5]:

# Process video
while cap.isOpened():
    success, im0 = cap.read()

    if not success:
        print("Video frame is empty or processing is complete.")
        break

    tracks = model.track(im0, persist=True,show=False)
    results = speedestimator(im0)

    # print(results)  # access the output

    video_writer.write(results.plot_im)  # write the processed frame.

    # Exit loop if 'q' key is pressed
    if 0xFF == ord('q'): 
        # Release resources on way out of while loop
        print("manually quitting")
        break

cap.release()
video_writer.release()
cv2.destroyAllWindows()  # destroy all opened windows


0: 384x640 11 cars, 1 truck, 100.9ms
Speed: 3.0ms preprocess, 100.9ms inference, 1.7ms postprocess per image at shape (1, 3, 384, 640)
0: 1080x1920 33.9ms
Speed: 165.3ms track, 33.9ms solution per image at shape (1, 3, 1080, 1920)


0: 384x640 11 cars, 1 truck, 86.5ms
Speed: 2.8ms preprocess, 86.5ms inference, 1.3ms postprocess per image at shape (1, 3, 384, 640)
1: 1080x1920 8.4ms
Speed: 124.8ms track, 8.4ms solution per image at shape (1, 3, 1080, 1920)


0: 384x640 11 cars, 1 truck, 89.2ms
Speed: 2.3ms preprocess, 89.2ms inference, 1.3ms postprocess per image at shape (1, 3, 384, 640)
2: 1080x1920 5.9ms
Speed: 124.7ms track, 5.9ms solution per image at shape (1, 3, 1080, 1920)


0: 384x640 11 cars, 1 truck, 91.3ms
Speed: 2.1ms preprocess, 91.3ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)
3: 1080x1920 4.4ms
Speed: 131.7ms track, 4.4ms solution per image at shape (1, 3, 1080, 1920)


0: 384x640 11 cars, 1 truck, 93.2ms
Speed: 2.1ms preprocess, 93.2ms inference, 

KeyboardInterrupt: 