## Imports

In [4]:
import numpy as np 
import matplotlib.pyplot as plt
import sys, os
sources_path = './../Sources/'
if sources_path not in sys.path:
    sys.path.append(sources_path)
# from adaptive_filtering.lms import LMS

### Definitions

In [68]:
j = complex(0,1)
n_ensembles = 100   # number of realizations within the ensemble
K = 500             # number of iterations (signal length)
H = np.array([0.32+0.21*j,-0.3+0.7*j,0.5-0.8*j,0.2+0.5*j])
w_o = H            # Unknown system
sigma_n2 = .04     # noise power
N = 4              # Number of coefficients of the adaptive filter
mu = .1            # Convergence factor (step) (0 < mu < 1)
gamma = 1e-12      # small positive constant to avoid singularity

### Computing 

In [52]:
def rolling_window(a, window):
    shape = a.shape[:-1] + (a.shape[-1] - window + 1, window)
    strides = a.strides + (a.strides[-1],)
    return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)

In [69]:
x = (np.sign(np.random.randn(K)) + j*np.sign(np.random.randn(K)))/np.sqrt(2) 
x.shape

(500,)

In [70]:
x_extended = np.append(np.zeros([4-1]), x)
x_extended

array([ 0.        +0.j        ,  0.        +0.j        ,
        0.        +0.j        ,  0.70710678+0.70710678j,
       -0.70710678-0.70710678j, -0.70710678+0.70710678j,
        0.70710678-0.70710678j, -0.70710678-0.70710678j,
       -0.70710678+0.70710678j,  0.70710678-0.70710678j,
        0.70710678+0.70710678j,  0.70710678+0.70710678j,
       -0.70710678+0.70710678j, -0.70710678+0.70710678j,
        0.70710678-0.70710678j, -0.70710678+0.70710678j,
        0.70710678-0.70710678j, -0.70710678-0.70710678j,
        0.70710678+0.70710678j, -0.70710678-0.70710678j,
       -0.70710678-0.70710678j, -0.70710678+0.70710678j,
       -0.70710678-0.70710678j,  0.70710678-0.70710678j,
        0.70710678-0.70710678j, -0.70710678+0.70710678j,
        0.70710678-0.70710678j, -0.70710678-0.70710678j,
        0.70710678+0.70710678j, -0.70710678-0.70710678j,
       -0.70710678-0.70710678j, -0.70710678-0.70710678j,
       -0.70710678-0.70710678j,  0.70710678-0.70710678j,
       -0.70710678-0.70710678j,

In [75]:
arr = np.array([1+0j, 1-1j, 0.5+1j])
k = (1+1j)
print (arr)
print (k*arr)

[1. +0.j 1. -1.j 0.5+1.j]
[ 1. +1.j   2. +0.j  -0.5+1.5j]


In [71]:
rolling_window(x_extended, 4).shape

(500, 4)

In [65]:
for arr in rolling_window(x_extended, 4):
    print (arr.shape, arr, '\n')

(4,) [ 0.        +0.j          0.        +0.j          0.        +0.j
 -0.70710678-0.70710678j] 

(4,) [ 0.        +0.j          0.        +0.j         -0.70710678-0.70710678j
 -0.70710678+0.70710678j] 

(4,) [ 0.        +0.j         -0.70710678-0.70710678j -0.70710678+0.70710678j
 -0.70710678-0.70710678j] 

(4,) [-0.70710678-0.70710678j -0.70710678+0.70710678j -0.70710678-0.70710678j
 -0.70710678-0.70710678j] 

(4,) [-0.70710678+0.70710678j -0.70710678-0.70710678j -0.70710678-0.70710678j
  0.70710678-0.70710678j] 

(4,) [-0.70710678-0.70710678j -0.70710678-0.70710678j  0.70710678-0.70710678j
  0.70710678+0.70710678j] 

(4,) [-0.70710678-0.70710678j  0.70710678-0.70710678j  0.70710678+0.70710678j
 -0.70710678-0.70710678j] 

(4,) [ 0.70710678-0.70710678j  0.70710678+0.70710678j -0.70710678-0.70710678j
 -0.70710678+0.70710678j] 

(4,) [ 0.70710678+0.70710678j -0.70710678-0.70710678j -0.70710678+0.70710678j
 -0.70710678-0.70710678j] 

(4,) [-0.70710678-0.70710678j -0.70710678+0.70710678j 

In [38]:
np.array([0,1,2,3,4,5,6,7,8,9])[1+4-1:1-1:-1]

array([4, 3, 2, 1])

In [52]:
n = np.sqrt(sigma_n2/2)*(np.random.normal(size=K)+j*np.random.normal(size=K)) 
np.absolute(n)**2

array([3.41783981e-03, 2.17367582e-02, 2.23603776e-02, 2.47023157e-02,
       4.98619218e-02, 2.46372470e-02, 1.25894053e-01, 9.32829672e-03,
       2.36470856e-02, 2.17277918e-02, 2.28925683e-01, 1.10478064e-02,
       3.11666478e-02, 1.78727923e-02, 6.54702903e-02, 4.51137647e-03,
       9.17427326e-02, 4.68979274e-03, 2.53344149e-02, 1.03951816e-02,
       2.42115446e-02, 1.00087080e-02, 2.00205649e-02, 6.66704900e-03,
       6.07312190e-03, 9.08294640e-02, 1.12285789e-02, 5.64219547e-02,
       1.93048076e-02, 2.47472581e-02, 5.46907254e-02, 2.10279501e-03,
       5.58629840e-02, 2.22002535e-01, 3.04256081e-02, 1.12076665e-02,
       1.95654377e-02, 8.03370595e-02, 2.94132321e-02, 5.02549319e-02,
       3.01635940e-02, 3.65070578e-02, 1.81503147e-01, 1.18769273e-01,
       7.38986248e-02, 3.98347886e-02, 2.69484022e-02, 2.49085369e-02,
       3.27249606e-02, 7.25599569e-04, 3.83721471e-02, 7.00849509e-03,
       1.45135067e-02, 8.69666718e-03, 6.61524474e-03, 1.26059765e-02,
      

In [26]:
X = np.zeros([N]) # input at a certain iteration (tapped delay line)
d = np.array([]) # Desired signal

for k in np.arange(8):
    print (X)
    X = np.append(x[k], X[0:N-1])
    d = np.append(d, np.dot(w_o, X)+n[k])

[0. 0. 0. 0.]
[0.70710678-0.70710678j 0.        +0.j         0.        +0.j
 0.        +0.j        ]
[-0.70710678-0.70710678j  0.70710678-0.70710678j  0.        +0.j
  0.        +0.j        ]
[ 0.70710678+0.70710678j -0.70710678-0.70710678j  0.70710678-0.70710678j
  0.        +0.j        ]
[ 0.70710678+0.70710678j  0.70710678+0.70710678j -0.70710678-0.70710678j
  0.70710678-0.70710678j]
[-0.70710678-0.70710678j  0.70710678+0.70710678j  0.70710678+0.70710678j
 -0.70710678-0.70710678j]
[ 0.70710678-0.70710678j -0.70710678-0.70710678j  0.70710678+0.70710678j
  0.70710678+0.70710678j]
[-0.70710678-0.70710678j  0.70710678-0.70710678j -0.70710678-0.70710678j
  0.70710678+0.70710678j]


In [18]:
%%time
W = np.ones([n_ensembles, K+1, N], dtype=complex) # coefficient vector for each iteration and realization, w[0] = [1, 1, ..., 1]
MSE = np.zeros([n_ensembles, K]) # MSE vector for each realization
MSE_min = MSE # Minimum MSE for each realization 

for ensemble in np.arange(n_ensembles):
    X = np.zeros([N]) # input at a certain iteration (tapped delay line)
    d = np.array([]) # Desired signal
    
    # Creating the input signal (normalized)
    x = (np.sign(np.random.normal(size=K)) + j*np.sign(np.random.normal(size=K)))/np.sqrt(2) 
    sigma_x2 = np.var(x) # signal power = 1
    n = np.sqrt(sigma_n2/2)*(np.random.normal(size=K)+j*np.random.normal(size=K)) # complex noise
    
    for k in np.arange(K):
        X = np.append(x[k], X[1:N])
        d = np.append(d, np.dot(w_o, X)+n[k])
    
    init_coef = W[ensemble][1]
    filter_order = N-1    
    
    nlms = NLMS(step=mu, filter_order=filter_order, gamma=gamma, init_coef=init_coef)
    nlms.fit(d, x)

#     W[ensemble] = nlms.coef_vector
#     MSE[ensemble] = MSE[ensemble] + abs(nlms.error_vector)**2
#     MSE_min[ensemble] = MSE_min[ensemble] + abs(n)**2

# W_av = np.mean(W, axis=0)
# MSE_av = np.mean(MSE, axis=0)
# MSEmin_av = np.mean(MSE_min, axis=0)
    
print (nlms)



NLMS(step=0.1, gamma=1e-12, filter_order=3)
Wall time: 1.35 s


### Plotting

In [None]:
fig, ax = plt.subplots(ncols=1, nrows=2, figsize=(16,8), sharex=True)
ax[0].plot(np.arange(K), 10*np.log10(MSE_av))
ax[0].set_title('Learning Curve for MSE')
ax[0].set_ylabel('MSE [dB]')
ax[0].grid(True)

ax[1].plot(np.arange(K), 10*np.log10(MSEmin_av))
ax[1].set_title('Learning Curve for MSEmin')
ax[1].set_ylabel('MSEmin [dB]')
ax[1].set_xlabel('Number of iterations, k') 
ax[1].grid(True)

In [None]:
fig, ax = plt.subplots(ncols=1, nrows=2, figsize=(16,8), sharex=True)
ax[0].plot(np.arange(K+1), np.real(W_av[:,0]))
ax[0].set_title('Evolution of the 1st coefficient (real part)')
ax[0].set_ylabel('Coefficient')
ax[0].grid(True)

ax[1].plot(np.arange(K+1), np.imag(W_av[:,0]))
ax[1].set_title('Evolution of the 1st coefficient (imaginary part)')
ax[1].set_ylabel('Coefficient')
ax[1].set_xlabel('Number of iterations, k') 
ax[1].grid(True)

In [9]:
class NLMS(LMS):
    def __init__(self, step, filter_order, gamma, init_coef = None):                
        LMS.__init__(self, step=step, filter_order=filter_order, init_coef=init_coef)        
        self.gamma = gamma
        
    def __str__(self):
        return "NLMS(step={}, gamma={}, filter_order={})".format(self.step, self.gamma, self.filter_order)
        
    def fit(self, d, x):
        # Pre allocations
        self.d = np.array(d)        
        self.x = np.array(x)        
        self.n_iterations = len(self.d)
        
        # Initial State Weight Vector if passed as argument
        self.coef_vector = np.array([self.init_coef]) if self.init_coef is not None else np.array([np.zeros([self.n_coef])])

        # Improve source code regularity
        prefixed_input = np.append(np.zeros([self.n_coef]), self.x)
        
        for i in np.arange(self.n_iterations):        
            regressor = prefixed_input[i+self.n_coef:i:-1]            
            y = np.dot(self.coef_vector[i], regressor)            
            if i == 0:
                self.output_vector = np.array([y])
                self.error_vector = np.array([self.d[i]-self.output_vector[i]])
            else:
                self.output_vector = np.append(self.output_vector, y)
                self.error_vector = np.append(self.error_vector, self.d[i]-self.output_vector[i])
            
            self.error_vector[i] = self.d[i]-self.output_vector[i]            
            self.coef_vector = np.append(self.coef_vector, [self.step/(self.gamma+np.dot(regressor.T, regressor))*np.conj(self.error_vector[i])*regressor], axis=0)
                        
        return self.output_vector, self.error_vector, self.coef_vector
    
print (NLMS(1,2,3))

NLMS(step=1, gamma=3, filter_order=2)


In [8]:
class LMS:
    def __init__(self, step, filter_order, init_coef = None):        
        self.step = step
        self.filter_order = filter_order
        self.init_coef = np.array(init_coef)
    
        # Initialization Procedure
        self.n_coef = self.filter_order + 1
        self.d = None
        self.x = None
        self.n_iterations = None
        self.error_vector = None
        self.output_vector = None
        self.coef_vector = None
        
    def __str__(self):
        return "LMS(step={}, filter_order={})".format(self.step, self.filter_order)
        
    def fit(self, d, x):
        # Pre allocations
        self.d = np.array(d)        
        self.x = np.array(x)        
        self.n_iterations = len(self.d)
        
        # Initial State Weight Vector if passed as argument
        self.coef_vector = np.array([self.init_coef]) if self.init_coef is not None else np.array([np.zeros([self.n_coef])])

        # Improve source code regularity
        prefixed_input = np.append(np.zeros([self.n_coef]), self.x)
        
        for i in np.arange(self.n_iterations):        
            regressor = prefixed_input[i+self.n_coef:i:-1]            
            y = np.dot(self.coef_vector[i], regressor)            
            if i == 0:
                self.output_vector = np.array([y])
                self.error_vector = np.array([self.d[i]-self.output_vector[i]])
            else:
                self.output_vector = np.append(self.output_vector, y)
                self.error_vector = np.append(self.error_vector, self.d[i]-self.output_vector[i])
            
            self.error_vector[i] = self.d[i]-self.output_vector[i]            
            self.coef_vector = np.append(self.coef_vector, [self.step*np.conj(self.error_vector[i])*regressor], axis=0)
                        
        return self.output_vector, self.error_vector, self.coef_vector
    
    def predict(self, x):
        # Makes predictions for a new signal after weights are fit
        return np.dot(self.coef_vector[-1], x)