In [18]:
from pathlib import Path
from enum import Enum

class FileExtension(Enum):
    MOV = ".mov"
    MP4 = ".mp4"

class DataDirectory:

    def __init__(self, data_directory: str):
        self.data_directory = Path(data_directory)

    def get_video_folders(self) -> list[Path]:
        self.video_folders = [folder for folder in self.data_directory.iterdir() if folder.is_dir()]
        return self.video_folders
    
    def get_all_video_files(self, folder_name: str, file_extension: FileExtension = FileExtension.MOV) -> list[Path]:
        folder_path = self.data_directory / folder_name
        return [file for file in folder_path.iterdir() if file.is_file() and file.suffix == file_extension.value or file_extension.value.upper()]
    
    def get_video_file(self, folder_name: str, index: int=0, file_extension: FileExtension = FileExtension.MOV) -> Path:
        video_files = self.get_all_video_files(folder_name, file_extension)
        return video_files[index]


In [99]:
import ffmpeg
from ffmpeg import Error as FFmpegError
from geopy.geocoders import Nominatim
from pydantic import BaseModel
from typing import Annotated, Optional
from datetime import datetime
from pathlib import Path
import json


class VideoMetaData(BaseModel):
    duration: Annotated[Optional[int], "time in seconds"] = None
    created: Annotated[Optional[datetime], "created date"] = None
    modified: Annotated[Optional[datetime], "modified date"] = None
    location: Annotated[Optional[str], "location details"] = None
    framerate: Annotated[Optional[float], "frame rate"] = None


class VideoFile:

    def __init__(self, filepath: Path) -> None:
        self.filepath = filepath

    def get_metadata(self) -> VideoMetaData:
        try:
            probe = ffmpeg.probe(str(self.filepath))
        except FFmpegError as e:
            print(f"[ERROR] ffprobe failed on {self.filepath}")
            print(f"stderr: {e.stderr.decode() if e.stderr else 'No stderr available'}")
            return VideoMetaData()
        
        # Initialize metadata fields
        duration = None
        created = None
        modified = None
        location = None
        framerate = None

        # Get video stream information
        video_info = next((stream for stream in probe['streams'] if stream['codec_type'] == 'video'), None)
        if video_info:
            duration = int(float(video_info.get('duration', 0)))
            avg_frame_rate = video_info.get('avg_frame_rate', '0/1')
            if '/' in avg_frame_rate:
                num, denom = map(float, avg_frame_rate.split('/'))
                framerate = num / denom if denom != 0 else None

        # Get format info
        format_info = probe.get('format', {})
        tags = format_info.get('tags', {})
        if tags:
            # created / modified
            created = self.parse_datetime(tags.get('creation_time'))
            modified = self.parse_datetime(tags.get('modification_time'))

            # location
            iso_location = tags.get('com.apple.quicktime.location.ISO6709')
            if iso_location:
                loc_data = self.parse_location_iso6709(iso_location)
                location = self.get_place_name(loc_data['latitude'], loc_data['longitude'])
        
        return VideoMetaData(
            duration=duration,
            created=created,
            modified=modified,
            location=location,
            framerate=framerate
        )
    
    def write_metadata(self, output_dir: Path) -> None:
        parent_folder = self.filepath.parent.name
        folder_name = self.filepath.stem  # stem gives filename without extension
        metadata_save_dir = output_dir / parent_folder / folder_name
        metadata_save_dir.mkdir(parents=True, exist_ok=True)  # make sure directory exists
        metadata_save_filepath = metadata_save_dir / "metadata.json"
        metadata = self.get_metadata()
        with open(metadata_save_filepath, "w") as file:
            file.write(metadata.model_dump_json(indent=4)) 
        return None
    
    def write_frames(self, output_dir: Path):
        parent_folder = self.filepath.parent.name
        folder_name = self.filepath.stem
        output_dir = output_dir / parent_folder / folder_name / Path("frames")
        output_dir.mkdir(parents=True, exist_ok=True)
        (
            ffmpeg
            .input(str(self.filepath))
            .output(str(output_dir / 'frame_%04d.png'), r=1)
            .global_args('-loglevel', 'error')
            .run()
        )
        return None

    def parse_datetime(self, dt: Optional[str]) -> Optional[datetime]:
        if not dt:
            return None
        try:
            return datetime.fromisoformat(dt.replace('Z', '+00:00'))
        except Exception:
            return None

    def parse_location_iso6709(self, location_iso: str) -> dict:
        location_iso = location_iso.strip('/')
        lat = float(location_iso[0:8])
        lon = float(location_iso[8:17])
        alt = float(location_iso[17:])
        return {"latitude": lat, "longitude": lon, "altitude_m": alt}

    def get_place_name(self, latitude: float, longitude: float) -> str:
        geolocator = Nominatim(user_agent="geoapi", timeout=10)
        location = geolocator.reverse((latitude, longitude), exactly_one=True, language="en")
        return location.address if location else "Unknown Location"


