# Headers

In [1]:
%matplotlib widget

import matplotlib as mpl
import matplotlib.pyplot as plt

from matplotlib import animation

import ipywidgets as widgets
from ipywidgets import Button, GridBox, Layout, ButtonStyle, FloatSlider, HBox, VBox

import rospy
from geometry_msgs.msg import Twist
from nav_msgs.msg import Odometry
from rt2_assignment1.msg import SetVel
from rt2_assignment1.srv import Command

from IPython.display import display

import numpy as np



# Classes

## LimList

A subclass of list, used here to generate queues of fixed maximum size, used to store only the n-most recent data used for plotting (eg. odometry or velocities).

In [2]:
class LimList(list):
    
    def __init__(self, max_size, l=[]):
        super().__init__(l)
        self.max_size = max_size
       
    def append(self, data):
        super().append(data)
        if super().__len__() > self.max_size:
            super().pop(0)

## DirectionPad

Class which accounts for the usage of the 4 directional buttons which can be used to directly control the robot.
The robot velocity (twist) depends on the values expressed by the sliders (their references are here passed at the init) and the button pressed: to this aim 5 helper functions are present, correlated to the 4 directional buttons plus the stop on. The class is also responsible for publishing those values on `/cmd_vel`, where they are read by the simulation.

In [3]:
class DirectionPad:
    
    def __init__(self, lin_slider, ang_slider):
        self.pub_vel = rospy.Publisher('/cmd_vel', Twist, queue_size=1)
        self.twist = Twist()
        self.fwd_m = 0.0
        self.ang_m = 0.0
        self.lin_slider = lin_slider
        self.ang_slider = ang_slider
    
    def forward(self):
        self.fwd_m = 1.0
        self.ang_m = 0.0
    def backward(self):
        self.fwd_m = -1.0
        self.ang_m = 0.0
    def turnCounter(self):
        self.fwd_m = 0.0
        self.ang_m = -1.0
    def turnClock(self):
        self.fwd_m = 0.0
        self.ang_m = 1.0
    def stop(self):
        self.fwd_m = 0.0
        self.ang_m = 0.0
        
    def pubTwist(self):
        self.twist.linear.x = self.fwd_m*lin_slider.value
        self.twist.angular.z = self.ang_m*ang_slider.value
        
        self.pub_vel.publish(self.twist)
        
    

# Global parameters

The most useful parameters are all declared here, to easily modify them if needed.

In [4]:
output = widgets.Output()

vel_msg = SetVel() # The message type used on the topic where maximum velocities values are published

# Initial values for the velocities
lin_init = 0.6
ang_init = 0.6
vel_msg.linear = lin_init
vel_msg.angular = ang_init

# Empty fixed-size queues used to store odometry data
rob_odom_max_len_ = 2000
rob_odom_x = LimList(rob_odom_max_len_)
rob_odom_y = LimList(rob_odom_max_len_)

# Fixed-size queues used to store velocities data.
# They're here filled with values in order to correctly
# print them. This values will be replaced as soon as
# real ones arrive.
vel_max_len_ = 200
vel_x_sim = LimList(vel_max_len_, [lin_init]*vel_max_len_)
vel_x_odm = LimList(vel_max_len_, [0]*vel_max_len_)

vel_w_sim = LimList(vel_max_len_, [ang_init]*vel_max_len_)
vel_w_odm = LimList(vel_max_len_, [0]*vel_max_len_)

t = np.linspace(0, 1, vel_max_len_)


# States: 0 (stop), 1 (go_to_point), 2 (directionPad)
from enum import Enum

class Running(Enum):
    STOP = 0
    GTP  = 1
    PAD  = 2
    
running  = Running.STOP
# Publisher on /cmd_vel, client of /user_interface
pub_ = None
ui_client = None


# Widgets

## Sliders

Used to set the maximum velocities used both by the `go_to_point` node and by the direct control implemented here.
As soon as their value changes, it is published on an ad-hoc `/set_vel` topic, which is then read by `go_to_point`.

In [5]:
max_vel_ = 1.6
lin_slider = FloatSlider(description='Linear Velocity', value=lin_init, min=0.1, max=max_vel_, step=0.02)
ang_slider = FloatSlider(description='Angular Velocity', value=ang_init, min=0.1, max=max_vel_, step=0.02)


v1 = VBox([lin_slider, ang_slider])

def lin_vel_change(vel):
    vel_msg.linear = vel["new"]
    pub_.publish(vel_msg)

def ang_vel_change(vel):
    vel_msg.angular = vel["new"]
    pub_.publish(vel_msg)
    
