In [None]:
import glob as glob
import os
import time
from xml.etree import ElementTree as et

import albumentations as A
import cv2
import matplotlib.pyplot as plt
import numpy as np
import sklearn
import sklearn.metrics
import torch
import torch.nn as nn
import torchvision
from albumentations.pytorch import ToTensorV2
from torch.utils.data import Dataset, DataLoader
from torchvision import ops
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from tqdm.auto import tqdm

In [None]:
# Mount Google Drive at /content/drive/ (Colab-only helper) so the paths used below resolve.
from google.colab import drive
drive.mount('/content/drive/')

Mounted at /content/drive/


In [None]:
# Switch the working directory to the project folder; the trailing expression
# echoes the resulting directory as the cell output.
os.chdir('/content/drive/MyDrive/DeepVis/Kontron')
os.getcwd()

'/content/drive/MyDrive/DeepVis/Kontron'

# Config

In [None]:
# Training configuration: batch size, input size, number of epochs, device,
# data locations, class list and save intervals.

BATCH_SIZE = 4       # samples per DataLoader batch
RESIZE_TO = 512      # images are resized to RESIZE_TO x RESIZE_TO
NUM_EPOCHS = 50

# Fall back to the CPU when no CUDA device is available instead of failing
# later on the first `.to(DEVICE)` call.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

TRAIN_DIR = "/content/drive/MyDrive/DeepVis/Kontron/Newdata/train_test"
VALID_DIR = "/content/drive/MyDrive/DeepVis/Kontron/Newdata/val_test"

# 'background' occupies index 0; annotated object classes follow.
CLASSES = ['background', 'CamFront']
# Derived from CLASSES so the two can never drift apart.
NUM_CLASSES = len(CLASSES)

# show a few augmented training samples before training starts
VISUALIZE_TRANSFORMED_IMAGES = True

OUT_DIR = '/content/drive/MyDrive/DeepVis/Kontron/Newdata/output_test'

# Save intervals, in epochs. Both are larger than NUM_EPOCHS, so only the
# unconditional save at the final epoch is triggered with these settings.
SAVE_PLOTS_EPOCH = 100
SAVE_MODEL_EPOCH = 100

# Transformations and loss class

In [None]:
# running-mean tracker used for the loss
class Averager:
    """Accumulate values and expose their arithmetic mean."""

    def __init__(self):
        # Start from the same empty state that reset() produces.
        self.reset()

    def send(self, value):
        """Add one observation to the running total."""
        self.iterations += 1
        self.current_total += value

    @property
    def value(self):
        """Mean of everything sent since the last reset; 0 if nothing was sent."""
        if self.iterations != 0:
            return 1.0 * self.current_total / self.iterations
        return 0

    def reset(self):
        """Discard all accumulated observations."""
        self.current_total = 0.0
        self.iterations = 0.0

# training transformations
def get_train_transform():
    """Build the albumentations pipeline applied to training samples.

    The pipeline only rescales pixel values from [0, 255] to [0, 1]; it does
    not apply ImageNet mean/std. torchvision's Faster R-CNN expects inputs in
    the 0-1 range and applies the mean/std normalisation itself, so doing it
    here as well would normalise every training image twice and make training
    inputs differ from the 0-1 images fed at inference time.

    Returns:
        A.Compose: transform taking ``image``, ``bboxes`` (pascal_voc format)
        and ``labels`` keyword arguments.
    """
    return A.Compose([
        # mean=0 / std=1 with max_pixel_value=255 reduces to ``img / 255``.
        # Scaling comes first so ColorJitter operates on a valid [0, 1] image.
        A.Normalize(mean=[0.0, 0.0, 0.0], std=[1.0, 1.0, 1.0],
                    max_pixel_value=255.0, p=1.0),

        A.Rotate(limit=10, p=0.2),
        A.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1, hue=0.1, p=0.3),

        ToTensorV2(p=1.0),
    ], bbox_params={
        'format': 'pascal_voc',
        'label_fields': ['labels']
    })