In [100]:
DATA_DIRECTORY = "../sample_data"
folder_slno = 2

data_directory = DataDirectory(DATA_DIRECTORY)
folder_name = data_directory.get_video_folders()[folder_slno].name
video_files = data_directory.get_all_video_files(folder_name=folder_name)
len(video_files)


57

In [101]:
from tqdm.notebook import tqdm

for video_file_path in tqdm(video_files, total=len(video_files)):
    if video_file_path.suffix.lower() not in {'.mp4', '.mov'}:
        print(f"Skipping unsupported file: {video_file_path.name}")
        continue
    video_file = VideoFile(filepath=video_file_path)
    video_file.get_metadata()
    video_file.write_frames(output_dir=Path("./save_dir"))
    video_file.write_metadata(output_dir=Path("./save_dir"))

  0%|          | 0/57 [00:00<?, ?it/s]

Skipping unsupported file: .DS_Store


In [102]:
from PIL import Image
from pathlib import Path

def stitch_frames_horizontally(frames_dir: Path, output_path: Path, resize_height: int = None):
    # Collect all frame image files (e.g., .png or .jpg)
    frame_paths = sorted(frames_dir.glob("*.png"))  # or use "*.jpg" if needed

    if not frame_paths:
        print("No frames found in the directory.")
        return None

    # Open all images and optionally resize to a common height
    images = []
    for img_path in frame_paths:
        img = Image.open(img_path)
        if resize_height:
            w, h = img.size
            new_width = int((resize_height / h) * w)
            img = img.resize((new_width, resize_height))
        images.append(img)

    # Compute total width and max height
    total_width = sum(img.width for img in images)
    max_height = max(img.height for img in images)

    # Create a blank canvas
    stitched_image = Image.new("RGB", (total_width, max_height))

    # Paste images side-by-side
    x_offset = 0
    for img in images:
        stitched_image.paste(img, (x_offset, 0))
        x_offset += img.width
    
    # Save the final stitched image
    stitched_image.save(output_path)


In [104]:
all_video_frames = list(Path("save_dir/Thailand").iterdir())

for video_frame in tqdm(all_video_frames, total=len(all_video_frames)):

    try:
        frames_dir = list(video_frame.iterdir())[1] 
        out_dir = frames_dir.parent / "stiched.png"
        stitch_frames_horizontally(frames_dir, out_dir)
    except NotADirectoryError:
        print(f"{video_frame} not a valid dir")



  0%|          | 0/56 [00:00<?, ?it/s]

In [179]:
from groq import Groq
import base64

# Function to encode the image
def encode_image(image_path):
  with open(image_path, "rb") as image_file:
    return base64.b64encode(image_file.read()).decode('utf-8')

client = Groq()

