In [2]:
os.getcwd()

'/home/davis/deploy/davis/simple_joint_control-pkg/apps/samples/manipulation'

In [None]:
import os
# set the current working directory to the deployed package folder. This is required by isaac.
# This cell should only run once.
os.chdir("/home/davis/deploy/davis/simple_joint_control-pkg")
os.getcwd()

In [None]:
from IPython.display import display
import json
import numpy as np
import time
import threading

from engine.pyalice import Application, Codelet
from engine.pyalice.gui.composite_widget import CompositeWidget
from engine.pyalice import Message, Composite

np.set_printoptions(precision=3)

In [None]:
# A Python codelet for joint control through widget
class JointPositionControl(Codelet):
    def start(self):
        self.rx = self.isaac_proto_rx("CompositeProto", "state")
        self.tx = self.isaac_proto_tx("CompositeProto", "command")

        joints = self.config.joints
        limits = self.config.limits
        self._widget = CompositeWidget(joints, "position", limits)
        if self._widget is None:
            report_failure("Cannot create valid widget")
            return
        display(self._widget.panel)

        self.tick_periodically(0.1)
        
    def get_joints(self):
        return Composite.parse_composite_message(self._widget.composite)

    def tick(self):
        state_msg = self.rx.message
        if state_msg is None:
            return
        self._widget.composite = state_msg
    
        self.tx._msg = self._widget.composite
        if self.tx._msg is not None:
            self.tx.publish()

In [None]:
# A Python codelet for printing messages
class PrinterCodelet(Codelet):
    def start(self):
        self.rx = self.isaac_proto_rx("CompositeProto", "ping")
        self.tx = self.isaac_proto_tx("CompositeProto", "pong")
        
        self.tick_on_message(self.rx)
        
    def tick(self):
        print('pinged by server')

In [None]:
app = Application(name="isaac_client")

# load subgraphcs
app.load(filename="packages/planner/apps/multi_joint_lqr_control.subgraph.json", prefix="lqr")
app.load(filename="packages/navsim/apps/navsim_tcp.subgraph.json", prefix="simulation")
app.load(filename="apps/samples/manipulation/IsaacClient.json", prefix="client")

# edges
simulation_node = app.nodes["simulation.interface"]
lqr_interface = app.nodes["lqr.subgraph"]["interface"]
app.connect(simulation_node["output"], "joint_state", lqr_interface, "joint_state")
app.connect(lqr_interface, "joint_command", simulation_node["input"], "joint_position")

# configs
app.nodes["lqr.kinematic_tree"]["KinematicTree"].config.kinematic_file = kinematic_file
lqr_planner = app.nodes["lqr.local_plan"]["MultiJointLqrPlanner"]
lqr_planner.config.speed_min = [-1.0] * len(joints)
lqr_planner.config.speed_max = [1.0] * len(joints)
lqr_planner.config.acceleration_min = [-1.0] * len(joints)
lqr_planner.config.acceleration_max = [1.0] * len(joints)

# add pycodelet JointPositionControl
widget_node = app.add("command_generator")
joint_commander = widget_node.add(JointPositionControl, 'position_control')
joint_commander.config.joints = joints
joint_commander.config.limits = [[-2*np.pi, 2*np.pi]] * len(joints)
app.connect(joint_commander, "command", lqr_interface, "joint_target")
app.connect(simulation_node["output"], "joint_state", joint_commander, "state")
             
app.start()