lin_slider.observe(lin_vel_change, names='value')
ang_slider.observe(ang_vel_change, names='value')

## Buttons

### Directional Buttons

All placed in a Gridbox to simulate the keyboard layout.
>NOTE: the unicode values in the description represent an 'arrow icon' each.

In [6]:
up_b  = Button(description='\u2191', #direction='up',
                 layout=Layout(width='auto', grid_area='up'),
                 style=ButtonStyle(button_color='lightgray'))
down_b    = Button(description='\u2193', #direction='down',
                 layout=Layout(width='auto', grid_area='down'),
                 style=ButtonStyle(button_color='lightgray'))
right_b = Button(description='\u2192', #direction='right',
                 layout=Layout(width='auto', grid_area='right'),
                 style=ButtonStyle(button_color='lightgray'))
left_b  = Button(description='\u2190', #direction='left',
                 layout=Layout(width='auto', grid_area='left'),
                 style=ButtonStyle(button_color='lightgray'))

gb = GridBox(children=[up_b, down_b, right_b, left_b],
        layout=Layout(
            width='100%',
            grid_template_rows='auto auto',
            grid_template_columns='33% 33% 33%',
            grid_template_areas='''
            ". up ."
            "left down right "
            ''')
       )

### Start&Stop

Used to *Start* the `goToPoint` behavior and *Stop* the robot movement.
Placed in a HBox to be side by side, then all the Boxes (sliders, directional and start/stop) in another Gridbox for layout purposes.

In [7]:
start_b  = Button(description='START',
                 layout=Layout(width='100%', grid_area='start'),
                 style=ButtonStyle(button_color='lightgray'))
stop_b  = Button(description='STOP',
                 layout=Layout(width='100%', grid_area='stop'),
                 style=ButtonStyle(button_color='lightgray'))

h1 = HBox([start_b, stop_b])

g_all = GridBox(children=[v1, h1, gb],
        layout=Layout(
            width='100%',
            grid_template_rows='auto auto',
            grid_template_columns='33% 40% 27%',
            grid_template_areas='''
            "v1 . gb"
            " . h1 . "
            ''')
       )

# Callbacks

Callback for each button, as well as the callback for the Odometry *subscriber*.

In [8]:

dir_pad = DirectionPad(lin_slider, ang_slider)
    
def start_gotopoint(b):
    """
    Start goToPoint behavior.
    
    Requests the service from
    /user_interface to do so.
    
    Args:
        b (Button): the button
                    pressed
    """
    
    global running
    if running != Running.GTP:
        ui_client("start")
        running = Running.GTP
        
def stop_motion(b):
    """
    Stop robot movement.
    
    Requests the service from
    /user_interface to do so,
    if it's currently in
    goToPoint behavior, else
    informs the DirectionalPad
    instance.
    
    Args:
        b (Button): the button
                    pressed
    """
    global running
    if running ==  Running.GTP:
        ui_client("stop")
    elif running == Running.PAD:
        dir_pad.stop()
        dir_pad.pubTwist()
    
    running = Running.STOP
        
        
def directionalButton(b):
    """
    Issue direct control.
    
    If goToPoint was active
    stop it. Control is
    carried out through the
    DirectionalPad interface.
    
    Args:
        b (Button): the button
                    pressed
    """
    global running

    if running == Running.GTP:
        stop_motion(b)
        
    if b.layout.grid_area == 'up':
        dir_pad.forward()
    elif b.layout.grid_area == 'down':
        dir_pad.backward()
    elif b.layout.grid_area == 'left':
        dir_pad.turnCounter()
    elif b.layout.grid_area == 'right':
        dir_pad.turnClock()
    else:
        return # exit, shouldn't happen normally
    
    running = Running.PAD
    

def odomCllbck(msg):
    """
    Odometry callback.
    
    Publishes the current maximum
    velocities on /cmd_vel if the
    robot is directly controlled.
    The values read from the msg
    are stored and later plotted.
    
    Args:
        msg (nav_msgs/Odometry):
            the odometry message.
    """
    
    global running
    
    rob_odom_x.append(msg.pose.pose.position.x)
    rob_odom_y.append(msg.pose.pose.position.y)
    
    vel_x_sim.append(lin_slider.value)
    vel_x_odm.append(msg.twist.twist.linear.x)
    
    vel_w_sim.append(ang_slider.value)
    vel_w_odm.append(msg.twist.twist.angular.z)
    
    if running == Running.PAD:
        dir_pad.pubTwist()

## Buttons callback binding

In [9]:
start_b.on_click(start_gotopoint)
stop_b.on_click(stop_motion)
up_b.on_click(directionalButton)
down_b.on_click(directionalButton)
left_b.on_click(directionalButton)
right_b.on_click(directionalButton)


