In [None]:
# default_exp models.components.common

# Common components for models
> Common functions.

In [None]:
# export
import torch as t
import torch.nn as nn
from torch.nn.utils import weight_norm
from torch.autograd.function import Function

# Chomp1d

In [None]:
# export
class Chomp1d(nn.Module):
    """
    Receives x input of dim [N,C,T], and trims the last `chomp_size`
    steps of the time axis so that only 'time available' information
    is used. Used for one dimensional causal convolutions.
    : param chomp_size: length of outsample values to skip.
        A value of 0 leaves the time axis untouched.

    Returns a contiguous tensor of dim [N,C,T-chomp_size].
    """
    def __init__(self, chomp_size):
        super(Chomp1d, self).__init__()
        self.chomp_size = chomp_size

    def forward(self, x):
        # `x[:, :, :-0]` is `x[:, :, :0]`, i.e. an EMPTY tensor, so a zero
        # chomp has to be handled explicitly instead of through slicing.
        if self.chomp_size == 0:
            return x.contiguous()
        return x[:, :, :-self.chomp_size].contiguous()

# CausalConv1d

In [None]:
# export
# Names of the `torch.nn` activation classes that CausalConv1d accepts;
# the chosen name is resolved with `getattr(nn, name)`.
ACTIVATIONS = [
    'ReLU', 'Softplus', 'Tanh', 'SELU',
    'LeakyReLU', 'PReLU', 'Sigmoid',
]

class CausalConv1d(nn.Module):
    """
    One dimensional causal convolution block for inputs of dim [N,C,T].

    The input goes through `nn.Conv1d` (which pads both ends of the time
    axis with `padding` steps), then `Chomp1d(padding)` drops the trailing
    `padding` steps, and finally the activation is applied.

    Parameters
    ----------
    in_channels: int
        Channels of the input.
    out_channels: int
        Channels produced by the convolution.
    kernel_size: int
        Size of the convolution kernel.
    padding: int
        Padding given to the convolution and number of trailing steps
        removed afterwards. With stride 1 the time length is preserved
        when padding == (kernel_size - 1) * dilation; this is not
        enforced here.
    dilation: int
        Spacing between kernel elements.
    activation: str
        Name of a `torch.nn` activation class, must be in ACTIVATIONS.
        https://discuss.pytorch.org/t/call-activation-function-from-string
    stride: int
        Stride of the convolution.
    with_weight_norm: bool
        If True, the convolution is wrapped with `weight_norm`.

    Returns:
    x: tensor
        torch tensor of dim [N,C_out,T_out]
        activation(conv1d(inputs, kernel) + bias)
    """
    def __init__(self, in_channels, out_channels, kernel_size,
                 padding, dilation, activation, stride:int=1, with_weight_norm:bool=False):
        super().__init__()
        assert activation in ACTIVATIONS, f'{activation} is not in {ACTIVATIONS}'

        conv_layer = nn.Conv1d(in_channels=in_channels,
                               out_channels=out_channels,
                               kernel_size=kernel_size,
                               stride=stride,
                               padding=padding,
                               dilation=dilation)
        # weight_norm modifies the layer in place and returns that same layer.
        self.conv = weight_norm(conv_layer) if with_weight_norm else conv_layer

        # Remove the steps that the right-hand padding added.
        self.chomp = Chomp1d(padding)

        activation_cls = getattr(nn, activation)
        self.activation = activation_cls()

        self.causalconv = nn.Sequential(self.conv, self.chomp, self.activation)

    def forward(self, x):
        return self.causalconv(x)

# TimeDistributed

In [None]:
# export
class TimeDistributed2d(nn.Module):
    """
    Applies `module` independently to every time step of a [N,C,T] input.

    The input is permuted to [T,N,C], flattened to [TxN,C], passed through
    `module` (which acts on the channel dimension) and the result is
    restored to [N,C_out,T].
    Allows handling of variable sequence lengths and minibatch sizes.
    : param module: Module to apply input to.
    """
    def __init__(self, module):
        super().__init__()
        self.module = module

    def forward(self, x):
        batch, _, steps = x.shape
        # [N,C,T] --> [T,N,C] --> [TxN,C]
        flat = x.permute(2, 0, 1).contiguous().view(steps * batch, -1)
        out = self.module(flat)
        # [TxN,C_out] --> [T,N,C_out] --> [N,C_out,T]
        return out.view(steps, batch, -1).permute(1, 2, 0).contiguous()

