# Structure Representation Computation from Audio Input
### Comparing choice of spectral representation for the construction of the repetition similarity graph

## > Library importing

In [None]:
#Computation
import numpy as np
import scipy
from scipy.interpolate import interp2d
import matplotlib.pyplot as plt
import cv2

#Data Processing
import sklearn.cluster

#Audio
import librosa
from librosa import display

#System
import glob
import os
import sys

#Pickling
import dill

## > Serialization

In [None]:
#dill.dump_session('../../dills/fixyou_spectral.db')

In [None]:
#dill.load_session('../../dills/fixyou_spectral.db')

## > Loading audio

In [None]:
#Choose directory containing audiofiles
directory = '../../my_covers/Chopin_Mazurka'

#Read all paths in specified directory
all_filepaths = []
all_names = []
all_live = []
all_covers= []
for root, dirs, files in os.walk(directory):
        for name in files:
            if (('.wav' in name) or ('.aif' in name) or ('.mp3' in name)):
                all_names.append(name)
                filepath = os.path.join(root, name)
                all_filepaths.append(filepath)
                if ('Live' in name):
                    all_live.append(filepath)
                elif ('Covers' in name):
                    all_covers.append(filepath)
    

#Dictionary containing all batches of matrices as described by pipeline documentation in a linearized, sequential format
X = {}

#Load all audiofiles and store in array
all_audio = []
file_no = len(all_filepaths)
for f in range(file_no):
    y, sr = librosa.load(all_filepaths[f], sr=22050, mono=True)
    all_audio.append((y, sr))
    sys.stdout.write("\rLoaded %i/%s pieces." % ((f+1), str(file_no)))
    sys.stdout.flush()
    
X["audio"] = all_audio

## > Self Similarity for Repetitions

### >> Single-Feature Self Similarity Matrix

In [None]:
#Pipeline of primary features to use to compute self similarity
#{stft, log_power_CQT, perceptually_weighted_CQT, mel_spectrogram}

all_stft = []
all_logCQT = []
all_chroma = []

for f in range(file_no):
    
    #STFT
    stft = librosa.stft(y=X["audio"][f][0])
    stft_db = librosa.amplitude_to_db(stft)
    all_stft.append(stft_db)

    #Log-power Constant-Q Transform
    bins_per_oct = 12*3
    n_oct = 7
    CQT = librosa.cqt(y=X["audio"][f][0], sr=X["audio"][f][1], bins_per_octave=bins_per_oct, n_bins=n_oct*bins_per_oct)
    all_logCQT.append(librosa.amplitude_to_db(CQT))

    #Chroma
    chromagram = librosa.feature.chroma_stft(y=X["audio"][f][0], sr=X["audio"][f][1])
    all_chroma.append(chromagram)

    sys.stdout.write("\rComputed spectral representations for %i/%s pieces." % ((f+1), str(file_no)))
    sys.stdout.flush()
    
spectral_rep = {"stft":all_stft, "logCQT":all_logCQT, "chroma":all_chroma}
X["spectral_rep"]=spectral_rep

In [None]:
#Plotting
plt.set_cmap('magma')
fig, ax = plt.subplots(nrows=file_no, ncols=3, figsize=(20,20))
for i in range(file_no):
    librosa.display.specshow(X["spectral_rep"]["stft"][i], ax=ax[i,0], y_axis='hz')
    ax[i, 0].set(title=all_names[i] + ' - STFT')
    librosa.display.specshow(X["spectral_rep"]["logCQT"][i], ax=ax[i,1], y_axis='cqt_hz', bins_per_octave=bins_per_oct)
    ax[i, 1].set(title=all_names[i] + ' - logCQT')
    librosa.display.specshow(X["spectral_rep"]["chroma"][i], ax=ax[i,2], y_axis='cqt_hz', bins_per_octave=bins_per_oct)
    ax[i, 2].set(title=all_names[i] + ' - chroma')

### >> Beat Synchronization

In [None]:
X["reduced_spectral_rep"] = {"stft_beat":[], "logCQT_beat":[], "chroma_beat":[]}

for f in range(file_no):

    #Beat-synchronization
    tempo, beats = librosa.beat.beat_track(y=X["audio"][f][0], sr=X["audio"][f][1], trim=False)
    for rep in ["stft", "logCQT", "chroma"]:
        X["reduced_spectral_rep"][rep+"_beat"].append(librosa.util.sync(X["spectral_rep"][rep][f], beats, aggregate=np.median))
    sys.stdout.write("\rComputed for %i/%s pieces." % ((f+1), str(file_no)))
    sys.stdout.flush()

### >> Short-term History Embedding of spectral representation

In [None]:
steps = 4
X["stacked_spectral_rep"] = {"stacked_stft_beat":[], "stacked_logCQT_beat":[], "stacked_chroma_beat":[]}
for f in range(file_no):
    for rep in ["stft_beat", "logCQT_beat", "chroma_beat"]:
        X["stacked_spectral_rep"]["stacked_"+rep].append(librosa.feature.stack_memory(X["reduced_spectral_rep"][rep][f], steps))
    
    sys.stdout.write("\rComputed for %i/%s pieces." % ((f+1), str(file_no)))
    sys.stdout.flush()

