In [None]:
import os
import sys
import re
import json
import math
import time
import copy
import io
import random
import datetime
import threading
import signal
import traceback
import cProfile

In [None]:
import cv2
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.cm

In [None]:
import tensorflow as tf

In [None]:
gpus = tf.config.list_physical_devices('GPU')

if gpus:
    # Don't allocate huge memory unnecessarily: let TensorFlow grow its GPU
    # allocation on demand instead of reserving the device memory up front.
    try:
        # The memory-growth setting has to be identical on every visible GPU,
        # so apply it to all of them rather than only to gpus[0].
        for gpu in gpus:
            tf.config.experimental.set_memory_growth( gpu, True )
    except RuntimeError as e:
        # Raised when the GPUs have already been initialized, e.g. when this
        # cell is re-run after TensorFlow has been used. Report and carry on.
        print( e )


In [None]:
import panoramasdk

# Application node handle. The helper functions below read frames from
# node.inputs.video_in and write them to node.outputs.video_out.
node = panoramasdk.node()

def getMediasFromCamera():
    """Poll the node's video input until a fresh frame shows up.

    Repeatedly reads ``node.inputs.video_in`` and returns the whole media
    list as soon as at least one entry in it has a falsy ``is_cached``.
    Between unsuccessful polls it sleeps for 10 ms.
    """
    while True:
        medias = node.inputs.video_in.get()
        if any( not media.is_cached for media in medias ):
            return medias
        # Nothing new yet; back off briefly before polling again.
        time.sleep(0.01)

def putMediasToHdmi(media_list):
    """Forward `media_list` to the node's ``video_out`` output."""
    node.outputs.video_out.put(media_list)


In [None]:
# Block until the camera input delivers at least one non-cached frame,
# then show the returned list as the cell output.
media_list = getMediasFromCamera()

media_list

In [None]:
# Inspect the array shape and dtype of the first captured frame.
media_list[0].image.shape, media_list[0].image.dtype

In [None]:
def previewImage( image ):
    """Display an OpenCV-style BGR image inline with matplotlib.

    The channel order is converted from BGR to RGB before plotting, and the
    result is drawn on a new 10x10 inch figure.
    """
    rgb_for_display = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    plt.figure( figsize=(10, 10) )
    plt.imshow( rgb_for_display, interpolation='antialiased' )

In [None]:
# Show the captured frame inline.
previewImage(media_list[0].image)

In [None]:
# Send only the first entry of the media list to the video output.
putMediasToHdmi(media_list[:1])

In [None]:
def mainLoop():
    """Pass camera frames straight through to the video output.

    Runs forever; a KeyboardInterrupt (e.g. interrupting the kernel) ends
    the loop quietly.
    """
    try:
        while True:
            # Only the first entry of each media list is forwarded.
            putMediasToHdmi( getMediasFromCamera()[:1] )
    except KeyboardInterrupt:
        # Deliberate: stopping the loop from the notebook is not an error.
        pass

In [None]:
mainLoop()

**Manual steps:**

1. Download ssd_mobilenet_v2_2.tar.gz from https://tfhub.dev/tensorflow/ssd_mobilenet_v2/2.
1. Upload the file next to this notebook.
1. Create a directory "./ssd_mobilenet_v2_2"
1. Extract the contents of the model file under "./ssd_mobilenet_v2_2/"

In [None]:
# Load the SavedModel extracted to ./ssd_mobilenet_v2_2 (see the manual steps
# above), then display the loaded object as the cell output.
model = tf.saved_model.load("./ssd_mobilenet_v2_2")

model

In [None]:
# Use the model's "serving_default" signature as the callable detector.
detector = model.signatures["serving_default"]
detector

In [None]:
def preprocessAndDetect( image ):
    """Prepare one camera frame and run it through `detector`.

    Steps, in order: add a leading batch axis, resize to 320x320, reverse
    the last (channel) axis to go from BGR to RGB, and cast to uint8.

    Parameters
    ----------
    image : array-like
        A single frame; presumably H x W x 3 in BGR order, as delivered by
        the camera -- TODO confirm against the caller.

    Returns
    -------
    Whatever `detector` returns for the prepared batch.
    """
    target_size = ( 320, 320 )

    batch = tf.expand_dims( image, axis=0 )
    batch = tf.image.resize( batch, target_size )
    batch = tf.reverse( batch, axis=[-1] )   # BGR to RGB
    batch = tf.cast( batch, dtype=tf.uint8 )

    return detector( batch )
    

In [None]:
# Run the detector on the frame captured earlier and display the raw result.
detection_result = preprocessAndDetect( media_list[0].image )

detection_result

In [None]:
score_threshold = 0.5
box_color = (255,0,0)
box_thickness = 2

