## Inputs and params

In [2]:
import os  # needed here because this cell runs before the main import cell

# --- Input params ---
# Provide either a local file path or a YouTube URL.
# When both are set, the local video file takes precedence.
# Leave the unused one as an empty string so the input-selection cell can test it.
INPUT_VIDEO_FILE = "record.mp4"
INPUT_YOUTUBE_URL = ""  # e.g. "https://www.youtube.com/watch?v=4V2C0X4qqLY"

# --- Tokens, etc ---
# Hugging Face token: https://huggingface.co/docs/hub/security-tokens#user-access-tokens
# Read from the environment so the secret is never saved inside the notebook.
HUGGINGFACE_AUTH_TOKEN = os.environ.get("HUGGINGFACE_AUTH_TOKEN", "")

# --- Model params ---

# Model size: tiny, base, small, medium, large, large-v2
# Can also have ".en" suffix for English-specific model; e.g. base.en
WHISPER_MODEL = "base.en"

# Merge tracks with same label and separated by less than COLLAR seconds.
COLLAR_IN_SECS = 0.5

# --- Output files ---
OUTPUT_TRANSCRIPTION = "output.sub"

# --- Temporary files used in the process ---
TEMP_VIDEO_FILE = "temp/input.mp4"
TEMP_AUDIO_FILE = "temp/input.wav"


## Helpers

In [7]:
from transformers import WhisperProcessor, WhisperForConditionalGeneration, pipeline
from pyannote.audio import Pipeline
from pytube import YouTube
from pydub import AudioSegment
import datetime
import moviepy.editor as mp
import os, shutil

def ensure_dir(path):
    """Create the parent directory of the given file path if it is missing.

    A path with no directory component (a bare file name) is left alone.
    """
    parent = os.path.dirname(path)
    if not parent:
        return
    os.makedirs(parent, exist_ok=True)

def fetch_youtube_audio(url):
    """Download a YouTube video and extract its audio track as WAV.

    The video is saved to TEMP_VIDEO_FILE and its audio is written to
    TEMP_AUDIO_FILE using the 'pcm_s16le' codec.

    Args:
        url: URL of the YouTube video.

    Returns:
        Path of the written WAV file (TEMP_AUDIO_FILE).

    Raises:
        ValueError: if no stream is available for the URL, or the downloaded
            video has no audio track.
    """

    print("Fetching audio from Youtube URL:", url)

    ensure_dir(TEMP_VIDEO_FILE)
    ensure_dir(TEMP_AUDIO_FILE)

    video_stream = YouTube(url).streams.first()
    if video_stream is None:
        raise ValueError(f"No downloadable stream found for URL: {url}")
    video_stream.download(filename=TEMP_VIDEO_FILE)

    video = mp.VideoFileClip(TEMP_VIDEO_FILE)
    try:
        if video.audio is None:
            raise ValueError(f"Downloaded video has no audio track: {TEMP_VIDEO_FILE}")
        video.audio.write_audiofile(TEMP_AUDIO_FILE, codec='pcm_s16le')
    finally:
        # Always release the resources held by the clip, even on failure
        video.close()

    print("Done fetching audio form YouTube to file: ", TEMP_AUDIO_FILE)
    return TEMP_AUDIO_FILE


def extract_wav_from_video(video_file):
    """Extract the audio track of a local video file as WAV.

    The audio is written to TEMP_AUDIO_FILE using the 'pcm_s16le' codec.

    Args:
        video_file: path of the video file to read.

    Returns:
        Path of the written WAV file (TEMP_AUDIO_FILE).

    Raises:
        ValueError: if the video has no audio track.
    """

    print("Extracting audio from video file", video_file)

    ensure_dir(TEMP_AUDIO_FILE)
    video = mp.VideoFileClip(video_file)
    try:
        if video.audio is None:
            raise ValueError(f"Video file has no audio track: {video_file}")
        video.audio.write_audiofile(TEMP_AUDIO_FILE, codec='pcm_s16le')
    finally:
        # Always release the resources held by the clip, even on failure
        video.close()

    # The original message claimed the audio came from YouTube; it comes from a local file
    print("Done extracting audio from video to file: ", TEMP_AUDIO_FILE)
    return TEMP_AUDIO_FILE


# strftime pattern; %f renders six fractional digits (microseconds)
TIMESTAMP_FORMAT = "%H:%M:%S.%f"
# Arbitrary midnight anchor so an offset in seconds can be rendered as a time of day
base_time = datetime.datetime(1970, 1, 1)

def format_timestamp(seconds):
    """Render an offset in seconds as HH:MM:SS.cc for SubViewer output.

    Format reference: https://wiki.videolan.org/SubViewer/
    """
    offset = datetime.timedelta(seconds=seconds)
    microsecond_stamp = (base_time + offset).strftime(TIMESTAMP_FORMAT)
    # Drop the last four of the six fractional digits, leaving hundredths
    return microsecond_stamp[:-4]

def extract_audio_track(input_file, output_file, start_time, end_time):
    """Cut the span between start_time and end_time (seconds) out of a WAV
    file and save it to output_file in MP3 format."""

    ms_per_second = 1000

    # Slice positions are expressed in milliseconds
    window = slice(start_time * ms_per_second, end_time * ms_per_second)

    source = AudioSegment.from_wav(input_file)
    source[window].export(output_file, format="mp3")

