In [21]:
import requests
from dotenv import load_dotenv
import os
import sounddevice as sd
import scipy.io.wavfile as wav
import numpy as np
from flask import Flask, request, jsonify
import time
import tqdm
import io
from pydub import AudioSegment
import uuid

In [2]:
# Load environment variables from .env file
load_dotenv()

# Fetch the pyannote_api_key
pyannote_api_key = os.getenv('PYANNOTE_API_KEY')

In [7]:
url = "https://api.pyannote.ai/v1/test"

headers={
    "Authorization": f"Bearer {pyannote_api_key}",
    "Content-Type": "application/json"
}

response = requests.get(url, headers=headers)
print(response.json())

{'status': 'OK', 'message': 'Test connection successful'}


# Mic input

In [13]:
def record_audio(duration=5, fs=16000):
    print(f"üé§ Enregistrement pour {duration} secondes...")
    
    # sd.rec capture l'audio dans un tableau NumPy (non-bloquant)
    # fs=16000 est la fr√©quence standard pour les mod√®les d'IA (Pyannote)
    # channels=1 pour du Mono (suffisant pour la voix)
    recording = sd.rec(int(duration * fs), samplerate=fs, channels=1)
    
    # Important : On attend que l'enregistrement soit fini
    sd.wait()
    
    print("‚úÖ Enregistrement termin√© !")
    return recording

# --- Param√®tres ---
FS = 16000     # Fr√©quence d'√©chantillonnage (16kHz)
SECONDS = 5    # Dur√©e du test

# 1. Capturer
audio_data = record_audio(duration=SECONDS, fs=FS)


üé§ Enregistrement pour 5 secondes...
‚úÖ Enregistrement termin√© !


In [15]:
output_filename = "data/test_micro.wav"
wav.write(output_filename, FS, audio_data)

# MP4 to MP3

In [11]:
from moviepy import VideoFileClip

video = VideoFileClip("data/Avengers.mp4")
video.audio.subclipped(0,30).write_audiofile("data/Avengers_30s.mp3")
# video.audio.write_audiofile("data/Avengers.mp3")


MoviePy - Writing audio in data/Avengers_30s.mp3


                                                                    

MoviePy - Done.


# Audio upload on Pyannote server

In [32]:
# Define your media object key
object_key = "avengers"  # Replace with your desired object-key

# Create the pre-signed PUT URL.
response = requests.post(
    "https://api.pyannote.ai/v1/media/input",
    json={"url": f"media://{object_key}"},
    headers={
        "Authorization": f"Bearer {pyannote_api_key}",
        "Content-Type": "application/json"
    }
)
response.raise_for_status()
data = response.json()
presigned_url = data["url"]

# Upload local file to the pre-signed URL.
print("Uploading {0} to {1}".format("data/Avengers_30s.mp3", presigned_url))
with open("data/Avengers_30s.mp3", "rb") as input_file:
    # Upload your local audio file.
    requests.put(presigned_url, data=input_file)

Uploading data/Avengers_30s.mp3 to https://pyannoteai-temp-files.s3.eu-west-3.amazonaws.com/media/2753d10a-21c5-4425-ab27-03b822077fa6/36bda3e7-5fdf-4a0d-9ae3-61cfb867f915?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Content-Sha256=UNSIGNED-PAYLOAD&X-Amz-Credential=AKIA2UC3A43QVYKSXRWS%2F20251129%2Feu-west-3%2Fs3%2Faws4_request&X-Amz-Date=20251129T124119Z&X-Amz-Expires=3600&X-Amz-Signature=0764a7999c55b2ff773c8c53d3915a91fc70a4e808cf1622c2f8e5165e32c5a4&X-Amz-SignedHeaders=host&x-amz-checksum-crc32=AAAAAA%3D%3D&x-amz-sdk-checksum-algorithm=CRC32&x-id=PutObject


# Diarization

In [17]:
# Replace the input value with your temporary storage location.
body = {
  "url" : f"media://{object_key}",
}

url = "https://api.pyannote.ai/v1/diarize"
headers = {
   "Authorization": f"Bearer {pyannote_api_key}",
   "Content-Type": "application/json"
}

response = requests.post(url, json=body, headers=headers)
response.raise_for_status()
print(response.json())
job_id = response.json()["jobId"]

