# User Interface of the Gabriele Russo's RT1 third assignment

In [1]:
%reset -f
# enables interactive plots within the Jupyter Notebook
%matplotlib widget 

## Libraries

In [2]:
from rt1_third_assignment.srv import Goal, GoalRequest, Interface, InterfaceRequest
from matplotlib.backends.backend_nbagg import FigureCanvasNbAgg as FigureCanvas
from matplotlib.animation import FuncAnimation
from actionlib_msgs.msg import GoalStatusArray
from sensor_msgs.msg import LaserScan
from geometry_msgs.msg import Twist
from std_msgs.msg import String
# for handling odometry messages
from nav_msgs.msg import Odometry 
# imports display functions from 'IPython.display'
from IPython.display import display, HTML, clear_output
from ipywidgets import HBox, VBox, FloatSlider, Label
# for plotting
import matplotlib.pyplot as plt 
# for creating interactive widgets in the notebook
import ipywidgets as widgets 
# for ROS integration
import jupyros as jr 
import rospy   
# for numerical operations
import numpy as np 
from jupyros import ros3d
import math
import os

## Definition of the global variables and services
In this section are defined the variable goal_status_count and the color mapping, very important variable needed for construct the Histogram and the pie chart on the Goal status.

In [3]:
# Global counters
goal_status_count = {
    'Succeeded': 0,
    'Aborted': 0,
    'Cancelled': 0
}
# Define the color mapping for statuses
color_mapping = {
    'Succeeded': 'green',
    'Aborted': 'red',
    'Cancelled': 'orange'
}

Then, we have all the output variables, used in order to have in output specific messages when the buttons are clicked.

In [4]:
# Creates an output widget to display outputs within the notebook
output_scan = widgets.Output()
output_ui = widgets.Output()
# Output area for the teleop control
teleop_output = widgets.Output()

The 'update_status_count' is a function used to update the variables in the goal status dictionary.

In [5]:
# function to update the status counts
def update_status_counts(status):
    global goal_status_count
    if status in goal_status_count:
        goal_status_count[status] += 1
    else:
        goal_status_count[status] = 1

Lastly, are defined:
* definition of the ros node 'user_interface'
* definition of the client of the 'goal_position' service
* definition of the client of the "command" service used to send the user commands to the controller node

In [6]:
# initialize the ROS node
rospy.init_node("user_interface")
# Define the ROS service clients
client_goal = rospy.ServiceProxy("/goal_position", Goal)
client_interface = rospy.ServiceProxy("/commands", Interface)

## Tracker Class
This class is used in order to track in real time the postion of the robot and plot it using the 'FuncAnimation'.
It also plot a figure that shows the robot position and orientation with respect to the obstacle detected by the laser scan. In this case it works with a button, therefore, if the user click the button the plot is updated and shows the current obstacle detected and the closest obstacle to the robot

