## Load media files separately

In [None]:
import os

# This cell define the input file path
file_path = ["F:\dir\\video.mp4", "F:\dir\\video2.mp4"]
if type(file_path) == str:
    print('File_path is a string')
    # remove spaces to make life easier
    file_path_new = file_path.replace(' ', '_')
    file_path_new = file_path_new.replace('(', '_')
    file_path_new = file_path_new.replace(')', '_')
    file_path_new = file_path_new.replace(
        ',', '')  # remove commas to avoid issues
    os.rename(file_path, file_path_new)  # rename file to avoid issues
else:
    file_path_list = file_path.copy()
    file_path_new = ''
    for i in range(len(file_path)):
        file_path_list[i] = file_path[i].replace(' ', '_')
        file_path_list[i] = file_path_list[i].replace('(', '_')
        file_path_list[i] = file_path_list[i].replace(')', '_')
        file_path_list[i] = file_path_list[i].replace(
            ',', '')  # remove commas to avoid issues
        # rename file to avoid issues
        os.rename(file_path[i], file_path_list[i])
    audio_file = ",".join(file_path_list)
print(audio_file)

## Load media directory

In [None]:
import os

def get_m4v_file_paths(directory):
    for filename in os.listdir(directory):
        # Construct the new filename by replacing spaces, parentheses, and commas
        new_filename = filename.replace(" ", "_").replace("(", "_").replace(")", "_").replace(",", "_").replace(".", "_")
        new_filename = filename.replace("_m4v" , ".m4v").replace("_srt" , ".srt")
        # Get the full paths for the old and new filenames
        old_file = os.path.join(directory, filename)
        new_file = os.path.join(directory, new_filename)
        # Rename the file
        if old_file != new_file:
            os.rename(old_file, new_file)
    # List all .m4v files in the specified directory with full paths
    files = [os.path.join(directory, f) for f in os.listdir(directory) if f.endswith(
        '.m4v') and os.path.isfile(os.path.join(directory, f))]
    # Join the file paths into a single string separated by commas
    file_paths = ",".join(files)
    return file_paths

# This cell define the input file path
audio_file = get_m4v_file_paths("E:\dir")
print(audio_file)

## Start transcribing

In [None]:
import os, subprocess, sys
from pathlib import Path
import whisper
from whisper.utils import format_timestamp, get_writer, WriteTXT
import numpy as np
import torch
import math
from openai import OpenAI
from typing import TextIO


# select task
task = "translate" #@param ["Transcribe", "Translate to English"]

# set output formats: https://github.com/openai/whisper/blob/v20231117/whisper/utils.py#L283
output_formats = "srt" #@param ["txt,vtt,srt,tsv,json", "txt,vtt,srt", "txt,vtt", "txt,srt", "txt", "vtt", "srt", "tsv", "json"] {allow-input: true}
output_formats = output_formats.split(',')

# set model

use_model = "medium" #@param ["tiny", "base", "small", "medium", "large-v1", "large-v2", "turbo"]

# select language

language = "German" #@param ["Auto-Detect", "Afrikaans", "Albanian", "Amharic", "Arabic", "Armenian", "Assamese", "Azerbaijani", "Bashkir", "Basque", "Belarusian", "Bengali", "Bosnian", "Breton", "Bulgarian", "Burmese", "Castilian", "Catalan", "Chinese", "Croatian", "Czech", "Danish", "Dutch", "English", "Estonian", "Faroese", "Finnish", "Flemish", "French", "Galician", "Georgian", "German", "Greek", "Gujarati", "Haitian", "Haitian Creole", "Hausa", "Hawaiian", "Hebrew", "Hindi", "Hungarian", "Icelandic", "Indonesian", "Italian", "Japanese", "Javanese", "Kannada", "Kazakh", "Khmer", "Korean", "Lao", "Latin", "Latvian", "Letzeburgesch", "Lingala", "Lithuanian", "Luxembourgish", "Macedonian", "Malagasy", "Malay", "Malayalam", "Maltese", "Maori", "Marathi", "Moldavian", "Moldovan", "Mongolian", "Myanmar", "Nepali", "Norwegian", "Nynorsk", "Occitan", "Panjabi", "Pashto", "Persian", "Polish", "Portuguese", "Punjabi", "Pushto", "Romanian", "Russian", "Sanskrit", "Serbian", "Shona", "Sindhi", "Sinhala", "Sinhalese", "Slovak", "Slovenian", "Somali", "Spanish", "Sundanese", "Swahili", "Swedish", "Tagalog", "Tajik", "Tamil", "Tatar", "Telugu", "Thai", "Tibetan", "Turkish", "Turkmen", "Ukrainian", "Urdu", "Uzbek", "Valencian", "Vietnamese", "Welsh", "Yiddish", "Yoruba"]

