# Frequency Doubling with Adaptive Filters

Ian Malone

This problem can be called frequency doubling, because we want to learn the mapping that doubles the frequency of a single sine wave with an adaptive filter. Take the input at 1KHz sampled at 10KHz, and the desired response at 2KHz, and generate 2 seconds of data. 


Part 1) 
Use a linear FIR filter of size 10 trained with Wiener solution and LMS. Verify if the Wiener filter is able to solve this problem. Explain what happens in your own words. 


Part 2) 
Apply the KLMS to this problem (also using a vector of 10 input samples) and show that the solution is quite good. Experiment with different kernel sizes and also stepsizes to find the best performance. With the same parameters (i.e. no adaptation), change the input frequency between 500 Hz to 2KHz and show experimentally the generalization of the trained model. 


Part 3) 
Now repeat 2 with noise. Create a r.v. u obtained from a Gaussian mixture 𝑝(𝑢) = 0.9𝐺(0,0.1) + 0.1𝐺(4,0.1) and add the noise to the desired response. Quantify the effect of the noise in the model trained with MSE. Now implement the correntropy criterion (MCC) and compare the performance with the MSE. You have to properly determine the kernel size in MCC for optimal results. So
show the effect of the kernel size in performance as a function the kernel size in the MCC cost. You can also experiment to test generalization as in 2. 

## Implementation

#### Import libraries

In [49]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import signal
from scipy import linalg
from statsmodels.tsa.stattools import acf,ccf
%load_ext autotime

The autotime extension is already loaded. To reload it, use:
  %reload_ext autotime
time: 0 ns (started: 2021-03-30 11:26:14 -04:00)


#### Make filter classes

In [48]:
class Wiener():
    def __init__(self, order, start, length):
        self.order = order
        self.start = start
        self.length = length
        
    def learn(self, input_signal, desired_signal):
        '''Find optimal filter weights using Wiener filter given and input and output signal'''        
        desired_signal = np.matrix(desired_signal).T
        input_signal = np.matrix(input_signal).T
        
        A = np.matrix(np.zeros((self.length,self.order)))
        
        for i in range(self.length):
            A[i,:] = input_signal[i+np.arange(self.order)].T # self.order or self.order-1 ???

        R = A.T*A
        P = A.T*desired_signal[self.start:self.length+self.start]
        w_opt = np.linalg.inv(R)*P
        
        return w_opt
        

    
class NLMS():
    
    def __init__(self, step_size, order):
        self.step_size = step_size
        self.order = order
        
    def learn(self, u, d):

        # initialize
        f = np.zeros(len(d))
        e = np.zeros(len(d))
        w = np.random.rand(self.order)
        w_trk = np.zeros((len(d)-self.order, self.order))

        # compute
        for i in range(1, len(d)-self.order):
            y = np.dot(w,u[i:i+self.order])
            e[i] = d[i+self.order] - y
            norm = ((np.linalg.norm(u[i:i+self.order])) ** 2)+0.1
            w = w + (self.step_size * e[i] * u[i])/norm   #normalize the step by the power of the input
            f[i] = y
            w_trk[i] = w 
        return f, w_trk, e

    
    
class GaussianKernel:
    def kernel(self, a, b):
        numer = (np.linalg.norm(a - b)) ** 2
        denom = (2 * self.sigma ** 2)
        return np.exp(-1 * (numer / denom))
    
    
    
class KLMS(GaussianKernel):
    
    def __init__(self, step_size, sigma, order): 
        self.step_size = step_size
        self.sigma = sigma
        self.order = order
    
    def learn(self, input_signal, desired_signal):
        estimates = np.zeros(len(input_signal))
        coefficients = np.zeros(len(input_signal))
        errors = np.zeros(len(input_signal))
        coefficients[0] = self.step_size * desired_signal[0]

        for i in range(1, len(input_signal)-self.order):
            for j in range(i-1):
                
                partial_sum = coefficients[j] * self.kernel(input_signal[i:i+(self.order)], input_signal[j:j+(self.order)])
                estimates[i] += partial_sum                 

            errors[i] = desired_signal[i+self.order] - estimates[i]
            coefficients[i] = self.step_size * errors[i]
        
        return estimates, coefficients, errors
    
    
    
