# Import packages

In [3]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
import time
import mido
import string
from music21 import midi
import json

# dataset
import os
from torch.utils.data import Dataset, DataLoader, random_split

# tensorboard
from torch.utils.tensorboard import SummaryWriter

from music21 import converter, instrument, note, chord
import glob, pickle


# Preprocess data

In [None]:
def msg2dict(msg):
    result = dict()
    if 'note_on' in msg:
        on_ = True
    elif 'note_off' in msg:
        on_ = False
    else:
        on_ = None
    result['time'] = int(msg[msg.rfind('time'):].split(' ')[0].split('=')[1].translate(
        str.maketrans({a: None for a in string.punctuation})))

    if on_ is not None:
        for k in ['note', 'velocity']:
            result[k] = int(msg[msg.rfind(k):].split(' ')[0].split('=')[1].translate(
                str.maketrans({a: None for a in string.punctuation})))
    return [result, on_]

def switch_note(last_state, note, velocity, on_=True):
    # piano has 88 notes, corresponding to note id 21 to 108, any note out of this range will be ignored
    result = [0] * 88 if last_state is None else last_state.copy()
    if 21 <= note <= 108:
        result[note-21] = velocity if on_ else 0
    return result

def get_new_state(new_msg, last_state):
    new_msg, on_ = msg2dict(str(new_msg))
    new_state = switch_note(last_state, note=new_msg['note'], velocity=new_msg['velocity'], on_=on_)\
                if on_ is not None else last_state
    return [new_state, new_msg['time']]

def track2seq(track):
    # piano has 88 notes, corresponding to note id 21 to 108, any note out of the id range will be ignored
    result = []
    last_state, last_time = get_new_state(str(track[0]), [0]*88)
    for i in range(1, len(track)):
        new_state, new_time = get_new_state(track[i], last_state)
        if new_time > 0:
            result += [last_state]*new_time
        last_state, last_time = new_state, new_time
    return result

def mid2arry(mid, min_msg_pct=0.1):
    tracks_len = [len(tr) for tr in mid.tracks]
    min_n_msg = max(tracks_len) * min_msg_pct
    # convert each track to nested list
    all_arys = []
    for i in range(len(mid.tracks)):
        if len(mid.tracks[i]) > min_n_msg:
            ary_i = track2seq(mid.tracks[i])
            all_arys.append(ary_i)
    # make all nested list the same length
    max_len = max([len(ary) for ary in all_arys])
    for i in range(len(all_arys)):
        if len(all_arys[i]) < max_len:
            all_arys[i] += [[0] * 88] * (max_len - len(all_arys[i])) # adding 0's at the end
                            
    final_arr = np.array(all_arys, dtype=np.int8)
    final_arr = final_arr.max(axis=0)
    # trim: remove consecutive 0s in the beginning and at the end
    sums = final_arr.sum(axis=1)
    ends = np.where(sums > 0)[0]
    final_arr = final_arr[min(ends): max(ends)]
    return final_arr


