In [None]:
import logging
from random import randint
from typing import Callable, Dict, Iterator, Optional, Type

import torch
from torch import nn
from torch.nn import init, functional as F
from torch.utils.data import DataLoader, TensorDataset
from tqdm.auto import tqdm  # corrected from the misspelled `import tqdn`; `tqdm(...)` is called directly

In [None]:
class LinearSubmodule(nn.Module):
    """
    LinearSubmodule composes a Linear layer with an activation and dropout layer
    so that we can have more fine-tuned parameterized control over the layers in
    our MultiLayerPerceptron.

    Parameters
    ----------
        in_dim, units: int, int
            Input/Output dimensions, or number of units, in this particular layer

        dropout: float = 0.5
            Probability that a node will drop out of a given layer, using nn.Dropout.
            Any value <= 0.0 disables dropout by substituting nn.Identity.

        activation: Type[nn.Module] = nn.ReLU
            The activation *class* (not an instance); it is instantiated here
            with no arguments. (Debating on modifying this to use
            torch.nn.functional.relu instead so we can see the difference between
            learning extra gradients on the ReLU).

    """
    def __init__(
        self,
        in_dim: int,
        units: int,
        dropout: float = 0.5,
        activation: Type[nn.Module] = nn.ReLU,
    ):
        super().__init__()
        self.linear = nn.Linear(in_dim, units)
        self.activation = activation()
        self.dropout = nn.Dropout(dropout) if dropout > 0.0 else nn.Identity()

    def forward(self, x):
        """Apply linear -> activation -> dropout to `x` and return the result."""
        x = self.linear(x)
        x = self.activation(x)
        # Dropout is the last stage of this submodule, so its output is returned
        # directly; there is no further sub-sequence to delegate to.
        return self.dropout(x)


class RepeatedSequential(nn.Sequential):
    """
    RepeatedSequential is an nn.Sequential built by instantiating the same
    moduleclass once for every consecutive pair of dimensions.

    Parameters
    ----------
        *dims: int
            Input/Output dimensions, or number of units, for each layer. Layer
            `i` is constructed as `moduleclass(dims[i], dims[i+1], **kwargs)`,
            so N dimensions produce N - 1 layers (none when N < 2).

        moduleclass: Type[nn.Module]
            Factory class used to generate all the layers

        **kwargs
            are forwarded to the moduleclass to configure each layer identically
            except for dimensions.

    """
    def __init__(self, *dims: int, moduleclass: Type[nn.Module] = LinearSubmodule, **kwargs):
        # Pair every dimension with its successor: (d0, d1), (d1, d2), ...
        stages = [
            moduleclass(fan_in, fan_out, **kwargs)
            for fan_in, fan_out in zip(dims, dims[1:])
        ]
        super().__init__(*stages)


class RandomizedRepeatedSequential(RepeatedSequential):
    """
    RandomizedRepeatedSequential is a RepeatedSequential whose layer dimensions
    are drawn at random instead of being listed explicitly.

    Parameters
    ----------
        lower: int
            Lower bound (inclusive) on layer dimensions, randomly generated by
            random.randint

        upper: int
            Upper bound (inclusive) on layer dimensions, randomly generated by
            random.randint

        layers: int
            number of layers to generate

        **kwargs
            moduleclass and configuration kwargs to configure each layer identically
            except for dimensions.

    """
    def __init__(self, lower: int, upper: int, layers: int, **kwargs):
        # N layers need N + 1 boundary dimensions, because every layer consumes
        # one consecutive (in, out) pair of dimensions.
        dims = [randint(lower, upper) for _ in range(layers + 1)]
        super().__init__(*dims, **kwargs)


# TODO:
#     TITLE: Layer dimension generators
#     AUTHOR: frndlytm
#     DESCRIPTION:
#
#         Write a bunch of generators for sequences of dimensions that follow
#         certain growth / decay rules. It could be really interesting to
#         evaluate various MLP dimension arrangements for multiclass classification
#
def exp_decay(start: int, stop: int, base: int) -> Iterator[int]:
    """
    Yield layer dimensions that shrink geometrically from `start` down to `stop`.

    The sequence is `start, start // base, start // base**2, ...` and ends with
    the last value that is still >= `stop`; e.g. `exp_decay(64, 8, 2)` yields
    64, 32, 16, 8. Nothing is yielded when `start < stop`.

    Parameters
    ----------
        start: int
            First (largest) dimension

        stop: int
            Smallest dimension that may be yielded (inclusive). Must be >= 1.

        base: int
            Integer divisor applied between consecutive dimensions. Must be >= 2.

    Raises
    ------
        ValueError
            If `base < 2` or `stop < 1`. Integer division by such a base never
            drops below `stop` (or divides by zero), and with `stop < 1` the
            value gets stuck at 0 or -1, so the sequence would never end.

    """
    if base < 2:
        raise ValueError(f"base must be >= 2 for the sequence to decay, got {base}")
    if stop < 1:
        raise ValueError(f"stop must be >= 1 for the sequence to terminate, got {stop}")

    # Arguments are validated eagerly above; the sequence itself stays lazy.
    def _decay() -> Iterator[int]:
        x = start
        while x >= stop:
            yield x
            x //= base

    return _decay()



