Code Manual: \\
-In order for the whole thing to run, the root folder has to contain 'images', 'labels', and 'videos' folders \\
-Run each section, but interact only with console \\
- Put the root to your Data into "root" variable \\
- Put your Weights & Biases key into key

#Install packages

In [12]:
#mounting a drive
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

#installing yolo5
! git clone https://github.com/ultralytics/yolov5 # clone repo
! pip install -r yolov5/requirements.txt  # install
! cp -r yolov5/utils .
! pip install wandb

#installing packages
from torchvision import transforms as T
import xml.etree.ElementTree as et
import matplotlib.pyplot as plt
from PIL import Image
from PIL import ImageDraw, ImageFont
import os
import cv2
import numpy as np
import shutil as s
import random as r
import torch
import time
import sys
import wandb
from IPython.display import clear_output

clear_output()

#MAIN PART

#Transformation

In [13]:
#In order for a class to run there has to be a folder with two folders called
#images and labels.

class Transform():
  def __init__(self, root):
    self.root = root
    self.img_dir = f'{self.root}/images'
    self.label_dir = f'{self.root}/labels'
    self.img_files = os.listdir(self.img_dir)
    self.label_files = os.listdir(self.label_dir)

  def horizontalFlip(self, img):
    transforms = T.Compose([T.ToTensor(), T.RandomHorizontalFlip(1), T.ToPILImage()])
    new_img = transforms(img)
    return new_img

  def verticalFlip(self, img):
    transforms = T.Compose([T.ToTensor(), T.RandomVerticalFlip(1), T.ToPILImage()])
    new_img = transforms(img)
    return new_img

  def autoContrast(self, img):
    transforms = T.Compose([T.ToTensor(), T.RandomAutocontrast(1), T.ToPILImage()])
    new_img = transforms(img)
    return new_img

  def brightnessChange(self, img):
    transforms = T.Compose([T.ToTensor(), T.ColorJitter(0.6), T.ToPILImage()])
    new_img = transforms(img)
    return new_img

  def gaussianBlur(self, img):
    transforms = T.Compose([T.ToTensor(), T.GaussianBlur(kernel_size=(101), sigma=(10,50)), T.ToPILImage()])
    new_img = transforms(img)
    return new_img

  # image needs to be read as cv2
  def grayscale(self, img):
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    gray = np.array(gray / gray.max() * 255, dtype=np.uint8)
    new_img = Image.fromarray(gray)
    return new_img

  def clearFolders(self):
    for img in self.img_files:
      if '_VFlip' or '_HFlip' or '_contrast' or '_bright' or '_gray' or'_blur' in img:
        os.remove(f'{self.img_dir}/{img}')
    for label in self.label_files:
      if '_VFlip' or '_HFlip' or '_contrast' or '_bright' or '_gray' or'_blur' in label:
        os.remove(f'{self.label_dir}/{label}')

  def saveAndUpdateTransformedImageAndTXT(self, img_name, function, new_img):
    if function == 'HFlip': aspects_ratio = 1
    elif function == 'VFlip': aspects_ratio = 2
    else: aspects_ratio = 0

    # Getting the label
    try:
      label = open(f'{self.label_dir}/{img_name}.txt', 'r')
    except:
      return None

    # Updating the image file
    new_img.save(f'{self.img_dir}/{img_name}_{function}.jpg')

    # Getting all the corresponding boxes
    new_aspects = []
    for line in label:
      elements = line.split()
      aspects = [int(elements[0])] + [float(value) for value in elements[1:]]
      if aspects_ratio != 0:
        aspects[aspects_ratio] = 1 - aspects[aspects_ratio]
      new_aspects.append(aspects)

    # Updating txt file

    new_file = open(f'{self.label_dir}/{img_name}_{function}.txt', 'w')
    for elements in new_aspects:
      new_file.write(f'{elements[0]} {elements[1]} {elements[2]} {elements[3]} {elements[4]}\n')
    new_file.close()
    return None

  def performTransformations(self, doHFlip = False, doVFlip = False, doContrast = False, doBright = False, doGray = False, doBlur = False):
    for i in range(len(self.img_files)):
      # Open the image
      img = Image.open(f'{self.img_dir}/{self.img_files[i]}')
      img_for_grayscale = cv2.imread(f'{self.img_dir}/{self.img_files[i]}')
      img_name = self.img_files[i][:-4]
      if doHFlip == True:
        new_img = self.horizontalFlip(img)
        self.saveAndUpdateTransformedImageAndTXT(img_name, 'HFlip', new_img)
      if doVFlip == True:
        new_img = self.verticalFlip(img)
        self.saveAndUpdateTransformedImageAndTXT(img_name, 'VFlip', new_img)
      if doContrast == True:
        new_img = self.autoContrast(img)
        self.saveAndUpdateTransformedImageAndTXT(img_name, 'contrast', new_img)
      if doBright == True:
        new_img = self.brightnessChange(img)
        self.saveAndUpdateTransformedImageAndTXT(img_name, 'bright', new_img)
      if doGray == True:
        new_img = self.grayscale(img_for_grayscale)
        self.saveAndUpdateTransformedImageAndTXT(img_name, 'gray', new_img)
      if doBlur == True:
        new_img = self.gaussianBlur(img)
        self.saveAndUpdateTransformedImageAndTXT(img_name, 'blur', new_img)

