<a href="https://colab.research.google.com/github/SarthakAgase/AI-Speech-Emotion-Detection/blob/main/Open_AI_Whisper_Speech_To_Text_(Multilingual).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import subprocess

from sys import platform as sys_platform

# Probe for ffmpeg by running `ffmpeg -version`; a non-zero status means the
# command failed (typically because ffmpeg is not installed or not on PATH).
status, ffmpeg_version = subprocess.getstatusoutput("ffmpeg -version")

if status != 0:
  from platform import platform

  # Only attempt an automatic install on Ubuntu Linux; elsewhere just point
  # the user to the download page.
  if sys_platform == 'linux' and 'ubuntu' in platform().lower():
    !apt install ffmpeg
  else:
    print("Install ffmpeg: https://ffmpeg.org/download.html")
else:
  # Show only the first line of the `ffmpeg -version` output.
  print(ffmpeg_version.split('\n')[0])

  # Shell suffix that pipes stdout+stderr through grep to drop pip's
  # "Running pip as the 'root' user" warning.
  NO_ROOT_WARNING = '|& grep -v \"WARNING: Running pip as the \'root\' user"'

  # NOTE(review): the pip installs below live in this `else` branch, so they
  # only run when ffmpeg was already available. After a fresh ffmpeg install
  # the cell presumably has to be run a second time — confirm this is intended.
  !pip install --no-warn-script-location --user --upgrade pip {NO_ROOT_WARNING}
  !pip install --root-user-action=ignore git+https://github.com/openai/whisper.git@v20230314 numpy scipy torch deepl pydub openai==0.27.6