In [None]:
# export
class TimeDistributed3d(nn.Module):
    """
    Applies `module` independently to every time step of a [N,L,C,T] input.

    The input is permuted to [T,N,L,C], flattened to [TxNxL,C], passed
    through `module` (which acts on the channel dimension) and the result
    is restored to [N,L,C_out,T].
    Allows handling of variable sequence lengths and minibatch sizes.
    : param module: Module to apply input to.
    """
    def __init__(self, module):
        super().__init__()
        self.module = module

    def forward(self, x):
        batch, levels, _, steps = x.shape
        # [N,L,C,T] --> [T,N,L,C] --> [TxNxL,C]
        flat = x.permute(3, 0, 1, 2).contiguous().view(steps * batch * levels, -1)
        out = self.module(flat)
        # [TxNxL,C_out] --> [T,N,L,C_out] --> [N,L,C_out,T]
        return out.view(steps, batch, levels, -1).permute(1, 2, 3, 0).contiguous()

# RepeatVector

In [None]:
# export
class RepeatVector(nn.Module):
    """
    Receives x input of dim [N,C], and tiles every vector along a new
    trailing axis, producing a tensor of shape [N, C, K].
    : repeats: int, the number of repetitions (K) for the vector.
    """
    def __init__(self, repeats):
        super().__init__()
        self.repeats = repeats

    def forward(self, x):
        # [N,C] --> [N,C,1] --> [N,C,K]; `repeat` copies the data.
        # (Open question left by the author: can this be improved?)
        with_time_axis = x[..., None]
        return with_time_axis.repeat(1, 1, self.repeats)

# L1Regularizer

In [None]:
# export
class L1Regularizer(nn.Module):
    """
    Layer meant to apply elementwise L1 regularization to a dimension.
    Receives x input of dim [N,C] and returns the input [N,C] with every
    channel scaled by its own learnable weight.

    : param in_features: number of channels C (one weight per channel).
    : param l1_lambda: multiplier of the L1 penalty on the weights.

    The penalty is not applied automatically: add `regularization()`
    to the training loss.
    """
    def __init__(self, in_features, l1_lambda):
        super().__init__()
        self.l1_lambda = l1_lambda
        # One weight per channel, drawn by `t.rand` (uniform on [0, 1)).
        initial_weight = t.rand(in_features, dtype=t.float)
        self.weight = nn.Parameter(initial_weight, requires_grad=True)

    def forward(self, x):
        # channelwise scaling, a weight driven to zero turns its channel off
        return t.einsum('bp,p->bp', x, self.weight)

    def regularization(self):
        l1_norm = t.norm(self.weight, p=1)
        return self.l1_lambda * l1_norm

In [None]:
import numpy as np
from sklearn import linear_model

# Synthetic sparse regression problem: Y depends only on the first of the
# 100 features (beta = [1, 0, ..., 0]) plus small gaussian noise.
np.random.seed(1)

# The draw order (X1, remaining features, eps) is kept fixed so that the
# seeded data stays reproducible.
X1  = np.random.normal(0, 1, (1000, 1))
X   = np.hstack([X1, np.random.normal(0, 1, (1000, 99))])
eps = np.random.normal(0, 0.1, 1000)

beta = np.zeros(100, dtype=int)
beta[0] = 1

# Column vector target of shape (1000, 1).
Y = (X @ beta + eps).reshape(-1, 1)

print("X.shape", X.shape)
print("beta.shape", beta.shape)
print("Y.shape", Y.shape)

# model = linear_model.Lasso(alpha=0.1)
# model.fit(X, Y)
# print("model.coef_.shape", model.coef_.shape)
# model.coef_

X.shape (1000, 100)
beta.shape (100,)
Y.shape (1000, 1)


In [None]:
from torch import nn, optim
from torch.utils.data import DataLoader, Dataset

