In [None]:
import numpy as np
import matplotlib.pyplot as plt
import IPython.display as ipd
import scipy.fft
from scipy.linalg import toeplitz, solve

from scipy import io, signal
from IPython.display import Audio

In [None]:
import os
import IPython
os.environ['NUMBA_CACHE_DIR'] = IPython.paths.get_ipython_cache_dir()
import librosa

In [None]:
def lpc_analysis(s, p=20):
    """ compute the LPC analysis using the autocorrelation method
    
    Parameters
    ----------
    x : numpy array
        windowed signal frame as a numpy 1D array.
    p : int
        model order.
        
    Returns
    -------
    ak : numpy array
         model coefficients.
    e : float
        minimum mean squared error.
    e_norm : float
             normalized minimum mean squared error.
    """
    # frame length
    N = s.shape[0]
    
    # compute autocorrelation values
    r = np.zeros((p+1, 1))
    for k in range(p+1):
        r[k] = np.dot(s[:N-k].T, s[k:])

    # solve to compute model coefficients
    ak = solve(toeplitz(r[:p]), r[1:]).squeeze()

    # compute mean squared error
    e = r[0] - np.dot(ak.T, r[1:])

    # compute normalized mean squared error
    e_norm = e / r[0]

    return ak, e, e_norm

In [None]:
def lpc_decomposition(s_win, ak, e, fs, Ndft, Nw):
    # filter obtained from the lpc analysis
    S = 1
    U = np.concatenate([[1], -ak])

    # compute gain 
    G = np.sqrt(e)

    # compute the frequency response of the digital filter
    w, H = signal.freqz(G*S, U, worN=Ndft, whole=True)
    fw = w / (2 * np.pi) * fs

    # impulse response of the LPC filter
    delta = np.zeros(Nw)
    delta[0] = 1
    h = signal.lfilter(G*S, U, delta)

    # magnitude spectrum
    magH = np.abs(H)
    ind_fmx = int(Ndft/2)

    # inverse filter
    A = S*G
    B = U

    # compute the excitation from the inverse filter
    p = signal.lfilter(B, A, s_win)

    # compute the spectrum of the excitation
    P = np.fft.fft(p, Ndft)

    return H, P

In [None]:
def analysis_STFT_LPC(x, fs, L=2048, R=256, win='hann'):
    """ compute the analysis phase of the phase vocoder, i.e. the STFT of the input audio signal
    
    Parameters
    ----------
    x : numpy array
        input audio signal (mono) as a numpy 1D array.
    L : int
        window length in samples.
    R : int
        hop size in samples.
    win : string
          window type as defined in scipy.signal.windows.    
        
    Returns
    -------
    X_stft : numpy array
             STFT of x as a numpy 2D array.
    omega_stft : numpy array
                 frequency values in radians.
    samps_stft : numpy array
                 time sample at the begining of each frame.

    """
    
    # length of the input signal
    M = x.size;      
    
    # number of points to compute the DFT (FFT)
    N = L
    
    # analysis window
    window = signal.windows.get_window(win, L)
   
    # total number of analysis frames
    num_frames = int(np.floor((M - L) / R))

    # initialize stft
    X_stft = np.zeros((N, num_frames), dtype = complex)
    X_env = np.zeros((N, num_frames), dtype = complex)
    X_exc = np.zeros((N, num_frames), dtype = complex)
    
    # process each frame
    for ind in range(num_frames):

        # initial and ending points of the frame
        n_ini = int(ind * R)
        n_end = n_ini + L

        # signal frame
        xr = window*x[n_ini:n_end]

        # save DFT of the signal frame
        X_stft[:, ind] = scipy.fft.fft(xr, N)

        # LPC
        if np.max(abs(xr))>1e-8:
            ak, e, _ = lpc_analysis(xr, p=20)
            X_env[:, ind], X_exc[:, ind] = lpc_decomposition(xr, ak, e, fs, N, N)
        
    # frequency values in radians    
    omega_stft = 2*np.pi*np.arange(N)/N

    # time sample at the center of each frame
    samps_stft = np.arange(L/2, M-L/2, R)[:-1]
 
    return X_stft, X_env, X_exc, omega_stft, samps_stft