# other parameters

prompt = "" #@param {type:"string"}

coherence_preference = "Less repetitions, but may have less coherence" #@param ["More coherence, but may repeat text", "Less repetitions, but may have less coherence"]

api_key = '' #@param {type:"string"}

# select audio file
# audio_file = file_path_new #@param {type:"string"}
audio_files = list(map(lambda audio_path: audio_path.strip(), audio_file.split(',')))

for audio_path in audio_files:
  if not os.path.isfile(audio_path):
    raise FileNotFoundError(audio_path)

# write result to file
class WriteText(WriteTXT):

  def write_result(self, result: dict, file: TextIO, **kwargs):
    print(result['text'], file=file, flush=True)
# write result to file
def write_result(result, output_format, output_file_name):
  output_format = output_format.strip()

  # start captions in non-zero timestamp (some media players does not detect the first caption)
  fix_vtt = output_format == 'vtt' and result['segments'] and result['segments'][0].get('start') == 0
  
  if fix_vtt:
    result['segments'][0]['start'] += 1/1000 # +1ms

  # write result in the desired format
  writer = WriteText(output_dir) if output_format == 'txt' else get_writer(output_format, output_dir)
  writer(result, output_file_name)

  if fix_vtt:
    result['segments'][0]['start'] = 0 # reset change

  output_file_path = os.path.join(output_dir, f"{output_file_name}.{output_format}")
  print(output_file_path)  
  
# detect device

if api_key:
  print("Using API")

  from pydub import AudioSegment
  from pydub.silence import split_on_silence
else:
  DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

  print(f"Using {'GPU' if DEVICE == 'cuda' else 'CPU ⚠️'}")

  # https://medium.com/analytics-vidhya/the-google-colab-system-specification-check-69d159597417
  if DEVICE == "cuda":
    !nvidia-smi -L
  else:
    if sys.platform == 'linux':
      !lscpu | grep "Model name" | awk '{$1=$1};1'
    
    print("Not using GPU can result in a very slow execution")
    print("Ensure Hardware accelerator by GPU is enabled in Google Colab: Runtime > Change runtime type")

    if use_model not in ['tiny', 'base', 'small']:
      print("You may also want to try a smaller model (tiny, base, small)")

# display language

WHISPER_LANGUAGES = [k.title() for k in whisper.tokenizer.TO_LANGUAGE_CODE.keys()]

if language == "Auto-Detect":
  language = "detect"

if language and language != "detect" and language not in WHISPER_LANGUAGES:
  print(f"\nLanguage '{language}' is invalid")
  language = "detect"

if language and language != "detect":
  print(f"\nLanguage: {language}")

# load model

if api_key:
  print()
else:
  MODELS_WITH_ENGLISH_VERSION = ["tiny", "base", "small", "medium"]

  if language == "English" and use_model in MODELS_WITH_ENGLISH_VERSION:
    use_model += ".en"

  print(f"\nLoading {use_model} model... {os.path.expanduser(f'~/.cache/whisper/{use_model}.pt')}")

  model = whisper.load_model(use_model, device=DEVICE)

  print(
      f"Model {use_model} is {'multilingual' if model.is_multilingual else 'English-only'} "
      f"and has {sum(np.prod(p.shape) for p in model.parameters()):,d} parameters.\n"
  )

# set options

