<a href="https://colab.research.google.com/github/KennedyMen/Quickscripts/blob/main/Whisperx_Subs.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install faster-whisper
!pip install ffmpeg-python
!pip install pysubs2
!pip install tqdm

In [None]:
!apt-get update
!apt-get install -y ffmpeg
!pip install ffsubsync
!wget https://github.com/kaegi/alass/releases/download/v2.0.0/alass-linux64 -O /usr/local/bin/alass
!chmod +x /usr/local/bin/alass



In [None]:
# Sanity check: print the version of every tool and package installed above.
# A failing line here means the corresponding install step did not succeed.
!ffsubsync --version
!python -c "import faster_whisper; print(faster_whisper.__version__)"
!python -c "import pysubs2; print(pysubs2.__version__)"
!python -c "import tqdm; print(tqdm.__version__)"
# Only the first line of ffmpeg's banner is kept; it carries the version string.
!ffmpeg -version | head -n 1
!alass --version


In [12]:
# Standalone re-check that the alass binary can be invoked by name
# (same command as the last line of the verification cell).
!alass --version

In [None]:
import os
import faster_whisper
import math
from tqdm import tqdm

# Configuration
# NOTE(review): both folders live under /content/drive/MyDrive, which presumably
# requires Google Drive to be mounted first; no mount step is visible in this
# notebook, so confirm before running on a fresh runtime.
language = "fr"  # Passed as `language=` to model.transcribe for every file
input_directory = "/content/drive/MyDrive/Colab_Notebooks/Files/Media"  # Change this to your input folder
output_directory = "/content/drive/MyDrive/Colab_Notebooks/Files/Subs"  # Change this to your output folder

# Load Whisper Model
# The "large-v2" model is loaded once and reused for every file in the loop below.
# device="cuda" presumably requires a GPU runtime; confirm the runtime type.
model = faster_whisper.WhisperModel("large-v2", device="cuda")

def convert_to_hms(seconds: float) -> str:
    """Format a time offset in seconds as an SRT timestamp ``HH:MM:SS,mmm``.

    The value is first rounded to a whole number of milliseconds and then split
    with integer arithmetic. Deriving the millisecond field from
    ``(seconds % 1) * 1000`` and flooring it loses a millisecond whenever the
    binary float sits just below the intended value (for example 4.35 s would
    render as ``,349``).

    Args:
        seconds: Offset from the start of the audio, in seconds.

    Returns:
        The timestamp string, zero-padded, with a comma before the milliseconds.
    """
    # int() keeps the arithmetic in plain Python ints even if a float subtype is passed in.
    total_ms = int(round(seconds * 1000))
    total_seconds, milliseconds = divmod(total_ms, 1000)
    total_minutes, secs = divmod(total_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours:02}:{minutes:02}:{secs:02},{milliseconds:03}"

def convert_seg(segment: faster_whisper.transcribe.Segment) -> str:
    """Render one transcription segment as the timing line and text of an SRT cue.

    The cue index is not included; the caller prepends it. Leading whitespace
    is removed from the segment text, and the cue ends with a blank line.
    """
    start_stamp = convert_to_hms(segment.start)
    end_stamp = convert_to_hms(segment.end)
    cue_text = segment.text.lstrip()
    return f"{start_stamp} --> {end_stamp}\n{cue_text}\n\n"

# Ensure output directory exists
os.makedirs(output_directory, exist_ok=True)

# Collect the audio files up front: sorted for a deterministic processing order,
# and matched case-insensitively so names like "TRACK.MP3" are not silently skipped.
audio_extensions = (".mp3", ".wav")
audio_filenames = sorted(
    name for name in os.listdir(input_directory)
    if name.lower().endswith(audio_extensions)
)

