In [83]:
import scipy.signal as sgn
import scipy as sp
import numpy as np
import matplotlib.pyplot as plt

import warnings
warnings.filterwarnings('ignore')

# Practical work on the conversion of sampling rate and STFT
Group members: LI Xiquan, ZHU Chenhao


## I. Conversion of sampling rate


### 1. Description of the processing chain
To transform the sampling rate from $F_s = 48kHz$ to $F_s = 32kHz$, we need to perform a resampling by a factor 2/3. To preserve more information, we can first upscale the sampling rate to $F_s = 96kHz$ and then downscale it to $F_s = 32kHz$. Thus, the processing chain contains 
1. Insertion of zeros between each sample, L = 2
2. Apply a low-pass filter $H(z)$ 
$$\begin{equation*}
    H(e^{i2\pi\nu}) =
    \begin{cases}
        L, & \text{if } |\nu| < \frac16  \\
        0, & \text{if } \frac{1}{6} < |\nu| < \frac12
    \end{cases}
\end{equation*}
$$
3. Downsample the signal by a factor 3

### 2. Synthesis of impulse response h(n)

In [None]:
L = 2
M = 3
h = sgn.remez(32, [0, 1/6, 0.27, 0.5], [L, 0])
plt.stem(h)

In [85]:
w,H = sp.signal.freqz(h,1,fs=48E3)

In [None]:
plt.figure()
plt.plot(w,20*np.log10(np.abs(H)))
plt.plot([0,25000],[-50+np.log10(L),-50+np.log10(L)],'r')

### 3. Processing

In [87]:
import os,wave,struct
from IPython.display import Audio
from scipy.io.wavfile import write

In [88]:
data_path = os.getcwd()
filename = 'caravan_48khz.wav'
sound = os.path.join(data_path, filename) 

In [89]:
def load_sound(file):
    return wave.open(file, 'rb')

def plot_sound(data, times, name='default_name', save=False):
    plt.figure(figsize=(30, 4))
    plt.fill_between(times, data)
    plt.xlim(times[0], times[-1])
    plt.xlabel('time (s)')
    plt.ylabel('amplitude')
    if save:
        plt.savefig(name+'.png', dpi=100)
    plt.show()

In [None]:
wavefile = load_sound(sound)
print(wavefile.getparams())

In [91]:
num_samples = int(wavefile.getnframes())
data = wavefile.readframes(num_samples)
data = struct.unpack('{n}h'.format(n=num_samples), data)
x = np.array(data)

In [None]:
Audio(data=sound)

In [93]:
# insert zeros 
def upsample(arr, L):
    """
    Insert L zeros between each element in a NumPy array
    :param arr: Original array
    :param L: Number of zeros to insert
    :return: New array with zeros inserted
    """
    # Create a new array with length equal to the original array length plus the total number of inserted zeros
    new_length = len(arr) + (L-1) * (len(arr) - 1)
    new_arr = np.zeros(new_length)
    
    # Place the original array values in the appropriate positions in the new array
    new_arr[::L] = arr
    
    return new_arr

# Downsample
def downsample(arr, M):
    """
    Downsample a NumPy array
    :param arr: Original array
    :param M: Downsampling factor
    :return: New downsampled array
    """
    return arr[::M]

In [94]:
import time
def resample(x, L, M, h):
    t_start = time.perf_counter()
    y1 = upsample(x, L)
    
    y2 = sgn.lfilter(h, 1, y1)
    
    y3 = downsample(y2, M)
    t_end = time.perf_counter()
    # print('Time cost: %.2f s' % (t_end - t_start))
    
    return y3,t_end - t_start

y,delta_t = resample(x, L, M, h)

In [95]:
Fs = 32000
write('./Processed_audio/caravan_32k.wav', Fs, np.array(y,dtype=np.int16)) # to write a new wave file

In [None]:
# play the new wave file
Audio(data='./Processed_audio/caravan_32k.wav')

### 4. Demonstration of equivalence

![image](Figures/Demonstration.jpg)

### 5. Efficient implementation

#### Figure

