In [None]:
# !pip install visual_midi

In [None]:
import pandas as pd
import librosa
import numpy as np
from IPython.display import display
from IPython.display import Audio
import seaborn as sns
import matplotlib.pyplot as plt
import mir_eval.sonify
import copy

## Read Queries and Songs

In [None]:
queries_df = pd.read_pickle("./queries_with_midi.pkl")

In [None]:
queries_df.head()

In [None]:
songs_df = pd.read_pickle("./songs_with_midi.pkl")
songs_df.head()

In [None]:
songs_df["voice_piano_roll"] = songs_df["vocals_midi"].apply(lambda x:x.get_piano_roll(fs=10))

In [None]:
queries_df["piano_roll"] = queries_df["query_basic_midi"].apply(lambda x:x.get_piano_roll(fs=10))

In [None]:
# Fixed seed so the query train/test split (and everything derived from it)
# is identical after Restart & Run All.
# NOTE(review): 60% of the queries go to the *test* split - confirm this is intended.
query_test_df = queries_df.sample(frac=0.6, random_state=42)
query_train_df = queries_df[~queries_df.index.isin(query_test_df.index)]
print(query_test_df.shape)
print(query_train_df.shape)

In [None]:
songs_train_df = songs_df[songs_df["Song ID"].isin(query_train_df["Song ID"])]
songs_train_df.shape


In [None]:
songs_test_df = songs_df[~songs_df.index.isin(songs_train_df.index)]
songs_test_df.shape

In [None]:
from sklearn.model_selection import train_test_split

# Song-level split of the training songs into train/validation ids.
# random_state is pinned so the split is reproducible across kernel restarts.
train_songs_ids, validation_songs_ids = train_test_split(
    songs_train_df["Song ID"].tolist(), test_size=0.2, random_state=42
)


In [None]:
len(train_songs_ids)

In [None]:
len(validation_songs_ids)

In [None]:
import tensorflow as tf
import random

def data_aumentation(query_train_df, songs_train_df, songs_ids, factor_increase = 200):
    """Build labelled (piano roll, piano roll, label) pairs for similarity training.

    For every id in ``songs_ids`` and each of ``factor_increase`` rounds this
    generates one positive pair (a query variant and a variant of its own song,
    label 1.0) and one negative pair (a query variant and a variant of a
    different song, label 0.0). The position of the query inside each pair is
    chosen at random.

    Args:
        query_train_df: DataFrame with "Song ID" and "piano_roll" columns.
        songs_train_df: DataFrame with "Song ID" and "voice_piano_roll" columns.
        songs_ids: song ids to generate pairs for.
        factor_increase: number of positive/negative rounds per song.

    Returns:
        Object ndarray of shape (n_pairs, 3); each row is
        ``[roll_a, roll_b, label]``. Rolls of different lengths are stored
        as-is, which is why the array has dtype ``object``.
    """
    songs_scope_df = songs_train_df[songs_train_df["Song ID"].isin(songs_ids)]
    queries_scope_df = query_train_df[query_train_df["Song ID"].isin(songs_ids)]

    # Accumulate in a list: np.append copies the whole array on every call and
    # cannot stack piano rolls whose lengths differ.
    rows = []
    for song_id in songs_ids:
        # Songs usable as negatives for this song. Computed once per song so a
        # single-song scope skips negatives instead of looping forever.
        other_song_ids = [candidate for candidate in songs_ids if candidate != song_id]
        for _ in range(factor_increase):
            query_variant = get_random_variation_query_piano_roll(queries_scope_df, song_id)
            song_variant = get_random_variation_song_piano_roll(songs_scope_df, song_id)
            if np.random.rand() > .5:
                rows.append((query_variant, song_variant, 1.0))
            else:
                rows.append((song_variant, query_variant, 1.0))

            if not other_song_ids:
                continue
            neg_song_id = random.choice(other_song_ids)
            query_variant = get_random_variation_query_piano_roll(queries_scope_df, song_id)
            neg_song_variant = get_random_variation_song_piano_roll(songs_scope_df, neg_song_id)
            if np.random.rand() > .5:
                rows.append((neg_song_variant, query_variant, 0.0))
            else:
                rows.append((query_variant, neg_song_variant, 0.0))

    # Fill an object array cell by cell so numpy never tries to broadcast the
    # ragged piano rolls into one rectangular array.
    pairs = np.empty((len(rows), 3), dtype=object)
    for row_index, row in enumerate(rows):
        for col_index, item in enumerate(row):
            pairs[row_index, col_index] = item
    return pairs
            
def get_random_variation_song_piano_roll(songs_scope_df,song_id):
    """Pick one random row of ``song_id`` and return a variant of its vocal piano roll.

    The row is sampled from ``songs_scope_df`` rows whose "Song ID" equals
    ``song_id``; its "voice_piano_roll" is passed through
    ``create_random_variant_piano_roll``.
    """
    matching_rows = songs_scope_df[songs_scope_df["Song ID"] == song_id]
    chosen_row = matching_rows.sample(1).iloc[0]
    return create_random_variant_piano_roll(chosen_row["voice_piano_roll"])
    
def get_random_variation_query_piano_roll(queries_scope_df,song_id):
    """Pick one random query of ``song_id`` and return a variant of its piano roll.

    The row is sampled from ``queries_scope_df`` rows whose "Song ID" equals
    ``song_id``; its "piano_roll" is passed through
    ``create_random_variant_piano_roll``.
    """
    matching_rows = queries_scope_df[queries_scope_df["Song ID"] == song_id]
    chosen_row = matching_rows.sample(1).iloc[0]
    return create_random_variant_piano_roll(chosen_row["piano_roll"])

def create_random_variant_piano_roll(pr):
    """Run a piano roll through the augmentation chain: pitch roll -> silence -> noise."""
    pitch_shifted = random_pitch_roll(pr)
    with_silence = add_random_silence(pitch_shifted)
    return add_noise_to_notes(with_silence)

