# Index

### - Fuzzy Loss Function

### - Training a Net with FuzzyLoss (ft. pytorch_lightning)

#### a1) ResNet50

#### a2) ResNeXt50_32x4d

#### b) VGG-16

### - ...

### - FICAR System

In [1]:
import numpy as np
import matplotlib.pyplot as plt

import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
import torch
import torch.nn.functional as F
from System import FICAR

from fuzzylogic.classes import Domain, Set, Rule
from fuzzylogic.hedges import very
from fuzzylogic.functions import R, S, alpha, triangular

# Fuzzy Loss Function

In [2]:
# from FuzzyLoss import FuzzyLoss

class FuzzyLoss(torch.nn.CrossEntropyLoss):
    def __init__(self, gamma, alpha=None, ignore_index=-100, reduction='mean', class_sizes=[]):
        super().__init__(weight=alpha, ignore_index=ignore_index, reduction='none')
        self.reduction = reduction
        self.gamma = gamma
        if not class_sizes:
            raise ValueError('List with class sizes is required')
        self.class_sizes = class_sizes

    def forward(self, input_, target):
        cross_entropy = super().forward(input_, target)
        target = target * (target != self.ignore_index).long()
        input_prob = torch.gather(F.softmax(input_, 1), 1, F.one_hot(target.type(torch.int64)))
        loss = torch.pow(1 - input_prob, self.gamma) * cross_entropy 
        return torch.mean(loss) if self.reduction == 'mean' \
                                else torch.sum(loss) if self.reduction == 'sum' \
                                else loss

In [3]:
y_true = torch.tensor([4,  1], dtype=torch.int64)
y_pred = torch.tensor([[0 , .7 , 0 ,0 ,  .3], [0, 0.9, 0, 0.1, 0]])

print('DF-CELoss : ', F.cross_entropy(y_pred, y_true).numpy())
print('DF-CELoss : ', torch.nn.CrossEntropyLoss()(y_pred, y_true).numpy())
print('FuzzyLoss : ', FuzzyLoss(gamma=0., class_sizes=[1,1])(y_pred, y_true).numpy())

DF-CELoss :  1.2661572
DF-CELoss :  1.2661572
FuzzyLoss :  1.2661572


In [4]:
# Definir dominios de inputs y output
balance_deg = Domain("Balance_deg", 0, 1, res=0.1)
balance_deg.low = S(0.1, 0.5)
balance_deg.medium = triangular(0.0, 1.0, c=0.5)
balance_deg.high = R(0.5, 1.0)

delta_gamma = Domain("Delta_gamma", -0.2, 0.2, res=0.01)
delta_gamma.ln = S(-0.2, -0.0)
delta_gamma.ze = triangular(-0.2, 0.2, c=-0.0)
delta_gamma.lp = R(0.0, 0.2)

# Reglas
R1 = Rule({(balance_deg.low,): delta_gamma.ln})
R2 = Rule({(balance_deg.medium,): delta_gamma.ze})
R3 = Rule({(balance_deg.high,): very(delta_gamma.lp)})
rules = R1 | R2 | R3

# Inferencia
values = {balance_deg: 0.2}
print(f'R1: {R1(values)}\tR2: {R2(values)}\tR3: {R3(values)}\nDelta Gamma ===> {rules(values)}')

R1: -0.1382113821138212	R2: -0.004878048780487976	R3: None
Delta Gamma ===> -0.09183457051961834


# Training a Net with FuzzyLoss (ft. `pytorch_lightning`)

## a1) ResNet50

In [9]:
from System import get_dataloaders_from_path, plot_images_sample

dataloaders, dataset_sizes, class_names = get_dataloaders_from_path('../data/gender_clf/')
# plot_images_sample(dataloaders['train'])
print('NTrain:', len(dataloaders['train'])*32, '// NVal:', len(dataloaders['val'])*32) # 32-img batches



NTrain: 47040 // NVal: 11680


In [16]:
import pytorch_lightning as pl
from torch import nn
from torchvision.models import resnet50

class ResNetCustom(pl.LightningModule):
    def __init__(self, gamma=0., class_sizes=[]):
        super().__init__()
        self.n_classes = len(class_sizes)