# Process each audio file in the input directory
for filename in audio_filenames:
    input_path = os.path.join(input_directory, filename)
    output_path = os.path.join(output_directory, f"{os.path.splitext(filename)[0]}.srt")

    print(f"Processing: {filename}")
    # faster-whisper returns the segments lazily; the actual transcription work
    # happens while the loop below consumes them, which is what drives the progress bar.
    segments, info = model.transcribe(input_path, language=language)

    full_txt = []
    timestamps = 0.0  # end time of the last segment seen, for the progress bar
    with tqdm(total=info.duration, unit=" audio seconds") as pbar:
        for i, segment in enumerate(segments, start=1):
            # SRT cue = 1-based index line followed by the timing line and text
            full_txt.append(f"{i}\n{convert_seg(segment)}")
            pbar.update(segment.end - timestamps)
            timestamps = segment.end
        if timestamps < info.duration:  # silence at the end of the audio
            pbar.update(info.duration - timestamps)

    with open(output_path, mode="w", encoding="UTF-8") as f:
        f.writelines(full_txt)
    print(f"Saved: {output_path}")

# Report the output folder rather than the last file's path, which would be
# misleading for multiple files and undefined when nothing was processed.
if audio_filenames:
    print(f"All Subtitles Finished and saved to: {output_directory}")
else:
    print(f"No .mp3 or .wav files found in: {input_directory}")

In [None]:
# Switch the notebook's working directory to the Setup folder and run the
# sync_subs.sh script located there.
# NOTE(review): sync_subs.sh is not shown in this notebook; presumably it performs
# subtitle syncing similar to the Python cell below. Confirm before relying on it.
%cd /content/drive/MyDrive/Colab_Notebooks/Files/Setup
# Make sure the script is executable before invoking it.
!chmod +x sync_subs.sh
!./sync_subs.sh


In [8]:
import os
import subprocess
from pathlib import Path
import logging
import concurrent.futures

# Configuration
AUDIO_DIRECTORY = "/content/drive/MyDrive/Colab_Notebooks/Files/Media"  # Directory containing MP3 files
SUBTITLE_DIRECTORY = "/content/drive/MyDrive/Colab_Notebooks/Files/Subs"  # Directory containing SRT files
# Currently the same folder as SUBTITLE_DIRECTORY, so synced files are written
# next to the subtitles they were derived from.
OUTPUT_DIRECTORY = "/content/drive/MyDrive/Colab_Notebooks/Files/Subs"  # Where to save synced subtitles

