# Curator Role

In this notebook we put everything together to create the *curator role* of the Jetbot Robot. The main functionalities are the *Paopu* detector, to detect if a player wins, the person counter/detector and the motion detector.

In [1]:
# Import libraries
from jetbot import Camera
from jetbot import bgr8_to_jpeg
from jetbot import ObjectDetector
from jetbot import Robot

import cv2
import imutils
import multiprocessing
import numpy as np
from PIL import Image
import simpleaudio as sa
import time
import traitlets

import torch
import torchvision
import torch.nn.functional as F

from IPython.display import display
import ipywidgets.widgets as widgets

First of all we define all the functions needed for the *Paopu* detection functionality.

In [2]:
LANG = "en"    # ca

mean = 255.0 * np.array([0.485, 0.456, 0.406])
stdev = 255.0 * np.array([0.229, 0.224, 0.225])
normalize = torchvision.transforms.Normalize(mean, stdev)

def preprocess(camera_value, device):
    """
    Preprocess image for paopu detector model.
    
    Arguments:
        camera_value (array): Image from Jetbot Camera.
        device (torch.device): cpu or cuda
    """ 
    global normalize
    x = camera_value
    x = cv2.resize(x, (224, 224))
    x = cv2.cvtColor(x, cv2.COLOR_BGR2RGB)
    x = x.transpose((2, 0, 1))
    x = torch.from_numpy(x).float()
    x = normalize(x)
    x = x.to(device)
    x = x[None, ...]
    
    return x


def play_sound(filename, sync=False):
    """
    Play a wav sound file.
    """
    wave_obj = sa.WaveObject.from_wave_file(filename)
    
    play_obj = wave_obj.play()
    if sync:
        play_obj.wait_done()
        
    return play_obj


def lose_sequence():
    play_obj = play_sound(f"../Music/{LANG}/gameover.wav")
    robot.left(0.2)
    play_obj.wait_done()
    robot.stop()
    
    
def win_sequence():
    play_obj = play_sound(f"../Music/{LANG}/winsound.wav")
    robot.left(0.2)
    play_obj.wait_done()
    play_sound(f"../Music/{LANG}/win.wav")
    robot.stop()

    
def detect_hand(model):
    x = preprocess(camera.value, device)
    y = model(x)
    y = F.softmax(y, dim=1)
    
    prob_hand = float(y.flatten()[1])
    hand_slider.value = prob_hand
    
    if prob_hand > 0.9:
        #time.sleep(0.1)  # add a small sleep to make sure frames have finished processing
        return True
    else:
        return False

    
def curator_role(robot, camera, paopu_detector, person_detector, device, image_widget, video_writer=None, total_players=3, movement_threshold=40):
    
    # Total players list
    players_list = [i+1 for i in range(total_players)]
    
    print("Starting curator role...")
    
    play = True
    
    while play:
        robot.stop()
        x = preprocess(camera.value, device)
        y = paopu_detector(x)
        y = F.softmax(y, dim=1)

        prob_pao = float(y.flatten()[3])
        prob_move = float(y.flatten()[2])
        prob_hand = float(y.flatten()[1])
        prob_environment = float(y.flatten()[0])
        
        pao_slider.value = prob_pao
        move_slider.value = prob_move
        hand_slider.value = prob_hand
        environment_slider.value = prob_environment

        if prob_pao > 0.94:# and prob_move < 0.1:
            
            # Start green melody process
            greenlight_play_obj = play_sound(f"../Music/{LANG}/greenlight.wav")

            # Check for hand during green state ()               
            t0 = time.time()
            elapsed_time = 0
            while elapsed_time < 4:
                win = detect_hand(paopu_detector)
                if win:
                    break
                elapsed_time = time.time() - t0
            
            # If player wins, end game
            if win:
                greenlight_play_obj.stop()
                win_sequence()
                print("You won!\nEnding game...")
                play = False    #break
            
            # Else, move 180º and detect movement
            else:
                robot.right(0.15)
                time.sleep(1.15)   # Move right time
                robot.stop()
                time.sleep(1.0)
                
                # Detect movement
                print('Detecting movement...')
                detections = detect_person_motion(camera, person_detector, image_widget, 3, video_writer=video_writer, threshold=movement_threshold)
                
                if len(detections):                   
                    str_nums = ", ".join([str(i) for i in detections])
                    print(f'Person(s) moving: {str_nums}')
                    
                    # Play sound for each of the players eliminated
                    for n in detections:
                        print(f'\tPlayer {players_list[n]} eliminated!')
                        play_sound(f"../Music/{LANG}/player.wav", True)
                        play_sound(f"../Music/{LANG}/{n}.wav", True)
                        play_sound(f"../Music/{LANG}/eliminated.wav", True)
                    
                    for n in detections:
                        players_list.remove(players_list[n-1])
                                            
                    if not len(players_list):
                        # End game when all players eliminated
                        print("GAME OVER")
                        lose_sequence()
                        play = False
                        break
                    
                # Go back to wall
                robot.right(0.15)
                time.sleep(1.10)
                robot.stop()
                    
                #T = T * 1.1 # increase threshold as players move closer
                    
        # TODO: what happens when player touches wall while robot is still moving (outside green melody)?
            
        else:
            # Move right in order to find Paopu image
            robot.right(0.12)
            
    # Stop robot when game is finished
    robot.stop()