while True:
    response = requests.get(
        f"https://api.pyannote.ai/v1/jobs/{job_id}", headers=headers
    )

    if response.status_code != 200:
        print(f"Error: {response.status_code} - {response.text}")
        break

    data = response.json()
    status = data["status"]

    if status in ["succeeded", "failed", "canceled"]:
        if status == "succeeded":
            print("Job completed successfully!")
            print(data["output"])
        else:
            print(f"Job {status}")
        break

    print(f"Job status: {status}, waiting...")
    time.sleep(1)  # Wait 1 second before polling again

{'jobId': '43f37031-2941-4c45-8693-fba3965142f6', 'status': 'created'}
Job status: running, waiting...
Job completed successfully!
{'diarization': [{'speaker': 'SPEAKER_01', 'start': 5.925, 'end': 7.305}, {'speaker': 'SPEAKER_01', 'start': 7.545, 'end': 11.965}, {'speaker': 'SPEAKER_00', 'start': 12.025, 'end': 15.925}, {'speaker': 'SPEAKER_00', 'start': 16.245, 'end': 18.005}, {'speaker': 'SPEAKER_01', 'start': 18.565, 'end': 20.345}, {'speaker': 'SPEAKER_01', 'start': 20.925, 'end': 21.545}, {'speaker': 'SPEAKER_01', 'start': 22.865, 'end': 23.805}, {'speaker': 'SPEAKER_01', 'start': 26.245, 'end': 28.745}, {'speaker': 'SPEAKER_01', 'start': 29.165, 'end': 29.785}, {'speaker': 'SPEAKER_01', 'start': 29.825, 'end': 29.845}, {'speaker': 'SPEAKER_01', 'start': 29.905, 'end': 29.925}]}


# Audio input periodic diarization

In [None]:
duration = 15
period = 5
job_ids = []

for start_time in tqdm.tqdm(range(0, duration, period)):
    end_time = min(start_time + period, duration)

    segment = sd.rec(int(period * 16000), samplerate=16000, channels=1)
    sd.wait()

    buffer = io.BytesIO()
    wav.write(buffer, FS, segment)
    buffer.seek(0)

    object_key = f"segment{int(start_time)}-{int(end_time)}"

    # Create the pre-signed PUT URL.
    response = requests.post(
        "https://api.pyannote.ai/v1/media/input",
        json={"url": f"media://{object_key}"},
        headers=headers
    )
    response.raise_for_status()
    data = response.json()
    presigned_url = data["url"]

    requests.put(presigned_url, data=buffer.getvalue())

    response = requests.post(
        "https://api.pyannote.ai/v1/diarize",
        json={"url": f"media://{object_key}"},
        headers=headers
    )
    response.raise_for_status()
    job_id = response.json()["jobId"]
    print(f"Started job {job_id} for segment {start_time}-{end_time}s")
        


 33%|‚ñà‚ñà‚ñà‚ñé      | 1/3 [00:07<00:14,  7.23s/it]

Started job dc25072a-bcda-4aa9-8310-e1a576115752 for segment 0-5s


 67%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñã   | 2/3 [00:13<00:06,  6.95s/it]

Started job bc8573eb-a26c-4be5-ac62-a7ead4d8d837 for segment 5-10s


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 3/3 [00:22<00:00,  7.36s/it]

Started job c3b1e0f4-ef39-4991-9c47-6c9da6436a2d for segment 10-15s





# Testing with audio file instead of live

In [62]:
duration = 30  # Duration in seconds
period = 5     # Period in seconds
audio_chunks = []
voiceprints = {}
audio_voiceprints = {}
speaking_times = {}
TARGET_FS = 16000  # Target sample rate for pyannote

# Load MP3 and get its properties
audio = AudioSegment.from_mp3("data/Avengers.mp3")
original_fs = audio.frame_rate

# Resample to 16kHz mono if needed
if original_fs != TARGET_FS or audio.channels != 1:
    audio = audio.set_frame_rate(TARGET_FS).set_channels(1)

# Convert to numpy array
audio_data = np.array(audio.get_array_of_samples(), dtype=np.int16)

def upload_array(segment: np.ndarray):
    object_key = str(uuid.uuid4().hex)

    buffer = io.BytesIO()
    wav.write(buffer, TARGET_FS, segment)
    buffer.seek(0)
    
    response = requests.post(
        "https://api.pyannote.ai/v1/media/input",
        json={"url": f"media://{object_key}"},
        headers=headers
    )

    response.raise_for_status()
    data = response.json()
    presigned_url = data["url"]
    requests.put(presigned_url, data=buffer.getvalue())

    return object_key

