In [1]:
from abc_utils import *
import pandas as pd
from hmmlearn import hmm
import numpy as np
from sklearn.metrics import accuracy_score

In [2]:
# Load the train / validation / test splits. Each split comes back as a triple; the third
# element of every triple is discarded here. The `*_lengths` objects presumably hold the
# number of rows per song (load_song_subset cumulatively sums them to find song boundaries).
train_set, train_lengths, _, val_set, val_lengths, _, test_set, test_lengths, _ = load_datasets(return_test=True)

In [3]:
# Peek at the first rows to check the columns the later cells rely on (chord, melody, ...).
train_set.head(5)

Unnamed: 0,measure,beat,chord,melody,first_key_in_song
0,0.0,1.0,1,80,G major
1,0.0,2.5,1,78,G major
2,0.0,3.0,1,77,G major
3,0.0,3.5,1,75,G major
4,1.0,1.0,22,77,G major


In [4]:
def load_song_subset(train_set, train_lengths, indices):
    """
    Pick whole songs out of a dataset in which all songs are stacked row-wise.

    Parameters
    ----------
    train_set : pd.DataFrame
        Rows of every song, one song after another.
    train_lengths : pd.Series
        Number of rows of each song, in the order the songs appear in ``train_set``.
    indices : sequence of int
        Positions of the songs to keep; the output follows this order.

    Returns
    -------
    tuple
        ``(subset, lengths)`` where ``subset`` is the concatenation of the selected
        songs' rows and ``lengths`` is ``train_lengths.iloc[indices]``.
    """
    # boundaries[k] is the first row of song k, boundaries[k + 1] is one past its last row
    boundaries = np.insert(np.cumsum(train_lengths), 0, np.array([0]))

    selected = [train_set.iloc[boundaries[k] : boundaries[k + 1]] for k in indices]

    return pd.concat(selected), train_lengths.iloc[indices]

In [5]:
def ffill_obs(melody_obs: np.ndarray, unique_obs: np.ndarray) -> np.ndarray:
    """
    Replace observations the model has never seen with a neighbouring known one.

    Every value of ``melody_obs`` that does not occur in ``unique_obs`` is treated as
    missing and filled with the closest preceding known value; values missing at the very
    start (nothing precedes them) take the first known value that follows.

    Parameters
    ----------
    melody_obs : np.ndarray
        Observation sequence; flattened to one dimension before processing.
    unique_obs : np.ndarray
        Array (any shape) holding every observation value the model knows about.

    Returns
    -------
    np.ndarray
        One-dimensional array of the same length as ``melody_obs``. It is float-typed when
        at least one value had to be replaced, because NaN is used as the placeholder.
        If no value of ``melody_obs`` is known, the result is all NaN.
    """
    known_values = np.asarray(unique_obs).ravel()
    obs = pd.Series(np.asarray(melody_obs).ravel())

    # `where` upcasts to float on its own; assigning NaN into an integer Series through a
    # boolean mask is deprecated in recent pandas and is slated to raise.
    masked = obs.where(obs.isin(known_values))

    # forward fill closes the holes, backward fill covers a sequence that starts with one
    return masked.ffill().bfill().to_numpy()

In [6]:
def chord_accuracy(full_pred: np.array, true_states: np.array, num_chords: int=None, num_notes: int=None):
    '''
    Score the predicted chords against the true ones.

    The chord of each row is read from column ``num_chords - 1`` of the state matrices.
    ``true_states`` is cut down to the number of rows in ``full_pred`` before scoring.

    Parameters:
        full_pred: 2-D array of predicted states, one row per time step.
        true_states: 2-D array of true states, at least as many rows as ``full_pred``.
        num_chords: number of chords in a state; required despite the ``None`` default.
        num_notes: number of notes in a state; required despite the ``None`` default, but
            not otherwise used by the computation.

    Returns:
        The accuracy (fraction of matching chords) as computed by ``accuracy_score``.
        Note this is accuracy, not a misclassification rate.

    Raises:
        ValueError: if ``num_chords`` or ``num_notes`` is left as ``None``.
    '''
    # both state-space sizes have to be given explicitly
    if num_chords is None:
        raise ValueError("num_chords must be specified")
    if num_notes is None:
        raise ValueError("num_notes must be specified")

    chord_col = num_chords - 1
    predicted = full_pred[:, chord_col]
    expected = true_states[:len(predicted), chord_col]

    return accuracy_score(expected, predicted)

