# **Download and Prepare Dataset**

In [None]:
from google.colab import drive
from IPython import display

# Clear whatever output this cell has produced so far.
display.clear_output()

# Mount Google Drive under /content/drive so the archive below can be read.
drive.mount('/content/drive')

# Extract the dataset archive into the current working directory
# (spaces in the Drive path are backslash-escaped for the shell).
!7z x /content/drive/MyDrive/Colab\ Notebooks/Data\ Science/Learning/PyTorch/archive.zip

In [None]:
import pandas as pd

# Load the bounding-box annotations. Later cells read the `image` column as the
# file name and treat every column after the first as a box coordinate (see Car_data).
dataset = pd.read_csv('/content/data/train_solution_bounding_boxes (1).csv')
dataset.head()

In [None]:
# Distinct image file names found in the annotation table; the dataset classes
# below are indexed by this array.
image_unique = dataset['image'].unique()
# Show the count and a short preview instead of dumping the whole array.
print(f'{len(image_unique)} annotated images')
image_unique[:5]

array(['vid_4_1000.jpg', 'vid_4_10000.jpg', 'vid_4_10040.jpg',
       'vid_4_10020.jpg', 'vid_4_10060.jpg', 'vid_4_10100.jpg',
       'vid_4_10120.jpg', 'vid_4_10140.jpg', 'vid_4_1020.jpg',
       'vid_4_1040.jpg', 'vid_4_10480.jpg', 'vid_4_10500.jpg',
       'vid_4_10520.jpg', 'vid_4_1060.jpg', 'vid_4_10960.jpg',
       'vid_4_10980.jpg', 'vid_4_11000.jpg', 'vid_4_11020.jpg',
       'vid_4_11240.jpg', 'vid_4_11260.jpg', 'vid_4_11280.jpg',
       'vid_4_11380.jpg', 'vid_4_11400.jpg', 'vid_4_11420.jpg',
       'vid_4_11440.jpg', 'vid_4_11900.jpg', 'vid_4_11880.jpg',
       'vid_4_11920.jpg', 'vid_4_11940.jpg', 'vid_4_11960.jpg',
       'vid_4_11980.jpg', 'vid_4_12000.jpg', 'vid_4_12040.jpg',
       'vid_4_12100.jpg', 'vid_4_12060.jpg', 'vid_4_12080.jpg',
       'vid_4_12120.jpg', 'vid_4_12140.jpg', 'vid_4_12160.jpg',
       'vid_4_12180.jpg', 'vid_4_12200.jpg', 'vid_4_12220.jpg',
       'vid_4_12240.jpg', 'vid_4_12260.jpg', 'vid_4_12280.jpg',
       'vid_4_12300.jpg', 'vid_4_12320.jpg',

In [None]:
import torch
from torch.utils.data import DataLoader, Dataset
import torchvision.transforms as T
from PIL import Image
import os

class Car_data(Dataset):
  """Detection dataset that yields one image together with all of its boxes.

  Args:
    data: directory that contains the image files.
    image_unique: sequence of image file names; its length is the dataset size.
    labels: DataFrame with an ``image`` column. Every column after the first
      one is read as a box coordinate (presumably xmin, ymin, xmax, ymax -
      confirm against the CSV header).
  """
  def __init__(self, data, image_unique, labels):
    self.unique = image_unique
    self.data = data
    self.labels = labels

  def __len__(self):
    """Number of distinct images."""
    return len(self.unique)

  def __getitem__(self, idx):
    """Return ``(image_tensor, target)`` for the image at position ``idx``.

    ``target`` holds ``'boxes'`` (float32 tensor, one row per annotation row of
    this image) and ``'label'`` (int64 tensor of ones, one entry per box).
    The key is ``'label'`` (singular); the training and validation loops
    rename it to ``'labels'`` before calling the model.
    """
    image_name = self.unique[idx]
    img = Image.open(os.path.join(self.data, image_name)).convert('RGB')
    rows = self.labels[self.labels['image'] == image_name]
    # Column 0 is the file name, the remaining columns are the coordinates.
    # float32 matches the FloatTensor boxes torchvision detection models
    # document for their targets (plain 'float' would produce float64).
    boxes = rows.values[:, 1:].astype('float32')
    target = {
        'boxes': torch.tensor(boxes, dtype = torch.float32),
        'label': torch.ones((boxes.shape[0]), dtype = torch.int64),
    }
    return T.ToTensor()(img), target

In [None]:
def custom_collate(data):
  """Identity collate function.

  Hands the list of samples back unchanged, so the training and validation
  loops can unpack each ``(image, target)`` pair themselves.
  """
  batch = data
  return batch

In [None]:
from sklearn.model_selection import train_test_split

# Single place for the image directory used by both subsets.
IMAGE_DIR = '/content/data/training_images'

# Split on image names (not on annotation rows) so all boxes of one image end
# up in the same subset.
x_train, x_val = train_test_split(image_unique, test_size = 0.2, random_state = 42)
train_data = Car_data(IMAGE_DIR, x_train, dataset)
val_data = Car_data(IMAGE_DIR, x_val, dataset)
train_loader = DataLoader(train_data, batch_size = 1, shuffle = True, collate_fn = custom_collate)
# The validation score is an average over batches, so order does not matter;
# keep it fixed to make runs reproducible.
val_loader = DataLoader(val_data, batch_size = 1, shuffle = False, collate_fn = custom_collate)

# **Build Faster-R-CNN**

In [None]:
from torchvision.models.detection import fasterrcnn_resnet50_fpn

In [None]:
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
# Start from the COCO-pretrained Faster R-CNN (ResNet-50 FPN backbone).
model = fasterrcnn_resnet50_fpn(weights='FasterRCNN_ResNet50_FPN_Weights.COCO_V1')
# Two outputs for the new head: Car_data labels every box with 1, and
# torchvision detection models reserve index 0 for background.
num_classes = 2
# Input width of the existing classification layer, reused for the replacement head.
in_features = model.roi_heads.box_predictor.cls_score.in_features
# Swap the pretrained box predictor for a fresh one sized for `num_classes`.
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

In [None]:
# Display the model's module tree (including the replaced box predictor).
model

# **Train Model**

In [None]:
!pip install torchmetrics[detection]

In [None]:
from torch import optim
import torch.nn.functional as F
from tqdm import tqdm

# Choose the device and move the model first, then build the optimizer from the
# parameters as they live on that device (the order recommended by torch.optim).
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model.to(device)
optimizer = optim.SGD(model.parameters(), lr = 0.001, momentum = 0.9, weight_decay = 0.0005)

def train(model, dataloader, epochs, optimizer, val_loader):
  """Train ``model`` and keep the checkpoint with the best validation mAP.

  Args:
    model: detection model; called as ``model(images, targets)`` and expected
      to return a dict of loss tensors.
    dataloader: yields batches that are lists of ``(image, target)`` pairs,
      where ``target`` has the keys ``'boxes'`` and ``'label'``.
    epochs: number of passes over ``dataloader``.
    optimizer: optimizer that updates the model's parameters.
    val_loader: loader handed to ``validate`` after every epoch.

  Returns:
    ``(losses, mAps)``: the summed training loss of each epoch (Python floats)
    and the value returned by ``validate`` for each epoch.

  Side effects:
    Writes ``best_model.pt`` (model and optimizer state dicts) to the current
    working directory whenever the validation mAP improves.
  """
  losses = []
  mAps = []
  # Start below any possible score so the first epoch always writes a
  # checkpoint, even if its mAP is 0.
  best_ap = float('-inf')
  for epoch in range(epochs):
    model.train()
    epoch_loss = 0.0
    loop = tqdm(dataloader, total = len(dataloader))
    loop.set_description(f"epoch {epoch + 1}/{epochs}")
    for data in loop:
      imgs = [d[0].to(device) for d in data]
      # The dataset stores class ids under 'label'; the model expects 'labels'.
      targets = [
          {'boxes': d[1]['boxes'].to(device), 'labels': d[1]['label'].to(device)}
          for d in data
      ]
      loss_dict = model(imgs, targets)
      loss = sum(loss_dict.values())
      optimizer.zero_grad()
      loss.backward()
      optimizer.step()
      epoch_loss += loss.item()
      # Update while the bar is still running so the value is actually shown.
      loop.set_postfix(loss = epoch_loss)
    epoch_map = validate(model, val_loader)
    # The progress bar has finished at this point, so report with print.
    print(f"epoch {epoch + 1}/{epochs} - loss: {epoch_loss} - mAp: {epoch_map}")
    mAps.append(epoch_map)
    losses.append(epoch_loss)
    if epoch_map > best_ap:
      best_ap = epoch_map
      torch.save({
          "model_state_dict": model.state_dict(),
          "optimizer_state_dict": optimizer.state_dict()
      }, 'best_model.pt')
  return losses, mAps

In [None]:
from torchvision import ops
from torchmetrics.detection import mean_ap

def meanAveragePrecision(predict, target):
  """Compute the mAP of one set of predictions against its ground truth.

  Args:
    predict: list of prediction dicts handed to ``MeanAveragePrecision.update``.
    target: list of ground-truth dicts handed to the same call.

  Returns:
    The ``'map'`` entry of the computed metric as a NumPy value.
  """
  metric = mean_ap.MeanAveragePrecision()
  metric.update(predict, target)
  scores = metric.compute()
  # detach + cpu keeps the conversion valid even if the result tensor lives on
  # the GPU or is attached to a graph; a bare .numpy() would raise there.
  return scores['map'].detach().cpu().numpy()

def validate(model, dataloader):
  """Average the per-batch mAP of ``model`` over ``dataloader``.

  Args:
    model: detection model; called as ``model(images)`` in eval mode.
    dataloader: yields batches that are lists of ``(image, target)`` pairs,
      where ``target`` has the keys ``'boxes'`` and ``'label'``.

  Returns:
    Sum of ``meanAveragePrecision`` over all batches divided by the number of
    batches, or ``0.0`` when the loader is empty.

  Note:
    The model is left in eval mode.
  """
  model.eval()
  num_batches = len(dataloader)
  if num_batches == 0:
    # Nothing to score; avoid dividing by zero.
    return 0.0
  total = 0
  with torch.no_grad():
    for data in dataloader:
      imgs = [d[0].to(device) for d in data]
      # The dataset stores class ids under 'label'; the metric expects 'labels'.
      target = [
          {'boxes': d[1]['boxes'].to(device), 'labels': d[1]['label'].to(device)}
          for d in data
      ]
      output = model(imgs)
      total += meanAveragePrecision(output, target)
  return total / num_batches

In [None]:
# Run 30 epochs; `loss` and `mAp` receive the two per-epoch lists returned by train().
loss, mAp = train(
    model,
    train_loader,
    epochs = 30,
    optimizer = optimizer,
    val_loader = val_loader,
)

# **Inference**

In [None]:
!wget https://app.roboflow.com/ds/s4gd3l9oss?key=pxE4Cs4MS4
!7z x /content/s4gd3l9oss?key=pxE4Cs4MS4

--2023-04-07 20:55:15--  https://app.roboflow.com/ds/s4gd3l9oss?key=pxE4Cs4MS4
Resolving app.roboflow.com (app.roboflow.com)... 151.101.65.195, 151.101.1.195
Connecting to app.roboflow.com (app.roboflow.com)|151.101.65.195|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://storage.googleapis.com/roboflow-platform-exports/ueF6mYObchVdiLSL6TW20tlvMoc2/9gZnTjSTwYamLUdfxx4G/4/yolov8.zip?X-Goog-Algorithm=GOOG4-RSA-SHA256&X-Goog-Credential=481589474394-compute%40developer.gserviceaccount.com%2F20230407%2Fauto%2Fstorage%2Fgoog4_request&X-Goog-Date=20230407T205515Z&X-Goog-Expires=901&X-Goog-SignedHeaders=host&X-Goog-Signature=7d294d9ded89296f5c846160fcfe23051af383338dd65c0f8520567a9ecb05bdf008b4d167a7671d24c0066ebbcc56f6425b0045f3565db2b3432c7e6261387ce02a099487d1e2537870755b254ebefeb6f72aa34d10b37a9384cef224f96b3b7961210fdc3962dc94933f4aa1f92c0c8a0839a819dd46da1f1084f0a87e6b181ee4a4727c4c45e2d6e209e9692d6df1f1e57142fb49880ada24eba3b704ffaef8fcd6eb87a4156683

In [None]:
import glob

# Remove every validation image whose label file is empty, together with that
# label file.
for img in glob.glob('/content/valid/images/*.[jp][pn]g'):
  dir_label = img.replace('/images', '/labels').replace('.jpg', '.txt').replace('.png', '.txt')
  # A zero-byte file is exactly the "no lines" case. Checking the size avoids
  # reading the file and deleting it while it is still open.
  if os.path.getsize(dir_label) == 0:
    os.remove(dir_label)
    os.remove(img)

In [None]:
import cv2
from google.colab.patches import cv2_imshow
import glob
import numpy as np

def get_bboxes(labels):
  """Read the boxes stored in a label text file.

  Each usable line has at least five whitespace-separated fields; the first
  field is ignored and fields two to five are returned as floats.

  Args:
    labels: path of the label file.

  Returns:
    List of ``[v1, v2, v3, v4]`` float lists, one per usable line, in file order.
  """
  bboxes = []
  with open(labels, 'r') as f:
    for line in f:
      # split() without arguments also drops the trailing newline and
      # tolerates repeated spaces or tabs.
      values = line.split()
      # Five fields are needed (index 4 is read below); skip blank or short
      # lines instead of discarding everything that follows them.
      if len(values) < 5:
        continue
      bboxes.append([float(v) for v in values[1:5]])
  return bboxes

def desnormalize(bboxes, image):
  """Convert normalised centre/size boxes to pixel corner boxes.

  Args:
    bboxes: iterable of ``[cx, cy, w, h]`` values expressed as fractions of the
      image width and height.
    image: path of the image whose size is used for scaling.

  Returns:
    A new list of ``[xmin, ymin, xmax, ymax]`` in pixels. The input list is left
    untouched, so calling the function twice on the same data is safe.
  """
  # Only the size is needed; the context manager closes the file again.
  with Image.open(image) as img:
    width, height = img.size
  pixel_boxes = []
  for bbox in bboxes:
    cx, cy, w, h = bbox[:4]
    pixel_boxes.append([
        (cx - w/2) * width,
        (cy - h/2) * height,
        (cx + w/2) * width,
        (cy + h/2) * height,
    ])
  return pixel_boxes

def show_image(image, bboxes, save = False, dir = None):
  """Draw boxes on an image and either save the result or display it.

  Args:
    image: path of the image file.
    bboxes: iterable of ``[xmin, ymin, xmax, ymax]`` pixel boxes; each
      coordinate is converted with ``int``.
    save: when true, write the annotated image to ``dir`` under the original
      file name; otherwise display it in the notebook.
    dir: output directory used when ``save`` is true; defaults to
      ``/content/show_image``. Missing directories are created.
  """
  rgb = np.array(Image.open(image).convert('RGB'))
  # OpenCV writes and displays images in BGR channel order, so convert once
  # before drawing; otherwise red and blue end up swapped in the result.
  img = cv2.cvtColor(rgb, cv2.COLOR_RGB2BGR)
  for bbox in bboxes:
    img = cv2.rectangle(
        img,
        (int(bbox[0]), int(bbox[1])),
        (int(bbox[2]), int(bbox[3])),
        color = (255, 0, 0),
        thickness = 2
    )
  if not save:
    # cv2.imshow needs a window name and a GUI; the Colab patch imported
    # alongside it renders inline instead.
    cv2_imshow(img)
    return
  if dir is None:
    dir = '/content/show_image'
  # makedirs also handles nested paths and directories that already exist.
  os.makedirs(dir, exist_ok = True)
  cv2.imwrite(os.path.join(dir, os.path.basename(image)), img)
def _save_inference(file, result, target, mAp, out_root):
  """Write the annotated images and a text summary for one image under ``out_root``."""
  image_name = os.path.basename(file)
  dir_save = os.path.join(out_root, image_name)
  for sub in ('inference', 'ground_truth'):
    os.makedirs(os.path.join(dir_save, sub), exist_ok = True)
  show_image(file, result['boxes'], save = True, dir = f'{dir_save}/inference')
  show_image(file, target['boxes'], save = True, dir = f'{dir_save}/ground_truth')
  summary_name = image_name.replace('.jpg', '.txt').replace('.png', '.txt')
  # 'w' so that re-running the notebook replaces the summary instead of
  # appending a duplicate.
  with open(os.path.join(dir_save, summary_name), 'w') as f:
    f.write('Image: ' + ' ' + image_name + '\n')
    f.write('mAp: ' + ' ' + str(mAp) + '\n')
    f.write('Bboxes: {' + '\n')
    # tolist() yields plain numbers rather than tensor reprs.
    for box in result['boxes'].tolist():
      f.write(' ' + ' '.join(str(v) for v in box) + '\n')
    f.write('}')


def inference(model, source, save = False, dir = None):
  """Run the detector on every image in ``source`` and score it against its labels.

  For each file matching ``*.[jp][pn]g`` in ``source`` the label file is looked
  up by replacing ``/images`` with ``/labels`` and the extension with ``.txt``.

  Args:
    model: detection model; called as ``model([image])`` in eval mode.
    source: directory containing the images.
    save: when true, write annotated images and a text summary per image.
    dir: root output directory used when ``save`` is true; defaults to
      ``/content/show_image``.

  Returns:
    ``(results, targets, mAps)``: three lists with one entry per image - the
    prediction dict (``boxes``, ``labels``, ``scores``), the ground-truth dict
    (``labels``, ``boxes``) and the value returned by ``meanAveragePrecision``.
  """
  model.eval()
  files = glob.glob(source + "/*.[jp][pn]g")
  results = []
  targets = []
  mAps = []
  transform = T.ToTensor()
  if save:
    if dir is None:
      dir = '/content/show_image'
    os.makedirs(dir, exist_ok = True)
  for file in files:
    image = transform(Image.open(file).convert('RGB'))
    dir_file = file.replace('/images', '/labels').replace('.jpg', '.txt').replace('.png', '.txt')
    bboxes = desnormalize(get_bboxes(dir_file), file)
    target = {
        'labels': torch.ones(len(bboxes), dtype = torch.int64).to(device),
        # reshape keeps an (0, 4) shape when the label file holds no boxes.
        'boxes': torch.tensor(bboxes, dtype = torch.float32).reshape(-1, 4).to(device),
    }
    targets.append(target)
    with torch.no_grad():
      output = model([image.to(device)])
    # Collect the prediction and its score; this part used to be wrapped in a
    # string literal, which left `results` and `mAps` permanently empty.
    result = {key: output[0][key] for key in ('boxes', 'labels', 'scores')}
    results.append(result)
    mAp = meanAveragePrecision([result], [target])
    mAps.append(mAp)
    if save:
      _save_inference(file, result, target, mAp, dir)
  return results, targets, mAps


In [None]:
# map_location lets a checkpoint written on a GPU runtime load on a CPU-only
# runtime as well; the file is the one written by train().
checkpoint = torch.load('/content/best_model.pt', map_location = device)
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

In [None]:
# Directory that inference() scans for files matching *.[jp][pn]g; label files
# are looked up by replacing '/images' with '/labels' in each path.
source = '/content/valid/images'
# save = True is forwarded to inference(); `dir` keeps its default of None.
results, targets, mAps = inference(model, source, save = True)

In [None]:
# Print the list returned as the first value of inference().
print(results)

In [None]:
# Print the list returned as the third value of inference().
print(mAps)

[array(0.15148515, dtype=float32), array(0.33168316, dtype=float32), array(0.5, dtype=float32), array(0.3, dtype=float32), array(0.36666667, dtype=float32), array(0.4, dtype=float32), array(0.53168315, dtype=float32), array(0.45049506, dtype=float32), array(0., dtype=float32), array(0.1009901, dtype=float32), array(0.5, dtype=float32), array(0.4, dtype=float32), array(0.3, dtype=float32), array(0.3, dtype=float32), array(0.2, dtype=float32), array(0.5, dtype=float32), array(0.3, dtype=float32), array(0.5, dtype=float32), array(0.1009901, dtype=float32), array(0.48118812, dtype=float32), array(0.8, dtype=float32), array(0.4, dtype=float32), array(0.6, dtype=float32), array(0., dtype=float32), array(0.4, dtype=float32), array(0.5, dtype=float32), array(0.4, dtype=float32), array(0.6, dtype=float32), array(0.2, dtype=float32), array(0.4, dtype=float32), array(0.4, dtype=float32), array(0.5, dtype=float32), array(0.4884489, dtype=float32), array(0.42524752, dtype=float32), array(0.32524753