In [1]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch import optim
torch.set_default_tensor_type(torch.cuda.FloatTensor if torch.cuda.is_available() else torch.FloatTensor)
print(torch.cuda.is_available())

True


In [28]:
class SongDataTrain(Dataset):
    def __init__(self, path):
        with np.load(path) as f:
            data = f['X']
            labels = list(f['T'])
        self.data = torch.tensor(data)
        label_set = set(labels)
        mapping = {}
        for count, i in enumerate(label_set):
            mapping[i] = count
        targets = np.zeros(len(labels))
        for i in range(len(targets)):
            targets[i] = mapping[labels[i]]
        self.targets = torch.tensor(targets, dtype=torch.long)
        
        indices = np.random.choice(np.arange(len(self.targets)), len(self.targets), replace=False)
        self.targets = self.targets[indices[int(.1 * len(self.targets)):]]
        self.data = self.data[indices[int(.1 * len(self.data)):]]
    def __getitem__(self, index):
        return (self.data[index], self.targets[index])
    def __len__(self):
        return (len(self.targets))
class SongDataTest(Dataset):
    def __init__(self, path):
        with np.load(path) as f:
            data = f['X']
            labels = list(f['T'])
        self.data = torch.tensor(data)
        label_set = set(labels)
        mapping = {}
        for count, i in enumerate(label_set):
            mapping[i] = count
        targets = np.zeros(len(labels))
        for i in range(len(targets)):
            targets[i] = mapping[labels[i]]
        self.targets = torch.tensor(targets, dtype=torch.long)
        
        indices = np.random.choice(np.arange(len(self.targets)), len(self.targets), replace=False)
        self.targets = self.targets[indices[:int(.1 * len(self.targets))]]
        self.data = self.data[indices[:int(.1 * len(self.data))]]
        print(mapping)
    def __getitem__(self, index):
        return (self.data[index], self.targets[index])
    def __len__(self):
        return (len(self.targets))

In [29]:
class LSTMClassifier(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers, batch_size):
        super(LSTMClassifier, self).__init__()
        self.batch_size = batch_size
        self.num_layers = num_layers
        self.hidden_size = hidden_size
        self.input_size = input_size
        self.hidden = self.init_hidden()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
    
    def init_hidden(self):
        h_t = torch.autograd.Variable(torch.zeros(self.num_layers, self.batch_size, self.hidden_size)).cuda()
        h_c = torch.autograd.Variable(torch.zeros(self.num_layers, self.batch_size, self.hidden_size)).cuda()
        return (h_t, h_c)
    
    def forward(self, x):
        output, self.hidden = self.lstm(x, self.hidden)
        output = self.fc(output[:, -1, :])

        return output

In [30]:
# Parameters
input_size = 2584
batch_size = 100
hidden_size = 128
num_layers = 2
output_size = 10
seq_len = 64

dataset_train = SongDataTrain("./audio_sr_label.npz") #initializes our dataset
dataset_test = SongDataTest("./audio_sr_label.npz") #initializes our dataset
dataloader_train = DataLoader(dataset_train, shuffle=True, batch_size=batch_size)
dataloader_test = DataLoader(dataset_test, shuffle=True, batch_size=batch_size)
#double check ^^ above stuff

#Model
model = LSTMClassifier(input_size, hidden_size, output_size, num_layers, batch_size)

#optimizer & criterion
optimizer = optim.Adam(list(model.parameters()), lr=0.01)
criterion = nn.CrossEntropyLoss()

{b'country': 0, b'pop': 1, b'rock': 2, b'jazz': 3, b'disco': 4, b'blues': 5, b'metal': 6, b'reggae': 7, b'hiphop': 8, b'classical': 9}


