In [None]:
### CELL 1: Import Python packages ###
import os
from typing import Tuple, Union

# Data and arithmetic packages
import numpy as np

# Music analysis packages
from music21 import *

In [None]:
# Notebook configuration.
# Intended location of the MIDI corpus; left empty here.
DATA_DIR = "" # e.g. "C:/midi-music"
# Candidate observation sequences. Each one is a list of 15 symbols drawn from
# "<NOTE>", "<CHORD>" and "<REST>".
OBS = {
    "OBS1": ["<NOTE>", "<CHORD>", "<NOTE>", "<NOTE>", "<REST>", "<NOTE>", "<NOTE>", "<NOTE>", "<CHORD>", "<NOTE>", "<REST>", "<NOTE>", "<NOTE>", "<NOTE>", "<CHORD>"],
    "OBS2": ["<CHORD>", "<CHORD>", "<CHORD>", "<CHORD>", "<NOTE>", "<CHORD>", "<CHORD>", "<REST>", "<CHORD>", "<REST>", "<CHORD>", "<REST>", "<NOTE>", "<CHORD>", "<REST>"],
    "OBS3": ["<REST>", "<NOTE>", "<REST>", "<NOTE>", "<REST>", "<NOTE>", "<CHORD>", "<NOTE>", "<NOTE>", "<CHORD>", "<REST>", "<NOTE>", "<REST>", "<NOTE>", "<NOTE>"],
    "OBS4": ["<REST>", "<NOTE>", "<CHORD>", "<NOTE>", "<CHORD>", "<NOTE>", "<NOTE>", "<REST>", "<CHORD>", "<REST>", "<CHORD>", "<NOTE>", "<NOTE>", "<NOTE>", "<CHORD>"]
}
# The observation sequence selected for this run.
USE_OBS = OBS["OBS3"]

In [None]:
def load_data(data_dir: Union[str, None] = None) -> list:
    """Parse every ``.mid`` file found in a directory.

    Args:
        data_dir: Folder to scan. When omitted, the notebook-level
            ``DATA_DIR`` is used if it is non-empty; otherwise the Google
            Drive path this function originally hard-coded is used.

    Returns:
        A list with one ``converter.parse`` result per ``.mid`` file, ordered
        by file name.

    Raises:
        FileNotFoundError: If the directory does not exist.
    """
    if data_dir is None:
        # A local assignment used to shadow the config-cell DATA_DIR, so
        # editing the config had no effect. Honour it whenever it is set.
        data_dir = DATA_DIR or '/content/drive/MyDrive/midi-music'

    scores = []
    # os.listdir returns entries in arbitrary order; sorting makes the order
    # of the returned scores reproducible across runs and machines.
    for filename in sorted(os.listdir(data_dir)):
        if filename.endswith(".mid"):
            filepath = os.path.join(data_dir, filename)
            scores.append(converter.parse(filepath))
    return scores

In [None]:
# Parse the MIDI corpus into a list of scores.
scores=load_data()

In [None]:
# Display the parsed scores to confirm how many files were loaded.
scores

[<music21.stream.Score 0x7ce727606c90>,
 <music21.stream.Score 0x7ce726413810>,
 <music21.stream.Score 0x7ce7267c3e10>,
 <music21.stream.Score 0x7ce723812c10>]

In [None]:
def get_indices_of_measures_and_musical_elements_by_score(scores: list) -> Tuple[list, list]:
    """Flatten each score and record where its measures sit in the flat list.

    Args:
        scores: Objects exposing a ``recurse()`` method.

    Returns:
        A pair ``(indices_of_all_measures, all_scores_elements)``. Both lists
        are parallel to ``scores``: the second holds every item yielded by
        ``score.recurse()``, the first holds the positions within that list
        whose item is a ``stream.Measure``.
    """
    measure_positions_by_score = []
    flattened_by_score = []
    for score in scores:
        flattened = [item for item in score.recurse()]
        positions = [
            position
            for position, item in enumerate(flattened)
            if isinstance(item, stream.Measure)
        ]
        measure_positions_by_score.append(positions)
        flattened_by_score.append(flattened)
    return measure_positions_by_score, flattened_by_score