In [7]:
def fit_model(train_set: pd.DataFrame, train_lengths: pd.Series, num_chords: int=1, num_notes: int=0, subset: bool=False, indices=None, lam: int=None, trans_prior: float=0.1, emissions_prior: float=0.1):
    """
    Build a CategoricalHMM whose parameters are set directly from the training songs.

    Parameters
    ----------
    train_set, train_lengths
        Stacked training songs and the number of rows of each song.
    num_chords, num_notes
        State-space sizes, forwarded to ``dataframe_to_states``.
    subset, indices
        With ``subset=True`` only the songs at ``indices`` are used (selected through
        ``load_song_subset``); ``indices`` is ignored otherwise.
    lam, trans_prior, emissions_prior
        Forwarded unchanged to ``states_to_transition``.

    Returns
    -------
    tuple
        ``(model, (unique_states, unique_obs, states_to_index, observation_to_index))``;
        the inner tuple is what ``predict_states`` needs to decode with the model.

    Raises
    ------
    ValueError
        If ``subset`` is true but no ``indices`` are given.
    """
    if subset and indices is None:
        raise ValueError("Indices must be specified if subset=True")
    if subset:
        # only the rows are needed from here on; the subset lengths are dropped
        train_set, _ = load_song_subset(train_set, train_lengths, indices)

    true_states, true_observations = dataframe_to_states(train_set, num_chords, num_notes)

    (
        transition_matrix,
        emission_probs,
        unique_states,
        unique_obs,
        states_to_index,
        observation_to_index,
    ) = states_to_transition(true_states, true_observations, lam, trans_prior, emissions_prior)

    n_states = transition_matrix.shape[0]

    # init_params='' leaves every parameter for us to assign by hand below
    model = hmm.CategoricalHMM(n_components=n_states, init_params='')
    # both matrices are handed over transposed
    # (presumably states_to_transition lays them out column-wise; confirm there)
    model.transmat_ = transition_matrix.T
    model.emissionprob_ = emission_probs.T
    # every state is equally likely at the start of a song
    model.startprob_ = np.ones(n_states) / n_states

    decoding_tables = (unique_states, unique_obs, states_to_index, observation_to_index)
    return model, decoding_tables

In [8]:
def predict_states(model: hmm.CategoricalHMM, all_dicts: tuple, observation: np.ndarray, song_lengths: list):
    """
    Decode an observation sequence into the model's predicted states.

    Parameters
    ----------
    model : hmm.CategoricalHMM
        Model whose parameters are already set (see ``fit_model``).
    all_dicts : tuple
        The 4-tuple returned alongside the model by ``fit_model``:
        ``(unique_states, unique_obs, states_to_index, observation_to_index)``.
    observation : np.ndarray
        Observed values for one or more songs laid end to end.
    song_lengths : list
        Length of each song inside ``observation``; passed to ``model.decode`` as ``lengths``.

    Returns
    -------
    np.ndarray
        Rows of ``unique_states`` selected by the decoded state indices, one per observation.
    """
    unique_states, unique_obs, _, observation_to_index = all_dicts

    # values the model has never seen are replaced by a neighbouring known value
    known_obs = ffill_obs(observation, unique_obs)

    # observation_to_index is keyed by 1-tuples; decode wants a column of integer symbols
    symbol_column = np.array(
        [int(observation_to_index[(value,)]) for value in known_obs]
    ).reshape(-1, 1)

    # the first return value of decode is not needed here
    _, state_path = model.decode(symbol_column, lengths=song_lengths)

    # map the state indices back to the actual state rows
    return unique_states[state_path, :]

In [9]:
def redact(seq, lam):
    """
    Cap every run of consecutive equal values in a sequence at ``lam`` elements.

    The first element of each run is always kept, so ``lam <= 1`` collapses every run to a
    single element.

    Parameters
    ----------
    seq : np.ndarray
        One-dimensional sequence (e.g. of chords).
    lam : int
        Maximum number of consecutive repetitions to keep.

    Returns
    -------
    np.ndarray
        ``seq`` with the surplus repetitions removed, original order preserved. An empty
        input yields an empty output.

    Raises
    ------
    TypeError
        If ``seq`` is not one-dimensional.
    """
    if len(seq.shape) != 1:
        raise TypeError("array must be 1-dimensional")

    # an empty sequence has no runs to shorten (and no first element to start from)
    if len(seq) == 0:
        return seq[:0]

    values = np.asarray(seq)

    # True wherever a new run begins; position 0 always begins one
    starts_run = np.ones(len(values), dtype=bool)
    starts_run[1:] = values[1:] != values[:-1]

    # distance of every element from the start of the run it belongs to
    run_start_idx = np.flatnonzero(starts_run)
    offset_in_run = np.arange(len(values)) - run_start_idx[np.cumsum(starts_run) - 1]

    # keep run starts unconditionally, and followers while the run is shorter than lam
    keep = starts_run | (offset_in_run < lam)
    return seq[keep]

