In [1]:
from music21 import *
import glob
from collections import Counter
import numpy as np
import os
from sklearn.model_selection import train_test_split
import torch
from torch.utils.data import TensorDataset, DataLoader
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
#torch.manual_seed(1)

"""
This function extract the features from the MIDI files.

Input : Directory containing the midi files
outputs : numpy ndarray containing numpy arrays of the concatenated elements of the MIDI files.
          Elements are feature extracted from the MIDI files.
"""
def read_midi_dataset(file):
    data = list()
    for midi in glob.glob(file):
        mu = converter.parse(midi)
        s2 = instrument.partitionByInstrument(mu)
        # parts[0] means we only takes into account piano
        note2parse = s2.parts[0].recurse() 
        temp = list()
        for note_ in note2parse:
            if isinstance(note_, note.Note): # isinstance check if element is a note
                temp.append(str(note_.pitch))
            elif isinstance(note_, chord.Chord): # check if it is a chord
                temp.append('.'.join(str(n) for n in note_.normalOrder))
                
        data.append(temp)
        
    data = np.array(data)

    return data


"""
This function transforms a numpy ndarray containaing arrays of elements of MIDI files into one list of
these elements. Example : [[a,b][c,d]] => [a,b,c,d]
"""
def from_ndarrays_to_list(data):
    return [note for notes_ in data for note in notes_] 


"""
This function deletes from the dataset elements that do not appear more than a particular frequency.
It is a filter.
Input : numpy ndarray containing numpy arrays of the concatenated elements of the MIDI files.
Output : List of list. Each list is a concatenation of all the elements of a MIDI file.
"""
def get_vocabulary(data):
    data_ = from_ndarrays_to_list(data)
    # frequence of notes
    freq = dict(Counter(data_))
    # unique_elements is the sorted set of unique elements of the set of MIDI files. The elements selected depends
    # on a particular frequency. Therefore, it is the total vacabulary of the dataset.
    unique_note = sorted([note_ for note_, count in freq.items()])
    
    vocab_dict = {}
    for i in range(len(unique_note)): vocab_dict[unique_note[i]] = i
        
    return unique_note, vocab_dict


def note_to_vec(vocab, data, size_vocab, dim_embed):
    """
    Map every element of every song to a vector, using a freshly created embedding table.

    Input : vocab      - dict mapping an element to its integer index (must be < size_vocab)
            data       - collection of songs, each song being a sequence of elements
            size_vocab - number of rows of the embedding table
            dim_embed  - size of each embedding vector
    Output : list with one numpy array of shape (len(song), dim_embed) per song.

    The nn.Embedding table is randomly initialised on every call and is not trained nor
    returned: the vectors are only consistent within a single call.
    Raises KeyError if a song contains an element that is missing from vocab.
    """
    embeds = nn.Embedding(size_vocab, dim_embed)

    data_embed = list()
    # The table is only used as a lookup, so no autograd graph is needed.
    with torch.no_grad():
        for song in data:
            # One batched lookup per song instead of one tensor + one forward pass per note.
            indices = torch.tensor([vocab[note_] for note_ in song], dtype=torch.long)
            data_embed.append(embeds(indices).numpy())

    return data_embed


"""
"""
def note_to_ind(vocab, data):
    note2ind = {u:i for i, u in enumerate(vocab)}
    ind2note = np.array(vocab)
    
    dataInd = list()
    for song in data: dataInd.append(np.array([note2ind[note] for note in song]))
        
    ind2note = {}
    for note, ind in note2ind.items():
        ind2note[ind] = note
    
    return dataInd, note2ind, ind2note


"""
This function creates the X and y matrices needed by the model.
We use a sliding window mechanism in order to create this dataset.
[a,b,c,d,e,f,g] becomes x1=[a,b,c], y1=[d] then x2=[b,c,d], y2=[e] etc.

Input : List of list. Each list is a concatenation of all the elements of a MIDI file.
Output : matrix X and vector y.
"""
def training_target_samples(data_embed, dataInd, window_size, show_example=False): #time_step = window
    x = list()
    y = list()

    """
    for notes_ in data:
        for i in range(len(notes_) - window_size):
            x.append(notes_[i : i + window_size])
            y.append(notes_[i + window_size])
    """
            
    for i in range(len(data_embed)):
        for j in range(len(dataInd[i]) - window_size):
            x.append(data_embed[i][j : j + window_size])
            y.append(dataInd[i][j + window_size])
            
    if show_example is True:
        for i, (trainingInd, targetInd) in enumerate(zip(training[:5], target[:5])):
            print("Step {:4d}".format(i))
            print("  Input: {} ({:s})".format(trainingInd, repr(ind2note[trainingInd])))
            print("  expected output: {} ({:s})".format(targetInd, repr(ind2note[targetInd])))
    
    return np.array(x), np.array(y)