# validation transformations
def get_valid_transform():
    """Build the albumentations pipeline applied to validation samples.

    No augmentation is applied. Pixel values are rescaled from [0, 255] to
    [0, 1] so that validation images are in the same range as the images the
    evaluation code feeds to the model (it divides by 255) and in the range
    torchvision's Faster R-CNN expects.

    Returns:
        A.Compose: transform taking ``image``, ``bboxes`` (pascal_voc format)
        and ``labels`` keyword arguments.
    """
    return A.Compose([
        # mean=0 / std=1 with max_pixel_value=255 reduces to ``img / 255``.
        A.Normalize(mean=[0.0, 0.0, 0.0], std=[1.0, 1.0, 1.0],
                    max_pixel_value=255.0, p=1.0),
        ToTensorV2(p=1.0),
    ], bbox_params={
        'format': 'pascal_voc',
        'label_fields': ['labels']
    })

# print transformed image to the screen
def show_transformed_image(train_loader):
    """Plot up to three samples from one batch of ``train_loader`` with their boxes.

    Args:
        train_loader: DataLoader yielding ``(images, targets)`` where images
            are CHW tensors and each target is a dict with a ``'boxes'`` tensor.

    A single batch is fetched and its first samples are shown; batches smaller
    than three samples are handled by showing only what is available.
    """
    if len(train_loader) == 0:
        return

    # Fetch one batch only: building a new iterator per sample would restart
    # (and, with shuffling, re-draw) the loader every time.
    images, targets = next(iter(train_loader))

    for image, target in list(zip(images, targets))[:3]:
        boxes = target['boxes'].cpu().numpy().astype(np.int32)
        sample = image.permute(1, 2, 0).cpu().numpy()

        # Stretch to [0, 1] for display so imshow does not clip the image.
        lo, hi = float(sample.min()), float(sample.max())
        if hi > lo:
            sample = (sample - lo) / (hi - lo)

        # permute() produces a non-contiguous view, which OpenCV's drawing
        # functions reject; make the buffer contiguous before drawing.
        sample = np.ascontiguousarray(sample)
        for box in boxes:
            cv2.rectangle(sample,
                          (int(box[0]), int(box[1])),
                          (int(box[2]), int(box[3])),
                          (0, 0, 0), 1)
        plt.imshow(sample)
        plt.show()
    

# Datasets