![image](Figures/efficient_implementation.jpg)

#### Code

In [97]:
def polyphase_decomp(h,n):
    """
    Decompose a filter into polyphase components
    :param h: the filter to be decomposed
    :param n: the number of phases
    :return: a list of polyphase components
    """
    # Create a list to store the polyphase components
    h_list = []

    # Decompose the filter into n polyphase components
    for i in range(n):
        h_list.append(h[i::n])
    
    return h_list

def efficient_resample_23(x,h):
    np.append(x, [0])
    x_lists = []
    h_list_down = polyphase_decomp(h, 3)
    h_list_up = [polyphase_decomp(h, 2) for h in h_list_down]

    X = downsample(x, 3)
    X = sgn.lfilter(h_list_up[0][1], 1, X)
    X = upsample(X, 2)
    x_lists.append(X)

    X = downsample(x, 3)
    X = sgn.lfilter(h_list_up[0][0], 1, X)
    X = upsample(X, 2)
    X = np.pad(X, (1, 0), 'constant') 
    x_lists.append(X)

    X = downsample(x[1::], 3)
    X = sgn.lfilter(h_list_up[1][1], 1, X)
    X = upsample(X, 2)
    X = np.pad(X, (1, 0), 'constant') 
    x_lists.append(X)

    X = downsample(x[1::], 3)
    X = sgn.lfilter(h_list_up[1][0], 1, X)
    X = upsample(X, 2)
    X = np.pad(X, (2, 0), 'constant')  
    x_lists.append(X)

    X = downsample(x[2::], 3)
    X = sgn.lfilter(h_list_up[2][1], 1, X)
    X = upsample(X, 2)
    X = np.pad(X, (2, 0), 'constant')  
    x_lists.append(X)

    X = downsample(x[2::], 3)
    X = sgn.lfilter(h_list_up[2][0], 1, X)
    X = upsample(X, 2)
    X = np.pad(X, (3, 0), 'constant')  
    x_lists.append(X)

    max_len = max([len(x_list) for x_list in x_lists])
    res = np.zeros(max_len)
    for i in range(len(x_lists)):
        x_lists[i] = np.pad(x_lists[i], (0, max_len-len(x_lists[i])), 'constant')
        res = res + np.array(x_lists[i])

    return res

### 6. Comparison with the reference implementation

#### Concurrent version of efficient implementation 

In [98]:
# We transform the implementation into concurrent version in order to mesure its impact on performance.

import concurrent.futures
import time

def process_sequence(params):
    """根据参数执行信号处理"""
    x = params['x']
    downsample_factor = params['downsample_factor']
    upsample_factor = params['upsample_factor']
    pad1_before = params['pad1_before']
    pad2_before = params['pad2_before']
    pad1_after = params['pad1_after']
    pad2_after = params['pad2_after']
    h_filter = params['h_filter']
    x_slice = params.get('x_slice')  

    if x_slice is not None:  
        X = downsample(x[x_slice], downsample_factor)
    else:  
        X = downsample(x, downsample_factor)
    
    X = np.pad(X, (pad1_before, pad1_after), 'constant')
    X = sgn.lfilter(h_filter, 1, X)
    X = upsample(X, upsample_factor)
    X = np.pad(X, (pad2_before, pad2_after), 'constant')
    return X

