<a href="https://colab.research.google.com/github/hotbread213/createClass/blob/master/2_ConvNet_for_car_damage_classification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# IVADO Workshop Day 3 - Deep Learning

In this tutorial, our goal will be to classify different car damages. We have scraped more than 1600 images of damaged cars from google. Those were labeled according to the severity of their damage. We end up with two classes in our dataset, one for major damages (labeled as 1) and another for minor damages (labeled as 0).

To accomplish our goal to discrimate between the two, we will be using a convolutional network that we are going to implement in PyTorch.

In [0]:
!pip3 install torch
!pip3 install googledrivedownloader
!pip3 install tqdm
!pip3 install seaborn

In [0]:
# Data downlooad
from google_drive_downloader import GoogleDriveDownloader

# Deep learning and manipulation
import numpy as np
import torch.nn as nn
import torch
from torch.optim import Adam

#Evaluation
from sklearn.utils import shuffle
from sklearn.metrics import roc_auc_score, log_loss
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix

#Visualization
from tqdm import tqdm_notebook as tqdm
import matplotlib.pyplot as plt
import seaborn as sns

In this example, because of the high computational cost of our task, we will be using a GPU. To let PyTorch know that we use a GPU instead of a regular CPU, we will need to specify our device and mount all torch tensors on it.

In [0]:
DEVICE = 'cuda:0' if torch.cuda.device_count()>0 else 'cpu:0'

In [0]:
DEVICE

Let's download and load our data in this notebook

In [0]:
CARS_ID = '1uI64vTdE-1geqmUq2azU_cwqtdnQMkrI'
GoogleDriveDownloader.download_file_from_google_drive(file_id=CARS_ID,
                                                      dest_path='./car_damages_1600',
                                                      unzip=True)

We will keep only the first 850 examples of each class to have balanced classes for our experiment. This will make our predictions more interpretable in terms of accuracy (if we choose a probability threshold of 0.5).

In [0]:
N=850
major = np.load('./major_damages.npy')[:N]
minor = np.load('./minor_damages.npy')[:N]

We will create our own labels, merge the two classes and shuffle everything.

In [0]:
images = np.vstack([major, minor])
labels = np.vstack([np.ones(N)[:, None], np.zeros(N)[:,None]])

In [0]:
images.shape, labels.shape

In [0]:
images, labels = shuffle(images, labels)

The following function can help us visualize our images. The images were previously resized to 256x256 pixels. It is easier to train a convnet when the images have a square size.

In [0]:
def plot_4_random(images, title=''):
    
    idx = np.random.randint(len(images),size=(2,2))
    
    fig, axes = plt.subplots(2,2)

    for i in range(2):
        for j in range(2):
            axes[i][j].imshow(images[idx[i,j]])
            axes[i][j].get_xaxis().set_visible(False)
            axes[i][j].get_yaxis().set_visible(False)
            axes[i][j].set_frame_on(False)
    
    fig.set_size_inches((14,14))
    fig.suptitle(title, y=0.95)
    
    return idx

Let's check what our data looks like! If you find any weird image, two images that look very similar or one that seems to belong to the wrong class, please let us know. It is a new dataset and google image is not always right so we will clean it for future experiments!

In [0]:
plot_4_random(images[labels.flatten()==1], title='Major car damages')

In [0]:
plot_4_random(images[labels.flatten()==0], title='Minor car damages')

Let's split our data set in train, validation and test sets.

In [0]:
X_train, X_valid, y_train, y_valid = train_test_split(images, labels, test_size=0.4)
X_valid, X_test, y_valid, y_test = train_test_split(X_valid, y_valid, test_size=0.5)

... and we are ready do define our 2D convolutional network.

In [0]:
class CNN(nn.Module):
  
    def __init__(self, hidden_units):
        super(CNN, self).__init__()
        
        self.conv1 = nn.Conv2d(3,8,3)
        self.relu1 = nn.ReLU()
        self.pool1 = nn.MaxPool2d(3)
        
        self.conv2 = nn.Conv2d(8,8,3)
        self.relu2 = nn.ReLU()
        self.pool2 = nn.MaxPool2d(3)
        
        self.conv3 = nn.Conv2d(8,16,3)
        self.relu3 = nn.ReLU()
        self.pool3 = nn.MaxPool2d(3)
        
        self.conv4 = nn.Conv2d(16,32,3)
        self.relu4 = nn.ReLU()
        self.pool4 = nn.MaxPool2d(3)
        
        self.fc1 = nn.Linear(32*2*2, hidden_units)
        self.relu5 = nn.ReLU()
        self.fc2 = nn.Linear(hidden_units, 1)

    def forward(self, x):
        
        x = self.pool1(self.relu1(self.conv1(x)))
        x = self.pool2(self.relu2(self.conv2(x)))
        x = self.pool3(self.relu3(self.conv3(x)))
        x = self.pool4(self.relu4(self.conv4(x)))
        
        #We have to reshape the output of the convolutional+pooling layers
        #before feeding it into the dense ones.
        x = self.fc1(x.view(-1,32*2*2))
        x = self.relu5(x)
        x = self.fc2(x)
        
        return torch.sigmoid(x)

In [0]:
CNN(64).forward(torch.ones((1,3,256,256))).size()

