### Create the virtual environment

In [1]:
%uv venv

Note: you may need to restart the kernel to use updated packages.


Using CPython 3.13.1 interpreter at: c:\Users\rrahajason\AppData\Local\Programs\Python\Python313\python.exe
Creating virtual environment at: .venv
Activate with: .venv\Scripts\activate


In [1]:
%uv pip install numpy

Note: you may need to restart the kernel to use updated packages.


Using Python 3.13.1 environment at: c:\Users\rrahajason\AppData\Local\Programs\Python\Python313
Audited 1 package in 13ms


In [29]:
import subprocess
import threading
import queue

def _start_ffmpeg_process(file_path):
    """Start FFmpeg process to extract audio."""
    return subprocess.Popen(
        [
            'ffmpeg',
            '-i', file_path,
            '-vn',  # No video
            '-acodec', 'pcm_s16le',  # PCM 16-bit little-endian
            '-ar', '16000',  # 16kHz sample rate
            '-ac', '1',  # Mono
            '-f', 'wav',  # WAV format
            'pipe:1'  # Output to stdout
        ],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        bufsize=10**8
    )

def _read_chunk_thread(stream, size, result_queue):
    """Read chunk in a separate thread."""
    try:
        data = stream.read(size)
        result_queue.put(('data', data))
    except Exception as e:
        result_queue.put(('error', e))

def _read_with_timeout(stream, size, timeout):
    """Read ``size`` bytes from ``stream``, giving up after ``timeout`` seconds.

    The read runs in a daemon thread so that a stalled stream cannot block
    the caller indefinitely.

    Returns:
        tuple: ``(True, data)`` when the read finished in time, otherwise
        ``(False, None)``.

    Raises:
        Exception: Whatever exception the underlying read raised.
    """
    outcomes = queue.Queue()
    reader = threading.Thread(
        target=_read_chunk_thread,
        args=(stream, size, outcomes),
        daemon=True,
    )
    reader.start()
    reader.join(timeout=timeout)

    # The worker is still busy after the wait: report a timeout.
    if reader.is_alive():
        return False, None

    try:
        kind, payload = outcomes.get_nowait()
    except queue.Empty:
        return False, None

    if kind == 'error':
        raise payload
    return True, payload

def _get_total_duration(file_path) -> float:
    """Get total duration of the MP4 file using ffprobe."""
    result = subprocess.run(
        [
            'ffprobe',
            '-v', 'error',
            '-show_entries', 'format=duration',
            '-of', 'default=noprint_wrappers=1:nokey=1',
            file_path
        ],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    )
    return float(result.stdout)

def _get_number_of_chunks(file_path, chunk_duration):
    """Return how many chunks of ``chunk_duration`` seconds cover the file.

    The total duration is probed from ``file_path``; a trailing partial chunk
    counts as one full extra chunk.
    """
    full_chunks, leftover = divmod(_get_total_duration(file_path), chunk_duration)
    return int(full_chunks) + (1 if leftover > 0 else 0)

def _get_last_chunk_duration(total_duration, chunk_duration) -> int:
    """Get duration of the last chunk."""
    remainder = total_duration % chunk_duration
    return int(remainder) if remainder > 0 else int(chunk_duration)

def _skip_wav_header(stream):
    """Consume a RIFF/WAVE header so ``stream`` is positioned at the first PCM byte.

    The header is walked chunk by chunk instead of assuming a fixed 44-byte
    layout, because extra chunks (e.g. a LIST/INFO block) may precede the
    ``data`` chunk.

    Returns:
        bool: True once the ``data`` chunk header has been consumed, False if
        the stream ended first or does not start with a RIFF/WAVE signature.
    """
    riff_header = stream.read(12)
    if len(riff_header) < 12 or riff_header[:4] != b'RIFF' or riff_header[8:] != b'WAVE':
        return False

    while True:
        chunk_header = stream.read(8)
        if len(chunk_header) < 8:
            return False
        if chunk_header[:4] == b'data':
            return True
        chunk_size = int.from_bytes(chunk_header[4:], 'little')
        # RIFF chunks are word-aligned: an odd-sized chunk has one pad byte.
        padded_size = chunk_size + (chunk_size % 2)
        if len(stream.read(padded_size)) < padded_size:
            return False


