Anne Katrine Falk 23APR2021

In [None]:
# make cell width follow width of notebook
from IPython.core.display import display, HTML
display(HTML("<style>.container {width:95% !important;}</style>"))

# De-noising a series
Compare Total Variation, Low-pass filter and Savitzky-Golay filter applied to a 1D signal which is mostly smooth, but has a number of sharp steps.

**Total variation** is an optimisation approach, the **low-pass filter** is a 1.order exponential smoothing and the **Savitzky-Golay** filter is an approximation with a low-degree polynomium over a rolling window.

### General dependencies
This section import the dependencies that are used allover. The dependencies for each de-noising method are imported in the respective sections

In [None]:
import numpy as np
from matplotlib import pyplot as plt
import matplotlib
%matplotlib notebook
matplotlib.rcParams['figure.figsize'] = (9, 4)
import pandas as pd

### Methods for creating noisy data with sharp steps

In [None]:
def create_noisy_step(n_sample, around_value):
    return (np.random.random(n_sample)-0.5 + around_value)

In [None]:
def create_noisy_step_input(n_step, samples_in_step):
    step_height = np.random.random(n_step)*20
    
    noisy_step_input =  np.zeros(n_step * samples_in_step)
    
    for k in range(n_step):
        for i in range(samples_in_step):
            noisy_step_input[k*samples_in_step : (k+1)*samples_in_step] = create_noisy_step(samples_in_step, step_height[k])
            
    return noisy_step_input                    

### Create noisy data with sharp steps
The array x_noisy will be used as "the noisy data" in the rest of the notebook

In [None]:
# 10 steps with 2000 samples in each - in total 20.000 samples
x_noisy = create_noisy_step_input(n_step=10, samples_in_step=2000)

In [None]:
# plot the noisy data
fig = plt.figure()
plt.title('Constructed test data: Steps with noise')
T = np.arange(0, len(x_noisy))
plt.plot(T, x_noisy, label='x_noisy')

# "Total variation" -  an optimisation model for de-noising
Total variation reconstruction (Boyd and Vandenberghe p. 314)

**Total variation** reconstruction formulates an optimisation model with a cost function, which is a sum of two terms:
- the least squares deviation from the noisy points
- a regularization term, which is the one-norm of the difference operator $D$ applied to the optimisation variable $x$

$$\| x-x_{noisy} \|_2^2 + \delta \| Dx \|_1$$

Using the 1-norm in the regularisation term is what "does the magic" in NOT easing the corners of the non-smooth steps.

In [None]:
import cvxpy as cp
print(cp.__version__)

### Define the optimisation variable
The optimisation problem will optimise a vector of same size as the noisy input.

In [None]:
x = cp.Variable((len(x_noisy),)) #vector variables are defined with shape=(size,), not shape=(size,1)

In [None]:
# Check dimensions of x and x_noisy
x.shape, x_noisy.shape

### Define the cost function using cvxpy expressions of the cvxpy variable x
cvxpy has a built-in function for the "total variation"-term (the second term), cvxpy.tv. Note that **cvxpy.tv behaves according to whether the cvx-variable x is defined as a vector or as a matrix**.

