# Real Time Adaptive Filtering

In [1]:
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
import scipy.signal as sp
import IPython
from scipy.io import wavfile
import librosa  
from sklearn.metrics import mean_squared_error
import pyaudio
import wave
import sys
import soundfile
import math

# AP

Here you can try out the Affine Projection (AP) algorithm in real time using different sets of parameters. You can either use your own files or the ones provided. To create simulated data, set `simulated` to `True` in the loading cell below.

In [2]:
# Simulation switch.
#   True  -> Y is a clean signal, X is a noise track, and the noisy signal is built as Y + X.
#   False -> Y is used as loaded (it is expected to already contain the noise).
simulated = True

# Signal Y (noisy recording if simulated=False, clean recording if simulated=True).
# sr=8000 asks librosa to deliver the audio at 8 kHz.
sY, FsY = librosa.load('SignalX.wav', sr=8000)
print('sampling rate:', FsY, 'Hz, data length:', len(sY), 'samples')

# Noise X, loaded at the same 8 kHz rate.
sX, FsX = librosa.load('talkingNoise.wav', sr=8000)
print('sampling rate:', FsX, 'Hz, data length:', len(sX), 'samples')

# Y must not run past the end of the noise track.
sY = sY[:min(len(sX), len(sY))]

if simulated:
    # Match the noise length to Y and halve its amplitude before mixing.
    sX = .5 * sX[:len(sY)]
    # Keep a handle on the clean signal; the sum below creates a new array.
    sY_clear = sY
    sY = sY_clear + sX

sampling rate: 8000 Hz, data length: 256000 samples
sampling rate: 8000 Hz, data length: 358609 samples


In [3]:
# Listen to signal Y (clean signal plus noise when simulated=True), played at Y's own sampling rate.
IPython.display.Audio(sY, rate=FsY)

In [4]:
# Listen to the noise track X (already scaled by 0.5 when simulated=True).
IPython.display.Audio(sX, rate=FsX)

In [5]:
def ap_live(x, d, N, order, a, eps, Nit, w, x_mem, d_mem, ide_eps, ide):
    """Run one block of Affine Projection (AP) adaptation.

    The filter state (``w``, ``x_mem``, ``d_mem``) is passed in and handed back
    so that consecutive blocks can be chained for real-time filtering.

    Parameters
    ----------
    x : 1-D array
        Input fed through the adaptive filter (the noise reference in the
        real-time cells of this notebook).
    d : 1-D array
        Target signal the filter output is subtracted from.
    N : int
        Number of filter taps; ``w`` has length ``N`` and ``x_mem`` has ``N`` rows.
    order : int
        Projection order. Not read here; the order is implied by the shapes of
        ``x_mem``, ``d_mem`` and ``ide_eps``.
    a : float
        Step size applied to every weight update.
    eps : float
        Not read here; the regularisation actually used is ``ide_eps``.
    Nit : int
        Number of weight updates performed per sample.
    w : 1-D float array, length N
        Filter weights. Updated IN PLACE and also returned.
    x_mem : 2-D array, shape (N, order)
        Most recent input windows, newest in column 0. Updated in place.
    d_mem : 1-D array, length order
        Most recent target samples, newest first. Updated in place.
    ide_eps : 2-D array, shape (order, order)
        Regularisation matrix added to ``x_mem.T @ x_mem`` before solving.
    ide : 2-D array
        Passed through unchanged (kept for interface compatibility).

    Returns
    -------
    y : 1-D array, length min(len(x), len(d))
        Filter output for each processed sample (zeros before index N-1).
    e : 1-D array
        Error ``d - y`` with the first ``N`` entries dropped. Note that the
        error at index ``N-1`` is computed but not returned.
    w, x_mem, d_mem, ide_eps, ide
        The (updated) state, to be passed to the next call.

    Raises
    ------
    numpy.linalg.LinAlgError
        If the regularised matrix ``x_mem.T @ x_mem + ide_eps`` is singular.
    """
    # number of samples available in both inputs
    L = min(len(x), len(d))

    e = np.zeros(L)
    y = np.zeros(L)

    # run the adaptation
    for n in range(N-1, L):

        X = x[n-N+1:n+1]

        # shift the history by one slot and insert the newest window / target
        x_mem[:, 1:] = x_mem[:, :-1]
        x_mem[:, 0] = X
        d_mem[1:] = d_mem[:-1]
        d_mem[0] = d[n]

        # estimate output and error for every remembered window
        y_mem = np.dot(x_mem.T, w)
        e_mem = d_mem - y_mem
        # index 0 is the current sample for both the output and the error
        y[n] = y_mem[0]
        e[n] = e_mem[0]

        # The regularised Gram matrix does not depend on w, so build it once per sample.
        gram = np.dot(x_mem.T, x_mem) + ide_eps
        for j in range(Nit):
            if j > 0:
                # later iterations must see the error of the updated weights
                e_mem = d_mem - np.dot(x_mem.T, w)
            # solve gram * z = e_mem directly instead of forming the inverse
            w += a * np.dot(x_mem, np.linalg.solve(gram, e_mem))

    return y, e[N:], w, x_mem, d_mem, ide_eps, ide

