Note: It is recommended to run this notebook in Google Colab and then download the generated dataset folders.

In [None]:
!pip -q install imageio imageio-ffmpeg opencv-python-headless gtts pydub


Generate train split

In [None]:
import shutil
from google.colab import files

import os, json, random, math, tempfile
import imageio
import numpy as np
import cv2
from gtts import gTTS
from pydub import AudioSegment
from IPython.display import HTML, Audio
from base64 import b64encode

In [None]:
# ---------------------------------------------------------------------------
# Configuration for the train split.
# ---------------------------------------------------------------------------

# Output layout: <ROOT_DIR>/video/*.mp4, <ROOT_DIR>/audio/*.mp3 and
# <ROOT_DIR>/captions.json.
ROOT_DIR = "/content/video_caption_dataset_train"
VIDEO_DIR = os.path.join(ROOT_DIR, "video")
AUDIO_DIR = os.path.join(ROOT_DIR, "audio")

os.makedirs(VIDEO_DIR, exist_ok=True)
os.makedirs(AUDIO_DIR, exist_ok=True)

DURATION = 12    # clip length in seconds (frames = DURATION * FPS, audio = DURATION * 1000 ms)
FPS = 24         # frames per second handed to the video writer
IMG_SIZE = 384   # frames are IMG_SIZE x IMG_SIZE pixels
MARGIN = 80      # shape centres are kept inside [MARGIN, IMG_SIZE - MARGIN]

NUM_SAMPLES = 8000   # number of clips generated for this split
START_ID = 0         # first numeric id; clips are named video_<START_ID + i>

# Seed both random number generators used by the sampler so the split can be
# regenerated. Use a different seed for every split, otherwise two splits
# would contain identical samples.
SEED = 0
random.seed(SEED)
np.random.seed(SEED)


# Shape names the sampler may pick from; each one must be handled by draw_shape.
SHAPES = [
    "circle",
    "pentagon",
    "hexagon",
    "heptagon",
    "octagon",
    "nonagon",
    "decagon",
    "star",
    "ring"
]


# Colour name (used in the caption) -> three channel values (used for drawing).
COLOR_PALETTE = {
    "red":      np.array([255,   0,   0]),
    "green":    np.array([  0, 255,   0]),
    "blue":     np.array([  0,   0, 255]),
    "yellow":   np.array([255, 255,   0]),
    "cyan":     np.array([  0, 255, 255]),
    "magenta":  np.array([255,   0, 255])
}


def regular_polygon(n, size):
    """Return the vertices of a regular ``n``-gon centred on the origin.

    Vertex ``k`` lies at angle ``2*pi*k/n`` on a circle of radius ``size``,
    so the first vertex is ``(size, 0)``.

    Args:
        n: Number of vertices.
        size: Distance from the origin to every vertex.

    Returns:
        ``np.float32`` array with one ``[x, y]`` row per vertex.
    """
    vertices = []
    for k in range(n):
        theta = 2*math.pi*k/n
        vertices.append([size*math.cos(theta), size*math.sin(theta)])
    return np.array(vertices, np.float32)

