Import basic libraries and initiate functions


In [1]:
from faster_whisper import WhisperModel
from typing import Union
import numpy as np
import threading
import keyboard
import pyaudio
import whisper
import psutil
import queue
import wave
import time
import sys

def get_models(models):
    print("Available models:")
    list = []
    for i, model in enumerate(models):
        print(f"{i}: {model}")
        list.append(model)
    return list
        
def run_model(type:str,models:str,workers:int, device:str):
    if type == "openai":
        try:
            model = whisper.load_model(models)
            return model
        except Exception as e:
            print(f"Error occured! {e}")
    elif type == "ctranslate":
        try:
            #model = WhisperModel(models,device=device,compute_type="int8_float16",num_workers=workers, download_root="../models/") # GPU ONLY
            model = WhisperModel(models,device=device,num_workers=workers, download_root="../models/") # CPU ONLY
            return model
        except Exception as e:
            print(f"Error occured! {e}")
            
def transcribe_fast(model,file: Union[str,np.ndarray],size:int,y:bool,print_out:bool):
    model = model
    
    # convert audio data buffer to a NumPy ndarray
    #audio_array = np.frombuffer(file, dtype=np.int16)
    
    # Return segments and info such as detected lang. and prob.
    segments, info = model.transcribe(audio=file,beam_size=size,word_timestamps=y)
    text = segments
    if print_out == True:
        print("\nDetected language '%s' with probability %f" % (info.language, info.language_probability), end="")

    # Print out segments
    if print_out == True:
        for segment in segments:
            print("\n[%.2fs -> %.2fs] %s" % (segment.start, segment.end, segment.text), end="")
            
    return segments
  
def transcribe_openai(model, file):
    # load audio and pad/trim it to fit 30 seconds
    audio = whisper.load_audio(file)
    audio = whisper.pad_or_trim(audio)

    # make log-Mel spectrogram and move to the same device as the model
    mel = whisper.log_mel_spectrogram(audio).to(model.device)

    # detect the spoken language
    _, probs = model.detect_language(mel)
    print(f"Detected language: {max(probs, key=probs.get)}")

    # decode the audio
    options = whisper.DecodingOptions()
    result = whisper.decode(model, mel, options)

    # print the recognized text
    print(result.text)
    model = whisper.load_model("base")
    result = model.transcribe("audio.mp3")
  
def translate_fast(model,file: Union[str,np.ndarray],size:int,y:bool,print_out:bool):
    #model = model
    # Return segments and info such as detected lang. and prob.
    segments, info = model.transcribe(audio=file,beam_size=size,word_timestamps=y,task="translate")
    list = []
    
    if print_out == True:
        print("\nDetected language '%s' with probability %f" % (info.language, info.language_probability))

    # Print out segments
    if print_out == True:
        for segment in segments:
            print("[%.2fs -> %.2fs] %s" % (segment.start, segment.end, segment.text))
            list.append(segment.text)
            print(f"items in list: {segment.text}")
            
    return list

def translate_openai(model,file):
    model = model
    result = model.transcribe(file,task="translate")
    print(result["text"])
    
def record_audio(audio_obj,mic_id,audio_buffer):
    # Recording parameters
    chunk = 1024
    format = pyaudio.paInt16
    channels = 1
    rate = 44100
    rec_sec = 0.5
    
    stream = audio_obj.open(format=format, 
                        channels=channels, 
                        rate=rate, 
                        input=True,
                        input_device_index=mic_id,
                        frames_per_buffer=chunk)
    print("Recording!")
    frames = []
    WAVE_OUTPUT_FILENAME = "output.wav"
    wf = wave.open(WAVE_OUTPUT_FILENAME, 'wb')
    wf.setnchannels(channels)
    wf.setsampwidth(audio_obj.get_sample_size(format))
    wf.setframerate(rate)
    while 1:
        for i in range(0,int(rate / chunk * rec_sec)):
            data = stream.read(chunk)
            frames.append(data)
            audio_data = np.frombuffer(data, dtype=np.int16)
            audio_buffer.put(audio_data)
            wf.writeframes(b''.join(frames))
            
        if keyboard.is_pressed("q"):
            # stop recording
            wf.close()
            stream.stop_stream()
            stream.close()
            audio_obj.terminate()
            break
    
    print("Closing thread...")
    
    
    
    

  from .autonotebook import tqdm as notebook_tqdm


Check available audio sources


In [2]:
import pyaudio

# Get a list of available input devices
audio_obj= pyaudio.PyAudio()
info = audio_obj.get_host_api_info_by_index(0)
numdevices = info.get('deviceCount')
device_id = []
for i in range(0, numdevices):
    if (audio_obj.get_device_info_by_host_api_device_index(0, i).get('maxInputChannels')) > 0:
        print("Input Device id ", i, " - ", audio_obj.get_device_info_by_host_api_device_index(0, i).get('name'))
        device_id.append(i)