def get_system_prompt(location:str)->str:
  return f"""
        SYSTEM:
        You are a **Video Description Specialist** with advanced multimodal understanding, capable of interpreting composite images and contextual metadata.
        All mention whether the user is present in the video or not.

        CAPABILITIES:

        * Expert in visual storytelling, identifying scene elements, actions, and emotions from stitched-frame panoramas.
        * Able to incorporate geolocation information to enrich descriptions with cultural, environmental, and historical context.

        TASK:
        Your objective is to generate a concise, vivid, and informative description of a video, based solely on:

        1. A single horizontally-stitched image representing sequential frames of the video.
        2. Location : {location}

        Ensure you parse both fields correctly.

        CONSTRAINTS & STYLE:

        * **Length:** 50–75 words.
        * **Tone:** Engaging and descriptive, suitable for social media captions or video platforms.
        * **Structure:**

        1. **Setting** (mention environment and time of day if inferable)
        2. **Action** (describe the main movement or event)
        3. **Mood/Context** (use location to add cultural or emotional nuance)
        * **Do not** reference the input format or mention that you are an AI.

        ERROR HANDLING:

        * If the `stitched_image` is missing or unreadable, respond:
        "Error: Invalid or missing image input."
        * If the `location` is provided but unrecognized, omit geographic details and proceed with a general description.

        OUTPUT:
        Return description as plain text
"""

def get_image_description(system_prompt: str, base64_image: bytes) -> str:
    chat_completion = client.chat.completions.create(
        messages=[
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": system_prompt},
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/jpeg;base64,{base64_image}",
                        },
                    },
                ],
            }
        ],
        model="meta-llama/llama-4-scout-17b-16e-instruct",
    )

    return chat_completion.choices[0].message.content

In [180]:
from pathlib import Path
import json
from PIL import Image
import io

file_directory = Path("./save_dir/Thailand")

for image_dir in file_directory.iterdir():
    image_path = image_dir / "stiched.png"

    img = Image.open(image_path)
    img.thumbnail((2048, 2048), Image.LANCZOS)
    buffered = io.BytesIO()
    img.save(buffered, format="JPEG")  # or "PNG" if your image is PNG
    base64_image = base64.b64encode(buffered.getvalue()).decode("utf-8")

    with open(image_dir / "metadata.json", "r") as f:
        metadata = json.load(f)

    location = metadata['location']
    system_prompt = get_system_prompt(location)
    description = get_image_description(system_prompt, base64_image)
    description_file_path = image_dir / f"description.txt"
    with open(description_file_path, "w", encoding="utf-8") as f:
        f.write(description)
    print(f"Frame description written to {description_file_path}")


Frame description written to save_dir/Thailand/IMG_8287/description.txt
Frame description written to save_dir/Thailand/IMG_8041/description.txt
Frame description written to save_dir/Thailand/IMG_8414/description.txt
Frame description written to save_dir/Thailand/IMG_8289/description.txt
Frame description written to save_dir/Thailand/IMG_8286/description.txt
Frame description written to save_dir/Thailand/IMG_8556/description.txt
Frame description written to save_dir/Thailand/IMG_8307/description.txt
Frame description written to save_dir/Thailand/IMG_8593/description.txt
Frame description written to save_dir/Thailand/IMG_8309/description.txt
Frame description written to save_dir/Thailand/IMG_8336/description.txt
Frame description written to save_dir/Thailand/IMG_8365/description.txt
Frame description written to save_dir/Thailand/IMG_8308/description.txt
Frame description written to save_dir/Thailand/IMG_8337/description.txt
Frame description written to save_dir/Thailand/IMG_8561/descript

In [181]:
def stich_all_descriptions(header: str, dir: str | Path) -> str:
    """
    We have description.txt for each video in the corresponding folder.
    Need to get this files and group them together.

    Args:
    ---
        header : A brief intro on what these videos are about.
        dir : A folder contains the folders that has description.txt of each video.
    
    Returns:
    ---
        str : Full description stiched together into a single string.
    """

    if isinstance(dir, str):
        dir = Path(dir)
    
    if not dir.is_dir():
        raise NotADirectoryError(f"{dir} is not a folder.")
    
    description = f"{header}\nThe descriptions of all the videos are given below:\n"

    for i, video_folder in tqdm(enumerate(dir.iterdir()), total=len(list(dir.iterdir()))):

        if not dir.is_dir():
            print(f"{video_folder} is not a folder.")

        video_description_file = video_folder / "description.txt"

        with open(video_description_file, "r", encoding="utf-8") as f:
            video_description = f.read()
        
        video_name = video_folder.name

        description += f"\n{i+1}. {video_name}.MOV : {video_description}"
    
    return description
        

