In [2]:
import assemblyai as aai
from config.env import AssemblyAI

# The API key will be loaded from your .env file through the config
api_key = AssemblyAI().API_KEY

# Initialize the client with your API key
aai.settings.api_key = api_key

# Example: Transcribe an audio file
transcriber = aai.Transcriber()
transcript = transcriber.transcribe(
    "/Users/andrew/Movies/Documentation/2025-03-11 20-19-08.mov"
)

ffmpeg -ss 64.928 -i "/Users/andrew/Movies/Documentation/2025-03-05 12-19-17.mov" -to 4.892 -c:v libx264 -c:a aac "/Users/andrew/Movies/Documentation/clip_vibe_segment_fixed.mov"

In [12]:
from langchain_anthropic import ChatAnthropic
from config.env import LLM_API_KEYS

model = ChatAnthropic(
    model="claude-3-7-sonnet-latest",
    anthropic_api_key=LLM_API_KEYS.ANTHROPIC_API_KEY,
    temperature=0.7,
)

from pydantic import BaseModel, Field


class Classifier(BaseModel):
    relevant: bool = Field(
        default=False,
        description="Is this informative conent. You want to return false for information that doesnt explain anything. Look to cut stutters or anything that lacks cohesiveness. This is audio of a video tutorial. Make sure you keep the into paragraph.",
    )
    reasoning: str = Field(
        default="", description="Your reasoning of why this text should be cut"
    )

prompts = [
    f"You are an educator and video tutorial expert. You are given a paragraph of text and you need to determine if it makes the cut. {p.text}"
    for p in transcript.get_paragraphs()
]

results = await model.with_structured_output(Classifier).abatch(prompts)

2025-03-05 15:17:13,080 - INFO - HTTP Request: GET https://api.assemblyai.com/v2/transcript/742a5fa5-1d9d-4bb5-915c-0db603ea080b/paragraphs "HTTP/1.1 200 OK"


2025-03-05 15:17:22,914 - INFO - HTTP Request: POST https://api.anthropic.com/v1/messages "HTTP/1.1 429 Too Many Requests"
2025-03-05 15:17:22,915 - INFO - Retrying request to /v1/messages in 3.000000 seconds
2025-03-05 15:17:22,935 - INFO - HTTP Request: POST https://api.anthropic.com/v1/messages "HTTP/1.1 429 Too Many Requests"
2025-03-05 15:17:22,936 - INFO - Retrying request to /v1/messages in 3.000000 seconds
2025-03-05 15:17:22,986 - INFO - HTTP Request: POST https://api.anthropic.com/v1/messages "HTTP/1.1 429 Too Many Requests"
2025-03-05 15:17:22,987 - INFO - Retrying request to /v1/messages in 3.000000 seconds
2025-03-05 15:17:22,997 - INFO - HTTP Request: POST https://api.anthropic.com/v1/messages "HTTP/1.1 429 Too Many Requests"
2025-03-05 15:17:22,999 - INFO - Retrying request to /v1/messages in 3.000000 seconds
2025-03-05 15:17:23,087 - INFO - HTTP Request: POST https://api.anthropic.com/v1/messages "HTTP/1.1 429 Too Many Requests"
2025-03-05 15:17:23,089 - INFO - Retrying

In [70]:

filters[0] = True

2025-03-05 15:18:04,748 - INFO - HTTP Request: GET https://api.assemblyai.com/v2/transcript/742a5fa5-1d9d-4bb5-915c-0db603ea080b/paragraphs "HTTP/1.1 200 OK"


In [72]:
filtered_list

