In [1]:
from music21 import *
import music21
import numpy as np
import os
import dill as pickle
from hmmlearn.hmm import MultinomialHMM

try:
    from tqdm.auto import tqdm
except ImportError as e:
    print(e)
    tqdm = lambda x: x

# Classes

In [2]:
class Song:
    def __init__(self):
        """Initialize Song class.
        
        Attributes:
            score (Score): Song from music21 Score class
            key (str): Key of the song
            parts (ndarray(p,n)): Array of parts(p) and notes(n)
            n_parts (int): Number of parts in the song
            stream (Stream): music21 Stream object generated from parts
        """
        return
        
    def parse(self, score):
        """Parse the MusicXML score into a trainable format"""
        parts = []
        discrete = [[]] * len(score.parts)
        
        # Generate discretized notelist for each part
        for i, p in enumerate(score.parts):
            for n in p.recurse().notesAndRests:
                if n.isRest:
                    discrete[i] = discrete[i] + ([0] * int(n.quarterLength*12))
                else:
                    discrete[i] = discrete[i] + ([n.pitch] * int(n.quarterLength*12))
        #pad to make lengths the same
        max_len = max([len(part) for part in discrete])
        discrete = [
            part + [part[-1]] * (max_len - len(part)) for part in discrete
        ]
        
        discrete = np.array(discrete)
        
        # Generate states from the music
        for i, p in enumerate(score.parts):
            t = 0         # Time for dependence on other parts
            notes = []    # States (pitch[0,t], ..., pitch[n_parts,t], dur)
            for n in p.recurse().notesAndRests:
                pitches = [part[t] for part in discrete]
                notes.append(pitches + [n.quarterLength])
                t += int(n.quarterLength*12)
                
            # Add notes list for each part to part list
            parts.append(notes)  
            
        self.score = score
        self.key = score.analyze('key')
        self.parts = np.array(parts, dtype=object)
        self.n_parts = len(parts)  
        
        return self
        
    def gen_stream(self, parts=None):
        """Generate a song from the parts and notes parsed
        
        Parameters:
            parts (list): list of part indices to generate
        """
        # If None, generate all parts
        if parts is None:
            parts = np.arange(self.n_parts)
        parts = np.array(parts, dtype=int)
        
        # Generate the song for the parts indicated
        s = stream.Stream()
        for i, notes in enumerate(self.parts[parts]):
            p = stream.Part(id=i)
            for n in notes: 
                if n[i] == 0:
                    p.append(note.Rest(quarterLength=n[self.n_parts]))
                else:
                    p.append(note.Note(n[i],quarterLength=n[self.n_parts]))
            s.insert(0,p)
        self.stream = s
        
        return self
        
    def play(self):
        """Play the song generated by gen_stream"""
        self.stream.show('midi')  
        
    def save(self, filename):
        """Save the generated song as a midi file"""
        self.stream.write('midi')        # Not working