# extracts overlapping 10sec clips from single midi_file
def extract_clips(file, idx, store_path, counter = 0, ticks = 10000, overlap=False):
    # load the file
    mid = mido.MidiFile(file)
    
    # extract array
    array = mid2arry(mid)
    array2 = array[ticks//2:] # to get 50% overlap
    
    if not os.path.exists(store_path):
        os.makedirs(store_path)
    
    i = counter
    arrays = [array,array2] if overlap else [array] 
    for arr in arrays:
        n = (arr.shape[0]//ticks)*ticks
        clips = np.array_split(arr, np.arange(ticks, n, ticks))
        for c in clips[:-1]:
            np.savez_compressed('{}/{}'.format(store_path,i), clip=c[::500], artist_idx=idx)
            i += 1
    return i
    
def preprocess(data_dir, store_path, min_clips=100):
    print('Preprocessing data')
    if not os.path.exists(store_path):
        os.makedirs(store_path)
    artists_list  = np.sort(os.listdir(data_dir))
    data_path = os.path.join(store_path, 'data')
    with open(os.path.join(store_path, 'readme.txt'), 'w') as f:
        f.write(str(artists_list))
        f.write('\n')
        f.write(str(np.arange(artists_list.shape[0])))
    counter = 0
    counts_string = '\n'
    for idx,artist in enumerate(artists_list):
        path = os.path.join(data_dir, artist)
        files = os.listdir(path)
        midi_files = []
        for f in files:
            if f.endswith('mid'):
                midi_files.append(f)
        midi_files.sort()
        prev_counter = counter
        for file in midi_files:
            counter = extract_clips(os.path.join(path, file),idx, data_path, counter)
            if counter-prev_counter>=min_clips:
                break
        counts_string += '{}: {}\n'.format(artist, counter-prev_counter)
        print(artist, counter-prev_counter)
    
    with open(os.path.join(store_path, 'readme.txt'), 'a') as f:
        f.write(counts_string)

# stores files from each artist separately (useful for seq2seq, generation)
def preprocess_separate(data_dir, store_path, min_clips=1000):
    print('Preprocessing data')
    if not os.path.exists(store_path):
        os.makedirs(store_path)
    artists_list  = np.sort(os.listdir(data_dir))
    with open(os.path.join(store_path, 'readme.txt'), 'w') as f:
        f.write(str(artists_list))
        f.write('\n')
        f.write(str(np.arange(artists_list.shape[0])))
    counts_string = '\n'
    for idx,artist in enumerate(artists_list):
        path = os.path.join(data_dir, artist)
        files = os.listdir(path)
        midi_files = []
        for f in files:
            if f.endswith('mid'):
                midi_files.append(f)
        midi_files.sort()
        counter = 0
        for file in midi_files:
            counter = extract_clips(os.path.join(path, file),idx, os.path.join(store_path, artist), counter)
            if counter>=min_clips:
                break
        counts_string += '{}: {}\n'.format(artist, counter)
        print(artist, counter)
        break
    
    with open(os.path.join(store_path, 'readme.txt'), 'a') as f:
        f.write(counts_string)

# preprocess('./dataset', './preprocessed_dataset')
preprocess_separate('./dataset', './preprocessed_separate')

# Small dataset
## Extract notes of bach and bartok

In [None]:
import os
import shutil
# split into test and train
in_dir = 'dataset'
out_dir = 'small_dataset'
test_frac = 0.2
# first split the dataset
if not os.path.exists(out_dir):
    os.makedirs(out_dir)
# artists_list  = np.sort(os.listdir(data_dir))
artists_list = ['bach', 'bartok']

def create_copies(in_path, out_path, files):
    for f in files:
        shutil.copyfile(os.path.join(in_path, f), os.path.join(out_path, f))
for artist in artists_list:
    files = os.listdir(os.path.join(in_dir, artist))
    files.sort()
    n = len(files)
    n_test = int(test_frac*n)
    n_train = n - n_test
    
    if not os.path.exists(os.path.join(out_dir, artist,'train')):
        os.makedirs(os.path.join(out_dir, artist, 'train'))
        os.makedirs(os.path.join(out_dir, artist, 'test'))
    
    train_files = files[:n_train]
    test_files = files[n_train:]
    
    create_copies(os.path.join(in_dir, artist), os.path.join(out_dir, artist, 'train'), train_files)
    create_copies(os.path.join(in_dir, artist), os.path.join(out_dir, artist, 'test'), test_files)


## Extract notes for each and get total unique notes

In [None]:
all_notes = []
for artist in artists_list:
    artist_path = os.path.join('small_dataset', artist)
    
    for t in ['train', 'test']:
        artist_notes = []
        files_path = os.path.join(artist_path, t)
        for file in glob.glob(files_path+'/*.mid'):
            song_notes = []
            midi = converter.parse(file)

            print("Parsing %s" % file)

            notes_to_parse = None

            try: # file has instrument parts
                s2 = instrument.partitionByInstrument(midi)
                x = [len(v.recurse()) for v in s2.parts]
                idx = argmax(x)
                notes_to_parse = s2.parts[idx].recurse() 
            except: # file has notes in a flat structure
                notes_to_parse = midi.flat.notes

            for element in notes_to_parse:
                if isinstance(element, note.Note):
                    song_notes.append(str(element.pitch))
                elif isinstance(element, chord.Chord):
                    song_notes.append('.'.join(str(n) for n in element.normalOrder))
            artist_notes.append(song_notes)
            
        notes_path = os.path.join(artist_path, '{}_notes'.format(t))
        with open(notes_path, 'wb') as filepath:
            pickle.dump(artist_notes, filepath)
        
        for song_notes in artist_notes:
            all_notes += song_notes

## Save note IDs

In [None]:

sorted_notes = sorted(set(all_notes))
notes_dict = {note:i for i,note in enumerate(sorted_notes)}
json.dump(notes_dict, open(os.path.join(out_dir, 'note_ids.json'), 'w'))

print('No. of unique notes', len(sorted_notes))

# Training on notes

In [5]:
from torch.utils.data import Dataset, DataLoader
import torch
import torch.nn as nn
import numpy as np
# dataset for classification
class predictionDataset(Dataset):
    def __init__(self, songs, notes_dict, sequence_length=100):
        self.inputs = []
        self.outputs = []
        self.sequence_length = sequence_length
        self.num_notes = len(notes_dict)
        for notes in songs:
            for i in range(0, len(notes) - sequence_length, 1):
                sequence_in = notes[i:i + sequence_length]
                sequence_out = notes[i + sequence_length]
                self.inputs.append([notes_dict[char] for char in sequence_in])
                self.outputs.append(notes_dict[sequence_out])
        self.inputs = np.array(self.inputs).reshape(-1, self.sequence_length,1)/self.num_notes
        
        
    def __len__(self):
        return len(self.inputs)
    
    def __getitem__(self, idx):
        
        return torch.FloatTensor(self.inputs[idx]),\
               torch.tensor(self.outputs[idx], dtype=torch.long)
    
data_dir = 'small_dataset'
notes_dict = json.load(open(os.path.join(data_dir, 'note_ids.json'), 'rb'))
num_notes = len(notes_dict)
artist = 'bach'
notes_path = os.path.join(data_dir, artist, 'train_notes')
with open(notes_path, 'rb') as file:
    songs = pickle.load(file)



class melodyNet(nn.Module):
    def __init__(self, out_dim, obs_dim=1, hidden_dim=512, layers=1):
        super(melodyNet, self).__init__()
        self.obs_dim = obs_dim
        self.hidden_dim = hidden_dim
        self.lstm = nn.LSTM(self.obs_dim, hidden_dim, layers) 
        self.fc1 = nn.Linear(hidden_dim, hidden_dim)
        self.bn1 = nn.BatchNorm1d(hidden_dim)
        self.dp = nn.Dropout(0.25)
        self.fc2 = nn.Linear(hidden_dim, out_dim)
        self.relu = nn.ReLU()

    def forward(self,x,h=None):  # x in shape [batch_size, seq_len, obs_dim]
        # reshape,feed to lstm
        out = x.transpose(0,1)                           # reshape for lstm [seq_len, batch_size, inp_dim]
        if h is None:
              out, h = self.lstm(out)                       # [seq_len, batch_size, hidden_dim]            
        else:
              out, h = self.lstm(out, h)
        out = out[-1]         # [batch_size, hidden_dim]
        out = self.dp(self.bn1(self.fc1(out))) # batch_size, out_dim
        out = self.fc2(out)
        return out, h



val_frac = 0.8
batch_size = 20
device = 'cuda:0'
myDataset = predictionDataset(songs, notes_dict)
dataset_size = len(myDataset)
val_size = int(val_frac*dataset_size)
train_size = dataset_size - val_size
train_dataset, val_dataset = random_split(myDataset, [train_size, val_size])
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=10)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=10)


