## **Connect to drive**

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## **Change the following paths**

In [None]:
import os
# path where train csv file stored
csv_file_path = '/content/drive/My Drive/Assignments/Deeper_Systems/data/train.truth.csv'


# path of the training data
train_data_path = '/content/drive/My Drive/Assignments/Deeper_Systems/data/train'


# path to save the trained model 
save_model = '/content/drive/My Drive/Assignments/Deeper_Systems/resnet'

if not os.path.exists(save_model):
    os.makedirs(save_model)


# path of the testing data
test_path = '/content/drive/My Drive/Assignments/Deeper_Systems/data/test'
test_image_list = os.listdir(test_path)


# path to save the testing images after correcting the orientation
corrected_image_path = '/content/drive/My Drive/Assignments/Deeper_Systems/resnet/corrected_images'

if not os.path.exists(corrected_image_path):
    os.makedirs(corrected_image_path)


# path to save the results
path_to_save = '/content/drive/My Drive/Assignments/Deeper_Systems/resnet/result'

if not os.path.exists(path_to_save):
    os.makedirs(path_to_save)

## **Libraries**

In [None]:
import pandas as pd
import os
from skimage import io
import numpy as np
import cv2

import torch
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

## **Check device**

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


## **Map labels**

In [None]:
def map_label(l):
  d = {'rotated_right':0, 'rotated_left':1, 'upright':2, 'upside_down':3}
  return d[l]

In [None]:
def return_label(l):
  d = {0:'rotated_right', 1:'rotated_left', 2:'upright', 3:'upside_down'}
  return d[l]

## **DataLoader**

In [None]:
class create_dataloader(Dataset):

    def __init__(self, csv_file, root_dir, transform):
        self.data = pd.read_csv(csv_file)
        self.root_dir = root_dir
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()

        img_name = os.path.join(self.root_dir, self.data.fn[idx])

        try:
          image = io.imread(img_name)
        except:
          image = np.empty([64, 64, 3])

        label = map_label(self.data.label[idx])
        image = self.transform(image)
        sample = {'image': image, 'label': label}

        return sample

In [None]:
transform = transforms.Compose(
                        [transforms.ToTensor()
                        ])

train_dataset = create_dataloader(csv_file=csv_file_path,
                                  root_dir=train_data_path,
                                  transform=transform)

training_data = DataLoader(dataset=train_dataset, batch_size=4, shuffle=True)

## **Neural Network**

In [None]:
# 3*3 convolution
def conv3x3(in_channels, out_channels, stride=1):
    return nn.Conv2d(in_channels, out_channels, kernel_size=3,
                    stride=stride, padding=1, bias=False)

# Residual block
class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super(ResidualBlock, self).__init__()
        self.conv1 = conv3x3(in_channels, out_channels, stride)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv3x3(out_channels, out_channels)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.downsample = downsample

    def forward(self, x):
        residual = x
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        if self.downsample:
            residual = self.downsample(x)
        out += residual
        out = self.relu(out)
        return out

# ResNet
class ResNet(nn.Module):
    def __init__(self, block, layers, num_classes=4):
        super(ResNet, self).__init__()
        self.in_channels = 64
        self.conv = nn.Conv2d(3, 64, 5)
        self.bn = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.max_pool = nn.MaxPool2d(3)
        self.layer1 = self.make_layer(block, 64, layers[0])
        self.layer2 = self.make_layer(block, 128, layers[0], 2)
        self.layer3 = self.make_layer(block, 256, layers[1], 2)
        self.layer4 = self.make_layer(block, 512, layers[1], 2)
        self.avg_pool = nn.AvgPool2d(3)
        self.fc = nn.Linear(512, num_classes)

    def make_layer(self, block, out_channels, blocks, stride=1):
        downsample = None
        if (stride != 1) or (self.in_channels != out_channels):
            downsample = nn.Sequential(
                conv3x3(self.in_channels, out_channels, stride=stride),
                nn.BatchNorm2d(out_channels))
        layers = []
        layers.append(block(self.in_channels, out_channels, stride, downsample))
        self.in_channels = out_channels
        for i in range(1, blocks):
            layers.append(block(out_channels, out_channels))
        return nn.Sequential(*layers)

    def forward(self, x):
        out = self.conv(x)
        out = self.bn(out)
        # print(out.shape)
        out = self.max_pool(out)
        # print(out.shape)
        out = self.layer1(out)
        # print(out.shape)
        out = self.layer2(out)
        # print(out.shape)
        out = self.layer3(out)
        # print(out.shape)
        out = self.layer4(out)
        # print(out.shape)
        out = self.avg_pool(out)
        # print(out.shape)
        out = out.view(out.size(0), -1)
        out = self.fc(out)
        out = torch.softmax(out, dim=1)
        return out


## **Initialize Parameters**

In [None]:
''' Network '''

net_args = {
        "block": ResidualBlock,
        "layers": [2, 2, 2, 2]
    }
net = ResNet(**net_args).to(device)

''' Loss Function '''
criterion = nn.CrossEntropyLoss()

''' Optimizer '''
optimizer = optim.Adam(net.parameters(), lr=0.0001)

## **Training**

