# **PyTorch Image Classifier**
Contacts: [g.nodjoumi@jacobs-university.de](mailto:g.nodjoumi@jacobs-university.de)

A simple image classifier demo notebook based on ResNet-50. It is intended to be developed further and used to filter very large datasets before labeling them for object detection or image segmentation.

## **Dataset**
The dataset must be organized according to the following structure:
- rootdir
    - class1
        - img1
        - img2
        - imgX 
        - ...
        
### **Example**     
**E.g. data provided and used for this notebook**
- ./Example_dataset
    - Background
        - ESP_011677_1655_RED_uint8_H0_V0__crop_H0_V1__cropped_cropped.png
        - ...
    - Craters
        - ESP_011386_2065_RED_uint8_H0_V0__crop_H0_V2__cropped_cropped.png
        - ...
    - Skylights
        - ESP_061680_1985_RED_print_H2_V0__crop_cropped.png
        - ...
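
Before training, it can help to check that a folder really follows this layout. The snippet below is a minimal sketch using only the standard library; the function name `summarize_dataset` and the list of accepted extensions are assumptions made for this example. It sorts class folders alphabetically, which is also the order `ImageFolder` uses to assign class indices.

```python
from pathlib import Path


def summarize_dataset(rootdir, extensions=(".png", ".jpg", ".jpeg")):
    """Return {class_name: image_count} for a rootdir/class/image layout."""
    root = Path(rootdir)
    summary = {}
    # Sort sub-folders alphabetically, mirroring how ImageFolder orders class indices
    for class_dir in sorted(p for p in root.iterdir() if p.is_dir()):
        summary[class_dir.name] = sum(
            1 for f in class_dir.iterdir() if f.suffix.lower() in extensions
        )
    return summary
```

Calling `summarize_dataset('./Example_dataset')` should return one entry per class folder, which gives a quick check for empty or misplaced classes.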
        
## How it works

- All images are loaded and transformed
- All image indexes are split into three sub-dataset indexes
    - train (used for training)
    - valid (used to validate the training)
    - test (used to simulate real-world data)
- Data distributions for the sub-datasets are shown in pie charts
- Train and valid indexes are used to load images and feed them into the training routine
- The test index is used to load unseen data
- Test data are randomly picked and predicted

## **Import modules**

In [None]:
%matplotlib inline
%config InlineBackend.figure_format = 'retina'
import matplotlib.pyplot as plt
import numpy as np
import torch
from torch import nn
from torch import optim
import torch.nn.functional as F
from torchvision import datasets, transforms, models
from torch.autograd import Variable
from tqdm.auto import tqdm
import time
import pandas as pd
from sklearn.model_selection import train_test_split

## Data rootdir definition

In [None]:
datadir = '/media/gnodj/W-DATS/HiRiSE_Data/TEST_IMG_CLASSIFICATION_pytorch/'

## Transforms definitions for train and test

In [None]:
transform ={ 'train': transforms.Compose([transforms.Resize(900),
                                          transforms.CenterCrop(900),
                                          transforms.RandomHorizontalFlip(),
                                          transforms.RandomVerticalFlip(),
                                          transforms.RandomRotation([-90,90]),
                                          transforms.ToTensor(),
                                         # transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                          #                     std=[0.229, 0.224, 0.225])
                                         ]),
            'test':transforms.Compose([transforms.Resize(900),
                                       transforms.CenterCrop(900),
                                          transforms.ToTensor(),
                                          #transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                           #                    std=[0.229, 0.224, 0.225])
                                      ]),
            }
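
As a rough illustration of what `Resize(900)` followed by `CenterCrop(900)` does to the image geometry, the sketch below computes the intermediate and final sizes in plain Python. It assumes the usual torchvision behaviour for an integer size (the shorter edge is scaled to that value and the aspect ratio is kept). The function name `resize_then_center_crop` is made up for this example.

```python
def resize_then_center_crop(width, height, size=900):
    """Return ((resized_w, resized_h), (crop_w, crop_h)) for Resize(size) + CenterCrop(size)."""
    # Resize with an int: the shorter edge becomes `size`, the longer edge scales with it
    if width <= height:
        resized = (size, int(size * height / width))
    else:
        resized = (int(size * width / height), size)
    # CenterCrop then cuts a size x size window from the middle of the resized image
    cropped = (size, size)
    return resized, cropped


resized, cropped = resize_then_center_crop(1200, 1000)
print(resized, cropped)
```

For non-square inputs this means the borders along the longer edge are discarded, so objects close to those borders may not reach the network.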

Loading of the full dataset and retrieval of image indices and classes from the dataset

In [None]:
image_dataset = datasets.ImageFolder(datadir, transform=transform['train'])

num_train = len(image_dataset)
indices = list(range(num_train))
print('Dataset contains:', len(image_dataset), ' elements distributed in: ',image_dataset.classes)

## Creation of indexes

In [None]:
# ImageFolder already stores the class index of every image in `targets`,
# so the labels can be read without loading and transforming each image
clss = image_dataset.targets

In [None]:
columns = ['class','type']
DF = pd.DataFrame(columns=columns)
DF['class']=clss
classes = image_dataset.classes

In [None]:
train_idx, test_idx = train_test_split(indices,
                                          test_size=0.05,
                                          shuffle=True,
                                          stratify=DF['class'].values,
                                          #stratify=DF['type'].values,
                                          random_state=0)

train_idx is further divided into the actual train and valid indexes

In [None]:
train_idx, valid_idx = train_test_split(train_idx,
                                          test_size=0.3,
                                          shuffle=True,
                                          stratify=DF['class'].loc[train_idx],
                                          #stratify=DF.loc[train_data],
                                          random_state=0)
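
Because the second split is applied to what remains after the first one, the valid share is 30% of the remaining 95%, not 30% of the full dataset. The sketch below works through that arithmetic; `split_sizes` is a hypothetical helper, and rounding each held-out share up is an assumption about how scikit-learn sizes the splits.

```python
import math


def split_sizes(n_total, test_size=0.05, valid_size=0.3):
    """Return the number of train, valid and test samples after the two splits."""
    # First split: hold out the test share (assumed to be rounded up)
    n_test = math.ceil(test_size * n_total)
    remaining = n_total - n_test
    # Second split: the valid share is taken from what is left, not from the total
    n_valid = math.ceil(valid_size * remaining)
    n_train = remaining - n_valid
    return {"train": n_train, "valid": n_valid, "test": n_test}


sizes = split_sizes(2000)
print(sizes)
```

In fractions of the full dataset this gives roughly 66.5% train, 28.5% valid and 5% test.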

Train and Valid data loading

In [None]:
trainloader = torch.utils.data.DataLoader(image_dataset,
               sampler=train_idx, batch_size=8,pin_memory=True)
validloader = torch.utils.data.DataLoader(image_dataset,
               sampler=valid_idx, batch_size=8,pin_memory=True)
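
Passing a plain list as `sampler` visits the indexes in the same order every epoch. If reshuffling between epochs is wanted, one option is `SubsetRandomSampler`. The snippet below is a minimal sketch on a toy `TensorDataset`; `toy_dataset` and `toy_train_idx` are stand-ins invented for the example, not the notebook's data.

```python
import torch
from torch.utils.data import DataLoader, SubsetRandomSampler, TensorDataset

# Hypothetical stand-in for image_dataset: 20 one-value "images" with binary labels
toy_dataset = TensorDataset(torch.arange(20).float().unsqueeze(1),
                            torch.arange(20) % 2)
toy_train_idx = [0, 2, 4, 6, 8, 10, 12, 14]

# The sampler draws only from toy_train_idx, in a new random order on every pass
sampler = SubsetRandomSampler(toy_train_idx)
toy_loader = DataLoader(toy_dataset, sampler=sampler, batch_size=4)

seen = []
for inputs, labels in toy_loader:
    seen.extend(int(v) for v in inputs.squeeze(1))
print(sorted(seen))
```

Each epoch still covers exactly the selected indexes, only the order changes.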

In [None]:
DF.loc[train_idx,'type']='TRAIN'
DF.loc[test_idx,'type']='TEST'
DF.loc[valid_idx,'type']='VALID'

Plot of data distribution

In [None]:
label = classes
plt.figure(figsize = (20,10), facecolor='white',dpi=300)

plt.suptitle('Data distributions', fontsize=15)
ax1 = plt.subplot2grid((1,3),(0,0))
plt.pie(DF.loc[DF['type'] == 'TRAIN'].groupby('class').size(),
        labels=label,autopct=lambda p:f'{p:.2f}%, \n{p*len(train_idx)/100:.0f} Images',
        shadow=False, startangle=90)
plt.title('Train Data\n{} Images'.format(len(train_idx)), loc='center')
ax1 = plt.subplot2grid((1,3),(0,1))
plt.pie(DF.loc[DF['type'] == 'VALID'].groupby('class').size(), 
        labels=label,autopct=lambda p:f'{p:.2f}%, \n{p*len(valid_idx)/100:.0f} Images', 
        shadow=False, startangle=90)
plt.title('Valid Data\n{} Images'.format(len(valid_idx)), loc='center')
ax1 = plt.subplot2grid((1,3),(0,2))
plt.pie(DF.loc[DF['type'] == 'TEST'].groupby('class').size(), 
        labels=label,autopct=lambda p:f'{p:.2f}%, \n{p*len(test_idx)/100:.0f} Images', 
        shadow=False, startangle=90)
plt.title('Test Data\n{} Images'.format(len(test_idx)), loc='center')

plt.tight_layout()
plt.show()

Check Device to use

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() 
                                  else "cpu")

Load the model

In [None]:
preT = True
# Note: recent torchvision releases deprecate `pretrained` in favour of `weights`
model = models.resnet50(pretrained=preT)
#print(model)

Freeze the pretrained layers if selected

In [None]:
if preT:
    for param in model.parameters():
        param.requires_grad = False

Define the last fully connected layer

In [None]:
# The output layer has one unit per class found in the dataset
model.fc = nn.Sequential(nn.Linear(2048, 512),
                         nn.ReLU(),
                         nn.Dropout(0.2),
                         nn.Linear(512, len(classes)),
                         nn.LogSoftmax(dim=1))
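
The effect of freezing the backbone and then attaching a new head can be checked by counting parameters. The sketch below uses a tiny stand-in network instead of ResNet-50 so it runs instantly; `toy_model` and `count_params` are names invented for this example.

```python
from torch import nn

# Tiny stand-in for a pretrained network
toy_model = nn.Sequential(nn.Linear(8, 4), nn.ReLU(), nn.Linear(4, 2))

# Freeze everything, as done for the pretrained layers
for param in toy_model.parameters():
    param.requires_grad = False

# A layer created after freezing has fresh parameters that are trainable by default
toy_model[2] = nn.Linear(4, 3)


def count_params(model):
    """Return (trainable, frozen) parameter counts."""
    trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
    frozen = sum(p.numel() for p in model.parameters() if not p.requires_grad)
    return trainable, frozen


trainable, frozen = count_params(toy_model)
print(trainable, frozen)
```

Only the new head contributes trainable parameters, which is why the optimizer only needs to be given the head's parameters.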

Define loss criterion and optimization function

In [None]:
criterion = nn.NLLLoss()
optimizer = optim.Adam(model.fc.parameters(), lr=0.003)
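
`NLLLoss` expects log-probabilities, which is why the head ends with `LogSoftmax`. The short check below, on random toy tensors invented for this example, illustrates that this pairing gives the same value as `CrossEntropyLoss` applied to raw logits.

```python
import torch
from torch import nn

torch.manual_seed(0)
logits = torch.randn(4, 3)            # toy batch: 4 samples, 3 classes
targets = torch.tensor([0, 2, 1, 2])  # toy class indices

# LogSoftmax followed by NLLLoss ...
nll = nn.NLLLoss()(nn.LogSoftmax(dim=1)(logits), targets)
# ... matches CrossEntropyLoss on the raw logits
ce = nn.CrossEntropyLoss()(logits, targets)
print(torch.allclose(nll, ce))
```

A practical consequence is that class probabilities can be recovered from the model output with `torch.exp`.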

Send the model to the device

In [None]:
model.to(device)

## **Training**

In [None]:
epochs = 200
steps = 0
running_loss = 0
print_every = 10
train_losses =  []
valid_losses = []

for epoch in range(epochs):

    for inputs, labels in trainloader:
        steps += 1
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        logps = model(inputs)
        loss = criterion(logps, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        
        if steps % print_every == 0:
            valid_loss = 0
            accuracy = 0
            model.eval()
            with torch.no_grad():
                for inputs, labels in validloader:
                    inputs, labels = inputs.to(device),labels.to(device)
                    logps = model(inputs)
                    batch_loss = criterion(logps, labels)
                    valid_loss += batch_loss.item()
                    
                   # ps = torch.exp(logps)
                    #top_p, top_class = ps.topk(1, dim=1)
                    #equals = top_class == labels.view(*top_class.shape)
                    #accuracy += torch.mean(equals.type(torch.FloatTensor)).item()
            train_losses.append(running_loss/print_every)
            valid_losses.append(valid_loss/len(validloader))                    
            print(f"Epoch {epoch+1}/{epochs}.. "
                  f"Train loss: {running_loss/print_every:.3f}.. "
                  f"Valid loss: {valid_loss/len(validloader):.3f}.. ")
                  #f"Valid accuracy: {accuracy/len(validloader):.3f}")
            running_loss = 0
            model.train()
torch.save(model, 'test_3.pth')

## **Evaluation**
Show training results

In [None]:
plt.plot(train_losses, label='Training loss')
plt.plot(valid_losses, label='Validation loss')
plt.legend(frameon=False)
plt.show()

Evaluating model accuracy

In [None]:
def accuracy(dataloader):
    """Print the mean loss, per-class accuracy and overall accuracy on `dataloader`."""
    test_loss = 0.0  # loss summed over all samples
    n_samples = 0    # number of samples actually drawn by the loader
    cls_correct = [0.0] * len(classes)
    cls_totals = [0.0] * len(classes)

    model.eval()  # disables dropout
    with torch.no_grad():
        for images, labels in dataloader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            test_loss += loss.item() * images.size(0)
            n_samples += images.size(0)
            _, predicted = torch.max(outputs, 1)
            # .cpu() is a no-op for CPU tensors, so no device check is needed
            correct = predicted.eq(labels).cpu().numpy()
            for i in range(len(labels)):
                label = int(labels[i])
                cls_correct[label] += correct[i].item()
                cls_totals[label] += 1
    # Normalise by the samples seen: the loader may cover only a subset of its dataset
    test_loss = test_loss / n_samples

    print(f'Test Loss: {round(test_loss, 6)}')

    for i in range(len(classes)):
        if cls_totals[i] > 0:
            print(f'Accuracy of {classes[i]}: {round(100*cls_correct[i]/cls_totals[i], 2)}%')
        else:
            print(f'Accuracy of {classes[i]}: N/A (no examples)')

    print(f'Full Accuracy: {round(100. * np.sum(cls_correct) / np.sum(cls_totals), 2)}% {np.sum(cls_correct)} out of {np.sum(cls_totals)}')

    # Save the model weights
    torch.save(model.state_dict(), 'model.ckpt')

In [None]:
accuracy(validloader)
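
The per-class bookkeeping can also be expressed compactly with NumPy once predictions and labels are available as arrays. The following is a small sketch on made-up values; `per_class_accuracy` is a hypothetical helper, not part of the notebook.

```python
import numpy as np


def per_class_accuracy(predicted, labels, n_classes):
    """Return {class_index: accuracy}, with None for classes that have no examples."""
    predicted = np.asarray(predicted)
    labels = np.asarray(labels)
    result = {}
    for c in range(n_classes):
        mask = labels == c
        # Fraction of samples of class c that were predicted as c
        result[c] = float((predicted[mask] == c).mean()) if mask.any() else None
    return result


acc = per_class_accuracy([0, 1, 1, 2, 0], [0, 1, 2, 2, 0], n_classes=4)
print(acc)
```

Reporting accuracy per class matters here because the classes are unlikely to be balanced, so the overall figure can hide a weak class.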

In [None]:
%cd '/media/gnodj/W-DATS/python_scripts/DeepLearning/PyTorch-ImageClassifier'

## **Testing**
Loading model for testing on test dataset

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = torch.load('test_3.pth', map_location=device)
model.eval()
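
The notebook saves the whole model object, and `accuracy()` additionally writes a `state_dict`. As a sketch of the second route, the snippet below saves and restores only the weights of a toy layer. The layer, the temporary file name and the variable names are all assumptions for the example; restoring this way requires rebuilding the same architecture first.

```python
import os
import tempfile

import torch
from torch import nn

toy = nn.Linear(4, 2)  # stand-in for the trained model

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "toy.ckpt")
    # Save only the parameters, not the Python object
    torch.save(toy.state_dict(), path)

    # Rebuild the same architecture, then load the saved parameters into it
    restored = nn.Linear(4, 2)
    restored.load_state_dict(torch.load(path, map_location="cpu"))

restored.eval()  # switch to inference mode before predicting
print(torch.equal(toy.weight, restored.weight))
```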

### **Definition of functions**
Prediction function

In [None]:
def predict_image(image):
    # Apply the test transform and add a batch dimension: (C, H, W) -> (1, C, H, W)
    image_tensor = transform['test'](image).float().unsqueeze(0)
    image_tensor = image_tensor.to(device)
    with torch.no_grad():  # inference only, no gradients needed
        output = model(image_tensor)
    index = output.cpu().numpy().argmax()
    return index

Function to get random images from the test dataset using the test indexes

In [None]:
def get_random_images(num, indice,test_dataset):
    idx=np.random.choice(indice,num)
    loader = torch.utils.data.DataLoader(test_dataset, 
                   sampler=idx, batch_size=num,pin_memory=True)
    dataiter = iter(loader)
    images, labels = next(dataiter)
    return images, labels, idx, loader

Loading of all images using test transform

In [None]:
test_dataset = datasets.ImageFolder(datadir, transform=transform['test'])

In [None]:
test_dataset

Prediction and plotting

In [None]:
to_pil = transforms.ToPILImage()
images, labels, idx, testloader= get_random_images(5, test_idx,test_dataset)
fig=plt.figure(figsize=(20,20))
for ii in range(len(images)):
    image = to_pil(images[ii])
    index = predict_image(image)
    sub = fig.add_subplot(1, len(images), ii+1)
    res = int(labels[ii]) == index
#    sub.set_title(str(classes[index]) + ":" + str(res))
    sub.set_title(str(classes[index]) + ":" + str(res)+'\n'+str(idx[ii])+'\n'+str(index))

    plt.axis('off')
    #r_img = to_pil(test_data[ii][0])
    plt.imshow(image)
plt.show()

Check accuracy of predictions

In [None]:
accuracy(testloader)

Final check to verify that test images have not been used for training

In [None]:
for i in idx:
    if i in train_idx:
        print(i, ' is used for training')
    else:
        print(i, ' is not used')
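
The same check can be extended to all three index sets at once, including the valid indexes. The sketch below uses set intersections on toy index lists; `check_disjoint` is a hypothetical helper written for this example.

```python
def check_disjoint(**index_sets):
    """Return the overlapping indices for every pair of named index collections."""
    names = list(index_sets)
    overlaps = {}
    for i, a in enumerate(names):
        for b in names[i + 1:]:
            common = set(index_sets[a]) & set(index_sets[b])
            if common:
                overlaps[(a, b)] = sorted(common)
    return overlaps


# Toy example with a deliberate overlap between valid and test
toy_overlaps = check_disjoint(train=[0, 1, 2, 3], valid=[4, 5], test=[5, 6])
print(toy_overlaps)
```

With the notebook's variables, `check_disjoint(train=train_idx, valid=valid_idx, test=test_idx)` would be expected to return an empty dictionary.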

In [None]:
res_dir = '/media/gnodj/W-DATS/HiRiSE_Data/ImageClassifier_test/png'+'/results'
data_dir = '/media/gnodj/W-DATS/HiRiSE_Data/ImageClassifier_test/png'

In [None]:
import os
# exist_ok=True lets the cell be re-run without stopping at the first existing folder
os.makedirs(res_dir + '/Unknown', exist_ok=True)
for j in image_dataset.classes:
    os.makedirs(res_dir + '/' + j, exist_ok=True)
    print(str(j))

In [None]:
from GenUtils import get_paths
import os
import glob

file_paths = get_paths(data_dir+'/','png')
len(file_paths)

In [None]:
def rnd_imgs(num, indice, test_dataset):
    # Draw `num` indexes at random (with replacement) from the given indexes
    idx = np.random.choice(indice, num)
    print(idx)
    loader = torch.utils.data.DataLoader(test_dataset,
                   sampler=idx, batch_size=num, pin_memory=True)
    dataiter = iter(loader)
    images, labels = next(dataiter)
    return images, labels, idx, loader

In [None]:
#css = list(range(0,len(image_dataset.classes)))
from PIL import Image
num = 5
to_pil = transforms.ToPILImage()
#imgs, lbls, indice, llder= rnd_imgs(10, idcs, new_dataset)
fig=plt.figure(figsize=(30,20))
start = time.time()
#for ii in range(num):
rnd_idx= np.random.choice(file_paths, num)
for ii in range(len(rnd_idx)):
#    image = to_pil(imgs[ii])
    image= Image.open(rnd_idx[ii]).convert("RGB")
    index = predict_image(image)
    sub = fig.add_subplot(1, num, ii+1)
    sub.set_title(str(classes[index]) +'\nImage:\n'+str(rnd_idx[ii])+'\nIndex:'+str(index))
    plt.axis('off')
    #r_img = to_pil(test_data[ii][0])
    plt.imshow(image)
plt.show()

end = time.time()
print(end - start)
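
The result folders created earlier are not yet filled by the notebook. One possible way to use them, sketched here purely as an assumption about the intended workflow, is to copy each image into the folder of its predicted class and fall back to `Unknown` when the prediction fails or returns an index outside the known classes. `sort_into_folders` and its `predict_fn` argument are hypothetical. In the notebook, `predict_fn` could wrap `predict_image` around `Image.open(path).convert("RGB")`.

```python
import shutil
from pathlib import Path


def sort_into_folders(paths, predict_fn, class_names, out_dir):
    """Copy each file into out_dir/<predicted class>, or out_dir/Unknown on failure."""
    out_dir = Path(out_dir)
    placed = {}
    for path in map(Path, paths):
        try:
            # predict_fn is expected to return a class index for the given path
            target = class_names[predict_fn(path)]
        except Exception:
            # Unreadable file or index outside the known classes
            target = "Unknown"
        dest = out_dir / target
        dest.mkdir(parents=True, exist_ok=True)
        shutil.copy2(path, dest / path.name)
        placed[path.name] = target
    return placed
```

Copying rather than moving keeps the original data untouched, which seems the safer default while the classifier is still being evaluated.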