# Imports

In [10]:
import os
import math
from pathlib import Path

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

import torchvision.datasets as datasets
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from torch.utils.data.sampler import SubsetRandomSampler
from torch.utils.tensorboard import SummaryWriter

import numpy as np

# Constants

In [2]:
# Prefer the GPU when CUDA is usable; otherwise run everything on the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [3]:
BATCH_SIZE: int = 64  # mini-batch size passed to every DataLoader in this notebook
NUM_WORKERS: int = 0  # DataLoader worker processes; 0 loads the data in the main process

In [14]:
# Project layout: every location is derived from ROOT_DIR, which is relative
# to the current working directory.
ROOT_DIR = Path('.')
DATA_DIR = ROOT_DIR.joinpath('data')
REPORTS_DIR = ROOT_DIR.joinpath('reports')
# Sub-folders of the reports directory.
MODELS_DIR, RESULTS_DIR, RUNS_DIR = (
    REPORTS_DIR.joinpath(folder) for folder in ('models', 'results', 'runs')
)

# Data loading

In [4]:
# NOTE(review): the normalisation constants below look like the commonly used
# ImageNet channel statistics rather than CIFAR-10's own — confirm this is intended.

# Training pipeline: tensor conversion, channel-wise normalisation, then random
# horizontal/vertical flips as data augmentation.
train_transform = transforms.Compose(
    [transforms.ToTensor(),
     transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),
     transforms.RandomHorizontalFlip(p=0.5),
     transforms.RandomVerticalFlip(p=0.5)]
)
# Validation uses the same deterministic pipeline as the test set: random flips
# here would make the validation metrics change from run to run and would not
# reflect how the model is evaluated at test time.
val_transform = transforms.Compose(
    [transforms.ToTensor(),
     transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))]
)
# Test pipeline: tensor conversion and normalisation only.
test_transform = transforms.Compose(
    [transforms.ToTensor(),
     transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))]
)

In [None]:
# Hold out part of the training data for validation.
train_proportion = 0.9
num_train = 50000  # number of images in the CIFAR-10 training split

# A dedicated, seeded generator makes the train/validation split identical on
# every "Restart & Run All" (the previous global np.random.shuffle was unseeded,
# so validation results were not comparable between runs).
SPLIT_SEED = 42
indices = np.random.default_rng(SPLIT_SEED).permutation(num_train).tolist()
split = int(np.floor(train_proportion * num_train))

# The first `split` shuffled indices are used for training, the rest for validation.
train_idx, val_idx = indices[:split], indices[split:]
train_sampler = SubsetRandomSampler(train_idx)
val_sampler = SubsetRandomSampler(val_idx)

In [None]:

# The training and validation datasets both wrap the CIFAR-10 training split
# (train=True) and differ only in the transform they apply; which samples each
# one actually serves is decided by the sampler given to its DataLoader.
train_dataset = datasets.CIFAR10(
    root=DATA_DIR,
    train=True,
    download=True,
    transform=train_transform,
)

val_dataset = datasets.CIFAR10(
    root=DATA_DIR,
    train=True,
    download=True,
    transform=val_transform,
)

# The held-out CIFAR-10 test split (train=False).
test_dataset = datasets.CIFAR10(
    root=DATA_DIR,
    train=False,
    download=True,
    transform=test_transform,
)

In [None]:
# The train and validation loaders draw their samples through the subset
# samplers; the test loader walks the test dataset in its stored order.
train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=BATCH_SIZE,
    sampler=train_sampler,
    num_workers=NUM_WORKERS,
)
val_loader = DataLoader(
    dataset=val_dataset,
    batch_size=BATCH_SIZE,
    sampler=val_sampler,
    num_workers=NUM_WORKERS,
)
test_loader = DataLoader(
    dataset=test_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=NUM_WORKERS,
)

# Aggregating functions

In [None]:
def arithmetic_mean(X, dim, keepdim):
    """Aggregate ``X`` along ``dim`` with the arithmetic mean.

    Args:
        X: input tensor.
        dim: dimension to reduce.
        keepdim: whether the reduced dimension is kept with size 1.

    Returns:
        Tensor holding the mean of ``X`` over ``dim``.
    """
    return X.mean(dim, keepdim)

In [None]:
def minimum(X, dim, keepdim):
    """Aggregate ``X`` along ``dim`` with the minimum.

    Args:
        X: input tensor.
        dim: dimension to reduce.
        keepdim: whether the reduced dimension is kept with size 1.

    Returns:
        Tensor with the smallest value of ``X`` along ``dim`` (values only,
        the argmin indices are discarded).
    """
    smallest, _ = X.min(dim, keepdim)
    return smallest