def draw_shape(img, shape, center, size, angle, color):
    """Draw a single shape onto ``img`` using OpenCV.

    Args:
        img: Image array handed to the OpenCV drawing calls.
        shape: ``"circle"``, ``"ring"``, ``"star"`` or one of the regular
            polygons from pentagon to decagon.
        center: ``(x, y)`` pixel position of the shape centre.
        size: Radius of the circle/ring, or centre-to-vertex distance for
            polygons and the outer star points.
        angle: Rotation passed to ``cv2.getRotationMatrix2D`` (degrees);
            not used for circles and rings.
        color: Iterable of channel values; each one is converted to ``int``.

    Raises:
        ValueError: If ``shape`` is not one of the supported names.
    """
    cx, cy = center
    rgb = tuple(int(channel) for channel in color)

    # Circles and rings look the same at every rotation, so they are drawn
    # directly: thickness -1 fills the disc, thickness 3 draws only an outline.
    if shape in ("circle", "ring"):
        thickness = -1 if shape == "circle" else 3
        cv2.circle(img, (cx, cy), size, rgb, thickness)
        return

    polygon_sides = {
        "pentagon": 5, "hexagon": 6, "heptagon": 7,
        "octagon": 8, "nonagon": 9, "decagon": 10,
    }

    if shape == "star":
        # Ten vertices alternating between the outer radius and 45% of it.
        radii = [size if k % 2 == 0 else size * 0.45 for k in range(10)]
        outline = np.array(
            [[r*math.cos(k * math.pi / 5), r*math.sin(k * math.pi / 5)]
             for k, r in enumerate(radii)],
            np.float32,
        )
    else:
        sides = polygon_sides.get(shape)
        if sides is None:
            raise ValueError(f"Unhandled shape: {shape}")
        outline = regular_polygon(sides, size)

    # Rotate the origin-centred outline, then shift it to the requested centre.
    rotation = cv2.getRotationMatrix2D((0,0), angle, 1.0)
    linear, shift = rotation[:,:2], rotation[:,2]
    placed = (outline @ linear.T) + shift + np.array([cx,cy])
    cv2.fillPoly(img, [placed.astype(np.int32)], rgb)


def generate_audio(video_id, text):
    """Synthesise ``text`` as English speech and save it as a DURATION-second MP3.

    The spoken clip is repeated until it is at least ``DURATION`` seconds long
    and then trimmed to exactly ``DURATION * 1000`` milliseconds.

    Args:
        video_id: File stem; the result is written to ``AUDIO_DIR/<video_id>.mp3``.
        text: Sentence to speak.

    Returns:
        Path of the exported MP3 file.

    Raises:
        ValueError: If the synthesised clip is empty (it could never be
            looped up to the target length).
    """
    tts = gTTS(text=text, lang="en")

    # Only reserve a unique path here; the handle is closed before gTTS writes
    # to it so the file is never open twice at the same time.
    with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp:
        tmp_path = tmp.name

    try:
        tts.save(tmp_path)
        audio = AudioSegment.from_mp3(tmp_path)

        # A zero-length clip would make the loop below spin forever.
        if len(audio) == 0:
            raise ValueError(f"Text-to-speech produced empty audio for {video_id}")

        target_ms = DURATION * 1000

        looped = audio
        while len(looped) < target_ms:
            looped += audio
        looped = looped[:target_ms]

        final_path = os.path.join(AUDIO_DIR, f"{video_id}.mp3")
        looped.export(final_path, format="mp3")
    finally:
        # Remove the temporary file even when synthesis, decoding or export fails.
        os.remove(tmp_path)

    return final_path


