In [1]:
import torch
import torch.nn as nn
import torch.utils.data
import torchvision.datasets
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import numpy as np

from tqdm import tqdm
from copy import deepcopy
from collections import OrderedDict

In [2]:
# tools used or loading cifar10 dataset
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
from torch.utils.data import random_split
from torchvision import transforms


def get_dataloader(dataset, batch_size, return_numpy=False):
    collate_fn = numpy_collate_fn if return_numpy else None
    train_dataloader      = DataLoader(dataset=dataset["train"], batch_size=batch_size, shuffle=True, drop_last=True,
                                       collate_fn=collate_fn)
    validation_dataloader = DataLoader(dataset=dataset["validation"], batch_size=batch_size, shuffle=False, drop_last=False,
                                       collate_fn=collate_fn)
    test_dataloader       = DataLoader(dataset=dataset["test"], batch_size=batch_size, shuffle=False, drop_last=False,
                                       collate_fn=collate_fn)
    return {"train": train_dataloader, "validation": validation_dataloader, "test": test_dataloader}


def numpy_collate_fn(batch):
    imgs = torch.stack([b[0] for b in batch], dim=0).numpy()
    labels = np.array([b[1] for b in batch], dtype=np.int32)
    return imgs, labels


def read_data_sets(data_dir, validation_size=5000):
    """
    Returns the dataset readed from data_dir.
    Uses or not uses one-hot encoding for the labels.
    Subsamples validation set with specified size if necessary.
    Args:
      data_dir: Data directory.
      one_hot: Flag for one hot encoding.
      validation_size: Size of validation set
    Returns:
      Dictionary with Train, Validation, Test Datasets
    """

    mean = (0.491, 0.482, 0.447)
    std  = (0.247, 0.243, 0.262)

    data_transforms = transforms.Compose([
                            transforms.ToTensor(),
                            transforms.Normalize(mean, std)
                        ])

    train_dataset = CIFAR10(root=data_dir, train=True, download=True, transform=data_transforms)
    test_dataset = CIFAR10(root=data_dir, train=False, download=True, transform=data_transforms)

    # Subsample the validation set from the train set
    if not 0 <= validation_size <= len(train_dataset):
        raise ValueError("Validation size should be between 0 and {0}. Received: {1}.".format(
            len(train_dataset), validation_size))

    train_dataset, validation_dataset = random_split(train_dataset,
                                                     lengths=[len(train_dataset) - validation_size, validation_size],
                                                     generator=torch.Generator().manual_seed(42))

    return {'train': train_dataset, 'validation': validation_dataset, 'test': test_dataset}


def get_cifar10(data_dir='data/', validation_size=5000):
    """
    Prepares CIFAR10 dataset.
    Args:
      data_dir: Data directory.
      one_hot: Flag for one hot encoding.
      validation_size: Size of validation set
    Returns:
      Dictionary with Train, Validation, Test Datasets
    """
    return read_data_sets(data_dir, validation_size)

In [3]:
# Seed for reproduceability
seed = 42
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
torch.backends.cudnn.determinstic = True
torch.backends.cudnn.benchmark = False
np.random.seed(42)

# Setup device-agnostic code
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(device)

cuda


In [4]:
cifar10 = get_cifar10()
cifar10_loader = get_dataloader(cifar10, 128)

Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to data/cifar-10-python.tar.gz


100%|██████████| 170498071/170498071 [00:12<00:00, 14068812.60it/s]


Extracting data/cifar-10-python.tar.gz to data/
Files already downloaded and verified


In [5]:
# transform = transforms.Compose([
#     transforms.ToTensor(),
#     transforms.Normalize((0.4914, 0.4822, 0.4465), (0.247, 0.243, 0.261))
# ])

# cifar10_train = torchvision.datasets.CIFAR10(root='data', train=True, transform=transform, download=True)
# cifar10_valid = torchvision.datasets.CIFAR10(root='data', train=False, transform=transform, download=True)
# cifar10_test = torchvision.datasets.CIFAR10(root='data', train=False, transform=transform, download=True)
# cifar10_loader_train = torch.utils.data.DataLoader(cifar10_train, batch_size=128, shuffle=True)
# cifar10_loader_valid = torch.utils.data.DataLoader(cifar10_valid, batch_size=128, shuffle=False)
# cifar10_loader_test = torch.utils.data.DataLoader(cifar10_test, batch_size=128, shuffle=False)
# cifar10_loader = {'train': cifar10_loader_train, 'validation': cifar10_loader_valid, 'test': cifar10_loader_test}

