In [1]:
import numpy as np
#from sklearn.datasets import fetch_mldata
#from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score
from scipy.signal import convolve2d, convolve
import torch
from torch import nn
from torch.autograd import Variable
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
# from torchvision.datasets import MNIST
# from torchvision import transforms
from IPython.display import clear_output
from typing import Tuple
from tqdm import *
%matplotlib inline

In [2]:
def corrected_f1_score(y_pred: np.ndarray, y_true: np.ndarray) -> float:
    """Mean per-sample macro F1 between thresholded predictions and padded targets.

    For every (scores, targets) pair:

    * the score of the last class is used as the cut-off: every class whose
      score is >= that value (the last class included) counts as predicted
      (presumably the last class is a "stop" marker -- confirm with the
      dataset encoding);
    * the predicted class indices are ordered by descending score;
    * targets are class indices where entries > -1 are real labels and the
      rest is padding;
    * predictions are right-padded with -1 so both sequences have the same
      length before being handed to ``f1_score(..., average="macro")``.

    Args:
        y_pred: per-sample class scores, indexable as ``y_pred[i][class]``.
        y_true: per-sample target index arrays padded with -1.

    Returns:
        The average of the per-sample F1 scores, or ``0.0`` when there are no
        samples (previously this raised ``ZeroDivisionError``).
    """
    scores = []
    for predict, true in zip(y_pred, y_true):
        # Number of classes scoring at least as high as the last class.
        pred_size = (predict >= predict[-1]).sum()
        # Keep the indices of those classes, best first.
        predict = np.argsort(predict)[::-1][:pred_size]

        # Compare over the longer of "predicted labels" and "real labels".
        size = max(pred_size, (true > -1).sum())
        if size > len(predict):
            predict = np.pad(predict, (0, size - len(predict)), constant_values=-1)

        scores.append(f1_score(true[:size], predict, average="macro"))

    if not scores:
        # Nothing to average: report 0.0 instead of dividing by zero.
        return 0.0
    return sum(scores) / len(scores)

In [3]:
# Hand-made sanity check for corrected_f1_score: two samples with four class
# scores each, and targets given as class indices padded with -1.
pred = torch.Tensor([[0.1, 0.9, 0.3, 0.8], [0.1, 0.9, 0.3, 0.4]])
y = torch.Tensor([[1, 3, -1, -1], [2, -1, -1, -1]])

# Scratch array; it is not used by the check below.
sample = np.array([[6, 1, 1, 4], [0, 5, 4, 3]])

corrected_f1_score(pred.data.numpy(), y.data.numpy())

0.5

In [4]:
import matplotlib.pyplot as plt
import note_seq
import bokeh
import librosa.display

from src.features.build_features import detokenize
from src.entities.audio_params import AudioParams
from src.entities.dataset_params import DatasetParams
from src.data.make_dataset import WavMidiDataset, AudioDataset


# Audio front-end constants shared by every dataset split.
SAMPLE_RATE, FRAME_LENGTH, OVERLAPPING = 44100, 4096, 8

# Spectrogram settings passed to the project's AudioParams entity.
audio_params = AudioParams(
    window="hann",
    fmax=12500,
    fmin=8,
    n_mels=512,
    frame_length=FRAME_LENGTH,
    sample_rate=SAMPLE_RATE,
)

# The train and test configurations differ only in `split`, so both are built
# from the same template (each gets its own `years_list` list object).
train_params, test_params = (
    DatasetParams(
        split=split_name,
        root_path="../data/raw/maestro-v3.0.0/",
        metadata="maestro-v3.0.0.csv",
        years_list=[2018],
        audio_params=audio_params,
        feature_size=3,
        overlapping=OVERLAPPING,
    )
    for split_name in ("train", "test")
)




In [5]:
# Instantiate the train split first, then the test split.
train_ds, test_ds = (
    WavMidiDataset(split_params) for split_params in (train_params, test_params)
)

In [6]:
# Number of items in the train and test splits (recorded output: (70, 10)).
len(train_ds), len(test_ds)

