In [1]:
import numpy as np
import random
import itertools
import math
import tqdm
from tqdm import trange
import time

# Helper Function

In [2]:
key_mapping={
    'C':0,
    'D':2,
    'E':4,
    'F':5,
    'G':7,
    'A':9,
    'B':11
}
def key2num(key):  
  key=key.upper()
  num=key_mapping[key[0]]
  modifier=len(key)
  if modifier==1:
    return num
  elif key[1]=='#':
    return (num+(modifier-1))%12
  elif key[1]=='B' or key[1]=='-':
    return (num-(modifier-1))%12
  elif key[1]=='X':
    return (num+(modifier-1)*2)%12

# key_list to number_list
def keys2num(keys):
    return [key2num(key) for key in keys]

In [3]:
# function to distingish whether notes in given timestamp and chord are within or outside the chord (0=outside,1=within)
import json
with open('../modules/json_files/keychorddict.json') as json_file:
    chord_notes = json.load(json_file)
  
    def note_2_class(chord,notes_at_t,chord_notes=chord_notes):
        note_in_chord=chord_notes[chord]['idx']
        return [int(note in note_in_chord) for note in notes_at_t]

# HMM

In [4]:
### I threat the total length of time as the length of the observered list

class HMM:
    def __init__(self,no_of_state,no_of_value,states,values):
        #randomize all matrix, each row should sum to 1
        #self.emssion_matrix=np.array([ran/ran.sum() for ran in np.array([np.random.rand(no_of_value) for i in range(no_of_state)])]) #b[i][O]  --> probability to emit values[O] from states[i]
        

        #Modification: 2 parameter for all chord  [note inside chord & note outside chord]
        self.emssion_matrix=np.random.rand(no_of_value)
        self.emssion_matrix=self.emssion_matrix/self.emssion_matrix.sum()
        if self.emssion_matrix[0]>self.emssion_matrix[1]:
            temp=self.emssion_matrix[0]
            self.emssion_matrix[0]=self.emssion_matrix[1]
            self.emssion_matrix[1]=temp
        self.emssion_matrix=np.array([self.emssion_matrix,]*len(states)) 
        self.emssion_matrix=np.array([[0.2,0.8],]*len(states)) 
        # assume same probability to emit "note within chord"  for all chords
        
        self.initial_matrix= np.random.rand(no_of_state) #π[i] --> probability to start at states[i]
        self.initial_matrix/= self.initial_matrix.sum()
        
        self.transition_matrix=np.array([ran/ran.sum() for ran in np.array([np.random.rand(no_of_state) for i in range(no_of_state)])])  #a[i][j] --> probability from states[i] transit to states[j]
        
        self.no_of_state=no_of_state
        self.states=states
        self.values=values
        self.observered=None
        
        self.probit_at_i_table=None
        self.probit_transit_i_j_table=None
        self.forward_table=None
        self.backward_table=None
        
        #self.prev_initial_matrix=self.initial_matrix
        #self.prev_emssion_matrix=self.emssion_matrix
        #self.prev_transition_matrix=self.transition_matrix
    def update_state_name(self,name):
        self.states=name
        
    def debug(self):
        print('initial_matrix\n',self.initial_matrix)
        print('transition_matrix\n',self.transition_matrix)
        print('emission_matrix\n',self.emssion_matrix)
        
    def likelihood(self,state,ob_t):
        
        #Modification: convert observation[t] from notes to 2_classes
        chord=self.states[state]  #numeric to chord name
        ob_t=note_2_class(chord,ob_t) #2 class
        
        prob=1
        for x in ob_t:
            prob*=(self.emssion_matrix[state][x])
        return prob
 
    def forward(self,t,j,ob=None,mode=False):
        if ob is None:
            ob=self.observered
        if np.isnan(self.forward_table[t][j]).any():
            if t==0:
                if mode==True:
                    self.forward_table[t][j]=[self.initial_matrix[j]*self.likelihood(j,ob[t]),0]
                    return self.forward_table[t][j][0],self.forward_table[t][j][1]
                else:
                    self.forward_table[t][j]=self.initial_matrix[j]*self.likelihood(j,ob[t])
                    return self.forward_table[t][j]
            else:          
                if mode==True:
                    result=np.array([self.forward(t-1,i,ob,mode)[0]*self.transition_matrix[i][j] for i in range(self.no_of_state)]) * self.likelihood(j,ob[t])
                    self.forward_table[t][j]=[np.max(result),np.argmax(result)]
                    return self.forward_table[t][j][0],self.forward_table[t][j][1]
                else:
                    self.forward_table[t][j]=sum([self.forward(t-1,i,ob,mode)*self.transition_matrix[i][j] for i in range(self.no_of_state)]) * self.likelihood(j,ob[t])
                    return self.forward_table[t][j]
        else:
            return self.forward_table[t][j]

    def backward(self,t,i,ob=None,mode=False):
        if ob is None:
            ob=self.observered
        if np.isnan(self.backward_table[t][i]) or mode==True:
            if t==len(ob)-1:
                self.backward_table[t][i]=1
                return self.backward_table[t][i]
            else:
                if mode==True:
                    self.backward_table[t][i]=max([self.transition_matrix[i][j]*self.likelihood(j,ob[t+1])*self.backward(t+1,j,ob,mode) for j in range(self.no_of_state)])
                    return self.backward_table[t][i]
                else:
                    self.backward_table[t][i]=sum([self.transition_matrix[i][j]*self.likelihood(j,ob[t+1])*self.backward(t+1,j,ob,mode) for j in range(self.no_of_state)])
                    return self.backward_table[t][i]
        else:
            return self.backward_table[t][i]
            
           
    def probit_at_i(self,t,i,ob=None):#Gamma γt(i) = P(qt = i|O,λ)      
        if ob is None:
            ob=self.observered
        if np.isnan(self.probit_at_i_table[t][i]):
            numerator=self.forward(t,i,ob)*self.backward(t,i,ob)#sum probability of all path passing through state[i] at time t
            denominator=sum([self.forward(t,j,ob)*self.backward(t,j,ob) for j in range(self.no_of_state)]) #prob of passing through  ALL_state at time t
            self.probit_at_i_table[t][i]=numerator/denominator
            return self.probit_at_i_table[t][i]
        else:
            return self.probit_at_i_table[t][i]
    
    def probit_transit_i_j(self,t,i,j,ob=None):#epsilon ξt(i, j) = P(qt = i,qt+1 = j|O,λ)
        if ob is None:
            ob=self.observered
        if np.isnan(self.probit_transit_i_j_table[t][i][j]):
            numerator=self.forward(t,i,ob)*self.transition_matrix[i][j]*self.likelihood(j,ob[t+1])*self.backward(t+1,j,ob)#sum probability of all path transit from state[i] to state[j] at time t
            denominator=sum([sum([self.forward(t,m,ob)*self.transition_matrix[m][n]*self.likelihood(n,ob[t+1])*self.backward(t+1,n,ob) for n in range(self.no_of_state)]) for m in range(self.no_of_state)]) #prob of ALL transition combination at time t
            self.probit_transit_i_j_table[t][i][j]=(numerator/denominator)
            return self.probit_transit_i_j_table[t][i][j]
        else:
            return self.probit_transit_i_j_table[t][i][j]
        
    #modify from https://stackoverflow.com/questions/9729968/python-implementation-of-viterbi-algorithm/9730083 , author RBF06(https://stackoverflow.com/users/3311728/rbf06)
    def predict(self,ob,test_key_name):  
        self.update_state_name([test_key_name.lower()[:-1]+states for states in h_states])
        self.forward_table=np.empty((len(ob),self.no_of_state,2))
        self.forward_table[:]= np.NaN

        T1=np.empty((self.no_of_state,len(ob)),'d')
        T2=np.empty((self.no_of_state,len(ob)),'B')
        for idx in range(self.no_of_state):
            T1[idx,0]=self.forward(0,idx,ob,True)[0]
        T2[:,0]=0
        
        for i in range(1,len(ob)):
            for idx in range(self.no_of_state):
                T1[idx,i],T2[idx,i]=self.forward(i,idx,ob,True)
        x = np.empty(len(ob), 'B')
        x[-1] = np.argmax(T1[:, len(ob) - 1])
        for i in reversed(range(1, len(ob))):
            x[i - 1] = T2[x[i], i]
        return x
    
    def train_supervisied(self,obs,labels,key_name): # by MLE
        initial_matrix= np.zeros(self.no_of_state)
        emssion_matrix=np.zeros((2)) 
        transition_matrix=np.array([np.zeros(self.no_of_state) for i in range(self.no_of_state)])
        
        for idx_1,label in enumerate(labels):
            self.update_state_name([key_name[idx_1].lower()[:-1]+states for states in h_states])
            for idx_2,lab in enumerate(label):
                if idx_2==0:
                    initial_matrix[self.states.index(label[idx_2])]+=1
                else:
                    transition_matrix[self.states.index(label[idx_2-1])][self.states.index(label[idx_2])]+=1
                    ob_t=note_2_class(lab,obs[idx_1][idx_2]) #2 class
                    for x in ob_t:
                        emssion_matrix[x]+=1
                        
        #save back to model
        self.initial_matrix=np.array([item/initial_matrix.sum() if item >0 else 0.0000000000001 for item in initial_matrix])
        self.transition_matrix=np.array([row/row.sum() if row.sum()>0 else row+0.0000000000001 for row in transition_matrix])
        for row in range(self.transition_matrix.shape[0]):
            for col in range(self.transition_matrix.shape[1]):
                if self.transition_matrix[row][col]==0:
                    self.transition_matrix[row][col]=0.0000000000001
        self.emssion_matrix=np.array([emssion_matrix/emssion_matrix.sum()] *self.no_of_state) 

        
        
    def train(self,obs,key_name,epochs=2):
        #O:observed values
        #λ:model parameters

        
        for epoch in range(epochs):
            for ob_idx,ob in enumerate(obs):
                self.update_state_name([key_name[ob_idx].lower()[:-1]+states for states in h_states])
                print('running',ob_idx,key_name[ob_idx].lower()[:-1])
                self.probit_at_i_table=np.empty((len(ob),self.no_of_state))
                self.probit_transit_i_j_table=np.empty((len(ob),self.no_of_state,self.no_of_state))
                self.forward_table=np.empty((len(ob),self.no_of_state))
                self.backward_table=np.empty((len(ob),self.no_of_state))
                self.probit_at_i_table[:]= np.NaN
                self.probit_transit_i_j_table[:]=np.NaN
                self.forward_table[:]= np.NaN
                self.backward_table[:]=np.NaN
                
                #initalize DP table
                for t, i in itertools.product(range(len(ob)),range(self.no_of_state)):
                    self.forward(t,i,ob)
                    self.backward(t,i,ob)
                    self.probit_at_i(t,i,ob)
                    for j in range(self.no_of_state):
                        if t!=len(ob)-1:
                            self.probit_transit_i_j(t,i,j,ob)
                
                
                
                #initial matrix
                for i in range(self.no_of_state):
                    self.initial_matrix[i]=self.probit_at_i(0,i,ob)
                    if self.initial_matrix[i]==0 or np.isnan(self.initial_matrix[i]):
                        self.initial_matrix[i]=0.0000000000001
                #transition matrix
                for i, j in itertools.product(range(self.no_of_state),range(self.no_of_state)):
                    self.transition_matrix[i][j]=sum([self.probit_transit_i_j(t,i,j,ob) for t in range(len(ob)-1)])/sum([self.probit_at_i(t,i,ob) for t in range(len(ob)-1)])
                    if self.transition_matrix[i][j]==0 or np.isnan(self.transition_matrix[i][j]):
                        self.transition_matrix[i][j]=0.0000000000001
                #emission matrix
                for j, k in itertools.product(range(self.no_of_state),range(len(self.values))):   
                    total=0
                    
                    #Modification: convert notes to 2 classes (outside or inside given chord)
                    chord=self.states[j]  #numeric to chord name
                    ob_2_class=[note_2_class(chord,ob_t) for ob_t in ob] #2 class

                    for t in range(len(ob)):
                        if k in ob_2_class[t]:
                            #Modification: multiple by how many times do k appear at timestamp t
                            total+=self.probit_at_i(t,j,ob)*ob_2_class[t].count(k)  
                            
                    #Modification: multiple by len(ob[t]), which is the total length of notes at timestamp t                    
                    self.emssion_matrix[:,k]=total/sum([self.probit_at_i(t,j,ob)*len(ob[t]) for t in range(len(ob))])  
                    
                    #smoothing
                    
                    if self.emssion_matrix[j][k]==0 or np.isnan(self.emssion_matrix[j][k]):
                        self.emssion_matrix[:,k]=0.0000000000001

