Testing live

In [1]:
import os
import openvino
import whisper
import numpy as np
import speech_recognition as sr

from cmd_helper import optimum_cli
from datetime import datetime, timedelta
from openvino.runtime import Core
from pathlib import Path
from transformers import WhisperProcessor, WhisperForConditionalGeneration
from queue import Queue
from time import sleep

# Initialize OpenVINO Runtime
core = Core()

local_dir = "whisper-small-openvino-download"

# Path to OpenVINO IR model files
model_xml = "whisper-small-openvino-download/whisper_small/whisper_small_encoder.xml"  # Update with the path to your XML file
model_bin = model_xml.replace(".xml", ".bin")  # Associated .bin file

# Load and compile the model
model = core.read_model(model=model_xml, weights=model_bin)
compiled_model = core.compile_model(model=model, device_name="CPU")

# Get input and output info
input_layer = compiled_model.input(0)
output_layer = compiled_model.output(0)

print(f"Input layer: {input_layer}")
print(f"Output layer: {output_layer}")

Input layer: <ConstOutput: names[input_features] shape[1,80,3000] type: f16>
Output layer: <ConstOutput: names[last_hidden_state] shape[1,1500,768] type: f16>


In [2]:
import numpy as np
import torch

def preprocess_audio(audio_np, n_mels=80, frame_length=3000):
    unneeded = audio_np.shape[0] % 80
    if unneeded != 0:
        audio_np = audio_np[:-unneeded]

    # Convert the input to a PyTorch tensor
    tensor = torch.tensor(audio_np, dtype=torch.float32)
    
    # Add a batch dimension
    tensor = tensor.unsqueeze(0)

    # Calculate the number of frames
    n_frames = tensor.shape[-1] // n_mels
    tensor = tensor[:, :n_mels * n_frames].reshape(1, n_mels, n_frames)

    # Adjust to match the required frame length (3000)
    if tensor.shape[-1] < frame_length:
        # Pad with zeros if shorter than required
        padding = torch.zeros((1, n_mels, frame_length - tensor.shape[-1]), dtype=torch.float32)
        tensor = torch.cat((tensor, padding), dim=-1)
    elif tensor.shape[-1] > frame_length:
        # Truncate if longer than required
        tensor = tensor[:, :, :frame_length]

    # Convert back to a NumPy array with contiguous memory layout
    tensor_np = tensor.numpy()
    return np.ascontiguousarray(tensor_np)


In [5]:
from transformers import WhisperTokenizer, AutoTokenizer
from openvino_tokenizers import convert_tokenizer

token_xml = "whisper-small-openvino-download/ov-tokenizer/openvino_detokenizer.xml"  # Update with the path to your XML file
token_bin = token_xml.replace(".xml", ".bin")  # Associated .bin file

# Load and compile the model
tokenizer = core.read_model(model=token_xml, weights=token_bin)
compiled_tokenizer = core.compile_model(model=tokenizer, device_name="CPU")

def load_local_tokenizer(xml_path, bin_path):
    """
    Load the tokenizer from local OpenVINO tokenizer model files.

    Args:
        xml_path (str): Path to the .xml file of the tokenizer.
        bin_path (str): Path to the .bin file of the tokenizer.

    Returns:
        Tokenizer: A tokenizer object compatible with OpenVINO.
    """
    tokenizer = Tokenizer(xml_path, bin_path)
    return tokenizer

#tokenizer = load_local_tokenizer("whisper-small-openvino/ov-tokenizer/openvino_detokenizer.xml", "whisper-small-openvino/ov-tokenizer/openvino_detokenizer.bin")

def decode_whisper_output(output_tensor):
    """
    Decodes the output of the Whisper model into human-readable text.
    
    Args:
        output_tensor (numpy.ndarray): The raw output from the Whisper model.
        
    Returns:
        str: The decoded transcription.
    """
    # Convert the tensor to a list of token IDs
    token_ids = output_tensor.squeeze().tolist()
    transcription = tokenizer.decode(token_ids, skip_special_tokens=True)
    return transcription

RuntimeError: Exception from src/inference/src/cpp/core.cpp:95:
Exception from src/frontends/ir/src/ir_deserializer.cpp:938:
Cannot create StringTensorUnpack layer StringTensorUnpack_83 id:3 from unsupported opset: extension



In [None]:
# Define arguments as variables for notebook use
non_english = False    # Use non-English model if True
energy_threshold = 1000  # Energy level for mic detection
record_timeout = 2.0     # Real-time recording in seconds
phrase_timeout = 3.0     # Pause length between phrases for new line

# Initialize variables
phrase_time = None
data_queue = Queue()
recorder = sr.Recognizer()
recorder.energy_threshold = energy_threshold
recorder.dynamic_energy_threshold = False

# Set up microphone source
source = sr.Microphone(sample_rate=16000)

"""
# Load OpenVINO pipeline
ov_pipeline = openvino_genai.WhisperPipeline("whisper-small-openvino", device='CPU')
"""
# Initialize transcription list
transcription = ['']

# Adjust microphone for ambient noise
with source:
    recorder.adjust_for_ambient_noise(source)