## https://github.com/openai/whisper/blob/v20231117/whisper/transcribe.py#L37
## https://github.com/openai/whisper/blob/v20231117/whisper/decoding.py#L81
options = {
    'task': task,
    'verbose': True,
    'fp16': True,
    'best_of': 5,
    'beam_size': 5,
    'patience': None,
    'length_penalty': None,
    'suppress_tokens': '-1',
    'temperature': (0.0, 0.2, 0.4, 0.6, 0.8, 1.0), # float or tuple
    'condition_on_previous_text': False,
    'initial_prompt': prompt or None,
    'word_timestamps': False,
}

if api_key:
  api_client = OpenAI(api_key=api_key)

  api_supported_formats = ['mp3', 'mp4', 'mpeg', 'mpga', 'm4a', 'wav', 'webm']
  api_max_bytes = 25 * 1024 * 1024 # 25 MB

  api_transcribe = api_client.audio.transcriptions if task == 'transcribe' else api_client.audio.translations
  api_transcribe = api_transcribe.create
  
  api_model = 'whisper-1' # large-v2

  # https://platform.openai.com/docs/api-reference/audio?lang=python
  api_options = {
    'response_format': 'verbose_json',
  }

  if prompt:
    api_options['prompt'] = prompt
  
  api_temperature = options['temperature'][0] if isinstance(options['temperature'], (tuple, list)) else options['temperature']
  
  if isinstance(api_temperature, (float, int)):
    api_options['temperature'] = api_temperature
  else:
    raise ValueError("Invalid temperature type, it must be a float or a tuple of floats")
elif DEVICE == 'cpu':
  options['fp16'] = False
  torch.set_num_threads(os.cpu_count())

# execute task
# !whisper "{audio_file}" --task {task} --model {use_model} --output_dir {output_dir} --device {DEVICE} --verbose {options['verbose']}

if task == "translate":
  print("-- TRANSLATE TO ENGLISH --")
else:
  print("-- TRANSCRIPTION --")

results = {} # audio_path to result