In [None]:
class KDataset(Dataset):
    """Object-detection dataset of ``*.jpg`` images with Pascal VOC ``*.xml`` files.

    Every image ``<name>.jpg`` in ``dir_path`` is paired with the annotation
    file ``<name>.xml`` in the same folder. Images are converted to RGB
    float32 and resized to ``(width, height)``; box coordinates are rescaled
    to match.

    Args:
        dir_path: folder containing the images and annotation files.
        width: target image width in pixels.
        height: target image height in pixels.
        classes: list of class names; the position of a name is its label.
            Index 0 is treated as the background class and never assigned
            to a box.
        transforms: optional albumentations-style callable taking ``image``,
            ``bboxes`` and ``labels`` keyword arguments.
    """

    def __init__(self, dir_path, width, height, classes, transforms=None):
        self.transforms = transforms
        self.dir_path = dir_path
        self.height = height
        self.width = width
        self.classes = classes

        # get image paths; keep only the sorted file names for indexing
        self.image_paths = glob.glob(f"{self.dir_path}/*.jpg")
        self.all_images = sorted(os.path.basename(path) for path in self.image_paths)

    def _parse_annotation(self, annot_file_path, image_width, image_height):
        """Read all known objects from one annotation file.

        Args:
            annot_file_path: path of the Pascal VOC xml file.
            image_width: width of the original (un-resized) image.
            image_height: height of the original (un-resized) image.

        Returns:
            ``(boxes, labels)``: ``boxes`` is a list of
            ``[xmin, ymin, xmax, ymax]`` lists scaled to the target size,
            ``labels`` the matching list of class indices. Objects whose name
            is not in ``self.classes`` (or is the background entry) are skipped.
        """
        boxes = []
        labels = []
        root = et.parse(annot_file_path).getroot()

        for member in root.findall('object'):
            name = member.find('name').text
            if name not in self.classes:
                continue
            label = self.classes.index(name)
            if label == 0:
                # index 0 is the background class, not an annotated object
                continue

            bndbox = member.find('bndbox')
            xmin = int(bndbox.find('xmin').text)
            xmax = int(bndbox.find('xmax').text)
            ymin = int(bndbox.find('ymin').text)
            ymax = int(bndbox.find('ymax').text)

            # rescale the box from the original image size to the target size
            boxes.append([
                (xmin / image_width) * self.width,
                (ymin / image_height) * self.height,
                (xmax / image_width) * self.width,
                (ymax / image_height) * self.height,
            ])
            labels.append(label)

        return boxes, labels

    def __getitem__(self, idx):
        """Return ``(image, target)`` for sample ``idx``.

        ``target`` is a dict with ``boxes`` (N x 4 float32), ``labels``
        (N int64), ``area`` (N float32) and ``image_id``. N may be 0 when the
        annotation holds no known object.

        Raises:
            FileNotFoundError: if the image file cannot be read.
        """
        image_name = self.all_images[idx]
        image_path = os.path.join(self.dir_path, image_name)

        # cv2.imread returns None instead of raising on a missing/corrupt file
        image = cv2.imread(image_path)
        if image is None:
            raise FileNotFoundError(f"Could not read image: {image_path}")

        # convert color format and resize
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB).astype(np.float32)
        image_resized = cv2.resize(image, (self.width, self.height))

        # the annotation file shares the image's base name
        annot_filename = os.path.splitext(image_name)[0] + '.xml'
        annot_file_path = os.path.join(self.dir_path, annot_filename)
        boxes, labels = self._parse_annotation(
            annot_file_path, image.shape[1], image.shape[0]
        )

        # Apply the transforms once per image, on plain Python lists. The
        # returned labels are used as well because a transform may drop boxes.
        if self.transforms:
            sample = self.transforms(image=image_resized,
                                     bboxes=boxes,
                                     labels=labels)
            image_resized = sample['image']
            boxes = sample['bboxes']
            labels = sample['labels']

        # reshape(-1, 4) keeps the (0, 4) shape for images without objects
        boxes = torch.as_tensor(np.asarray(boxes, dtype=np.float32).reshape(-1, 4))
        labels = torch.as_tensor([int(label) for label in labels], dtype=torch.int64)

        target = {}
        target["boxes"] = boxes
        target["labels"] = labels
        target["area"] = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])
        target["image_id"] = torch.tensor([idx])

        return image_resized, target

    def __len__(self):
        """Number of images found in ``dir_path``."""
        return len(self.all_images)
    
# initialize the Dataset class and the dataloader

def collate_fn(batch):
    """Turn a list of ``(image, target)`` samples into ``(images, targets)`` tuples.

    Each target carries its own number of boxes, so the samples are kept as
    tuples instead of being stacked into single tensors.
    """
    return tuple(zip(*batch))


train_dataset = KDataset(TRAIN_DIR, RESIZE_TO, RESIZE_TO, CLASSES, get_train_transform())
valid_dataset = KDataset(VALID_DIR, RESIZE_TO, RESIZE_TO, CLASSES, get_valid_transform())
train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    collate_fn=collate_fn
)
valid_loader = DataLoader(
    valid_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    collate_fn=collate_fn
)
print(f"Number of training samples: {len(train_dataset)}")
print(f"Number of validation samples: {len(valid_dataset)}\n")


