### Imports

In [None]:
import torch
import numpy as np
import pandas as pd
from itertools import product
from src.forward import *
from src.knn import *

device = "cuda" if torch.cuda.is_available() else "cpu"

### Phi definition

In [3]:
class FIRFilter(torch.nn.Module):
    
    def __init__(self, filter_type="hp", coef=0.85, fs=44100, ntaps=101, plot=False):

        """Initilize FIR pre-emphasis filtering module."""
        super(FIRFilter, self).__init__()
        self.filter_type = filter_type
        self.coef = coef
        self.fs = fs
        self.ntaps = ntaps
        self.plot = plot

        import scipy.signal

        if ntaps % 2 == 0:
            raise ValueError(f"ntaps must be odd (ntaps={ntaps}).")

        if filter_type == "hp":
            self.fir = torch.nn.Conv1d(1, 1, kernel_size=3, bias=False, padding=1)
            self.fir.weight.requires_grad = False
            self.fir.weight.data = torch.tensor([1, -coef, 0]).view(1, 1, -1)
        elif filter_type == "fd":
            self.fir = torch.nn.Conv1d(1, 1, kernel_size=3, bias=False, padding=1)
            self.fir.weight.requires_grad = False
            self.fir.weight.data = torch.tensor([1, 0, -coef]).view(1, 1, -1)
        elif filter_type == "aw":
            # Definition of analog A-weighting filter according to IEC/CD 1672.
            f1 = 20.598997
            f2 = 107.65265
            f3 = 737.86223
            f4 = 12194.217
            A1000 = 1.9997

            NUMs = [(2 * np.pi * f4) ** 2 * (10 ** (A1000 / 20)), 0, 0, 0, 0]
            DENs = np.polymul(
                [1, 4 * np.pi * f4, (2 * np.pi * f4) ** 2],
                [1, 4 * np.pi * f1, (2 * np.pi * f1) ** 2],
            )
            DENs = np.polymul(
                np.polymul(DENs, [1, 2 * np.pi * f3]), [1, 2 * np.pi * f2]
            )

            # convert analog filter to digital filter
            b, a = scipy.signal.bilinear(NUMs, DENs, fs=fs)

            # compute the digital filter frequency response
            w_iir, h_iir = scipy.signal.freqz(b, a, worN=512, fs=fs)

            # then we fit to 101 tap FIR filter with least squares
            taps = scipy.signal.firls(ntaps, w_iir, abs(h_iir), fs=fs)

            # now implement this digital FIR filter as a Conv1d layer
            self.fir = torch.nn.Conv1d(
                1, 1, kernel_size=ntaps, bias=False, padding=ntaps // 2
            )
            self.fir.weight.requires_grad = False
            self.fir.weight.data = torch.tensor(taps.astype("float32")).view(1, 1, -1)

        self.fir.weight.data = self.fir.weight.data.to(device)

    def forward(self, input):
        """Calculate forward propagation.
        Args:
            input (Tensor): Predicted signal (B, #channels, #samples).
            target (Tensor): Groundtruth signal (B, #channels, #samples).
        Returns:
            Tensor: Filtered signal.
        """
        input = torch.nn.functional.conv1d(
            input.unsqueeze(0).to(torch.float), self.fir.weight.data, padding=self.ntaps // 2
        )
        return input.squeeze(0).to(torch.float)

### Create Parameters Dataset

In [4]:
# Boundaries

bounds = [['omega', 'tau', 'p', 'd', 'alpha'],
 [(2.400247964468862, 3.798136579655672),
  (0.0700188044714488, 0.7999966616122908),
  (-4.999978530884291, -0.6989804486272966),
  (-4.99983759075039, -0.5229983775344527),
  (1.2362882382361523e-05, 0.9999649724709304)]]

# Create DataFrame and write it to a CSV file for later use

def create_DF(bounds=bounds, k=5, path='parameters.csv'):
    
    #Linspace of every parameters of size k
    Dbase = np.zeros((k,5))
    for i in range(5):
        Dbase[:,i] = np.linspace(bounds[1][i][0],bounds[1][i][1],k)
    baseDF = pd.DataFrame(data=Dbase,columns=bounds[0])

    #Product of the linspaces to get all the possible combinations (size k**5, will take time)
    D = list(product(baseDF['omega'],baseDF['tau'],baseDF['p'],baseDF['d'],baseDF['alpha']))
    DF = pd.DataFrame(data=D,columns=bounds[0])

    DF.to_csv(path)
    
    return DF


In [5]:
# Only run this to recreate the parameters CSV, this can take a long time to finish depending on the subdivision k

#create_DF(bounds=bounds, k=5, path='default_parameters.csv')


### Naive k neighbours search

### Approximated k neighbours search

In [None]:
# First we need to create the M matrix

# M(theta0) = grad(Phi o g)(theta0).T * grad(Phi o g)(theta0)
# This return M = f(theta0)

def M_from_G(G):
    return torch.matmul(torch.transpose(G,0,1),G)

def M_from_theta(theta, G):
    return M_from_G(G(inputs=theta))

def M_factory(logscale,Phi):
    S_from_theta = pknn_forward_factory(logscale,Phi)
    #G = torch.func.jacfwd(S_from_theta)
    G = functools.partial(torch.autograd.functional.jacobian, func=S_from_theta, create_graph=False,strategy="forward-mode",vectorize=True)
    M = functools.partial(M_from_theta,G=G)
    return M

def dist_from_M_and_theta0(t_candidat, t_ref, M):
    return np.matmul(np.matmul(np.transpose(t_ref-t_candidat),M.cpu().detach().numpy()),t_ref-t_candidat)

def dist_approximated_factory(M, t_ref):
    return functools.partial(dist_from_M_and_theta0, M=M, t_ref=t_ref)

### Main

In [None]:
DatasetPath = "default_parameters.csv"
parameters_name = ["omega","tau","p","d","alpha"]
logscale = True
Phi = FIRFilter()

#Get the reference Theta

theta_ref_index = 0
DS = pd.read_csv(DatasetPath, index_col=0)
theta_ref_line = DS.iloc[[theta_ref_index]]
theta_ref = [ theta_ref_line[parameters_name[k]].iloc[0] for k in range(len(parameters_name)) ]

#Find the knn (Naive and Approx)

k = 20



[np.float64(2.400247964468862),
 np.float64(0.0700188044714488),
 np.float64(-4.999978530884291),
 np.float64(-4.99983759075039),
 np.float64(1.2362882382361523e-05)]

### Tests

In [8]:
theta2 = np.array([2.4498039582702016,0.6461548540239277,-1.7064752202175573,-1.0410779935838828,0.0562217157838597])

theta1 = np.array([2.490387548888748,0.5584495826467211,-4.862062785726994,-2.9690459969445326,0.965225942518418])
theta1bis = theta1 + np.array([0.0,0.0,0.0,0.0,0.001])

logscale = True


#############################
M = M_factory(logscale,Phi)
t = torch.tensor(theta1, requires_grad=True).to(device)
M = M(t)
dTheta = (theta1-theta1bis)
distance_opti = np.matmul(np.matmul(np.transpose(dTheta),M.cpu().detach().numpy()),dTheta)

print(distance_opti)

#############################
S_from_theta = pknn_forward_factory(logscale,Phi)
distance_naive = np.linalg.norm(S_from_theta(theta1).cpu().detach().numpy() - S_from_theta(theta1bis).cpu().detach().numpy())**2

print(distance_naive)

0.10057369531250018
0.095978074
