# Preprocessing pipeline

---

## Naming conventions

**.jams**

[Guitarist]\_[Genre][ChordProgression]-[bpm]-[key]\_[Solo or Chords]

Example:

00_BN1-129-Eb_solo.jams

**.wav**

[Guitarist]\_[Genre][ChordProgression]-[bpm]-[key]\_[Solo or Chords]\_[Pickup]\_[Processing]

Example:

00_BN1-129-Eb_solo_hex_cln.wav

## Structure

**Audio**

1. Load .wav file
1. Resample .wav file from 44.1 kHz to 22.05 kHz
1. Constant-Q transform (CQT)
1. Extract frames
1. Generate windows with a width of 9 frames

**Labels**

1. Load corresponding .jams file
1. Extract MIDI notes and timestamps
1. Generate Labels per window

## Example

### Imports

In [None]:
# import libraries
import librosa
import librosa.display
import jams
import pandas as pd
import numpy as np
import IPython
import os

import matplotlib.pyplot as plt
import seaborn as sns

### Constants

In [None]:
# directory the preprocessed .npz files are written to and loaded from
OUTPUT_PATH = "../data/output/"
# window width in frames, passed to pp_user_audio as window_size
WINDOW_SIZE = 9

### Load .wav file and .jams file

In [None]:
# example files
annot_file = "00_BN1-129-Eb_solo.jams"
audio_file = "00_BN1-129-Eb_solo_mic.wav"

# the project root is the parent of the current working directory;
# os.path is used instead of splitting on '/' so this does not depend on
# the platform's path separator
swizzle_dir = os.path.dirname(os.getcwd())
# the trailing '' keeps a trailing separator, because the directory strings
# are concatenated directly with the file names
annot_dir = os.path.join(swizzle_dir, 'data', 'raw', 'annotation', '')
audio_dir = os.path.join(swizzle_dir, 'data', 'raw', 'audio_mono-mic', '')

# load annotation file and audio file (audio at a sample rate of 22050 Hz)
annot = jams.load(annot_dir + annot_file)
audio, sr = librosa.load(audio_dir + audio_file, sr=22050)

# normalize audio
audio = librosa.util.normalize(audio)

In [None]:
# render an audio player for the file on disk (the path is passed, not the `audio` array)
IPython.display.Audio(audio_dir+audio_file)

### Extract midinotes and timestamps

**Extract data**

In [None]:
# one table per string; they are collected and concatenated once at the end.
# Concatenating onto an initially empty DataFrame inside the loop is avoided:
# how empty frames influence the result dtypes is deprecated pandas behaviour,
# and every iteration would copy the whole table again.
string_tables = []

# get midi notes for all strings for complete song
for idx, i in enumerate(annot['annotations']['note_midi']):
    # nothing to add for a string without notes
    if len(i['data']) == 0:
        continue

    # extract string played (one entry per note; ends up in column 0)
    string = pd.DataFrame([idx] * len(i['data']))

    # table with string number followed by the midi data of this string
    string_tables.append(pd.concat([string, pd.DataFrame(i['data'])], axis=1))

df_midi = pd.concat(string_tables, axis=0) if string_tables else pd.DataFrame()

# calculate the end_time of a note by adding time and duration
df_midi['end_time'] = df_midi['time'] + df_midi['duration']

# correct midi notes: round to the nearest integer note number
df_midi['corrected_value'] = np.round(df_midi['value'], 0).astype('int')

# sort dataframe by time; drop=True discards the old per-string index
df_midi = df_midi.sort_values(by='time').reset_index(drop=True)

# the confidence column is not needed
df_midi = df_midi.drop(columns=['confidence'])

df_midi.head(10)

In [None]:
# (rows, columns) of the note table
df_midi.shape

In [None]:
# conversion table between note names, midi numbers and frequencies
# source: https://musicinformationretrieval.com/midi_conversion_table.html
df_conv = pd.read_csv(
    '../data/raw/midi_annotations/conversion_table.csv',
    usecols=['note', 'midi-ET', 'Hertz-ET'],
)
df_conv.head(5)