def split_reshape(X, y, split_ratio, size_vocab, dim_embed_x, dim_embed_y):
    """
    Split the samples into a train and a test set, then reshape both for the model.

    Input : X, y        - samples and targets to split
            split_ratio - forwarded to train_test_split as test_size, i.e. the share of the
                          samples that goes to the TEST set
            size_vocab, dim_embed_x, dim_embed_y - forwarded to reshape_datasets
    Output : X_train, X_test, y_train, y_test
    """
    # Split the arguments that were passed in (not the notebook globals), with a fixed seed
    # so that the split is reproducible.
    X_train, X_test, y_train, y_test = train_test_split(np.array(X), np.array(y),
                                                        test_size=split_ratio, random_state=0)

    X_train, y_train = reshape_datasets(X_train, np.array(y_train), size_vocab, dim_embed_x, dim_embed_y)
    X_test, y_test = reshape_datasets(X_test, np.array(y_test), size_vocab, dim_embed_x, dim_embed_y)

    return X_train, X_test, y_train, y_test

def reshape_datasets(X, y, size_vocab, dim_embed_x, dim_embed_y):
    """
    Give X and y the layout expected by the model.

    Input : X - array whose first two dimensions are (number of samples, sequence length)
            y - array holding one target per sample
            size_vocab  - every value of X is divided by float(size_vocab)
            dim_embed_x - size of the last dimension of the reshaped X
            dim_embed_y - size of the last dimension of the reshaped y
    Output : X of shape (nb_samples, seq_length, dim_embed_x) and
             y of shape (nb_samples, dim_embed_y).
    """
    nb_samples, seq_length = X.shape[0], X.shape[1]
    scale = float(size_vocab)

    # batch_size , sequence_length , size_encoding
    X_reshaped = np.reshape(X, (nb_samples, seq_length, dim_embed_x)) / scale
    y_reshaped = np.reshape(y, (nb_samples, dim_embed_y))

    return X_reshaped, y_reshaped

def ind_to_embedding(dataInd, dataEmbed):
    """
    Build a dict index -> embedding from two parallel sequences.

    dataInd[k] is associated with dataEmbed[k]; when an index occurs several times, the
    embedding found at its last occurrence is the one that is kept.
    """
    ind2embed = {}
    for position, index in enumerate(dataInd):
        ind2embed[index] = dataEmbed[position]

    return ind2embed

In [2]:
# Glob pattern of the MIDI files used as dataset.
# NOTE(review): absolute, machine-specific path -- adapt it before running elsewhere.
file = "/home/cj/Bureau/Master2/Q2/deep_learning/project/tf_dataset/*.mid"
data = read_midi_dataset(file)
unique_note, vocab = get_vocabulary(data)
size_vocab = len(unique_note)

# Every element is represented by a dim_x-dimensional embedding vector.
dim_x = 4
data_embed = note_to_vec(vocab, data, size_vocab, dim_x)
dataInd, note2ind, ind2note = note_to_ind(unique_note, data)

# index -> embedding lookup used at generation time. It has to cover ALL the songs: built
# from the first song only, it misses every element that does not occur in that song and
# the generation then fails with a KeyError.
ind2embed = {}
for _song_indices, _song_embeddings in zip(dataInd, data_embed):
    ind2embed.update(ind_to_embedding(_song_indices, _song_embeddings))

window_size = 5
X_dataset, y_dataset = training_target_samples(data_embed, dataInd, window_size)

# NOTE(review): split_ratio is used as test_size by split_reshape, so 0.99 keeps only 1% of
# the samples for training -- confirm that this is intended.
split_ratio = 0.99
dim_y = 1
X_train, X_test, y_train, y_test = split_reshape(X_dataset, y_dataset, split_ratio,
                                                 size_vocab, dim_x, dim_y)

In [3]:
batch_size = 3

