In [1]:
import numpy as np
from matplotlib import pyplot as plt

#import renamed libraries from the numerical toolbox
from local_signal import *
from general import *

import scipy.io.wavfile as wav
from IPython.display import Audio

Problem:

$\hat{a} = argmin_a ||a||_0 $, s.t.

$||y^r - M^r \phi a||_2^2 \leq \epsilon $

$a \in C^N$ : Synthesis coefficients

$\phi in C^{TxN}$ synthesis operator, Gabor Frames (STFT) is well suited for audio signals, therefore $\phi$ represents the matrix associated with the Gabor dictionary

$y^r = M^r y \in R^M$ the reliable samples

$y \in R^T $ the observed signal

$M^r \in R^{MxT} $ matrix comprised of those rows in the identity matrix that chooses the entries of reliable samples

$M^c \in R^{(T-M)xT} $ the matrix choosing the clipped samples,

$\theta^{clip} \in R^{(T-M)} $ the vector  of clipped samples which can only have values $\pm \theta^{clip}$

Problem becomes:

$ \hat{a} = argmin_a ||a||_0 $, s.t.

$ ||y^r - M^r \phi a||^2_2 \leq \epsilon $ and $ |M^c \phi a| \geq |\theta^{clip}|$

With the squared hinge function:

$ h^2 : R \to R_+, z \to    h^2(z) = z^2 , :if z \lt 0, else:  0$

if $ z = x - \theta^{clip} for \theta^{clip} \gt 0$,

squared hinge sets x "free" if $|x| \geq \theta^{clip}$, so we can use the notation:

$[\theta^{clip} - x]^2_+ = \sum_{k:\theta^{clip}_k\gt 0} h^2(x_k - \theta^{clip}_k) + \sum_{k:\theta^{clip}_k\lt 0} h^2(\theta^{clip}_k - x_k) $

unconstrained convex optimization problem:

$ argmin_a \frac{1}{2} ||y^r - M^r \phi a||^2_2 + \frac{1}{2} [\theta^{clip} - M^c \phi a]^2_+ + \lambda||a||_1$

Additionally:

$\phi$: tight Gabor frame as time-frequency dictionary

$\phi*$: The corresponding analysis operator aka Sliding Window or STFT;

$\gamma$: 0.9



In [2]:
def squared_hinge_clipped(x, clipping_value):
    """ 
    Computes the squared hinge loss with clipping.
    """
    mask_below = x < -clipping_value
    sum_below = np.sum((clipping_value - x)[mask_below]**2)
    mask_above = x > clipping_value
    sum_above = np.sum((x - clipping_value)[mask_above]**2)
    return sum_below + sum_above

In [44]:
music_rate, music_signal = wav.read('music.wav')
# convert music_signal to float and normalize to [-1, 1]
music_signal = music_signal.astype(np.float32) / np.max(np.abs(music_signal))
# display music object
Audio(music_signal, rate=music_rate)

In [47]:
# clip signal to values between -20000, 20000
music_signal_clipped = np.clip(music_signal, -0.3, 0.3)
Audio(music_signal_clipped, rate=music_rate)


In [48]:
Jmin = 2

In [49]:
#import istft, stft
from scipy.signal import istft, stft
Psi = lambda a: istft(a, fs=music_rate, nperseg=1024, noverlap=512)[1]
PsiS = lambda a: stft(a, fs=music_rate, nperseg=1024, noverlap=512)[2]

In [111]:
def lasso_shrinkage(x, lambda_):
    """ 
    Computes the soft-thresholding operator.
        x: input signal in the time-frequency domain
        lambda_: threshold
    """ 
    result = 1 - lambda_ / np.abs(x)
    result[result < 0] = 0
    return result * x

# def persistent_empirical_weiner()


$a^{(0)} \in C^N, z^{(0)} = a^{(0)}, k = 1, \delta = ||\phi \phi^*|| $

repeat:

$ g1 = -\phi^*M^{r^T}(y^r - M^r \phi z^{(k-1)})$

$ g2 = -\phi^* M^{c^T} [\theta^{clip} - M^c \phi z^{(k-1)}]_+ $

$ a^{(k)} = S_{\lambda / \delta} ( z^{(k-1)} - \frac{1}{\delta}(g1+g2))$

$ z^{(k)} = a^{(k)} + \gamma (a^{(k)} - a^{(k-1)})$

$ k = k + 1 $

until convergence

