## Utilitaire : imports & paramètres

In [5]:
import numpy as np
from scipy import signal, fftpack
import math

# paramètres communs
SR = 16000          # sampling rate cible (Hz)
FRAME_MS = 25       # taille trame en ms
STRIDE_MS = 10      # pas entre trames en ms
NFFT = 512          # FFT points
PRE_EMPH = 0.97     # coefficient de pré-accentuation
NFILT = 40          # nombre de filtres Mel
NUM_CEPS = 13       # dimension MFCC


## Chargement et (re)échantillonnage

### But :
obtenir un tableau audio (float32) et la fréquence d’échantillonnage sr homogène. 

### Explication

- Lire un fichier WAV/FLAC et convertir en mono float (−1.0 … 1.0).

- Si l’échantillonnage n’est pas SR (ex. 16000 Hz), on resample pour homogénéiser les données d’entrée du pipeline.


Entrée : fichier WAV stereo 44100 Hz (durée 3 s)
→ audio non traité : dtype int16, shape (132300, 2).

Sortie : (sr=16000, audio)
→ dtype float32, shape (48000,), valeurs ≈ [-0.23, 0.12, 0.01, ...].

In [6]:
# Exemple minimal : lire avec scipy (WAV mono) puis resampler si nécessaire
from scipy.io import wavfile
from scipy import signal as sps

def load_audio(path, target_sr=SR):
    sr, audio = wavfile.read(path)                 # audio peut être int16 etc.
    # normaliser en float32 entre -1 et 1
    if audio.dtype == 'int16':
        audio = audio.astype(np.float32) / 32768.0
    elif audio.dtype == 'int32':
        audio = audio.astype(np.float32) / 2147483648.0
    elif audio.dtype == 'uint8':
        audio = (audio.astype(np.float32) - 128) / 128.0
    audio = audio.mean(axis=1) if audio.ndim == 2 else audio  # mono
    if sr != target_sr:
        # resampling
        number_of_samples = round(len(audio) * float(target_sr) / sr)
        audio = sps.resample(audio, number_of_samples)
        sr = target_sr
    return sr, audio

sr, audio = load_audio("../data/atoutalheure.wav", target_sr=16000)


## Pré-accentuation (pre-emphasis)

### But : 
renforcer les hautes fréquences pour compenser la baisse énergétique et améliorer la détection de certains traits (consonnes/fricatives).

### Explication

- Filtre simple : y[n] = x[n] - a * x[n-1] (avec a ≈ 0.95–0.98).
- Effet : augmente la pente du spectre (atténue composantes basses, met en valeur hautes).


Entrée : audio[:8] = [0.0, 0.1, 0.15, 0.12, 0.08, -0.02, -0.04, 0.0]

Sortie (a=0.97) : [0.0, 0.003, 0.022, -0.0189, -0.0556, -0.0966, -0.0408, 0.0388]
(les valeurs illustratives montrent l’augmentation relative des variations rapides)

In [7]:
def pre_emphasis(signal_in, pre_emph=PRE_EMPH):
    return np.append(signal_in[0], signal_in[1:] - pre_emph * signal_in[:-1])

# Usage
audio_preemph = pre_emphasis(audio, pre_emph=0.97)


## Tramage (framing) + fenêtrage (Hamming)

### But : 
découper le signal en petites trames quasi-stationnaires (p.ex. 25 ms) avec chevauchement (p.ex. stride 10 ms) puis appliquer une fenêtre Hamming.

### Explication

- On suppose la stationnarité sur ~20–30 ms.

- On crée un tableau frames shape (num_frames, frame_len) et on multiplie chaque trame par une fenêtre hamming(frame_len).


Entrée : audio_preemph length 48000 (3 s @ 16kHz)

Sortie : frames_win shape (num_frames= (48000-400)/160 + 1 ≈ 296, frame_len=400) (25 ms → 400 samples)

frames_win[0] : vecteur de 400 floats.

In [8]:
def enframe(signal_in, sr=SR, frame_ms=FRAME_MS, stride_ms=STRIDE_MS):
    frame_len = int(sr * frame_ms / 1000)
    frame_step = int(sr * stride_ms / 1000)
    signal_length = len(signal_in)
    num_frames = 1 + int(np.floor((signal_length - frame_len) / frame_step))
    pad_len = num_frames * frame_step + frame_len
    pad_signal = np.append(signal_in, np.zeros(pad_len - signal_length))
    # stride trick pour efficacité (ici code simple)
    frames = np.stack([pad_signal[i*frame_step : i*frame_step + frame_len] for i in range(num_frames)])
    win = np.hamming(frame_len)
    return frames * win, frame_len, frame_step