def renderResult( image, detection_result ):
    """Draw a rectangle on `image` for every confidently detected person.

    Parameters
    ----------
    image : numpy.ndarray
        3-D image array; it is drawn on in place.
    detection_result : mapping
        Detector output providing "detection_classes", "detection_scores"
        and "detection_boxes"; the first batch entry of each is used.

    Only detections with class id 1 (person) and a score of at least
    `score_threshold` are drawn. Box values are read as normalized
    (y_min, x_min, y_max, x_max) and scaled by the image size.
    """
    img_h, img_w, _ = image.shape

    classes = detection_result["detection_classes"][0].numpy()
    scores = detection_result["detection_scores"][0].numpy()
    boxes = detection_result["detection_boxes"][0].numpy()

    for class_id, confidence, box in zip( classes, scores, boxes ):
        is_confident_person = ( class_id == 1 and confidence >= score_threshold )
        if not is_confident_person:
            continue

        # Normalized box -> pixel corners (fractions are truncated by int()).
        y_min, x_min, y_max, x_max = box[:4].tolist()
        top_left = ( int( x_min * img_w ), int( y_min * img_h ) )
        bottom_right = ( int( x_max * img_w ), int( y_max * img_h ) )

        cv2.rectangle(
            image,
            top_left,
            bottom_right,
            color = box_color, thickness = box_thickness, lineType=cv2.LINE_8
        )


In [None]:
# Draw the detection boxes directly onto the captured frame (modified in place).
renderResult( media_list[0].image, detection_result )

In [None]:
# Show the frame again, now with the boxes drawn on it.
previewImage(media_list[0].image)

In [None]:
# History of detected people positions. The three lists are parallel:
# entry k of each list describes the same sample.
people_positions_x = []
people_positions_y = []
people_positions_timestamp = []

def trackPeoplePositions( detection_result, max_duration = 5 * 60 ):
    """Append newly detected people to the position history and expire old samples.

    For each detection with class id 1 (person) and a score of at least 0.5,
    the bottom-center of its box is recorded together with the current time.
    Samples that are `max_duration` seconds old or older are then removed.

    Parameters
    ----------
    detection_result : mapping
        Detector output providing "detection_classes", "detection_scores"
        and "detection_boxes"; the first batch entry of each is used.
    max_duration : float, optional
        How long, in seconds, a sample is kept. Defaults to 5 minutes;
        pass e.g. ``60 * 60`` to keep one hour.

    The module-level history lists are modified in place, so any other
    reference to them stays valid.
    """
    t_now = time.time()

    detection_classes = detection_result["detection_classes"][0].numpy()
    detection_scores = detection_result["detection_scores"][0].numpy()
    detection_boxes = detection_result["detection_boxes"][0].numpy()

    # add detected positions (bottom-center of boxes)
    for klass, score, box in zip( detection_classes, detection_scores, detection_boxes ):
        if klass == 1 and score >= 0.5: # person, confident enough
            people_positions_x.append( ( box[1] + box[3] ) * 0.5 )
            people_positions_y.append( box[2] )
            people_positions_timestamp.append( t_now )

    # forget old positions
    # Samples are appended in time order, so the expired ones form a prefix.
    # Counting them (instead of reusing a loop index) also covers an empty
    # history and a history in which every sample has expired.
    oldest_allowed = t_now - max_duration
    num_expired = 0
    for t in people_positions_timestamp:
        if t > oldest_allowed:
            break
        num_expired += 1

    del people_positions_x[:num_expired]
    del people_positions_y[:num_expired]
    del people_positions_timestamp[:num_expired]


In [None]:
# Record the people found in the sample detection into the global history lists.
trackPeoplePositions( detection_result )

In [None]:
heatmap_resolution = (90,160)   # histogram bins as (rows along y, columns along x)
heatmap_sigma = 5               # Gaussian blur sigma, in histogram bins

def renderHeatmap():
    """Render the recorded people positions as a color heatmap image.

    The positions in `people_positions_y` / `people_positions_x` are binned
    into a 2-D histogram over the range [0, 1] x [0, 1], blurred with a
    Gaussian of sigma `heatmap_sigma`, and drawn with the "jet" colormap on
    an off-screen 16x9 inch matplotlib figure.

    Returns
    -------
    numpy.ndarray
        The rendered figure as an H x W x 3 uint8 array in BGR channel
        order, sized like the figure canvas.
    """
    fig, ax1 = plt.subplots( nrows = 1, ncols = 1, figsize=( 16, 9 ) )

    try:
        density, _, _ = np.histogram2d( people_positions_y, people_positions_x, bins=heatmap_resolution, range=((0,1),(0,1)) )

        # The border type must be passed by keyword: the fourth positional
        # parameter of cv2.GaussianBlur is `dst`, not `borderType`.
        density = cv2.GaussianBlur( density, (0,0), sigmaX=heatmap_sigma, borderType=cv2.BORDER_DEFAULT )

        ax1.axis("off")
        ax1.imshow(density, cmap=matplotlib.cm.jet)

        fig.tight_layout( pad=0 )

        # Rasterize the figure and read the pixels back. buffer_rgba() replaces
        # the deprecated tostring_rgb() and already has the canvas shape
        # (rows, columns, 4), so no manual reshape is needed.
        fig.canvas.draw()
        rgba = np.asarray( fig.canvas.buffer_rgba() )

        # matplotlib delivers RGBA; the rest of the pipeline works on BGR.
        img = cv2.cvtColor( rgba, cv2.COLOR_RGBA2BGR )
    finally:
        # Always release the figure, even when rendering fails, so figures do
        # not pile up when this is called repeatedly.
        plt.close(fig)

    return img


