In [53]:
import torch
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline

# Use the first CUDA GPU in half precision when one is available; otherwise
# fall back to the CPU in full precision.
if torch.cuda.is_available():
    device, torch_dtype = "cuda:0", torch.float16
else:
    device, torch_dtype = "cpu", torch.float32

# Checkpoint to load:
#   "openai/whisper-tiny"            -> much faster transcription
#   "openai/whisper-large-v3-turbo"  -> greater quality if your machine can handle it
model_id = "openai/whisper-large-v3-turbo"

# Load the weights in the chosen precision and move them to the target device.
model = AutoModelForSpeechSeq2Seq.from_pretrained(
    model_id,
    torch_dtype=torch_dtype,
    low_cpu_mem_usage=True,
    use_safetensors=True,
).to(device)

# The processor supplies the tokenizer and the feature extractor used by the pipeline.
processor = AutoProcessor.from_pretrained(model_id)

# Speech-recognition pipeline shared by the microphone test and the live loop.
pipe = pipeline(
    task="automatic-speech-recognition",
    model=model,
    tokenizer=processor.tokenizer,
    feature_extractor=processor.feature_extractor,
    torch_dtype=torch_dtype,
    device=device,
)

### Parameters

In [54]:
from IPython.display import Audio
import pyaudio
import webrtcvad
import numpy as np
import threading
import queue

# Fixed parameters
FORMAT = pyaudio.paInt16  # Audio format: 16-bit signed integer samples, as required by webrtcvad
RATE = 16000              # Sampling rate in Hz (16 kHz; presumably one of the rates webrtcvad accepts -- TODO confirm)
CHUNK = 480               # Samples per chunk: at 16 kHz, 480 samples last 480 / 16000 = 30 ms

# Changeable parameters
CHANNELS = 1              # Mono audio (NOTE(review): webrtcvad is documented as mono-only -- confirm before changing)
VAD_AGGRESSIVENESS = 2    # Aggressiveness level (0, 1, 2, or 3), where 3 only lets clear voice through
SILENCE_TIMEOUT = 10      # Number of consecutive silent chunks that ends a segment

# The goal is to cut a segment once about 300 ms of silence is detected (a figure the
# original note attributes to studies): SILENCE_TIMEOUT * (CHUNK / RATE) = 10 * 0.03 s = 300 ms.
# Tune these values to find the best settings for your use case.

### Test your mic's input

In [None]:
# Record a short clip from the microphone, play it back and transcribe it, so
# both the audio input and the model can be checked before going live.
TEST_SECONDS = 5  # approximate length of the test recording, in seconds

test = pyaudio.PyAudio()
stream = None  # assigned once the device opens, so cleanup knows whether to close it
frames = []

try:
    stream = test.open(
        format=pyaudio.paFloat32,  # float32 samples are passed to Audio() and pipe() unchanged
        channels=CHANNELS,
        rate=RATE,
        input=True,
        frames_per_buffer=CHUNK,
    )

    print("Recording...")

    # RATE // CHUNK whole chunks make up (at most) one second of audio.
    for _ in range(RATE // CHUNK * TEST_SECONDS):
        frames.append(np.frombuffer(stream.read(CHUNK), dtype=np.float32))

    print("Recording finished.")
finally:
    # Release the audio device even when opening or reading the stream fails;
    # otherwise it stays held until the kernel is restarted.
    if stream is not None:
        stream.stop_stream()
        stream.close()
    test.terminate()

audio_data = np.hstack(frames)

print("\n\nIf you hear yourself clearly then you can proceed.")
display(Audio(audio_data, rate=RATE))
print(pipe(audio_data)['text'])

## Live-Transcription

In [None]:
# Hand-off between the capture loop (producer) and the transcription thread (consumer).
audio_queue = queue.Queue()

# Voice activity detector, configured with the chosen aggressiveness mode.
vad = webrtcvad.Vad(VAD_AGGRESSIVENESS)

def whisper_worker():
    """Transcribe queued audio segments and print the recognized text.

    Meant to run in its own thread so that inference does not block audio
    capture. Waits on ``audio_queue`` for the next segment and returns as soon
    as it receives ``None``.
    """
    while (segment := audio_queue.get()) is not None:
        # To listen to each segment and judge whether the pause threshold feels
        # natural (assumes `segment` is the float32 array queued by
        # process_voice_segment):
        # display(Audio(segment, rate=RATE))

        # No trailing newline, so consecutive segments read as one running text.
        print(pipe(segment)['text'], end="")


def process_voice_segment(segment):
    """Queue one recorded utterance for transcription.

    ``segment`` is a sequence of raw byte chunks holding int16 samples. The
    chunks are concatenated, converted to float32 and scaled by 1/32768 (so
    the values lie in [-1.0, 1.0)) before being put on ``audio_queue``. An
    empty segment is ignored.
    """
    if len(segment) > 0:
        pcm_bytes = b''.join(segment)
        samples = np.frombuffer(pcm_bytes, dtype=np.int16)
        audio_queue.put(samples.astype(np.float32) / 32768.0)


# Transcription runs in a background thread so inference never stalls audio capture.
whisper_thread = threading.Thread(target=whisper_worker)
whisper_thread.start()

p = pyaudio.PyAudio()
stream = None        # assigned once the device opens, so cleanup knows whether to close it
voice_segment = []   # raw int16 chunks of the utterance currently being recorded
silence_counter = 0  # consecutive chunks the VAD classified as non-speech

try:
    # Opened inside the try block so that a failure here still reaches the
    # cleanup below and stops the worker thread.
    stream = p.open(format=FORMAT,
                    channels=CHANNELS,
                    rate=RATE,
                    input=True,
                    frames_per_buffer=CHUNK)

    while True:
        data = stream.read(CHUNK)
        is_speech = vad.is_speech(data, RATE)

        if is_speech:
            # Voice detected: extend the current segment and restart the silence count.
            silence_counter = 0
            voice_segment.append(data)
        else:
            silence_counter += 1

            # Keep pauses only once an utterance has started, so a segment
            # never begins with leading silence.
            if len(voice_segment) > 0:
                voice_segment.append(data)

        # Enough consecutive silence: hand the segment to the worker and start over.
        if silence_counter >= SILENCE_TIMEOUT:
            process_voice_segment(voice_segment)
            voice_segment = []
            silence_counter = 0

except KeyboardInterrupt:
    print("\nStream stopped.")

finally:
    try:
        # Release the audio device.
        if stream is not None:
            stream.stop_stream()
            stream.close()
        p.terminate()
    finally:
        # Transcribe whatever was captured before the stop instead of dropping it,
        # then send the None sentinel so whisper_worker leaves its loop. Without
        # the sentinel the non-daemon thread would block on the queue forever and
        # every re-run of this cell would add another stuck thread.
        process_voice_segment(voice_segment)
        audio_queue.put(None)
        whisper_thread.join()