(70, 10)

In [7]:
# frames, notes, times = train_ds[37]
# train_audio_ds = AudioDataset(frames, notes)
# train_loader = DataLoader(train_audio_ds, 1000)

# frames, notes, times = test_ds[7]
# test_audio_ds = AudioDataset(frames, notes)
# test_loader = DataLoader(test_audio_ds, 1000)

# train_loader = DataLoader(train_ds, 1)
# test_loader = DataLoader(test_ds, 1)

In [8]:
# loss = nn.MultiLabelMarginLoss()
# x = torch.FloatTensor([[0.1, 0.9, 0.1, 1]])
# # for target y, only consider labels 1 and 3; everything after the first -1 is ignored
# y = torch.LongTensor([[1, 3, -1, -1]])
# # 0.25 * ((1-(0.9-0.1)) + (1-(0.9-0.1)) + (1-(1-0.1)) + (1-(1-0.1))) = 0.15
# loss(x, y)

In [9]:
# Take item 10 of the training split; the third element of the triple is discarded here.
frames, notes, _ = train_ds[10]
# Wrap that item's frames and notes in an AudioDataset and iterate it in batches of 15.
audio_ds = AudioDataset(frames, notes)
audio_loader = DataLoader(audio_ds, 15)

In [15]:
# Peek at the first element of the first AudioDataset item (a float32 array in the recorded output).
audio_ds[0][0]

array([[[-13.187327, -11.557513, -10.042399],
        [-11.021313,  -9.361819, -10.47492 ],
        [-20.814358, -21.469704, -21.86817 ],
        ...,
        [-45.032433, -45.032433, -45.032433],
        [-45.032433, -45.032433, -45.032433],
        [-45.032433, -45.032433, -45.032433]]], dtype=float32)

In [11]:
# Print the shape of the first batch only (recorded output: torch.Size([15, 1, 512, 3])).
for X, y in audio_loader:
    # Each target is a pair, unpacked here as (pitch, vel).
    pitch, vel = y
    print(X.shape)
    break

torch.Size([15, 1, 512, 3])


In [12]:
def _run_pass(network, dataset, loss_fn, batch_size, epoch, optimizer=None):
    """Run one pass over ``dataset``: train if ``optimizer`` is given, else evaluate.

    Each dataset item is a ``(frames, notes, _)`` triple that is wrapped in an
    ``AudioDataset`` and consumed in batches of ``batch_size``.

    Returns:
        ``(mean batch loss, mean corrected F1)`` over every batch of the pass.
    """
    training = optimizer is not None
    # Dropout must be active only while training.
    network.train(training)

    losses = []
    scores = []
    # No autograd graph is needed (or wanted) during evaluation.
    with torch.set_grad_enabled(training):
        for i, (frames, notes, _) in enumerate(dataset):
            audio_iter = tqdm(DataLoader(AudioDataset(frames, notes), batch_size))
            for X, y in audio_iter:
                pitch, _vel = y
                # Velocity prediction is not trained yet; only pitch is used.
                pred_pitch, _pred_vel = network(X)
                loss_batch = loss_fn(pred_pitch, pitch)

                if training:
                    optimizer.zero_grad()
                    loss_batch.backward()
                    optimizer.step()

                losses.append(loss_batch.item())
                audio_iter.set_description(
                    "Epoch: %04d, Audio: %04d/%04d, Iter Loss: %.4f"
                    % (epoch, i, len(dataset), losses[-1])
                )
                scores.append(
                    corrected_f1_score(pred_pitch.detach().numpy(), pitch.detach().numpy())
                )
    return np.mean(losses), np.mean(scores)


