# Dataset S-VED

In [1]:
%cd ~/hdd/Projet-Recherche

/mnt/sullivan/Projet-Recherche


In [2]:
from torch.utils.data import Dataset
import pandas as pd
import random
import numpy as np
import os
import urllib.request
from PIL import Image, ImageDraw

import cv2
import matplotlib.pyplot as plt

import torch
from torch import tensor
import torchvision
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torch.utils.data import DataLoader
from torch import Tensor
from torchvision.transforms import functional as F


from engine import train_one_epoch, evaluate
from torch.utils.tensorboard import SummaryWriter

# Dataset creation

In [3]:
class SVED_Compose(object):
    """Chain several paired ``(image, target)`` transforms into one callable.

    Every transform must accept ``(image, target)`` and hand back the updated
    ``(image, target)`` pair; they are applied in the order they were given.
    """

    def __init__(self, transforms):
        # Kept as passed in; it is iterated on each call.
        self.transforms = transforms

    def __call__(self, image, target):
        """Feed the sample through each transform and return the final pair."""
        current_image, current_target = image, target
        for transform in self.transforms:
            current_image, current_target = transform(current_image, current_target)
        return (current_image, current_target)


class SVED_ToTensor(object):
    """Paired transform: turn the image into a tensor, pass the target through."""

    def __call__(self, image, target):
        """Return ``(F.to_tensor(image), target)``; the target is not modified."""
        return F.to_tensor(image), target


class SVED_RandomHorizontalFlip(object):
    """Randomly mirror an image tensor and its bounding boxes left-to-right.

    Args:
        prob: probability that a given sample is flipped (default 0.5).
    """

    def __init__(self, prob=0.5):
        self.prob = prob

    def __call__(self, image, target):
        """Flip ``image`` along its last axis with probability ``self.prob``.

        Args:
            image: tensor whose last dimension is the image width.
            target: dict holding a ``"boxes"`` tensor laid out as
                ``[xmin, ymin, xmax, ymax]`` per row.

        Returns:
            The ``(image, target)`` pair. When a flip happens,
            ``target["boxes"]`` is replaced by a new tensor; the tensor that
            was passed in is left untouched.
        """
        if random.random() < self.prob:
            width = image.shape[-1]
            image = image.flip(-1)
            boxes = target["boxes"]
            # An image without annotations may carry an empty 1-D tensor, which
            # cannot be indexed with [:, ...]; there is nothing to flip anyway.
            if boxes.numel() > 0:
                # Work on a copy so the caller's tensor is not mutated in place.
                flipped = boxes.clone()
                # New xmin is width - old xmax, new xmax is width - old xmin.
                flipped[:, [0, 2]] = width - boxes[:, [2, 0]]
                target["boxes"] = flipped
        return image, target


In [4]:
class SVED_Dataset(Dataset):
    """Object-detection dataset pairing the ``.jpg`` files of a folder with boxes.

    Annotations are read from a CSV file that must provide the columns
    ``file``, ``x``, ``y``, ``width``, ``height`` and ``label``. Box coordinates
    are multiplied by the image width/height, so they are presumably stored as
    fractions of the image size -- TODO confirm against the CSV producer.

    Args:
        files_dir: directory containing the images.
        transforms: optional callable taking and returning ``(image, target)``.
        annotations_file: path of the annotation CSV (defaults to the
            historical ``"SVED_RCNN.csv"`` in the working directory).
    """

    def __init__(self, files_dir, transforms=None, annotations_file="SVED_RCNN.csv"):
        self._transforms = transforms
        self.files_dir = files_dir

        self.data = pd.read_csv(annotations_file, index_col=[0])
        # Sorted so that indices are stable across runs; only .jpg files count.
        self.imgs = [name for name in sorted(os.listdir(files_dir))
                     if name.endswith('.jpg')]

        # ImageNet statistics, kept for backward compatibility. They are no
        # longer applied in __getitem__: torchvision detection models expect
        # images in the 0-1 range and normalize them internally.
        self.img_mean = [0.485, 0.456, 0.406]
        self.img_std = [0.229, 0.224, 0.225]

        # classes: 0 index is reserved for background
        self.classes = {'_': 0, 'Initial': 1, 'Decoration': 2,
                        'ContentIllustration': 3, 'PrintersMark': 4}
        self.num_classes = len(self.classes)
        # Assign the mapped column back explicitly: an in-place replace on a
        # column selection is not guaranteed to write through to the frame.
        # Unknown labels are left as they are, as before.
        self.data['label'] = self.data['label'].map(
            lambda name: self.classes.get(name, name))

    def __getitem__(self, idx):
        """Return ``(image, target)`` for the ``idx``-th image.

        The image is an RGB float32 array scaled to [0, 1] (before any
        transform); the target dict holds ``boxes``, ``labels``, ``area``,
        ``iscrowd`` and ``image_id`` tensors.

        Raises:
            FileNotFoundError: if OpenCV cannot read the image file.
        """
        img_name = self.imgs[idx]
        image_path = os.path.join(self.files_dir, img_name)

        img = cv2.imread(image_path)
        if img is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f"Unable to read image file: {image_path}")

        # OpenCV arrays are laid out as (height, width, channels).
        height, width = img.shape[:2]

        # BGR -> RGB, then scale to [0, 1]. No resize: the image keeps its size.
        img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB).astype(np.float32) / 255.0

        rows = self.data[self.data['file'] == img_name]
        targets = self._build_target(rows, width, height, idx)

        if self._transforms:
            img_rgb, targets = self._transforms(img_rgb, targets)

        return img_rgb, targets

    def __len__(self):
        """Number of ``.jpg`` images found in ``files_dir``."""
        return len(self.imgs)

    @staticmethod
    def _build_target(rows, width, height, idx):
        """Build the detection target dict from the annotation rows of one image."""
        x = rows['x'].to_numpy(dtype=np.float64)
        y = rows['y'].to_numpy(dtype=np.float64)
        box_w = rows['width'].to_numpy(dtype=np.float64)
        box_h = rows['height'].to_numpy(dtype=np.float64)

        corners = np.stack(
            [x * width, y * height, (x + box_w) * width, (y + box_h) * height],
            axis=1,
        )
        # reshape keeps a (0, 4) layout when the image has no annotation.
        boxes = torch.as_tensor(corners, dtype=torch.float32).reshape(-1, 4)
        labels = torch.as_tensor([int(value) for value in rows['label']],
                                 dtype=torch.int64)

        return {
            "boxes": boxes,
            "labels": labels,
            "area": (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0]),
            # suppose all instances are not crowd
            "iscrowd": torch.zeros((boxes.shape[0],), dtype=torch.int64),
            "image_id": torch.tensor([idx]),
        }



