# UR Arm ROS2 Control

## Joint Control

First you need to save your Joints Module.  The UR tells its current joint states on the topic "joint_states" and lets you set the angles with the action "/follow_joint_trajectory".

In [1]:
%%writefile Joints.py

import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from sensor_msgs.msg import JointState
from builtin_interfaces.msg import Duration
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from control_msgs.action import FollowJointTrajectory
import math
import time

class Joints(rclpy.node.Node):
    """
    Subscribe to the joint_states topic.
    """

    def __init__(self):
        super().__init__("joint_state_subscriber")
        print("Initializing MyUR3e...")
        self.subscription = self.create_subscription(JointState, "joint_states", self.joints_callback, 10)
        self._action_client = ActionClient(self, FollowJointTrajectory, "scaled_joint_trajectory_controller/follow_joint_trajectory")
        if not self._action_client.wait_for_server(timeout_sec=10):
            raise RuntimeError(
                "Action server not available after waiting for 10 seconds. Check ROS UR Driver."
            )
        self.states = None
        self.done = False

    def joints_callback(self, msg):
        self.states = {"name": msg.name, "position": msg.position, "velocity": msg.velocity, "effort": msg.effort}
        self.read_joints = True
        #    "name": [msg.name[5]] + msg.name[0:5],
        #    "position": [msg.position[5]] + msg.position[0:5].tolist(),
        #    "velocity": [msg.velocity[5]] + msg.velocity[0:5].tolist(),
        #    "effort": [msg.effort[5]] + msg.effort[0:5].tolist(),}

    def read(self):
        self.spin_and_wait()
        return self.states

    def move_once(self, joint_positions, time = 5, degrees = True):
        trajectory = JointTrajectory()
        trajectory.joint_names = ["shoulder_pan_joint","shoulder_lift_joint","elbow_joint","wrist_1_joint","wrist_2_joint","wrist_3_joint"]
        trajectory.points.append(self.set_waypoint(joint_positions, time, degrees))
        self.move_arm(trajectory)

    def move_many(self, q_matrix, time, degrees = True):
        trajectory = JointTrajectory()
        trajectory.joint_names = ["shoulder_pan_joint","shoulder_lift_joint","elbow_joint","wrist_1_joint","wrist_2_joint","wrist_3_joint"]
        for q in q_matrix:
            next_time =  time + i * (time / (len(q_matrix) - 1))
            trajectory.points.append(self.set_waypoint(q, next_time, degrees))
        self.move_arm(trajectory)

    def set_waypoint(self, joint_positions, time = 5, degrees = True):
        point = JointTrajectoryPoint()
        if degrees: 
            joint_positions = [math.radians(q) for q in joint_positions]
        point.positions =  joint_positions
        point.time_from_start = Duration(sec = int(time), nanosec = int(time % 1 * 1000000000))
        return point
        
    def move_arm(self, trajectory):
        goal_msg = FollowJointTrajectory.Goal()
        goal_msg.trajectory = trajectory

        self._action_client.wait_for_server()
        self._set_goal = self._action_client.send_goal_async(goal_msg)
        self._set_goal.add_done_callback(self.goal_request_callback)
        print("Sent trajectory ")
        self.spin_and_move()
        
    def goal_request_callback(self, future): 
        '''
        run this when the action server responds with either go or no-go and set up another callback for when the action is done
        '''
        goal_handle = future.result()
        if not goal_handle.accepted:
            print('Goal rejected :(')
            self.done = True
            return
        print('Goal accepted :)')
        # goal accepted - now wait for it to be executed
        self._when_done = goal_handle.get_result_async()
        self._when_done.add_done_callback(self.get_result_callback)

    def get_result_callback(self, future):
        '''
        run this when the action is done
        '''        
        result = future.result().result
        print('Result: {0}'.format(result))
        self.done = True
        
    def stop(self):
        positions = self.read()
        point = JointTrajectoryPoint()
        point.positions = positions
        point.time_from_start = Duration(sec=0, nanosec=100000000)
        trajectory.points.append(point)
        self.move_arm(trajectory)

    def spin_and_wait(self):
        self.read_joints = False
        while not self.read_joints:
            rclpy.spin_once(self, timeout_sec = 0.1)
        self.read_joints = False

    def spin_and_move(self):
        rclpy.spin_until_future_complete(self, self._set_goal)
        while not hasattr(self, '_when_done'):
            time.sleep(0.1)
            rclpy.spin_once(self)  # Process any callbacks
        
        rclpy.spin_until_future_complete(self, self._when_done)
        for _ in range(10):  # Spin a few extra times to process any pending callbacks
            rclpy.spin_once(self)
            time.sleep(0.01)