def train(network, epochs: int, learning_rate=1e-4, batch_size=100):
    """Train ``network`` on the global ``train_ds`` and evaluate it on ``test_ds``.

    Pitch predictions are optimised with ``nn.MultiLabelMarginLoss`` using Adam.
    After every epoch the loss and corrected-F1 curves for both splits are
    re-drawn in the notebook output.

    Args:
        network: module whose call returns a ``(pitch, velocity)`` pair.
        epochs: number of full passes over ``train_ds``.
        learning_rate: Adam learning rate.
        batch_size: batch size used for both training and evaluation.

    Returns:
        None. A ``KeyboardInterrupt`` stops training quietly. The network is
        always left in eval mode so that later inference cells do not run with
        Dropout enabled.
    """
    loss_multi = nn.MultiLabelMarginLoss()
    optimizer = torch.optim.Adam(network.parameters(), lr=learning_rate)

    train_loss_epochs = []
    test_loss_epochs = []
    train_accuracy_epochs = []
    test_accuracy_epochs = []

    try:
        for epoch in range(epochs):
            train_loss, train_f1 = _run_pass(
                network, train_ds, loss_multi, batch_size, epoch, optimizer
            )
            train_loss_epochs.append(train_loss)
            train_accuracy_epochs.append(train_f1)

            test_loss, test_f1 = _run_pass(network, test_ds, loss_multi, batch_size, epoch)
            test_loss_epochs.append(test_loss)
            test_accuracy_epochs.append(test_f1)

            # output
            clear_output(True)
            print('\rEpoch {0}... (Train/Test) Loss: {1:.3f}/{2:.3f}\tF1: {3:.3f}/{4:.3f}'.format(
                        epoch, train_loss_epochs[-1], test_loss_epochs[-1],
                        train_accuracy_epochs[-1], test_accuracy_epochs[-1]))
            plt.figure(figsize=(12, 5))
            plt.subplot(1, 2, 1)
            plt.plot(train_loss_epochs, label='Train')
            plt.plot(test_loss_epochs, label='Test')
            plt.xlabel('Epochs', fontsize=16)
            plt.ylabel('Loss', fontsize=16)
            plt.legend(loc=0, fontsize=16)
            plt.grid()
            plt.subplot(1, 2, 2)
            plt.plot(train_accuracy_epochs, label='Train F1')
            plt.plot(test_accuracy_epochs, label='Test F1')
            plt.xlabel('Epochs', fontsize=16)
            plt.ylabel('Corrected F1', fontsize=16)
            plt.legend(loc=0, fontsize=16)
            plt.grid()
            plt.show()
    except KeyboardInterrupt:
        # Deliberate: lets the user stop a long run from the notebook without a traceback.
        pass
    finally:
        # Leave the model ready for inference even if training was interrupted mid-epoch.
        network.eval()

In [28]:
# Shape experiment on a single batch: apply a (4, 1) max-pool and then a (1, 3) max-pool
# and print the shape after each (recorded output: [15, 1, 128, 3] then [15, 1, 128, 1]).
maxpool = nn.MaxPool2d((4, 1))
maxpool2 = nn.MaxPool2d((1, 3))

for X, y in audio_loader:
    pitch, vel = y
    X = maxpool(X)
    print(X.shape)
    X = maxpool2(X)
    print(X.shape)
    # Only the first batch is needed for the shape check.
    break

torch.Size([15, 1, 128, 3])
torch.Size([15, 1, 128, 1])


