Imports for the notebook

In [27]:
# Pytorch imports
import torch
import torchvision

from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.rpn import AnchorGenerator

from torch.utils.data import DataLoader, Dataset
from torch.utils.data.sampler import SequentialSampler 

# Loading Data imports 
import pandas as pd
from PIL import Image
import cv2


# Other imports
import numpy as np
import os
from matplotlib import pyplot as plt
import matplotlib.patches as patches
import random as rand
import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2
import PIL
from PIL import Image

device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

The classes to load the data from a folder

In [6]:
class PassionfruitDataset(Dataset):

    def __init__(self, samples, path_samples, path_labels, transforms=None):
        super().__init__()

        self.path_samples = path_samples
        self.path_labels = path_labels
        self.transforms = transforms

        self.samples = samples
        self.expected_output = pd.read_csv(self.path_labels, sep=',')

    def __getitem__(self, index):
        sample_id = self.samples[index]
        id, ext = sample_id.split('.')
        records = self.expected_output[self.expected_output['Image_ID'] == id]

        image = cv2.imread(f'{self.path_samples}/{sample_id}', cv2.IMREAD_COLOR)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB).astype(np.float32)
        image /= 255.0

        boxes = records[['xmin', 'ymin', 'width', 'height']].values
        boxes[:, 2] = boxes[:, 0] + boxes[:, 2]
        boxes[:, 3] = boxes[:, 1] + boxes[:, 3]

        labels = []
        label_dic = {'not': 0, 'fruit_healthy': 1, 'fruit_woodiness': 2, 'fruit_brownspot': 3}
        for i, row in records.iterrows():
            labels.append(label_dic[row['class']])

        boxes = torch.as_tensor(boxes, dtype=torch.float32)
        labels = torch.as_tensor(labels, dtype=torch.int64)
        image_id = torch.tensor([index])
        area = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])
        iscrowd = torch.zeros((records.shape[0],), dtype=torch.int64)

        target = {}
        target['boxes'] = boxes
        target['labels'] = labels
        target['image_id'] = image_id
        target['area'] = area
        target['iscrowd'] = iscrowd

        if self.transforms:
            sample = {
                'image': image,
                'bboxes': target['boxes'],
                'labels': labels
            }
            sample = self.transforms(**sample)
            image = sample['image']
            target['boxes'] = torch.stack(tuple(map(torch.tensor, zip(*sample['bboxes'])))).permute(1, 0)

        return image, target, id

    def __len__(self) -> int:
        return len(self.samples)

In [None]:
class PassionfruitDataset_Test(Dataset):

    def __init__(self, samples, path_samples, transforms=None):
        super().__init__()

        self.path_samples = path_samples
        self.transforms = transforms
        self.samples = samples

    def __getitem__(self, index: int):

        sample_id = self.samples[index]
        id, ext = sample_id.split('.')

        img = cv2.imread(f'{self.path_samples}/{sample_id}', cv2.IMREAD_COLOR)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB).astype(np.float32)
        img /= 255.0

        if self.transforms:
            sample = {
                'image': img,
            }
            sample = self.transforms(**sample)
            img = sample['image']

        return img, id

    def __len__(self) -> int:
        return len(self.samples)

The functions for the transformers

In [8]:
def train_transform():
    return A.Compose([A.Flip(0.5),ToTensorV2(p=1.0)], bbox_params={'format': 'pascal_voc', 'label_fields': ['labels']})

def valid_transform():
    return A.Compose([ToTensorV2(p=1.0)], bbox_params={'format': 'pascal_voc', 'label_fields': ['labels']})

def test_transform():
    return A.Compose([ToTensorV2(p=1.0)])

Load the pretrained model for the program

In [None]:
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
num_classes = 4
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

Load the data and create the data loaders

In [None]:
def collate_fn(batch):
    return tuple(zip(*batch))

In [None]:
# sample_path = '/content/Data/Train_Images'
sample_path = '/content/Data/Train_Images_(augmented)'
label_path = '/content/Train.csv'

sample_ids = list( sorted(os.listdir(os.path.join(sample_path))))
try:
  sample_ids.remove('.DS_Store')
except:
  pass
rand.shuffle(sample_ids)

split = int(len(sample_ids) * 70 / 100)

train_sample_ids = sample_ids[:split]
valid_sample_ids = sample_ids[split:]

train_dataset = PassionfruitDataset(train_sample_ids, sample_path, label_path, train_transform())
valid_dataset = PassionfruitDataset(valid_sample_ids, sample_path, label_path, valid_transform())

train_loader = DataLoader(train_dataset, batch_size=4, shuffle=False, num_workers=2, collate_fn=collate_fn)
valid_loader = DataLoader(train_dataset, batch_size=4, shuffle=False, num_workers=2, collate_fn=collate_fn)