In [182]:
description = stich_all_descriptions(header="These are descriptions of small video snippets which I shot on my Thailand trip. Create a script fot my reel", dir="./save_dir/Thailand")

  0%|          | 0/56 [00:00<?, ?it/s]

In [183]:
description

'These are descriptions of small video snippets which I shot on my Thailand trip. Create a script fot my reel\nThe descriptions of all the videos are given below:\n\n1. IMG_8287.MOV : In a bustling night market in Soa Luangwat, Phuket Province, Thailand, a woman expertly grills seafood. Sizzling skewers of shrimp and other delicacies line the grill. The vibrant setting, illuminated by bright lights, showcases a lively atmosphere. The woman is present in the video. Mouth-watering dishes are prepared with skill and care. The cultural richness of the local cuisine is on full display.\n2. IMG_8041.MOV : "Inside a kitchen, a vertical spit rotates stacked meat. Sizzling shawarma fills the air with savory aromas. The setting appears modern with stainless steel and a brick wall backdrop. The chef is absent. Located at Smile Dental Center, Pattaya Tai Road, Pattaya City, Thailand, this scene highlights culinary prep work."\n3. IMG_8414.MOV : At Phuket Palace, a delicious breakfast unfolds. \nSe

In [None]:
# def get_system_prompt_for_script_creation() -> str:
#     return """
#         You are a professional video editor and storyteller.
#         Your task is to generate a compelling and structured 60-second Instagram Reel script using unordered B-roll video descriptions.

#         ### Instructions:
#             1. Read and understand the unordered list of video descriptions provided.
#             2. Craft a clear, engaging narrative suitable for a 60-second Instagram Reel.
#             3. Do not follow the input order. Instead, curate a logical and engaging story from the available content.
#             4. Select only the most relevant videos that help tell the story—not all videos need to be used.
#             7. Make sure you pick the videos having the user.
#             5. Each video clip is 3–5 seconds long, so choose at least 10 videos to fill the reel duration (~60 seconds).
#             6. Sequence the chosen videos in an order that matches your narrative.
#             7. Output the result in two sections:
#                 sequence: A list of the selected video filenames in narrative order.
#                 narrative: A short written script or description summarizing the story arc of the reel.
#     """

In [198]:
import librosa
from datetime import timedelta

def get_beat_timestamps(audio_path, duration=None):
    y, sr = librosa.load(audio_path)
    onset_env = librosa.onset.onset_strength(y=y, sr=sr)
    onset_frames = librosa.onset.onset_detect(onset_envelope=onset_env, sr=sr, units='frames', backtrack=True, hop_length=256)
    beat_times = librosa.frames_to_time(onset_frames, sr=sr)
    beat_timestamps = [timedelta(seconds=float(t)) for t in beat_times]
    return beat_timestamps

In [205]:
def get_system_prompt_for_script_creation(num_scenes: int) -> str:
    return f"""
        You are a professional video editor and storyteller.
        Your task is to generate a compelling and structured 60-second Instagram Reel script using unordered B-roll video descriptions.
        Make the script as detailed as possible. Include transitions in the script. The number of scenes require for the script are {num_scenes}

        ### Instructions:
            1. Read and understand the unordered list of video descriptions provided.
            2. Craft a clear, engaging narrative suitable for a 60-second Instagram Reel.
            3. Do not follow the input order. Instead, curate a logical and engaging story from the available content.
            4. Describe each scene in detail.
            5. There should be a logical order and continuity for one scene to another.
            6. Finally when all scenes are put together, there should be continuity for the whole script.
            7. The script and scenes should be engaging for the user.
            9. Do not include video name in the script.
            10. A scene can be a part of a video.
            11. Ensure there are {num_scenes} scenes in the script.
            12. Need to make sure you do  not compermise the number of scenes
            11. Output the result in two sections:
                scenes: A list of scenes to create the real.
    """

In [206]:
beats = get_beat_timestamps("./music/1.mp3")
len(beats)

