In [1]:
import os
import numpy as np
import pandas as pd
import music21 as m21
import mido
from typing import List
from itertools import product
from collections import defaultdict
from sklearn.externals import joblib
from scipy.sparse import dok_matrix, vstack
from easy_ml.tools.util import download_from_gcs

In [2]:
%matplotlib inline

In [3]:
from matplotlib import pyplot as plt

In [39]:
# Local directory holding the corpus: it is the download target in the next
# cell and the root later handed to LabeledCorpusSet.
DATA_DIR = '/home/jovyan/persistent_data/data/classical-corpus/'

In [4]:
# Fetch the `k_collection` prefix of the `midi-files` bucket into DATA_DIR.
# NOTE(review): download_from_gcs is a project helper whose body is not shown
# here; presumably it mirrors every object under `prefix` into `local_fs_loc`
# -- confirm.
download_from_gcs(bucket_name='midi-files',
                  prefix='collections/classical-collection/k_collection/',
                  local_fs_loc=DATA_DIR)

In [5]:
# Top-level entries of DATA_DIR. Not referenced again in the cells visible here.
midi_files = os.listdir(DATA_DIR)

## Note Encoding Schemes
* m-Bar Encodings: Similar to a term-document frequency matrix. Encode a multinomial distribution of notes over a sequence of m measures
    * Parts together in their
    * Pitches encoded as their numerical value, rests encoded as 0
    * Pitches encoded as their numerical value, no rests
* m-bar time series: Each part is an m-bar time series over the 128 MIDI note values (0–127)
    * Pitches encoded as their numerical value, rests encoded as 0 (only sensible way in this scenario)
* n-Note Encodings: Similar to n-grams; each sequence of m measures will be encoded into $128^n$ features (one per ordered $n$-tuple of the 128 MIDI note values 0–127)
    * Pitches encoded as their numerical value, rests encoded as 0
    * Pitches encoded as their numerical value, no rests
* n-Note Pitch-Difference Encodings: Similar to above, but instead of mapping each distinct pitch to a feature, the difference between successive pitches is encoded to try to capture melodic patterns
    * Pitches encoded as their numerical value, rests encoded as 0? (probably not sensible in this scenario)
    * Pitches encoded as their numerical value, no rests
    * Do we add durations (as in *A Comparison of Statistical Approaches to Symbolic Genre Recognition*)?

**NOTE** Generate the full feature set by iterating through the entire scale of notes

### n-Note Encodings

In [334]:
# TODO: consistency in variable naming (e.g. path_ vs path)
# TODO: docstrings
# TODO: save additional metadata?

In [6]:
def window_gen(sequence, n):
    """Yield every length-``n`` sliding window of ``sequence`` as a tuple.

    Windows advance one element at a time, so a sequence of length ``k``
    produces ``k - n + 1`` windows, and none at all when ``k < n``.

    Args:
        sequence: A sliceable sequence (list, tuple, str, ...) that also
            supports ``len()``.
        n: Window length; must be at least 1.

    Yields:
        ``tuple(sequence[i:i + n])`` for each start index ``i``, in order.

    Raises:
        ValueError: If ``n`` is smaller than 1. Because this is a generator,
            the error surfaces when iteration starts, not at call time.
    """
    if n < 1:
        raise ValueError("window size n must be >= 1, got {}".format(n))
    # range() is empty when the sequence is shorter than the window, so no
    # explicit length check is needed inside the loop.
    for start in range(len(sequence) - n + 1):
        yield tuple(sequence[start:start + n])

