# Dirty/clean plates

Preparations

In [None]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the "../input/" directory.
# Listing that directory shows in the cell output which files are present
# before anything is extracted.

import os
print(os.listdir('../input'))

# Any results you write to the current directory are saved as output.

import zipfile
# Unpack the whole competition archive into the Kaggle working directory,
# then list that directory so the extraction result is visible in the output.
with zipfile.ZipFile(file='../input/plates.zip', mode='r') as zip_obj:
    zip_obj.extractall(path='/kaggle/working/')

print('After zip extraction:')
print(os.listdir('/kaggle/working/'))

In [None]:
import warnings
# Suppress every warning for the rest of the session (keeps cell output short,
# but also hides deprecation notices from the libraries used below).
warnings.filterwarnings('ignore')

In [None]:
import time
import copy
import random
import shutil

import cv2
import torch
import torchvision
from torchvision import transforms, models
import matplotlib.pyplot as plt

from tqdm import tqdm

In [None]:
# Per-channel mean and standard deviation passed to transforms.Normalize and
# inverted again in show_input for display.
# NOTE(review): these look like the usual ImageNet statistics expected by
# torchvision's pretrained models - confirm if the backbone changes.
MEAN = np.array([0.485, 0.456, 0.406])
STD = np.array([0.229, 0.224, 0.225])

In [None]:
# Location of the extracted data (presumably the archive unpacks into a
# top-level "plates" folder - verify against the zip contents).
DATA_ROOT = '/kaggle/working/plates/'
# Sub-folder names; used both inside DATA_ROOT and as the names of the local
# copies created in the current directory.
TRAIN_DIR = 'train'
TEST_DIR = 'test'

In [None]:
# Copy the training images into ./train so they can be modified in place.
# copytree raises if the destination already exists, so this cell cannot be
# re-run without removing ./train first.
shutil.copytree(os.path.join(DATA_ROOT, TRAIN_DIR), TRAIN_DIR)

In [None]:
# Copy the test images into ./test/unknown: the test set is later loaded with
# an ImageFolder-based dataset rooted at TEST_DIR, which expects images to sit
# inside a class sub-folder.
shutil.copytree(os.path.join(DATA_ROOT, TEST_DIR), os.path.join(TEST_DIR, 'unknown'))

Classes and functions

In [None]:
class DataPreprocessing:
    """Replace the background of images with white, overwriting the files.

    Foreground/background separation is done with OpenCV's GrabCut,
    initialised from a rectangle inset from the image border.
    """

    @staticmethod
    def _whiten_background(img, mask):
        """Return a copy of ``img`` with background pixels set to white.

        Args:
            img: image array indexed as ``[row, column, channel]``.
            mask: GrabCut label mask with the same height and width as
                ``img``. Labels 0 and 2 (definite / probable background)
                are treated as background, everything else as foreground.

        Returns:
            A new array with the shape and dtype of ``img``; ``img`` itself
            is not modified.
        """
        is_background = (mask == 0) | (mask == 2)
        result = img.copy()
        # Driving the fill from the mask (instead of from "non-black pixels of
        # img - foreground") also whitens background pixels that have a zero
        # in one of their channels.
        result[is_background] = 255
        return result

    def _remove_background(self, path, file):
        """Segment one image with GrabCut and rewrite it with a white background.

        Args:
            path: directory containing the image.
            file: file name of the image inside ``path``.

        Raises:
            ValueError: if OpenCV cannot read the image.
        """
        # TODO: Improve background removing and crop images
        file_path = os.path.join(path, file)
        img = cv2.imread(file_path)
        if img is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise ValueError('Could not read image: {}'.format(file_path))

        height, width = img.shape[:2]
        mask = np.zeros((height, width), np.uint8)

        # Scratch buffers required by cv2.grabCut.
        bgd_model = np.zeros((1, 65), np.float64)
        fgd_model = np.zeros((1, 65), np.float64)

        # grabCut takes the rectangle as (x, y, width, height), so a margin on
        # every side means shrinking each dimension by twice the margin.
        margin = 15
        rect = (
            margin,
            margin,
            max(width - 2 * margin, 1),
            max(height - 2 * margin, 1),
        )
        cv2.grabCut(img, mask, rect, bgd_model, fgd_model, 5, cv2.GC_INIT_WITH_RECT)

        cv2.imwrite(file_path, self._whiten_background(img, mask))

    def remove_background(self, image_folders):
        """Whiten the background of every ``.jpg`` file in the given folders.

        Args:
            image_folders: iterable of directory paths. Files are modified
                in place.
        """
        for path in image_folders:
            files = [name for name in os.listdir(path) if name.endswith('.jpg')]

            for file in tqdm(files):
                self._remove_background(path, file)

