In [None]:
import sys
sys.path.append('/usr/local/lib/python3.8/site-packages')

In [None]:
import select
import threading
import time
from time import sleep
import numpy as np
import pinocchio as pin
from FR3Py.lcm_msgs.fr3_states import fr3_state
from FR3Py.lcm_msgs.fr3_commands import fr3_cmd
import lcm

class FR3Real:
    def __init__(
        self,
        robot_name = 'franka',
        interface_type = 'joint_velocity',
    ):
        
        self.state = None
        self.trigger_timestamp = 0
        self.robot_name = robot_name
        self.state_topic_name = f'{robot_name}_state'
        self.command_topic_name = f'{robot_name}_command'
        self.states_msg = fr3_state()
        self.command_msg = fr3_cmd()
        self.user_callback = None
        # Threading Interface for handleing LCM
        self.lc = lcm.LCM("udpm://239.255.76.67:7667?ttl=1")
        self.subscription = self.lc.subscribe(self.state_topic_name, self.update)
        self.subscription.set_queue_capacity(1)
        self.running = True
        self.lcm_thread = threading.Thread(target=self.LCMThreadFunc)
        self.lcm_thread.start()
        sleep(0.2)
        print("Interface Running...")

    def LCMThreadFunc(self):
        while self.running:
            rfds, wfds, efds = select.select([self.lc.fileno()], [], [], 0.5)
            if rfds: # Handle only if there are data in the interface file
                self.lc.handle()

    def update(self, channel, data):
        msg = fr3_state.decode(data)
        self.trigger_timestamp = np.array(msg.timestamp) / 1000000
        q = np.hstack([msg.q,np.zeros((2))])
        dq = np.hstack([msg.dq,np.zeros((2))])
        T = np.hstack([msg.T,np.zeros((2))])
        self.joint_state = {
            'q':q,
            'dq':dq,
            'T':T
        }
        # Call an arbitrary user callback 
        if self.user_callback is not None:
            self.user_callback(self.joint_state)
        
    def readStates(self):
        if time.time()-self.trigger_timestamp > 0.2:
            self.state = None
            return None
        else:
            return self.joint_state

    def setCommand(self, cmd):
        self.command_msg.timestamp = int(self.trigger_timestamp*1000000)
        self.command_msg.cmd = cmd.tolist()
        self.lc.publish(self.command_topic_name, self.command_msg.encode())
        self.cmd_log = cmd

    def close(self):
        self.running = False
        self.lcm_thread.join()
        self.lc.unsubscribe(self.subscription)
        del self.lc
        print("Interface Closed.")        

In [None]:
# from FR3Py.interfaces import FR3Real
robot = FR3Real()

In [None]:
robot.readStates()

In [None]:
while True:
    sleep(0.001)
    robot.setCommand(np.zeros(7))

In [None]:
from FR3Py.controllers.utils import RobotModel

In [None]:
model = RobotModel()

In [None]:
import numpy as np
model.getInfo(np.zeros(9), np.zeros(9)).keys()

In [None]:
from scipy.spatial.transform import Rotation
import copy
from FR3Py.solvers.qp_solver import QPSolver
import numpy as np
import numpy.linalg as LA
from scipy.spatial.transform import Rotation as R


def axis_angle_from_rot_mat(rot_mat):
    rotation = R.from_matrix(rot_mat)
    axis_angle = rotation.as_rotvec()
    angle = LA.norm(axis_angle)
    axis = axis_angle / angle
    return axis, angle

def get_R_end_from_start(x_ang, y_ang, z_ang, R_start):
    """Get target orientation based on initial orientation"""
    _R_end = (
        R.from_euler("x", x_ang, degrees=True).as_matrix()
        @ R.from_euler("y", y_ang, degrees=True).as_matrix()
        @ R.from_euler("z", z_ang, degrees=True).as_matrix()
        @ R_start
    )
    R_end = R.from_matrix(_R_end).as_matrix()
    return R_end