def parallel_process(x, h):
    # Use dictionaries to pass parameters to the function
    h_list_down = polyphase_decomp(h, 3)
    h_list = [polyphase_decomp(h, 2) for h in h_list_down]
    params_list = [
        {'x': x, 'downsample_factor': 3, 'upsample_factor': 2, 'pad1_before': 0, 'pad2_before': 0,'pad1_after':0,'pad2_after':2, 'h_filter': h_list[0][1], 'x_slice': None},
        {'x': x, 'downsample_factor': 3, 'upsample_factor': 2, 'pad1_before': 0, 'pad2_before': 1,'pad1_after':0,'pad2_after':1, 'h_filter': h_list[0][0], 'x_slice': None},
        {'x': x, 'downsample_factor': 3, 'upsample_factor': 2, 'pad1_before': 0, 'pad2_before': 1,'pad1_after':0,'pad2_after':1, 'h_filter': h_list[1][1], 'x_slice': slice(1, None)},
        {'x': x, 'downsample_factor': 3, 'upsample_factor': 2, 'pad1_before': 0, 'pad2_before': 2,'pad1_after':0,'pad2_after':0, 'h_filter': h_list[1][0], 'x_slice': slice(1, None)},
        {'x': x, 'downsample_factor': 3, 'upsample_factor': 2, 'pad1_before': 0, 'pad2_before': 2,'pad1_after':0,'pad2_after':2, 'h_filter': h_list[2][1], 'x_slice': slice(2, None)},
        {'x': x, 'downsample_factor': 3, 'upsample_factor': 2, 'pad1_before': 0, 'pad2_before': 3,'pad1_after':0,'pad2_after':1, 'h_filter': h_list[2][0], 'x_slice': slice(2, None)}
    ]

    x_lists = []

    t_start = time.perf_counter()
    # Excetute the function in parallel
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(process_sequence, params) for params in params_list]
        for future in concurrent.futures.as_completed(futures):
            x_lists.append(future.result())

    res = np.sum(x_lists, axis=0)

    t_end = time.perf_counter()
    return res, t_end - t_start

x2 = np.append(x, [0])
y4,delta_t = parallel_process(x, h)


#### Computation of speed up ratio

In [None]:
# Compare average time cost
res_origin = 0
N = 50
for i in range(N):
    _,delta_t = resample(x, L, M, h)
    res_origin+=delta_t

print('Average time cost of resample: %.6f s' % (res_origin/N))

res_efficient = 0
for i in range(N):
    _,delta_t = parallel_process(x, h)
    res_efficient+=delta_t

print('Average time cost of parallel_process: %.6f s' % (res_efficient/N))
print('Speedup: %.2f' % (res_origin/res_efficient))

Speed up ratio of the concurrent version of the efficient implementation is about 1.78 compared with the reference implementation.

#### Comparison of the results

In [None]:
# Result of the reference implementation
y1,_ = resample(x, L, M, h)
write('./Processed_audio/caravan_32k.wav', 32000, np.array(y1,dtype=np.int16))
Audio(data='./Processed_audio/caravan_32k.wav')


In [None]:
# Result of the efficient resampling
y2 = efficient_resample_23(x,h)
write('./Processed_audio/caravan_32k_eff.wav', 32000, np.array(y2,dtype=np.int16))
Audio(data='./Processed_audio/caravan_32k_eff.wav')


In [None]:
# Original sound
Audio(data=sound)


In [None]:
# Comparison of spectrogram of the three signals
plt.figure()
plt.magnitude_spectrum(x, Fs=48E3, scale='dB')
plt.title('Original Signal')
plt.ylim(-120, 100)
plt.xlim(0, 24E3)
plt.magnitude_spectrum(y1, Fs=32E3, scale='dB',color='r',alpha=0.5)
plt.title('Resampled Signal')
plt.ylim(-120, 100)
plt.xlim(0, 24E3)
plt.legend(['Original Signal','Resampled Signal'])

plt.figure()
plt.magnitude_spectrum(y1, Fs=32E3, scale='dB')
plt.title('Resampled Signal')
plt.ylim(-120, 100)
plt.xlim(0, 24E3)
plt.magnitude_spectrum(y2, Fs=32E3, scale='dB',color='r',alpha=0.5) 
plt.title('Efficient Resampled Signal')
plt.ylim(-120, 100)
plt.xlim(0, 24E3)
plt.legend(['Resampled Signal','Efficient Resampled Signal'])
plt.show()



## II. STFT Audio Equalization

### 2.1 STFT Analysis



#### 1. DFT of the hanning window

In [None]:
N_w = 256  # Length of the Hann window
M = 2**20  # DFT length (M >= N_w)

# Generate Hann window
n = np.arange(N_w)
hann_window = np.hanning(N_w)

# Compute DFT
W = np.fft.fft(hann_window, n=M)
frequencies = np.fft.fftfreq(M, d=1.0)