### >> Weighted Recurrence Matrix

In [None]:
knn_no = 3
X["ssm"] = {"ssm_s_stft_beat":[], "ssm_s_logCQT_beat":[], "ssm_s_chroma_beat":[]}
for f in range(file_no):
    for rep in ["stft_beat", "logCQT_beat", "chroma_beat"]: 
        X["ssm"]["ssm_s_"+rep].append(librosa.segment.recurrence_matrix(X["stacked_spectral_rep"]["stacked_"+rep][f], width=knn_no, mode='affinity', sym=True))
    sys.stdout.write("\rComputed for %i/%s pieces." % ((f+1), str(file_no)))
    sys.stdout.flush()

### >> Timelag filter & Path enhancement

In [None]:
X["f_ssm"] = {"f_ssm_s_stft_beat":[], "f_ssm_s_logCQT_beat":[], "f_ssm_s_chroma_beat":[]}
df = librosa.segment.timelag_filter(scipy.ndimage.median_filter)

for f in range(file_no):
    for rep in ["stft_beat", "logCQT_beat", "chroma_beat"]: 
        X["f_ssm"]["f_ssm_s_"+rep].append(librosa.segment.path_enhance(df(X["ssm"]["ssm_s_"+rep][f], size=(1, 7)), 15))
    sys.stdout.write("\rComputed for %i/%s pieces." % ((f+1), str(file_no)))
    sys.stdout.flush()

## > Self Similarity for local connections

In [None]:
X["mfcc"]=[]
for f in range(file_no):
    X["mfcc"].append(librosa.feature.mfcc(y=X["audio"][f][0], sr=X["audio"][f][1]))

    sys.stdout.write("\rComputed %i/%s MFCCs." % ((f+1), str(file_no)))
    sys.stdout.flush()

### >> Dimensionality Reduction

In [None]:
X["reduced_mfcc"]={"mfcc_beat":[]}

for f in range(file_no):
    #Beat-synchronization
    tempo, beats = librosa.beat.beat_track(y=X["audio"][f][0], sr=X["audio"][f][1], trim=False)
    X["reduced_mfcc"]["mfcc_beat"].append(librosa.util.sync(X["mfcc"][f], beats))

    sys.stdout.write("\rDownlsampled %i/%s MFCCs." % ((f+1), str(file_no)))
    sys.stdout.flush()

### >> Similarity Sequence Matrix (Gaussian Kernel)

In [None]:
X["S_loc"]={"S_loc_mfcc_beat":[]}

for f in range(file_no):
    path_distance = np.sum(np.diff(X["reduced_mfcc"]["mfcc_beat"][f], axis=1)**2, axis=0)
    sigma = np.median(path_distance)
    path_sim = np.exp(-path_distance / sigma)
    X["S_loc"]["S_loc_mfcc_beat"].append(np.diag(path_sim, k=1) + np.diag(path_sim, k=-1))

    sys.stdout.write("\rComputed for %i/%s pieces." % ((f+1), str(file_no)))
    sys.stdout.flush()

## > Balanced Combination

In [None]:
X["A"]={"stft":[], "logCQT":[], "chroma":[]}
for rep in ["stft", "logCQT", "chroma"]:
    for f in range(file_no):
        S_loc = X["S_loc"]["S_loc_mfcc_beat"][f]
        S_rep = X["f_ssm"]["f_ssm_s_"+rep+"_beat"][f]
        deg_loc = np.sum(S_loc, axis=1)          
        deg_rep = np.sum(S_rep, axis=1)
        mu = deg_loc.dot(deg_loc + deg_rep) / np.sum((deg_loc + deg_rep)**2)
        A = mu * S_rep + (1 - mu) * S_loc
        X["A"][rep].append(A)
        sys.stdout.write("\rComputed for %i/%s pieces." % ((f+1), str(file_no)))
        sys.stdout.flush()

### >> Downsampling

In [None]:
X["A_d"]={"stft":[], "logCQT":[], "chroma":[]}
for f in range(file_no):
    for rep in ["stft", "logCQT", "chroma"]:
        X["A_d"][rep].append(cv2.resize(X["A"][rep][f], (128,128)))
    sys.stdout.write("\rComputed for %i/%s pieces." % ((f+1), str(file_no)))
    sys.stdout.flush()

In [None]:
X["Lsym"] = {"stft":[], "logCQT":[], "chroma":[]}
X["Lsym_d"] = {"stft":[], "logCQT":[], "chroma":[]}
for rep in ["stft", "logCQT", "chroma"]:
    for f in range(file_no):
        X["Lsym"][rep].append(scipy.sparse.csgraph.laplacian(X["A"][rep][f], normed=True))
        X["Lsym_d"][rep].append(scipy.sparse.csgraph.laplacian(X["A_d"][rep][f], normed=True))
        
        sys.stdout.write("\rComputed for %i/%s pieces." % ((f+1), str(file_no)))
        sys.stdout.flush()