# Wrap the numpy splits into (input, target) tensor datasets.
train_data = TensorDataset(*(torch.from_numpy(array) for array in (X_train, y_train)))
test_data = TensorDataset(*(torch.from_numpy(array) for array in (X_test, y_test)))

# Training batches are shuffled and the incomplete trailing batch is dropped.
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, drop_last=True)
# The test samples are served one at a time.
test_loader = DataLoader(test_data, batch_size=1, shuffle=True, drop_last=True)

In [4]:
#for x, y in train_loader:
    #print(x, y)

In [5]:
# Use the GPU when torch reports one as available, otherwise stay on the CPU.
# `device` is used later in the notebook to place the model and the batches.
is_cuda = torch.cuda.is_available()
device = torch.device("cuda" if is_cuda else "cpu")

In [6]:
class lstm_model(nn.Module):
    """
    Sequence classifier made of three LSTM blocks followed by a linear layer.

    lstm1 reads the whole input sequence. Its final hidden state (after dropout) is then fed
    as a one-step sequence to lstm2, and the same is done from lstm2 to lstm3; the
    (hidden, cell) states are carried from one block to the next. The single output step of
    lstm3 goes through the linear layer, which yields one row of `output_size` scores per
    sample of the batch.
    """
    def __init__(self, input_size, output_size, hidden_dim, n_layers, dropout):
        super().__init__()

        # Defining some parameters
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers

        # Defining the layers
        self.lstm1 = nn.LSTM(input_size, hidden_dim, n_layers, batch_first=True)
        self.lstm2 = nn.LSTM(hidden_dim, hidden_dim, n_layers)
        self.lstm3 = nn.LSTM(hidden_dim, hidden_dim, n_layers)
        self.dropout = nn.Dropout(dropout)
        self.fully_connected = nn.Linear(hidden_dim, output_size)

    def forward(self, x):
        """
        Input : x of shape (batch, sequence length, input_size).
        Output : tensor of shape (batch, output_size) holding unnormalised scores.
        """
        # Follow the device/dtype of the parameters instead of hard-coding float64 on CPU.
        reference = self.fully_connected.weight
        x = x.to(device=reference.device, dtype=reference.dtype)
        h_t, c_t = self.init_hidden(x.size(0))

        _, (h_t, c_t) = self.lstm1(x, (h_t, c_t))
        h_t = self.dropout(h_t)
        # h_t[-1:] is the hidden state of the top layer seen as a one-step sequence of shape
        # (1, batch, hidden_dim). With n_layers == 1 it is h_t itself; with more layers it
        # keeps the output at one row per sample instead of n_layers rows per sample.
        out, (h_t, c_t) = self.lstm2(h_t[-1:], (h_t, c_t))
        h_t = self.dropout(h_t)
        out, (h_t, c_t) = self.lstm3(h_t[-1:], (h_t, c_t))

        # Reshaping the outputs such that it can be fit into the fully connected layer
        out = out.contiguous().view(-1, self.hidden_dim)
        out = self.fully_connected(out)

        return out

    @staticmethod
    def last_output(batch_size, out):
        """
        Extract the output of the last time step of each sequence of the batch.

        Input : batch_size - kept for compatibility, the batch size is taken from `out`
                out        - tensor of shape (batch, time, features) (batch_first layout)
        Output : tensor of shape (batch, features).
        """
        return out[:, -1, :]

    def init_hidden(self, batch_size):
        """
        Generates the zero hidden state and cell state used by the lstm layers, created on
        the same device and with the same dtype as the parameters of the model.
        """
        reference = self.fully_connected.weight
        shape = (self.n_layers, batch_size, self.hidden_dim)
        return (torch.zeros(shape, device=reference.device, dtype=reference.dtype),
                torch.zeros(shape, device=reference.device, dtype=reference.dtype))

# Hyperparameters of the network
input_size = dim_x        # size of the vector describing one time step of the input
output_size = size_vocab  # one score per element of the vocabulary
hidden_dim = 12
n_layers = 1
dropout = 0.3

# Build the model with its parameters in double precision.
model = lstm_model(input_size, output_size, hidden_dim, n_layers, dropout).to(dtype=torch.float64)
# Place the model on the device defined earlier (default is CPU)
model.to(device)

lstm_model(
  (lstm1): LSTM(4, 12, batch_first=True)
  (lstm2): LSTM(12, 12)
  (lstm3): LSTM(12, 12)
  (dropout): Dropout(p=0.3, inplace=False)
  (fully_connected): Linear(in_features=12, out_features=111, bias=True)
)

