***Mini-projet Traitement du Signal***

In [1]:
import numpy as np
import scipy.io.wavfile as wavfile
from scipy.signal import resample
from lpc import *

In [None]:
if __name__ == '__main__':

    # -------------------------------------------------------
    # 1: Normalize and resample the signal at 8kHz
    # -------------------------------------------------------
    sampling_rate, speech = wavfile.read('./audio/speech.wav')

    # Normalization
    speech = np.array(speech)
    speech = 0.9 * speech / max(abs(speech))

    # Resampling
    target_sampling_rate = 8000
    target_size = int(len(speech) * target_sampling_rate / sampling_rate)
    speech = resample(speech, target_size)
    sampling_rate = target_sampling_rate

    # Save resampled signal
    wavfile.write("./results/speech_resampled.wav", sampling_rate, speech)

    # -------------------------------------------------------
    # 2: Block decomposition of the signal
    # -------------------------------------------------------
    
    def hamming_window(N, T, sampling_rate):
        t = np.linspace(0, T, N)  # Vecteur de temps allant de 0 à T
        return 0.54 - 0.46 * np.cos(2 * np.pi * t / T)

    # Utilisation d'une fenêtre de Hamming de 20 ms
    T = 0.02  # Largeur de la fenêtre en secondes
    N = int(T * sampling_rate)  # Nombre d'échantillons dans la fenêtre
    w = hamming_window(N, T, sampling_rate)  # Fenêtre de Hamming 

    # Recouvrement de 50%
    R = 0.5
    step = int(N * (1 - R))

    blocks, windowed_blocks = blocks_decomposition(speech, w, R=0.5)
    n_blocks, block_size = blocks.shape
    
    w = hann(floor(0.04*sampling_rate), False)
    

    def blocks_reconstruction(windowed_blocks, window, taux_recouvrement):
    
      pas = int(len(window) * (1 - taux_recouvrement))
      signal_reconstruit = np.zeros(speech.size)
      nombre_segments = len(windowed_blocks)

      for i in range(nombre_segments):
        indice_debut = i * pas
        signal_reconstruit[indice_debut:indice_debut+len(window)] += windowed_blocks[i]

      return signal_reconstruit
    
    # Check if the reconstruction of the signal is correct
    rec = blocks_reconstruction(windowed_blocks, w, speech.size, R = 0.5) 
    wavfile.write("./results/block_reconstruction.wav", sampling_rate, rec)  
 
     
    # -------------------------------------------------------
    # 3: Encodes the signal block by block
    # -------------------------------------------------------

    def lpc_encode(segment, p):
    
      N = len(segment)
      r = np.array([np.dot(segment[:N-k], segment[k:]) for k in range(p+1)])
      r = r / N  # Normalisation

      # Construction de la matrice Toeplitz
      r_mat = np.zeros((p, p))
      for i in range(p):
        for j in range(p):
            r_mat[i, j] = r[abs(i - j)]

      r_vect = r[1:p+1]
      coefficients = np.linalg.solve(r_mat, r_vect)
      prediction = np.zeros_like(segment)

      for n in range(p, N):
        prediction[n] = np.dot(coefficients, segment[n-p:n][::-1])

      return coefficients, prediction


    p = 32 # number of coefficients of the filter
    blocks_encoding = []
    
    for block, windowed_block in zip(blocks, windowed_blocks):

        coefs, prediction = lpc_encode(windowed_block, p)
        residual = windowed_block - prediction
        voiced, pitch = estimate_pitch(block, sampling_rate, threshold=1)
        
        blocks_encoding.append({'coefs': coefs, 
          'residual': residual,
          'size': block.size,
          'gain': np.std(residual),
          'pitch': pitch,
          'voiced': voiced})
               
    # -------------------------------------------------------
    # 4: Decodes each block based upon the residual
    # -------------------------------------------------------
    
    def lpc_decode(coefficients, residual):
    
      p = len(coefficients)
      N = len(residual)
      reconstructed_signal = np.zeros(N)

      for n in range(N):
        if n < p:
          reconstructed_signal[n] = residual[n]
        else:
            reconstructed_signal[n] = residual[n] + np.dot(coefficients, reconstructed_signal[n-p:n][::-1])

      return reconstructed_signal


    blocks_decoded = []
    for encoding in blocks_encoding:
      
        block_decoded = lpc_decode(encoding['coefs'], encoding['residual'])
        blocks_decoded.append(block_decoded)

    blocks_decoded = np.array(blocks_decoded)
    decoded_speech = blocks_reconstruction(blocks_decoded, w, speech.size, 
      R = 0.5)
      
    wavfile.write("./results/decoded_speech.wav", sampling_rate, decoded_speech)
    
    # -------------------------------------------------------
    # 5: Decodes each block based upon white noise
    # -------------------------------------------------------
    
    def lpc_decode(coefficients, excitation):
    
      p = len(coefficients)
      N = len(excitation)
      reconstructed_signal = np.zeros(N)

      for n in range(N):
        if n < p:
          reconstructed_signal[n] = excitation[n]
        else:
          reconstructed_signal[n] = excitation[n] + np.dot(coefficients, reconstructed_signal[n-p:n][::-1])

      return reconstructed_signal

    blocks_decoded = []
    for encoding in blocks_encoding:
      
        excitation = np.random.normal(0, encoding['gain'], encoding['size'])
        block_decoded = lpc_decode(encoding['coefs'], excitation)
        blocks_decoded.append(block_decoded)

    blocks_decoded = np.array(blocks_decoded)
    decoded_speech = blocks_reconstruction(blocks_decoded, w, speech.size, R = 0.5)
      
    wavfile.write("./results/decoded_speech_noise.wav", sampling_rate, decoded_speech)
    
    # -----------------------------------------------------------
    # 6: Decodes each block based upon the pitch (Bonus Question)
    # -----------------------------------------------------------
    
    blocks_decoded = []
    for encoding in blocks_encoding:
      
        if(encoding['voiced']):
        #if False:
        
            excitation = np.zeros(encoding['size'])
            step = int(round(encoding['pitch']*sampling_rate))
            excitation[::step] = 1
            excitation *= encoding['gain']/np.std(excitation)
            
        else:
        
            excitation = np.random.normal(0, encoding['gain'], encoding['size'])
        
        block_decoded = lpc_decode(encoding['coefs'], excitation)
        blocks_decoded.append(block_decoded)

    blocks_decoded = np.array(blocks_decoded)
    decoded_speech = blocks_reconstruction(blocks_decoded, w, speech.size, 
      R = 0.5)
      
    wavfile.write("./results/decoded_speech_pitch.wav", sampling_rate, 
     decoded_speech)