# visualize samples to check that the data preparation worked
if __name__ == '__main__':

    # no transforms: samples come back as HxWx3 float32 arrays
    dataset = KDataset(
        TRAIN_DIR, RESIZE_TO, RESIZE_TO, CLASSES
    )
    print(f"Number of training images: {len(dataset)}")

    def visualize_sample(image, target):
        """Show ``image`` with the first box of ``target`` and a text label drawn on it.

        Args:
            image: HxWx3 array with pixel values on a 0-255 scale.
            target: dict with a ``'boxes'`` entry of ``[xmin, ymin, xmax, ymax]`` rows.
        """
        # imshow clips float images to [0, 1], so a 0-255 float image would
        # render almost entirely white; convert to uint8 for display.
        image = np.ascontiguousarray(np.clip(image, 0, 255).astype(np.uint8))

        if len(target['boxes']) > 0:
            box = target['boxes'][0]
            label = "Allen_Bradley"
            cv2.rectangle(
                image,
                (int(box[0]), int(box[1])), (int(box[2]), int(box[3])),
                (0, 255, 0), 1
            )
            cv2.putText(
                image, label, (int(box[0]), int(box[1]-5)),
                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2
            )
        plt.imshow(image)
        plt.show()

    NUM_SAMPLES_TO_VISUALIZE = 1
    # never index past the end of a small (or empty) dataset
    for i in range(min(NUM_SAMPLES_TO_VISUALIZE, len(dataset))):
        image, target = dataset[i]
        visualize_sample(image, target)

# Model

In [None]:
def create_model(num_classes):
    """Build a Faster R-CNN (ResNet-50 FPN) detector with a fresh box head.

    The network is created with ``pretrained=True`` and its box predictor is
    replaced by a ``FastRCNNPredictor`` sized for ``num_classes`` outputs.

    Args:
        num_classes: number of output classes of the new box predictor.

    Returns:
        The modified detection model.
    """
    detector = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)

    # The new head must accept the same feature width as the one it replaces.
    head_in_features = detector.roi_heads.box_predictor.cls_score.in_features
    detector.roi_heads.box_predictor = FastRCNNPredictor(head_in_features, num_classes)

    return detector
    


# Training

In [None]:
# Apply matplotlib's built-in 'ggplot' style to figures created from here on.
plt.style.use('ggplot')


def train(train_data_loader, model):
    """Run one training epoch over ``train_data_loader``.

    Uses the notebook-level globals ``optimizer``, ``train_loss_hist``,
    ``train_loss_list`` and ``train_itr``.

    Args:
        train_data_loader: iterable of ``(images, targets)`` batches.
        model: detection model that returns a dict of losses when called
            with images and targets in training mode.

    Returns:
        The global ``train_loss_list`` with this epoch's per-iteration
        losses appended.
    """
    print('Training')
    global train_itr
    global train_loss_list

    # The model only returns the loss dict in training mode; set it
    # explicitly in case it was left in eval mode by an earlier cell.
    model.train()

    # initialize tqdm progress bar
    prog_bar = tqdm(train_data_loader, total=len(train_data_loader))
    for images, targets in prog_bar:
        optimizer.zero_grad()

        images = list(image.to(DEVICE) for image in images)
        targets = [{k: v.to(DEVICE) for k, v in t.items()} for t in targets]

        loss_dict = model(images, targets)
        losses = sum(loss for loss in loss_dict.values())
        loss_value = losses.item()
        train_loss_list.append(loss_value)
        train_loss_hist.send(loss_value)

        losses.backward()
        optimizer.step()
        train_itr += 1

        # update the loss value beside the progress bar for each iteration
        prog_bar.set_description(desc=f"Loss: {loss_value:.4f}")
    return train_loss_list


def validate(valid_data_loader, model):
    """Compute the loss over ``valid_data_loader`` without updating the model.

    Uses the notebook-level globals ``val_loss_hist``, ``val_loss_list`` and
    ``val_itr``.

    Args:
        valid_data_loader: iterable of ``(images, targets)`` batches.
        model: detection model that returns a dict of losses when called
            with images and targets in training mode.

    Returns:
        The global ``val_loss_list`` with this pass's per-iteration losses
        appended.
    """
    print('Validating')
    global val_itr
    global val_loss_list

    # Deliberately kept in training mode: the model returns the loss dict
    # only in that mode (in eval mode it returns detections). No optimizer
    # step is taken and torch.no_grad() disables gradient tracking.
    model.train()

    # initialize tqdm progress bar
    prog_bar = tqdm(valid_data_loader, total=len(valid_data_loader))

    for images, targets in prog_bar:
        images = list(image.to(DEVICE) for image in images)
        targets = [{k: v.to(DEVICE) for k, v in t.items()} for t in targets]

        with torch.no_grad():
            loss_dict = model(images, targets)
            losses = sum(loss for loss in loss_dict.values())
        loss_value = losses.item()
        val_loss_list.append(loss_value)
        val_loss_hist.send(loss_value)
        val_itr += 1
        # update the loss value beside the progress bar for each iteration
        prog_bar.set_description(desc=f"Loss: {loss_value:.4f}")
    return val_loss_list