In [None]:
class ImageFolderWithPaths(torchvision.datasets.ImageFolder):
    """ImageFolder variant whose items also carry the image's file path."""

    def __getitem__(self, index):
        """Return the parent class's item tuple with the file path appended."""
        sample = super().__getitem__(index)
        # Each entry of self.imgs starts with the path of the image file.
        file_path = self.imgs[index][0]
        return (*sample, file_path)

In [None]:
def show_input(input_tensor, title=''):
    """Display a normalized image tensor with matplotlib.

    Args:
        input_tensor: tensor whose first axis is moved to the last position
            before plotting (assumes a (channels, height, width) layout -
            TODO confirm against callers).
        title: text shown above the image.
    """
    channels_last = input_tensor.permute(1, 2, 0).numpy()
    # Undo the (x - MEAN) / STD normalization so the colours look natural.
    restored = channels_last * STD + MEAN
    # Keep the values inside the [0, 1] range before handing them to imshow.
    displayable = restored.clip(0, 1)
    plt.imshow(displayable)
    plt.title(title)
    plt.show()

In [None]:
# Single shared instance used to run background removal on the image folders.
preprocessor = DataPreprocessing()

In [None]:
# Strip the background from every training and test image (files are
# overwritten in place inside the local train/ and test/ copies).
preprocessor.remove_background(
    image_folders=[
        os.path.join(root_dir, subfolder)
        for root_dir, subfolder in (
            (TRAIN_DIR, 'cleaned/'),
            (TRAIN_DIR, 'dirty/'),
            (TEST_DIR, 'unknown/'),
        )
    ]
)

In [None]:
# Augmentation pipeline for training images: random rotation (empty corners
# filled with 255), centre crop, resize to 224x224, random flips, hue jitter,
# then conversion to a tensor normalized with MEAN / STD.
train_transforms = transforms.Compose([
    transforms.RandomRotation(degrees=90, fill=255),
    transforms.CenterCrop(size=180),
    transforms.Resize(size=(224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.ColorJitter(hue=(0.1, 0.2)),
    transforms.ToTensor(),
    transforms.Normalize(mean=MEAN, std=STD),
])

# One class per sub-folder of TRAIN_DIR.
train_dataset = torchvision.datasets.ImageFolder(
    root=TRAIN_DIR,
    transform=train_transforms,
)

# The worker count is deliberately tied to the batch size, as before.
batch_size = 8
train_dataloader = torch.utils.data.DataLoader(
    dataset=train_dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=batch_size,
)

In [None]:
# (number of batches per epoch, number of training images)
len(train_dataloader), len(train_dataset)

In [None]:
# Label index -> class name.
# NOTE(review): presumably this matches the index order ImageFolder assigns to
# the 'cleaned' and 'dirty' sub-folders - confirm against train_dataset.classes.
class_names = ['cleaned', 'dirty']

# Pull one augmented batch to inspect the transforms visually.
X_batch, y_batch = next(iter(train_dataloader))

for x_item, y_item in zip(X_batch, y_batch):
    show_input(x_item, title=class_names[y_item])

Model training

In [None]:
def train_model(model, dataloader, loss, optimizer, scheduler, num_epochs):
    """Train ``model`` for ``num_epochs`` epochs and record per-epoch metrics.

    Args:
        model: torch module to train; it is put into training mode each epoch.
        dataloader: sized iterable yielding ``(inputs, labels)`` batches.
        loss: callable taking ``(predictions, labels)`` and returning a scalar
            tensor that supports ``backward()``.
        optimizer: optimizer stepped once per batch.
        scheduler: learning-rate scheduler stepped once per epoch.
        num_epochs: number of passes over ``dataloader``.

    Returns:
        ``(model, losses, accuracies)``. Both dicts map ``'train'`` to a list
        holding one Python float per epoch (the mean of the per-batch values).
    """
    # Take the device from the model itself instead of relying on a
    # module-level ``device`` variable defined in another cell.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')

    accuracies = {'train': []}
    losses = {'train': []}
    for epoch in range(num_epochs):
        print('Epoch {}/{}:'.format(epoch, num_epochs - 1), flush=True)

        # Each epoch has a training phase
        phase = 'train'
        model.train()  # Set model to training mode

        running_loss = 0.
        running_acc = 0.

        # Iterate over data
        for inputs, labels in tqdm(dataloader):
            inputs = inputs.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()

            # Forward and backward
            with torch.set_grad_enabled(True):
                preds = model(inputs)
                loss_value = loss(preds, labels)
                preds_class = preds.argmax(dim=1)

                loss_value.backward()
                optimizer.step()

            # Statistics: .item() converts to plain floats so the histories do
            # not hold device tensors (which cannot be plotted from a GPU).
            running_loss += loss_value.item()
            running_acc += (preds_class == labels).float().mean().item()

        scheduler.step()

        epoch_loss = running_loss / len(dataloader)
        epoch_acc = running_acc / len(dataloader)

        accuracies[phase].append(epoch_acc)
        losses[phase].append(epoch_loss)

        print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc), flush=True)

    return model, losses, accuracies

