Sam Ellis

## 0) Prelims

In [3]:
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


As a note to the reader: if you wish to run this notebook, you will need to make sure the following paths are correct. im_path must point to the elpv dataset, better_label_path must point to the train.csv, test_path must point to the test.csv, and the resnet paths must point to their corresponding parameter dictionaries. 

In [None]:
im_path = '/elpv-dataset'
train_path = 'train.csv'
test_path = 'test.csv'

res50_path = '/ResNetParams/resnet50.txt'
res101_path = '/ResNetParams/resnet101.txt'
res152_path = '/ResNetParams/resnet152.txt'

## 1) Net Model Classes + Misc

This is a simple implementation of ResNet architecture that allows the users to specify in_channels and num_classes. 

In [None]:
import torch
import torch.nn as nn

class block(nn.Module):
    def __init__(
        self, in_channels, intermediate_channels, identity_downsample=None, stride=1
    ):
        super(block, self).__init__()
        self.expansion = 4
        self.conv1 = nn.Conv2d(
            in_channels, intermediate_channels, kernel_size=1, stride=1, padding=0, bias=False
        )
        self.bn1 = nn.BatchNorm2d(intermediate_channels)
        self.conv2 = nn.Conv2d(
            intermediate_channels,
            intermediate_channels,
            kernel_size=3,
            stride=stride,
            padding=1,
            bias=False
        )
        self.bn2 = nn.BatchNorm2d(intermediate_channels)
        self.conv3 = nn.Conv2d(
            intermediate_channels,
            intermediate_channels * self.expansion,
            kernel_size=1,
            stride=1,
            padding=0,
            bias=False
        )
        self.bn3 = nn.BatchNorm2d(intermediate_channels * self.expansion)
        self.relu = nn.ReLU()
        self.identity_downsample = identity_downsample
        self.stride = stride

    def forward(self, x):
        identity = x.clone()

        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu(x)
        x = self.conv3(x)
        x = self.bn3(x)

        if self.identity_downsample is not None:
            identity = self.identity_downsample(identity)

        x += identity
        x = self.relu(x)
        return x


class ResNet(nn.Module):
    def __init__(self, block, layers, image_channels, num_classes):
        super(ResNet, self).__init__()
        self.in_channels = 64
        self.conv1 = nn.Conv2d(image_channels, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU()
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        self.layer1 = self._make_layer(
            block, layers[0], intermediate_channels=64, stride=1
        )
        self.layer2 = self._make_layer(
            block, layers[1], intermediate_channels=128, stride=2
        )
        self.layer3 = self._make_layer(
            block, layers[2], intermediate_channels=256, stride=2
        )
        self.layer4 = self._make_layer(
            block, layers[3], intermediate_channels=512, stride=2
        )

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * 4, num_classes)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = x.reshape(x.shape[0], -1)
        x = self.fc(x)

        return x

    def _make_layer(self, block, num_residual_blocks, intermediate_channels, stride):
        identity_downsample = None
        layers = []

        if stride != 1 or self.in_channels != intermediate_channels * 4:
            identity_downsample = nn.Sequential(
                nn.Conv2d(
                    self.in_channels,
                    intermediate_channels * 4,
                    kernel_size=1,
                    stride=stride,
                    bias=False
                ),
                nn.BatchNorm2d(intermediate_channels * 4),
            )

        layers.append(
            block(self.in_channels, intermediate_channels, identity_downsample, stride)
        )
        self.in_channels = intermediate_channels * 4
        for i in range(num_residual_blocks - 1):
            layers.append(block(self.in_channels, intermediate_channels))

        return nn.Sequential(*layers)

def ResNet50(img_channel=3, num_classes=1000):
    return ResNet(block, [3, 4, 6, 3], img_channel, num_classes)

def ResNet101(img_channel=3, num_classes=1000):
    return ResNet(block, [3, 4, 23, 3], img_channel, num_classes)

def ResNet152(img_channel=3, num_classes=1000):
    return ResNet(block, [3, 8, 36, 3], img_channel, num_classes)

Next, this is a simple class for getting the dataset in a pytorch format. 

In [None]:
import pandas as pd
import torch
import os
from torch.utils.data import Dataset, DataLoader
from PIL import Image
from torchvision.transforms import ToTensor
import math
import torch.optim as optim
import time

class solar_cell_dataset(Dataset):
    def __init__(self, csv_file, img_dir, transform=None):
        self.labels = pd.read_csv(csv_file)
        self.img_dir = img_dir
        self.transform = transform

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()

        img_name = os.path.join(self.img_dir, self.labels.iloc[idx, 0])
        img = Image.open(img_name)

        target = torch.tensor(self.labels.iloc[idx, 1:])

        if self.transform:
            img = self.transform(img)

        return img, target

