<a href="https://colab.research.google.com/github/DtotheS/video-subtitles/blob/main/src/video_subtitles.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# For Cloud
# Colab setup: mount Google Drive, switch into the project folder stored on Drive,
# then install the third-party dependencies into the running kernel's environment.
from google.colab import drive
drive.mount('/content/drive')
# All relative paths used later (e.g. "./data") resolve against this folder.
%cd /content/drive/MyDrive/video-subtitles
%pwd

# NOTE(review): none of these installs is version-pinned, so a fresh runtime may
# pull different releases than the ones that produced the recorded outputs.
# NOTE(review): pytube and ffmpeg-python are not imported by any cell visible in
# this notebook; confirm they are still needed.
%pip install pytube
%pip install git+https://github.com/openai/whisper.git
%pip install pysrt
%pip install transformers
%pip install ffmpeg-python
%pip install yt-dlp
%pip install moviepy
%pip install git+https://github.com/ssut/py-hanspell.git
%pip install sacremoses

Mounted at /content/drive
/content/drive/MyDrive/video-subtitles


'/content/drive/MyDrive/video-subtitles'

In [1]:
# For Local
# 1. Go to the project folder
# 2. conda activate subtitles_env

# %pip install pytube
# %pip install git+https://github.com/openai/whisper.git
# %pip install pysrt
# %pip install transformers
# %pip install ffmpeg-python
# %pip install yt-dlp
# %pip install moviepy
# %pip install git+https://github.com/ssut/py-hanspell.git
# %pip install --upgrade pip
# %pip install faster_whisper
# %pip install torchvision torchaudio
# %pip install "numpy<2.0"
# %pip install sentencepiece

%pwd

'/Users/agathos/DtotheS/video_subtitles'

# Base Model. Run on Google Colab

In [None]:
import os
import yt_dlp as youtube_dl
from moviepy.editor import VideoFileClip
import whisper
from transformers import pipeline
import pysrt
import subprocess

# Step 1: Download video using yt-dlp and rename it to 'video.mp4'
def download_youtube_video(url, output_path):
    """Download a video with yt-dlp and make it available as ``<output_path>/video.mp4``.

    The output template is the fixed name ``video.mp4``. When yt-dlp merges separate
    video and audio streams it appends the container extension to that name (the
    recorded run produced ``video.mp4.webm``), and which container is used depends on
    the selected formats. Instead of assuming ``.webm``, the function looks for
    whatever ``video.mp4.<ext>`` file was produced and renames it.

    Note that the file is only renamed, not converted: the container format stays
    whatever yt-dlp produced.

    Args:
        url: URL of the video to download.
        output_path: Directory that receives the download.

    Returns:
        The path ``f"{output_path}/video.mp4"``.

    Raises:
        FileNotFoundError: If no downloaded file can be found after the download.
    """
    import glob  # local import: only needed to locate the merged output file

    print(f"Downloading video from {url}...")
    final_video_file = f"{output_path}/video.mp4"
    ydl_opts = {
        'format': 'bestvideo+bestaudio/best',  # Downloads the best quality video and audio
        'outtmpl': final_video_file,  # Save with a simple name 'video.mp4'
    }
    with youtube_dl.YoutubeDL(ydl_opts) as ydl:
        ydl.download([url])

    # A merge leaves "video.mp4.<ext>"; skip partial-download leftovers.
    merged_candidates = [
        candidate
        for candidate in glob.glob(glob.escape(final_video_file) + ".*")
        if not candidate.endswith((".part", ".ytdl"))
    ]

    if merged_candidates:
        # The merged file is written last, so the newest candidate is the result.
        video_file = max(merged_candidates, key=os.path.getmtime)
        print(f"Downloaded video to {video_file}")
        # os.replace overwrites a stale video.mp4 from an earlier run on every platform.
        os.replace(video_file, final_video_file)
        print(f"Renamed video to {final_video_file}")
    elif os.path.exists(final_video_file):
        # No merge happened: the single downloaded stream already has the final name.
        print(f"Downloaded video to {final_video_file}")
    else:
        raise FileNotFoundError(
            f"Download finished but no video file was found at {final_video_file}"
        )
    return final_video_file