def open_mp4_with_ffmpeg(file_path, chunk_duration=8, read_timeout=5):
    """
    Open an MP4 file and stream its audio track from memory in fixed-size chunks.

    The FFmpeg process is always terminated and reaped when the generator
    finishes, fails, or is closed before being fully consumed.

    Args:
        file_path: Path to the MP4 file
        chunk_duration: Duration of each audio chunk in seconds (default: 8)
        read_timeout: Timeout in seconds for reading chunks (default: 5)

    Yields:
        bytes: Raw PCM audio (16 kHz, mono, signed 16-bit little-endian) with
        the WAV header stripped. The last chunk may be shorter than the others.

    Raises:
        ValueError: If ``chunk_duration`` does not cover at least one sample.
    """
    sample_rate = 16000
    bytes_per_sample = 2

    # Count whole samples first so a fractional duration never splits a sample.
    bytes_per_chunk = int(sample_rate * chunk_duration) * bytes_per_sample
    if bytes_per_chunk <= 0:
        raise ValueError("chunk_duration must be positive")

    # Probe the file once, before launching FFmpeg, so that a probing failure
    # cannot leave an orphaned FFmpeg process behind.
    num_chunks = _get_number_of_chunks(file_path, chunk_duration)

    process = _start_ffmpeg_process(file_path)
    try:
        if not _skip_wav_header(process.stdout):
            return

        for chunk_index in range(num_chunks):
            # Always request a full chunk: at end of stream the read simply
            # returns the remaining bytes, so the tail of the audio is kept.
            success, data = _read_with_timeout(process.stdout, bytes_per_chunk, read_timeout)

            if not success:
                # Timeout
                print(f"Read timeout after {read_timeout}s on chunk {chunk_index}")
                break

            if not data:
                # End of stream earlier than expected
                print(f"End of stream at chunk {chunk_index}")
                break

            yield data
    finally:
        # Runs on normal exit, on errors, and when the consumer stops early.
        if process.poll() is None:
            process.terminate()
        process.wait()

In [25]:
import numpy as np
from IPython.display import Audio, display

def play_audio_chunk(audio_bytes, sample_rate=16000):
    """
    Render an auto-playing audio widget for a buffer of raw PCM samples.

    Args:
        audio_bytes: Raw audio data in bytes (PCM format), read as 16-bit integers
        sample_rate: Sample rate of the audio (default: 16000)
    """
    # Reinterpret the buffer as int16 samples and scale them by 1/32768 into floats
    samples = np.frombuffer(audio_bytes, dtype=np.int16).astype(np.float32) / 32768.0

    player = Audio(samples, rate=sample_rate, autoplay=True)
    display(player)

def play_audio_from_generator(audio_generator, sample_rate=16000):
    """
    Play every chunk produced by an audio generator, in order.

    Args:
        audio_generator: Iterable yielding audio chunks (bytes)
        sample_rate: Sample rate passed on to the player for each chunk (default: 16000)
    """
    for pcm_chunk in audio_generator:
        play_audio_chunk(pcm_chunk, sample_rate)

In [4]:
import wave

save_path = 'cdg_end.wav'
chunks_to_skip = 3  # leading 11-second chunks left out of the export (demo purposes)

# wave.open writes the RIFF/WAVE header itself and patches the size fields
# when the file is closed, so no manual header bookkeeping is needed.
with wave.open(save_path, 'wb') as wav_file:
    wav_file.setnchannels(1)       # mono
    wav_file.setsampwidth(2)       # 16-bit samples
    wav_file.setframerate(16000)   # 16 kHz

    for i, chunk in enumerate(open_mp4_with_ffmpeg('cdg.mp4', chunk_duration=11)):
        if i < chunks_to_skip:
            continue
        wav_file.writeframes(chunk)

In [14]:
%uv pip install --native-tls huggingface_hub

Note: you may need to restart the kernel to use updated packages.


Using Python 3.13.1 environment at: c:\Users\rrahajason\AppData\Local\Programs\Python\Python313
Audited 1 package in 17ms


In [5]:
# Fetch the model file quantized with the Q5_0 method into ./models
from huggingface_hub import hf_hub_download

hf_hub_download(
    repo_id='bofenghuang/whisper-large-v3-french',
    filename='ggml-model-q5_0.bin',
    local_dir='./models/whisper-large-v3-french',
)


  from .autonotebook import tqdm as notebook_tqdm


'models\\whisper-large-v3-french\\ggml-model-q5_0.bin'

In [15]:
%uv pip install requests dotenv

Note: you may need to restart the kernel to use updated packages.


Using Python 3.13.1 environment at: c:\Users\rrahajason\AppData\Local\Programs\Python\Python313
Audited 2 packages in 13ms


In [4]:
from dotenv import load_dotenv
import os

# Load variables from a local .env file, if there is one, into the environment
load_dotenv()

# Base URL of the Whisper server; falls back to a local instance on port 8080
WHISPER_API_URL = os.environ.get('WHISPER_API_URL', 'http://localhost:8080')