## 2) Loading Model and Data

Another note to the reader: dont bother running this unless you have a beefcake GPU or colab pro. 

In [None]:
######### HYPER PARAMS ###########
batch_size = 64
cores = 0 #-1
lr = .001
momentum = .6
epochs = 100
################################

In [None]:
models = [('ResNet50', ResNet50(img_channel=1, num_classes=2), res50_path),
          ('ResNet101', ResNet101(img_channel=1, num_classes=2), res101_path),
          ('ResNet151', ResNet152(img_channel=1, num_classes=2), res152_path)]
          
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

name, net, params = models[0]
net.to(device)
print(name)

cuda
ResNet50


I trained these networks induvidually by changing models[0] $\Rightarrow$ models[1] or whichever one needed training. 

In [None]:
from torchvision.transforms.transforms import RandomApply
import torchvision.transforms as T


transforms = T.Compose([
    T.ToTensor(),
    T.RandomApply(nn.ModuleList([T.RandomAffine(degrees = (0, 30), translate = (0.1, 0.2), scale = (0.5, 1))]), p=.2)
])

s = solar_cell_dataset(csv_file=train_path, img_dir=im_path, transform=transforms)
t = solar_cell_dataset(csv_file=test_path, img_dir=im_path, transform=ToTensor())


train_loader = DataLoader(s,
                          batch_size=batch_size,
                          num_workers=cores)
test_loader = DataLoader(t,
                         batch_size=batch_size,
                         num_workers=cores)

Below is a classification function for dealing with the output of the CNN's

In [None]:
import torch.nn.functional as F
s_ = nn.Softmax(dim=1)
def classify(x):
  x = s_(x)
  x = torch.argmax(x, dim=1)
  # x = F.one_hot(x, num_classes=4)
  return x

In the training loop, I go for a set number of epochs but I check the test error after every 5 epochs. At the end I saved the model parameters.

In [None]:
loss_criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=lr, momentum = momentum)

start = time.time()
for epoch in range(epochs):

  t_loss = 0
  acc = 0
  for idx, data in enumerate(train_loader):
    img, target = data
    img = img.to(device)
    target = target.type(torch.FloatTensor)
    target = target.to(device)

    optimizer.zero_grad()

    output = net(img)
    loss = loss_criterion(output, target)
    loss.backward()
    optimizer.step()

    t_loss += loss.item()
    acc += (classify(output) == torch.argmax(target, dim=1)).float().sum()


  accuracy = 100 * acc / (len(s))
  print(f'[{epoch}] loss : {t_loss:.3f}, accuracy : {accuracy:.2f}%, time : {time.time() - start:.2f}')

  if (epoch + 1) % 5 == 0:
    acc = 0
    with torch.no_grad():
      for idx, data in enumerate(test_loader):
        img, target = data
        img = img.to(device)
        target = target.type(torch.FloatTensor)
        target = target.to(device)

        output = net(img)
        acc += (classify(output) == torch.argmax(target, dim=1)).float().sum()
    accuracy = 100 * acc / (len(t))
    print('======================================================')
    print(f'TEST ACCURACY : {accuracy:.2f}%')
    print('======================================================')

# torch.save(net.state_dict(), params)
print('DONE')

[0] loss : 13.901, accuracy : 71.43%, time : 25.82
[1] loss : 14.064, accuracy : 71.88%, time : 51.16
[2] loss : 13.894, accuracy : 71.43%, time : 76.42
[3] loss : 13.585, accuracy : 72.50%, time : 101.87
[4] loss : 13.542, accuracy : 72.55%, time : 127.27
TEST ACCURACY : 69.89%
[5] loss : 13.343, accuracy : 73.68%, time : 154.63
[6] loss : 13.264, accuracy : 73.79%, time : 180.95
[7] loss : 13.524, accuracy : 74.24%, time : 206.31
[8] loss : 12.940, accuracy : 73.23%, time : 231.68
[9] loss : 13.078, accuracy : 74.07%, time : 257.25
TEST ACCURACY : 70.79%
[10] loss : 12.754, accuracy : 73.57%, time : 284.68
[11] loss : 12.630, accuracy : 74.07%, time : 309.98
[12] loss : 12.992, accuracy : 74.63%, time : 335.24
[13] loss : 12.444, accuracy : 75.03%, time : 360.59
[14] loss : 12.453, accuracy : 74.92%, time : 385.83
TEST ACCURACY : 72.58%
[15] loss : 12.215, accuracy : 74.80%, time : 413.37
[16] loss : 12.169, accuracy : 75.31%, time : 438.69
[17] loss : 12.097, accuracy : 75.14%, time