In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import zipfile as z
from PIL import Image
from imblearn.under_sampling import RandomUnderSampler
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch
import os
from sklearn import preprocessing# label encoding
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import Dataset, SubsetRandomSampler, DataLoader
from sklearn.model_selection import train_test_split

plt.rcParams['figure.figsize'] = (15, 5)
plt.rcParams['font.size'] = '5'

### Entire Training Dataset

In [None]:
df = pd.read_csv('all_data_info.csv')

In [None]:
print(df.shape)
df.head()

In [None]:
# number of artists
len(df['artist'].unique())

In [None]:
# missing artist names?
df['artist'].isnull().sum()

In [None]:
# num paintings per artist
by_artist = pd.DataFrame(df['artist'].value_counts().reset_index())
by_artist.columns = ['artist','count']
by_artist

In [None]:
# number of artists with > 200 paintings
len(by_artist[by_artist['count'] >= 200])

In [None]:
# remove data where < 200 paintings per artist
by_artist = by_artist[by_artist['count'] >= 200]

In [None]:
plt.hist(by_artist['count'])
plt.title('Distribution of num paintings per artist')
plt.xlabel('num paintings')

In [None]:
# removing artists from the main df that have < 200 paintings
df_200 = df[df['artist'].isin(by_artist['artist'])]

In [None]:
df_200.shape # 37452 rows

In [None]:
df_200['artist'].value_counts()

### Subset: Train 1 and Train 2

__lines below commented so don't need to create df_sub from scratch each time - just load saved pickle file__

In [None]:
# entire training set to large so downloading subset train 1

# filenames_train1 = []
# with z.ZipFile('train_1.zip', 'r') as zip:
#     for info in zip.infolist():
#         name = info.filename.split('/')
#         filenames_train1.append(name[1])

In [None]:
# entire training set to large so downloading subset train 1

# filenames_train2 = []
# with z.ZipFile('train_2.zip', 'r') as zip:
#     for info in zip.infolist():
#         name = info.filename.split('/')
#         filenames_train2.append(name[1])

In [None]:
# train1_df = df[df['new_filename'].isin(filenames_train1)] 

In [None]:
# train2_df = df[df['new_filename'].isin(filenames_train2)] 

In [None]:
# train1_df.reset_index(inplace=True)

In [None]:
# train2_df.reset_index(inplace=True)

In [None]:
# saving train1_df and train2_df
# train1_df.to_pickle("train1_df.pkl")
# train2_df.to_pickle("train2_df.pkl")

In [None]:
# load train1 and train2 dataframes from pickle file
train1_df = pd.read_pickle("train1_df.pkl")
train2_df = pd.read_pickle("train2_df.pkl")

In [None]:
print(train1_df.shape)
train1_df.head()

In [None]:
print(train2_df.shape)
train2_df.head()

In [None]:
# number of unique artists in train1 and train2
print("train1 num artists: ", len(train1_df['artist'].unique()))
print("train2 num artists: ", len(train2_df['artist'].unique()))

In [None]:
# combining both tables
df_sub = pd.concat([train1_df,train2_df], ignore_index=True)
print(df_sub.shape)
# drop 'index' col
df_sub.drop('index', axis=1, inplace=True)
df_sub.head()

In [None]:
# number of unique artists
len(df_sub['artist'].unique())

In [None]:

#merging train 1 and train 2 folders (unzip them first)
#import os
#import shutil

# #Path of source directory & destination directory
#src_directory = 'train_2'
#dst_directory = 'train_1'

# # Extract file from Source directory and move to Destination directory
#for file in os.listdir(src_directory):
     #src_file = os.path.join(src_directory, file)
     #dest_file = os.path.join(dst_directory, file)
     #shutil.move(src_file, dest_file)

# all should be in 'train_1', compress it for storage renamed as train.zip

merged files in train.zip under train_1 directory

In [None]:
# looking at some images
archive = z.ZipFile('train.zip', 'r')
file = df_sub.loc[2]['new_filename']
imgdata = archive.open('train_1/'+file)
img = Image.open(imgdata)
plt.imshow(img)
plt.show()

file = df_sub.loc[19500]['new_filename']
imgdata = archive.open('train_1/'+file)
img = Image.open(imgdata)
plt.imshow(img)
plt.show()

In [None]:
df_sub['artist'].value_counts().head(10)

__largest number of paintings per artist in df_sub is 119 (compared to 500 in entire training data)__

In [None]:
by_artist_1 = pd.DataFrame(df_sub['artist'].value_counts().reset_index())
by_artist_1.columns = ['artist','count']
by_artist_1

In [None]:
plt.hist(by_artist_1['count'])
plt.title('Distribution of num paintings per artist- Train1,2')
plt.xlabel('num paintings')
plt.xticks(range(0, 119,3))
plt.show()

In [None]:
# number of artists with > 80 paintings
len(by_artist_1[by_artist_1['count'] >= 80])

In [None]:
by_artist_1 = by_artist_1[by_artist_1['count'] >= 80]
# removing artists from the main df that have < 80 paintings
df_80 = df_sub[df_sub['artist'].isin(by_artist_1['artist'])]

In [None]:
print(df_80.shape)
print("num artists: ", len(df_80['artist'].unique()))
df_80.head()

__df_80 has data from df_sub with artists having >= 80 paintings (max = 119)__

In [None]:
df_80.groupby('artist').size()

In [None]:
# balancing the classes (making all artists have 80 paintings)
X = df_80.drop('artist', axis=1)
y = df_80['artist']
rus = RandomUnderSampler(random_state=42)
X_rus, y_rus = rus.fit_resample(X, y)

In [None]:
print(X_rus.shape)
print(36*81)
y_rus.value_counts()

Resampled so dataset is now 36 artists each with 81 paintings

In [None]:
# saving X_rus and y_rus as pickle files
# X_rus.to_pickle("X_rus.pkl")
# y_rus.to_pickle("y_rus.pkl")

In [None]:
# X_rus = pd.read_pickle("X_rus.pkl")
# y_rus = pd.read_pickle("y_rus.pkl")

In [None]:
sns.countplot(x='genre', data=X_rus)
plt.xticks(rotation=90)
plt.title('Distribution of Genres')
plt.show()

### Resizing images to 256x256 according to paper. Also normalizing and creating dataloader

In [None]:
# directory = 'train_1'

# for filename in os.listdir(directory):
#     try:
#         if filename.endswith('.jpg'):
#             img_path = os.path.join(directory, filename)
#             with Image.open(img_path) as img:
#                 img_resized = img.resize((256, 256), resample=Image.BILINEAR)
#                 img_resized.convert('RGB').save(os.path.join(directory, filename))
#     except:
#         print(f"Could not process image {filename}")

In [None]:
# remove record of corrupted image file 101947.jpg
X_rus[X_rus['new_filename']=='101947.jpg'] #not included after resampling anyways

__Images resized to 256x256__

In [None]:
filename = df_sub.loc[19400]['new_filename'] 
img = Image.open(os.path.join('train_1', filename))
plt.imshow(img)
print(img.size)  # Should print (256, 256)

In [None]:
# transforming image - normalization improves performance
transform = transforms.Compose(
    [transforms.ToTensor(), 
     transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)) # applying ImageNet normalization
    ])    

In [None]:
# look at transformed image
filename = df_sub.loc[19400]['new_filename'] 
img = Image.open(os.path.join('train_1', filename))
tr_img = transform(img)
plt.imshow(tr_img.numpy().transpose(1,2,0))
plt.show()

In [None]:
le = preprocessing.LabelEncoder() # one hot encoding instead? but 36 classes
artists = list(y_rus.unique())
le.fit(artists)
le.classes_

In [None]:
l = le.transform(y_rus)
y_rus,l

In [None]:
# create dataloaders, batch size etc
class CustomImageDataset(Dataset):
    def __init__(self,img_dir, X,y, transformation): # pass X_rus, y_rus, img_dir='train_1' for training
        self.transformation = transform
        # is encoder needed?
        self.encoder = le
        self.img_dir = img_dir
        self.feats, self.labels = self.get_all(self.img_dir,X,y)
        
    def get_all(self,img_dir,X,y):
        images = []
        labels = []
        
        for index, row in X.iterrows():
            file = row['new_filename']
            img = Image.open(os.path.join(img_dir, file)) 
            data = self.transformation(img)
            images.append(data)
            
            
        labels = self.encoder.transform(y)
        
        
        images = torch.stack(images)
        labels = torch.LongTensor(labels)
        return images, labels
            
    def __len__(self):
        return len(self.labels)
    
    def __getitem__(self,item):
        return self.feats[item], self.labels[item]
        

In [None]:
# need to do similar for test data^

In [None]:
#trainset = CustomImageDataset('train_1',X_rus,y_rus,transform)

In [None]:
#torch.save(trainset,"trainset.pt")

In [None]:
# load saved Dataset
trainset = torch.load("trainset.pt")

In [None]:
# split into train and val [80:20]
batch_size= 32
labels = trainset.labels.numpy()
train_indices, val_indices = train_test_split(np.arange(len(labels)),test_size=0.2,
                                              shuffle=True,random_state=42,stratify=labels)
train_sampler = SubsetRandomSampler(train_indices)
val_sampler = SubsetRandomSampler(val_indices)
train_loader = DataLoader(trainset, batch_size=batch_size, sampler=train_sampler,drop_last=True)
val_loader = DataLoader(trainset, batch_size=batch_size, sampler=val_sampler,drop_last=True)

### Baseline CNN
Based on: http://cs229.stanford.edu/proj2018/report/41.pdf

In [None]:
num_classes = 36

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") 
print(device)