Now we define all the functions needed for the person motion detection functionality.

In [3]:
# Define functions for person motion detection

def detect_people(object_detector, image, conf_thr):
    """
    Detect people on an image and return bounding boxes.

    Arguments:
        object_detector: ObjectDetector model
        image (array): input image
        conf_thr (float): confidence threshold
        
    Returns:
            list: of bounding boxes (left, top, right, bottom)
    """

    person_class = 1

    # Image size
    rows = 300
    cols = 300

    # Make prediction on image
    detections = object_detector(image)

    # Iterate over each detection and save boundig box 
    # if confidence is above threshold and detected class is person
    person_boxes = []

    for detection in detections[0]:
            if detection['confidence'] > conf_thr and detection['label'] == person_class:
                    left = int(detection['bbox'][0] * cols)
                    top = int(detection['bbox'][1]  * rows)
                    right = int(detection['bbox'][2] * cols)
                    bottom = int(detection['bbox'][3]  * rows)
                    person_boxes.append([left, top, right, bottom])

    return sorted(person_boxes)

# Generate a random color palette
COLORS = np.random.uniform(0, 255, size=(15, 3))


def plot_boxes(image, people_boxes, motion_boxes=[]):
    """
    Plot bounding boxes on an image.

    Arguments:
        image (array): input image
        people_boxes (array): array of people bounding boxes with [left, top, right, bottom] positions
        motion_boxes (array): array of motion bounding boxes with [left, top, right, bottom] positions
    """
    # Plot people bounding boxes and corresponding number
    for i in range(len(people_boxes)):
        bbox = people_boxes[i]

        left = bbox[0]
        top = bbox[1]
        right = bbox[2]
        bottom = bbox[3]

        # Plot bounding box
        cv2.rectangle(image, (left, top), (right, bottom), COLORS[i], thickness=2)
        cv2.putText(image, f'{i+1}', (int(left)+5, int(top)+20), cv2.FONT_HERSHEY_PLAIN, 1, (255, 255, 255), 2)

    # Plot motion bounding boxes in green
    for bbox in motion_boxes:
        left = bbox[0]
        top = bbox[1]
        right = bbox[2]
        bottom = bbox[3]

        # Plot bounding box
        cv2.rectangle(image, (left, top), (right, bottom), (0, 255, 0), thickness=1)
        
    return image
 
    
def display_image(image, image_widget, text1="", text2=""):
    """
    Display an image on a Jupyter Widget and optionally puts top/bottom text.
    
    Arguments:
        image (array): image to display.
        img_widget (widgets.Image, optional): widget used to display the image
        text1 (str, optional): Optional 1st text to print on the top of image.
        text2 (str, optional): Optional 2nd text to print on the top of image.
    """
                
    # Add optional text
    if text1:
        cv2.putText(image, text1, (10, 20), cv2.FONT_HERSHEY_PLAIN, 1, (255, 255, 255), 2)
    
    if text2:
        cv2.putText(image, text2, (10, 40), cv2.FONT_HERSHEY_PLAIN, 1, (255, 255, 255), 1)
        
    # Display image
    image_jpeg = bgr8_to_jpeg(image)
    image_widget.value = image_jpeg