# # check dataset
# dataset = SVED_Dataset("datasets/images/train", transforms=SVED_Compose([SVED_ToTensor(), SVED_RandomHorizontalFlip(.5)]))
# print('length of dataset = ', len(dataset), '\n')
#
# # getting the image and target for a test index.  Feel free to change the index.
# img, targets = dataset[0]
# print(img.shape, '\n',targets)
# img, target = dataset[0]
# print(img.shape, '\n',targets)

def collate_fn(batch):
    """Split a batch of ``(image, target)`` samples into two parallel lists.

    Nothing is stacked, so the samples may have different sizes.
    """
    samples = [(image, target) for image, target in batch]
    images = [sample[0] for sample in samples]
    targets = [sample[1] for sample in samples]
    return images, targets

# Training Faster-RCNN

In [5]:
# define the parameters for training
num_epochs = 5000
batch_size = 1
learning_rate = 5e-3
momentum = 0.9
weight_decay = 0.0005

# learning rate schedule
lr_gamma = 0.33
lr_dec_step_size = 20

# Runtime resources. They are defined here so the cell works after
# "Restart Kernel -> Run All" instead of relying on leftover kernel state.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
nw = 0  # dataloader workers; the recorded run reported "Using 0 dataloader workers"

data_transform = {
    "train": SVED_Compose([SVED_ToTensor(), SVED_RandomHorizontalFlip(.12)]),
    "val": SVED_Compose([SVED_ToTensor()]),
    "test": SVED_Compose([SVED_ToTensor()]),
}

# create the train dataset and dataloader
train_dataset = SVED_Dataset("datasets/images/train", transforms=data_transform["train"])
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True,
                          num_workers=nw, collate_fn=collate_fn)

# create the val dataset and dataloader (evaluation order is irrelevant: no shuffle)
val_dataset = SVED_Dataset("datasets/images/val", transforms=data_transform["val"])
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False,
                        num_workers=nw, collate_fn=collate_fn)

# create the test dataset and dataloader
test_dataset = SVED_Dataset("datasets/images/test", transforms=data_transform["test"])
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False,
                         num_workers=nw, collate_fn=collate_fn)

# load a pre-trained Faster R-CNN model from the torchvision model zoo
# NOTE(review): recent torchvision releases deprecate `pretrained=` in favour of
# `weights=`; switch once the installed version is confirmed.
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)

# replace the pre-trained classifier with a new one that has the correct number of classes
num_classes = 4 + 1  # 4 annotation classes + background
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

# move the complete model (including the new head) to the device
model.to(device)
model.train()

# define optimizer
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=learning_rate, momentum=momentum, weight_decay=weight_decay)

# learning rate scheduler
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=lr_dec_step_size, gamma=lr_gamma)


def log_coco_metrics(writer, split, coco_evaluator, step):
    """Write the mean precision / recall of a COCO evaluation to TensorBoard.

    The first six entries of the bbox ``stats`` vector are treated as precision
    values and the remaining ones as recall values; negative entries (the
    marker COCOeval uses for "not available") are left out of the mean.
    Returns the raw ``stats`` vector.
    """
    stats = coco_evaluator.coco_eval['bbox'].stats
    precision, rappel = stats[:6], stats[6:]
    writer.add_scalar(f'{split}_precision', np.mean(precision[precision >= 0]), step)
    writer.add_scalar(f'{split}_rappel', np.mean(rappel[rappel >= 0]), step)
    return stats


