Requirement for ROS environment

In [1]:
import rospy
import assignment_2_2022.msg
from nav_msgs.msg import Odometry
#from rt2_ass2.srv import *
import math
import numpy as np
import actionlib
from actionlib_msgs.msg import GoalStatusArray, GoalStatus
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from functools import partial
from sensor_msgs.msg import LaserScan

%matplotlib widget

Simple User Interface definition

In [2]:
from ipywidgets import Button, FloatText, Box, VBox, Layout, Label

x_in = FloatText(
    value=0.0,
    description='x_target',
    disabled=False,
    layout=Layout(width='150px')
)
y_in = FloatText(
    value=1.0,
    description='y_target',
    disabled=False,
    layout=Layout(width='150px')
)
send_goal_btn = Button(
    description='Send Goal',
    disabled = False,
)
cancel_goal_btn = Button(
    description='Cancel Goal',
    disabled = False,
)
stop_animation_btn = Button(
    description='Stop animation',
    disabled = False,
)
start_animation_btn = Button(
    description='Start animation',
    disabled = False,
)
min_distance_txt = Label(
        value='minimum distance: XX',
        layout=Layout(
            width='150px',
            margin='5px 0px 0px 20px')
)
status_txt = Label(
    value='status: idle',
    layout=Layout(margin='6px 0px 0px 0px')
)

ui_box = Box(
    children=[
        VBox([x_in, y_in, min_distance_txt],
            layout=Layout(margin='10px 0px 10px 0px')
        ),
        VBox([send_goal_btn, cancel_goal_btn, status_txt],
            layout=Layout(margin='10px 0px 10px 0px')
        ),
        VBox([start_animation_btn, stop_animation_btn],
            layout=Layout(margin='10px 0px 10px 0px')
        )
    ],
    layout=Layout(
        border="solid",
        padding='0px',
        width = '500px'
        )
)

Class to visualize the reached and cancelled goals in a bar chart

In [3]:
class Vis_goal_stat():
    def __init__(self):
        self.fig = plt.figure()
        self.x = ['success','cancelled']
        self.y = [0,0]
        plt.ylim(0,10)
        plt.title('Goal statistics')
        self.barcollection = plt.bar(self.x,self.y, color=['green','red'])

    def update_plot(self, frames, goals):
        y = [len(goals['success']), len(goals['cancelled'])]
        for i,b in enumerate(self.barcollection):
            b.set_height(y[i])

Class to visualize the robot position and the target goals

In [4]:
class Vis_robotPos():
    def __init__(self):
        self.fig, self.ax = plt.subplots()
        self.robot_pos_plt, = plt.plot([],[],'b-')
        self.robot_pos_x, self.robot_pos_y = [],[]
    
    def plot_init(self):
        self.ax.set_xlim(-10, 10)
        self.ax.set_ylim(-10, 10)
        self.ax.set_title("Robot and goals positions")
        return self.robot_pos_plt
    
    def odom_callback(self, odom: Odometry):
        # queue new robot position
        self.robot_pos_x.append(odom.pose.pose.position.x)
        self.robot_pos_y.append(odom.pose.pose.position.y)

    def plot_goals(self, goals):
        
        goal_success_x = [g[0] for g in goals['success']]
        goal_success_y = [g[1] for g in goals['success']]
        goal_cancelled_x = [g[0] for g in goals['cancelled']]
        goal_cancelled_y = [g[1] for g in goals['cancelled']]
        goal_current_x = [g[0] for g in goals['current']]
        goal_current_y = [g[1] for g in goals['current']]
        # plot cancelled goals
        self.ax.plot(goal_cancelled_x, goal_cancelled_y, 'ro')
        # plot reached goals
        self.ax.plot(goal_success_x, goal_success_y, 'go')
        # plot current goal
        self.ax.plot(goal_current_x, goal_current_y, 'co')

    def update_robot_plot(self, frames, goals):
        self.robot_pos_plt.set_data(self.robot_pos_x, self.robot_pos_y)
        self.plot_goals(goals)
        return self.robot_pos_plt,


