In [1]:
import albumentations as A
import cv2
import numpy as np
from albumentations.pytorch import ToTensorV2
import torch
import os
import glob as glob
from xml.etree import ElementTree as et
from torch.utils.data import Dataset, DataLoader
import torchvision
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from tqdm.auto import tqdm
import time
import matplotlib.pyplot as plt
import json
import pandas as pd
import matplotlib.image as img
plt.style.use('ggplot')

In [2]:
CUDA_VISIBLE_DEVICES = 1

In [3]:
BATCH_SIZE = 4 # increase / decrease according to GPU memeory
RESIZE_TO = 512 # resize the image for training and transforms
NUM_EPOCHS = 50 # number of epochs to train for
DEVICE = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [4]:
NUM_CLASSES = 9
# whether to visualize images after crearing the data loaders
VISUALIZE_TRANSFORMED_IMAGES = False
# location to save model and plots
checkpoints_DIR = './checkpoints/ceeri/'
plots_DIR = './plots/ceeri/'
SAVE_PLOTS_EPOCH = 2 # save loss plots after these many epochs
SAVE_MODEL_EPOCH = 2 # save model after these many epochs

In [5]:
# create dataframe from json annotation for future access
# Create a data frame
def createDF(anno_path):
  with open(anno_path, 'r') as f:
    anno = json.load(f)
  df = pd.DataFrame(columns = ['imgID', 'imgPath', 'labels', 'polygons', 'bboxes'])

  #category mapping
  cat_map = {}
  for i in range(len(anno['categories'])):
    cat_map[anno['categories'][i]['id']] = anno['categories'][i]['name']
  # fill the df with entire anno data
  for i in range(len(anno['images'])):
    tempDict = {'imgID': anno['images'][i]['id'],
              'imgPath':anno['images'][i]['file_name'],
              'labels':[],
              'polygons':[],
              'bboxes':[],
              'width': anno['images'][i]['width'],
              'height': anno['images'][i]['height']}
    df = df.append(tempDict, ignore_index=True)

  df.set_index('imgID', inplace=True)

  for i in range(len(anno['annotations'])): 
    
    df.loc[anno['annotations'][i]['image_id']]['labels'].append(anno['annotations'][i]['category_id'])
    df.loc[anno['annotations'][i]['image_id']]['polygons'].append(anno['annotations'][i]['segmentation'])
    df.loc[anno['annotations'][i]['image_id']]['bboxes'].append(anno['annotations'][i]['bbox'])

  df = df.reset_index()
  df['imgID'] = df['imgID'].astype('category').cat.codes
  df.set_index('imgID', inplace=True)
  df.sort_index(inplace=True)
  return df

In [6]:
# training images files directory
TRAIN_DIR = '.'
trainDF = createDF('NewValid.json')
# validation images files directory
VALID_DIR = '.'
testDF = createDF('NewValid.json')
# classes: 0 index is reserved for background
#CLASSES = [
#    '__background__', 'Folding_Knife', 'Straight_Knife', 'Scissor', 'Utility_Knife', 'Multi-tool_Knife'
#]

CLASSES = [
    '__background__', 'knife', 'plier', 'nail_cutter', 'paper_cutter', 'wrench', 'scissor', 'screw_driver', 'hammer'
]


In [7]:
trainDF.head()

Unnamed: 0_level_0,imgPath,labels,polygons,bboxes,height,width
imgID,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
0,valid/P01X010-2022061100066.jpg,[2],"[[365, 97, 385, 112, 399, 112, 428, 140, 461, ...","[[365, 75, 106, 76]]",704.0,640.0
1,valid/P01X010-2022061100067.jpg,[2],"[[360, 495, 403, 500, 439, 521, 394, 533, 351,...","[[351, 488, 136, 59]]",656.0,640.0
2,valid/P01X010-2022061100068.jpg,[6],"[[409, 324, 316, 395, 310, 382, 397, 310, 417,...","[[310, 243, 178, 152]]",784.0,640.0
3,valid/P01X010-2022061100080.jpg,"[8, 7]","[[349, 136, 431, 323, 451, 314, 373, 130, 384,...","[[305, 93, 146, 230], [395, 124, 131, 158]]",528.0,640.0
4,valid/P01X010-2022061100081.jpg,"[8, 7]","[[422, 103, 340, 280, 358, 291, 440, 120, 474,...","[[340, 73, 134, 218], [463, 129, 112, 190]]",672.0,640.0


In [8]:
def create_model(num_classes):    
    # load Faster RCNN pre-trained model
    model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)    
    # get the number of input features 
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    # define a new head for the detector with required number of classes
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes) 
    return model

