In [1]:
from torch import nn
from torchvision import transforms
from torch.utils.data import DataLoader, Dataset
from sklearn.metrics import confusion_matrix
from tqdm import tqdm
from typing import Tuple
from datetime import datetime as dt
import torch.nn.functional as F
import random 
import torch.optim as optim
import torch
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split


  from .autonotebook import tqdm as notebook_tqdm


In [2]:
SEED = 1701
EPOCHS = 20
MODEL_REPO = "/home/luizp/projects/pibit/src/data/processed/csv/nslClean.csv"
BATCH_SIZE = 256
LEARNING_RATE = 1e-3

In [3]:
class ToImage:
    def __call__(self, array: torch.Tensor, keep_normalization=True):
        """ The idea is to convert a 1d array to a 2d array by resizing (with padding) to the square root of the 1d shape

        Ex: 
            - shape: 2048  
            - sqrt(shape) = 45.25 -> round to ceil (46)
            - resize the feature vector to 46x46 
            - return the new feature vector as a RGB PIL Image for torchvision transforms
         """
        #feat = array.shape[0]
        feat = array.shape[0]
        n = int(np.ceil(feat ** 0.5))

        array = array.cpu().numpy().copy()
        
        # Squared size with padding
        array.resize((n, n))
        if not keep_normalization:
            return (array * 255).astype(np.uint8)

        return torch.Tensor(array.astype(np.float32)).unsqueeze(0)

In [4]:
class CustomDataset(Dataset):
    """ Custom dataset class used for applying transforms to the features. """
    def __init__(self, subset: Tuple[torch.Tensor, torch.Tensor], transform=None):
        self.subset = subset
        self.transform = transform
        
    def __getitem__(self, index):
        x, y = self.subset[0][index], self.subset[1][index]
    
        if self.transform:
            x = self.transform(x)

        return x, y
        
    def __len__(self):
        return self.subset[0].size(0)

In [5]:
def validate(device: str, epoch: int, loss_fn, MODEL, dataset: DataLoader):
    # Validation
    MODEL.eval()
    it_eval = tqdm(enumerate(dataset), total=len(dataset))
    running_loss = 0.
    correct = 0
    qt = 1
    metrics = dict(tp=0, tn=0, fp=0, fn=0)
    y_pred = list()
    y_true = list()
    with torch.no_grad():
        for _, (x, y) in it_eval:
            x = x.to(device)
            y = y.to(device)

            output = MODEL(x)
            running_loss += loss_fn(output, y).item()
            y_pred.extend(torch.argmax(output, 1).cpu().numpy())
            y_true.extend(y.data.cpu().numpy())
            correct += torch.sum(torch.argmax(output, 1).eq(y)).item()
            qt += len(x)
            desc = f"[{now()}] Epoch {str(epoch).zfill(3)} Val. Acc: {correct/qt:.4f} Val. Loss: {running_loss / len(dataset):.8f}"
            it_eval.set_description(desc)
    
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
    metrics["tp"] = tp
    metrics["fp"] = fp
    metrics["tn"] = tn
    metrics["fn"] = fn
    return running_loss / len(dataset), correct/qt, metrics

In [6]:
def now():
    return dt.now().strftime("%d-%m-%Y %H-%M-%S")

In [7]:
def train(device: str, epoch: int, optimizer, loss_fn, MODEL, dataset: DataLoader):
    # Put the MODEL in the training mode
    MODEL.train()
    running_loss = 0.
    qt = 1
    correct = 0

    # Just add a fancy progress bar to the terminal...
    it = tqdm(enumerate(dataset), total=len(dataset))

    for _, (x, y) in it:
        x = x.to(device)
        y = y.to(device)
        
        # Make predictions for this batch
        outputs = MODEL(x)

        # Zero your gradients for every batch!
        optimizer.zero_grad()
        loss = loss_fn(outputs, y)
        loss.backward()

        # Adjust learning weights
        optimizer.step()
        correct += torch.sum(torch.argmax(outputs, 1).eq(y)).item()
        qt += len(x)
    
        # Gather data and report
        running_loss += loss.item()

        desc = f"[{now()}] Epoch {str(epoch).zfill(3)} Acc: {correct/qt:.4f} Loss: {running_loss / len(dataset):.8f}"
        it.set_description(desc)
    
    # Loss / Accuracy
    return running_loss / len(dataset), correct/qt

