# Robot Control Interface

This notebook provides a ROS robot control interface (action client) with features:
- Set a goal position (x, y)
- Cancel the current goal
- Real-time display of the robot’s current position
- Display the distance to the closest obstacle
- Real-time animation of robot position
- Statistics of reached/not-reached targets

In [1]:
# Import required libraries
import rospy
import actionlib
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import display, clear_output
import ipywidgets as widgets
from nav_msgs.msg import Odometry
from sensor_msgs.msg import LaserScan
from geometry_msgs.msg import Point, Twist
from assignment2_part1_rt.msg import PlanningAction, PlanningGoal, Status
from std_srvs.srv import SetBool
from assignment2_part1_rt.srv import DisAvg
from actionlib_msgs.msg import GoalStatus
import math
import time
from matplotlib.animation import FuncAnimation

In [2]:
# Initialize ROS node
if not rospy.core.is_initialized():
    rospy.init_node('robot_control_interface', anonymous=True)
    print("ROS node initialized: robot_control_interface")
else:
    print("ROS node already initialized")

ROS node initialized: robot_control_interface


## Global Variables and State Tracking

In [3]:
# Define global variables
robot_position = {'x': 0.0, 'y': 0.0}
robot_velocity = {'linear_x': 0.0, 'angular_z': 0.0}
target_position = {'x': 0.0, 'y': 0.0}
closest_obstacle = 10.0  # default max detection range
regions = None  # laser scan regions
goal_active = False  # whether a goal is active

# Path history
position_history = {'x': [], 'y': []}
timestamp_history = []  # For time-based plotting
max_history_points = 200  # max history length

# Target tracking statistics
target_stats = {'reached': 0, 'cancelled': 0}
target_history = {'timestamp': [], 'reached': [], 'cancelled': []}

# Animation objects
position_anim = None
stats_anim = None

## Callback Functions

In [4]:
# Odometry callback function
def odom_callback(msg):
    """Process odometry to update position and velocity."""
    global robot_position, robot_velocity, position_history, timestamp_history
    
    # Update position
    robot_position['x'] = msg.pose.pose.position.x
    robot_position['y'] = msg.pose.pose.position.y
    
    # Update velocity
    robot_velocity['linear_x'] = msg.twist.twist.linear.x
    robot_velocity['angular_z'] = msg.twist.twist.angular.z
    
    # Update position history
    position_history['x'].append(robot_position['x'])
    position_history['y'].append(robot_position['y'])
    timestamp_history.append(rospy.get_time())
    
    # Limit history size
    if len(position_history['x']) > max_history_points:
        position_history['x'].pop(0)
        position_history['y'].pop(0)
        timestamp_history.pop(0)

In [5]:
# Laser scan callback function
def laser_callback(msg):
    """Process laser scan to find the closest obstacle."""
    global closest_obstacle, regions
    
    # Get minimum distance from laser scan data
    ranges = list(filter(lambda x: not math.isinf(x), msg.ranges))
    if ranges:
        closest_obstacle = min(ranges)
    else:
        closest_obstacle = 10.0  # default value
        
    # Calculate distances for each region
    regions = {
        'right': min(min(msg.ranges[0:143]), 10),
        'fright': min(min(msg.ranges[144:287]), 10),
        'front': min(min(msg.ranges[288:431]), 10),
        'fleft': min(min(msg.ranges[432:575]), 10),
        'left': min(min(msg.ranges[576:719]), 10),
    }

## Action Client Setup

In [6]:
# Initialize action client
client = actionlib.SimpleActionClient('/reaching_goal', PlanningAction)
print("Connecting to action server...")
client_connected = client.wait_for_server(timeout=rospy.Duration(5.0))

if client_connected:
    print("Connected to action server")
else:
    print("Failed to connect to action server. Please check ROS setup.")

Connecting to action server...
Connected to action server


In [7]:
# Set up subscribers
odom_sub = rospy.Subscriber('/odom', Odometry, odom_callback)
laser_sub = rospy.Subscriber('/scan', LaserScan, laser_callback)
status_pub = rospy.Publisher('/status', Status, queue_size=1)

