In [1]:
# directory with the training images; assigned to DIR_INPUT when the annotations are loaded
ROOT_PATH = './Positive_data'
# directory with the images to run predictions on
# NOTE(review): the inference cells read `config.TEST_PATH`, not this name — confirm they agree
TEST_PATH = './test'
# NOTE(review): the inference cells read `config.PREDICTION_THRES` as the minimum
# score a detection needs to be drawn — confirm it matches this value
PREDICTION_THRES = 0.8
# number of training epochs (only referenced by the commented-out training loop)
EPOCHS = 5
# forwarded as `min_size` to `fasterrcnn_resnet50_fpn` in `model()`
MIN_SIZE = 800
# `batch_size` of the training DataLoader
BATCH_SIZE = 4
DEBUG = False # to visualize the images before training

In [2]:
import torch
import torchvision
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection import  FasterRCNN
from torchvision.models.detection.rpn import AnchorGenerator

import time
import os
import numpy as np
import cv2
import glob
import albumentations as A
import pandas as pd
from torch.utils.data import Dataset
from albumentations.pytorch.transforms import ToTensorV2
from torch.utils.data import DataLoader

In [3]:
def model():
    """Build a Faster R-CNN (ResNet-50 FPN) detector with a two-class head.

    The network is created with `pretrained=True` and `min_size=MIN_SIZE`,
    then its box predictor is swapped for a fresh `FastRCNNPredictor` sized
    for this task.

    Returns:
        The detector with the replaced box-predictor head.
    """
    # two outputs: background plus pot hole
    num_classes = 2
    detector = torchvision.models.detection.fasterrcnn_resnet50_fpn(
        pretrained=True, min_size=MIN_SIZE
    )
    roi_heads = detector.roi_heads
    # width of the features feeding the existing classification layer
    in_features = roi_heads.box_predictor.cls_score.in_features
    # the new head keeps that input width but predicts only our classes
    roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)
    return detector

In [4]:
class PotHoleDataset(Dataset):
    """Detection dataset pairing each image with its annotated boxes.

    Args:
        dataframe: annotation table with an `image_id` column and one row per
            box, described by the `x`, `y`, `w`, `h` columns.
        image_dir: directory that holds the `<image_id>.JPG` files.
        transforms: optional callable invoked as
            `transforms(image=..., bboxes=..., labels=...)` and returning a
            mapping with `image` and `bboxes` entries.
    """

    def __init__(self, dataframe, image_dir, transforms=None):
        super().__init__()
        self.image_ids = dataframe['image_id'].unique()
        self.df = dataframe
        self.image_dir = image_dir
        self.transforms = transforms

    @staticmethod
    def _boxes_to_tensor(boxes):
        """Convert a sequence/array of boxes to a float32 tensor, `(0, 4)` when empty."""
        if len(boxes) == 0:
            # torch.stack / np.asarray cannot infer the 4 columns from nothing
            return torch.zeros((0, 4), dtype=torch.float32)
        return torch.as_tensor(np.asarray(boxes, dtype=np.float32))

    def __getitem__(self, index: int):
        """Return `(image, target, image_id)` for the image at `index`.

        `target` is a dict with `boxes` (float32 tensor in
        x_min, y_min, x_max, y_max order), `labels`, `image_id`, `area` and
        `iscrowd`. `boxes` is a tensor whether or not transforms are set.
        Without transforms the image is returned as the float32 RGB array
        scaled to [0, 1]; with transforms it is whatever they return.

        Raises:
            FileNotFoundError: if the image file cannot be read.
        """
        image_id = self.image_ids[index]
        records = self.df[self.df['image_id'] == image_id]

        image_path = f"{self.image_dir}/{image_id}.JPG"
        image = cv2.imread(image_path, cv2.IMREAD_COLOR)
        if image is None:
            # cv2.imread reports failure by returning None instead of raising
            raise FileNotFoundError(f"Could not read image: {image_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB).astype(np.float32)
        image /= 255.0

        # convert the boxes into x_min, y_min, x_max, y_max format; a new array
        # is built so the dataframe's own buffer is never written to
        xywh = records[['x', 'y', 'w', 'h']].to_numpy(dtype=np.float32)
        boxes = np.column_stack((
            xywh[:, 0],
            xywh[:, 1],
            xywh[:, 0] + xywh[:, 2],
            xywh[:, 1] + xywh[:, 3],
        ))

        # get the area of the bounding boxes
        area = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])
        area = torch.as_tensor(area, dtype=torch.float32)

        num_boxes = records.shape[0]
        # we have only one class
        labels = torch.ones((num_boxes,), dtype=torch.int64)
        # supposing that all instances are not crowd
        iscrowd = torch.zeros((num_boxes,), dtype=torch.int64)

        # apply the image transforms
        if self.transforms:
            sample = self.transforms(image=image, bboxes=boxes, labels=labels)
            image = sample['image']
            boxes = sample['bboxes']

        target = {
            # always a tensor, so callers can move every target entry to a device
            'boxes': self._boxes_to_tensor(boxes),
            'labels': labels,
            'image_id': torch.tensor([index]),
            'area': area,
            'iscrowd': iscrowd,
        }
        return image, target, image_id

    def __len__(self):
        """Number of distinct image ids in the annotation table."""
        return len(self.image_ids)