def random_pitch_roll(piano_roll, enabled=False):
    """Optionally roll a piano roll along axis 0 by a random amount.

    The augmentation used to sit behind an unconditional ``return`` and could
    never run. It is now opt-in, so existing one-argument callers still get
    the input back unchanged.

    Args:
        piano_roll: 2-D array (presumably pitch x time, as the 128-row silence
            block in ``add_random_silence`` suggests - confirm).
        enabled: when False (default) the input is returned untouched.

    Returns:
        ``piano_roll`` itself when disabled, otherwise a rolled copy. Note that
        ``np.roll`` wraps rows around the array edge.
    """
    if not enabled:
        return piano_roll
    # Symmetric range: up to one octave down or up (in rows).
    random_shift = random.randint(-12, 12)
    return np.roll(piano_roll, shift=random_shift, axis=0)

def add_random_silence(piano_roll, enabled=False):
    """Optionally insert a block of silent (all-zero) columns at a random position.

    The augmentation used to sit behind an unconditional ``return`` and could
    never run. It is now opt-in, so existing one-argument callers still get
    the input back unchanged.

    Args:
        piano_roll: 2-D array; columns are split at a random index.
        enabled: when False (default) the input is returned untouched.

    Returns:
        ``piano_roll`` itself when disabled, otherwise a new array with between
        0 and 50 zero columns inserted.
    """
    if not enabled:
        return piano_roll
    pos = random.randint(0, piano_roll.shape[1])
    silent_size = random.randint(0, 50)
    # Match the row count of the input instead of assuming 128 rows.
    silent = np.zeros((piano_roll.shape[0], silent_size), dtype=piano_roll.dtype)
    return np.concatenate((piano_roll[:, :pos], silent, piano_roll[:, pos:]), axis=1)

def add_noise_to_notes(pr, enabled=False):
    """Optionally add Gaussian noise (mean 0, std 5) to the non-zero cells of a piano roll.

    The previous version returned the undefined name ``piano_roll`` and raised
    NameError on every call. It now returns its own argument, and the noise
    step that sat behind that return is opt-in.

    Args:
        pr: array of note values; only cells greater than 0 are perturbed.
        enabled: when False (default) the input is returned untouched.

    Returns:
        ``pr`` itself when disabled, otherwise a new float32 array.
    """
    if not enabled:
        return pr
    roll = np.asarray(pr, dtype=np.float32)
    noise = np.random.normal(0, 5, size=roll.shape)
    # Silent cells (<= 0) stay exactly as they were.
    return np.where(roll > 0, roll + noise, roll).astype(np.float32)


In [None]:
train_data = data_aumentation(query_train_df, songs_train_df, train_songs_ids, factor_increase = 200)
val_data = data_aumentation(query_train_df, songs_train_df, validation_songs_ids, factor_increase = 200)

In [None]:
train_data.shape

In [None]:
val_data.shape

In [None]:
from keras.layers import Input, Conv1D, MaxPooling1D, GlobalAveragePooling1D, Dense, Dot, Concatenate, Lambda
from keras.models import Model
import keras.backend as K

def shared_model(input_shape):
    """Build the encoder sub-model: Conv1D -> MaxPooling1D -> global average -> Dense(20).

    Args:
        input_shape: shape passed to the Keras ``Input`` layer (without batch axis).

    Returns:
        A Keras ``Model`` mapping the input to a 20-unit ReLU embedding.
    """
    encoder_input = Input(shape=input_shape)
    features = Conv1D(filters=64, kernel_size=3, activation='relu')(encoder_input)
    features = MaxPooling1D(pool_size=2)(features)
    features = GlobalAveragePooling1D()(features)
    embedding = Dense(20, activation='relu')(features)
    return Model(inputs=encoder_input, outputs=embedding)

# Conv1D convolves along the first non-batch axis and needs a fixed channel
# count, so rolls are fed as (time steps, 128) with a variable number of steps.
# NOTE(review): the piano rolls built above look like (128, time); transpose
# them (`.T`) before feeding this model - confirm against the training code.
PIANO_ROLL_SHAPE = (None, 128)

# One encoder instance applied to both inputs: this is what makes the two
# branches share weights. Calling shared_model() twice would create two
# independent encoders.
encoder = shared_model(PIANO_ROLL_SHAPE)

# Input layer 1
input_1 = Input(shape=PIANO_ROLL_SHAPE)
shared_output_1 = encoder(input_1)

# Input layer 2
input_2 = Input(shape=PIANO_ROLL_SHAPE)
shared_output_2 = encoder(input_2)

# Compute cosine similarity: Dot(normalize=True) L2-normalises both embeddings
# before the dot product, so its output already is the cosine similarity with
# shape (batch, 1). Dividing by the norms again would normalise twice.
dot_product = Dot(axes=-1, normalize=True)([shared_output_1, shared_output_2])
cosine_similarity = dot_product

# Concatenate the original outputs and cosine similarity
output = Concatenate()([shared_output_1, shared_output_2, cosine_similarity])

# Create the model
model = Model(inputs=[input_1, input_2], outputs=output)

# Compile and train the model
# ...


In [None]:
np.random.rand()>.5

In [None]:
import tensorflow as tf
mnist = tf.keras.datasets.mnist

# Scratch sanity check of the TensorFlow install on MNIST.
# NOTE(review): this rebinds `model`, replacing the siamese model built earlier.
(x_train, y_train),(x_test, y_test) = mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0

model = tf.keras.models.Sequential([
  tf.keras.layers.Flatten(input_shape=(28, 28)),
  tf.keras.layers.Dense(128, activation='relu'),
  tf.keras.layers.Dropout(0.2),
  tf.keras.layers.Dense(10, activation='softmax')
])

