<a href="https://colab.research.google.com/github/POE-DAMERON/Glie-44/blob/main/Model/Baseline_Implementation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
import numpy as np
from sklearn import datasets
import tensorflow_hub as hub
from PIL import Image, ImageDraw, ImageFont
from os import path, listdir
from pathlib import Path
import cv2 as cv
from google.colab import drive
import sys
import io

%matplotlib inline

'''
  Downloads Data from the VisDrone dataset.
  Input is which dataset to download:
    - 1 is the developer testing dataset
    - 2 is the actual challenge testing dataset
    - 3 is the val dataset
    - Otherwise, the training dataset is extracted
'''

def initialize_training(file = 0):
  !git clone https://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@github.com/POE-DAMERON/Glie-44.git
  drive.mount('/content/drive')

  
  if file == 1:
    !unzip /content/drive/MyDrive/Glie_44/VisDrone2019-MOT-test-dev.zip
  elif file == 2:
    !unzip /content/drive/MyDrive/Glie_44/VisDrone2019-MOT-test-challenge.zip
  elif file == 3:
    !unzip /content/drive/MyDrive/Glie_44/VisDrone2019-MOT-val.zip
  else:
    !unzip /content/drive/MyDrive/Glie_44/VisDrone2019-MOT-train.zip

initialize_training()