# Plot the magnitude of the DFT (in dB)
plt.plot(frequencies[:M//10], 20*np.log10(np.abs(W[:M//10])))
plt.title("DFT of the Hann Window")
plt.xlabel("Frequency (normalized)")
plt.ylabel("Magnitude (dB)")
main_lobe_end = np.where(np.abs(W[:M//2]) < 1e-3)[0][0]  

plt.axvline(frequencies[main_lobe_end], color='r', linestyle='--', label=f'Main Lobe Width ≈ {frequencies[main_lobe_end]*2:.4f}')
plt.legend()
plt.grid()

print(f"The main lobe width of the hanning window is: {4/N_w}")
plt.show()


The width of the main lobe could be obtained by $ L = \frac{4}{N_w}$. Since the fourier transform of the window could be written as: 

 $${ W(f)={\frac {1}{2}}{\frac {\operatorname {sinc} (N_wf)}{(1-N_w^{2}f^{2})}}={\frac {\sin(\pi N_wf)}{2\pi N_wf(1-N_w^{2}f^{2})}}}$$

We can deduce that the first zero was obtained when $f_0 = \frac{2}{N_w}$. So $L = 2f_0 = \frac{4}{N_w}$. 

#### 2. Low-pass convention of STFT
We have: 
$$
\begin{align*}
W_x(\lambda, b) &= \sum_{n\in \mathbb{Z}} x(n) w(n-b) e^{-2j\pi\lambda n}  \\
                &= \sum_{n\in \mathbb{Z}} x(n) e^{-2j\pi\lambda n} \cdot w(b-n)  \\
                &= (\tilde{x}_\lambda * w)(b)
\end{align*}
$$
Where $\tilde{x}_\lambda = x(n) e^{-2j\pi\lambda n}$, and $\tilde{X}_\lambda(f) = X(f + \lambda)$. So STFT, at frequency $\lambda$, could be seen as a convolution between the signal $\tilde{x}_\lambda$ with a **low-pass** filter $w$. The filter is of type 1 or 2 according to its length, since it is symmetric. 


#### 3. Band-pass convention of STFT

The designation is because the formula could be seen as a convolution between the signal and a band-pass filter: 
$$
\begin{align*}
\tilde{X}(\lambda, b) &= \sum_{n \in \mathbb{Z}} x(n + b) w(n) e^{-j2\pi \lambda n} \\
                      &\underset{m=n+b}{=} \sum_{m \in \mathbb{Z}} x(m) w(m-b) e^{-j2\pi \lambda (m-b)}  \\
                      &=\sum_{m \in \mathbb{Z}} x(m) w(b-m) e^{j2\pi \lambda (b-m)} \\
                      &=(x * \tilde{w}_\lambda) (b)
\end{align*} 
$$
With $\tilde{w}_\lambda(n) = w(n) e^{2j\pi\lambda n}$, $\tilde{W}_\lambda(f) = W(f-\lambda)$, so it is a band-pass filter. 
And we have: 

$$
\begin{align*}
\tilde{X}(\lambda, b)&= \sum_{n \in \mathbb{Z}} x(n + b) w(n) e^{-j2\pi \lambda n} \\
                     &\underset{m=n+b}{=} \sum_{m \in \mathbb{Z}} x(m) w(m-b) e^{-j2\pi \lambda (m-b)} \\
                     &=W_x(\lambda, b) \cdot e^{2j\pi \lambda b}
\end{align*}
$$
The implementation of the notebook correspond to the band-pass convention. 

#### 4. STFT based on filtering

In [64]:
N = x.shape[0] # % longueur du signal
Nw = 32  # longueur de la fenetre
w = np.hanning(Nw) # définition de la fenetre d'analyse
ws = w.copy; # définition de la fenêtre de synthèse
R = 1 # incrément sur les temps d'analyse, appelé hop size, t_a=uR
M = 32 # ordre de la tfd
L = M/2+1
affich = 1 ; # pour affichage du spectrogramme, 0 pour
             # pour faire analyse/modif/synthèse sans affichage
             # note: cf. spectrogram sous Matlab
Nt = np.rint((N - Nw) / R) # calcul du nombre de tfd à calculer
Nt = Nt.astype(int)
y = np.zeros((N,1)) # signal de synthèse

if affich:
    Xtilde = np.zeros((M,Nt),dtype=complex)

In [65]:
for u in np.arange(0,Nt).reshape(-1): # boucle sur les trames
    deb = u * R + 1 # début de trame
    fin = deb + Nw # fin de trame
    tx = np.multiply(x[np.arange(deb.astype(int),fin.astype(int))],w) # calcul de la trame 
    X = np.fft.fft(tx,M) # tfd à l'instant b
    if affich:
        Xtilde[:,u] = X  
    # opérations de transformation (sur la partie \nu > 0)
    # ....
    Y = X.copy
    # fin des opérations de transformation
    # resynthèse
    # overlap add
    # Xtilde: [F, T]

def extents(f):
  delta = f[1] - f[0]
  return [f[0] - delta/2, f[-1] + delta/2]

In [None]:
Audio(np.real(Xtilde[2, :]), rate=48000)

In [None]:
if affich:
    plt.imshow(20*np.log10(np.abs(Xtilde[np.arange(0,L,dtype=int),:])), aspect='auto',interpolation='none',
               origin='lower', extent=[0, (Nt * R + Nw) / Fs, 0, Fs/2])

$x_2(u) = \tilde{X}(2, u)$ is complexe. In terms of filtering, we have:
$$
\tilde{X}(\lambda, b) = (x * \tilde{w}_\lambda) (b)
$$
with $\tilde{w}_\lambda(n) = w(n) e^{2j\pi\lambda n}$, so for its implementation we have: 

In [68]:
Xtilde2 = []
for i in range(M): 
    wtilde = w * np.exp(2j*np.pi*np.arange(len(w)) * i/M)
    Xtilde2.append(sp.signal.lfilter(wtilde, [1], x))

Xtilde2 = np.stack(Xtilde2)

In [None]:
if affich:
    plt.imshow(20*np.log10(np.abs(Xtilde2[np.arange(0,L,dtype=int),:])), aspect='auto',interpolation='none',
               origin='lower', extent=[0, (Nt * R + Nw) / Fs, 0, Fs/2])

In [None]:
Audio(np.real(Xtilde2[2]), rate=48000)

### 2.2 Reconstruction of the signal

#### 5. Demonstration of the sufficient condition and experimental verification

We have
$$ 
\begin{align*}
y_s(u,n) &= \frac{1}{M} \sum_{k=0}^{M-1} \tilde{X}(k,u) e^{j2\pi\frac{nk}{M}} \cdot w_s(n) \\
         &= \frac{1}{M}  w_s(n) \cdot \sum_{k=0}^{M-1} \sum_{m \in \mathbb{Z}}[x(m+uR)w(m) e^{-j2\pi\frac{mk}{M}}]\cdot e^{j2\pi\frac{nk}{M}} \\
         &= \frac{1}{M}  w_s(n) \cdot \{\sum_{m \in \mathbb{Z}}x(m+uR)w(m)\sum_{k=0}^{M-1} e^{-j2\pi\frac{mk}{M}}\cdot e^{j2\pi\frac{nk}{M}}\} \\
         &= w_s(n) \sum_{m \in \mathbb{Z}}x(m+uR)w(m) \frac{1}{M}\sum_{k=0}^{M-1}e^{j2\pi\frac{(n-m)k}{M}}\\
\end{align*}
$$



We can remark that if $n-m \in M\mathbb{Z}$, $\frac{1}{M}\sum_{k=0}^{M-1}e^{j2\pi\frac{(n-m)k}{M}} = 1$ and

otherwise if $n-m\neq 0$, we have $\frac{1}{M}\sum_{k=0}^{M-1}e^{j2\pi\frac{(n-m)k}{M}} = \frac{1}{M} \frac{1-e^{j2\pi (n-m)}}{1-e^{j2\pi\frac{n-m}{M}}}$. The term $e^{j2\pi(n-m)} = 1$ as $n-m\in \mathbb{Z}$, and therefore the whole expression is equal to 0.

To sum up, we have $\frac{1}{M}\sum_{k=0}^{M-1}e^{j2\pi\frac{(n-m)k}{M}} = \mathbf{1}_{n-m\in M\mathbb{Z}}$


We then substitute this result in the expression of $y(n)$ to get
$$
\begin{align*}
y(n) &= \sum_{u \in \mathbb{Z}} y_s(u,n-uR) \\
     &= \sum_{u \in \mathbb{Z}} w_s(n-uR) \sum_{m \in \mathbb{Z}}x(m+uR)w(m) \mathbf{1}_{n-uR-m\in M\mathbb{Z}}
\end{align*} 
$$
$ \mathbf{1}_{n-uR-m\in M\mathbb{Z}} = 1 $ only when $n-uR-m = qM$ with $q\in \mathbb{Z}$, i.e. $m = n-uR-qM$. 
Whereas $m\leq M-1,n\leq M-1$ and $n\geq 0,m\geq 0$, we have $q=0$ and $m = n-uR$. We can then rewrite the expression of $y(n)$ as
$$
\begin{align*}
y(n) &= \sum_{u \in \mathbb{Z}} w_s(n-uR) x(n-uR+uR)w(n-uR) \\
     &= x(n) \sum_{u \in \mathbb{Z}} w_s(n-uR)w(n-uR)
\end{align*}
$$

If we have 
$$
\sum_{u \in \mathbb{Z}} w_s(n-uR)w(n-uR) = 1
$$ 
then we will have $y(n) = x(n)$ for all $n$. This is thus a sufficient condition for the perfect reconstruction of the signal.


In [71]:
def ola(w = None,hop = None,Nb = 10000): 
# function output = ola(w,hop,Nb)
# realise l'addition-recouvrement de la fenetre w,
# avec un décalage hop et un nombre Nb de fenetres.
# par defaut Nb = 10;
    
    w = w[:, np.newaxis]
    N = len(w)
    output = np.zeros(((Nb - 1) * hop + N,1)) # réserve l'espace memoire
    
    for k in np.arange(0,Nb).reshape(-1):
        deb = k* hop
        fin = deb + N
        output[np.arange(deb,fin)] = output[np.arange(deb,fin)] + w # OLA
    
    return output

In [None]:
plt.plot(ola(w*w, round(1/4*len(w))))

We found that the result of OLA is approximately constant, where we have achieved this condition. 

#### 6. Implementation of resynthesis and verification

In [73]:
def istft(Xtilde, w=w, hop=R):
    """
    
    """
    N = len(w)
    Nb = Xtilde.shape[1]
    x = np.zeros((Nb - 1) * hop + N)
        
    for k in np.arange(0,Nb).reshape(-1):
        deb = k* hop
        fin = deb + N
        x[np.arange(deb,fin)] = x[np.arange(deb,fin)] + np.fft.ifft(Xtilde[:, k]) * w # OLA
    
    return x

In [74]:
y = istft(Xtilde=Xtilde, w=w, hop=R)

In [None]:
Audio(y, rate=48000)

In [None]:
plot_sound(x, np.arange(len(x)))

In [None]:
plot_sound(y, np.arange(len((y))))

We found that the reconstruction is almost lossless. 

### 2.3 STFT Equalizer
#### 7. Audio equalizer realisation

In [78]:
def equalizer(wk, X): 
    """
    wk: [M, ] - weights to every channel
    X: [M, T] - STFT of the original signal
    """
    y = istft(wk[:, None]*X)
    return y

wk_1, wk_2 = np.zeros(32), np.zeros(32)
wk_1[0] = 1
wk_2[-1] = 1
y_low = equalizer(wk_1, X=Xtilde)
y_high = equalizer(wk_2, X=Xtilde)

In [None]:
Audio(y_low, rate=48000)

In [None]:
Audio(y_high, rate=48000)

We can hear that by tailoring the frequency component of the STFT, we could obtain different frequency component of the original signal.  