# ROS node start

In [10]:
rospy.init_node("jupyter_interface")


pub_ = rospy.Publisher('/set_vel', SetVel, queue_size=10)
sub = rospy.Subscriber('/odom', Odometry, odomCllbck)

rospy.wait_for_service('/user_interface')
ui_client = rospy.ServiceProxy('/user_interface', Command)

## Control panel

In [11]:
display(g_all)
output

GridBox(children=(VBox(children=(FloatSlider(value=0.6, description='Linear Velocity', max=1.6, min=0.1, step=…

Output()

## Action

A simple action client could be implemented here, but we are instead going to read the goal results of the go_to_point action directly from the topic. While it's true that such practice should be avoided in the general case, here we are merely interested in tracking the number of goal reached, but instantiated by a different node. A Subscriber to the action/Result topic will thus suffice.

In [17]:
from rt2_assignment1.msg import PoseActionResult
n_goal = {'succ':0, 'canc':0}

def goalCllbck(msg):
    global n_goal
    if msg.result.reached:
        n_goal['succ']+=1
    else:
        n_goal['canc']+=1


sub_g = rospy.Subscriber('/go_to_point/result', PoseActionResult, goalCllbck)


# Plots

## Odometry

The graphs shows the robot position evolving during time, as read from `/odom`.
The graph is automatically updated thanks to `animation.FuncAnimation`.

In [16]:
fig, ax = plt.subplots()

ax.set_xlim((-5, 5))
ax.set_ylim((-5, 5))

odom_plt, = ax.plot([], [], 'r.')
ax.set_title("Robot odometry position")
ax.set_xlabel("X")
ax.set_ylabel("Y")

def init():
    odom_plt.set_data([], [])
    return (odom_plt,)
def animate(i):
    odom_plt.set_data(rob_odom_x, rob_odom_y)
    return (odom_plt,)

anim = animation.FuncAnimation(fig, animate, init_func = init, frames=100, interval=20, blit=True)

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

## Velocities

In the graph are compared the current maximum velocities set from the sliders with the current robot velocities as read from `/odom`.

In [14]:
fig_vel, axs = plt.subplots(nrows=2)

ax0, ax1 = axs.flatten()
ax0.set_title("Linear velocity")
ax0.set_ylim(0.0, max_vel_)

vel_x_plt_sim, = ax0.plot([], [], 'r', label='max linear velocity')
vel_x_plt_odm, = ax0.plot([], [], 'g', label='current linear velocity')

ax1.set_title("Angular velocity")
ax1.set_ylim(0.0, max_vel_)

vel_w_plt_sim, = ax1.plot([], [], 'r', label='max angular velocity')
vel_w_plt_odm, = ax1.plot([], [], 'g', label='current angular velocity')


def init_vel():
    vel_x_plt_sim.set_data([], [])
    vel_x_plt_odm.set_data([], [])
    
    vel_w_plt_sim.set_data([], [])
    vel_w_plt_odm.set_data([], [])
    return (vel_x_plt_sim, vel_x_plt_odm, vel_w_plt_sim, vel_w_plt_odm)

def animate_vel(i):
    vel_x_plt_sim.set_data(t, vel_x_sim)
    vel_x_plt_odm.set_data(t, vel_x_odm)
    
    vel_w_plt_sim.set_data(t, vel_w_sim)
    vel_w_plt_odm.set_data(t, vel_w_odm)
    
    return (vel_x_plt_sim, vel_x_plt_odm, vel_w_plt_sim, vel_w_plt_odm)

anim_vel = animation.FuncAnimation(fig_vel, animate_vel, init_func = init_vel, frames=100, interval=20, blit=True)


Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

## Goal succeeded vs. Goal canceled graph

A bar graph, with the successful vs. interrupted goals.

In [18]:
fig_bar, ax_bar = plt.subplots()

ax_bar.set_title("go_to_point goals")
ax_bar.set_ylim(0, 20)

ind = np.array(['Success','Cancel'])

bar_plt = ax_bar.bar(ind, [0,0], width=0.3)

def init_bar():
    for bb, ii, col in zip(bar_plt, n_goal.values(), ('g', 'r')):
        bb.set_height(ii)
        bb.set_color(col)
    return (bar_plt,)

def animate_bar(i):
    for bb, ii in zip(bar_plt, n_goal.values()):
        bb.set_height(ii)
    
    return (bar_plt,)

anim_bar = animation.FuncAnimation(fig_bar, animate_bar, init_func = init_bar, frames=100, interval=20, blit=True)

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …