In [25]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch
import numpy as np
from tqdm import trange
import random
from torch.utils.data import TensorDataset
import matplotlib.pyplot as plt
import warnings 
import pandas as pd
torch.manual_seed(0)
np.random.seed(0)
torch.cuda.manual_seed_all(0)
random.seed(0)

In [26]:
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
warnings.filterwarnings("ignore") 

In [27]:
mask_mono = np.array([0, -1, -1, -1, 0, 0, 0])

In [28]:
def split_input(inputs):
    return inputs[:, np.where(mask_mono==0)].squeeze(), \
        inputs[:, np.where(mask_mono!=0)].squeeze() * torch.tensor(mask_mono[np.where(mask_mono!=0)][None,:], dtype=torch.float32).to(device)

In [29]:
def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=50):
    losses = []
    val_losses = []
    for _ in trange(num_epochs):
        model.train()
        total = 0
        losses_buffer = []
        for inputs, labels in train_loader:
            inputs_free, inputs_mono = split_input(inputs)
            optimizer.zero_grad()
            outputs = model(inputs_free.float(), inputs_mono.float())
            loss = criterion(outputs, labels.float())
            losses_buffer.append(loss)
            loss.backward()
            optimizer.step()
            
            total += labels.size(0)
        losses.append(np.mean([el.detach().cpu() for el in losses_buffer]))
        
        val_loss = validate_model(model, val_loader, criterion)
        val_losses.append(val_loss)
    
    return losses, val_losses

def validate_model(model, val_loader, criterion):
    model.eval()
    val_loss = []
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs_free, inputs_mono = split_input(inputs)
            outputs = model(inputs_free.float(), inputs_mono.float())
            loss = criterion(outputs, labels.float())
            val_loss += [loss.item()]
    return np.mean(val_loss)

In [30]:
class MonotonicLinear(nn.Linear):
    def __init__(
        self,
        in_features: int, 
        out_features: int, 
        bias: bool = True,
        device=None, 
        dtype=None,
        pre_activation=nn.Identity(),
    ):
        super().__init__(in_features, out_features, bias=bias, device=device, dtype=dtype)
        self.act = pre_activation
        
    def forward(self, x):
        w_pos = self.weight.clamp(min=0.0)
        w_neg = self.weight.clamp(max=0.0)
        x_pos = F.linear(self.act(x), w_pos, self.bias)
        x_neg = F.linear(self.act(-x), w_neg, self.bias)  
        return x_pos + x_neg
    
class MonoModel(torch.nn.Module):
    def __init__(self, input_size_mono, num_layers_mono, num_layers_pre_mono, num_neurons_mono, num_neurons_pre_mono, activation=nn.ReLU()) -> None:
        super().__init__()
        self.pre_mono = torch.nn.ModuleList([torch.nn.LazyLinear(num_neurons_pre_mono) for _ in range(num_layers_pre_mono)])
        self.mono = torch.nn.ModuleList(
            [
                MonotonicLinear(input_size_mono + num_neurons_pre_mono, num_neurons_mono, pre_activation=nn.Identity()),
                *[MonotonicLinear(num_neurons_mono, num_neurons_mono, pre_activation=activation) for _ in range(num_layers_mono)],
                MonotonicLinear(num_neurons_mono, 1, pre_activation=activation),
            ]
        )
    def forward(self, x, x_mono):
        for layer in self.pre_mono:
            x = torch.nn.functional.relu(layer(x))
        
        x = torch.cat((x, x_mono), dim=-1)
        for layer in self.mono:
            x = layer(x)
        
        return x

In [31]:
def run(SEED, n, lr=1e-3, activation=nn.ReLU()):
    torch.manual_seed(SEED)
    np.random.seed(SEED)
    torch.cuda.manual_seed_all(SEED)
    random.seed(SEED)

    df_train = pd.read_csv('data/train_auto.csv',header = None)
    df_train = df_train.dropna(axis=0)        
    X_train = df_train.to_numpy()[:,:-1]
    y_train = df_train.to_numpy()[:,-1:]

    df_val = pd.read_csv('data/test_auto.csv',header = None)
    df_val = df_val.dropna(axis=0)
    X_val = df_val.to_numpy()[:,:-1]
    y_val = df_val.to_numpy()[:,-1:]

    X_train = torch.tensor(X_train).to(device).float()
    X_val = torch.tensor(X_val).to(device).float()
    y_train = torch.tensor(y_train).to(device).float()
    y_val = torch.tensor(y_val).to(device).float()

    train_loader = torch.utils.data.DataLoader(TensorDataset(X_train, y_train), batch_size=8, shuffle=True, drop_last=True)
    val_loader = torch.utils.data.DataLoader(TensorDataset(X_val, y_val), batch_size=8, shuffle=True, drop_last=True)
    
    model = MonoModel((mask_mono!=0).sum(),3,3, n,n, activation=activation).to(device)
    criterion = torch.nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr)
    losses, val_losses = train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=300)
    print("SEED", SEED, "VAL LOSS", np.min(val_losses))
    return losses, val_losses

In [32]:
n = 8
activation = nn.CELU()
train_losses, val_losses = [],[]
for seed in range(5):
    ltrain, lval = run(seed, n, activation=activation)
    train_losses.append(ltrain)
    val_losses.append(lval)

print("---------------------------------")
print("Mean", np.mean([np.min(l) for l in val_losses]))
print("Std", np.std([np.min(l) for l in val_losses]))
print("---------------------------------")

  0%|          | 0/300 [00:00<?, ?it/s]

100%|██████████| 300/300 [00:56<00:00,  5.28it/s]


SEED 0 VAL LOSS 7.126317302385966


100%|██████████| 300/300 [01:09<00:00,  4.32it/s]


SEED 1 VAL LOSS 8.27412760257721


100%|██████████| 300/300 [00:58<00:00,  5.13it/s]


SEED 2 VAL LOSS 7.102252960205078


100%|██████████| 300/300 [00:59<00:00,  5.08it/s]


SEED 3 VAL LOSS 7.074843618604872


100%|██████████| 300/300 [00:53<00:00,  5.64it/s]

SEED 4 VAL LOSS 7.1600691477457685
---------------------------------
Mean 7.347522126303778
Std 0.4641505002376592
---------------------------------