In [None]:
df_midi = (
    df_midi
    # look up note name and frequency via the rounded midi value
    .merge(df_conv, how='left', left_on='corrected_value', right_on='midi-ET')
    # enharmonic spellings (Eb == D#, C# == Db, etc pp) match the same midi
    # number and duplicate the note; keep only the last match
    .drop_duplicates(subset=['time', 'duration', 'value', 'end_time'], keep='last')
    # the string column is still labelled 0; rename it to 'string'
    .rename(columns={0: 'string'})
)

df_midi.head(10)

In [None]:
# plot result
fig, ax = plt.subplots(figsize=(20, 8))

# scatter plot: string vs time, drawn on the axes created above
# (`fig` stays the Figure instead of being rebound to the Axes)
sns.scatterplot(data=df_midi, x='time', y='string', marker='', ax=ax)

# annotations: notes played
# columns are accessed by position: 0 = string, 1 = time, 6 = text to draw
# NOTE(review): column 6 is presumably the note name; this depends on the
# column order of the merged conversion table - confirm
for row in df_midi.values:
    ax.annotate(row[6], xy=(row[1], row[0] - 0.04))

ax.set_ylim(0, 5)
# fix the tick positions before labelling them; otherwise the labels are
# assigned to whatever ticks the automatic locator happens to choose
ax.set_yticks(range(6))
ax.set_yticklabels(['E', 'A', 'D', 'G', 'B', 'e']);

### Generate spectrogram from .wav file

In [None]:
# ConstantQ transformation
# it's a function of amplitude vs log(freq)
# hop_length of 512 corresponds to a framerate of ~43 fps at a sample rate of 22050 Hz
# with for example 22 seconds, this will result in ~ 950 values
# NOTE(review): deepest_note is not passed to librosa.cqt below, so the
# transform uses the library's default minimum frequency - confirm this is intended
deepest_note = 'E2'
hop_length = 512
bins_per_octave = 24

audio_cqt = np.abs(librosa.cqt(audio, sr=sr, hop_length=hop_length, n_bins=192, bins_per_octave=bins_per_octave))

# Convert amplitude to decibel (dB), relative to the maximum amplitude
audio_cqt_dB = librosa.amplitude_to_db(audio_cqt, ref=np.max)

# Plot the resulting spectrogram (Frequency vs. Time, colorcode: dB)
# the rows are CQT bins, so the frequency axis must use the CQT scale
# ('cqt_hz') with the same bins_per_octave and hop_length as the transform;
# y_axis='hz' would label the rows as linearly spaced FFT bins
fig, ax = plt.subplots(figsize=(14, 5))
img = librosa.display.specshow(
    audio_cqt_dB,
    sr=sr,
    hop_length=hop_length,
    bins_per_octave=bins_per_octave,
    x_axis='time',
    y_axis='cqt_hz',  # change y_axis to 'cqt_note' if you want to see the notes
    ax=ax,
)
fig.colorbar(img, ax=ax, format="%+2.f dB");

### Extract sliding windows from spectrogram

In [None]:
def pad_frame(input, width: int = 9, left: bool = True):
    """Fill a window that was cut off at the edge of the input with zeros.

    Args:
        input (list): the (possibly too short) window
        width (int, optional): Target window width. Defaults to 9.
        left (bool, optional): Add the zeros on the left (True) or on the
            right (False). Defaults to True.

    Returns:
        numpy.ndarray: the window, zero-padded up to ``width``. A window that
        is already ``width`` long or longer is returned with its values
        unchanged.
    """
    missing = width - len(input)
    values = list(input)

    if missing == 0:
        return np.array(values)

    # a negative count yields an empty list, i.e. no padding
    zeros = [0] * missing
    padded = zeros + values if left else values + zeros
    return np.array(padded)