# Usage
frames_win, frame_len, frame_step = enframe(audio_preemph, sr)


## Spectre de puissance (power spectrum)

### But : 
pour chaque trame, calculer la FFT (magnitude ou puissance) — base pour spectrogramme / filtration Mel.

### Explication

- On calcule X[k] = FFT(frame, NFFT) puis |X[k]| ou puissance |X[k]|^2 / NFFT.

- Pour gain de compacité on conserve uniquement les bins réelles : rfft.


Entrée : frames_win shape (296, 400)

Sortie : pow_frames shape (296, 257) (pour NFFT=512)

- pow_frames[0, :10] : [1.2e-5, 3.3e-6, 2.7e-6, ...] (valeurs d’énergie par bin)

In [9]:
def power_spectrum(frames_win, NFFT=NFFT):
    # frames_win: (num_frames, frame_len)
    mag = np.abs(np.fft.rfft(frames_win, n=NFFT, axis=1))
    pow_spec = (1.0 / NFFT) * (mag ** 2)
    return pow_spec  # shape (num_frames, NFFT/2+1)

# Usage
pow_frames = power_spectrum(frames_win)


## Banque de filtres Mel → Log-Mel Spectrum

### But : 
regrouper l’énergie spectrale selon l’échelle Mel (perceptuelle) puis prendre le log — vecteur robuste et compact.

### Explication

- Convertir fréquences linéaires → mel (non-linéaire), placer NFILT triangles entre 0 et Nyquist.

- Appliquer chaque filtre sur pow_frames pour obtenir filter_banks shape (num_frames, NFILT).

- Prendre log (ou 20*log10) pour correspondre à la perception loudness.


Entrée : pow_frames shape (296, 257)

Sortie : log_mel shape (296, 40)

- log_mel[0, :5] : [-45.2, -50.1, -42.9, -39.8, -55.0] (dB approximatifs)

In [10]:
def hz_to_mel(hz): return 2595.0 * np.log10(1 + hz / 700.0)
def mel_to_hz(mel): return 700.0 * (10**(mel / 2595.0) - 1)

def mel_filterbanks(pow_frames, sr=SR, nfilt=NFILT, NFFT=NFFT):
    low_freq = 0
    high_freq = sr / 2
    low_mel = hz_to_mel(low_freq)
    high_mel = hz_to_mel(high_freq)
    mel_points = np.linspace(low_mel, high_mel, nfilt + 2)
    hz_points = mel_to_hz(mel_points)
    bin_points = np.floor((NFFT + 1) * hz_points / sr).astype(int)
    fbank = np.zeros((nfilt, int(np.floor(NFFT/2 + 1))))
    for m in range(1, nfilt+1):
        f_m_minus = bin_points[m-1]; f_m = bin_points[m]; f_m_plus = bin_points[m+1]
        for k in range(f_m_minus, f_m):
            fbank[m-1, k] = (k - f_m_minus) / (f_m - f_m_minus + 1e-8)
        for k in range(f_m, f_m_plus):
            fbank[m-1, k] = (f_m_plus - k) / (f_m_plus - f_m + 1e-8)
    filter_banks = np.dot(pow_frames, fbank.T)
    # numerical stability
    filter_banks = np.where(filter_banks == 0, np.finfo(float).eps, filter_banks)
    log_fbanks = 20 * np.log10(filter_banks)
    return log_fbanks  # shape (num_frames, nfilt)

# Usage
log_mel = mel_filterbanks(pow_frames)


## MFCC (Mel Frequency Cepstral Coefficients)

### But : 
obtenir des coefficients compacts et peu corrélés (par DCT) à partir des log-Mel.

### Explication

- Appliquer la DCT (type II) sur chaque vecteur log-Mel et conserver NUM_CEPS premières composantes.

- On applique souvent un cepstral lifter (poids) et on peut ajouter les deltas/delta-deltas (dérivées temporelles).


Entrée : log_mel shape (296, 40)

Sortie : mfcc shape (296, 13)

- mfcc[0] : [ -2.12, 1.34, -0.45, 0.12, ... ] (valeurs unitless, données d’apprentissage)

In [11]:
def mfcc_from_log_mel(log_mel, num_ceps=NUM_CEPS, lifter=22):
    # log_mel: (num_frames, nfilt)
    ceps = fftpack.dct(log_mel, type=2, axis=1, norm='ortho')[:, :num_ceps]
    # liftering
    n = np.arange(num_ceps)
    lift = 1 + (lifter/2.) * np.sin(np.pi * n / lifter)
    ceps *= lift
    return ceps  # shape (num_frames, num_ceps)