Pour démontrer que les coefficients $(α_k)_{1≤k≤p}$ sont solutions de l'équation matricielle donnée, nous devons minimiser l'erreur moyenne de l'estimation linéaire.

O a: $ \tilde{s}[n] = \sum_{k=1}^{p} α_k s[n - k] $ et l'erreur d'estimation $ϵ[n]$ est alors: $ϵ[n] = s[n] - \tilde{s}[n] = s[n] - \sum_{k=1}^{p} α_k s[n - k]$

L'erreur moyenne quadratique est alors: $\frac{1}{N} \sum_{n=1}^{N} ϵ[n]^2$ et nous devons trouver les coefficients $(α_k)$ tels que la dérivée de cette erreur par rapport à chaque $α_k$ soit nulle.

$$
\frac{\partial}{\partial α_k} \left( \frac{1}{N} \sum_{n=1}^{N} ϵ[n]^2 \right) = \frac{2}{N} \sum_{n=1}^{N} ϵ[n] \frac{\partial ϵ[n]}{\partial α_k}
$$

En remplaçant \(ϵ[n]\) par son expression, nous obtenons :

$$
\frac{\partial ϵ[n]}{\partial α_k} = -s[n - k]
$$

Ainsi, la dérivée devient :

$$
\frac{2}{N} \sum_{n=1}^{N} \left( s[n] - \sum_{j=1}^{p} α_j s[n - j] \right) (-s[n - k]) = 0
$$

En simplifiant, nous obtenons :

$$
\sum_{n=1}^{N} s[n] s[n - k] = \sum_{n=1}^{N} \sum_{j=1}^{p} α_j s[n - j] s[n - k]
$$

En réinsérant dans l'expression de $r_s[k]$:

$$
r_s[k] = \frac{1}{N} \sum_{n=1}^{N} s[n] s[n - k]
$$

Nous pouvons réécrire l'équation précédente sous forme matricielle :

$$
\begin{pmatrix}
r_s[0] & r_s[1] & \cdots & r_s[p-1] \\
r_s[1] & r_s[0] & \cdots & r_s[p-2] \\
\vdots & \vdots & \ddots & \vdots \\
r_s[p-1] & r_s[p-2] & \cdots & r_s[0]
\end{pmatrix}
\begin{pmatrix}
α_1 \\
α_2 \\
\vdots \\
α_p
\end{pmatrix}
=
\begin{pmatrix}
r_s[1] \\
r_s[2] \\
\vdots \\
r_s[p]
\end{pmatrix}
$$