In [3]:
class MusicHMM:
    def __init__(self, n_components, reg_eps=0):
        """Initialize MusicHMM class
        
        Attributes:
            songs (list): List of Song class objects to train on
            n_components (int): number of components for the HMM
            HMMs (list(MultinomialHMM)): HMM objects for each part
            states (ndarray): Array of possible states
            state_ind (func): Map from note state to index
            obs (list): Observed sequences of states by index
            obs_len (list): length of observation sequence for each song
        """
        self.n_components = n_components
        self.reg_eps = reg_eps
        
    def fit(self, songs, parts=[0,1,2,3]):
        """Train the HMM on the list of songs for part indices given
        
        Parameters:
            songs (list): List of parsed Song objects to train on
            parts (list): List of part indices from songs to use in training
        """
        self.songs = songs
        
        # Generate state space data and observations by index
        self.init_states(parts)
        self.init_obs_matrices(parts)
        
        # Train a HMM for each part
        n_parts = len(parts)
        n_features = len(self.states)
        HMMs = []
        for i in range(n_parts):
            obs = np.array(self.obs[i]).reshape(-1, 1)   # Reshape observations
            hmm = MultinomialHMM(n_components=self.n_components)
            hmm.n_features = n_features
            hmm.fit(obs, lengths=self.obs_len[i])
            if self.reg_eps != 0:
                hmm.emissionprob_ = hmm.emissionprob_* (1-hmm.emissionprob_.shape[1] * self.reg_eps / n_features)\
                                    + self.reg_eps/n_features
            HMMs.append(hmm)
            
        self.HMMs = HMMs
        self.n_parts = len(parts)
        self.parts = parts
        
        return self
        
    def init_states(self, parts):
        """Create and save a dictionary of unique note states"""
        states = {tuple(n) for song in self.songs for p in song.parts[parts] for n in p}
                    
        self.states = list(states)
        self.states_dict = {n:i for i,n in enumerate(self.states)}
        # Add a observation state for when we don't recognize the input
        self.state_ind = lambda n: self.states_dict.get(tuple(n), self.missing_state_id)
        self.missing_state_id = len(self.states)
        self.states.append(self.states[0])
        self.states = np.array(self.states)
        
    def init_obs_matrices(self, parts):
        """Create the matrices of note indices observed"""
        # Initialize
        n_parts = len(parts)
        obs = [[]] * n_parts
        obs_len = [[]] * n_parts
        
        for i in range(n_parts):
            for song in self.songs:
                p = song.parts[parts[i]]
                seq = [self.state_ind(n) for n in p]
                obs[i] += seq                                     ###########################
                obs_len[i].append(len(seq))
                
        self.obs = obs
        self.obs_len = obs_len 
                
               
    def gen_song(self, num_notes=40):
        """Sample a new song from the HMM
        
        Parameters:
            num_notes (int): Number of notes to sample from each part
            parts (list): List of part indices to generate in song
        """
        s = stream.Score()
        part_lengths = []
        for i in range(self.n_parts):
            hmm = self.HMMs[i]
            ind = hmm.sample(num_notes)[0].ravel()
            states = self.states[ind]
            T = 0
            
            p = stream.Part(id=i)
            for n in states: 
                if n[i] == 0:
                    p.append(note.Rest(quarterLength=n[self.n_parts]))
                else:
                    p.append(note.Note(n[i],quarterLength=n[self.n_parts]))
                T += n[self.n_parts]
            s.insert(0,p)
            part_lengths.append(T)
        
        max_len = max(part_lengths)    
        for i, p in enumerate(s.parts):
            if part_lengths[i] < max_len:
                p.append(note.Rest(quarterLength=max_len-part_lengths[i]))
            
        new_song = Song()
        new_song.parse(s)
        new_song.gen_stream()
        
        return new_song
    
    def predict_proba(self, songs):
        """Returns the log-likelihood of each song given the model"""
        if issubclass(songs.__class__, Song):
            songs = [songs]
        songs = np.array(songs, dtype=object)
        
        return np.array([self.predict_proba_single(song) for song in songs])
        
    def predict_proba_single(self, song):
        log_likelihood = 0.
        n_parts = len(self.parts)
        return np.sum([hmm.score(self._get_obs(song, self.parts[i])) for i, hmm in enumerate(self.HMMs)])
        """for i, hmm in enumerate(self.HMMs):
            obs = self._get_obs(song, self.parts[i])
            log_likelihood += hmm.score(obs)
        return log_likelihood"""
    
    def _get_obs(self, song, part):
        p = song.parts[self.parts[part]]
        return np.array([self.state_ind(n) for n in p]).reshape(-1,1)