# The labels are integer class ids and the network outputs 10 softmax
# probabilities, so the loss has to be sparse categorical cross-entropy.
# MSE would compare probabilities against the digit values themselves.
model.compile(
    optimizer='sgd',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'])


model.fit(x_train, y_train, epochs=20)

In [None]:

model.evaluate(x_test, y_test)

In [None]:
midi = songs_df["vocals_midi"].iloc[0]

In [None]:
print(list(midi.get_piano_roll(fs=3)[:,330]))

In [None]:
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt


def plot_midi_piano_roll(array_2d, midi=None):
    """Show a piano roll as a heatmap and, optionally, play the matching MIDI.

    Args:
        array_2d: 2-D piano-roll array; it is flipped along axis 0 before
            plotting so the last row is drawn at the top.
        midi: optional object with a ``synthesize()`` method. When given, its
            audio is displayed below the plot. The function no longer reads a
            global ``midi`` variable, which could belong to a different song
            than the array being plotted.
    """
    flipped = np.flip(array_2d, axis=0)
    # Set up the plot
    plt.figure(figsize=(10, 8))
    # Create a heatmap
    sns.heatmap(flipped, cmap='viridis', cbar_kws={'label': 'Values'})

    # Customize labels and title
    plt.xlabel('Columns')
    plt.ylabel('Rows')
    plt.title('2D Array Heatmap')

    # Show the plot
    plt.show()
    if midi is not None:
        display(Audio(data=midi.synthesize(), rate=44100))

song_row = songs_df[songs_df["Song ID"] == 1118].iloc[0]
# print(song_row)
song_midi = song_row["vocals_midi"]
# Pass the same MIDI that produced the roll so plot and audio agree.
plot_midi_piano_roll(song_midi.get_piano_roll(fs=3), midi=song_midi)
# query_row = queries_df[queries_df["Query ID"] == 'q1'].iloc[0]
# plot_midi_piano_roll(query_row["query_basic_midi"])
# print(query_row)

In [None]:
query_midi_wav = midi.synthesize()
display(Audio(data=query_midi_wav, rate=44100))
    

In [None]:
# (stray incomplete expression removed: a bare `query_` raised NameError on Run All)

### Append Features in Dataframes

#### Metrics Based on Onset Detection

In [None]:
import numpy as np
import nltk
from nltk.util import ngrams

In [None]:
# GET NOTES METRICS BASED ON ONSET DETECT
def _quantize_interval_ratios(onset_times):
    """Turn onset times into a list of quantised inter-onset-interval ratios (as strings)."""
    # Reference ratios a measured ratio of 4 or less is snapped to.
    times_proportions = np.asarray([1,0.5,0.333,0.25,2,0.666,3, 1.5, 0.75, 4,1.333])

    intervals = np.diff(onset_times)
    # Ratio of each interval to the one right before it.
    ratios = intervals[1:] / intervals[:-1]

    result = []
    for ratio in ratios:
        if ratio > 4:
            # Large ratios are kept as their rounded value.
            result.append(str(round(ratio)))
        else:
            nearest = times_proportions[np.abs(times_proportions - ratio).argmin()]
            result.append(str(nearest))
    return result


def get_metric_list(y, sr):
    """Rhythm descriptor of an already-loaded signal.

    Detects onsets with librosa and returns the quantised ratios between
    consecutive inter-onset intervals, as a list of strings.

    Args:
        y: audio samples.
        sr: sampling rate of ``y``.
    """
    onset_times = librosa.onset.onset_detect(y=y, sr=sr, units='time')
    return _quantize_interval_ratios(onset_times)


def compute_metrics(audio_file):
    """Load ``audio_file`` with librosa and return its rhythm descriptor.

    Returns:
        List of strings; empty when fewer than three onsets are detected.
    """
    y, sr = librosa.load(audio_file)
    return get_metric_list(y, sr)

def get_query_metric(row):
    """Compute the rhythm descriptor for the query audio named by ``row['Query ID']``."""
    query_audio_path = f"MTG-QBH/audio/{row['Query ID']}.wav"
    return compute_metrics(query_audio_path)

def get_ngram_from_list(input_list, n):
    """Return every n-gram of ``input_list`` as one space-joined string."""
    joined_grams = []
    for gram in ngrams(input_list, n):
        joined_grams.append(' '.join(gram))
    return joined_grams
def get_vocal_metric(row):
    """Compute the rhythm descriptor for the separated vocals of ``row['Song ID']``."""
    vocals_audio_path = f"output/htdemucs/{row['Song ID']}/vocals.wav"
    return compute_metrics(vocals_audio_path)

In [None]:
queries_df["metric"] = queries_df.apply(lambda row: get_query_metric(row), axis=1)

In [None]:
songs_df["vocals_metric"] = songs_df.apply(lambda row: get_vocal_metric(row), axis=1)

#### Onset Midi Melody Filter

In [None]:
def is_inside_beat(note, start, end):
    """Return True when ``note`` overlaps the closed window [start, end].

    Besides notes that start or end inside the window, this also accepts a
    note that starts before ``start`` and ends after ``end``. Such a note
    covers the whole window and used to be rejected.
    """
    return note.start <= end and note.end >= start

def get_notes_between(notes, start, end):
    """Return the notes of ``notes`` that overlap the window [start, end]."""
    return [note for note in notes if is_inside_beat(note, start, end)]

def get_longest_inside_beat(notes, start, end):
    """Return a copy of the note that sounds longest inside [start, end].

    The copy is re-timed to begin at ``start`` and last exactly as long as the
    original note overlaps the window. The input notes are not modified.

    Returns:
        The re-timed copy, or None when no note overlaps the window for a
        strictly positive duration.
    """
    max_duration = 0
    selected = None
    for cand in get_notes_between(notes, start, end):
        # Portion of the candidate that falls inside the window.
        dur = min(end, cand.end) - max(start, cand.start)
        if max_duration < dur:
            max_duration = dur
            selected = cand
    if selected is None:
        return None
    note = copy.deepcopy(selected)
    note.start = start
    note.end = start + max_duration
    return note

def get_all_notes_from(midi):
    """Collect the notes of every instrument in ``midi`` into one flat list, in instrument order."""
    return [note for instrument in midi.instruments for note in instrument.notes]
    
def clean_midi_based_onset(y, sr, midi):
    """Reduce a MIDI transcription to one note per inter-onset window.

    Onsets are detected in the audio ``y``; for each pair of consecutive onset
    times the note that sounds longest inside that window is kept, re-timed by
    ``get_longest_inside_beat``.

    Args:
        y: audio samples the MIDI was transcribed from.
        sr: sampling rate of ``y``.
        midi: object with an ``instruments`` list whose items carry ``notes``.

    Returns:
        A deep copy of ``midi`` whose first instrument holds the selected notes.
        Any further instruments are emptied, because their notes were already
        considered when selecting and would otherwise remain unfiltered. A MIDI
        without instruments is returned as an unchanged copy.
    """
    times = librosa.onset.onset_detect(y=y, sr=sr, units='time')
    all_notes = get_all_notes_from(midi)
    result_notes = []
    # Consecutive onset times delimit the windows.
    for window_start, window_end in zip(times, times[1:]):
        note = get_longest_inside_beat(all_notes, window_start, window_end)
        if note is not None:
            result_notes.append(note)

    copy_midi = copy.deepcopy(midi)
    if not copy_midi.instruments:
        return copy_midi
    copy_midi.instruments[0].notes = result_notes
    for other_instrument in copy_midi.instruments[1:]:
        other_instrument.notes = []
    return copy_midi
def compute_on_set_melody(audio_path, midi):
    """Load ``audio_path`` with librosa and return ``midi`` filtered by the audio's onsets."""
    samples, sample_rate = librosa.load(audio_path)
    return clean_midi_based_onset(samples, sample_rate, midi)

def get_query_clean_midi(row):
    """Onset-filtered version of a query row's "query_basic_midi", using the query's audio file."""
    query_midi = row['query_basic_midi']
    query_audio_path = f"MTG-QBH/audio/{row['Query ID']}.wav"
    return compute_on_set_melody(query_audio_path, query_midi)

# One onset-filtered MIDI per query row.
queries_df["clean_midi_onset"] = queries_df.apply(get_query_clean_midi, axis=1)

In [None]:

def get_song_clean_midi(row):
    """Onset-filtered version of a song row's "vocals_midi", using the separated vocals audio."""
    vocals_midi = row['vocals_midi']
    vocals_audio_path = f"output/htdemucs/{row['Song ID']}/vocals.wav"
    return compute_on_set_melody(vocals_audio_path, vocals_midi)

# One onset-filtered MIDI per song row.
songs_df["vocal_clean_midi"] = songs_df.apply(get_song_clean_midi, axis=1)

In [None]:
songs_df.head()

### On Beat Notes

In [None]:
import collections
import bisect

class ClosestKeyDict:
    """Mapping with numeric keys that can be looked up by the nearest key.

    Built from an iterable of ``(key, value)`` pairs. Lookups use binary
    search over the sorted key list.
    """
    def __init__(self, list_pair):
        self._dict = collections.OrderedDict(list_pair)
        # bisect requires ascending order; sort once here instead of
        # rebuilding the key list on every lookup.
        self.keys = sorted(self._dict.keys())
    def get_first_pos_greater(self, v):
        """Return the index of the first key that is >= ``v`` (``len(keys)`` if none)."""
        return bisect.bisect_left(self.keys, v)
    def get_closest_key(self, key_in):
        """Return the stored key nearest to ``key_in``.

        On an exact tie the larger key wins. Values beyond the last key map to
        the last key.

        Raises:
            KeyError: if the mapping is empty.
        """
        if not self.keys:
            raise KeyError("ClosestKeyDict is empty")
        pos = self.get_first_pos_greater(key_in)
        if pos >= len(self.keys):
            # key_in is larger than every stored key.
            return self.keys[-1]
        key = self.keys[pos]
        if(pos>0):
            key_before = self.keys[pos-1]
            if(abs(key_in-key_before) < abs(key-key_in)):
                return key_before
        return key
    def get_closest_value_from_key(self, key_in, threshold = float('inf')):
        """Return the value of the key nearest to ``key_in``.

        Returns None when the mapping is empty or the nearest key is not
        strictly closer than ``threshold``.
        """
        if not self.keys:
            return None
        key = self.get_closest_key(key_in)
        if(abs(key-key_in)<threshold):
            return self._dict[key]
        return None
        
def build_frequencies_dict(f0, voiced_flag):
    """Map frame times to pitch values, storing None for unvoiced frames.

    Frame times come from ``librosa.times_like(f0)`` called without ``sr`` or
    ``hop_length``, i.e. with librosa's defaults for both.

    Returns:
        A ``ClosestKeyDict`` keyed by frame time.
    """
    frame_times = librosa.times_like(f0)
    time_pitch_pairs = [
        (frame_times[frame], pitch if voiced_flag[frame] else None)
        for frame, pitch in enumerate(f0)
    ]
    return ClosestKeyDict(time_pitch_pairs)

def get_audio_signature_transcription(audio_path):
    """Play an audio file and transcribe its melody on the beat grid at two levels.

    The audio is loaded, its beats are tracked, f0 is estimated with pYIN and
    the pitch nearest to every beat is read off with
    ``get_melody_transcription`` for ``level=1`` and ``level=2``. Both
    transcriptions are also sonified for listening.

    Returns:
        ``[trans_1, trans_2]``, two lists with one MIDI note (or None) per beat.
    """
    y, sr = librosa.load(audio_path)
    display(Audio(data=y,rate=sr))

    tempo, beats = get_beats(y, sr)
    # hop_length is pinned to 512 because build_frequencies_dict derives the
    # frame times with librosa.times_like's default hop. pYIN's own default
    # (frame_length // 4 = 256) would make every f0 timestamp twice too large.
    f0, voiced_flag, voiced_probs = librosa.pyin(
        y,
        fmin=librosa.note_to_hz('C2'),
        fmax=librosa.note_to_hz('C7'),
        sr=sr,
        frame_length=1024,
        hop_length=512,
    )
    f0_dict = build_frequencies_dict(f0,voiced_flag)
    trans_1 = get_melody_transcription(f0_dict,tempo, beats, level=1)
    display_transcript(trans_1, beats, sr)
    trans_2 = get_melody_transcription(f0_dict,tempo, beats, level=2)
    display_transcript(trans_2, beats, sr)
    return [trans_1, trans_2]

def display_transcript(transcripts, beats, sr):
    """Sonify a per-beat transcription and show an audio player for it.

    Args:
        transcripts: one MIDI note number (or None for "no pitch") per beat.
        beats: beat times in seconds, aligned with ``transcripts``.
        sr: sampling rate of the synthesised audio.

    Each voiced note is placed at the time of its own beat. The previous
    version used consecutive frame times from ``librosa.times_like`` and
    ignored ``beats``, which squeezed the whole melody into the first few
    frames.
    """
    voiced = [
        (beat, librosa.midi_to_hz(note))
        for beat, note in zip(beats, transcripts)
        if note is not None
    ]
    if not voiced:
        # Nothing to sonify.
        return
    voiced_times = np.array([beat for beat, _ in voiced], dtype=float)
    voiced_freqs = np.array([freq for _, freq in voiced], dtype=float)

    y = mir_eval.sonify.pitch_contour(voiced_times, voiced_freqs, sr)
    display(Audio(data=y, rate=sr))
def get_melody_transcription(f0_dict, tempo, beats, level = 1):
    """Read the pitch nearest to every beat and convert it to a MIDI note number.

    Args:
        f0_dict: object exposing ``get_closest_value_from_key(key, threshold)``
            that maps a time in seconds to a frequency in Hz (or None).
        tempo: tempo in beats per minute, as returned by
            ``librosa.beat.beat_track`` (scalar or one-element array).
        beats: beat times in seconds.
        level: subdivision level; the accepted distance between a beat and the
            nearest pitch frame is ``beat_period / (2 * level)``.

    Returns:
        List with one rounded MIDI note (or None) per beat.
    """
    bpm = float(np.atleast_1d(tempo)[0])
    # Convert BPM to seconds per beat: the threshold is compared against time
    # differences in seconds, so using the raw BPM value made it unbounded in
    # practice.
    beat_period = 60.0 / bpm if bpm > 0 else float('inf')
    threshold = beat_period/(level*2)
    result = []
    for beat in beats:
        freq = f0_dict.get_closest_value_from_key(beat,threshold)
        note = None
        if(freq is not None):
            note = round(librosa.hz_to_midi(freq))
        result.append(note)
    return result
    
def get_subdivision_beats(beats:np.array, num:int):
    """Insert ``num - 1`` evenly spaced sub-beats between each pair of consecutive beats.

    Returns:
        A list holding every original beat followed by its sub-beats; the last
        beat has no sub-beats after it.
    """
    subdivided = []
    last_position = len(beats) - 1
    for position in range(len(beats)):
        current = beats[position]
        subdivided.append(current)
        if position == last_position:
            continue
        following = beats[position + 1]
        if following is None:
            continue
        gap = following - current
        step = gap/num
        subdivided.extend(current + step*k for k in range(1, num))
    return subdivided
                
def get_beats(y, sr):
    """Track beats in ``y`` and return ``(tempo, beat_times_in_seconds)``.

    ``sr`` is forwarded to ``frames_to_time`` so the frame-to-seconds
    conversion uses the signal's own sampling rate rather than librosa's
    default.
    """
    tempo, beats = librosa.beat.beat_track(y=y, sr=sr)
    beats_time = librosa.frames_to_time(beats, sr=sr)
    return tempo, beats_time


def plot_transcription(data):
    """Draw a bar plot of a transcription; None entries are drawn with height 0.0."""
    # One bar per entry, positioned by its index.
    bar_heights = [0.0 if value is None else value for value in data]
    bar_positions = list(range(len(bar_heights)))

    sns.barplot(x=bar_positions, y=bar_heights, color="blue")

    # Show the plot
    plt.show()
trans_1, trans_2 = get_audio_signature_transcription(f"MTG-QBH/audio/q27.wav")
print(trans_1)
plot_transcription(trans_1)
print(trans_2)
plot_transcription(trans_2)

trans_song_1, trans_song_2 = get_audio_signature_transcription(f"output/htdemucs/789/vocals.wav")
print(trans_song_1)
plot_transcription(trans_song_1)
print(trans_song_2)
plot_transcription(trans_song_2)
# sub_beats = get_subdivision_beats(beats, 2)
# for ind, beat in enumerate(beats):
#     if(ind+1 < len(beats)):
#         print(f"{beats[ind+1]-beats[ind]}")

# for ind, beat in enumerate(sub_beats):
#     if(ind+1 < len(sub_beats)):
#         print(f"{sub_beats[ind+1]-sub_beats[ind]}")


In [None]:
def get_n_gram_from_notes(notes, n):
    """Return the n-grams of pitch intervals between consecutive notes.

    Every interval ``notes[i+1].pitch - notes[i].pitch`` is used, including
    the one between the last two notes. The previous bound ``idx+1 < l-1``
    dropped that final interval.

    Args:
        notes: list of objects with a ``pitch`` attribute, in playing order.
        n: n-gram length.

    Returns:
        List of strings, each the ``n`` consecutive intervals joined by a
        space. Empty when there are fewer than ``n`` intervals or ``n < 1``.
    """
    if n < 1:
        return []
    intervals = [
        str(following.pitch - current.pitch)
        for current, following in zip(notes, notes[1:])
    ]
    return [
        ' '.join(intervals[start:start + n])
        for start in range(len(intervals) - n + 1)
    ]

def get_midi_n_grams(midi, n):
    """Concatenate the interval n-grams of every non-drum instrument in ``midi``."""
    collected = []
    for instrument in midi.instruments:
        if instrument.is_drum:
            continue
        collected.extend(get_n_gram_from_notes(instrument.notes, n))
    return collected

def get_n_grams(midi):
    """Interval 3-grams of ``midi`` (shorthand for ``get_midi_n_grams(midi, 3)``)."""
    return get_midi_n_grams(midi, 3)

    

In [None]:
queries_df["query_n_grams"] = queries_df["query_basic_midi"].apply(get_n_grams)
queries_df.head()

In [None]:
# Mean Size Sets for Queries
queries_df["query_n_grams"].apply(lambda x: len(x)).mean()

In [None]:
songs_df["song_n_grams"] = songs_df["vocals_midi"].apply(get_n_grams)
songs_df.head()

In [None]:
# Mean Size Sets for Songs
songs_df["song_n_grams"].apply(lambda x: len(x)).mean()

## Traditional Similarity Calculation

In [None]:
def get_overlap_coef(a,b):
    """Overlap coefficient of two collections: ``|A ∩ B| / min(|A|, |B|)``.

    Duplicates are ignored, because both inputs are turned into sets.

    Returns:
        A float in [0, 1]. When either set is empty the result is 0.0 rather
        than a ZeroDivisionError, so one empty n-gram set does not abort a
        whole experiment.
    """
    a_set = set(a)
    b_set = set(b)
    smaller_size = min(len(a_set), len(b_set))
    if smaller_size == 0:
        return 0.0
    return len(a_set & b_set) / smaller_size

def get_index_of_song_in_query(cross_df, query_id,song_id):
    """1-based position of ``song_id`` among the rows of ``query_id`` in ``cross_df``.

    Rows are taken in the order they appear in ``cross_df``.

    Returns:
        The position, or None when the song is not listed for that query.
    """
    ranked_song_ids = cross_df.loc[cross_df["Query ID"] == query_id, "Song ID"].tolist()
    try:
        return ranked_song_ids.index(song_id) + 1
    except ValueError:
        return None
    
def inverse(num):
    """Return the reciprocal ``1.0 / num`` (works element-wise on arrays and Series)."""
    reciprocal = 1.0 / num
    return reciprocal

def get_mrr(rank_list):
    """Mean reciprocal rank over the non-NaN entries of ``rank_list``.

    Returns:
        Dict with "mrr" (mean of the reciprocal ranks) and "count" (number of
        ranks that were not NaN).
    """
    has_rank = ~np.isnan(rank_list)
    reciprocal_ranks = inverse(rank_list[has_rank])
    return {"mrr": reciprocal_ranks.mean(), "count": len(reciprocal_ranks)}
def run_experiment(queries_df, songs_df,queries_set_col, songs_set_col):
    """Rank every song for every query by overlap coefficient and score the ranking.

    Args:
        queries_df: DataFrame with "Query ID", "Song ID" and ``queries_set_col``.
        songs_df: DataFrame with "Song ID" and ``songs_set_col``.
        queries_set_col: column holding each query's collection of features.
        songs_set_col: column holding each song's collection of features.

    Returns:
        ``(result, q_df, cross_df)`` where ``result`` holds the MRR, the number
        of ranked queries and the mean set sizes; ``q_df`` is a copy of the
        queries with an "index_search" rank column, sorted by that rank; and
        ``cross_df`` is the query x song similarity table, sorted by descending
        similarity.
    """
    q_df = queries_df.copy()
    s_df = songs_df.copy()
    cross_df = q_df[["Query ID",queries_set_col]].merge(s_df[["Song ID",songs_set_col]], how="cross")
    cross_df["similarity"] = cross_df.apply(lambda row: get_overlap_coef(row[queries_set_col], row[songs_set_col]), axis=1)
    cross_df = cross_df.sort_values(by="similarity", ascending=False)
    q_df["index_search"] = q_df.apply(lambda x: get_index_of_song_in_query(cross_df, x["Query ID"], x["Song ID"]) , axis=1)

    # sort_values returns a new frame; the result used to be discarded.
    q_df = q_df.sort_values(by="index_search")
    result = get_mrr(q_df["index_search"])
    result["mean_song_set_size"] = s_df[songs_set_col].apply(len).mean()
    result["mean_query_set_size"] = q_df[queries_set_col].apply(len).mean()
    result["index_search_mean"] = q_df["index_search"].mean()
    return result, q_df, cross_df


In [None]:

def run_experiment_metric(queries_df, songs_df, from_n, to_n):
    """Run the retrieval experiment on rhythm-ratio n-grams of sizes ``from_n`` to ``to_n``.

    The n-grams are built from the "metric" column of the queries and the
    "vocals_metric" column of the songs, then handed to ``run_experiment``.
    """
    def _grams_for_sizes(set_list, from_n = 3, to_n=5):
        # All n-grams of every size in [from_n, to_n], smallest size first.
        grams = []
        for size in range(from_n, to_n+1):
            grams.extend(get_ngram_from_list(set_list, size))
        return grams

    q_df = queries_df.copy()
    s_df = songs_df.copy()
    q_df["query_metric_set"] = [_grams_for_sizes(metric, from_n, to_n) for metric in q_df["metric"]]
    s_df["song_metric_set"] = [_grams_for_sizes(metric, from_n, to_n) for metric in s_df["vocals_metric"]]

    return run_experiment(q_df, s_df,"query_metric_set","song_metric_set")


In [None]:
run_experiment_metric(queries_df, songs_df,2,2)[0]

In [None]:
run_experiment_metric(queries_df, songs_df,3,3)[0]

In [None]:
run_experiment_metric(queries_df, songs_df,4,4)[0]

In [None]:
run_experiment_metric(queries_df, songs_df,5,5)[0]

In [None]:
run_experiment_metric(queries_df, songs_df,6,6)[0]

In [None]:
run_experiment_metric(queries_df, songs_df,7,7)[0]

In [None]:
result, _, cr_sim = run_experiment_metric(queries_df, songs_df,4,6)
result

In [None]:
cr_sim["similarity"].hist()

In [None]:
run_experiment_metric(queries_df, songs_df,5,6)[0]

In [None]:

def run_experiment_midi(queries_df, songs_df,midi_to_gram = None,):
    """Run the retrieval experiment on n-grams extracted from the raw MIDI columns.

    Args:
        queries_df: DataFrame with a "query_basic_midi" column.
        songs_df: DataFrame with a "vocals_midi" column.
        midi_to_gram: callable turning a MIDI object into a list of n-gram
            strings. When omitted, ``get_n_grams`` is used; the ``None``
            default used to crash inside ``.apply``.

    Returns:
        Whatever ``run_experiment`` returns for the derived n-gram columns.
    """
    if midi_to_gram is None:
        midi_to_gram = get_n_grams
    q_df = queries_df.copy()
    s_df = songs_df.copy()
    q_df["query_n_grams"] = q_df["query_basic_midi"].apply(midi_to_gram)
    s_df["song_n_grams"] = s_df["vocals_midi"].apply(midi_to_gram)

    return run_experiment(q_df, s_df,"query_n_grams","song_n_grams")


In [None]:
run_experiment_midi(queries_df, songs_df, lambda midi: get_midi_n_grams(midi,3))[0]

In [None]:
run_experiment_midi(queries_df, songs_df, lambda midi: get_midi_n_grams(midi,4))[0]


In [None]:
run_experiment_midi(queries_df, songs_df, lambda midi: get_midi_n_grams(midi,5))[0]

In [None]:
run_experiment_midi(queries_df, songs_df, lambda midi: get_midi_n_grams(midi,6))[0]

In [None]:
run_experiment_midi(queries_df, songs_df, lambda midi: get_midi_n_grams(midi,7))[0]

In [None]:
run_experiment_midi(queries_df, songs_df, lambda midi: get_midi_n_grams(midi,8))[0]

In [None]:
run_experiment_midi(queries_df, songs_df, lambda midi: get_midi_n_grams(midi,9))[0]

In [None]:
run_experiment_midi(queries_df, songs_df, lambda midi: get_midi_n_grams(midi,14))[0]

In [None]:
def get_multi_grams(query_midi, from_n = 3, to_n=5):
    """Interval n-grams of ``query_midi`` for every size from ``from_n`` to ``to_n`` inclusive."""
    grams = []
    for size in range(from_n, to_n+1):
        grams.extend(get_midi_n_grams(query_midi, size))
    return grams

run_experiment_midi(queries_df, songs_df, lambda midi: get_multi_grams(midi,4,9))[0]

In [None]:
result, q_df, _ = run_experiment_midi(queries_df, songs_df, lambda midi: get_multi_grams(midi,4,14))
result

In [None]:
def run_experiment_clean_midi(queries_df, songs_df,midi_to_gram = None,):
    """Run the retrieval experiment on n-grams extracted from the onset-filtered MIDI columns.

    Args:
        queries_df: DataFrame with a "clean_midi_onset" column.
        songs_df: DataFrame with a "vocal_clean_midi" column.
        midi_to_gram: callable turning a MIDI object into a list of n-gram
            strings. When omitted, ``get_n_grams`` is used; the ``None``
            default used to crash inside ``.apply``.

    Returns:
        Whatever ``run_experiment`` returns for the derived n-gram columns.
    """
    if midi_to_gram is None:
        midi_to_gram = get_n_grams
    q_df = queries_df.copy()
    s_df = songs_df.copy()
    q_df["query_n_grams"] = q_df["clean_midi_onset"].apply(midi_to_gram)
    s_df["song_n_grams"] = s_df["vocal_clean_midi"].apply(midi_to_gram)

    return run_experiment(q_df, s_df,"query_n_grams","song_n_grams")


In [None]:
result, q_df, _ = run_experiment_clean_midi(queries_df, songs_df, lambda midi: get_multi_grams(midi,3,3))
result

In [None]:
result, q_df, _ = run_experiment_clean_midi(queries_df, songs_df, lambda midi: get_multi_grams(midi,4,4))
result

In [None]:
result, q_df, _ = run_experiment_clean_midi(queries_df, songs_df, lambda midi: get_multi_grams(midi,5,5))
result

In [None]:
result, q_df, _ = run_experiment_clean_midi(queries_df, songs_df, lambda midi: get_multi_grams(midi,6,6))
result

In [None]:
result, q_df, _ = run_experiment_clean_midi(queries_df, songs_df, lambda midi: get_multi_grams(midi,7,7))
result

In [None]:
result, q_df, _ = run_experiment_clean_midi(queries_df, songs_df, lambda midi: get_multi_grams(midi,3,8))
result

In [None]:
result, q_df, _ = run_experiment_clean_midi(queries_df, songs_df, lambda midi: get_multi_grams(midi,4,6))
result

In [None]:
(q_df["index_search"]<=10).sum()/(q_df["index_search"]).count()

In [None]:
q_df["index_search"].hist()

In [None]:
q_df[q_df["index_search"]>300].sample(10)

---

### EDA Exploring Query Song

In [None]:
from bokeh.plotting import figure, show, output_notebook
from bokeh.models import ColumnDataSource

def plot_midi(midi):
    """Draw the notes of the first instrument of ``midi`` as rectangles in a Bokeh figure.

    Each note becomes a rectangle spanning its start to its end horizontally,
    one pitch unit high, at the note's pitch.
    """
    notes = midi.instruments[0].notes
    durations = [note.end - note.start for note in notes]
    pitches = [note.pitch for note in notes]
    # Bokeh's rect glyph is positioned by its centre, so the x coordinate must
    # be the middle of the note. Using note.start drew every note half a
    # duration too early.
    centers = [note.start + duration / 2 for note, duration in zip(notes, durations)]

    output_notebook()

    # Create a ColumnDataSource with the data
    source = ColumnDataSource(data=dict(time=centers, pitch=pitches, duration=durations))

    # Create a Bokeh figure
    p = figure(width=400, height=400, title="Scatter Plot with Rectangles")

    # Add rectangles to the plot
    p.rect(x='time', y='pitch', width='duration', height=1, source=source, fill_alpha=0, line_color='black')

    # Set axis labels
    p.xaxis.axis_label = 'Time'
    p.yaxis.axis_label = 'Pitch'

    # Show the plot
    show(p)

In [None]:

def does_overlap(noteA, noteB):
    """Return True when the closed time spans of the two notes touch or overlap."""
    return (
        noteA.start <= noteB.start <= noteA.end
        or noteB.start <= noteA.start <= noteB.end
        or noteA.start <= noteB.end <= noteA.end
        or noteB.start <= noteA.end <= noteB.end
    )

def clean_midi(midi_in):
    """Return a simplified deep copy of ``midi_in``; the input is left untouched.

    Per instrument, three filters are applied in order:

    1. A note is dropped when it overlaps a note that was already kept.
       Previously each note was appended before the check, so it always
       overlapped itself and this filter never removed anything.
    2. A note is dropped when the note before it (after step 1) has the same
       pitch.
    3. A note is dropped when it is more than 8 times shorter than the median
       duration. Zero-length notes are dropped too, rather than dividing by
       zero.
    """
    midi = copy.deepcopy(midi_in)
    for instrument in midi.instruments:
        clean_notes = []
        for note in instrument.notes:
            if not any(does_overlap(note, kept) for kept in clean_notes):
                clean_notes.append(note)

        if not clean_notes:
            # Nothing to take a median of.
            instrument.notes = []
            continue

        median_duration = np.median([note.end - note.start for note in clean_notes])
        print(f"MEDIAN:{median_duration}")

        result = []
        for index, note in enumerate(clean_notes):
            repeats_previous_pitch = index > 0 and clean_notes[index-1].pitch == note.pitch
            current_duration = note.end - note.start
            too_short = current_duration <= 0 or median_duration/current_duration > 8
            if not repeats_previous_pitch and not too_short:
                result.append(note)

        instrument.notes = result
    return midi

In [None]:
def get_f0_time_notes(y, sr):
    """Placeholder: the body of this function was never written.

    The bare ``def`` line made the cell a SyntaxError. It now fails loudly
    when called instead.
    """
    raise NotImplementedError("get_f0_time_notes has not been implemented yet")

In [None]:


def analyse_query_song(query_id, song_id, midi_to_gram):
    """Listen to and compare one query with one song, step by step.

    Displays, in order: the query audio and its MIDI; the full song and its
    separated vocals with their MIDI; the n-gram similarity of the raw MIDIs;
    the onset-filtered MIDIs and their similarity; and finally the similarity
    of the rhythm-ratio 4-grams.

    Args:
        query_id: query identifier, e.g. "q59" (names the query wav file).
        song_id: song identifier as a string; converted with ``int`` to look
            the song up in ``songs_df``.
        midi_to_gram: callable turning a MIDI object into a list of n-gram strings.

    Returns:
        The rhythm-based ("metric") similarity.
    """
    query_audio_path = f"MTG-QBH/audio/{query_id}.wav"
    vocals_audio_path = f"output/htdemucs/{song_id}/vocals.wav"

    def show_midi(midi):
        # Plot the notes and offer the synthesised audio right below.
        plot_midi(midi)
        display(Audio(data=midi.synthesize(), rate=44100))

    # Each load keeps its own sampling rate, so a later load cannot change the
    # rate used for an earlier signal.
    q_y, q_sr = librosa.load(query_audio_path)
    display(Audio(data=q_y, rate=q_sr))

    query_data = queries_df[queries_df["Query ID"] == query_id].iloc[0]
    query_midi = query_data["query_basic_midi"]
    show_midi(query_midi)

    s_y, s_sr = librosa.load(f"songs_wav/{song_id}.wav")
    display(Audio(data=s_y, rate=s_sr))

    s_vocals_y, vocals_sr = librosa.load(vocals_audio_path)
    display(Audio(data=s_vocals_y, rate=vocals_sr))

    song_data = songs_df[songs_df["Song ID"] == int(song_id)].iloc[0]
    song_vocals_midi = song_data["vocals_midi"]
    show_midi(song_vocals_midi)

    similarity = get_overlap_coef(midi_to_gram(query_midi), midi_to_gram(song_vocals_midi))
    display(f"Similarity:{similarity}")

    clean_query_midi = clean_midi_based_onset(q_y, q_sr, query_midi)
    show_midi(clean_query_midi)

    clean_song_midi = clean_midi_based_onset(s_vocals_y, vocals_sr, song_vocals_midi)
    show_midi(clean_song_midi)

    clean_similarity = get_overlap_coef(midi_to_gram(clean_query_midi), midi_to_gram(clean_song_midi))
    display(f"Similarity Clean :{clean_similarity}")

    # Rhythm descriptors are computed with compute_metrics; the helper called
    # here before (get_metric_list) is not defined in this notebook.
    query_metric = compute_metrics(query_audio_path)
    vocals_metric = compute_metrics(vocals_audio_path)

    query_gram = get_ngram_from_list(query_metric,4)
    vocals_gram = get_ngram_from_list(vocals_metric,4)

    metric_similarity = get_overlap_coef(query_gram, vocals_gram)
    display(f"Metric Similarity:{metric_similarity}")

    return metric_similarity
analyse_query_song("q59","1396",   lambda midi: get_multi_grams(midi,4,14))

In [None]:
y, sr = librosa.load(librosa.ex('trumpet'))
display(Audio(data=y, rate=sr))
librosa.onset.onset_detect(y=y, sr=sr, units='time')
o_env = librosa.onset.onset_strength(y=y, sr=sr)
times = librosa.times_like(o_env, sr=sr)
onset_frames = librosa.onset.onset_detect(onset_envelope=o_env, sr=sr)
onset_frames

In [None]:
import numpy as np

                   

In [None]:
import matplotlib.pyplot as plt
D = np.abs(librosa.stft(y))
fig, ax = plt.subplots(nrows=2, sharex=True)
librosa.display.specshow(librosa.amplitude_to_db(D, ref=np.max),
                         x_axis='time', y_axis='log', ax=ax[0])
ax[0].set(title='Power spectrogram')
ax[0].label_outer()
ax[1].plot(times, o_env, label='Onset strength')
ax[1].vlines(times[onset_frames], 0, o_env.max(), color='r', alpha=0.9,
           linestyle='--', label='Onsets')
ax[1].legend()



In [None]:
from visual_midi import Plotter
from visual_midi import Preset
from pretty_midi import PrettyMIDI

# Render the MIDI held in `midi` with visual_midi.
# (A stray `_ColorGroupMeta` expression that raised NameError was removed.)
preset = Preset(plot_width=850)
plotter = Plotter(preset, plot_max_length_bar=4)
plotter.plot(midi)

In [None]:
# EXPERIMENT
# y, sr = librosa.load(librosa.ex('choice'))
def shift_audio(query_filename, steps = 4):
    """Play a query three ways: original, pitch-shifted by ``steps``, and time-stretched at rate 0.8.

    Args:
        query_filename: file name inside ``MTG-QBH/audio/``.
        steps: value passed to ``librosa.effects.pitch_shift`` as ``n_steps``.
    """
    y, sr = librosa.load(f"MTG-QBH/audio/{query_filename}")
    clips = (
        y,
        librosa.effects.pitch_shift(y, sr=sr, n_steps=steps),
        librosa.effects.time_stretch(y,  rate=0.8),
    )
    for clip in clips:
        display(Audio(data=clip, rate=sr))
shift_audio("q3.wav",4)

# shift_audio("q3.wav",2)
# shift_audio("q3.wav",-12)
# shift_audio("q3.wav",12)

In [None]:

# y, sr = librosa.load(librosa.ex('choice'))
y, sr = librosa.load(f"MTG-QBH/audio/q3.wav")

In [None]:
_, beat_frames = librosa.beat.beat_track(y=y, sr=sr,
                                         hop_length=512)

In [None]:
beat_frames

In [None]:
beat_samples = librosa.frames_to_samples(beat_frames)


In [None]:
beat_samples

In [None]:
intervals = librosa.util.frame(beat_samples, frame_length=2, hop_length=1).T



In [None]:
intervals

In [None]:
y_out = librosa.effects.remix(y, intervals[::-1])

In [None]:
display(Audio(data= y, rate=sr))
display(Audio(data= y_out, rate=sr))