# Whisper

In [None]:
import packages
from enum import Enum
from loguru import logger
from pprint import pprint

class Model(Enum):
	"""
	Whisper model names, as listed by ``whisper.available_models()``.

	Each member's value is the name string handed to the model loader
	(e.g. ``Model.TINY.value``).

	https://github.com/openai/whisper?tab=readme-ov-file#available-models-and-languages
	"""
	# Members with an ``_EN`` suffix map to the ``.en`` model names; see the
	# README linked above for how those differ from the un-suffixed models.
	TINY_EN = "tiny.en"
	TINY = "tiny"
	BASE_EN = "base.en"
	BASE = "base"
	SMALL_EN = "small.en"
	SMALL = "small"
	MEDIUM_EN = "medium.en"
	MEDIUM = "medium"
	# "large" family: three versioned names plus the bare "large" name.
	LARGE_V1 = "large-v1"
	LARGE_V2 = "large-v2"
	LARGE_V3 = "large-v3"
	LARGE = "large"

class Device(Enum):
	"""Device names; a member's value is passed as the ``device`` argument of ``WhisperModel``."""
	CPU = "cpu"
	CUDA = "cuda"

class ComputeType(Enum):
	"""Compute/quantization types; a member's value is passed as the ``compute_type`` argument of ``WhisperModel``."""
	INT8 = "int8"
	INT8_FLOAT16 = "int8_float16"
	FLOAT16 = "float16"

# model = whisper.load_model(name=Model.TINY.value)

# Sample audio clip under the project's data/assets directory; the transcription
# and splitting cells further down all read this path.
path_audio = f"{packages.ROOT_PATH}/data/assets/Car_AI_Assistant_Intro.mp3"

## [SYSTRAN/faster-whisper](https://github.com/SYSTRAN/faster-whisper)

In [None]:
from faster_whisper import WhisperModel

# Load the "tiny" model. The two commented-out argument lines are the CUDA
# alternatives (float16 / int8_float16); the active line runs on CPU with int8.
model = WhisperModel(
	# Model.TINY.value, device=Device.CUDA.value, compute_type=ComputeType.FLOAT16.value,
	# Model.TINY.value, device=Device.CUDA.value, compute_type=ComputeType.INT8_FLOAT16.value,
	Model.TINY.value, device=Device.CPU.value, compute_type=ComputeType.INT8.value,
)

# NOTE(review): vad_parameters is supplied while vad_filter=False, so it
# presumably has no effect until the filter is switched on - confirm.
segments, info = model.transcribe(
	path_audio, beam_size=5,
	vad_filter=False, vad_parameters=dict(min_silence_duration_ms=500),
)

# language_probability is a 0-1 fraction, so render it as a true percentage
# instead of printing the raw fraction followed by a "%" sign.
logger.info(f"Detected language: {info.language} ({info.language_probability:.1%})")

# `segments` is a lazy generator: the decoding work happens while it is
# consumed here, and join avoids repeated string concatenation.
result = "".join(seg.text for seg in segments)

pprint(result)

### Test

In [2]:
import numpy as np
import subprocess
import os
from concurrent.futures import ThreadPoolExecutor
import math