In [112]:
def snr(signal, computed_signal, mask=None):
    """ 
    Computes the signal to noise ratio. If a mask is provided, the SNR is computed only on the masked part of the signal.
    """
    if mask is None:
        mask = np.ones_like(signal, dtype=np.bool)
    return 20 * np.log10(np.sum(signal[mask]) / np.sum((signal[mask] - computed_signal[mask])))

In [113]:
def declipping_ista(clipped_audio, clip_threshold, gamma, thresholding_function, lambdas_, n_iter=1000, i_between_lambdas=500):
    """ 
    Performs declipping using relaxed ISTA.
    assumption: delta = 1
    """
    lambda_i = 0
    lambda_ = lambdas_[lambda_i]

    audio_length = len(clipped_audio)
    #Mr: mask of unclipped values
    #Mc: mask of clipped values
    values_below = clipped_audio < clip_threshold
    values_above = clipped_audio > -clip_threshold
    Mr = values_below * values_above
    Mc = ~Mr

    theta_clip = clipped_audio[Mc]
    #yr: reliable samples
    yr = clipped_audio[Mr]

    # initialize alpha, z
    frequency_domain = PsiS(clipped_audio)
    alpha = np.zeros_like(frequency_domain)
    z = alpha.copy()
    z_prev = z.copy()+1
    
    i_between_lambdas_start = 0
    # ISTA loop
    for i in range(n_iter):
        if i % 100 == 0:
            #this section is using signal that the algorithm should not have access to; it's just for monitoring progress
            computed_snr = snr(music_signal, Psi(z)[:audio_length], mask=Mc)
            print('Iteration: ', i, ' SNR: ', computed_snr)
        g1 = Psi(z)[:audio_length]
        g1 = g1[Mr]
        g1 = yr - g1
        temp_g1 = np.zeros_like(clipped_audio)
        temp_g1[Mr] = g1
        g1 = -PsiS(temp_g1)

        g2 = Psi(z)[:audio_length]
        g2 = g2[Mc]
        g2 = theta_clip - g2
        g2 = np.maximum(g2, 0)
        temp_g2 = np.zeros_like(clipped_audio)
        temp_g2[Mc] = g2
        g2 = -PsiS(temp_g2)

        alpha = thresholding_function(z - (1/1) * (g1 + g2), lambda_)
        z = alpha + gamma*(alpha - z)
        #check if we have converged
        if np.allclose(z, z_prev) or i - i_between_lambdas_start == i_between_lambdas:
            if i - i_between_lambdas_start == i_between_lambdas:
                print("Changing current lambda because of too many iterations")
            print('Reached iteration ', i, ' with lambda = ', lambda_, ' and SNR = ', snr(music_signal, Psi(z)[:audio_length], mask=Mc))
            if lambda_i == len(lambdas_) - 1:
                break
            lambda_i += 1
            lambda_ = lambdas_[lambda_i]
            i_between_lambdas_start = i

        z_prev = z.copy()
            
    unclipped_audio = Psi(z)[:audio_length]
    # replace reliable samples with original values
    unclipped_audio[Mr] = clipped_audio[Mr]
    return unclipped_audio

In [115]:
clip_threshold = 0.5
clipped_audio = np.clip(music_signal, -clip_threshold, clip_threshold)
gamma = 0.9
thresholding_function = lasso_shrinkage
lambdas_ = np.logspace(-1, -1.1, 2)
lambdas_ = [0.1, 0.08, 0.05]

n_iter = 10000
i_between_lambdas = 200

unclipped_audio = declipping_ista(clipped_audio, clip_threshold, gamma, thresholding_function, lambdas_, n_iter, i_between_lambdas)


Iteration:  0  SNR:  0.0
Iteration:  100  SNR:  0.9485289454460144
Reached iteration  136  with lambda =  0.1  and SNR =  0.948529839515686
Iteration:  200  SNR:  3.0606183409690857
Reached iteration  276  with lambda =  0.08  and SNR =  3.0605849623680115


  return 20 * np.log10(np.sum(signal[mask]) / np.sum((signal[mask] - computed_signal[mask])))


Iteration:  300  SNR:  nan
Iteration:  400  SNR:  nan
Reached iteration  423  with lambda =  0.05  and SNR =  nan


In [106]:
# display unclipped audio
Audio(unclipped_audio, rate=music_rate)

In [107]:
unclipped_audio

array([ 0.00564193, -0.00463892, -0.01253761, ..., -0.04701605,
        0.00608074,  0.04403837], dtype=float32)

In [108]:
clipped_audio

array([ 0.00564193, -0.00463892, -0.01253761, ..., -0.04701605,
        0.00608074,  0.04403837], dtype=float32)

In [109]:
Audio(clipped_audio, rate=music_rate)