#         self.model = resnet50(pretrained=True)
#         for param in self.model.parameters():
#             param.requires_grad = False
#         self.model.fc = nn.Linear(self.model.fc.in_features, self.n_classes, bias=True)
        
        net = resnet50(pretrained=True)
        for param in rr.parameters():
            param.requires_grad = False
        self.model = nn.Sequential(
            net,
            nn.BatchNorm1d(1000),
            nn.Linear(1000, 2048),
            nn.ReLU(),
            nn.BatchNorm1d(2048),
            nn.Linear(2048, 1024),
            nn.ReLU(),
            nn.BatchNorm1d(1024),
            nn.Linear(1024, 2),
        )
            
        self.loss = FuzzyLoss(gamma=gamma, class_sizes=class_sizes).cuda()
        
    def forward(self, x):
        return self.model(x)
    
    def training_step(self, batch, batch_no):
        x, y = batch
        logits = self(x)
        loss = self.loss(logits, y)
        return loss
    
    def configure_optimizers(self):
#         return torch.optim.RMSprop(self.parameters(), lr=0.005)
        return torch.optim.SGD(self.model[0].fc.parameters(), lr=0.001, momentum=0.9)

In [17]:
model = ResNetCustom(gamma=0., class_sizes=[1,1])
trainer = pl.Trainer(gpus=1, max_epochs=30, devices=1, accelerator="gpu")
trainer.fit(model, dataloaders['train'])

  f"Setting `Trainer(gpus={gpus!r})` is deprecated in v1.7 and will be removed"
  f"The flag `devices={devices}` will be ignored, "
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name  | Type       | Params
-------------------------------------
0 | model | Sequential | 29.7 M
1 | loss  | FuzzyLoss  | 0     
-------------------------------------
29.7 M    Trainable params
0         Non-trainable params
29.7 M    Total params
118.862   Total estimated model params size (MB)


Training: 0it [00:00, ?it/s]

  rank_zero_warn("Detected KeyboardInterrupt, attempting graceful shutdown...")


In [18]:
trainer.save_checkpoint("saves/resnet50_gc_focalloss_transfer.pt")

Now we can reload it w/o training:

In [19]:
def get_prediction(x, model: pl.LightningModule):
    model.freeze() # prepares model for predicting
    probabilities = torch.softmax(model(x), dim=1)
    predicted_class = torch.argmax(probabilities, dim=1)
    return predicted_class, probabilities

inference_model = ResNetCustom.load_from_checkpoint("saves/resnet50_gc_focalloss_transfer.pt", map_location="cuda", gamma=0., class_sizes=[1,1])

In [None]:
from tqdm.autonotebook import tqdm

true_y, pred_y = [], []
for batch in tqdm(iter(dataloaders['val']), total=len(dataloaders['val'])):
    x, y = batch
    true_y.extend(y)
    preds, probs = get_prediction(x, inference_model)
    pred_y.extend(preds.cpu())

In [None]:
from sklearn.metrics import classification_report
print(classification_report(true_y, pred_y, digits=3))

## a2) ResNeXt50_32x4d

In [29]:
from torchvision.models import resnext50_32x4d

class ResNext(pl.LightningModule):
    def __init__(self, gamma=0., class_sizes=[]):
        super().__init__()
        self.n_classes = len(class_sizes)
        self.model = resnext50_32x4d(pretrained=True)
        for param in self.model.parameters():
            param.requires_grad = False
        self.model.fc = nn.Linear(self.model.fc.in_features, self.n_classes, bias=True)
        self.loss = FuzzyLoss(gamma=gamma, class_sizes=class_sizes)
        
    def forward(self, x):
        return self.model(x)
    
    def training_step(self, batch, batch_no):
        x, y = batch
        logits = self(x)
        loss = self.loss(logits, y)
        return loss
    
    def configure_optimizers(self):
#         return torch.optim.RMSprop(self.parameters(), lr=0.005)
        return torch.optim.SGD(self.model.fc.parameters(), lr=0.001, momentum=0.9)

In [30]:
model3 = ResNext(gamma=0., class_sizes=[1,1])
trainer3 = pl.Trainer(gpus=1, max_epochs=15, devices=1, accelerator="gpu")
trainer3.fit(model3, dataloaders['train'])

  f"Setting `Trainer(gpus={gpus!r})` is deprecated in v1.7 and will be removed"
  f"The flag `devices={devices}` will be ignored, "
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name  | Type      | Params
------------------------------------
0 | model | ResNet    | 23.0 M
1 | loss  | FuzzyLoss | 0     
------------------------------------
4.1 K     Trainable params
23.0 M    Non-trainable params
23.0 M    Total params
91.936    Total estimated model params size (MB)


Training: 0it [00:00, ?it/s]

`Trainer.fit` stopped: `max_epochs=15` reached.


In [31]:
trainer3.save_checkpoint("saves/resneXt_utk_focalloss.pt")

In [33]:
def get_prediction(x, model: pl.LightningModule):
    model.freeze() # prepares model for predicting
    probabilities = torch.softmax(model(x), dim=1)
    predicted_class = torch.argmax(probabilities, dim=1)
    return predicted_class, probabilities

