#Linear Predictive Analysis

In [37]:
#Importing all the necessary libraries together throughout the notebook

In [38]:
import librosa #Import the librosa library, which is used for audio and music analysis.
import librosa.display # Import the librosa.display submodule for displaying audio-related visualizations.
import matplotlib.pyplot as plt # Import the matplotlib.pyplot module for creating plots and visualizations.
%matplotlib inline
import matplotlib.patches as patches
import numpy as np # Import numpy as np, Numpy is used for numerical computations.
import scipy as sp # Scipy is used for scientific and technical computing.
from scipy import signal # Import the signal module from scipy for signal processing functions.
import random #provides various random number generators.
import scipy
from scipy.signal import lfilter, freqz #Imports functions of linear filter and frequency Response of a filter.
from scipy.fft import fft, ifft,fftfreq
from scipy.io import wavfile
from scipy.signal import spectrogram, hamming
from scipy.interpolate import make_interp_spline
from IPython.display import display, Audio, HTML # Import the display, Audio, and HTML classes/functions from the IPython.display module.
from control import TransferFunction
from control import pzmap

In [39]:
#defining necessary functions together to be used throughout the notebook

#Pre-emphasis
Pre-emphasis is a signal processing technique that enhances higher-frequency components in a signal to improve its quality for various applications. It is achieved by subtracting a fraction of the previous sample from the current one

$y[n]=x[n]-k*x[n-1]$

Where:

$y[n]$ is the emphasized output signal.

$x[n]$ is the original input signal.

$k$ is a constant controlling the emphasis level.

In [40]:
def pre_emphasis(input,k):
  duration = len(input) # Calculate the duration (length) of the input signal
  output = np.zeros_like(input) # Create an array to store the pre-emphasized signal, initialized with zeros
  output[0] = input[0] # Create an array to store the pre-emphasized signal, initialized with zeros
  for n in range(1,duration): # Loop through the input signal to perform pre-emphasi
    output[n] = input[n] - k*input[n-1]  # Apply the pre-emphasis formula: y[n] = x[n] - k * x[n-1]
  return output

"narrowband magnitude spectrum slice using a Hamming window" is a method for isolating and analyzing a specific range of frequencies within a signal while minimizing distortion by applying a smoothing technique.

