In [None]:
pip install torch-snippets

In [None]:
import os
import sys
import ast
import pandas as pd
from torch_snippets import *
from PIL import Image
from sklearn.model_selection import train_test_split
import torchvision
import glob
from torchvision.ops import nms
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

sys.path.append('/kaggle/input/license-plate')

device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [None]:
# target size (in pixels) every image is resized to by the dataset
ROWS, COLS = 500, 900
BATCH_SIZE = 1    # images per batch
N_EPOCHS = 100    # number of training epochs
IMAGE_ROOT = '/kaggle/input/license-plate/images'

# annotation table; `df` and `DF_RAW` are two names for the same DataFrame object
df = pd.read_csv('/kaggle/input/license-plate/df.csv')
DF_RAW = df

# class name -> integer target; real classes are numbered from 1, 0 is the background
label2target = dict(
    (label, index)
    for index, label in enumerate(DF_RAW['LabelName'].unique(), start=1)
)
label2target['background'] = 0
# inverse lookup: integer target -> class name
target2label = dict((target, label) for label, target in label2target.items())
background_class = label2target['background']
num_classes = len(label2target)

# perform image preprocessing 
def preprocess_image(img):
    """Turn an image array into a float tensor on ``device``.

    The last axis of ``img`` is moved to the front via ``permute(2, 0, 1)``,
    so a 3-D input is required (presumably height x width x channels —
    confirm against the caller).
    """
    return torch.tensor(img).permute(2, 0, 1).to(device).float()

# class that operates dataset
class OpenDataset(torch.utils.data.Dataset):
    """Detection dataset that yields one resized image and its boxes per index.

    Each item is ``(img, target)``: ``img`` is the tensor returned by
    ``preprocess_image`` and ``target`` is a dict with ``"boxes"`` (float,
    absolute pixel coordinates in the resized image) and ``"labels"`` (long
    class indices looked up in ``label2target``).

    Args:
        df: annotation table with the columns ``ImageID``, ``LabelName``,
            ``XMin``, ``YMin``, ``XMax`` and ``YMax``. The coordinates are
            multiplied by the target width/height, so they are presumably
            normalised to [0, 1] — confirm against the CSV.
        image_dir: directory that contains the files named by ``ImageID``.
    """
    # every image is resized to (w, h) before it is returned
    w, h = COLS, ROWS

    def __init__(self, df, image_dir=IMAGE_ROOT):
        self.image_dir = image_dir
        self.files = glob.glob(self.image_dir+'/*')
        self.df = df
        # one dataset item per distinct image, not per annotation row
        self.image_infos = df.ImageID.unique()

    def __getitem__(self, ix):
        """Return ``(image_tensor, target_dict)`` for the ``ix``-th unique image."""
        image_id = self.image_infos[ix]
        img_path = self.image_dir +'/' + image_id
        img = Image.open(img_path).convert("RGB")
        # resize to the fixed network input size and scale pixel values to [0, 1]
        img = np.array(img.resize((self.w, self.h), resample=Image.BILINEAR))/255.
        # Use the frame this dataset was built from, not the notebook-global `df`,
        # so the class works regardless of what globals happen to exist.
        rows = self.df[self.df['ImageID'] == image_id]
        labels = rows['LabelName'].values.tolist()
        # astype() returns a private copy: the in-place scaling below can neither
        # modify the DataFrame nor fail on a read-only view (pandas copy-on-write).
        coords = rows[['XMin','YMin','XMax','YMax']].values.astype(float)
        coords[:,[0,2]] *= self.w
        coords[:,[1,3]] *= self.h
        boxes = coords.astype(np.uint32).tolist() # absolute, integer-truncated coordinates
        # torch FRCNN expects ground truths as a dictionary of tensors
        target = {}
        # reshape keeps the (N, 4) layout even if an image had no rows
        target["boxes"] = torch.Tensor(boxes).float().reshape(-1, 4)
        target["labels"] = torch.Tensor([label2target[i] for i in labels]).long()
        img = preprocess_image(img)
        return img, target

    def collate_fn(self, batch):
        """Regroup ``[(img, target), ...]`` into ``(images, targets)`` tuples."""
        return tuple(zip(*batch))

    def __len__(self):
        """Number of distinct images in the annotation table."""
        return len(self.image_infos)
    
# split the dataset to train/test (by image, so all boxes of one image stay together)
trn_ids, val_ids = train_test_split(df.ImageID.unique(), test_size=0.1, random_state=99)
trn_df, val_df = df[df['ImageID'].isin(trn_ids)], df[df['ImageID'].isin(val_ids)]
# a bare expression in the middle of a cell is discarded, so print the sizes explicitly
print(f'train rows: {len(trn_df)}, validation rows: {len(val_df)}')