def generate_sample(video_id):
    """Render one synthetic clip plus its spoken caption.

    Two or three distinct shapes are sampled from ``SHAPES``; each gets a
    random palette colour, start position, velocity, start angle, rotation
    speed and size. The shapes move on a black ``IMG_SIZE`` x ``IMG_SIZE``
    canvas for ``DURATION * FPS`` frames, bouncing whenever a centre leaves
    ``[MARGIN, IMG_SIZE - MARGIN]``. The caption sentence is then spoken and
    saved by ``generate_audio``.

    Args:
        video_id: File stem for the ``.mp4``/``.mp3`` files; also stored in
            the caption entry.

    Returns:
        ``(video_path, audio_path, caption_entry)`` where ``caption_entry`` is
        a dict with the keys ``"video_id"``, ``"desc"`` and ``"subtitle"``.
    """
    num_frames = DURATION * FPS
    num_shapes = random.randint(2,3)
    chosen = random.sample(SHAPES, num_shapes)
    color_names = list(COLOR_PALETTE.keys())

    states=[]
    for sh in chosen:
        color_name = random.choice(color_names)
        states.append({
            "shape": sh,
            "color": COLOR_PALETTE[color_name],
            "color_name": color_name,
            "pos": np.random.randint(MARGIN, IMG_SIZE-MARGIN, size=2).astype(float),
            "vel": np.random.uniform(-2,2,size=2),     # pixels per frame
            "angle": random.uniform(0,360),
            "rot": random.uniform(-3,3),               # angle change per frame
            "size": random.randint(14,28)
        })

    assert all(s["shape"] in SHAPES for s in states)
    assert all(s["color_name"] in COLOR_PALETTE for s in states)

    objects = [f"{s['color_name']} {s['shape']}" for s in states]
    caption = f"This video contains {len(objects)} shapes: {', '.join(objects)}."

    cap_entry = {
        "video_id": video_id,
        "desc": caption,
        "subtitle": ""
    }

    vpath = os.path.join(VIDEO_DIR, f"{video_id}.mp4")
    writer = imageio.get_writer(
        vpath, fps=FPS, codec="libx264", pixelformat="yuv420p"
    )

    try:
        for _ in range(num_frames):
            frame = np.zeros((IMG_SIZE, IMG_SIZE, 3), dtype=np.uint8)

            for s in states:
                # Advance the motion state before drawing.
                s["pos"] += s["vel"]
                s["angle"] += s["rot"]

                # Bounce: flip the velocity component and clamp the position
                # on every axis where the centre left the allowed band.
                for d in [0,1]:
                    if s["pos"][d] < MARGIN or s["pos"][d] > IMG_SIZE-MARGIN:
                        s["vel"][d] *= -1
                        s["pos"][d] = np.clip(s["pos"][d], MARGIN, IMG_SIZE-MARGIN)

                draw_shape(
                    frame,
                    s["shape"],
                    (int(s["pos"][0]), int(s["pos"][1])),
                    s["size"],
                    s["angle"],
                    s["color"]
                )

            writer.append_data(frame)
    finally:
        # Close the writer even if a frame fails to render or encode, so the
        # writer's resources are released.
        writer.close()

    apath = generate_audio(video_id, caption)

    return vpath, apath, cap_entry


# Generate every sample of the split and collect the output paths and captions.
captions=[]
video_paths=[]
audio_paths=[]

try:
    for i in range(NUM_SAMPLES):
        vid_num = START_ID + i
        vid = f"video_{vid_num:04d}"

        vp, ap, cap = generate_sample(vid)
        video_paths.append(vp)
        audio_paths.append(ap)
        captions.append(cap)

        print(f"{vid} Done")
finally:
    # Write the caption file even when generation stops early (for example a
    # failed text-to-speech request), so the clips already on disk keep their
    # metadata.
    with open(os.path.join(ROOT_DIR, "captions.json"), "w") as f:
        json.dump(captions, f, indent=2)


def show_video(path):
    """Return an inline HTML video player for the MP4 file at ``path``.

    The file content is base64-encoded into a ``data:`` URL, so the player
    does not depend on the file being reachable from the browser.

    Args:
        path: Path of the MP4 file to embed.

    Returns:
        ``IPython.display.HTML`` object containing a ``<video>`` element.
    """
    # Use a context manager so the file handle is closed right after reading.
    with open(path, "rb") as video_file:
        data = video_file.read()
    url = "data:video/mp4;base64," + b64encode(data).decode()
    return HTML(f'<video width=420 controls><source src="{url}"></video>')

# Preview the first generated clip and its narration as a quick sanity check.
first_clip, first_narration = video_paths[0], audio_paths[0]
display(show_video(first_clip))
display(Audio(first_narration))


In [None]:
# Zip the train split from the shell; the archive is written to the current
# working directory.
# NOTE(review): if the working directory is /content (TODO confirm), the
# shutil.make_archive(ROOT_DIR, ...) call in the next cell writes to the same
# .zip path and would overwrite this archive.
!zip -r video_caption_dataset_train.zip /content/video_caption_dataset_train

