# Predictive Models

In the last two notebooks we had a look at two of the components of the Basis Mixer. In this notebook we add the third part of the puzzle: the **Predictive Models**.

A predictive model is defined as a mathematical which maps score information (encoded by the basis functions) $\mathbf{Y}$ to expressive parameters $\mathbf{Y}$

$$F(\boldsymbol{\Phi}) = \mathbf{Y}$$

In [7]:
import numpy as np
import torch
from basismixer.predictive_models import NNModel, FullPredictiveModel

from torch import nn


In [8]:
class DenseModel(NNModel):
    """Simple Dense FFNN
    """

    def __init__(self,
                 input_size, output_size,
                 hidden_size, dropout=0.0,
                 nonlinearity=nn.ReLU(),
                 input_names=None,
                 output_names=None,
                 input_type=None,
                 dtype=torch.float32,
                 device=None):
        super().__init__(input_names=input_names,
                         output_names=output_names,
                         input_type=input_type,
                         dtype=dtype,
                         device=device,
                         is_rnn=False)

        self.input_size = input_size
        if not isinstance(hidden_size, (list, tuple)):
            hidden_size = [hidden_size]
        self.hidden_size = hidden_size
        self.output_size = output_size

        if not isinstance(dropout, (list, tuple)):
            self.dropout = len(self.hidden_size) * [dropout]
        else:
            if len(dropout) != len(self.hidden_size):
                raise ValueError('`dropout` should be the same length '
                                 'as `hidden_size`.')

        if not isinstance(nonlinearity, (list, tuple)):
            self.nonlinearity = len(self.hidden_size) * [nonlinearity]
        else:
            if len(nonlinearity) != len(self.hidden_size):
                raise ValueError('`nonlinearity` should be the same length ',
                                 'as `hidden_size`.')

        if self.output_names is None:
            self.output_names = [str(i) for i in range(self.output_size)]

        in_features = input_size
        hidden_layers = []
        for hs, p, nl in zip(self.hidden_size, self.dropout, self.nonlinearity):
            hidden_layers.append(nn.Linear(in_features, hs))
            in_features = hs
            hidden_layers.append(nl)

            if p != 0:
                hidden_layers.append(nn.Dropout(p))

        self.hidden_layers = nn.Sequential(*hidden_layers)

        self.output = nn.Linear(in_features=self.hidden_size[-1],
                                out_features=self.output_size)

    def forward(self, x):
        h = self.hidden_layers(x)
        output = self.output(h)
        return output


In [9]:
class GRUModel(NNModel):
    """Simple RNN
    """

    def __init__(self,input_size, output_size,
                 recurrent_size, n_layers, dropout=0.0,
                 bidirectional=True,
                 batch_first=True,
                 input_names=None,
                 output_names=None,
                 input_type=None,
                 dtype=torch.float32,
                 device=None):
        super().__init__(input_names=input_names,
                         output_names=output_names,
                         input_type=input_type,
                         dtype=dtype,
                         device=device,
                         is_rnn=True)
        self.input_size = input_size
        self.recurrent_size = recurrent_size
        self.output_size = output_size
        self.n_layers = n_layers
        self.batch_first = batch_first
        self.bidirectional = bidirectional
        self.rnn = nn.GRU(input_size, self.recurrent_size,
                          self.n_layers,
                          batch_first=batch_first, dropout=dropout,
                          bidirectional=self.bidirectional)
        dense_in_features = (self.recurrent_size * 2 if
                             self.bidirectional else self.recurrent_size)
        self.output = nn.Linear(in_features=dense_in_features,
                                out_features=output_size)

        if self.output_names is None:
            self.output_names = [str(i) for i in range(self.output_size)]

    def init_hidden(self, batch_size):
        if self.bidirectional:
            n_layers = 2 * self.n_layers
        else:
            n_layers = self.n_layers
        return torch.zeros(n_layers, batch_size, self.recurrent_size)

    def forward(self, x):
        batch_size = x.size(0)
        seq_len = x.size(1)
        h0 = self.init_hidden(batch_size).type(x.type())
        # Compute output of the rnn layer
        gru_output, _ = self.rnn(x, h0)
        # adjust shape if network is bidirectional
        flatten_shape = (self.recurrent_size * 2
                         if self.bidirectional else self.recurrent_size)
        y = self.output(gru_output.contiguous().view(-1, flatten_shape))
        y = y.view(batch_size, seq_len, self.output_size)

        return y


In [10]:
input_size1 = 10
output_size1 = 3
recurrent_size = 7
hidden_size = 5
n_layers = 1

input_names1 = ['i{0}'.format(i) for i in range(input_size1)]
output_names1 = ['o{0}'.format(i) for i in range(output_size1)]
model1 = DenseModel(input_size=input_size1,
                                output_size=output_size1,
                                 hidden_size=hidden_size,
                                 input_names=input_names1,
                                 output_names=output_names1)


input_size2 = 7
output_size2 = 2
input_names2 = ['i{0}'.format(i + input_size1) for i in range(input_size2)]
output_names2 = ['o{0}'.format(i + output_size1) for i in range(output_size2)]
model2 = GRUModel(input_size=input_size2,
                               output_size=output_size2,
                               recurrent_size=recurrent_size,
                               n_layers=n_layers,
                               input_names=input_names2,
                               output_names=output_names2,
                               input_type='onsetwise')

input_names = input_names1 + input_names2 + ['not_in_input']
output_names = output_names1 + output_names2 + ['not_in_output']
model = FullPredictiveModel(
    [model1, model2], input_names, output_names,
    default_values=dict([(pn, 0) for pn in output_names]))

input_size = len(input_names)
output_size = len(output_names)

batch_size = 2
seq_len = 5

x = np.random.rand(seq_len, input_size).astype(np.float32)

onsets = np.random.randint(0, seq_len, seq_len, dtype=np.int)
onsets.sort()

preds = model.predict(x, onsets)

print(preds)


[(0.23464012, 0.14540018, 0.4331936 , 0.38567597, -0.272734  , 0.)
 (0.15854833, 0.11864954, 0.46437374, 0.38567597, -0.272734  , 0.)
 (0.23740956, 0.08178899, 0.3842159 , 0.40522426, -0.28221244, 0.)
 (0.30344945, 0.05653504, 0.4298972 , 0.4090488 , -0.308708  , 0.)
 (0.21593724, 0.02287424, 0.38663086, 0.4090488 , -0.308708  , 0.)]