We here define our hyperparameters, feel free to tweak them and experiment with different combinations.

In [0]:
HIDDEN_UNITS = 64
BATCH_SIZE = 64
EPOCHS = 20
LEARNING_RATE = 0.001

In [0]:

# Instantiate our CNN with a choice of hidden units
net = CNN(HIDDEN_UNITS).to(DEVICE)

# We need an optimizer for our network params, Adam is a popular and good choice.
optimizer = Adam(net.parameters(), lr=LEARNING_RATE)

# And a loss function, BCE (for binary crossentropy) is used for binary classification, 
# where we have a sigmoid function on top of our network's output.
criterion = nn.BCELoss().to(DEVICE)
        
# Two dicts of lists to keep track of loss and auroc during training
historical_acc = {'train':[], 'valid':[]} 
historical_loss = {'train':[], 'valid':[]} 

# run the main training loop for number of epochs
for epoch in tqdm(range(EPOCHS), desc='Epochs'):
        
    for i in tqdm(range(len(X_train)//BATCH_SIZE + 1), desc='Batch'):
        
            # Begin and end of current batch in our array
            start = i*BATCH_SIZE
            end = min((i+1)*BATCH_SIZE,len(X_train))
            
            # We have to convert ou subset of examples to a torch tensor
            X = torch.tensor(X_train[start:end].astype(np.float32)).permute(0,3,1,2).to(DEVICE)
            y = torch.tensor(y_train[start:end].astype(np.float32)).to(DEVICE)
            
            # Re-init torch's gradient or hence they will accumulate
            optimizer.zero_grad()
            # Do our predidction on considered batch
            net_out = net(X)
            # Compute loss
            loss = criterion(net_out, y)
            
            # Do the gradient update
            loss.backward()
            optimizer.step()
    
    # Use the no_grad context to evaluate the network,
    # all the forward passes done will not be considered
    # during the next gradient update. This is important.
    with torch.no_grad():
        
        X = torch.tensor(X_valid.astype(np.float32)).permute(0,3,1,2).to(DEVICE)
        y = torch.tensor(y_valid.astype(np.float32)).to(DEVICE)
        net_out = net(X).detach().cpu().numpy()
        valid_acc = (y.detach().cpu().numpy()==(net_out>0.5)).mean()
        valid_loss = log_loss(y.detach().cpu().numpy(), net_out)
        
        X = torch.tensor(X_train.astype(np.float32)).permute(0,3,1,2).to(DEVICE)
        y = torch.tensor(y_train.astype(np.float32)).to(DEVICE)
        net_out = net(X).detach().cpu().numpy()
        train_acc = (y.detach().cpu().numpy()==(net_out>0.5)).mean()
        train_loss = log_loss(y.detach().cpu().numpy(), net_out)
        
        historical_acc['valid'].append(valid_acc)
        historical_acc['train'].append(train_acc)
        
        historical_loss['valid'].append(valid_loss)
        historical_loss['train'].append(train_loss)
        
    # Shuffle our training data to avoid repeating the same updates over and over.
    X_train, y_train = shuffle(X_train, y_train)
    
    print('Epoch {} --- Train acc: {:f} - Valid acc. {:f} --- Train loss: {:f} - Valid loss: {:f}'.format(epoch,
                                                                                                 train_acc,
                                                                                                 valid_acc,
                                                                                                 train_loss,
                                                                                                 valid_loss))
    

At the opposite of our previous example, where our two classes where pretty imbalanced, we can now evaluate our model accroding to the prediction accuracy. Let's visualize how the training went.

In [0]:
fig, axes = plt.subplots(1,2)

axes[0].plot(historical_loss['valid'], label='Valid')
axes[0].plot(historical_loss['train'], label='Train')
axes[0].set_xlabel('Epochs')
axes[0].set_ylabel('Log-loss')

axes[1].plot(historical_acc['valid'])
axes[1].plot(historical_acc['train'])
axes[1].set_xlabel('Epochs')
axes[1].set_ylabel('Accuracy')

fig.suptitle('Evolution of metrics during training')
fig.set_size_inches((16,8))
fig.legend()

We can inspect how our convnet managed to classify the examples of both class with a confusion matrix.

In [0]:
with torch.no_grad():
        
        X = torch.tensor(X_test.astype(np.float32)).permute(0,3,1,2).to(DEVICE)
        y = torch.tensor(y_test.astype(np.float32)).to(DEVICE)
        net_out = net(X).detach().cpu().numpy()
        test_acc = (y.detach().cpu().numpy()==(net_out>0.5)).mean()
        test_loss = log_loss(y.detach().cpu().numpy(), net_out)

plt.figure(figsize=(12,8))
conf_mat = confusion_matrix(y.detach().cpu().numpy(), net_out>0.5)
sns.heatmap(conf_mat, annot=True, cmap='Greys', fmt='.1f')

Let's visualize some of of the bad classification to see if there was any ambiguity.

In [0]:
y=y.detach().cpu().numpy() 
false_positives = X_test[(y.flatten()!=(net_out>0.5).flatten()) & (y==0).flatten()]
false_negatives = X_test[(y.flatten()!=(net_out>0.5).flatten()) & (y==1).flatten()]

In [0]:
plot_4_random(false_positives, 'False positives')

In [0]:
plot_4_random(false_negatives, 'False negatives')