In [4]:
class MusicHMMClassifier:
    """
    Class for classifying music by composer using HMMs. The predictions are made based on maximum log-likelihood.
    """
    def __init__(self, n_components, parts=[0,1,2,3], reg_eps=1e-2):
        self.labels = []
        self.hmms_maj = []
        self.hmms_min = []
        
        self.n_components = n_components
        self.parts = parts
        self.reg_eps = reg_eps
        
    def fit_items(self, *args):
        """
        Fits the classifiers, where each argument is a tuple of (composer name, [list of Maj songs], [list of Min songs])
        """
        # Train on the given classifiers
        for composer, X_maj, X_min in args:
            if composer in self.labels:
                continue
            else:
                idx = len(self.labels)
                self.labels.append(composer)
            # Train models for major and minor pieces
            if len(X_maj) > 0:
                self.hmms_maj.append(MusicHMM(self.n_components, reg_eps=self.reg_eps).fit(X_maj, parts=self.parts))
            else:
                self.hmms_maj.append(None)
            if len(X_min) > 0:
                self.hmms_min.append(MusicHMM(self.n_components, reg_eps=self.reg_eps).fit(X_min, parts=self.parts))
            else:
                self.hmms_min.append(None)
        
        return self
    
    def fit(X, y, labels=None):
        """
        Fits the classifiers.
        """
        X_maj, X_min = X
        y_maj, y_min = y
        # cast as arrays
        y_maj = np.array(y_maj)
        y_min = np.array(y_min)
        X_maj = np.array(X_maj)
        X_min = np.array(X_min)
        
        if labels is None:
            labels = np.unique(y)
        
        masks = [(y_maj == l, y_min == l) for l in labels]
        self.fit_items(*[(l, X_maj[m_maj], X_min[m_min]) for l, (m_maj, m_min) in zip(labels, masks)])
        
        return self
    
    def _major_likelihood(self, song):
        return [
            mhmm.predict_proba_single(song) for mhmm in self.hmms_maj
        ]
    
    def _minor_likelihood(self, song):
        return [
            mhmm.predict_proba_single(song) for mhmm in self.hmms_min
        ]
    
    def predict_proba(self, X):
        """Returns the log-likelihood of each song given each model.
        This will be a (n, H) array, where n is the number of songs provided and H is the number of composers known to this model.
        """
        songs = X
        if issubclass(songs.__class__, Song):
            songs = [songs]
        
        # Predictions are separate for major/minor key songs
        is_maj = ['major' in song.key.name for song in songs]
        
        likelihoods = np.array([
            self._major_likelihood(song) if maj else self._minor_likelihood(song) for song, maj in zip(songs, is_maj)
        ])
        return likelihoods
        
    def predict(self, X):
        """Predicts the labels for each """
        likelihood = self.predict_proba(X)
        label_indices = np.argmax(likelihood, axis=1)
        return np.array(self.labels, dtype=object)[label_indices]
        

# Load data

In [5]:
def load_clean_split(composer_name, train_prop=0.6, seed=None, accept=lambda p: len(p.parts)==4):
    """
    Loads the corpus, filter to songs with 4 parts, transposes to C, splits into major and minor, splits into train/test.
    train_prop can be either a proportion or a fixed numer.
    
    Returns:
        (major, minor) - tuple of training data
        (major, minor) - tuple of test data
    """
    rng = np.random.RandomState(seed=seed)
    paths = corpus.getComposer(composer_name)
    transposed_major = []
    transposed_minor = []
    # Load songs
    for p_name in tqdm(paths):
        p = corpus.parse(p_name)
        if not accept(p):
            continue
        k = p.analyze('key')
        i = interval.Interval(k.tonic, pitch.Pitch('C'))
        song = Song().parse(p.transpose(i)).gen_stream()
        if 'major' in k.name:
            transposed_major.append(song)
        elif 'minor' in k.name:
            transposed_minor.append(song)
    # Train/test split
    def split(l):
        """Splits the list into train and test portions"""
        l = np.array(l, dtype=object)
        if int(train_prop) == train_prop:
            num = train_prop
        else:
            num = int(len(l) * train_prop)
        # Select which ones to use
        idx = rng.choice(len(l), size=num, replace=False).astype(int)
        mask = np.full_like(l, False, dtype=bool)
        mask[idx] = True
        return l[mask], l[~mask]
        
    maj_tr, maj_test = split(transposed_major)
    min_tr, min_test = split(transposed_minor)
    
    return (maj_tr, min_tr), (maj_test, min_test)

In [6]:
# This takes about 20 minutes to run
bach_train, bach_test = load_clean_split('bach', train_prop=50)
pal_train, pal_test = load_clean_split('palestrina', train_prop=50)

  0%|          | 0/433 [00:00<?, ?it/s]

  0%|          | 0/1318 [00:00<?, ?it/s]

In [11]:
print(np.sum((bach_train[0].shape, bach_test[0].shape, bach_train[1].shape, bach_test[1].shape)))
print(np.sum((pal_train[0].shape, pal_test[0].shape, pal_train[1].shape, pal_test[1].shape)))

