# Install TensorRT and PyCUDA Using Pip

In [None]:
# installed first, ahead of the TensorRT package - presumably so pip can resolve NVIDIA-hosted packages (confirm)
!pip install nvidia-pyindex
# NOTE(review): unpinned --upgrade; the recorded run of this cell resolved tensorrt 8.6.1.post1 - consider pinning that version
!pip install --upgrade nvidia-tensorrt
# NOTE(review): pycuda is installed here but no cell visible in this notebook imports it - confirm it is still needed
!pip install pycuda

Collecting nvidia-pyindex
  Downloading nvidia-pyindex-1.0.9.tar.gz (10 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: nvidia-pyindex
  Building wheel for nvidia-pyindex (setup.py) ... [?25l[?25hdone
  Created wheel for nvidia-pyindex: filename=nvidia_pyindex-1.0.9-py3-none-any.whl size=8418 sha256=47e9f7c8fd77e7738f55e84680c26b0949e8ddfd02dc68ec18cce864c493ce9b
  Stored in directory: /root/.cache/pip/wheels/2c/af/d0/7a12f82cab69f65d51107f48bcd6179e29b9a69a90546332b3
Successfully built nvidia-pyindex
Installing collected packages: nvidia-pyindex
Successfully installed nvidia-pyindex-1.0.9
Collecting nvidia-tensorrt
  Downloading nvidia_tensorrt-99.0.0-py3-none-manylinux_2_17_x86_64.whl (17 kB)
Collecting tensorrt (from nvidia-tensorrt)
  Downloading tensorrt-8.6.1.post1.tar.gz (18 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: tensorrt
  Building wheel for tensorrt (setup.py) ... [?25

# Import Python Modules

In [None]:
import os
import random
import time
from collections import OrderedDict,namedtuple

import cv2
# allows getting of files from google drive
import gdown
import numpy as np
import tensorrt as trt
import torch

# Get Files from Google Drive

In [None]:
# test sources zip file url
test_zip_url = "https://drive.google.com/file/d/1WOQLXYLGmXeSBlMRCkA56UM11m6--ETc/view?usp=sharing"
test_zip_output = "Test Sources.zip"

gdown.download(test_zip_url, test_zip_output, quiet = False, fuzzy = True)

# model file - one (share url, local file name) pair per supported gpu
model_files = {
  "t4": (
    "https://drive.google.com/file/d/1-8qbCDQmtfOrErRvV274DG7ERnhyfyO9/view?usp=sharing",
    "yolov7-self-driving-T4.trt",
  ),
  "v100": (
    "https://drive.google.com/file/d/1OizWs0pjEdCdTQj17-HpA0da7v9CMff0/view?usp=sharing",
    "yolov7-self-driving-v100.trt",
  ),
}

# change to the gpu type this notebook is running on
gpu = "v100"

if gpu not in model_files:
  # fail here with a clear message instead of a NameError on model_url further down
  raise ValueError(f"No model available for gpu '{gpu}' (No A100 model yet); choose one of {sorted(model_files)}")

model_url, model_output = model_files[gpu]

gdown.download(model_url, model_output, quiet = False, fuzzy = True)

Downloading...
From (original): https://drive.google.com/uc?id=1WOQLXYLGmXeSBlMRCkA56UM11m6--ETc
From (redirected): https://drive.google.com/uc?id=1WOQLXYLGmXeSBlMRCkA56UM11m6--ETc&confirm=t&uuid=81f281df-3f52-40fa-abf3-7324de467193
To: /content/Test Sources.zip
100%|██████████| 2.24G/2.24G [00:21<00:00, 104MB/s]
Downloading...
From: https://drive.google.com/uc?id=1OizWs0pjEdCdTQj17-HpA0da7v9CMff0
To: /content/yolov7-self-driving-v100.trt
100%|██████████| 76.6M/76.6M [00:00<00:00, 109MB/s]


'yolov7-self-driving-v100.trt'

# Unzip the UK Road Tests and Rare or Difficult Conditions Tests Zip

In [None]:
! unzip "/content/Test Sources.zip"

Archive:  /content/Test Sources.zip
   creating: Rare or Difficult Conditions Tests/
   creating: Rare or Difficult Conditions Tests/First Party/
  inflating: Rare or Difficult Conditions Tests/First Party/Test Rare or Difficult Conditions Self Recorded 1.mp4  
  inflating: Rare or Difficult Conditions Tests/First Party/Test Rare or Difficult Conditions Self Recorded 2.mp4  
  inflating: Rare or Difficult Conditions Tests/First Party/Test Rare or Difficult Conditions Self Recorded 3.mp4  
  inflating: Rare or Difficult Conditions Tests/First Party/Test Rare or Difficult Conditions Self Recorded 4.mp4  
  inflating: Rare or Difficult Conditions Tests/First Party/Test Rare or Difficult Conditions Self Recorded Overexposed 1.mp4  
  inflating: Rare or Difficult Conditions Tests/First Party/Test Rare or Difficult Conditions Self Recorded Overexposed 2.mp4  
  inflating: Rare or Difficult Conditions Tests/First Party/Test Rare or Difficult Conditions Self Recorded Overexposed 3.mp4  
  infl

# Constants

In [None]:
# location of the serialized TensorRT engine - change to relevant path
PATH_TO_MODEL_WEIGHTS = "/content/yolov7-self-driving-v100.trt"

# device that holds the model's input and output buffers
GPU_DEVICE = torch.device("cuda:0")

# (class name, colour) pairs; the position in this table is the index used for CLASS_NAMES
_CLASS_PALETTE = [
    ('pedestrian', (199, 27, 185)),    # pink
    ('rider', (130, 140, 0)),          # dark green
    ('car', (97, 248, 37)),            # lime green
    ('truck', (255, 0, 0)),            # red
    ('bus', (24, 226, 195)),           # turquoise
    ('train', (255, 127, 117)),        # salmon
    ('motorcycle', (227, 217, 30)),    # yellow
    ('bicycle', (113, 10, 187)),       # purple
    ('traffic light', (28, 45, 199)),  # blue
    ('traffic sign', (255, 127, 0)),   # orange
]

# class names in index order
CLASS_NAMES = [class_name for class_name, _ in _CLASS_PALETTE]

# colour used to draw each class, looked up by class name
CLASS_COLOURS = dict(_CLASS_PALETTE)

# Deserialize TensorRT Engine (Fine-Tuned Model) and Set up Execution
This code prepares a TensorRT engine for predicting object locations and classes by setting up the necessary data structures and execution context.

In [None]:
# init the tensor engine
# record for one engine input/output tensor: its name, numpy dtype, shape, backing torch buffer and buffer address
binding = namedtuple('Binding', ('name', 'dtype', 'shape', 'data', 'ptr'))
logger = trt.Logger(trt.Logger.INFO)
trt.init_libnvinfer_plugins(logger, namespace="")

# get model from given path and deserialize it
try:
  with open(PATH_TO_MODEL_WEIGHTS, 'rb') as f, trt.Runtime(logger) as runtime:
    model = runtime.deserialize_cuda_engine(f.read())
except Exception as e:
  # stop here: every following line needs a valid engine, so continuing would only fail less clearly
  raise RuntimeError(f'Failed to deserialize the model: {e}') from e

if model is None:
  # the runtime signals a rejected engine file by returning no engine instead of raising
  raise RuntimeError(f'Failed to deserialize the model: no engine could be created from {PATH_TO_MODEL_WEIGHTS}')

# allocate one GPU buffer per engine input/output tensor
bindings = OrderedDict()
# num_io_tensors is the tensor count that belongs with the name-based get_tensor_* calls used below
for index in range(model.num_io_tensors):
  name = model.get_tensor_name(index)
  dtype = trt.nptype(model.get_tensor_dtype(name))
  shape = tuple(model.get_tensor_shape(name))
  data = torch.from_numpy(np.empty(shape, dtype=np.dtype(dtype))).to(GPU_DEVICE)
  bindings[name] = binding(name, dtype, shape, data, int(data.data_ptr()))

# buffer addresses in engine tensor order; this is what gets handed to execute_v2
binding_addrs = OrderedDict((n, d.ptr) for n, d in bindings.items())

# allows for the execution of the model on data
context = model.create_execution_context()

# warmup 10 times
for _ in range(10):
  # the tensor must live on the GPU because its address is passed to the engine as a device pointer
  temp = torch.randn(1, 3, 640, 640).to(GPU_DEVICE)
  # 'images' is the input key used at inference time; any other key would append an extra pointer to the list
  binding_addrs['images'] = int(temp.data_ptr())
  context.execute_v2(list(binding_addrs.values()))

# Code to Resize Input Image to 640*640

In [None]:
def resize_img(img):
  """Scale an image to fit inside 640x640 and pad the remainder with grey.

  The aspect ratio is kept: the image is scaled by the largest ratio that
  still fits both sides, then a constant-colour border fills what is left.

  Args:
    img: image array whose first two dimensions are (height, width).

  Returns:
    A tuple (padded image, scale ratio, (width padding, height padding)),
    where each padding value is the per-side amount as a float.
  """
  target_height, target_width = 640, 640

  source_dims = img.shape[:2]
  source_height, source_width = source_dims[0], source_dims[1]

  # the smaller of the two ratios guarantees that both sides fit in the target
  scale_ratio = min(target_height / source_height, target_width / source_width)

  scaled_width = int(round(source_width * scale_ratio))
  scaled_height = int(round(source_height * scale_ratio))

  # total leftover space per axis, halved so it is shared between the two sides
  width_padding = (target_width - scaled_width) / 2
  height_padding = (target_height - scaled_height) / 2

  # only resample when the scaled size actually differs from the input size
  if (source_width, source_height) != (scaled_width, scaled_height):
    img = cv2.resize(img, (scaled_width, scaled_height), interpolation=cv2.INTER_LINEAR)

  # the -0.1 / +0.1 nudges send a half-pixel padding down on one side and up on the other
  top, bottom = int(round(height_padding - 0.1)), int(round(height_padding + 0.1))
  left, right = int(round(width_padding - 0.1)), int(round(width_padding + 0.1))

  img = cv2.copyMakeBorder(img, top, bottom, left, right, cv2.BORDER_CONSTANT, value=(114, 114, 114))
  return img, scale_ratio, (width_padding, height_padding)

# Postprocessing to Reverse the Input Image Resizing on the Model Predictions

In [None]:
def postprocessing(boxes, scale_ratio, width_height_padding):
  """Map box coordinates from the resized, padded image back to the original image.

  The padding is subtracted first and the result is divided by the scale
  ratio. `boxes` is modified in place and the same tensor is returned.

  Args:
    boxes: tensor of box coordinates whose last dimension has four values.
    scale_ratio: ratio the original image was scaled by.
    width_height_padding: (width padding, height padding) pair added per side.

  Returns:
    `boxes`, now expressed in original-image coordinates.
  """
  # repeating the pair gives (w, h, w, h): one offset for each of the four coordinates
  offsets = torch.tensor(width_height_padding * 2, device=boxes.device)
  return boxes.sub_(offsets).div_(scale_ratio)

# Function to Run the Model on an Input Image

In [None]:
def run_image_through_model(input_image):
  """Detect objects in one frame and draw the labelled boxes onto it.

  Args:
    input_image: frame in OpenCV's BGR channel order. It is annotated in place.

  Returns:
    The same `input_image` array with a box and a "<class> <score>" label
    drawn for every detection.
  """
  # run preprocessing on the image: convert BGR to RGB, then resize and pad
  image = cv2.cvtColor(input_image, cv2.COLOR_BGR2RGB)
  image, scale_ratio, width_height_padding = resize_img(image)

  # height,width,channel -> channel,height,width, plus a leading dimension of size 1
  image = image.transpose((2, 0, 1))
  image = np.expand_dims(image, 0)
  image = np.ascontiguousarray(image)

  img = torch.from_numpy(image.astype(np.float32)).to(GPU_DEVICE)

  # normalise image
  img /= 255

  # run the image through the model
  binding_addrs['images'] = int(img.data_ptr())
  context.execute_v2(list(binding_addrs.values()))

  # keep only the detections the model reports as valid for the first (only) image
  num_detections = int(bindings['num_dets'].data[0][0])
  boxes = bindings['det_boxes'].data[0, :num_detections]
  scores = bindings['det_scores'].data[0, :num_detections]
  classes = bindings['det_classes'].data[0, :num_detections]

  # undo the resize for all boxes at once and copy the results to the host a single time
  boxes = postprocessing(boxes, scale_ratio, width_height_padding).round().int().tolist()
  scores = scores.tolist()
  classes = classes.tolist()

  for (x1, y1, x2, y2), score, class_id in zip(boxes, scores, classes):
    name = CLASS_NAMES[class_id]
    # the colour tuples match their colour-name comments when read as RGB, while the frame
    # being drawn on is BGR, so the channel order is reversed
    colour = CLASS_COLOURS[name][::-1]
    label = name + ' ' + str(round(float(score), 3))
    cv2.rectangle(input_image, (x1, y1), (x2, y2), colour, 2)

    # print text on top of box: a filled strip as wide as the text, 20 pixels high
    (text_width, _), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 1)
    cv2.rectangle(input_image, (x1, y1 - 20), (x1 + text_width, y1), colour, -1)
    cv2.putText(input_image, label, (x1, y1 - 2), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 0), thickness=1)

  # return predicted frame
  return input_image

# Function to Process a Video File and Run It Through the Model

In [None]:
def process_video_file(video_path, output_video_path_and_name):
  """Run every frame of a video through the model and save the annotated video.

  Args:
    video_path: path of the input video.
    output_video_path_and_name: output path without extension; '.mp4' is
      appended. A missing output folder is created.

  Raises:
    IOError: if no frame can be read from the input, or the output video
      cannot be opened for writing.
  """
  video_in = cv2.VideoCapture(video_path)
  video_out = None
  frame_count = 0

  try:
    # get the first frame from the video; its size defines the output size
    success, frame = video_in.read()
    if not success:
      raise IOError(f"Could not read any frames from {video_path}")

    # the writer does not create folders and does not raise when it cannot open its file
    output_folder = os.path.dirname(output_video_path_and_name)
    if output_folder:
      os.makedirs(output_folder, exist_ok=True)

    # create output video ('mp4v' is the lower-case form of the tag, as used for .mp4 output)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    height, width = frame.shape[:2]
    fps = video_in.get(cv2.CAP_PROP_FPS)
    video_out = cv2.VideoWriter(output_video_path_and_name + '.mp4', fourcc, fps, (width, height))
    if not video_out.isOpened():
      raise IOError(f"Could not open {output_video_path_and_name + '.mp4'} for writing")

    while success:
      # run frame through model
      predicted_frame = run_image_through_model(frame)

      # add the annotated frame to output video
      video_out.write(predicted_frame)

      # read next frame
      success, frame = video_in.read()

      frame_count += 1
  finally:
    # release both video handles even when processing stops early
    video_in.release()
    if video_out is not None:
      video_out.release()

  print(f"{frame_count} Frames Processed for Inputted Video")

In [None]:
process_video_file("/content/UK Road Tests/UK Road Tests 9.mp4", "/content/test/test")

# Run Performance Tests

In [None]:
# create results folder structure
! mkdir results
! mkdir "results/UK Road Test Output"

! mkdir "results/Rare or Difficult Conditions Tests Output"
! mkdir "results/Rare or Difficult Conditions Tests Output/First Party"
! mkdir "results/Rare or Difficult Conditions Tests Output/Third Party"

In [None]:
print("Processing Videos...")

uk_road_tests_folder = "/content/UK Road Tests"
rare_or_difficult_conditions_video_folder = "/content/Rare or Difficult Conditions Tests"

# annotated videos go into the results folder structure, which is what gets zipped afterwards
uk_road_tests_output_folder = "/content/results/UK Road Test Output"
rare_or_difficult_conditions_output_folder = "/content/results/Rare or Difficult Conditions Tests Output"

# throughput assumed for the very first estimate, before any video has been timed
INITIAL_MB_PER_SECOND = 1


def process_video_folder(input_folder, output_folder, total_mb, total_seconds):
  """Process every file in a folder and keep the running throughput totals.

  Args:
    input_folder: folder containing the input videos.
    output_folder: folder the annotated videos are written to (created if missing).
    total_mb: megabytes processed so far, across all earlier folders.
    total_seconds: seconds spent processing so far, across all earlier folders.

  Returns:
    The updated (total_mb, total_seconds) pair.
  """
  os.makedirs(output_folder, exist_ok=True)

  # sorted so the processing order is the same on every run
  for video in sorted(os.listdir(input_folder)):
    video_path = os.path.join(input_folder, video)
    if not os.path.isfile(video_path):
      continue

    file_size_mb = os.path.getsize(video_path) / 1000000
    # splitext removes only the extension, so file names that contain dots stay intact
    file_name = os.path.splitext(video)[0]

    # average throughput over everything processed so far
    if total_mb > 0 and total_seconds > 0:
      average_mb_per_second = total_mb / total_seconds
    else:
      average_mb_per_second = INITIAL_MB_PER_SECOND

    print(f'Processing File {video}')
    print(f'Estimated Seconds to Complete {(file_size_mb / average_mb_per_second)}')

    start_time = time.perf_counter()
    process_video_file(video_path, os.path.join(output_folder, file_name))
    total_seconds += time.perf_counter() - start_time
    total_mb += file_size_mb

  return total_mb, total_seconds


# process uk_road_tests_folder
total_mb, total_seconds = process_video_folder(uk_road_tests_folder, uk_road_tests_output_folder, 0.0, 0.0)

# process rare or difficult conditions: one sub folder per party type
for party_type in sorted(os.listdir(rare_or_difficult_conditions_video_folder)):
  party_folder = os.path.join(rare_or_difficult_conditions_video_folder, party_type)
  if not os.path.isdir(party_folder):
    continue

  total_mb, total_seconds = process_video_folder(
    party_folder,
    os.path.join(rare_or_difficult_conditions_output_folder, party_type),
    total_mb,
    total_seconds,
  )

print("Finished Processing Videos...")

Processing Videos...
Processing File UK Road Tests 1.mp4
Estimated Seconds to Complete 25.382835
UK Road Tests 1
300 Frames Processed for Inputted Video
Processing File UK Road Tests 3.mp4
Estimated Seconds to Complete 123.98115247776875
UK Road Tests 3
2100 Frames Processed for Inputted Video
Processing File UK Road Tests 10.mp4
Estimated Seconds to Complete 29.58211643340668
UK Road Tests 10
600 Frames Processed for Inputted Video
Processing File UK Road Tests 8.mp4
Estimated Seconds to Complete 21.117152340513844
UK Road Tests 8
450 Frames Processed for Inputted Video
Processing File UK Road Tests 5.mp4
Estimated Seconds to Complete 13.828854562927601
UK Road Tests 5
300 Frames Processed for Inputted Video
Processing File UK Road Tests 7.mp4
Estimated Seconds to Complete 25.152506787251625
UK Road Tests 7
600 Frames Processed for Inputted Video
Processing File UK Road Tests 6.mp4
Estimated Seconds to Complete 19.217330875679192
UK Road Tests 6
480 Frames Processed for Inputted Video

# Zip Results Folder

In [None]:
# bundle the whole results folder into a single archive; -r recurses into its sub folders
! zip -r "/content/results.zip" "/content/results"

  adding: content/results/ (stored 0%)
  adding: content/results/Rare or Difficult Conditions Tests Output/ (stored 0%)
  adding: content/results/Rare or Difficult Conditions Tests Output/Third Party/ (stored 0%)
  adding: content/results/Rare or Difficult Conditions Tests Output/Third Party/Test Rare or Difficult Conditions Third Party 4.mp4 (deflated 4%)
  adding: content/results/Rare or Difficult Conditions Tests Output/Third Party/Test Rare or Difficult Conditions Third Party 3.mp4 (deflated 8%)
  adding: content/results/Rare or Difficult Conditions Tests Output/Third Party/Test Rare or Difficult Conditions Third Party 6.mp4 (deflated 4%)
  adding: content/results/Rare or Difficult Conditions Tests Output/Third Party/Test Rare or Difficult Conditions Third Party 5.mp4 (deflated 2%)
  adding: content/results/Rare or Difficult Conditions Tests Output/First Party/ (stored 0%)
  adding: content/results/Rare or Difficult Conditions Tests Output/First Party/Test Rare or Difficult Conditi