myNet = melodyNet(num_notes)

# optimizer
criterion = nn.CrossEntropyLoss()
epochs = 1000
learning_rate = 1e-3
optimizer = torch.optim.Adam(myNet.parameters(), lr=learning_rate)
lr_func = lambda e: 0.99**e
scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_func)
epochs = 100

myNet = myNet.to(device)
# for x,y in train_loader:
    
#     with torch.no_grad():
#         out = myNet(x.to(device))
#         loss = criterion(out, y.to(device))
#         print(loss)
#     break
    
    
# train
print('starting')
myNet = myNet.to(device)
t_start = time.time()
# tensorboard

writer = SummaryWriter('logs/generation_{}_1lstm_1_fc_lr_{}_2'.format(artist, learning_rate))

def validate():
    val_loss_epoch, c = 0, 0
    myNet.eval()
    for X,Y in val_loader:
        with torch.no_grad():
            outputs, _ = myNet(X.to(device))
            loss = criterion(outputs.to(device), Y.to(device))
            val_loss_epoch += loss.data
            c += 1
    myNet.train()
    val_loss_epoch /= c
    return val_loss_epoch
    
val_loss = validate()
print('Val loss {}'.format(val_loss))
writer.add_scalar('validation loss', val_loss, 0)

starting
Val loss 6.368923187255859


In [9]:
print(num_notes)

585