# Training

In [5]:
import json
with open('../data/training_data.json') as json_file:
    data = json.load(json_file)

In [6]:
h_states=['MinorI', 'MinorI+',
          'MinorbII', 'MinorII', 'MinorII7',
          'MinorIII',
          'MinorIV', 'MinorIV+',
          'MinorV', 'MinorV+', 'MinorV+7',
          'MinorVI', 'MinorGerVI', 'MinorFreVI', 'MinorItaVI',
          'MinorVII', 'MinorDimVII', 'MinorDimVII7',
]#all possible chordm will add the key to the chord afterward e.g. cMinorI

In [7]:
data['Etude_in_C_Minor.mxl'].keys()

dict_keys(['chord_seq', 'note_seq'])

In [8]:
#prepare train data
score_name=['Etude_in_C_Minor',
            'Menuet_in_G_Minor',
            'Nocturne_in_C_Minor',
            'Nocturne_in_E_Minor',
            
            'Prelude_in_B_Minor',

            'Prelude_in_C_Minor',
            'Nocturne_in_F_Minor',
            'Il_Vecchio_Castello',
            'Piano_Sonata_No._11',


            'Etude_in_F_Minor',
            'Moonlight_Sonata_1st_Movement',
           ]
score_key=['Cm','Gm','C#m','Em','Bm','Cm','Fm','G#m','Am','Fm','C#m']