In [79]:
class MidiFeatureCorpus(object):
    """Sparse n-note-sequence count matrix for the ``.mid`` files under one directory.

    Attributes set by ``__init__``:

    * ``path`` / ``note_window_size_`` -- the constructor arguments.
    * ``files_`` -- paths of every ``.mid`` file found under ``path``.
    * ``note_sequence_set`` -- column vocabulary: one ``"|"``-joined note
      sequence per column.
    * ``sparse_matrix`` -- ``len(files_) x len(note_sequence_set)`` float32
      ``dok_matrix``; all zeros until ``parse_corpus`` is called.
    * ``failed_files_`` -- ``(path, repr(error))`` pairs for the files the
      most recent ``parse_corpus`` call could not process.
    """

    def __init__(self, path: str, note_window_size: int = 2):
        """Index the MIDI files under ``path`` and allocate the empty matrix.

        Args:
            path: Directory that is searched recursively for ``.mid`` files.
            note_window_size: Number of consecutive notes per feature.
        """
        self.path = path
        self.note_window_size_ = note_window_size
        self.files_ = self._depth_first_midi_search(self.path)
        self.note_sequence_set = self.initialize_note_sequence_set(note_window_size)
        self.failed_files_ = []
        self.sparse_matrix = dok_matrix((len(self.files_),
                                         len(self.note_sequence_set)),
                                        dtype=np.float32)

    def _depth_first_midi_search(self, path: str) -> List[str]:
        """Return the paths of all ``.mid`` files below ``path``, depth first.

        Entries are visited in sorted order so the row order of
        ``sparse_matrix`` does not depend on the file system's listing order.
        """
        files_out = []
        for entry in sorted(os.listdir(path)):
            full_subpath = os.path.join(path, entry)
            if os.path.isdir(full_subpath):
                files_out.extend(self._depth_first_midi_search(full_subpath))
            elif full_subpath.endswith(".mid"):
                files_out.append(full_subpath)
        return files_out

    @staticmethod
    def initialize_note_sequence_set(window_size: int):
        """Enumerate every possible note sequence of length ``window_size``.

        Returns:
            A list of ``128 ** window_size`` strings such as ``"60|62"``, built
            from the note numbers ``"0"``..``"127"`` in ``itertools.product``
            order (last position varies fastest).
        """
        notes = [str(i) for i in range(128)]
        # Join the whole tuple so the key format matches get_n_note_sequence
        # for any window size, not just 2.
        return ["|".join(combo) for combo in product(notes, repeat=window_size)]

    @staticmethod
    def get_n_note_sequence(midi: "mido.MidiFile",
                            note_window_size: int = 2):
        """Return the ``"|"``-joined sliding windows over the notes in ``midi``.

        Iterates ``midi`` and keeps the ``note`` of every message whose
        ``type`` is ``"note_on"``; the windows come from ``window_gen``.
        The annotation is quoted so that defining the class does not require
        ``mido`` to be importable.

        NOTE(review): ``note_on`` messages with velocity 0 are kept as well.
        MIDI files commonly use those as note-offs -- confirm whether they
        should be counted as notes.
        """
        notes = [str(msg.note) for msg in midi if msg.type == "note_on"]
        return ["|".join(window) for window in window_gen(notes, note_window_size)]

    @staticmethod
    def sequence_encoder(seq: List[str]):
        """Count the occurrences of each entry of ``seq``.

        Returns:
            A ``defaultdict(float)`` mapping entry -> count (as a float).
        """
        d = defaultdict(float)
        for entry in seq:
            d[entry] += 1.
        return d

    def _parse_file_as_sequence(self, file_name):
        """Load ``file_name`` with ``mido.MidiFile`` and return its note sequences."""
        parsed_file = mido.MidiFile(file_name)
        return self.get_n_note_sequence(parsed_file, self.note_window_size_)

    def parse_corpus(self):
        """Fill ``sparse_matrix`` with the note-sequence counts of every file.

        Row ``i`` belongs to ``files_[i]``. A file that cannot be processed is
        skipped and recorded in ``failed_files_`` as ``(path, repr(error))``;
        its row keeps whatever was written before the failure (all zeros when
        the file could not be read at all).
        """
        # Build the sequence -> column lookup once; list.index() would rescan
        # the whole vocabulary for every entry. setdefault keeps the first
        # column should the vocabulary ever contain duplicates.
        column_of = {}
        for j, seq in enumerate(self.note_sequence_set):
            column_of.setdefault(seq, j)

        self.failed_files_ = []
        for i, file in enumerate(self.files_):
            try:
                sequence = self._parse_file_as_sequence(file)
                encoded_sequence = self.sequence_encoder(sequence)
                for seq, count in encoded_sequence.items():
                    self.sparse_matrix[i, column_of[seq]] = count
            except Exception as exc:
                # Best effort: one unreadable file must not abort the corpus.
                # Unlike a bare ``except:`` this still lets KeyboardInterrupt
                # through, and the failure is recorded instead of vanishing.
                self.failed_files_.append((file, repr(exc)))

