In [None]:
import torch
import torchvision
import numpy as np
import matplotlib.pyplot as plt
import PIL

In [2]:
# Two views of the same training folder: one without any transform and one
# whose images are resized to 224x224 (no tensor conversion is applied here).
dataset_train1 = torchvision.datasets.ImageFolder(root='./Training/')
dataset_train2 = torchvision.datasets.ImageFolder(
    root='./Training/',
    transform=torchvision.transforms.Resize(size=(224, 224)),
)

In [None]:
# Print the class index of the first and of the last sample of the resized
# training set. To look at a picture instead, call `.show()` on the image
# part of a sample, e.g. `dataset_train2[0][0].show()`.
for sample_idx in (0, -1):
    print(dataset_train2[sample_idx][1])

In [None]:
# Preprocessing pipeline: resize to 224x224, convert to a tensor, then
# normalise each channel with the fixed mean / std values below.
norm_mean = [0.485, 0.456, 0.406]
norm_std = [0.229, 0.224, 0.225]
composed = torchvision.transforms.Compose([
    torchvision.transforms.Resize(size=(224, 224)),
    torchvision.transforms.ToTensor(),
    torchvision.transforms.Normalize(mean=norm_mean, std=norm_std),
])

dataset_train = torchvision.datasets.ImageFolder(root='./Training/', transform=composed)
dataset_valid = torchvision.datasets.ImageFolder(root='./Validation/', transform=composed)

# Show the tensor shape and the label of the first training sample.
first_image, first_label = dataset_train[0]
first_image.shape, first_label

In [4]:
from torch.utils.data import DataLoader

# Both loaders use batches of 256; only the training loader shuffles.
BATCH_SIZE = 256
train_loader = DataLoader(dataset_train, shuffle=True, batch_size=BATCH_SIZE)
valid_loader = DataLoader(dataset_valid, shuffle=False, batch_size=BATCH_SIZE)

In [None]:
from torch.utils.data import DataLoader
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
device

In [3]:
from torch import nn
import copy
from IPython.display import clear_output

def compute_error(model, data_loader, criterion, c_sum=False):
    """Return the per-sample average of `criterion` over `data_loader`.

    The model is switched to eval mode and evaluated without gradient
    tracking. Batches are moved to the device of the model's first parameter
    (CPU if the model has no parameters), so the function does not depend on
    a notebook-level `device` variable.

    Args:
        model: module called as `model(x)` on every input batch.
        data_loader: iterable yielding `(x, y)` batches.
        criterion: callable `criterion(outputs, targets)`; the targets are
            cast to float32 before being passed in.
        c_sum: set to True when `criterion` already returns a per-batch sum.
            When False the criterion value is treated as a per-batch mean and
            multiplied by the batch size before accumulation.

    Returns:
        Accumulated criterion value divided by the number of samples seen
        (a 0-dim tensor when `criterion` returns tensors).

    Raises:
        ValueError: if `data_loader` yields no samples.
    """
    model.eval()

    # Follow the model's own placement instead of relying on a global.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device('cpu')

    total, num_of_el = 0, 0
    with torch.no_grad():
        for x, y in data_loader:
            x = x.to(target_device)
            y = y.to(target_device)
            outputs = model(x)
            loss = criterion(outputs, y.type(torch.float32))
            if not c_sum:
                # Undo the per-batch averaging so batches of different size
                # are weighted by their number of samples.
                loss = loss * len(y)
            total += loss
            num_of_el += len(y)

    if num_of_el == 0:
        raise ValueError("compute_error: data_loader yielded no samples")
    return total / num_of_el


