In [1]:
# math and robotics
import pathlib
import numpy as np
import spatialmath as sm

# ROS
import actionlib
import rospy
import hrr_common
from hr_recycler_msgs.msg import (
    AdaptiveGraspingAction, AdaptiveGraspingGoal,
    CuttingAction, CuttingGoal,
    ChangeToolAction, ChangeToolGoal,    
    FingerGraspAction, FingerGraspGoal,    
    PushOpenAction, PushOpenGoal,
    VacuumGraspingAction, VacuumGraspingGoal,
    UnscrewAction, UnscrewGoal
)
from hrr_msgs.msg import CalibrateCobotAction, CalibrateCobotGoal

# set printing and plotting options
np.set_printoptions(precision=5, suppress=True)

from hr_recycler_msgs.msg import CuttingAction, CuttingFeedback, CuttingResult, CuttingGoal, SkillResult, CobotState, MaterialType
import actionlib
from hr_recycler_msgs.msg import ToolType

from hr_recycler_msgs.msg import PlannerAction , PlannerGoal
import hrr_common as hrr_cm

In [2]:
rospy.init_node('emergency_lamp_demo')

In [3]:
cobot_ns = hrr_common.ros_utils.fix_prefix(hrr_common.ros_utils.get_param("/cobot_ns"))
change_tool_action_srv_name = hrr_common.ros_utils.get_param(f"{cobot_ns}change_tool_action_srv_name")
calibration_action_srv_name = hrr_common.ros_utils.get_param(f"{cobot_ns}calibration_action_srv_name")
cutting_action_srv_name = hrr_common.ros_utils.get_param(f"{cobot_ns}cutting_action_srv_name")
grinding_action_srv_name = hrr_common.ros_utils.get_param(f"{cobot_ns}grinding_action_srv_name")
pc_opening_action_srv_name = hrr_common.ros_utils.get_param(f"{cobot_ns}pc_opening_action_srv_name")
sensitive_grasping_action_srv_name = hrr_common.ros_utils.get_param(f"{cobot_ns}sensitive_grasping_action_srv_name")
unscrew_action_srv_name = hrr_common.ros_utils.get_param(f"{cobot_ns}unscrew_action_srv_name")
vacuum_pick_place_action_srv_name = hrr_common.ros_utils.get_param(f"{cobot_ns}vacuum_pick_place_action_srv_name")
def get_client(action_topic, ActionClass, timeout_secs=2.0):
    client = actionlib.SimpleActionClient(action_topic, ActionClass)
    client.wait_for_server(timeout=rospy.Duration(timeout_secs))
    return client
def get_result(cl):
    cl.wait_for_result()
    return cl.get_result()

def cancel(cl):
    cl.cancel_all_goals()


### Load robot

In [4]:
import hrr_cobot_robot as hrr_rob
cobot = hrr_rob.HrrCobotControl.from_ros("/hrr_cobot")
cobot.change_tool("nothing")

pybullet build time: May  8 2021 05:48:13


[rosout] some functions are not yet compiled. Expect delays upon first call


True

In [5]:
cobot.change_tool("vacuum")

[rosout] vacuum gripper contains two tips. This contradicts default pipeline atm. Use 1


True

### Dispose Location

### Tool changer manually

In [None]:
cobot.close_tool_changer()

In [None]:
cobot.open_tool_changer()

### Initiate all Clients

In [None]:
vacuum_client = get_client(vacuum_pick_place_action_srv_name, VacuumGraspingAction)
pgrip_client = get_client(finger_grasping_action_srv_name, FingerGraspAction)
tool_change_client = get_client(change_tool_action_srv_name, ChangeToolAction)
shaftgrinder_client=get_client(cutting_action_srv_name, CuttingAction)

# Emergency Lamp disassembly

## Tool Change to Shaftgrinder

In [None]:
tc_goal = ChangeToolGoal()
tc_goal.new_tool.type = tc_goal.new_tool.SHAFT_GRINDER
tool_change_client.send_goal(tc_goal)

## Cutting (6 goals)
cutting.yaml

### Clip 1

In [None]:
goal=CuttingGoal()
goal.surface_normal_start=normalA
goal.surface_normal_end=normalB
goal.start_location=pointA
goal.end_location=pointB
goal.material.material_type=material
goal.timeout=5000
goal.thickness = 0.005 #not important. can be anything

In [None]:
shaftgrinder_client.send_goal(goal)

### Clip 2

In [None]:
goal=CuttingGoal()
goal.surface_normal_start=normalA
goal.surface_normal_end=normalB
goal.start_location=pointA
goal.end_location=pointB
goal.material.material_type=material
goal.timeout=5000
goal.thickness = 0.005 #not important. can be anything

In [None]:
shaftgrinder_client.send_goal(goal)

### Clip 3

In [None]:
goal=CuttingGoal()
goal.surface_normal_start=normalA
goal.surface_normal_end=normalB
goal.start_location=pointA
goal.end_location=pointB
goal.material.material_type=material
goal.timeout=5000
goal.thickness = 0.005 #not important. can be anything

In [None]:
shaftgrinder_client.send_goal(goal)

### Clip 4

In [None]:
goal=CuttingGoal()
goal.surface_normal_start=normalA
goal.surface_normal_end=normalB
goal.start_location=pointA
goal.end_location=pointB
goal.material.material_type=material
goal.timeout=5000
goal.thickness = 0.005 #not important. can be anything

In [None]:
shaftgrinder_client.send_goal(goal)

### Clip 5

In [None]:
goal=CuttingGoal()
goal.surface_normal_start=normalA
goal.surface_normal_end=normalB
goal.start_location=pointA
goal.end_location=pointB
goal.material.material_type=material
goal.timeout=5000
goal.thickness = 0.005 #not important. can be anything