In [None]:
# Pack the contents of ROOT_DIR into "<ROOT_DIR>.zip" (entries are stored
# relative to ROOT_DIR) and hand the archive to the Colab download helper.
archive_name = shutil.make_archive(
    base_name=ROOT_DIR,
    format="zip",
    root_dir=ROOT_DIR,
)
files.download(archive_name)

Generate validation split

In [None]:
# ---------------------------------------------------------------------------
# Configuration for the validation split.
# ---------------------------------------------------------------------------

# Output layout: <ROOT_DIR>/video/*.mp4, <ROOT_DIR>/audio/*.mp3 and
# <ROOT_DIR>/captions.json.
ROOT_DIR = "/content/video_caption_dataset_val"
VIDEO_DIR = os.path.join(ROOT_DIR, "video")
AUDIO_DIR = os.path.join(ROOT_DIR, "audio")

os.makedirs(VIDEO_DIR, exist_ok=True)
os.makedirs(AUDIO_DIR, exist_ok=True)

DURATION = 12    # clip length in seconds (frames = DURATION * FPS, audio = DURATION * 1000 ms)
FPS = 24         # frames per second handed to the video writer
IMG_SIZE = 384   # frames are IMG_SIZE x IMG_SIZE pixels
MARGIN = 80      # shape centres are kept inside [MARGIN, IMG_SIZE - MARGIN]

NUM_SAMPLES = 1000   # number of clips generated for this split
START_ID = 8000      # first numeric id; clips are named video_<START_ID + i>

# Seed both random number generators used by the sampler so the split can be
# regenerated. Use a different seed for every split, otherwise two splits
# would contain identical samples.
SEED = 1
random.seed(SEED)
np.random.seed(SEED)


# Shape names the sampler may pick from; each one must be handled by draw_shape.
SHAPES = [
    "circle",
    "pentagon",
    "hexagon",
    "heptagon",
    "octagon",
    "nonagon",
    "decagon",
    "star",
    "ring"
]


# Colour name (used in the caption) -> three channel values (used for drawing).
COLOR_PALETTE = {
    "red":      np.array([255,   0,   0]),
    "green":    np.array([  0, 255,   0]),
    "blue":     np.array([  0,   0, 255]),
    "yellow":   np.array([255, 255,   0]),
    "cyan":     np.array([  0, 255, 255]),
    "magenta":  np.array([255,   0, 255])
}


def regular_polygon(n, size):
    """Return the vertices of a regular ``n``-gon centred on the origin.

    Vertex ``k`` lies at angle ``2*pi*k/n`` on a circle of radius ``size``,
    so the first vertex is ``(size, 0)``.

    Args:
        n: Number of vertices.
        size: Distance from the origin to every vertex.

    Returns:
        ``np.float32`` array with one ``[x, y]`` row per vertex.
    """
    vertices = []
    for k in range(n):
        theta = 2*math.pi*k/n
        vertices.append([size*math.cos(theta), size*math.sin(theta)])
    return np.array(vertices, np.float32)

def draw_shape(img, shape, center, size, angle, color):
    """Draw a single shape onto ``img`` using OpenCV.

    Args:
        img: Image array handed to the OpenCV drawing calls.
        shape: ``"circle"``, ``"ring"``, ``"star"`` or one of the regular
            polygons from pentagon to decagon.
        center: ``(x, y)`` pixel position of the shape centre.
        size: Radius of the circle/ring, or centre-to-vertex distance for
            polygons and the outer star points.
        angle: Rotation passed to ``cv2.getRotationMatrix2D`` (degrees);
            not used for circles and rings.
        color: Iterable of channel values; each one is converted to ``int``.

    Raises:
        ValueError: If ``shape`` is not one of the supported names.
    """
    cx, cy = center
    rgb = tuple(int(channel) for channel in color)

    # Circles and rings look the same at every rotation, so they are drawn
    # directly: thickness -1 fills the disc, thickness 3 draws only an outline.
    if shape in ("circle", "ring"):
        thickness = -1 if shape == "circle" else 3
        cv2.circle(img, (cx, cy), size, rgb, thickness)
        return

    polygon_sides = {
        "pentagon": 5, "hexagon": 6, "heptagon": 7,
        "octagon": 8, "nonagon": 9, "decagon": 10,
    }

    if shape == "star":
        # Ten vertices alternating between the outer radius and 45% of it.
        radii = [size if k % 2 == 0 else size * 0.45 for k in range(10)]
        outline = np.array(
            [[r*math.cos(k * math.pi / 5), r*math.sin(k * math.pi / 5)]
             for k, r in enumerate(radii)],
            np.float32,
        )
    else:
        sides = polygon_sides.get(shape)
        if sides is None:
            raise ValueError(f"Unhandled shape: {shape}")
        outline = regular_polygon(sides, size)

    # Rotate the origin-centred outline, then shift it to the requested centre.
    rotation = cv2.getRotationMatrix2D((0,0), angle, 1.0)
    linear, shift = rotation[:,:2], rotation[:,2]
    placed = (outline @ linear.T) + shift + np.array([cx,cy])
    cv2.fillPoly(img, [placed.astype(np.int32)], rgb)