Define the hyperparameters

In [None]:
model.to(device)
params = [p for p in model.parameters() if p.requires_grad]
optimiser = torch.optim.SGD(params, lr=0.001, momentum=0.9, weight_decay=0.0005)
num_epochs = 5

Train the model

In [None]:
update = 0

loss_df = pd.DataFrame(columns=['update', 'loss',])

for epoch in range(num_epochs):
    
    for images, targets, ids in train_loader:
      
        images = list(image.to(device) for image in images)
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        loss = model(images, targets)

        losses = sum(l for loss in loss.values())
        loss_value = losses.item()

        optimiser.zero_grad()
        losses.backward()
        optimiser.step()

        if update % 1 == 0:
            result = {'update': update, 'loss': loss_value}
            loss_df = loss_df.append(result, ignore_index = True)
            loss_df.to_csv('/content/Results_loss.csv')
        
        elif update == 0:
            result = {'update': update, 'loss': loss_value}
            loss_df = loss_df.append(result, ignore_index = True)
            loss_df.to_csv('/content/Results_loss.csv')

        update += 1

torch.save(model.state_dict(), '/content/fasterrcnn_resnet50_fpn.pth')

Create the data loader for the test data

In [None]:
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=False, pretrained_backbone=False)
num_classes = 4
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)
model.load_state_dict(torch.load('/content/drive/MyDrive/fasterrcnn_resnet50_fpn.pth'))
model.eval()
x = model.to(device)

In [None]:
sample_path = '/content/Data/Test_Images'
sample_ids = list( sorted(os.listdir(os.path.join(sample_path))))
try:
  sample_ids.remove('.DS_Store')
except:
  pass

test_dataset = PassionfruitDataset_Test(sample_ids, sample_path, test_transform())

test_loader = DataLoader(test_dataset, batch_size=4, shuffle=False, num_workers=2, drop_last=False, collate_fn=collate_fn)

In [None]:
label_pred_dic = {0:'not', 1:'fruit_healthy', 2:'fruit_woodiness', 3:'fruit_brownspot'}
threshold = 0.5
test_df = pd.DataFrame(columns= ['Image_Id','class','confidence','ymin','xmin','ymax','xmax'])

model.eval()

for images, ids in test_loader:

    images = list(image.to(device) for image in images)
    outputs = model(images)

    print(f'Step: {stop}')
    for i, image in enumerate(images):

        boxes = outputs[i]['boxes'].data.cpu().numpy()
        scores = outputs[i]['scores'].data.cpu().numpy()
        labels = outputs[i]['labels'].data.cpu().numpy()

        boxes = boxes[scores >= threshold].astype(np.int32)
        labels = labels[scores >= threshold]
        
        labels = [label_pred_dic[l] for l in labels]

        scores = scores[scores >= threshold]
        image_id = ids[i]
        
        for j in range(len(boxes)):

            result = {
                'Image_Id': image_id,
                'class': labels[j],
                'confidence': scores[j],
                'ymin': boxes[j, 1], 
                'xmin': boxes[j, 0],
                'ymax': boxes[j, 3],
                'xmax': boxes[j, 2]
            }

            test_df = test_df.append(result, ignore_index = True)
            test_df.to_csv('/content/Results_Test.csv') 

test_df.to_csv('/content/Results_Test.csv')        

In [59]:
def showImageBB(image):
    img = image[0]    
    
    try:
        npimg = img.numpy()
        npimg = np.transpose(npimg, (1, 2, 0))
    except:
        npimg = img
    fig, ax = plt.subplots()
    plt.axis('off')
    ax.imshow(npimg)
    for box in image[1]['boxes']:

        xmin = box[0]
        ymin = box[1]
        width = box[2] - box[0]
        height = box[3] - box[1]


        rect = patches.Rectangle((xmin, ymin), width, height, linewidth=3, edgecolor='r', facecolor='none')
        ax.add_patch(rect)
    plt.show()

In [None]:
sample_path = '/media/marcus/b18c7b58-f10e-4d8d-a377-a2cf5a589ad5/marcus/UNI/Honours/Semester 2/COS 711/Assignments/Ass 3/Data/Test_Images/Test_Images'
label_path = '/media/marcus/b18c7b58-f10e-4d8d-a377-a2cf5a589ad5/marcus/UNI/Honours/Semester 2/COS 711/Assignments/Ass 3/Results/Test.csv'

test_sample_ids = list( sorted(os.listdir(os.path.join(sample_path))))
try:
  test_sample_ids.remove('.DS_Store')
except:
  pass
rand.shuffle(test_sample_ids)

labelled_test_data = PassionfruitDataset(test_sample_ids, sample_path, label_path, None)

for i in range(9):
    showImageBB(labelled_test_data[i])