In [None]:
# install the necessary packages

! pip install --upgrade pip
! pip install --upgrade transformers accelerate


In [None]:
# setup model

import torch
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline

device = "cuda:0" if torch.cuda.is_available() else "cpu"
torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32

model_id = "openai/whisper-large-v3-turbo"

model = AutoModelForSpeechSeq2Seq.from_pretrained(
    model_id, torch_dtype=torch_dtype, low_cpu_mem_usage=True, use_safetensors=True
)
model.to(device)

processor = AutoProcessor.from_pretrained(model_id)

pipe = pipeline(
    "automatic-speech-recognition",
    model=model,
    tokenizer=processor.tokenizer,
    feature_extractor=processor.feature_extractor,
    torch_dtype=torch_dtype,
    device=device,
)


In [None]:
# convert all videos in the ./videos directory to mp3

import os
import subprocess
from pathlib import Path

# convert all videos in the ./videos directory to mp3
# and save them in the ./audios directory
# Create output directory if it doesn't exist
os.makedirs('./audios', exist_ok=True)

# Get all video files from videos directory
video_path = Path('./videos')
video_files = [f for f in video_path.glob('*') if f.is_file()]

# Convert each video to mp3
for video_file in video_files:
    output_file = f'./audios/{video_file.stem}.mp3'
    # Check if the output file already exists
    if os.path.exists(output_file):
        print(f'Skipping {video_file}, already converted.')
        continue
    subprocess.run([
        'ffmpeg',
        '-i', str(video_file),
        '-q:a', '0',
        '-map', 'a',
        output_file
    ])
    print(f'Converted {video_file} to {output_file}')


In [None]:
# generation arguments for the model 
generate_kwargs = {
    "num_beams": 1,
    "condition_on_prev_tokens": False,
    "compression_ratio_threshold": 1.35,  # zlib compression ratio threshold (in token space)
    "temperature": (0.0, 0.2, 0.4, 0.6, 0.8, 1.0),
    "logprob_threshold": -1.0,
}

In [None]:
def fix_openai_timestamps_and_return_chunks(res):
    # As a known issue, the timestamps of Whisper reset every 30 seconds (segment length).
    # we count the amount of timestamp inversions, then we add 30 seconds for each inversion
    # to the timestamp of the current chunk.
    last_timestamp = 0
    monotonic_violation_counter = 0 # we assume something is said at least once every 30 seconds, else this breaks
    chunks = []
    for chunk in res["chunks"]:
        begin = chunk["timestamp"][0]
        if begin == None:
            begin = last_timestamp
        end = chunk["timestamp"][1]
        if end == None:
            end = begin
        begin += monotonic_violation_counter * 30
        end += monotonic_violation_counter * 30
        
        if end < begin:
            monotonic_violation_counter += 1
            end += 30
        elif begin < last_timestamp:
            monotonic_violation_counter += 1
            begin += 30
            end += 30
        last_timestamp = end
        chunk["timestamp"] = [begin, end]
        chunks.append(chunk)
    return chunks
    

import json
# transcribe each audio in ./audios directory using the pipeline
# Create output directory for transcriptions
os.makedirs('./transcriptions', exist_ok=True)

# Get all audio files from audios directory
audio_path = Path('./audios')
audio_files = [f for f in audio_path.glob('*.mp3') if f.is_file()]

# Process each audio file
for audio_file in audio_files:
    output_json = f'./transcriptions/{audio_file.stem}.json'
    
    # Skip if already processed
    if os.path.exists(output_json):
        print(f'Skipping {audio_file}, already transcribed.')
        continue
    
    print(f'Transcribing {audio_file}...')
    
    # Run transcription with the pipeline
    result = pipe(str(audio_file), return_timestamps=True, generate_kwargs=generate_kwargs)
    
    # Save the transcription result to a JSON file
    with open(output_json, 'w') as f:
        json.dump(fix_openai_timestamps_and_return_chunks(result), f, indent=4)
    
    print(f'Saved transcription to {output_json}')


In [None]:
# utility: delete all existing transcriptions, useful for testing

import shutil

# Delete the transcriptions directory and its contents
shutil.rmtree('./transcriptions', ignore_errors=True)