In [9]:
# this class keeps track of the training and validation loss values...
# ... and helps to get the average for each epoch as well
class Averager:
    def __init__(self):
        self.current_total = 0.0
        self.iterations = 0.0
        
    def send(self, value):
        self.current_total += value
        self.iterations += 1
    
    @property
    def value(self):
        if self.iterations == 0:
            return 0
        else:
            return 1.0 * self.current_total / self.iterations
    
    def reset(self):
        self.current_total = 0.0
        self.iterations = 0.0
        
def collate_fn(batch):
    """
    To handle the data loading as different images may have different number 
    of objects and to handle varying size tensors as well.
    """
    return tuple(zip(*batch))
    
# define the training tranforms
def get_train_transform():
    return A.Compose([
        A.Flip(0.5),
        A.RandomRotate90(0.5),
        A.MotionBlur(p=0.2),
        A.MedianBlur(blur_limit=3, p=0.1),
        A.Blur(blur_limit=3, p=0.1),
        ToTensorV2(p=1.0),
    ], bbox_params={
        'format': 'pascal_voc', 
        'label_fields': ['labels']
    }
    )
# define the validation transforms
def get_valid_transform():
    return A.Compose([
        ToTensorV2(p=1.0),
    ], bbox_params={
        'format': 'pascal_voc', 
        'label_fields': ['labels']
    })    
    
def show_tranformed_image(train_loader):
    """
    This function shows the transformed images from the `train_loader`.
    Helps to check whether the tranformed images along with the corresponding
    labels are correct or not.
    Only runs if `VISUALIZE_TRANSFORMED_IMAGES = True` in config.py.
    """
    if len(train_loader) > 0:
        for i in range(1):
            images, targets = next(iter(train_loader))
            images = list(image.to(DEVICE) for image in images)
            targets = [{k: v.to(DEVICE) for k, v in t.items()} for t in targets]
            boxes = targets[i]['boxes'].cpu().numpy().astype(np.int32)
            sample = images[i].permute(1, 2, 0).cpu().numpy()
            for box in boxes:
                cv2.rectangle(sample,
                            (box[0], box[1]),
                            (box[2], box[3]),
                            (0, 0, 255), 2)
            cv2.imshow('Transformed image', sample)
            cv2.waitKey(0)
            cv2.destroyAllWindows()    

In [10]:
# the dataset class
class ImageDataset(Dataset):
    def __init__(self, dir_path, df, width, height, transforms=None):
        self.dir_path = dir_path
        self.transforms = transforms
        self.height = height
        self.width = width
        self.df = df
    def __getitem__(self, idx):
        # capture the image name and the full image path
        row = self.df.loc[idx]
        image_name = row['imgPath']
        image_path = os.path.join(self.dir_path, image_name)
        # read the image
        image = cv2.imread(image_path)
        # convert BGR to RGB color format
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB).astype(np.float32)
        image_resized = cv2.resize(image, (self.width, self.height))
        image_resized /= 255.0
        
        boxes = []
        labels = row['labels']
        
        # get the height and width of the image
        image_width = row['width']
        image_height = row['height']
        
        # box coordinates are extracted and corrected for image size given
        for bbox in row['bboxes']:
            xmin = bbox[0]          
            xmax = xmin + bbox[2]
            ymin = bbox[1]          
            ymax = ymin + bbox[3]
            
            # resize the bounding boxes according to the...
            # ... desired `width`, `height`
            xmin_final = (xmin/image_width)*self.width
            xmax_final = (xmax/image_width)*self.width
            ymin_final = (ymin/image_height)*self.height
            yamx_final = (ymax/image_height)*self.height
            
            boxes.append([xmin_final, ymin_final, xmax_final, yamx_final])
        
        # bounding box to tensor
        boxes = torch.as_tensor(boxes, dtype=torch.float32)
        # area of the bounding boxes
        area = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])
        # no crowd instances
        iscrowd = torch.zeros((boxes.shape[0],), dtype=torch.int64)
        # labels to tensor
        labels = torch.as_tensor(labels, dtype=torch.int64)
        # prepare the final `target` dictionary
        target = {}
        target["boxes"] = boxes
        target["labels"] = labels
        target["area"] = area
        target["iscrowd"] = iscrowd
        image_id = torch.tensor([idx])
        target["image_id"] = image_id
        # apply the image transforms
        if self.transforms:
          sample = self.transforms(image = image_resized,
                                    bboxes = boxes,
                                    labels = labels
                                    )
          
          image_resized = sample['image']
          target['boxes'] = torch.Tensor(sample['bboxes'])
            
        return image_resized, target
    def __len__(self):
        return len(self.df)

In [11]:
# prepare the final datasets and data loaders
train_dataset = ImageDataset(TRAIN_DIR, trainDF, RESIZE_TO, RESIZE_TO, get_train_transform())
valid_dataset = ImageDataset(VALID_DIR, testDF, RESIZE_TO, RESIZE_TO, get_valid_transform())
train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=0,
    collate_fn=collate_fn
)
valid_loader = DataLoader(
    valid_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=0,
    collate_fn=collate_fn
)
print(f"Number of training samples: {len(train_dataset)}")
print(f"Number of validation samples: {len(valid_dataset)}\n") 