[Paragraph(text="Okay, so I am going to show you guys how I continue to use cursor and development. And so I have this vision, system architecture and technical requirements and user stories, this documentation that I've built up for this project. And I really want to push. I've seen this idea around vibe coding floating through, and I think that's stupid and ridiculous. You should not do that.", start=1040, end=32670, confidence=0.83302, speaker=None, channel=None, words=[Word(text='Okay,', start=1040, end=1664, confidence=0.83302, speaker=None, channel=None), Word(text='so', start=1752, end=2184, confidence=0.99473, speaker=None, channel=None), Word(text='I', start=2272, end=2472, confidence=0.99956, speaker=None, channel=None), Word(text='am', start=2496, end=2680, confidence=0.53552, speaker=None, channel=None), Word(text='going', start=2720, end=2872, confidence=0.86627, speaker=None, channel=None), Word(text='to', start=2896, end=2984, confidence=0.99922, speaker=None, channel=No

In [51]:
def create_ffmpeg_command(paragraph, input_file, output_dir=None, output_filename=None):
    """
    Create an ffmpeg command to extract a segment from a video based on paragraph timestamps.

    Args:
        paragraph: An object with text, start, and end attributes (timestamps in milliseconds)
        input_file: Path to the input video file
        output_dir: Directory for the output file (defaults to same as input)
        output_filename: Custom filename for the output (defaults to auto-generated from text)

    Returns:
        str: The complete ffmpeg command
    """
    from pathlib import Path
    import re

    # Convert milliseconds to seconds
    start_sec = paragraph.start / 1000
    duration_sec = (paragraph.end - paragraph.start) / 1000

    # Create a filename from the text if not provided
    if output_filename is None:
        # Take first 5 words of text and clean it for a filename
        words = paragraph.text.split()[:5]
        clean_text = "_".join(words)
        # Remove special characters
        clean_text = re.sub(r"[^\w\s]", "", clean_text)
        clean_text = clean_text.replace(" ", "_").lower()
        output_filename = f"clip_{clean_text}.mov"

    # Set output directory
    if output_dir is None:
        output_dir = str(Path(input_file).parent)

    # Construct full output path
    output_path = str(Path(output_dir) / output_filename)

    # Create the ffmpeg command
    command = (
        f'ffmpeg -ss {start_sec:.3f} -i "{input_file}" -to {duration_sec:.3f} '
        f'-c:v libx264 -c:a aac "{output_path}"'
    )

    return command

In [52]:
def create_commands_for_paragraphs(paragraphs, input_file, output_dir=None):
    """
    Create ffmpeg commands for a list of paragraphs.

    Args:
        paragraphs: List of paragraph objects with text, start, and end attributes
        input_file: Path to the input video file
        output_dir: Directory for output files

    Returns:
        list: List of ffmpeg commands
    """
    commands = []
    for i, paragraph in enumerate(paragraphs):
        # Create a numbered filename
        output_filename = f"clip_{i+1:03d}.mov"
        command = create_ffmpeg_command(
            paragraph,
            input_file,
            output_dir=output_dir,
            output_filename=output_filename,
        )
        commands.append(command)
    return commands

In [54]:
import os
import subprocess
import concurrent.futures
from pathlib import Path
import tempfile
import re

In [57]:
def process_clip(paragraph, input_file, output_dir, clip_index):
    """Process a single clip in parallel"""
    output_filename = f"clip_{clip_index:03d}.mov"
    command, output_path = create_ffmpeg_command(
        paragraph, input_file, output_dir, output_filename
    )

    try:
        subprocess.run(command, check=True, capture_output=True)
        return {
            "success": True,
            "output_path": output_path,
            "index": clip_index,
            "error": None,
        }
    except subprocess.CalledProcessError as e:
        return {
            "success": False,
            "output_path": None,
            "index": clip_index,
            "error": str(e),
        }


def create_concat_file(clip_paths, concat_file_path):
    """Create a concat file for ffmpeg"""
    with open(concat_file_path, "w") as f:
        for path in clip_paths:
            f.write(f"file '{path}'\n")


def concatenate_clips(clip_paths, output_file):
    """Concatenate clips into a final video"""
    # Create a temporary concat file
    with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) as temp:
        concat_file = temp.name
        create_concat_file(clip_paths, concat_file)

    # Run ffmpeg to concatenate the clips
    command = [
        "ffmpeg",
        "-f",
        "concat",
        "-safe",
        "0",
        "-i",
        concat_file,
        "-c",
        "copy",
        output_file,
    ]

    try:
        subprocess.run(command, check=True)
        os.unlink(concat_file)  # Delete the temporary file
        return True
    except subprocess.CalledProcessError as e:
        print(f"Error concatenating clips: {e}")
        return False

from loguru import logger
def process_paragraphs(
    paragraphs, input_file, output_dir, final_output_file, max_workers=4
):
    """
    Process a list of paragraphs, then concatenate them.

    Args:
        paragraphs: List of paragraph objects with text, start, and end attributes
        input_file: Path to the input video file
        output_dir: Directory for output files
        final_output_file: Path for the final concatenated video
        max_workers: Maximum number of parallel threads (default: 4)

    Returns:
        bool: Success status
    """
    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)

    # Process clips in parallel using ThreadPoolExecutor instead of ProcessPoolExecutor
    results = []

    # Option 1: Process sequentially if having issues with parallel processing
    if max_workers == 1:
        logger.info("Processing clips sequentially")
        for i, paragraph in enumerate(paragraphs):
            result = process_clip(paragraph, input_file, output_dir, i + 1)
            results.append(result)
    else:
        # Option 2: Process in parallel with ThreadPoolExecutor
        logger.info(f"Processing clips in parallel with {max_workers} workers")
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = []
            for i, paragraph in enumerate(paragraphs):
                future = executor.submit(
                    process_clip, paragraph, input_file, output_dir, i + 1
                )
                futures.append(future)

            # Collect results as they complete
            for future in concurrent.futures.as_completed(futures):
                try:
                    result = future.result()
                    results.append(result)
                except Exception as e:
                    logger.error(f"Error in thread: {e}")
                    results.append(
                        {
                            "success": False,
                            "output_path": None,
                            "index": -1,
                            "error": f"Thread error: {e}",
                        }
                    )

    # Check if all clips were processed successfully
    successful_results = [r for r in results if r["success"]]
    failed_results = [r for r in results if not r["success"]]

    logger.info(f"Processed {len(successful_results)} clips successfully")
    if failed_results:
        logger.warning(f"Failed to process {len(failed_results)} clips")
        for fail in failed_results:
            logger.warning(f"  Clip {fail['index']}: {fail['error']}")

    if not successful_results:
        logger.error("No clips were processed successfully. Aborting concatenation.")
        return False

    # Sort results by index to maintain order
    successful_results.sort(key=lambda x: x["index"])
    clip_paths = [result["output_path"] for result in successful_results]

    # Concatenate all clips
    logger.info("Concatenating clips...")
    success = concatenate_clips(clip_paths, final_output_file)

    if success:
        logger.info(f"Final video created at: {final_output_file}")

    return success

In [59]:
import os
import subprocess
import concurrent.futures
from pathlib import Path
import tempfile
import re
import logging
from dataclasses import dataclass
from typing import Optional, List

# Set up logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)


@dataclass
class Paragraph:
    """Class to represent a paragraph with timestamps"""

    text: str
    start: int
    end: int


def create_ffmpeg_command(paragraph, input_file, output_dir, output_filename=None):
    """
    Create an ffmpeg command to extract a segment from a video based on paragraph timestamps.

    Args:
        paragraph: An object with text, start, and end attributes (timestamps in milliseconds)
        input_file: Path to the input video file
        output_dir: Directory for the output file
        output_filename: Custom filename for the output (defaults to auto-generated from text)

    Returns:
        tuple: The complete ffmpeg command and output path
    """
    # Convert milliseconds to seconds
    start_sec = paragraph.start / 1000
    duration_sec = (paragraph.end - paragraph.start) / 1000

    # Create a filename from the text if not provided
    if output_filename is None:
        # Take first 5 words of text and clean it for a filename
        words = paragraph.text.split()[:5]
        clean_text = "_".join(words)
        # Remove special characters
        clean_text = re.sub(r"[^\w\s]", "", clean_text)
        clean_text = clean_text.replace(" ", "_").lower()
        output_filename = f"clip_{clean_text}.mov"

    # Construct full output path
    output_path = str(Path(output_dir) / output_filename)

    # Create the ffmpeg command
    command = [
        "ffmpeg",
        "-ss",
        f"{start_sec:.3f}",
        "-i",
        input_file,
        "-to",
        f"{duration_sec:.3f}",
        "-c:v",
        "libx264",
        "-c:a",
        "aac",
        "-y",
        output_path,  # Add -y to overwrite existing files
    ]

    return command, output_path


def process_clip(args):
    """
    Process a single clip - takes a single argument to avoid unpacking issues

    Args:
        args: A tuple containing (paragraph, input_file, output_dir, clip_index)

    Returns:
        dict: Result information
    """
    paragraph, input_file, output_dir, clip_index = args

    output_filename = f"clip_{clip_index:03d}.mov"
    command, output_path = create_ffmpeg_command(
        paragraph, input_file, output_dir, output_filename
    )

    logger.info(f"Processing clip {clip_index}: {' '.join(command)}")

    try:
        # Run with a timeout to prevent hanging processes
        result = subprocess.run(
            command,
            check=True,
            capture_output=True,
            text=True,
            timeout=300,  # 5 minute timeout
        )
        logger.info(f"Successfully processed clip {clip_index}")
        return {
            "success": True,
            "output_path": output_path,
            "index": clip_index,
            "error": None,
        }
    except subprocess.CalledProcessError as e:
        logger.error(f"Error processing clip {clip_index}: {e}")
        logger.error(f"STDERR: {e.stderr}")
        return {
            "success": False,
            "output_path": None,
            "index": clip_index,
            "error": f"Process error: {e}",
        }
    except subprocess.TimeoutExpired as e:
        logger.error(f"Timeout processing clip {clip_index}: {e}")
        return {
            "success": False,
            "output_path": None,
            "index": clip_index,
            "error": f"Timeout: {e}",
        }
    except Exception as e:
        logger.error(f"Unexpected error processing clip {clip_index}: {e}")
        return {
            "success": False,
            "output_path": None,
            "index": clip_index,
            "error": f"Unexpected error: {e}",
        }


def create_concat_file(clip_paths, concat_file_path):
    """Create a concat file for ffmpeg"""
    with open(concat_file_path, "w") as f:
        for path in clip_paths:
            # Use absolute paths and escape single quotes
            abs_path = os.path.abspath(path).replace("'", "'\\''")
            f.write(f"file '{abs_path}'\n")


def concatenate_clips(clip_paths, output_file):
    """Concatenate clips into a final video"""
    if not clip_paths:
        logger.error("No clips to concatenate")
        return False

    # Create a temporary concat file
    with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) as temp:
        concat_file = temp.name
        create_concat_file(clip_paths, concat_file)

    logger.info(f"Created concat file at {concat_file}")

    # Run ffmpeg to concatenate the clips
    command = [
        "ffmpeg",
        "-f",
        "concat",
        "-safe",
        "0",
        "-i",
        concat_file,
        "-c",
        "copy",
        "-y",
        output_file,
    ]

    logger.info(f"Running concat command: {' '.join(command)}")

    try:
        subprocess.run(command, check=True, capture_output=True, text=True)
        logger.info(f"Successfully concatenated clips to {output_file}")
        os.unlink(concat_file)  # Delete the temporary file
        return True
    except subprocess.CalledProcessError as e:
        logger.error(f"Error concatenating clips: {e}")
        logger.error(f"STDERR: {e.stderr}")
        return False
    except Exception as e:
        logger.error(f"Unexpected error during concatenation: {e}")
        return False


def process_paragraphs(
    paragraphs, input_file, output_dir, final_output_file, max_workers=4
):
    """
    Process a list of paragraphs, then concatenate them.

    Args:
        paragraphs: List of paragraph objects with text, start, and end attributes
        input_file: Path to the input video file
        output_dir: Directory for output files
        final_output_file: Path for the final concatenated video
        max_workers: Maximum number of parallel threads (default: 4)

    Returns:
        bool: Success status
    """
    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)

    # Process clips in parallel using ThreadPoolExecutor instead of ProcessPoolExecutor
    results = []

    # Option 1: Process sequentially if having issues with parallel processing
    if max_workers == 1:
        logger.info("Processing clips sequentially")
        for i, paragraph in enumerate(paragraphs):
            result = process_clip((paragraph, input_file, output_dir, i + 1))
            results.append(result)
    else:
        # Option 2: Process in parallel with ThreadPoolExecutor
        logger.info(f"Processing clips in parallel with {max_workers} workers")
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Create a list of argument tuples to avoid unpacking issues
            tasks = [
                (paragraph, input_file, output_dir, i + 1)
                for i, paragraph in enumerate(paragraphs)
            ]

            # Map the process_clip function to the tasks
            for result in executor.map(process_clip, tasks):
                results.append(result)

    # Check if all clips were processed successfully
    successful_results = [r for r in results if r["success"]]
    failed_results = [r for r in results if not r["success"]]

    logger.info(f"Processed {len(successful_results)} clips successfully")
    if failed_results:
        logger.warning(f"Failed to process {len(failed_results)} clips")
        for fail in failed_results:
            logger.warning(f"  Clip {fail['index']}: {fail['error']}")

    if not successful_results:
        logger.error("No clips were processed successfully. Aborting concatenation.")
        return False

    # Sort results by index to maintain order
    successful_results.sort(key=lambda x: x["index"])
    clip_paths = [result["output_path"] for result in successful_results]

    # Concatenate all clips
    logger.info("Concatenating clips...")
    success = concatenate_clips(clip_paths, final_output_file)

    if success:
        logger.info(f"Final video created at: {final_output_file}")

    return success