In [6]:
def make_plots(logging_dict, model_name, avg_train=True):
#     logging_dict = {'loss': {'train': [], 'validation': []},
#                 'accuracy': {'train': [], 'validation': []},
#                 'lr': [],
#                 'batches_per_epoch': [],}
    epoch_ends = np.cumsum(logging_dict['batches_per_epoch'])

    def get_avg_per_epoch(batch_data):
        result = [None,]
        for i in range(len(epoch_ends) - 1):
            result.append(np.average(batch_data[epoch_ends[i]:epoch_ends[i + 1]]))
        return result

    fig, axes = plt.subplots(1, 2, figsize=(8, 3))
    metrics = ('loss', 'accuracy')
    for metric, ax in zip(metrics, axes.ravel()):
#         ax.plot(logging_dict[metric]['train'])
        if avg_train:
            ax.plot(get_avg_per_epoch(logging_dict[metric]['train']), '.-', label='training set')
            ax.plot(logging_dict[metric]['validation'], '.-', label='validation set')
            ax.set(title=metric, xlabel='epoch', xticks=np.arange(len(epoch_ends)))
        else:
            ax.plot(logging_dict[metric]['train'],'.-', label='training set')
            ax.plot(epoch_ends, logging_dict[metric]['validation'],'.-', label='validation set')
            ax.set(title=metric, xlabel='batch')

    handles, labels = ax.get_legend_handles_labels()
    plt.figlegend(handles=handles, labels=labels, loc='upper center', bbox_to_anchor=(0.5, 0), ncol=2)
    plt.suptitle(model_name)
    plt.tight_layout()
    plt.show()

In [7]:
def evaluate_model(model, data_loader):
    """
    Performs the evaluation of the MLP model on a given dataset.

    Args:
      model: An instance of 'MLP', the model to evaluate.
      data_loader: The data loader of the dataset to evaluate.
    Returns:
        accuracy
    """
    accuracies_per_batch, losses_per_batch = [], []
    loss_module = nn.CrossEntropyLoss()
    # Get accuracy for epoch
    for batch in data_loader:

        # Get validation images and labels
        X = batch[0].to(device)
        y = batch[1].to(device)

        # Get predictions on validation set
        model.eval()
        with torch.no_grad():
            pred_logits = model.forward(X)
            pred_classes = torch.argmax(torch.softmax(pred_logits, dim=1), axis=1)

        # Calculate accuracy := # of correct preds / total # of preds
        current_accuracy = torch.sum(pred_classes == y) / pred_classes.shape[0]
        accuracies_per_batch.append(current_accuracy.item())
        current_loss = loss_module(pred_logits, y).item()
        losses_per_batch.append(current_loss)

    accuracy = np.average(accuracies_per_batch)
    loss = np.average(losses_per_batch)

    return accuracy, loss

In [8]:
def train(model, epochs=15, lr=0.1, momentum=0, verbose=True):

    logging_dict = {'loss': {'train': [], 'validation': []},
                    'accuracy': {'train': [], 'validation': []},
                    'lr': [],
                    'batches_per_epoch': [],
                    'momentum': momentum}

    for epoch in tqdm(range(epochs)):

        batches_per_epoch = 0

        model.train()

        # Loss module and optimizer
        loss_module = nn.CrossEntropyLoss()
        optimizer = torch.optim.SGD(model.parameters(), lr=lr, momentum=momentum)
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer)


        for batch in cifar10_loader['train']:

            batches_per_epoch += 1

            # Get training images and labels
            X_train = batch[0].to(device)
            y_train = batch[1].to(device)

            # Forward pass
            train_pred_logits = model.forward(X_train)

            # Calculate loss
            loss = loss_module(train_pred_logits, y_train)
            logging_dict['loss']['train'].append(loss.item())

            # Calculate accuracy
            train_pred_class = torch.argmax(torch.softmax(train_pred_logits, dim=1), axis=1)
            train_accuracy = torch.sum(train_pred_class == y_train) / train_pred_class.shape[0]
            logging_dict['accuracy']['train'].append(train_accuracy.item())

            # Zero gradients
            optimizer.zero_grad()

            # Backward pass
            loss.backward()

            # Update parameters
            optimizer.step()


        # Log num of batches for this epoch
        logging_dict['batches_per_epoch'].append(batches_per_epoch)

        # Log current LR
        logging_dict['lr'].append(optimizer.param_groups[0]['lr'])

        # Update LR
        scheduler.step(loss)

        # Get metrics on validation set
        validation_accuracy, validation_loss = evaluate_model(model, cifar10_loader['validation'])
        logging_dict['accuracy']['validation'].append(validation_accuracy.item())
        logging_dict['loss']['validation'].append(validation_loss.item())

        # Determine if best model
        if len(logging_dict['accuracy']['validation']) == 1 or \
            all([validation_accuracy > acc for acc in logging_dict['accuracy']['validation']]):
            best_model = deepcopy(model)

        if verbose:
            print(f'\n{epoch = }, '
                  f'training accuracy: {train_accuracy.item():.3f}, '
                  f'training loss: {loss.item():.3f}',
                  f'validation accuracy: {validation_accuracy.item():.3f}, '
                  f'validation loss: {validation_loss.item():.3f}',
                 )

    # Get metrics on test set
    test_accuracy, test_loss = evaluate_model(best_model, cifar10_loader['test'])

    return best_model, test_accuracy, test_loss, logging_dict