In [None]:
def product(X, dim, keepdim):
    """Aggregate ``X`` along ``dim`` with the product.

    Args:
        X: input tensor.
        dim: dimension to reduce.
        keepdim: whether the reduced dimension is kept with size 1.

    Returns:
        Tensor holding the product of the elements of ``X`` over ``dim``.
    """
    return torch.prod(X, dim=dim, keepdim=keepdim)

In [None]:
def t_norm_lukasiewicz(X, dim, keepdim):
    """Aggregate ``X`` along ``dim`` with the n-ary Lukasiewicz t-norm.

    For n aggregated values the Lukasiewicz t-norm is
    ``max(0, sum(x_i) - (n - 1))``. Subtracting a fixed 1 is only correct for
    n == 2; with larger windows it would produce values above 1.

    Args:
        X: input tensor; values are expected to lie in [0, 1].
        dim: dimension to reduce.
        keepdim: whether the reduced dimension is kept with size 1.

    Returns:
        Tensor with the t-norm of ``X`` over ``dim``.
    """
    # sum(x_i - 1) + 1 == sum(x_i) - (n - 1), without having to look up n.
    shifted = torch.sum(X - 1, dim=dim, keepdim=keepdim) + 1
    return torch.clamp(shifted, min=0)

In [None]:
# TODO: Hamacher t-norm (placeholder; presumably the counterpart of t_conorm_hamacker — not implemented yet)

In [None]:
def maximum(X, dim, keepdim):
    """Aggregate ``X`` along ``dim`` with the maximum.

    Args:
        X: input tensor.
        dim: dimension to reduce.
        keepdim: whether the reduced dimension is kept with size 1.

    Returns:
        Tensor with the largest value of ``X`` along ``dim`` (values only,
        the argmax indices are discarded).
    """
    largest, _ = X.max(dim, keepdim)
    return largest

In [None]:
def t_conorm_lukasiewicz(X, dim, keepdim):
    """Aggregate ``X`` along ``dim`` with the Lukasiewicz t-conorm (bounded sum).

    Computes ``min(1, sum(x_i))`` over the reduced dimension.

    Args:
        X: input tensor.
        dim: dimension to reduce.
        keepdim: whether the reduced dimension is kept with size 1.

    Returns:
        Tensor with the sum of ``X`` over ``dim``, capped at 1.
    """
    upper_bound = torch.tensor(1.0, device=X.device)
    total = X.sum(dim=dim, keepdim=keepdim)
    return torch.min(total, upper_bound)

In [None]:
def t_conorm_hamacker(X, dim, keepdim):
    """Aggregate ``X`` along ``dim`` with the Hamacher t-conorm formula.

    With ``P = prod(x_i)`` and ``S = sum(x_i)`` over ``dim`` the result is
    ``(2 * P - S) / (P - 1)``, and ``1`` wherever ``P == 1``. For two values
    this equals ``(x + y - 2xy) / (1 - xy)``.

    NOTE(review): for more than two aggregated values this is the binary
    formula applied to the overall product and sum, which is not necessarily
    the associative n-ary extension — confirm this is the intended operator.

    Args:
        X: input tensor; values are expected to lie in [0, 1].
        dim: dimension to reduce.
        keepdim: whether the reduced dimension is kept with size 1.

    Returns:
        Tensor with the aggregated values, on the same device as ``X``.
    """
    product_X = torch.prod(X, dim=dim, keepdim=keepdim)
    sum_X = torch.sum(X, dim=dim, keepdim=keepdim)

    numerator = 2 * product_X - sum_X
    denominator = product_X - 1

    # The formula is undefined where the product is exactly 1. Swap in a
    # harmless denominator there so the discarded branch stays finite, instead
    # of adding an epsilon to a non-positive denominator (which could cancel it
    # and blow the ratio up for products close to 1).
    is_one = product_X == 1
    safe_denominator = torch.where(is_one, torch.ones_like(denominator), denominator)
    ratio = numerator / safe_denominator

    # ones_like keeps the constant on the same device and dtype as the result.
    return torch.where(is_one, torch.ones_like(ratio), ratio)

In [None]:
# TODO: u_min_max (placeholder — not implemented yet)

In [None]:
# TODO: u_L_L (placeholder — not implemented yet)

# Layers