def train_model(model: nn.Module,
              train_loader: DataLoader,
              valid_loader: DataLoader,
              num_epochs: int,
              optimizer: torch.optim.Optimizer,
              criterion,
              verbose: bool = True,
              verbose_plot: bool = False
              ) -> float:
    """Train `model` and leave it holding the state of its best epoch.

    After every epoch the validation loss is computed with `compute_error`.
    The epoch with the lowest validation loss is remembered, and its full
    `state_dict` (parameters *and* buffers such as BatchNorm running
    statistics) is loaded back into the model once training has finished.

    Args:
        model: network to optimise (modified in place).
        train_loader: yields `(inputs, targets)` training batches.
        valid_loader: yields the batches used for the per-epoch validation loss.
        num_epochs: number of passes over `train_loader`.
        optimizer: optimizer already bound to the parameters to update.
        criterion: loss called as `criterion(outputs, targets)`; the targets
            are cast to float32 first.
        verbose: print the loss of every 10th minibatch and, after each
            epoch, clear the cell output and print the validation loss.
        verbose_plot: additionally evaluate the training loss after every
            epoch and plot both loss curves at the end.

    Returns:
        The lowest validation loss as a Python float (`inf` if no epoch
        produced a loss that compared lower, e.g. when `num_epochs` is 0).
    """
    # Move batches to wherever the model lives rather than to a global device.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device('cpu')

    best_epoch = None
    best_state = None
    best_val_loss = float('inf')
    train_losses, valid_losses = [], []

    for epoch in range(num_epochs):
        model.train()
        for step, (inputs, targets) in enumerate(train_loader, start=1):
            inputs = inputs.to(run_device)
            targets = targets.to(run_device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets.type(torch.float32))
            loss.backward()
            optimizer.step()

            if verbose and step % 10 == 0:
                print(f"Minibatch {step:>6}    |  loss {loss.item():>5.2f}  |")

        # compute_error puts the model in eval mode; model.train() above
        # switches it back at the start of the next epoch.
        val_loss = float(compute_error(model, valid_loader, criterion))

        if val_loss < best_val_loss:
            best_epoch = epoch
            best_val_loss = val_loss
            # Snapshot the whole state dict on the CPU. Copying only
            # model.parameters() would drop buffers (BatchNorm running
            # mean / variance), which keep changing in train mode.
            best_state = {name: tensor.detach().cpu().clone()
                          for name, tensor in model.state_dict().items()}

        if verbose:
            clear_output(True)
            m = f"After epoch {epoch:>2} | valid loss: {val_loss:>5.2f}"
            print("{0}\n{1}\n{0}".format("-" * len(m), m))

        if verbose_plot:
            train_loss = float(compute_error(model, train_loader, criterion))
            train_losses.append(train_loss)
            valid_losses.append(val_loss)

    if best_state is not None:
        if verbose:
            print(f"\nLoading best params on validation set in epoch {best_epoch} with loss {best_val_loss:.2f}")
        model.load_state_dict(best_state)

    if verbose_plot:
        plt.figure(figsize=(6, 3))
        plt.plot(train_losses, c='b', label='train')
        plt.plot(valid_losses, c='r', label='valid')
        plt.xlabel('epoch')
        plt.ylabel('loss')
        plt.grid(ls=':')
        plt.legend()
        plt.show()

    return best_val_loss

In [None]:
# Transfer learning with torchvision's pretrained ResNet-50.
model = torchvision.models.resnet50(weights='ResNet50_Weights.DEFAULT')
model = model.to(device)

# Freeze all pretrained weights; only the new head created below is trainable.
model.requires_grad_(False)

# Swap the final layer for a single-logit head. Flatten(0, 1) merges the
# batch and logit dimensions so the output has one value per image.
num_classes = 1
head = nn.Sequential(
    nn.Linear(model.fc.in_features, num_classes),
    nn.Flatten(start_dim=0, end_dim=1),
)
model.fc = head.to(device)

# Optimise the parameters of the new head only.
optimizer = torch.optim.NAdam(model.fc.parameters(), lr=0.005)
criterion = nn.BCEWithLogitsLoss()
train_model(
    model=model,
    train_loader=train_loader,
    valid_loader=valid_loader,
    num_epochs=5,
    optimizer=optimizer,
    criterion=criterion,
    verbose=True,
    verbose_plot=True,
)

# Example prediction on a random image (kept disabled):
#img = PIL.Image.fromarray((np.random.rand(224, 224, 3) * 255).astype(np.uint8))
#model(composed(img)[None, :, :, :])