inference_model = ResNext.load_from_checkpoint("saves/resneXt_utk_focalloss.pt", map_location="cuda", gamma=0., class_sizes=[1,1])

from tqdm.autonotebook import tqdm
true_y, pred_y = [], []
for batch in tqdm(iter(dataloaders['val']), total=len(dataloaders['val'])):
    x, y = batch
    true_y.extend(y)
    preds, probs = get_prediction(x, inference_model)
    pred_y.extend(preds.cpu())
    
from sklearn.metrics import classification_report
print(classification_report(true_y, pred_y, digits=3))

  0%|          | 0/220 [00:00<?, ?it/s]

              precision    recall  f1-score   support

           0      0.856     0.676     0.755      3621
           1      0.718     0.879     0.790      3401

    accuracy                          0.774      7022
   macro avg      0.787     0.777     0.773      7022
weighted avg      0.789     0.774     0.772      7022



## b) VGG-16

In [6]:
from torchvision.models import vgg16

class VGGCustom(pl.LightningModule):
    def __init__(self, gamma=0., class_sizes=[]):
        super().__init__()
        self.model = vgg16(num_classes=10)
        self.model.conv1 = nn.Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        self.loss = FuzzyLoss(gamma=gamma, class_sizes=class_sizes)
        
    def forward(self, x):
        return self.model(x)
    
    def training_step(self, batch, batch_no):
        x, y = batch
        logits = self(x)
        loss = self.loss(logits, y)
        return loss
    
    def configure_optimizers(self):
        return torch.optim.RMSprop(self.parameters(), lr=0.005)

In [7]:
model2 = VGGCustom(gamma=0., class_sizes=[1,1])
trainer2 = pl.Trainer(gpus=1, max_epochs=5, devices=1, accelerator="gpu")
trainer2.fit(model2, dataloaders['train'])

  f"Setting `Trainer(gpus={gpus!r})` is deprecated in v1.7 and will be removed"
  f"The flag `devices={devices}` will be ignored, "
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name  | Type      | Params
------------------------------------
0 | model | VGG       | 134 M 
1 | loss  | FuzzyLoss | 0     
------------------------------------
134 M     Trainable params
0         Non-trainable params
134 M     Total params
537.213   Total estimated model params size (MB)


Training: 0it [00:00, ?it/s]

`Trainer.fit` stopped: `max_epochs=5` reached.


In [8]:
trainer2.save_checkpoint("saves/vgg16_utk_focalloss.pt")

In [9]:
def get_prediction(x, model: pl.LightningModule):
    model.freeze() # prepares model for predicting
    probabilities = torch.softmax(model(x), dim=1)
    predicted_class = torch.argmax(probabilities, dim=1)
    return predicted_class, probabilities

inference_model = VGGCustom.load_from_checkpoint("saves/vgg16_utk_focalloss.pt", map_location="cuda", gamma=0., class_sizes=[1,1])

from tqdm.autonotebook import tqdm
true_y, pred_y = [], []
for batch in tqdm(iter(dataloaders['val']), total=len(dataloaders['val'])):
    x, y = batch
    true_y.extend(y)
    preds, probs = get_prediction(x, inference_model)
    pred_y.extend(preds.cpu())
    
from sklearn.metrics import classification_report
print(classification_report(true_y, pred_y, digits=3))

  0%|          | 0/220 [00:00<?, ?it/s]

              precision    recall  f1-score   support

           0      0.516     1.000     0.680      3621
           1      0.000     0.000     0.000      3401

    accuracy                          0.516      7022
   macro avg      0.258     0.500     0.340      7022
weighted avg      0.266     0.516     0.351      7022



  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


# FICAR System

In [6]:
system = FICAR(n_classes=4, class_names=['male_w', 'female_w', 'male_b', 'female_b'])
system.train("dummyDL")

P(0,1)-> Fit with pair ('male_w', 'female_w')
P(0,2)-> Fit with pair ('male_w', 'male_b')
P(0,3)-> Fit with pair ('male_w', 'female_b')
P(1,2)-> Fit with pair ('female_w', 'male_b')
P(1,3)-> Fit with pair ('female_w', 'female_b')
P(2,3)-> Fit with pair ('male_b', 'female_b')


In [7]:
y_pred = system.predict('dummyIns')
print(system.decisions)
print(y_pred)

[[0.         0.3120504  0.95452845 0.81355755]
 [0.6879496  0.         0.24934205 0.20867684]
 [0.04547155 0.75065795 0.         0.10901914]
 [0.18644245 0.79132316 0.89098086 0.        ]]
male_b