In [7]:
class attention_model(nn.Module):
    """
    LSTM classifier with a parameter-free dot-product attention over the time steps.

    The LSTM reads the input sequence; the output of its last time step is used as a query
    that is compared (dot product) with the output of every time step. The softmax of these
    scores weights the time steps, and the resulting weighted sum goes through dropout and
    the linear layer.

    NOTE(review): the previous forward() called self.lstm1/lstm2/lstm3, which this class never
    defines, so it could not run. The attention below only relies on the layers created in
    __init__ -- confirm that it matches the intended design.
    """
    def __init__(self, input_size, output_size, hidden_dim, n_layers, dropout):
        super().__init__()

        # Defining some parameters
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers

        # Defining the layers
        self.lstm = nn.LSTM(input_size, hidden_dim, n_layers, batch_first=True)
        self.dropout = nn.Dropout(dropout)
        self.fully_connected = nn.Linear(hidden_dim, output_size)

    def forward(self, x):
        """
        Input : x of shape (batch, sequence length, input_size).
        Output : tensor of shape (batch, output_size) holding unnormalised scores.
        """
        # Follow the device/dtype of the parameters of the model.
        reference = self.fully_connected.weight
        x = x.to(device=reference.device, dtype=reference.dtype)

        out, _ = self.lstm(x, self.init_hidden(x.size(0)))       # (batch, time, hidden)

        query = self.last_output(x.size(0), out).unsqueeze(2)    # (batch, hidden, 1)
        scores = torch.bmm(out, query).squeeze(2)                # (batch, time)
        weights = torch.softmax(scores, dim=1)                   # sums to 1 over time
        context = torch.bmm(weights.unsqueeze(1), out).squeeze(1)  # (batch, hidden)

        return self.fully_connected(self.dropout(context))

    @staticmethod
    def last_output(batch_size, out):
        """
        Extract the output of the last time step of each sequence of the batch.

        Input : batch_size - kept for compatibility, the batch size is taken from `out`
                out        - tensor of shape (batch, time, features) (batch_first layout)
        Output : tensor of shape (batch, features).
        """
        return out[:, -1, :]

    def init_hidden(self, batch_size):
        """
        Generates the zero hidden state and cell state used by the lstm layer, created on
        the same device and with the same dtype as the parameters of the model.
        """
        reference = self.fully_connected.weight
        shape = (self.n_layers, batch_size, self.hidden_dim)
        return (torch.zeros(shape, device=reference.device, dtype=reference.dtype),
                torch.zeros(shape, device=reference.device, dtype=reference.dtype))


In [8]:
# Define hyperparameters for the training

print_every = batch_size
valid_loss_min = np.inf  # lowercase alias: np.Inf no longer exists in NumPy 2

n_epochs = 1
lr=0.01

# Define Loss, Optimizer, accuracy
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr)

train_accuracy = list()
test_accuracy = list()

# Number of epochs since the last checkpoint attempt (one attempt every 15 epochs).
save = 0

# Training Run
model.train()
for epoch in range(1, n_epochs + 1):
    losses = []
    true_pred_train = 0
    true_pred_test = 0
    nb_train_samples = 0
    nb_test_samples = 0
    for inputs, targets in train_loader:
        # Forward
        inputs, targets = inputs.to(device), targets.to(device)
        targets = targets.view(-1).long()

        # Gradients accumulate across backward() calls, so they are cleared for EVERY batch.
        optimizer.zero_grad()
        pred_train = model(inputs)

        # Compute Loss and backpropagation
        loss = criterion(pred_train, targets)
        losses.append(loss.item())
        loss.backward() # Does backpropagation and calculates gradients
        optimizer.step() # Updates the weights accordingly

        # Count the correct predictions over the whole batch (argmax of the scores).
        true_pred_train += (pred_train.argmax(dim=1) == targets).sum().item()
        nb_train_samples += targets.size(0)

    # Accuracy of this epoch, over the samples that were actually seen (the loader may drop
    # an incomplete last batch).
    train_accuracy.append(true_pred_train / max(nb_train_samples, 1))

    model.eval()
    val_losses = []
    # No gradient is needed for the evaluation.
    with torch.no_grad():
        for inputs, target in test_loader:

            inputs, target = inputs.to(device), target.to(device)
            target = target.view(-1).long()
            pred_test = model(inputs)

            val_loss = criterion(pred_test, target)
            val_losses.append(val_loss.item())

            true_pred_test += (pred_test.argmax(dim=1) == target).sum().item()
            nb_test_samples += target.size(0)

    test_accuracy.append(true_pred_test / max(nb_test_samples, 1))

    model.train()

    print("Epoch: {}/{}...".format(epoch, n_epochs),

        "Training Loss: {:.6f}...".format(np.mean(losses)),
        "Validation Loss Loss: {:.6f}".format(np.mean(val_losses)))

    save+=1
    if save == 15:
        save = 0
        epoch_val_loss = np.mean(val_losses)
        if epoch_val_loss <= valid_loss_min:
            torch.save(model.state_dict(), './state_dict.pt')
            # Report the previous minimum BEFORE overwriting it.
            print('Validation loss decreased ({:.6f} --> {:.6f}). Saving model ...'.
                  format(valid_loss_min, epoch_val_loss))
            valid_loss_min = epoch_val_loss