In [31]:
class DummyNet(nn.Module):
    """Small CNN that maps a spectrogram patch to log-probabilities over pitch classes.

    Expected input is ``(batch, 1, n_mels, n_frames)``. With the defaults
    (512 x 3) the architecture is identical to the original hard-coded one:
    the first linear layer has 5504 input features.

    Args:
        n_mels: height of the input patch (must be at least 4 for the first pool).
        n_frames: width of the input patch (must be at least 1).
        n_classes: size of the output layer.

    Raises:
        ValueError: if the input patch is too small for the pooling layers.
    """

    def __init__(self, n_mels: int = 512, n_frames: int = 3, n_classes: int = 129) -> None:
        super().__init__()

        if n_mels < 4 or n_frames < 1:
            raise ValueError(
                f"input patch too small: n_mels={n_mels} (need >= 4), n_frames={n_frames} (need >= 1)"
            )

        self.conv_layer1 = nn.Sequential(
            nn.Conv2d(1, 64, 3, padding=1),
            nn.Dropout(0.3),
            nn.Tanh(),
            nn.MaxPool2d((4, 1))
        )
        self.conv_layer2 = nn.Sequential(
            nn.Conv2d(64, 128, 3, padding=1),
            nn.Dropout(0.3),
            nn.Tanh(),
            # The original positional call `nn.MaxPool2d(1, 3)` means kernel_size=1, stride=3:
            # it keeps every third row/column rather than pooling a (1, 3) window.
            # NOTE(review): a (1, 3) kernel was possibly intended (as in the shape experiment
            # cell); changing it alters the layer shapes below, so it is kept as-is here.
            nn.MaxPool2d(kernel_size=1, stride=3)
        )
        self.flatten = nn.Flatten()

        # Both convolutions use padding=1 with a 3x3 kernel, so only the pools change the size.
        pooled_height = n_mels // 4                 # MaxPool2d((4, 1))
        out_height = (pooled_height - 1) // 3 + 1   # kernel 1, stride 3
        out_width = (n_frames - 1) // 3 + 1         # kernel 1, stride 3
        flat_features = 128 * out_height * out_width

        self.linear_layer = nn.Sequential(
            nn.Linear(flat_features, 3000),
            nn.Dropout(0.3),
            nn.ReLU(),
            nn.Linear(3000, 1000),
            nn.Dropout(0.3),
            nn.ReLU(),
            nn.Linear(1000, n_classes),
            nn.LogSoftmax(dim=1)
        )

    def forward(self, x):
        """Return ``(pitch_log_probs, 90)``; the velocity is a fixed placeholder value."""
        x = self.conv_layer1(x)
        x = self.conv_layer2(x)
        x = self.flatten(x)
        x = self.linear_layer(x)
        return x, 90

In [32]:
# Fresh model, trained for 20 epochs with a small learning rate.
net = DummyNet()
train(net, epochs=20, learning_rate=1e-5, batch_size=100)

Epoch: 0000, Audio: 0000/0070, Iter Loss: 0.1127: 100%|██████████| 203/203 [02:08<00:00,  1.58it/s]
Epoch: 0000, Audio: 0001/0070, Iter Loss: 0.6439:  76%|███████▌  | 154/203 [01:37<00:30,  1.59it/s]


In [45]:
# Load item 37 of the training split and the MIDI file that belongs to it.
frames, notes, times = train_ds[37]
# NOTE(review): reaches into the dataset's private `_data` / `_root_path` attributes and
# assumes each `_data` row unpacks into exactly two fields, the first being the MIDI path — TODO confirm.
midi_filename, _ = train_ds._data.iloc[37]
ns = note_seq.midi_file_to_note_sequence(train_ds._root_path + midi_filename)

In [47]:
# Inspect the first target: a pair of arrays (in the recorded, truncated output the first
# starts with 128 followed by -1 entries and the second is zeros).
notes[0]