Number of training samples: 502
Number of validation samples: 502



In [12]:
# sanity check of the Dataset pipeline with sample visualization
dataset = ImageDataset(
    TRAIN_DIR, trainDF, RESIZE_TO, RESIZE_TO
    )
print(f"Number of training images: {len(dataset)}")
    
# function to visualize a single sample 
# this function is incapable of dealing with images with multiple objects in them
def visualize_sample(image, target):
  box = target['boxes'][0]
  label = CLASSES[target['labels'][0]]
  cv2.rectangle(
      image, 
      (int(box[0]), int(box[1])), (int(box[2]), int(box[3])),
      (0, 255, 0), 1
      )
  cv2.putText(
      image, label, (int(box[0]), int(box[1]-5)), 
      cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2
      )
  plt.imshow(np.asarray(image))
  plt.grid(None)  
  plt.show()

        
NUM_SAMPLES_TO_VISUALIZE = 5
for index, row in dataset.df.iterrows():
  image, target = dataset[index]
  visualize_sample(image, target)
  NUM_SAMPLES_TO_VISUALIZE -= 1
  if NUM_SAMPLES_TO_VISUALIZE == 0:
    break


Number of training images: 502


[ WARN:0@14.014] global /io/opencv/modules/imgcodecs/src/loadsave.cpp (239) findDecoder imread_('./valid/P01X010-2022061100066.jpg'): can't open/read file: check file path/integrity


error: OpenCV(4.5.5) /io/opencv/modules/imgproc/src/color.cpp:182: error: (-215:Assertion failed) !_src.empty() in function 'cvtColor'


In [None]:
# function for running training iterations
def train(train_data_loader, model):
    print('Training')
    global train_itr
    global train_loss_list
    
     # initialize tqdm progress bar
    prog_bar = tqdm(train_data_loader, total=len(train_data_loader))
    
    for i, data in enumerate(prog_bar):
        optimizer.zero_grad()
        images, targets = data
        
        images = list(image.to(DEVICE) for image in images)
        targets = [{k: v.to(DEVICE) for k, v in t.items()} for t in targets]
        loss_dict = model(images, targets)
        losses = sum(loss for loss in loss_dict.values())
        loss_value = losses.item()
        train_loss_list.append(loss_value)
        train_loss_hist.send(loss_value)
        losses.backward()
        optimizer.step()
        train_itr += 1
    
        # update the loss value beside the progress bar for each iteration
        prog_bar.set_description(desc=f"Loss: {loss_value:.4f}")
    return train_loss_list

# function for running validation iterations
def validate(valid_data_loader, model):
    print('Validating')
    global val_itr
    global val_loss_list
    
    # initialize tqdm progress bar
    prog_bar = tqdm(valid_data_loader, total=len(valid_data_loader))
    
    for i, data in enumerate(prog_bar):
        images, targets = data
        
        images = list(image.to(DEVICE) for image in images)
        targets = [{k: v.to(DEVICE) for k, v in t.items()} for t in targets]
        
        with torch.no_grad():
            loss_dict = model(images, targets)
        losses = sum(loss for loss in loss_dict.values())
        loss_value = losses.item()
        val_loss_list.append(loss_value)
        val_loss_hist.send(loss_value)
        val_itr += 1
        # update the loss value beside the progress bar for each iteration
        prog_bar.set_description(desc=f"Loss: {loss_value:.4f}")
    return val_loss_list    

In [None]:
# initialize the model and move to the computation device
model = create_model(num_classes=NUM_CLASSES)
model = model.to(DEVICE)
# get the model parameters
params = [p for p in model.parameters() if p.requires_grad]
# define the optimizer
optimizer = torch.optim.SGD(params, lr=0.001, momentum=0.9, weight_decay=0.0005)
# initialize the Averager class
train_loss_hist = Averager()
val_loss_hist = Averager()
train_itr = 1
val_itr = 1
# train and validation loss lists to store loss values of all...
# ... iterations till ena and plot graphs for all iterations
train_loss_list = []
val_loss_list = []
# name to save the trained model with
MODEL_NAME = 'model'
# whether to show transformed images from data loader or not
if VISUALIZE_TRANSFORMED_IMAGES:
  from utils import show_tranformed_image
  show_tranformed_image(train_loader)
