In [None]:
from dlcliche.notebook import *
from dlcliche.utils import *

import torch
import pytorch_lightning as pl

sys.path.append('..')
import common as com

param = com.yaml_load('config.yaml')

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
dirs = com.select_dirs(param=param, mode='development')
target_dir = dirs[0]
files = com.file_list_generator(target_dir)
v = com.list_to_vector_array(files[:2])
plt.plot(v[0])

In [None]:
class ToTensor1d(object):
    """
    Args:
        array: (dim,) feature array.
    """
    def __call__(self, array):
        assert len(array.shape) == 1
        return torch.Tensor(array).unsqueeze(0).to(device)
    def __repr__(self):
        return 'to_tensor_1d'

class MelsDataset(torch.utils.data.Dataset):
    def __init__(self, files, n_mels, frames, n_fft, hop_length, power, transform):
        self.transform = transform
        self.files = files
        self.n_mels, self.frames, self.n_fft = n_mels, frames, n_fft
        self.hop_length, self.power = hop_length, power
        self.X = com.list_to_vector_array(files,
                         n_mels=self.n_mels,
                         frames=self.frames,
                         n_fft=self.n_fft,
                         hop_length=self.hop_length,
                         power=self.power)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, index):
        x = self.X[index]
        x = self.transform(x)
        return x, x

# ds = MelsDataset(files[:10],
#                  n_mels=param["feature"]["n_mels"],
#                  frames=param["feature"]["frames"],
#                  n_fft=param["feature"]["n_fft"],
#                  hop_length=param["feature"]["hop_length"],
#                  power=param["feature"]["power"],
#                  transform=ToTensor1d())
# ds[0][0]

In [None]:
import os

import torch
from torch.nn import functional as F
from torch.utils.data import DataLoader
import pytorch_lightning as pl


class LinUnit(torch.nn.Module):
    def __init__(self, in_dim, out_dim):
        super().__init__()
        self.lin = torch.nn.Linear(in_dim, out_dim)
        self.bn = torch.nn.BatchNorm1d(out_dim)
        
    def forward(self, x):
        return torch.relu(self.bn(self.lin(x.view(x.size(0), -1))))


class Task2Model(pl.LightningModule):

    def __init__(self, hparams):
        super().__init__()
        # not the best model...
        self.unit1 = LinUnit(640, 128)
        self.unit2 = LinUnit(128, 128)
        self.unit3 = LinUnit(128, 128)
        self.unit4 = LinUnit(128, 128)
        self.unit5 = LinUnit(128, 8)
        self.unit6 = LinUnit(8, 128)
        self.unit7 = LinUnit(128, 128)
        self.unit8 = LinUnit(128, 128)
        self.unit9 = LinUnit(128, 128)
        self.output = torch.nn.Linear(128, 640)
        self.mseloss = torch.nn.MSELoss()
        self.hparams = hparams

    def forward(self, x):
        x = self.unit1(x.view(x.size(0), -1))
        x = self.unit2(x)
        x = self.unit3(x)
        x = self.unit4(x)
        x = self.unit5(x)
        x = self.unit6(x)
        x = self.unit7(x)
        x = self.unit8(x)
        x = self.unit9(x)
        return self.output(x)

    def training_step(self, batch, batch_nb):
        x, y = batch
        y_hat = self.forward(x)
        loss = self.mseloss(y_hat, y)
        tensorboard_logs = {'train_loss': loss}
        return {'loss': loss, 'log': tensorboard_logs}

    def validation_step(self, batch, batch_nb):
        # OPTIONAL
        x, y = batch
        y_hat = self.forward(x)
        return {'val_loss': self.mseloss(y_hat, y)}

    def validation_epoch_end(self, outputs):
        # OPTIONAL
        avg_loss = torch.stack([x['val_loss'] for x in outputs]).mean()
        tensorboard_logs = {'val_loss': avg_loss}
        return {'avg_val_loss': avg_loss, 'log': tensorboard_logs}

#     def test_step(self, batch, batch_nb):
#         # OPTIONAL
#         x, y = batch
#         y_hat = self.forward(x)
#         return {'test_loss': self.mseloss(y_hat, y)}

#     def test_epoch_end(self, outputs):
#         # OPTIONAL
#         avg_loss = torch.stack([x['test_loss'] for x in outputs]).mean()
#         logs = {'test_loss': avg_loss}
#         return {'avg_test_loss': avg_loss, 'log': logs, 'progress_bar': logs}

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=self.hparams.lr,
                                betas=(self.hparams.b1, self.hparams.b2))

    def get_ds(self, files):
        return MelsDataset(files,
                           n_mels=self.hparams.n_mels,
                           frames=self.hparams.frames,
                           n_fft=self.hparams.n_fft,
                           hop_length=self.hparams.hop_length,
                           power=self.hparams.power,
                           transform=ToTensor1d())

    def train_dataloader(self):
        # REQUIRED
        return DataLoader(self.get_ds(files), batch_size=self.hparams.batch_size,
                          shuffle=self.hparams.shuffle)

    def val_dataloader(self):
        # OPTIONAL
        return DataLoader(self.get_ds(files[:512]), batch_size=self.hparams.batch_size, shuffle=False)

#     def test_dataloader(self):
#         # OPTIONAL
#         return DataLoader(self.get_ds(files[:512]), batch_size=self.hparams.batch_size)


from argparse import Namespace

args = {
    'epochs': param['fit']['epochs'],
    'batch_size': param['fit']['batch_size'],
    'lr': 0.0002,
    'b1': 0.5,
    'b2': 0.999,
    'shuffle': param['fit']['shuffle'],
    'n_mels': param["feature"]["n_mels"],
    'frames': param["feature"]["frames"],
    'n_fft': param["feature"]["n_fft"],
    'hop_length': param["feature"]["hop_length"],
    'power': param["feature"]["power"],
}
hparams = Namespace(**args)

task2_model = Task2Model(hparams)
print(f'Start training with {torch.cuda.device_count()} GPU(s).')
trainer = pl.Trainer(max_epochs=hparams.epochs, gpus=torch.cuda.device_count())
trainer.fit(task2_model)   

## Data structure check

In [None]:
n_mels = 2
frames = 5
dims = frames * n_mels
vector_array_size = 50
tmp = np.array(list(range(vector_array_size + frames - 1)))
log_mel_spectrogram = np.c_[tmp, tmp+0.1].T

vector_array = np.zeros((vector_array_size, dims))
print(log_mel_spectrogram.shape, vector_array.shape)
for t in range(frames):
    vector_array[:, n_mels * t: n_mels * (t + 1)] = log_mel_spectrogram[:, t: t + vector_array_size].T

print(vector_array)
#print(log_mel_spectrogram)