# Walkthrough through TrafficSense

- A simple run of [TrafficSense](https://github.com/Cyber-Machine/TrafficSense).
- It can classify four Indian police hand signals trained on custom augmented dataset and achieves a accuracy of ~86%.

- Detecting poses is achieved via the movenet-thunder model which is lightweight and enables real-time detection.

- Classification is done using a custom layered Neural Network which achieves a accuracy of ~86%.

- Run the following cells to test code.

- Output video of realtime video is generated under `./TrafficSense/output.mp4` which can be downloaded.

In [1]:
# Import dependencies
from IPython.display import display, Javascript, Image , HTML
from google.colab.output import eval_js
from base64 import b64decode, b64encode
import cv2
import numpy as np
import PIL
import io
import html
import time
import os
import sys

Cloning Repo from Github

In [2]:
!git clone https://github.com/Cyber-Machine/TrafficSense

Cloning into 'TrafficSense'...
remote: Enumerating objects: 59, done.[K
remote: Counting objects: 100% (59/59), done.[K
remote: Compressing objects: 100% (46/46), done.[K
remote: Total 59 (delta 12), reused 59 (delta 12), pack-reused 0[K
Unpacking objects: 100% (59/59), done.


Changing directory to the cloned repo

In [3]:
cd TrafficSense

/content/TrafficSense


In [4]:

!python --version

Python 3.8.16


In [5]:
!pip install -r requirements.txt

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting argparse
  Downloading argparse-1.4.0-py2.py3-none-any.whl (23 kB)
Collecting opencv-python~=4.5.3.56
  Downloading opencv_python-4.5.3.56-cp38-cp38-manylinux2014_x86_64.whl (49.9 MB)
[K     |████████████████████████████████| 49.9 MB 109 kB/s 
Collecting tflite-runtime>=2.7.0
  Downloading tflite_runtime-2.11.0-cp38-cp38-manylinux2014_x86_64.whl (2.5 MB)
[K     |████████████████████████████████| 2.5 MB 32.7 MB/s 
Installing collected packages: tflite-runtime, opencv-python, argparse
  Attempting uninstall: opencv-python
    Found existing installation: opencv-python 4.6.0.66
    Uninstalling opencv-python-4.6.0.66:
      Successfully uninstalled opencv-python-4.6.0.66
Successfully installed argparse-1.4.0 opencv-python-4.5.3.56 tflite-runtime-2.11.0


In [6]:
import time
import cv2
import utils
import numpy 
from ml import Classifier
from ml import Movenet

estimation_model = 'movenet_thunder'  
tracker_type = 'bounding_box'  # ['keypoint', 'bounding_box']
classification_model = 'pose_classifier.tflite'
label_file = 'pose_labels.txt'
pose_detector = Movenet(estimation_model)
print("MoveNet Lightning/Thunder model selected.")

MoveNet Lightning/Thunder model selected.


In [7]:
def record_video(filename):
    js=Javascript("""
      async function recordVideo() {
        const options = { mimeType: "video/webm; codecs=vp9" };
        const div = document.createElement('div');
        const capture = document.createElement('button');
        const stopCapture = document.createElement("button");
        
        capture.textContent = "Start Recording";
        capture.style.background = "orange";
        capture.style.color = "white";

        stopCapture.textContent = "Stop Recording";
        stopCapture.style.background = "red";
        stopCapture.style.color = "white";
        div.appendChild(capture);

        const video = document.createElement('video');
        const recordingVid = document.createElement("video");
        video.style.display = 'block';

        const stream = await navigator.mediaDevices.getUserMedia({audio:true, video: true});
      
        let recorder = new MediaRecorder(stream, options);
        document.body.appendChild(div);
        div.appendChild(video);

        video.srcObject = stream;
        video.muted = true;

        await video.play();

        google.colab.output.setIframeHeight(document.documentElement.scrollHeight, true);

        await new Promise((resolve) => {
          capture.onclick = resolve;
        });
        recorder.start();
        capture.replaceWith(stopCapture);

        await new Promise((resolve) => stopCapture.onclick = resolve);
        recorder.stop();
        let recData = await new Promise((resolve) => recorder.ondataavailable = resolve);
        let arrBuff = await recData.data.arrayBuffer();
        
        // stop the stream and remove the video element
        stream.getVideoTracks()[0].stop();
        div.remove();

        let binaryString = "";
        let bytes = new Uint8Array(arrBuff);
        bytes.forEach((byte) => {
          binaryString += String.fromCharCode(byte);
        })
      return btoa(binaryString);
      }
    """)
    try:
      display(js)
      data=eval_js('recordVideo({})')
      binary=b64decode(data)
      with open(filename,"wb") as video_file:
        video_file.write(binary)
      print(f"Finished recording video at:{filename}")
    except Exception as err:
      print(str(err))

In [8]:
def show_video(video_path, video_width = 600):
  
  video_file = open(video_path, "r+b").read()

  video_url = f"data:video/mp4;base64,{b64encode(video_file).decode()}"
  return HTML(f"""<video width={video_width} controls><source src="{video_url}"></video>""")

In [14]:
video_path = "test.mp4"
record_video(video_path)

In [15]:
show_video(video_path)

In [10]:
from google.colab.patches import cv2_imshow

In [11]:
row_size = 20  # pixels
left_margin = 24  # pixels
text_color = (255, 0, 0)  # Blue
font_size = 1
font_thickness = 3
max_detection_results = 2
fps_avg_frame_count = 10

In [12]:
cap = cv2.VideoCapture(video_path)
fourcc = cv2.VideoWriter_fourcc('F','M','P','4')
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
res = (int(width), int(height))
last_image = None
out = cv2.VideoWriter('./output.mp4', fourcc, 20.0, res)

while cap.isOpened():
    success, image = cap.read()
    if image is None :
      print('Done')
      break;
    
    image = cv2.flip(image, 1)

    list_persons = [pose_detector.detect(image)]
    image = utils.visualize(image, list_persons)



    if classification_model:
            classifier = Classifier(classification_model, label_file)
            detection_results_to_show = min(max_detection_results, len(classifier.pose_class_names))
            # Run pose classification.
            prob_list = classifier.classify_pose(list_persons[0])

            scores = []
            # Show classification results on the image.
            for i in range(detection_results_to_show):
                class_name = prob_list[i].label
        
                probability = round(prob_list[i].score, 2)
                scores.append(probability)
            
            class_name = prob_list[numpy.argmax(scores)].label
            cv2.putText(image, class_name, (75,50), cv2.FONT_HERSHEY_DUPLEX, font_size, text_color, font_thickness)
            out.write(image)
            last_image = image
out.release()
cap.release()
cv2.destroyAllWindows()

In [13]:
cv2_imshow(last_image)

The video is saved as `output.mp4` in this directory.