### Imports

In [1]:
import bosdyn.client
import bosdyn.client.util
import bosdyn.client.robot_command as bdcrc
from bosdyn.client.robot_command import RobotCommandBuilder
from bosdyn.client.robot_command import RobotCommandClient
from bosdyn.client.image import ImageClient
from bosdyn.client.manipulation_api_client import ManipulationApiClient

from bosdyn.util import seconds_to_duration
from bosdyn.geometry import EulerZXY

from bosdyn.client.world_object import WorldObjectClient
from bosdyn.client.frame_helpers import get_a_tform_b, get_vision_tform_body, BODY_FRAME_NAME, VISION_FRAME_NAME

from bosdyn.api import (arm_command_pb2, geometry_pb2, robot_command_pb2, synchronized_command_pb2,
                        trajectory_pb2, world_object_pb2, manipulation_api_pb2, image_pb2, estop_pb2, robot_state_pb2)

import cv2
import time
import numpy as np
import matplotlib.pyplot as plt
from copy import deepcopy

from PIL import Image
import io

from bosdyn.client.graph_nav import GraphNavClient
from bosdyn.api.graph_nav import graph_nav_pb2
from bosdyn.api.graph_nav import map_pb2
from bosdyn.api.graph_nav import nav_pb2
from bosdyn.client.frame_helpers import get_odom_tform_body
from bosdyn.client.exceptions import ResponseError
from bosdyn.api import image_pb2

### Setup

In [2]:
def initRobot(IP):
    #Setup
    sdk = bosdyn.client.create_standard_sdk('ASTRO')
    robot = sdk.create_robot(IP)
    id_client = robot.ensure_client('robot-id')

    #Authenticate
    robot.authenticate('user', 'r4kr99yw3hje')

    #Clients
    robot_state_client = robot.ensure_client('robot-state')
    command_client = robot.ensure_client(RobotCommandClient.default_service_name)
    image_client = robot.ensure_client(ImageClient.default_service_name)
    
    graph_nav_client = robot.ensure_client(GraphNavClient.default_service_name)
    world_object_client = robot.ensure_client(WorldObjectClient.default_service_name)
    manipulation_api_client = robot.ensure_client(ManipulationApiClient.default_service_name)
    
    return sdk, robot, id_client, robot_state_client, command_client, image_client, graph_nav_client, world_object_client, manipulation_api_client

In [3]:
def getBattInfo():
    
    robot_state = robot_state_client.get_robot_state()
    
    battery_state = robot_state.battery_states[0]
    
    chargePercent = battery_state.charge_percentage.value
    
    runtimeRemaining = battery_state.estimated_runtime.seconds
    
    print("Battery at:", chargePercent, "%. Robot has", runtimeRemaining, "seconds left")
    
    return chargePercent, runtimeRemaining

In [4]:
def getLease(robot):
    lease_client = robot.ensure_client('lease')
    lease = lease_client.acquire()
    lease_keep_alive = bosdyn.client.lease.LeaseKeepAlive(lease_client)
    
    return lease_client, lease, lease_keep_alive

In [5]:
def powerOn(robot):
    robot.power_on(timeout_sec=20)
    print(robot.is_powered_on())
    robot.time_sync.wait_for_sync()
    return True

## Image

In [16]:
def capHandImage(image_client, source):
    dtype = None
    channels = None

    if source == 'hand_depth_in_hand_color_frame':
        dtype = np.uint8
        channels = 2
    elif source == 'hand_color_image':
        dtype = np.uint8
        channels = 3
    else:
        print('NOT A SUPPORTED IMAGE SOURCE')
        return None

    image_response = image_client.get_image_from_sources([source])[0]
    img = np.frombuffer(image_response.shot.image.data, dtype=dtype)
    
    if source == 'hand_depth_in_hand_color_frame':
        img = img.reshape((image_response.shot.image.rows, image_response.shot.image.cols, channels))
    else:
        img = cv2.imdecode(img, -1)
    
    return img, image_response

## Movement