In [10]:
def get_prediction(model, all_dicts, val_set: pd.DataFrame, val_lengths: pd.Series, num_chords: int=1, num_notes: int=0, subset: bool=False, indices=None, do_print: bool=True):
    """
    Decode a set of songs with a fitted model and report the chord accuracy.

    Parameters
    ----------
    model, all_dicts
        The pair returned by ``fit_model``.
    val_set, val_lengths
        Stacked evaluation songs and the number of rows of each song.
    num_chords, num_notes
        State-space sizes; these should match the values the model was fitted with.
    subset, indices
        With ``subset=True`` only the songs at ``indices`` are evaluated.
    do_print : bool
        Print predicted and true states side by side, with a separator between songs.
        The final accuracy line is printed regardless of this flag.

    Returns
    -------
    tuple
        ``(pred_states, accuracy)``.

    Raises
    ------
    ValueError
        If ``subset`` is true but no ``indices`` are given (same contract as ``fit_model``).
    """
    if subset:
        if indices is None:
            raise ValueError("Indices must be specified if subset=True")
        val_set, val_lengths = load_song_subset(val_set, val_lengths, indices)

    true_states, new_song_obs = dataframe_to_states(val_set, num_chords, num_notes)

    # The first observation is dropped because dataframe_to_states adds a leading 0.
    # NOTE(review): the predictions therefore line up with new_song_obs[1:], while the
    # printout and chord_accuracy compare them with true_states[0:len(pred)] -- confirm
    # against dataframe_to_states that this is not an off-by-one.
    pred_states = predict_states(model, all_dicts, new_song_obs[1:], val_lengths.values.flatten().tolist())

    if do_print:
        print("Pred\t\tTrue")
        # row offsets at which a song ends and the next begins; built once, not per row
        song_boundaries = set(np.cumsum(val_lengths.values).tolist())
        for i, pred in enumerate(pred_states):
            if i in song_boundaries:
                print("----- New Song -----")
            print(f"{pred}\t\t{true_states[i]}")

    accuracy = chord_accuracy(pred_states, true_states, num_chords, num_notes)
    print("Accuracy:", accuracy)

    return pred_states, accuracy

In [None]:
# Read the raw text of the tune from disk.
# NOTE(review): open() is called without an encoding, so the platform default applies --
# confirm the file is plain ASCII/UTF-8.
with open('amazing_grace.txt') as f:
    ama_gra = f.read()

# Convert the raw text into an ABC string.
# NOTE(review): the meaning of the 'ama_gra' and '001' arguments is not visible here --
# presumably a tune name and a reference number; confirm against dataset_to_abc.
ama_gra_abc = dataset_to_abc(ama_gra, 'ama_gra', '001')

# Parse the ABC string into a dataframe; the prior sweep below evaluates on this frame.
ag_dataframe = abc_to_dataframe(ama_gra_abc)


In [None]:
# Sweep a handful of (transition prior, emission prior) settings and score each fitted
# model on the single held-out tune loaded above.
# Each entry is ((num_chords, num_notes), trans_prior, emissions_prior).
prior_grid = [
    ((1, 0), 50., 2025.),
    ((1, 0), 50., 3012.5),
    ((1, 0), 1037.5, 1037.5),
    ((1, 0), 30., 0.),
]

# the evaluation "set" is one song, so its lengths are a single-entry Series
ag_lengths = pd.Series([len(ag_dataframe)])

# NOTE: every iteration refits on the full training set, which takes minutes per setting.
for (num_chords, num_notes), t_prior, e_prior in prior_grid:
    model, all_dicts = fit_model(train_set, train_lengths, num_chords, num_notes, trans_prior=t_prior, emissions_prior=e_prior)

    print(f"n_chords = {num_chords}, n_notes = {num_notes}, t_prior = {t_prior}, e_prior = {e_prior}")

    # pass the state-space sizes through so evaluation uses the same ones as the fit
    # (previously the defaults were used silently, whatever the swept configuration was)
    pred_states, acc = get_prediction(model, all_dicts, ag_dataframe, ag_lengths, num_chords, num_notes, do_print=True)


Processing states:   8%|▊         | 192837/2414976 [00:20<03:52, 9542.40it/s] 


KeyboardInterrupt: 

Processing states: 100%|██████████| 36/36 [00:00<00:00, 4421.00it/s]

Accuracy: 0.3611111111111111





(array([[1],
        [1],
        [1],
        [1],
        [1],
        [1],
        [1],
        [1],
        [1],
        [1],
        [1],
        [1],
        [1],
        [1],
        [1],
        [1],
        [1],
        [1],
        [1],
        [1],
        [1],
        [1],
        [1],
        [1],
        [1],
        [1],
        [1],
        [1],
        [1],
        [1],
        [1],
        [1],
        [1],
        [1],
        [1],
        [1]]),
 0.3611111111111111)

In [12]:
# Baseline for comparison: how often would we be right on the validation set if we
# predicted chord 1 (the I chord) at every step? The true states only depend on the
# state-space sizes, not on any fitted model.
num_chords, num_notes = 1, 0
true_states, _ = dataframe_to_states(val_set, num_chords, num_notes)

# the chord lives in column num_chords - 1 of the state matrix
true_chords = true_states[:, num_chords - 1].flatten()
always_one = np.ones(len(true_states))

baseline = accuracy_score(true_chords, always_one)
print("Baseline Accuracy:", baseline)

Processing states: 100%|██████████| 604399/604399 [00:18<00:00, 33542.69it/s]

Baseline Accuracy: 0.3456833223031105