Cloning into 'Glie-44'...
remote: Enumerating objects: 2036, done.[K
remote: Counting objects: 100% (467/467), done.[K
remote: Compressing objects: 100% (405/405), done.[K
Receiving objects: 100% (2036/2036), 336.31 MiB | 37.00 MiB/s, done.
remote: Total 2036 (delta 221), reused 138 (delta 48), pack-reused 1569[K
Resolving deltas: 100% (239/239), done.
Checking out files: 100% (1516/1516), done.


**New Model using PyTorch**

Main.py

In [None]:
%%shell

# Fetch the torchvision sources to reuse its detection reference scripts.
git clone https://github.com/pytorch/vision.git
cd vision
# Pin the checkout to the v0.3.0 tag so the copied helpers come from a fixed release.
git checkout v0.3.0

# Copy the detection reference helpers into the parent directory so the
# next cell can import them as plain modules (engine, utils, transforms).
cp references/detection/utils.py ../
cp references/detection/transforms.py ../
cp references/detection/coco_eval.py ../
cp references/detection/engine.py ../
cp references/detection/coco_utils.py ../

In [None]:
import torch
from engine import train_one_epoch, evaluate
import utils
import os
import pandas as pd
import transforms as T
from pathlib import Path
import csv
import time

"""
  get_results takes the result of the evaluate function as input
  and returns the results as a String.
"""

def get_results(evaluator):
  """
    Captures the printed summary of an evaluation and returns it as a String.

    evaluator is the object returned by the evaluate function; only its
    coco_eval attribute is read, and it is walked with .items() as a mapping
    of IoU type -> evaluation object.

    Returns everything that was printed while walking that mapping.
  """
  import contextlib
  import io

  buffer = io.StringIO()

  # redirect_stdout puts the previous sys.stdout back even when an unexpected
  # error escapes, so a failure here can no longer leave the output hijacked
  with contextlib.redirect_stdout(buffer):
    for iou_type, coco_eval in evaluator.coco_eval.items():
      print("IoU metric: {}".format(iou_type))
      try:
        print(coco_eval)
      except Exception:
        # Best effort: printing the evaluation object may raise (presumably
        # its __str__ prints the summary itself instead of returning a
        # string -- TODO confirm). Whatever was printed before the error
        # stays in the buffer.
        pass

  return buffer.getvalue()

"""
  Adds the arguments and results of a training session into a csv file
  The inputs are:
  a dictionary of arguments,
  the results as a String (with get_results for instance),
  is_saved specifies whether the model was saved or not,
  a path to save the model,
  a path to save the records.
"""

def add_to_record(arguments, output, is_saved = False, path_to_saved_model = '', filepath = 'drive/MyDrive/Glie_44/training.csv'):
  """
    Appends one row describing a training session to a csv record.

    arguments is the dictionary of hyper-parameters (as built by
    get_arguments), output is the results as a String, is_saved tells whether
    the model was saved, path_to_saved_model is where it was saved and
    filepath is the csv file to append to.

    Raises KeyError if arguments lacks one of the recorded keys.
  """
  # Column order of the hyper-parameters inside a row
  recorded_keys = ('number_of_epochs', 'batch_size', 'optimizer', 'lr',
                   'weight_decay', 'momentum', 'lr_scheduler_step_size',
                   'lr_scheduler_gamma')

  with open(filepath, 'a', newline='') as record_file:
    row = [is_saved, path_to_saved_model]
    row.extend(arguments[key] for key in recorded_keys)
    row.append(output)
    csv.writer(record_file).writerow(row)


"""
  Saves the model and adds to a given csv record.
  The inputs are:
  the model,
  the results as a String (with get_results for instance),
  a dictionary of the arguments,
  a path to save the model,
  a path to save the records.
"""

def save_model(model, evaluator, arguments, path = '', path_to_record = 'drive/MyDrive/Glie_44/training.csv'):
  """
    Saves the model with torch.save and appends the session to the csv record.

    model is the model to save, evaluator is the results as a String (it is
    written as the output column of the record), arguments is the dictionary
    of hyper-parameters, path is where to save the model and path_to_record is
    the csv record to append to.

    When path is '' or None, a timestamped file name under
    drive/MyDrive/Glie_44/Models/ is used instead.
  """
  if path is None or path == '':
    path = 'drive/MyDrive/Glie_44/Models/model-' + str(int(time.time())) + '.pth'
  torch.save(model, path)

  # add_to_record names its destination parameter `filepath`; passing
  # `filename` raised a TypeError on every call
  add_to_record(arguments = arguments, output = evaluator, is_saved = True,
                path_to_saved_model = path, filepath = path_to_record)

"""
  Loads the model from a specified path.
  Takes the path as an input and returns the loaded model.
"""

def load_model(path):
  """
    Returns the object stored at path, as read back by torch.load.
  """
  # NOTE(review): torch.load unpickles the file, so only load files you trust
  return torch.load(path)

"""
  Saves the checkpoints in a given text file.
  The inputs are:
  a dictionary of the arguments,
  a path to save the checkpoints.
"""

def save_checkpoints(dicti, path = 'drive/MyDrive/Glie_44/Checkpoints/checkpoint.txt'):
  """
    Writes the checkpoint dictionary to a text file as JSON.

    dicti is the dictionary to store and path is the text file to overwrite.

    Raises TypeError if dicti holds a value json cannot serialize.
  """
  import json  # local import: json is not imported at file level

  # Serialize before opening so a failure cannot truncate an existing checkpoint
  payload = json.dumps(dicti)
  with open(path, 'w') as f:
    f.write(payload)

"""
  Loads the checkpoints from a given text file.
  Takes the path of the text file as input and returns a dictionary of the
  checkpoints.
"""

def load_checkpoints(path = 'drive/MyDrive/Glie_44/Checkpoints/checkpoint.txt'):
  """
    Reads the checkpoints back from a JSON text file.

    path is the text file to read. Returns the decoded content (a dictionary
    when the file was written by save_checkpoints).
  """
  import json  # local import: json is not imported at file level

  with open(path, 'r') as f:
    return json.load(f)

"""
  Builds the dictionary of the arguments based on the inputs that are:
  the percentage of data trained on (float),
  the number of epochs (int),
  the batch_size (int),
  the optimizer (sgd or adam usually)
  learning rate (float),
  momentum (float ,only for sgd),
  weight decay (float),
  step size (int),
  gamma (float).

  Returns a dictionary.
"""
def get_arguments(train_percentage, epochs, batch_size, optimizer,
                            lr, momentum, weight_decay, step_size, gamma):
  """
    Gathers the hyper-parameters of a training session in a dictionary.

    Any optimizer other than 'adam' is recorded as 'sgd'. momentum is only
    kept for sgd; it is recorded as '' for adam.
  """
  uses_adam = (optimizer == 'adam')

  return {
      'train_percentage': train_percentage,
      'optimizer': optimizer if uses_adam else 'sgd',
      'momentum': '' if uses_adam else momentum,
      'lr': lr,
      'weight_decay': weight_decay,
      'lr_scheduler_step_size': step_size,
      'lr_scheduler_gamma': gamma,
      'number_of_epochs': epochs,
      'batch_size': batch_size,
  }

"""
  Trains the model and returns a tuple composed of the model,
  the results (String) and the arguments (dictionary).

  The path_to_the_saved_model is '' by default. This builds a new model from 
  scratch. path_to_the_saved_model will allow the function to load a custom
  model.
  
  train_percentage is a float between 0 and 1 that determines the ratio of
  trained images of the original dataset. Default value is 0.8.

  test_percentage is a float between 0 and 1 that determines the ratio of
  test images of the original dataset. It must be smaller than 
  (1 - train_percentage).
  It can also be -1 in which case it will be equal to (1 - train_percentage).
  Default value is -1.

  batch_size is a positive integer. Default value is 2.

  epochs is a positive integer representing the total number of epochs before
  ending the function. Default value is 10.

  cur_epoch is a positive integer to start the training session from a given
  epoch. Default value is 0.

  optimizer is a string between sgd and adam representing the optimizer function
  used. Default value is sgd.

  lr is a positive float representing the learning rate related to the
  optimizer. Default value is 0.005.

  momentum is a float between 0 and 1 only used for the sgd optimizer. Default
  value is 0.9.

  weight_decay is a positive float related to the optimizer.
  Default value is 0.0005.

  step_size is a positive integer related to the learning rate scheduler.
  Default value is 3.

  gamma is a positive float related to the learning rate scheduler.
  Default value is 0.1.

  checkpoints is a positive integer representing the saving rate. If checkpoints
  is 3, the function will save the model and the checkpoints every 3 epochs.
  Default value is -1 which disables the autosave of the model and the
  checkpoints.

  load_checkpoint is a boolean that determines whether to load the checkpoints
  or not. Default value is False.

  output_path_for_model is the path to save the model. Default value is '' which
  saves it on the same file it was loaded from.

  checkpoint_path is the path to save the checkpoints. Default value is the path
  used by the team during the initial training.
"""

def train(path_to_saved_model = '', train_percentage = .8, test_percentage = -1,
          batch_size = 2, epochs = 10, cur_epoch=0, optimizer='sgd',
          lr = 0.005, momentum = 0.9, weight_decay= 0.0005, step_size = 3,
          gamma = 0.1, checkpoints=-1, load_checkpoint=False,
          output_path_for_model = '',
          checkpoint_path = 'drive/MyDrive/Glie_44/Checkpoints/checkpoint.txt',
          preprocessing = Utils.to_tensor()):
  """
    Trains a model on the images of the VisDrone2019-MOT-train folder and
    returns a tuple (model, results, arguments).

    path_to_saved_model: model file to resume from; '' or None builds a new
      model with build_model.
    train_percentage / test_percentage: ratios of the dataset used for
      training and evaluation; test_percentage = -1 uses all remaining images.
    batch_size, epochs, cur_epoch: batch size, last epoch (exclusive) and
      first epoch of the session.
    optimizer, lr, momentum, weight_decay: 'adam' selects Adam, anything else
      selects SGD (momentum is only used by SGD).
    step_size, gamma: settings of the StepLR learning rate scheduler.
    checkpoints: the model and the checkpoints are saved whenever
      epoch % checkpoints == 0; -1 (default) or 0 disables saving.
    load_checkpoint: when True, gamma and cur_epoch are read from
      checkpoint_path and override the values given as arguments.
    output_path_for_model: where to save the model; '' reuses
      path_to_saved_model.
    checkpoint_path: text file used to load and save the checkpoints.
    preprocessing: transform applied to every image.

    results is the String captured from the last evaluation, or None when no
    epoch was run (cur_epoch >= epochs).
  """
  import random  # local import: random is not imported at file level

  # Creates the argument dictionary to be returned

  arguments = get_arguments(train_percentage, epochs, batch_size, optimizer,
                            lr, momentum, weight_decay, step_size, gamma)

  # Checks for an available GPU, chooses the CPU if none is found

  if torch.cuda.is_available():
    device = torch.device('cuda')
  else:
    device = torch.device('cpu')

  # Initializes the total dataset

  dataset_root = Path().absolute().joinpath('VisDrone2019-MOT-train')
  sequences_dir = dataset_root.joinpath("sequences")
  annotations_dir = dataset_root.joinpath("annotations")

  X = AllVisDroneVideos(sequences_dir, annotations_dir, preprocessing)

  # Loads or builds the model, then links the model to the available device.
  # None has to be tested before str(): str(None) is 'None', never None.

  if path_to_saved_model is None or str(path_to_saved_model) == '':
    model = build_model()
  else:
    model = load_model(path_to_saved_model)
  model.to(device)

  # Checks for checkpoints to load, from the same file they are saved to

  if load_checkpoint:
    checkpoint = load_checkpoints(checkpoint_path)
    gamma = checkpoint['gamma']
    cur_epoch = checkpoint['cur_epoch']

  # Builds the optimizers and learning rate scheduler required for training

  params = [p for p in model.parameters() if p.requires_grad]

  if (optimizer == 'adam'):
    optim = torch.optim.Adam(params, lr=lr, weight_decay=weight_decay)
  else:
    optim = torch.optim.SGD(params, lr=lr,
                              momentum=momentum, weight_decay=weight_decay)

  lr_scheduler = torch.optim.lr_scheduler.StepLR(optim,
                                                    step_size=step_size,
                                                    gamma=gamma)

  # Prepares the training and testing sizes (the dataset length never changes)

  train_sz = int(len(X) * train_percentage)

  if test_percentage == -1:
    test_sz = len(X) - train_sz
  else:
    test_sz = int(len(X) * min(1,max(0, test_percentage)))

  # Stays None when the loop below does not run a single epoch

  results = None

  # Runs the epochs

  for epoch in range(cur_epoch,epochs):

    # Shuffles the data every epoch

    random.shuffle(X.imgs)

    # Divides the shuffled dataset into training and testing sets

    x_train = AllVisDroneVideos(sequences_dir, annotations_dir, preprocessing,
                                X.imgs[:train_sz])
    x_test = AllVisDroneVideos(sequences_dir, annotations_dir, preprocessing,
                               X.imgs[train_sz:train_sz + test_sz])

    data_loader = torch.utils.data.DataLoader(
        x_train, batch_size=batch_size, shuffle=True, num_workers=2,
        collate_fn=utils.collate_fn)

    data_loader_test = torch.utils.data.DataLoader(
        x_test, batch_size=1, shuffle=False, num_workers=2,
        collate_fn=utils.collate_fn)

    train_one_epoch(model, optim, data_loader, device, epoch, print_freq=10)
    print('\n---------\nTRAIN ONE EPOCH FINISHED\n')

    # Updates the learning rate scheduler

    lr_scheduler.step()

    # Calling evaluate outputs the results so no need to print

    results = get_results(evaluate(model, data_loader_test, device=device))

    # Checks if the model and checkpoints should be saved.
    # 0 is treated as disabled too: epoch % 0 would raise ZeroDivisionError.

    if checkpoints not in (-1, 0) and epoch % checkpoints == 0:
      if output_path_for_model !=  '':
        save_model(model, results, arguments, output_path_for_model)
      else:
        save_model(model, results, arguments, path_to_saved_model)
      save_checkpoints({'cur_epoch': epoch + 1, 'gamma': gamma}, path = checkpoint_path)
      print('\nModel Saved\n')

  return model, results, arguments

In [None]:
import torchvision
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.rpn import AnchorGenerator

def build_model(num_classes = 12):
  """
    Builds a Faster R-CNN (ResNet-50 FPN, created with pretrained=True) whose
    box predictor is replaced by a new FastRCNNPredictor with num_classes
    outputs. Returns the model.
  """
  detector = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)

  # The new head must accept the same number of features as the one it replaces
  head_in_features = detector.roi_heads.box_predictor.cls_score.in_features
  detector.roi_heads.box_predictor = FastRCNNPredictor(head_in_features, num_classes)

  return detector

In [None]:
"""
  VisDroneVideo represents one video from the VisDrone dataset.
  Images and targets can be accessed in a list-like manner.
"""

class VisDroneVideo(object):
    """
      One video of the VisDrone dataset; images and targets are accessed in a
      list-like manner.

      The inputs are:

      root is the path of the directory with the images composing the video,

      target_path is the path to the file with the video targets (boxes),

      preprocessing is a transform function useful for data augmentation, it is
      advised to work with torchvision's transforms library,

      imgs is a custom list of image paths to use instead of every image of
      the directory,

      include_targets is a boolean to include targets or not, in case no
      training is executed.
    """

    def __init__(self, root, target_path, preprocessing = None, imgs = None, include_targets = True):
        self.root = str(root)
        self.preprocessing = preprocessing

        if imgs is not None:
            self.imgs = imgs
        else:
            # Lists the images of the directory, sorted ignoring leading "_"
            self.imgs = sorted(listdir(Path(root)), key=lambda x: x.lstrip("_"))

        self._include_targets = include_targets
        if include_targets:
            self.target = target_path

    def __getitem__(self, idx):
        """
          Returns (image, target) for the image at position idx, or [image]
          when the targets are not included.
        """
        img_path = Path(self.root).joinpath(self.imgs[idx])
        img = Image.open(img_path).convert("RGB")

        if not self._include_targets:
            if self.preprocessing is not None:
                img = self.preprocessing(img)
            return [img]

        # NOTE: the whole annotation file is read again on every access
        video_targets = Utils.read_txt_visdrone(self.target)
        image_targets = self.clean_targets(video_targets, idx)
        image_targets["is_crowd"] = 0

        boxes = image_targets[["bbox_left", "bbox_top", "right", "bottom"]]
        boxes = boxes.astype('float32')
        boxes = torch.as_tensor(boxes.values, dtype=torch.float32)

        labels = torch.as_tensor(image_targets.object_category.astype('int64').values, dtype=torch.int64)
        crowd = torch.as_tensor(image_targets.is_crowd.values, dtype=torch.int64)

        image_id = torch.tensor([idx])
        # Boxes are (left, top, right, bottom), so area = height * width
        area = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])

        target = {}
        target["boxes"] = boxes
        target["labels"] = labels
        target["image_id"] = image_id
        target["area"] = area
        target["iscrowd"] = crowd

        if self.preprocessing is not None:
            img = self.preprocessing(img)

        return img, target

    def __len__(self):
        """Returns the number of images."""
        return len(self.imgs)

    def clean_targets(self, targets, idx):
        """
          Keeps the rows of targets that belong to the frame number idx + 1
          and whose object_category is not "0", converts the box columns to
          float32 and adds the "bottom" and "right" columns.

          targets is the list of rows read from the annotation file (all
          values are strings). Returns a new dataframe.
        """
        targets = self.targetsToDataframe(targets)

        wanted = (targets.object_category != "0") & (targets.frame_index == str(idx + 1))
        # Works on a copy: assigning into a filtered view triggers pandas'
        # SettingWithCopy warning and is not guaranteed to take effect
        targets = targets[wanted].copy()

        for column in ("bbox_top", "bbox_height", "bbox_left", "bbox_width"):
            targets[column] = targets[column].astype('float32')

        targets["bottom"] = targets.bbox_top + targets.bbox_height
        targets["right"] = targets.bbox_left + targets.bbox_width

        return targets

    def targetsToDataframe(self, array):
        """
          Returns a dataframe built from the given rows, one column per
          annotation field.
        """
        columns = [
          "frame_index",
          "target_id",
          "bbox_left",
          "bbox_top",
          "bbox_width",
          "bbox_height",
          "score",
          "object_category",
          "truncation",
          "oclusion"
        ]
        return pd.DataFrame(data=array,columns=columns)

"""
  VisDroneDataset represents all the videos from the VisDrone dataset.
  Each VisDroneVideo can be accessed in a list-like manner.
"""

class VisDroneDataset(object):
    """
      All the videos of a VisDrone dataset folder; each one is accessed in a
      list-like manner as a VisDroneVideo.

      The inputs are:

      root is the path of the directory holding the "sequences" (and
      "annotations") directories,

      preprocessing is a transform function useful for data augmentation, it is
      advised to work with torchvision's transforms library,

      include_targets is a boolean to include targets or not, in case no
      training is executed.
    """

    def __init__(self, root, preprocessing = None, include_targets = True):
        self.root = root
        self.preprocessing = preprocessing

        # Entries are sorted ignoring leading "_"
        self.videos = sorted(listdir(Path(root).joinpath("sequences")),
                             key=lambda x: x.lstrip("_"))
        self._include_targets = include_targets
        if include_targets:
            self.targets = sorted(listdir(Path(root).joinpath("annotations")),
                                  key=lambda x: x.lstrip("_"))

    def __getitem__(self, idx):
        """
          Returns the VisDroneVideo at position idx. The annotation file at
          the same position is attached when the targets are included.
        """
        sequence_dir = Path(self.root).joinpath("sequences").joinpath(self.videos[idx])

        if self._include_targets:
            return VisDroneVideo(
                sequence_dir,
                Path(self.root).joinpath("annotations").joinpath(self.targets[idx]),
                self.preprocessing)

        return VisDroneVideo(
            sequence_dir,
            None,
            self.preprocessing,
            include_targets=self._include_targets)

    def __len__(self):
        """Returns the number of videos."""
        return len(self.videos)

    def get_video_path(self,video_index):
        """Returns the path of the directory of a video, as a string."""
        return str(Path(self.root).joinpath("sequences").joinpath(self.videos[video_index]))

    def get_image_path(self, video_index, image_index):
        """Returns the path of one image of a video, as a string."""
        # Builds the video once: every self[...] lists and sorts its directory
        video = self[video_index]
        return str(Path(video.root).joinpath(video.imgs[image_index]))

    def get_image(self, video_index, image_index):
        """Opens one image of a video with PIL and returns it."""
        return Image.open(self.get_image_path(video_index, image_index))

"""
  AllVisDroneVideos is a subclass of VisDroneVideo for the new architecture of
  the training dataset, which is composed of a single folder holding all the
  images from every video.
"""

class AllVisDroneVideos(VisDroneVideo):
    """
      Dataset over a single folder holding the images of every video.

      The inputs are:

      root is the path of the directory with the images,

      targets_path is the path to the directory with the targets (boxes),

      preprocessing is a transform function useful for data augmentation, it is
      advised to work with torchvision's transforms library,

      imgs is a custom list of image paths to use instead of every image of
      the directory.
    """

    def __init__(self,root, targets_path, preprocessing = None, imgs = None):
        super().__init__(root, targets_path, preprocessing, imgs)
        annotation_files = listdir(Path(targets_path))
        self.targets = sorted(annotation_files, key=lambda name: name.lstrip("_"))

    def __len__(self):
        """Returns the number of images."""
        return super().__len__()

    def get_video_name(self, image_path):
        """
          Splits the stem of image_path into the annotation file name (the
          stem without its last 8 characters, plus ".txt") and the image
          index (the last 7 characters read as a number, minus 1).
        """
        stem = image_path.stem
        annotation_name = stem[:-8] + ".txt"
        frame_position = int(stem[-7:]) - 1
        return annotation_name, frame_position

    def __getitem__(self, idx):
        """
          Retrieves the prepared image and its targets in a list-like manner.
        """
        img_path = Path(self.root).joinpath(self.imgs[idx])
        picture = Image.open(img_path).convert("RGB")

        # The annotation file and the frame are both derived from the file name
        annotation_name, frame_position = self.get_video_name(img_path)
        annotation_rows = Utils.read_txt_visdrone(
            Path(self.target).joinpath(annotation_name))

        frame_targets = self.clean_targets(annotation_rows, frame_position)
        frame_targets["is_crowd"] = 0

        corners = frame_targets[["bbox_left", "bbox_top", "right", "bottom"]]
        corners = corners.astype('float32')
        boxes = torch.as_tensor(corners.values, dtype=torch.float32)

        category_values = frame_targets.object_category.astype('int64').values
        crowd_values = frame_targets.is_crowd.values

        target = {
            "boxes": boxes,
            "labels": torch.as_tensor(category_values, dtype=torch.int64),
            "image_id": torch.tensor([idx]),
            "area": (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0]),
            "iscrowd": torch.as_tensor(crowd_values, dtype=torch.int64),
        }

        if self.preprocessing is None:
            return picture, target

        return self.preprocessing(picture), target

    def get_video_from_idx(self,idx):
        """
          Returns the video index and the position inside that video for a
          given image index, based on a fixed table of video lengths. When idx
          is past the last video, the number of videos and the remaining
          offset are returned.
        """
        video_lengths = [
            269, 58, 118, 501, 181, 85, 217, 97, 361, 361,
            516, 1255, 398, 412, 213, 256, 261, 307, 348, 225,
            421, 680, 341, 768, 721, 677, 725, 616, 548, 116,
            680, 872, 962, 547, 508, 1424, 500, 210, 346, 556,
            414, 230, 185, 403, 632, 127, 426, 369, 196, 277,
            196, 691, 421, 219, 462, 296,
        ]

        remaining = idx
        for video_index, frame_count in enumerate(video_lengths):
            if remaining < frame_count:
                return video_index, remaining
            remaining -= frame_count

        return len(video_lengths), remaining