class MultiLayerPerceptron(nn.Module):
    """
    MultiLayerPerceptron normalizes its input, passes it through a stack of
    identically sized LinearSubmodule hidden layers, and projects the result to
    `out_dim` with a final plain Linear layer (no activation, no dropout).

    Parameters
    ----------
        in_dim: int
            Number of input features

        out_dim: int
            Number of outputs produced by the final Linear layer

        units: int
            Width of every hidden layer

        n_layers: int
            Total number of node layers, counting the input and the output
            layer, so the network holds `n_layers - 2` hidden LinearSubmodules.
            Must be >= 2; with exactly 2 the input feeds the output layer
            directly.

        dropout: float = 0.5
            Dropout probability forwarded to every hidden LinearSubmodule

        shift, scale: Optional[torch.Tensor], Optional[torch.Tensor]
            Normalization constants applied as `(x - shift) / scale`. They
            default to zeros and ones (i.e. no normalization) and are stored as
            frozen parameters (`requires_grad=False`). Supplied tensors are
            copied.

    """
    def __init__(
        self,
        in_dim: int,
        out_dim: int,
        units: int,
        n_layers: int,
        dropout: float = 0.5,
        shift: Optional[torch.Tensor] = None,
        scale: Optional[torch.Tensor] = None
    ):
        super().__init__()

        if n_layers < 2:
            raise ValueError(f"n_layers must be >= 2 (input + output), got {n_layers}")

        # Copy user-supplied tensors so later in-place edits by the caller do
        # not silently change the model's normalization.
        self.shift = nn.Parameter(
            torch.zeros(in_dim) if shift is None else shift.detach().clone(),
            requires_grad=False,
        )
        self.scale = nn.Parameter(
            torch.ones(in_dim) if scale is None else scale.detach().clone(),
            requires_grad=False,
        )

        # The hidden stack maps in_dim -> units -> ... -> units. The output layer
        # is kept separate so that no activation/dropout touches the logits.
        hidden_dims = [in_dim, *(units for _ in range(n_layers - 2))]
        self.layers = RepeatedSequential(
            *hidden_dims,
            moduleclass=LinearSubmodule,
            dropout=dropout,
        )
        # hidden_dims[-1] is `units`, or `in_dim` when there is no hidden layer.
        self.output = nn.Linear(hidden_dims[-1], out_dim)

    def forward(self, x):
        """Normalize `x`, run the hidden stack and return the raw output logits."""
        x = (x - self.shift) / self.scale
        x = self.layers(x)
        return self.output(x)

    def classify(self, x, threshold: float = 0.5):
        """
        Return a float tensor of 0./1. decisions: sigmoid(logits) > threshold.

        Gradients are disabled, but the train/eval mode is left untouched; call
        `model.eval()` first if dropout should be inactive.
        """
        with torch.no_grad():
            # TODO:
            #     TITLE: Strategy for functional layer
            #     AUTHOR: frndlytm
            #     DESCRIPTION:
            #
            #         Decide on how we want to manage final functional layer that
            #         actually performs the classification. This could potentially
            #         be controlled externally.
            #
            y_pred = torch.sigmoid(self(x))

        return (y_pred > threshold).float()


In [None]:
class Initializer:
    """
    Initializer applies weight-initialization handlers to the submodules of a model.

    Parameters
    ----------
        handlers: Optional[Dict[Type[nn.Module], Callable[[nn.Module], None]]]
            Maps a layer class to a function that initializes a layer of that
            class in place, e.g. `{nn.Linear: lambda m: init.xavier_uniform_(m.weight)}`.
            Omitting it yields an initializer that leaves the model untouched.

    """
    def __init__(
        self,
        handlers: Optional[Dict[Type[nn.Module], Callable[[nn.Module], None]]] = None,
    ):
        self.handlers = dict(handlers or {})

    def __call__(self, model: nn.Module) -> nn.Module:
        """Run every matching handler on every submodule and return `model`."""
        # TODO:
        #     TITLE: Weight Initializer
        #     AUTHOR: frndlytm
        #     DESCRIPTION:
        #
        #         Configure layers by adding handlers that dispatch on the
        #         type of layers using functools.singledispatch
        #
        # model.modules() walks the model itself plus all nested submodules.
        for module in model.modules():
            for module_type, handler in self.handlers.items():
                if isinstance(module, module_type):
                    handler(module)

        # Returned so the call can be chained: `model = initializer(model)`.
        return model

In [None]:
# If your PC doesn't have enough CPU RAM or video memory, try decreasing BATCH_SIZE
BATCH_SIZE = 128

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