Writing Joints.py


And a sample code

In [8]:
import rclpy
from Joints import Joints
import time

rclpy.init()
j = Joints()
print(j.read())
j.move_once([-90,-90,-10,0,0,45])

j.destroy_node()
rclpy.shutdown()

Initializing MyUR3e...
{'name': ['shoulder_lift_joint', 'elbow_joint', 'wrist_1_joint', 'wrist_2_joint', 'wrist_3_joint', 'shoulder_pan_joint'], 'position': array('d', [4.454249999952253e-06, -0.1745413839817047, 3.0237385253428783e-06, 1.7644564650254324e-05, 0.7854048609733582, -1.5707915464984339]), 'velocity': array('d', [0.0, -0.0, 0.0, 0.0, -0.0, 0.0]), 'effort': array('d', [-2.207540512084961, -1.1266224384307861, -0.19081313908100128, 0.13154640793800354, -0.038486212491989136, -0.08039316534996033])}
Sent trajectory 
Goal accepted :)
Result: control_msgs.action.FollowJointTrajectory_Result(error_code=0, error_string='Goal successfully reached!')


In [5]:
import requests
import json

# Assemble the URL for the API call

api_url = "https://cad.onshape.com/api/v10/assemblies/d/1daa84665ebf3705bcd3c52f/w/6e9de45d2b7a6c2ebacbc588/e/ab80c4f3562f1d7ccc7818af/matevalues"

# Optional query parameters can be assigned
params = {}

# Use the keys from the developer portal
access_key = "GYWbvkA2VuMPXkHN0cYaJ4DM"
secret_key = "8JlSQeqyxH4a22TmcwuD6REfsBjSMejWQrEIWJlFZIoQfGTY"

# Define the header for the request
headers = {
    "Accept": "application/json;charset=UTF-8;qs=0.09",
    "Content-Type": "application/json",
}

# Putting everything together to make the API request
response = requests.get(
    api_url, params=params, auth=(access_key, secret_key), headers=headers
)


def find_onshape_angles():
    response = requests.get(
        api_url, params=params, auth=(access_key, secret_key), headers=headers
    )
    data = response.json()
    return data


def update_onshape_angles(joint_angles, onshape_angles):
    """
    Update Onshape joint angles (rotationZ) based on joint position values.
    
    Args:
        joint_angles (dict): Dictionary containing joint names and position values
        onshape_angles (list): List of Onshape joint dictionaries to be updated
        
    Returns:
        list: Updated Onshape angles list
    """
    # Create a mapping of joint names to their position values
    joint_positions = {}
    for i, name in enumerate(joint_angles['name']):
        joint_positions[name] = joint_angles['position'][i]
    
    # Update the rotationZ value in each Onshape joint that has a matching name
    updated_onshape_angles = []
    for joint in onshape_angles:
        mate_name = joint['mateName']
        if mate_name in joint_positions:
            # Create a new joint dictionary with updated rotationZ
            updated_joint = joint.copy()
            updated_joint['rotationZ'] = float(joint_positions[mate_name])
            updated_onshape_angles.append(updated_joint)
        else:
            # Keep the original joint if no matching name is found
            updated_onshape_angles.append(joint.copy())
    
    return updated_onshape_angles

def move_onshape_arm(joint_angles):
    data = find_onshape_angles()

    a = update_onshape_angles(joint_angles, data["mateValues"])
    print("%%%%")
    b = {'mateValues': a}
    response  = requests.post(api_url, params=params, auth=(access_key, secret_key), headers=headers, data=json.dumps(b))
    print(response)
    '''
    for i, value in enumerate(angles):
        print(names[i], value)

    
    for i in enumerate(data["mateValues"]):
        print(joint_angles[i[0]], data["mateValues"][i[0]])

    
    for mates in data["mateValues"]:
        #angle = input()
        angle = 0
        mates["rotationZ"] = angle
    
    #print(mates["mateName"], ":", mates["rotationZ"])
    requests.post(api_url, params=params, auth=(access_key, secret_key), headers=headers, data=json.dumps(data))
    '''


In [27]:
import rclpy
from Joints import Joints
import time

rclpy.init()
j = Joints()
#print(j.read())
j.move_once([0,0,-0,-0,0,0])
move_onshape_arm(j.read())

rclpy.shutdown()


Initializing MyUR3e...
Sent trajectory 
Goal accepted :)
Result: control_msgs.action.FollowJointTrajectory_Result(error_code=0, error_string='Goal successfully reached!')
%%%%
<Response [200]>