# Set up services
try:
    rospy.wait_for_service('dist_avg', timeout=2.0)
    dist_avg_service = rospy.ServiceProxy('dist_avg', DisAvg)
    print("Connected to distance average service")
except rospy.ROSException:
    print("Failed to connect to distance average service")
    dist_avg_service = None

Connected to distance average service


## Utility Functions

In [8]:
# Publish robot status function
def publish_robot_status():
    """Publish the current robot status to the status topic"""
    status = Status()
    status.x = robot_position['x']
    status.y = robot_position['y']
    status.v_x = robot_velocity['linear_x']
    status.v_z = robot_velocity['angular_z']
    status_pub.publish(status)

# Check if goal reached
def goal_reached():
    """Check if the goal has been reached"""
    return client.get_state() == GoalStatus.SUCCEEDED

# Set goal function
def set_goal(x, y):
    """Set a new goal position"""
    global target_position, goal_active
    
    # Update coordinates
    target_position['x'] = x
    target_position['y'] = y
    
    # Update ROS parameters
    rospy.set_param("des_pos_x", x)
    rospy.set_param("des_pos_y", y)
    
    # Create and send goal
    goal = PlanningGoal()
    goal.target_pose.pose.position.x = x
    goal.target_pose.pose.position.y = y
    
    client.send_goal(goal, done_cb=goal_done_callback)
    goal_active = True
    
    print(f"Goal set: x={x}, y={y}")
    return True

# Cancel goal function
def cancel_goal():
    """Cancel the current goal"""
    global goal_active, target_stats, target_history
    
    if goal_reached():
        print("Goal just reached. Cannot cancel.")
        return False
    
    # Update statistics
    if goal_active:
        target_stats['cancelled'] += 1
        current_time = rospy.get_time()
        target_history['timestamp'].append(current_time)
        target_history['reached'].append(target_stats['reached'])
        target_history['cancelled'].append(target_stats['cancelled'])
    
    client.cancel_goal()
    goal_active = False
    print("Goal canceled")
    return True

# Goal completion callback
def goal_done_callback(state, result):
    """Callback when goal completes or fails"""
    global goal_active, target_stats, target_history
    
    if state == GoalStatus.SUCCEEDED:
        print("Goal reached successfully!")
        goal_active = False
        target_stats['reached'] += 1
        
        # Update history for plotting
        current_time = rospy.get_time()
        target_history['timestamp'].append(current_time)
        target_history['reached'].append(target_stats['reached'])
        target_history['cancelled'].append(target_stats['cancelled'])

## Interactive UI Components

In [9]:
# Goal setting widgets
x_slider = widgets.FloatSlider(
    value=0.0,
    min=-10.0,
    max=10.0,
    step=0.1,
    description='X:',
    continuous_update=False
)

y_slider = widgets.FloatSlider(
    value=0.0,
    min=-10.0,
    max=10.0,
    step=0.1,
    description='Y:',
    continuous_update=False
)

set_goal_button = widgets.Button(
    description='Set Goal',
    button_style='success',
    tooltip='Click to set a new goal',
    icon='check'
)

cancel_goal_button = widgets.Button(
    description='Cancel Goal',
    button_style='danger',
    tooltip='Click to cancel the current goal',
    icon='stop'
)

# Status display widget
robot_status = widgets.HTML(
    value='<h3>Robot Status</h3>',
    placeholder='Robot status will be displayed here',
    description='',
)

# Button event handlers
def on_set_goal_button_clicked(b):
    set_goal(x_slider.value, y_slider.value)
    update_display()

def on_cancel_goal_button_clicked(b):
    cancel_goal()
    update_display()

# Register event handlers
set_goal_button.on_click(on_set_goal_button_clicked)
cancel_goal_button.on_click(on_cancel_goal_button_clicked)

## Visualization Functions