def motion_detection(firstFrame, newFrame, threshold=40):
    """
    Detect motion by comparing the newFrame with a firstFrame.
    
    Arguments:
        firstFrame (array): first frame considered as the baseline.
        newFrame (array): new frame.
        threshold (int): movement threshold.
        
    Returns:
        list: of regions where movement has been detected (left, top, right, bottom)
    """
    # Adapted from https://www.pyimagesearch.com/2015/05/25/basic-motion-detection-and-tracking-with-python-and-opencv/

    # Min movement area
    MIN_AREA = 50

    # Gaussian blur kernel
    KERNEL_SIZE = 3

    # List to store regions where movement is detected
    motion_bboxes = []
    
    # firstFrame preprocessing
    firstFrame = cv2.cvtColor(firstFrame, cv2.COLOR_BGR2GRAY)
    firstFrame = cv2.GaussianBlur(firstFrame, (KERNEL_SIZE, KERNEL_SIZE), 0)
    
    # grab the current frame and initialize the static/moving text
    frame = newFrame.copy()

    # convert frame to grayscale, and blur it
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    gray = cv2.GaussianBlur(gray, (KERNEL_SIZE, KERNEL_SIZE), 0)

    # compute the absolute difference between the current frame and
    # first frame
    frameDelta = cv2.absdiff(firstFrame, gray)
    thresh = cv2.threshold(frameDelta, threshold, 255, cv2.THRESH_BINARY)[1]

    # dilate the thresholded image to fill in holes, then find contours
    # on thresholded image
    thresh = cv2.dilate(thresh, None, iterations=2)
    cnts = cv2.findContours(thresh.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)   
    cnts = imutils.grab_contours(cnts)

    # loop over the contours
    for c in cnts:
        # if the contour is too small, ignore it
        if cv2.contourArea(c) < MIN_AREA:
            continue

        # compute the bounding box for the contour, draw it on the frame,
        # and update the text
        (x, y, w, h) = cv2.boundingRect(c)
        cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 1)
        
        motion_bboxes.append((x, y, x+w, y+h))    # (left, top, right, bottom)
    
    return motion_bboxes

def box_in_box(in_box, out_box, margin=0):
    """
    Evaluate if a bounding box is partially inside another one.
    
    Arguments:
        in_box (array): inner bounding box (left, top, right, bottom).
        out_box (array): outter bounding box (left, top, right, bottom).
        margin (int, optional): percentual margin to add to outter box. Defaults to 0.
        
    Returns:
        boolean
    """
    top_left = False
    bottom_right = False
    
    # Apply margin
    out_box[0] = int((1 - margin/100) * out_box[0])
    out_box[1] = int((1 - margin/100) * out_box[1])
    out_box[2] = int((1 + margin/100) * out_box[2])
    out_box[3] = int((1 + margin/100) * out_box[3])
    
    # Top-left corner
    if out_box[0] <= in_box[0] <= out_box[2] and out_box[1] <= in_box[1] <= out_box[3]:
        top_left = True
        
    # Bottom-right corner
    if out_box[0] <= in_box[2] <= out_box[2] and out_box[1] <= in_box[3] <= out_box[3]:
        bottom_right = True
            
    # Return True if top-left and bottom-right corners are inside
    if top_left and bottom_right:
        return True
    
def allow_movement(jetbot_camera, object_detector, image_widget, detection_time, video_writer=None):
    """
    Perform person detection and display results.
    
    Arguments:
        jetbot_camera(Camera): Jetbot camera initialized with size (300, 300).
        object_detector: ObjectDetector model
        image_widget (widgets.Image): Jupyter image widget.
        video_writer (cv2.VideoWriter, optional): OpenCV video writer used to save output.
        
    """
    initial_t = time.time()
    elapsed_time = 0

    while elapsed_time < detection_time:
        # Make people detections
        img = jetbot_camera.value.copy()
        people_detections = detect_people(object_detector, img, 0.1)
        
        # Display image and detections
        image = plot_boxes(img, people_detections)
        display_image(image, image_widget, 'MOVE', f'{int(detection_time - elapsed_time)}s')
        
        # Save video
        if video_writer:
            video_writer.write(image)
            
        elapsed_time =  time.time() - initial_t

    
def detect_person_motion(jetbot_camera, object_detector, image_widget, max_time, threshold=40, video_writer=None):
    """
    Perform person and motion detection in a detection window of max time
    and display results.
    
    Arguments:
        jetbot_camera(Camera): Jetbot camera initialized with size (300, 300).
        max_time (int): Detection window time.
        image_widget (widgets.Image): Jupyter image widget.
        running_process(process): Running process during which the detection is performed.
        threshold (int): movement threshold.
        video_writer (cv2.VideoWriter, optional): OpenCV video writer used to save output.
    """    
    # Get first frame
    first_frame = jetbot_camera.value.copy()

    initial_t = time.time()
    elapsed_time = 0
    t = ""

    # Make people detections
    people_detections = detect_people(object_detector, jetbot_camera.value.copy(), 0.1)
    
    # Run motion detection while process is running
    moving_persons_total = []
    
    while elapsed_time < max_time:
        # Get new frame
        img = jetbot_camera.value.copy()
        
        # Detect motion
        m_bboxes = motion_detection(first_frame, img, threshold)        
        m_bboxes = set(m_bboxes)    # remove duplicates
        
        # Detect which person is moving
        moving_persons = []
        for i in range(len(people_detections)):
            out_box = people_detections[i]                
            person_num = i+1
            
            for in_box in m_bboxes:
                if box_in_box(list(in_box), out_box):
                    if person_num not in moving_persons:
                        moving_persons.append(person_num)
                        
        moving_persons_total.extend(moving_persons)
        
        # Persons moved in this frame
        if len(moving_persons):
            str_nums = ", ".join([str(i) for i in moving_persons])
            t = f'Moving: {str_nums}'
        
        # Display image and detections
        image = plot_boxes(img, people_detections, m_bboxes)
        display_image(image, image_widget, 'STOP', t)
        
        # Save video
        if video_writer:
            video_writer.write(image)
            
        elapsed_time =  time.time() - initial_t

    # Print persons moved in detection window
    moving_persons_total = set(moving_persons_total)