x_train=[]
y_train=[]
for idx_root,name in enumerate(score_name):
    select_idx=[]
    #only extract chord in the same key with the score
    for idx,chord in enumerate(data[name+'.mxl']['chord_seq']):
        if chord.startswith(score_key[idx_root]):
            select_idx.append(idx)
    training=[]
    training_label=[]
    for idx in select_idx:
        training.append(data[name+'.mxl']['note_seq'][idx])
        tail=data[name+'.mxl']['chord_seq'][idx].split('_')[1]
        if tail.startswith('German'):
            tail='Ger'+str(tail[6:])
        elif tail=='Dim7':
            tail='DimVII'
        training_label.append(score_key[idx_root][:-1].lower()+'Minor'+tail)
    training=[keys2num(segment)for segment in training]
    
    x_train.append(training)
    y_train.append(training_label)

In [9]:
test=HMM(len(h_states),2,h_states,["outside chord","inside chord"])

In [10]:
test.train_supervisied(x_train,y_train,score_key)#initialize parameter

# Evaluation

In [11]:
#prepare test data
test_score=['Nocturne_No._20_in_C_Minor',
    
]
test_key=['C#m']


#prepare the test data for a specific score (select by index)
score_test_choice=0


#only extract those chords that are in te same key with the score
test_sample=[]
select_idx=[]
testing_label=[]
name=test_score[score_test_choice]
for idx,chord in enumerate(data[name+'.mxl']['chord_seq']):
    if chord.startswith(test_key[score_test_choice]):
        select_idx.append(idx)