def record_callback(_, audio: sr.AudioData) -> None:
    """Threaded callback function to handle audio data."""
    data = audio.get_raw_data()
    data_queue.put(data)

# Start background recording
recorder.listen_in_background(source, record_callback, phrase_time_limit=record_timeout)
print("Model loaded and microphone initialized.\n")

try:
    while True:
        now = datetime.utcnow()
        if not data_queue.empty():
            phrase_complete = False
            if phrase_time and now - phrase_time > timedelta(seconds=phrase_timeout):
                phrase_complete = True
            phrase_time = now

            audio_data = b''.join(data_queue.queue)
            data_queue.queue.clear()


            audio_np = np.frombuffer(audio_data, dtype=np.int16).astype(np.float32) / 32768.0
            print(f"Received {len(audio_np)} samples for transcription")

            # Preprocess audio
            audio_np = preprocess_audio(audio_np)

            # Run inference
            result = compiled_model([audio_np])[output_layer]

            # Decode and print transcription - by detokenizer
            transcription = decode_whisper_output(result)
            print(transcription)


            text = result['text'].strip()

            if phrase_complete:
                transcription.append(text)
            else:
                transcription[-1] = text

            os.system('cls' if os.name == 'nt' else 'clear')
            for line in transcription:
                print(line)
            print('', end='', flush=True)
        else:
            sleep(0.25)
except KeyboardInterrupt:
    print("\nTranscription stopped by user.")
    print("\nFinal Transcription:")
    for line in transcription:
        print(line)

Model loaded and microphone initialized.

Received 37888 samples for transcription


NameError: name 'decode_whisper_output' is not defined

Fuzzy matching to find closest match phrase in the current verse

In [28]:
import os
import numpy as np
import speech_recognition as sr
import whisper
import torch
from datetime import datetime, timedelta
from queue import Queue
from time import sleep
from difflib import SequenceMatcher

# Known lyrics for "Twinkle, Twinkle, Little Star"
lyrics = {
    "Verse 1": [
        "Twinkle, twinkle, little star",
        "How I wonder what you are",
        "Up above the world so high",
        "Like a diamond in the sky",
    ]
}

# Fuzzy matching function
def find_closest_match(transcription, lyrics):
    best_match = ""
    highest_similarity = 0
    for line in lyrics:
        similarity = SequenceMatcher(None, transcription, line).ratio()
        if similarity > highest_similarity:
            highest_similarity = similarity
            best_match = line
    return best_match, highest_similarity

# Initialize variables for speech recognition and Whisper
energy_threshold = 1000  # Energy level for mic detection
record_timeout = 2.0  # Real-time recording in seconds
phrase_timeout = 3.0  # Pause length between phrases for new line
phrase_time = None
data_queue = Queue()
recorder = sr.Recognizer()
recorder.energy_threshold = energy_threshold
recorder.dynamic_energy_threshold = False

# Set up microphone source
source = sr.Microphone(sample_rate=16000)

# Initialize transcription list
transcription = ['']

# Adjust microphone for ambient noise
with source:
    recorder.adjust_for_ambient_noise(source)

# Define a callback for audio data processing
def record_callback(_, audio: sr.AudioData) -> None:
    """Threaded callback function to handle audio data."""
    data = audio.get_raw_data()
    data_queue.put(data)

# Start background recording
recorder.listen_in_background(source, record_callback, phrase_time_limit=record_timeout)
print("Model loaded and microphone initialized.\n")

try:
    current_verse = "Verse 1"  # Start with the first verse
    while True:
        now = datetime.utcnow()
        if not data_queue.empty():
            phrase_complete = False
            if phrase_time and now - phrase_time > timedelta(seconds=phrase_timeout):
                phrase_complete = True
            phrase_time = now

            # Combine audio data from queue
            audio_data = b''.join(data_queue.queue)
            data_queue.queue.clear()

            # Convert audio data to the format Whisper expects
            audio_np = np.frombuffer(audio_data, dtype=np.int16).astype(np.float32)

            # convert to shape [1, 80, 3000]
            audio_np = preprocess_audio(audio_np)

            # Perform transcription using Whisper
            result = audio_model.transcribe(audio_np, fp16=torch.cuda.is_available())
            recognized_text = result['text'].strip()

            # Match the transcription to the current verse's lyrics
            match, similarity = find_closest_match(recognized_text, lyrics[current_verse])

            #if phrase_complete:
             #   transcription.append(match if similarity > 0.7 else recognized_text)
            #else:
             #   transcription[-1] = match if similarity > 0.7 else recognized_text

            # Print the transcription and matched lyrics
            os.system('cls' if os.name == 'nt' else 'clear')
            #print("Transcription (matched to lyrics):\n")
            #for line in transcription:
             #   print(line)
            print(f"\nRecognized: {recognized_text}")
            print(f"Best Match: {match} (Similarity: {similarity:.2f})")
        else:
            sleep(0.25)
except KeyboardInterrupt:
    print("\nTranscription stopped by user.")
    print("\nFinal Transcription:")
    for line in transcription:
        print(line)

  checkpoint = torch.load(fp, map_location=device)


TypeError: expected np.ndarray (got numpy.ndarray)