<a href="https://colab.research.google.com/github/MuhammadHaekal20/UTS-Robotika-dan-Sistemcerdas/blob/main/controller.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
from sensor_msgs.msg import JointState
from std_srvs.srv import Empty
from controller_manager_msgs.srv import ListControllers
from controller_manager_msgs.srv import SwitchController


class MyRobotController(Node):
    def __init__(self):
        super().__init__('my_robot_controller')

        # Subscribe to joint states
        self.subscription = self.create_subscription(
            JointState,
            'joint_states',
            self.joint_states_callback,
            10)
        self.subscription  # prevent unused variable warning

        # Publish velocity commands
        self.publisher = self.create_publisher(Twist, 'cmd_vel', 10)

        # Wait for controller_manager services to be available
        self.get_logger().info('Waiting for controller_manager services...')
        self.list_controllers_client = self.create_client(ListControllers, '/controller_manager/list_controllers')
        while not self.list_controllers_client.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('controller_manager services not available, waiting again...')
        self.switch_controller_client = self.create_client(SwitchController, '/controller_manager/switch_controller')
        while not self.switch_controller_client.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('controller_manager services not available, waiting again...')
        self.get_logger().info('controller_manager services are now available.')

        # Switch to joint velocity controllers
        self.get_logger().info('Switching to joint velocity controllers...')
        self.switch_controllers(['joint1_velocity_controller', 'joint2_velocity_controller'], [], True)

    def joint_states_callback(self, msg):
        # Process joint states
        pass

    def switch_controllers(self, start_controllers, stop_controllers, strictness):
        # Switch controllers using controller_manager services
        request = SwitchController.Request()
        request.start_controllers = start_controllers
        request.stop_controllers = stop_controllers
        request.strictness = strictness
        future = self.switch_controller_client.call_async(request)
        rclpy.spin_until_future_complete(self, future)

        if future.result() is not None:
            response = future.result()
            self.get_logger().info('Switched controllers: %s', response.ok)
        else:
            self.get_logger().error('Failed to switch controllers')

    def send_command(self, linear, angular):
        # Send velocity command
        msg = Twist()
        msg.linear.x = linear
        msg.angular.z = angular
        self.publisher.publish(msg)


def main(args=None):
    rclpy.init(args=args)

    node = MyRobotController()

    # Send velocity commands
    node.send_command(0.1, 0.0)
    rclpy.spin(node)

    # Stop controllers
    node.switch_controllers([], ['joint1_velocity_controller', 'joint2_velocity_controller'], True)

    # Cleanup
    node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()