def _save_loss_plots(train_loss, val_loss, epoch_number):
    """Save the per-iteration train and validation loss curves to OUT_DIR.

    Args:
        train_loss: sequence of training loss values (one per iteration).
        val_loss: sequence of validation loss values (one per iteration).
        epoch_number: 1-based epoch number used in the file names.
    """
    figure_1, train_ax = plt.subplots()
    figure_2, valid_ax = plt.subplots()
    train_ax.plot(train_loss, color='blue')
    train_ax.set_xlabel('iterations')
    train_ax.set_ylabel('train loss')
    valid_ax.plot(val_loss, color='red')
    valid_ax.set_xlabel('iterations')
    valid_ax.set_ylabel('validation loss')
    figure_1.savefig(f"{OUT_DIR}/train_loss_{epoch_number}.png")
    figure_2.savefig(f"{OUT_DIR}/valid_loss_{epoch_number}.png")
    # close explicitly so figures do not pile up across epochs
    plt.close(figure_1)
    plt.close(figure_2)


if __name__ == '__main__':
    # make sure checkpoints and plots have somewhere to go before training
    os.makedirs(OUT_DIR, exist_ok=True)

    # initialize the model and move to the computation device
    model = create_model(num_classes=NUM_CLASSES)
    model = model.to(DEVICE)
    # get the model parameters
    params = [p for p in model.parameters() if p.requires_grad]
    # define the optimizer
    optimizer = torch.optim.SGD(params, lr=0.001, momentum=0.9, weight_decay=0.0005)
    # initialize the Averager class
    train_loss_hist = Averager()
    val_loss_hist = Averager()
    train_itr = 1
    val_itr = 1
    # train and validation loss lists
    train_loss_list = []
    val_loss_list = []
    # name to save the trained model with
    MODEL_NAME = 'model'
    # whether to show transformed images from data loader or not
    if VISUALIZE_TRANSFORMED_IMAGES:
        show_transformed_image(train_loader)
    # start the training epochs
    for epoch in range(NUM_EPOCHS):
        # 1-based epoch number, used consistently in logs and file names
        epoch_number = epoch + 1
        print(f"\nEPOCH {epoch_number} of {NUM_EPOCHS}")
        # reset the training and validation loss histories for the current epoch
        train_loss_hist.reset()
        val_loss_hist.reset()
        # start timer and carry out training and validation
        start = time.time()
        train_loss = train(train_loader, model)
        val_loss = validate(valid_loader, model)
        print(f"Epoch #{epoch_number} train loss: {train_loss_hist.value:.3f}")
        print(f"Epoch #{epoch_number} validation loss: {val_loss_hist.value:.3f}")
        end = time.time()
        print(f"Took {((end - start) / 60):.3f} minutes for epoch {epoch_number}")

        # Save every n epochs and always once at the end. Merging the two
        # conditions avoids writing the same files twice when the last epoch
        # is also a multiple of the interval.
        is_last_epoch = epoch_number == NUM_EPOCHS

        if epoch_number % SAVE_MODEL_EPOCH == 0 or is_last_epoch:
            torch.save(model.state_dict(), f"{OUT_DIR}/model{epoch_number}.pth")
            print('SAVING MODEL COMPLETE...\n')

        if epoch_number % SAVE_PLOTS_EPOCH == 0 or is_last_epoch:
            _save_loss_plots(train_loss, val_loss, epoch_number)
            print('SAVING PLOTS COMPLETE...')