368
498


In [12]:
print(bach_train[0].shape, bach_test[0].shape, bach_train[1].shape, bach_test[1].shape)
print(pal_train[0].shape, pal_test[0].shape, pal_train[1].shape, pal_test[1].shape)

(50,) (132,) (50,) (136,)
(50,) (180,) (50,) (218,)


In [20]:
# Further split into a validation set
val_size = 50
bach_fulltest = bach_test
pal_fulltest = pal_test
np.random.shuffle(bach_fulltest[0])
np.random.shuffle(bach_fulltest[1])
np.random.shuffle(pal_fulltest[0])
np.random.shuffle(pal_fulltest[1])
bach_val_maj, bach_test_maj = bach_fulltest[0][:val_size], bach_fulltest[0][val_size:]
bach_val_min, bach_test_min = bach_fulltest[1][:val_size], bach_fulltest[1][val_size:]
pal_val_maj, pal_test_maj = pal_fulltest[0][:val_size], pal_fulltest[0][val_size:]
pal_val_min, pal_test_min = pal_fulltest[1][:val_size], pal_fulltest[1][val_size:]

# Testing a classifier

In [23]:
n_states = 10
mhmm_classifier = MusicHMMClassifier(n_states, parts=[0,1,2,3]).fit_items(
    ('bach', *bach_train),
    ('palestrina', *pal_train)
)

Fitting a model with 25659 free scalar parameters with only 11778 data points will result in a degenerate solution.
Fitting a model with 25659 free scalar parameters with only 11778 data points will result in a degenerate solution.
Fitting a model with 25659 free scalar parameters with only 11778 data points will result in a degenerate solution.
Fitting a model with 25659 free scalar parameters with only 11778 data points will result in a degenerate solution.
Fitting a model with 27659 free scalar parameters with only 12471 data points will result in a degenerate solution.
Fitting a model with 27659 free scalar parameters with only 12471 data points will result in a degenerate solution.
Fitting a model with 27659 free scalar parameters with only 12471 data points will result in a degenerate solution.
Fitting a model with 27659 free scalar parameters with only 12471 data points will result in a degenerate solution.
Fitting a model with 93299 free scalar parameters with only 27292 data p

In [31]:
print("="*4+" Validation set results "+'='*4)
print('Accuracy on major-key Bach songs: {:.6f}'.format(np.mean(mhmm_classifier.predict(bach_val_maj) == 'bach')))
print('Accuracy on minor-key Bach songs: {:.6f}'.format(np.mean(mhmm_classifier.predict(bach_val_min) == 'bach')))
print('Accuracy on major-key Palestrina songs: {:.6f}'.format(np.mean(mhmm_classifier.predict(pal_val_maj) == 'palestrina')))
print('Accuracy on minor-key Palestrina songs: {:.6f}'.format(np.mean(mhmm_classifier.predict(pal_val_min) == 'palestrina')))

==== Validation set results ====
Accuracy on major-key Bach songs: 1.000000
Accuracy on minor-key Bach songs: 1.000000
Accuracy on major-key Palestrina songs: 0.980000
Accuracy on minor-key Palestrina songs: 0.980000


In [30]:
print("="*4+" Test set results "+'='*4)
print('Accuracy on major-key Bach songs: {:.6f}'.format(np.mean(mhmm_classifier.predict(bach_test_maj) == 'bach')))
print('Accuracy on minor-key Bach songs: {:.6f}'.format(np.mean(mhmm_classifier.predict(bach_test_min) == 'bach')))
print('Accuracy on major-key Palestrina songs: {:.6f}'.format(np.mean(mhmm_classifier.predict(pal_test_maj) == 'palestrina')))
print('Accuracy on minor-key Palestrina songs: {:.6f}'.format(np.mean(mhmm_classifier.predict(pal_test_min) == 'palestrina')))

==== Test set results ====
Accuracy on major-key Bach songs: 1.000000
Accuracy on minor-key Bach songs: 1.000000
Accuracy on major-key Palestrina songs: 0.992308
Accuracy on minor-key Palestrina songs: 0.970238