In [None]:
def extract_frames(input, width: int = 9):
    """Sliding window function to extract windows with set width from an input array

    For every row of ``input`` (one frequency bin) and every position ``j`` in
    that row (one timepoint), a window of ``width`` values is cut out so that
    item ``j`` is centered. Parts of a window that reach beyond the row are
    filled with zeros via ``pad_frame``.

    Args:
        input (list): Spectrogram, indexed as [frequency bin][timepoint]
        width (int, optional): Window width. Defaults to 9 frames.

    Returns:
        list: one list per frequency bin, holding one window per timepoint
    """
    frames = []
    half_width = width // 2

    for row in input:
        n_frames = len(row)
        freq_bin = []

        for j in range(n_frames):
            # bounds are derived from the width itself, so that even widths
            # also give windows of exactly `width` values
            lbound = j - half_width
            rbound = lbound + width

            # part of the window that lies inside the row
            window = row[max(lbound, 0):min(rbound, n_frames)]

            # left bound below zero: add the -lbound missing values on the left
            if lbound < 0:
                window = pad_frame(window, len(window) - lbound, True)

            # right bound beyond the row: fill up to the full width on the right.
            # Checked independently of the left side, because a row shorter than
            # the window needs padding on both sides.
            if rbound > n_frames:
                window = pad_frame(window, width, False)

            freq_bin.append(window)

        frames.append(freq_bin)

    return frames

In [None]:
def get_windows(input):
    """Regroup the output of extract_frames by timepoint.

    ``input`` is indexed as [frequency bin][timepoint]; the result is indexed
    as [timepoint][frequency bin].

    Args:
        input (list): list of windows per frequency bin

    Returns:
        list: one list per timepoint, holding that timepoint's window of
        every frequency bin
    """
    # the number of timepoints is taken from the first frequency bin
    n_timepoints = len(input[0])
    return [[freq_bin[t] for freq_bin in input] for t in range(n_timepoints)]

In [None]:
# cut the CQT into one window per frequency bin and timepoint; the width is
# taken from WINDOW_SIZE instead of relying on the function's default
frames = extract_frames(audio_cqt, WINDOW_SIZE)
# regroup so that the first index is the timepoint
X = get_windows(frames)

In [None]:
# get_windows does the same as swapping the first two axes of `frames`
# (but returns nested lists instead of an array):
# np.swapaxes(frames, 0, 1)

In [None]:
# first window (all frequency bins); X already holds get_windows(frames),
# so the windows are not computed a second time
sns.heatmap(X[0], cmap='gray');


### Extract corresponding label data from .jams

In [None]:
# extract and calculate key numbers
dur = librosa.get_duration(y=audio)
fps = sr // hop_length
n_frames = fps * dur
n_frames_int = int(np.round(n_frames, 0))

# ToDo: label variables with f string
print(f"Duration: {dur:.2f}s\nFrames  : {n_frames_int}\nFPS     : {fps}/s")

In [None]:
# one window per timepoint; this equals len(get_windows(frames)) without
# rebuilding all windows
n_windows = len(frames[0])
half_width = WINDOW_SIZE // 2
window_labels = []
times = []  # not filled in this cell; kept so the name stays defined

# hoisted out of the loop: they do not change between windows
onsets = df_midi['time']
label_columns = df_midi[['string', 'corrected_value']]

for i in range(n_windows):
    # window i reaches half_width frames to both sides of frame i; the relative
    # frame position is scaled with the audio duration to get seconds
    lbound = ((i - half_width) / n_windows) * dur
    rbound = ((i + half_width) / n_windows) * dur
    # a note is assigned to the window if its onset time lies within the bounds
    window_labels.append(label_columns[(onsets >= lbound) & (onsets <= rbound)].values)

In [None]:
# number of distinct (rounded) midi notes played in this song
df_midi['corrected_value'].nunique()

