# Import the Music Transformer Model

In [None]:
!pip install tensorflow-gpu
!pip install -r requirements.txt 
!pip install tfp-nightly

In [None]:
from processor import encode_midi
from processor import decode_midi
from tensorflow.python import keras 
import utils
import os
import numpy as np
import pandas as pd

In [None]:
from model import MusicTransformerDecoder
from custom.layers import *
from custom import callback
import params as par
from tensorflow.python.keras.optimizer_v2.adam import Adam
from data import Data
import utils
import argparse
import datetime
import sys

In [None]:
# Declare constants 
batch_size = 2
pickle_dir = 'Donnees'
max_seq = 2048
epochs = 10
is_reuse = False
load_path = None
save_path = "result/trainedmodel_entrainement"
multi_gpu = False
num_layer = 6 # 6 multihead attention layers. 

In [None]:
# Model Learning rate
l_r = None
learning_rate = callback.CustomSchedule(par.embedding_dim) if l_r is None else l_r
opt = Adam(learning_rate, beta_1=0.9, beta_2=0.98, epsilon=1e-9)

In [None]:
# define model
mt = MusicTransformerDecoder(
            embedding_dim=256,
            vocab_size=par.vocab_size,
            num_layer=6,
            max_seq=max_seq,
            dropout=0.2,
            debug=False, loader_path=load_path)
mt.compile(optimizer=opt, loss=callback.transformer_dist_train_loss)

In [None]:
# Define tensorboard writer (only for logs)
current_time = datetime.datetime.now().strftime('%Y%m%d-%H%M%S')
train_log_dir = 'logs/mt_decoder/'+current_time+'/train'
eval_log_dir = 'logs/mt_decoder/'+current_time+'/eval'
train_summary_writer = tf.summary.create_file_writer(train_log_dir)
eval_summary_writer = tf.summary.create_file_writer(eval_log_dir)

In [None]:
# Loads the model
mt.load_config_file(save_path)
mt.load_ckpt_file(save_path)

mt.reset_metrics()

# Important Functions

In [None]:
import json

def get_data(file,choice='all',nans=True,remove_duos=False):
  """
  Returns a dataframe from the original maestro dataset
  Args:
    file: dataset file path
    choice: choice of the set of composers to return (check ./maestro-v3.0.0/composers.json)
    nans: if false, will remove null values
    remove_duos: if true, will remove pieces of music from a duo of composers

  Returns:
      Dataframe according the parameters

  Raises:
      ValueError: if the choice is not in the possible list of choices.
  """
  data = pd.read_csv(file)
  if not nans:
    try:
      data = data.dropna()
    except:
      data = data.dropna(axis=0, how='all')

  if remove_duos:
    data = data[~data.canonical_composer.str.contains("/")]

  if choice=='all':
    return data

  FILE = open("maestro-v3.0.0/composers.json", "r").read()
  C = json.loads(FILE)
  C = C["everyComposer"]

  if choice in C.keys():
    data = data[data['canonical_composer'].isin(C[choice])]
  else:
    print("Choice not recognized, please check './maestro-v3.0.0/composers.json'")
    raise ValueError

  return data

In [None]:
def get_last_layers(k, tens):
  """
  Returns the last k layers of the model
  Args:
  k: layer number from 1 to 6, 0 < k < 5
  """
  return tens[k:]

def mean_pooling(tens):
  """Averages the matrices of a layer"""
  return np.mean(tens[0][0],axis=0)

In [None]:
def chunks(lst, n):
    """Yield successive n-sized chunks from lst."""
    for i in range(0, len(lst), n):
        yield lst[i:i + n]

list(chunks([1,2,3,4,5,6,7,8,9],4))

[[1, 2, 3, 4], [5, 6, 7, 8], [9]]

In [None]:
from ast import literal_eval

def literal_encoded(data):
  """Parses a string into a python array"""
  return data["encoded_sequence"].apply(lambda seq: literal_eval(seq))