class WaypointController:
    def __init__(self):
        
        # define solver
        self.robot = RobotModel()
        self.solver = QPSolver(9)
        self.initialized = False

    def compute(self, q, dq):
        #Get the robot paramters for the given state
        info = self.robot.getInfo(q, dq)

        if not self.initialized:
            # get initial rotation and position
            self.R_start, _p_start = info["R_EE"], info["P_EE"]
            self.p_start = _p_start[:, np.newaxis]

            # get target rotation and position
            self.p_end = np.array([[1.2], [0], [0.35]])
            self.R_end = get_R_end_from_start(0, -90, 0, self.R_start)
            self.movement_duration = 10.0

            # compute R_error, ω_error, θ_error
            self.R_error = self.R_end @ self.R_start.T
            self.ω_error, self.θ_error = axis_angle_from_rot_mat(self.R_error)
            self.initialized = True

        # get end-effector position
        p_current = info["P_EE"][:, np.newaxis]

        # get end-effector orientation
        R_current = info["R_EE"]

        # get Jacobians from info
        pinv_jac = info["pJ_HAND"]
        jacobian = info["J_HAND"]

        # compute joint-centering joint acceleration
        dq_nominal = 0.5 * (self.robot.q_nominal - q[:, np.newaxis])

        # compute error rotation matrix
        R_err = self.R_end @ R_current.T

        # compute orientation error in axis-angle form
        rotvec_err = Rotation.from_matrix(R_err).as_rotvec()

        # compute EE position error
        p_error = np.zeros((6, 1))
        p_error[:3] = self.p_end - p_current
        p_error[3:] = rotvec_err[:, np.newaxis]

        # compute EE velocity target
        dp_target = np.zeros((6, 1))
        params = {
            "Jacobian": jacobian,
            "p_error": p_error,
            "p_current": p_current,
            "dp_target": dp_target,
            "Kp": 0.1 * np.eye(6),
            "dq_nominal": dq_nominal,
            "nullspace_proj": np.eye(9) - pinv_jac @ jacobian,
        }

        # solver for target joint velocity
        self.solver.solve(params)
        dq_target = self.solver.qp.results.x
        return dq_target

In [1]:
import time
import numpy as np
from FR3Py.controllers.waypoint_controller import WaypointController
from FR3Py.envs.pybullet import FR3Sim

env = FR3Sim(render_mode='human', mode='velocity')
env.reset()
controller = WaypointController()
while True:
    q, dq = env.get_state()
    dq_cmd = controller.compute(q, dq)
    env.step(dq_cmd)
    time.sleep(0.001)

pybullet build time: May 20 2022 19:44:17


interface control mode is: velocity
startThreads creating 1 threads.
starting thread 0
started thread 0 
argc=2
argv[0] = --unused
argv[1] = --start_demo_name=Physics Server
ExampleBrowserThreadFunc started
X11 functions dynamically loaded using dlopen/dlsym OK!
X11 functions dynamically loaded using dlopen/dlsym OK!
Creating context
Created GL 3.3 context
Direct GLX rendering context obtained
Making context current
GL_VENDOR=Intel
GL_RENDERER=Mesa Intel(R) Graphics (RKL GT1)
GL_VERSION=4.6 (Core Profile) Mesa 21.2.6
GL_SHADING_LANGUAGE_VERSION=4.60
pthread_getconcurrency()=0
Version = 4.6 (Core Profile) Mesa 21.2.6
Vendor = Intel
Renderer = Mesa Intel(R) Graphics (RKL GT1)
b3Printf: Selected demo: Physics Server
startThreads creating 1 threads.
starting thread 0
started thread 0 
MotionThreadFunc thread started
ven = Intel
Workaround for some crash in the Intel OpenGL driver on Linux/Ubuntu
ven = Intel
Workaround for some crash in the Intel OpenGL driver on Linux/Ubuntu


KeyboardInterrupt: 

: 