# Step 2: Extract audio from the video
def extract_audio_from_video(video_path, output_directory):
    """Write the audio track of a video to ``<output_directory>/audio.wav``.

    Args:
        video_path: Path of the video file to read.
        output_directory: Directory that receives ``audio.wav``.

    Returns:
        The path of the written WAV file, or ``None`` when the video file does not
        exist or has no audio track (an error message is printed in both cases).
    """
    print(f"Extracting audio from {video_path}...")
    if not os.path.exists(video_path):
        print(f"Error: Video file not found at {video_path}")
        return None

    audio_path = os.path.join(output_directory, "audio.wav")
    video = VideoFileClip(video_path)
    try:
        audio = video.audio
        if audio is None:
            # Without this guard a silent video fails with an AttributeError on None.
            print(f"Error: No audio track found in {video_path}")
            return None
        audio.write_audiofile(audio_path)
    finally:
        # Always release the clip's reader, even when writing the audio fails.
        video.close()
    print(f"Audio saved to {audio_path}")
    return audio_path

# Step 3: Transcribe and translate audio using Whisper
def transcribe_and_translate(audio_path, whisper_model="base"):
    """Transcribe Korean audio with Whisper and translate every segment to English.

    Args:
        audio_path: Path of the audio file passed to the Whisper model.
        whisper_model: Model name handed to ``whisper.load_model``. The default is
            "base": the earlier default "L" is not a Whisper model name, so a call
            without an explicit model failed while loading, and the recorded run of
            this cell downloaded the 139M checkpoint, which is the "base" model.

    Returns:
        A list of dicts, one per transcribed segment, with the keys ``index``
        (1-based position), ``start`` and ``end`` (copied from the Whisper segment)
        and ``text`` (the English translation).
    """
    print("Loading Whisper model...")
    model = whisper.load_model(whisper_model)

    print("Transcribing audio...")
    result = model.transcribe(audio_path, language="ko")

    print("Translating transcription to English...")
    translator = pipeline("translation", model="Helsinki-NLP/opus-mt-ko-en")
    subtitles = []
    for index, segment in enumerate(result['segments'], start=1):
        # The pipeline returns a list with one dict per input text.
        translated_text = translator(segment["text"])[0]["translation_text"]
        subtitles.append({
            "index": index,
            "start": segment["start"],
            "end": segment["end"],
            "text": translated_text
        })
    return subtitles

# Step 4: Create an SRT file with subtitles
def create_srt_file(subtitles, srt_path):
    """Write subtitle entries to an SRT file.

    Args:
        subtitles: Iterable of dicts with the keys ``index``, ``start``, ``end``
            (times in seconds) and ``text``.
        srt_path: Destination path of the ``.srt`` file (saved as UTF-8).
    """
    def to_srt_time(seconds):
        # Convert to whole milliseconds before building the timestamp. Passing float
        # seconds lets binary rounding leak into the result: 4.02 * 1000 evaluates to
        # 4019.9999999999995, which would be cut down to 4019 ms when formatted.
        return pysrt.SubRipTime(milliseconds=int(round(seconds * 1000)))

    print(f"Creating SRT file at {srt_path}...")
    subs = pysrt.SubRipFile()
    for sub in subtitles:
        sub_item = pysrt.SubRipItem(
            index=sub["index"],
            start=to_srt_time(sub["start"]),
            end=to_srt_time(sub["end"]),
            text=sub["text"]
        )
        subs.append(sub_item)
    subs.save(srt_path, encoding="utf-8")
    print(f"SRT file saved to {srt_path}")