In [8]:
myNet.train()
for e in range(epochs):
    print('Epoch ', e)
    loss_epoch, c = 0, 0
    for X,Y in train_loader:
        optimizer.zero_grad()
        outputs, _ = myNet(X.to(device))
        loss = criterion(outputs.to(device), Y.to(device))
        loss.backward()
        optimizer.step()
        loss_epoch += loss.data
        c += 1
    loss_epoch /= c
    writer.add_scalar('training loss', loss_epoch, e+1)
    scheduler.step()
    val_loss_epoch = validate()
    writer.add_scalar('validation loss', val_loss_epoch, e+1)
    print('Train loss {}, Val loss {}'.format(loss_epoch, val_loss_epoch))
    
t_end = time.time()
print('time taken {}'.format(t_end-t_start))

Epoch  0
Train loss 4.166576385498047, Val loss 4.75467586517334
Epoch  1
Train loss 4.150659084320068, Val loss 16.248369216918945
Epoch  2
Train loss 4.145793914794922, Val loss 9.664836883544922
Epoch  3
Train loss 4.143385410308838, Val loss 7.904368877410889
Epoch  4
Train loss 4.13670015335083, Val loss 10.290194511413574
Epoch  5
Train loss 4.132782936096191, Val loss 14.2799654006958
Epoch  6
Train loss 4.128384113311768, Val loss 6.69474983215332
Epoch  7
Train loss 4.126110553741455, Val loss 6.290464878082275
Epoch  8
Train loss 4.126192092895508, Val loss 15.882981300354004
Epoch  9
Train loss 4.1223297119140625, Val loss 7.354807376861572
Epoch  10
Train loss 4.121462821960449, Val loss 12.899478912353516
Epoch  11
Train loss 4.119614601135254, Val loss 18.882434844970703
Epoch  12
Train loss 4.118338584899902, Val loss 27.963850021362305
Epoch  13


KeyboardInterrupt: 

In [None]:
device = 'cuda:0'
data_dir = 'small_dataset'
notes_dict = json.load(open(os.path.join(data_dir, 'note_ids.json'), 'rb'))
num_notes = len(notes_dict)
artist = 'bach'
myNet = torch.load('./logs/{}_seq2seq/best_train_model'.format(artist), map_location=device)
myNet.eval()
test_notes_path = os.path.join(data_dir, artist, 'test_notes')
with open(test_notes_path, 'rb') as file:
    test_notes = pickle.load(file)
testDataset = predictionDataset(test_notes, notes_dict)
print(len(test_notes))

    
def continue_seq(sequence, steps):
    second = []
    nxt_seq = sequence
    h = None
    for s in range(steps-1):
        with torch.no_grad():
            out, h = myNet(nxt_seq.to(device), h)
            out = torch.argmax(out, dim=-1)
            second.append(out.cpu().numpy())
            out = out.view(1,1,1).type(torch.FloatTensor)/num_notes
            nxt_seq = torch.cat((nxt_seq[:,1:,:], out), dim=1)
    first = sequence.cpu().numpy().flatten()*(num_notes-1)
    second = np.array(second).flatten()
    return np.concatenate((first, second)).astype(int)

# 1d numpy array
def create_midi(sequence, notes_dict):
    """ convert the output from the prediction to notes and create a midi file
        from the notes """
    offset = 0
    output_notes = []
    reverse_dict = {v:k for k,v in notes_dict.items()}
    notes_sequence = [reverse_dict[v] for v in sequence]
    # create note and chord objects based on the values generated by the model
    for pattern in notes_sequence:
        # pattern is a chord
        if ('.' in pattern) or pattern.isdigit():
            notes_in_chord = pattern.split('.')
            notes = []
            for current_note in notes_in_chord:
                new_note = note.Note(int(current_note))
                new_note.storedInstrument = instrument.Piano()
                notes.append(new_note)
            new_chord = chord.Chord(notes)
            new_chord.offset = offset
            output_notes.append(new_chord)
        # pattern is a note
        else:
            new_note = note.Note(pattern)
            new_note.offset = offset
            new_note.storedInstrument = instrument.Piano()
            output_notes.append(new_note)

        # increase offset each iteration so that notes do not stack
        offset += 0.5

    midi_stream = stream.Stream(output_notes)

    midi_stream.write('midi', fp='output_{}.mid'.format(artist))
    
# inp_sequence = testDataset[1000][0].reshape(1,-1,1)
inp_sequence = torch.randint(low=0, high=num_notes, size=(1,2,1)).type(torch.FloatTensor)/num_notes
full_sequence = continue_seq(inp_sequence, 100)
print(full_sequence)
create_midi(full_sequence, notes_dict)