In [80]:
class LabeledCorpusSet(object):
    """Stack the feature matrices of several labelled corpora into one matrix.

    Every sub-directory of ``path`` is treated as one corpus, and the
    directory name is used as the label of each row that corpus contributes.

    Attributes:
        corpus_name_list_: Sorted names of the sub-directories of ``path``.
        corpus_labels: One label per row of ``sparse_matrix`` (after parsing).
        corpus_list_: The ``MidiFeatureCorpus`` objects built while parsing.
        sparse_matrix: Zero-row placeholder until ``parse_corpus_set`` runs,
            afterwards the vertical stack of all per-corpus matrices.
        parsed_: ``True`` once ``parse_corpus_set`` has completed.
    """

    def __init__(self, path: str, note_window_size: int = 2):
        """Discover the corpora under ``path``; nothing is parsed yet.

        Args:
            path: Directory whose sub-directories are the corpora. A trailing
                path separator is optional.
            note_window_size: Forwarded to every ``MidiFeatureCorpus``.
        """
        self.path_ = path
        self.note_window_size_ = note_window_size
        # Only directories can be corpora (a stray file here would make
        # MidiFeatureCorpus fail on os.listdir). Sorted so that label and row
        # order do not depend on the file system's listing order.
        self.corpus_name_list_ = sorted(
            name for name in os.listdir(self.path_)
            if os.path.isdir(os.path.join(self.path_, name))
        )
        self.corpus_labels = []
        self.corpus_list_ = []
        matrix_shape = (0, len(MidiFeatureCorpus.initialize_note_sequence_set(note_window_size)))
        # float32 to match the matrices MidiFeatureCorpus produces.
        self.sparse_matrix = dok_matrix(matrix_shape, dtype=np.float32)
        self.parsed_ = False

    def parse_corpus_set(self):
        """
        Parses every corpus and stacks the resulting matrices row-wise.

        Directory structure within a corpus is ignored (e.g. if cantatas and
        sonatas are kept in different sub-directories): every row of a corpus
        is labelled with the corpus name. Calling this again starts from
        scratch rather than appending to the previous labels.
        """
        # Reset so labels stay aligned with the rebuilt matrix on a re-run.
        self.corpus_labels = []
        self.corpus_list_ = []
        matrix_set = []
        for corpus_name in self.corpus_name_list_:
            # os.path.join works whether or not path_ ends with a separator.
            file_path = os.path.join(self.path_, corpus_name)
            print("reading from {}".format(file_path))
            corpus = MidiFeatureCorpus(file_path, self.note_window_size_)
            corpus.parse_corpus()
            self.corpus_list_.append(corpus)
            self.corpus_labels.extend([corpus_name] * corpus.sparse_matrix.shape[0])
            matrix_set.append(corpus.sparse_matrix)
        if matrix_set:
            self.sparse_matrix = vstack(matrix_set)
        else:
            # vstack() rejects an empty list; keep a zero-row matrix of the
            # right width instead.
            self.sparse_matrix = dok_matrix((0, self.sparse_matrix.shape[1]),
                                            dtype=np.float32)
        self.parsed_ = True

In [81]:
# Build the corpus set rooted at DATA_DIR with the default note_window_size
# of 2. The constructor only lists the directory; parsing happens in
# parse_corpus_set().
labeled_corpus = LabeledCorpusSet(DATA_DIR)