In [None]:
for epoch in range(2):  # loop over the dataset multiple times

    running_loss = 0.0
    for i, batch in enumerate(training_data):
        # get the inputs; data is a list of [inputs, labels]
        image= batch['image']
        label= batch['label']
        image = image.to(device)
        label = label.to(device)

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        logits = net(image.float())
        loss = criterion(logits, label)
        loss.backward()
        optimizer.step()

        print('Epoch Number -> {} Iteration Number: {} | Loss: {}'.format(epoch, i+1, loss))

    PATH = os.path.join(save_model, 'train_net.pth')
    torch.save(net.state_dict(), PATH)

print('Finished Training')

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
epoch number: 1 data_number: 7227 Loss: 0.7437856197357178
epoch number: 1 data_number: 7228 Loss: 0.7437458038330078
epoch number: 1 data_number: 7229 Loss: 0.7443171143531799
epoch number: 1 data_number: 7230 Loss: 1.1500239372253418
epoch number: 1 data_number: 7231 Loss: 0.8900158405303955
epoch number: 1 data_number: 7232 Loss: 0.906859815120697
epoch number: 1 data_number: 7233 Loss: 1.0818263292312622
epoch number: 1 data_number: 7234 Loss: 0.9132832884788513
epoch number: 1 data_number: 7235 Loss: 1.074206829071045
epoch number: 1 data_number: 7236 Loss: 1.066899061203003
epoch number: 1 data_number: 7237 Loss: 0.7439416646957397
epoch number: 1 data_number: 7238 Loss: 0.9023317098617554
epoch number: 1 data_number: 7239 Loss: 1.561038851737976
epoch number: 1 data_number: 7240 Loss: 0.923812747001648
epoch number: 1 data_number: 7241 Loss: 0.9084219932556152
epoch number: 1 data_number: 7242 Loss: 0.9145650863647

## **Testing**

In [None]:
PATH = os.path.join(save_model, 'train_net.pth')

''' Network '''
# net = Net().to(device)

net_args = {
        "block": ResidualBlock,
        "layers": [2, 2, 2, 2]
    }
net = ResNet(**net_args).to(device)

net.load_state_dict(torch.load(PATH))

<All keys matched successfully>

In [None]:
transform = transforms.Compose(
                        [transforms.ToTensor()
                        ])

In [None]:

test_image_array = list()

predicted_label = list()

for image_name in test_image_list:
  img = os.path.join(test_path, image_name)
  img = io.imread(img)
  image = transform(img)
  image = image.to(device)

  image = torch.reshape(image, (1, image.shape[0], image.shape[1], image.shape[2]))

  outputs = net(image)
  _, predicted = torch.max(outputs, 1)
  pred_label = predicted.item()
  label = return_label(pred_label)
  predicted_label.append(label)

  # rotated_right
  if pred_label==0:
    corrected_img = cv2.rotate(img, cv2.ROTATE_90_COUNTERCLOCKWISE)
  # rotated_left
  if pred_label==1:
    corrected_img = cv2.rotate(img, cv2.ROTATE_90_CLOCKWISE)
  # upside_down
  if pred_label==3:
    corrected_img = cv2.rotate(img, cv2.ROTATE_180)
  # upright
  if pred_label==2:
    corrected_img = img

  cv2.imwrite(os.path.join(corrected_image_path, image_name), corrected_img)
  test_image_array.append(corrected_img)

  print(image_name)

95-34145495_1950-08-23_2011.jpg
95-302695_1968-10-12_2013.jpg
95-31912895_1964-06-17_1996.jpg
95-32853595_1986-01-08_2015.jpg
95-30352495_1989-06-27_2013.jpg
95-3339395_1965-11-11_2001.jpg
95-32875695_1985-09-09_2009.jpg
95-3203495_1964-10-10_1983.jpg
95-32001395_1988-11-23_2011.jpg
95-33554195_1986-09-21_2012.jpg
95-30983095_1991-05-02_2013.jpg
95-33563895_1990-01-17_2012.jpg
95-34186995_1986-03-27_2011.jpg
95-335195_1964-05-13_2014.jpg
95-29736595_1986-02-17_2010.jpg
95-29409395_1992-02-26_2010.jpg
95-2995995_1966-12-19_2015.jpg
95-33734995_1990-03-24_2012.jpg
95-34802495_1980-11-06_2012.jpg
95-3483095_1986-09-08_2014.jpg
95-3493695_1976-04-28_2008.jpg
95-30122795_1989-04-16_2014.jpg
95-34127595_1973-04-30_2013.jpg
95-31329995_1940-09-14_2005.jpg
95-3475595_1984-05-23_2015.jpg
95-31396795_1987-01-09_2014.jpg
95-31152195_1987-02-11_2012.jpg
95-34428195_1994-09-22_2014.jpg
95-32535595_1937-03-02_2007.jpg
95-3190895_1926-04-04_2005.jpg
95-316095_1962-03-29_2006.jpg
95-30115695_1980-01-3

In [None]:
# Create a dataframe from 2 lists

data_df = {'fn': image_name, 'label': predicted_label}
df = pd.DataFrame(data_df)

df.to_csv(os.path.join(path_to_save, 'test.preds.csv'), index=False)

print('File generated...')

# save numpy array of corrected test image
np.save(os.path.join(path_to_save, 'corrected_test_image_array'), test_image_array)

File generated...
