Import Libraries

In [1]:
import rospy
import actionlib
from assignment_2_2023.msg import PlanningAction, PlanningGoal, Pos_Vel, Goal
from nav_msgs.msg import Odometry
from assignment_2_2023.srv import GetTrgtPos
from assignment_2_2023.srv import LastStatus
from assignment_2_2023.msg import Goal, Pos_Vel
from assignment_2_2023.srv import GetTrgtPos, GetTrgtPosResponse
from assignment_2_2023.msg import Goal

In [None]:
class ActionClientNode:
    def __init__(self):
        # Initialize ROS node
        rospy.init_node('action_client_node')
        
        # Create action client node
        self.action_client = actionlib.SimpleActionClient('/reaching_goal', PlanningAction)   
        
        # Subscribe to odometry to get the position
        rospy.Subscriber('/odom', Odometry, self.odom_callback)  #Publish by Gazebo

        # Publish Custom msg
        self.robot_info_pub = rospy.Publisher('/robot_info', Pos_Vel, queue_size=10)
        self.robot_target_pub = rospy.Publisher('/target_info', Goal, queue_size=10)
    def odom_callback(self, odom_msg):
        pos = odom_msg.pose.pose.position
        twist = odom_msg.twist.twist.linear
        msg = Pos_Vel()
        
        msg.robpos_x = pos.x
        msg.robpos_y = pos.y
        msg.robvel_x = twist.x
        msg.robvel_y = twist.y

        self.robot_info_pub.publish(msg)
    def goal(self):
        
        target_x = float(input("Enter the target x-coordinate: "))
        target_y = float(input("Enter the target y-coordinate: "))       
        goal = PlanningGoal()
        goal.target_pose.pose.position.x = target_x
        goal.target_pose.pose.position.y = target_y
        
        target_msg = Goal()
        target_msg.target_x = target_x
        target_msg.target_y = target_y

        self.robot_target_pub.publish(target_msg)        
        self.action_client.send_goal(goal)

        # code about cancel the goal
        cancel_input = input("Do you cancel the goal? (y/n): ")
        if cancel_input.lower() == 'y':
            self.action_client.cancel_goal()
            print("Goal has been canceled.")
            return
        # Wait for the result or cancellation
        self.action_client.wait_for_result()

        # Print the final result
        print("Goal Status:", self.action_client.get_state())
        print("Goal Result:", self.action_client.get_result())


if __name__ == "__main__":
    try:
        client_node = ActionClientNode()

        # Example: Set a goal interactively
        client_node.goal()

        rospy.spin()

    except rospy.ROSInterruptException:
        pass


Enter the target x-coordinate: 10
Enter the target y-coordinate: 5
Do you cancel the goal? (y/n): n


Get Distance Speed

In [None]:
class LastTargetNode:
    def __init__(self):
        rospy.init_node('status_node')

        # set default value of parameter '/window_size' 
        #self.window_size = rospy.get_param('window_size', 10)

        # set service and callback function
        self.robot_status = rospy.Service('/robot_status', LastStatus, self.robot_status)

        # set subscriber
        rospy.Subscriber('/target_data', Goal, self.goal_callback)
        rospy.Subscriber('/robot_data', Pos_Vel, self.botinfo_callback)


        # list of speed
        self.robot_speeds = []

        # initialize the robot's position
        self.bot_position = None

        rospy.spin()
        
        

    def goal_callback(self, target_msg):
        self.GoalPosition = (target_msg.target_x, target_msg.target_y)
        
        
        
    def botinfo_callback(self, Pos_Vel_msg):
        self.bot_position = (Pos_Vel_msg.robpos_x, Pos_Vel_msg.robpos_y)
        robspeed = (Pos_Vel_msg.robvel_x**2 + Pos_Vel_msg.robvel_y**2)**0.5
        self.robot_speeds.append(robspeed) 
        
        
    def robot_status(self):
        # calculate the distance
        if hasattr(self, 'GoalPosition') and hasattr(self, 'bot_position'): 
            distance_to_target = ((self.GoalPosition[0] - self.bot_position[0])**2 + 
                                  (self.GoalPosition[1] - self.bot_position[1])**2)**0.5
        else:
            rospy.logwarn("Target or robot position not available.")
            distance_to_target = 0.0
        
        if len(self.robot_speeds) > 0:
            average_speed = sum(self.robot_speeds[-self.window_size:]) / len(self.robot_speeds[-self.window_size:])
        else:
            rospy.logwarn("No robot speed information available.")
            average_speed = 0.0        
        return average_speed, distance_to_target
if __name__ == "__main__":
    try:
        status_node = LastTargetNode()
    except rospy.ROSInterruptException:
        pass

Last Target calling

In [None]:
class GetLastTarget:
    def __init__(self):
        rospy.init_node('get_last_target')


        rospy.spin()
    def get_last_target_position(self, request):
        if self.latest_goal_msg is not None:
            response = GetTrgtPosResponse()  # Assuming correct service name is GetTrgtPosResponse
            response.target_x = self.latest_goal_msg.target_x
            response.target_y = self.latest_goal_msg.target_y
        else:
            rospy.logwarn("No target information available")
            return None
    def target_callback(self, goal_msg):
        self.latest_goal_msg = goal_msg
        rospy.loginfo("Received Goal message: target_x={}, target_y={}".format(goal_msg.target_x, goal_msg.target_y))

if __name__ == "__main__":
    try:
        get_last_target = GetLastTarget()
    except rospy.ROSInterruptException:
        pass