In [15]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np

import torch

from utils.model import TherapeuticRepresentationModel
from utils.train import train, eval
from utils.preprocessing import extract_librosa_features
from utils.dataloader import TherapeuticTracksWithTimesteps
import numpy as np
import os
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter
import wandb



In [35]:
class TherapeuticLSTMClassifier(nn.Module):
    """
    This is a LSTM Model.
    """

    def __init__(self,
                 lstm_input_channels,
                 lstm_hidden_size,
                 lstm_num_layers,
                 linear_num_layers,
                 linear_hidden_size,
                 linear_output_size,
                 h0_size,
                 device):
        super(TherapeuticLSTMClassifier, self).__init__()
        self.lstm = nn.LSTM(
            lstm_input_channels, 
            lstm_hidden_size, 
            lstm_num_layers, 
            batch_first=True, 
        )

        fc_list = [nn.LeakyReLU()]

        if linear_num_layers > 1:
            fc_list.append(nn.Linear(lstm_hidden_size, linear_hidden_size))
            fc_list.append(nn.LeakyReLU())

            for _ in range(linear_num_layers-1):
                fc_list.append(nn.Linear(linear_hidden_size, linear_hidden_size))
                fc_list.append(nn.LeakyReLU())
        else:
            linear_hidden_size = lstm_hidden_size

        fc_list.append(nn.Linear(linear_hidden_size, linear_output_size))
        fc_list.append(nn.Sigmoid())

        self.mlp = nn.Sequential(*fc_list)
        self.h0 = torch.rand(h0_size).double().to(device)
        self.c0 = torch.rand(h0_size).double().to(device)

    def forward(self, x):
        x = x.double()

        out, (h0, c0) = self.lstm(x, (self.h0, self.c0))
        
        #print(out)
        #out = out[:, -1]

        x = self.mlp(out)

        return x

In [49]:
some_sequences = torch.Tensor(np.array([[0,1,2,3],[1,2,2,3],[1,2,2,1],[1,1,1,0],[1,2,3,4],[5,5,5,3],[4,4,4,5],[1,2,4,4],[4,3,2,1],[3,3,3,1]]))
y_s = torch.Tensor(np.array([[0],[0],[1],[1],[0],[1],[0],[0],[1],[1]]))

test_some_sequences = torch.Tensor(np.array([[0,0,0,3],[5,5,5,4],[1,1,1,6],[1,1,0,0]]))
test_y_s = torch.Tensor(np.array([[0],[1],[0],[1]]))

In [50]:
for x, y in zip(some_sequences, y_s):
    print(x)
    print(y)

tensor([0., 1., 2., 3.])
tensor([0.])
tensor([1., 2., 2., 3.])
tensor([0.])
tensor([1., 2., 2., 1.])
tensor([1.])
tensor([1., 1., 1., 0.])
tensor([1.])
tensor([1., 2., 3., 4.])
tensor([0.])
tensor([5., 5., 5., 3.])
tensor([1.])
tensor([4., 4., 4., 5.])
tensor([0.])
tensor([1., 2., 4., 4.])
tensor([0.])
tensor([4., 3., 2., 1.])
tensor([1.])
tensor([3., 3., 3., 1.])
tensor([1.])


In [51]:
model = TherapeuticLSTMClassifier(4, 16, 1, 1, 16, 1, (1, 16), device='cpu')
model.double()

optimizer = optim.Adam(model.parameters(), lr=0.001)

In [52]:
def train():
    model.train()
    device='cpu'
    epochs = 100
    loss_f = nn.BCELoss()
    
    for epoch in range(epochs):
        losses = []
        for x, y in zip(some_sequences, y_s):
            x = x.to(device).unsqueeze(0).double() # first dimension is batch.
            y = y.to(device).unsqueeze(0).double() # first dimension is batch.
                        
            optimizer.zero_grad()
            
            y_hat = model(x)
                        
            loss = loss_f(y_hat, y)
            losses.append(loss.item())
            
            loss.backward()
            optimizer.step()
            
        print(epoch, ": ", np.array(losses).mean())
            
train()

0 :  0.7229251008022894
1 :  0.7138763772119215
2 :  0.7064538383624777
3 :  0.6996190937510922
4 :  0.6931350165162758
5 :  0.6872302855023766
6 :  0.6813832127039844
7 :  0.6754578686674276
8 :  0.6696853524496456
9 :  0.6639161406647558
10 :  0.6581842202088252
11 :  0.6524777212768276
12 :  0.6465220853974524
13 :  0.640351968737529
14 :  0.633209165474818
15 :  0.6229146515527253
16 :  0.6138465563959202
17 :  0.6029401545425521
18 :  0.5912773683922652
19 :  0.5814131676591466
20 :  0.5707487493490541
21 :  0.5602790356898174
22 :  0.5498915185869667
23 :  0.5392108015066202
24 :  0.5284420324214492
25 :  0.5174654789013337
26 :  0.5067873210888457
27 :  0.49520123817272016
28 :  0.48397096450564075
29 :  0.4723673614945745
30 :  0.4611894975078238
31 :  0.4496761294965313
32 :  0.43859389256404235
33 :  0.42728492855181954
34 :  0.4163664284788927
35 :  0.40563688885390564
36 :  0.39466080299757633
37 :  0.3842816331061468
38 :  0.37345630031992144
39 :  0.3632788178592541
40 : 

In [53]:
def test():
    model.eval()
    device='cpu'
    epochs = 100
    loss_f = nn.BCELoss()
    
    losses = []
    for x, y in zip(test_some_sequences, test_y_s):
        x = x.to(device).unsqueeze(0).double() # first dimension is batch.
        y = y.to(device).unsqueeze(0).double() # first dimension is batch.
                                    
        y_hat = model(x)
                        
        loss = loss_f(y_hat, y)
        losses.append(loss.item())
            
               
    print("Test Loss: ", np.array(losses).mean())
    
test()

Test Loss:  0.18962018458410898