---

Model
---



In [None]:
class Cifar10CNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.layer1 = nn.Sequential(OrderedDict([
            ('conv1', nn.Conv2d(3, 64, 3, padding=1)),
            ('relu1', nn.ReLU()),
            ('bn1', nn.BatchNorm2d(64)),
            ('conv2', nn.Conv2d(64, 64, 3)),
            ('relu2', nn.ReLU()),
            ('bn2', nn.BatchNorm2d(64)),
            ('maxpool1', nn.MaxPool2d(2)),
            ('dropout1', nn.Dropout2d(0.25))
        ]))
        self.layer2 = nn.Sequential(OrderedDict([
            ('conv1', nn.Conv2d(64, 128, 3, padding=1)),
            ('relu1', nn.ReLU()),
            ('bn1', nn.BatchNorm2d(128)),
            ('conv2', nn.Conv2d(128, 128, 3)),
            ('relu2', nn.ReLU()),
            ('bn2', nn.BatchNorm2d(128)),
            ('maxpool1', nn.MaxPool2d(2)),
            ('dropout1', nn.Dropout2d(0.25))
        ]))
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(128 * 6 * 6, 512)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.5)
        self.fc2 = nn.Linear(512, 10)
        self.softmax = nn.Softmax(1)

    def forward(self, x):
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.flatten(x)
        x = self.fc1(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.fc2(x)
        x = self.relu(x)
        x = self.softmax(x)
        return x

In [None]:
model = Cifar10CNN().to(device)
best_model, test_accuracy, test_loss, logging_dict = train(model)



---



---



https://github.com/chenjie/PyTorch-CIFAR-10-autoencoder/blob/master/main.py

In [None]:
class Autoencoder(nn.Module):
    def __init__(self, in_channels, out_channels, importance=1, k=3, s=2, p=1, **kwargs):
        super(Autoencoder, self).__init__()
        self.importance = importance
        self.encoder = nn.Sequential(
            nn.Conv2d(in_channels, 12, 4, stride=2, padding=1),
            nn.ReLU(),
            nn.Conv2d(12, 24, 4, stride=2, padding=1),
            nn.ReLU(),
			nn.Conv2d(24, 48, 4, stride=2, padding=1),
            nn.ReLU(),
        )
        self.decoder = nn.Sequential(
			nn.ConvTranspose2d(48, 24, 4, stride=2, padding=1),
            nn.ReLU(),
			nn.ConvTranspose2d(24, 12, 4, stride=2, padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(12, out_channels, k, stride=s, padding=p, **kwargs),
            nn.Sigmoid(),
        )

    def forward(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded * x * self.importance

In [None]:
class AECifar10CNN(nn.Module):
    def __init__(self, importance=[1, 1]):
        super().__init__()
        self.layer1 = nn.Sequential(OrderedDict([
            ('conv1', nn.Conv2d(3, 64, 3, padding=1)),
            ('ae1', Autoencoder(64, 64, output_padding=1)),
            ('relu1', nn.ReLU()),
            ('bn1', nn.BatchNorm2d(64)),
            ('conv2', nn.Conv2d(64, 64, 3)),
            ('ae2', Autoencoder(64, 64, k=3, s=3, p=3)),
            ('relu2', nn.ReLU()),
            ('bn2', nn.BatchNorm2d(64)),
            ('maxpool1', nn.MaxPool2d(2)),
            ('dropout1', nn.Dropout2d(0.25)),
        ]))
        self.layer2 = nn.Sequential(OrderedDict([
            ('conv1', nn.Conv2d(64, 128, 3, padding=1)),
            ('ae1', Autoencoder(128, 128, s=4, output_padding=2)),
            ('relu1', nn.ReLU()),
            ('bn1', nn.BatchNorm2d(128)),
            ('conv2', nn.Conv2d(128, 128, 3)),
            ('ae2', Autoencoder(128, 128, k=2, s=4, output_padding=1)),
            ('relu2', nn.ReLU()),
            ('bn2', nn.BatchNorm2d(128)),
            ('maxpool1', nn.MaxPool2d(2)),
            ('dropout1', nn.Dropout2d(0.25)),
        ]))
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(128 * 6 * 6, 512)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.5)
        self.fc2 = nn.Linear(512, 10)
        self.softmax = nn.Softmax(1)
        self.importance = importance

    def forward(self, x):
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.flatten(x)
        x = self.fc1(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.fc2(x)
        x = self.relu(x)
        x = self.softmax(x)
        return x

In [None]:
ae_model = AECifar10CNN().to(device)
best_ae_model, ae_test_accuracy, ae_test_loss, ae_logging_dict = train(ae_model)

In [None]:
# autoencoder = Autoencoder().to(device)

# # Define an optimizer and criterion
# criterion = nn.BCELoss()
# optimizer = torch.optim.Adam(autoencoder.parameters())

# for epoch in tqdm(range(50)):
#         running_loss = 0.0
#         for i, (inputs, _) in enumerate(cifar10_loader['train'], 0):
#             inputs = inputs.to(device)

#             # ============ Forward ============
#             encoded, outputs = autoencoder(inputs)
#             loss = criterion(outputs, inputs)
#             # ============ Backward ============
#             optimizer.zero_grad()
#             loss.backward()
#             optimizer.step()

#             # ============ Logging ============
#             # running_loss += loss.data

#         print(f'{epoch + 1, i + 1} {loss: .3f}')
#         # running_loss = 0.0



---



In [9]:
class eca_layer(nn.Module):
    """Constructs a ECA module.

    Args:
        channel: Number of channels of the input feature map
        k_size: Adaptive selection of kernel size
    """
    def __init__(self, channel, k_size=3):
        super(eca_layer, self).__init__()
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.conv = nn.Conv1d(1, 1, kernel_size=k_size, padding=(k_size - 1) // 2, bias=False)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        # feature descriptor on the global spatial information
        y = self.avg_pool(x)

        # Two different branches of ECA module
        y = self.conv(y.squeeze(-1).transpose(-1, -2)).transpose(-1, -2).unsqueeze(-1)

        # Multi-scale information fusion
        y = self.sigmoid(y)

        return x * y.expand_as(x)

In [None]:
class ECACifar10CNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.layer1 = nn.Sequential(OrderedDict([
            ('conv1', nn.Conv2d(3, 64, 3, padding=1)),
            ('eca1', eca_layer(64)),
            ('relu1', nn.ReLU()),
            ('bn1', nn.BatchNorm2d(64)),
            ('conv2', nn.Conv2d(64, 64, 3)),
            ('eca2', eca_layer(64)),
            ('relu2', nn.ReLU()),
            ('bn2', nn.BatchNorm2d(64)),
            ('maxpool1', nn.MaxPool2d(2)),
            ('dropout1', nn.Dropout2d(0.25)),
        ]))
        self.layer2 = nn.Sequential(OrderedDict([
            ('conv1', nn.Conv2d(64, 128, 3, padding=1)),
            ('eca1', eca_layer(128)),
            ('relu1', nn.ReLU()),
            ('bn1', nn.BatchNorm2d(128)),
            ('conv2', nn.Conv2d(128, 128, 3)),
            ('eca2', eca_layer(128)),
            ('relu2', nn.ReLU()),
            ('bn2', nn.BatchNorm2d(128)),
            ('maxpool1', nn.MaxPool2d(2)),
            ('dropout1', nn.Dropout2d(0.25)),
        ]))
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(128 * 6 * 6, 512)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.5)
        self.fc2 = nn.Linear(512, 10)
        self.softmax = nn.Softmax(1)

    def forward(self, x):
        # print(x.shape, self.ae(x)[1].shape)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.flatten(x)
        x = self.fc1(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.fc2(x)
        x = self.relu(x)
        x = self.softmax(x)
        return x

In [None]:
eca_model = ECACifar10CNN().to(device)
best_eca_model, eca_test_accuracy, eca_test_loss, eca_logging_dict = train(eca_model)



---



---



In [None]:
class AECACifar10CNN(nn.Module):
    def __init__(self, importance=[1, 1]):
        super().__init__()
        self.layer1 = nn.Sequential(OrderedDict([
            ('conv1', nn.Conv2d(3, 64, 3, padding=1)),
            ('eca1', eca_layer(64)),
            ('ae1', Autoencoder(64, 64, output_padding=1)),
            ('relu1', nn.ReLU()),
            ('bn1', nn.BatchNorm2d(64)),
            ('conv2', nn.Conv2d(64, 64, 3)),
            ('eca2', eca_layer(64)),
            ('ae2', Autoencoder(64, 64, k=3, s=3, p=3)),
            ('relu2', nn.ReLU()),
            ('bn2', nn.BatchNorm2d(64)),
            ('maxpool1', nn.MaxPool2d(2)),
            ('dropout1', nn.Dropout2d(0.25)),
        ]))
        self.layer2 = nn.Sequential(OrderedDict([
            ('conv1', nn.Conv2d(64, 128, 3, padding=1)),
            ('eca1', eca_layer(128)),
            ('ae1', Autoencoder(128, 128, s=4, output_padding=2)),
            ('relu1', nn.ReLU()),
            ('bn1', nn.BatchNorm2d(128)),
            ('conv2', nn.Conv2d(128, 128, 3)),
            ('eca2', eca_layer(128)),
            ('ae2', Autoencoder(128, 128, k=2, s=4, output_padding=1)),
            ('relu2', nn.ReLU()),
            ('bn2', nn.BatchNorm2d(128)),
            ('maxpool1', nn.MaxPool2d(2)),
            ('dropout1', nn.Dropout2d(0.25)),
        ]))
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(128 * 6 * 6, 512)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.5)
        self.fc2 = nn.Linear(512, 10)
        self.softmax = nn.Softmax(1)
        self.importance = importance

    def forward(self, x):
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.flatten(x)
        x = self.fc1(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.fc2(x)
        x = self.relu(x)
        x = self.softmax(x)
        return x

In [None]:
aeca_model = AECACifar10CNN().to(device)
best_aeca_model, aeca_test_accuracy, aeca_test_loss, aeca_logging_dict = train(aeca_model)



---



---



In [None]:
import torch.nn.functional as F


In [None]:
class ChannelPool(nn.Module):
    def forward(self, x):
        return torch.cat( (torch.max(x,1)[0].unsqueeze(1), torch.mean(x,1).unsqueeze(1)), dim=1 )
class SpatialGate(nn.Module):
    def __init__(self):
        super(SpatialGate, self).__init__()
        kernel_size =7
        self.compress = ChannelPool()
        self.spatial = BasicConv(2, 1, kernel_size, stride=1, padding=(kernel_size-1) // 2, relu=False)
    def forward(self, x):
        x_compress = self.compress(x)
        x_out = self.spatial(x_compress)
        scale = F.sigmoid(x_out) # broadcasting
        return x * scale
class BasicConv(nn.Module):
    def __init__(self, in_planes, out_planes, kernel_size, stride=1, padding=0, dilation=1, groups=1, relu=True, bn=True, bias=False):
        super(BasicConv, self).__init__()
        self.out_channels = out_planes
        self.conv = nn.Conv2d(in_planes, out_planes, kernel_size=kernel_size, stride=stride, padding=padding, dilation=dilation, groups=groups, bias=bias)
        self.bn = nn.BatchNorm2d(out_planes,eps=1e-5, momentum=0.01, affine=True) if bn else None
        self.relu = nn.ReLU() if relu else None

    def forward(self, x):
        x = self.conv(x)
        if self.bn is not None:
            x = self.bn(x)
        if self.relu is not None:
            x = self.relu(x)
        return x

class ECA_Spatial(nn.Module):
    def __init__(self, gate_channels):
        super(ECA_Spatial, self).__init__()
        self.ChannelGate = eca_layer(gate_channels)
        self.SpatialGate = SpatialGate()
    def forward(self, x):
        x_out = self.ChannelGate(x)
        x_out = self.SpatialGate(x_out)
        return x_out

In [None]:
class ECASPCifar10CNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.layer1 = nn.Sequential(OrderedDict([
            ('conv1', nn.Conv2d(3, 64, 3, padding=1)),
            ('att', ECA_Spatial(64)),
            ('relu1', nn.ReLU()),
            ('bn1', nn.BatchNorm2d(64)),
            ('conv2', nn.Conv2d(64, 64, 3)),
            ('att', ECA_Spatial(64)),
            ('relu2', nn.ReLU()),
            ('bn2', nn.BatchNorm2d(64)),
            ('maxpool1', nn.MaxPool2d(2)),
            ('dropout1', nn.Dropout2d(0.25)),

        ]))
        self.layer2 = nn.Sequential(OrderedDict([
            ('conv1', nn.Conv2d(64, 128, 3, padding=1)),
            ('att', ECA_Spatial(128)),
            ('relu1', nn.ReLU()),
            ('bn1', nn.BatchNorm2d(128)),
            ('conv2', nn.Conv2d(128, 128, 3)),
            ('att', ECA_Spatial(128)),
            ('relu2', nn.ReLU()),
            ('bn2', nn.BatchNorm2d(128)),
            ('maxpool1', nn.MaxPool2d(2)),
            ('dropout1', nn.Dropout2d(0.25)),

        ]))
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(128 * 6 * 6, 512)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.5)
        self.fc2 = nn.Linear(512, 10)
        self.softmax = nn.Softmax()

    def forward(self, x):
        # print(x.shape, self.ae(x)[1].shape)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.flatten(x)
        x = self.fc1(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.fc2(x)
        x = self.relu(x)
        x = self.softmax(x)
        return x

In [None]:
ecasp_model = ECASPCifar10CNN().to(device)
best_ecaspmodel, ecasp_test_accuracy, ecasp_test_loss, ecasp_logging_dict = train(ecasp_model)



---



In [17]:
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)



---



In [None]:
class DeeperCifar10CNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.layer1 = nn.Sequential(OrderedDict([
            ('conv1', nn.Conv2d(3, 64, 3, padding=1)),
            ('relu1', nn.ReLU()),
            ('bn1', nn.BatchNorm2d(64)),
            ('conv2', nn.Conv2d(64, 64, 3)),
            ('relu2', nn.ReLU()),
            ('bn2', nn.BatchNorm2d(64)),
            ('maxpool1', nn.MaxPool2d(2)),
            ('dropout1', nn.Dropout2d(0.25)),

        ]))
        self.layer2 = nn.Sequential(OrderedDict([
            ('conv1', nn.Conv2d(64, 128, 3, padding=1)),
            ('relu1', nn.ReLU()),
            ('bn1', nn.BatchNorm2d(128)),
            ('conv2', nn.Conv2d(128, 128, 3)),
            ('relu2', nn.ReLU()),
            ('bn2', nn.BatchNorm2d(128)),
            ('maxpool1', nn.MaxPool2d(2)),
            ('dropout1', nn.Dropout2d(0.25)),

        ]))

        self.layer3 = nn.Sequential(OrderedDict([
            ('conv1', nn.Conv2d(128, 256, 3, padding=1)),
            ('relu1', nn.ReLU()),
            ('bn1', nn.BatchNorm2d(256)),
            ('conv2', nn.Conv2d(256, 256, 3)),
            ('relu2', nn.ReLU()),
            ('bn2', nn.BatchNorm2d(256)),
            ('maxpool1', nn.MaxPool2d(2)),
            ('dropout1', nn.Dropout2d(0.25)),

        ]))
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(256 * 6 * 3, 512)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.5)
        self.fc2 = nn.Linear(512, 10)
        self.softmax = nn.Softmax()

    def forward(self, x):
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.flatten(x)
        x = self.fc1(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.fc2(x)
        x = self.relu(x)
        x = self.softmax(x)
        return x

In [None]:
deeper_model = DeeperCifar10CNN().to(device)
best_deepermodel, deeper_test_accuracy, deeper_test_loss, deeper_logging_dict = train(deeper_model)

In [None]:
count_parameters(deeper_model)



---



---



In [10]:
class LinearAutoencoder(nn.Module):
    def __init__(self, input_size=32 * 32, layers=[128, 64, 12, 3]):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_size, layers[0]),
            nn.ReLU(),
            nn.Linear(layers[0], layers[1]),
            nn.ReLU(),
            nn.Linear(layers[1], layers[2]),
        )

        self.decoder = nn.Sequential(
            nn.Linear(layers[2], layers[1]),
            nn.ReLU(),
            nn.Linear(layers[1], layers[0]),
            nn.ReLU(),
            nn.Linear(layers[0], input_size),
            nn.Sigmoid(),
        )
        for layer in [*self.encoder.modules(), *self.decoder.modules()]:
            if isinstance(layer, nn.Linear):
                nn.init.kaiming_normal_(layer.weight)

    def forward(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return x * decoded * 2.0

In [12]:
class LAECACifar10CNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.layer1 = nn.Sequential(OrderedDict([
            ('conv1', nn.Conv2d(3, 64, 3, padding=1)),
            ('eca1', eca_layer(64)),
            ('ae1', LinearAutoencoder(32)),
            ('relu1', nn.ReLU()),
            ('bn1', nn.BatchNorm2d(64)),
            ('conv2', nn.Conv2d(64, 64, 3)),
            ('eca2', eca_layer(64)),
            ('ae2', LinearAutoencoder(30)),
            ('relu2', nn.ReLU()),
            ('bn2', nn.BatchNorm2d(64)),
            ('maxpool1', nn.MaxPool2d(2)),
            ('dropout1', nn.Dropout2d(0.25)),
        ]))
        self.layer2 = nn.Sequential(OrderedDict([
            ('conv1', nn.Conv2d(64, 128, 3, padding=1)),
            ('eca1', eca_layer(128)),
            ('ae1', LinearAutoencoder(15)),
            ('relu1', nn.ReLU()),
            ('bn1', nn.BatchNorm2d(128)),
            ('conv2', nn.Conv2d(128, 128, 3)),
            ('eca2', eca_layer(128)),
            ('ae2', LinearAutoencoder(13)),
            ('relu2', nn.ReLU()),
            ('bn2', nn.BatchNorm2d(128)),
            ('maxpool1', nn.MaxPool2d(2)),
            ('dropout1', nn.Dropout2d(0.25)),
        ]))
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(128 * 6 * 6, 512)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.5)
        self.fc2 = nn.Linear(512, 10)
        self.softmax = nn.Softmax(1)

    def forward(self, x):
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.flatten(x)
        x = self.fc1(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.fc2(x)
        x = self.relu(x)
        x = self.softmax(x)
        return x

In [13]:
laeca_model = LAECACifar10CNN().to(device)
best_laecamodel, laeca_test_accuracy, laeca_test_loss, laeca_logging_dict = train(laeca_model)

  7%|▋         | 1/15 [00:42<10:00, 42.91s/it]


epoch = 0, training accuracy: 0.453, training loss: 2.010 validation accuracy: 0.477, validation loss: 1.977


 13%|█▎        | 2/15 [01:20<08:37, 39.84s/it]


epoch = 1, training accuracy: 0.555, training loss: 1.912 validation accuracy: 0.561, validation loss: 1.902


 20%|██        | 3/15 [01:58<07:46, 38.88s/it]


epoch = 2, training accuracy: 0.578, training loss: 1.850 validation accuracy: 0.624, validation loss: 1.836


 27%|██▋       | 4/15 [02:36<07:03, 38.51s/it]


epoch = 3, training accuracy: 0.734, training loss: 1.740 validation accuracy: 0.672, validation loss: 1.794


 33%|███▎      | 5/15 [03:14<06:22, 38.23s/it]


epoch = 4, training accuracy: 0.672, training loss: 1.794 validation accuracy: 0.701, validation loss: 1.763


 40%|████      | 6/15 [03:52<05:43, 38.18s/it]


epoch = 5, training accuracy: 0.688, training loss: 1.779 validation accuracy: 0.710, validation loss: 1.751


 47%|████▋     | 7/15 [04:29<05:04, 38.03s/it]


epoch = 6, training accuracy: 0.648, training loss: 1.814 validation accuracy: 0.721, validation loss: 1.741


 53%|█████▎    | 8/15 [05:08<04:27, 38.27s/it]


epoch = 7, training accuracy: 0.648, training loss: 1.784 validation accuracy: 0.743, validation loss: 1.723


 60%|██████    | 9/15 [05:46<03:48, 38.14s/it]


epoch = 8, training accuracy: 0.789, training loss: 1.689 validation accuracy: 0.753, validation loss: 1.711


 67%|██████▋   | 10/15 [06:24<03:09, 38.00s/it]


epoch = 9, training accuracy: 0.656, training loss: 1.798 validation accuracy: 0.742, validation loss: 1.720


 73%|███████▎  | 11/15 [07:02<02:31, 37.98s/it]


epoch = 10, training accuracy: 0.703, training loss: 1.750 validation accuracy: 0.755, validation loss: 1.705


 80%|████████  | 12/15 [07:39<01:53, 37.92s/it]


epoch = 11, training accuracy: 0.797, training loss: 1.669 validation accuracy: 0.773, validation loss: 1.690


 87%|████████▋ | 13/15 [08:17<01:15, 37.88s/it]


epoch = 12, training accuracy: 0.758, training loss: 1.699 validation accuracy: 0.774, validation loss: 1.689


 93%|█████████▎| 14/15 [08:55<00:37, 37.86s/it]


epoch = 13, training accuracy: 0.828, training loss: 1.639 validation accuracy: 0.779, validation loss: 1.684


100%|██████████| 15/15 [09:33<00:00, 38.21s/it]


epoch = 14, training accuracy: 0.828, training loss: 1.643 validation accuracy: 0.786, validation loss: 1.677





In [18]:
count_parameters(laeca_model)

2722272

In [21]:
!mkdir ./models

In [22]:
save_model_and_data('linear_eca', best_laecamodel, laeca_test_accuracy, laeca_test_loss, laeca_logging_dict)

# GradCam

Docs: https://jacobgil.github.io/pytorch-gradcam-book/introduction.html

In [None]:
%pip install grad-cam --quiet
from pytorch_grad_cam import GradCAM
from pytorch_grad_cam.utils.model_targets import ClassifierOutputTarget
from pytorch_grad_cam.utils.image import show_cam_on_image

In [None]:
classes_names = {i: c for i, c in enumerate (('airplane', 'automobile', 'bird', 'cat', 'deer',
                                              'dog', 'frog', 'horse', 'ship', 'truck',))}

def show_activations(model, target_layers, image_number: int | None = None, use_cuda=True):
    model.eval()
    if image_number == None:
        img, label = cifar10['train'][np.random.randint(350)]
    else:
        img, label = cifar10['train'][image_number]
    input_tensor = img.unsqueeze(0)
    cam = GradCAM(model=model, target_layers=target_layers, use_cuda=use_cuda)
    targets = [ClassifierOutputTarget(label)] if label else None
    grayscale_cam = cam(input_tensor, targets)
    grayscale_cam = grayscale_cam[0, :]
    def normalize(img):
        return ((img - img.min()) / (img.max() - img.min()))

    visualization = show_cam_on_image(normalize(torch.permute(img, (1, 2, 0))).numpy(), grayscale_cam, use_rgb=True)

    fig, axs = plt.subplots(1, 2)
    axs = axs.ravel()
    axs[0].imshow(torch.permute(img, (1, 2, 0)))
    axs[0].axis('off')
    axs[1].imshow(visualization)
    axs[1].axis('off')
    plt.suptitle(classes_names[label])




---



In [None]:
!rm -r './models/cnn/'

In [19]:
import os
import pickle
from datetime import datetime
# os.mkdir('./models/')
def save_model_and_data(name, model, test_accuracy, test_loss, logging_dict):
    os.mkdir(f'./models/{name}')
    torch.save(model.state_dict(), f'./models/{name}/{name}.pt')
    data = {'test_accuracy': test_accuracy,
            'test_loss': test_loss,
            'logging_dict': logging_dict,
            'datetime': datetime.now()}
    with open(f'./models/{name}/{name}.pkl', 'wb') as f:
        pickle.dump(data, f)

In [None]:
models_data = {
    'cnn': [best_model, test_accuracy, test_loss, logging_dict],
    'autoencoder': [best_ae_model, ae_test_accuracy, ae_test_loss, ae_logging_dict],
    'eca': [best_eca_model, eca_test_accuracy, eca_test_loss, eca_logging_dict],
    'autoencoder_eca': [best_aeca_model, aeca_test_accuracy, aeca_test_loss, aeca_logging_dict],
    'c_autoencoder_eca': [best_caecamodel, caeca_test_accuracy, caeca_test_loss, caeca_logging_dict],
    'linear_autoencoder_eca': [best_laecamodel, laeca_test_accuracy, laeca_test_loss, laeca_logging_dict],
    'eca_spatial': [best_ecaspmodel, ecasp_test_accuracy, ecasp_test_loss, ecasp_logging_dict],
    'deeper_cnn': [best_deepermodel, deeper_test_accuracy, deeper_test_loss, deeper_logging_dict],
}

for md in models_data:
    save_model_and_data(md, *models_data[md])

In [None]:
!zip -r models.zip models/