In [None]:
def assess(model, test, seq_len, input_size):
    correct = 0
    total = 0
    model.eval()
    dataloader = dataloader_test if test == True else dataloader_train
    for x, labels in dataloader:
        x = x.reshape(-1, seq_len, input_size)
        x = x.type(torch.FloatTensor).cuda()
        out = model(x)
        _, predicted = torch.max(out.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
        break
    return correct/total


In [26]:
def get_val_pred(model, seq_len, input_size):
    model.eval()
    predict = []
    actual = []
    for i, (x, labels) in enumerate(dataloader_test):
        x = x.reshape(-1, seq_len, input_size)
        x = x.type(torch.FloatTensor).cuda()
        out = model(x)
        _, predicted = torch.max(out.data, 1)
        predicted_n = predicted.cpu().data.numpy()
        label_n = labels.cpu().data.numpy()
        predict.append(predicted_n)
        actual.append(predicted_n)
    

    with open("predict_lstm.csv", "a") as f:
        f.write(",".join([str(e) for e in predict]) + ",")
    with open("actual_lstm.csv", "a") as f:
        f.write(",".join([str(e) for e in actual]) + ",")

In [None]:
training_acc = []
validation_acc = []
losses = []
epochs = 50

for epoch in range(epochs):
    for i, (_data, _target) in enumerate(dataloader_train):
        model.train()
        model.hidden = model.init_hidden()
        model.zero_grad()
        _data = _data.type(torch.FloatTensor).cuda()
        prediction = model(_data)
        
        loss = criterion(prediction, _target)
        loss.backward()
        optimizer.step()
        
        if i % 20 == 0:
            print ('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}' .format(epoch+1, epochs, i+1, len(dataloader_train), loss.item()))
            train_acc = assess(model, False, seq_len, input_size)
            training_acc.append(train_acc)
            val_acc = assess(model, True, seq_len, input_size)
            validation_acc.append(val_acc)
            losses.append(loss)
            print(train_acc, val_acc)
        
        if epoch == 49 and i % 20 == 0:
            val_pred = get_val_pred(model, seq_len, input_size)
    
    torch.save(model.state_dict(), "./model" + str(epoch).zfill(2))
    
with open("training_acc_lstm.csv", "a") as f:
    f.write(",".join([str(e) for e in training_acc]) + ",")
with open("validation_acc_lstm.csv", "a") as f:
    f.write(",".join([str(e) for e in validation_acc]) + ",")
with open("loss_acc_lstm.csv", "a") as f:
    f.write(",".join([str(e) for e in losses]) + ",")
    
    

Epoch [1/50], Step [1/9], Loss: 2.3038
0.12 0.05
Epoch [2/50], Step [1/9], Loss: 1.8758
0.26 0.2
Epoch [3/50], Step [1/9], Loss: 1.7620
0.35 0.31
Epoch [4/50], Step [1/9], Loss: 1.6711
0.36 0.39
Epoch [5/50], Step [1/9], Loss: 1.6421
0.41 0.3
Epoch [6/50], Step [1/9], Loss: 1.3416
0.5 0.47
Epoch [7/50], Step [1/9], Loss: 1.3859
0.49 0.41
Epoch [8/50], Step [1/9], Loss: 1.5634
0.56 0.49
Epoch [9/50], Step [1/9], Loss: 1.0972
0.63 0.52
Epoch [10/50], Step [1/9], Loss: 1.1015
0.66 0.62
Epoch [11/50], Step [1/9], Loss: 1.0039
0.7 0.58
Epoch [12/50], Step [1/9], Loss: 0.6878
0.66 0.7
Epoch [13/50], Step [1/9], Loss: 0.7081
0.81 0.72
Epoch [14/50], Step [1/9], Loss: 0.4946
0.82 0.81
Epoch [15/50], Step [1/9], Loss: 0.5311
0.77 0.77
Epoch [16/50], Step [1/9], Loss: 0.6382
0.86 0.75
Epoch [17/50], Step [1/9], Loss: 0.4069
0.8 0.77
Epoch [18/50], Step [1/9], Loss: 0.3470
0.82 0.81
Epoch [19/50], Step [1/9], Loss: 0.3229
0.9 0.83
Epoch [20/50], Step [1/9], Loss: 0.2279
0.92 0.81
Epoch [21/50], S

In [None]:
mapping = {b'metal': 0, b'jazz': 1, b'disco': 2, b'reggae': 3, b'country': 4, b'pop': 5, b'rock': 6, b'blues': 7, b'classical': 8, b'hiphop': 9}

In [10]:
print(mapping)

{b'metal': 0, b'jazz': 1, b'disco': 2, b'reggae': 3, b'country': 4, b'pop': 5, b'rock': 6, b'blues': 7, b'classical': 8, b'hiphop': 9}