# start the training epochs
for epoch in range(NUM_EPOCHS):
    print(f"\nEPOCH {epoch+1} of {NUM_EPOCHS}")
    # reset the training and validation loss histories for the current epoch
    train_loss_hist.reset()
    val_loss_hist.reset()
    # create two subplots, one for each, training and validation
    figure_1, train_ax = plt.subplots()
    figure_2, valid_ax = plt.subplots()
    # start timer and carry out training and validation
    start = time.time()
    train_loss = train(train_loader, model)
    val_loss = validate(valid_loader, model)
    print(f"Epoch #{epoch} train loss: {train_loss_hist.value:.3f}")   
    print(f"Epoch #{epoch} validation loss: {val_loss_hist.value:.3f}")   
    end = time.time()
    print(f"Took {((end - start) / 60):.3f} minutes for epoch {epoch}")
    if (epoch+1) % SAVE_MODEL_EPOCH == 0: # save model after every n epochs
        torch.save(model.state_dict(), f"{checkpoints_DIR}/model{epoch+1}.pth")
        print('SAVING MODEL COMPLETE...\n')
        
    if (epoch+1) % SAVE_PLOTS_EPOCH == 0: # save loss plots after n epochs
        train_ax.plot(train_loss, color='blue')
        train_ax.set_xlabel('iterations')
        train_ax.set_ylabel('train loss')
        valid_ax.plot(val_loss, color='red')
        valid_ax.set_xlabel('iterations')
        valid_ax.set_ylabel('validation loss')
        figure_1.savefig(f"{plots_DIR}/train_loss_{epoch+1}.png")
        figure_2.savefig(f"{plots_DIR}/valid_loss_{epoch+1}.png")
        print('SAVING PLOTS COMPLETE...')
        
    if (epoch+1) == NUM_EPOCHS: # save loss plots and model once at the end
        train_ax.plot(train_loss, color='blue')
        train_ax.set_xlabel('iterations')
        train_ax.set_ylabel('train loss')
        valid_ax.plot(val_loss, color='red')
        valid_ax.set_xlabel('iterations')
        valid_ax.set_ylabel('validation loss')
        figure_1.savefig(f"{plots_DIR}/train_loss_{epoch+1}.png")
        figure_2.savefig(f"{plots_DIR}/valid_loss_{epoch+1}.png")
        torch.save(model.state_dict(), f"{checkpoints_DIR}/model{epoch+1}.pth")
        
    plt.close('all')   

In [None]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
# load the model and the trained weights
model = create_model(num_classes=6).to(device)
model.load_state_dict(torch.load(
    './checkpoints/opixray/model50.pth', map_location=device
))
model.eval()

In [None]:
# directory where all the images are present
DIR_TEST = '../Datasets/OPIXray/test/images/rgb/'
test_images = glob.glob(f"{DIR_TEST}/*.jpg")
print(f"Test instances: {len(test_images)}")

In [None]:
# define the detection threshold...
# ... any detection having score below this will be discarded
detection_threshold = 0.8

In [None]:
for i in range(len(test_images)):
    # get the image file name for saving output later on
    image_name = test_images[i].split('/')[-1].split('.')[0]
    image = cv2.imread(test_images[i])
    orig_image = image.copy()
    # BGR to RGB
    image = cv2.cvtColor(orig_image, cv2.COLOR_BGR2RGB).astype(np.float32)
    # make the pixel range between 0 and 1
    image /= 255.0
    # bring color channels to front
    image = np.transpose(image, (2, 0, 1)).astype(float)
    # convert to tensor
    image = torch.tensor(image, dtype=torch.float).cuda()
    # add batch dimension
    image = torch.unsqueeze(image, 0)
    with torch.no_grad():
        outputs = model(image)
    
    # load all detection to CPU for further operations
    outputs = [{k: v.to('cpu') for k, v in t.items()} for t in outputs]
    # carry further only if there are detected boxes
    if len(outputs[0]['boxes']) != 0:
        boxes = outputs[0]['boxes'].data.numpy()
        scores = outputs[0]['scores'].data.numpy()
        # filter out boxes according to `detection_threshold`
        boxes = boxes[scores >= detection_threshold].astype(np.int32)
        draw_boxes = boxes.copy()
        # get all the predicited class names
        pred_classes = [CLASSES[i] for i in outputs[0]['labels'].cpu().numpy()]
        
        # draw the bounding boxes and write the class name on top of it
        for j, box in enumerate(draw_boxes):
            cv2.rectangle(orig_image,
                        (int(box[0]), int(box[1])),
                        (int(box[2]), int(box[3])),
                        (0, 0, 255), 2)
            cv2.putText(orig_image, pred_classes[j], 
                        (int(box[0]), int(box[1]-5)),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 
                        2, lineType=cv2.LINE_AA)
        cv2.imwrite(f"./results/opixray/{image_name}.jpg", orig_image,)
    print(f"Image {i+1} done...")
    print('-'*50)
print('TEST PREDICTIONS COMPLETE')
#cv2.destroyAllWindows()