def get_attention(sequence):
  """Computes the self attention from a sequence of tokens"""
  last_layer = get_last_layers(5,np.array(mt.Decoder( np.array([sequence]), training=None, mask=None)[1])) # Get the attention of the last layer
  return mean_pooling(last_layer).flatten() # Compute the mean pooling

In [None]:
from sklearn.linear_model import LogisticRegression

def get_model(X,y,random_state=42):
  """Initialize our Logistic Regression"""
  return LogisticRegression(random_state=random_state).fit(X, y)

# Composer classifcation using attention matrices

In [None]:
data = get_data(file='maestro-64-unique-titles.csv',choice='top10',nans=True,remove_duos=True)
data.canonical_composer.value_counts()

Franz Schubert             41647
Ludwig van Beethoven       41375
Franz Liszt                39025
Frédéric Chopin            33755
Robert Schumann            25374
Johann Sebastian Bach      15755
Sergei Rachmaninoff        14273
Claude Debussy             11216
Wolfgang Amadeus Mozart     9896
Joseph Haydn                7123
Name: canonical_composer, dtype: int64

In [None]:
def train_test(data,composer1,composer2,train_size,test_size,random_state=42):
  """
  Create an evenly distributed train_test set with data
  Args:
    data: DataFrame object representing our Dataset
    composer1: Name of the first composer
    composer2: Name of the second composer
    
  """
  #Get every title for each composer
  A = data[data['canonical_composer'].isin([composer1])]
  B = data[data['canonical_composer'].isin([composer2])]
  
  #Train set
  temp1 = A[A['split'].isin(["train"])].sample(n=train_size//2,random_state=random_state)
  temp2 = B[B['split'].isin(["train"])].sample(n=train_size//2,random_state=random_state)

  train = pd.concat([temp1,temp2],ignore_index=True)

  #Test set
  temp3 = A[A['split'].isin(["test"])].sample(n=test_size//2,random_state=random_state)
  temp4 = B[B['split'].isin(["test"])].sample(n=test_size//2,random_state=random_state)

  test = pd.concat([temp3, temp4], ignore_index=True)

  return train, test

In [None]:
from tqdm.notebook import tqdm

def encode_attention(dataset,show_tqdm=True):
  """
  Create a dataset with the attentio of a sequence with its composer
  Args:
    dataset: base Dataframe dataset
    show_tqdm: if false, will not show the progress

  Returns:
    df: DataFrame corresponding to each music sequence attention with its composer
  """
  dataset.reset_index(drop=True,inplace=True)
  X = []
  y = []
  PATH = "" # Set initial path
  for ind in tqdm(dataset.index,leave=show_tqdm):
      try:
          new_PATH = "maestro-v3.0.0/"+dataset["midi_filename"].iloc[ind]
          if new_PATH != PATH:
              # If we found a new path this means we have switch to another piece of music so we recompute the encoded sequence
              PATH = new_PATH
              encoded = encode_midi(PATH)
      
          start = dataset["start_ind"].iloc[ind]
          end = dataset["end_ind"].iloc[ind]
          attention = get_attention(encoded[start:end])
      
          X.append(attention)
          y.append(dataset["canonical_composer"].iloc[ind])
      except:
          print("Unexpected error:", sys.exc_info()[0],"Index:",ind)
          pass

  df = pd.DataFrame(data=X)
  df["canonical_composer"] = y
  return df

In [None]:
TESTS = {
    "H-M":["Joseph Haydn","Wolfgang Amadeus Mozart"],
    "M-D":["Wolfgang Amadeus Mozart","Claude Debussy"],
    "C-S":["Frédéric Chopin","Franz Schubert"],
    "C-B":["Frédéric Chopin","Johann Sebastian Bach"],
    "C-D":["Frédéric Chopin","Claude Debussy"]
}

In [None]:
h = TESTS["M-D"]

print("-----------------------------------")
print("Processing...",h)
print("-----------------------------------")

res = [] # Array that will contain our scores
TRAIN_SIZE = 100
TEST_SIZE = 100
nb_seeds = 1

# Here we can switch the file to 'maestro-XX-unique-titles.csv' with XX corresponding to the sequence size (from 32, 64, 96 or 128)
data = get_data(file='maestro-32-unique-titles.csv',choice='top10',nans=True,remove_duos=True)

for seed in range(nb_seeds):

    train, test = train_test(data,h[0],h[1],TRAIN_SIZE,TEST_SIZE,random_state=seed)

    train_attention = encode_attention(train)
    test_attention = encode_attention(test)

    X_train, y_train = train_attention.loc[:, train_attention.columns != 'canonical_composer'], train_attention['canonical_composer']
    X_test, y_test = test_attention.loc[:, test_attention.columns != 'canonical_composer'], test_attention['canonical_composer']

    cls = get_model(X_train,y_train)

    print("Seed :",seed,"Score :",cls.score(X_test,y_test))
    res.append(cls.score(X_test,y_test))

-----------------------------------
Processing... ['Wolfgang Amadeus Mozart', 'Claude Debussy']
-----------------------------------


  0%|          | 0/100 [00:00<?, ?it/s]

  0%|          | 0/100 [00:00<?, ?it/s]

Seed : 0 Score : 0.57


# Cadence detection using attention matrices

In [None]:
!pip uninstall pandas;
!pip install pretty-midi;
!pip install pandas==1.1.0;
%cd /content/gdrive/My Drive/Stage/CodeMusicTransformer

Found existing installation: pandas 0.25.0
Uninstalling pandas-0.25.0:
  Would remove:
    /usr/local/lib/python3.7/dist-packages/pandas-0.25.0.dist-info/*
    /usr/local/lib/python3.7/dist-packages/pandas/*
Proceed (y/n)? y
  Successfully uninstalled pandas-0.25.0
Collecting pandas==1.1.0
  Downloading pandas-1.1.0-cp37-cp37m-manylinux1_x86_64.whl (10.5 MB)
[K     |████████████████████████████████| 10.5 MB 5.3 MB/s 
Installing collected packages: pandas
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
google-colab 1.0.0 requires astor~=0.8.1, but you have astor 0.8.0 which is incompatible.
google-colab 1.0.0 requires requests~=2.23.0, but you have requests 2.22.0 which is incompatible.
google-colab 1.0.0 requires six~=1.15.0, but you have six 1.12.0 which is incompatible.[0m
Successfully installed pandas-1.1.0


/content/gdrive/My Drive/Stage/CodeMusicTransformer


In [None]:
"""
In order to make our model work with cadence folder we have to make small changes to our encoding function
"""
import pretty_midi


RANGE_NOTE_ON = 128
RANGE_NOTE_OFF = 128
RANGE_VEL = 32
RANGE_TIME_SHIFT = 100

START_IDX = {
    'note_on': 0,
    'note_off': RANGE_NOTE_ON,
    'time_shift': RANGE_NOTE_ON + RANGE_NOTE_OFF,
    'velocity': RANGE_NOTE_ON + RANGE_NOTE_OFF + RANGE_TIME_SHIFT
}


class SustainAdapter:
    def __init__(self, time, type):
        self.start =  time
        self.type = type


class SustainDownManager:
    def __init__(self, start, end):
        self.start = start
        self.end = end
        self.managed_notes = []
        self._note_dict = {} # key: pitch, value: note.start

    def add_managed_note(self, note: pretty_midi.Note):
        self.managed_notes.append(note)

    def transposition_notes(self):
        for note in reversed(self.managed_notes):
            try:
                note.end = self._note_dict[note.pitch]
            except KeyError:
                note.end = max(self.end, note.end)
            self._note_dict[note.pitch] = note.start


# Divided note by note_on, note_off
class SplitNote:
    def __init__(self, type, time, value, velocity):
        ## type: note_on, note_off
        self.type = type
        self.time = time
        self.velocity = velocity
        self.value = value

    def __repr__(self):
        return '<[SNote] time: {} type: {}, value: {}, velocity: {}>'\
            .format(self.time, self.type, self.value, self.velocity)


class Event:
    def __init__(self, event_type, value):
        self.type = event_type
        self.value = value

    def __repr__(self):
        return '<Event type: {}, value: {}>'.format(self.type, self.value)

    def to_int(self):
        return START_IDX[self.type] + self.value

    @staticmethod
    def from_int(int_value):
        info = Event._type_check(int_value)
        return Event(info['type'], info['value'])

    @staticmethod
    def _type_check(int_value):
        range_note_on = range(0, RANGE_NOTE_ON)
        range_note_off = range(RANGE_NOTE_ON, RANGE_NOTE_ON+RANGE_NOTE_OFF)
        range_time_shift = range(RANGE_NOTE_ON+RANGE_NOTE_OFF,RANGE_NOTE_ON+RANGE_NOTE_OFF+RANGE_TIME_SHIFT)

        valid_value = int_value

        if int_value in range_note_on:
            return {'type': 'note_on', 'value': valid_value}
        elif int_value in range_note_off:
            valid_value -= RANGE_NOTE_ON
            return {'type': 'note_off', 'value': valid_value}
        elif int_value in range_time_shift:
            valid_value -= (RANGE_NOTE_ON + RANGE_NOTE_OFF)
            return {'type': 'time_shift', 'value': valid_value}
        else:
            valid_value -= (RANGE_NOTE_ON + RANGE_NOTE_OFF + RANGE_TIME_SHIFT)
            return {'type': 'velocity', 'value': valid_value}


def _divide_note(notes):
    result_array = []
    notes.sort(key=lambda x: x.start)

    for note in notes:
        on = SplitNote('note_on', note.start, note.pitch, note.velocity)
        off = SplitNote('note_off', note.end, note.pitch, None)
        result_array += [on, off]
    return result_array


def _merge_note(snote_sequence):
    note_on_dict = {}
    result_array = []

    for snote in snote_sequence:
        # print(note_on_dict)
        if snote.type == 'note_on':
            note_on_dict[snote.value] = snote
        elif snote.type == 'note_off':
            try:
                on = note_on_dict[snote.value]
                off = snote
                if off.time - on.time == 0:
                    continue
                result = pretty_midi.Note(on.velocity, snote.value, on.time, off.time)
                result_array.append(result)
            except:
                print('info removed pitch: {}'.format(snote.value))
    return result_array


def _snote2events(snote: SplitNote, prev_vel: int):
    result = []
    if snote.velocity is not None:
        modified_velocity = snote.velocity // 4
        if prev_vel != modified_velocity:
            result.append(Event(event_type='velocity', value=modified_velocity))
    result.append(Event(event_type=snote.type, value=snote.value))
    return result


def _event_seq2snote_seq(event_sequence):
    timeline = 0
    velocity = 0
    snote_seq = []

    for event in event_sequence:
        if event.type == 'time_shift':
            timeline += ((event.value+1) / 100)
        if event.type == 'velocity':
            velocity = event.value * 4
        else:
            snote = SplitNote(event.type, timeline, event.value, velocity)
            snote_seq.append(snote)
    return snote_seq


def _make_time_sift_events(prev_time, post_time):
    time_interval = int(round((post_time - prev_time) * 100))
    results = []
    while time_interval >= RANGE_TIME_SHIFT:
        results.append(Event(event_type='time_shift', value=RANGE_TIME_SHIFT-1))
        time_interval -= RANGE_TIME_SHIFT
    if time_interval == 0:
        return results
    else:
        return results + [Event(event_type='time_shift', value=time_interval-1)]


def _control_preprocess(ctrl_changes):
    sustains = []

    manager = None
    for ctrl in ctrl_changes:
        if ctrl.value >= 64 and manager is None:
            # sustain down
            manager = SustainDownManager(start=ctrl.time, end=None)
        elif ctrl.value < 64 and manager is not None:
            # sustain up
            manager.end = ctrl.time
            sustains.append(manager)
            manager = None
        elif ctrl.value < 64 and len(sustains) > 0:
            sustains[-1].end = ctrl.time
    return sustains


def _note_preprocess(susteins, notes):
    note_stream = []

    for sustain in susteins:
        for note_idx, note in enumerate(notes):
            if note.start < sustain.start:
                note_stream.append(note)
            elif note.start > sustain.end:
                notes = notes[note_idx:]
                sustain.transposition_notes()
                break
            else:
                sustain.add_managed_note(note)

    for sustain in susteins:
        note_stream += sustain.managed_notes

    note_stream.sort(key= lambda x: x.start)
    return note_stream

def _note_preprocess2(notes):
    note_stream = []

    for note in notes:
      note_stream.append(note)

    note_stream.sort(key= lambda x: x.start)
    return note_stream


def encode_midi_no_sustain(file_path, get_times=False):
    # Only for time comparison with Transformer
    real_times = []
    approx_times = []
    ############################################

    events = []
    notes = []
    mid = pretty_midi.PrettyMIDI(midi_file=file_path)

    for inst in mid.instruments:
        inst_notes = inst.notes
        # ctrl.number is the number of sustain control. If you want to know abour the number type of control,
        # see https://www.midi.org/specifications-old/item/table-3-control-change-messages-data-bytes-2
        # ctrls = _control_preprocess([ctrl for ctrl in inst.control_changes if ctrl.number == 64])
        notes += _note_preprocess2(inst_notes)

    dnotes = _divide_note(notes)

    #print(dnotes)
    dnotes.sort(key=lambda x: x.time)
    #print('sorted:')
    #print(dnotes)
    cur_time = 0
    cur_vel = 0

    # transformer
    sift = 0
    for snote in dnotes:
        #Make dictionnary with id, real_time, apprx_time
        real_times.append(snote.time*1000)# in milliseconds
        #print(snote.time)
        time_sift_container = _make_time_sift_events(prev_time=cur_time, post_time=snote.time)

        for event in time_sift_container:
          sift += event.value + 1
        approx_times.append(sift*10)# There is no 0

        events += _make_time_sift_events(prev_time=cur_time, post_time=snote.time)
        events += _snote2events(snote=snote, prev_vel=cur_vel)
        # events += _make_time_sift_events(prev_time=cur_time, post_time=snote.time)

        cur_time = snote.time
        cur_vel = snote.velocity

    if get_times:
      return [e.to_int() for e in events], real_times, approx_times
    else:
      return [e.to_int() for e in events]


def decode_midi(idx_array, file_path=None):
    event_sequence = [Event.from_int(idx) for idx in idx_array]
    # print(event_sequence)
    snote_seq = _event_seq2snote_seq(event_sequence)
    note_seq = _merge_note(snote_seq)
    note_seq.sort(key=lambda x:x.start)

    mid = pretty_midi.PrettyMIDI()
    # if want to change instument, see https://www.midi.org/specifications/item/gm-level-1-sound-set
    instument = pretty_midi.Instrument(1, False, "Developed By Yang-Kichang")
    instument.notes = note_seq

    mid.instruments.append(instument)
    if file_path is not None:
        mid.write(file_path)
    return mid

In [None]:
PATH = 'Cadences/midi_files/wtc-i-04.mid'
encoded, A, B = encode_midi_no_sustain(PATH,True)
len(encoded)

4698

In [None]:
midi_data = pretty_midi.PrettyMIDI(PATH)

def offset_to_real_time(midi_data,offset):
    tempo_changes = midi_data.get_tempo_changes()
    bpm = tempo_changes[1][0]
    beat_duration_ms = 1000*60/bpm
    return offset * beat_duration_ms

for i in range(9):
    print('offset {} : {}'.format(i,offset_to_real_time(midi_data,i)))

offset 0 : 0.0
offset 1 : 560.748
offset 2 : 1121.496
offset 3 : 1682.2440000000001
offset 4 : 2242.992
offset 5 : 2803.7400000000002
offset 6 : 3364.4880000000003
offset 7 : 3925.2360000000003
offset 8 : 4485.984


In [None]:
temp = encoded[:30] 
time = 0
print("Event || Time")
for event in temp:
  infos = ""
  if 256 <= event <= 355 :
    time += (event-255)*10

  if 0 <= event <= 127:
    infos += "<- Note_On, Pitch = "+str(event)

  if 128 <= event <= 255:
    infos += "<- Note_Off, Pitch = "+str(event - 128)

  if 356 <= event <= 387:
    infos += "<- VelocityEvent, Volume de la note = "+str(event - 355)
  print(event,"||",time,infos)


Event || Time
378 || 0 <- VelocityEvent, Volume de la note = 23
49 || 0 <- Note_On, Pitch = 49
355 || 1000 
355 || 2000 
279 || 2240 
177 || 2240 <- Note_Off, Pitch = 49
378 || 2240 <- VelocityEvent, Volume de la note = 23
48 || 2240 <- Note_On, Pitch = 48
355 || 3240 
267 || 3360 
176 || 3360 <- Note_Off, Pitch = 48
378 || 3360 <- VelocityEvent, Volume de la note = 23
52 || 3360 <- Note_On, Pitch = 52
355 || 4360 
267 || 4480 
180 || 4480 <- Note_Off, Pitch = 52
378 || 4480 <- VelocityEvent, Volume de la note = 23
51 || 4480 <- Note_On, Pitch = 51
355 || 5480 
355 || 6480 
279 || 6720 
179 || 6720 <- Note_Off, Pitch = 51
378 || 6720 <- VelocityEvent, Volume de la note = 23
56 || 6720 <- Note_On, Pitch = 56
378 || 6720 <- VelocityEvent, Volume de la note = 23
49 || 6720 <- Note_On, Pitch = 49
311 || 7280 
177 || 7280 <- Note_Off, Pitch = 49
378 || 7280 <- VelocityEvent, Volume de la note = 23
51 || 7280 <- Note_On, Pitch = 51


In [None]:
def get_start_end_sequence(sequence,time):
  """
  Get the first starting / ending index of a token in a sequence
  Args:
    sequence: the sequence of integers
    time: token 
  """
  start = -1
  end = -1
  temp = False
  for i in range(len(sequence)):
    if sequence[i] == time and not temp: #Entering 
      start = i
      temp = True
    elif (sequence[i] != time or i == len(sequence)-1) and temp: #leaving
      end = i
      return start,end
    else:
      pass
  return 0,0

get_start_end_sequence([0,0,0,1,1,1,2,2,2,2,1,1,1,1,2,2,2],2)

(6, 10)

In [None]:
def get_cadences_sequences(midi_file,csv_file,size, offsets_X_Y=False):
  """
  Generate from a midi_file and a csv_file a dataset with sequences of size size well labeled for cadence detection

  offsets_X_Y will display the indexes of offsets X and Y in the sequence
  """
  offsets = []

  offsets_bis = []

  cadences_file = pd.read_csv(csv_file)
  midi_data = pretty_midi.PrettyMIDI(midi_file)

  for offset in cadences_file.loc[cadences_file['PAC (truth)'] == True]["offset Z"]:
    # for each offset int the file, we will keep its real time in the offsets array and its offset time in the offsets_bis array
    offsets.append(offset_to_real_time(midi_data,offset))
    offsets_bis.append(offset)

  """ Display X_Y"""
  offsets_X = []
  offsets_Y = []  

  for offset in cadences_file.loc[cadences_file['PAC (truth)'] == True]["offset X"]:
    offsets_X.append(offset_to_real_time(midi_data,offset))

  for offset in cadences_file.loc[cadences_file['PAC (truth)'] == True]["offset Y"]:
    offsets_Y.append(offset_to_real_time(midi_data,offset))
  
  """"""


  start_ind = []
  end_ind = []
  contains_cadence = []
  """"""
  offset_if_cadence = []

  X_start = []
  X_end = []
  Y_start = []
  Y_end = []
  Z_start = []
  Z_end = []
  """"""

  encoded, real_time, approx_time = encode_midi_no_sustain(midi_file,True)

  ###
  real_time = [round(i) for i in real_time]
  ###
  """"""
  time = 0
  times = []
  for event in encoded:
    if 256 <= event <= 355 :
      time += (event-255)*10
    times.append(time)
  """"""
  #print("Time from sequence")
  #print(times)
  #print("Approx Time")
  #print(approx_time)
  #print("Real Time")
  #print(real_time)
  i_start = 0
  i_end = size
  while i_end < len(encoded):
    start_ind.append(i_start)
    end_ind.append(i_end)

    contains_cadence.append(False)
    offset_if_cadence.append(None)

    if offsets_X_Y:
      X_start.append(None)
      X_end.append(None)
      Y_start.append(None)
      Y_end.append(None)
      Z_start.append(None)
      Z_end.append(None)

    for i,off in enumerate(offsets):
      index = real_time.index(round(off))

      """"""
      if offsets_X_Y:
        index_X = real_time.index(round(offsets_X[i]))
        index_Y = real_time.index(round(offsets_Y[i]))
      """"""
      #print(i_end, times)
      if times[i_start] < approx_time[index] < times[i_end]:
        #Here we only keep sequences if its cadence is at least after halt of the sequence
        if (times[i_start]+times[i_end])/2 <= approx_time[index] <= times[i_end]:
          contains_cadence[-1] = True
          offset_if_cadence[-1] = offsets_bis[offsets.index(off)]
          if offsets_X_Y:
            X = get_start_end_sequence(times, approx_time[index_X])
            X_start[-1] = int(X[0])
            X_end[-1] = int(X[1])

            Y = get_start_end_sequence(times, approx_time[index_Y])
            Y_start[-1] = int(Y[0])
            Y_end[-1] = int(Y[1])

            Z = get_start_end_sequence(times, approx_time[index])
            Z_start[-1] = int(Z[0])
            Z_end[-1] = int(Z[1])

        else:
          start_ind.pop()
          end_ind.pop()
          contains_cadence.pop()
          offset_if_cadence.pop()

          if offsets_X_Y:
            X_start.pop()
            X_end.pop()
            Y_start.pop()
            Y_end.pop()
            Z_start.pop()
            Z_end.pop()


    i_start += 1
    i_end += 1
# Export all the data in a dataframe
  df = pd.DataFrame()
  df["start_ind"] = start_ind
  df["end_ind"] = end_ind
  df["contains_cadence"] = contains_cadence
  df["offset_if_cadence"] = offset_if_cadence
  if offsets_X_Y:
    df["X_start"] = X_start
    df["X_end"] = X_end
    df["Y_start"] = Y_start
    df["Y_end"] = Y_end
    df["Z_start"] = Z_start
    df["Z_end"] = Z_end

  return df

In [None]:
id = "01"
A = get_cadences_sequences("Cadences/midi_files/wtc-i-"+id+".mid","Cadences/csv_files/wtc-i-"+id+".csv",64,offsets_X_Y=True)
#A.offset_if_cadence.value_counts()
A
A.loc[A['contains_cadence'] == True].head(3)

Unnamed: 0,start_ind,end_ind,contains_cadence,offset_if_cadence,X_start,X_end,Y_start,Y_end,Z_start,Z_end
1157,1157,1221,True,52.0,1168.0,1178.0,1203.0,1207.0,1211.0,1221.0
1158,1158,1222,True,52.0,1168.0,1178.0,1203.0,1207.0,1211.0,1221.0
1159,1159,1223,True,52.0,1168.0,1178.0,1203.0,1207.0,1211.0,1221.0


In [None]:
def downsample(df):
  """
  Downsample a dataframe so the underrepresented class represent 33% of the total dataset
  """

  cadences = df.loc[df['contains_cadence'] == True]
  not_cadences = df.loc[df['contains_cadence'] == False]

  nmin = df['contains_cadence'].value_counts().min()

  undersample = not_cadences.sample(2*nmin,random_state=42)

  return pd.concat([undersample,cadences]).reset_index(drop=True)

id = "01"
A = get_cadences_sequences("Cadences/midi_files/wtc-i-"+id+".mid","Cadences/csv_files/wtc-i-"+id+".csv",64)
print(A.contains_cadence.value_counts())
print("---Downsampling---") 
print(downsample(A).contains_cadence.value_counts())

False    2378
True       66
Name: contains_cadence, dtype: int64
---Downsampling---
False    132
True      66
Name: contains_cadence, dtype: int64


In [None]:
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import LeaveOneOut
from tqdm.notebook import tqdm
from sklearn.metrics import f1_score
from sklearn.dummy import DummyClassifier
from sklearn.metrics import confusion_matrix

In [None]:
file_numbers = ["0"+str(i) if i<10 else str(i) for i in range(1,25)]
scores = []
scores2 = []
M_scores = np.zeros((2,2))
D_scores = np.zeros((2,2))

try:
  # Remove files that does not contain any candence
  file_numbers.remove("10")
  file_numbers.remove("12")
  file_numbers.remove("20")
  file_numbers.remove("24")
except Exception:
  pass

PATH = "Cadences/midi_files/wtc-i-03.mid"
train_set = pd.DataFrame()
test_set = pd.DataFrame()

for train_index, test_index in tqdm(LeaveOneOut().split(file_numbers)):
  train_set = pd.DataFrame()
  test_set = pd.DataFrame()
  for i in train_index:
    temp = get_cadences_sequences("Cadences/midi_files/wtc-i-"+file_numbers[i]+".mid","Cadences/csv_files/wtc-i-"+file_numbers[i]+".csv",64)
    dataset = downsample(temp)
    encoded = encode_midi_no_sustain("Cadences/midi_files/wtc-i-"+file_numbers[i]+".mid")

    X, y = [], []

    for ind in dataset.index:
      start = dataset["start_ind"].iloc[ind]
      end = dataset["end_ind"].iloc[ind]
      attention = get_attention(encoded[start:end])

      X.append(attention)
      y.append(dataset["contains_cadence"].iloc[ind])

    df = pd.DataFrame(data=X)
    df["contains_cadence"] = y

    train_set = pd.concat([train_set,df],ignore_index=True)


  for j in test_index:
    temp = get_cadences_sequences("Cadences/midi_files/wtc-i-"+file_numbers[j]+".mid","Cadences/csv_files/wtc-i-"+file_numbers[j]+".csv",64)
    dataset = downsample(temp)
    encoded = encode_midi_no_sustain("Cadences/midi_files/wtc-i-"+file_numbers[j]+".mid")
      
    X, y = [], []
    for ind in dataset.index:
      start = dataset["start_ind"].iloc[ind]
      end = dataset["end_ind"].iloc[ind]
      attention = get_attention(encoded[start:end])

      X.append(attention)
      y.append(dataset["contains_cadence"].iloc[ind])

    df = pd.DataFrame(data=X)
    df["contains_cadence"] = y
     
    test_set = pd.concat([test_set,df],ignore_index=True)

  
  #Encode train set


  X_train, y_train = train_set.loc[:, train_set.columns != 'contains_cadence'], train_set['contains_cadence']
  X_test, y_test = test_set.loc[:, test_set.columns != 'contains_cadence'], test_set['contains_cadence']

  lr = LogisticRegression(solver="liblinear").fit(X_train, y_train)
  dummy = DummyClassifier(strategy="stratified").fit(X_train, y_train)
  y_pred = lr.predict(X_test)
  y_dummy_pred = dummy.predict(X_test)

  scores.append(f1_score(y_test,y_pred))
  scores2.append(f1_score(y_test,y_dummy_pred))
  M_scores += confusion_matrix(y_test,y_pred)
  D_scores += confusion_matrix(y_test,y_dummy_pred)
  print("F1 score =",f1_score(y_test,y_pred),"Dummy F1 score =",f1_score(y_test,y_dummy_pred))

In [None]:
M_scores

In [None]:
D_scores