<a href="https://colab.research.google.com/github/chengyang122/mutitaskNAM/blob/main/MutiTaskClassificationTutoral.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!git clone https://github.com/chengyang122/mutitaskNAM.git

Cloning into 'mutitaskNAM'...
remote: Enumerating objects: 24, done.[K
remote: Counting objects: 100% (24/24), done.[K
remote: Compressing objects: 100% (19/19), done.[K
remote: Total 24 (delta 2), reused 21 (delta 2), pack-reused 0[K
Unpacking objects: 100% (24/24), done.


In [4]:
cd mutitaskNAM

/content/mutitaskNAM


In [131]:
import os
import tqdm
import copy
import random
import logging
from absl import app
from absl import flags
from torch.utils.data import TensorDataset, DataLoader
import pandas as pd
import nam.metrics
import nam.data_utils
from nam.model import *

In [None]:
from typing import Union, Iterable, Sized, Tuple
import torch
import torch.nn.functional as F


def truncated_normal_(tensor, mean: float = 0., std: float = 1.):
    size = tensor.shape
    tmp = tensor.new_empty(size + (4,)).normal_()
    valid = (tmp < 2) & (tmp > -2)
    ind = valid.max(-1, keepdim=True)[1]
    tensor.data.copy_(tmp.gather(-1, ind).squeeze(-1))
    tensor.data.mul_(std).add_(mean)


class ActivationLayer(torch.nn.Module):
    def __init__(self,
                 in_features: int,
                 out_features: int):
        super().__init__()
        self.weight = torch.nn.Parameter(torch.empty((in_features, out_features)))
        self.bias = torch.nn.Parameter(torch.empty(in_features))

    def forward(self, x):
        raise NotImplementedError("abstract method called")


class ExULayer(ActivationLayer):
    def __init__(self,
                 in_features: int,
                 out_features: int):
        super().__init__(in_features, out_features)
        truncated_normal_(self.weight, mean=4.0, std=0.5)
        truncated_normal_(self.bias, std=0.5)

    def forward(self, x):
        exu = (x - self.bias) @ torch.exp(self.weight)
        return torch.clip(exu, 0, 1)


class ReLULayer(ActivationLayer):
    def __init__(self,
                 in_features: int,
                 out_features: int):
        super().__init__(in_features, out_features)
        torch.nn.init.xavier_uniform_(self.weight)
        truncated_normal_(self.bias, std=0.5)

    def forward(self, x):
        return F.relu((x - self.bias) @ self.weight)


class FeatureNN(torch.nn.Module):
    def __init__(self,
                 shallow_units: int,
                 hidden_units: Tuple = (),
                 shallow_layer: ActivationLayer = ExULayer,
                 hidden_layer: ActivationLayer = ReLULayer,
                 dropout: float = .5,
                 ):
        super().__init__()
        self.layers = torch.nn.ModuleList([
            hidden_layer(shallow_units if i == 0 else hidden_units[i - 1], hidden_units[i])
            for i in range(len(hidden_units))
        ])
        self.layers.insert(0, shallow_layer(1, shallow_units))

        self.dropout = torch.nn.Dropout(p=dropout)
        self.linear = torch.nn.Linear(shallow_units if len(hidden_units) == 0 else hidden_units[-1], 1, bias=False)
        torch.nn.init.xavier_uniform_(self.linear.weight)


    def forward(self, x):
        x = x.unsqueeze(1)
        for layer in self.layers:
            x = layer(x)
            x = self.dropout(x)
        return self.linear(x)


class NeuralAdditiveModel(torch.nn.Module):
    def __init__(self,
                 input_size: int,
                 shallow_units: int,
                 hidden_units: Tuple = (),
                 shallow_layer: ActivationLayer = ExULayer,
                 hidden_layer: ActivationLayer = ReLULayer,
                 feature_dropout: float = 0.,
                 hidden_dropout: float = 0.,
                 ):
        super().__init__()
        self.input_size = input_size

        if isinstance(shallow_units, list):
            assert len(shallow_units) == input_size
        elif isinstance(shallow_units, int):
            shallow_units = [shallow_units for _ in range(input_size)]

        self.feature_nns = torch.nn.ModuleList([
            FeatureNN(shallow_units=shallow_units[i],
                      hidden_units=hidden_units,
                      shallow_layer=shallow_layer,
                      hidden_layer=hidden_layer,
                      dropout=hidden_dropout)
            for i in range(input_size)
        ])
        self.feature_dropout = torch.nn.Dropout(p=feature_dropout)
        self.bias = torch.nn.Parameter(torch.zeros(1))

    def forward(self, x):
        f_out = torch.cat(self._feature_nns(x), dim=-1)
        f_out = self.feature_dropout(f_out)

        return f_out.sum(axis=-1) + self.bias, f_out

    def _feature_nns(self, x):
        return [self.feature_nns[i](x[:, i]) for i in range(self.input_size)]


In [148]:
#changed the FeatureNN and NeuralAdditiveFunction
class FeatureNN(torch.nn.Module):
    def __init__(self,
                 shallow_units: int,
                 hidden_units: Tuple = (),
                 shallow_layer: ActivationLayer = ExULayer,
                 hidden_layer: ActivationLayer = ReLULayer,
                 dropout: float = .5,
                 output_size = 1
                 ):
        super().__init__()
        self.layers = torch.nn.ModuleList([
            hidden_layer(shallow_units if i == 0 else hidden_units[i - 1], hidden_units[i])
            for i in range(len(hidden_units))
        ])
        self.layers.insert(0, shallow_layer(1, shallow_units))

        self.dropout = torch.nn.Dropout(p=dropout)
        self.linear = torch.nn.Linear(shallow_units if len(hidden_units) == 0 else hidden_units[-1], output_size, bias=False)
        torch.nn.init.xavier_uniform_(self.linear.weight)


    def forward(self, x):
        x = x.unsqueeze(1)
        for layer in self.layers:
            x = layer(x)
            x = self.dropout(x)
        return self.linear(x)


class NeuralAdditiveModel(torch.nn.Module):
    def __init__(self,
                 input_size: int,
                 output_size: int, #not one when it is muti class, in sigle class regression and classification it would be 1
                 shallow_units: int,
                 hidden_units: Tuple = (),
                 shallow_layer: ActivationLayer = ExULayer,
                 hidden_layer: ActivationLayer = ReLULayer,
                 feature_dropout: float = 0.,
                 hidden_dropout: float = 0.,
                 ):
        super().__init__()
        self.input_size = input_size

        if isinstance(shallow_units, list):
            assert len(shallow_units) == input_size
        elif isinstance(shallow_units, int):
            shallow_units = [shallow_units for _ in range(input_size)]

        self.feature_nns = torch.nn.ModuleList([
            FeatureNN(shallow_units=shallow_units[i],
                      hidden_units=hidden_units,
                      shallow_layer=shallow_layer,
                      hidden_layer=hidden_layer,
                      dropout=hidden_dropout,
                      output_size=output_size)
            for i in range(input_size)
        ])
        self.feature_dropout = torch.nn.Dropout(p=feature_dropout)
        self.bias = torch.nn.Parameter(torch.zeros(1))
        self.output_size = output_size
    def _feature_nns(self, x):
        return [self.feature_nns[i](x[:, i]) for i in range(self.input_size)]

    def forward(self, x):
        if self.output_size == 1:
          f_out = torch.cat(self._feature_nns(x), dim=-1)
        else: 
          f_out = torch.stack(self._feature_nns(x), dim = -1)
        # f_out = self.feature_dropout(f_out)
        return f_out.sum(axis=-1) + self.bias, f_out



In [72]:
train, (x_test, y_test) = nam.data_utils.create_test_train_fold(dataset='BreastCancer',
                                                            id_fold=1,
                                                            n_folds=10,
                                                            n_splits=3,
                                                            regression=False)
(x_train, y_train), (x_validate, y_validate) = next(train)
x_train.shape
y_train.shape
len(nam.data_utils.calculate_n_units(x_train, 1000, 2))



30

In [149]:
features = pd.read_csv('data1.csv', index_col=0)
target = pd.read_csv('data2.csv', index_col=0)
s = target['0']
oneHotTarget = pd.get_dummies(s)
x_train_new = features.to_numpy()
y_train_new = oneHotTarget.to_numpy()

In [152]:
model = NeuralAdditiveModel(
    input_size=x_train_new.shape[-1],# feature size, 0 is sample and 1 is the feature, this is one iter of torch dataloader 
    output_size = y_train_new.shape[-1],
    shallow_units=nam.data_utils.calculate_n_units(x_train_new, 1000, 2),#for feature network, it is changing with data and I am not sure why
    hidden_units=list(map(int, [])),#for feature network
    shallow_layer=ExULayer,#special operational layer designed for this model
    hidden_layer=ExULayer,
    hidden_dropout=0.3,
    feature_dropout=0.0
    ).to(device)

In [154]:
train_dataset = TensorDataset(torch.tensor(x_train_new), torch.tensor(y_train_new))
train_loader = DataLoader(train_dataset, batch_size=10, shuffle=True)

In [155]:
#first output is the sum of all output based on input samples 
#second output is the contribution of each features of all samples
for iter, (feature, target) in enumerate(train_loader):
  output, featureoutput = model(feature)
  break

In [156]:
featureoutput[0].shape #new version, every output class ( output in this case) has (input feature), sum of them will become the results.

torch.Size([200, 112])

In [140]:
features = pd.read_csv('data1.csv', index_col=0)

In [181]:
def feature_loss(fnn_out, lambda_=0.):
    return lambda_ * (fnn_out ** 2).sum() / fnn_out.shape[1]

def penalized_cross_entropy(logits, truth, fnn_out, feature_penalty=0.):
    loss = torch.nn.CrossEntropyLoss()
    return loss(logits, truth.argmax(-1)) + feature_loss(fnn_out, feature_penalty)

In [182]:
regression = False
optimizer = torch.optim.AdamW(model.parameters(),
                              lr=1e-3,
                              weight_decay=0.0)
criterion = nam.metrics.penalized_mse if regression else penalized_cross_entropy
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, gamma=0.995, step_size=1)


In [184]:
def train_one_epoch(model, criterion, optimizer, data_loader, device):
    pbar = tqdm.tqdm(enumerate(data_loader, start=1), total=len(data_loader))
    total_loss = 0
    for i, (x, y) in pbar:
        x, y = x.to(device), y.to(device)
        logits, fnns_out = model.forward(x)
        loss = criterion(logits, y, fnns_out, feature_penalty=0.0)
        total_loss -= (total_loss / i) - (loss.item() / i)
        model.zero_grad()
        loss.backward()
        optimizer.step()
        pbar.set_description(f"train | loss = {total_loss:.5f}")
    return total_loss

train_one_epoch(model, criterion, optimizer, train_loader, device)

train | loss = 1.91559: 100%|██████████| 480/480 [00:32<00:00, 14.96it/s]


1.915589958367249

In [201]:
output.shape

torch.Size([10, 200])

In [202]:
def calculate_metric(logits,
                     truths,
                     regression=True):
    """Calculates the evaluation metric."""
    if regression:
        # root mean squared error
        # return torch.sqrt(F.mse_loss(logits, truths, reduction="none")).mean().item()
        # mean absolute error
        return "MAE", ((logits.view(-1) - truths.view(-1)).abs().sum() / logits.numel()).item()
    elif logits.shape[-1] == 1:
        # return sklearn.metrics.roc_auc_score(truths.view(-1).tolist(), torch.sigmoid(logits.view(-1)).tolist())
        return "accuracy", accuracySingle(logits, truths)
    else:
        return "accuracy", accuracyMuti(logits, truths)


def accuracyMuti(logits, truths):
    return ((logits.argmax(-1)==truths.argmax(-1)).sum()/truths.numel()).item()

def accuracySingle(logits, truths):
    return (((truths.view(-1) > 0) == (logits.view(-1) > 0.5)).sum() / truths.numel()).item()

In [198]:
def evaluate(model, data_loader, device):
    total_score = 0
    metric = None
    for i, (x, y) in enumerate(data_loader, start=1):
        x, y = x.to(device), y.to(device)
        logits, fnns_out = model.forward(x)
        metric, score = calculate_metric(logits, y, regression=False)
        total_score -= (total_score / i) - (score / i)
    return metric, total_score

In [199]:
evaluate(model, train_loader, device)

('accuracy', 0.0039010417017076816)