# Libraries and Functions

##Libraries

In [None]:
import json
from torchvision.ops import nms
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
import torchvision
import torch
import requests
import os
from tqdm.notebook import tqdm
import cv2
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

# Seed Everything for Reproducibility
SEED = 42


def seed_everything(seed):
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True


seed_everything(SEED)

##Functions

###Functions for Downloading

In [None]:
def Download(name, url):  # found this somewhere on stackoverflow
    """
    Function for downloading datasets (except downloading from Google Drive)
    """
    response = requests.get(url, stream=True)
    total_size_in_bytes = int(response.headers.get('content-length', 0))
    block_size = 1024  # 1 Kibibyte
    progress_bar = tqdm(total=total_size_in_bytes, unit='iB', unit_scale=True)
    with open('/content/zipdata/'+name, 'wb') as file:
        for data in response.iter_content(block_size):
            progress_bar.update(len(data))
            file.write(data)
    progress_bar.close()
    if total_size_in_bytes != 0 and progress_bar.n != total_size_in_bytes:
        print("ERROR, something went wrong")


def download_file_from_google_drive(id, destination):
    '''Function that downloads files from google drive'''
    URL = "https://docs.google.com/uc?export=download"
    session = requests.Session()
    response = session.get(URL, params={'id': id}, stream=True)
    token = get_confirm_token(response)
    if token:
        params = {'id': id, 'confirm': token}
        response = session.get(URL, params=params, stream=True)
    total_size_in_bytes = int(response.headers.get('content-length', 0))
    progress_bar = tqdm(total=total_size_in_bytes, unit='iB', unit_scale=True)
    save_response_content(response, destination, progress_bar)


def get_confirm_token(response):
    for key, value in response.cookies.items():
        if key.startswith('download_warning'):
            return value
    return None


def save_response_content(response, destination, progress_bar):
    CHUNK_SIZE = 32768
    with open(destination, "wb") as f:
        for chunk in response.iter_content(CHUNK_SIZE):
            if chunk:  # filter out keep-alive new chunks
                f.write(chunk)
                progress_bar.update(len(chunk))



###Functions for Image Processing

In [None]:
def filterBoxes(output, nms_th=0.3, score_threshold=0.5):
    ''' Function returns boxes, scores and labels
    for set classes'''
    boxes = output['boxes']
    scores = output['scores']
    labels = output['labels']
    # Non Max Supression
    mask = nms(boxes, scores, nms_th)
    boxes = boxes[mask]
    scores = scores[mask]
    labels = labels[mask]
    boxes = boxes.data.cpu().numpy().astype(np.int32)
    scores = scores.data.cpu().numpy()
    labels = labels.data.cpu().numpy()
    mask = scores >= score_threshold
    boxes = boxes[mask]
    scores = scores[mask]
    labels = labels[mask]
    return boxes, scores, labels


def displayPredictions(image, output, nms_th=0.3, score_threshold=0.5, unpacked=False):
    ''' Displays predictions drawing bboxes, using the filterBoxes function'''

    if not unpacked:
        boxes, scores, labels = filterBoxes(output, nms_th, score_threshold)
    else:
        boxes, scores, labels = output['boxes'], output['scores'], output['labels']
    colors = {1: (0, 255, 0), 2: (255, 255, 0), 3: (255, 0, 0)}
    for box, label in zip(boxes, labels):
        image = cv2.rectangle(image,
                              (box[0], box[1]),
                              (box[2], box[3]),
                              colors[label], 2)
    return image, {'boxes': boxes, 'scores': scores, 'labels': labels}


def preprocessImage(frame):
    '''Preprocess frames of the video '''

    frame = cv2.resize(frame, (512, 512))
    frame_ = torch.as_tensor([frame]).to(device).permute(0, 3, 1, 2)/255.0
    return frame_, frame


def detectTrafficLight(frame, model, nms_th=0.2, score_th=0.5):
    '''Detecting the object in the frame of a video'''

    model.eval()
    frame_, frame = preprocessImage(frame)
    output = model(frame_)[0]
    pred, boxes_info = displayPredictions(frame, output, nms_th, score_th)
    return pred, boxes_info


def write_json(boxes_list, jfile):
    '''Function that writes the necessary json file with predictions'''

    label_names = {1: 'green', 2: 'yellow', 3: 'red'}
    result = {}
    for k in range(len(boxes_list)):
        coords, labels = boxes_list[k]['boxes'], boxes_list[k]['labels']
        inner_result = {}
        for i in range(len(labels)):
            inner_result[i] = {'coords': [int(j) for j in coords[i]], 'state': label_names[labels[i]],
                               'affect': 'True' if (coords[i][[0, 2]].mean() > 180) and (coords[i][[1, 3]].mean() < 256) else 'False'}
        result[k] = inner_result
    jfile.write(json.dumps(result, indent=5))
    return json.dumps(result, indent=5)


#Main algo

##Downloading

In [None]:
#Creating dirs to store datasets
try:
  os.makedirs('/content/video')
  print('Directory "/content/video" is created')
except:
  print('Directory "/content/video" already exists')

Directory "/content/video" already exists


In [None]:
video_drive = '1yQJn-qoZNl53hr7BnrAss4CRARG23-uN'
video_path = '/content/video/test_video_1'
download_file_from_google_drive(video_drive, video_path)

HBox(children=(FloatProgress(value=1.0, bar_style='info', max=1.0), HTML(value='')))

In [None]:
download_file_from_google_drive('1zrfmFG5lz84of6Ul7e0USBSlIyzdm2ZX', '/content/fasterrcnn_resnet50_fpn.pth')

HBox(children=(FloatProgress(value=1.0, bar_style='info', max=1.0), HTML(value='')))

##Model configuration

In [None]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print('Device:', device)
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
N_CLASS = 4 
INP_FEATURES = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(INP_FEATURES, N_CLASS)
model.to(device)
model.load_state_dict(torch.load(os.path.join('/content/fasterrcnn_resnet50_fpn.pth'), map_location=device))

Device: cuda


<All keys matched successfully>

##Video processing

In [None]:
%%capture

# Initialization
counter, boxes_list = 0, []

# Model parameters
nms_th, score_threshold = 0.2, 0.2

# Opening files
cap_video = cv2.VideoCapture(video_path)
VIDEO_FPS = 30
result_video = cv2.VideoWriter('/content/video/result-1.mp4',
                               cv2.VideoWriter_fourcc(*"FMP4"), VIDEO_FPS, (512, 512), True)
json_file = open('traffic_lights.json', 'w')

# Opening the file for json
# MAIN LOOP

while cap_video.isOpened():
    ret, frame = cap_video.read()
    if not ret:
        break
    # skipping processing step for some frames
    if counter % 4 == 0:
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        # detecting boxes for the frame (using NN)
        pred, boxes = detectTrafficLight(frame, model, nms_th, score_threshold)
        pred = cv2.cvtColor(pred, cv2.COLOR_BGR2RGB)
        result_video.write(pred)
    else:
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        # not detecting boxes, using boxes which were calculated on previous steps
        frame = cv2.resize(frame, (512, 512))
        pred, boxes = displayPredictions(
            frame, boxes, nms_th, score_threshold, unpacked=True)
        pred = cv2.cvtColor(pred, cv2.COLOR_BGR2RGB)
        result_video.write(pred)
    boxes_list.append(boxes)
    counter = counter + 1

# Creating a json file
write_json(boxes_list, json_file)
result_video.release()
cap_video.release()
json_file.close()