for audio_path in audio_files:
  print(f"\nProcessing: {audio_path}\n")

  # detect language
  detect_language = not language or language == "detect"

  if not detect_language:
    options['language'] = language
    source_language_code = whisper.tokenizer.TO_LANGUAGE_CODE.get(language.lower())
  elif not api_key:
    # load audio and pad/trim it to fit 30 seconds
    audio = whisper.load_audio(audio_path)
    audio = whisper.pad_or_trim(audio)

    # make log-Mel spectrogram and move to the same device as the model
    mel = whisper.log_mel_spectrogram(audio, n_mels=model.dims.n_mels).to(model.device)

    # detect the spoken language
    _, probs = model.detect_language(mel)

    source_language_code = max(probs, key=probs.get)
    options['language'] = whisper.tokenizer.LANGUAGES[source_language_code].title()
    
    print(f"Detected language: {options['language']}\n")

  # transcribe
  if api_key:
    # API
    if task == "transcribe" and not detect_language:
      api_options['language'] = source_language_code
    
    source_audio_name_path, source_audio_ext = os.path.splitext(audio_path)
    source_audio_ext = source_audio_ext[1:]

    if source_audio_ext in api_supported_formats:
      api_audio_path = audio_path
      api_audio_ext = source_audio_ext
    else:
      ## convert audio file to a supported format
      if options['verbose']:
        print(f"API supported formats: {','.join(api_supported_formats)}")
        print(f"Converting {source_audio_ext} audio to a supported format...")

      api_audio_ext = 'mp3'

      api_audio_path = f'{source_audio_name_path}.{api_audio_ext}'

      subprocess.run(['ffmpeg', '-i', audio_path, api_audio_path], check=True, capture_output=True)

      if options['verbose']:
        print(api_audio_path, end='\n\n')

    ## split audio file in chunks
    api_audio_chunks = []

    audio_bytes = os.path.getsize(api_audio_path)

    if audio_bytes >= api_max_bytes:
      if options['verbose']:
        print(f"Audio exceeds API maximum allowed file size.\nSplitting audio in chunks...")
      
      audio_segment_file = AudioSegment.from_file(api_audio_path, api_audio_ext)

      min_chunks = math.ceil(audio_bytes / (api_max_bytes / 2))

      # print(f"Min chunks: {min_chunks}")

      max_chunk_milliseconds = int(len(audio_segment_file) // min_chunks)

      # print(f"Max chunk milliseconds: {max_chunk_milliseconds}")

      def add_chunk(api_audio_chunk):
        api_audio_chunk_path = f"{source_audio_name_path}_{len(api_audio_chunks) + 1}.{api_audio_ext}"
        api_audio_chunk.export(api_audio_chunk_path, format=api_audio_ext)
        api_audio_chunks.append(api_audio_chunk_path)
      
      def raw_split(big_chunk):
        subchunks = math.ceil(len(big_chunk) / max_chunk_milliseconds)

        for subchunk_i in range(subchunks):
          chunk_start = max_chunk_milliseconds * subchunk_i
          chunk_end = min(max_chunk_milliseconds * (subchunk_i + 1), len(big_chunk))
          add_chunk(big_chunk[chunk_start:chunk_end])
      
      non_silent_chunks = split_on_silence(audio_segment_file,
                                           seek_step=5, # ms
                                           min_silence_len=1250, # ms
                                           silence_thresh=-25, # dB
                                           keep_silence=True) # needed to aggregate timestamps

      # print(f"Non silent chunks: {len(non_silent_chunks)}")
      
      current_chunk = non_silent_chunks[0] if non_silent_chunks else audio_segment_file

      for next_chunk in non_silent_chunks[1:]:
        if len(current_chunk) > max_chunk_milliseconds:
          raw_split(current_chunk)
          current_chunk = next_chunk
        elif len(current_chunk) + len(next_chunk) <= max_chunk_milliseconds:
          current_chunk += next_chunk
        else:
          add_chunk(current_chunk)
          current_chunk = next_chunk
      
      if len(current_chunk) > max_chunk_milliseconds:
        raw_split(current_chunk)
      else:
        add_chunk(current_chunk)
      
      if options['verbose']:
        print(f'Total chunks: {len(api_audio_chunks)}\n')
    else:
      api_audio_chunks.append(api_audio_path)
    
    ## process chunks
    result = None

    for api_audio_chunk_path in api_audio_chunks:
      ## API request
      with open(api_audio_chunk_path, 'rb') as api_audio_file:
        api_result = api_transcribe(model=api_model, file=api_audio_file, **api_options)
        api_result = api_result.model_dump() # to dict
      
      api_segments = api_result['segments']
      
      if result:
        ## update timestamps
        last_segment_timestamp = result['segments'][-1]['end'] if result['segments'] else 0

        for segment in api_segments:
          segment['start'] += last_segment_timestamp
          segment['end'] += last_segment_timestamp

        ## append new segments
        result['segments'].extend(api_segments)
        
        if 'duration' in result:
          result['duration'] += api_result.get('duration', 0)
      else:
        ## first request
        result = api_result
        
        if detect_language:
          print(f"Detected language: {result['language'].title()}\n")
    
      ## display segments
      if options['verbose']:
        for segment in api_segments:
          print(f"[{format_timestamp(segment['start'])} --> {format_timestamp(segment['end'])}] {segment['text']}")
  else:
    # Open-Source
    result = whisper.transcribe(model, audio_path, **options)
    output_file_name = Path(audio_path).stem.replace('.', '_') # remove spaces to make life easier
    output_dir = os.path.dirname(audio_path)
    for output_format in output_formats:
      write_result(result, output_format, output_file_name)


  # fix results formatting
  for segment in result['segments']:
    segment['text'] = segment['text'].strip()
  
  result['text'] = '\n'.join(map(lambda segment: segment['text'], result['segments']))

  # set results for this audio file
  results[audio_path] = result
  #os.rename(file_path_new, file_path) # restore original file name

## Embed .srt to video files

In [None]:
from srt_embedder import batch_embed

# embed srt in video
batch_embed.batch_embed_srt("E:\dir")