In [7]:
def moveRobotRelative(command_client, x_meters, y_meters, theta_rad, timeout_sec = 5, blocking = True):
    frameTreeSnapshot = robot.get_frame_tree_snapshot()
    cmd = RobotCommandBuilder.synchro_trajectory_command_in_body_frame(x_meters, y_meters, theta_rad, frame_tree_snapshot = frameTreeSnapshot)
    cmd_id = cmd_id = command_client.robot_command(cmd, end_time_secs = time.time()+timeout_sec)
    if blocking:
        bdcrc.block_for_trajectory_cmd(command_client, cmd_id)

## Arm

In [8]:
def makeGrasp(grasp_request, graph_nav_client = None, graphExists = True):
    """
    Makes a grasp with arm given a grasp request proto described here:
    https://dev.bostondynamics.com/protos/bosdyn/api/proto_reference#manipulation-api-proto
    
    Uploading a graph to graph_nav_client makes arm camera fail. Grasp WILL NOT WORK if graph 
    is uploaded. First step of method is to clear graph if it exists.
    
    Graph must be reuploaded and position reinitialized later.
    
    Method must be called explicitly with graphExists = False if there is no graph in use.
    
    """
    
    if graphExists:
        graph_nav_client.clear_graph()
    
    cmd_response = manipulation_api_client.manipulation_api_command(
    manipulation_api_request = grasp_request)

    startTimeSec = time.time()
    
    # Get feedback from the robot
    response = None
    
    success = False
    while True:
        feedback_request = manipulation_api_pb2.ManipulationApiFeedbackRequest(
            manipulation_cmd_id=cmd_response.manipulation_cmd_id)

        # Send the request
        response = manipulation_api_client.manipulation_api_feedback_command(
            manipulation_api_feedback_request=feedback_request)

#         print('Current state: ',
#               manipulation_api_pb2.ManipulationFeedbackState.Name(response.current_state))

        if response.current_state == manipulation_api_pb2.MANIP_STATE_GRASP_SUCCEEDED:
            success = True
            break

        if response.current_state == manipulation_api_pb2.MANIP_STATE_GRASP_FAILED \
            or response.current_state == manipulation_api_pb2.MANIP_STATE_GRASP_PLANNING_NO_SOLUTION:
            break
            
        curTimeSec = time.time()
        
        if curTimeSec - startTimeSec > 15:
            print('GRASP TOOK TOO LONG, STOPPING NOW')
            response.current_state == manipulation_api_pb2.MANIP_STATE_GRASP_FAILED
            break
            
        time.sleep(0.25)

    #creates carry override
    carry_override_rq = manipulation_api_pb2.ApiGraspedCarryStateOverride(override_request = 3)

    #creates override
    override_rq = manipulation_api_pb2.ApiGraspOverrideRequest(
        carry_state_override = carry_override_rq)

    #sends override so object can be stowed and carried
    override_response = manipulation_api_client.grasp_override_command(grasp_override_request = override_rq)
    
    cmd = RobotCommandBuilder.arm_stow_command()
    cmd_id = command_client.robot_command(cmd, end_time_secs = time.time()+5)
    
    return response, success

In [9]:
def moveToAndLookAt(moveTo, lookAt, blocking = True):
    """Takes in two, 3-element array like types. 
        First is the coordinates for the gripper head location.
        Second is the coordinates for the gripper head to look at.
        Blocking argument will pause code until the move command is finished
    """

    arm_pos_list = [moveTo[0],moveTo[1],moveTo[2]]
    dur_sec = 3

    duration = seconds_to_duration(dur_sec)


    arm_pos = geometry_pb2.Vec3(x=arm_pos_list[0], 
                                y=arm_pos_list[1], 
                                z=arm_pos_list[2])
    hand_pose = geometry_pb2.SE3Pose(position=arm_pos)
    hand_pose_traj_point = trajectory_pb2.SE3TrajectoryPoint(pose=hand_pose,
                                                             time_since_reference=duration)
    hand_traj = trajectory_pb2.SE3Trajectory(points=[hand_pose_traj_point])


    gaze_pos_list = [lookAt[0],lookAt[1],lookAt[2]]
    gaze_pos = geometry_pb2.Vec3(x=gaze_pos_list[0],
                                 y=gaze_pos_list[1],
                                 z=gaze_pos_list[2])
    gaze_pos_traj_point = trajectory_pb2.Vec3TrajectoryPoint(point=gaze_pos)
    gaze_traj = trajectory_pb2.Vec3Trajectory(points=[gaze_pos_traj_point])

    gaze_cmd = arm_command_pb2.GazeCommand.Request(target_trajectory_in_frame1=gaze_traj,
                                                   frame1_name='body',
                                                   tool_trajectory_in_frame2=hand_traj,
                                                   frame2_name='body')

    arm_command = arm_command_pb2.ArmCommand.Request(arm_gaze_command=gaze_cmd)
    synchronized_command = synchronized_command_pb2.SynchronizedCommand.Request(arm_command=arm_command)
    command = robot_command_pb2.RobotCommand(synchronized_command=synchronized_command)
    gaze_command_id = command_client.robot_command(command)
    if(blocking):
        try:
            bdcrc.block_until_arm_arrives(command_client, gaze_command_id, timeout_sec = time.time() + 3.0)
        except:
            print('ARM already at desired location and orientation')