In [7]:
# PLOT: Travelled Path
class Tracker:
    
    def __init__(self):
        
        # create the figures and subplots inside the class
        self.fig1, self.ax1 = plt.subplots(figsize=(6, 5))
        self.fig2, self.ax2 = plt.subplots(figsize=(6, 5))
        
        # initialize the first plot for the Robot Tracker
        self.ln_odom, = self.ax1.plot([], [], 'ro', label="Robot Tracking")
        self.ln_goal, = self.ax1.plot([], [], 'b*', label="Goal Position")
        self.ax1.legend(handles=[self.ln_odom, self.ln_goal], loc = "upper right")
        
        # initialize the second plot for Laser Scan
        self.ln_scan, = self.ax2.plot([], [], 'go', label="Laser Scan")
        self.ln_closest_obstacle, = self.ax2.plot([], [], 'rx',label="Closest Obstacle", markersize=10)
        #self.ax[1].legend(handles=[self.ln_scan, self.ln_closest_obstacle], loc = "upper right")
        self.robot_marker, = self.ax2.plot([], [], 'bo', label="Robot")  # Robot position
        self.robot_arrow = self.ax2.arrow(0, 0, 1, 0, head_width=0.5, head_length=1, fc='blue', ec='blue')  # Arrow for orientation
        
        self.ax2.legend(loc="upper right")
        
        # initializes empty lists to store the odometry data
        self.x_odom_data, self.y_odom_data = [], []
        self.x_goal_data, self.y_goal_data = [], []
        
        # Variables for scan data
        self.laser_data = []
        self.min_distance = None
        self.min_angle = None
        
        # Initialize the figure and plots
        self.plot_init()
        
        # set up FuncAnimation to update the first subplot automatically
        self.anim = FuncAnimation(self.fig1, self.update_plot, init_func=self.plot_init, blit=True, interval=1000)
        
    def plot_init(self):
        # initialize the first subplot
        self.ax1.set_xlim(-10, 10)
        self.ax1.set_ylim(-10, 10)
        self.ax1.set_title("Robot Tracker")
        self.ax1.set_xlabel("X")
        self.ax1.set_ylabel("Y")
        self.ax1.grid(True)
        
        # initialize the second subplot
        self.ax2.set_xlim(-10, 10)
        self.ax2.set_ylim(-10, 10)
        self.ax2.set_title("Obstacle Detected")
        self.ax2.set_xlabel("X")
        self.ax2.set_ylabel("Y")
        self.ax2.grid(True)
    
    # defines a callback function that appends 
    # the robot's position data (x and y coordinates) 
    # to x_data and y_data whenever an odometry message is received
    def odom_callback(self, msg):
        self.y_odom_data.append(msg.pose.pose.position.y)
        self.x_odom_data.append(msg.pose.pose.position.x)
        
    def goal_callback(self, x, y):
        self.x_goal_data.clear()
        self.y_goal_data.clear()      
        self.x_goal_data.append(float(x))
        self.y_goal_data.append(float(y))
        
    def scan_callback(self, msg):
        self.laser_data = list(msg.ranges)
        # calculate the closest obstacle's distance and its angle
        self.min_distance = min(self.laser_data)
        self.min_angle = np.argmin(self.laser_data) * msg.angle_increment + msg.angle_min
        
    def update_plot(self, frame):
        # ipdate the first subplot (automatically updated by funcanimation)
        self.ln_odom.set_data(self.x_odom_data, self.y_odom_data)
        self.ln_goal.set_data(self.x_goal_data, self.y_goal_data)

    def update_plot_obstacle(self):
        # Clear and update the second plot (Laser Scan)
        self.ax2.cla()
        # Plot the laser scan data
        angles = np.linspace(-np.pi/2, np.pi/2, len(self.laser_data))
        x_scan = np.cos(angles) * self.laser_data
        y_scan = np.sin(angles) * self.laser_data
        self.ax2.plot(x_scan, y_scan, 'go', label="Laser Scan")

        # Mark the closest obstacle
        closest_x = np.cos(self.min_angle) * self.min_distance
        closest_y = np.sin(self.min_angle) * self.min_distance
        self.ax2.plot(closest_x, closest_y, 'rx', label="Closest Obstacle", markersize=10)

        # Plot the robot's position and orientation
        self.ax2.plot(0, 0, 'bo', label="Robot")  # Robot's position at the origin
        self.ax2.arrow(0, 0, 1, 0, head_width=0.5, head_length=1, fc='blue', ec='blue')  # Arrow pointing forward

        # Set plot limits and labels
        self.ax2.set_xlim(-10, 10)
        self.ax2.set_ylim(-10, 10)
        self.ax2.set_title(f"Closest Obstacle Detected: {self.min_distance:.2f} meters, angle: {math.degrees(self.min_angle):.2f} degrees", fontsize=10)
        self.ax2.set_xlabel("X")
        self.ax2.set_ylabel("Y")
        self.ax2.grid(True)
        self.ax2.legend(loc="upper right")

        # Redraw the second plot
        self.fig2.canvas.draw_idle()


## TeleopControl Class
This class implements the keyboard that the user can use in the manuale and assisted drive mode, in order to teleop and control the robot, only clicking the specific button of the keyboard. (in the previous version, this was done by the teleop twist node)