55

In [208]:
import os
from groq import Groq
import instructor
from pydantic import BaseModel
from typing import Annotated

# Initialize with API key
client = Groq(api_key=os.getenv("GROQ_API_KEY"))

# Enable instructor patches for Groq client
client = instructor.from_groq(client)


# class Script(BaseModel):
#     script: Annotated[str, "The constructed narrative based on the unordered b-roll descriptions."]
#     sequence: Annotated[list[str], "Order of the video names based on the constructed narrative."]

class Script(BaseModel):
    scenes: Annotated[list[str], "Scenes in order."]


# Create structured output
script = client.chat.completions.create(
    model="llama-3.3-70b-versatile",
    messages=[
        {"role": "system", "content": get_system_prompt_for_script_creation(num_scenes=len(beats))},
        {"role": "user", "content": description}
    ],
    response_model=Script,
)

In [209]:
len(script.scenes)

43

In [187]:
import os
import random
from PIL import Image
from moviepy import VideoFileClip, AudioFileClip, concatenate_videoclips
from moviepy.video.VideoClip import ImageClip

from pathlib import Path
from typing import List, Optional

def create_video(video_clips: List[Path], audio_file: Optional[Path] = None, output_file: str = "thailand_trip.mp4"):
    fps = 30
    width, height = 1280, 720

    # Load and resize video clips
    clips = [VideoFileClip(str(clip)) for clip in video_clips]

    # Concatenate all clips
    final_video = concatenate_videoclips(clips, method="compose")

    # Add audio if provided
    if audio_file:
        audio = AudioFileClip(str(audio_file))
        final_video = final_video.set_audio(audio)

    # Write the final video
    final_video.write_videofile(output_file, fps=fps, codec="libx264", audio_codec="aac")


In [189]:
sequence = script.sequence
sequence = [Path("../sample_data/Thailand") / video for video in sequence]
len(sequence)

11

In [190]:
create_video(video_clips=sequence)

{'video_found': True, 'audio_found': True, 'metadata': {'major_brand': 'qt', 'minor_version': '0', 'compatible_brands': 'qt', 'creation_time': '2024-02-18T10:34:30.000000Z', 'com.apple.quicktime.location.accuracy.horizontal': '35.000000', 'com.apple.quicktime.cinematic-video': '', 'com.apple.quicktime.location.ISO6709': '+07.8809+098.2919+005.853/', 'com.apple.quicktime.make': 'Apple', 'com.apple.quicktime.model': 'iPhone 13', 'com.apple.quicktime.software': '17.3', 'com.apple.quicktime.creationdate': '2024-02-12T10:01:35+0700'}, 'inputs': [{'streams': [{'input_number': 0, 'stream_number': 0, 'stream_type': 'video', 'language': None, 'default': True, 'size': [1920, 1080], 'bitrate': 11422, 'fps': 30.0, 'codec_name': 'hevc', 'profile': '(Main)', 'metadata': {'Metadata': '', 'creation_time': '2024-02-18T10:34:30.000000Z', 'handler_name': 'Core Media Video', 'vendor_id': '[0][0][0][0]', 'encoder': 'HEVC', 'Side data': '', 'displaymatrix': 'rotation of -90.00 degrees'}}, {'input_number': 0

                                                                      

MoviePy - Done.
MoviePy - Writing video thailand_trip.mp4



                                                                          

MoviePy - Done !
MoviePy - video ready thailand_trip.mp4


In [191]:
import librosa
from datetime import timedelta

def get_beat_timestamps(audio_path, duration=None):
    y, sr = librosa.load(audio_path)
    onset_env = librosa.onset.onset_strength(y=y, sr=sr)
    onset_frames = librosa.onset.onset_detect(onset_envelope=onset_env, sr=sr, units='frames', backtrack=True, hop_length=256)
    beat_times = librosa.frames_to_time(onset_frames, sr=sr)
    beat_timestamps = [timedelta(seconds=float(t)) for t in beat_times]
    return beat_timestamps

In [196]:
beats = get_beat_timestamps("./music/1.mp3")
len(beats)

55