In [5]:
def collate_fn(batch):
    """
    Regroup a list of per-sample tuples into one tuple per field.

    Fields are kept as plain tuples rather than stacked, so samples in the
    same batch may carry a different number of object instances.
    """
    per_field = zip(*batch)
    return tuple(per_field)

In [6]:
# function for the image transforms
def train_transform():
    """Build the Albumentations pipeline applied to training samples.

    Randomly flips the image together with its boxes, then converts the
    image to a tensor. Boxes are declared in `pascal_voc` format with their
    classes carried in the `labels` field.
    """
    return A.Compose([
        # Pass the probability by keyword: in Albumentations releases whose
        # first positional parameter is `always_apply`, `A.Flip(0.5)` turns
        # the flip on for every image instead of half of them.
        A.Flip(p=0.5),
        # A.RandomRotate90(p=0.5),
        # A.MotionBlur(p=0.2),
        # A.MedianBlur(blur_limit=3, p=0.1),
        # A.Blur(blur_limit=3, p=0.1),
        ToTensorV2(p=1.0)
    ], bbox_params={'format': 'pascal_voc', 'label_fields': ['labels']})

In [7]:
# path to the input root directory
DIR_INPUT = ROOT_PATH
# read the annotation CSV file
train_df = pd.read_csv("./_df.csv")
print(train_df.head())
print(f"Total number of image IDs (objects) in dataframe: {len(train_df)}")
# get all the image paths as list
image_paths = glob.glob(f"{DIR_INPUT}/*.JPG")
# keep the file stem: drop the directory and only the final extension, so a
# name that contains extra dots is not cut at its first dot
image_names = [
    os.path.splitext(os.path.basename(image_path))[0]
    for image_path in image_paths
]
print(f"Total number of training images in folder: {len(image_names)}")
image_ids = train_df['image_id'].unique()
print(f"Total number of unique train images IDs in dataframe: {len(image_ids)}")
# number of images that we want to train out of all the unique images
train_ids = image_names[:] # use all the images for training
# file stems are strings; compare as strings so an `image_id` column that
# pandas parsed as numbers still matches its image files
train_df = train_df[train_df['image_id'].astype(str).isin(train_ids)]
print(f"Number of image IDs (objects) training on: {len(train_df)}")

FileNotFoundError: [Errno 2] No such file or directory: './_df.csv'

In [None]:
# wrap the annotations in the dataset, then batch it in dataframe order
train_dataset = PotHoleDataset(
    dataframe=train_df,
    image_dir=DIR_INPUT,
    transforms=train_transform(),
)
train_data_loader = DataLoader(
    dataset=train_dataset,
    collate_fn=collate_fn,
    shuffle=False,
    batch_size=BATCH_SIZE,
)

In [None]:
# the computation device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# `model` is the factory function until this cell rebinds it to the network it
# builds. Only call it while it is still the factory, so re-running the cell
# does not try to call the network with no arguments.
if not isinstance(model, torch.nn.Module):
    model = model()
model = model.to(device)
optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9, weight_decay=0.0005)