# Visualize & playback 10 sec clip



In [None]:

def visualize(ary):
    part = np.repeat(ary, 500, axis=0)
    plt.plot(range(part.shape[0]), np.multiply(np.where(part>0, 1, 0), range(1, 89)), marker='.', markersize=1, linestyle='')
    plt.title("Midi clip")
    plt.show()


data = np.load('./preprocessed_separate/bach/100.npz')
part = data['clip']
visualize(part)
print('Artist idx', data['artist_idx'])

    


def arry2mid(ary, tempo=500000):
    # get the difference
    ary = np.repeat(ary, 500, axis=0)
    new_ary = np.concatenate([np.array([[0] * 88]), np.array(ary)], axis=0)
    changes = new_ary[1:] - new_ary[:-1]
    # create a midi file with an empty track
    mid_new = mido.MidiFile()
    track = mido.MidiTrack()
    mid_new.tracks.append(track)
    track.append(mido.MetaMessage('set_tempo', tempo=tempo, time=0))
    # add difference in the empty track
    last_time = 0
    for ch in changes:
        if set(ch) == {0}:  # no change
            last_time += 1
        else:
            on_notes = np.where(ch > 0)[0]
            on_notes_vol = ch[on_notes]
            off_notes = np.where(ch < 0)[0]
            first_ = True
            for n, v in zip(on_notes, on_notes_vol):
                new_time = last_time if first_ else 0
                track.append(mido.Message('note_on', note=n + 21, velocity=v, time=new_time))
                first_ = False
            for n in off_notes:
                new_time = last_time if first_ else 0
                track.append(mido.Message('note_off', note=n + 21, velocity=0, time=new_time))
                first_ = False
            last_time = 0
    return mid_new

mid = arry2mid(part)
mid.save('sample2.mid')


from music21 import midi
print('1')
mf = midi.MidiFile()
print('2')
mf.open('sample2.mid') # path='abc.midi'
mf.read()
mf.close()
s = midi.translate.midiFileToStream(mf)
s.show('midi')

# Create pytorch dataset

In [None]:
# dataset for classification
class classifyDataset(Dataset):
    def __init__(self, root_dir):
        self.num_files = len(os.listdir(os.path.join(root_dir, 'data')))
        self.files = [os.path.join(root_dir, 'data', '{}.npz'.format(i)) for i in range(self.num_files)]
    def __len__(self):
        return self.num_files
    
    def __getitem__(self, idx):
        data = np.load(self.files[idx])
#         print(data['clip'].shape)
        return torch.FloatTensor(data['clip']),\
               torch.ones(data['clip'].shape[0], dtype=torch.long)* torch.tensor(data['artist_idx'])



# Create simple LSTM model

In [None]:
print('started')
class artistClassifyNet(nn.Module):
    def __init__(self, num_artists=10, obs_dim=88, hidden_dim=100):
        super(artistClassifyNet, self).__init__()
        self.num_artists = num_artists
        self.obs_dim = obs_dim
        self.hidden_dim = hidden_dim
        self.lstm = nn.LSTM(self.obs_dim, hidden_dim)           # Input dim is 3, output dim is 3
        self.fc = nn.Linear(hidden_dim, num_artists)
    
    def forward(self,x):  # x in shape [batch_size, seq_len, obs_dim]
        batchSize, seqLen, _ = x.shape
        
        # reshape,feed to lstm
        out = x.transpose(0,1)                           # reshape for lstm [seq_len, batch_size, inp_dim]
        out, _ = self.lstm(out)                          # initialize the hidden states with some data
        
        # reshape and pass through fcn
        out = out.transpose(0,1).contiguous().view(-1,self.hidden_dim)    # [(batch_size)*(seqLen-initSteps)) X hiddenDim]
        out = self.fc(out)                                              # [(batch_size*seq_len)x1]
        out = out.view(batchSize, seqLen, self.num_artists)
        return(out)

def loss_fn(outputs,labels,criterion):
    _,_,outDim = outputs.shape
    loss = criterion(outputs.contiguous().view(-1,outDim), labels.contiguous().view(-1))
    return(loss)
    
val_frac = 0.8
batch_size = 10
device = 'cuda:0'
class_dataset = classifyDataset('./preprocessed_dataset')
dataset_size = len(class_dataset)
val_size = int(val_frac*dataset_size)
train_size = dataset_size - val_size
train_dataset, val_dataset = random_split(class_dataset, [train_size, val_size])
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=10)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=10)