def wait_for_job(job_id: str):
    while True:
        response = requests.get(
            f"https://api.pyannote.ai/v1/jobs/{job_id}", headers=headers
        )

        if response.status_code != 200:
            print(f"Error: {response.status_code} - {response.text}")
            break

        data = response.json()
        status = data["status"]

        if status in ["succeeded", "failed", "canceled"]:
            return data

        print(f"Job {job_id} status: {status}, waiting...")
        time.sleep(1)

for start_sec in tqdm.tqdm(range(0, duration, period)):
    end_sec = min(start_sec + period, duration)
    
    # Calculate array indices based on sample rate
    start_idx = int(start_sec * TARGET_FS)
    end_idx = int(end_sec * TARGET_FS)
    
    # Extract segment
    segment = audio_data[start_idx:end_idx]
    audio_chunks.append(segment)

    object_key = upload_array(segment)

    # Start diarization job
    if not voiceprints:
        response = requests.post(
            "https://api.pyannote.ai/v1/diarize",
            json={"url": f"media://{object_key}"},
            headers=headers
        )
    else:
        response = requests.post(
            "https://api.pyannote.ai/v1/identify",
            json={
                "url": f"media://{object_key}",
                "voiceprints": [{"label": lbl, "voiceprint": vp} for lbl, vp in voiceprints.items()]
            },
            headers=headers
        )

    response.raise_for_status()
    job_id = response.json()["jobId"]

    # Poll for job completion
    data = wait_for_job(job_id)

    # Process diarization results
    res = data["output"].get("identification", data["output"].get("diarization"))
    updated_speakers = set()
    for segment in res:
        speaker,start,end = segment['speaker'], segment['start'], segment['end']

        if speaker not in audio_voiceprints:
            audio_voiceprints[f"identified_speaker_{len(audio_voiceprints)+1}"] = np.array([])
            speaker_identified = f"identified_speaker_{len(audio_voiceprints)}"
        else:
            speaker_identified = speaker

        if len(audio_voiceprints[speaker_identified]) < int(29*TARGET_FS):
            new_segment = audio_chunks[-1][int(start*TARGET_FS):int(end*TARGET_FS)]
            audio_voiceprints[speaker_identified] = np.concatenate((audio_voiceprints[speaker_identified], new_segment))[0:int(29*TARGET_FS)]
            speaking_times[speaker] = speaking_times.get(speaker, 0) + (end - start)
            updated_speakers.add(speaker_identified)
            
    for speaker in updated_speakers:
        segment = audio_voiceprints[speaker]
        object_key = upload_array(segment)

        response = requests.post(
            "https://api.pyannote.ai/v1/voiceprint",
            headers=headers,
            json={"url": f"media://{object_key}"}
        )

        response.raise_for_status()
        job_id = response.json()["jobId"]

        data = wait_for_job(job_id)
        voiceprints[speaker] = data["output"]["voiceprint"]

  0%|          | 0/6 [00:00<?, ?it/s]

Job b1faab92-3a6b-4b03-94d1-66c9cc1b2330 status: created, waiting...
Job b1faab92-3a6b-4b03-94d1-66c9cc1b2330 status: running, waiting...


 17%|‚ñà‚ñã        | 1/6 [00:06<00:33,  6.67s/it]


KeyboardInterrupt: 

In [35]:
from IPython.display import Audio

# Play audio segments for each identified speaker
for speaker, audio_data in audio_voiceprints.items():
    print(f"Playing audio for {speaker}")
    display(Audio(audio_data, rate=TARGET_FS))

Playing audio for SPEAKER_00_identified


# Local voiceprint matcher

In [75]:
import librosa
import numpy as np
from scipy.spatial.distance import cosine

def audio_to_voiceprint(path, sr=16000, n_mfcc=20):
    # Load audio
    audio, _ = librosa.load(path, sr=sr, mono=True)
    
    print(f"Audio loaded: {audio.shape[0]} samples at {sr} Hz")
    # Extract MFCC
    mfcc = librosa.feature.mfcc(
        y=audio, sr=sr, n_mfcc=n_mfcc, n_fft=512, hop_length=160
    )
    
    # Mean across time frames ‚Üí stable vector
    voiceprint = np.mean(mfcc, axis=1)
    
    # Normalize
    voiceprint = voiceprint / np.linalg.norm(voiceprint)
    
    return voiceprint