def generate_speaker_diarization(audio_file):
    """Run the pyannote speaker-diarization pipeline on an audio file.

    Args:
        audio_file: path of the audio file to diarize.

    Returns:
        The object returned by calling the pretrained pipeline on the file.

    Raises:
        RuntimeError: if the pretrained pipeline could not be loaded.
    """

    print("Generating speaker diarization... audio_file=", audio_file)

    # Treat an empty token as "not provided" instead of passing an empty
    # credential string (presumably lets a locally stored Hugging Face login
    # be used -- confirm against huggingface_hub behaviour).
    diarization_pipeline = Pipeline.from_pretrained(
      "pyannote/speaker-diarization-3.0",
      use_auth_token=HUGGINGFACE_AUTH_TOKEN or None)

    # NOTE(review): from_pretrained appears to return None rather than raise
    # when a gated model cannot be downloaded; surface that as a clear error
    # instead of a "'NoneType' object is not callable" failure below.
    if diarization_pipeline is None:
        raise RuntimeError(
            "Could not load 'pyannote/speaker-diarization-3.0'. Check that "
            "HUGGINGFACE_AUTH_TOKEN is set and that the model's user "
            "conditions have been accepted on Hugging Face.")

    result = diarization_pipeline(audio_file)

    print("Done generating spearer diarization")
    return result

def generate_transcription(diarization):
    """Transcribe every speaker turn of a diarization with Whisper.

    Each turn (after merging same-speaker turns closer than COLLAR_IN_SECS)
    is cut out of TEMP_AUDIO_FILE into its own file under "output-tracks/"
    and passed to the speech-recognition pipeline.

    Args:
        diarization: diarization object exposing
            ``support(collar).itertracks(yield_label=True)``.

    Returns:
        List of dicts with keys 'start', 'end', 'speaker', 'text' and
        'track_path', one per turn, in iteration order.
    """
    import torch  # local import: only needed to probe for a GPU

    print("Generating transcription model: ", WHISPER_MODEL)

    # Use the GPU when there is one, otherwise fall back to CPU instead of failing
    device = "cuda" if torch.cuda.is_available() else "cpu"

    pipe = pipeline(
        "automatic-speech-recognition",
        model=f"openai/whisper-{WHISPER_MODEL}",
        chunk_length_s=30,
        device=device
    )

    # Start from an empty tracks directory. makedirs(exist_ok=True) tolerates
    # the case where the best-effort rmtree could not remove the directory.
    shutil.rmtree("output-tracks", ignore_errors=True)
    os.makedirs("output-tracks", exist_ok=True)

    result = []
    for turn, _, speaker in diarization.support(COLLAR_IN_SECS).itertracks(yield_label=True):
        track_file = f"output-tracks/{round(turn.start, 2)}-{speaker}.mp3"
        track_path = os.path.join(os.curdir, track_file)
        extract_audio_track(TEMP_AUDIO_FILE, track_file, turn.start, turn.end)

        # The pipeline is fed the raw bytes of the exported track
        with open(track_file, "rb") as audio_content:
            track_data = audio_content.read()

        output = pipe(track_data, batch_size=8, return_timestamps=False)
        text = output['text']

        result.append({
            'start': turn.start,
            'end': turn.end,
            'speaker': speaker,
            'text': text.strip(),
            'track_path': track_path
        })

    print("Done generating transcripion. Parts: ", len(result))
    return result

def format_transcription(transcription):
    """Render transcription parts as SubViewer text.

    Each part becomes a "start,end" timestamp line followed by a
    "speaker: text" line and a blank line.
    Format reference: https://wiki.videolan.org/SubViewer/
    """
    entries = (
        f"{format_timestamp(part['start'])},{format_timestamp(part['end'])}\n{part['speaker']}: {part['text']}\n\n"
        for part in transcription
    )
    return "".join(entries)

def save_transcription(transcription, output_file):
    """Save the transcription to a file in SubViewer format.

    Args:
        transcription: list of transcription parts accepted by
            format_transcription.
        output_file: path of the file to write (overwritten if it exists).
    """

    print("Saving transcripion to file...", output_file)

    # Format first so an existing file is not truncated if formatting fails
    content = format_transcription(transcription)

    # Explicit UTF-8 so non-ASCII text does not depend on the platform's
    # default encoding; the context manager closes the file on error too.
    with open(output_file, "w", encoding="utf-8") as f:
        f.write(content)

    print("Done saving transcripion")

## Main program

### Fetch video / Extract Audio

In [8]:
%%time
# A local video file takes precedence over the YouTube URL
if INPUT_VIDEO_FILE:
    audio_file = extract_wav_from_video(INPUT_VIDEO_FILE)
elif INPUT_YOUTUBE_URL:
    audio_file = fetch_youtube_audio(INPUT_YOUTUBE_URL)
else:
    # Fail here rather than leave audio_file undefined for the following cells
    raise ValueError("Set INPUT_VIDEO_FILE or INPUT_YOUTUBE_URL")


Extracting audio from video file record.mp4
MoviePy - Writing audio in temp/input.wav


                                                                      

MoviePy - Done.
Done fetching audio form YouTube to file:  temp/input.wav
CPU times: total: 93.8 ms
Wall time: 765 ms




### Generate diarizations

In [9]:
%%time
# Diarize the extracted audio; `diarization` is consumed by the transcription cell
diarization = generate_speaker_diarization(audio_file)

Generating speaker diarization... audio_file= temp/input.wav
Done generating spearer diarization
CPU times: total: 24.5 s
Wall time: 53.7 s


In [11]:
%%time
# Transcribe each diarized speaker turn, then write the result to OUTPUT_TRANSCRIPTION
transcription = generate_transcription(diarization)
save_transcription(transcription, OUTPUT_TRANSCRIPTION)

# Echo the same SubViewer-formatted text that was written to the file
print(format_transcription(transcription))

Generating transcription model:  base.en


FileNotFoundError: [WinError 2] The system cannot find the file specified

# 