In [None]:
# Flatten every score and locate its measures within the flattened element list.
indices_of_all_measures, all_scores_elements = get_indices_of_measures_and_musical_elements_by_score(scores)

In [None]:
def extract_musical_elements(all_scores_elements: list, indices_of_all_measures: list) -> Tuple[dict, int]:
    """Group the rests, notes and chords of every score by measure.

    Args:
        all_scores_elements: One flat element list per score.
        indices_of_all_measures: For each score, the positions in its flat
            list that hold a measure object.

    Returns:
        ``(elements_by_measure_by_score, avg_measure_len)`` where the first
        item maps ``score index -> measure index -> list of element dicts``
        and the second is the rounded mean number of elements per collected
        measure (``0`` when no measure was collected).

        Element dicts have one of these shapes:
        ``{"name": "<REST>", "duration": float}``,
        ``{"name": "<NOTE>", "pitch": str, "duration": float}``,
        ``{"name": "<CHORD>", "notes": [str, ...], "duration": float}``.
    """
    def normalize_duration(quarter_length) -> float:
        # Coerce to float before rounding to two decimals.
        return round(float(quarter_length), 2)

    def describe_element(element):
        # The original tested Rest, then Note, then Chord with independent
        # `if`s (last match wins); testing in reverse order with early
        # returns keeps that precedence.
        if isinstance(element, chord.Chord):
            return {
                "name": "<CHORD>",
                "notes": [str(chord_note.pitch) for chord_note in element.notes],
                "duration": normalize_duration(element.duration.quarterLength),
            }
        if isinstance(element, note.Note):
            return {
                "name": "<NOTE>",
                "pitch": str(element.pitch),
                "duration": normalize_duration(element.quarterLength),
            }
        if isinstance(element, note.Rest):
            return {"name": "<REST>", "duration": normalize_duration(element.quarterLength)}
        return None

    elements_by_measure_by_score = {}
    num_measures = 0
    len_all_measures = 0
    for s in range(len(all_scores_elements)):
        elements = all_scores_elements[s]
        indices = indices_of_all_measures[s]
        measures = {}

        # A measure's content is whatever lies strictly between its index and
        # the next measure's index. Iterating over consecutive pairs means a
        # score with fewer than two measure indices yields no measures
        # instead of raising IndexError.
        # NOTE(review): elements after the last measure index are never
        # collected, so each score's final measure is omitted - confirm
        # whether that is intended.
        for i in range(len(indices) - 1):
            start = indices[i]
            end = indices[i + 1]
            measure = []
            for element in elements[start + 1:end]:
                element_props = describe_element(element)
                if element_props is not None:
                    measure.append(element_props)
            measures[i] = measure
            num_measures += 1
            len_all_measures += len(measure)

        elements_by_measure_by_score[s] = measures

    # Guard the average against a corpus in which no measure was collected.
    avg_measure_len = round(len_all_measures / num_measures) if num_measures else 0
    return elements_by_measure_by_score, avg_measure_len

In [None]:
# Group each score's rests, notes and chords by measure and get the average measure length.
elements_by_measure_by_score, avg_measure_len = extract_musical_elements(all_scores_elements, indices_of_all_measures)

In [None]:
def get_unique_elements_and_freqs(elements_by_measure_by_score: dict) -> Tuple[dict, dict]:
    """Assign an integer key to every distinct element and count occurrences.

    Args:
        elements_by_measure_by_score: Mapping
            ``score -> measure -> list of elements``.

    Returns:
        ``(unique_elems, freqs)``. ``unique_elems`` maps ``0, 1, 2, ...`` to
        the distinct elements in order of first appearance; ``freqs`` maps the
        same keys to how many times each element occurs.
    """
    unique_elems = {}
    freqs = {}
    # seen[k] is unique_elems[k]. Elements are compared by equality (the
    # element dicts are not hashable), so the lookup is a linear search; the
    # parallel list avoids rebuilding list(unique_elems.values()) for every
    # element.
    seen = []

    for measures in elements_by_measure_by_score.values():
        for elements in measures.values():
            for element in elements:
                try:
                    key = seen.index(element)
                except ValueError:
                    # First time this element is encountered: give it the next key.
                    key = len(seen)
                    seen.append(element)
                    unique_elems[key] = element
                    freqs[key] = 0
                freqs[key] += 1

    return unique_elems, freqs