In [None]:
def accuracy(outputs, targets):
    """Count how many predictions agree with the binary targets.

    A logit greater than 0 is taken as the positive class; `targets` is cast
    to bool before the comparison.

    Args:
        outputs: tensor of raw logits.
        targets: tensor of 0/1 labels with a shape comparable to `outputs`.

    Returns:
        0-dim integer tensor with the number of matching entries. This is a
        count, not a ratio, so use it with `compute_error(..., c_sum=True)`.
    """
    preds = outputs > 0
    targets = targets.type(torch.bool)
    # Tensor reduction instead of the Python builtin `sum`, which iterates
    # element by element and returns a plain int for an empty batch.
    return (preds == targets).sum()
# `accuracy` returns a per-batch count, so c_sum=True makes compute_error
# divide the total count by the number of samples (fraction correct).
compute_error(model, valid_loader, accuracy, c_sum=True)

# With a self-implemented ResNet

In [90]:
import torch.nn.functional as F

class BasicBlock(nn.Module):
    """Residual block made of two 3x3 conv + batch-norm stages and a shortcut.

    The shortcut is the identity unless the block changes the spatial size
    (`stride != 1`) or the channel count; in that case it is a 1x1
    convolution followed by batch norm so both branches can be added.
    """

    # Ratio between the block's output channels and `planes`.
    expansion = 1

    def __init__(self, in_planes, planes, stride):
        """Create the block.

        Args:
            in_planes: number of input channels.
            planes: number of channels produced by both convolutions.
            stride: stride of the first convolution and of the shortcut projection.
        """
        super().__init__()
        out_planes = self.expansion * planes

        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)

        # An empty Sequential acts as the identity mapping.
        projection = []
        if stride != 1 or in_planes != out_planes:
            projection = [
                nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_planes),
            ]
        self.shortcut = nn.Sequential(*projection)

    def forward(self, x):
        """Return `relu(main_branch(x) + shortcut(x))`."""
        main = self.conv1(x)
        main = F.relu(self.bn1(main))
        main = self.conv2(main)
        main = self.bn2(main)
        main += self.shortcut(x)
        return F.relu(main)