In [None]:
def synthesis_STFT(X_stft, L=2048, R=256, win='hann'):
    """ compute the synthesis using the IFFT of each frame combined with overlap-add
    
    Parameters
    ----------
    X_stft : numpy array
             STFT of x as a numpy 2D array.
    L : int
        window length in samples.
    R : int
        hop size in samples.
    win : string
          window type as defined in scipy.signal.windows.    
        
    Returns
    -------
    x : numpy array
        output audio signal (mono) as a numpy 1D array.
        
    """
    
    # number of frequency bins
    N = X_stft.shape[0];      
 
    # analysis window
    window = signal.windows.get_window(win, L)
   
    # total number of analysis frames
    num_frames = X_stft.shape[1]

    # initialize otuput signal in the time domain
    y = np.zeros(num_frames * R + L)
    
    # process each frame
    for ind in range(num_frames):

        # reconstructed signal frame
        yr = scipy.fft.ifft(X_stft[:,ind], L).real

        # initial and ending points of the frame
        n_ini = ind*R
        n_end = ind*R + L

        # overlap-add the signal frame
        y[n_ini:n_end] += window*yr
        
    # compute the amplitude scaling factor
    C = (L/2)/R
    
    # compensate the amplitude scaling factor
    y /= C
    
    return y

In [None]:
dir_files = './data/'

X_wav, Fs = librosa.load(dir_files + 'Emily_Linge-vocals.wav')
Y_wav, Fs = librosa.load(dir_files + 'Police-vocals-guitar.wav')

In [None]:
tuning_offset_1 = librosa.estimate_tuning(y=X_wav, sr=Fs)
tuning_offset_2 = librosa.estimate_tuning(y=Y_wav, sr=Fs)
print('Estimated tuning deviation for recording 1: %f cents, for recording 2: %f cents' % (tuning_offset_1, tuning_offset_2))

In [None]:
N = 2048
H = 4096
X = librosa.feature.chroma_stft(y=X_wav, sr=Fs, norm=2, hop_length=H, n_fft=N, tuning=tuning_offset_1)
X = X / X.max()
Y = librosa.feature.chroma_stft(y=Y_wav, sr=Fs, norm=2, hop_length=H, n_fft=N, tuning=tuning_offset_2)
Y = Y / Y.max()

plt.figure(figsize=(15, 3))
plt.title('Sequence $X$')
librosa.display.specshow(X, x_axis='frames', y_axis='chroma', cmap='gray_r', hop_length=H)
plt.xlabel('Time (frames)')
plt.ylabel('Chroma')
plt.colorbar()
plt.clim([0, 1])
plt.tight_layout(); plt.show()
# ipd.display(ipd.Audio(X_wav, rate=Fs))

plt.figure(figsize=(15, 3))
plt.title('Sequence $Y$')
librosa.display.specshow(Y, x_axis='frames', y_axis='chroma', cmap='gray_r', hop_length=H)
plt.xlabel('Time (frames)')
plt.ylabel('Chroma')
plt.colorbar()
plt.clim([0, 1])
plt.tight_layout(); plt.show()
# ipd.display(ipd.Audio(Y_wav, rate=Fs))

In [None]:
from synctoolbox.dtw.utils import compute_optimal_chroma_shift, shift_chroma_vectors, make_path_strictly_monotonic

In [None]:
opt_chroma_shift = compute_optimal_chroma_shift(X, Y)
print('Pitch shift between recording 1 and recording 2, determined by DTW:', opt_chroma_shift, 'bins')