In [8]:
class TeleopControl:
    def __init__(self):
        # publisher for the velocity command
        self.cmd_vel_pub = rospy.Publisher('/cmd_vel', Twist, queue_size=10)
        
        # Create sliders for linear and angular velocities
        self.linear_slider = widgets.FloatSlider(value=0.5, min=0, max=2, step=0.1, description='Linear Vel')
        self.angular_slider = widgets.FloatSlider(value=0.5, min=0, max=2, step=0.1, description='Angular Vel')
        
        # create buttons for control
        self.forward_button = widgets.Button(description="Forward", style={'button_color': 'orange'})
        self.backward_button = widgets.Button(description="Backward", style={'button_color': 'orange'})
        self.left_button = widgets.Button(description="Left", style={'button_color': 'orange'})
        self.right_button = widgets.Button(description="Right", style={'button_color': 'orange'})
        self.stop_button = widgets.Button(description="Stop", style={'button_color': 'lightcoral'})
        self.empty = widgets.Button(description='')
        
        # Display buttons in a layout
        self.control_box = widgets.HBox([
            widgets.VBox([  # Teleop buttons on the left
                widgets.HBox([widgets.Button(description=''), self.forward_button, widgets.Button(description='')]),  # Center the Forward button
                widgets.HBox([self.left_button, self.stop_button, self.right_button]),
                widgets.HBox([widgets.Button(description=''), self.backward_button, widgets.Button(description='')])  # Center the Backward button
            ]),
            widgets.VBox([  # Sliders on the right
                self.linear_slider,
                self.angular_slider
            ])
        ])
        
        
        # Initially hide the control box
        self.control_box.layout.display = 'none'
        
        # Output widget to show current state
        self.output = widgets.Output()
        
        # Assign functions to buttons
        self.forward_button.on_click(self.move_forward)
        self.backward_button.on_click(self.move_backward)
        self.left_button.on_click(self.turn_left)
        self.right_button.on_click(self.turn_right)
        self.stop_button.on_click(self.stop_robot)
        
    def show(self):
        self.control_box.layout.display = 'block'
        
    def hide(self):
        self.control_box.layout.display = 'none'
        
    def publish_twist(self, linear_x=0.0, linear_y=0.0, linear_z=0.0, angular_x=0.0, angular_y=0.0, angular_z=0.0):
        twist = Twist()
        twist.linear.x = linear_x
        twist.linear.y = linear_y
        twist.linear.z = linear_z
        twist.angular.x = angular_x
        twist.angular.y = angular_y
        twist.angular.z = angular_z
        self.cmd_vel_pub.publish(twist)
        
    def move_forward(self, b):
        linear_velocity = self.linear_slider.value
        self.publish_twist(linear_x=linear_velocity)
        with self.output:
            self.output.clear_output(wait=True)
            print(f"Moving Forward with linear velocity: {linear_velocity}")
        
    def move_backward(self, b):
        linear_velocity = self.linear_slider.value
        self.publish_twist(linear_x=-linear_velocity)
        with self.output:
            self.output.clear_output(wait=True)
            print(f"Moving Backward with linear velocity: {linear_velocity}")
        
    def turn_left(self, b):
        angular_velocity = self.angular_slider.value
        self.publish_twist(angular_z=angular_velocity)
        with self.output:
            self.output.clear_output(wait=True)
            print(f"Turning Left with angular velocity: {angular_velocity}")
        
    def turn_right(self, b):
        angular_velocity = self.angular_slider.value
        self.publish_twist(angular_z=-angular_velocity)
        with self.output:
            self.output.clear_output(wait=True)
            print(f"Turning Right with angular velocity: {angular_velocity}")
            
    def stop_robot(self, b):
        self.publish_twist()  # No movement, zero velocities
        with self.output:
            self.output.clear_output(wait=True)
            print("Stopping")

Callback function which updates the goal_status_count dictionary, through the update_status_counts function

In [9]:
def goal_status_callback(msg):
    if msg.data == "Goal Reached":
        update_status_counts("Succeeded")
        with output_ui:
            print("Goal Reached!")
    elif msg.data == "Goal Not Reachable":
        update_status_counts("Aborted")
        with output_ui:
            print("Goal Not Reachable!")

## 3D map visualization settings
here are defined all the settings used to show the simulation directly here on the notebook thanks to Jupyros

In [10]:
# 3D map
map_3d = ros3d.Viewer()
rc = ros3d.ROSConnection(url = 'ws://localhost:9090')
tf_client = ros3d.TFClient(ros = rc, fixed_frame = 'link_chassis')
laser_view = ros3d.LaserScan(topic = '/scan', ros = rc, tf_client = tf_client)
map_view = ros3d.OccupancyGrid(topic = '/map', ros = rc, tf_client = tf_client, continuous = True)
path = ros3d.Path(topic = '/move_base/NavfnROS/plan', ros = rc,tf_client = tf_client)
urdf = ros3d.URDFModel(ros = rc, tf_client = tf_client,path=os.environ.get('JUPYROS_ASSETS_URL', 'http://localhost:3000'))
g = ros3d.GridModel()
map_3d.objects = [g, laser_view, map_view, path, urdf]
map_3d.layout.width = '100%'

## Buttons definition and settings

In [11]:
# define the buttons with custom colors, increased size and bold text
button_layout = widgets.Layout(width='150px', height='50px', font_size='20px')