In [None]:
class AggPoolingLayer(nn.Module):
    """2-D pooling layer whose window reduction is a pluggable aggregation function.

    The input is min-max normalised to [0, 1], padded, cut into sliding
    windows, reduced window by window with ``function`` and finally mapped
    back to the original value range.

    Args:
        kernel_size: pair ``[k1, k2]`` with the size of each window along
            dimensions 2 and 3 of the input.
        stride: pair with the step (along dimensions 2 and 3) between
            consecutive windows.
        padding: four values ``[pad_left, pad_right, pad_up, pad_down]`` giving
            the number of columns/rows added to the input before aggregating.
        function: callable ``function(X, dim, keepdim)`` that reduces the
            flattened windows. Required; it only has a default so that it can
            follow ``padding`` in the positional order.
        dim: dimension passed to ``function`` (the flattened window is the
            last dimension, hence the default ``-1``).
        keepdim: ``keepdim`` flag passed to ``function``.

    Raises:
        TypeError: if ``function`` is not callable.
    """

    def __init__(self, kernel_size, stride, padding=(0, 0, 0, 0), function=None, dim=-1, keepdim=False):
        super().__init__()

        if not callable(function):
            raise TypeError("AggPoolingLayer requires a callable `function(X, dim, keepdim)`")

        # Window size [k1, k2].
        self.kernel_size = kernel_size

        # Step between consecutive windows (rows, columns).
        self.stride = stride

        # [pad_left, pad_right, pad_up, pad_down]; copied so that the layer
        # never shares a mutable list with the caller.
        self.padding = tuple(padding)

        # Aggregation function and the arguments it is called with.
        self.function = function
        self.dim = dim
        self.keepdim = keepdim

    def forward(self, X):
        """Pool ``X`` with the configured aggregation function.

        ``X`` is indexed as a 4-D tensor: windows are extracted along
        dimensions 2 and 3.
        """
        # Min-max normalise with the global extrema of the input.
        x_min = torch.min(X)
        x_max = torch.max(X)
        value_range = x_max - x_min
        # A constant input has zero range; dividing by it would turn the whole
        # output into NaN, so divide by 1 in that case (the numerator is 0).
        safe_range = torch.where(value_range > 0, value_range, torch.ones_like(value_range))
        X = (X - x_min) / safe_range

        # Add the requested rows/columns (zeros, i.e. the normalised minimum).
        X_pad = F.pad(X, pad=self.padding, mode='constant', value=0)

        # Extract the sliding windows along dimensions 2 and 3 ...
        windows = (
            X_pad
            .unfold(2, size=self.kernel_size[0], step=self.stride[0])
            .unfold(3, size=self.kernel_size[1], step=self.stride[1])
        )
        # ... and flatten each k1 x k2 window into a single trailing dimension.
        windows = windows.flatten(start_dim=4)

        # Reduce every window with the aggregation function.
        Y_temp = self.function(windows, dim=self.dim, keepdim=self.keepdim)

        # Map the result back to the original value range.
        return x_min + value_range * Y_temp

In [None]:
class OWALayer(nn.Module):
    """Fully connected layer that applies learnable weights to the sorted input.

    Each sample is sorted in descending order along dimension 1, then combined
    with a softmax-normalised weight vector per output unit.

    Args:
        in_features: size of each input sample.
        out_features: size of each output sample.
        bias: if truthy, a learnable additive bias is created.
    """

    def __init__(self, in_features, out_features, bias=True):
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.use_bias = bias  # kept separately from the `bias` parameter itself
        self.weights = nn.Parameter(torch.empty((out_features, in_features)))
        bias_param = nn.Parameter(torch.empty(out_features)) if self.use_bias else None
        self.register_parameter('bias', bias_param)
        self.init_parameters()

    def init_parameters(self):
        """Draw weights (and bias, if present) uniformly from [-b, b], b = 1/sqrt(in_features)."""
        bound = 1. / math.sqrt(self.weights.size(1))
        for tensor in (self.weights, self.bias):
            if tensor is not None:
                nn.init.uniform_(tensor, -bound, bound)

    def forward(self, x):
        """Return the ordered weighted average of ``x`` for every output unit."""
        # Largest value first along the feature dimension.
        ordered = torch.sort(x, dim=1, descending=True).values
        # Softmax makes every row of weights positive and sum to one.
        owa_weights = F.softmax(self.weights, dim=1)
        output = torch.matmul(ordered, owa_weights.t())
        if self.bias is None:
            return output
        output += self.bias
        return output