Utils.py

In [None]:
class Utils():
  """
    Static helpers to read the VisDrone annotations and to draw boxes on
    images.
  """

  @staticmethod
  def to_tensor():
    """
      Returns a transform that converts a PIL image into a PyTorch Tensor.
    """
    return torchvision.transforms.Compose([torchvision.transforms.ToTensor()])

  @staticmethod
  def read_txt_visdrone(path):
    """
      Input: Path of the txt file with annotations as in the VisDrone dataset
      Output: A list with one entry per line of the file, each entry being the
      list of the comma-separated values of that line (as strings)
    """
    with open(path) as f:
      lines = f.readlines()

    # Only the trailing newline is removed; the values are kept untouched
    return [line.rstrip("\n").split(",") for line in lines]

  @staticmethod
  def add_blocks(image, boxes, classes, scores, font_path, precision):
    """
      Draws boxes on a PIL image (in place) and takes as inputs:
      the PIL image,
      an array of boxes,
      a list of classes,
      a list of scores,
      a path to the font to use,
      the precision as a float between 0 and 1.

      Only the boxes with a score above precision and a class other than 0
      are drawn.
    """
    draw = ImageDraw.Draw(image)

    # Loaded on the first drawn box and reused afterwards, instead of
    # reading the font file again for every box
    font = None

    for i, box in enumerate(boxes):
      if scores[i] > precision and classes[i] != 0:
        if font is None:
          font = ImageFont.truetype(font_path, 20)
        draw.rectangle(box, outline = Utils.which_color(classes[i]), width = 3)
        draw.text((box[0], box[1]), Utils.box_title(classes[i]),
                  fill=(255,255,255), stroke_fill= (0,0,0,255),
                  stroke_width = 2, font= font)

  @staticmethod
  def box_title(label_index):
    """
      Returns the text to add on the screen to identify the class.
    """
    return str(label_index)

  @staticmethod
  def which_color(class_id):
    """
      Returns a tuple representing the RGB colors depending on the class.
    """
    color_value = int(class_id) * 64
    return (min(color_value, 255), max(min(color_value - 255, 255),0),max(min(color_value - 256 * 2 - 1, 255),0))

  @staticmethod
  def prepare_coords(array):
    """
      Returns the first four values of array as a tuple, with the values
      swapped inside each pair: (array[1], array[0], array[3], array[2]).
    """
    return (array[1], array[0], array[3], array[2])