In [1]:
from scipy.io.wavfile import read
import numpy as np
import matplotlib.pyplot as plt
from typing import List, Tuple
import pandas as pd

In [2]:
rate, data = read("data/data.wav")

print(f"Częstotliwosc próbkowania: {rate} Hz")
print(f"Liczba próbek sygnału: {len(data)}")

Częstotliwosc próbkowania: 11025 Hz
Liczba próbek sygnału: 270113


In [3]:
def get_autocorr_coeff(data: np.ndarray, max_lag: int = 10) -> List:
    data_pd = pd.Series(data)
    autocorr_coeffs = []
    
    for i in range(max_lag+1):
        autocorr_coeffs.append(data_pd.autocorr(i).item())

    return autocorr_coeffs

# def get_autocorr_coeff(data: np.ndarray, max_lag: int = 10) -> List:
#     p_list = []
#     p_list.append(np.mean(data*data))   # Add p_0

#     for i in range(1, max_lag+1):
#         p_list.append(np.mean(data[i:]*data[:-i]))

#     return p_list

In [4]:
# Removes empty coefficients added in calculate_levinson_durbin to keep indexing from 1
def clean_coeff_list(coeff_list: List) -> List:
    del coeff_list[0]
    return [coeffs[1:] for coeffs in coeff_list]

# Given reflection coefficients returns True if AR model is stable
def check_stability(refl_coeffs: List) -> bool:
    for refl_coeff in refl_coeffs:
        if np.abs(refl_coeff) >= 1:
            return False
    return True

def calculate_levinson_durbin(data: np.ndarray, r_max: int = 10) -> Tuple:
    p_list = get_autocorr_coeff(data, r_max)

    sigmas = []
    sigmas.append(p_list[0])    # Add sigma_0

    a_list = [[0] for _ in range(0, r_max+1)]   # Adding 0 to every list to start indexing from 1 possible later
    k_list = []     # Reflection coefficients
    for i in range(1, r_max+1):
        p_i = p_list[i]

        z = 0   # Sum from j=1 to i-1 a_j,i-1(N)*p(i-j)(N)

        for j in range(1, i):
            z += a_list[i-1][j]*p_list[i-j]
        ki = (p_i - z) / sigmas[i-1]
        k_list.append(ki)

        for j in range(1, i+1):
            if j == i:
                a_list[i].append(ki)
            else:
                a_list[i].append(a_list[i-1][j]-ki*a_list[i-1][i-j])

        sigma = (1-ki**2)*sigmas[i-1]
        sigmas.append(sigma)

    a_list = clean_coeff_list(a_list)
    return a_list, k_list

In [5]:
class ARModel:
    def __init__(self, order: int = 4) -> None:
        self.order = order
        self.theta = np.zeros((order,1), dtype=np.float64)

    def set_coeffs(self, coeffs: np.ndarray) -> None:
        self.theta = coeffs

    def _predict(self, x: np.ndarray) -> float:
        return float(self.theta.T @ x)

In [22]:
class Transmitter:
    def __init__(self, ar_order: int = 10, segment_width: int = 256, overlap: int = 10) -> None:
        self.segment_width = segment_width
        self.overlap = overlap
        self.ar_model = ARModel(order=ar_order)

    def forward(self, data: np.ndarray) -> Tuple:
        segmented_data = self._prepare_data(data)

        errors_list = []
        coeffs_list = []
        targets_list = []
        predictions_list = []
        for segment in segmented_data:
            ar_model_coeffs, refl_coeffs = calculate_levinson_durbin(segment)
            self.ar_model.set_coeffs(np.flip(np.array(ar_model_coeffs[-1], dtype=np.float32)))
            
            targets = []
            predictions = []
            segment_errors = []
            for i in range(self.ar_model.order, len(segment)):
                curr_data = segment[i-self.ar_model.order:i]
                prediction = self.ar_model._predict(curr_data)
                target = segment[i]
                error = np.abs(target - prediction)
                segment_errors.append(error)
                predictions.append(prediction)
                targets.append(target)
            
            errors_list.append(segment_errors)
            coeffs_list.append(ar_model_coeffs)
            predictions_list.append(predictions)
            targets_list.append(targets)

        return coeffs_list, errors_list, predictions_list, targets_list, refl_coeffs

    # Given data cuts it into segments with specified overlap
    def _prepare_data(self, data: np.ndarray) -> np.ndarray:
        n_segments = len(data) // (self.segment_width - self.overlap)

        segments = []
        for i in range(n_segments):
            start_index = i*self.segment_width-i*self.overlap
            end_index = start_index+self.segment_width
            segment = data[start_index:end_index]

            # Drop last segment if it's length is not equal to segment width
            if i == n_segments-1 and len(segment) != self.segment_width:
                break

            segment = self._flatten_segment_ends(segment)
            segments.append(segment)

        return np.array(segments)
    
    def _flatten_segment_ends(self, segment: np.ndarray) -> np.ndarray:
        cos_arg = (2*np.pi / (len(segment)+1))*np.arange(1, len(segment)+1)
        return 0.5*(1 - np.cos(cos_arg))*segment

In [23]:
transmitter = Transmitter()

In [24]:
coeffs, errors, predictions_list, targets_list, refl_coeffs = transmitter.forward(data)

In [25]:
check_stability(refl_coeffs)

True

In [9]:
data.max()

np.int16(8047)

In [10]:
np.mean(errors)

np.float64(34.32125646675281)