#Training

In [3]:
#In order for a class to run there has to be a folder with two folders called
#images and labels.

class Train():
  def __init__(self, root, number_of_classes, names):
    self.root = root
    self.number_of_classes = number_of_classes
    self.names = names
    self.img_dir = f'{root}/images'
    self.label_dir = f'{root}/labels'
    try:
      self.imgs = os.listdir(self.img_dir)
    except FileNotFoundError:
      self.imgs = []

  def createFolderDrive(self, path):
    try:
      os.mkdir(path)
    except FileExistsError:
      s.rmtree(path, ignore_errors=True)
      os.mkdir(path)

  def yamlFileMountYolo(self):
    yamlFileName = os.path.join(self.root, 'training_data.yaml')
    yamlFile = open(yamlFileName, "w")
    yamlFile.write(f'path: ./\ntrain: train/images/\nval: val/images/\ntest:\nnc: {self.number_of_classes}\nnames: {self.names}')
    yamlFile.close()
    path = f'{self.root}/training_data.yaml'
    ! cp $path '.'

  def getBrokenPaths(self, root_labels, labels):
    root_jpg = root_labels[:-6] + 'images'
    brokenLabelPaths = []
    brokenJPG = []
    for file_name in labels:
      txt_file = open(f'{root_labels}/{file_name}', 'r')
      for count, line in enumerate(txt_file):
        pass
      num_lines = count+1
      if num_lines > self.number_of_classes:
        brokenLabelPaths.append(f'{root_labels}/{file_name}')
        jpg_name = file_name[:-4] + '.jpg'
        print(f'{jpg_name} file has more classes than were printed by the user')
        brokenJPG.append(f'{root_jpg}/{jpg_name}')
    return brokenLabelPaths, brokenJPG

  def DeleteBrokenLabels(self):
    error_threshold = 0.5
    root_labels = f'{self.root}/labels'
    val_labels = f'{self.root}/val/labels'
    train_labels = f'{self.root}/train/labels'
    files = []
    files_val = []
    files_train = []
    brokenLabelPaths = []
    brokenJPGPaths = []
    root_labels_exist = True
    try:
      files = os.listdir(root_labels)
    except FileNotFoundError:
      root_labels_exist = False
      files_val = os.listdir(val_labels)
      files_train = os.listdir(train_labels)

    if root_labels_exist:
      paths = self.getBrokenPaths(root_labels, files)
      brokenLabelPaths.append(paths[0])
      brokenJPGPaths.append(paths[1])
    else:
      paths_val = self.getBrokenPaths(val_labels, files_val)
      paths_train = self.getBrokenPaths(train_labels, files_train)

      brokenLabelPaths = paths_val[0] + paths_train[0]
      brokenJPGPaths = paths_val[1] + paths_train[1]
    if len(brokenLabelPaths) > (len(files) + len(files_val) + len(files_train)) * error_threshold:
      return 'You either have a broken data set or your number of classes is incorrect'
    else:
      for brokenLabel in brokenLabelPaths:
        os.remove(brokenLabel)
      for brokenJPG in brokenJPGPaths:
        os.remove(brokenJPG)
      return 'Deleted unnecessary files'

  def splitValAndTrain(self, split_ratio):
    val_size = int(len(self.imgs) * split_ratio)
    train_size = len(self.imgs) - val_size

    #Making corresponding val and train folders in Drive
    self.createFolderDrive(os.path.join(self.root, 'val'))
    self.createFolderDrive(os.path.join(self.root, 'train'))

    #Populating new directories with folders
    self.createFolderDrive(os.path.join(f'{self.root}/train', 'images'))
    self.createFolderDrive(os.path.join(f'{self.root}/train', 'labels'))
    self.createFolderDrive(os.path.join(f'{self.root}/val', 'images'))
    self.createFolderDrive(os.path.join(f'{self.root}/val', 'labels'))

    destination_train_imgs = f'{self.root}/train/images'
    destination_train_labels = f'{self.root}/train/labels'
    destination_val_imgs = f'{self.root}/val/images'
    destination_val_labels = f'{self.root}/val/labels'

    for i in range(val_size):
      choice = r.choice(self.imgs)
      choice_label = choice[:-3] + 'txt'
      try:
        s.move(f'{self.img_dir}/{choice}', f'{destination_val_imgs}/{choice}')
        s.move(f'{self.label_dir}/{choice_label}', f'{destination_val_labels}/{choice_label}')
        self.imgs.remove(choice)
      except:
        self.imgs.remove(choice)

    for j in range(train_size):
      choice = r.choice(self.imgs)
      choice_label = choice[:-3] + 'txt'
      try:
        s.move(f'{self.img_dir}/{choice}', f'{destination_train_imgs}/{choice}')
        s.move(f'{self.label_dir}/{choice_label}', f'{destination_train_labels}/{choice_label}')
        self.imgs.remove(choice)
      except:
        self.imgs.remove(choice)

    #remove used directories
    s.rmtree(self.img_dir, ignore_errors=True)
    s.rmtree(self.label_dir, ignore_errors=True)

  def TrainModel(self, project_name = 'DF', batch = 32, epochs = 250):
    path_val = f'{self.root}/val'
    path_train = f'{self.root}/train'
    path_train_zip = f'{path_train}.zip'
    path_val_zip = f'{path_val}.zip'

    #zipping and transferring to different machine
    s.make_archive(f'{path_val}', 'zip', f'{self.root}', 'val')
    s.make_archive(f'{path_train}', 'zip', f'{self.root}', 'train')
    ! cp $path_val_zip 'yolov5/'
    ! cp $path_train_zip 'yolov5/'

    #unzipping packages on different machine
    s.unpack_archive(f'{path_train_zip}', 'yolov5/', 'zip')
    s.unpack_archive(f'{path_val_zip}', 'yolov5/', 'zip')

    #training the data
    name = f'v5s_640_{batch}_{epochs}'
    path_yaml = f'{self.root}/training_data.yaml'
    ! python yolov5/train.py --save-period 2 --project $project_name --name $name --img 640 --batch $batch --epochs $epochs --data training_data.yaml --weights yolov5s.pt

    #getting data back
    root_best = f'{project_name}/{name}/weights/best.pt'
    root_last = f'{project_name}/{name}/weights/last.pt'
    self.createFolderDrive(os.path.join(self.root, 'trained'))
    root_train = f'{self.root}/trained'
    ! cp $root_best $root_train
    ! cp $root_last $root_train