Class to visualize the laser scan data

In [5]:
class Vis_laserScan():
    def __init__(self):
        self.fig, self.ax = plt.subplots()
        self.laser_plt, = plt.plot([],[],'r.')
        self.laser_ranges, self.laser_angles = [], []
        
    def plot_init(self):
        self.ax.set_xlim(-10, 10)
        self.ax.set_ylim(-10, 10)
        self.ax.set_title("Laser scan")
        return self.laser_plt
    
    def laser_scan_callback(self, data: LaserScan):
        angle_increment = data.angle_increment
        self.laser_angles = [data.angle_min + i * angle_increment for i in range(len(data.ranges))]
        self.laser_ranges = data.ranges
    
    def update_laser_plot(self, frame):
        # update min distance label
        min_distance_txt.value = f'min distance: {min(self.laser_ranges):.2f}m'
        laser_x = [r * np.cos(theta) for r, theta in zip(self.laser_ranges, self.laser_angles)]
        laser_y = [r * np.sin(theta) for r, theta in zip(self.laser_ranges, self.laser_angles)]
        self.laser_plt.set_data(laser_x, laser_y)
        self.ax.plot([0],[0],'b*')
        return self.laser_plt,
        

Class to handle goals, such as request to reach a goal, cancel a goal

In [6]:
class GoalManager:
    def __init__(self):
        self.goal_position = None
        self.goal_counter = {
            'success': [],
            'cancelled': [],
            'current':[]
        }

        # create simple action client to send goal
        self.goal_action_client = actionlib.SimpleActionClient("/reaching_goal", assignment_2_2022.msg.PlanningAction)

        # wait for action server
        rospy.loginfo("waiting for server /reaching_goal...")
        self.goal_action_client.wait_for_server()
        rospy.loginfo("Server /reaching_goal available")
 

    def get_goal_counter(self):
        return self.goal_counter


    def reset_goal_counter(self):
        self.goal_counter['success'] = []
        self.goal_counter['cancelled'] = []


    def set_goal(self, x: float, y: float):
        if self.goal_position == None:
            self.goal_position = assignment_2_2022.msg.PlanningActionGoal().goal
        
        self.goal_position.target_pose.pose.position.x = x
        self.goal_position.target_pose.pose.position.y = y
        
        # update current goal for ui
        self.goal_counter['current'].append([x,y])

        # update status for ui
        status_txt.value = 'status: goal set'


    def get_current_goal(self):
        return [self.goal_position.target_pose.pose.position.x, self.goal_position.target_pose.pose.position.y] if self.goal_position else None


    def goal_done_cb(self, goal_status: GoalStatusArray, result):
        #print("goal completed")
        if (goal_status == GoalStatus.PREEMPTED):
            # update status description
            status_txt.value ='status: goal cancelled!'
            # increase cancelled counter
            self.goal_counter['cancelled'].append([
                self.goal_position.target_pose.pose.position.x,
                self.goal_position.target_pose.pose.position.y
            ])
        elif (goal_status == GoalStatus.SUCCEEDED):
            # update status description
            status_txt.value = 'status: goal reached!'
            # increase success counter
            self.goal_counter['success'].append([
                self.goal_position.target_pose.pose.position.x,
                self.goal_position.target_pose.pose.position.y
            ])
        else:
            status_txt.value =f'unknown goal status: {goal_status}'
        
        # update current goal for ui
        self.goal_counter['current'] = []


    def goal_active_cb(self):
        status_txt.value = 'status: reaching target...'


    def send_goal_request(self):
        # send goal request
        self.goal_action_client.send_goal(
            self.goal_position,
            done_cb=self.goal_done_cb,
            active_cb=self.goal_active_cb
        )


    def cancel_goal(self):
        # send request to cancel current goal
        self.goal_action_client.cancel_goal()
        #print("goal cancelled")
        

Class controller, simple class to send commands to the robot in the environment