In [None]:
class CNNBase(nn.Module):
    def __init__(self):
        super(CNNBase,self).__init__()
        self.conv1 = nn.Conv2d(3,16,3,padding=1)
        self.conv2 = nn.Conv2d(16,16,3,padding=1)
        self.conv3 = nn.Conv2d(16,16,3,padding=1)
        self.conv4 = nn.Conv2d(16,32,3,padding=1)
        self.conv5 = nn.Conv2d(32,128,3,padding=1)
        self.conv6 = nn.Conv2d(128,256,3,padding=1)
        self.relu = nn.ReLU()
        self.bn16 = nn.BatchNorm2d(16)
        self.bn32 = nn.BatchNorm2d(32)
        self.bn128 = nn.BatchNorm2d(128)
        self.bn256 = nn.BatchNorm2d(256)
        self.dropout = nn.Dropout2d(p=0.5)
        self.fc1 = nn.Linear(16384,2048)
        self.fc2 = nn.Linear(2048,num_classes)
        self.pool = nn.MaxPool2d(2) 
        #self.softmax = nn.Softmax(dim=1) need to remove during training as cross entropy loss already has softmax
        
    def forward(self,x):
        #print("start: ", x.shape)
        x = self.relu(self.bn16(self.conv1(x)))
        #print("conv1: ", x.shape)
        x = self.pool(self.relu(self.bn16(self.conv2(x))))
        #print("conv2: ", x.shape)
        x = self.pool(self.relu(self.bn16(self.conv3(x))))
        #print("conv3: ", x.shape)
        x = self.pool(self.relu(self.bn32(self.conv4(x))))
        #print("conv4: ", x.shape)
        x = self.pool(self.relu(self.bn128(self.conv5(x))))
        #print("conv5: ", x.shape)
        x = self.pool(self.relu(self.bn256(self.conv6(x))))
        #print("conv6: ", x.shape)
        x = torch.flatten(x, start_dim=1)
        #print("flatten: ", x.shape)
        x = self.dropout(x)
        x = self.relu(self.fc1(x))
        #print("fc1: ", x.shape)
        x = self.fc2(x)
        #print("fc2: ", x.shape)
        x = self.dropout(x)
        return x


In [None]:
cnn_base = CNNBase()
cnn_base.to(device)

In [None]:
loss_func = nn.CrossEntropyLoss()
opt = optim.Adam(cnn_base.parameters())

In [None]:
for i, data in enumerate(train_loader, 0):
    inputs,labels = data
    print(inputs.shape)
    print(labels.shape)
    break

In [None]:
epochs = 100
train_losses = []
val_losses = []
best_model_path = 'best_model.pt'
best_val_loss = float('inf')

for epoch in range(epochs):
    batch_loss = []
    cnn_base.train()
    
    for i, data in enumerate(train_loader, 0):
        inputs,labels = data
        # Move the inputs to the specified device.
        inputs, labels = inputs.to(device), labels.to(device)
        
        # Zero the parameter gradients.
        opt.zero_grad()
        
        # forward step
        outputs = cnn_base(inputs)
        loss = loss_func(outputs,labels)
        
        # backward pass
        loss.backward()
        
        # update parameters
        opt.step()
        
        batch_loss.append(loss.data.item())
        
    train_loss = np.mean(batch_loss)
    train_losses.append(train_loss)
    
    cnn_base.eval()
    batch_loss = []
    with torch.no_grad():
        for i, data in enumerate(val_loader, 0):
            inputs,labels = data
            # Move the inputs to the specified device.
            inputs, labels = inputs.to(device), labels.to(device)

            # forward step
            outputs = cnn_base(inputs)
            loss = loss_func(outputs,labels)

            batch_loss.append(loss.data.item())
        val_loss = np.mean(batch_loss)
        val_losses.append(val_loss)
        
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(cnn_base.state_dict(), best_model_path)
    #if epoch%10==0:
    print(f"[{epoch+1}/{epochs}] Training loss: {train_loss:.4f}\t Validation loss: {val_loss:.4f}.")

In [None]:
torch.save(cnn_base.state_dict(), 'latest_model.pt')

In [None]:
# Python program to store list to file using pickle module
import pickle

# write list to binary file
def write_list(a_list, file_name):
    # store list in binary file so 'wb' mode
    with open(file_name, 'wb') as fp:
        pickle.dump(a_list, fp)
        print('Done writing list into a binary file')

# Read list to memory
def read_list(file_name):
    # for reading also binary mode is important
    with open(file_name, 'rb') as fp:
        n_list = pickle.load(fp)
        return n_list

In [None]:
write_list(train_losses,'train_losses')

In [None]:
write_list(val_losses,'val_losses')

In [None]:
# plot the loss diagram
plt.plot(train_losses,label='Training Loss')
plt.plot(val_losses, label='Validation Loss')
plt.legend(loc='best')
plt.ylabel('Mean Loss')
plt.xlabel('Epochs')
plt.title(f"Loss graph during the process of training the Baseline CNN.")
plt.show()