Input Device id  0  -  Microsoft Sound Mapper - Input
Input Device id  1  -  Chat Mic (3- TC-HELICON GoXLR)
Input Device id  2  -  Broadcast Stream Mix (3- TC-HEL
Input Device id  3  -  Sample (3- TC-HELICON GoXLR)


In [3]:
# Initialize audio buffer
audio_buffer = queue.Queue()

# Create audio recording thread
#t1 = threading.Thread(target=record_audio, args=(audio_obj,device_id[5],audio_buffer))
#t1.start()

#p1 = Process(target=record_audio,args=(audio_obj,device_id[5],queue))
#p1.start()
#record_audio(audio_obj,device_id[5])

Check for available models, then download the preferred.


In [4]:
models = whisper.available_models()
id = get_models(models)

Available models:
0: tiny.en
1: tiny
2: base.en
3: base
4: small.en
5: small
6: medium.en
7: medium
8: large-v1
9: large-v2
10: large


In [5]:
model = run_model("ctranslate",id[9],4,"cuda")
#model2 = whisper.load_model("tiny")

Run and time execution of transcription


In [10]:
file = "../Audio/jap.wav"
file = "../Audio/jap4.wav"
t_avg = []
list = []
n = 1

# Transcribe audio
print("Transcribing 1")
for i in range(n):
    t1 = time.time()
    #text = transcribe_fast(model,file,5,True,True)
    segments, info = model.transcribe(audio=file,beam_size=5,word_timestamps=True,task="transcribe")
    print("\nDetected language '%s' with probability %f" % (info.language, info.language_probability))
    t2 = time.time()

    # Print out segments
    for segment in segments:
        print("[%.2fs -> %.2fs] %s" % (segment.start, segment.end, segment.text))
        list.append(segment.text)
        print(f"items in list: {list}")
    print("\n\nElapsed time:", t2-t1)
    t_avg.append(t2-t1)
print(f"Average time: {np.sum(t_avg)/n} s")
print(f"Total time: {np.sum(t_avg)} s")

"""
list2 = []
# Translate audio
print("Transcribing 2")
for i in range(n):
    t1 = time.time()
    #text = transcribe_fast(model,file,5,True,True)
    segments, info = model.transcribe(audio=file,beam_size=5,word_timestamps=True,task="translate")
    print("\nDetected language '%s' with probability %f" % (info.language, info.language_probability))

    # Print out segments
    for segment in segments:
        print("[%.2fs -> %.2fs] %s" % (segment.start, segment.end, segment.text))
        list2.append(segment.text)
        print(f"items in list: {list2}")
    t2 = time.time()
    print("\n\nElapsed time:", t2-t1)
    t_avg.append(t2-t1)
print(f"Average time: {np.sum(t_avg)/n} s")
print(f"Total time: {np.sum(t_avg)} s")
"""


#while 1:
#    #audio_data = audio_buffer.get()
#    t1 = time.time()
#    text = transcribe_fast(model,file,5,True,True)
#    t2 = time.time()
#    print("\nElapsed time:", t2-t1, end="")
    
#    if keyboard.is_pressed("w"):
#            print("Closing thread...")
#            print("\nStopping...")
#            sys.exit()
#            break

Transcribing 1

Detected language 'ja' with probability 0.953613
[0.00s -> 0.82s] また明日!
items in list: ['また明日!']


Elapsed time: 0.41118407249450684
Average time: 0.41118407249450684 s
Total time: 0.41118407249450684 s


'\nlist2 = []\n# Translate audio\nprint("Transcribing 2")\nfor i in range(n):\n    t1 = time.time()\n    #text = transcribe_fast(model,file,5,True,True)\n    segments, info = model.transcribe(audio=file,beam_size=5,word_timestamps=True,task="translate")\n    print("\nDetected language \'%s\' with probability %f" % (info.language, info.language_probability))\n\n    # Print out segments\n    for segment in segments:\n        print("[%.2fs -> %.2fs] %s" % (segment.start, segment.end, segment.text))\n        list2.append(segment.text)\n        print(f"items in list: {list2}")\n    t2 = time.time()\n    print("\n\nElapsed time:", t2-t1)\n    t_avg.append(t2-t1)\nprint(f"Average time: {np.sum(t_avg)/n} s")\nprint(f"Total time: {np.sum(t_avg)} s")\n'

In [7]:
for i in list:
    print(i)

また明日!


In [8]:
from translate import Translator
translator= Translator(to_lang="en",from_lang="ja")
for i in list:
    t1 = time.time()
    translation = translator.translate(i)
    print(translation)
    print("time",time.time()-t1)



ModuleNotFoundError: No module named 'translate'

In [None]:
from deep_translator import GoogleTranslator, MyMemoryTranslator, LibreTranslator,batch_detection
t1 = time.time()
trans = GoogleTranslator("ja","en").translate_batch(list)
print(trans)
print("time",time.time()-t1)

t1 = time.time()
trans = GoogleTranslator("ja","en").translate(str(list))
print(trans)
print("time",time.time()-t1)
print(trans)

In [None]:
for i in list:
    t1 = time.time()
    translated = MyMemoryTranslator(source='ja', target='en').translate_batch(str(i))
    print(translated)
    print("time",time.time()-t1)
    
    t1 = time.time()
    translated = MyMemoryTranslator(source='ja', target='en').translate(str(list))
    print(translated)
    print("time",time.time()-t1)

In [None]:
# Slow and bad accuracy
#translated = LibreTranslator(source='ja', target='en').translate_batch(list)
#print(translated)
#translated = LibreTranslator(source='ja', target='en').translate(str(list))
#print(translated)

Calculate memory usage


In [None]:
pid = psutil.Process()
memory_info = pid.memory_info()
print("Memory usage:", memory_info.rss / 1024 / 1024, "MB")