In [None]:
# Parse every corpus under DATA_DIR and stack the per-corpus matrices.
# Prints one "reading from ..." line per corpus.
labeled_corpus.parse_corpus_set()

reading from /home/jovyan/persistent_data/data/classical-corpus/gabrieli's
reading from /home/jovyan/persistent_data/data/classical-corpus/telemann
reading from /home/jovyan/persistent_data/data/classical-corpus/medtner
reading from /home/jovyan/persistent_data/data/classical-corpus/haydn
reading from /home/jovyan/persistent_data/data/classical-corpus/dvorak
reading from /home/jovyan/persistent_data/data/classical-corpus/reger
reading from /home/jovyan/persistent_data/data/classical-corpus/schoenberg
reading from /home/jovyan/persistent_data/data/classical-corpus/couperin
reading from /home/jovyan/persistent_data/data/classical-corpus/franck
reading from /home/jovyan/persistent_data/data/classical-corpus/lasso
reading from /home/jovyan/persistent_data/data/classical-corpus/joplin
reading from /home/jovyan/persistent_data/data/classical-corpus/busoni
reading from /home/jovyan/persistent_data/data/classical-corpus/bruckner
reading from /home/jovyan/persistent_data/data/classical-corpus/s

In [None]:
# Persist the whole LabeledCorpusSet object (labels, per-corpus objects and
# the stacked matrix) with compress=1.
# NOTE(review): `joblib` is imported from `sklearn.externals` in the import
# cell; recent scikit-learn releases no longer ship that module -- confirm the
# pinned version or switch to the standalone `joblib` package.
joblib.dump(labeled_corpus, "/home/jovyan/persistent_data/data/dumps/labeled_corpus.pkl",
           compress=1)

# Modeling

In [61]:
def log_likelihood(beta):
    """Bernoulli (logistic-regression) log-likelihood of ``beta``.

    Reads the module-level design matrix ``X`` (rows are observations) and
    the 0/1 response ``y``, and evaluates
    ``sum(y * z - log(1 + exp(z)))`` with ``z = beta . X.T``.

    Args:
        beta: Coefficient vector with one entry per column of ``X``.

    Returns:
        The log-likelihood as a scalar (no penalty term is included).
    """
    # asarray: if X is an np.matrix the product is one too, and `*` below
    # would then mean matrix multiplication instead of an element-wise product.
    theta_dot_x = np.asarray(np.dot(beta, X.T))
    # logaddexp(0, z) == log(1 + exp(z)) but does not overflow for large z.
    log_prob_data_given_theta = y * theta_dot_x - np.logaddexp(0, theta_dot_x)
    return np.sum(log_prob_data_given_theta)

In [62]:
def predicted_probabilities(beta):
    """Return P(y = 1 | x) for every row of the module-level ``X``.

    Uses the logistic link ``1 / (1 + exp(-z))`` with ``z = beta . X.T``,
    which is the model whose likelihood ``log_likelihood`` evaluates
    (``y * z - log(1 + exp(z))``).

    Args:
        beta: Coefficient vector with one entry per column of ``X``.

    Returns:
        1-D ``ndarray`` with one probability per row of ``X``.
    """
    log_odds = np.asarray(np.dot(beta, X.T)).T
    # exp(-log(1 + exp(-z))) is the sigmoid, written so that a large |z|
    # cannot overflow.
    return np.exp(-np.logaddexp(0, -log_odds)).ravel()

def score_function(beta, probabilities, lmbda):
    """Gradient of the ridge-penalised log-likelihood at ``beta``.

    The objective is ``log_likelihood(beta) - lmbda / 2 * ||beta||^2``, so the
    gradient is ``X.T . (y - p) - lmbda * beta``. Reads the module-level
    ``X`` and ``y``.

    Args:
        beta: Current coefficients.
        probabilities: ``predicted_probabilities(beta)``.
        lmbda: Ridge penalty strength.

    Returns:
        Array broadcast to the shape of ``beta``.
    """
    residual = np.asarray(y).ravel() - np.asarray(probabilities).ravel()
    return np.asarray(X).T.dot(residual) - lmbda * beta