In [10]:
# Initialize the animation plots
def setup_animations():
    """Set up the animation plots for robot position and target statistics"""
    global position_anim, stats_anim
    
    # Create figure with two subplots
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
    
    # Position plot setup
    robot_plot, = ax1.plot([], [], 'go', markersize=10, label='Robot')
    path_plot, = ax1.plot([], [], 'b-', alpha=0.5, label='Path')
    goal_plot, = ax1.plot([], [], 'r*', markersize=15, label='Goal')
    
    # Set grid and axes for position plot
    ax1.set_xlim(-12, 12)
    ax1.set_ylim(-12, 12)
    ax1.set_xlabel('X')
    ax1.set_ylabel('Y')
    ax1.set_title('Robot Position and Path')
    ax1.grid(True)
    ax1.legend()
    
    # Target statistics plot setup
    reached_plot, = ax2.plot([], [], 'g-', label='Reached Targets')
    cancelled_plot, = ax2.plot([], [], 'r-', label='Cancelled Targets')
    
    # Set grid and axes for statistics plot
    ax2.set_xlim(0, 10)  # Will auto-adjust as time progresses
    ax2.set_ylim(0, 5)   # Will auto-adjust as counts increase
    ax2.set_xlabel('Time (s)')
    ax2.set_ylabel('Count')
    ax2.set_title('Target Reach Statistics')
    ax2.grid(True)
    ax2.legend()
    
    # Create animation functions
    def init_position():
        robot_plot.set_data([robot_position['x']], [robot_position['y']])
        path_plot.set_data(position_history['x'], position_history['y'])
        goal_plot.set_data([target_position['x']], [target_position['y']])
        return robot_plot, path_plot, goal_plot
    
    def update_position(frame):
        # Update robot position
        robot_plot.set_data([robot_position['x']], [robot_position['y']])
        
        # Update path history
        path_plot.set_data(position_history['x'], position_history['y'])
        
        # Update goal position
        goal_plot.set_data([target_position['x']], [target_position['y']])
        
        fig.canvas.draw_idle()
        
        return robot_plot, path_plot, goal_plot
    
    def init_stats():
        if len(target_history['timestamp']) == 0:
            dummy_time = rospy.get_time()
            reached_plot.set_data([0], [0])
            cancelled_plot.set_data([0], [0])
        else:
            rel_timestamps = [t - target_history['timestamp'][0] for t in target_history['timestamp']]
            reached_plot.set_data(rel_timestamps, target_history['reached'])
            cancelled_plot.set_data(rel_timestamps, target_history['cancelled'])
        return reached_plot, cancelled_plot
    
    def update_stats(frame):
        if not hasattr(update_stats, 'start_time') and len(target_history['timestamp']) > 0:
            update_stats.start_time = target_history['timestamp'][0]
        elif not hasattr(update_stats, 'start_time'):
            update_stats.start_time = rospy.get_time()
        
        if len(target_history['timestamp']) > 0:
            # Calculate relative timestamps
            rel_timestamps = [t - update_stats.start_time for t in target_history['timestamp']]
            
            # Update plots with latest data
            reached_plot.set_data(rel_timestamps, target_history['reached'])
            cancelled_plot.set_data(rel_timestamps, target_history['cancelled'])
            
            # Adjust x and y limits as needed
            if rel_timestamps:
                ax2.set_xlim(0, max(rel_timestamps) + 5)
                
            max_count = max(max(target_history['reached'] + [1]), 
                             max(target_history['cancelled'] + [1]))
            ax2.set_ylim(0, max_count + 2)
        else:
            current_time = rospy.get_time() - update_stats.start_time
            reached_plot.set_data([0, current_time], [0, 0])
            cancelled_plot.set_data([0, current_time], [0, 0])
        
        fig.canvas.draw_idle()
        
        return reached_plot, cancelled_plot
    
    # Create animation objects with adjusted interval
    position_anim = FuncAnimation(fig, update_position, init_func=init_position,
                                  frames=None, interval=200, blit=True)
    
    stats_anim = FuncAnimation(fig, update_stats, init_func=init_stats,
                               frames=None, interval=200, blit=True)
    
    plt.tight_layout()
    plt.show()