In [73]:
from collections import namedtuple

# Define a Paragraph class or use a namedtuple

input_file = "/Users/andrew/Movies/Documentation/2025-03-05 12-19-17.mov"
output_dir = "/Users/andrew/Movies/Documentation/clips"
final_output = "/Users/andrew/Movies/Documentation/final_compilation.mov"

# Process all paragraphs and create the final video
process_paragraphs(filtered_list, input_file, output_dir, final_output)

2025-03-05 15:18:31,119 - INFO - Processing clips in parallel with 4 workers
2025-03-05 15:18:31,120 - INFO - Processing clip 1: ffmpeg -ss 1.040 -i /Users/andrew/Movies/Documentation/2025-03-05 12-19-17.mov -to 31.630 -c:v libx264 -c:a aac -y /Users/andrew/Movies/Documentation/clips/clip_001.mov
2025-03-05 15:18:31,120 - INFO - Processing clip 2: ffmpeg -ss 33.770 -i /Users/andrew/Movies/Documentation/2025-03-05 12-19-17.mov -to 14.376 -c:v libx264 -c:a aac -y /Users/andrew/Movies/Documentation/clips/clip_002.mov
2025-03-05 15:18:31,120 - INFO - Processing clip 3: ffmpeg -ss 48.178 -i /Users/andrew/Movies/Documentation/2025-03-05 12-19-17.mov -to 16.638 -c:v libx264 -c:a aac -y /Users/andrew/Movies/Documentation/clips/clip_003.mov
2025-03-05 15:18:31,121 - INFO - Processing clip 4: ffmpeg -ss 220.644 -i /Users/andrew/Movies/Documentation/2025-03-05 12-19-17.mov -to 17.646 -c:v libx264 -c:a aac -y /Users/andrew/Movies/Documentation/clips/clip_004.mov
2025-03-05 15:18:48,783 - INFO - Su