In [None]:
def midi_to_fret(window_labels, tuning=(40, 45, 50, 55, 59, 64), n_classes: int = 21):
    """Convert the notes of each window into a per-string fretboard encoding.

    For every window the result holds one row per string with ``n_classes``
    entries: entry 0 means "string not played", entry ``k + 1`` means
    "fret ``k`` played" (so entry 1 is the open string).

    Args:
        window_labels (list): one array per window with rows of
            (string index, midi note); may be empty for a window without notes
        tuning (sequence of int, optional): midi value of each open string,
            indexed by string. Defaults to (40, 45, 50, 55, 59, 64).
        n_classes (int, optional): entries per string, i.e. the "not played"
            entry plus the number of frets that can be encoded. Defaults to 21.

    Returns:
        numpy.ndarray: array of shape (len(window_labels), len(tuning), n_classes)

    Raises:
        IndexError: if a string index is not covered by ``tuning`` or a note
            lies below the open string or above the highest encodable fret.
            Negative indices are rejected explicitly, because they would
            otherwise wrap around and silently set the wrong entry.
    """
    n_strings = len(tuning)
    fretboard = np.zeros((len(window_labels), n_strings, n_classes))

    for widx, window in enumerate(window_labels):
        # an empty window simply contributes no notes
        for string, midi_note in window:
            if not 0 <= string < n_strings:
                raise IndexError(
                    f"string index {string} out of range for {n_strings} strings"
                )

            # distance to the open string, shifted by one because entry 0 is
            # reserved for "not played"
            fret_class = midi_note - tuning[string] + 1
            if not 1 <= fret_class < n_classes:
                raise IndexError(
                    f"midi note {midi_note} on string {string} cannot be encoded "
                    f"with {n_classes} classes"
                )

            fretboard[widx, string, fret_class] = 1

    # every string without any played note gets the "not played" entry; this
    # also covers windows without notes
    unplayed = fretboard.sum(axis=2) == 0
    fretboard[unplayed, 0] = 1

    return fretboard

In [None]:
# fretboard encoding of the notes found in each window
y = midi_to_fret(window_labels)

### Format and shape of CNN training data

In [None]:
# shape of the label array returned by midi_to_fret
y.shape

In [None]:
# labels of the first window
y[0]

In [None]:
# a string counts as "not played" when it holds exactly one '1'
# and that '1' sits at position 0
ones_per_string = (y == 1).sum(axis=2)
not_played = (y[:, :, 0] == 1) & (ones_per_string == 1)

# number of not-played strings in every frame
null_strings_per_frame = not_played.sum(axis=1)

# frames in which all six strings were not played are noise frames,
# all others carry data
noise_frames_idx = [fidx for fidx, n_null in enumerate(null_strings_per_frame) if n_null == 6]
data_frames_idx = [fidx for fidx, n_null in enumerate(null_strings_per_frame) if n_null != 6]

print(f"In file \"{annot_file[:-5]}\", {len(noise_frames_idx)}/{len(y)} are empty frames ({np.round(len(noise_frames_idx)/len(y), 2) * 100} %).")

In [None]:
# share of the noise frames that is masked out
fraction = 0.95

# keep only the first `fraction` of the noise frame indices
# (note: this overwrites noise_frames_idx)
n_masked = int(len(noise_frames_idx) * fraction)
noise_frames_idx = noise_frames_idx[:n_masked]

# boolean mask over all frames: False for the selected noise frames
mask = np.ones(len(y), dtype=bool)
mask[noise_frames_idx] = False

# shape of the labels that remain after masking
y[mask].shape

In [None]:
# turn the nested list of windows into a single array and show its shape
X = np.array(X)
X.shape

### Saving output of preprocessing

In [None]:
def save_output(data: np.ndarray, labels: np.ndarray):
    """Save windows and labels as two .npz files below OUTPUT_PATH.

    The file names start with the part of the module-level ``audio_file``
    before its first '.', followed by "_data_notebook.npz" and
    "_labels_notebook.npz". The output directory is created if needed.

    Args:
        data (numpy.ndarray): array to store in the data file
        labels (numpy.ndarray): array to store in the labels file
    """
    # filename has no extension
    filename = audio_file.split('.')[0]
    save_path = OUTPUT_PATH

    # exist_ok makes a separate existence check unnecessary, so the directory
    # appearing between check and creation cannot cause an error
    os.makedirs(save_path, exist_ok=True)

    # each array is passed positionally, one array per file
    np.savez(save_path + filename + "_data_notebook.npz", data)
    np.savez(save_path + filename + "_labels_notebook.npz", labels)