In [None]:
shaftgrinder_client.send_goal(goal)

### Clip 6

In [None]:
goal=CuttingGoal()
goal.surface_normal_start=normalA
goal.surface_normal_end=normalB
goal.start_location=pointA
goal.end_location=pointB
goal.material.material_type=material
goal.timeout=5000
goal.thickness = 0.005 #not important. can be anything

In [None]:
shaftgrinder_client.send_goal(goal)

## Tool Change to Vacuum

In [None]:
tc_goal = ChangeToolGoal()
tc_goal.new_tool.type = tc_goal.new_tool.VACUUM_GRIPPER
tool_change_client.send_goal(tc_goal)

## Vacuum

Human Disassembly of Hg lamp

## Vacuum Middle Cover
vacuum.yaml

In [None]:
vacuum_client.send_goal(vac_goal)

## Tool Change to Shaft grinder

In [None]:
tc_goal = ChangeToolGoal()
tc_goal.new_tool.type = tc_goal.new_tool.SHAFT_GRINDER
tool_change_client.send_goal(tc_goal)

## Cutting Cable (One goal)
cutting.yaml

In [None]:
shaftgrinder_client.send_goal(goal)

## Tool Change to Pneumatic Gripper

In [None]:
tc_goal = ChangeToolGoal()
tc_goal.new_tool.type = tc_goal.new_tool.WSG_50
tool_change_client.send_goal(tc_goal)

## Remove Battery
fingergrasp

In [None]:
pgrip_client.send_goal(pgrip_goal)

## Tool Change to None

In [None]:
tc_goal = ChangeToolGoal()
tc_goal.new_tool.type = tc_goal.new_tool.NONE
tool_change_client.send_goal(tc_goal)

## Sending side

Get access to this PC via ssh-copy-id

```
ssh-copy-id schrottgott@192.168.2.34
```

test secure copy

```
scp cutting.yaml schrottgott@192.168.2.34:~/_fake_ros/cutting.yaml
```

then run ```send_file```

In [40]:
from dataclasses import dataclass
import pathlib

import typing

import rospy
import numpy as np
import yaml


def set_msg_from_dict(data, out):
    for k, v in data.items():
        if isinstance(v, dict):
            python_pointer = [getattr(out, k)]
            set_msg_from_dict(v, python_pointer[0])
        else:
            try:
                setattr(out, k, v)
            except AttributeError:
                pass


# python3 
def get_msg_keys(msg):
    return  list(filter(lambda x: x[0] != '_' and 'serialize' not in x, msg.__dir__()))

# python2
def get_msg_keys(msg):
    return  list(filter(lambda x: x[0] != '_' and 'serialize' not in x, dir(msg)))


def msg_to_dict(msg, data):
    for k in get_msg_keys(msg):
        v = getattr(msg, k)
        if np.isscalar(v) or isinstance(v, str):
            data[k] = v
        else:
            data[k] = dict()
            msg_to_dict(v, data[k])
            if len(data[k]) == 0:
                del data[k]
    return data


def read_yaml(file_name):
    file_name = pathlib.Path(file_name)
    if file_name.exists():
        with open(file_name) as file:
            return yaml.load(file, Loader=yaml.FullLoader)        
    rospy.logerr(f"{file_name} does not exist")



def read_msg(msg, file_name):
    data = read_yaml(file_name)
    if data is not None:
        set_msg_from_dict(data, msg)
        return msg
    rospy.logerr(f"{file_name} does not exist")

import subprocess
    
def send_file(cur_file, remote_user, remote_host, remote_file_path):    
    subprocess.Popen(["scp", cur_file,  "{}@{}:{}".format(remote_user, remote_host, remote_file_path)])
    

def send_dict(data, filename, **kwargs):    
    tmp_file = pathlib.Path("/tmp") / f"{filename}.yaml"
    with open(tmp_file, 'w') as file:
        yaml.dump(data, file)
    send_file(data_file, **kwargs)
    

In [20]:
## SEND data

In [24]:
import subprocess
    
def send_file(cur_file, remote_user, remote_host, remote_file_path):    
    subprocess.Popen(["scp", cur_file,  "{}@{}:{}".format(remote_user, remote_host, remote_file_path)])

send_file(cur_file=r'/home/schrottgott/Downloads/cut_test.yaml', remote_user='schrottgott', remote_host='192.168.2.34', 
          remote_file_path=r"/home/schrottgott/_fake_ros/cutting.yaml")

ssh_askpass: exec(/usr/bin/ssh-askpass): No such file or directory
Host key verification failed.
lost connection


In [22]:
import os
os.getenv('ROS_IP')

'192.168.2.34'

In [16]:
## READ data

In [38]:
from hr_recycler_msgs.msg import CuttingGoal
read_yaml('/home/schrottgott/_fake_ros/cutting_old.yaml')
read_msg(CuttingGoal(), '/home/schrottgott/_fake_ros/cut2.yaml')

ConstructorError: could not determine a constructor for the tag 'tag:yaml.org,2002:python/object/new:geometry_msgs.msg._Vector3Stamped.Vector3Stamped'
  in "/home/schrottgott/_fake_ros/cutting_old.yaml", line 2, column 15

In [35]:
msg = CuttingGoal()
data = msg_to_dict(msg, dict())
tmp_file = "/home/schrottgott/_fake_ros/cut2.yaml"
with open(tmp_file, 'w') as file:
    yaml.dump(data, file)

In [43]:
cdata = pathlib.Path('/home/schrottgott/_fake_ros/cutting_old.yaml')
ldata = cdata.read_text().split('\n')

In [45]:
import re
re.search("!!.*", ldata[1])

<re.Match object; span=(14, 82), match='!!python/object/new:geometry_msgs.msg._Vector3Sta>