class ResNet(nn.Module):
    """Configurable ResNet: a 3x3 stem, stacked residual stages, a linear head.

    Args:
        block: residual block class constructed as
            `block(in_planes, planes, stride)` and exposing an `expansion`
            class attribute.
        size_blocks: channel widths; the first entry is the stem width, the
            remaining entries are the widths of the successive stages.
        num_blocks: number of blocks in each stage (paired with
            `size_blocks[1:]`).
        num_classes: number of output logits.
    """

    def __init__(self, block, size_blocks, num_blocks, num_classes=10):
        super().__init__()

        # Running channel count; updated by `_make_layer` as stages are built.
        self.in_planes = size_blocks[0]
        self.conv1 = nn.Conv2d(3, self.in_planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(self.in_planes)

        stages = []
        for stage_idx, (size, num) in enumerate(zip(size_blocks[1:], num_blocks)):
            # The first stage keeps the resolution, every later one halves it.
            stride = 1 if stage_idx == 0 else 2
            stages.append(self._make_layer(block, size, num, stride=stride))
        self.layers = nn.Sequential(*stages)

        self.linear = nn.Linear(size_blocks[-1] * block.expansion, num_classes)
        self.avg_pool = nn.AdaptiveAvgPool2d((1, 1))

    def _make_layer(self, block, planes, num_blocks, stride):
        """Build one stage: only its first block uses `stride`, the rest use 1."""
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for block_stride in strides:
            layers.append(block(self.in_planes, planes, block_stride))
            self.in_planes = planes * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return the logits for a batch of images.

        Returns:
            Shape `(batch,)` when the head has a single logit (binary
            classification), otherwise `(batch, num_classes)`.
        """
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.layers(out)
        out = self.avg_pool(out)
        out = out.view(out.size(0), -1)
        out = self.linear(out)
        # Only a single-logit head is flattened; flattening a multi-class
        # output would merge the batch and class dimensions.
        if out.size(1) == 1:
            return out.flatten()
        return out

def ResNet18():
    """Build the single-logit ResNet used for binary classification.

    Four stages of two `BasicBlock`s each with widths 16, 32, 64 and 128,
    on top of a 16-channel stem.
    """
    channel_widths = [16, 16, 32, 64, 128]
    blocks_per_stage = [2, 2, 2, 2]
    return ResNet(BasicBlock, channel_widths, blocks_per_stage, num_classes=1)
        

In [None]:
# Per-channel mean and std of the training images (resized to 110x90).

composed = torchvision.transforms.Compose([
    torchvision.transforms.Resize(size=(110, 90)),
    torchvision.transforms.ToTensor(),
])
dataset_train = torchvision.datasets.ImageFolder(root='./Training/', transform=composed)
train_loader = DataLoader(dataset_train, shuffle=True, batch_size=256)

mean, std = 0., 0.
img_cnt = 0
for batch, _labels in train_loader:
    n_images = batch.size(0)
    # Collapse height and width so every image is (channels, pixels).
    pixels = batch.flatten(start_dim=2)
    # Sum the per-image channel statistics; they are averaged below. Note
    # that `std` is therefore the mean of the per-image standard deviations.
    mean = mean + pixels.mean(dim=2).sum(dim=0)
    std = std + pixels.std(dim=2).sum(dim=0)
    img_cnt = img_cnt + n_images

mean = mean / img_cnt
std = std / img_cnt
print(mean, std)

In [10]:
# Training images resized to 110x90 only, kept for visual inspection.
dataset_train3 = torchvision.datasets.ImageFolder(
    root='./Training/',
    transform=torchvision.transforms.Resize(size=(110, 90)),
)
# To open the first image: dataset_train3[0][0].show()

In [106]:
# Final preprocessing for the self-implemented network: resize to 110x90,
# convert to a tensor and normalise with the statistics computed above.
composed = torchvision.transforms.Compose([
    torchvision.transforms.Resize(size=(110, 90)),
    torchvision.transforms.ToTensor(),
    torchvision.transforms.Normalize(mean=mean, std=std),
])

dataset_train, dataset_valid = (
    torchvision.datasets.ImageFolder(root=folder, transform=composed)
    for folder in ('./Training/', './Validation/')
)

# Batches of 64; only the training loader shuffles.
train_loader = DataLoader(dataset_train, shuffle=True, batch_size=64)
valid_loader = DataLoader(dataset_valid, shuffle=False, batch_size=64)

In [108]:
# Freshly initialised network, moved to the selected device.
model = ResNet18()
model = model.to(device)

In [None]:
# Smoke test: push one training batch through the network and print the
# raw outputs, then drop the batch again.
img = next(iter(train_loader))[0]
img = img.to(torch.float32).to(device)
print(model(img))
del img

In [None]:
# Train every parameter of the self-implemented network for one epoch.
optimizer = torch.optim.NAdam(params=model.parameters(), lr=0.0002)
criterion = nn.BCEWithLogitsLoss()
train_model(
    model=model,
    train_loader=train_loader,
    valid_loader=valid_loader,
    num_epochs=1,
    optimizer=optimizer,
    criterion=criterion,
    verbose=True,
    verbose_plot=True,
)

In [None]:
def accuracy(outputs, targets):
    """Count how many predictions agree with the binary targets.

    A logit greater than 0 is taken as the positive class; `targets` is cast
    to bool before the comparison.

    Args:
        outputs: tensor of raw logits.
        targets: tensor of 0/1 labels with a shape comparable to `outputs`.

    Returns:
        0-dim integer tensor with the number of matching entries. This is a
        count, not a ratio, so use it with `compute_error(..., c_sum=True)`.
    """
    preds = outputs > 0
    targets = targets.type(torch.bool)
    # Tensor reduction instead of the Python builtin `sum`, which iterates
    # element by element and returns a plain int for an empty batch.
    return (preds == targets).sum()
# `accuracy` returns a per-batch count, so c_sum=True makes compute_error
# divide the total count by the number of samples (fraction correct).
compute_error(model, valid_loader, accuracy, c_sum=True)

# Explainability

In [128]:
class Net_Grad(nn.Module):
    """Wrapper exposing a model's last feature map and the gradient w.r.t. it.

    The wrapped `model` must provide the attributes `conv1`, `bn1`, `layers`,
    `avg_pool` and `linear`. Its modules are reused (not copied), so the
    wrapper shares all weights with the original model.
    """

    def __init__(self, model):
        super().__init__()
        self.model = model

        # Convolutional part of the network, up to the last feature map.
        self.layers = nn.Sequential(
            model.conv1,
            model.bn1,
            nn.ReLU(),
            model.layers,
        )

        self.avg_pool = model.avg_pool
        self.linear = model.linear

        # Gradient of the output w.r.t. the feature map; set by the hook
        # during the most recent backward pass.
        self.gradients = None

    def activations_hook(self, grad):
        """Tensor hook: remember the gradient flowing into the feature map."""
        self.gradients = grad

    def forward(self, x):
        """Run the network and arm the gradient hook on the feature map.

        Returns:
            Shape `(batch,)` for a single-logit head, otherwise
            `(batch, num_classes)`.
        """
        x = self.layers(x)
        # A hook can only be attached to a tensor that requires grad; skip it
        # for pure inference (e.g. under torch.no_grad()) instead of raising.
        if x.requires_grad:
            x.register_hook(self.activations_hook)
        x = self.avg_pool(x)
        x = x.view(x.size(0), -1)
        x = self.linear(x)
        if x.size(1) == 1:
            return x.flatten()
        return x

    def get_activations_gradient(self):
        """Return the gradient captured by the last backward pass (or None)."""
        return self.gradients

    def get_activations(self, x):
        """Return the feature map for `x` (runs the convolutional part again)."""
        return self.layers(x)

In [None]:
# Grad-CAM style heatmap for one randomly drawn training image.
netG = Net_Grad(model).to(device)
netG.eval()  # was `grad.eval()`, which refers to an undefined name

# Dedicated single-image loader so the training loader is not overwritten.
explain_loader = DataLoader(dataset_train, batch_size=1, shuffle=True)
img, _ = next(iter(explain_loader))
img = img.type(torch.float32).to(device)

# Forward + backward pass; the hook inside Net_Grad captures the gradient of
# the output with respect to the last feature map.
preds = netG(img)
preds.backward()
gradients = netG.get_activations_gradient()

# One weight per channel: the gradient averaged over batch, height and width.
pooled_gradient = torch.mean(gradients, dim=[0, 2, 3])

# Weight every channel of the feature map (works for any channel count).
activations = netG.get_activations(img).detach()
activations *= pooled_gradient.view(1, -1, 1, 1)

# Average over channels, keep positive evidence only, scale to [0, 1].
heatmap = torch.mean(activations, dim=1).squeeze().cpu()
heatmap = torch.clamp(heatmap, min=0)
peak = torch.max(heatmap)
if peak > 0:  # an all-zero map stays zero instead of turning into NaN
    heatmap /= peak
plt.matshow(heatmap.squeeze())

In [None]:
# Inverse of the input normalisation, built from two Normalize steps:
# dividing by 1/std multiplies by std, subtracting -mean adds the mean back.
rgb = range(3)
undo_std = torchvision.transforms.Normalize(
    mean=[0., 0., 0.],
    std=[1 / std[c] for c in rgb],
)
undo_mean = torchvision.transforms.Normalize(
    mean=[-mean[c] for c in rgb],
    std=[1., 1., 1.],
)
invTrans = torchvision.transforms.Compose([undo_std, undo_mean])

# Scale to 0-255 bytes and keep the first image of the batch, channel 0 only,
# which gives a single-channel picture.
restored = invTrans(img) * 255
pixels = restored.detach().cpu().numpy().astype(np.uint8)
image = PIL.Image.fromarray(pixels[0, 0, :, :])
print(preds) # >0  => man
image

In [None]:
# Colour the heatmap with the jet colormap and convert it to 8-bit values.
jet_values = plt.cm.jet(heatmap) * 255
heatmap_colored = jet_values.astype(np.uint8)

# Stretch the heatmap to the picture's size and overlay both at 50 % opacity.
heatmap_image = PIL.Image.fromarray(heatmap_colored)
heatmap_image = heatmap_image.resize(image.size, PIL.Image.BILINEAR)
blended_image = PIL.Image.blend(
    image.convert('RGBA'),
    heatmap_image.convert('RGBA'),
    alpha=0.5,
)
blended_image