Model has overfit. Likely because our data size is much smaller than what the paper used

In [None]:
# create test df
test_df = df[df['in_train']==False]
print(test_df.shape)
test_df.head()

In [None]:
# only using those artists that model was trained on
artists = list(y_rus.unique())
test_df = test_df[test_df['artist'].isin(artists)]

In [None]:
print(test_df.shape)
test_df.head()

In [None]:
# taking a smaller stratified subset of test_df
test_df['artist'].value_counts()

__randomly sample 30 artworks of each artist__

In [None]:
test_sub = test_df.groupby('artist', group_keys=False).apply(lambda x: x.sample(30, random_state = 42))

In [None]:
print(test_sub.shape)
test_sub['artist'].value_counts()

In [None]:
# resize test images
# directory = 'test'

# for filename in os.listdir(directory):
#     try:
#         if filename.endswith('.jpg'):
#             img_path = os.path.join(directory, filename)
#             with Image.open(img_path) as img:
#                 img_resized = img.resize((256, 256), resample=Image.BILINEAR)
#                 img_resized.convert('RGB').save(os.path.join(directory, filename))
#     except:
#         print(f"Could not process image {filename}")

In [None]:
# corrupted files
test_sub[test_sub['new_filename'].isin(['20153.jpg','9989.jpg','100532.jpg','18649.jpg','24000.jpg'])]

In [None]:
filename = test_sub.loc[39942]['new_filename'] 
img = Image.open(os.path.join('test', filename))
plt.imshow(img)
print(img.size)  # Should print (256, 256)

In [None]:
test_art = test_sub['artist'].unique()

In [None]:
le_test = preprocessing.LabelEncoder()
le_test.fit(test_art)
le_test.classes_

In [None]:
# create dataloaders, batch size etc
class TestDataset(Dataset):
    def __init__(self,img_dir, dataframe, transformation): 
        self.transformation = transform
       
        self.encoder = le_test
        self.img_dir = img_dir
        self.feats, self.labels = self.get_all(self.img_dir,dataframe)
        
    def get_all(self,img_dir,dataframe):
        images = []
        labels = []
        
        for index, row in dataframe.iterrows():
            file = row['new_filename']
            img = Image.open(os.path.join(img_dir, file)) 
            data = self.transformation(img)
            images.append(data)
            
            artist = row['artist']
            label = self.encoder.transform([artist])[0]
            labels.append(label)
        
        #labels = self.encoder.transform(y)
        
        images = torch.stack(images)
        labels = torch.LongTensor(labels)
        return images, labels
            
    def __len__(self):
        return len(self.labels)
    
    def __getitem__(self,item):
        return self.feats[item], self.labels[item]

In [None]:
testset = TestDataset('test',test_sub,transform)

In [None]:
#torch.save(testset,'testset.pt')

In [None]:
test_loader = DataLoader(testset, batch_size=batch_size ,drop_last=True, shuffle=True)

In [None]:
for i, data in enumerate(test_loader, 0):
    inp,out = data
    print(out) #output is 32 because batch size is 32. each value is a label?

In [None]:
# evaluate on test set
def evaluate(model, test_dataloader, device=device):
    model.eval()
    predictions = []
    ground = []
#     total_correct = 0
#     total_samples = 0
    
    with torch.no_grad():
        for i, data in enumerate(test_dataloader, 0):
            inputs,labels = data
            # Move the inputs to the specified device.
            inputs, labels = inputs.to(device), labels.to(device)
            output = model(inputs)
            
            output_idx = torch.argmax(output,dim=1)
            
            predictions.append(output_idx.numpy())
            ground.append(labels.numpy())
            
#             _, predicted = torch.max(output.data, 1)
#             total_correct += (predicted == labels).sum().item()
#             total_samples += labels.size(0)
            
#     accuracy = 100 * (total_correct / total_samples)
#     print(f'Test Accuracy: {accuracy:.2f}%')
    
    return predictions, ground

In [None]:
model = CNNBase()
model.load_state_dict(torch.load('best_model.pt'))

In [None]:
pred, ground = evaluate(model,test_loader)
y_true = np.concatenate(ground, axis=0)
y_pred = np.concatenate(pred, axis=0)

In [None]:
len(y_true), len(y_pred)

In [None]:
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib
print(classification_report(y_true, y_pred))

In [None]:
matplotlib.rc_file_defaults() # to remove the sns darkgrid style
cm = confusion_matrix(y_true, y_pred)
classes = list(le.classes_)
sns.heatmap(cm, annot=True, cmap='Blues', fmt='g', xticklabels=classes, yticklabels=classes)
plt.xlabel('Predicted label')
plt.ylabel('True label')
plt.title('Confusion Matrix')
plt.show()

#### Testing Accuracy: 9%

## ResNet

In [None]:
from torchvision import models

In [None]:
rn18 = models.resnet18(pretrained=True)

In [None]:
num_classes = len(le.classes_)
num_classes