Epoch: 1/1... Training Loss: 4.704359... Validation Loss Loss: 4.681416


# Music Generation

In [78]:
def initialise_generation(dataInd, window_size, ind2note, ind2embed):
    """
    Pick a random window of an existing song to start the generation.

    Input : dataInd     - list of per-song arrays of indices
            window_size - number of elements of the starting window
            ind2note    - dict index -> element
            ind2embed   - dict index -> embedding vector
    Output : (music_generated, input_sequence)
             music_generated - list of the elements of the chosen window
             input_sequence  - numpy array stacking the embeddings of these elements

    Raises ValueError when no song is long enough to draw a window from.
    """
    # The start position is drawn in [window_size, len(song) - window_size - 1), which is a
    # non-empty range only for songs of at least 2 * window_size + 2 elements. Shorter songs
    # are skipped instead of making np.random.randint fail.
    min_length = 2 * window_size + 2
    eligible = [position for position, song in enumerate(dataInd) if len(song) >= min_length]
    if not eligible:
        raise ValueError("no song has at least {} elements, cannot draw a starting window "
                         "of size {}".format(min_length, window_size))

    song_ind = eligible[np.random.randint(0, len(eligible))]
    song = dataInd[song_ind]
    note_ind = np.random.randint(window_size, len(song) - window_size - 1)
    init_sequence = song[note_ind:note_ind + window_size]

    music_generated = [ind2note[ind] for ind in init_sequence]
    input_sequence = [ind2embed[ind] for ind in init_sequence]

    return music_generated, np.array(input_sequence)

def music_generation(input_seq):
    """
    Predict the index of the element that follows a sequence (greedy choice).

    Input : input_seq - numpy array of shape (sequence length, dim_x) holding the embeddings
                        of the current window
    Output : index (int) of the element with the highest score.

    Uses the notebook-level `model`, `device` and `dim_x`.

    NOTE(review): reshape_datasets divides the training inputs by size_vocab, whereas the
    sequences built from ind2embed are not scaled -- confirm whether the same scaling should
    be applied here.
    """
    batch = np.reshape(input_seq, (1, input_seq.shape[0], dim_x))
    # .to() returns the moved tensor (it does not work in place), so its result is kept.
    batch = torch.from_numpy(batch).to(device)

    # Predict in evaluation mode (dropout disabled) and without building the autograd
    # graph, then restore the mode the model was in.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            pred = model(batch)
    finally:
        model.train(was_training)

    # Use the scores of THIS prediction. The softmax is monotonic, so taking the argmax of
    # the raw scores selects the same element as the argmax of the probabilities.
    note_ind = torch.argmax(pred[-1], dim=0).item()

    return note_ind

In [90]:
# Start from a random window of the dataset, then extend it one element at a time.
music_generated, input_sequence = initialise_generation(dataInd, window_size, ind2note, ind2embed)

nb_steps = 5
for i in range(nb_steps):
    pred_ind = music_generation(input_sequence)

    # Record the predicted element ...
    pred_note = ind2note[pred_ind]
    music_generated.append(pred_note)

    # ... and slide the window: drop its oldest embedding, append the predicted one.
    pred_embed = ind2embed[pred_ind]
    input_sequence = np.concatenate((input_sequence[1:], [pred_embed]), axis=0)