ffmpeg version 4.4.2-0ubuntu0.22.04.1 Copyright (c) 2000-2021 the FFmpeg developers
Collecting pip
  Downloading pip-23.3.2-py3-none-any.whl (2.1 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 2.1/2.1 MB 12.7 MB/s eta 0:00:00
Installing collected packages: pip
Successfully installed pip-23.3.2
Collecting git+https://github.com/openai/whisper.git@v20230314
  Cloning https://github.com/openai/whisper.git (to revision v20230314) to /tmp/pip-req-build-swg_x8nj
  Running command git clone --filter=blob:none --quiet https://github.com/openai/whisper.git /tmp/pip-req-build-swg_x8nj
  Running command git checkout -q 6dea21fd7f7253bfe450f1e2512a0fe47ee2d258
  Resolved https://github.com/openai/whisper.git to commit 6dea21fd7f7253bfe450f1e2512a0fe47ee2d258
  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Collecting deepl
  Downloading deepl-1.16.1-py3-none-any.whl.metadata

In [None]:
import os, subprocess

import whisper
from whisper.utils import format_timestamp, get_writer, WriteTXT

import numpy as np

try:
  import tensorflow
except ImportError:
  pass

import torch

import openai

import math

# Whisper task: the banner printed later treats "translate" as
# translate-to-English and anything else as transcription.
task = "translate"

# One or more audio paths, separated by commas.
audio_file = "/content/MarathiAudio.mp3"

# Split the comma-separated list and trim whitespace around every entry.
audio_files = [path.strip() for path in audio_file.split(',')]

# Fail fast on the first path that is not an existing regular file.
for audio_path in audio_files:
  if os.path.isfile(audio_path):
    continue
  raise FileNotFoundError(audio_path)

# Whisper model name used when running locally.
use_model = "large-v2"

# Spoken language of the audio; "Auto-Detect" requests automatic detection.
language = "Auto-Detect"

# Optional initial prompt (an empty string means no prompt).
prompt = ""

# Controls the `condition_on_previous_text` decoding option.
coherence_preference = "More coherence, but may repeat text"

# OpenAI API key; leave empty to run the open-source model locally.
api_key = ''


# Choose the execution backend: the OpenAI API when an API key is set,
# otherwise the local model on GPU (if available) or CPU.
if api_key:
  print("Using API")

  # pydub is only needed in API mode, where the main loop uses it to split
  # oversized audio files into chunks.
  from pydub import AudioSegment
  from pydub.silence import split_on_silence
else:
  DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

  print(f"Using {'GPU' if DEVICE == 'cuda' else 'CPU ⚠️'}")

  # https://medium.com/analytics-vidhya/the-google-colab-system-specification-check-69d159597417
  if DEVICE == "cuda":
    # List the available GPUs.
    !nvidia-smi -L
  else:
    # NOTE(review): `sys_platform` is not defined in this cell; it comes from
    # the setup cell, which therefore has to run first.
    if sys_platform == 'linux':
      # Print the CPU model name with whitespace normalised by awk.
      !lscpu | grep "Model name" | awk '{$1=$1};1'

    print("Not using GPU can result in a very slow execution")
    print("Ensure Hardware accelerator by GPU is enabled in Google Colab: Runtime > Change runtime type")

    # Suggest a lighter model when a larger one was selected for CPU execution.
    if use_model not in ['tiny', 'base', 'small']:
      print("You may also want to try a smaller model (tiny, base, small)")

# display language

# Title-cased language names taken from Whisper's name -> code table.
WHISPER_LANGUAGES = [name.title() for name in whisper.tokenizer.TO_LANGUAGE_CODE]

# "Auto-Detect" is represented internally as "detect".
if language == "Auto-Detect":
  language = "detect"

# An explicit language is either reported or, when unknown, replaced by detection.
has_explicit_language = bool(language) and language != "detect"

if has_explicit_language:
  if language in WHISPER_LANGUAGES:
    print(f"\nLanguage: {language}")
  else:
    print(f"\nLanguage '{language}' is invalid")
    language = "detect"

# load model

if not api_key:
  # Model sizes for which an English-only ".en" variant is selected below.
  MODELS_WITH_ENGLISH_VERSION = ["tiny", "base", "small", "medium"]

  # Switch to the English-only variant when the language is explicitly English.
  if use_model in MODELS_WITH_ENGLISH_VERSION and language == "English":
    use_model = use_model + ".en"

  model_cache_path = os.path.expanduser(f'~/.cache/whisper/{use_model}.pt')
  print(f"\nLoading {use_model} model... {model_cache_path}")

  model = whisper.load_model(use_model, device=DEVICE)

  # Summarise the loaded model: language coverage and total parameter count.
  model_kind = 'multilingual' if model.is_multilingual else 'English-only'
  parameter_count = sum(np.prod(p.shape) for p in model.parameters())
  print(f"Model {use_model} is {model_kind} and has {parameter_count:,d} parameters.\n")
else:
  # API mode has no local model to load; just emit a blank line.
  print()

# set options

## https://github.com/openai/whisper/blob/v20230308/whisper/transcribe.py#L36
## https://github.com/openai/whisper/blob/v20230308/whisper/decoding.py#L79
# Keyword arguments forwarded to `whisper.transcribe` when running locally.
# In API mode only the first temperature value is reused (see below).
options = {
    'task': task,
    'verbose': True,
    'fp16': True,  # switched off further down when running on CPU
    'best_of': 5,
    'beam_size': 5,
    'patience': None,
    'length_penalty': None,
    'suppress_tokens': '-1',
    'temperature': (0.0, 0.2, 0.4, 0.6, 0.8, 1.0),
    'condition_on_previous_text': coherence_preference == "More coherence, but may repeat text",
    'initial_prompt': prompt or None,  # an empty prompt becomes None
    'word_timestamps': False,
}

if api_key:
  openai.api_key = api_key

  # Limits applied by this notebook before uploading audio to the API.
  api_supported_formats = ['mp3', 'mp4', 'mpeg', 'mpga', 'm4a', 'wav', 'webm']
  api_max_bytes = 25 * 1024 * 1024 # 25 MB

  # Resolve the endpoint from the task name: openai.Audio.transcribe / .translate
  api_transcribe = getattr(openai.Audio, task)
  api_model = 'whisper-1' # large-v2

  # https://platform.openai.com/docs/api-reference/audio?lang=python
  api_options = {
    'response_format': 'verbose_json',
  }

  if prompt:
    api_options['prompt'] = prompt

  # The API request gets a single temperature: take the first entry of the schedule.
  api_temperature = options['temperature'][0] if isinstance(options['temperature'], (tuple, list)) else options['temperature']

  if isinstance(api_temperature, (float, int)):
    api_options['temperature'] = api_temperature
  else:
    raise ValueError("Invalid temperature type, it must be a float or a tuple of floats")
elif DEVICE == 'cpu':
  # Disable fp16 when running on CPU.
  options['fp16'] = False
  # os.cpu_count() may return None when the count cannot be determined;
  # fall back to a single thread instead of passing None to torch.
  torch.set_num_threads(os.cpu_count() or 1)


# Announce which task is about to run.
print("-- TRANSLATE TO ENGLISH --" if task == "translate" else "-- TRANSCRIPTION --")

# Maps each audio path to its result.
results = dict()

# Process every audio file: resolve the language, run the OpenAI API or the
# local Whisper model, normalise the segments and store the result in `results`.
for audio_path in audio_files:
  print(f"\nProcessing: {audio_path}\n")

  # Detection is needed when no explicit language was selected.
  detect_language = not language or language == "detect"

  if not detect_language:
    options['language'] = language
    source_language_code = whisper.tokenizer.TO_LANGUAGE_CODE.get(language.lower())
  elif not api_key:
    # Local mode only: detect the language with the loaded model.
    # In API mode with detection, `source_language_code` is left unset here;
    # it is only read below when `detect_language` is False.
    # load audio and pad/trim it to fit 30 seconds
    audio = whisper.load_audio(audio_path)
    audio = whisper.pad_or_trim(audio)

    # make log-Mel spectrogram and move to the same device as the model
    mel = whisper.log_mel_spectrogram(audio).to(model.device)

    # detect the spoken language
    _, probs = model.detect_language(mel)

    # Keep the most probable language code and its title-cased name.
    source_language_code = max(probs, key=probs.get)
    options['language'] = whisper.tokenizer.LANGUAGES[source_language_code].title()

    print(f"Detected language: {options['language']}\n")

  # transcribe
  if api_key:
    # API
    # The language hint is only sent for transcription with an explicit language.
    if task == "transcribe" and not detect_language:
      api_options['language'] = source_language_code

    # Split "path/name.ext" into "path/name" and "ext" (without the dot).
    source_audio_name_path, source_audio_ext = os.path.splitext(audio_path)
    source_audio_ext = source_audio_ext[1:]

    # NOTE(review): the extension comparison is case-sensitive, so e.g. "MP3"
    # is treated as unsupported and goes through conversion.
    if source_audio_ext in api_supported_formats:
      api_audio_path = audio_path
      api_audio_ext = source_audio_ext
    else:
      ## convert audio file to a supported format
      if options['verbose']:
        print(f"API supported formats: {','.join(api_supported_formats)}")
        print(f"Converting {source_audio_ext} audio to a supported format...")

      api_audio_ext = 'mp3'

      api_audio_path = f'{source_audio_name_path}.{api_audio_ext}'

      # NOTE(review): ffmpeg is invoked without an overwrite flag; presumably
      # this fails (CalledProcessError via check=True) when the target file
      # already exists, e.g. on a re-run — confirm.
      subprocess.run(['ffmpeg', '-i', audio_path, api_audio_path], check=True, capture_output=True)

      if options['verbose']:
        print(api_audio_path, end='\n\n')

    ## split audio file in chunks
    # Paths of the files that will be uploaded, in playback order.
    api_audio_chunks = []

    audio_bytes = os.path.getsize(api_audio_path)

    if audio_bytes >= api_max_bytes:
      if options['verbose']:
        print(f"Audio exceeds API maximum allowed file size.\nSplitting audio in chunks...")

      audio_segment_file = AudioSegment.from_file(api_audio_path, api_audio_ext)

      # Number of chunks needed if each chunk were half the maximum size;
      # presumably the halving leaves headroom for re-encoded chunks — confirm.
      min_chunks = math.ceil(audio_bytes / (api_max_bytes / 2))

      # print(f"Min chunks: {min_chunks}")

      # Maximum chunk length; assumes len() of a pydub AudioSegment is its
      # duration in milliseconds — TODO confirm against pydub docs.
      max_chunk_milliseconds = int(len(audio_segment_file) // min_chunks)

      # print(f"Max chunk milliseconds: {max_chunk_milliseconds}")

      # Export a chunk next to the source file as "<name>_<n>.<ext>" (n is
      # 1-based, taken from the number of chunks so far) and record its path.
      def add_chunk(api_audio_chunk):
        api_audio_chunk_path = f"{source_audio_name_path}_{len(api_audio_chunks) + 1}.{api_audio_ext}"
        api_audio_chunk.export(api_audio_chunk_path, format=api_audio_ext)
        api_audio_chunks.append(api_audio_chunk_path)

      # Cut a chunk that is longer than the limit into consecutive slices of
      # at most `max_chunk_milliseconds`, ignoring silence boundaries.
      def raw_split(big_chunk):
        subchunks = math.ceil(len(big_chunk) / max_chunk_milliseconds)

        for subchunk_i in range(subchunks):
          chunk_start = max_chunk_milliseconds * subchunk_i
          chunk_end = min(max_chunk_milliseconds * (subchunk_i + 1), len(big_chunk))
          add_chunk(big_chunk[chunk_start:chunk_end])

      non_silent_chunks = split_on_silence(audio_segment_file,
                                           seek_step=5, # ms
                                           min_silence_len=1250, # ms
                                           silence_thresh=-25, # dB
                                           keep_silence=True) # needed to aggregate timestamps

      # print(f"Non silent chunks: {len(non_silent_chunks)}")

      # Greedily merge consecutive pieces while the merged length stays within
      # the limit; fall back to the whole file when no pieces were returned.
      current_chunk = non_silent_chunks[0] if non_silent_chunks else audio_segment_file

      for next_chunk in non_silent_chunks[1:]:
        if len(current_chunk) > max_chunk_milliseconds:
          # A single piece already exceeds the limit: slice it by duration.
          raw_split(current_chunk)
          current_chunk = next_chunk
        elif len(current_chunk) + len(next_chunk) <= max_chunk_milliseconds:
          current_chunk += next_chunk
        else:
          add_chunk(current_chunk)
          current_chunk = next_chunk

      # Flush the last pending chunk.
      if len(current_chunk) > max_chunk_milliseconds:
        raw_split(current_chunk)
      else:
        add_chunk(current_chunk)

      if options['verbose']:
        print(f'Total chunks: {len(api_audio_chunks)}\n')
    else:
      # Small enough to upload as a single file.
      api_audio_chunks.append(api_audio_path)

    ## process chunks
    # Aggregated result across chunks; the first API response is reused as the base.
    result = None

    for api_audio_chunk_path in api_audio_chunks:
      ## API request
      with open(api_audio_chunk_path, 'rb') as api_audio_file:
        api_result = api_transcribe(api_model, api_audio_file, **api_options)

      api_segments = api_result['segments']

      if result:
        ## update timestamps
        # NOTE(review): the offset is the end of the last aggregated segment,
        # not the accumulated chunk duration; silence after the last segment
        # of a chunk may therefore shift later timestamps — confirm.
        last_segment_timestamp = result['segments'][-1]['end'] if result['segments'] else 0

        for segment in api_segments:
          segment['start'] += last_segment_timestamp
          segment['end'] += last_segment_timestamp

        ## append new segments
        result['segments'].extend(api_segments)

        if 'duration' in result:
          result['duration'] += api_result.get('duration', 0)
      else:
        ## first request
        result = api_result

        if detect_language:
          print(f"Detected language: {result['language'].title()}\n")

      ## display segments
      if options['verbose']:
        for segment in api_segments:
          print(f"[{format_timestamp(segment['start'])} --> {format_timestamp(segment['end'])}] {segment['text']}")
  else:
    # Open-Source
    result = whisper.transcribe(model, audio_path, **options)

  # fix results formatting
  for segment in result['segments']:
    segment['text'] = segment['text'].strip()

  # Rebuild the full text with one stripped segment per line.
  result['text'] = '\n'.join(map(lambda segment: segment['text'], result['segments']))

  # set results for this audio file
  results[audio_path] = result

  def backtrace(trace: np.ndarray):


Using GPU
GPU 0: Tesla T4 (UUID: GPU-93def0c6-7c12-8dd4-5191-6ad45869201a)

Loading large-v2 model... /root/.cache/whisper/large-v2.pt


100%|█████████████████████████████████████| 2.87G/2.87G [00:35<00:00, 86.4MiB/s]


Model large-v2 is multilingual and has 1,541,384,960 parameters.

-- TRANSLATE TO ENGLISH --

Processing: /content/MarathiAudio.mp3

Detected language: Marathi

[00:00.000 --> 00:06.000]  Namaskar, I am Yash Vaidya. Welcome to Lok Satta's Podcast.
[00:06.000 --> 00:11.000]  Friends, many things that happen around us are precious to us.
[00:11.000 --> 00:14.000]  That's why Lok Satta has brought to you the Kutuhal Podcast.
[00:16.000 --> 00:21.000]  Today's podcast is about plastic in the body of marine life.
[00:21.000 --> 00:29.000]  In 1965, a plastic bag was found on a device used by fishermen in the Irish Sea.
[00:29.000 --> 00:37.000]  The world's largest marine organization has declared that this is the first case of plastic waste in the sea water.
[00:37.000 --> 00:45.000]  Since then, plastic waste has been going into the sea for decades.
[00:45.000 --> 00:50.000]  According to surveys carried out by various institutions on the international level,
[00:50.000 --> 00:56.000]  th

In [None]:
# Folder that receives the written transcription files.
output_dir = "audio_transcription"

# Comma-separated output formats, split into a list:
# https://github.com/openai/whisper/blob/v20230308/whisper/utils.py#L188
output_formats = "txt".split(',')

from typing import TextIO

class WriteText(WriteTXT):
  """TXT writer that emits the result's pre-joined 'text' field as-is."""

  def write_result(self, result: dict, file: TextIO):
    """Write result['text'] followed by a newline to `file`, then flush it."""
    file.write(f"{result['text']}\n")
    file.flush()

def write_result(result, output_format, output_file_name):
  """Write one result into `output_dir` in the given format and print the file path.

  Args:
    result: result dict with at least 'text' and 'segments' (each segment
      may carry a 'start' timestamp).
    output_format: output format name, e.g. 'txt' or 'vtt'; surrounding
      whitespace is ignored.
    output_file_name: base name of the output file, without extension.

  For 'vtt' output whose first segment starts at 0, the start is shifted by
  1 ms while writing and then restored, so `result` is left unchanged even if
  the writer raises.
  """
  output_format = output_format.strip()

  segments = result['segments']

  # start captions in non-zero timestamp (some media players does not detect the first caption)
  fix_vtt = bool(output_format == 'vtt' and segments and segments[0].get('start') == 0)

  if fix_vtt:
    original_start = segments[0]['start']
    segments[0]['start'] = original_start + 1/1000 # +1ms

  try:
    # write result in the desired format
    writer = WriteText(output_dir) if output_format == 'txt' else get_writer(output_format, output_dir)
    writer(result, output_file_name)
  finally:
    # Always undo the temporary shift, restoring the exact original value.
    if fix_vtt:
      segments[0]['start'] = original_start

  output_file_path = os.path.join(output_dir, f"{output_file_name}.{output_format}")
  print(output_file_path)

# save results

print("Writing results...")

# Create the output folder when it does not exist yet.
os.makedirs(output_dir, exist_ok=True)

for audio_path, result in results.items():
  # Blank line between the outputs of different audio files.
  print()

  # Output files are named after the audio file, minus its extension.
  audio_basename = os.path.basename(audio_path)
  output_file_name = os.path.splitext(audio_basename)[0]

  for output_format in output_formats:
    write_result(result, output_format, output_file_name)

Writing results...

audio_transcription/MarathiAudio.txt