In [None]:
# write windows (X) and labels (y) to disk
save_output(X, y)

### Loading output of preprocessing

In [None]:
# both files share the output directory and the audio file name without extension
stem = OUTPUT_PATH + audio_file.split('.')[0]

X_loaded = np.load(stem + '_data_notebook.npz')
y_loaded = np.load(stem + '_labels_notebook.npz')

In [None]:
# first entry of the array stored under the key 'arr_0' in the loaded data file
X_loaded['arr_0'][0]

In [None]:
# heatmap of entry 12 of the loaded data
sns.heatmap(X_loaded['arr_0'][12])

In [None]:
# time span of window 12, computed the same way as when the window labels are
# generated (relative frame position scaled with the audio duration, both
# bounds inclusive) instead of with a hard-coded frame rate and half width
widx = 12
half_width = WINDOW_SIZE // 2
lbound = ((widx - half_width) / n_windows) * dur
rbound = ((widx + half_width) / n_windows) * dur

print(lbound, rbound)

# notes whose onset lies in that span
df_midi[(df_midi['time'] >= lbound) & (df_midi['time'] <= rbound)]

In [None]:
# entry 12 of the loaded labels
y_loaded['arr_0'][12]

In [None]:
# (string, midi note) rows collected for window 12
window_labels[12]

### Preprocessing user audio data

User data will be handled a bit differently. While it is good for the CNN to see notes in different contexts for the training (hence the overlapping sliding window), it is more important to have maximum resolution while predicting on user audio.

This is why we implement a different approach here: after the CQT, each frame is expanded into a 192x9 "window" by repeating the frame 9 times. The CNN then predicts on this "window", so its predictions are on frame level (i.e. one prediction per CQT frame, roughly 43 per second).

In [None]:
def pp_user_audio(audio, window_size):
    """Turn user audio into one "window" per CQT frame.

    The magnitude CQT of ``audio`` is computed (with the module-level ``sr``
    and ``hop_length``), and every frame is repeated ``window_size`` times
    along a new last axis.

    Args:
        audio: audio signal passed to librosa.cqt
        window_size (int): number of times each frame is repeated

    Returns:
        numpy.ndarray: array of shape (n_frames, n_freqs, window_size) with
        ``result[t, f, :]`` equal to the CQT magnitude of frequency bin ``f``
        at frame ``t``
    """
    # perform CQT
    cqt = np.abs(librosa.cqt(audio, sr=sr, hop_length=hop_length, n_bins=192, bins_per_octave=24))

    # initialize numpy array
    n_freqs, n_frames = cqt.shape
    r = np.zeros((n_frames, n_freqs, window_size))

    # transpose so it's TIME x FREQUENCIES, then broadcast every value along
    # the window axis; the repeat count follows window_size instead of a fixed 9
    r[:] = cqt.T[:, :, np.newaxis]

    return r

In [None]:
# one window per CQT frame of the example audio, WINDOW_SIZE wide
frame_windows = pp_user_audio(audio, WINDOW_SIZE)

## Tests

In [None]:
# test padding function
# compared element by element: a sum is the same no matter on which side
# the zeros were added
test = [1, 1, 1, 1, 1, 1]
assert np.array_equal(pad_frame(test, 9, True), [0, 0, 0, 1, 1, 1, 1, 1, 1])
assert np.array_equal(pad_frame(test, 9, False), [1, 1, 1, 1, 1, 1, 0, 0, 0])

# test frame_windows output
# frames[0][4] is the window of the first frequency bin centered on frame 4;
# with a width of 9 it is the first window without padding and starts at
# frame 0, so its first value must equal the first value of frame_windows
assert frames[0][4][0] == frame_windows[0][0][0]

In [None]:
# test against test output from pipeline
# NOTE(review): the reference value is hard-coded and the comparison result is
# only displayed, not asserted - presumably valid for the example file only
np.round(frame_windows[0][0][0], 4) == np.round(4.93400028*10**-2, 4)