## >> Eigendecomposition

In [None]:
X["D"] = {"stft":[], "logCQT":[], "chroma":[]}
X["D_d"] = {"stft":[], "logCQT":[], "chroma":[]}

kmin = 2
kmax = 10

for f in range(file_no):
    for rep in ["stft", "logCQT", "chroma"]:
        #eigendecomposition
        evals, evecs = scipy.linalg.eigh(X["Lsym"][rep][f])
        #eigenvector filtering
        evecs = scipy.ndimage.median_filter(evecs, size=(9, 1))
        #normalization
        Cnorm = np.cumsum(evecs**2, axis=1)**0.5
        #create set of approximations
        dist_set = []
        for k in range(kmin, kmax):
            Xs = evecs[:, :k] / Cnorm[:, k-1:k]
            #distance vector to matrix
            distance = scipy.spatial.distance.squareform(scipy.spatial.distance.pdist(Xs, metric='euclidean'))
            dist_set.append(distance)
        X["D"][rep].append(dist_set)

        #eigendecomposition
        evals, evecs = scipy.linalg.eigh(X["Lsym_d"][rep][f])
        #eigenvector filtering
        evecs = scipy.ndimage.median_filter(evecs, size=(9, 1))
        #normalization
        Cnorm = np.cumsum(evecs**2, axis=1)**0.5
        #create set of approximations
        dist_set = []
        for k in range(kmin, kmax):
            Xs = evecs[:, :k] / Cnorm[:, k-1:k]
            #distance vector to matrix
            distance = scipy.spatial.distance.squareform(scipy.spatial.distance.pdist(Xs, metric='euclidean'))
            dist_set.append(distance)
        X["D_d"][rep].append(dist_set)

    sys.stdout.write("\rComputed for %i/%s pieces." % ((f+1), str(file_no)))
    sys.stdout.flush()

## >> Plotting eigenvector distances

### >>> STFT

In [None]:
plt.rcParams['axes.titlepad'] = 15
plt.set_cmap('magma')
fig, ax = plt.subplots(nrows=kmax-kmin, ncols=len(all_names), figsize=(int(3.5*len(all_names)),int(3.7*(kmax-kmin))))
for i in range(file_no):
    for k in range(kmax-kmin):
        ax[k, i].matshow(X["D"]["stft"][i][k])
        ax[k, i].set(title=(all_names[i])[:-3] + ' ' + str(kmin+k))

### >>> logCQT

In [None]:
plt.rcParams['axes.titlepad'] = 15
plt.set_cmap('magma')
fig, ax = plt.subplots(nrows=kmax-kmin, ncols=len(all_names), figsize=(int(3.5*len(all_names)),int(3.7*(kmax-kmin))))
for i in range(file_no):
    for k in range(kmax-kmin):
        ax[k, i].matshow(X["D"]["logCQT"][i][k])
        ax[k, i].set(title=(all_names[i])[:-3] + ' ' + str(kmin+k))

### >>> Chroma

In [None]:
plt.rcParams['axes.titlepad'] = 15
plt.set_cmap('magma')
fig, ax = plt.subplots(nrows=kmax-kmin, ncols=len(all_names), figsize=(int(3.5*len(all_names)),int(3.7*(kmax-kmin))))
for i in range(file_no):
    for k in range(kmax-kmin):
        ax[k, i].matshow(X["D"]["chroma"][i][k])
        ax[k, i].set(title=(all_names[i])[:-3] + ' ' + str(kmin+k))

### STFT Downsampled

In [None]:
plt.rcParams['axes.titlepad'] = 15
plt.set_cmap('magma')
fig, ax = plt.subplots(nrows=kmax-kmin, ncols=len(all_names), figsize=(int(3.5*len(all_names)),int(3.7*(kmax-kmin))))
for i in range(file_no):
    for k in range(kmax-kmin):
        ax[k, i].matshow(X["D_d"]["stft"][i][k])
        ax[k, i].set(title=(all_names[i])[:-3] + ' ' + str(kmin+k))

### >>> logCQT Downsampled

In [None]:
plt.rcParams['axes.titlepad'] = 15
plt.set_cmap('magma')
fig, ax = plt.subplots(nrows=kmax-kmin, ncols=len(all_names), figsize=(int(3.5*len(all_names)),int(3.7*(kmax-kmin))))
for i in range(file_no):
    for k in range(kmax-kmin):
        ax[k, i].matshow(X["D_d"]["logCQT"][i][k])
        ax[k, i].set(title=(all_names[i])[:-3] + ' ' + str(kmin+k))

### >>> Chroma Downsampled

In [None]:
plt.rcParams['axes.titlepad'] = 15
plt.set_cmap('magma')
fig, ax = plt.subplots(nrows=kmax-kmin, ncols=len(all_names), figsize=(int(3.5*len(all_names)),int(3.7*(kmax-kmin))))
for i in range(file_no):
    for k in range(kmax-kmin):
        ax[k, i].matshow(X["D_d"]["chroma"][i][k])
        ax[k, i].set(title=(all_names[i])[:-3] + ' ' + str(kmin+k))