In [82]:
import librosa
import numpy as np
from scipy.spatial.distance import cosine

def audio_to_voiceprint(path, sr=16000, n_mfcc=20):
    y, _ = librosa.load(path, sr=sr, mono=True)

    # remove silences (very important)
    y = librosa.effects.trim(y, top_db=30)[0]

    # MFCC
    mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=n_mfcc)

    # Delta and delta-delta
    mfcc_delta = librosa.feature.delta(mfcc)
    mfcc_delta2 = librosa.feature.delta(mfcc, order=2)

    # Stack: shape = (60, time)
    feats = np.vstack([mfcc, mfcc_delta, mfcc_delta2])

    # Cepstral Mean and Variance Normalization (CMVN)
    feats = (feats - np.mean(feats, axis=1, keepdims=True)) / \
            (np.std(feats, axis=1, keepdims=True) + 1e-8)

    # Now average (or better: take median)
    voiceprint = np.median(feats, axis=1)

    # L2-normalize
    return voiceprint / np.linalg.norm(voiceprint)


In [88]:
vp1 = audio_to_voiceprint("data/Avengers.mp3")
vp2 = audio_to_voiceprint("data/Avengers_30s.mp3")

similarity = 1 - cosine(vp1, vp2)
print("Similarity:", similarity)


Similarity: 0.70274204


In [81]:
similarity = 1 - cosine(vp1, vp1)
print("Similarity:", similarity)

Similarity: 1.0


# Solution with no voiceprint

In [None]:
duration = 30  # Duration in seconds
period = 5     # Period in seconds
audio_chunks = []
voiceprints = {}
audio_voiceprints = {}
speaking_times = {}
TARGET_FS = 16000  # Target sample rate for pyannote

# Load MP3 and get its properties
audio = AudioSegment.from_mp3("data/Avengers.mp3")
original_fs = audio.frame_rate

# Resample to 16kHz mono if needed
if original_fs != TARGET_FS or audio.channels != 1:
    audio = audio.set_frame_rate(TARGET_FS).set_channels(1)

# Convert to numpy array
audio_data = np.array(audio.get_array_of_samples(), dtype=np.int16)

def upload_array(segment: np.ndarray):
    object_key = str(uuid.uuid4().hex)

    buffer = io.BytesIO()
    wav.write(buffer, TARGET_FS, segment)
    buffer.seek(0)
    
    response = requests.post(
        "https://api.pyannote.ai/v1/media/input",
        json={"url": f"media://{object_key}"},
        headers=headers
    )

    response.raise_for_status()
    data = response.json()
    presigned_url = data["url"]
    requests.put(presigned_url, data=buffer.getvalue())

    return object_key

def wait_for_job(job_id: str):
    while True:
        response = requests.get(
            f"https://api.pyannote.ai/v1/jobs/{job_id}", headers=headers
        )

        if response.status_code != 200:
            print(f"Error: {response.status_code} - {response.text}")
            break

        data = response.json()
        status = data["status"]

        if status in ["succeeded", "failed", "canceled"]:
            return data

        print(f"Job {job_id} status: {status}, waiting...")
        time.sleep(1)

for start_sec in tqdm.tqdm(range(0, duration, period)):
    end_sec = min(start_sec + period, duration)
    
    # Calculate array indices based on sample rate
    start_idx = int(start_sec * TARGET_FS)
    end_idx = int(end_sec * TARGET_FS)
    
    # Extract segment
    segment = audio_data[0:end_idx]
    audio_chunks.append(segment)

    object_key = upload_array(segment)

    # Start diarization job
    response = requests.post(
        "https://api.pyannote.ai/v1/diarize",
        json={"url": f"media://{object_key}"},
        headers=headers
    )

    response.raise_for_status()
    job_id = response.json()["jobId"]

    data = wait_for_job(job_id)

    # Process diarization results
    res = data["output"].get("identification", data["output"].get("diarization"))
    updated_speakers = set()
    for segment in res:
        speaker,start,end = segment['speaker'], segment['start'], segment['end']

    
            
    for speaker in updated_speakers:
        segment = audio_voiceprints[speaker]
        object_key = upload_array(segment)

        response = requests.post(
            "https://api.pyannote.ai/v1/voiceprint",
            headers=headers,
            json={"url": f"media://{object_key}"}
        )

        response.raise_for_status()
        job_id = response.json()["jobId"]

        data = wait_for_job(job_id)
        voiceprints[speaker] = data["output"]["voiceprint"]