# Reactive Robot Programming

In this notebook we will develop a library for reactive programming of the Aldebaran Nao robot. We will first let the robot look around for red balls, and make the robot point towards red balls which are within target range.  

First we import the contents of Reactive Extensions (rx), naoqi and the Reactive Extensions for Qi libraries.

In [None]:
from rx import *
from naoqi import *
import datetime
from rx.subjects import Subject
from threading import Thread
import sys, time

Stream = Observable

We will be connecting to a real Nao, for which we need to configure the IP address. Change the line below to match the IP address of your Nao robot.

In [None]:
nao_address = "tcp://192.168.11.162:9559"

We use the Qi framework to create a session for this robot.

In [None]:
session = qi.Session(nao_address)

To debug our code, we can print the last value. An easy way to print a value in Rx is to subscribe a print function to a stream. In Python 2 however we cannot directly use print as a function. We can however import a print function from the future library.

In [None]:
from __future__ import print_function

## Bridging naoqi and Reactive Extensions

The Qi framework provides us with some features for responding to memory events. ReactiveX has its own model for reactive programming. To bridge Qi and Rx, we create a Subject.

Because the subject has to continue monitoring the memory after it has been created, we run it in its own thread.

In [None]:
class QiMemorySubject(Thread):
    def __init__(self, field, session):
        Thread.__init__(self)
        self.session = session
        self.field = field
        self.stopped = False
        self.subject = Subject()
        
    def run(self):
        mem = self.session.service("ALMemory")
        self.subscriber = mem.subscriber(self.field)
        self.signal = self.subscriber.signal.connect(self.subject.on_next)
        while not self.stopped:
            time.sleep(1)
        
    def stop(self):
        self.stopped = True
        self.subscriber.signal.disconnect(self.signal)
        print("Stopping")

## Listening for a red ball

We can now easily create subjects listening to various sensors of the robot, such as red balls and blobs.

In [None]:
class RedBallSubject(QiMemorySubject):
    def __init__(self, session):
        QiMemorySubject.__init__(self, "redBallDetected", session)

Then start a red ball subject which uses the camera of Nao to look for red balls. Because we designed the red ball subject class to extend the Qi memory subject, and Qi memory subject runs in its own thread, we have to start the thread to start listening.

In [None]:
redBallThread = RedBallSubject(session)
redBallThread.daemon = True
redBallThread.start()  # start the thread

Nao should now be listening for red balls. To check if it is actually working, we can sample the subject. Because we do not want to spam our notebook, we set up a special stream which only returns the first red ball seen.

In [None]:
firstRedBall = redBallThread.subject.first()

Let's debug our first red ball stream using print.

In [None]:
firstRedBallSub = firstRedBall.subscribe(lambda red_ball: print("Red ball spotted: " + str(red_ball)))

In robotics, printing values is used mainly for quick testing and debugging. A production program running on an autonomous robot should contain almost no calls to print, as there is no-one controlling the robot who would be interested in reading those prints. 

Before we go on, let's dispose of our temporary print function. We can do this by disposing the subscription we made.

In [None]:
firstRedBallSub.dispose()

## Processing the data

RedBallDetection gives us a list containing the following values:
* Timestamp (seconds, microseconds)
* Ball info (center x, center y, size x, size y, all in radians)
* Camera pose in torso frame
* Camera pose in robot frame (average of both feet)
* Camera id

For convenience we create a class called Ball to hold the ball information. We overwrite the string magic method for pretty printing.

In [None]:
class Ball():
    def __init__(self, x, y, width, height):
        self.x = x
        self.y = y
        self.width = width
        self.height = height
    
    def __str__(self):
        template = "Location of ball: (center x: %s rad, center y: %s rad, size x: %s rad, size y: %s rad)"
        return template % (self.x, self.y, self.width, self.height)

Using a map operator we can get just the information we need, e.g. the ball info. We use our ball class for converting the list to an object. 

In [None]:
ball = redBallThread.subject.map(lambda ballInfo: Ball(*(ballInfo[1])))

We debug the ball stream using a print subscription.

In [None]:
firstBall = ball.first()

firstBallSubscription = firstBall.subscribe(print)

To avoid too much spam, let's dispose of our print function.

In [None]:
firstBallSubscription.dispose()

The ball detector produces a lot of events. For our use case we can instead sample the stream to reduce the amount of events which we need to handle.

