In [1]:
import os, json, uuid, time, argparse, subprocess
from datetime import datetime, timezone


In [2]:
import boto3
from botocore.client import Config
from kafka import KafkaProducer
from dotenv import load_dotenv


In [4]:
from dotenv import load_dotenv
load_dotenv()

True

In [5]:
MINIO_ENDPOINT = os.getenv("MINIO_ENDPOINT", "http://localhost:9000")
MINIO_ACCESS_KEY = os.getenv("MINIO_ACCESS_KEY", "minio")
MINIO_SECRET_KEY = os.getenv("MINIO_SECRET_KEY", "minio12345")
MINIO_BUCKET = os.getenv("MINIO_BUCKET", "chunks")

print("MinIO Config:")
print(f"Endpoint: {MINIO_ENDPOINT}")
print(f"Access Key: {MINIO_ACCESS_KEY}")
print(f"Secret Key: {MINIO_SECRET_KEY}")
print(f"Bucket: {MINIO_BUCKET}")

MinIO Config:
Endpoint: http://localhost:9000
Access Key: minio
Secret Key: minio12345
Bucket: chunks


In [6]:
KAFKA_BOOTSTRAP = os.getenv("KAFKA_BOOTSTRAP", "localhost:29092")
KAFKA_TOPIC_VIDEO_CHUNKS = os.getenv("KAFKA_TOPIC_VIDEO_CHUNKS", "video-chunks")

print("Kafka Config:")
print(f"Bootstrap Servers: {KAFKA_BOOTSTRAP}")
print(f"Topic: {KAFKA_TOPIC_VIDEO_CHUNKS}")

Kafka Config:
Bootstrap Servers: localhost:29092
Topic: video-chunks


In [7]:
#Create MinIO client
def s3_client():
    return boto3.client(
        "s3",
        endpoint_url=MINIO_ENDPOINT,
        aws_access_key_id=MINIO_ACCESS_KEY,
        aws_secret_access_key=MINIO_SECRET_KEY,
        config=Config(signature_version="s3v4"),
        region_name="us-east-1",
    )

In [8]:
# Get video duration in seconds using ffprobe
def ffprobe_duration_seconds(video_path: str) -> float:
    cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        video_path
    ]
    out = subprocess.check_output(cmd).decode().strip()
    return float(out)

In [34]:
# Split video into chunks using ffmpeg
def split_video(input_path: str, out_dir: str, chunk_seconds: int) -> list[str]:
    os.makedirs(out_dir, exist_ok=True)
    # Creates out_dir/chunk_000.mp4, chunk_001.mp4, ...
    out_pattern = os.path.join(out_dir, "chunk_%03d.mp4")
    cmd = [
            "ffmpeg", "-y",
            "-i", input_path,
            "-c:v", "libx264", "-preset", "veryfast", "-crf", "23",
            "-force_key_frames", f"expr:gte(t,n_forced*{chunk_seconds})",
            "-c:a", "aac", "-b:a", "128k",
            "-f", "segment",
            "-segment_time", str(chunk_seconds),
            "-reset_timestamps", "1",
            out_pattern
        ]
    subprocess.check_call(cmd)
    return sorted(
        os.path.join(out_dir, f) for f in os.listdir(out_dir)
        if f.endswith(".mp4") and f.startswith("chunk_")
    )


In [10]:
# Upload file to MinIO
def upload_file_minio(local_path: str, object_key: str) -> str:
    s3 = s3_client()
    s3.upload_file(local_path, MINIO_BUCKET, object_key)
    return f"s3://{MINIO_BUCKET}/{object_key}"

In [11]:
# Create Kafka producer
def kafka_producer() -> KafkaProducer:
    return KafkaProducer(
        bootstrap_servers=KAFKA_BOOTSTRAP,
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
        key_serializer=lambda k: k.encode("utf-8"),
        acks="all",
        retries=5,
    )

In [16]:
source_video_id = str(uuid.uuid4())
source_video_id

'd8974baf-6e18-40fd-9726-f73457383f9d'

In [26]:
video_path="/home/qnt/ds/data/test.mp4"

In [35]:
duration = ffprobe_duration_seconds(video_path=video_path)
print(duration)

chunks = split_video(video_path, 
                     "/home/qnt/ds/video_splitter/out", 120)

2048.09288