True

In [None]:
from moviepy.editor import VideoFileClip, TextClip, CompositeVideoClip, ColorClip
import numpy as np


def instagram_caption_video(
    video_path,
    output_path,
    text,
    font_size=70,
    position="bottom",
    animation="fade",
    bg_opacity=0.7,
    duration_factor=0.8,  # Show caption for 80% of video duration
):
    """
    Create an Instagram-style captioned video

    Args:
        video_path: Path to input video
        output_path: Path for output video
        text: Text to display
        font_size: Size of the font
        position: Position ('top', 'center', 'bottom')
        animation: Animation style ('none', 'fade', 'typewriter', 'bounce')
        bg_opacity: Opacity of the background (0-1)
        duration_factor: How much of the video duration to show the caption
    """
    # Load the video
    video = VideoFileClip(video_path)

    # Calculate text duration
    text_duration = video.duration * duration_factor
    text_start = (video.duration - text_duration) / 2  # Center in time

    # Create text clip
    txt_clip = TextClip(
        text,
        fontsize=font_size,
        color="white",
        font="Arial-Bold",
        stroke_color="black",
        stroke_width=1,
    )

    # Size the background based on text size
    txt_width, txt_height = txt_clip.size
    bg_width = txt_width + 40  # Add padding
    bg_height = txt_height + 40

    # Create semi-transparent background
    bg_clip = ColorClip(size=(bg_width, bg_height), color=(0, 0, 0))
    bg_clip = bg_clip.set_opacity(bg_opacity)

    # Position the text on the background
    txt_clip = txt_clip.set_position("center")

    # Combine text and background
    txt_bg_clip = CompositeVideoClip([bg_clip, txt_clip])

    # Set position on the video
    if position == "top":
        txt_bg_clip = txt_bg_clip.set_position(("center", 50))
    elif position == "center":
        txt_bg_clip = txt_bg_clip.set_position("center")
    else:  # bottom
        txt_bg_clip = txt_bg_clip.set_position(("center", video.h - bg_height - 50))

    # Apply animation
    if animation == "fade":
        # Fade in and out
        def fade_func(t):
            # t is relative to the clip's start
            relative_t = (t - text_start) / text_duration if text_duration > 0 else 0
            if relative_t < 0 or relative_t > 1:
                return 0
            elif relative_t < 0.1:  # First 10% of time
                return relative_t / 0.1
            elif relative_t > 0.9:  # Last 10% of time
                return (1 - relative_t) / 0.1
            else:
                return 1

        txt_bg_clip = txt_bg_clip.set_opacity(fade_func)

    elif animation == "typewriter":
        # This would require a more complex implementation with multiple clips
        pass

    elif animation == "bounce":
        # Bouncing effect
        def bounce_pos(t):
            # t is relative to the clip's start
            relative_t = (t - text_start) / text_duration if text_duration > 0 else 0
            if position == "top":
                return ("center", 50 + np.sin(relative_t * 8) * 10)
            elif position == "center":
                return ("center", video.h / 2 + np.sin(relative_t * 8) * 10)
            else:  # bottom
                return (
                    "center",
                    video.h - bg_height - 50 + np.sin(relative_t * 8) * 10,
                )

        txt_bg_clip = txt_bg_clip.set_position(bounce_pos)

    # Set the timing
    txt_bg_clip = txt_bg_clip.set_start(text_start).set_duration(text_duration)

    # Composite with the video
    final_clip = CompositeVideoClip([video, txt_bg_clip])

    # Write the result
    final_clip.write_videofile(output_path, codec="libx264", audio_codec="aac")