myNet = artistClassifyNet()
for x,y in train_loader:
    print(x.shape, y.shape)
    with torch.no_grad():
        out = myNet(x)
        loss = loss_fn(out, y, criterion)
        print(loss)
    break
    
# optimizer
criterion = nn.CrossEntropyLoss()
learning_rate = 1e-3
optimizer = torch.optim.Adam(myNet.parameters(), lr=learning_rate)
# lr_func = lambda e: 0.99**e
# scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_func)
epochs = 100



In [None]:
# train
print('starting')
myNet = myNet.to(device)
t_start = time.time()
# tensorboard

writer = SummaryWriter('logs/classification_1lstm_1fc_lr_{}_3'.format(learning_rate))

def validate():
    val_loss_epoch, c = 0, 0
    correct, total = 0, 0
    for X,Y in val_loader:
        with torch.no_grad():
            outputs = myNet(X.to(device))
            loss = loss_fn(outputs.to(device), Y.to(device), criterion)
            val_loss_epoch += loss.data
            c += 1
            total += X.shape[0]
            correct += (torch.argmax(outputs[:,-1, :], dim=-1).cpu() == Y[:,-1]).numpy().sum()
    val_loss_epoch /= c
    val_accuracy = correct/total
    return val_loss_epoch, val_accuracy
    
val_loss, val_acc = validate()
print('Val loss {}, Val accuracy {}'.format(val_loss, val_acc))
writer.add_scalar('validation accuracy', val_acc, 0)

for e in range(epochs):
    loss_epoch, c = 0, 0
    for X,Y in train_loader:
        optimizer.zero_grad()
        outputs = myNet(X.to(device))
        loss = loss_fn(outputs.to(device), Y.to(device), criterion)
        loss.backward()
        optimizer.step()
        loss_epoch += loss.data
        c += 1
    loss_epoch /= c
    writer.add_scalar('training loss', loss_epoch, e+1)
#     scheduler.step()
    val_loss_epoch, val_acc_epoch = validate()
    writer.add_scalar('validation loss', val_loss_epoch, e+1)
    writer.add_scalar('validation accuracy', val_acc_epoch, e+1)
    print('Train loss {}, Val loss {}, Val accuracy {}'.format(loss_epoch, val_loss_epoch, val_acc_epoch))
t_end = time.time()
print('time taken {}'.format(t_end-t_start))


In [None]:
# save state dict or entire model
if not os.path.exists('models'):
    os.makedirs('models')
torch.save(myNet, 'models/1_layer_lstm_classifier')
torch.save(myNet.state_dict(), 'models/1_layer_lstm_classifier_state_dict')

# Music Generation LSTM




In [None]:
a = np.zeros((3,2))
print(a.shape)
a = a[None]
print(a.shape)

In [None]:
# dataset for classification
class predictionDataset(Dataset):
    def __init__(self, root_dir):
        self.num_files = len(os.listdir(root_dir))
        self.files = [os.path.join(root_dir, '{}.npz'.format(i)) for i in range(self.num_files)]
    def __len__(self):
        return self.num_files
    
    def __getitem__(self, idx):
        data = np.load(self.files[idx])
        return torch.FloatTensor(data['clip'][:8]/127>0.5),\
               torch.tensor(data['clip'][8:]/127>0.5, dtype=torch.long)


class melodyNet(nn.Module):
    def __init__(self, obs_dim=88, hidden_dim=100, layers=2, pred_steps=12):
        super(melodyNet, self).__init__()
        self.obs_dim = obs_dim
        self.hidden_dim = hidden_dim
        self.pred_steps = pred_steps
        self.lstm = nn.LSTM(self.obs_dim, obs_dim, layers) 
#         self.sigmoid = nn.Sigmoid()
        self.fc = nn.Linear(hidden_dim, obs_dim*2)
        self.softmax = nn.Softmax(dim=-1)
        
    def forward(self,x):  # x in shape [batch_size, seq_len, obs_dim]
        batchSize, seqLen, _ = x.shape
        
        # reshape,feed to lstm
        interm = x.transpose(0,1)                           # reshape for lstm [seq_len, batch_size, inp_dim]
        interm, h = self.lstm(interm)                          # initialize the hidden states with some data
        interm = interm[-1][None]
        out = torch.zeros((batchSize, self.pred_steps, self.obs_dim), dtype=torch.float)
        for i in range(self.pred_steps):
            interm, h = self.lstm(interm, h)  # interm is [1, batch_size, hid_dim]
            out[:, i, :] = self.softmax(interm.view(batchSize, self.obs_dim, 2))[:,:,-1]