In [None]:
N = 2048
H = int(0.02*Fs)
X = librosa.feature.chroma_stft(y=X_wav, sr=Fs, norm=2, hop_length=H, n_fft=N, tuning=tuning_offset_1)
X = X / X.max()
Y = librosa.feature.chroma_stft(y=Y_wav, sr=Fs, norm=2, hop_length=H, n_fft=N, tuning=tuning_offset_2)
Y = Y / Y.max()

plt.figure(figsize=(15, 3))
plt.title('Sequence $X$')
librosa.display.specshow(X, x_axis='frames', y_axis='chroma', cmap='gray_r', hop_length=H)
plt.xlabel('Time (frames)')
plt.ylabel('Chroma')
plt.colorbar()
plt.clim([0, 1])
plt.tight_layout(); plt.show()
# ipd.display(ipd.Audio(X_wav, rate=Fs))

plt.figure(figsize=(15, 3))
plt.title('Sequence $Y$')
librosa.display.specshow(Y, x_axis='frames', y_axis='chroma', cmap='gray_r', hop_length=H)
plt.xlabel('Time (frames)')
plt.ylabel('Chroma')
plt.colorbar()
plt.clim([0, 1])
plt.tight_layout(); plt.show()
# ipd.display(ipd.Audio(Y_wav, rate=Fs))

In [None]:
Y = shift_chroma_vectors(Y, opt_chroma_shift)

In [None]:
plt.figure(figsize=(15, 3))
plt.title('Sequence $X$')
librosa.display.specshow(X, x_axis='frames', y_axis='chroma', cmap='gray_r', hop_length=H)
plt.xlabel('Time (frames)')
plt.ylabel('Chroma')
plt.colorbar()
plt.clim([0, 1])
plt.tight_layout(); plt.show()
# ipd.display(ipd.Audio(X_wav, rate=Fs))

plt.figure(figsize=(15, 3))
plt.title('Sequence $Y$ (Shifted)')
librosa.display.specshow(Y, x_axis='frames', y_axis='chroma', cmap='gray_r', hop_length=H)
plt.xlabel('Time (frames)')
plt.ylabel('Chroma')
plt.colorbar()
plt.clim([0, 1])
plt.tight_layout(); plt.show()
# ipd.display(ipd.Audio(Y_wav, rate=Fs))

In [None]:
import libfmp.c3

In [None]:
C = libfmp.c3.compute_cost_matrix(X, Y)
D = libfmp.c3.compute_accumulated_cost_matrix(C)
P = libfmp.c3.compute_optimal_warping_path(D)

plt.close('all')

plt.figure(figsize=(15, 5))
ax = plt.subplot(1, 2, 1)
libfmp.c3.plot_matrix_with_points(C, P, linestyle='-',  marker='', 
    ax=[ax], aspect='equal', clim=[0, np.max(C)], 
    title='$C$ with optimal warping path', xlabel='Sequence Y', ylabel='Sequence X');

ax = plt.subplot(1, 2, 2)
libfmp.c3.plot_matrix_with_points(D, P, linestyle='-', marker='', 
    ax=[ax], aspect='equal', clim=[0, np.max(D)], 
    title='$D$ with optimal warping path', xlabel='Sequence Y', ylabel='Sequence X');

plt.tight_layout()

In [None]:
N = X.shape[1]
M = Y.shape[1]

plt.figure(figsize=(15, 5))
ax_X = plt.axes([0, 0.60, 1, 0.40])
librosa.display.specshow(X, ax=ax_X, x_axis='frames', y_axis='chroma', cmap='gray_r', hop_length=H)
ax_X.set_ylabel('Cromagrama de Emily')
ax_X.set_xlabel('Tiempo (frames)')
ax_X.xaxis.tick_top()
ax_X.xaxis.set_label_position('top') 
# ax_X.set_title('Emily')

ax_Y = plt.axes([0, 0, 1, 0.40])
librosa.display.specshow(Y, ax=ax_Y, x_axis='frames', y_axis='chroma', cmap='gray_r', hop_length=H)
ax_Y.set_ylabel('Cromagrama de The Police')
ax_Y.set_xlabel('Tiempo (frames)')
# ax_Y.set_title('The Police')