In [None]:
# Index the distinct musical elements (the HMM states) and count their occurrences.
unique_elems, freqs = get_unique_elements_and_freqs(elements_by_measure_by_score)

In [None]:
def calculate_initial_probability_distribution(elements_by_measure_by_score: dict, unique_elems: dict) -> np.ndarray:
    """Estimate how likely each unique element is to open a measure.

    Args:
        elements_by_measure_by_score: Mapping
            ``score -> measure -> list of elements``.
        unique_elems: Mapping of state key to element. Positions in the
            returned vector follow the order of ``unique_elems.values()``.

    Returns:
        A 1-D array with one entry per unique element holding the fraction of
        non-empty measures that start with that element. If there is no
        non-empty measure, the all-zero vector is returned.

    Raises:
        ValueError: If a measure starts with an element missing from
            ``unique_elems``.
    """
    # Build the lookup list once instead of once per measure.
    unique_values = list(unique_elems.values())
    I = np.zeros(len(unique_values))
    count = 0

    for measures in elements_by_measure_by_score.values():
        for elements in measures.values():
            if not elements:
                # A measure without rests, notes or chords has no first
                # element; indexing [0] used to raise IndexError here.
                continue
            I[unique_values.index(elements[0])] += 1
            count += 1

    if count == 0:
        # Nothing to normalise by; avoid producing a NaN vector.
        return I

    return I / count

In [None]:
# Initial state distribution: how often each unique element opens a measure.
I = calculate_initial_probability_distribution(elements_by_measure_by_score, unique_elems)

In [None]:
# Sanity check: print the total of the initial distribution.
print(I.sum())

1.0


In [None]:
### CELL 13: Utility function to generate transition probability matrix ###
def calculate_transition_probability_matrix(elements_by_measure_by_score: dict, unique_elems: dict) -> np.ndarray:
    """Estimate element-to-element transition probabilities.

    Each score is read as one sequence (its measures concatenated in index
    order) and every adjacent pair of elements counts as one transition.

    Args:
        elements_by_measure_by_score: Mapping
            ``score -> measure index -> list of elements``; measure indices
            are expected to be ``0 .. n-1``.
        unique_elems: Mapping of state key to element, expected to be keyed
            ``0 .. n-1`` in insertion order (as produced by
            ``get_unique_elements_and_freqs``).

    Returns:
        A ``float32`` matrix ``A`` of shape ``(n, n)`` where ``A[i, j]`` is the
        fraction of counted transitions out of element ``i`` that go to
        element ``j``. Rows with no counted transition stay all-zero.

    Raises:
        ValueError: If an element that follows a known element is missing
            from ``unique_elems``.
    """
    num_elements = len(unique_elems)
    A = np.zeros((num_elements, num_elements), dtype = "float32")

    # One pass over the corpus with a single lookup list, instead of
    # rescanning every score once per unique element.
    unique_values = list(unique_elems.values())
    counts = [0] * num_elements

    # The last element of a score has no successor. It is arbitrarily
    # followed by a half note rest, but only when that rest is a known state;
    # otherwise the occurrence is not counted (this used to raise ValueError).
    terminal_successor = {'name': '<REST>', 'duration': 2.0}
    if terminal_successor in unique_values:
        terminal_key = unique_values.index(terminal_successor)
    else:
        terminal_key = None

    for measures in elements_by_measure_by_score.values():
        # Concatenating the measures skips empty ones, so "first element of
        # the next measure" can no longer raise IndexError.
        sequence = [
            element
            for m_index in range(len(measures))
            for element in measures[m_index]
        ]
        for position, element in enumerate(sequence):
            try:
                key = unique_values.index(element)
            except ValueError:
                # An element that is not a known state has no row to update.
                continue

            if position + 1 < len(sequence):
                next_key = unique_values.index(sequence[position + 1])
            elif terminal_key is not None:
                next_key = terminal_key
            else:
                continue

            A[key, next_key] += 1
            counts[key] += 1

    for key, count in enumerate(counts):
        if count:
            # Leave never-visited rows at zero rather than dividing by zero.
            A[key, :] = A[key, :]/count
    return A