# Step 5: Embed subtitles into the video
def embed_subtitles(video_path, srt_path, output_video_path):
    """Burn the subtitles from an SRT file into a video using the ffmpeg CLI.

    Args:
        video_path: Path of the input video.
        srt_path: Path of the SRT file rendered by ffmpeg's ``subtitles`` filter.
            NOTE(review): the path is interpolated into the filter string unescaped,
            so paths containing characters special to ffmpeg filters (such as ``:``
            or ``'``) will likely break the filter; confirm before using such paths.
        output_video_path: Path of the video to write.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    print("Embedding subtitles into video...")
    command = [
        "ffmpeg",
        # Overwrite an existing output file instead of asking for confirmation, so
        # re-running the cell does not stall or abort on the overwrite prompt.
        "-y",
        "-i", video_path,
        "-vf", f"subtitles={srt_path}:force_style='FontName=Arial,FontSize=16'",
        output_video_path
    ]
    subprocess.run(command, check=True)
    print(f"Video with subtitles saved to {output_video_path}")

# Main function to process the YouTube video
def process_youtube_video(url, output_dir):
    """Run the whole pipeline for one YouTube URL.

    Downloads the video, extracts its audio, transcribes and translates it, writes
    ``subtitles.srt`` and finally renders ``video_with_subtitles.mp4``; every
    artefact is placed in ``output_dir`` (created when missing).

    Returns:
        The path of the subtitled video, or ``None`` when the downloaded video is
        missing or audio extraction fails (an error message is printed).
    """
    os.makedirs(output_dir, exist_ok=True)

    # Step 1: fetch the video and make sure it really landed on disk
    downloaded_video = download_youtube_video(url, output_dir)
    if not os.path.exists(downloaded_video):
        print(f"Error: Video file not found: {downloaded_video}")
        return None

    # Step 2: pull out the audio track
    extracted_audio = extract_audio_from_video(downloaded_video, output_dir)
    if extracted_audio is None:
        print("Error: Failed to extract audio.")
        return None

    # Step 3: speech -> Korean text -> English subtitle entries
    subtitle_entries = transcribe_and_translate(extracted_audio)

    # Step 4: persist the entries as an SRT file
    srt_file = os.path.join(output_dir, "subtitles.srt")
    create_srt_file(subtitle_entries, srt_file)

    # Step 5: burn the subtitles into a new video file
    subtitled_video = os.path.join(output_dir, "video_with_subtitles.mp4")
    embed_subtitles(downloaded_video, srt_file, subtitled_video)

    print(f"Process completed! Final video saved at: {subtitled_video}")
    return subtitled_video

# Example usage
if __name__ == "__main__":
    # Inputs for the demo run; everything the pipeline produces is written
    # below `output_directory`.
    youtube_url = "https://www.youtube.com/watch?v=sLs04yn42KA"  # Replace with your YouTube URL
    output_directory = "./data"

    process_youtube_video(url=youtube_url, output_dir=output_directory)


Downloading video from https://www.youtube.com/watch?v=sLs04yn42KA...
[youtube] Extracting URL: https://www.youtube.com/watch?v=sLs04yn42KA
[youtube] sLs04yn42KA: Downloading webpage
[youtube] sLs04yn42KA: Downloading tv player API JSON
[youtube] sLs04yn42KA: Downloading ios player API JSON
[youtube] sLs04yn42KA: Downloading player 37364e28
[youtube] sLs04yn42KA: Downloading m3u8 information
[info] sLs04yn42KA: Downloading 1 format(s): 399+251
[download] Destination: ./data/video.mp4.f399.mp4
[download] 100% of  335.96MiB in 00:00:08 at 39.67MiB/s  
[download] Destination: ./data/video.mp4.f251.webm
[download] 100% of   39.98MiB in 00:00:01 at 37.84MiB/s  
[Merger] Merging formats into "./data/video.mp4.webm"
Deleting original file ./data/video.mp4.f251.webm (pass -k to keep)
Deleting original file ./data/video.mp4.f399.mp4 (pass -k to keep)
Downloaded video to ./data/video.mp4.webm
Renamed video to ./data/video.mp4
Extracting audio from ./data/video.mp4...
MoviePy - Writing audio in .



MoviePy - Done.
Audio saved to ./data/audio.wav
Loading Whisper model...


100%|███████████████████████████████████████| 139M/139M [00:03<00:00, 36.6MiB/s]
  checkpoint = torch.load(fp, map_location=device)



Transcribing audio...
Translating transcription to English...


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.



config.json:   0%|          | 0.00/1.39k [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/312M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/293 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/44.0 [00:00<?, ?B/s]

source.spm:   0%|          | 0.00/842k [00:00<?, ?B/s]

target.spm:   0%|          | 0.00/813k [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/1.72M [00:00<?, ?B/s]


Device set to use cuda:0
You seem to be using the pipelines sequentially on GPU. In order to maximize efficiency please use a dataset


Creating SRT file at ./data/subtitles.srt...
SRT file saved to ./data/subtitles.srt
Embedding subtitles into video...
Video with subtitles saved to ./data/video_with_subtitles.mp4
Process completed! Final video saved at: ./data/video_with_subtitles.mp4


# Improved Model - Run on Google Colab

In [6]:
import os
import yt_dlp as youtube_dl
from moviepy.video.io.VideoFileClip import VideoFileClip
import whisper
import torch
from hanspell import spell_checker
from transformers import pipeline
import pysrt
import json
import subprocess
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM


# Helper: Save data to a JSON file
def save_to_file(data, file_path):
    """Dump ``data`` to ``file_path`` as indented UTF-8 JSON and report the path.

    Non-ASCII characters are written as-is rather than as ``\\u`` escapes.
    """
    with open(file_path, 'w', encoding='utf-8') as out_stream:
        json.dump(obj=data, fp=out_stream, ensure_ascii=False, indent=4)
    print(f"Saved to {file_path}")


# Helper: Load data from a JSON file
def load_from_file(file_path):
    """Return the JSON content of ``file_path``, or ``None`` if the file is absent."""
    if not os.path.exists(file_path):
        return None

    with open(file_path, 'r', encoding='utf-8') as in_stream:
        # The message is printed before parsing, i.e. as soon as the file is open.
        print(f"Loaded from {file_path}")
        return json.load(in_stream)


# Step 1: Download video using yt-dlp
def download_youtube_video(url, output_path):
    """Download a video with yt-dlp and return its path inside ``output_path``.

    The file is saved as ``video.<ext>``, where ``<ext>`` is the extension reported
    in the info dict returned by yt-dlp. A file whose name does not end in ``.mp4``
    is renamed to ``video.mp4`` (a rename only; the content is not converted).
    """
    print(f"Downloading video from {url}...")
    options = dict(
        format='bestvideo+bestaudio/best',
        outtmpl=f'{output_path}/video.%(ext)s',  # keep the extension chosen by yt-dlp
    )
    with youtube_dl.YoutubeDL(options) as downloader:
        metadata = downloader.extract_info(url, download=True)
        downloaded = os.path.join(output_path, f"video.{metadata['ext']}")
        print(f"Downloaded video to {downloaded}")

        # Already named *.mp4: nothing left to do
        if downloaded.endswith(".mp4"):
            return downloaded

        # Otherwise expose the file under the fixed name video.mp4
        mp4_path = os.path.join(output_path, "video.mp4")
        os.rename(downloaded, mp4_path)
        print(f"Renamed video to {mp4_path}")
        return mp4_path


# Step 2: Extract audio from the video
def extract_audio_from_video(video_path, output_directory):
    """Write the audio track of a video to ``<output_directory>/audio.wav``.

    Args:
        video_path: Path of the video file to read.
        output_directory: Directory that receives ``audio.wav``.

    Returns:
        The path of the written WAV file.

    Raises:
        ValueError: If the video has no audio track.
    """
    print(f"Extracting audio from {video_path}...")
    audio_path = os.path.join(output_directory, "audio.wav")
    video = VideoFileClip(video_path)
    try:
        audio = video.audio
        if audio is None:
            # Fail with a clear message instead of an AttributeError on None.
            raise ValueError(f"No audio track found in {video_path}")
        audio.write_audiofile(audio_path)
    finally:
        # Release the clip's reader whether or not the export succeeded.
        video.close()
    print(f"Audio saved to {audio_path}")
    return audio_path


# Step 3: Transcribe audio using Whisper
def transcribe_audio_with_whisper(audio_path, whisper_model="large", output_dir=""):
    """Transcribe Korean audio with Whisper, reusing a cached result when present.

    The result is cached as ``transcribed.json`` inside ``output_dir``. A non-empty
    cached result is returned as-is without loading the model.
    """
    cache_path = os.path.join(output_dir, "transcribed.json")

    # Reuse an earlier transcription if one was saved
    previous = load_from_file(cache_path)
    if previous:
        return previous

    print("Loading Whisper model...")
    if torch.cuda.is_available():
        device = "cuda"
    else:
        device = "cpu"
    print(f"Using device: {device}")
    recognizer = whisper.load_model(whisper_model, device=device)

    print("Transcribing audio...")
    transcription = recognizer.transcribe(audio_path, language="ko")
    save_to_file(transcription, cache_path)
    return transcription


# Step 4: Correct Korean text using Hanspell
def correct_text_with_hanspell(text):
    """Spell-check Korean ``text`` with Hanspell; fall back to the input on any problem.

    The original text is returned unchanged when the response dict has no
    ``'result'`` key or when the check raises (best-effort correction).
    """
    try:
        reply = spell_checker.check(text)
        if 'result' not in reply.as_dict():
            print(f"Warning: Hanspell response missing 'result' for text: {text}")
            return text
        return reply.checked
    except Exception as e:
        # Deliberately broad: a failed spell check must not abort the pipeline.
        print(f"Error during Hanspell correction: {e}")
        return text


def correct_transcription_with_hanspell(transcription, output_dir=""):
    """Spell-check every segment of a transcription, with a JSON cache.

    The corrected segments are cached as ``corrected_transcription.json`` inside
    ``output_dir``; a non-empty cached list is returned without re-checking.

    Returns:
        A list of dicts with ``index`` (segment ``id`` + 1), ``start``, ``end`` and
        the corrected ``text``.
    """
    cache_path = os.path.join(output_dir, "corrected_transcription.json")

    # Reuse an earlier correction run if one was saved
    previous = load_from_file(cache_path)
    if previous:
        return previous

    def fix_segment(segment):
        # Correct the text first, then assemble the subtitle-style record.
        fixed_text = correct_text_with_hanspell(segment["text"])
        return {
            "index": segment["id"] + 1,
            "start": segment["start"],
            "end": segment["end"],
            "text": fixed_text
        }

    print("Correcting transcription using Hanspell...")
    fixed_segments = [fix_segment(segment) for segment in transcription['segments']]
    save_to_file(fixed_segments, cache_path)
    return fixed_segments


def translate_to_english(corrected_segments, output_dir=""):
    """Translate each segment's Korean text to English with Helsinki-NLP opus-mt-ko-en.

    The translated segments are cached as ``translated_segments.json`` inside
    ``output_dir``; a non-empty cached list is returned without loading the model.

    Returns:
        A list of dicts that keep ``index``, ``start`` and ``end`` of the input
        segment and carry the English translation in ``text``.
    """
    cache_path = os.path.join(output_dir, "translated_segments.json")

    # Reuse an earlier translation run if one was saved
    previous = load_from_file(cache_path)
    if previous:
        return previous

    print("Initializing translation model...")
    tokenizer = AutoTokenizer.from_pretrained("Helsinki-NLP/opus-mt-ko-en")
    model = AutoModelForSeq2SeqLM.from_pretrained("Helsinki-NLP/opus-mt-ko-en")

    def translate(korean_text):
        # Encode (truncating over-long input), generate, then decode the first output.
        encoded = tokenizer(korean_text, return_tensors="pt", truncation=True)
        generated = model.generate(**encoded)
        return tokenizer.decode(generated[0], skip_special_tokens=True)

    print("Translating transcription to English...")
    english_segments = []
    for segment in corrected_segments:
        english_text = translate(segment["text"])
        english_segments.append({
            "index": segment["index"],
            "start": segment["start"],
            "end": segment["end"],
            "text": english_text
        })

    save_to_file(english_segments, cache_path)
    print(f"Translation completed. Saved to {cache_path}")
    return english_segments


# Step 6: Create SRT file
def create_srt_file(subtitles, srt_path):
    """Write subtitle entries to an SRT file.

    Args:
        subtitles: Iterable of dicts with the keys ``index``, ``start``, ``end``
            (times in seconds) and ``text``.
        srt_path: Destination path of the ``.srt`` file (saved as UTF-8).
    """
    print(f"Creating SRT file at {srt_path}...")
    subs = pysrt.SubRipFile()
    for sub in subtitles:
        # Timestamps are passed as whole milliseconds rather than float seconds.
        # A float such as 4.02 becomes 4019.9999999999995 after scaling by 1000, and
        # that would be truncated to 4019 ms when the timestamp is formatted.
        start_ms = int(round(sub["start"] * 1000))
        end_ms = int(round(sub["end"] * 1000))
        sub_item = pysrt.SubRipItem(
            index=sub["index"],
            start=pysrt.SubRipTime(milliseconds=start_ms),
            end=pysrt.SubRipTime(milliseconds=end_ms),
            text=sub["text"]
        )
        subs.append(sub_item)
    subs.save(srt_path, encoding="utf-8")
    print(f"SRT file saved to {srt_path}")


# Step 7: Embed subtitles into the video
def embed_subtitles(video_path, srt_path, output_video_path):
    """Render the subtitles of an SRT file onto a video with the ffmpeg CLI.

    Args:
        video_path: Path of the input video.
        srt_path: Path of the SRT file used by ffmpeg's ``subtitles`` filter.
            NOTE(review): the path goes into the filter string unescaped; paths
            containing ``:`` or ``'`` will likely need escaping. Confirm.
        output_video_path: Path of the video to write; an existing file is
            overwritten.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    print("Embedding subtitles into video...")
    subtitle_filter = f"subtitles={srt_path}:force_style='FontName=Arial,FontSize=16'"
    command = [
        "ffmpeg",
        "-y",  # overwrite without prompting, so a re-run cannot stall on the question
        "-i", video_path,
        "-vf", subtitle_filter,
        output_video_path,
    ]
    subprocess.run(command, check=True)
    print(f"Video with subtitles saved to {output_video_path}")