#Analysis

In [4]:
def find_best_path(root):
    for root, dirs, files in os.walk(root):
      for name in files:
        if name.find('best') != -1:
          return os.path.join(root, name)

class Analysis():
  def __init__(self, root, score_lim, only_npys, FPS):
    self.root = root
    self.instances = {}
    self.results = []
    self.model = torch.hub.load('ultralytics/yolov5', 'custom', path=find_best_path(root), force_reload = True)
    self.num_of_videos = 0
    self.score_lim = score_lim
    self.scaling_factor = 0.5 # doesn't affect performance at all
    self.only_npys = only_npys
    self.FPS = FPS
    #time constants
    self.time_start = time.perf_counter()
    self.initial_time = time.perf_counter()
    self.progress = 0
    self.video_num = 1
    self.num_of_videos = len(self.locate_video_paths())

  def producePredictions(self, frame):
    self.results = self.model(frame)
    return self.results.xyxy[0].detach().cpu().numpy()

  def createNPYs(self, frame_num, predictions):
    self.instances[frame_num] = predictions
    return None

  def frameFirstTransform(self, frame, scaling_factor):
    frame = cv2.resize(frame, None, fx=scaling_factor, fy=scaling_factor, interpolation=cv2.INTER_AREA)
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    return Image.fromarray(frame)

  def frameSecondTransform(self, frame, scaling_factor):
    frame = cv2.cvtColor(np.asarray(frame), cv2.COLOR_RGB2BGR)
    frame = cv2.resize(frame, None, fx=(1/scaling_factor), fy=(1/scaling_factor), interpolation=cv2.INTER_AREA)
    return frame

  def drawBoxes(self, img, predictions, score_threshold):
    img1 = ImageDraw.Draw(img)
    for boxes in predictions:
      if boxes.tolist() != []:
        color_code = int(boxes[5])
        score = boxes[4]
        box = boxes[:4]

        # I picked random constants for RGB. You might pick different ones
        R, G, B, step = 220, 30, 30, 60

        color = ((color_code*step + R)%255, (color_code*step + G)%255, (color_code*step + B)%255)
        if score > score_threshold:
          img1.rectangle(box, outline = "black", width=1)
          img1.rectangle(box, outline = color, width=2)
    return img


  def analyze_video(self, path):
    self.instances = {}
    self.progress = 0
    video = cv2.VideoCapture(path)
    if video.isOpened() == False:
      return None
    else:
      frame_width = int(video.get(3))
      frame_height = int(video.get(4))
      size = (frame_width, frame_height)
      length = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
    name = f'{path}'
    name = name[:-4]
    if self.only_npys == False:
      result = cv2.VideoWriter(f'{name}_yolo.avi', cv2.VideoWriter_fourcc(*'MJPG'),
                              self.FPS, size)
    frame_num = 0
    t0 = time.perf_counter()
    while video.isOpened():
      self.progress = round(frame_num / length * 100, 1)
      tx = time.perf_counter()
      if tx - t0 > 10:
        print(f'Program is done analyzing video {self.video_num}/{self.num_of_videos} by {self.progress}%')
        t0 = time.perf_counter()
      ret, frame = video.read()
      if ret:
        if self.scaling_factor != 1:
          frame = self.frameFirstTransform(frame, self.scaling_factor)
          predictions = self.producePredictions(frame)
          if self.only_npys == False:
            predicted_frame = self.drawBoxes(frame, predictions, self.score_lim)
            predicted_frame = self.frameSecondTransform(frame, self.scaling_factor)
            result.write(predicted_frame)
            self.createNPYs(frame_num, predictions)
          else:
            self.createNPYs(frame_num, predictions)
        else:
          predictions = self.producePredictions(frame)
          if self.only_npys == False:
            predicted_frame = self.drawBoxes(frame, predictions, self.score_lim)
            result.write(predicted_frame)
            self.createNPYs(frame_num, predictions)
          else:
            self.createNPYs(frame_num, predictions)
        frame_num += 1
      else:
        break

    video.release()
    if self.only_npys == False:
      result.release()
    cv2.destroyAllWindows()

    if self.instances != {}:
      # Save the predictions
      dictionary_path = f'{path}'[:-4]
      video_name = dictionary_path.split('/')[-1]
      path ='/'.join(dictionary_path.split('/')[:-1])
      if path.find('compass') != -1 or path.find('Compass') != -1 or path.find('COMPASS') != -1 or path.find('COMP') != -1 or path.find('comp') != -1:
        video_name += '_compass'
      if os.path.exists(path+'/predictions'):
        np.save(path+'/predictions/'+video_name+'.npy', self.instances)
      else:
        os.mkdir(path+'/predictions')
        np.save(path+'/predictions/'+video_name+'.npy', self.instances)

  def locate_prediction_names(self):
    names = []
    prediction_type = '.npy'
    for root, dirs, files in os.walk(self.root):
      for name in files:
        file_type = name[-4:]
        if file_type == prediction_type:
          name = name.replace('_yolo', '')
          name = name.replace('_predictions', '')
          name = name.replace('_compass', '')
          names.append(name[:-4])
    return names

  def locate_video_paths(self):
    predictions = self.locate_prediction_names()
    paths = []
    file_types = ['.avi', '.AVI', '.mov', '.MOV', '.mp4', '.MP4']
    for root, dirs, files in os.walk(self.root):
      for name in files:
        file_type = name[-4:]
        yolo_check = name[-8:-4]
        if file_type in file_types and yolo_check != 'yolo' and name[:-4] not in predictions:
          paths.append(os.path.join(root, name))
    return paths

  def perform_analysis(self):
    paths = self.locate_video_paths()
    for path in paths:
      self.analyze_video(path)
      self.video_num += 1