step = 100
y_min_X, y_max_X = ax_X.get_ylim()
y_min_Y, y_max_Y = ax_Y.get_ylim()
for t in P[0:-1:step, :]: 
    ax_X.vlines(t[0], y_min_X, y_max_X, color='r')
    ax_Y.vlines(t[1], y_min_Y, y_max_Y, color='r')

ax = plt.axes([0, 0.40, 1, 0.20])
for p in P[0:-1:step, :]: 
    ax.plot((p[0]/N, p[1]/M), (1, -1), color='r')
    ax.set_xlim(0, 1)
    ax.set_ylim(-1, 1)
ax.set_xticks([])
ax.set_yticks([]);

In [None]:
print('Length of warping path obtained from MrMsDTW:', P.T.shape[1])
wp = make_path_strictly_monotonic(P.T)
print('Length of warping path made strictly monotonic:', wp.shape[1])

In [None]:
import libtsm

pitch_shift_for_audio_1 = -opt_chroma_shift % 12
if pitch_shift_for_audio_1 > 6:
    pitch_shift_for_audio_1 -= 12
audio_1_shifted = libtsm.pitch_shift(X_wav, pitch_shift_for_audio_1 * 100, order="tsm-res")

L = 2048
R = 256

X_stft, X_env, X_exc, _, _ = analysis_STFT_LPC(X_wav, Fs, L, R)
Xs_stft, Xs_env, Xs_exc, _, _ = analysis_STFT_LPC(audio_1_shifted[:,0], Fs, L, R)

Xm_stft = Xs_exc*X_env

xm = synthesis_STFT(Xm_stft, L, R)

# The TSM functionality of the libtsm library expects the warping path to be given in audio samples.
# Here, we do the conversion and additionally clip values that are too large.
time_map = wp.T * H
time_map = np.concatenate((time_map, np.array([[len(X_wav)-1,len(Y_wav)-1]])))

time_map = libtsm.ensure_validity(time_map)

y_hpstsm = libtsm.hps_tsm(X_wav, time_map)
stereo_sonification = np.hstack((Y_wav.reshape(-1, 1), y_hpstsm))

# print('Original signal 1', flush=True)
# ipd.display(ipd.Audio(X_wav, rate=Fs, normalize=True))

# print('Original signal 2', flush=True)
# ipd.display(ipd.Audio(Y_wav, rate=Fs, normalize=True))

print('Synchronized versions', flush=True)
ipd.display(ipd.Audio(stereo_sonification.T, rate=Fs, normalize=True))

In [None]:
scipy.io.wavfile.write("results/Emily_Police_synchronized.wav", Fs, stereo_sonification)

In [None]:
y_hpstsm = libtsm.hps_tsm(audio_1_shifted, time_map)
stereo_sonification = np.hstack((Y_wav.reshape(-1, 1), y_hpstsm))

# print('Original signal 1', flush=True)
# ipd.display(ipd.Audio(X_wav, rate=Fs, normalize=True))

# print('Original signal 2', flush=True)
# ipd.display(ipd.Audio(Y_wav, rate=Fs, normalize=True))

print('Synchronized versions', flush=True)
ipd.display(ipd.Audio(stereo_sonification.T, rate=Fs, normalize=True))

In [None]:
y_hpstsm = libtsm.hps_tsm(xm, time_map)
stereo_sonification = np.hstack((Y_wav.reshape(-1, 1), y_hpstsm))

# print('Original signal 1', flush=True)
# ipd.display(ipd.Audio(X_wav, rate=Fs, normalize=True))

# print('Original signal 2', flush=True)
# ipd.display(ipd.Audio(Y_wav, rate=Fs, normalize=True))

print('Synchronized versions', flush=True)
ipd.display(ipd.Audio(stereo_sonification.T, rate=Fs, normalize=True))