Choose the parameters here and run this code to listen to the filtered signal

In [6]:
"""Real Time AP filtering"""

CHUNK = 2560           # Number of frames read from each wave file per block
taps = 200             # Filter size
DTYPE = np.int16
step_size = .03        # Mu (adaptation step size)
eps = .05              # Regularisation: eps * I is added to the matrix solved in ap_live
order = 10             # Projection order
Nit = 1                # Number of weight updates per sample


# Converts audio to 16bits wave file
soundfile.write('Sound.wav', sY, FsY, subtype='PCM_16')
soundfile.write('Noise.wav', sX, FsX, subtype='PCM_16')

# initial guess for the filter and empty AP history
w = np.zeros(taps)
x_mem = np.zeros((taps, order))
d_mem = np.zeros(order)
ide_eps = eps * np.identity(order)
ide = np.identity(order)

# Tail of the previous block, prepended so the filter always has `taps` samples of context
prev_data = np.zeros(0)
prev_noise = np.zeros(0)

# Valid sample range of the output format, used to clip before the integer cast
sample_range = np.iinfo(DTYPE)

p = pyaudio.PyAudio()
stream = None
try:
    with wave.open('Sound.wav', 'rb') as wf, wave.open('Noise.wav', 'rb') as wf_noise:
        # Only the filtered signal is played, so a single output stream is enough.
        stream = p.open(format=p.get_format_from_width(wf.getsampwidth()),
                        channels=wf.getnchannels(),
                        rate=wf.getframerate(),
                        output=True)

        raw_data = wf.readframes(CHUNK)
        raw_noise = wf_noise.readframes(CHUNK)

        # Process until either file is exhausted, so the final block is played as well.
        while raw_data and raw_noise:
            data = np.concatenate([prev_data, np.frombuffer(raw_data, dtype=DTYPE)])
            noise = np.concatenate([prev_noise, np.frombuffer(raw_noise, dtype=DTYPE)])

            # ap_live returns (y, e, state...); the error e is the cleaned signal
            _, data_filtered, w, x_mem, d_mem, ide_eps, ide = ap_live(
                noise, data, taps, order, step_size, eps, Nit, w, x_mem, d_mem, ide_eps, ide)

            # Clip before casting: out-of-range floats would otherwise wrap around in int16
            data_filtered = np.clip(data_filtered, sample_range.min, sample_range.max).astype(DTYPE)
            stream.write(data_filtered.tobytes())

            prev_data = data[-taps:]
            prev_noise = noise[-taps:]
            raw_data = wf.readframes(CHUNK)
            raw_noise = wf_noise.readframes(CHUNK)
finally:
    # Always release the audio device, even if filtering fails half-way
    if stream is not None:
        stream.stop_stream()
        stream.close()
    p.terminate()

# NLMS

Here you can try out the Normalized Least Mean Squares (NLMS) algorithm in real time using different sets of parameters. You can either use your own files or the ones provided. To create simulated data, set `simulated` to `True` in the data-loading cell near the top of the notebook.