In [12]:
# define the buttons with custom colors
auto_button = widgets.Button(description="Auto Drive", style={'button_color': 'red'}, layout=widgets.Layout(width='150px', height='50px', font_size='20px'))
manual_button = widgets.Button(description="Manual Drive", style={'button_color': 'magenta'}, layout=widgets.Layout(width='150px', height='50px', font_size='20px'))
assisted_button = widgets.Button(description="Assisted Drive", style={'button_color': 'cyan'}, layout=widgets.Layout(width='150px', height='50px', font_size='20px'))
cancel_button = widgets.Button(description="Cancel Goal", style={'button_color': 'orange'}, layout=widgets.Layout(width='150px', height='50px', font_size='20px'))
submit_button = widgets.Button(description="Submit", style={'button_color': 'lightgreen'}, layout=widgets.Layout(width='150px', height='50px', font_size='20px'))
show_obstacle_button = widgets.Button(description="Show Detected Obstacle", style={'button_color': 'gold'}, layout=widgets.Layout(width='150px', height='50px', font_size='20px'))
goal_button = widgets.Button(description="Plot Goal Statuses", style={'button_color': 'lightcoral'}, layout=widgets.Layout(width='150px', height='50px', font_size='20px'))

In [13]:
# define the input fields for X and Y positions
x_position = widgets.FloatText(description="X Position")
y_position = widgets.FloatText(description="Y Position")

In [14]:
# initially hide the input fields
x_position.layout.display = 'none'
y_position.layout.display = 'none'
submit_button.layout.display = 'none'
cancel_button.layout.display = 'none'

## Functions associated to the buttons

In [15]:
# calculate the explode parameter dynamically
# used in order to avoid bad graphical visualization of the pie chart, when there are not goal status or they are few
def calculate_explode(c):
    if len(c) > 0:
        max_index = c.index(max(c))
        explode = [0] * len(c)
        # explode the slice with the maximum value
        explode[max_index] = 0.1
        return explode
    else:
        return None

In [16]:
def plot_goal_statuses():
    # Clear the previous plot
    clear_output(wait=True)
    
    # Extract data for plotting
    statuses = list(goal_status_count.keys())
    counts = list(goal_status_count.values())
    
    # filter out zero values to avoid issues in the pie chart
    non_zero_counts = [count for count in counts if count > 0]
    non_zero_statuses = [status for status, count in zip(statuses, counts) if count > 0]
    non_zero_colors = [color_mapping[status] for status in non_zero_statuses]
    
    explode = calculate_explode(non_zero_counts)
    
    # create the figure with subplots
    fig, axs = plt.subplots(1, 2, figsize=(10, 5))
    
    # Adjust spacing between subplots to prevent overlap
    plt.subplots_adjust(wspace=0.5)  # Increase space between the plots
    
    #plot the bar chart on the first subplot
    axs[0].bar(statuses, counts, color=["green", "red", "orange"])
    axs[0].set_title('Goal Status Counts')
    axs[0].set_xlabel('Goal Status')
    axs[0].set_ylabel('Count')
    
    # plot the pie chart on the second subplot
    axs[1].pie(non_zero_counts, explode=explode, labels=non_zero_statuses, colors=non_zero_colors, autopct='%1.1f%%', shadow=True, startangle=90)
    # equal aspect ratio ensures that pie is drawn as a circle
    axs[1].axis('equal')
    axs[1].set_title('Goal Status Distribution')
    
    # display the plot
    plt.show()
    
    # Display the button again so it's always available
    display(goal_button)

In [17]:
def show_goal_status(b):
    plot_goal_statuses()

In [18]:
def show_obstacle(b):
    robot_position.update_plot_obstacle()
    plt.draw()

In [19]:
teleop = TeleopControl()

In [20]:
# function to display the teleop interface
def show_teleop_interface():
     with teleop_output:
        clear_output(wait=True)
        display(teleop.control_box)  # Display the control box
        teleop.show()

In [21]:
# function to hide the teleop interface
def hide_teleop_interface():
    teleop.hide()
    with teleop_output:
        clear_output(wait=True)
        # instantiate the teleopcontrol class
    

In [22]:
# Function to handle auto drive mode
def auto_drive(b):
    
    hide_teleop_interface()
    
    com = InterfaceRequest()
    # ord is needed in order to transform the char 'a' in the ASCII corresponding integer
    com.command = ord('a')
    client_interface.wait_for_service()
    client_interface.call(com)
    
    # make the x and y position inputs visible
    x_position.layout.display = 'block'
    y_position.layout.display = 'block'
    submit_button.layout.display = 'block'
    cancel_button.layout.display = 'block'
        
    # display a message prompting the user to enter positions
    output_ui.clear_output()
    
    with output_ui:
        print("Enter the desired x and y positions and press 'Submit'")