In [6]:
def transcribe_audio_via_api(audio_file_path, api_url=WHISPER_API_URL, timeout=300):
    """
    Transcribe an audio file by uploading it to the Whisper API.

    Args:
        audio_file_path: Path to the audio file
        api_url: Base URL of the Whisper API; the file is posted to ``{api_url}/inference``
        timeout: Request timeout in seconds (default: 300). Pass None to wait
            indefinitely.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the server answers with a status other than 200.
        requests.Timeout: If the server does not answer within ``timeout`` seconds.
    """
    import requests

    with open(audio_file_path, 'rb') as audio_file:
        # Without a timeout a stalled server would block this call forever.
        response = requests.post(
            f"{api_url}/inference",
            files={'file': audio_file},
            timeout=timeout,
        )

    if response.status_code != 200:
        raise requests.HTTPError(
            f"API request failed with status code {response.status_code}: {response.text}",
            response=response,
        )
    return response.json()

### Note
Make sure the whisper.cpp server is running before calling the transcription functions. Start it with:

```bash
wsl
cd /mnt/c/sources/POC/whisper.cpp
./build/bin/whisper-server -m ../transcript/models/whisper-large-v3-french/ggml-model-q5_0.bin -l fr --port 8080 --host 0.0.0.0
```

In [7]:
result = transcribe_audio_via_api('cdg_end.wav')
print(result)

{'text': 'De la vraie France, de la France éternelle.\n'}


In [21]:
def _pcm_to_wav_bytes(audio_bytes, sample_rate):
    """Wrap raw mono 16-bit PCM samples in a WAV container and return the file bytes."""
    import io
    import wave

    wav_buffer = io.BytesIO()
    # The wave module writes the RIFF/WAVE header and fills in the size fields.
    with wave.open(wav_buffer, 'wb') as wav_file:
        wav_file.setnchannels(1)            # mono
        wav_file.setsampwidth(2)            # 16-bit samples
        wav_file.setframerate(sample_rate)
        wav_file.writeframes(audio_bytes)
    return wav_buffer.getvalue()


def transcribe_audio_bytes_via_api(audio_bytes, api_url=WHISPER_API_URL, sample_rate=16000, timeout=60):
    """
    Transcribe raw PCM audio held in memory using the Whisper API.

    The samples are wrapped in a WAV container and posted to
    ``{api_url}/inference`` as ``audio.wav``.

    Args:
        audio_bytes: Raw PCM audio data in bytes (mono, 16-bit)
        api_url: Base URL of the Whisper API
        sample_rate: Sample rate of the audio (default: 16000)
        timeout: Request timeout in seconds (default: 60)

    Returns:
        dict: The decoded JSON body on success. On any failure (timeout,
        non-200 status, connection or decoding error) the problem is printed
        and ``{'text': '', 'error': <message>}`` is returned instead of raising.
    """
    import requests

    files = {'file': ('audio.wav', _pcm_to_wav_bytes(audio_bytes, sample_rate), 'audio/wav')}

    try:
        response = requests.post(f"{api_url}/inference", files=files, timeout=timeout)
        if response.status_code == 200:
            return response.json()
        # Raised here and reported by the generic handler below, so that HTTP
        # errors follow the same best-effort path as every other failure.
        raise requests.HTTPError(f"API request failed with status code {response.status_code}: {response.text}")
    except requests.Timeout:
        print(f"Request timed out after {timeout} seconds")
        return {'text': '', 'error': f'Request timed out after {timeout} seconds'}
    except Exception as e:
        # Deliberately broad: one failed chunk must not abort a whole
        # chunk-by-chunk transcription run; the error is printed and returned.
        print(f"Request failed: {e}")
        return {'text': '', 'error': str(e)}

In [28]:
# Play the whole recording, one 11-second chunk at a time
play_audio_from_generator(open_mp4_with_ffmpeg('cdg.mp4', chunk_duration=11))

In [30]:
import gc

# Transcribe the recording chunk by chunk, printing each result after a blank line
audio_chunks = open_mp4_with_ffmpeg('cdg.mp4', chunk_duration=11)
for i, chunk in enumerate(audio_chunks):
    print()
    line = transcribe_audio_bytes_via_api(chunk)
    print(line)

# Run a garbage-collection pass once every chunk has been processed
gc.collect()


{'text': 'Paris, Paris outragé, Paris brisé, Paris martyrisé, mais Paris...\n'}

{'text': "Libéré, libéré par lui-même, libéré par son peuple avec le concours des armées de la France à l'hiver.\n"}

{'text': "Avec l'appui et le concours de la France tout entière, c'est-à-dire de la France qui se tente, c'est-à-dire de la seule France.\n"}

{'text': 'De la vraie France, de la France éternelle.\n'}


0