#     if len(moving_persons_total):
#         str_nums = ", ".join([str(i) for i in moving_persons_total])
#         print(f'Person(s) moving: {str_nums}')
        
    return moving_persons_total

Now we use the previously defined functions to create the *curator role*.

In [4]:
# Init camera with same resolution as model input (300x300)
camera = Camera.instance(width=300, height=300)

In [5]:
# Load mobilenet-v2 pretrained on COCO
obj_detector_model = ObjectDetector('ssd_mobilenet_v2_coco.engine')

In [6]:
# Load Paopu detection model
paopu_detector_model = torchvision.models.alexnet(pretrained=False)
paopu_detector_model.classifier[6] = torch.nn.Linear(paopu_detector_model.classifier[6].in_features, 4)
paopu_detector_model.load_state_dict(torch.load('../../../Models/best_model3.pth'))    # Load pretrained weights

device = torch.device('cuda')
paopu_detector_model = paopu_detector_model.to(device)

In [7]:
# Init robot
robot = Robot()

In [8]:
# Widgets
camera_widget = widgets.Image(format='jpeg', width=400, height=400)
image_widget = widgets.Image(format='jpeg', width=400, height=400)

pao_slider = widgets.FloatSlider(description='pao', min=0.0, max=1.0, orientation='vertical')
move_slider = widgets.FloatSlider(description='move', min=0.0, max=1.0, orientation='vertical')
hand_slider = widgets.FloatSlider(description='hand', min=0.0, max=1.0, orientation='vertical')
environment_slider = widgets.FloatSlider(description='environment', min=0.0, max=1.0, orientation='vertical')

camera_link = traitlets.dlink((camera, 'value'), (camera_widget, 'value'), transform=bgr8_to_jpeg)

display(widgets.HBox([camera_widget, image_widget, pao_slider, move_slider, hand_slider, environment_slider]))

HBox(children=(Image(value=b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x00\xff\xdb\x00C…

Now we can call the main curator function ```curator_role``` with the desired parameters.

In [9]:
# Main curator

# Config parameters
N=2
MOVEMENT_THRESHOLD = 45
VIDEO_WRITER = False

# Video Writer
if VIDEO_WRITER:
    cv2_video_writer = cv2.VideoWriter('output.avi',cv2.VideoWriter_fourcc('M','J','P','G'), 10, (300,300))
else:
    cv2_video_writer = None

try:
    curator_role(robot, camera, paopu_detector_model, obj_detector_model, device, image_widget, cv2_video_writer, N, movement_threshold=MOVEMENT_THRESHOLD)
except KeyboardInterrupt:
    robot.stop()

Starting curator role...
Detecting movement...
Detecting movement...
Person(s) moving: 1
	Player 2 eliminated!


### Movement and detection test
The cells belows are used to perform some tests and can be used to determine the duration of the turn action and the value for the movement threshold.

In [10]:
# Test paopu detector
x = preprocess(camera.value.copy(), device)
y = paopu_detector_model(x)
y = F.softmax(y, dim=1)

prob_pao = float(y.flatten()[3])
prob_move = float(y.flatten()[2])
prob_hand = float(y.flatten()[1])
prob_environment = float(y.flatten()[0])

pao_slider.value = prob_pao
move_slider.value = prob_move
hand_slider.value = prob_hand
environment_slider.value = prob_environment

In [11]:
# Test robot 180 turn
robot.right(0.15)
time.sleep(1.05)
robot.stop()
time.sleep(1.0)

In [12]:
robot.stop()

In [13]:
# Test person movement detector
time.sleep(1)

MOVEMENT_THRESHOLD = 50

stop_time = 3
move_time = 6

# Video Writer
cv2_video_writer = cv2.VideoWriter('output.avi',cv2.VideoWriter_fourcc('M','J','P','G'), 10, (300,300))
# cv2_video_writer = None

# Main loop
print('Start')
while True:
    try:
        # MOVE - Allow movement
        allow_movement(camera, obj_detector_model, image_widget, move_time, cv2_video_writer)

        # STOP - Detect movement
        detect_person_motion(camera, obj_detector_model, image_widget, stop_time, video_writer=cv2_video_writer, threshold=MOVEMENT_THRESHOLD)
    
    except KeyboardInterrupt:
        break

print('Finish')        
cv2_video_writer.release()

Start
Finish