# Disabled entry point: everything between the triple quotes below is a bare string
# literal, so none of it is executed when this cell runs. It keeps the full
# YouTube-URL pipeline of the improved model (download -> audio -> transcription ->
# spell check -> translation -> SRT -> embed) for reference.
'''# Main function - from Youtube url.
def process_youtube_video(url, output_dir):
    os.makedirs(output_dir, exist_ok=True)

    # Step 1: Download the video
    video_path = download_youtube_video(url, output_dir)

    # Step 2: Extract audio
    audio_path = extract_audio_from_video(video_path, output_dir)

    # Step 3: Transcribe audio
    transcription = transcribe_audio_with_whisper(audio_path, output_dir=output_dir)

    # Step 4: Correct transcription
    corrected_segments = correct_transcription_with_hanspell(transcription, output_dir=output_dir)

    # Step 5: Translate to English
    translated_segments = translate_to_english(corrected_segments, output_dir=output_dir)

    # Step 6: Create SRT file
    srt_path = os.path.join(output_dir, "subtitles.srt")
    create_srt_file(translated_segments, srt_path)

    # Step 7: Embed subtitles
    output_video_path = os.path.join(output_dir, "video_with_subtitles.mp4")
    embed_subtitles(video_path, srt_path, output_video_path)

    print(f"Process completed! Final video saved at: {output_video_path}")
    return output_video_path


# Example usage
if __name__ == "__main__":
    youtube_url = "https://www.youtube.com/watch?v=sLs04yn42KA"
    output_directory = "./data"
    process_youtube_video(youtube_url, output_directory)
'''
# Main function: start from a saved transcription JSON (no download, audio extraction, or transcription step)
def process_from_saved_transcription(video_path, transcription_path, output_dir):
    """Build a subtitled video from an already saved transcription JSON.

    The spell-check step is skipped: segment texts are translated as stored. The
    translated segments, ``subtitles.srt`` and ``video_with_subtitles.mp4`` are
    written to ``output_dir`` (created when missing).

    Returns:
        The path of the subtitled video.

    Raises:
        FileNotFoundError: If the transcription file is missing or its content is empty.
    """
    os.makedirs(output_dir, exist_ok=True)

    # Read the stored transcription; a missing file and empty content are treated alike
    saved_transcription = load_from_file(transcription_path)
    if not saved_transcription:
        raise FileNotFoundError(f"Transcription file not found or empty: {transcription_path}")

    # Step 4 (spell check) is skipped: reshape the raw segments into subtitle records
    print("Skipping spell check and using original transcription.")
    raw_segments = []
    for segment in saved_transcription["segments"]:
        raw_segments.append(dict(
            index=segment["id"] + 1,
            start=segment["start"],
            end=segment["end"],
            text=segment["text"],
        ))

    # Step 5: Korean -> English
    english_segments = translate_to_english(raw_segments, output_dir=output_dir)

    # Step 6: write the SRT file
    srt_file = os.path.join(output_dir, "subtitles.srt")
    create_srt_file(english_segments, srt_file)

    # Step 7: burn the subtitles into the video
    subtitled_video = os.path.join(output_dir, "video_with_subtitles.mp4")
    embed_subtitles(video_path, srt_file, subtitled_video)

    print(f"Process completed! Final video saved at: {subtitled_video}")
    return subtitled_video