In [None]:
# modify last layer
rn18.fc = nn.Sequential(
    nn.Linear(rn18.fc.in_features, num_classes),
    nn.Softmax(dim=1)) # can remove softmax?


In [None]:
# Freeze the model
for param in rn18.parameters():
    param.requires_grad = False
for param in rn18.fc.parameters():
    param.requires_grad = True

In [None]:
# Train the model- only the last layer
optimizer = optim.Adam(rn18.fc.parameters(), lr=0.0001)
criterion = nn.CrossEntropyLoss()

In [None]:
train_loss_rn18 = []
val_loss_rn18 = []
best_val_loss = float('inf')
patience = 3
early_stop_counter = 0
rn18_path = 'best_rn18.pt'

for epoch in range(10):
    rn18.train()
    batch_loss = []
    for i, data in enumerate(train_loader, 0):
        inputs,labels = data
        # Move the inputs to the specified device.
        inputs, labels = inputs.to(device), labels.to(device)
        
        # Zero the parameter gradients.
        optimizer.zero_grad()
        
        # forward step
        outputs = rn18(inputs)
        loss = criterion(outputs,labels)
        
        # backward pass
        loss.backward()
        
        # update parameters
        optimizer.step()
        
        batch_loss.append(loss.data.item())
        
    train_loss = np.mean(batch_loss)
    train_loss_rn18.append(train_loss)
    
    rn18.eval()
    batch_loss = []
    
    with torch.no_grad():
        for i, data in enumerate(val_loader, 0):
            inputs,labels = data
            # Move the inputs to the specified device.
            inputs, labels = inputs.to(device), labels.to(device)

            # forward step
            outputs = rn18(inputs)
            loss = criterion(outputs,labels)

            batch_loss.append(loss.data.item())
            
        val_loss = np.mean(batch_loss)
        val_loss_rn18.append(val_loss)
        
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        early_stop_counter = 0
        torch.save(rn18.state_dict(), rn18_path)
    else:
        early_stop_counter+=1 
        if early_stop_counter >= patience:
            print("Early stopping: validation loss has not improved in {} epochs".format(patience))
            break
    #if epoch%10==0:
    print(f"[{epoch+1}/{epochs}] Training loss: {train_loss:.4f}\t Validation loss: {val_loss:.4f}.")
        

In [None]:
torch.save(rn18.state_dict(), 'latest_rn18.pt')

In [None]:
# plot the loss diagram
plt.plot(train_loss_rn18,label='Training Loss')
plt.plot(val_loss_rn18, label='Validation Loss')
plt.legend(loc='best')
plt.ylabel('Mean Loss')
plt.xlabel('Epochs')
plt.title(f"Loss graph - ResNet18.")
plt.show()

In [None]:
write_list(train_loss_rn18,'train_losses_rn18')
write_list(val_loss_rn18,'val_loss_rn18')

In [None]:
model_rn18 = rn18
model_rn18.load_state_dict(torch.load('best_rn18.pt'))

In [None]:
pred, ground = evaluate(model_rn18,test_loader)
y_true_rn18 = np.concatenate(ground, axis=0)
y_pred_rn18 = np.concatenate(pred, axis=0)

In [None]:
y_true_rn18

In [None]:
y_pred_rn18

In [None]:
print(classification_report(y_true_rn18, y_pred_rn18))

In [None]:
matplotlib.rc_file_defaults() # to remove the sns darkgrid style
cm = confusion_matrix(y_true_rn18, y_pred_rn18)
classes = list(le.classes_)
sns.heatmap(cm, annot=True, cmap='Blues', fmt='g', xticklabels=classes, yticklabels=classes)
plt.xlabel('Predicted label')
plt.ylabel('True label')
plt.title('Confusion Matrix')
plt.show()

#### Testing accuracy = 26%

## Updated ResNet
__Increasing number of Epochs to 20, increasing learning rate to 0.001__

In [None]:
rn18v1 = models.resnet18(pretrained=True)
num_classes = len(le.classes_)
# modify last layer
rn18v1.fc = nn.Sequential(
    nn.Linear(rn18v1.fc.in_features, num_classes),
    nn.Softmax(dim=1)) # can remove softmax?
# Freeze the model
for param in rn18v1.parameters():
    param.requires_grad = False
for param in rn18v1.fc.parameters():
    param.requires_grad = True

