## RESNET 50

In [1]:
import torch
import torch.nn as nn
import torch.functional as F
import torch.optim as optim
from torch.optim import lr_scheduler
import torchvision
from torchvision import datasets, models, transforms
import numpy as np
import matplotlib.pyplot as plt
import os
import time
import copy

In [2]:
# Root folder of the image dataset (read later by the dataset-loading cell).
path = 'data/PokemonData/'

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

In [3]:
# Per-channel RGB normalisation statistics. They must be defined before the
# transform pipeline below, which passes them to Normalize.
# The first mean entry was 0.458, a transposition of the commonly used
# ImageNet value 0.485.
mean = np.array([0.485, 0.456, 0.406])
std = np.array([0.229, 0.224, 0.225])

# Augmentation + tensor conversion + normalisation applied to every image.
data_transforms = transforms.Compose([
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean, std),
])

# One sub-folder per class under `path`; class names come from the folder names.
images = datasets.ImageFolder(path, data_transforms)
image_classes = images.classes

# 70 % train / 10 % validation / 20 % test.
# NOTE(review): all three splits share `data_transforms`, so validation and
# test images are also randomly cropped/flipped — confirm this is intended.
train_img, val_img, test_img = torch.utils.data.random_split(images, [0.7, 0.1, 0.2])
    

In [4]:
# Hyperparameters used by the data-loader and training cells.
batch_size = 32        # samples per mini-batch
learning_rate = 0.1    # initial learning rate handed to the optimizer
num_epochs = 20        # number of passes over the training split

In [5]:
# Mini-batch loaders for each split; only the test loader keeps a fixed order.
train_loader = torch.utils.data.DataLoader(dataset=train_img, batch_size=batch_size, shuffle=True)
valid_loader = torch.utils.data.DataLoader(dataset=val_img, batch_size=batch_size, shuffle=True)
test_loader = torch.utils.data.DataLoader(dataset=test_img, batch_size=batch_size, shuffle=False)

# Loaders keyed by training phase name.
dataloaders = {
    'train': train_loader,
    'valid': valid_loader,
}

# Number of SAMPLES per phase. len(loader) would give the number of batches,
# so the sizes are taken from the underlying datasets instead.
dataset_sizes = {
    'train': len(train_img),
    'valid': len(val_img),
}

### MODEL