ffmpeg version 6.1.1-3ubuntu5 Copyright (c) 2000-2023 the FFmpeg developers
  built with gcc 13 (Ubuntu 13.2.0-23ubuntu3)
  configuration: --prefix=/usr --extra-version=3ubuntu5 --toolchain=hardened --libdir=/usr/lib/x86_64-linux-gnu --incdir=/usr/include/x86_64-linux-gnu --arch=amd64 --enable-gpl --disable-stripping --disable-omx --enable-gnutls --enable-libaom --enable-libass --enable-libbs2b --enable-libcaca --enable-libcdio --enable-libcodec2 --enable-libdav1d --enable-libflite --enable-libfontconfig --enable-libfreetype --enable-libfribidi --enable-libglslang --enable-libgme --enable-libgsm --enable-libharfbuzz --enable-libmp3lame --enable-libmysofa --enable-libopenjpeg --enable-libopenmpt --enable-libopus --enable-librubberband --enable-libshine --enable-libsnappy --enable-libsoxr --enable-libspeex --enable-libtheora --enable-libtwolame --enable-libvidstab --enable-libvorbis --enable-libvpx --enable-libwebp --enable-libx265 --enable-libxml2 --enable-libxvid --enable-libzimg --ena

In [36]:
print(chunks)

['/home/qnt/ds/video_splitter/out/chunk_000.mp4', '/home/qnt/ds/video_splitter/out/chunk_001.mp4', '/home/qnt/ds/video_splitter/out/chunk_002.mp4', '/home/qnt/ds/video_splitter/out/chunk_003.mp4', '/home/qnt/ds/video_splitter/out/chunk_004.mp4', '/home/qnt/ds/video_splitter/out/chunk_005.mp4', '/home/qnt/ds/video_splitter/out/chunk_006.mp4', '/home/qnt/ds/video_splitter/out/chunk_007.mp4', '/home/qnt/ds/video_splitter/out/chunk_008.mp4', '/home/qnt/ds/video_splitter/out/chunk_009.mp4', '/home/qnt/ds/video_splitter/out/chunk_010.mp4', '/home/qnt/ds/video_splitter/out/chunk_011.mp4', '/home/qnt/ds/video_splitter/out/chunk_012.mp4', '/home/qnt/ds/video_splitter/out/chunk_013.mp4', '/home/qnt/ds/video_splitter/out/chunk_014.mp4', '/home/qnt/ds/video_splitter/out/chunk_015.mp4', '/home/qnt/ds/video_splitter/out/chunk_016.mp4', '/home/qnt/ds/video_splitter/out/chunk_017.mp4']


In [33]:
prod = kafka_producer()

In [37]:
for idx, chunk_path in enumerate(chunks):
        chunk_id = str(uuid.uuid4())
        object_key = f"{source_video_id}/chunk_{idx:03d}.mp4"
        uri = upload_file_minio(chunk_path, object_key)
        start_sec = idx * 120
        end_sec = min((idx + 1) * 120, duration)

        msg = {
            "event_type": "video_chunk_ready",
            "chunk_id": chunk_id,
            "source_video_id": source_video_id,
            "chunk_index": idx,
            "chunk_start_sec": start_sec,
            "chunk_end_sec": end_sec,
            "chunk_duration_sec": end_sec - start_sec,
            "uri": uri,
            "created_utc": datetime.now(timezone.utc).isoformat(),
        }

        prod.send(KAFKA_TOPIC_VIDEO_CHUNKS, key=chunk_id, value=msg)
        print(f"Published: {chunk_id} -> {uri}")
        time.sleep(0.05)

Published: be0e16fc-b071-4e56-9a42-f13f26d9ef4d -> s3://chunks/d8974baf-6e18-40fd-9726-f73457383f9d/chunk_000.mp4
Published: 9d196391-a962-47a5-adfa-041368efd3a5 -> s3://chunks/d8974baf-6e18-40fd-9726-f73457383f9d/chunk_001.mp4
Published: 06c90195-f0cf-4ca8-a16d-b0e5b0a9267f -> s3://chunks/d8974baf-6e18-40fd-9726-f73457383f9d/chunk_002.mp4
Published: 3f1fd9ce-d950-4442-b706-44b140438abb -> s3://chunks/d8974baf-6e18-40fd-9726-f73457383f9d/chunk_003.mp4
Published: 2f8b4c53-411e-403e-8bea-a9fd410c25a7 -> s3://chunks/d8974baf-6e18-40fd-9726-f73457383f9d/chunk_004.mp4
Published: 5812238c-1e19-40eb-ab61-51318be0cb49 -> s3://chunks/d8974baf-6e18-40fd-9726-f73457383f9d/chunk_005.mp4
Published: 2b17d814-525b-4e48-a843-6171d412f446 -> s3://chunks/d8974baf-6e18-40fd-9726-f73457383f9d/chunk_006.mp4
Published: a0307b16-ce42-4dc2-8172-9b612b08a97f -> s3://chunks/d8974baf-6e18-40fd-9726-f73457383f9d/chunk_007.mp4
Published: c340e022-461a-4615-af68-cd90057ff407 -> s3://chunks/d8974baf-6e18-40fd-9726-f

In [38]:
prod.flush()