<a href="https://colab.research.google.com/github/HsnSaboor/AiSubtitles/blob/main/whisper-clipper.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **YouTube/Google Drive Videos Translation/Transcription with Faster Whisper**

[faster-whisper](https://github.com/guillaumekln/faster-whisper) is a reimplementation of OpenAI's Whisper model using CTranslate2, which is a fast inference engine for Transformer models.

This implementation is up to 4 times faster than openai/whisper for the same accuracy while using less memory. The efficiency can be further improved with 8-bit quantization on both CPU and GPU.

Whisper is a general-purpose speech recognition model. It is trained on a large dataset of diverse audio and is also a multi-task model that can perform multilingual speech recognition as well as speech translation and language identification.

This Notebook will guide you through the transcription or translation of a video file (from YouTube/Google Drive) using Faster Whisper. You'll be able to explore most inference parameters or use the Notebook as-is to store the output and video audio in your Google Drive.
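
For orientation, the core of what the notebook does can be sketched in a few lines. This is a minimal sketch rather than the notebook's exact code: the helper names `format_line` and `transcribe_file`, the `"small"` default, and the CPU/`int8` settings are illustrative assumptions, while `WhisperModel` and `transcribe` are the same calls used in the cells further down.

```python
def format_line(start: float, end: float, text: str) -> str:
    """Render one transcribed segment as '[start -> end] text' (times in seconds)."""
    return f"[{start:.2f}s -> {end:.2f}s] {text.strip()}"


def transcribe_file(audio_path: str, model_size: str = "small") -> list:
    """Transcribe one file and return formatted lines (hypothetical helper)."""
    # Imported lazily so format_line works even without faster-whisper installed
    from faster_whisper import WhisperModel

    # int8 on CPU keeps memory use low; on a GPU runtime you would typically
    # pass device="cuda" and compute_type="float16" instead
    model = WhisperModel(model_size, device="cpu", compute_type="int8")
    segments, info = model.transcribe(audio_path, beam_size=5)
    print(f"Detected language: {info.language}")
    # `segments` is a generator, so the transcription runs while it is iterated
    return [format_line(s.start, s.end, s.text) for s in segments]


print(format_line(0.0, 2.5, " Hello world"))
```

Calling `transcribe_file` on a local audio file requires `faster-whisper` to be installed; `format_line` on its own only shows the output shape.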

## **How to use**
1. Read and understand the notebook. At the very least, modify the **video selection section** to choose the video you wish to translate/transcribe.
2. Click Runtime -> Run all and wait for the notebook to finish. Alternatively, run the cells one by one, skipping the Google Drive cell if you do not intend to use it.
3. A download prompt should appear once the subtitles are ready; otherwise, check the 'Files' tab on the left for the output.


In [26]:
#@markdown # **[Optional]** Access data in Google Drive 💾
#@markdown Enter a Google Drive path and run this cell to store the results inside Google Drive.

# Mount Google Drive so the output can be copied there; this is faster than downloading directly from Colab in my experience.
from google.colab import drive
from pathlib import Path

drive_mount_path = Path("/") / "content" / "drive"
drive.mount(str(drive_mount_path), force_remount=True)
drive_mount_path /= "My Drive"
#@markdown ---
drive_path = "Colab Notebooks/Faster Whisper" #@param {type:"string"}
#@markdown ---
#@markdown **Run this cell again if you change your Google Drive path.**

drive_whisper_path = drive_mount_path / Path(drive_path.lstrip("/"))
drive_whisper_path.mkdir(parents=True, exist_ok=True)

Mounted at /content/drive
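
The cell above strips the leading slash from `drive_path` before joining it to the mount point. The sketch below shows why that matters: with `pathlib`, an absolute right-hand operand replaces the left-hand path instead of being appended. `resolve_drive_path` is a hypothetical helper used only for illustration, and `PurePosixPath` is used so the example behaves the same on any operating system.

```python
from pathlib import PurePosixPath

mount = PurePosixPath("/content/drive/My Drive")


def resolve_drive_path(user_path: str) -> PurePosixPath:
    """Join a user-supplied folder onto the Drive mount point (illustrative helper)."""
    return mount / PurePosixPath(user_path.lstrip("/"))


# Without lstrip, the absolute right-hand path replaces the mount point entirely
print(mount / PurePosixPath("/Colab Notebooks/Faster Whisper"))
# With lstrip, the folder is appended below the mount point
print(resolve_drive_path("/Colab Notebooks/Faster Whisper"))
```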


In [2]:
#@markdown # **Check GPU type** 🕵️

#@markdown The type of GPU you get assigned in your Colab session determines the speed at which the video will be transcribed.
#@markdown The higher the number of floating point operations per second (FLOPS), the faster the transcription.
#@markdown But even the least powerful GPU available in Colab is able to run any Whisper model.
#@markdown Make sure you've selected `GPU` as hardware accelerator for the Notebook (Runtime &rarr; Change runtime type &rarr; Hardware accelerator).

#@markdown |  GPU   |  GPU RAM   | FP32 teraFLOPS |     Availability   |
#@markdown |:------:|:----------:|:--------------:|:------------------:|
#@markdown |  T4    |    16 GB   |       8.1      |         Free       |
#@markdown | P100   |    16 GB   |      10.6      |      Colab Pro     |
#@markdown | V100   |    16 GB   |      15.7      |  Colab Pro (Rare)  |

#@markdown ---
#@markdown **Factory reset your Notebook's runtime if you want to get assigned a new GPU.**

!nvidia-smi -L

!nvidia-smi

GPU 0: Tesla T4 (UUID: GPU-0745fbc6-e3f8-5a87-798c-fefb08e81a88)
Thu Apr 24 19:32:50 2025       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 550.54.15              Driver Version: 550.54.15      CUDA Version: 12.4     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|   0  Tesla T4                       Off |   00000000:00:04.0 Off |                    0 |
| N/A   44C    P8             10W /   70W |       0MiB /  15360MiB |      0%      Default |
|                                         |                        |                  N/A |
+-----------------------------------------+------------------------+-------
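
If you want to act on the assigned GPU programmatically rather than reading it off the screen, the `nvidia-smi -L` listing is easy to parse. The following is a small sketch under the assumption that each line has the `GPU <index>: <name> (UUID: ...)` shape shown above; `parse_gpu_names` is a hypothetical helper, not part of the notebook.

```python
import re


def parse_gpu_names(listing: str) -> list:
    """Extract GPU names from `nvidia-smi -L` style output."""
    pattern = re.compile(r"^GPU \d+: (.+?) \(UUID: ", re.MULTILINE)
    return pattern.findall(listing)


# Sample line copied from the cell output above
sample = "GPU 0: Tesla T4 (UUID: GPU-0745fbc6-e3f8-5a87-798c-fefb08e81a88)"
print(parse_gpu_names(sample))
```

In a live session the listing could be captured with `subprocess.run(["nvidia-smi", "-L"], capture_output=True, text=True).stdout` and passed to the same function.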

In [9]:
#@markdown # **Install libraries** 🏗️
#@markdown This cell will take a little while to download several libraries, including Faster Whisper.

#@markdown ---

! pip install faster-whisper
! pip install yt-dlp

import os
import sys
import warnings
from faster_whisper import WhisperModel
import yt_dlp
import subprocess
import torch
import shutil
import numpy as np
from IPython.display import display, Markdown, YouTubeVideo
import requests
from urllib.parse import urlsplit
from google.colab import files
from pathlib import Path

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print('Using device:', device, file=sys.stderr)

!sudo apt-get update
# -y answers the confirmation prompt, which cannot be answered interactively in Colab
!sudo apt-get install -y nvidia-cuda-toolkit



Using device: cuda:0


Hit:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:3 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:4 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:5 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:6 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:7 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://pp

In [10]:
#@markdown # **Model selection** 🧠

#@markdown There are several models to choose from, with varying performance and speed. large-v2 is recommended for most cases:

#@markdown |  Size  | Parameters | English-only model | Multilingual model | Required VRAM | Relative speed |
#@markdown |:------:|:----------:|:------------------:|:------------------:|:-------------:|:--------------:|
#@markdown |  tiny  |    39 M    |     `tiny.en`      |       `tiny`       |     ~0.8 GB     |      ~32x      |
#@markdown |  base  |    74 M    |     `base.en`      |       `base`       |     ~1.0 GB     |      ~16x      |
#@markdown | small  |   244 M    |     `small.en`     |      `small`       |     ~1.4 GB     |      ~6x       |
#@markdown | medium |   769 M    |    `medium.en`     |      `medium`      |     ~2.7 GB     |      ~2x       |
#@markdown | large-v1  |   1550 M   |        N/A         |      `large-v1`       |    ~4.3 GB     |       1x       |
#@markdown | large-v2  |   1550 M   |        N/A         |      `large-v2`       |    ~4.3 GB     |       1x       |

#@markdown ---
model_size = 'large-v2' #@param ['tiny', 'tiny.en', 'base', 'base.en', 'small', 'small.en', 'medium', 'medium.en', 'large-v1', 'large-v2']
device_type = "cuda" #@param ['cuda', 'cpu']
compute_type = "float16" #@param ['float16', 'int8_float16', 'int8']
#@markdown ---
#@markdown **Run this cell again if you change the model.**

model = WhisperModel(model_size, device=device_type, compute_type=compute_type)


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.
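
The model table above can also be used to pick a size automatically from the memory you have available. This is only a sketch: the VRAM figures are the approximate values from that table, and `largest_model_that_fits` and its `headroom_gb` safety margin are assumptions made for illustration, not something faster-whisper provides.

```python
# Approximate VRAM needs (GB) copied from the table above, smallest to largest
MODEL_VRAM_GB = [
    ("tiny", 0.8),
    ("base", 1.0),
    ("small", 1.4),
    ("medium", 2.7),
    ("large-v2", 4.3),
]


def largest_model_that_fits(free_vram_gb: float, headroom_gb: float = 1.0):
    """Return the largest model whose estimate plus headroom fits, or None."""
    best = None
    for name, need_gb in MODEL_VRAM_GB:
        if need_gb + headroom_gb <= free_vram_gb:
            best = name
    return best


print(largest_model_that_fits(15.0))  # a T4-sized budget
print(largest_model_that_fits(3.0))   # a much smaller budget
```

The result could then be assigned to `model_size` before constructing `WhisperModel`.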


In [28]:
#@markdown # **Video selection** 📺

#@markdown Enter the URL of the YouTube video **OR** the Google Drive path of the video you want to translate/transcribe, and run the cell. Make sure the correct Type is chosen! This may take a while depending on the video file size.

Type = "Google Drive" #@param ['Youtube video or playlist', 'Google Drive', 'Direct download']
#@markdown ---
#@markdown #### **Youtube video or playlist**
URL = "" #@param {type:"string"}
# store_audio = True #@param {type:"boolean"}
#@markdown ---
#@markdown #### **Google Drive video, audio (mp4, wav), or folder containing video and/or audio files**
video_path = "/content/drive/MyDrive/Ahmadoverstimulation.mp4" #@param {type:"string"}
#@markdown ---
#@markdown #### **Direct Download**
ddl_url = "" #@param {type:"string"}
#@markdown ---
#@markdown **Run this cell again if you change the video.**

video_path_local_list = []

if Type == "Youtube video or playlist":

    ydl_opts = {
        'format': 'm4a/bestaudio/best',
        'outtmpl': '%(id)s.%(ext)s',
        # ℹ️ See help(yt_dlp.postprocessor) for a list of available Postprocessors and their arguments
        'postprocessors': [{  # Extract audio using ffmpeg
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'wav',
        }]
    }

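    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        # extract_info downloads the media and returns its metadata in one call
        info = ydl.extract_info(URL, download=True)

    # A playlist result holds one metadata dict per video under 'entries';
    # a single video is its own metadata dict
    list_video_info = [entry for entry in (info.get('entries') or [info]) if entry]

    for video_info in list_video_info:
        video_path_local_list.append(Path(f"{video_info['id']}.wav"))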

elif Type == "Google Drive":
    # Use the user-provided path as-is; it already includes the Drive mount point
    video_path = Path(video_path)

    if video_path.is_dir():
        for video_path_drive in sorted(video_path.iterdir()):
            if not video_path_drive.is_file():
                # Only files directly inside the folder are copied
                display(Markdown(f"**{str(video_path_drive)} is not a file (subfolders are not supported), skipping.**"))
                continue
            display(Markdown(f"**{str(video_path_drive)} selected for processing.**"))
            video_path_local = Path(".").resolve() / (video_path_drive.name)
            shutil.copy(video_path_drive, video_path_local)
            video_path_local_list.append(video_path_local)
    elif video_path.is_file():
        video_path_local = Path(".").resolve() / (video_path.name)
        shutil.copy(video_path, video_path_local)
        video_path_local_list.append(video_path_local)
        print(f"{video_path_local} appended to list for processing")
        display(Markdown(f"**{str(video_path)} selected for processing.**"))
    else:
        display(Markdown(f"**{str(video_path)} does not exist.**"))

elif Type == "Direct download":
    print(f"⚠️ Please ensure this is a direct download link and is of a valid format")
    print(f"Attempting to download: {ddl_url}\n")
    # !wget {ddl_url} -O ddl_video.mp4
    # video_path_local_list.append("/content/ddl_video.mp4")

    response = requests.get(ddl_url)

    if response.status_code == 200:
        # Extract the filename from the URL
        filename = urlsplit(ddl_url).path.split("/")[-1]

        # Create the full path for the destination file in the current working directory
        destination_path = os.path.join(os.getcwd(), filename)

        # Save the file
        with open(destination_path, 'wb') as file:
            file.write(response.content)

        print(f"File downloaded successfully: {destination_path}")

        video_path_local = Path(".").resolve() / (filename)

        # print(f"Path local: {video_path_local}") # /content/video.mkv

        video_path_local_list.append(video_path_local)
    else:
        print(f"Failed to download file. Status code: {response.status_code}")

else:
    raise TypeError("Please select a supported input type.")

valid_suffixes = [".mp4", ".mkv", ".mov", ".avi", ".wmv", ".flv", ".webm", ".3gp", ".mpeg"]

for idx, video_path_local in enumerate(video_path_local_list):
    print(f"Processing video file {video_path_local} with ffmpeg..")

    if video_path_local.suffix.lower() in valid_suffixes:
        video_path_local_output = video_path_local.with_suffix(".wav")
        # Extract 16 kHz mono 16-bit PCM audio; -y overwrites an existing .wav instead of waiting for input
        result = subprocess.run(["ffmpeg", "-y", "-i", str(video_path_local), "-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1", str(video_path_local_output)])
        if result.returncode == 0:
            # Point the list at the converted .wav file
            video_path_local_list[idx] = video_path_local_output
            print(f"Successfully converted {video_path_local} to {video_path_local_output}")
        else:
            # The original path stays in the list, so later steps will try the unconverted file
            print(f"Error running ffmpeg on {video_path_local}. Return code: {result.returncode}")


# Add a print statement outside the loop to show the final list
print("\nFinal list of files to process (after conversion if needed):")
print(video_path_local_list)

/content/Ahmadoverstimulation.mp4 appended to list for processing


**/content/drive/MyDrive/Ahmadoverstimulation.mp4 selected for processing.**

Processing video file /content/Ahmadoverstimulation.mp4 with ffmpeg..
Successfully converted /content/Ahmadoverstimulation.mp4 to /content/Ahmadoverstimulation.wav

Final list of files to process (after conversion if needed):
[PosixPath('/content/Ahmadoverstimulation.wav')]
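
The conversion step above can be read as "build an ffmpeg argument list for video files, leave other inputs alone". The sketch below isolates that decision so it can be checked without running ffmpeg. `build_ffmpeg_command` and `VIDEO_SUFFIXES` are hypothetical names; the flags are the ones used in the cell, plus `-y`, which is added here as an assumption so that reruns overwrite an existing `.wav` instead of stopping at a prompt.

```python
from pathlib import Path

VIDEO_SUFFIXES = {".mp4", ".mkv", ".mov", ".avi", ".wmv", ".flv", ".webm", ".3gp", ".mpeg"}


def build_ffmpeg_command(src: Path):
    """Return the ffmpeg argument list for a video file, or None for other inputs."""
    if src.suffix.lower() not in VIDEO_SUFFIXES:
        return None
    dst = src.with_suffix(".wav")
    return [
        "ffmpeg", "-y",          # overwrite an existing output instead of prompting
        "-i", str(src),
        "-vn",                   # drop the video stream
        "-acodec", "pcm_s16le",  # 16-bit PCM audio
        "-ar", "16000",          # 16 kHz sample rate
        "-ac", "1",              # mono
        str(dst),
    ]


print(build_ffmpeg_command(Path("talk.MP4")))
print(build_ffmpeg_command(Path("talk.wav")))
```

The returned list can be passed directly to `subprocess.run`; a `None` result means the file is used as-is.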


In [30]:
def seconds_to_time_format(s):
    # Work in whole milliseconds so rounding can never produce a 1000 ms field
    total_ms = int(round(s * 1000))
    hours, total_ms = divmod(total_ms, 3_600_000)
    minutes, total_ms = divmod(total_ms, 60_000)
    seconds, milliseconds = divmod(total_ms, 1000)

    # Return the formatted string (SRT style: HH:MM:SS,mmm)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d},{milliseconds:03d}"

#@markdown # **Run the model** 🚀

#@markdown Run this cell to execute the transcription/translation of the video. This can take a while and varies with the length of the video and the number of parameters of the model selected above.

#@markdown ## **Parameters** ⚙️

#@markdown ### **Behavior control**
#@markdown #### Language
language = "en" #@param ["auto", "en", "zh", "ja", "fr", "de"] {allow-input: true}
#@markdown #### Initial prompt (optional text that gives the model context, for example names or vocabulary to expect)
initial_prompt = "Please Transcribe this Video" #@param {type:"string"}
#@markdown ---
#@markdown #### Word-level timestamps
word_level_timestamps = True #@param {type:"boolean"}
#@markdown ---
#@markdown #### VAD filter
vad_filter = True #@param {type:"boolean"}
vad_filter_min_silence_duration_ms = 50 #@param {type:"integer"}
#@markdown ---
#@markdown #### Output (default is srt; txt if `text_only` is checked)
text_only = False #@param {type:"boolean"}


segments, info = model.transcribe(str(video_path_local), beam_size=5,
                                  language=None if language == "auto" else language,
                                  initial_prompt=initial_prompt,
                                  word_timestamps=word_level_timestamps,
                                  vad_filter=vad_filter,
                                  vad_parameters=dict(min_silence_duration_ms=vad_filter_min_silence_duration_ms))

display(Markdown(f"Detected language '{info.language}' with probability {info.language_probability}"))

ext_name = '.txt' if text_only else ".srt"
output_file_name = video_path_local.stem + ext_name
sentence_idx = 1
with open(output_file_name, 'w', encoding='utf-8') as f:
  for segment in segments:
    if word_level_timestamps:
      for word in segment.words:
        ts_start = seconds_to_time_format(word.start)
        ts_end = seconds_to_time_format(word.end)
        print(f"[{ts_start} --> {ts_end}] {word.word}")
        if not text_only:
          # One SRT cue per word: index, time range, text, blank separator line
          f.write(f"{sentence_idx}\n")
          f.write(f"{ts_start} --> {ts_end}\n")
          f.write(f"{word.word.strip()}\n\n")
        else:
          # Plain-text mode: one word per line
          f.write(f"{word.word.strip()}\n")
        sentence_idx = sentence_idx + 1
    else:
      ts_start = seconds_to_time_format(segment.start)
      ts_end = seconds_to_time_format(segment.end)
      print(f"[{ts_start} --> {ts_end}] {segment.text}")
      if not text_only:
        f.write(f"{sentence_idx}\n")
        f.write(f"{ts_start} --> {ts_end}\n")
        f.write(f"{segment.text.strip()}\n\n")
      else:
        f.write(f"{segment.text.strip()}\n")
      sentence_idx = sentence_idx + 1

try:
  files.download(output_file_name)
  shutil.copy(video_path_local.parent / output_file_name,
            drive_whisper_path / output_file_name
  )
  display(Markdown(f"**Output file created: {drive_whisper_path / output_file_name}**"))
except Exception:  # e.g. the download failed or Google Drive was not mounted
  display(Markdown(f"**Output file created: {video_path_local.parent / output_file_name}**"))


Detected language 'en' with probability 1

Streaming output truncated to the last 5000 lines.
[00:27:41,430 --> 00:27:41,790]  Now
[00:27:41,790 --> 00:27:41,990]  that
[00:27:41,990 --> 00:27:42,290]  action
[00:27:42,290 --> 00:27:42,490]  where
[00:27:42,490 --> 00:27:42,590]  you
[00:27:42,590 --> 00:27:42,830]  remove
[00:27:42,830 --> 00:27:43,150]  yourself
[00:27:43,150 --> 00:27:43,350]  From
[00:27:43,350 --> 00:27:43,850]  focusing
[00:27:43,850 --> 00:27:44,610]  on
[00:27:44,610 --> 00:27:44,830]  that
[00:27:44,830 --> 00:27:45,290]  thought
[00:27:45,290 --> 00:27:45,790]  To
[00:27:45,790 --> 00:27:46,250]  focusing
[00:27:46,250 --> 00:27:46,570]  back
[00:27:46,570 --> 00:27:46,690]  on
[00:27:46,690 --> 00:27:46,810]  your
[00:27:46,810 --> 00:27:47,070]  breath
[00:27:47,070 --> 00:27:47,510]  Is
[00:27:47,510 --> 00:27:47,730]  like
[00:27:47,730 --> 00:27:47,990]  one
[00:27:47,990 --> 00:27:48,270]  push
[00:27:48,270 --> 00:27:48,530]  up
[00:27:48,720 --> 00:27:49,580]  One
[00:27:49,580 -

**Output file created: /content/drive/My Drive/Colab Notebooks/Faster Whisper/Ahmadoverstimulation.srt**
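
With word-level timestamps enabled, the output contains one cue per word, which is precise but hard to read as subtitles. One possible way to merge words into longer cues is sketched below. `group_words` and its `max_chars` and `max_gap` thresholds are assumptions for illustration, not part of the notebook or of faster-whisper; the input is a list of `(start, end, word)` tuples in seconds.

```python
def group_words(words, max_chars=42, max_gap=0.5):
    """Merge (start, end, word) tuples into longer (start, end, text) cues.

    A new cue starts when adding a word would exceed max_chars, or when the
    silence before the word is longer than max_gap seconds.
    """
    cues = []
    for start, end, word in words:
        word = word.strip()
        if cues:
            cue_start, cue_end, text = cues[-1]
            fits = len(text) + 1 + len(word) <= max_chars
            if fits and start - cue_end <= max_gap:
                # Extend the current cue to cover this word
                cues[-1] = (cue_start, end, f"{text} {word}")
                continue
        cues.append((start, end, word))
    return cues


words = [(0.0, 0.4, " Now"), (0.4, 0.6, " that"), (0.6, 0.9, " action"), (2.0, 2.3, " One")]
print(group_words(words))
```

Each resulting cue can then be written out with the same index, time range, and text layout used in the cell above.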

In [66]:
%%writefile jsonfinal.py
import json
import re
import string
import sys
from dataclasses import dataclass, field
from typing import List, Dict, Any, Tuple, Optional

# Define the paths for input JSON, input SRT, and output JSON files
# This script expects the input JSON to have the original format,
# where segments are lists of dictionaries with 'start', 'end', 'text'.
input_json_path = 'input.json'
input_srt_path = 'input.srt'
output_json_path = 'output_timed_segments.json'

@dataclass
class SrtEntry:
    """Represents a single entry from an SRT file."""
    sequence: int
    start_time: float # Time in seconds
    end_time: float   # Time in seconds
    text: str
    normalized_text: str = field(init=False) # Normalized text for easier matching
    normalized_words: List[str] = field(default_factory=list, init=False) # Normalized words list

    def __post_init__(self):
        # Normalize the text first
        self.normalized_text = normalize_text_for_comparison(self.text)
        # Then split the normalized text into words
        self.normalized_words = self.normalized_text.split()


def srt_time_to_seconds(time_str: str) -> float:
    """Converts SRT time format (HH:MM:SS,mmm or HH:MM:SS.mmm) to seconds (float)."""
    # Handle both ',' and '.' for milliseconds
    parts = time_str.replace(',', '.').split(':')
    if len(parts) != 3:
        raise ValueError(f"Invalid SRT time format part count: {time_str}")
    try:
        hours = float(parts[0])
        minutes = float(parts[1])
        seconds = float(parts[2])
        return hours * 3600 + minutes * 60 + seconds
    except ValueError:
        raise ValueError(f"Invalid numeric time value in: {time_str}")


def parse_srt(file_path: str) -> Optional[List[SrtEntry]]:
    """Parses an SRT file and returns a list of SrtEntry objects, or None if the file cannot be read."""
    entries = []
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read().strip()

        # Use a regex to find blocks, being more robust to varying newlines
        # Assumes blocks are separated by one or more empty lines
        blocks = re.split(r'\n\s*\n+', content)

        for block in blocks:
            lines = block.strip().split('\n')
            # Must have at least sequence number, time, and one line of text
            if len(lines) < 3:
                # Suppress very noisy warnings for incomplete blocks
                # if lines: print(f"Warning: Skipping incomplete block:\n{block[:100]}...", file=sys.stderr)
                continue

            try:
                # Sequence number is the first line
                sequence = int(lines[0])

                # Time line is the second line
                time_line = lines[1]
                # Regex now explicitly accepts both , and . for milliseconds
                time_match = re.match(r'(\d+:\d+:\d+[,.]\d+)\s*-->\s*(\d+:\d+:\d+[,.]\d+)', time_line)

                if not time_match:
                    # Suppress very noisy warnings for invalid time format
                    # print(f"Warning: Skipping block {sequence} due to invalid time format: '{time_line}'", file=sys.stderr)
                    continue

                start_time_str, end_time_str = time_match.groups()

                # Text lines are from the third line onwards
                text_lines = lines[2:]
                text = "\n".join(text_lines).strip()

                if not text:
                     # Suppress very noisy warnings for empty text
                     # print(f"Warning: Skipping block {sequence} with empty text.", file=sys.stderr)
                     continue

                start_time = srt_time_to_seconds(start_time_str)
                end_time = srt_time_to_seconds(end_time_str)


                entries.append(SrtEntry(sequence, start_time, end_time, text))

            except (ValueError, IndexError) as e:
                # Handle blocks that don't fit the expected pattern
                print(f"Warning: Skipping block due to parsing error: {e}\nBlock content (first 100 chars):\n{block[:100]}...", file=sys.stderr)
                continue
            except Exception as e:
                # `sequence` may be unset or stale here, so report the block content instead
                print(f"Warning: Unexpected error processing block: {e}\nBlock content (first 100 chars):\n{block[:100]}...", file=sys.stderr)


    except FileNotFoundError:
        print(f"Error: SRT file not found at '{file_path}'", file=sys.stderr)
        return None
    except Exception as e:
        print(f"An unexpected error occurred while parsing SRT file: {e}", file=sys.stderr)
        return None

    return entries

def normalize_text_for_comparison(text: str) -> str:
    """
    Converts text to lowercase, handles hyphenated words (splitting word1-word2
    into 'word1' and '-word2'), removes other punctuation, and standardizes
    whitespace. Returns empty string if input is not valid string.
    This version is used for the actual word-by-word comparison.
    """
    if not isinstance(text, str):
        return ""

    text_lower = text.lower()

    # --- Step 1: Handle hyphenated words like "word1-word2" ---
    # Replace "word1-word2" with "word1 -word2" (add space before the hyphen)
    # Use a regex that finds sequences of word characters (\w+), followed by a hyphen,
    # followed by another sequence of word characters. Replace the hyphen with ' -'.
    # This specifically targets hyphens *between* word characters.
    processed_text = re.sub(r'(\w+)-(\w+)', r'\1 -\2', text_lower)

    # --- Step 2: Remove other punctuation ---
    # Create a translation table to remove punctuation except the hyphen
    # We keep the hyphen because we specifically handled it in step 1.
    # Remove apostrophes for simplicity in normalization unless they are part of a hyphenated word part.
    punctuation_to_remove = string.punctuation.replace('-', '') # Keep hyphen
    translator = str.maketrans('', '', punctuation_to_remove)
    processed_text = processed_text.translate(translator)

    # --- Step 3: Standardize whitespace ---
    # Replace multiple whitespace characters (including newlines) with a single space
    processed_text = re.sub(r'\s+', ' ', processed_text).strip()

    return processed_text

def split_original_segment_into_parts(original_text: str) -> List[str]:
    """
    Splits the original segment text into word-like parts, applying the
    hyphen splitting rule but keeping original casing and punctuation *within*
    the parts for reconstruction.
    """
    if not isinstance(original_text, str):
        return []

    parts = []
    # Split by whitespace
    words = original_text.split()

    for word in words:
        # Apply the hyphen splitting rule: find word-word
        # This regex looks for word characters, followed by a hyphen, followed by word characters.
        # It won't split "pre-". It *will* split "self-help".
        # Note: This is a simplification. Real-world text has complex hyphenation (e.g., "state-of-the-art").
        match = re.match(r'(\w+)-(\w+)', word)
        if match:
             # If it matches, split into the first part and the hyphen-prefixed remainder
             word1_orig = match.group(1)
             # Keep the hyphen on the second part, along with any trailing characters
             # (punctuation, further hyphenated pieces) so no original text is dropped
             word2_orig_with_hyphen = '-' + word[match.start(2):]

             # Append the original parts
             parts.append(word1_orig)
             parts.append(word2_orig_with_hyphen)
        else:
             # If no specific hyphen pattern, treat the whole word token as one part
             parts.append(word)

    return parts


def find_segment_matches_in_srt(original_segment_text: str, srt_entries: List[SrtEntry]) -> List[Dict]:
    """
    Searches for consecutive sequences of words from the original_segment_text
    within the sequence of SRT entry words, allowing for skipped words in the SRT
    between the found segment words. Splits the original segment into multiple
    output segments if breaks in the sequence are detected.

    Args:
        original_segment_text (str): The text of the JSON segment.
        srt_entries (List[SrtEntry]): List of parsed SrtEntry objects with normalized_words.

    Returns:
        List[Dict]: A list of dictionaries, where each dictionary represents
                    a found sub-segment with 'start', 'end', and 'text'.
                    Includes the original segment with null times if no parts are matched.
    """
    found_sub_segments = []

    # Prepare segment words info: list of original parts
    original_segment_parts = split_original_segment_into_parts(original_segment_text)
    if not original_segment_parts:
         # If splitting results in no parts, add original with null times and return
         found_sub_segments.append({"start": None, "end": None, "text": original_segment_text})
         return found_sub_segments

    # Also get the normalized version of these parts for comparison
    normalized_segment_parts = [normalize_text_for_comparison(part) for part in original_segment_parts]

    # Filter out any parts that became empty after normalization
    # Keep track of the index mapping between original parts list and usable normalized parts list
    usable_parts_info = [] # List of (original_index, original_part, normalized_part)
    for idx, (orig_part, norm_part) in enumerate(zip(original_segment_parts, normalized_segment_parts)):
        if norm_part: # Only include if not empty after normalization
            usable_parts_info.append((idx, orig_part, norm_part))

    if not usable_parts_info:
         # If all parts became empty after normalization, add original with null times
         found_sub_segments.append({"start": None, "end": None, "text": original_segment_text})
         return found_sub_segments # Nothing meaningful to search for


    num_usable_parts = len(usable_parts_info)
    num_srt_entries = len(srt_entries)

    current_srt_entry_index = 0
    current_srt_word_index = 0

    current_sequence_start_time = None # Start time of the current consecutive match sequence
    current_sequence_end_time = None   # End time of the current consecutive match sequence
    current_sequence_segment_part_start_idx = None # Index in usable_parts_info where the current sequence started

    # Iterate through the usable segment parts we need to find
    for k in range(num_usable_parts):
        original_part_index, original_part, segment_part_to_find = usable_parts_info[k] # Get info for the current part

        found_part_in_srt = False

        # Search for the current segment part in the *remaining* SRT entries and words
        # This inner loop allows skipping words in SRT that are *not* part of the segment
        for i in range(current_srt_entry_index, num_srt_entries):
            srt_entry = srt_entries[i]
            # Determine the starting word index within the current SRT entry
            start_word_idx_in_entry = current_srt_word_index if i == current_srt_entry_index else 0

            # Iterate through words in the current SRT entry starting from the correct index
            for j in range(start_word_idx_in_entry, len(srt_entry.normalized_words)):
                srt_word = srt_entry.normalized_words[j]

                # Check if the current SRT word matches the segment part we're looking for
                if srt_word == segment_part_to_find:
                    # Found the segment part!
                    found_part_in_srt = True

                    # If this is the start of a new consecutive match sequence
                    if current_sequence_start_time is None:
                        current_sequence_start_time = srt_entry.start_time
                        current_sequence_segment_part_start_idx = k # Record the index in usable_parts_info

                    # Always update the end time to the current SRT entry's end time (the one containing the matched word)
                    current_sequence_end_time = srt_entry.end_time

                    # Update our position in the SRT stream to start searching for the *next* segment part
                    # We start searching from the word *after* the one we just matched
                    current_srt_entry_index = i
                    current_srt_word_index = j + 1
                    if current_srt_word_index >= len(srt_entry.normalized_words):
                        # If we finished words in this entry, move to the beginning of the next entry
                        current_srt_entry_index += 1
                        current_srt_word_index = 0

                    break # Break inner loop (found word in entry)
            if found_part_in_srt:
                break # Break outer loop (searching through SRT entries) and go find the next segment part

        # --- After searching for usable_parts_info[k] ---
        if not found_part_in_srt:
            # This segment part (at index k in usable_parts_info) was NOT found after the last found part.
            # The consecutive sequence of matched segment parts is broken *before* this part.

            if current_sequence_start_time is not None:
                # We had a consecutive sequence of matched parts just before this miss.
                # This sequence runs from current_sequence_segment_part_start_idx up to k-1.

                # Rebuild the original text for this run of matched parts.
                # usable_parts_info[idx][0] is the part's index in original_segment_parts, so the run
                # covers the original parts from the first matched part through the last matched part
                # (usable index k - 1). The slice below is contiguous, so original parts that were
                # dropped from usable_parts_info during normalization but lie inside that range are
                # included in the rebuilt text.
                original_start_idx = usable_parts_info[current_sequence_segment_part_start_idx][0]
                # Defensive fallback: if the run somehow started at k, treat it as a single part
                original_end_idx = usable_parts_info[k - 1][0] if k > current_sequence_segment_part_start_idx else original_start_idx

                # Extract the original parts from the original_segment_parts list
                # Range is inclusive start, inclusive end index -> [original_start_idx : original_end_idx + 1]
                sub_segment_original_parts_list = original_segment_parts[original_start_idx : original_end_idx + 1]
                sub_segment_text = " ".join(sub_segment_original_parts_list)


                # Add this found sub-segment to the results list
                # Only add if the text is not empty
                if sub_segment_text.strip():
                    found_sub_segments.append({
                        "start": current_sequence_start_time,
                        "end": current_sequence_end_time,
                        "text": sub_segment_text
                    })
                #else:
                #    print(f"Debug: Skipping empty sub-segment text from original '{original_segment_text[:50]}...'", file=sys.stderr)


                # Reset for the next potential consecutive sequence
                current_sequence_start_time = None
                current_sequence_end_time = None
                current_sequence_segment_part_start_idx = None

            # Continue the loop to try and find usable_parts_info[k + 1].
            # The SRT search position (`current_srt_entry_index`, `current_srt_word_index`)
            # is left unchanged, i.e. just past the last word that *was* matched.

    # --- After iterating through all usable segment parts ---
    # If there's a current sequence that ended with the last usable segment part
    if current_sequence_start_time is not None:
        # The last matched sequence runs from current_sequence_segment_part_start_idx up to num_usable_parts - 1.

        # Get the starting index in the *original* parts list
        original_start_idx = usable_parts_info[current_sequence_segment_part_start_idx][0]
        # Get the ending index in the *original* parts list (corresponding to the last usable part, index num_usable_parts - 1)
        original_end_idx = usable_parts_info[num_usable_parts - 1][0]

        # Extract the original parts from the original_segment_parts list
        sub_segment_original_parts_list = original_segment_parts[original_start_idx : original_end_idx + 1]
        sub_segment_text = " ".join(sub_segment_original_parts_list)

        # Add the final found sub-segment
        if sub_segment_text.strip():
            found_sub_segments.append({
                "start": current_sequence_start_time,
                "end": current_sequence_end_time,
                "text": sub_segment_text
            })
        #else:
        #    print(f"Debug: Skipping empty final sub-segment text from original '{original_segment_text[:50]}...'", file=sys.stderr)


    # If *no* sub-segments were found at all for this original segment (e.g., first usable part not found),
    # add the original segment with null times as a single entry as per requirement to keep all segments.
    if not found_sub_segments:
        # Add the original segment text with null times
        found_sub_segments.append({"start": None, "end": None, "text": original_segment_text})


    # Return the list of found sub-segments
    return found_sub_segments


def process_json_with_srt(json_data: List[Dict], srt_entries: List[SrtEntry]):
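    """
    Processes the JSON data (a list of reel dictionaries, each with a 'segments'
    list of dictionaries), looks up start/end times for every segment in the SRT
    entries (splitting a segment when only parts of it match), and returns the
    reels with updated segment lists.

    Returns a 5-tuple: (processed_data, total_input_segments, valid_input_segments,
    total_output_segments, output_segments_with_times). processed_data is None and
    all counts are 0 when srt_entries is None. Progress is printed per segment.
    """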
    if srt_entries is None: # SRT parsing failed previously
        print("Error: SRT entries are not available for processing.", file=sys.stderr)
        return None, 0, 0, 0, 0 # Return None for data, 0 for counts

    processed_data = []
    total_input_segments = 0 # Total segments in the input JSON
    valid_input_segments = 0 # Segments that were valid dictionaries with text
    total_output_segments = 0 # Total segments in the final output list (can be > total_input)
    output_segments_with_times = 0 # Segments in the output list that have start/end times

    # Iterate through each reel in the JSON data
    for reel in json_data:
        if not isinstance(reel, dict):
            print(f"Warning: Skipping unexpected item in root list (expected dictionary): {reel}", file=sys.stderr)
            continue

        processed_reel = reel.copy() # Copy reel data

        # Process the 'segments' list if it exists and is a list
        if 'segments' in processed_reel and isinstance(processed_reel['segments'], list):
            new_segments_list = [] # This will collect the original segments AND the split ones
            original_segments = processed_reel['segments'] # This is a list of dictionaries
            total_input_segments += len(original_segments)

            # Iterate through each segment dictionary in the original segments list
            for i, segment_dict_input in enumerate(original_segments):
                # Each segment is expected to be a dictionary
                if not isinstance(segment_dict_input, dict):
                    print(f"Warning: Reel {reel.get('reel_no', 'N/A')}, Segment Index {i}: Skipping - Expected dictionary, but found type {type(segment_dict_input).__name__}.", file=sys.stderr)
                    # Add the invalid entry to the output list with nulls to keep it
                    new_segments_list.append({"start": None, "end": None, "text": str(segment_dict_input)}) # Use str representation if not a dict
                    total_output_segments += 1
                    continue

                # Ensure the dictionary has the 'text' key
                segment_text = segment_dict_input.get('text')
                if not isinstance(segment_text, str) or not segment_text.strip():
                    print(f"Warning: Reel {reel.get('reel_no', 'N/A')}, Segment Index {i}: Skipping - Dictionary is missing 'text' key or 'text' is empty/not string.", file=sys.stderr)
                    # Add the invalid entry to the output list with nulls to keep it
                    new_segments_list.append({"start": None, "end": None, "text": segment_text if isinstance(segment_text, str) else str(segment_dict_input)})
                    total_output_segments += 1
                    continue

                valid_input_segments += 1 # This segment is a valid candidate for searching


                print(f"Processing segment in Reel {reel.get('reel_no', 'N/A')}, Index {i}: '{segment_text[:50]}{'...' if len(segment_text)>50 else ''}'...")
                # Call the function that finds and potentially splits segments
                found_sub_segments = find_segment_matches_in_srt(segment_text, srt_entries)

                # Add all found sub-segments to the reel's new segments list
                new_segments_list.extend(found_sub_segments)
                total_output_segments += len(found_sub_segments)

                # Update matched count based on the sub-segments returned
                matched_in_this_segment = 0
                for sub_seg in found_sub_segments:
                    if sub_seg.get('start') is not None:
                        output_segments_with_times += 1
                        matched_in_this_segment += 1 # Count how many sub-segments for THIS input segment were matched

                if not found_sub_segments or (len(found_sub_segments) == 1 and found_sub_segments[0].get('start') is None):
                    print("  -> No match found for any part of this segment.")
                else:
                    print(f"  -> Found {len(found_sub_segments)} output part(s), {matched_in_this_segment} with times.")

            # Replace the original list of dictionaries with the new list of dictionaries (which might be longer)
            processed_reel['segments'] = new_segments_list

        elif 'segments' not in processed_reel:
            print(f"Warning: Reel {reel.get('reel_no', 'N/A')} does not have a 'segments' key.", file=sys.stderr)
            processed_reel['segments'] = [] # Ensure segments key exists, even if empty
        elif not isinstance(processed_reel['segments'], list):
            print(f"Error: Reel {reel.get('reel_no', 'N/A')}'s 'segments' value is not a list (found {type(processed_reel['segments']).__name__}). Skipping segments processing for this reel.", file=sys.stderr)
            processed_reel['segments'] = [] # Replace invalid segments value with empty list


        processed_data.append(processed_reel)

    # Return data and counts
    return (processed_data, total_input_segments, valid_input_segments,
            total_output_segments, output_segments_with_times)


# --- Main Execution ---
if __name__ == "__main__":
    # 1. Parse the SRT file
    print(f"Parsing SRT file: '{input_srt_path}'...")
    srt_entries = parse_srt(input_srt_path)

    if srt_entries is None:
        sys.exit("SRT parsing failed. Exiting.")

    print(f"Successfully parsed {len(srt_entries)} SRT entries.")

    # 2. Load the JSON file
    print(f"\nLoading JSON file: '{input_json_path}'...")
    json_data = None # Initialize before try block
    try:
        with open(input_json_path, 'r', encoding='utf-8') as f:
            json_data = json.load(f)

        # Validate the structure of the loaded JSON
        if not isinstance(json_data, list):
            print(f"Error: Root of input JSON data is not a list (found {type(json_data).__name__}). Expected list of reels.", file=sys.stderr)
            sys.exit("JSON structure validation failed. Exiting.")

        if not json_data:
            print("Warning: Input JSON list is empty.", file=sys.stderr)
            # Proceeding with empty data will result in an empty output list, which is acceptable.

        # Perform a quick check on the first potential segment for format
        if json_data:
            first_item = json_data[0]
            if isinstance(first_item, dict) and 'segments' in first_item and isinstance(first_item['segments'], list) and first_item['segments']:
                first_segment = first_item['segments'][0]
                if not isinstance(first_segment, dict) or 'text' not in first_segment or not isinstance(first_segment.get('text'), str):
                    print(f"Error: First segment in the input JSON has unexpected format (found {type(first_segment).__name__}). Expected dictionary with 'text' string.", file=sys.stderr)
                    print(f"       Please ensure '{input_json_path}' has reels with 'segments' as lists of dictionaries, each having a 'text' key.", file=sys.stderr)
                    sys.exit("Input JSON segment format validation failed. Exiting.")
            elif isinstance(first_item, dict):
                print("Warning: First item in JSON is a dictionary but missing 'segments' key or 'segments' is not a list.", file=sys.stderr)
            else: # json_data is not empty, but first item isn't a dict
                print(f"Warning: First item in JSON is not a dictionary (found {type(first_item).__name__}). Skipping further structural validation.", file=sys.stderr)


    except FileNotFoundError:
        print(f"Error: Input JSON file not found at '{input_json_path}'", file=sys.stderr)
        sys.exit("JSON loading failed. Exiting.")
    except json.JSONDecodeError:
        print(f"Error: Could not decode JSON from '{input_json_path}'. Please check the file format.", file=sys.stderr)
        sys.exit("JSON loading failed. Exiting.")
    except Exception as e:
        print(f"An unexpected error occurred while loading the JSON file: {e}", file=sys.stderr)
        sys.exit("JSON loading failed. Exiting.")

    print("Successfully loaded JSON data.")

    # 3. Process the JSON data using the SRT entries
    print("\n--- Matching JSON segments to SRT times ---")
    # Call process function and capture data and counts
    (modified_data, total_input_segments, valid_input_segments,
     total_output_segments, output_segments_with_times) = process_json_with_srt(json_data, srt_entries)


    # Stop before printing a summary if processing failed (e.g. SRT entries were unavailable)
    if modified_data is None:
        sys.exit("Processing failed. Output file was not created.")

    # 4. Print Summary
    print("\n--- Processing Summary ---")
    print(f"Total segments read from input JSON: {total_input_segments}")
    print(f"Valid input segments processed: {valid_input_segments}")
    print(f"Total segments generated in output JSON: {total_output_segments}")
    print(f"Output segments with start/end times found: {output_segments_with_times}")
    print(f"Output segments with null times (not fully matched): {total_output_segments - output_segments_with_times}")
    print("------------------------")

    # 5. Save the modified data to a new JSON file
    print(f"\nSaving processed data to '{output_json_path}'...")
    try:
        with open(output_json_path, 'w', encoding='utf-8') as outfile:
            # Python's None is always written as JSON null; no extra option is needed for that.
            # (allow_nan only controls float NaN/Infinity values and already defaults to True.)
            json.dump(modified_data, outfile, indent=4)

        print(f"Successfully saved processed data to '{output_json_path}'")

    except IOError as e:
        print(f"Error: Could not write to output file '{output_json_path}': {e}", file=sys.stderr)
        sys.exit("Saving failed. Exiting.")
    except Exception as e:
        print(f"An unexpected error occurred while writing the file: {e}", file=sys.stderr)
        sys.exit("Saving failed. Exiting.")

Overwriting jsonfinal.py
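
The matching logic in `jsonfinal.py` is easier to follow on a toy input. The cell below is a minimal, simplified sketch of the same idea, not the script's actual function: it walks a list of already-normalized words through the SRT entries in order, groups consecutive hits into timed runs, and falls back to a single entry with null times when nothing matches. `Entry` and `match_runs` are hypothetical names introduced here, and the start/end times are assumed to be plain numbers.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class Entry:
    """Hypothetical stand-in for the script's SrtEntry (times assumed numeric)."""
    start_time: float
    end_time: float
    normalized_words: List[str]


def match_runs(words: List[str], entries: List[Entry]) -> List[Dict]:
    """Group consecutively matched words into timed runs (simplified sketch)."""
    runs: List[Dict] = []
    entry_idx, word_idx = 0, 0          # search cursor in the SRT stream
    run_first: Optional[int] = None     # index in `words` where the open run began
    start_time = end_time = None

    for k, word in enumerate(words):
        found = False
        for i in range(entry_idx, len(entries)):
            entry_words = entries[i].normalized_words
            first_j = word_idx if i == entry_idx else 0
            for j in range(first_j, len(entry_words)):
                if entry_words[j] == word:
                    found = True
                    if run_first is None:            # a new run starts here
                        run_first, start_time = k, entries[i].start_time
                    end_time = entries[i].end_time   # a run ends at its latest hit
                    entry_idx, word_idx = i, j + 1   # resume after the matched word
                    if word_idx >= len(entry_words):
                        entry_idx, word_idx = i + 1, 0
                    break
            if found:
                break
        if not found and run_first is not None:
            # A miss closes the open run, which covers words[run_first:k]
            runs.append({"start": start_time, "end": end_time,
                         "text": " ".join(words[run_first:k])})
            run_first = None

    if run_first is not None:            # a run still open after the last word
        runs.append({"start": start_time, "end": end_time,
                     "text": " ".join(words[run_first:])})
    if not runs:                         # nothing matched: keep the text, null times
        runs.append({"start": None, "end": None, "text": " ".join(words)})
    return runs


demo_entries = [
    Entry(0.0, 2.0, ["hello", "world"]),
    Entry(2.0, 4.0, ["this", "is", "a", "test"]),
    Entry(4.0, 6.0, ["of", "matching"]),
]
print(match_runs(["world", "this", "banana", "test", "of"], demo_entries))
```

With the demo input, the unmatched word `banana` splits the segment into two timed runs, `world this` (0.0 to 4.0) and `test of` (2.0 to 6.0). Note that a miss does not advance the search cursor, so the next word is searched from just after the last hit.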


In [None]:
!python reel_maker.py /content/Ahmadoverstimulation.mp4 /content/output_timed_segments.json -o /content/output_reels

2025-04-24 22:16:22,667 - INFO - 📂 Loading reel data from /content/output_timed_segments.json
2025-04-24 22:16:22,668 - INFO - ✅ Successfully loaded 24 reel structures from JSON.
2025-04-24 22:16:22,668 - INFO - 
--- Processing Reel 1/24 (Reel #1) ---
2025-04-24 22:16:22,668 - INFO - 🎬 Processing Reel #1: Winning the War on Attention
2025-04-24 22:16:22,668 - INFO - ✂️ Cutting 10 segments for Reel #1
2025-04-24 22:16:22,669 - INFO - ✂️ Starting video clipping process
2025-04-24 22:16:22,669 - INFO - 📂 Creating output directory: /content/output_reels/Ahmadoverstimulation/reel_1_segments
2025-04-24 22:16:22,669 - INFO - 🔪 Processing clip 1/10
2025-04-24 22:16:22,942 - INFO - ✅ Created clip: clip_1.mp4 (6.87 MB)
2025-04-24 22:16:22,942 - INFO - 🔪 Processing clip 2/10
2025-04-24 22:16:47,519 - INFO - ✅ Created clip: clip_2.mp4 (1975.07 MB)
2025-04-24 22:16:47,519 - INFO - 🔪 Processing clip 3/10
2025-04-24 22:16:48,330 - INFO - ✅ Created clip: clip_3.mp4 (19.36 MB)
2025-04-24 22:16:48,330 -
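
The timed-segments JSON produced earlier can contain segments whose `start` and `end` are null (unmatched text), so a clipping step may want to drop those before building any cut command. `reel_maker.py` is not shown in this notebook, so the cell below is only a hedged sketch of such a guard, not its implementation. The helper names `timed_segments` and `ffmpeg_cut_args` are hypothetical, and the times are assumed to be seconds stored as numbers. The command uses the standard `ffmpeg` options `-ss`, `-t`, `-c copy` and `-y`; it is only built here, not executed.

```python
from typing import Dict, List


def timed_segments(segments: List[Dict]) -> List[Dict]:
    """Keep only segments that have a usable, positive-length time span."""
    kept = []
    for seg in segments:
        start, end = seg.get("start"), seg.get("end")
        if start is None or end is None:
            continue  # unmatched segment: there is no span to cut
        if end <= start:
            continue  # empty or inverted span
        kept.append(seg)
    return kept


def ffmpeg_cut_args(src: str, dst: str, start: float, end: float) -> List[str]:
    """Build an ffmpeg argument list that stream-copies the span from start to end."""
    return [
        "ffmpeg", "-y",                   # overwrite the output file if it exists
        "-ss", f"{start:.3f}",            # seek to the start (seconds)
        "-i", src,
        "-t", f"{end - start:.3f}",       # clip duration (seconds)
        "-c", "copy",                     # copy streams, no re-encoding
        dst,
    ]


segments = [
    {"start": 1.0, "end": 3.5, "text": "matched"},
    {"start": None, "end": None, "text": "unmatched"},
]
for n, seg in enumerate(timed_segments(segments), start=1):
    print(ffmpeg_cut_args("input.mp4", f"clip_{n}.mp4", seg["start"], seg["end"]))
```

The resulting list can be passed to `subprocess.run(args, check=True)`. Stream copy is fast but cuts on keyframes, so clip boundaries may be slightly off; re-encoding trades speed for accuracy.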

In [35]:
# Install necessary Python libraries
# ffmpeg-python provides a convenient way to build ffmpeg commands in Python
# Other libraries might be needed depending on your video_splitter.py implementation
!pip install ffmpeg-python -q
print("Python libraries installed successfully.")

Python libraries installed successfully.


In [44]:
# Delete the specified output directory and its contents
!rm -rf /content/output_reels
print("Deleted /content/output_reels directory if it existed.")

Deleted /content/output_reels directory if it existed.
