In [1]:
import rclpy
from rclpy.node import Node
from controller_interface_msgs.msg import ControllerConfig
from controller_interface_msgs.msg import SetDesiredJointValue
from controller_interface_msgs.msg import UpdateGain
from opt_ctrl_msgs.msg import SetReference
from robot_state_msgs.msg import ExtendedState
from mcp_ep_msgs.msg import MujocoApplyForceIndex
import time
import numpy as np

import subprocess

In [2]:
class ControllerInterfaceNode(Node):
    
    def __init__(self):
        super().__init__('controller_interface_node')
        self.config_publisher_ = self.create_publisher(ControllerConfig, 'controller_config', 10)
        self.desired_joint_value_publisher_ = self.create_publisher(SetDesiredJointValue, 'desired_joint_value', 10)
        self.update_gain_publisher_ = self.create_publisher(UpdateGain, 'update_gain', 10)
        self.mujoco_apply_force_publisher = self.create_publisher(MujocoApplyForceIndex, 'mujoco_apply_force_index', 10)
        
    def change_conroller(self, controller_name):
        msg = ControllerConfig()
        msg.controller_selector = controller_name
        self.config_publisher_.publish(msg)

    def set_desired_joint_value(self, q_d, dq_d, ddq_d):
        msg = SetDesiredJointValue()
        msg.q_d = q_d
        msg.dq_d = dq_d
        msg.ddq_d = ddq_d
        self.desired_joint_value_publisher_.publish(msg)
    
    def update_gain(self, is_update):
        msg = UpdateGain()
        msg.update = is_update
        self.update_gain_publisher_.publish(msg)
        
    def mujoco_apply_force_index_pub(self, face_indices, link_indicies, forces_norm, k_env):
        msg = MujocoApplyForceIndex()
        msg.face_index = face_indices
        msg.link_index = link_indicies
        msg.force_norm = forces_norm
        msg.k_env = k_env
        self.mujoco_apply_force_publisher.publish(msg)
        
    def mujoco_apply_force_index_clear(self):
        msg = MujocoApplyForceIndex()
        msg.face_index = []
        msg.link_index = []
        msg.force_norm = []
        msg.k_env = 0.0
        self.mujoco_apply_force_publisher.publish(msg)
        
        
class MPCControllerInterfaceNode(Node):
    
    def __init__(self):
        super().__init__('mpc_controller_interface_node')
        self.set_reference_publisher_ = self.create_publisher(SetReference, 'set_reference', 10)
        timer_period = 0.001  # seconds
        self.timer = self.create_timer(timer_period, self.timer_callback)
        self.initial_state = None

    def set_initial_state(self, initial_state: ExtendedState):
        # subscribe initial ExtendedState topic
        self.initial_state = initial_state
        self.init_t = self.get_clock().now().nanoseconds / 1e9
        
        
    def timer_callback(self):
        msg = self.circular_trajectory()
        self.set_reference_publisher_.publish(msg)
        
    def circular_trajectory(self, radius=0.05, frequency=1.0):
        msg = SetReference()
        msg.is_q = False
        msg.is_rot = True
        msg.is_pos = True
        
        omega = 2 * np.pi * frequency
        t = self.get_clock().now().nanoseconds / 1e9 - self.init_t
        
        if self.initial_state is None:
            return None
        else:            
            pos_ref = np.array(self.initial_state.ee_pos, dtype=np.float64) + np.array([0.0, radius * (np.cos(omega * t) - 1), radius * np.sin(omega * t)], dtype=np.float64)
            rot_ref = self.initial_state.ee_rot
            msg.pos_ref = pos_ref.tolist()
            msg.rot_ref = rot_ref.tolist()

            
        return msg

    

In [3]:
rclpy.init(args=None)
controller_node = ControllerInterfaceNode()
mpc_node = MPCControllerInterfaceNode()

In [4]:
### Circular trajectory tracking for MPC###
# change controller
controller_name = "PASSIVE_CONTACT_FEEDBACK_MPC"
controller_node.change_conroller(controller_name)

# subscribe initial ExtendedState topic
initial_state = None
def initial_state_callback(msg):
    global initial_state
    initial_state = msg
controller_node.create_subscription(ExtendedState, 'extended_robot_state', initial_state_callback, .)

# wait for initial state
while initial_state is None:
    rclpy.spin_once(controller_node)

mpc_node.set_initial_state(initial_state)

rclpy.spin(mpc_node)

SyntaxError: invalid syntax (498681408.py, line 11)

In [5]:
controller_node.update_gain(True)
controller_name = "TASK_PD_FRIC"
controller_node.change_conroller(controller_name)


## simulation 
q_d = [0.0, 0.378, 0.0, 2.269, 0.0, -0.956, 0.0]
# q_d = [0.5, 0.386, 0.0, 2.29, 0.3, -1.37, 0.3]

## real robot 
# q_d = [0.00, 0.361, 3.14, -2.27, 0.0, 0.951, 1.571]
# q_d = [0.20, 0.561, 3.34, -2.07, 0.2, 1.151, 1.771] 

dq_d = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
ddq_d = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]

controller_node.set_desired_joint_value(q_d, dq_d, ddq_d)

In [5]:
# subprocess.run(["ros2", 'run', 'optimal_control_framework', 'run'])
# subprocess.run(["ros2", 'launch', 'kinova_controller', 'run,launch.py'])

controller_name = "JOINT_PD_FRIC"
# controller_name = "JOINT_NRIC"
# controller_name = "JOINT_PD_FRIC"
# controller_name = "JOINT_PD_1st_FRIC"
# controller_name = "TASK_PD"
# controller_name = "TASK_NRIC"
# controller_name = "GRAVITY_COMPENSATION"
# controller_name = "CONTACT_FEEDBACK_MPC"
# controller_node.update_gain(True)

controller_node.change_conroller(controller_name)

In [105]:
controller_node.update_gain(True)

In [7]:
link_indicies = [7]
face_indicies = [120]
forces_norm = [10.0]
k_env = 150.0
controller_node.mujoco_apply_force_index_pub(face_indicies, link_indicies, forces_norm, k_env)
# 0.154733 -0.0420605   0.68419

In [416]:
controller_node.mujoco_apply_force_index_clear()