In [7]:
class Controller:
    def __init__(self):
        rospy.init_node("controller", anonymous=True)

        # create GoalManager
        self.goal_manager = GoalManager()
        
        # symmetric coordinate limits in absolute value 
        self.x_lim = 8
        self.y_lim = 8

        # default goal
        self.goal_position = None

        # robot current position
        self.robot_x = None
        self.robot_y = None

        self.goalManager = GoalManager()

        # robot and goal positions visualization
        self.vis_robot_pos = Vis_robotPos()
        
        # create subscriber for /odom
        odom_sub = rospy.Subscriber("/odom", Odometry, callback=self.vis_robot_pos.odom_callback)
        
        # run robot position animation
        self.ani_robot_pos = FuncAnimation(
                                self.vis_robot_pos.fig,
                                partial(self.vis_robot_pos.update_robot_plot,
                                        goals=self.goal_manager.get_goal_counter()),
                                init_func=self.vis_robot_pos.plot_init,
                                interval=200,
                                blit=True
                            )
        
        # laser visualization
        self.vis_laserscan = Vis_laserScan()
        
        # create subscriber for laser scan data
        laserscan_sub = rospy.Subscriber("/scan", LaserScan, self.vis_laserscan.laser_scan_callback)
        
        # run laserscan animation
        self.ani_laserscan = FuncAnimation(
                                self.vis_laserscan.fig,
                                self.vis_laserscan.update_laser_plot,
                                init_func=self.vis_laserscan.plot_init,
                                interval=200,
                                blit=True
                            )

        # goal statistics visualization
        self.vis_goalstat = Vis_goal_stat()
        self.ani_goalstat = FuncAnimation(
                                self.vis_goalstat.fig,
                                partial(self.vis_goalstat.update_plot,
                                        goals=self.goal_manager.get_goal_counter()),
                                repeat=False,
                                blit=False,
                                interval=500
                            )

    def stop_animations(self):
        self.ani_robot_pos.event_source.stop()
        self.ani_laserscan.event_source.stop()
        self.ani_goalstat.event_source.stop()
        
    def start_animations(self):
        self.ani_robot_pos.event_source.start()
        self.ani_laserscan.event_source.start()
        self.ani_goalstat.event_source.start()
    

    def saturate_input(self, x, limit):
        if (abs(x) > limit):
            return  math.copysign(limit, x)
        else:
            return x
    
    # Goal management functions
    def set_goal(self, x:float, y:float):
        self.goal_manager.set_goal(
            self.saturate_input(x, self.x_lim),
            self.saturate_input(y, self.y_lim)
        )

    def send_goal_request(self):
        self.goal_manager.send_goal_request()

    def cancel_goal(self):
        self.goal_manager.cancel_goal()

    def get_goal_counter(self):
        return self.goal_manager.get_goal_counter()

    def shutdown(self):
        self.stop_animations()
        rospy.signal_shutdown("User shutdown request")


User Interface button callback functions

In [8]:
def send_goal_btn_callback(btn):
    controller.set_goal(float(x_in.value), float(y_in.value))
    controller.send_goal_request()

send_goal_btn.on_click(send_goal_btn_callback)

def cancel_goal_btn_callback(btn):
    controller.cancel_goal()

cancel_goal_btn.on_click(cancel_goal_btn_callback)

def start_animation_btn_callback(btn):
    controller.start_animations()

start_animation_btn.on_click(start_animation_btn_callback)

def stop_animation_btn_callback(btn):
    controller.stop_animations()

stop_animation_btn.on_click(stop_animation_btn_callback)

Instantiate Controller and Display User Interface

In [9]:
controller = Controller()
display(ui_box)

[INFO] [1708701706.773355, 1216.667000]: waiting for server /reaching_goal...
[INFO] [1708701706.785816, 1216.680000]: Server /reaching_goal available
[INFO] [1708701706.788329, 1216.682000]: waiting for server /reaching_goal...
[INFO] [1708701706.789407, 1216.683000]: Server /reaching_goal available


Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

Box(children=(VBox(children=(FloatText(value=0.0, description='x_target', layout=Layout(width='150px')), Float…