# train the model for the specified number of epochs
writer = SummaryWriter()
try:
    for epoch in range(num_epochs):
        # training for one epoch
        train_loss = train_one_epoch(model, optimizer, train_loader, device, epoch, print_freq=500)
        # write the train loss to tensorboard
        writer.add_scalar('train_lr', train_loss.meters['lr'].value, epoch)
        for meter_name in ('loss', 'loss_classifier', 'loss_box_reg',
                           'loss_objectness', 'loss_rpn_box_reg'):
            writer.add_scalar('train_' + meter_name, train_loss.meters[meter_name].value, epoch)
        # update the learning rate
        lr_scheduler.step()
        # evaluate on the training and validation datasets
        train_output = log_coco_metrics(writer, 'train', evaluate(model, train_loader, device=device), epoch)
        val_output = log_coco_metrics(writer, 'val', evaluate(model, val_loader, device=device), epoch)

    # final evaluation on the held-out test set, logged at the last epoch index
    final_step = max(num_epochs - 1, 0)
    test_output = log_coco_metrics(writer, 'test', evaluate(model, test_loader, device=device), final_step)
finally:
    # flush pending events even if the run is interrupted
    writer.close()

print("Training completed!")
os.makedirs('my_weights', exist_ok=True)
torch.save(model, 'my_weights/faster_rcnn.pkl')

Using 0 dataloader workers


    Found GPU%d %s which is of cuda capability %d.%d.
    PyTorch no longer supports this GPU because it is too old.
    The minimum cuda capability supported by this library is %d.%d.
    
  return _VF.meshgrid(tensors, **kwargs)  # type: ignore[attr-defined]


Epoch: [0]  [0/5]  eta: 0:00:02  lr: 0.001254  loss: 1.8787 (1.8787)  loss_classifier: 1.5545 (1.5545)  loss_box_reg: 0.0019 (0.0019)  loss_objectness: 0.3124 (0.3124)  loss_rpn_box_reg: 0.0099 (0.0099)  time: 0.5242  data: 0.2002  max mem: 1408
Epoch: [0]  [4/5]  eta: 0:00:00  lr: 0.005000  loss: 0.8966 (1.1626)  loss_classifier: 0.2603 (0.7042)  loss_box_reg: 0.0186 (0.0599)  loss_objectness: 0.3124 (0.3780)  loss_rpn_box_reg: 0.0115 (0.0205)  time: 0.5715  data: 0.2523  max mem: 1792
Epoch: [0] Total time: 0:00:02 (0.5719 s / it)
creating index...
index created!


KeyboardInterrupt: 

In [None]:
def prediction_faster_rcnn(image_path, iou_threshold=.1):
    """Run the global ``model`` on one image and draw the kept detections.

    Args:
        image_path: path of a local image file; if no such file exists the
            value is treated as a URL and downloaded to ``image.jpg`` in the
            working directory.
        iou_threshold: IoU threshold given to the class-agnostic NMS applied
            on top of the model output (default 0.1).

    Returns:
        The PIL image (converted to RGB) with one red rectangle and class name
        per detection that survives NMS.
    """
    categories = ['_', 'Initial', 'Decoration', 'ContentIllustration', 'PrintersMark']

    # image loading: local file first, otherwise download it
    if os.path.exists(image_path):
        image = Image.open(image_path)
    else:
        urllib.request.urlretrieve(image_path, "image.jpg")
        image = Image.open("image.jpg")
    # the model expects 3-channel input; grayscale / RGBA files are converted
    image = image.convert("RGB")

    # prediction (switches the global model to eval mode, as before)
    model.eval()
    # use the device the model lives on instead of relying on a global
    model_device = next(model.parameters()).device
    tensor_image = F.to_tensor(image).unsqueeze(0).to(model_device)
    with torch.no_grad():
        prediction = model(tensor_image)

    # list of boxes and labels
    boxes = prediction[0]['boxes']
    labels = prediction[0]['labels']
    scores = prediction[0]['scores']
    keep = torchvision.ops.nms(boxes, scores, iou_threshold)

    # plain Python numbers are handed to PIL rather than tensor scalars
    draw = ImageDraw.Draw(image)
    for (xmin, ymin, xmax, ymax), label in zip(boxes[keep].tolist(), labels[keep].tolist()):
        draw.rectangle([(xmin, ymin), (xmax, ymax)], outline="red")
        # the class name is written at the bottom-right corner of the box
        draw.text((xmax, ymax), text=categories[label], fill="red")

    return image

In [None]:
# Smoke test: run the detector on one training image. The function returns the
# annotated PIL image, which is the value of this cell's last expression.
prediction_faster_rcnn('datasets/images/train/1015.jpg')