In [None]:
# Transition matrix between the unique musical elements.
A = calculate_transition_probability_matrix(elements_by_measure_by_score, unique_elems)

In [None]:
### CELL 15: Utility function to generate emission probability matrix ###
def calculate_emission_probability_matrix(unique_elems: dict, OBS: list) -> np.ndarray:
    """Build the 0/1 emission matrix for one observation sequence.

    Args:
        unique_elems: Mapping of state key to element dict; each element has
            a ``"name"`` entry. Keys are used directly as row indices.
        OBS: Sequence of observed symbols.

    Returns:
        A ``float32`` matrix of shape ``(len(unique_elems), len(OBS))`` whose
        entry ``[key, o]`` is 1 when the element's name equals ``OBS[o]`` and
        0 otherwise.
    """
    B = np.zeros((len(unique_elems), len(OBS)), dtype = "float32")
    for state_key, state_element in unique_elems.items():
        for column, symbol in enumerate(OBS):
            # A state emits a symbol with certainty iff its name is that symbol.
            B[state_key, column] = 1 if state_element["name"] == symbol else 0
    return B

In [None]:
# Emission matrix for the selected observation sequence.
B = calculate_emission_probability_matrix(unique_elems, USE_OBS)

In [None]:
def viterbi(OBS, unique_elems, I, A, B):
    """Fill the Viterbi lattice and backpointer matrix for an observation sequence.

    Args:
        OBS: Observation sequence; only its length is used here.
        unique_elems: Mapping of state key to element; only its size is used.
        I: Initial probabilities, indexed by state.
        A: Transition probabilities, indexed ``A[previous_state][state]``.
        B: Emission probabilities, indexed ``B[state][t]`` (one column per
            observation position).

    Returns:
        ``(viterbi_lattice, backpointer_matrix)``, both of shape
        ``(number of states, len(OBS))``. ``viterbi_lattice[s][t]`` is the
        highest probability of any state path ending in ``s`` at step ``t``;
        ``backpointer_matrix[s][t]`` is the state at ``t - 1`` on that path
        (0 in the first column, and 0 when every candidate has probability 0).
    """
    S = len(unique_elems)
    T = len(OBS)

    viterbi_lattice = np.zeros((S, T))
    # Backpointers are state indices, so store them as integers; a float
    # matrix hands callers values such as 3.0 that cannot index an array.
    backpointer_matrix = np.zeros((S, T), dtype=int)

    if T == 0:
        # No observations: there is no first column to initialise.
        return viterbi_lattice, backpointer_matrix

    # Initialisation: start in each state and emit the first observation.
    for state in range(S):
        viterbi_lattice[state][0] = I[state] * B[state][0]

    # Recursion: extend the best path into every state, one step at a time.
    for t in range(1, T):
        for state in range(S):
            max_prob = 0
            argmax = 0
            for state_prior in range(S):
                prob = viterbi_lattice[state_prior][t - 1] * A[state_prior][state] * B[state][t]
                # Strict comparison keeps the lowest-numbered state on ties.
                if prob > max_prob:
                    max_prob = prob
                    argmax = state_prior
            viterbi_lattice[state][t] = max_prob
            backpointer_matrix[state][t] = argmax
    return viterbi_lattice, backpointer_matrix