# BucketIterator groups examples of similar length into the same batch so that
# as little padding as possible is needed; remaining space is filled with the
# pad token.
# NOTE(review): `data`, `train_data`, `valid_data` and `test_data` are not
# defined anywhere in the visible notebook -- presumably a torchtext data-loading
# cell is missing; confirm before running.
train_iterator, valid_iterator, test_iterator = data.BucketIterator.splits(
    (train_data, valid_data, test_data),
    batch_size = BATCH_SIZE,
    device = device
)


# initializing model weights for better convergence
# TODO: the `...` / `{...}` arguments below are placeholders that still need
# real model dimensions and initialization handlers.
model = MultiLayerPerceptron(...)
# Named `initializer` so it does not shadow `init` imported from torch.nn.
initializer = Initializer({...})
initializer(model)

optimizer = torch.optim.Adam(model.parameters(), lr=0.001)  # optimizer to train the model
criterion = nn.CrossEntropyLoss()                           # loss criterion

# Move the model and the loss to the selected device (GPU if available)
model = model.to(device)
criterion = criterion.to(device)

# If this line prints cuda, your machine is equipped with an Nvidia GPU and
# PyTorch is utilizing the GPU
print(device)

In [None]:
EPOCHS = 40


def _run_epoch(model, iterator, criterion, optimizer=None):
    """
    Make one pass over `iterator` and return `(mean_loss, mean_accuracy)`.

    With an `optimizer` the model is put in training mode and updated after
    every batch; without one the model is put in eval mode and gradients are
    disabled. Both means are weighted by the number of targets in each batch,
    so a smaller final batch does not skew the epoch statistics. Returns NaNs
    for an empty iterator.
    """
    training = optimizer is not None
    model.train(training)

    loss_sum = 0.0
    acc_sum = 0.0
    n_targets = 0

    with torch.set_grad_enabled(training):
        for batch in iterator:
            text, tags = batch.text, batch.udtags

            # zero the gradients from last batch
            if training:
                optimizer.zero_grad()

            # feed the batch to the model and flatten so that every row of
            # `predictions` lines up with one entry of `tags`
            predictions = model(text)
            predictions = predictions.view(-1, predictions.shape[-1])
            tags = tags.view(-1)

            loss = criterion(predictions, tags)
            if training:
                loss.backward()
                optimizer.step()

            # NOTE(review): weighting by tags.numel() assumes `criterion`
            # averages over all targets (the nn.CrossEntropyLoss default) and
            # that `categorical_accuracy` returns a scalar fraction -- confirm.
            count = tags.numel()
            loss_sum += loss.item() * count
            acc_sum += float(categorical_accuracy(predictions.detach(), tags)) * count
            n_targets += count

    if n_targets == 0:
        return float("nan"), float("nan")
    return loss_sum / n_targets, acc_sum / n_targets


def train(
    epochs,
    model,
    optimizer,
    criterion,
    train_iterator,
    valid_iterator
):
    """
    Train `model` for `epochs` epochs, validating and logging after each one.

    Parameters
    ----------
        epochs: int
            Number of full passes over `train_iterator`

        model, optimizer, criterion
            The network, the optimizer that updates it, and the loss function

        train_iterator, valid_iterator
            Iterables of batches exposing `.text` and `.udtags`

    """
    for epoch in tqdm(range(epochs)):
        train_loss, train_acc = _run_epoch(model, train_iterator, criterion, optimizer)
        valid_loss, valid_acc = _run_epoch(model, valid_iterator, criterion)

        # log stats
        logging.info('\n'.join([
            f'Epoch: {epoch+1:02}',
            f'Training Loss: {train_loss:.2f}',
            f'Training Accuracy: {train_acc:.2f}',
            f'Validation Loss: {valid_loss:.2f}',
            f'Validation Accuracy: {valid_acc:.2f}',
            '\n'
        ]))

In [None]:
# Kick off training with the objects prepared in the cells above.
train(
    epochs=EPOCHS,
    model=model,
    optimizer=optimizer,
    criterion=criterion,
    train_iterator=train_iterator,
    valid_iterator=valid_iterator,
)

In [None]:
# testing the accuracy on test set
def test(model, test_iterator):
    """
    Evaluate `model` on `test_iterator`, log the mean accuracy and return it.

    Batches are expected to expose `.text` and `.udtags`. The mean is weighted
    by the number of targets per batch; an empty iterator yields NaN.
    """
    acc_sum = 0.0
    n_targets = 0

    # Computes without the gradients. Use this while testing your model.
    # As we do not intend to learn from the data
    model.eval()
    with torch.no_grad():
        for batch in test_iterator:
            text, tags = batch.text, batch.udtags

            predictions = model(text)
            predictions = predictions.view(-1, predictions.shape[-1])
            tags = tags.view(-1)

            # NOTE(review): assumes `categorical_accuracy` returns a scalar
            # fraction of correct predictions for the batch -- confirm.
            count = tags.numel()
            acc_sum += float(categorical_accuracy(predictions, tags)) * count
            n_targets += count

    test_acc = acc_sum / n_targets if n_targets else float("nan")

    logging.info(f'Test Acc: {test_acc:.2f}\n')
    return test_acc