def generate_audio(video_id, text):
    """Synthesise ``text`` as English speech and save it as a DURATION-second MP3.

    The spoken clip is repeated until it is at least ``DURATION`` seconds long
    and then trimmed to exactly ``DURATION * 1000`` milliseconds.

    Args:
        video_id: File stem; the result is written to ``AUDIO_DIR/<video_id>.mp3``.
        text: Sentence to speak.

    Returns:
        Path of the exported MP3 file.

    Raises:
        ValueError: If the synthesised clip is empty (it could never be
            looped up to the target length).
    """
    tts = gTTS(text=text, lang="en")

    # Only reserve a unique path here; the handle is closed before gTTS writes
    # to it so the file is never open twice at the same time.
    with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp:
        tmp_path = tmp.name

    try:
        tts.save(tmp_path)
        audio = AudioSegment.from_mp3(tmp_path)

        # A zero-length clip would make the loop below spin forever.
        if len(audio) == 0:
            raise ValueError(f"Text-to-speech produced empty audio for {video_id}")

        target_ms = DURATION * 1000

        looped = audio
        while len(looped) < target_ms:
            looped += audio
        looped = looped[:target_ms]

        final_path = os.path.join(AUDIO_DIR, f"{video_id}.mp3")
        looped.export(final_path, format="mp3")
    finally:
        # Remove the temporary file even when synthesis, decoding or export fails.
        os.remove(tmp_path)

    return final_path