training=[]
training_label=[]
for idx in select_idx:
    training.append(data[name+'.mxl']['note_seq'][idx])
    training_label.append(data[name+'.mxl']['chord_seq'][idx])
    
test_note=training
training=[keys2num(segment)for segment in training]
test_sample=training
testing_label=training_label



test_score_key=test_key[score_test_choice]
test_states=[test_score_key.lower()[:-1]+states for states in h_states]



In [12]:

prediction=test.predict(test_sample,test_score_key)

#rephase prediction to share same format with testinng_label
prediction=[test_states[idx].split('Minor')[1] for idx in prediction]
testing_label=[label.split('_')[1] for label in testing_label]
for idx,lab in enumerate(testing_label):
    if lab.startswith('German'):
        testing_label[idx]='Ger'+str(lab[6:])



#evaluation
correct=0
wrong=0
for idx,label in enumerate(prediction):
    if label==testing_label[idx]:
        correct+=1
    else:
        wrong+=1
print('acc',correct/(correct+wrong),correct,wrong)

acc 0.4603174603174603 29 34


In [16]:
list(notes.keys()),test_score_key[:-1]+'Minor'

(['B#', 'F#', 'C#', 'D#', 'A', 'B', 'C'], 'C#Minor')

In [22]:
noteToChord.NoteToChord(['c','b','D#', 'A','c#','f#','b#'],test_score_key[:-1]+'Minor',1,0)

KeyError: '(0, 0)'

In [14]:
noteToChord.NoteToChord(list(notes.keys()),test_score_key[:-1]+'Minor',1,0)

KeyError: '(0, 0)'

In [13]:
import sys
sys.path.append('../modules')
import noteToChord
note2chord_ans=[]
for notes in test_note:
    note2chord_ans.append(noteToChord.NoteToChord(list(notes.keys()),test_score_key[:-1]+'Minor',1,0)[0]['Chord'].split('Minor')[1])
#evaluation
correct=0
wrong=0
for idx,label in enumerate(note2chord_ans):
    if label==testing_label[idx]:
        correct+=1
    else:
        wrong+=1
print('acc',correct/(correct+wrong),correct,wrong)   

KeyError: '(0, 0)'

# Debug

In [None]:
test.debug()

In [None]:
np.array(prediction)

In [None]:
np.array(testing_label)

In [None]:
test_sample[11]

In [None]:
chord_notes['c#MinorV+7']