In [None]:
# Run Viterbi over the selected observation sequence.
viterbi_lattice, backpointer_matrix = viterbi(USE_OBS, unique_elems, I, A, B)

In [None]:
### CELL 19: Utility method to read out best path from Viterbi lattice and backpointer matrix ###
def read_best_path(OBS, unique_elems, viterbi_lattice, backpointer_matrix):
    """Recover the most probable state sequence from a filled Viterbi lattice.

    The trace starts at the highest-scoring state in the last column and then
    follows the backpointers to the first column. Previously each column's
    argmax was taken independently and that state's backpointer was emitted,
    which dropped the final state, emitted the first state twice and did not
    yield a connected path.

    Args:
        OBS: Observation sequence; only its length is used. If it is longer
            than the lattice, the lattice width is used instead.
        unique_elems: Mapping of state index to element.
        viterbi_lattice: Array of shape ``(states, steps)`` of path
            probabilities.
        backpointer_matrix: Array of the same shape; entry ``[s][t]`` is the
            predecessor of state ``s`` at step ``t``.

    Returns:
        One element per decoded step, in chronological order. Empty when
        there are no steps or no states.
    """
    num_states = viterbi_lattice.shape[0]
    num_steps = min(len(OBS), viterbi_lattice.shape[1])
    if num_steps == 0 or num_states == 0:
        return []

    # Best final state: first state with the strictly largest probability
    # (state 0 when every probability is 0).
    last = num_steps - 1
    best_state = 0
    best_prob = 0
    for s in range(num_states):
        val = viterbi_lattice[s][last]
        if val > best_prob:
            best_prob = val
            best_state = s

    # Walk the backpointers from the last step down to the first.
    path = [best_state]
    for t in range(last, 0, -1):
        # int() so the trace also works with a float backpointer matrix.
        best_state = int(backpointer_matrix[best_state][t])
        path.append(best_state)

    path.reverse()
    return [unique_elems[state] for state in path]

In [None]:
# Decode the state sequence for the observation sequence the lattice was built
# from. USE_OBS must be passed here: OBS is the dict of all candidate
# sequences, so len(OBS) is the number of candidates, not the number of steps.
elements = read_best_path(USE_OBS, unique_elems, viterbi_lattice, backpointer_matrix)

In [None]:
### CELL 21: Utility function to generate MIDI stream from generated musical elements ###
def generate_midi_stream(elements: list) -> Tuple[list, stream.Stream]:
    """Turn element dicts into music21 objects and wrap them in a stream.

    Args:
        elements: Element dicts. Each has a ``"name"`` of ``"<REST>"``,
            ``"<CHORD>"`` or ``"<NOTE>"`` and a ``"duration"`` in quarter
            lengths; chords also carry ``"notes"`` and notes carry
            ``"pitch"``.

    Returns:
        ``(music, midi_stream)``: the list of created objects and a
        ``stream.Stream`` built from that list.

    Raises:
        ValueError: If an element has an unrecognised ``"name"``.
    """
    music = []
    for element in elements:
        name = element["name"]
        if name == "<REST>":
            m = note.Rest()
        elif name == "<CHORD>":
            m = chord.Chord(element["notes"])
        elif name == "<NOTE>":
            m = note.Note(element["pitch"])
        else:
            # Without this branch an unknown name either hit an unbound `m`
            # or silently appended the previous iteration's object again.
            raise ValueError(f"Unknown musical element name: {name!r}")
        m.duration.quarterLength = element["duration"]
        music.append(m)
    midi_stream = stream.Stream(music)
    return music, midi_stream

In [None]:
### CELL 22: Generating MIDI stream from generated musical elements ###
# `music` is the list of created objects; `midi_stream` wraps that list in a stream.
music, midi_stream = generate_midi_stream(elements)

In [None]:
### CELL 23: Writing MIDI data to disk ###
# Output file name; it has no directory part, so it is relative to the working directory.
FILENAME = "hmm_music_exp.mid"
midi_stream.write("midi", FILENAME)