train_ds = OpenDataset(trn_df)
test_ds = OpenDataset(val_df)

# shuffle the training data every epoch; SGD should not see the samples in a fixed order
train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, collate_fn=train_ds.collate_fn,
                          shuffle=True, drop_last=True)
# keep every validation image: drop_last would silently skip samples once BATCH_SIZE > 1
test_loader = DataLoader(test_ds, batch_size=BATCH_SIZE, collate_fn=test_ds.collate_fn,
                         drop_last=False)


In [None]:
# create Faster R-CNN model
def get_model():
    """Build a pretrained Faster R-CNN (ResNet-50 FPN) with a new box head.

    The box predictor is replaced by a ``FastRCNNPredictor`` sized for
    ``num_classes`` outputs; everything else keeps its pretrained weights.
    """
    detector = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
    # the new head must accept the same feature width as the one it replaces
    head_in_features = detector.roi_heads.box_predictor.cls_score.in_features
    detector.roi_heads.box_predictor = FastRCNNPredictor(head_in_features, num_classes)
    return detector

# Defining training and validation functions for a single batch
def train_batch(inputs, model, optimizer):
    """Run one optimisation step on a single batch.

    Args:
        inputs: ``(images, targets)`` pair; ``targets`` is a sequence of dicts.
        model: detection model that returns a dict of losses in train mode.
        optimizer: optimizer that updates ``model``'s parameters.

    Returns:
        ``(loss, losses)``: the summed loss and the dict of individual losses.
    """
    model.train()
    images, targets = inputs
    # move every image and every target entry to the compute device
    images = [img.to(device) for img in images]
    targets = [
        {key: value.to(device) for key, value in target.items()}
        for target in targets
    ]
    optimizer.zero_grad()
    losses = model(images, targets)
    loss = sum(losses.values())
    loss.backward()
    optimizer.step()
    return loss, losses

# validate images of the batch
@torch.no_grad() # this will disable gradient computation in the function below
def validate_batch(inputs, model):
    """Compute the losses for one validation batch without updating the model.

    Args:
        inputs: ``(images, targets)`` pair; ``targets`` is a sequence of dicts.
        model: detection model that returns a dict of losses in train mode.

    Returns:
        ``(loss, losses)``: the summed loss and the dict of individual losses.
    """
    # The model only returns its loss dict in train mode, so train mode is used
    # here as well; no gradients are produced because of the no_grad decorator.
    model.train()
    images, targets = inputs
    images = [image.to(device) for image in images]
    targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
    # No optimizer call here: nothing is back-propagated during validation, and
    # the function must not depend on a notebook-global `optimizer`.
    losses = model(images, targets)
    loss = sum(losses.values())
    return loss, losses

# set learning rate
def set_learning_rate(optimizer, rate):
    """Set the ``'lr'`` entry of every parameter group of ``optimizer`` to ``rate``."""
    for group in optimizer.param_groups:
        group.update(lr=rate)
        
# create model
model = get_model().to(device)
# Optional warm start: load previously saved weights only when the checkpoint
# file is actually present, so a fresh run without it does not crash.
WEIGHTS_PATH = '/kaggle/input/license-plate/model.pt'
if os.path.exists(WEIGHTS_PATH):
    # tensors are deserialised on the CPU; load_state_dict copies them into the model
    model.load_state_dict(torch.load(WEIGHTS_PATH, map_location=torch.device('cpu')))

# set optimizer
optimizer = torch.optim.SGD(model.parameters(), lr=0.005, momentum=0.9)
#optimizer = torch.optim.Adam(model.parameters(), lr=0.005)

n_epochs = N_EPOCHS
# torch_snippets progress/metric logger used by the training loop
log = Report(n_epochs)


In [None]:
# Per-epoch history: maps the 1-based epoch number to whatever
# log.report_avgs() returns for that epoch (presumably the averaged
# metrics recorded below — confirm against torch_snippets.Report).
epochs_aver_dict = {}

