In [None]:
# Import Jupyros for integrating Jupyter notebooks with ROS
import jupyros as jr
# Import rospy for writing ROS nodes
import rospy
# Import ipywidgets for interactive widgets in Jupyter notebooks
import ipywidgets as widgets
from ipywidgets import Button, Layout, ButtonStyle, GridBox, VBox, HBox
# Import matplotlib.pyplot for data visualization
from matplotlib import pyplot as plt

# Import actionlib for implementing ROS actions
import actionlib
import actionlib.msg
# Import custom message types
import assignment_2_2022.msg
from nav_msgs.msg import Odometry
from assignment_2_2022.msg import Position_velocity
from sensor_msgs.msg import LaserScan

# Import tf for working with coordinate transformations
import tf
from tf.transformations import quaternion_matrix
# Import numpy for numerical computations
import numpy as np
# Import matplotlib.animation for creating animated plots
from matplotlib.animation import FuncAnimation
import numpy as np
# Import matplotlib for advanced plot customization
import matplotlib as mpl

In [None]:
# Define global variables 
global client
global publisher
global msg
global goal
global goal_cancelled
global goal_reached
global goal_list

# Initialize empty lists for plotting
x_plot = []
y_plot = []
xgoal_plot = []
ygoal_plot = []

#initialize the ROS node
rospy.init_node('node_a')

In [None]:
# Create a publisher to publish messages of type Position_velocity on the "/pos_vel" topic
pub = rospy.Publisher("/pos_vel", Position_velocity, queue_size = 10)

In [None]:
def pub_values(msg):
    
    # Extract position from the received message
    pos = msg.pose.pose.position

    # Extract velocity from the received message
    velocity = msg.twist.twist.linear

    # Create a custom message object
    position_velocity = Position_velocity()

    # Set the values of the custom message
    position_velocity.x=pos.x
    position_velocity.y=pos.y
    position_velocity.v_x=velocity.x
    position_velocity.v_y=velocity.y

    # Publish the custom message on the specified topic
    pub.publish(position_velocity)

In [None]:
jr.subscribe("/odom", Odometry, pub_values)

In [None]:
client = actionlib.SimpleActionClient('/reaching_goal', assignment_2_2022.msg.PlanningAction)

#wait for server
client.wait_for_server()

In [None]:
goal = assignment_2_2022.msg.PlanningGoal()

# Create FloatText widgets for specifying goal coordinates
x = widgets.FloatText(description = "x:")
y = widgets.FloatText(description = "y:")
display(x, y)

# Create send and cancel buttons
send_button = Button(description='SEND GOAL',tooltip='SEND GOAL',layout=Layout(width='20%', height='50px', grid_area='b1'),button_style='success')
cancel_button = Button(description='CANCEL GOAL',tooltip='CANCEL GOAL',layout=Layout(width='20%',height='50px', grid_area='b2'),button_style='danger')
display(send_button,cancel_button)

# Function to execute when the send button is clicked
def send_clicked(b):
    
    # Set the goal coordinates from the FloatText widgets
    goal.target_pose.pose.position.x = x.value
    goal.target_pose.pose.position.y = y.value
         
    # Send the goal to the action server
    client.send_goal(goal)
    
    # Add the goal coordinates to the plot
    xgoal_plot.append(x.value)
    ygoal_plot.append(y.value)
    
    # Disable the input and send button
    x.disabled = True
    y.disabled = True
    send_button.disabled = True
    # Enable the cancel button
    cancel_button.disabled = False

# Function to execute when the cancel button is clicked
send_button.on_click(send_clicked)

def cancel_clicked(b):
    # Cancel the goal
    client.cancel_goal()
    
cancel_button.on_click(cancel_clicked)
        

In [None]:
def update_goals_lists():
    # Declare global variables
    global goal_reached, goal_cancelled
    # Append the goal_reached value to reached_list
    reached_list.append(goal_reached)
    # Append the goal_cancelled value to not_reached_list
    not_reached_list.append(goal_cancelled)

In [None]:
def process_goal_feedback(data):
    # Declare global variables
    
    global goal_reached, goal_cancelled
    
    if data.feedback.stat == "Target reached!":
        # Enable input and start button
        x.disabled = False
        y.disabled = False
        button_start.disabled = False
        button_stop.disabled = True
        
        # Update status and goal list
        status.value = "Goal reached!"
        goal_list.value = goal_list.value + "    Goal reached!\n"
        
        # Increment goal_reached count and update lists
        goal_reached += 1
        update_goals_lists()
        
    elif data.feedback.stat == "Target cancelled!":
        # Enable input and start button
        x.disabled = False
        y.disabled = False
        button_start.disabled = False
        button_stop.disabled = True
        
        # Update status and goal list
        status.value = "Goal cancelled!"
        goal_list.value = goal_list.value + "    Goal cancelled!\n"
        
        # Increment goal_cancelled count and update lists
        goal_canceleld += 1
        update_goals_lists()


In [None]:
# Create FloatText widgets for displaying distance and angle
distance_widget= widgets.FloatText(description = "Distance:", disabled = True)
angle_widget = widgets.FloatText(description = "Angle:", disabled = True)

# Callback function for processing laser scan data
def process_laser_scan(scan):
    # Initialize minimum range and angle variables
    min_range = 100
    angle = 100
    # Iterate through the laser scan data
    for at, x in enumerate(scan.ranges):
        # Find the minimum range that is greater than the range minimum
        if x < min_range and x > scan.range_min:
            min_range = x
            angle = scan.angle_min + scan.angle_increment * at
            
    # Update the distance and angle values in the FloatText widgets
    distance_widget.value = min_range
    angle_widget.value = angle

In [None]:
# Subscribe to the '/scan' topic and call the process_laser_scan function when a message of type LaserScan is received
jr.subscribe('/scan', LaserScan, process_laser_scan)

In [None]:
widgets.HBox([distance_widget, angle_widget], description = "Distance from obstacle")

In [None]:
class Visualiser:
    
    def __init__(self):
        self.x_data, self.y_data = [] , []
    
    def vis_callback(self, data):
        # Append position and velocity data to respective lists
        self.y_data.append(data.pos_y)
        self.x_data.append(data.pos_x)
        # Update FloatText widgets with the received data
        posx.value = data.pos_x
        posy.value = data.pos_y
        velx.value = data.vel_x
        vely.value = data.vel_y
            
    def update_plot(self, frame):
        # Update the plot with the updated data
        self.ln.set_data(self.x_data, self.y_data)
        return self.ln

In [None]:
np_x_plot = np.array(x_plot) 
np_y_plot = np.array(y_plot)

fig = plt.figure()

ax = fig.add_axes([0,0,1,1])
ax.set_xlim(-10, 10)
ax.set_ylim(-10, 10)

# Plot the trajector
ax.plot(np_x_plot,np_y_plot,label='trajectory')
# Scatter plot for goals
ax.scatter(xgoal_plot,ygoal_plot, color='red',label='goal')

ax.set_title("my_robot")
ax.set_xlabel("x")
ax.set_ylabel("y")
ax.legend()

In [None]:
plt.bar(goal_reached, cancelled_goal)

plt.xlabel('goals type')
plt.ylabel('number')
plt.title('Number goals reached/cancelled')

plt.show()

In [None]:
plt.pie(goal_reached, cancelled_goal)
plt.title('Number goals reached/cancelled')
plt.show()