#             out[:, i, :] = self.fc(interm.view(-1,self.hidden_dim)).view(batchSize, 1, self.obs_dim)
            
        return(out)
    

class melodyNet2(nn.Module):
    def __init__(self, obs_dim=88, hidden_dim=200, layers=5, pred_steps=12):
        super(melodyNet2, self).__init__()
        self.obs_dim = obs_dim
        self.hidden_dim = hidden_dim
        self.pred_steps = pred_steps
        self.lstm = nn.LSTM(self.obs_dim, hidden_dim, layers) 
        self.sigmoid = nn.Sigmoid()
        self.fc = nn.Linear(hidden_dim, obs_dim*2)
        self.softmax = nn.Softmax(dim=-1)
        
    def forward(self,x):  # x in shape [batch_size, seq_len, obs_dim]
        batchSize, seqLen, _ = x.shape
        
        # reshape,feed to lstm
        interm = x.transpose(0,1)                           # reshape for lstm [seq_len, batch_size, inp_dim]
        interm, h = self.lstm(interm)                          # initialize the hidden states with some data
        interm = self.fc(interm[-1])
        interm = interm.view(1,batchSize,self.obs_dim,2)[:,:,:,-1]
#         .view(1,batchSize,self.obs_dim, 2)[:,:,-1] # [1,batch_size, obs_dim]
        out = torch.zeros((batchSize, self.pred_steps, self.obs_dim, 2), dtype=torch.float).to(device)
        for i in range(self.pred_steps):
            interm, h = self.lstm(interm, h)  # interm is [1, batch_size, hid_dim]
            interm = self.fc(interm[-1])
            interm = interm.view(1,batchSize,self.obs_dim,2)
            out[:, i, :, :] += interm.view(batchSize, self.obs_dim, 2).to(device)
#             out[:, i, :] = self.fc(interm.view(-1,self.hidden_dim)).view(batchSize, 1, self.obs_dim)
            interm = interm[:,:,:,-1]
            
            
        return(out)



def loss_fn(outputs,labels,criterion):
    _,_,_,outDim = outputs.shape
    loss = criterion(outputs.contiguous().view(-1,outDim), labels.contiguous().view(-1))
    return(loss)
    
val_frac = 0.8
batch_size = 20
device = 'cuda:0'
class_dataset = predictionDataset('./preprocessed_separate/bach')
dataset_size = len(class_dataset)
val_size = int(val_frac*dataset_size)
train_size = dataset_size - val_size
train_dataset, val_dataset = random_split(class_dataset, [train_size, val_size])
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=10)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=10)

myNet = melodyNet2()
# optimizer
criterion = nn.CrossEntropyLoss(weight=torch.FloatTensor([0.2,0.8]))
learning_rate = 1e-3
optimizer = torch.optim.Adam(myNet.parameters(), lr=learning_rate)
lr_func = lambda e: 0.99**e
scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_func)
epochs = 100

myNet = myNet.to(device)
for x,y in train_loader:
    print(x.shape, y.shape)
    with torch.no_grad():
        out = myNet(x.to(device))
        loss = loss_fn(out, y.to(device), criterion.to(device))
        print(loss)
    break
    
    
# train
print('starting')
myNet = myNet.to(device)
t_start = time.time()
# tensorboard

writer = SummaryWriter('logs/generation_bach_1lstm_1_fc_lr_{}_1'.format(learning_rate))

def validate():
    val_loss_epoch, c = 0, 0
    for X,Y in val_loader:
        with torch.no_grad():
            outputs = myNet(X.to(device))
            loss = loss_fn(outputs.to(device), Y.to(device), criterion.to(device))
            val_loss_epoch += loss.data
            c += 1
    val_loss_epoch /= c
    return val_loss_epoch
    
val_loss = validate()
print('Val loss {}'.format(val_loss))
writer.add_scalar('validation loss', val_loss, 0)

In [None]:
for e in range(1000):
    loss_epoch, c = 0, 0
    for X,Y in train_loader:
        optimizer.zero_grad()
        outputs = myNet(X.to(device))
#         print('oputputs', outputs)
#         print('y', Y)
#         outputs[:]=0
        loss = loss_fn(outputs.to(device), Y.to(device), criterion)