Neural Network

In [7]:
class NeuralNetwork():
  def __init__(self, root, key):
    self.root = root
    self.transformations_bools = [False, False, False, False, False, False]
    self.key = key
    self.batches = 0
    self.epochs = 0

  def createFolderDrive(self, path):
    try:
      os.mkdir(path)
    except FileExistsError:
      s.rmtree(path, ignore_errors=True)
      os.mkdir(path)

  def moveObject(self, path_start, path_end):
    try:
      s.move(path_start, path_end)
    except FileExistsError:
      os.remove(path_end)
      s.move(path_start, path_end)

  def RestoreTraining(self, run_path, batches, epochs):
    path_val = f'{self.root}/val'
    path_train = f'{self.root}/train'
    path_train_zip = f'{path_train}.zip'
    path_val_zip = f'{path_val}.zip'

    name = f'v5s_640_{batches}_{epochs}'
    project_name = run_path.split('/')[1]
    path_yaml = f'{self.root}/training_data.yaml'
    ! cp $path_yaml '.'

    ! cp $path_val_zip 'yolov5/'
    ! cp $path_train_zip 'yolov5/'

    s.unpack_archive(f'{path_train_zip}', 'yolov5/', 'zip')
    s.unpack_archive(f'{path_val_zip}', 'yolov5/', 'zip')

    ! wandb artifact get $run_path --root restored_model
    ! python yolov5/train.py --resume restored_model/last.pt --save-period 2

    root_best = f'{project_name}/{name}/weights/best.pt'
    root_last = f'{project_name}/{name}/weights/last.pt'
    self.createFolderDrive(os.path.join(self.root, 'trained'))
    root_train = f'{self.root}/trained'
    ! cp $root_best $root_train
    ! cp $root_last $root_train

  def clean_root(self):
    self.createFolderDrive(os.path.join(f'{self.root}', 'output_videos'))
    self.createFolderDrive(os.path.join(f'{self.root}', 'output_npys'))
    appended_files = []

    for root, dirs, files in os.walk(self.root):
      for name in files:
        avi_yolo_check = name[-8:]
        npy_check = name[-4:]
        if avi_yolo_check == 'yolo.avi' and name not in appended_files:
          self.moveObject(os.path.join(root, name), f'{self.root}/output_videos')
          appended_files.append(name)
        elif npy_check == '.npy' and name not in appended_files:
          self.moveObject(os.path.join(root, name), f'{self.root}/output_npys')
          appended_files.append(name)
    # Add the deletion of everything part
    folders = ['train', 'val', 'videos']
    files = ['train.zip', 'training_data.yaml', 'val.zip']
    for folder in folders:
      s.rmtree(os.path.join(self.root, folder), ignore_errors=True)
    for filE in files:
      os.remove(os.path.join(self.root, filE))

  def transformationsConsole(self):
      booleans = ['Vertical Flip', 'Horizontal Flip', 'Contrast Shift', 'Brightness Shift', 'Grayscale', 'Blur']
      counter = 0
      while counter != len(booleans)-1:
        boolean = booleans[counter]
        print(f'Do you want to perform {boolean}? (+)(-)')
        line = str(input())
        if line == '+':
          self.transformations_bools[counter] = True
          counter += 1
        elif line == '-':
          self.transformations_bools[counter] = False
          counter += 1
      my_transform = Transform(self.root)
      my_transform.performTransformations(doHFlip = self.transformations_bools[0], doVFlip = self.transformations_bools[1],
                  doContrast = self.transformations_bools[2], doBright = self.transformations_bools[3],
                  doGray = self.transformations_bools[4], doBlur = self.transformations_bools[5])
      print('Transforms are succesfully completed!')

  def trainingConsole(self):
    print('Do you want to restore already existing model? (+)(-)')
    restoration_check = str(input())
    if restoration_check == '+':
      print('Copy and paste the path to your last checkpoint from W&B (Go to your project -> Artifacts -> model -> pick the last one -> copy "Full Name" which is a link):')
      run_path = str(input())
      print('How many batches did you use? Type this number:')
      batches = str(input())
      print('How many epochs did you use? Type this number:')
      epochs = str(input())
      self.RestoreTraining(run_path, batches, epochs)
    else:
      print('How many classes of objects do you want to identify?')
      nc = int(input())
      names = []
      for i in range(nc):
        print('Type the name of the class:')
        names.append(str(input()))
      my_train = Train(self.root, nc, names)
      #print(my_train.DeleteBrokenLabels())
      my_train.yamlFileMountYolo()
      print('Have you already splitted val and train? (+)(-)')
      split_bool = str(input())
      if split_bool == '-':
        print('In which ratio do you want to split your data into train and val (val:(train+val))? Input a number between 0 and 1')
        ratio = float(input())
        if ratio > 1 or ratio < 0:
          while ratio > 1 or ratio <0:
            print('Input a correct ratio from 0 to 1')
            ratio = float(input())
        else:
          my_train.splitValAndTrain(ratio)
      print('Your project name is DF, number of batches is 32, and number of epochs is 250. Do you want to change that? (+)(-)')
      if str(input()) == '+':
        print('Your desired project name:')
        project_names = str(input())
        print('Your desired number of batches:')
        self.batches = int(input())
        print('Your desired number of epochs:')
        self.epochs = int(input())
        my_train.TrainModel(project_name = project_names, batch = self.batches, epochs = self.epochs)
      else:
        my_train.TrainModel()
      print('Training is sucessfully completed!')

  def analysisConsole(self):
    print('Do you want to produce NPY files only? (+)(-)')
    NPY_check = str(input())
    if NPY_check == '+':
      npy = True
      score = 0.8
      FPS = 120
    else:
      print('Your score threshold is set to 0.8. Do you want to change that? (+)(-)')
      score_check = str(input())
      if score_check == '+':
        print('What do you want to change it to?')
        score = float(input())
      else:
        score = 0.8
      print('What is the FPS of your video?')
      FPS = int(input())
      npy = False
    my_analysis = Analysis(self.root, score, npy, FPS)
    my_analysis.perform_analysis()

  def mainConsole(self):
    #Transformations
    print('Do you want to perform transformations? (+)(-)')
    transform_check = str(input())
    if transform_check == '+':
      self.transformationsConsole()

    # Training part
    wandb.login(key=self.key)
    print('Do you want to train or restore training of your model? (+)(-)')
    training_check = str(input())
    if training_check == '+':
      self.trainingConsole()

    #Analysis part
    print('Do you want to perform analysis? (+)(-)')
    analysis_check = str(input())
    if analysis_check == '+':
      self.analysisConsole()

    #Delete and restructure part
    print('Do you want to restructure your root folder? (+)(-)')
    restructure_check = str(input())
    if restructure_check == '+':
      self.clean_root()

#Console

In [14]:
+
# Root provides a link to your directory where all your videos are as well as the model
# Also, if you want to perform Analysis from step 0, you have to have 3 folders: images, labels, videos

#Key you will find after creating an account at Weights and Biases.

root = '/content/drive/MyDrive/tongyuan_analysis/training_test'
key = '979abf5c61616bf151cbf82c3c8eca4d535324e9'


neural_network = NeuralNetwork(root, key)

neural_network.mainConsole()

Do you want to perform transformations? (+)(-)
+
Do you want to perform Vertical Flip? (+)(-)
+
Do you want to perform Horizontal Flip? (+)(-)
+
Do you want to perform Contrast Shift? (+)(-)
+
Do you want to perform Brightness Shift? (+)(-)
+
Do you want to perform Grayscale? (+)(-)
+
Error: local variable 'img' referenced before assignment


UnboundLocalError: ignored