In [7]:
def nlms_live(x, d, N, a, eps, Nit, w):
    """Run one block of NLMS adaptation and return the error signal and weights.

    Parameters
    ----------
    x : 1-D array
        Input fed through the adaptive filter.
    d : 1-D array
        Target signal.
    N : int
        Number of filter taps (length of ``w``).
    a : float
        Step size, normalised per sample by the energy of the input window.
    eps : float
        Constant added to the window energy in the normalisation.
    Nit : int
        Number of weight updates performed per sample.
    w : 1-D array, length N
        Current filter weights; not modified in place.

    Returns
    -------
    (e, w)
        ``e`` is the error evaluated AFTER the weight update of each sample,
        with the first ``N`` entries dropped; ``w`` is the updated weights.
    """
    n_samples = min(len(x), len(d))
    err = np.zeros(n_samples)

    for k in range(N - 1, n_samples):
        window = x[k - N + 1:k + 1]
        target = d[k]

        # step size normalised by the energy of the current window
        gain = a / (eps + np.linalg.norm(window) ** 2)

        for _ in range(Nit):
            residual = target - np.dot(window.T, w)
            w = w + gain * window * residual

        # error with the freshly updated weights
        err[k] = target - np.dot(w.T, window)

    return err[N:], w

Choose the parameters here and run this code to listen to the filtered signal

In [8]:
"""Real Time NLMS filtering"""

CHUNK = 2560           # Number of frames read from each wave file per block
taps = 200             # Filter size
DTYPE = np.int16
step_size = .03        # Mu (adaptation step size)
eps = .05              # Constant added to the input energy in the step normalisation
Nit = 5                # Number of weight updates per sample


# Converts audio to 16bits wave file
soundfile.write('Sound.wav', sY, FsY, subtype='PCM_16')
soundfile.write('Noise.wav', sX, FsX, subtype='PCM_16')

# initial guess for the filter
w = np.zeros(taps)

# Tail of the previous block, prepended so the filter always has `taps` samples of context
prev_data = np.zeros(0)
prev_noise = np.zeros(0)

# Valid sample range of the output format, used to clip before the integer cast
sample_range = np.iinfo(DTYPE)

p = pyaudio.PyAudio()
stream = None
try:
    with wave.open('Sound.wav', 'rb') as wf, wave.open('Noise.wav', 'rb') as wf_noise:
        # Only the filtered signal is played, so a single output stream is enough.
        stream = p.open(format=p.get_format_from_width(wf.getsampwidth()),
                        channels=wf.getnchannels(),
                        rate=wf.getframerate(),
                        output=True)

        raw_data = wf.readframes(CHUNK)
        raw_noise = wf_noise.readframes(CHUNK)

        # Process until either file is exhausted
        while raw_data and raw_noise:
            data = np.concatenate([prev_data, np.frombuffer(raw_data, dtype=DTYPE)])
            noise = np.concatenate([prev_noise, np.frombuffer(raw_noise, dtype=DTYPE)])

            data_filtered, w = nlms_live(noise, data, taps, step_size, eps, Nit, w)

            # Clip before casting: out-of-range floats would otherwise wrap around in int16
            data_filtered = np.clip(data_filtered, sample_range.min, sample_range.max).astype(DTYPE)
            stream.write(data_filtered.tobytes())

            prev_data = data[-taps:]
            prev_noise = noise[-taps:]
            raw_data = wf.readframes(CHUNK)
            raw_noise = wf_noise.readframes(CHUNK)
finally:
    # Always release the audio device, even if filtering fails half-way
    if stream is not None:
        stream.stop_stream()
        stream.close()
    p.terminate()

# RLS

Here you can try out the Recursive Least Squares (RLS) algorithm in real time using different sets of parameters. You can either use your own files or the ones provided. To create simulated data, set `simulated` to `True` in the data-loading cell near the top of the notebook.