In [None]:
# CHANGE TO POOLING LAYER
class ChoquetLayer(nn.Module):
    """Fully connected layer combining the sorted input with differences of a learnable vector.

    For every output unit the layer holds a vector ``v`` of length
    ``in_features``. The input is sorted in descending order and the output is
    ``sum_i x_sorted[i] * (v[i] - v[i + 1])`` with ``v[in_features]`` taken as 0,
    plus an optional bias.

    NOTE(review): ``v`` is an unconstrained parameter (no monotonicity or
    normalisation is enforced), so this is a Choquet-style form rather than an
    integral with respect to a proper fuzzy measure — confirm this is intended.

    Args:
        in_features: size of each input sample.
        out_features: size of each output sample.
        bias: if truthy, a learnable additive bias is created.
    """

    def __init__(self, in_features, out_features, bias=True):
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.use_bias = bias

        self.v = nn.Parameter(torch.empty(out_features, in_features))

        if self.use_bias:
            self.bias = nn.Parameter(torch.empty(out_features))
        else:
            self.register_parameter('bias', None)

        self.init_parameters()

    def init_parameters(self):
        """Draw ``v`` (and bias, if present) uniformly from [-b, b], b = 1/sqrt(in_features)."""
        stdv = 1. / math.sqrt(self.in_features)
        nn.init.uniform_(self.v, -stdv, stdv)
        if self.bias is not None:
            nn.init.uniform_(self.bias, -stdv, stdv)

    def forward(self, x):
        """Apply the layer to a ``(batch_size, in_features)`` tensor.

        Raises:
            ValueError: if ``x`` is not 2-D.
        """
        if x.dim() != 2:
            raise ValueError("Input tensor must be 2D (batch_size, in_features)")

        # Largest value first along the feature dimension.
        x_sorted, _ = torch.sort(x, dim=1, descending=True)

        # delta_v[:, i] = v[:, i] - v[:, i + 1], with a zero appended after the last column.
        v_padded = F.pad(self.v, (0, 1), "constant", 0)
        delta_v = v_padded[:, :-1] - v_padded[:, 1:]

        # A matrix product gives the same weighted sums as broadcasting to
        # (batch, out_features, in_features) and reducing, without
        # materialising that 3-D intermediate.
        output = torch.matmul(x_sorted, delta_v.t())

        if self.bias is not None:
            output = output + self.bias

        return output


# Model

In [None]:
class LeNetModel(nn.Module):
    """LeNet-style CNN: two conv + max-pool stages followed by three linear layers.

    The first linear layer expects ``conv_filters[1] * 7 * 7`` features, which
    is what the two 2x2 conv / 2x2 pool stages produce for 32x32 inputs.

    Args:
        conv_filters: number of output channels of the two convolutions.
        linear_sizes: output sizes of the first two linear layers.
        num_classes: number of logits produced by the last linear layer.
    """

    def __init__(self, conv_filters=[64, 64], linear_sizes=[384, 192], num_classes=10):
        super().__init__()
        first_channels, second_channels = conv_filters[0], conv_filters[1]
        hidden_1, hidden_2 = linear_sizes[0], linear_sizes[1]

        # Feature extractor (3 input channels, 2x2 kernels with stride 1).
        self.conv1 = nn.Conv2d(3, first_channels, kernel_size=[2, 2], stride=[1, 1])
        self.pool1 = nn.MaxPool2d(kernel_size=[2, 2], stride=[2, 2])
        self.conv2 = nn.Conv2d(first_channels, second_channels, kernel_size=[2, 2], stride=[1, 1])
        self.pool2 = nn.MaxPool2d(kernel_size=[2, 2], stride=[2, 2])

        # Classifier head.
        self.fc1 = nn.Linear(second_channels * 7 * 7, hidden_1)
        self.fc2 = nn.Linear(hidden_1, hidden_2)
        self.fc3 = nn.Linear(hidden_2, num_classes)

    def forward(self, x: torch.Tensor):
        """Return the class logits for a batch of images."""
        features = self.pool1(F.relu(self.conv1(x)))
        features = self.pool2(F.relu(self.conv2(features)))
        hidden = torch.flatten(features, start_dim=1)
        hidden = F.relu(self.fc1(hidden))
        hidden = F.relu(self.fc2(hidden))
        return self.fc3(hidden)

# Training

In [None]:
# Notebook-level TensorBoard writer; it is created with RUNS_DIR as its log directory.
writer = SummaryWriter(log_dir=RUNS_DIR)