### Graph Nav

In [10]:
# From "graph_nav_command_line.py"
# https://github.com/boston-dynamics/spot-sdk/blob/master/python/examples/graph_nav_command_line/graph_nav_command_line.py
def upload_graph_and_snapshots(upload_filepath, 
                               current_graph, 
                               graph_nav_client, 
                               current_waypoint_snapshots, 
                               current_edge_snapshots):
    """Upload the graph and snapshots to the robot."""
    print("Loading the graph from disk into local storage...")
    with open(upload_filepath + "/graph", "rb") as graph_file:
        # Load the graph from disk.
        data = graph_file.read()
        current_graph = map_pb2.Graph()
        current_graph.ParseFromString(data)
        print("Loaded graph has {} waypoints and {} edges".format(
            len(current_graph.waypoints), len(current_graph.edges)))
        
    for waypoint in current_graph.waypoints:
        # Load the waypoint snapshots from disk.
        with open(upload_filepath + "/waypoint_snapshots/{}".format(waypoint.snapshot_id),
                  "rb") as snapshot_file:
            waypoint_snapshot = map_pb2.WaypointSnapshot()
            waypoint_snapshot.ParseFromString(snapshot_file.read())
            current_waypoint_snapshots[waypoint_snapshot.id] = waypoint_snapshot
            
    for edge in current_graph.edges:
        if len(edge.snapshot_id) == 0:
            continue
        # Load the edge snapshots from disk.
        with open(upload_filepath + "/edge_snapshots/{}".format(edge.snapshot_id),
                  "rb") as snapshot_file:
            edge_snapshot = map_pb2.EdgeSnapshot()
            edge_snapshot.ParseFromString(snapshot_file.read())
            current_edge_snapshots[edge_snapshot.id] = edge_snapshot
    # Upload the graph to the robot.
    print("Uploading the graph and snapshots to the robot...")
    true_if_empty = not len(current_graph.anchoring.anchors)
    response = graph_nav_client.upload_graph(lease=lease.lease_proto,
                                                   graph=current_graph,
                                                   generate_new_anchoring=true_if_empty)
     
    # Upload the snapshots to the robot.
    for snapshot_id in response.unknown_waypoint_snapshot_ids:
        waypoint_snapshot = current_waypoint_snapshots[snapshot_id]
        graph_nav_client.upload_waypoint_snapshot(waypoint_snapshot)
        print("Uploaded {}".format(waypoint_snapshot.id))
    for snapshot_id in response.unknown_edge_snapshot_ids:
        edge_snapshot = current_edge_snapshots[snapshot_id]
        graph_nav_client.upload_edge_snapshot(edge_snapshot)
        print("Uploaded {}".format(edge_snapshot.id))

    # The upload is complete! Check that the robot is localized to the graph,
    # and if it is not, prompt the user to localize the robot before attempting
    # any navigation commands.
    localization_state = graph_nav_client.get_localization_state()
    if not localization_state.localization.waypoint_id:
        # The robot is not localized to the newly uploaded graph.
        print("\n")
        print("Upload complete! The robot is currently not localized to the map; please localize")
    return response

