In [2]:
import torch
from faster_whisper import WhisperModel
import whisperx
import os
import subprocess
import shlex
import unicodedata
import re
import shutil

cuda_available = torch.cuda.is_available()
print("CUDA:", cuda_available)

# Configuration
# File extensions (without the dot) that the processing loop picks up.
supported_video_types = ["mp4", "webm", "mkv"]
supported_audio_types = ["mp3", "wav", "m4a", "opus"]
model_size = "large-v3"
# Use the GPU when present; otherwise fall back to CPU instead of failing at
# model load time. float16 is only requested on CUDA; int8 is used on CPU.
device = "cuda" if cuda_available else "cpu"
batch_size = 48  # Passed to whisperx_model.transcribe()
compute_type = "float16" if cuda_available else "int8"
if not cuda_available:
    print("CUDA not available: falling back to device='cpu', compute_type='int8'")
# Hugging Face access token handed to the diarization pipeline. Prefer the
# HF_TOKEN environment variable so the secret does not live in the source;
# the literal below is only a placeholder fallback.
YOUR_HF_TOKEN = os.environ.get("HF_TOKEN", "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
ENABLE_VIDEO_CUTTING = False  # Set this to False to disable video cutting (audio is cut instead)
MERGE_THRESHOLD = 1000  # Merge segments that are less than 1 second apart (1000 milliseconds)

# Speaker-count bounds passed to the diarization model.
max_speakers = 25
min_speakers = 12

# Initialize models
whisperx_model = whisperx.load_model(model_size, device, compute_type=compute_type)

# Function to reduce a string to ASCII letters and digits, with underscores elsewhere
def to_ascii(s):
    """Return *s* rewritten using only ``a-z``, ``A-Z``, ``0-9`` and ``_``.

    The string is NFKD-normalized, characters that cannot be encoded as ASCII
    are dropped, and every remaining character that is not an ASCII letter or
    digit is replaced with an underscore.
    """
    decomposed = unicodedata.normalize('NFKD', s)
    ascii_text = decomposed.encode('ascii', 'ignore').decode('ascii')
    return re.sub(r'[^a-zA-Z0-9]', '_', ascii_text)

# Function to rename file to ASCII
def rename_file_to_ascii(file_path):
    """Rename *file_path* on disk to an ASCII-safe name and return the new path.

    The stem of the file name is passed through ``to_ascii``; the extension is
    kept as-is.

    * If the name is already ASCII-safe, no rename is performed.
    * If the stem contains no ASCII-representable character at all, ``"_"`` is
      used so the result is not a bare extension such as ``.mp4``.
    * If a different file already owns the target name, a numeric suffix
      (``_1``, ``_2``, ...) is appended instead of overwriting it.

    Raises:
        OSError: propagated from ``os.rename`` (e.g. the source does not exist).
    """
    dir_name, base_name = os.path.split(file_path)
    stem, extension = os.path.splitext(base_name)
    stem_ascii = to_ascii(stem) or "_"
    new_base_name = stem_ascii + extension
    new_file_path = os.path.join(dir_name, new_base_name)

    # Nothing to do when the name is unchanged.
    if new_base_name == base_name:
        return new_file_path

    # Two different source names can collapse to the same ASCII name; pick a
    # free name rather than clobbering the other file.
    counter = 1
    while os.path.exists(new_file_path):
        new_file_path = os.path.join(dir_name, f"{stem_ascii}_{counter}{extension}")
        counter += 1

    os.rename(file_path, new_file_path)
    return new_file_path

# Function to merge close segments
def merge_segments(segments, threshold):
    """Merge ``(start, end)`` segments whose gap is at most *threshold*.

    Args:
        segments: sequence of ``(start, end)`` pairs, in any order.
        threshold: maximum gap between the end of one segment and the start
            of the next for the two to be merged (same unit as the segments).

    Returns:
        A new list of segments ordered by start. Merged segments are tuples;
        segments that were not merged are returned as given. An empty input
        yields an empty list.
    """
    if not segments:
        return []

    # The sweep below only compares neighbours, so it needs start order.
    ordered = sorted(segments, key=lambda segment: segment[0])

    merged_segments = [ordered[0]]
    for current in ordered[1:]:
        previous = merged_segments[-1]
        if current[0] - previous[1] <= threshold:
            # Keep the later end: a segment fully contained in the previous
            # one must not shrink the merged span.
            merged_segments[-1] = (previous[0], max(previous[1], current[1]))
        else:
            merged_segments.append(current)
    return merged_segments

# Function to group transcript entries by speaker and merge each speaker's segments
def extract_speaker_segments(transcript, merge_threshold):
    """Return a dict mapping each speaker to its merged ``(start, end)`` list.

    Entries without a ``'speaker'`` key are ignored. Each entry's ``'start'``
    and ``'end'`` values are multiplied by 1000 (seconds to milliseconds)
    before being handed to ``merge_segments`` together with *merge_threshold*.
    """
    per_speaker = {}
    for item in transcript:
        if 'speaker' in item:
            name = item["speaker"]
            span = (item['start'] * 1000, item['end'] * 1000)  # milliseconds
            per_speaker.setdefault(name, []).append(span)

    return {
        name: merge_segments(spans, merge_threshold)
        for name, spans in per_speaker.items()
    }

# Function to create segment file
def create_segment_file(speaker_segments, media_path, output_dir, speaker):
    """Write an FFmpeg concat list for one speaker and return its path.

    For every ``(start, end)`` pair in *speaker_segments* (milliseconds) the
    file gets a ``file`` / ``inpoint`` / ``outpoint`` triple pointing at
    *media_path*, with the times converted to seconds.

    Args:
        speaker_segments: iterable of ``(start_ms, end_ms)`` pairs.
        media_path: path of the media file every entry refers to.
        output_dir: existing directory in which the list file is created.
        speaker: label used in the file name ``<speaker>_segments.txt``.

    Returns:
        The path of the written list file.
    """
    # The path is wrapped in single quotes, inside which a quote cannot be
    # escaped: close the quoted run, emit an escaped quote, and reopen it.
    quoted_media_path = str(media_path).replace("'", "'\\''")

    entries = [
        f"file '{quoted_media_path}'\ninpoint {start / 1000}\noutpoint {end / 1000}\n"
        for start, end in speaker_segments
    ]

    segment_file_path = os.path.join(output_dir, f"{speaker}_segments.txt")
    # Explicit UTF-8 so non-ASCII directory names do not depend on the
    # platform's default encoding.
    with open(segment_file_path, "w", encoding="utf-8") as segment_file:
        segment_file.write("".join(entries))
    return segment_file_path

# Function to split media using FFmpeg with GPU acceleration
def split_media_ffmpeg(media_path, speaker_segments, output_dir, is_video):
    """Export one combined media file per speaker using FFmpeg's concat demuxer.

    For each speaker a concat list is written with ``create_segment_file`` and
    FFmpeg is run on it, producing
    ``<speaker>_<basename of output_dir>_combined.mp4`` (video) or ``.mp3``
    (audio) inside *output_dir*. FFmpeg's stdout/stderr are printed; a failed
    export is reported but does not raise.

    Args:
        media_path: source media file referenced by every concat entry.
        speaker_segments: dict mapping speaker label to ``(start_ms, end_ms)`` pairs.
        output_dir: directory for list files and outputs (created if missing).
        is_video: True to encode video with ``hevc_nvenc``, False for audio output.
    """
    os.makedirs(output_dir, exist_ok=True)
    dir_name = os.path.basename(output_dir)
    media_type = "video" if is_video else "audio"

    if is_video:
        extension = ".mp4"
        codec_args = ["-c:v", "hevc_nvenc", "-preset", "fast"]
    else:
        extension = ".mp3"
        # Stream copy into an .mp3 file only works when the source stream is
        # already MP3; wav/m4a/opus sources must be re-encoded.
        source_is_mp3 = os.path.splitext(str(media_path))[1].lower() == ".mp3"
        codec_args = ["-c", "copy"] if source_is_mp3 else ["-vn", "-c:a", "libmp3lame"]

    for speaker, segments in speaker_segments.items():
        segment_file_path = create_segment_file(segments, media_path, output_dir, speaker)
        concatenated_output_path = os.path.join(output_dir, f"{speaker}_{dir_name}_combined{extension}")
        # Build argv directly rather than quoting into a string and re-splitting it.
        concat_command = [
            "ffmpeg", "-y", "-hwaccel", "cuda", "-f", "concat", "-safe", "0",
            "-i", segment_file_path, *codec_args, concatenated_output_path,
        ]
        printable_command = " ".join(shlex.quote(arg) for arg in concat_command)

        print(f"Running {media_type} split command: {printable_command}")
        result = subprocess.run(concat_command, capture_output=True, text=True)
        print(result.stdout)
        print(result.stderr)
        if result.returncode == 0:
            print(f"Exported combined {media_type} for {speaker} to {concatenated_output_path}")
        else:
            print(f"Failed to export combined {media_type} for {speaker}. FFmpeg error: {result.stderr}")

# Function to save speaker words to different files
def save_speaker_words(transcript, output_dir):
    """Write each speaker's text to its own file inside *output_dir*.

    The ``"text"`` values of all entries sharing a ``"speaker"`` value are
    joined with single spaces, in transcript order, and written to
    ``<speaker>_<basename of output_dir>_words.txt``. Entries without a
    ``"speaker"`` key are collected under ``"unknown"``; a missing ``"text"``
    counts as an empty string.

    Args:
        transcript: iterable of dict-like entries.
        output_dir: existing directory that receives the text files.
    """
    speaker_words = {}
    for entry in transcript:
        speaker = entry.get("speaker", "unknown")
        speaker_words.setdefault(speaker, []).append(entry.get("text", ""))

    dir_name = os.path.basename(output_dir)
    for speaker, words in speaker_words.items():
        speaker_file = os.path.join(output_dir, f"{speaker}_{dir_name}_words.txt")
        # Transcripts can be in any language; write UTF-8 explicitly so the
        # platform default encoding cannot raise UnicodeEncodeError.
        with open(speaker_file, "w", encoding="utf-8") as f:
            f.write(" ".join(words))
        print(f"Saved words for speaker {speaker} to {speaker_file}")

# Extract the audio track of a video into an MP3 file
def extract_audio_from_video(video_path, audio_output_path):
    """Run FFmpeg to write the audio of *video_path* to *audio_output_path*.

    The video stream is dropped (``-vn``) and audio is encoded with
    ``libmp3lame``; an existing output file is overwritten (``-y``). The
    outcome is only printed: a non-zero FFmpeg exit status does not raise.
    """
    source_arg = shlex.quote(video_path)
    target_arg = shlex.quote(audio_output_path)
    command = f"ffmpeg -y -i {source_arg} -vn -acodec libmp3lame {target_arg}"
    print(f"Running command: {command}")

    completed = subprocess.run(shlex.split(command), capture_output=True, text=True)
    if completed.returncode == 0:
        print(f"Extracted audio to {audio_output_path}")
        return
    print(f"Failed to extract audio from video. FFmpeg error: {completed.stderr}")

# Process all supported media files in the current directory
supported_extensions = set(supported_video_types) | set(supported_audio_types)
# Match on the real extension, case-insensitively, and only for regular files
# (a bare name such as "mp4" or a directory must not be picked up).
files = [
    f for f in os.listdir('.')
    if os.path.isfile(f) and os.path.splitext(f)[1][1:].lower() in supported_extensions
]
print("---FILES:", files)

# Models that do not depend on the individual file are loaded lazily once and
# reused, instead of being reloaded for every file.
align_models = {}      # language code -> (alignment model, metadata)
diarize_model = None

for file in files:
    is_video = os.path.splitext(file)[1][1:].lower() in supported_video_types

    file_path = os.path.abspath(file)
    file_path_ascii = rename_file_to_ascii(file_path)
    output_dir = f"{os.path.splitext(file_path_ascii)[0]}_output"
    os.makedirs(output_dir, exist_ok=True)
    dir_name = os.path.basename(output_dir)

    # Move the media file to the output directory
    new_media_path = os.path.join(output_dir, os.path.basename(file_path_ascii))
    shutil.move(file_path_ascii, new_media_path)

    if is_video:
        audio_path = os.path.join(output_dir, "extracted_audio.mp3")
        extract_audio_from_video(new_media_path, audio_path)
        # extract_audio_from_video only prints on failure; do not go on to
        # load an audio file that was never produced.
        if not os.path.isfile(audio_path):
            print(f"Skipping {file}: no extracted audio at {audio_path}")
            continue
    else:
        audio_path = new_media_path

    # Transcription with WhisperX
    audio = whisperx.load_audio(audio_path)
    result = whisperx_model.transcribe(audio, batch_size=batch_size)
    transcription_before_alignment_file = os.path.join(output_dir, f"1_transcription_before_alignment_{dir_name}.log")
    # Log files contain transcript text, so write them as UTF-8 explicitly.
    with open(transcription_before_alignment_file, "w", encoding="utf-8") as f:
        f.write("Transcription before alignment:\n")
        for segment in result["segments"]:
            f.write(f"{segment}\n")

    # Alignment (one alignment model per detected language)
    language = result["language"]
    if language not in align_models:
        align_models[language] = whisperx.load_align_model(language_code=language, device=device)
    model_a, metadata = align_models[language]
    result = whisperx.align(result["segments"], model_a, metadata, audio, device, return_char_alignments=False)
    transcription_after_alignment_file = os.path.join(output_dir, f"1_transcription_after_alignment_{dir_name}.log")
    with open(transcription_after_alignment_file, "w", encoding="utf-8") as f:
        f.write("Transcription after alignment:\n")
        for segment in result["segments"]:
            f.write(f"{segment}\n")

    # Diarization
    if diarize_model is None:
        diarize_model = whisperx.DiarizationPipeline(use_auth_token=YOUR_HF_TOKEN, device=device)
    diarize_segments = diarize_model(audio, min_speakers=min_speakers, max_speakers=max_speakers)
    diarization_file = os.path.join(output_dir, f"2_diarization_segments_{dir_name}.log")
    with open(diarization_file, "w", encoding="utf-8") as f:
        f.write("Diarization segments:\n")
        # Iterating a pandas DataFrame yields its column names, not its rows;
        # when the result is a DataFrame, log the rows themselves.
        if hasattr(diarize_segments, "itertuples"):
            diarization_rows = diarize_segments.itertuples(index=False)
        else:
            diarization_rows = diarize_segments
        for segment in diarization_rows:
            f.write(f"{segment}\n")

    # Attach speaker labels to the aligned transcript
    result = whisperx.assign_word_speakers(diarize_segments, result)
    speaker_labels_file = os.path.join(output_dir, f"3_segments_with_speaker_ids_{dir_name}.log")
    with open(speaker_labels_file, "w", encoding="utf-8") as f:
        for segment in result["segments"]:
            f.write(f"{segment}\n")

    speaker_segments = extract_speaker_segments(result["segments"], MERGE_THRESHOLD)

    # Cut the video itself only when enabled; otherwise cut the audio track.
    if ENABLE_VIDEO_CUTTING and is_video:
        split_media_ffmpeg(new_media_path, speaker_segments, output_dir, is_video=True)
    else:
        split_media_ffmpeg(audio_path, speaker_segments, output_dir, is_video=False)

    save_speaker_words(result["segments"], output_dir)


CUDA: True


Lightning automatically upgraded your loaded checkpoint from v1.5.4 to v2.2.4. To apply the upgrade to your files permanently, run `python -m pytorch_lightning.utilities.upgrade_checkpoint C:\Users\cemka\.cache\torch\whisperx-vad-segmentation.bin`


No language specified, language will be first be detected for each audio file (increases inference time).
Model was trained with pyannote.audio 0.0.1, yours is 3.1.1. Bad things might happen unless you revert pyannote.audio to 0.x.
Model was trained with torch 1.10.0+cu102, yours is 2.2.1. Bad things might happen unless you revert torch to 1.x.
---FILES: ['SPEAKER_02_42_output_combined.mp3', 'SPEAKER_07_13_output_combined.mp3']
Detected language: en (1.00) in first 30s of audio...
Running audio split command: ffmpeg -y -hwaccel cuda -f concat -safe 0 -i 'z:\AI\Whisper\SPEAKER_02_42_output_combined_output\SPEAKER_10_segments.txt' -c copy 'z:\AI\Whisper\SPEAKER_02_42_output_combined_output\SPEAKER_10_SPEAKER_02_42_output_combined_output_combined.mp3'

ffmpeg version 7.0-full_build-www.gyan.dev Copyright (c) 2000-2024 the FFmpeg developers
  built with gcc 13.2.0 (Rev5, Built by MSYS2 project)
  configuration: --enable-gpl --enable-version3 --enable-static --disable-w32threads --disable-a