In [None]:
# Train the model- only the last layer
optimizer = optim.Adam(rn18v1.fc.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

In [None]:
train_loss_rn18v1 = []
val_loss_rn18v1 = []
best_val_loss = float('inf')
patience = 3
early_stop_counter = 0
rn18v1_path = 'best_rn18v1.pt'

for epoch in range(20):
    rn18v1.train()
    batch_loss = []
    for i, data in enumerate(train_loader, 0):
        inputs,labels = data
        # Move the inputs to the specified device.
        inputs, labels = inputs.to(device), labels.to(device)
        
        # Zero the parameter gradients.
        optimizer.zero_grad()
        
        # forward step
        outputs = rn18v1(inputs)
        loss = criterion(outputs,labels)
        
        # backward pass
        loss.backward()
        
        # update parameters
        optimizer.step()
        
        batch_loss.append(loss.data.item())
        
    train_loss = np.mean(batch_loss)
    train_loss_rn18v1.append(train_loss)
    
    rn18v1.eval()
    batch_loss = []
    
    with torch.no_grad():
        for i, data in enumerate(val_loader, 0):
            inputs,labels = data
            # Move the inputs to the specified device.
            inputs, labels = inputs.to(device), labels.to(device)

            # forward step
            outputs = rn18v1(inputs)
            loss = criterion(outputs,labels)

            batch_loss.append(loss.data.item())
            
        val_loss = np.mean(batch_loss)
        val_loss_rn18v1.append(val_loss)
        
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        early_stop_counter = 0
        torch.save(rn18v1.state_dict(), rn18v1_path)
    else:
        early_stop_counter+=1 
        if early_stop_counter >= patience:
            print("Early stopping: validation loss has not improved in {} epochs".format(patience))
            break
    #if epoch%10==0:
    print(f"[{epoch+1}/{20}] Training loss: {train_loss:.4f}\t Validation loss: {val_loss:.4f}.")
        

In [None]:
torch.save(rn18v1.state_dict(), 'latest_rn18v1.pt')

In [None]:
# plot the loss diagram
plt.plot(train_loss_rn18v1,label='Training Loss')
plt.plot(val_loss_rn18v1, label='Validation Loss')
plt.legend(loc='best')
plt.ylabel('Mean Loss')
plt.xlabel('Epochs')
plt.title(f"Loss graph - ResNet18v1.")
plt.show()

In [None]:
write_list(train_loss_rn18v1,'train_losses_rn18v1')
write_list(val_loss_rn18v1,'val_loss_rn18v1')

In [None]:
model_rn18v1 = rn18v1
model_rn18v1.load_state_dict(torch.load('best_rn18v1.pt'))

In [None]:
pred, ground = evaluate(model_rn18v1,test_loader)
y_true_rn18v1 = np.concatenate(ground, axis=0)
y_pred_rn18v1 = np.concatenate(pred, axis=0)

In [None]:
print(classification_report(y_true_rn18v1, y_pred_rn18v1))

In [None]:
matplotlib.rc_file_defaults() # to remove the sns darkgrid style
cm = confusion_matrix(y_true_rn18v1, y_pred_rn18v1)
classes = list(le.classes_)
sns.heatmap(cm, annot=True, cmap='Blues', fmt='g', xticklabels=classes, yticklabels=classes)
plt.xlabel('Predicted label')
plt.ylabel('True label')
plt.title('Confusion Matrix')
plt.show()

### Fine Tuning more layers of ResNet18

In [None]:
rn18v2 = models.resnet18(pretrained=True)
    
num_classes = len(le.classes_)
# modify last layer
rn18v2.fc = nn.Sequential(
    nn.Linear(rn18v2.fc.in_features, num_classes),
    nn.Softmax(dim=1)) # can remove softmax?


In [None]:
for name, child in rn18v2.named_children():
    if name in ['fc', 'layer4']:
        print(name + ' is unfrozen')
        for param in child.parameters():
            param.requires_grad = True
    else:
        print(name + ' is frozen')
        for param in child.parameters():
            param.requires_grad = False

In [None]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(filter(lambda p: p.requires_grad, rn18v2.parameters()), lr=0.001)

In [None]:
train_loss_rn18v2 = []
val_loss_rn18v2 = []
best_val_loss = float('inf')
patience = 3
early_stop_counter = 0
rn18v2_path = 'best_rn18v2.pt'
epochs=20

for epoch in range(epochs):
    rn18v2.train()
    batch_loss = []
    for i, data in enumerate(train_loader, 0):
        inputs,labels = data
        # Move the inputs to the specified device.
        inputs, labels = inputs.to(device), labels.to(device)
        
        # Zero the parameter gradients.
        optimizer.zero_grad()
        
        # forward step
        outputs = rn18v2(inputs)
        loss = criterion(outputs,labels)
        
        # backward pass
        loss.backward()
        
        # update parameters
        optimizer.step()
        
        batch_loss.append(loss.data.item())
        
    train_loss = np.mean(batch_loss)
    train_loss_rn18v2.append(train_loss)
    
    rn18v2.eval()
    batch_loss = []
    
    with torch.no_grad():
        for i, data in enumerate(val_loader, 0):
            inputs,labels = data
            # Move the inputs to the specified device.
            inputs, labels = inputs.to(device), labels.to(device)

            # forward step
            outputs = rn18v2(inputs)
            loss = criterion(outputs,labels)

            batch_loss.append(loss.data.item())
            
        val_loss = np.mean(batch_loss)
        val_loss_rn18v2.append(val_loss)
        
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        early_stop_counter = 0
        torch.save(rn18v2.state_dict(), rn18v2_path)
    else:
        early_stop_counter+=1 
        if early_stop_counter >= patience:
            print("Early stopping: validation loss has not improved in {} epochs".format(patience))
            break
    #if epoch%10==0:
    print(f"[{epoch+1}/{epochs}] Training loss: {train_loss:.4f}\t Validation loss: {val_loss:.4f}.")
        

In [None]:
torch.save(rn18v2.state_dict(), 'latest_rn18v2.pt')

In [None]:
# plot the loss diagram
plt.plot(train_loss_rn18v2,label='Training Loss')
plt.plot(val_loss_rn18v2, label='Validation Loss')
plt.legend(loc='best')
plt.ylabel('Mean Loss')
plt.xlabel('Epochs')
plt.title(f"Loss graph - ResNet18v2.")
plt.show()

In [None]:
write_list(train_loss_rn18v2,'train_losses_rn18v2')
write_list(val_loss_rn18v2,'val_loss_rn18v2')

In [None]:
model_rn18v2 = rn18v2
model_rn18v2.load_state_dict(torch.load('best_rn18v2.pt'))

In [None]:
pred, ground = evaluate(model_rn18v2,test_loader)
y_true_rn18v2 = np.concatenate(ground, axis=0)
y_pred_rn18v2 = np.concatenate(pred, axis=0)

In [None]:
print(classification_report(y_true_rn18v2, y_pred_rn18v2))

In [None]:
matplotlib.rc_file_defaults() # to remove the sns darkgrid style
cm = confusion_matrix(y_true_rn18v2, y_pred_rn18v2)
classes = list(le.classes_)
sns.heatmap(cm, annot=True, cmap='Blues', fmt='g', xticklabels=classes, yticklabels=classes)
plt.xlabel('Predicted label')
plt.ylabel('True label')
plt.title('Confusion Matrix')
plt.show()

#### Testing Accuracy 48%

__CNN: 9%__

__ResNet18 with 0.0001 learning rate and 10 epochs: 26%__

__ResNet18 with 0.001 learning rate and 20 epochs: 40%__

__ResNet18 with 0.001 learning rate and 20 epochs and fine tuning last two layers: 48%__

In [None]:
#!pip install --upgrade torchvision
from torchvision import models

In [None]:
rn50 = models.resnet50(pretrained=True)

In [None]:

num_classes = len(le.classes_)
num_classes

In [None]:
# modify last layer
rn50.fc = nn.Sequential(
    nn.Linear(rn50.fc.in_features, num_classes),
    nn.Softmax(dim=1))

In [None]:
# Freeze the model
for param in rn50.parameters():
    param.requires_grad = False

for param in rn50.fc.parameters():
    param.requires_grad = True

In [None]:
# Train the model- only the last layer
optimizer = optim.Adam(rn50.fc.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

In [None]:

train_loss_rn50 = []
val_loss_rn50 = []
best_val_loss = float('inf')
patience = 3
early_stop_counter = 0
rn50_path = 'best_rn50.pt'

for epoch in range(20):
    rn50.train()
    batch_loss = []
    for i, data in enumerate(train_loader, 0):
        inputs, labels = data
        inputs, labels = inputs.to(device), labels.to(device)
        
        optimizer.zero_grad()
        
        outputs = rn50(inputs)
        loss = criterion(outputs, labels)
        
        loss.backward()
        optimizer.step()
        
        batch_loss.append(loss.data.item())
        
    train_loss = np.mean(batch_loss)
    train_loss_rn50.append(train_loss)
    
    rn50.eval()
    batch_loss = []
    
    with torch.no_grad():
        for i, data in enumerate(val_loader, 0):
            inputs, labels = data
            inputs, labels = inputs.to(device), labels.to(device)

            outputs = rn50(inputs)
            loss = criterion(outputs, labels)

            batch_loss.append(loss.data.item())

        val_loss = np.mean(batch_loss)
        val_loss_rn50.append(val_loss)

    if val_loss < best_val_loss:
        best_val_loss = val_loss
        early_stop_counter = 0
        torch.save(rn50.state_dict(), rn50_path)
    else:
        early_stop_counter += 1
        if early_stop_counter >= patience:
            print("Early stopping: validation loss has not improved in {} epochs".format(patience))
            break

    print(f"[{epoch+1}/{10}] Training loss: {train_loss:.4f}\t Validation loss: {val_loss:.4f}.")

In [None]:
torch.save(rn50.state_dict(), 'latest_rn50.pt')

In [None]:
# plot the loss diagram
plt.plot(train_loss_rn50,label='Training Loss')
plt.plot(val_loss_rn50, label='Validation Loss')
plt.legend(loc='best')
plt.ylabel('Mean Loss')
plt.xlabel('Epochs')
plt.title(f"Loss graph - ResNet50.")
plt.show()

In [None]:
write_list(train_loss_rn50,'train_losses_rn50')
write_list(val_loss_rn50,'val_loss_rn50')

In [None]:
pred, ground = evaluate(model_rn50,test_loader)
y_true_rn50 = np.concatenate(ground, axis=0)
y_pred_rn50 = np.concatenate(pred, axis=0)

In [None]:
y_true_rn50


In [None]:
y_pred_rn50

In [None]:
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib
print(classification_report(y_true_rn50, y_pred_rn50, zero_division=1))

In [None]:
from sklearn.metrics import accuracy_score

accuracy = accuracy_score(y_true_rn50, y_pred_rn50)
accuracy

# Testing accuracy = 39%

resnet50 with 2p epochs, 00.001 lr and last 2 layers fine tuned 

In [None]:
from torchvision import models
rn50v1 = models.resnet50(pretrained=True)

In [None]:
num_classes = len(le.classes_)
num_classes 

In [None]:
rn50v1.fc = nn.Sequential(
    nn.Linear(rn50v1.fc.in_features, num_classes),
    nn.Softmax(dim=1))

In [None]:
for name, child in rn50v1.named_children():
    if name in ['layer3', 'layer4', 'fc']:
        print(name + ' is unfrozen')
        for param in child.parameters():
            param.requires_grad = True
    else:
        print(name + ' is frozen')
        for param in child.parameters():
            param.requires_grad = False

In [None]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(filter(lambda p: p.requires_grad, rn50v1.parameters()), lr=0.001)


In [None]:
train_loss_rn50v1 = []
val_loss_rn50v1 = []
best_val_loss = float('inf')
patience = 3
early_stop_counter = 0
rn50v1_path = 'best_rn50v1.pt'

for epoch in range(20):
    rn50v1.train()
    batch_loss = []
    for i, data in enumerate(train_loader, 0):
        inputs, labels = data
        inputs, labels = inputs.to(device), labels.to(device)
        
        optimizer.zero_grad()
        
        outputs = rn50v1(inputs)
        loss = criterion(outputs, labels)
        
        loss.backward()
        optimizer.step()
        
        batch_loss.append(loss.data.item())
        
    train_loss = np.mean(batch_loss)
    train_loss_rn50v1.append(train_loss)
    
    rn50v1.eval()
    batch_loss = []
    
    with torch.no_grad():
        for i, data in enumerate(val_loader, 0):
            inputs, labels = data
            inputs, labels = inputs.to(device), labels.to(device)

            outputs = rn50v1(inputs)
            loss = criterion(outputs, labels)

            batch_loss.append(loss.data.item())

        val_loss = np.mean(batch_loss)
        val_loss_rn50v1.append(val_loss)

    if val_loss < best_val_loss:
        best_val_loss = val_loss
        early_stop_counter = 0
        torch.save(rn50v1.state_dict(), rn50v1_path)
    else:
        early_stop_counter += 1
        if early_stop_counter >= patience:
            print("Early stopping: validation loss has not improved in {} epochs".format(patience))
            break

    print(f"[{epoch+1}/{20}] Training loss: {train_loss:.4f}\t Validation loss: {val_loss:.4f}.")
    

In [None]:
torch.save(rn50v1.state_dict(), 'latest_rn50v1.pt')

In [None]:
# plot the loss diagram
plt.plot(train_loss_rn50v1,label='Training Loss')
plt.plot(val_loss_rn50v1, label='Validation Loss')
plt.legend(loc='best')
plt.ylabel('Mean Loss')
plt.xlabel('Epochs')
plt.title(f"Loss graph - ResNet50v1.")
plt.show()

In [None]:
write_list(train_loss_rn50v1,'train_losses_rn50v1')
write_list(val_loss_rn50v1,'val_loss_rn50v1')

In [None]:
model_rn50v1 = rn50v1
model_rn50v1.load_state_dict(torch.load('best_rn50v1.pt'))



In [None]:
pred, ground = evaluate(model_rn50v1,test_loader)
y_true_rn50v1 = np.concatenate(ground, axis=0)
y_pred_rn50v1 = np.concatenate(pred, axis=0)

In [None]:
y_true_rn50v1

In [None]:
y_pred_rn50v1

In [None]:
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib
print(classification_report(y_true_rn50v1, y_pred_rn50v1, zero_division=1))


In [None]:
matplotlib.rc_file_defaults() # to remove the sns darkgrid style
cm = confusion_matrix(y_true_rn50v1, y_pred_rn50v1)
classes = list(le.classes_)
sns.heatmap(cm, annot=True, cmap='Blues', fmt='g', xticklabels=classes, yticklabels=classes)
plt.xlabel('Predicted label')
plt.ylabel('True label')
plt.title('Confusion Matrix')
plt.show()

In [None]:
from sklearn.metrics import accuracy_score

accuracy = accuracy_score(y_true_rn50v1, y_pred_rn50v1)
accuracy

# testing accuracy = 32%