class QKLMS(GaussianKernel):
    
    def __init__(self, step_size, sigma, order, epsilon):
        self.step_size = step_size
        self.sigma = sigma
        self.order = order
        self.epsilon = epsilon
        
    def learn(self, input_signal, desired_signal):
        estimates = np.zeros(len(input_signal))
        errors = np.zeros(len(input_signal))
        coefficients = np.zeros(len(input_signal))
        coefficients[0] = self.step_size * desired_signal[0]
            
        center_vectors = np.empty([0, self.order]) ## HIGHTLIGHT
        center_vectors = np.vstack((center_vectors, input_signal[:self.order]))
        
        distance = []
        size = np.asarray(np.zeros(len(input_signal))) # these keeps track of the network size over iterations
        size[0]=1
        

        for i in range(len(input_signal)-self.order):
            for j in range(center_vectors.shape[0]):  ### HIGHTLIGHT AND LINE BELOW
                partial_sum = coefficients[j] * self.kernel(input_signal[i:i+(self.order)], center_vectors[j])
                estimates[i] += partial_sum                 
                distance.append(np.linalg.norm(np.asarray(input_signal[i:i+(self.order)]) - np.asarray(center_vectors[j])))
            
            errors[i] = desired_signal[i] - estimates[i]  #should it really be desired_signal[i+self.order]
            
            if distance != []: # this probably shouldnt even be necesssary
                dist = np.min(distance)
                loc = np.argmin(distance)

                if dist <= self.epsilon:
                    coefficients[i] = coefficients[loc] + self.step_size * errors[i]
                    size = np.append(size, size[-1])
                    #size.append[size[-1]]
                else:
                    center_vectors = np.vstack((center_vectors, input_signal[i:i+(self.order)]))  ## HIGHTLIGHT np.vstack
                    size = np.append(size, size[-1]+1)
                    coefficients[i] = coefficients[i-1] + self.step_size * errors[i]
                    #size.append[size[-1]+1]
                
            distance = []

        return estimates, coefficients, errors, size

time: 0 ns (started: 2021-03-30 11:26:08 -04:00)


#### Define functions you may need

In [51]:
def ensemble_learning_curve(errors_matrix):
    # ensemble of adaptive filters with same configuration settings
    # get MSE vector for each filter
    # average MSE across filters
    errors = errors_matrix.mean(0)
    curve = []
    for i in range(len(errors)):
        curve.append(np.mean(errors[:i]**2))
    return curve

def freq_response(e, d, fs):
    return rfftfreq(len(e), 1/fs), abs(scipy.fft.rfft(e)/scipy.fft.rfft(d))

def gaussian_kernel(a, b, sigma):
    numer = (np.linalg.norm(a - b)) ** 2
    denom = (2 * sigma ** 2)
    return np.exp(-1 * (numer / denom))

def ERLE(d, e):
    return float(10*np.log10(sum(d**2)/sum(e**2)))

def iqr(x):
    q75, q25 = np.percentile(x, [75 ,25])
    return q75 - q25

def learning_curve(errors):
    curve = []
    for i in range(len(errors)):
        curve.append(np.mean(errors[:i]**2))
    return curve

def klms_misadjustment(u, step_size, sigma):
    # klms: p38 text..  
    # data independent with shift-invariant kernels
    # with Gaussian kernel, this evaluates to 0.5*step_size
    kern = []
    for i in u:
        kern.append(gaussian_kernel(i, i, sigma))
    return step_size*0.5*(1/(len(u)))*np.sum(kern)

def klms_step(u, sigma):
    # p38 text
    kern = []
    for i in u:
        kern.append(gaussian_kernel(i, i, sigma))
    return len(u)/np.sum(kern)

def nlms_misadjustment(u, step_size):
    # nlms: p30 text.. 
    # 0.5*step_size*tr[R] = 0.5*step_size*mean*square
    return 0.5*step_size*np.mean(u**2)

def nlms_step(u):
    # p30 textbook
    return 1/(np.mean(u**2))

def silverman_bandwidth(x):
    return 1.06*np.min([np.std(x),iqr(x)/1.34])*(len(x)**(-1/5))

time: 0 ns (started: 2021-03-30 11:26:45 -04:00)


#### Create signals

In [46]:
fs = 10000
time = np.arange(0, 2, 1/fs)

# generate input signal (2 second sine wave, 1 kHz frequency, 10 kHz sampling)
f_in = 1000
w_in = 2. * np.pi * f_in
sine_1k = np.sin(w_in * time)

# generate desired signal (2 second sine wave, 2 kHz frequency, 10 kHz sampling)
f_des = 2000
w_des = 2. * np.pi * f_des
sine_2k = np.sin(w_des * time)

time: 31 ms (started: 2021-03-30 11:23:35 -04:00)


Part 1) 
Use a linear FIR filter of size 10 trained with Wiener solution and LMS. Verify if the Wiener filter is able to solve this problem. Explain what happens in your own words. 

Part 2) 
Apply the KLMS to this problem (also using a vector of 10 input samples) and show that the solution is quite good. Experiment with different kernel sizes and also stepsizes to find the best performance. With the same parameters (i.e. no adaptation), change the input frequency between 500 Hz to 2KHz and show experimentally the generalization of the trained model. 

Part 3) 
Now repeat 2 with noise. Create a r.v. u obtained from a Gaussian mixture 𝑝(𝑢) = 0.9𝐺(0,0.1) + 0.1𝐺(4,0.1) and add the noise to the desired response. Quantify the effect of the noise in the model trained with MSE. Now implement the correntropy criterion (MCC) and compare the performance with the MSE. You have to properly determine the kernel size in MCC for optimal results. So
show the effect of the kernel size in performance as a function the kernel size in the MCC cost. You can also experiment to test generalization as in 2. 