In [41]:
def narrowband_spectrum(input,duration,fs):
    middle = int(len(input)//2) # Calculate the middle index of the input signal
    window_length = int((duration/1000)*fs) # Calculate the middle index of the input signal
    win_len_half = window_length//2 # Calculate the middle index of the input signal
    windowed_input = input[middle-win_len_half:middle+win_len_half] # Extract a slice of the input signal centered around the middle point
    output = np.hamming(window_length)*windowed_input # Extract a slice of the input signal centered around the middle point
    #w,h = signal.freqz(output)
    #frequency_reponse = (fs*w/(2*np.pi))/1000
    #magnitude_dB = 20 * np.log10(h)
    return output, middle, window_length


Autocorrelation, denoted by $p_{k}$
 , is a statistical measure that assesses the correlation between data points in a time series at different time lags. It quantifies the degree to which a data point at time $t$ is related to a data point at time
$t-k$, where $k$ is the time lag


\[
\rho_k = \frac{\sum_{t=k+1}^{N}(X_t - \bar{X})(X_{t-k} - \bar{X})}{\sqrt{\sum_{t=1}^{N}(X_t - \bar{X})^2 \sum_{t=k+1}^{N}(X_{t-k} - \bar{X})^2}}\]


Where:
- $rho_k$ is the autocorrelation at lag $k$.
- $X_t$ represents the value of the time series at time $t$.
- $\barX$ is the mean (average) of the time series values.
- $k$ is the time lag, indicating how many time periods back the correlation is being calculated.
- $N$ is the total number of data points in the time series.

In this formula, $rho_k$ varies between -1 (perfect negative correlation) and 1 (perfect positive correlation), with 0 signifying no correlation between the data points at the given time lag \( k \). Autocorrelation is a fundamental concept in time series analysis, offering insights into temporal dependencies and patterns within the data.

In [42]:
def autocorrelation(input, p=None): # Calculate the autocorrelation of the 'input' signal using numpy's correlate function
    R = np.correlate(input, input, mode='full')
    # Since we're interested in the right half of the symmetric autocorrelation function,
    # we slice 'R' to keep only that part.
    R = R[-len(input):]
    if p is None: # If 'p' is not specified, return the full autocorrelation function
        return R  # Return full autocorrelation
    else: # If 'p' is specified, return only the first 'p + 1' elements of the autocorrelation
        return R[:p + 1]

The Levinson-Durbin algorithm is a technique to efficiently estimate the coefficients of an autoregressive (AR) model. In a nutshell, it minimizes the prediction error step by step

\begin{align*}
\alpha_m &= \frac{1}{E_m}\left(E_m^+ - \sum_{j=1}^{m-1}A_jX_{m-j}\right)\\
A_m &= -\alpha_m\\
X_m &= A_m - \sum_{j=1}^{m-1}A_jX_{m-j}
\end{align*}

$\alpha_m$ is a reflection coefficient at each step. \\
$A_m$ represents the AR model coefficients. \\
$X_m$ is the prediction error. \\
$E_m$ is the cumulative prediction error up to step $m$. \\
$E_m^+$ is the prediction error after updating at step $m$.


In [43]:
def levinson_algorithm(R, p):
    # Initialize arrays for error, 'a' coefficients, gain, and 'k' coefficients
    error = np.zeros(p + 1)
    a = np.zeros((p + 1, p + 1))
    gain = np.zeros(p + 1)
    k = np.zeros(p + 1)
    a[:, 0] = 1 # Initialize arrays for error, 'a' coefficients, gain, and 'k' coefficients
    error[0] = R[0] # The initial error is set to the first element of R
    k[1] = R[1] / error[0]  # Calculate the first reflection coefficient 'k'
    a[1][1] = k[1] # Store the first 'k' coefficient in the 'a' matrix
    error[1] = (1 - k[1] ** 2) * error[0] # Calculate the updated error and store it
    gain[1] = np.sqrt(error[1]) # Calculate the gain factor for this step
    for i in range(2, p + 1): # Loop for the remaining 'k' coefficients and 'a' coefficients
        t = 0
        for j in range(1, i):
            t += a[i - 1][j] * R[i - j] # Compute a sum involving the previous 'a' coefficients and R values
        k[i] = (R[i] - t) / error[i - 1] # Calculate the next reflection coefficient 'k'
        a[i][i] = k[i] # Store this 'k' coefficient in the 'a' matrix
        for j in range(1, i):
            a[i][j] = a[i - 1][j] - k[i] * a[i - 1][i - j] # Compute the next row of 'a' coefficients
        error[i] = (1 - k[i] ** 2) * error[i - 1]
        gain[i] = np.sqrt(error[i]) # Calculate the gain factor for this step

    return error, gain, a

#linear_predictor Function


a. Autocorrelation Calculation (r = autocorrelation(input))

b. Levinson-Durbin Algorithm (error, gain, a = levinson_algorithm(r, p))

In [44]:
def linear_predictor(input, p):
    r = autocorrelation(input) # Calculate the autocorrelation of the input data.
    error, gain, a = levinson_algorithm(r, p) # Use the Levinson-Durbin recursion algorithm to estimate the prediction error, prediction gain, and predictor coefficients.
    return error, gain, a

#Pole Zero Plot

In [45]:
'''def plot_pole_zero(z, p):
    # Create a figure/plot
    fig, ax = plt.subplots()
    # Create the unit circle
    unit_circle = patches.Circle((0, 0), radius=1, fill=False, color='black', linestyle='dashed')
    ax.add_patch(unit_circle)
    # Add axes lines
    plt.axvline(0, color='0.8')
    plt.axhline(0, color='0.8')

    # Plot the poles and zeros
    plt.plot(p.real, p.imag, 'x', label='Poles')
    plt.plot(z.real, z.imag, 'o', label='Zeros')

    # Set the axis limits and ticks
    axis_range = 1.5
    plt.axis('scaled')
    plt.axis([-axis_range, axis_range, -axis_range, axis_range])
    ticks = [-1, -0.5, 0.5, 1]
    plt.xticks(ticks)
    plt.yticks(ticks)'''
def plot_pole_zero(z, p):
    plt.clf()  # Clear the previous plot
    # Create a figure/plot
    fig, ax = plt.subplots()
    # Create the unit circle
    unit_circle = patches.Circle((0, 0), radius=1, fill=False, color='black', ls='dashed')
    ax.add_patch(unit_circle)
    # Add axes lines
    plt.axvline(0, color='0.8')
    plt.axhline(0, color='0.8')
    # Plot the poles and zeros
    plt.plot(p.real, p.imag, 'x', label='Poles')
    plt.plot(z.real, z.imag, 'o', label='Zeros')
    # Set the axis limits and ticks
    axis_range = 1.5
    plt.axis('scaled')
    plt.axis([-axis_range, axis_range, -axis_range, axis_range])
    ticks = [-1, -0.5, 0.5, 1]
    plt.xticks(ticks)
    plt.yticks(ticks)
    # Set labels and legend
    plt.title("Pole-Zero Plot")
    plt.xlabel("Real")
    plt.ylabel("Imaginary")

def plot_pole_zero_for_p(narrowband_output, p): # Function to plot pole-zero diagram for a specific 'p' value.
    error, gain, a = linear_predictor(narrowband_output, 10) # Perform linear prediction to get predictor coefficients.
    coeff_den = [a[p][0], *(-a[p][1:p+1])] # Extract the denominator coefficients of the transfer function.
    z, poles, k = signal.tf2zpk(gain[p], coeff_den) # Convert the coefficients to zeros, poles, and gain.
    fig = plt.figure() # Create a new plot for the pole-zero diagram for 'p'
    plt.title('Pole Zero plot for p=' + str(p))
    plot_pole_zero(z, poles)
    plt.grid(True, color='0.9')
    print('Pole Zero plot for p=' + str(p))
    plt.show()

In [46]:
def LPC_spectrum(a, G, fs, input, orders=[1, 2, 4, 6, 8, 10]):
    num_orders = len(orders) # Calculate the number of orders to process
    w_sig, h_sig = signal.freqz(input) # Calculate the frequency response of the original input signal
    w_khz = (fs * w_sig / (2 * np.pi) / 1000)  # Convert the angular frequency to kilohertz
    h_db = 20 * np.log10(abs(h_sig)) # Calculate the magnitude in decibels (dB) of the original signal
    plt.figure(figsize=(20, 10))

    for idx, order in enumerate(orders, start=1):  # Loop through the specified LPC orders
        poles = [a[order][0], *(-a[order][1:order + 1])]  # Extract the LPC poles from the 'a' coefficients
        w, h = signal.freqz(G[order], poles) # Calculate the frequency response of the LPC filter with the specified order
        plt.subplot(2, 3, idx)
        plt.plot(w_khz, h_db, linestyle='dashed', label="Original Windowed")
        plt.plot((fs * w / (2 * np.pi) / 1000), 20 * np.log10(abs(h)), label="Estimated") # Plot the original signal's frequency response as a dashed line # Plot the estimated LPC filter's frequency response
        plt.title(f"LPC spectrum for p = {order}")
        plt.xlabel("Frequency (KHz)")
        plt.ylabel(r"Magnitude $|H(\omega)|$ (dB)")
        plt.grid()
        plt.xlim(xmin=0)
        plt.legend()
        plt.autoscale(enable=True, axis='x', tight=True)

Given:

gain is the prediction gain obtained from linear prediction.
coeff is an array of prediction coefficients obtained from linear prediction.
input is the input signal.
Inverse Filtering (Inverse Filter):

**1. Initialization:**
   - Sampling rate: $F_s$
   - Signal duration: $text{duration}$ (in seconds)
   - Window duration in samples: $\text{window_duration} = \left(\frac{\text{duration}}{1000}\right) \cdot F_s $
   - Length of input signal: ${length}$
   - Create an initial inverse filter: \( \text{inverse_filter}$

**2. Inverse Filtering:**
   - For each sample \( i \) in the input signal:
     - Initialize \( \text{inverse\_filter}[i] \) with the value of the input at that sample.
     - For each prediction coefficient \( j \) in \( \text{coeff} \):
       - Update \( \text{inverse\_filter}[i] \) by subtracting the coefficient \( \text{coeff}[j] \) times the input signal value at \( i - j \) samples.
     - Normalize \( \text{inverse\_filter}[i] \) by dividing it by the prediction gain \( \text{gain} \).

**3. Autocorrelation:**
   - Calculate the autocorrelation of the inverse filter:
     \[ \text{r\_inverse} = \text{np.correlate}(\text{inverse\_filter}, \text{inverse\_filter}, \text{mode}="same") \]
   - Calculate the autocorrelation of the input signal:
     \[ \text{r\_signal} = \text{np.correlate}(\text{input}, \text{input}, \text{mode}="same") \]

**4. Fundamental Frequency Estimation:**
   - Find the indices of the first and second peaks in \( \text{r\_inverse} \):
     \[ \text{first} = \text{np.argmax}(\text{r\_inverse}) \]
     \[ \text{second} = \text{np.argmax}(\text{r\_inverse}[\text{r\_inverse} < 0.8 \cdot \text{np.max}(\text{r\_inverse})]) \]
   - Calculate the fundamental frequency $F_0$ using the peak indices:
     $ F_0 $= $\frac{F_s}{\text{first} - \text{second}}$


Where:
Fs is the sampling rate of the signal.
The algorithm essentially uses linear prediction

In [47]:
def fundamental_frequency(gain, coeff, input):
    Fs=44100 # Sampling rate
    duration=30 # Sampling rate
    window_duration = int((duration / 1000) * Fs) # Calculate the window duration in samples based on the signal duration
    length = input.shape[0]
    inverse_filter = np.copy(input) # Create a copy of the input signal for inverse filtering
    for i in range(length): # Apply the inverse filter to estimate the fundamental frequency
        for j in range(min(i + 1, len(coeff))):
            inverse_filter[i] -= coeff[j] * input[i - j] # Update the inverse filter using the prediction coefficients
        inverse_filter[i] /= gain  # Normalize the inverse filter by the prediction gain
    r_inverse = np.correlate(inverse_filter, inverse_filter, mode="same") # Compute the autocorrelation of the inverse filter and the input signal
    r_signal = np.correlate(input, input, mode="same")
    first = np.argmax(r_inverse) # Find the first and second peaks in the autocorrelation of the inverse filter
    second = np.argmax(r_inverse[r_inverse < 0.8 * np.max(r_inverse)])
    F0 = Fs / (first - second) # Calculate the fundamental frequency (F0) using the peaks

    return inverse_filter, F0, r_inverse, r_signal

In [48]:
def generate_triangular_excitation(pitch, duration, pulse_sample_width, sampling_frequency):
    samples_per_period = int(sampling_frequency / pitch)
    total_samples = int(sampling_frequency * duration)
    # Calculate the "ON" and "OFF" segments of the triangular pulse
    half_pulse_width = pulse_sample_width // 2
    on_segment = np.linspace(0, 1, half_pulse_width, endpoint=False)
    off_segment = np.linspace(1, 0, half_pulse_width)
    # Construct the full pulse
    pulse = np.concatenate((on_segment, off_segment))
    # Ensure the pulse length matches the desired width
    if len(pulse) > pulse_sample_width:
        pulse = pulse[:pulse_sample_width]
    elif len(pulse) < pulse_sample_width:
        pulse = np.pad(pulse, (0, pulse_sample_width - len(pulse)))
    # Initialize the source_excitation
    source_excitation = np.zeros(total_samples)
    # Replicate the pulse for the desired duration, taking care of boundaries
    for i in range(0, total_samples, samples_per_period):
        if i + len(pulse) <= total_samples:
            source_excitation[i:i + len(pulse)] = pulse
        else:
            source_excitation[i:] = pulse[:total_samples - i]

    time_values = np.linspace(0, duration, total_samples)

    return source_excitation, time_values


'def generate_triangular_excitation(pitch, duration, pulse_sample_width, sampling_frequency):\n    samples_per_period = int(sampling_frequency / pitch)\n    total_samples = int(sampling_frequency * duration)\n    # Calculate the "ON" and "OFF" segments of the triangular pulse\n    half_pulse_width = pulse_sample_width // 2\n    on_segment = np.linspace(0, 1, half_pulse_width, endpoint=False)\n    off_segment = np.linspace(1, 0, half_pulse_width)\n    # Construct the full pulse\n    pulse = np.concatenate((on_segment, off_segment))\n    # Ensure the pulse length matches the desired width\n    if len(pulse) > pulse_sample_width:\n        pulse = pulse[:pulse_sample_width]\n    elif len(pulse) < pulse_sample_width:\n        pulse = np.pad(pulse, (0, pulse_sample_width - len(pulse)))\n    # Initialize the source_excitation\n    source_excitation = np.zeros(total_samples)\n    # Replicate the pulse for the desired duration, taking care of boundaries\n    for i in range(0, total_samples, sam