In [None]:
def train(model, train_loader, criterion, optimizer, val_loader=None, num_epochs=20, device='cuda', tb_writer=None):
    """Train ``model`` and log per-epoch metrics to TensorBoard.

    After every epoch the mean training loss and accuracy are logged; when a
    validation loader is given, the model is also evaluated on it and the
    validation loss and accuracy are logged too. Histograms of every parameter
    (and of its gradient, when available) are written once per epoch.

    Args:
        model: the ``nn.Module`` to optimise (expected to already live on ``device``).
        train_loader: iterable of batches; element 0 holds the inputs and
            element 1 the labels.
        criterion: loss function called as ``criterion(outputs, labels)``.
        optimizer: optimiser updating the parameters of ``model``.
        val_loader: optional iterable of validation batches (same layout).
        num_epochs: number of passes over ``train_loader``.
        device: device the batches are moved to.
        tb_writer: object with ``add_scalar`` / ``add_histogram`` methods.
            Defaults to the notebook-level ``writer``.

    Returns:
        The trained ``model`` (the same object that was passed in).

    Raises:
        ValueError: if ``train_loader`` or ``val_loader`` yields no batches.
    """
    if tb_writer is None:
        # Fall back to the SummaryWriter defined at notebook level.
        tb_writer = writer

    train_acc, train_loss = [], []
    val_acc, val_loss = [], []

    for epoch in range(num_epochs):
        # Validation switches the model to eval mode, so training mode has to
        # be re-enabled at the start of every epoch (once, not once per batch).
        model.train()
        running_loss = 0.0
        num_batches = 0
        count_evaluated = 0
        count_correct = 0

        for data in train_loader:
            inputs, labels = data[0].to(device), data[1].to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            num_batches += 1
            count_evaluated += inputs.shape[0]
            count_correct += torch.sum(labels == torch.max(outputs, dim=1)[1]).item()

        if num_batches == 0:
            raise ValueError("train_loader yielded no batches")

        print('Training: [%d, %5d] loss: %.3f' % (epoch + 1, num_batches, running_loss / num_batches))

        train_loss.append(running_loss / num_batches)
        train_acc.append(float(count_correct) / count_evaluated)

        if val_loader is not None:
            running_loss_val = 0.0
            num_val_batches = 0
            count_evaluated = 0
            count_correct = 0
            model.eval()

            with torch.no_grad():
                for data_val in val_loader:
                    inputs_val, labels_val = data_val[0].to(device), data_val[1].to(device)
                    outputs_val = model(inputs_val)
                    loss = criterion(outputs_val, labels_val)
                    running_loss_val += loss.item()
                    num_val_batches += 1
                    count_evaluated += inputs_val.shape[0]
                    count_correct += torch.sum(labels_val == torch.max(outputs_val, dim=1)[1]).item()

            if num_val_batches == 0:
                raise ValueError("val_loader yielded no batches")

            val_loss.append(running_loss_val / num_val_batches)
            acc_val = float(count_correct) / count_evaluated

            print('Validation: epoch %d - acc: %.3f' %
                        (epoch + 1, acc_val))
            val_acc.append(acc_val)

        # Tensorboard — validation metrics exist only when a val_loader was given.
        if val_loader is not None:
            tb_writer.add_scalar('Loss/Validation', val_loss[-1], global_step=epoch)
            tb_writer.add_scalar('Accuracy/Validation', val_acc[-1], global_step=epoch)
        tb_writer.add_scalar('Loss/Train', train_loss[-1], global_step=epoch)
        tb_writer.add_scalar('Accuracy/Train', train_acc[-1], global_step=epoch)
        for name, param in model.named_parameters():
            tb_writer.add_histogram(f"Parameters/{name}", param, epoch)
            if param.grad is not None:
                tb_writer.add_histogram(f"Gradients/{name}", param.grad, epoch)

    return model

# Testing

In [None]:
def test(model, test_loader, criterion, device='cuda'):
    with torch.no_grad():
        number_samples = 0
        number_correct = 0
        running_loss_test = 0.0
        for test_batch_idx, data_test in enumerate(test_loader, 0):
            inputs_test, labels_test = data_test[0].to(device), data_test[1].long().to(device)
            outputs_test = model(inputs_test)
            loss = criterion(outputs_test, labels_test)
            running_loss_test += loss.cpu().numpy()
            
            _, outputs_class = torch.max(outputs_test, dim=1)
            number_correct += torch.sum(outputs_class == labels_test).cpu().numpy()
            number_samples += len(labels_test)
            
        acc_test = number_correct / number_samples
        
        print('Test - Accuracy: %.3f' % acc_test)
        print('Test - CrossEntropy: %.3f' % (running_loss_test / (test_batch_idx+1)))