In [None]:
# Render the heatmap from the positions recorded so far and inspect the
# resulting array's shape and dtype.
heatmap = renderHeatmap()

heatmap.shape, heatmap.dtype

In [None]:
# Show the rendered heatmap inline.
previewImage( heatmap )

In [None]:
def overlayHeatmap( dst_image, heatmap, weight=0.5 ):
    """Blend `heatmap` onto `dst_image` in place.

    Parameters
    ----------
    dst_image : numpy.ndarray
        Target image; its contents are overwritten with the blended result.
    heatmap : numpy.ndarray
        Image to overlay; it is resized to the width and height of `dst_image`.
    weight : float, optional
        Weight given to the heatmap; `dst_image` contributes ``1 - weight``.
    """
    target_h, target_w = dst_image.shape[0], dst_image.shape[1]

    # cv2.resize takes the size as (width, height).
    heatmap_fitted = cv2.resize( heatmap, ( target_w, target_h ) )

    dst_image[:,:,:] = cv2.addWeighted( dst_image, 1-weight, heatmap_fitted, weight, 0.0 )

In [None]:
# Blend the heatmap into the captured frame in place, at 50% weight.
overlayHeatmap( media_list[0].image, heatmap, 0.5 )

In [None]:
# Show the frame with the heatmap blended in.
previewImage( media_list[0].image )

In [None]:
text_color = (255,255,255)
text_shadow_color = (0,0,0)
text_thickness = 2
text_shadow_thickness = 2
text_scale = 2

def renderTitle( image, s ):
    """Draw the title string `s` near the top-left corner of `image`, in place.

    The text is drawn twice: first a shadow in `text_shadow_color`, offset
    by 2 px to the right and down, then the text itself in `text_color`.

    Parameters
    ----------
    image : numpy.ndarray
        Image to draw on; it is modified in place.
    s : str
        Text to render.
    """
    # Shadow first, so that the actual text ends up on top of it.
    layers = (
        ( (22, 40+2), text_shadow_color, text_shadow_thickness ),
        ( (20, 40), text_color, text_thickness ),
    )

    for origin, color, thickness in layers:
        cv2.putText( image, s, origin, fontFace=cv2.FONT_HERSHEY_PLAIN, fontScale=text_scale, color=color, thickness=thickness, lineType=cv2.LINE_AA )


In [None]:
def mainLoop():
    """Run the full pipeline: detect people, update the heatmap, render, output.

    For every fresh frame: run detection, record people positions, blend the
    heatmap over the frame, draw the detection boxes and the title, and send
    the first media entry to the video output. The heatmap image is
    re-rendered only when more than 5 seconds have passed since the last
    render. Runs until a KeyboardInterrupt (e.g. interrupting the kernel).
    """
    try:
        current_heatmap = None
        rendered_at = 0

        while True:

            medias = getMediasFromCamera()
            frame = medias[0]

            # The list is returned when any entry is fresh; only the first
            # entry is processed here, so wait if that one is still cached.
            if frame.is_cached:
                time.sleep(0.03)
                continue

            detections = preprocessAndDetect( frame.image )
            trackPeoplePositions(detections)

            # Re-rendering the heatmap for every frame is avoided by reusing
            # the last render for up to 5 seconds.
            if current_heatmap is None or time.time() - rendered_at > 5:
                current_heatmap = renderHeatmap()
                rendered_at = time.time()

            # Drawing order: heatmap underneath, then boxes, then the title.
            overlayHeatmap( frame.image, current_heatmap )
            renderResult( frame.image, detections )
            renderTitle( frame.image, "Retail - traffic analysis by heatmap" )

            putMediasToHdmi( medias[:1] )

    except KeyboardInterrupt:
        # Deliberate: stopping the loop from the notebook is not an error.
        pass

In [None]:
# Runs until the kernel is interrupted; mainLoop() catches the KeyboardInterrupt.
mainLoop()

In [None]:
# Profile the main loop. mainLoop() only returns after a KeyboardInterrupt,
# so interrupt the kernel to stop it and get the profile report.
cProfile.runctx( "mainLoop()", globals(), locals() )