# Example Usage
if __name__ == "__main__":
    # Inputs come from an earlier run; the outputs of this run go to a separate folder.
    video_file_path = "./data/video.mp4"
    # NOTE(review): transcribe_audio_with_whisper in this notebook saves its result as
    # "transcribed.json", while this path points to "transcription.json"; confirm
    # which file is meant.
    transcription_file_path = "./data/transcription.json"
    output_directory = "./012625"
    process_from_saved_transcription(
        video_path=video_file_path,
        transcription_path=transcription_file_path,
        output_dir=output_directory,
    )

Loaded from ./data/transcription.json
Skipping spell check and using original transcription.
Initializing translation model...


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.



tokenizer_config.json:   0%|          | 0.00/44.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/1.39k [00:00<?, ?B/s]

source.spm:   0%|          | 0.00/842k [00:00<?, ?B/s]

target.spm:   0%|          | 0.00/813k [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/1.72M [00:00<?, ?B/s]




pytorch_model.bin:   0%|          | 0.00/312M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/293 [00:00<?, ?B/s]

Translating transcription to English...
Saved to ./012625/translated_segments.json
Translation completed. Saved to ./012625/translated_segments.json
Creating SRT file at ./012625/subtitles.srt...
SRT file saved to ./012625/subtitles.srt
Embedding subtitles into video...


KeyboardInterrupt: 