# optionally compute deltas:
def deltas(feat, N=2):
    rows, cols = feat.shape
    denom = 2 * sum([i*i for i in range(1, N+1)])
    delta_feat = np.empty_like(feat)
    padded = np.pad(feat, ((N,N),(0,0)), mode='edge')
    for t in range(rows):
        delta_feat[t] = sum([n * (padded[t+N+n] - padded[t+N-n]) for n in range(1, N+1)]) / denom
    return delta_feat

# Usage
mfcc = mfcc_from_log_mel(log_mel)
mfcc_delta = deltas(mfcc)
mfcc_ddelta = deltas(mfcc_delta)


## Pitch (F0) & RMS (énergie)

### But : 
extraire des features prosodiques — fondamental pour la prosodie / TTS / diarisation / certaines tâches ASR.

### Explication

- Méthodes courantes : autocorr + peak picking (bon pour voix régulière) ou méthodes plus robustes (YIN, pYIN, librosa.pyin).

- RMS : énergie par trame = sqrt(mean(frame^2)).


Entrée : la trame frames_win[50] (voisée)

Sortie : f0 ≈ 120.0 Hz (valeur si la frame contient une voix fondée sur F0=120Hz)

RMS : 0.03 (énergie moyenne, unité = amplitude audio)

In [12]:
def autocorr_pitch(frame, sr=SR, fmin=50, fmax=400):
    # renvoyer 0 si non voiced / pas d'estimation
    corr = np.correlate(frame, frame, mode='full')[len(frame)-1:]
    min_lag = int(sr / fmax)
    max_lag = int(sr / fmin)
    if max_lag >= len(corr): return 0.0
    peak_lag = np.argmax(corr[min_lag:max_lag]) + min_lag
    peak_val = corr[peak_lag]
    if peak_val < 1e-6: return 0.0
    return sr / peak_lag

def pitch_track(frames_win, sr=SR):
    return np.array([autocorr_pitch(frames_win[i], sr) for i in range(frames_win.shape[0])])

def rms_energy(frames_win):
    return np.sqrt(np.mean(frames_win**2, axis=1))

# Usage
f0s = pitch_track(frames_win)
rms = rms_energy(frames_win)


## Normalisation (CMVN : cepstral mean & variance normalization)

### But : 
rendre les features invariantes aux différences d’enregistrement / gain. Très utile avant d’alimenter un modèle.

### Explication

- On centre (soustrait la moyenne) et on divise par l’écart-type par dimension : x' = (x - mean)/std.

- Alternatives : normalisation sur fenêtres (sliding CMVN).


Entrée : mfcc shape (296, 13)

Sortie : mfcc_norm shape (296, 13) où chaque colonne a mean≈0 et std≈1.

In [13]:
def cmvn(features, eps=1e-10):
    mu = np.mean(features, axis=0)
    sigma = np.std(features, axis=0)
    return (features - mu) / (sigma + eps)

# Usage
mfcc_norm = cmvn(mfcc)


In [14]:
# inputs: mfcc_norm (T x D)
# acoustic_model: model -> logits (T x V) (V = taille vocab. de tokens)
# lm: language model scoring function (optional)

logits = acoustic_model(mfcc_norm)  # shape (T, V)
# softmax pour obtenir probabilités
probs = softmax(logits, axis=1)

# Beam search (très schématique)
def ctc_beam_search(probs, beam_width=10, lm=None):
    beams = [("", 1.0)]  # (prefix, score)
    for t in range(probs.shape[0]):
        new_beams = {}
        for prefix, score in beams:
            for token in topk_indices(probs[t], k=beam_width):
                p_token = probs[t, token]
                new_prefix = extend(prefix, token)  # gère tokens blanks/merge
                new_score = score * p_token
                if lm: new_score *= lm.score_partial(new_prefix)
                # keep best per prefix
                if new_prefix not in new_beams or new_beams[new_prefix] < new_score:
                    new_beams[new_prefix] = new_score
        # keep top-K beams
        beams = sorted(new_beams.items(), key=lambda x: x[1], reverse=True)[:beam_width]
    return beams[0][0]  # best hypothesis after T frames

# Note: en pratique on utilise des librairies (ctcdecode, warp-ctc, fairseq, huggingface) solide.


NameError: name 'acoustic_model' is not defined