# Eval

In [None]:



# run on the GPU when available, otherwise fall back to the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# load model and weights
model = create_model(num_classes=2).to(device)
model.load_state_dict(torch.load(
    '/content/drive/MyDrive/DeepVis/Kontron/Newdata/output_test/model50.pth', map_location=device
))
# put model into evaluation mode
model.eval()

# load test data (sorted so the processing order is reproducible)
DIR_TEST = '/content/drive/MyDrive/DeepVis/Kontron/Newdata/test'
test_images = sorted(glob.glob(f"{DIR_TEST}/*.jpg"))
bb_test = glob.glob(f"{DIR_TEST}/*.xml")

print(test_images)
print(f"Test instances: {len(test_images)}")

# Names written next to predicted boxes. Kept in their own variable so the
# CLASSES list used by the dataset is not overwritten by running this cell.
DISPLAY_CLASSES = ['background', 'Allen-Bradley']

# set confidence threshold for bounding box
detection_threshold = 0.9

# lists used to measure the accuracy of the model
bb_pred_list = []     # flat list of predicted coordinates (4 values per image)
bb_truth_list = []    # flat list of ground-truth coordinates (4 values per image)
IOU = []              # 1x1 IoU tensors, one per image with at least one prediction
iou_per_image = []    # one float per image with ground truth; 0.0 if nothing was detected


def _read_truth_box(xml_path, class_name='CamFront'):
    """Return ``[xmin, ymin, xmax, ymax]`` of the last ``class_name`` object.

    Returns ``None`` when the file does not exist or holds no such object.
    """
    if not os.path.exists(xml_path):
        return None
    truth_box = None
    for member in et.parse(xml_path).getroot().findall('object'):
        if member.find('name').text == class_name:
            bndbox = member.find('bndbox')
            truth_box = [
                int(bndbox.find('xmin').text),
                int(bndbox.find('ymin').text),
                int(bndbox.find('xmax').text),
                int(bndbox.find('ymax').text),
            ]
    return truth_box