In [9]:
def rls_live(x, d, N, a, Nit, w, R):
    """Run one block of Recursive Least Squares (RLS) adaptation.

    Parameters
    ----------
    x : 1-D array
        Input fed through the adaptive filter.
    d : 1-D array
        Target signal.
    N : int
        Number of filter taps (length of ``w``; ``R`` is N x N).
    a : float
        Forgetting factor used in the update of ``R`` (must be non-zero).
    Nit : int
        Number of (R, w) updates performed per sample.
    w : 1-D array, length N
        Current filter weights; not modified in place.
    R : 2-D array, shape (N, N)
        Current inverse-correlation estimate; not modified in place.

    Returns
    -------
    (e, w, R)
        ``e`` is the error evaluated AFTER the weight update of each sample,
        with the first ``N`` entries dropped; ``w`` and ``R`` are the updated
        state to pass to the next call.
    """
    # number of samples available in both inputs
    L = min(len(x), len(d))

    e = np.zeros(L)

    # run the adaptation
    for n in range(N-1, L):

        X = x[n-N+1:n+1]

        for j in range(Nit):
            Rx = np.dot(R, X)                   # R x   (length N)
            xR = np.dot(X, R)                   # x^T R (length N)
            denom = a + np.dot(xR, X)           # a + x^T R x (scalar)
            # Rank-one correction (R x)(x^T R). For 1-D arrays this must be an
            # explicit outer product: chaining np.dot collapses it to a scalar.
            R = 1/a * (R - np.outer(Rx, xR) / denom)
            w = w + np.dot(R, X) * (d[n] - np.dot(X, w))

        # estimate output and error
        e[n] = d[n] - np.dot(w, X)

    return e[N:], w, R

Choose the parameters here and run this code to listen to the filtered signal

In [10]:
"""Real Time RLS filtering"""

CHUNK = 1560           # Number of frames read from each wave file per block
taps = 100             # Filter size
DTYPE = np.int16
step_size = .9         # Passed to rls_live as `a`, the forgetting factor of the R update
eps = .05              # R is initialised to (1/eps) * identity
Nit = 1                # Number of (R, w) updates per sample


# Converts audio to 16bits wave file
soundfile.write('Sound.wav', sY, FsY, subtype='PCM_16')
soundfile.write('Noise.wav', sX, FsX, subtype='PCM_16')

# initial guess for the filter
w = np.zeros(taps)
R = 1/eps*np.identity(taps)

# Tail of the previous block, prepended so the filter always has `taps` samples of context
prev_data = np.zeros(0)
prev_noise = np.zeros(0)

# Valid sample range of the output format, used to clip before the integer cast
sample_range = np.iinfo(DTYPE)

p = pyaudio.PyAudio()
stream = None
try:
    with wave.open('Sound.wav', 'rb') as wf, wave.open('Noise.wav', 'rb') as wf_noise:
        # Only the filtered signal is played, so a single output stream is enough.
        stream = p.open(format=p.get_format_from_width(wf.getsampwidth()),
                        channels=wf.getnchannels(),
                        rate=wf.getframerate(),
                        output=True)

        raw_data = wf.readframes(CHUNK)
        raw_noise = wf_noise.readframes(CHUNK)

        # Process until either file is exhausted
        while raw_data and raw_noise:
            data = np.concatenate([prev_data, np.frombuffer(raw_data, dtype=DTYPE)])
            noise = np.concatenate([prev_noise, np.frombuffer(raw_noise, dtype=DTYPE)])

            data_filtered, w, R = rls_live(noise, data, taps, step_size, Nit, w, R)

            # Clip before casting: out-of-range floats would otherwise wrap around in int16
            data_filtered = np.clip(data_filtered, sample_range.min, sample_range.max).astype(DTYPE)
            stream.write(data_filtered.tobytes())

            prev_data = data[-taps:]
            prev_noise = noise[-taps:]
            raw_data = wf.readframes(CHUNK)
            raw_noise = wf_noise.readframes(CHUNK)
finally:
    # Always release the audio device, even if filtering fails half-way
    if stream is not None:
        stream.stop_stream()
        stream.close()
    p.terminate()