In [6]:
class subResNet(nn.Module):
    """Bottleneck residual block: 1x1 conv -> 3x3 conv -> 1x1 conv plus a skip connection.

    Each convolution is followed by batch normalisation; ReLU is applied after
    the first two and again after the skip connection is added.

    Args:
        channel1: Not used by the block; kept so existing callers keep working.
        channel2: Width of the two inner convolutions. The block outputs
            ``channel2 * 4`` channels.
        type: Selects how many input channels the first convolution expects:
            ``0`` -> ``channel2``, ``1`` -> ``channel2 * 4``,
            ``2`` -> ``channel2 * 2``.
        identity_downsample: Optional module applied to the block input before
            it is added to the convolution branch. Required whenever the input
            shape differs from the branch output shape.
        stride: Stride of the first 1x1 convolution.

    Raises:
        ValueError: If ``type`` is not 0, 1 or 2.
    """

    # Multiplier applied to ``channel2`` to get conv1's input channel count.
    _IN_CHANNEL_FACTOR = {0: 1, 1: 4, 2: 2}

    def __init__(self, channel1, channel2, type = 0, identity_downsample = None, stride = 1):
        super().__init__()
        # Fail at construction time; previously an unknown type left conv1
        # undefined and only surfaced as an AttributeError inside forward().
        if type not in self._IN_CHANNEL_FACTOR:
            raise ValueError(f'type must be 0, 1 or 2, got {type!r}')
        in_channels = channel2 * self._IN_CHANNEL_FACTOR[type]

        self.conv1 = nn.Conv2d(in_channels, channel2, 1, stride=stride, padding=0)
        self.bn1 = nn.BatchNorm2d(channel2)
        self.conv2 = nn.Conv2d(channel2, channel2, 3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(channel2)
        self.conv3 = nn.Conv2d(channel2, channel2 * 4, 1, stride=1, padding=0)
        self.bn3 = nn.BatchNorm2d(channel2 * 4)
        self.relu = nn.ReLU()
        self.identity_downsample = identity_downsample

    def forward(self, x):
        """Return ``relu(branch(x) + shortcut(x))``."""
        # No clone needed: nothing below modifies the input tensor in place
        # (the in-place add targets the fresh output of bn3).
        identity = x
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu(x)
        x = self.conv3(x)
        x = self.bn3(x)
        if self.identity_downsample is not None:
            identity = self.identity_downsample(identity)
        x += identity
        x = self.relu(x)
        return x

class ResNet50(nn.Module):
    """Residual network assembled from four stages of ``subResNet`` blocks.

    Args:
        layers: Sequence of four integers, the number of blocks in each stage.
        channels: Number of channels of the input images.
        classes: Size of the final linear layer's output.
    """

    def __init__(self, layers, channels, classes):
        super().__init__()
        # Stem: 7x7 strided convolution, batch norm, ReLU, 3x3 max-pool.
        self.conv1 = nn.Conv2d(channels, 64, kernel_size=7, stride=2, padding=3)
        self.relu = nn.ReLU()
        self.bn = nn.BatchNorm2d(64)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Four residual stages; every stage after the first uses stride 2.
        self.layer1 = self.make_layer(layers[0], 64, 1)
        self.layer2 = self.make_layer(layers[1], 128, 2)
        self.layer3 = self.make_layer(layers[2], 256, 2)
        self.layer4 = self.make_layer(layers[3], 512, 2)

        # Head: global average pool followed by a linear classifier.
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc1 = nn.Linear(2048, classes)

    def forward(self, x):
        """Run the stem, the four stages and the classifier head on ``x``."""
        stem_out = self.maxpool(self.relu(self.bn(self.conv1(x))))

        features = stem_out
        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            features = stage(features)

        pooled = self.avgpool(features)
        flat = pooled.reshape(pooled.shape[0], -1)
        return self.fc1(flat)

    def make_layer(self, num_blocks, in_channels, stride):
        """Build one stage of ``num_blocks`` blocks with inner width ``in_channels``.

        The first block gets a 1x1-conv + batch-norm shortcut and the given
        stride; the remaining blocks are created with their defaults.
        """
        # Shortcut projection for the first block of the stage.
        if stride == 1:
            # Fixed 64 -> 256 projection (only the first stage uses stride 1).
            shortcut_in, shortcut_out = 64, 256
        else:
            shortcut_in, shortcut_out = in_channels * 2, in_channels * 4
        shortcut = nn.Sequential(
            nn.Conv2d(shortcut_in, shortcut_out, kernel_size=1, stride=stride, bias=False),
            nn.BatchNorm2d(shortcut_out),
        )

        # Width 64 means the stage reads the 64-channel stem output (type 0);
        # any other width uses type 2 for its first block.
        first_type = 0 if in_channels == 64 else 2
        blocks = [
            subResNet(in_channels, in_channels, first_type,
                      identity_downsample=shortcut, stride=stride)
        ]
        blocks.extend(
            subResNet(in_channels, in_channels, 1) for _ in range(1, num_blocks)
        )
        return nn.Sequential(*blocks)



### TRAINING

In [7]:
# Network with [3, 4, 6, 3] blocks per stage, 3 input channels and one output
# per dataset class, moved to the selected device.
model = ResNet50(layers=[3, 4, 6, 3], channels=3, classes=len(image_classes))
model = model.to(device)

# Classification loss.
criterion = nn.CrossEntropyLoss()

# SGD with momentum and L2 weight decay.
optimizer = optim.SGD(
    model.parameters(),
    lr=learning_rate,
    momentum=0.9,
    weight_decay=0.0001,
)

# NOTE(review): T_max=100 is larger than num_epochs, so the cosine schedule
# only covers the start of its curve during training — confirm this is intended.
scheduler = lr_scheduler.CosineAnnealingLR(optimizer, T_max=100)

In [None]:
# Train for num_epochs, evaluating on the validation split after every epoch
# and keeping the weights that achieved the best validation accuracy.
since = time.time()
best_acc = 0.0
# Snapshot the starting weights so the restore at the end always has something
# to load, even if validation accuracy never rises above 0.
best_model_wts = copy.deepcopy(model.state_dict())
for epoch in range(num_epochs):
    print(f'Epoch {epoch+1}/{num_epochs}')
    print('-'*10)
    for phase in ['train', 'valid']:
        if phase == 'train':
            model.train()
        else:
            model.eval()

        # Plain Python accumulators (not tensors) so the metrics below are
        # ordinary numbers regardless of how many batches were seen.
        running_loss = 0.0
        running_corrects = 0
        n_total = 0
        for inputs, labels in dataloaders[phase]:
            inputs = inputs.to(device)
            labels = labels.to(device)
            # Gradients are only tracked during the training phase.
            with torch.set_grad_enabled(phase == 'train'):
                outputs = model(inputs)
                _, pred = torch.max(outputs, 1)
                loss = criterion(outputs, labels)

                if phase == 'train':
                    optimizer.zero_grad()
                    loss.backward()
                    optimizer.step()

            # loss is a batch mean, so weight it by the batch size.
            batch_count = inputs.size(0)
            running_loss += loss.item() * batch_count
            running_corrects += (pred == labels).sum().item()
            n_total += batch_count
        if phase == 'train':
            # The learning-rate schedule advances once per epoch.
            scheduler.step()
        if n_total == 0:
            # Empty loader: nothing to average, avoid dividing by zero.
            print(f'{phase}: no samples')
            continue
        epoch_loss = running_loss / n_total
        epoch_acc = running_corrects / n_total
        print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')
        if phase == 'valid' and epoch_acc > best_acc:
            best_acc = epoch_acc
            best_model_wts = copy.deepcopy(model.state_dict())

    print()


time_elapsed = time.time() - since
print(f'Training complete in {time_elapsed//60:.0f}m {time_elapsed%60:.0f}s')
# ':.4f' (four decimals); the previous ':4f' only set a minimum field width.
print(f'Best accuracy: {best_acc:.4f}')
model.load_state_dict(best_model_wts)

### TESTING

In [None]:
# Evaluate the model on the held-out test split: overall and per-class accuracy.
model.eval()  # make the evaluation mode explicit rather than relying on the previous cell
with torch.no_grad():
    n_correct = 0
    n_samples = 0
    n_class_correct = [0] * len(image_classes)
    n_class_samples = [0] * len(image_classes)
    # `batch_images` (not `images`) so the dataset object named `images`
    # is not overwritten by a tensor.
    for batch_images, labels in test_loader:
        batch_images = batch_images.to(device)
        labels = labels.to(device)
        outputs = model(batch_images)
        # max returns (value ,index)
        _, predicted = torch.max(outputs, 1)
        n_samples += labels.size(0)
        n_correct += (predicted == labels).sum().item()

        # Iterate over the samples actually present: the last batch may hold
        # fewer than batch_size items.
        for label, pred in zip(labels.tolist(), predicted.tolist()):
            if label == pred:
                n_class_correct[label] += 1
            n_class_samples[label] += 1

    if n_samples == 0:
        print('No test samples to evaluate')
    else:
        acc = 100.0 * n_correct / n_samples
        print(f'Accuracy of the network: {acc} %')

    # Report every class, skipping those that have no test samples
    # (which would otherwise divide by zero).
    for i, class_name in enumerate(image_classes):
        if n_class_samples[i] == 0:
            print(f'Accuracy of {class_name}: no test samples')
            continue
        acc = 100.0 * n_class_correct[i] / n_class_samples[i]
        print(f'Accuracy of {class_name}: {acc} %')