In [23]:
# function to submit goal positions
def submit_goal_positions(b):
    
    pos = GoalRequest()
    pos.x = float(x_position.value)
    pos.y = float(y_position.value)
    
    robot_position.goal_callback(x_position.value, y_position.value)

    with output_ui:
        print("The goal position is sent to the controller")
        
    try:
        client_goal.wait_for_service()
        
        client_goal.call(pos)
    
    except rospy.ServiceException as e:
        with output_ui:
            print(f"Service call failed: {e}")


In [24]:
# Function to handle manual drive mode
def manual_drive(b):
    
    output_ui.clear_output()
    with output_ui:
        print("You are in the manual drive mode, You can use teleop to control the robot.")
    
    com = InterfaceRequest()
    com.command = ord('m')
    client_interface.wait_for_service()
    client_interface.call(com)
    
    show_teleop_interface()
    
    # initially hide the input fields
    x_position.layout.display = 'none'
    y_position.layout.display = 'none'
    submit_button.layout.display = 'none'
    cancel_button.layout.display = 'none'
    

In [25]:
# Function to handle assisted drive mode
def assisted_drive(b):
    
    output_ui.clear_output()
    with output_ui:
        print("Now you are in the assisted drive mode")
    
    com = InterfaceRequest()
    com.command = ord('d')
    client_interface.wait_for_service()
    client_interface.call(com)
    
    show_teleop_interface()
    
    # initially hide the input fields
    x_position.layout.display = 'none'
    y_position.layout.display = 'none'
    submit_button.layout.display = 'none'
    cancel_button.layout.display = 'none'

In [26]:
# Function to cancel the goal
def cancel_goal(b):
    
    com = InterfaceRequest()
    com.command = ord('c')
    client_interface.wait_for_service()
    client_interface.call(com)
    
    update_status_counts("Cancelled")
    
    output_ui.clear_output()
    with output_ui:
        print("Goal has been cancelled")
    
    # make the x and y position inputs visible
    x_position.layout.display = 'block'
    y_position.layout.display = 'block'
    submit_button.layout.display = 'block'
    cancel_button.layout.display = 'block'

In [27]:
# set the button callbacks
auto_button.on_click(auto_drive)
submit_button.on_click(submit_goal_positions)
manual_button.on_click(manual_drive)
assisted_button.on_click(assisted_drive)
cancel_button.on_click(cancel_goal)
show_obstacle_button.on_click(show_obstacle)

In [28]:
goal_button.on_click(show_goal_status)

In [29]:
# organize the buttons into rows
row1 = VBox([HBox([auto_button, manual_button, assisted_button]), teleop_output])
row2 = HBox([submit_button, cancel_button])


## Robot position tracker and Obstacle Detector

In [30]:
robot_position = Tracker()
# Subscribes to the '/odom' topic, which provides odometry messages, and attaches the 'odom_callback' function to process incoming messages.
jr.subscribe('/odom', Odometry, robot_position.odom_callback)
jr.subscribe('/scan', LaserScan, robot_position. scan_callback)
jr.subscribe('/goal_status',String, goal_status_callback)
display(show_obstacle_button)
# show the figure
plt.show(block = False)

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

Button(description='Show Detected Obstacle', layout=Layout(height='50px', width='150px'), style=ButtonStyle(bu…

## Goal Status Histogram and Pie chart

In [31]:
display(goal_button)

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

Button(description='Plot Goal Statuses', layout=Layout(height='50px', width='150px'), style=ButtonStyle(button…

## User Interface Keyboard 

In [32]:
print('')
print('Please select a driving mode: ')
# display the widgets
display(HTML("<style> .widget-button { margin: 5px 10px; } </style>"))
display(row1)
# display the X and Y position inputs and submit button below the buttons
display(VBox([x_position, y_position, row2, output_ui]))


Please select a driving mode: 


VBox(children=(HBox(children=(Button(description='Auto Drive', layout=Layout(height='50px', width='150px'), st…

VBox(children=(FloatText(value=0.0, description='X Position', layout=Layout(display='none')), FloatText(value=…

## 3D Visualization 

In [33]:
display(map_3d)

Viewer(layout=Layout(width='100%'), objects=[GridModel(), LaserScan(ros=ROSConnection(url='ws://localhost:9090…