In [None]:
ballSamples = ball.sample(500)

To find out the distance between the robot and the ball, we have to do some simple trigonometry. We know that the formula for an angle $\theta$ given the opposite side $A$ and adjacent side $B$ is given by the tangent.

$tan(\theta)=\dfrac{A}{B}$

In our case, we are given the angle $\theta$. Given a $size$ which is half the height or width of the ball, i.e. the radius, we can calculate the $distance$ to the ball by using the tangent function:

$distance = \dfrac{\dfrac{size}{2}}{tan(\theta)} = \dfrac{size}{2*tan(\theta)}$

In Python this is implemented as follows:

In [None]:
from math import tan 

def distance_to_ball(ball_size, angle):
    return ball_size / (2 * tan(angle))

Let's test this for a ball with a size of 4 cm, with a reported angle of 45 degrees. If we fill these values into our formula we get:

$distance = \dfrac{4 cm}{2*tan(45^{\circ})}=2cm$

Note that the Python tangent function by default takes as parameter the angle in radians and not in degrees. Hence instead of using 45 degrees, we use $\pi/4 rad$. 

In [None]:
from math import pi, radians

%matplotlib inline

import matplotlib.pyplot as plt
import numpy as np

plt.plot(range(1,46), [distance_to_ball(5, x) for x in [radians(l) for l in range(1,46)]])

plt.xlabel('Angle (degree)')
plt.ylabel('Distance (cm)')
plt.title('Distance according to angle')
plt.grid(True)
plt.show()

To improve the accuracy of the distance measurement we will use the average of the observed width and height. We write a function called dist2 which performs this for us.

In [3]:
def dist2(width, height): 
    return (1/4.) * (size/tan(width) + size/tan(height))

Then we write a function ball_distance which uses dist2 for calculating the distance to the ball.

In [None]:
def ball_distance(ball): 
    return dist2(ball.width, ball.height)

Using our new distance function, let's create a stream which gives us the distance to the ball.

In [None]:
distance = ballSamples.map(ball_distance)

We can test our stream using print:

In [4]:
firstRedBallDistanceSub = distance.first().subscribe(lambda distance: print("Distance: %s cm" % (distance)))

SyntaxError: invalid syntax (<ipython-input-4-e993d31b90f1>, line 1)

Again, let's dispose of our print function.

In [None]:
firstRedBallDistanceSub.dispose()

## Controlling LEDs based on ball distance
We can now calculate the distance to the ball. We can make the robot react differently depending on this distance. For example, let's change the light color of Nao's eyes based on the distance of the ball.

We use the ALLeds module for controlling the LEDs of the robot.

In [None]:
leds = session.service("ALLeds")
leds.off("FaceLeds")

First we write a function to determine the brightness of the LEDs based on distance:

In [2]:
def brightness(distance):
    if distance > 20:
        return 0
    elif distance > 5:
        return (-(distance-5)/15.) + 1
    else:
        return 1

Then we write a function which sets the LED brightness based on the input.

In [None]:
def led_brightness(intensity):
    leds.setIntensity("RightFaceLedsRed", intensity)
    leds.setIntensity("LeftFaceLedsRed", intensity)

We wrote the above as two separate functions so we can test the intensity function separately from the robot. Let's plot our intensity function.

In [None]:
%matplotlib inline

import matplotlib.pyplot as plt
import numpy as np

plt.plot(range(0,31), [get_intensity(x) for x in range(0,31)])

plt.xlabel('distance (cm)')
plt.ylabel('intensity')
plt.title('LED intensity over distance')
plt.grid(True)
plt.show()

We can now subscribe our brightness function.

In [None]:
ballSamplesSub = ballSamples.subscribe(led_brightness)

We can dispose of our subscription if we don't want our brightness function to remain active.

In [None]:
ballSamplesSub.dispose()

## Tracking the ball
Aldebaran offers a red ball tracker. We will make our own using reactive programming. This tracker gives us more flexibility than the build in tracker, for example we can make the robot transition from looking at the ball to pointing at it. We will still use some functionality of the original tracker.

In [None]:
tracker = session.service("ALTracker")
tracker.registerTarget("RedBall", 0.04)
tracker.track("RedBall")

We initialize the robot in the crouching position and get some sensor readings which we later need to reset the arm after tracking.

In [None]:
posture = session.service("ALRobotPosture")
posture.goToPosture("Crouch", 1.0)

motion = session.service("ALMotion")
sensor_angles = motion.getAngles(["LArm"], True)

We set up a stream for detecting whether balls are close or far using a map operation.

In [None]:
def point_or_track(close):
    if close:
        target = tracker.getTargetPosition(0)
        tracker.pointAt("LArm", target, 0, 0.5)
    else:
        tracker.setMode("Head")
        motion.setAngles(["LArm"], sensor_angles, 0.2)
        
pointRangeBalls = redBallDistanceSamples.map(lambda distance: distance < 20)
pointRangeBallsSub = pointRangeBalls.subscribe(point_or_track)

After testing our behavior we can dispose of the subscription.

In [None]:
pointRangeBallsSub.dispose()

We want to move towards grabbing the ball if it is within reach. Let's try a naive implementation which closes the hand if the ball is very close, and opens it otherwise.

In [None]:
def open_or_close(graspable):
    if graspable:
        motion.closeHand('LHand')
    else:
        motion.openHand('LHand')

graspRangeBalls = redBallDistanceSamples.map(lambda distance: distance < 15)
graspRangeBallsSub = graspRangeBalls.subscribe(open_or_close)

Again, we should dispose after testing.

In [None]:
graspRangeBallsSub.dispose()

### Determining which effector to use

We want to track objects to the left with the left effector and objects to the right with the right effector. To determine which effector to use, we take the sum of the angle of the head and the angle in the camera frame.

In [None]:
class QiPullingMemorySubject(Thread):
    def __init__(self, field, session, pull_rate):
        Thread.__init__(self)
        self.session = session
        self.field = field
        self.pull_rate = pull_rate / 1000.0
        self.stopped = False
        self.subject = Subject()
        
    def run(self):
        mem = self.session.service("ALMemory")
        #latched_data = 0
        while not self.stopped:
            data = mem.getData(self.field)
            
            #if data < latched_data - 0.02 or data > latched_data + 0.02:
            self.subject.on_next(data)
            #    latched_data = data
            time.sleep(self.pull_rate)
        
    def stop(self):
        self.stopped = True
        print("Stopping")
  

In [None]:
class HeadYawSubject(QiPullingMemorySubject):
    def __init__(self, session):
        QiPullingMemorySubject.__init__(self, "Device/SubDeviceList/HeadYaw/Position/Sensor/Value", session, 10)

In [None]:
headYawThread = HeadYawSubject(session)
headYawThread.daemon = True
headYawThread.start()
head_yaw = headYawThread.subject

In [None]:
### Example 2 specific
## Functions
# mapping ball to a pair containing its x angle and distance
angle_dist_helper = lambda ball: (ball.x, dist2(ball.width, ball.height))
# combining head_yaw with robot_angle
combinator = lambda headYaw, ballAngle: (headYaw.value+ballAngle.value, 
                                         headYaw.timestamp-ballAngle.timestamp)
recent = lambda pair: pair[1].total_seconds() < 1
## CEP graph
# distance to head and head & body
head = distance.filter(lambda d: d >= 50)
head_subscription = head.subscribe(head_tracker)
head_body = distance.filter(lambda d: d >= 20 and d < 50)
head_body_subscription = head_body.subscribe(head_body_tracker)
# BallSamples to Camera angle time
cam_angle_dist = ballSamples.map(angle_dist_helper)
cam_angle_dist_filtered = cam_angle_dist.filter(lambda pair: pair[1] < 20)
cam_angle = cam_angle_dist_filtered.map(lambda pair: pair[0])
# product of object's camera angle and head yaw
cam_angle_timestamped = cam_angle.timestamp()
head_yaw_timestamped = head_yaw.timestamp()
robot_angle_timestamped = Stream.combine_latest(head_yaw_timestamped, 
                                                cam_angle_timestamped, 
                                                combinator)
robot_angle = robot_angle_timestamped.filter(recent)
arm = robot_angle.map(lambda a: "LArm" if a[0] > 0 else "RArm")
arm_subscription = arm.subscribe(arm_tracker)

## TDM example

We will now construct an example from the TDM paper. 

We want a robot which reacts to humans and collects objects in an environment. Reacting to the human means that the robot will initially greet the human and verbally respond to touching such as petting on the head. Objects will be small blue pluche balls. The robot collects these balls and bring them to a human, if present.

To construct this behavior we will split it into small reactive behaviors, and then combine these behaviors to realize more advanced behavior. Let's start with finding the blue pluche balls.

In the previous example we programmed the robot to track a red ball. We conveniently reused the red ball tracker which is part of the Naoqi framework. For other color balls the programming is a little bit more complicated, but still manageable.

Before running the code below, make sure you ran the code fragments in the introduction for connecting to the robot and creating QiMemorySubject. You will also need to change the color properties of the object you want to detect. To find out the red, green, blue values for the color, you can use the Nao Monitor application to take a picture of the ball you want to track, and then set the values accordingly.

In [None]:
class BlobDetectionSubject(QiMemorySubject):
    def __init__(self, session):
        QiMemorySubject.__init__(self, "ALTracker/ColorBlobDetected", session)
        
class BlueBlobDetectionSubject(BlobDetectionSubject):
    def __init__(self, session):
        BlobDetectionSubject.__init__(self, session)
        blobDetection = session.service("ALColorBlobDetection")
        blobDetection.setColor(50, 210, 240, 20) # change these values to match the ball color
        blobDetection.setObjectProperties(30, 0.04, "Circle")
        blobDetection.subscribe("BlueBlob", 500, 0)
        
class RedBlobDetectionSubject(BlobDetectionSubject):
    def __init__(self, session):
        BlobDetectionSubject.__init__(self, session)
        blobDetection = session.service("ALColorBlobDetection")
        blobDetection.setColor(50, 194, 23, 50) # change these values to match the ball color
        blobDetection.setObjectProperties(30, 0.04, "Circle")
        blobDetection.subscribe("RedBlob", 500, 0)

Let's instantiate our blue blob detector.

In [None]:
blueBlobDetection = BlueBlobDetectionSubject(session)
blueBlobDetection.daemon = True
blueBlobDetection.start()

As with the red ball example, the robot will not actually start detecting objects until we instantiate a tracker, which is what we do now.

In [None]:
tracker = session.service("ALTracker")
tracker.trackEvent("ALTracker/ColorBlobDetected")

Let's print the first value in the stream to test our blue blob detection. First we select the first message.

In [None]:
firstBlueBlob = blueBlobDetection.subject.first()

Then we subscribe our print function. 

In [None]:
firstBlueBlobSub = firstBlueBlob.subscribe(print)

If it worked well, we can dispose of our print function.

In [None]:
firstBlueBlobSub.dispose()

We want to write a function which compares the objects returned by the blob stream. The format differs from the red ball format.
* Target position in the torso frame, as a Position6D
* Target position in the robot frame, as a Position6D
* Timestamp in seconds and in microseconds
* Effector which should be used for tracking (unused)
* Threshold (in radians) to avoid head oscillation (unused)

A Position6D contains three translations (in meters) and three rotations (in radians). 

We have now written the logic required to detect blue blobs. However we would like this program to work in a world where there are multiple blue blobs, hence we somehow have to split the detected blobs if multiple are found. We can do this using a group by operation. This operation groups the elements in a stream based on a comparator. 

In Python we can overload the equals operator to accomplish this, however we first need to wrap the blob into a class.

In [None]:
from math import sqrt

class Position6D():
    def __init__(self, values, similarity_threshold=0.1):
        self.robot_pos = values[0]
        self.x, self.y, self.z = self.robot_pos[0:3]
        self.similarity_threshold = similarity_threshold
        
    def distance(self, x_p=0, y_p=0, z_p=0):
        sqrdiff = lambda a, b: pow(abs(a-b), 2)
        x, y, z = self.x, self.y, self.z
        return sqrt(sqrdiff(x, x_p) + sqrdiff(y, y_p) + sqrdiff(z, z_p))
        
    def __eq__(self, other):
        x_p, y_p, z_p = other.x, other.y, other.z
        return self.distance({'x': x_p, 'y': y_p, 'z': z_p}) <= self.similarity_threshold

    
class Blob():
    def __init__(self, values):
        self.robot_pos = Position6D(values)
        
    def distance(self):
        return self.robot_pos.distance()
        
    def __eq__(self, other):
        return self.robot_pos == other.robot_pos

In [None]:
threading.active_count()

In [None]:
threading.current_thread()

In [None]:
threading.enumerate()[0].stop()

In [None]:
threading.enumerate()

# Face detection

In [None]:
class FaceDetectionSubject(QiMemorySubject):
    def __init__(self, session):
        QiMemorySubject.__init__(self, "FaceDetected", session)
        faceDetection = session.service("ALFaceDetection")
        faceDetection.subscribe("FaceDetected", 500, 0.0)

In [None]:
faceDetection = FaceDetectionSubject(session)
faceDetection.daemon = True
faceDetection.start()

In [None]:
def time_stamp_dict(data):
    return {
        'seconds': data[0],
        'microseconds': data[1]
    }

def shape_info_dict(data):
    ''' info of the face shape from camera angles in radians '''
    if data != 0:
        return {
            'alpha': data[1],
            'beta': data[2],
            'size_x': data[3],
            'size_y': data[4]
        }
    else:
        return ""

def extra_info_dict(data):
    return {
        'face_id': data[0],
        'score_recognition': data[1],
        'face_label': data[2]
    }

def face_info_dict(data):
    return {
        'shape_info': shape_info_dict(data[0]),
        'extra_info': [extra_info_dict(extra) for extra in data[1:len(data)-1]]
    }

class RecognitionInfo(Enum):
    nothing_new = 0
    one_face = 2
    several_faces = 3
    learn_face_suggestion = 4

def recognition_info_dict(data):
    return {
        'status': RecognitionInfo(data[0]),
        'labels': None if len(data) == 0 or data[0] == 4 else data[1]
        # labels if there are any
    }

def face_data_dict(data):
    return {
        'face_info': [face_info_dict(face) for face in data[0:len(data)-1]],
        'recognition_info': recognition_info_dict(data[len(data)-1]) if data[len(data)-1] != [] else None
    }

def face_detected_dict(data):
    if len(data) >= 2:
        return {
            'time_stamp': time_stamp_dict(data[0]),
            'face_data': face_data_dict(data[1])
        }
    else:
        return None

In [None]:
faceDetectionSamples = faceDetection.subject.sample(1000)

In [None]:
def face_distance(angle_width, angle_height, width=14.9, height=20.05):
    return (distance(width, angle_width) + distance(height, angle_height)) / 2

def detect_face_distance(face, width=14.9, height=20.05):
    if face == None:
        return -1
    shape_info = face['face_data']['face_info'][0]['shape_info']
    return face_distance(shape_info['size_x'], shape_info['size_y'], 13, 18)

In [None]:
blueBlobDetectionSamples = blueBlobDetection.subject.sample(1000)

In [None]:
blueBlobDetectionWithDistance = blueBlobDetectionSamples.map(lambda item: ("blue_blob", 50 * (Blob(item).distance())))

In [None]:
blueBlobDetectionWithDistanceSub = blueBlobDetectionWithDistance.subscribe(print)

In [None]:
blueBlobDetectionWithDistanceSub.dispose()

In [None]:
faceDetectionSamplesMapped = faceDetectionSamples.map(lambda x, y: face_detected_dict(x))

In [None]:
faceDetectionSamplesMappedSub = faceDetectionSamplesMapped.subscribe(print)

In [None]:
faceDetectionSamplesMappedSub.dispose()

In [None]:
faceDetectionSamplesMappedDistance = faceDetectionSamplesMapped.map(lambda face: ("face", detect_face_distance(face)))

In [None]:
faceDetectionSamplesMappedDistanceSub = faceDetectionSamplesMappedDistance.subscribe(print)

In [None]:
faceDetectionSamplesMappedDistanceSub.dispose()

In [None]:
latestFaceAndBall = Observable.combine_latest(faceDetectionSamplesMappedDistance, blueBlobDetectionWithDistance, lambda face, blob: (face, blob))

In [None]:
latestFaceAndBallSub = latestFaceAndBall.subscribe(print)

In [None]:
latestFaceAndBallSub.dispose()

In [None]:
def closest_face_or_blob(face, blob):
    if face[1] == -1:
        return blob
    elif blob[1] == -1:
        return face
    else:
        if face[1] > blob[1]:
            return blob
        else:
            return face

## Paper example

In [None]:
def head_tracker(distance): print("head")
def head_body_tracker(distance): print("head and body")
def arm_tracker(arm): print(arm)