In [None]:
# Choose seed for training reproduction
seed = 21
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.backends.cudnn.deterministic = True

# Pretrained backbone (resnet18 / resnet34 / resnet152 are drop-in alternatives).
model = models.resnet50(pretrained=True)

# Freeze every pretrained parameter; only the replacement head created below
# stays trainable.
for param in model.parameters():
    param.requires_grad = False

# New two-output classification head (one output per class).
model.fc = torch.nn.Linear(model.fc.in_features, 2)

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

loss = torch.nn.CrossEntropyLoss()
# NOTE(review): momentum=1.0 means the velocity buffer never decays; 0.9 is
# the usual choice. Left unchanged because it affects training results -
# confirm it is intentional.
optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=1.0)
# Alternative: torch.optim.Adam(model.parameters(), amsgrad=True, lr=0.001)

# Decay LR by a factor of 0.1 every 7 epochs
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

In [None]:
# Train for 40 epochs; keeps the trained model plus the per-epoch loss and
# accuracy histories for the plots below.
model, losses, accuracies = train_model(model, train_dataloader, loss,
                                        optimizer, scheduler, num_epochs=40)

In [None]:
# Plot one accuracy curve per recorded phase.
plt.rcParams['figure.figsize'] = (10, 5)
for experiment_id in accuracies:
    plt.plot(accuracies[experiment_id], label=experiment_id)
plt.legend()
plt.title('Accuracy')

In [None]:
# Plot one loss curve per recorded phase.
plt.rcParams['figure.figsize'] = (10, 5)
for experiment_id in losses:
    plt.plot(losses[experiment_id], label=experiment_id)
plt.legend()
plt.title('Loss')

Transforms for test dataset

In [None]:
# Deterministic preprocessing for inference: same crop, resize and
# normalization as training, without any random augmentation.
test_transforms = transforms.Compose([
    transforms.CenterCrop(size=180),
    transforms.Resize(size=(224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=MEAN, std=STD),
])

# Items of this dataset also carry the image file path.
test_dataset = ImageFolderWithPaths(TEST_DIR, transform=test_transforms)

# shuffle=False keeps the batches in dataset order.
test_dataloader = torch.utils.data.DataLoader(
    dataset=test_dataset,
    batch_size=8,
    shuffle=False,
    num_workers=0,
)

In [None]:
# Display the dataset's summary in the cell output.
test_dataset

Predictions

In [None]:
# Switch the model to evaluation mode before running inference.
model.eval()

test_predictions = []
test_img_paths = []
# The labels yielded by the test loader are not used for prediction, so they
# are not moved to the device.
for inputs, _labels, paths in tqdm(test_dataloader):
    inputs = inputs.to(device)
    with torch.no_grad():
        preds = model(inputs)
        # Softmax score of class index 1 for every image in the batch.
        class1_probs = torch.nn.functional.softmax(preds, dim=1)[:, 1]
    test_predictions.append(class1_probs.cpu().numpy())
    test_img_paths.extend(paths)

# Flatten the per-batch arrays into one array aligned with test_img_paths.
test_predictions = np.concatenate(test_predictions)

In [None]:
# Show the first test batch with its predicted scores as titles. The test
# loader is not shuffled, so the first batch lines up with the first entries
# of test_predictions (zip stops at the end of the batch).
inputs, labels, paths = next(iter(test_dataloader))

for img, pred in zip(inputs, test_predictions):
    show_input(img, title=pred)

Submission

In [None]:
# One row per test image: its file path and the predicted score.
submission_df = pd.DataFrame({'id': test_img_paths, 'label': test_predictions})

In [None]:
# Turn the predicted score into a class name using a 0.63 decision threshold.
submission_df['label'] = submission_df['label'].map(lambda pred: 'dirty' if pred > 0.63 else 'cleaned')
# Reduce each path to the bare file name without extension. Using os.path
# avoids str.replace, whose handling of '.jpg' depends on the pandas
# version's regex default, and avoids hard-coding the 'test/unknown/' prefix.
submission_df['id'] = submission_df['id'].map(
    lambda img_path: os.path.splitext(os.path.basename(img_path))[0]
)
submission_df.set_index('id', inplace=True)
submission_df.head(n=6)

In [None]:
# How many test images were assigned to each class.
submission_df['label'].value_counts()

In [None]:
# Write the submission file into DATA_ROOT.
# NOTE(review): this lands inside the extracted "plates" folder rather than
# the top of the working directory - confirm that is where it is expected.
submission_df.to_csv(os.path.join(DATA_ROOT, 'submission.csv'))

In [None]:
# Remove the local train/ and test/ copies created earlier in the notebook.
!rm -rf train test