# Set up logging
# INFO level; each record is formatted as "<timestamp> - <level> - <message>".
logging.basicConfig(level=logging.INFO,
                   format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def sync_subtitle(audio_path, srt_path):
    """
    Sync a subtitle file against its audio in two passes: ffsubsync first, then
    alass applied to the ffsubsync output.

    The intermediate ffsubsync file is always removed, whether or not the
    second pass succeeds. When either tool exits with a non-zero status, its
    captured stderr is written to the log so the failure can be diagnosed.

    Args:
        audio_path: Path to the reference audio file.
        srt_path: Path to the subtitle file to synchronize.

    Returns:
        Path to ``<stem>_synced.srt`` inside OUTPUT_DIRECTORY on success,
        or None if any step failed (the error is logged, never raised).
    """
    output_dir = Path(OUTPUT_DIRECTORY)
    output_dir.mkdir(parents=True, exist_ok=True)

    base_name = Path(srt_path).stem
    temp_ffsubsync_output = output_dir / f"{base_name}_ffsubsync_temp.srt"
    final_output = output_dir / f"{base_name}_synced.srt"

    try:
        # Step 1: FFSubsync
        logger.info(f"Running ffsubsync on {srt_path}")
        subprocess.run([
            "ffsubsync",
            str(audio_path),
            "-i", str(srt_path),
            "-o", str(temp_ffsubsync_output)
        ], check=True, capture_output=True)

        if not temp_ffsubsync_output.exists():
            logger.error(f"ffsubsync produced no output file for {srt_path}")
            return None

        # Step 2: Alass (using ffsubsync output as input)
        logger.info("Running alass on ffsubsync output")
        subprocess.run([
            "alass",
            str(audio_path),
            str(temp_ffsubsync_output),
            str(final_output)
        ], check=True, capture_output=True)

        if not final_output.exists():
            logger.error(f"alass produced no output file for {srt_path}")
            return None

        logger.info(f"Successfully created synced version for {base_name}")
        return final_output

    except subprocess.CalledProcessError as e:
        logger.error(f"Error syncing {srt_path}: {e}")
        # The output was captured, so without this the tool's own message is lost.
        if e.stderr:
            stderr_text = e.stderr.decode("utf-8", errors="replace").strip()
            if stderr_text:
                logger.error(f"Captured stderr for {srt_path}: {stderr_text}")
        return None
    except Exception as e:
        logger.error(f"Unexpected error processing {srt_path}: {e}")
        return None
    finally:
        # Clean up the temporary file on every exit path; a cleanup problem
        # must not turn a successful sync into a failure.
        try:
            if temp_ffsubsync_output.exists():
                temp_ffsubsync_output.unlink()
        except OSError as cleanup_error:
            logger.warning(f"Could not remove {temp_ffsubsync_output}: {cleanup_error}")

def find_matching_pairs(audio_dir=None, subtitle_dir=None):
    """
    Find matching audio and subtitle files across the two directories.

    Files are paired when they share the same stem (file name without
    extension). Both ``.mp3`` and ``.wav`` audio is considered, matching the
    formats the transcription step produces subtitles for; if both exist for
    one stem, the ``.mp3`` is used.

    Args:
        audio_dir: Folder to scan for audio; defaults to AUDIO_DIRECTORY.
        subtitle_dir: Folder to scan for ``.srt`` files; defaults to
            SUBTITLE_DIRECTORY.

    Returns:
        A list of ``(audio_path, srt_path)`` tuples of ``Path`` objects,
        sorted by stem so the processing order is reproducible.
    """
    audio_root = Path(AUDIO_DIRECTORY if audio_dir is None else audio_dir)
    subtitle_root = Path(SUBTITLE_DIRECTORY if subtitle_dir is None else subtitle_dir)

    audio_files = {}
    # Patterns are scanned in priority order; setdefault keeps the first hit per stem.
    for pattern in ("*.mp3", "*.wav"):
        for audio_file in audio_root.glob(pattern):
            audio_files.setdefault(audio_file.stem, audio_file)
    srt_files = {f.stem: f for f in subtitle_root.glob("*.srt")}

    # Intersection of stems, sorted because set iteration order is arbitrary.
    common_stems = sorted(audio_files.keys() & srt_files.keys())
    return [(audio_files[name], srt_files[name]) for name in common_stems]

def main():
    """Validate the configured folders, pair audio with subtitles, and sync every pair.

    Stops early (after logging) if a configured directory is missing or if no
    audio/subtitle pair is found. Otherwise each pair is handed to
    sync_subtitle on a pool of four worker threads, and the outcome of each
    job is logged as it completes.
    """
    # Validate directories: report the first one that is missing and stop.
    missing_dir = next(
        (d for d in (AUDIO_DIRECTORY, SUBTITLE_DIRECTORY) if not os.path.exists(d)),
        None,
    )
    if missing_dir is not None:
        logger.error(f"Directory does not exist: {missing_dir}")
        return

    matched = find_matching_pairs()
    if not matched:
        # Dump what each folder contains to help diagnose the mismatch.
        logger.warning("No matching audio/subtitle pairs found!")
        logger.info(f"Audio files: {list(Path(AUDIO_DIRECTORY).glob('*.mp3'))}")
        logger.info(f"Subtitle files: {list(Path(SUBTITLE_DIRECTORY).glob('*.srt'))}")
        return

    logger.info(f"Found {len(matched)} matching pairs to process")

    # Process files in parallel
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        # Map each submitted job back to the subtitle it belongs to.
        srt_for_job = {}
        for audio_file, srt_file in matched:
            job = executor.submit(sync_subtitle, audio_file, srt_file)
            srt_for_job[job] = srt_file

        for finished in concurrent.futures.as_completed(srt_for_job):
            srt_file = srt_for_job[finished]
            try:
                if finished.result():
                    logger.info(f"Successfully processed {srt_file.name}")
                else:
                    logger.error(f"Failed to process {srt_file.name}")
            except Exception as exc:
                logger.error(f"Error processing {srt_file.name}: {exc}")

# Entry point: __name__ is "__main__" both in a notebook cell and when run as a
# script, so executing this cell starts the sync immediately.
if __name__ == "__main__":
    main()

In [3]:
# List running processes whose command line mentions ffsubsync. The grep process
# itself matches its own pattern, so a lone "grep ffsubsync" line means nothing is running.
!ps aux | grep ffsubsync


root        3142  0.0  0.0   6484  2240 ?        S    20:46   0:00 grep ffsubsync