The below formulation of the cost function behaves unexpectedly, if x is defined as a matrix with one column x=cp.Variable((len(x_noisy),1), **use cp.Variable((len(x_noisy),)**

In [None]:
# First term is the 2-norm of (x-x_noisy), second term is the regularisation parameter (delta) times the total variation of x
delta = 10
cost = cp.sum(cp.square(x-x_noisy)) + delta*cp.tv(x)

### Solve the optimisation problem

In [None]:
%%timeit -n 1 -r 1
problem = cp.Problem(cp.Minimize(cost))
problem.solve() #  solve using the default open-source solver
#problem.solve(solver=cp.MOSEK) #  solve using mosek. Requires a mosek license, and is approximately 4 times faster than the default
problem.status, cost.value

In [None]:
fig = plt.figure()
plt.title('Total variation de-noising')

T = np.arange(0, len(x_noisy))
plt.plot(T, x_noisy, label='x_noisy')
plt.plot(T, x.value, label=f'reconstructed, $\delta=${delta}')
plt.legend()

### Calculate the trade-off curve between $\| x-x_{noisy} \|_2^2$ and $\| Dx \|_1$
This step solves optimisation problem for different values of delta. The goal is to explore the trade-off curve between the objectives. This is the same as the Pareto frontier.

delta is defined as a cvxpy.Parameter. This has the implication that we can loop over delta (to explore the trade-off) without (re-)building the entire optimisation problem each time. When delta is defined as a parameter, the optimisation problem is defined (and compiled) once, and only delta is replaced before each call to problem.solve().

A Parameter is the only entity, which can be changed _after_ cvxpy has compiled the problem. 

In [None]:
def calculate_tradeoff(delta_list):
            
    # lists to pick up the two competing terms in the objective value, the objective value itself and the optimal x
    sq_err = []
    total_var = []
    obj = []
    x_opt = []
    
    #  don't define x inside the loop because this is time-consuming for a large series - re-use instead
    x = cp.Variable((len(x_noisy),)) 
    delta = cp.Parameter(nonneg=True)
    
    cost = cp.sum(cp.square(x-x_noisy)) + delta*cp.tv(x)
    problem = cp.Problem(cp.Minimize(cost))

    for d in delta_list:
        print(f'Solving optimisation with delta = {d}')
        delta.value = d

        problem.solve() #  solve using the default open-source solver
        #problem.solve(solver=cp.MOSEK) #  solve using mosek. Requires a mosek license, and is approximately 4 times faster than the default
        
        # append the result to the lists
        sq_err.append(cp.sum(cp.square(x-x_noisy)).value)
        total_var.append(cp.tv(x).value)
        obj.append(cost.value)
        x_opt.append(x.value)
    return sq_err, total_var, obj, x_opt

In [None]:
# NB! This step is the time-consuming step.
delta_list = [0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.75, 1, 2, 10, 20]
sq_err, total_var, obj, x_opt =calculate_tradeoff(delta_list)

In [None]:
# collect the results in a dataframe 
df = pd.DataFrame(data={'delta': delta_list, 'sq_err': sq_err, 'total_var': total_var, 'obj': obj, 'x_opt': x_opt})
df

### Plot the trade-off curve
The trade-off curve for the bi-objective optimization problem $min (\| x-x_{noisy} \|_2^2, \| Dx \|_1)$ shows how the values of the two competing terms varies with different choices of $\delta$. It shows that the higher value of $\delta$ we choose, the larger does the (squared) deviation from the optimal value to the observations become. The hardest possible regularisation would result in $x_{opt}$ becoming a horizontal line (all $\Delta x_i$ are zero). In this case $\| x-x_{noisy} \|_2^2 = \| x_{noisy} \|_2^2$, because we can assume that $x_{opt} = 0$.

For $\delta=10 $ and $\delta = 20$ we approach this limit, where $\| Dx \|_1$ cannot be reduced much more. We want the optimisation to maintain only the sharp steps and iron out the high-frequency noise.

The trade-off curve is also known as the Pareto frontier.

In [None]:
# plot the trade-off curve
fig = plt.figure()
plt.title('trade-off between $\|\| x_{opt}-x_{noisy} \|\|_2^2$ and $\|\| Dx \|\|_1$')
plt.scatter(df['total_var'], df['sq_err'])
plt.xlabel('$\| \| Dx \| \|_1$')
plt.ylabel('$\| \| x_{opt}-x_{noisy} \| \|_2^2$')

for i in df.index:
    delta = df['delta'].iloc[i]
    fig.axes[0].annotate(text = f'{delta}', xy = (df['total_var'].iloc[i], df['sq_err'].iloc[i]))

### Total variation de-noising for different choices of $\delta$
You'll have to zoom to se the very slight difference between $\delta=10$ and $\delta=20$.

$\delta=1$ clearly leaves more of the noise.

In [None]:
fig = plt.figure()
plt.title('Total variation de-noising')

T = np.arange(0, len(x_noisy))
plt.plot(T, x_noisy, label='x_noisy')
df_rows = [7, 9, 10]

for i in df_rows:
    delta = df['delta'].iloc[i]
    plt.plot(T, df['x_opt'].iloc[i], label=f'reconstructed, $\delta=${delta}')
plt.legend()

# Lowpass filter
The low-pass filter cuts frequencies above a threshold. The lower the threshold, the more noise is removed, but at the cost of an increasing phase-lag.

In [None]:
import control
print(control.__version__)

In [None]:
def lowpass_tf(a):
    """
    Transfer function for exponential decay.
    dx/dt + a*x = u
    
    Parameters
    ----------
    a : float
    """
    # create a variable 's' to allow algebraic operations for SISO systems
    s = control.tf('s')
    # transfer function G(s)=1/(s+a) is scaled by a to get output 1 for s=0
    return a/(s+a)    

In [None]:
# print the transfer function for a selected frequency
print(lowpass_tf(a=1))

In [None]:
def plot_lowpass(x_noisy, frequency_list):
    fig = plt.figure()
    plt.title('Low-pass filter')
    
    T = np.arange(0, len(x_noisy))
    plt.step(T,x_noisy, label='x_noisy')
    for f in frequency_list:
        _, Y, _ = control.forced_response(lowpass_tf(f), T, x_noisy, X0=x_noisy[0])
        plt.step(T, Y, label=f'cut frequency = {f}')
    plt.legend() 

In [None]:
plot_lowpass(x_noisy, [1, 0.1, 0.01])

# Savitsky-Golay filter
The Savitsky-Golay filter aproximates the data by a low-order polynomium over a rolling window. This approach works well for smooth data, but for data with sudden steps it tries to make a smooth transition, which results in overshoot/undershoot around the step.

In [None]:
from scipy import signal

In [None]:
def plot_savitzky_golay(x_noisy, window_list, polyorder):
    """
    Calculate and plot the Savizky-Golay smoothing of a noisy series for a list of different windows
    
    x_noisy: np.ndarray
        vector of noisy data
    window_list: list of odd integers
        The window length must be odd as the window is centered at the point in question and should have equal extent
        on both sides.
    polyorder: int
        Order of the approximation polynomium    
    """
    fig = plt.figure()
    plt.title(f'Savitzky-Golay filter of order {polyorder}')
    T = np.arange(0, len(x_noisy))
    plt.step(T,x_noisy, label='x_noisy')
    for window_length in window_list:
        Y = signal.savgol_filter(x_noisy, window_length, polyorder=2)
        plt.step(T, Y, label=f'window length = {window_length}')
    plt.legend()

In [None]:
plot_savitzky_golay(x_noisy, window_list=[25, 51, 101], polyorder=2)

# Compare Total variation, Low-pass and Savitzky-Golay
The below plot compares the three de-noising methods. If you zoom to one of the corners of a step you will see:

The Low-pass filter has a phase lag, i.e. it takes some time for the filtered signal to "climb" a step, and likewise the decline at the end of a step is delayed
The Savitsky-Golay filter has an overshoot/undershoot around the step changes
The Total variation masters the step changes while at the same time removing the noise during the steps

In [None]:
cut_freq = 0.1 # for low-pass filter
window_length = 25 # for Savitzky-Golay filter

T = np.arange(0, len(x_noisy))
_, x_lowpass, _ = control.forced_response(lowpass_tf(cut_freq), T, x_noisy, X0=x_noisy[0])
x_savgol = signal.savgol_filter(x_noisy, window_length, polyorder=2)

fig = plt.figure()
plt.title('Low-pass and Savitzky-Golay and Total variation')

plt.step(T, x_noisy, label='x_noisy')
plt.step(T, x_lowpass, label=f'low-pass, cut frequency = {cut_freq}')
plt.step(T, x_savgol, label=f'Savitzky-Golay, window length = {window_length}')
plt.step(T, x.value, label=f'total variation, $\delta=${delta}')

plt.legend()