(array([128,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,
         -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,
         -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,
         -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,
         -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,
         -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,
         -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,
         -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,
         -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,
         -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1,  -1]),
 array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0

In [31]:
# Flatten the frames into rows of 128 values and convert them to a tensor.
# NOTE(review): DummyNet starts with Conv2d(1, 64, ...), which does not accept a 2-D
# (N, 128) input — this cell looks like it targets an earlier model; confirm before re-running.
frames_reshaped = torch.Tensor(frames.reshape(-1, 128))

In [32]:
# Recorded output: torch.Size([24094, 128]).
frames_reshaped.shape

torch.Size([24094, 128])

In [33]:
# Run the network on every reshaped frame; it returns a (pitch, velocity) pair.
# NOTE(review): this runs with autograd enabled and in whatever mode `net` was left in;
# consider `net.eval()` and `torch.no_grad()` for inference.
pred_pitch, pred_vel = net(frames_reshaped)

In [34]:
# Recorded output: torch.Size([24094, 129]) — one row of 129 class scores per frame.
pred_pitch.shape

torch.Size([24094, 129])

In [35]:
def decode_predict(pitches, vels):
    """Turn per-frame pitch scores into a list of ``(pitch_indices, velocities)`` pairs.

    The scores are detached and soft-maxed along dim 1. For each row, every
    class whose probability is at least that of class 128 is kept (class 128
    included), ordered from most to least probable. The velocity array has
    the same shape and dtype as the kept indices and is filled with ``vels``.
    """
    probabilities = nn.Softmax(dim=1)(pitches.detach()).numpy()

    def _decode_row(row):
        # Class 128 provides the cut-off probability for this frame.
        keep_count = (row >= row[128]).sum()
        ranked = np.argsort(row)[::-1][:keep_count]
        return ranked, np.full_like(ranked, vels)

    return [_decode_row(row) for row in probabilities]

In [36]:
# Decode the network outputs into one (pitches, velocities) pair per frame.
pred_notes = decode_predict(pred_pitch, pred_vel)

In [None]:
import bokeh

# Build one piano-roll figure for the ground-truth sequence and one for the predicted notes.
# The second positional argument is presumably `show_figure=False`, so the figure is
# returned instead of shown — TODO confirm against note_seq.
fig = note_seq.plot_sequence(ns, False)
fig_proc = note_seq.plot_sequence(detokenize(pred_notes, times, audio_params.frame_time), False)

# Give both figures the same size so they can be compared visually.
fig.height = 500
fig_proc.height = 500
fig.width = 1500
fig_proc.width = 1500

# NOTE(review): `import bokeh` alone does not guarantee that `bokeh.plotting` is loaded;
# this presumably works because another import pulls it in — consider `import bokeh.plotting`.
bokeh.plotting.output_notebook()
bokeh.plotting.show(fig)
bokeh.plotting.show(fig_proc)

### Conv2D

In [5]:
#constants
image_size = (28, 28)
conv1 = 32
conv2 = 64

# conv_nn
class ConvTwoD(nn.Module):
    """Two-stage 2-D CNN with separate pitch and velocity heads.

    Expected input is ``(batch, 1, image_size[0], image_size[1])``. Both heads
    return log-probabilities over 128 classes.

    Args:
        image_size: ``(height, width)`` of the input.
        conv1: number of channels produced by the first convolution.
        conv2: number of channels produced by the second convolution.

    Raises:
        ValueError: if the input is too small to survive the conv/pool stack.
    """

    def __init__(self, image_size: Tuple[int, int],
        conv1=conv1, conv2=conv2
    ):
        super().__init__()
        self.conv_layers1 = nn.Sequential(
            nn.Conv2d(1, conv1, kernel_size=(5,5), stride=(1,1)),
            nn.Tanh(),
            nn.Dropout(0.5),
            nn.MaxPool2d(kernel_size=(2,2), stride=(2,2))
        )

        self.conv_layers2 = nn.Sequential(
            nn.Conv2d(conv1, conv2, kernel_size=(3,3), stride=(2,2)),
            nn.Tanh(),
            nn.Dropout(0.5),
            nn.MaxPool2d(kernel_size=(2,2))
        )

        # The previous expression `conv2 * h//2 * w//2` did not match what the
        # layers above produce (12544 vs. the real 256 for 28x28), so the
        # linear heads rejected every input. Derive the size from the layers.
        out_height = self._spatial_out(image_size[0])
        out_width = self._spatial_out(image_size[1])
        if out_height < 1 or out_width < 1:
            raise ValueError(f"image_size {image_size} is too small for the conv/pool stack")
        flat_features = conv2 * out_height * out_width

        self.linear_velocity = nn.Sequential(
            nn.Linear(flat_features, 128),
            nn.LogSoftmax(dim=1)
        )

        self.linear_pitch = nn.Sequential(
            nn.Linear(flat_features, 128),
            nn.LogSoftmax(dim=1)
        )

    @staticmethod
    def _spatial_out(size: int) -> int:
        """Length of one spatial axis after both conv/pool stages (no padding anywhere)."""
        size = size - 5 + 1            # conv 5x5, stride 1
        size = (size - 2) // 2 + 1     # max-pool 2x2, stride 2
        size = (size - 3) // 2 + 1     # conv 3x3, stride 2
        size = (size - 2) // 2 + 1     # max-pool 2x2 (stride defaults to the kernel size)
        return size

    def forward(self, x):
        """Return ``(pitch, velocity)`` log-probabilities, each of shape ``(batch, 128)``."""
        x = self.conv_layers1(x)
        x = self.conv_layers2(x)
        # Flatten everything except the batch dimension.
        x = x.view(x.size(0), -1)
        pitch = self.linear_pitch(x)
        velocity = self.linear_velocity(x)

        return pitch, velocity

### Conv1D

In [6]:
#constants
image_size = 28
conv1 = 32
conv2 = 64

# conv_nn
class ConvOneD(nn.Module):
    """Two-stage 1-D CNN with separate pitch and velocity heads.

    The previous version called ``scipy.signal.convolve`` on tensors with a
    scalar kernel inside ``forward``. That raises, and it would also bypass
    autograd. It additionally used 2-D pooling on 1-D data. This version uses
    learnable ``nn.Conv1d`` layers with "same" padding (kernel size 5) followed
    by ``nn.MaxPool1d``.
    NOTE(review): the layer layout is a reconstruction of the apparent intent —
    confirm it against the experiment this class was written for.

    Expected input is ``(batch, 1, image_size)`` or ``(batch, image_size)``.
    Both heads return log-probabilities over 128 classes.

    Args:
        image_size: length of the 1-D input signal (an int, at least 4).
        conv1: number of channels produced by the first convolution.
        conv2: number of channels produced by the second convolution.

    Raises:
        ValueError: if ``image_size`` is too short for two halving pools.
    """

    def __init__(self, image_size: int,
        conv1=conv1, conv2=conv2
    ):
        super().__init__()

        # Each stage keeps the length (padding=2 with kernel 5) and then halves it.
        pooled_length = image_size // 2 // 2
        if pooled_length < 1:
            raise ValueError(f"image_size must be at least 4, got {image_size}")

        self.conv_layers1 = nn.Sequential(
            nn.Conv1d(1, conv1, kernel_size=5, padding=2),
            nn.Tanh(),
            nn.Dropout(0.5),
            nn.MaxPool1d(kernel_size=2, stride=2)
        )

        self.conv_layers2 = nn.Sequential(
            nn.Conv1d(conv1, conv2, kernel_size=5, padding=2),
            nn.Tanh(),
            nn.Dropout(0.5),
            nn.MaxPool1d(kernel_size=2, stride=2)
        )

        flat_features = conv2 * pooled_length

        self.linear_velocity = nn.Sequential(
            nn.Linear(flat_features, 128),
            nn.LogSoftmax(dim=1)
        )

        self.linear_pitch = nn.Sequential(
            nn.Linear(flat_features, 128),
            nn.LogSoftmax(dim=1)
        )

    def forward(self, x):
        """Return ``(pitch, velocity)`` log-probabilities, each of shape ``(batch, 128)``."""
        if x.dim() == 2:
            # Add the channel axis expected by Conv1d.
            x = x.unsqueeze(1)
        x = self.conv_layers1(x)
        x = self.conv_layers2(x)
        # Flatten everything except the batch dimension.
        x = x.view(x.size(0), -1)
        pitch = self.linear_pitch(x)
        velocity = self.linear_velocity(x)

        return pitch, velocity

In [None]:
# NOTE(review): `Conv` is not defined anywhere in the visible notebook, so this cell raises
# NameError on a fresh kernel. Presumably ConvTwoD or ConvOneD was meant — confirm which, and
# check that `image_size` (rebound to the int 28 in the Conv1D cell) matches that class.
network1 = Conv(image_size=image_size)
# Train the model for 5 epochs with train()'s default learning rate and batch size.
train(network1, 5)