# Visualize robot and environment (old static visualization)
def visualize_robot():
    """Visualize the robot's position, path history, and goal"""
    plt.figure(figsize=(10, 8))
    
    # Set grid and axes
    plt.grid(True)
    plt.xlim(-12, 12)
    plt.ylim(-12, 12)
    plt.xlabel('X')
    plt.ylabel('Y')
    plt.title('Robot Position and Goal')
    
    # Draw path history
    if position_history['x'] and position_history['y']:
        plt.plot(position_history['x'], position_history['y'], 'b-', alpha=0.5, label='Path History')
    
    # Draw goal position
    plt.plot(target_position['x'], target_position['y'], 'r*', markersize=15, label='Goal')
    
    # Draw current robot position
    plt.plot(robot_position['x'], robot_position['y'], 'go', markersize=10, label='Robot')
    
    # Draw laser scan data (if available)
    if regions:
        # Simple obstacle display
        angles = ['right', 'fright', 'front', 'fleft', 'left']
        angle_values = [0, 45, 90, 135, 180]
        
        for i, region in enumerate(angles):
            if regions[region] < 10.0:  # Display only if within max range
                angle_rad = math.radians(angle_values[i])
                obst_x = robot_position['x'] + regions[region] * math.cos(angle_rad)
                obst_y = robot_position['y'] + regions[region] * math.sin(angle_rad)
                plt.plot([robot_position['x'], obst_x], [robot_position['y']], 'r-', alpha=0.3)
                plt.plot(obst_x, obst_y, 'rx', markersize=8)
    
    plt.legend()
    plt.show()

# Update status display
def update_status_display():
    """Update the robot status display"""
    global robot_status
    
    # Get distance to goal
    distance_to_goal = math.sqrt((target_position['x'] - robot_position['x'])**2 + 
                                (target_position['y'] - robot_position['y'])**2)
    
    # Construct status HTML
    status_html = f"""
    <h3>Robot Status</h3>
    <table style="width:100%">
      <tr>
        <td><b>Current Position:</b></td>
        <td>X: {robot_position['x']:.2f}, Y: {robot_position['y']:.2f}</td>
      </tr>
      <tr>
        <td><b>Goal Position:</b></td>
        <td>X: {target_position['x']:.2f}, Y: {target_position['y']:.2f}</td>
      </tr>
      <tr>
        <td><b>Distance to Goal:</b></td>
        <td>{distance_to_goal:.2f} m</td>
      </tr>
      <tr>
        <td><b>Closest Obstacle:</b></td>
        <td>{closest_obstacle:.2f} m</td>
      </tr>
      <tr>
        <td><b>Linear Velocity:</b></td>
        <td>{robot_velocity['linear_x']:.2f} m/s</td>
      </tr>
      <tr>
        <td><b>Angular Velocity:</b></td>
        <td>{robot_velocity['angular_z']:.2f} rad/s</td>
      </tr>
      <tr>
        <td><b>Goal Status:</b></td>
        <td>{"Active" if goal_active else "Inactive"}</td>
      </tr>
      <tr>
        <td><b>Targets Reached:</b></td>
        <td>{target_stats['reached']}</td>
      </tr>
      <tr>
        <td><b>Targets Cancelled:</b></td>
        <td>{target_stats['cancelled']}</td>
      </tr>
    </table>
    """
    
    robot_status.value = status_html
    
    # Publish robot status to topic
    publish_robot_status()

In [12]:
# Update the display - function to update all UI components
def update_display():
    update_status_display()
    global position_anim, stats_anim
    if position_anim and position_anim.event_source:
        position_anim.event_source.start()
    if stats_anim and stats_anim.event_source:
        stats_anim.event_source.start()

if len(position_history['x']) == 0:
    position_history['x'].append(robot_position['x'])
    position_history['y'].append(robot_position['y'])
    timestamp_history.append(rospy.get_time())

# Create UI Layout
controls = widgets.HBox([x_slider, y_slider, set_goal_button, cancel_goal_button])
ui = widgets.VBox([controls, robot_status])

# Start updating display in a background thread
import threading
def update_thread():
    while True:
        update_display()
        time.sleep(0.5)  # Update display every 0.5 seconds

display_thread = threading.Thread(target=update_thread)
display_thread.daemon = True  # Thread will exit when main program exits
display_thread.start()

# Set up animations
setup_animations()

# Display UI
display(ui)

VBox(children=(HBox(children=(FloatSlider(value=1.0, continuous_update=False, description='X:', max=10.0, min=…