In [8]:
class Swish(nn.Module):
    def forward(self, x):
        return x * torch.sigmoid(x)

class SEBlock(nn.Module):
    def __init__(self, in_channels, reduction=4):
        super(SEBlock, self).__init__()
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Sequential(
            nn.Linear(in_channels, in_channels // reduction, bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(in_channels // reduction, in_channels, bias=False),
            nn.Sigmoid()
        )

    def forward(self, x):
        b, c, _, _ = x.size()
        y = self.avg_pool(x).view(b, c)
        y = self.fc(y).view(b, c, 1, 1)
        return x * y

class MBConvBlock(nn.Module):
    def __init__(self, in_channels, out_channels, expand_ratio, stride, kernel_size, se_ratio=0.25):
        super(MBConvBlock, self).__init__()
        self.stride = stride
        self.use_residual = self.stride == 1 and in_channels == out_channels

        hidden_dim = in_channels * expand_ratio
        self.expand = nn.Sequential(
            nn.Conv2d(in_channels, hidden_dim, 1, 1, 0, bias=False),
            nn.BatchNorm2d(hidden_dim),
            Swish()
        )

        self.depthwise = nn.Sequential(
            nn.Conv2d(hidden_dim, hidden_dim, kernel_size, stride, kernel_size // 2, groups=hidden_dim, bias=False),
            nn.BatchNorm2d(hidden_dim),
            Swish()
        )

        if se_ratio:
            se_channels = int(in_channels * se_ratio)
            self.se = SEBlock(hidden_dim, se_channels)
        else:
            self.se = None

        self.project = nn.Sequential(
            nn.Conv2d(hidden_dim, out_channels, 1, 1, 0, bias=False),
            nn.BatchNorm2d(out_channels)
        )

    def forward(self, x):
        y = self.expand(x)
        y = self.depthwise(y)

        if self.se:
            y = self.se(y)

        y = self.project(y)

        if self.use_residual:
            return x + y
        else:
            return y

class EfficientNet(nn.Module):
    def __init__(self, num_classes=2, width_coefficient=1.0, depth_coefficient=1.0, dropout_rate=0.2):
        super(EfficientNet, self).__init__()

        # Define the architecture parameters
        self.width_coefficient = width_coefficient
        self.depth_coefficient = depth_coefficient
        self.dropout_rate = dropout_rate

        # Define the base number of channels
        base_channels = 128

        # Define the network architecture
        channels = int(128 * width_coefficient)
        self.features = nn.Sequential(
            nn.Conv2d(1, channels, 3, 2, 1, bias=False),
            nn.BatchNorm2d(channels),
            Swish()
        )

        # MBConv Blocks
        mbconv_settings = [
            # t, c, n, s, k, se
            [1, 24, 1, 1, 3, 0.25],
            [6, 24, 2, 2, 3, 0.25],
            [6, 40, 2, 2, 5, 0.25],
            [6, 40, 3, 2, 3, 0.25],
            [6, 112, 3, 1, 5, 0.25],
            [6, 112, 4, 2, 5, 0.25],
            [6, 144, 4, 4, 3, 0.25],
        ]

        #for t, c, n, s, k, se in mbconv_settings:
        #    out_channels = int(c * width_coefficient)
        #    layers = []
        #    for i in range(n):
        #        stride = s if i == 0 else 1
        #        layers.append(MBConvBlock(channels, out_channels, t, stride, k, se))
        #        channels = out_channels
        #    setattr(self, f'mbconv{n}', nn.Sequential(*layers))
        for i, (t, c, n, s, k, se) in enumerate(mbconv_settings, 1):
            out_channels = int(c * width_coefficient)
            layers = []
            for _ in range(n):
                stride = s if i == 1 else 1  # For the first block in each setting, use the specified stride
                layers.append(MBConvBlock(channels, out_channels, t, stride, k, se))
                channels = out_channels
            setattr(self, f'mbconv{i}', nn.Sequential(*layers))
        # Head
        last_channels = int(1280 * width_coefficient)
        self.head = nn.Sequential(
            nn.Conv2d(channels, last_channels, 1, 1, 0, bias=False),
            nn.BatchNorm2d(last_channels),
            Swish(),
            nn.AdaptiveAvgPool2d(1),
        )

        # Fully-connected layer
        self.classifier = nn.Sequential(
            nn.Dropout(dropout_rate),
            nn.Linear(last_channels, num_classes),
        )

    def forward(self, x):
        x = self.features(x)
        x = self.mbconv1(x)
        x = self.mbconv2(x)
        x = self.mbconv3(x)
        x = self.mbconv4(x)
        x = self.mbconv5(x)
        x = self.mbconv6(x)
        x = self.mbconv7(x)
        x = self.head(x)
        x = x.view(x.size(0), -1)
        x = self.classifier(x)
        return x


In [9]:
def no_update():
    # Reproducibility
    torch.manual_seed(SEED)
    np.random.seed(SEED)
    random.seed(SEED)

    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    print(device)

    MODEL = EfficientNet().to('cuda')

    #if MODEL is None or not isinstance(nn.Module):
    #    raise TypeError("The model does not exist or isn't an instance of nn.Module (PyTorch)")
    
    transform = transforms.Compose([
        ToImage(),
    ])
    csv_path = "/home/luizp/projects/pibit/src/data/processed/csv/nslClean.csv"
    df = pd.read_csv(csv_path)
    y = df.iloc[:, -1].values
    x = df.iloc[:, :-1].values
    y = y[::-1]
    x = np.squeeze(x)
    #x = x.reshape((1, -1))
    x_train, x_test, y_train, y_test = train_test_split(
        x, y, test_size=0.1, random_state=42
    )

    #raise Exception("Precisa carregar e separar (treino, validação e teste) um dataset qualquer para executar")
    x_train = torch.Tensor(x_train)
    y_train = torch.LongTensor(y_train)
    x_test  = torch.Tensor(x_test)
    y_test  = torch.LongTensor(y_test)
    #x_val   = torch.Tensor(x_val)
    #y_val   = torch.LongTensor(y_val)

    # A parte de fazer o reshape está quando passo uma transformada (ToImage()) como parâmetro
    cd_train = CustomDataset(subset=(x_train, y_train), transform=transform)
    cd_test  = CustomDataset(subset=(x_test, y_test), transform=transform)
    #cd_val   = CustomDataset(subset=(X_val, y_val), transform=transform)

    data_train = DataLoader(cd_train, shuffle=True, batch_size=BATCH_SIZE, num_workers=8)
    data_test  = DataLoader(cd_test, shuffle=True, batch_size=BATCH_SIZE, num_workers=8)
    #data_val   = DataLoader(cd_val, shuffle=True, batch_size=BATCH_SIZE, num_workers=8)

    loss_fn   = nn.CrossEntropyLoss()
    optimizer = optim.Adam(MODEL.parameters(), lr=LEARNING_RATE)

    for epoch in range(1, EPOCHS + 1):
        train_loss, train_acc = train(device, epoch, optimizer, loss_fn, MODEL, data_train)
        #val_loss, val_acc, _ = validate(device, epoch, optimizer, loss_fn, MODEL, data_val)

        _, _, metrics = validate(device, epoch, loss_fn, MODEL, data_test)
        print(metrics)


In [10]:
if __name__ == "__main__":
    no_update()
    print("Finished experiment!")

cuda


[21-02-2024 09-57-04] Epoch 001 Acc: 0.4938 Loss: 0.08583818:  12%|█▏        | 53/443 [00:43<05:21,  1.21it/s]


KeyboardInterrupt: 