# predict bb for test images
for i, image_path in enumerate(test_images):
    image_name = os.path.splitext(os.path.basename(image_path))[0]
    print(image_path)
    print(f"ImagePath: {image_name}")

    orig_image = cv2.imread(image_path)
    if orig_image is None:
        print("Could not read image, skipping")
        print('-'*50)
        continue

    # adjust color and scale pixel values to [0, 1]
    image = cv2.cvtColor(orig_image, cv2.COLOR_BGR2RGB).astype(np.float32)
    image /= 255.0
    # HWC -> CHW, then add the batch dimension
    image = np.transpose(image, (2, 0, 1))
    image = torch.tensor(image, dtype=torch.float).to(device)
    image = torch.unsqueeze(image, 0)

    # predict bb
    with torch.no_grad():
        outputs = model(image)

    outputs = [{k: v.to('cpu') for k, v in t.items()} for t in outputs]

    # get truth bb out of xml file
    bb_file_path = os.path.splitext(image_path)[0] + '.xml'
    print(f"XmlPath: {bb_file_path}")
    bb_truth = _read_truth_box(bb_file_path)

    if bb_truth is None:
        # without ground truth there is nothing to score; never reuse the
        # truth box of a previous image
        print("No ground-truth box found, image excluded from the metrics")
    elif len(outputs[0]['boxes']) == 0:
        # a missed object counts as IoU 0 so that y_true and y_pred stay aligned
        iou_per_image.append(0.0)
        print("No detections for this image")
    else:
        bb_truth_list += bb_truth

        # the first prediction is used as "the" prediction for the IoU
        bb_pred_tensor = outputs[0]['boxes'][0]
        bb_pred_list += bb_pred_tensor.tolist()
        bb_pred_tensor = bb_pred_tensor.unsqueeze(0)

        bb_truth_tensor = torch.tensor(bb_truth, dtype=torch.float).unsqueeze(0)
        # display coordinates of predicted and truth bb and calculate IoU
        print(f"Truth: {bb_truth_tensor}")
        print(f"Pred: {bb_pred_tensor}")
        iou = torchvision.ops.box_iou(bb_truth_tensor, bb_pred_tensor)
        IOU.append(iou)
        iou_per_image.append(float(iou.item()))
        print(f"img {i} IoU: {iou}")

        scores = outputs[0]['scores'].numpy()
        print(f"scores: {scores}")

        # only keep box predictions with high confidence; the same mask is
        # applied to the labels so boxes and names cannot get out of step
        keep = scores >= detection_threshold
        draw_boxes = outputs[0]['boxes'].numpy()[keep].astype(np.int32)
        pred_classes = [DISPLAY_CLASSES[label]
                        for label in outputs[0]['labels'].numpy()[keep]]

        print(bb_truth)

        # Draw on an RGB copy: matplotlib expects RGB while OpenCV loads BGR.
        # The untouched BGR original is kept for cropping and saving.
        annotated = cv2.cvtColor(orig_image, cv2.COLOR_BGR2RGB)
        for box, class_name in zip(draw_boxes, pred_classes):
            cv2.rectangle(annotated,
                          (int(box[0]), int(box[1])),
                          (int(box[2]), int(box[3])),
                          (0, 255, 0), 4)
            cv2.putText(annotated, class_name,
                        (int(box[0]), int(box[1]-5)),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0),
                        2, lineType=cv2.LINE_AA)
        cv2.rectangle(annotated,
                      (int(bb_truth[0]), int(bb_truth[1])),
                      (int(bb_truth[2]), int(bb_truth[3])),
                      (255, 0, 0), 2)
        plt.imshow(annotated)
        plt.show()

        # Crop and save only when a box passed the confidence threshold.
        # The first kept box is used, and the crop is taken from the clean
        # image so the drawn rectangles do not end up in the saved file.
        if len(draw_boxes) > 0:
            x1, y1, x2, y2 = (max(int(v), 0) for v in draw_boxes[0])
            print(x1, y1, x2, y2)
            img_cropped = orig_image[y1:y2, x1:x2]
            if img_cropped.size > 0:
                plt.imshow(cv2.cvtColor(img_cropped, cv2.COLOR_BGR2RGB))
                plt.show()
                cv2.imwrite(f"/content/drive/MyDrive/DeepVis/Kontron/Newdata/output_test/{image_name}.jpg", img_cropped)
        else:
            print(f"No prediction above the confidence threshold {detection_threshold}")

    print(f"Image {i+1} done...")
    print('-'*50)
print('TEST PREDICTIONS COMPLETE')
print("")
# display IoUs and avg IoU (over the images that produced a prediction)
IOU_f = [float(a.item()) for a in IOU]
print(f"IOUs: {IOU_f}")
avgIOU = sum(IOU_f) / len(IOU_f) if IOU_f else float('nan')
print(f"avg IOU: {avgIOU}")

# calculate precision and recall
# Every scored image contains the object, so y_true is all "positive"; a
# prediction counts as positive when its IoU reaches the threshold.
# NOTE: with an all-positive y_true there can be no false positives, so the
# precision is 1.0 whenever at least one prediction is positive.
y_true = ["positive"] * len(iou_per_image)
threshold = 0.95
y_pred = ["positive" if score >= threshold else "negative" for score in iou_per_image]
print(f"true: {y_true}")
print(f"pred: {y_pred}")

if y_true:
    precision = sklearn.metrics.precision_score(
        y_true=y_true, y_pred=y_pred, pos_label="positive", zero_division=0)
    recall = sklearn.metrics.recall_score(
        y_true=y_true, y_pred=y_pred, pos_label="positive", zero_division=0)
else:
    # nothing could be scored (no test images with ground truth)
    precision = float('nan')
    recall = float('nan')

print(f"percision: {precision}")
print(f"recall: {recall}")

Citation:
https://debuggercafe.com/custom-object-detection-using-pytorch-faster-rcnn/