#         print(loss)
        loss.backward()
        optimizer.step()
        loss_epoch += loss.data
        c += 1
    loss_epoch /= c
    writer.add_scalar('training loss', loss_epoch, e+1)
    scheduler.step()
    val_loss_epoch = validate()
    writer.add_scalar('validation loss', val_loss_epoch, e+1)
    print('Train loss {}, Val loss {}'.format(loss_epoch, val_loss_epoch))
t_end = time.time()
print('time taken {}'.format(t_end-t_start))

In [None]:
# save state dict or entire model
if not os.path.exists('models'):
    os.makedirs('models')
torch.save(myNet, 'models/bach_lstm_predictor')
torch.save(myNet.state_dict(), 'models/bach_state_dict')

In [None]:
# Compare generated vs true on validation set    
from music21 import midi
print('here')
def play_clip(path):
    mf = midi.MidiFile()
    mf.open(path) # path='abc.midi'
    mf.read()
    mf.close()
    s = midi.translate.midiFileToStream(mf)
    s.show('midi')


X,Y = val_dataset[4]
with torch.no_grad():
    outputs = myNet(X[None].to(device)).to('cpu').numpy()[0]*127
    outputs = np.argmax(outputs, axis=-1)
    print('outputs', outputs.shape)
    outputs = outputs.astype(np.uint8)
    print('max val', np.min(outputs, axis=0))
#     print(outputs)
#     print(outputs.shape)
#     outputs = np.random.choice([0,127], p=[0.99,0.01], size=(20,88))
    x = (X.to('cpu').numpy()*127).astype(int)
    y = (Y.to('cpu').numpy()*127).astype(int)
    print(x.shape, y.shape, outputs.shape)
    gt = np.concatenate((x,y), axis=0)
    generated = np.concatenate((x,outputs), axis=0)
    mid = arry2mid(gt)
    mid.save('gt.mid')

    mid = arry2mid(generated)
    mid.save('generated.mid')

    play_clip('gt.mid')
    play_clip('generated.mid')
    print(generated[-1])
    visualize(gt)
    visualize(generated)
#     plt.plot(range(part.shape[0]), np.multiply(np.where(part>0, 1, 0), range(1, 89)), marker='.', markersize=1, linestyle='')
#     plt.title("Generated")
#     plt.show()
# for X,Y in val_loader:
#     with torch.no_grad():
#         
#                 
#         break


In [None]:
# visualize data
# a = np.random.random((3,3))
# print(a)
# print(a[[1,2],1])
# import torch
# torch.__version__
int('0308')
max([3,0,1])

In [None]:
import torch
a = torch.randint(5, size = (5,2))
b = torch.tensor([1,2])
print(a,b)        
print(torch.matmul(a,b))

class musicClassifyNet(nn.Module):
    def __init__(self, inpDim, hiddenDim, outDim, initSteps):
        super(adversaryNet, self).__init__()
        self.inpDim = inpDim
        self.hiddenDim = hiddenDim
        self.outDim = outDim
        self.initSteps = initSteps                     
        self.lstm1 = nn.LSTM(inpDim, hiddenDim)           # Input dim is 3, output dim is 3
        self.lstm2 = nn.LSTM(hiddenDim, hiddenDim)
        self.fc1 = nn.Linear(hiddenDim, outDim)
#         self.fc2 = nn.Linear(hiddenDim, outDim)
        
    def forward(self,x):  # x in shape [batch_size, seq_len, inp_dim]
        batchSize, seqLen, _ = x.shape
        
        # reshape,feed to lstm
        out = x.transpose(0,1)                           # reshape for lstm [seq_len, batch_size, inp_dim]
        initData, data = out[:self.initSteps], out[self.initSteps:]  # initialization data and actual data to generate output
        out, h1 = self.lstm1(initData)                  # initialize the hidden states with some data
#         _, h2 = self.lstm2(out)
        
        out, _ = self.lstm1(data, h1)                         # get actual output to be use for prediction
#         out, _ = self.lstm2(out, h2)
        
        # reshape and pass through fcn
        out = out.transpose(0,1).contiguous().view(-1,self.hiddenDim)    # [(batch_size)*(seqLen-initSteps)) X hiddenDim]
        out = self.fc1(out)                                              # [(batch_size)*(seqLen-initSteps)) X outDim]
#         out = self.fc2(out)
        
        # reshape and return
        out = out.view(batchSize, seqLen-self.initSteps,self.outDim) # batch_size x (seqLen-initSteps) X outDim
        return(out)