def hessian(beta, probabilities, lmbda):
    """Hessian of the ridge-penalised log-likelihood.

    Equals ``-(X.T . W . X + lmbda * I)`` with ``W = diag(p * (1 - p))``; it is
    the derivative of ``score_function`` and is negative definite for
    ``lmbda > 0``, so ``beta - inv(H) . score`` is a Newton ascent step.

    Args:
        beta: Unused; kept so the signature parallels ``score_function``.
        probabilities: ``predicted_probabilities(beta)``.
        lmbda: Ridge penalty strength.

    Returns:
        ``(d, d)`` ``ndarray`` where ``d`` is the number of columns of ``X``.
    """
    probs = np.asarray(probabilities).ravel()
    weights = probs * (1 - probs)
    design = np.asarray(X)
    # Scaling the columns of X.T by the weights equals X.T . diag(w) without
    # materialising the n x n diagonal matrix.
    information = (design.T * weights).dot(design) + lmbda * np.eye(design.shape[1])
    return -information

In [63]:
def newton_step(beta, lmbda):
    """Perform one Newton update, ``beta - inv(H) . g``.

    ``g`` is ``score_function(beta, p, lmbda)`` and ``H`` is
    ``hessian(beta, p, lmbda)``, both evaluated at
    ``p = predicted_probabilities(beta)``.

    Args:
        beta: Current coefficients, shape ``(d,)`` or ``(1, d)``.
        lmbda: Penalty strength forwarded to the score and Hessian.

    Returns:
        Updated coefficients with the same shape as ``beta``.

    Raises:
        numpy.linalg.LinAlgError: If the Hessian is singular.
    """
    probs = predicted_probabilities(beta)
    score = score_function(beta, probs, lmbda)
    hess = hessian(beta, probs, lmbda)
    # Solve H . step = g directly rather than forming inv(H) and multiplying:
    # same step, cheaper and numerically better behaved.
    step = np.linalg.solve(np.asarray(hess), np.asarray(score).reshape(-1))
    return (beta - step).reshape(beta.shape)

In [64]:
def newton_raphson(beta_init, lmbda=0.1, num_iter=10):
    """Run ``num_iter`` Newton steps starting from ``beta_init``.

    Before every step the current ``log_likelihood`` is recorded, so the
    returned trace holds the values *before* each update and has exactly
    ``num_iter`` entries.

    Args:
        beta_init: Starting coefficients.
        lmbda: Penalty strength forwarded to ``newton_step``.
        num_iter: Number of Newton steps to take.

    Returns:
        ``(beta, trace)``: the coefficients after the last step and the list
        of recorded log-likelihood values.
    """
    beta, trace = beta_init, []
    for _ in range(num_iter):
        trace.append(log_likelihood(beta))
        beta = newton_step(beta, lmbda)
    return beta, trace

In [65]:
# Dense copy of the stacked feature matrix; the modelling functions above read
# it as the global `X`.
# NOTE(review): those functions also read a global `y`, which is not assigned
# in any cell visible here.
X = labeled_corpus.sparse_matrix.todense()

In [68]:
# Number of matrix rows (one per file) carrying each corpus label.
pd.Series(labeled_corpus.corpus_labels).value_counts()

haydn    739
dtype: int64

In [None]:
# Random starting coefficients in [-1, 1), one row per class minus one.
# NOTE(review): `num_classes` is not defined in any cell visible here, and no
# random seed is set, so this draw is not reproducible.
beta_init = np.random.uniform(-1, 1, size=(num_classes-1, X.shape[1]))

In [None]:
# Fit by Newton-Raphson; `ll` is the log-likelihood recorded before each step.
# NOTE(review): the positional 10 binds to `lmbda` (the second parameter), not
# to `num_iter` -- confirm that a penalty of 10 is what was intended.
fitted, ll = newton_raphson(beta_init, 10)