import numpy as np
import time
from scipy.stats import hmean
import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
class _Model(nn.Module):
    """
    Toy regression network used to try out L1Regularizer: the input is
    scaled channelwise by the regularizer and then mapped to a single
    output by a linear layer without bias.
    """

    def __init__(self, in_features, l1_lambda):
        super().__init__()
        self.l1 = L1Regularizer(in_features, l1_lambda)
        self.linear_layer = nn.Linear(in_features, 1, bias=False)

    def forward(self, x):
        # Cast to float32 first so that the input matches the parameters.
        gated = self.l1(x.float())
        return self.linear_layer(gated)
    
class Data(Dataset):
    """
    Map-style dataset that pairs the rows of X with the rows of Y.
    Its length is taken from the first dimension of Y.
    """

    def __init__(self, Y, X):
        self.X, self.Y = X, Y
        self.len = Y.shape[0]

    def __getitem__(self, index):
        # Returned in (features, target) order.
        features, target = self.X[index], self.Y[index]
        return features, target

    def __len__(self):
        return self.len

In [None]:
# Hyperparameters of the toy L1 experiment.
L1_LAMBDA = 0.07
BATCH_SIZE = 512
LEARNING_RATE = 0.001

model = _Model(in_features=X.shape[1], l1_lambda=L1_LAMBDA)
dataloader = DataLoader(dataset=Data(X=X, Y=Y), batch_size=BATCH_SIZE)
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

print(model)

def train_model(model, epochs, print_progress=False, dataloader=None, optimizer=None):
    """
    Trains `model` with MSE loss plus the penalty of its L1 layer.

    Parameters
    ----------
    model: nn.Module
        Model to train. It must expose `model.l1.regularization()`.
    epochs: int
        Number of passes over `dataloader`.
    print_progress: bool
        If True, prints epoch, step, elapsed time and loss every 100 epochs.
    dataloader: iterable of (x, y) batches, optional
        Defaults to the notebook-level `dataloader`.
    optimizer: torch optimizer, optional
        Must have been built on the parameters of `model`.
        Defaults to the notebook-level `optimizer`.

    Returns
    -------
    model: the trained model (same object that was received).
    training_trajectory: dict
        'epoch' and 'train_loss' lists, with one entry every 100 epochs.
        'train_loss' holds the loss of the last batch of that epoch.
    """
    # Fall back to the notebook-level objects so that existing calls
    # `train_model(model=model, epochs=...)` keep working.
    if dataloader is None:
        dataloader = globals()['dataloader']
    if optimizer is None:
        optimizer = globals()['optimizer']

    start = time.time()
    step = 0
    training_trajectory = {'epoch': [],
                           'train_loss': []}

    criterion = t.nn.MSELoss()

    for epoch in range(epochs):
        # Stays None when the dataloader yields no batch in this epoch.
        training_loss = None

        for x, y in dataloader:
            x, y = x.float(), y.float() # Type compatibility

            step += 1
            y_hat = model(x)

            # (input, target) order; the MSE value itself is symmetric.
            training_loss = criterion(y_hat, y) + model.l1.regularization()

            optimizer.zero_grad()
            training_loss.backward()
            optimizer.step()

        # Nothing to record for an epoch without batches.
        if epoch % 100 == 0 and training_loss is not None:
            training_trajectory['epoch'].append(epoch)
            train_loss = training_loss.detach().numpy()
            training_trajectory['train_loss'].append(train_loss)

            if print_progress:
                display_str = f'epoch: {epoch} step: {step} time: {time.time()-start:03.3f} ** '
                display_str += f'train_loss: {train_loss:.4f}'
                print(display_str)

    return model, training_trajectory

_Model(
  (l1): L1Regularizer()
  (linear_layer): Linear(in_features=100, out_features=1, bias=False)
)


In [None]:
#model, training_trajectory = train_model(model=model, epochs=2000)
# plt.plot(training_trajectory['epoch'], training_trajectory['train_loss'])
# plt.xlabel('Epochs')
# plt.ylabel('MSE + L1 Loss')
# plt.grid()
# plt.show()
# model.l1.weight