In [11]:
# From "graph_nav_command_line.py"
# https://github.com/boston-dynamics/spot-sdk/blob/master/python/examples/graph_nav_command_line/graph_nav_command_line.py
def set_initial_localization_fiducial(state_client, graph_nav_client):
    """Trigger localization when near a fiducial."""
    robot_state = state_client.get_robot_state()
    current_odom_tform_body = get_odom_tform_body(
        robot_state.kinematic_state.transforms_snapshot).to_proto()
    
    # Create an empty instance for initial localization since we are asking it to localize
    # based on the nearest fiducial.
    localization = nav_pb2.Localization()
    graph_nav_client.set_localization(initial_guess_localization=localization,
                                            ko_tform_body=current_odom_tform_body)

In [12]:
# From "graph_nav_command_line.py"
# https://github.com/boston-dynamics/spot-sdk/blob/master/python/examples/graph_nav_command_line/graph_nav_command_line.py
def check_success(graph_nav_client, command_id=-1):
    """Use a navigation command id to get feedback from the robot and sit when command succeeds."""
    if command_id == -1:
        # No command, so we have no status to check.
        return False
    status = graph_nav_client.navigation_feedback(command_id)
    if status.status == graph_nav_pb2.NavigationFeedbackResponse.STATUS_REACHED_GOAL:
        # Successfully completed the navigation commands!
        return True
    elif status.status == graph_nav_pb2.NavigationFeedbackResponse.STATUS_LOST:
        print("Robot got lost when navigating the route, the robot will now sit down.")
        return True
    elif status.status == graph_nav_pb2.NavigationFeedbackResponse.STATUS_STUCK:
        print("Robot got stuck when navigating the route, the robot will now sit down.")
        return True
    elif status.status == graph_nav_pb2.NavigationFeedbackResponse.STATUS_ROBOT_IMPAIRED:
        print("Robot is impaired.")
        return True
    else:
        # Navigation command is not complete yet.
        return False

In [13]:
def waitUntilFinishedNav(nav_to_cmd_id):
    """
    Takes in a navigation command id. 
    Blocks until command is finished executing as determined by check_success.
    """
    is_finished = False
    prevTimeoutTimerSec = time.time()
    while(not is_finished):
        curTime = time.time()
        if(curTime - prevTimeoutTimerSec > timeout_sec):
            break
        time.sleep(0.2)
        is_finished = check_success(graph_nav_client, nav_to_cmd_id)

In [14]:
def spinInitFiducial(command_client, graph_nav_client, robot_state_client, timeout_sec = 5):
    
    """
    Rotates + moves in 30˚ and 0.5m increments to find a fiducial and initialize graph.
    Returns True if fiducial found and graph initialized. Returns False otherwise.
    """

    bdcrc.blocking_stand(command_client, timeout_sec = timeout_sec)
    
    for i in range(3):
        for j in range(4):
            print("attempt #: ", i*4+j)
            try:
                set_initial_localization_fiducial(robot_state_client, graph_nav_client)
                print("SUCCESS: initialized location to using nearest fiducial")
                return True
            except ResponseError as e:
                pass
            moveRobotRelative(0, 0, np.pi/6)
        moveRobotRelative(0, 0.5, 0)
    print("FAILED: could not find nearby fiducial")
    return False

In [15]:
def graphInitBlank(graph_nav_client, upload_filepath):
    
    """
    Uploads graph, initializes graph using spinInitFiducial.
    Creates and returns ordered list of waypoints.
    """
    
    current_graph = None
    current_waypoint_snapshots = dict()
    current_edge_snapshots = dict()
    
    response = upload_graph_and_snapshots(upload_filepath, current_graph, 
                               graph_nav_client, current_waypoint_snapshots, current_edge_snapshots)

    spinInitFiducial(command_client, graph_nav_client, robot_state_client)

    #Gets the current graph and reads off waypoints
    current_graph = graph_nav_client.download_graph()
    waypointList = [(waypoint.annotations.name,waypoint.id) for waypoint in current_graph.waypoints]
    waypointDict = dict()
    #make a dict and exclude default waypoints
    for waypointName, waypointId in waypointList:
        if(waypointName == 'default'):
            continue
        waypointDict[int(waypointName[9:])] = waypointId
    
    return waypointDict

In [None]:
def graphDelGetGraphLoc():
    

In [None]:
def graphInitGraphLoc():
    """"""