def generate_sample(video_id):
    """Render one synthetic clip plus its spoken caption.

    Two or three distinct shapes are sampled from ``SHAPES``; each gets a
    random palette colour, start position, velocity, start angle, rotation
    speed and size. The shapes move on a black ``IMG_SIZE`` x ``IMG_SIZE``
    canvas for ``DURATION * FPS`` frames, bouncing whenever a centre leaves
    ``[MARGIN, IMG_SIZE - MARGIN]``. The caption sentence is then spoken and
    saved by ``generate_audio``.

    Args:
        video_id: File stem for the ``.mp4``/``.mp3`` files; also stored in
            the caption entry.

    Returns:
        ``(video_path, audio_path, caption_entry)`` where ``caption_entry`` is
        a dict with the keys ``"video_id"``, ``"desc"`` and ``"subtitle"``.
    """
    num_frames = DURATION * FPS
    num_shapes = random.randint(2,3)
    chosen = random.sample(SHAPES, num_shapes)
    color_names = list(COLOR_PALETTE.keys())

    states=[]
    for sh in chosen:
        color_name = random.choice(color_names)
        states.append({
            "shape": sh,
            "color": COLOR_PALETTE[color_name],
            "color_name": color_name,
            "pos": np.random.randint(MARGIN, IMG_SIZE-MARGIN, size=2).astype(float),
            "vel": np.random.uniform(-2,2,size=2),     # pixels per frame
            "angle": random.uniform(0,360),
            "rot": random.uniform(-3,3),               # angle change per frame
            "size": random.randint(14,28)
        })

    assert all(s["shape"] in SHAPES for s in states)
    assert all(s["color_name"] in COLOR_PALETTE for s in states)

    objects = [f"{s['color_name']} {s['shape']}" for s in states]
    caption = f"This video contains {len(objects)} shapes: {', '.join(objects)}."

    cap_entry = {
        "video_id": video_id,
        "desc": caption,
        "subtitle": ""
    }

    vpath = os.path.join(VIDEO_DIR, f"{video_id}.mp4")
    writer = imageio.get_writer(
        vpath, fps=FPS, codec="libx264", pixelformat="yuv420p"
    )

    try:
        for _ in range(num_frames):
            frame = np.zeros((IMG_SIZE, IMG_SIZE, 3), dtype=np.uint8)

            for s in states:
                # Advance the motion state before drawing.
                s["pos"] += s["vel"]
                s["angle"] += s["rot"]

                # Bounce: flip the velocity component and clamp the position
                # on every axis where the centre left the allowed band.
                for d in [0,1]:
                    if s["pos"][d] < MARGIN or s["pos"][d] > IMG_SIZE-MARGIN:
                        s["vel"][d] *= -1
                        s["pos"][d] = np.clip(s["pos"][d], MARGIN, IMG_SIZE-MARGIN)

                draw_shape(
                    frame,
                    s["shape"],
                    (int(s["pos"][0]), int(s["pos"][1])),
                    s["size"],
                    s["angle"],
                    s["color"]
                )

            writer.append_data(frame)
    finally:
        # Close the writer even if a frame fails to render or encode, so the
        # writer's resources are released.
        writer.close()

    apath = generate_audio(video_id, caption)

    return vpath, apath, cap_entry


# Generate every sample of the split and collect the output paths and captions.
captions=[]
video_paths=[]
audio_paths=[]

try:
    for i in range(NUM_SAMPLES):
        vid_num = START_ID + i
        vid = f"video_{vid_num:04d}"

        vp, ap, cap = generate_sample(vid)
        video_paths.append(vp)
        audio_paths.append(ap)
        captions.append(cap)

        print(f"{vid} Done")
finally:
    # Write the caption file even when generation stops early (for example a
    # failed text-to-speech request), so the clips already on disk keep their
    # metadata.
    with open(os.path.join(ROOT_DIR, "captions.json"), "w") as f:
        json.dump(captions, f, indent=2)


def show_video(path):
    """Return an inline HTML video player for the MP4 file at ``path``.

    The file content is base64-encoded into a ``data:`` URL, so the player
    does not depend on the file being reachable from the browser.

    Args:
        path: Path of the MP4 file to embed.

    Returns:
        ``IPython.display.HTML`` object containing a ``<video>`` element.
    """
    # Use a context manager so the file handle is closed right after reading.
    with open(path, "rb") as video_file:
        data = video_file.read()
    url = "data:video/mp4;base64," + b64encode(data).decode()
    return HTML(f'<video width=420 controls><source src="{url}"></video>')

# Preview the first generated clip and its narration as a quick sanity check.
first_clip, first_narration = video_paths[0], audio_paths[0]
display(show_video(first_clip))
display(Audio(first_narration))


In [None]:
# Zip the validation split from the shell; the archive is written to the
# current working directory.
# NOTE(review): if the working directory is /content (TODO confirm), the
# shutil.make_archive(ROOT_DIR, ...) call in the next cell writes to the same
# .zip path and would overwrite this archive.
!zip -r video_caption_dataset_val.zip /content/video_caption_dataset_val

In [None]:
# Pack the contents of ROOT_DIR into "<ROOT_DIR>.zip" (entries are stored
# relative to ROOT_DIR) and hand the archive to the Colab download helper.
archive_name = shutil.make_archive(
    base_name=ROOT_DIR,
    format="zip",
    root_dir=ROOT_DIR,
)
files.download(archive_name)