In [None]:
def train(train_dataloader):
    """Run one training epoch and return the mean loss per batch.

    Uses the module-level `model`, `optimizer` and `device`.

    Args:
        train_dataloader: iterable of batches whose first two items are the
            images and the matching target dicts (tensors as values).

    Returns:
        float: the summed batch losses divided by the number of batches, or
        0.0 when the loader produced no batch.
    """
    model.train()
    running_loss = 0.0
    num_batches = 0
    for i, data in enumerate(train_dataloader):
        optimizer.zero_grad()
        images, targets = data[0], data[1]
        images = [image.to(device) for image in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
        loss_dict = model(images, targets)
        loss = sum(loss_dict.values())
        loss.backward()
        optimizer.step()

        batch_loss = loss.item()
        running_loss += batch_loss
        num_batches += 1
        if i % 25 == 0:
            # log the plain number rather than the tensor repr
            print(f"Iteration #{i} loss: {batch_loss}")

    if num_batches == 0:
        return 0.0
    # one scalar is accumulated per batch, so normalise by the batch count;
    # dividing by the number of samples would shrink the value by the batch size
    return running_loss / num_batches

In [None]:
def save_model():
    """Write the module-level `model`'s state dict to the checkpoint file.

    The `checkpoints` directory is created first when it is missing, because
    `torch.save` does not create parent directories.
    """
    checkpoint_path = 'checkpoints/fasterrcnn_resnet50_fpn.pth'
    os.makedirs(os.path.dirname(checkpoint_path), exist_ok=True)
    torch.save(model.state_dict(), checkpoint_path)

In [None]:
def visualize(num_images=1):
    """
    Show training images with their annotated boxes drawn on top.

    Takes the first batch from the module-level `train_data_loader`. In this
    notebook it is only called when `DEBUG` is `True`.

    Args:
        num_images: how many images of that batch to display (capped at the
            batch size). Defaults to 1.
    """
    # imported here because nothing else in the notebook brings `plt` into scope
    import matplotlib.pyplot as plt

    images, targets, image_ids = next(iter(train_data_loader))
    for image, target in list(zip(images, targets))[:num_images]:
        boxes = target['boxes'].cpu().numpy().astype(np.int32)
        # permute() returns a non-contiguous view; copy it into a contiguous
        # array so cv2 can draw on it in place
        sample = np.ascontiguousarray(image.permute(1, 2, 0).cpu().numpy())
        fig, ax = plt.subplots(1, 1, figsize=(15, 12))
        for box in boxes:
            # plain ints, matching how the inference cell passes coordinates
            cv2.rectangle(sample,
                          (int(box[0]), int(box[1])),
                          (int(box[2]), int(box[3])),
                          (220, 0, 0), 3)
        ax.set_axis_off()
        ax.imshow(sample)
        plt.show()

In [None]:
# optional sanity check: preview a training sample with its boxes before training
if DEBUG:
    visualize()

In [None]:
# num_epochs = EPOCHS
# for epoch in range(num_epochs):
#     start = time.time()
#     train_loss = train(train_data_loader)
# #     print(f"Epoch #{epoch} loss: {train_loss}")   
#     end = time.time()
#     print(f"Took {(end - start) / 60} minutes for epoch {epoch}")

In [None]:
# persist the current weights of `model`
# NOTE(review): the training loop in the previous cell is commented out, so
# unless `train()` was run some other way this saves the network as built —
# confirm that is intended
save_model()

In [None]:
import numpy as np
import cv2
import os
import torch
from tqdm import tqdm
import config
from model import model

In [None]:
# set the computation device
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
# load the model and the trained weights
# `model` is the factory function until it is rebound to the network; only call
# it while it is still the factory so this cell can be re-run
if not isinstance(model, torch.nn.Module):
    model = model()
model = model.to(device)
# map_location lets a checkpoint that was saved on a GPU load on a CPU-only machine
# NOTE(review): save_model() writes 'checkpoints/fasterrcnn_resnet50_fpn.pth',
# while this reads the file from the working directory — confirm which is intended
model.load_state_dict(torch.load('./fasterrcnn_resnet50_fpn.pth', map_location=device))

In [None]:
# directory holding the images to run inference on
DIR_TEST = config.TEST_PATH
# every entry of that directory is treated as a test image
# NOTE(review): os.listdir also returns non-image files and sub-directories —
# confirm the folder contains only images
test_images = os.listdir(DIR_TEST)
print(f"Validation instances: {len(test_images)}")

In [None]:
# minimum score a detection needs in order to be drawn
detection_threshold = config.PREDICTION_THRES
cpu_device = torch.device("cpu")
# cv2.imwrite does not create missing directories, so make sure the output folder exists
os.makedirs("test_predictions", exist_ok=True)
model.eval()
with torch.no_grad():
    for image_name in tqdm(test_images, total=len(test_images)):
        orig_image = cv2.imread(f"{DIR_TEST}/{image_name}", cv2.IMREAD_COLOR)
        if orig_image is None:
            # cv2.imread returns None for anything it cannot decode; skip it
            continue
        image = cv2.cvtColor(orig_image, cv2.COLOR_BGR2RGB).astype(np.float32)
        # make the pixel range between 0 and 1
        image /= 255.0
        # channels first; the array is already float32, so no `np.float` cast
        # (that alias no longer exists in current NumPy)
        image = np.transpose(image, (2, 0, 1))
        # follow `device` instead of forcing CUDA, so the CPU fallback works
        image = torch.tensor(image, dtype=torch.float).to(device)
        image = torch.unsqueeze(image, 0)
        outputs = model(image)

        outputs = [{k: v.to(cpu_device) for k, v in t.items()} for t in outputs]
        if len(outputs[0]['boxes']) != 0:
            # filter once per image; the result does not depend on the box index
            boxes = outputs[0]['boxes'].data.numpy()
            scores = outputs[0]['scores'].data.numpy()
            draw_boxes = boxes[scores >= detection_threshold].astype(np.int32)

            for box in draw_boxes:
                cv2.rectangle(orig_image,
                              (int(box[0]), int(box[1])),
                              (int(box[2]), int(box[3])),
                              (0, 0, 255), 3)
                cv2.putText(orig_image, 'PotHole',
                            (int(box[0]), int(box[1] - 5)),
                            cv2.FONT_HERSHEY_SIMPLEX, 2, (0, 0, 255),
                            2, lineType=cv2.LINE_AA)
            cv2.imwrite(f"test_predictions/{image_name}", orig_image)
print('TEST PREDICTIONS COMPLETE')