def export_chunk(start_time, duration, input_file, output_path):
    """Cut one segment out of ``input_file`` with ffmpeg and write it as MP3.

    Args:
        start_time: Offset passed to ffmpeg's ``-ss`` option (stringified).
        duration: Length passed to ffmpeg's ``-t`` option (stringified).
        input_file: Path of the source audio file.
        output_path: Destination file; ``-y`` lets ffmpeg overwrite it.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    seek_args = ['-ss', str(start_time)]
    input_args = ['-i', input_file, '-t', str(duration)]
    # Re-encode with libmp3lame at a 44100 Hz sample rate and 192k bitrate.
    encode_args = ['-acodec', 'libmp3lame', '-ar', '44100', '-ab', '192k']
    ffmpeg_call = ['ffmpeg', *seek_args, *input_args, *encode_args, '-y', output_path]

    # ffmpeg's own console output is discarded; only the exit status is checked.
    subprocess.run(
        ffmpeg_call,
        check=True,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )

def get_audio_duration(input_file):
    """Return the duration of ``input_file`` in seconds, as reported by ffprobe.

    Args:
        input_file: Path of the audio file to inspect.

    Returns:
        The container-level duration as a float.

    Raises:
        ValueError: If ffprobe fails, prints nothing, or prints something
            that is not a number. The message includes ffprobe's stderr.
    """
    command = [
        'ffprobe', '-v', 'error',
        '-show_entries', 'format=duration',
        '-of', 'default=noprint_wrappers=1:nokey=1',
        input_file,
    ]
    # Keep stderr separate from stdout: merging them lets any diagnostic text
    # end up in the string handed to float() and corrupts the parse.
    result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    raw_duration = result.stdout.decode(errors="replace").strip()
    if result.returncode != 0 or not raw_duration:
        details = result.stderr.decode(errors="replace").strip() or "no output"
        raise ValueError(f"ffprobe could not read the duration of {input_file!r}: {details}")
    return float(raw_duration)

def split_audio(input_file, split_duration):
    """Split an audio file into consecutive MP3 chunks of ``split_duration`` seconds.

    Chunks are written to ``<base_name>_chunks/<base_name>_<i>.mp3`` relative
    to the current working directory, where ``<base_name>`` is the input file
    name without its extension. The exports run concurrently in a thread pool.

    Args:
        input_file: Path of the audio file to split.
        split_duration: Chunk length in seconds; must be positive.

    Raises:
        ValueError: If ``split_duration`` is not positive.
        Exception: Whatever an individual chunk export raised is re-raised
            here when its future is collected.
    """
    if split_duration <= 0:
        raise ValueError(f"split_duration must be positive, got {split_duration!r}")

    # Get the total duration of the audio file
    total_duration = get_audio_duration(input_file)

    # Calculate the number of chunks (the last one may be shorter)
    num_chunks = math.ceil(total_duration / split_duration)

    # Back-to-back start offsets: 0, d, 2d, ... Spreading the starts evenly
    # between 0 and (total - d) would make neighbouring chunks overlap whenever
    # the total duration is not an exact multiple of the chunk length.
    start_times = [index * split_duration for index in range(num_chunks)]

    # Get the base name of the input file (without the extension)
    base_name = os.path.splitext(os.path.basename(input_file))[0]

    # Create output directory if it doesn't exist
    output_dir = f"{base_name}_chunks"
    os.makedirs(output_dir, exist_ok=True)

    # Threads are enough here: each task just waits on an ffmpeg subprocess.
    with ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(
                export_chunk,
                start_time,
                split_duration,
                input_file,
                os.path.join(output_dir, f"{base_name}_{i}.mp3"),
            )
            for i, start_time in enumerate(start_times)
        ]

        # Wait for all tasks to complete; result() re-raises any export error
        for future in futures:
            future.result()

# Usage: chunk files are written to a "<base_name>_chunks" directory under the
# current working directory.
split_audio(path_audio, 2)  # Split the file into 2-second chunks

In [None]:
import os
import numpy as np
from pydub import AudioSegment
from faster_whisper import WhisperModel
from concurrent.futures import ProcessPoolExecutor, as_completed
import logging
import time

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def initialize_model(model_size="tiny", device="cpu", compute_type="int8"):
    """Create a faster-whisper ``WhisperModel``.

    The defaults reproduce the previously hard-coded configuration, so
    calling ``initialize_model()`` with no arguments behaves as before.

    Args:
        model_size: Model name passed as the first positional argument.
        device: Value for the ``device`` keyword.
        compute_type: Value for the ``compute_type`` keyword.

    Returns:
        A new ``WhisperModel`` instance.
    """
    return WhisperModel(model_size, device=device, compute_type=compute_type)

def transcribe_chunk(args):
    """Transcribe a single audio chunk with a freshly initialized model.

    Args:
        args: A ``(chunk_path, chunk_index)`` tuple, packed into one argument
            so it can be submitted to an executor as a single value.

    Returns:
        ``(chunk_index, text, language, language_probability)``. On any
        failure the error is logged and ``(chunk_index, "", "", 0.0)`` is
        returned, so one bad chunk does not abort the whole run.
    """
    chunk_path, chunk_index = args
    try:
        model = initialize_model()
        logger.info(f"Starting transcription of {chunk_path}")
        start_time = time.perf_counter()
        segments, info = model.transcribe(
            chunk_path, beam_size=5,
            vad_filter=False, vad_parameters=dict(min_silence_duration_ms=500),
        )
        # `segments` is a lazy generator: the decoding happens while it is
        # consumed, so it must be drained before the clock is stopped.
        # Otherwise the logged time covers only the set-up work.
        text = "".join(segment.text for segment in segments)
        duration = time.perf_counter() - start_time
        logger.info(f"Finished transcription of {chunk_path} in {duration:.2f} seconds")
        return chunk_index, text, info.language, info.language_probability
    except Exception as e:
        # Deliberate best-effort: log with traceback and return an empty result.
        logger.exception(f"Error transcribing {chunk_path}: {e}")
        return chunk_index, "", "", 0.0

def split_and_transcribe(input_file, chunk_duration=30):
    """Split an audio file into fixed-length chunks and transcribe them in parallel.

    The audio is sliced into ``chunk_duration``-second pieces, each piece is
    exported as a WAV file into a temporary directory, and the pieces are
    transcribed by ``transcribe_chunk`` in a process pool. The temporary
    directory is removed afterwards, whether or not transcription succeeded.

    Args:
        input_file: Path of the audio file to transcribe.
        chunk_duration: Chunk length in seconds (int or float).

    Returns:
        ``(full_transcript, detected_language, language_probability)``:
        the chunk texts joined in chunk order, the most frequent detected
        language, and the mean of the positive language probabilities.
        Chunks that failed are skipped. If the whole operation fails, the
        error is logged and ``("", "", 0.0)`` is returned.
    """
    # Function-scope imports: only this function needs them.
    import tempfile
    from collections import Counter

    try:
        logger.info(f"Loading audio file: {input_file}")
        audio = AudioSegment.from_file(input_file)
        logger.info(f"Audio duration: {len(audio) / 1000:.2f} seconds")

        # range() needs an integer step, so a float chunk_duration is truncated
        # to whole milliseconds.
        chunk_length_ms = int(chunk_duration * 1000)
        chunks = [audio[i:i + chunk_length_ms] for i in range(0, len(audio), chunk_length_ms)]
        logger.info(f"Split audio into {len(chunks)} chunks")

        results = [None] * len(chunks)
        # A unique temporary directory avoids collisions between runs and is
        # cleaned up even if exporting or transcribing raises.
        with tempfile.TemporaryDirectory(prefix="temp_chunks_") as temp_dir:
            chunk_paths = []
            for i, chunk in enumerate(chunks):
                chunk_path = os.path.join(temp_dir, f"chunk_{i}.wav")
                chunk.export(chunk_path, format="wav")
                chunk_paths.append((chunk_path, i))
                logger.info(f"Exported chunk {i+1}/{len(chunks)}")

            with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
                future_to_path = {executor.submit(transcribe_chunk, args): args for args in chunk_paths}
                for future in as_completed(future_to_path):
                    path, _ = future_to_path[future]
                    try:
                        chunk_index, text, lang, prob = future.result()
                        results[chunk_index] = (text, lang, prob)
                        logger.info(f"Completed transcription of {path}")
                    except Exception as e:
                        # The slot stays None and is skipped during aggregation.
                        logger.error(f"Exception occurred for {path}: {str(e)}")

        # Drop chunks whose future failed so one bad chunk cannot crash the
        # aggregation and discard every other transcript.
        completed = [entry for entry in results if entry is not None]
        full_transcript = "".join(text for text, _, _ in completed if text)
        languages = [lang for _, lang, _ in completed if lang]
        probabilities = [prob for _, _, prob in completed if prob > 0]

        # Counter breaks ties by first occurrence, which is deterministic.
        detected_language = Counter(languages).most_common(1)[0][0] if languages else ""
        language_probability = sum(probabilities) / len(probabilities) if probabilities else 0.0

        return full_transcript, detected_language, language_probability
    except Exception as e:
        logger.exception(f"Error in split_and_transcribe: {e}")
        return "", "", 0.0

def experiment_chunk_durations(input_file, durations):
    """Time ``split_and_transcribe`` for several chunk lengths and report the fastest.

    Args:
        input_file: Path of the audio file to transcribe.
        durations: Iterable of chunk lengths in seconds to try.

    Returns:
        A list of ``(duration, processing_time, transcript_length)`` tuples,
        one per tested duration, in the order tested. Empty when
        ``durations`` is empty.
    """
    results = []
    for duration in durations:
        start_time = time.perf_counter()
        transcript, _, _ = split_and_transcribe(input_file, chunk_duration=duration)
        processing_time = time.perf_counter() - start_time
        results.append((duration, processing_time, len(transcript)))
        print(f"Chunk duration: {duration}s, Processing time: {processing_time:.2f}s, Transcript length: {len(transcript)}")

    # A run that produced no transcript (e.g. it failed and returned early)
    # tends to be the quickest, so it must not be allowed to win.
    successful_runs = [run for run in results if run[2] > 0]
    if successful_runs:
        optimal_duration = min(successful_runs, key=lambda x: x[1])
        print(f"\nOptimal chunk duration: {optimal_duration[0]}s with processing time: {optimal_duration[1]:.2f}s")
    else:
        print("\nNo chunk duration produced a transcript; cannot determine an optimal duration.")

    return results

# Run the chunk-length experiment on the shared sample clip.
input_file = path_audio

# Single-run variant, kept for reference:
# transcript, language, probability = split_and_transcribe(input_file, 30)
# logger.info(f"Detected language: {language} ({probability:.3f})")
# logger.info("Transcript:")
# logger.info(transcript)

# Every entry triggers a complete split + transcribe pass over the file.
durations_to_test = [5, 10, 15, 20, 30, 45, 60]  # Durations in seconds
experiment_results = experiment_chunk_durations(input_file, durations_to_test)

## [openai/whisper](https://github.com/openai/whisper)

In [None]:

# import whisper

# audio = whisper.load_audio(path_audio)
# audio = whisper.pad_or_trim(audio) # 30s

# mel = whisper.log_mel_spectrogram(audio).to(model.device)

# _, probs = model.detect_language(mel)
# logger.info(f"Detected language: {max(probs, key=probs.get)}")

# options = whisper.DecodingOptions()
# result = whisper.decode(model, mel, options)

# pprint(result.text)

---

# [KoljaB/RealtimeSTT](https://github.com/KoljaB/RealtimeSTT)

# [dscripka/openWakeWord](https://github.com/dscripka/openWakeWord)

In [None]:
import packages
from openwakeword import utils, model
# from openwakeword.model import Model

In [None]:
# One-time download of all pre-trained models (or only select models).
# The files are stored under the project's data/assets/repos/openWakeWord/models directory.
utils.download_models(target_directory=f"{packages.ROOT_PATH}/data/assets/repos/openWakeWord/models")

In [None]:
def get_audio_frame():
	"""Placeholder audio source: not implemented yet, so it always gives back ``None``."""
	return None

# Get audio data containing 16-bit 16khz PCM audio data from a file, microphone, network stream, etc.
# For the best efficiency and latency, audio frames should be multiples of 80 ms, with longer frames
# increasing overall efficiency at the cost of detection latency
# NOTE(review): get_audio_frame() is currently a stub that returns None, so `frame`
# is None here until a real audio source is plugged in.
frame = get_audio_frame()

# NOTE(review): this rebinds `model`, shadowing the `openwakeword.model` module
# imported earlier; re-running this cell would then look up `.Model` on the
# instance instead of the module. Consider a distinct variable name.
model = model.Model(inference_framework="onnx")

pred = model.predict(frame)