for epoch in range(n_epochs):
    
    # training: one optimisation step per batch, every loss component is logged
    _n = len(train_loader)
    for ix, inputs in enumerate(train_loader):
        loss, losses = train_batch(inputs, model, optimizer)
        # NOTE: `loc_loss` holds 'loss_classifier' and `regr_loss` holds 'loss_box_reg'
        loc_loss, regr_loss, loss_objectness, loss_rpn_box_reg = \
            [losses[k] for k in ['loss_classifier','loss_box_reg','loss_objectness','loss_rpn_box_reg']]
        # fractional position inside the run: epoch index + share of batches done
        pos = (epoch + (ix+1)/_n)
        log.record(pos, trn_loss=loss.item(), trn_loc_loss=loc_loss.item(), 
                   trn_regr_loss=regr_loss.item(), trn_objectness_loss=loss_objectness.item(),
                   trn_rpn_box_reg_loss=loss_rpn_box_reg.item(), end='\r')
    
    # validation: same losses on the held-out loader, without weight updates
    _n = len(test_loader)
    for ix,inputs in enumerate(test_loader):
        loss, losses = validate_batch(inputs, model)
        loc_loss, regr_loss, loss_objectness, loss_rpn_box_reg = \
          [losses[k] for k in ['loss_classifier','loss_box_reg','loss_objectness','loss_rpn_box_reg']]
        pos = (epoch + (ix+1)/_n)
        log.record(pos, val_loss=loss.item(), val_loc_loss=loc_loss.item(), 
                  val_regr_loss=regr_loss.item(), val_objectness_loss=loss_objectness.item(),
                  val_rpn_box_reg_loss=loss_rpn_box_reg.item(), end='\r')
    
    # store the summary of the epoch that has just finished under its 1-based number
    epochs_aver = log.report_avgs(epoch+1)
    epochs_aver_dict[epoch+1] = epochs_aver
    
    # manual step schedule: the new rate takes effect from the next epoch on
    # (0.005 initially, 0.001 after epoch index 10, 0.0005 after epoch index 70)
    if epoch==10:
        set_learning_rate(optimizer, 0.001)
    elif epoch==70:
        set_learning_rate(optimizer, 0.0005)

# save the per-epoch history as a CSV with one row per epoch
epochs_aver_df = pd.DataFrame.from_dict(epochs_aver_dict, orient="index") 
epochs_aver_df.index.name = 'Epoch'
epochs_aver_df.to_csv(f'/kaggle/working/epochs_aver_df.csv')
# plot the training and validation loss curves
log.plot_epochs(['trn_loss','val_loss'])
# save the model weights (state dict only)
torch.save(model.state_dict(),f'/kaggle/working/model_1.pt')

# get from the "output" boxes and labels
def decode_output(output, iou_threshold=0.05):
    """Convert one model prediction into plain Python lists after NMS.

    Args:
        output: dict with the tensors ``'boxes'``, ``'labels'`` and ``'scores'``
            for a single image.
        iou_threshold: IoU threshold passed to ``nms``; overlapping boxes above
            it are suppressed. Defaults to the previously hard-coded 0.05.

    Returns:
        ``(bbs, confs, labels)``: lists of integer boxes, confidence scores and
        class names for the detections kept by NMS (all empty when none remain).
    """
    bbs = output['boxes'].cpu().detach().numpy().astype(np.uint16)
    labels = np.array([target2label[i] for i in output['labels'].cpu().detach().numpy()])
    confs = output['scores'].cpu().detach().numpy()
    keep = nms(torch.tensor(bbs.astype(np.float32)), torch.tensor(confs), iou_threshold)
    # Index with a NumPy integer array rather than a torch tensor: a one-element
    # tensor may be treated as a scalar index by NumPy, which drops a dimension.
    # An index array always keeps the leading axis, for 0, 1 or many detections.
    keep = keep.cpu().numpy()
    bbs, confs, labels = [arr[keep] for arr in (bbs, confs, labels)]
    return bbs.tolist(), confs.tolist(), labels.tolist()

# perform testing over test images
model.eval()
with torch.no_grad():  # inference only: no autograd graph is needed
    for images, targets in test_loader:
        images = list(images)
        outputs = model(images)
        # separate index name: the prediction index must not clobber an outer loop index
        for img_ix, output in enumerate(outputs):
            bbs, confs, labels = decode_output(output)
            # caption every box with "<label>@<confidence>"
            info = [f'{l}@{c:.2f}' for l,c in zip(labels, confs)]
            show(images[img_ix].cpu().permute(1,2,0), bbs=bbs, texts=info, sz=25)


In [None]:
# This cell saves the current model and the training/validation history.
# It can be useful if the training process is interrupted, because the
# training cell only writes these files after the last epoch.
# Save the current training/validation history.
epochs_aver_df = pd.DataFrame.from_dict(epochs_aver_dict, orient="index") 
epochs_aver_df.index.name = 'Epoch'
epochs_aver_df.to_csv(f'/kaggle/working/epochs_aver_df.csv')
# save the current model
torch.save(model.state_dict(),f'/kaggle/working/model_1.pt')
log.plot_epochs(['trn_loss','val_loss'])