-
Notifications
You must be signed in to change notification settings - Fork 136
/
spot_wrapper.py
976 lines (847 loc) · 45.6 KB
/
spot_wrapper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
import time
import math
from bosdyn.client import create_standard_sdk, ResponseError, RpcError
from bosdyn.client.async_tasks import AsyncPeriodicQuery, AsyncTasks
from bosdyn.geometry import EulerZXY
from bosdyn.client.robot_state import RobotStateClient
from bosdyn.client.robot_command import RobotCommandClient, RobotCommandBuilder
from bosdyn.client.graph_nav import GraphNavClient
from bosdyn.client.frame_helpers import get_odom_tform_body
from bosdyn.client.power import safe_power_off, PowerClient, power_on
from bosdyn.client.lease import LeaseClient, LeaseKeepAlive
from bosdyn.client.image import ImageClient, build_image_request
from bosdyn.api import image_pb2
from bosdyn.api.graph_nav import graph_nav_pb2
from bosdyn.api.graph_nav import map_pb2
from bosdyn.api.graph_nav import nav_pb2
from bosdyn.client.estop import EstopClient, EstopEndpoint, EstopKeepAlive
from bosdyn.client import power
from bosdyn.client import frame_helpers
from bosdyn.client import math_helpers
from bosdyn.client.exceptions import InternalServerError
from . import graph_nav_util
import bosdyn.api.robot_state_pb2 as robot_state_proto
from bosdyn.api import basic_command_pb2
from google.protobuf.timestamp_pb2 import Timestamp
front_image_sources = ['frontleft_fisheye_image', 'frontright_fisheye_image', 'frontleft_depth', 'frontright_depth']
"""List of image sources for front image periodic query"""
side_image_sources = ['left_fisheye_image', 'right_fisheye_image', 'left_depth', 'right_depth']
"""List of image sources for side image periodic query"""
rear_image_sources = ['back_fisheye_image', 'back_depth']
"""List of image sources for rear image periodic query"""
class AsyncRobotState(AsyncPeriodicQuery):
"""Class to get robot state at regular intervals. get_robot_state_async query sent to the robot at every tick. Callback registered to defined callback function.
Attributes:
client: The Client to a service on the robot
logger: Logger object
rate: Rate (Hz) to trigger the query
callback: Callback function to call when the results of the query are available
"""
def __init__(self, client, logger, rate, callback):
super(AsyncRobotState, self).__init__("robot-state", client, logger,
period_sec=1.0/max(rate, 1.0))
self._callback = None
if rate > 0.0:
self._callback = callback
def _start_query(self):
if self._callback:
callback_future = self._client.get_robot_state_async()
callback_future.add_done_callback(self._callback)
return callback_future
class AsyncMetrics(AsyncPeriodicQuery):
"""Class to get robot metrics at regular intervals. get_robot_metrics_async query sent to the robot at every tick. Callback registered to defined callback function.
Attributes:
client: The Client to a service on the robot
logger: Logger object
rate: Rate (Hz) to trigger the query
callback: Callback function to call when the results of the query are available
"""
def __init__(self, client, logger, rate, callback):
super(AsyncMetrics, self).__init__("robot-metrics", client, logger,
period_sec=1.0/max(rate, 1.0))
self._callback = None
if rate > 0.0:
self._callback = callback
def _start_query(self):
if self._callback:
callback_future = self._client.get_robot_metrics_async()
callback_future.add_done_callback(self._callback)
return callback_future
class AsyncLease(AsyncPeriodicQuery):
"""Class to get lease state at regular intervals. list_leases_async query sent to the robot at every tick. Callback registered to defined callback function.
Attributes:
client: The Client to a service on the robot
logger: Logger object
rate: Rate (Hz) to trigger the query
callback: Callback function to call when the results of the query are available
"""
def __init__(self, client, logger, rate, callback):
super(AsyncLease, self).__init__("lease", client, logger,
period_sec=1.0/max(rate, 1.0))
self._callback = None
if rate > 0.0:
self._callback = callback
def _start_query(self):
if self._callback:
callback_future = self._client.list_leases_async()
callback_future.add_done_callback(self._callback)
return callback_future
class AsyncImageService(AsyncPeriodicQuery):
"""Class to get images at regular intervals. get_image_from_sources_async query sent to the robot at every tick. Callback registered to defined callback function.
Attributes:
client: The Client to a service on the robot
logger: Logger object
rate: Rate (Hz) to trigger the query
callback: Callback function to call when the results of the query are available
"""
def __init__(self, client, logger, rate, callback, image_requests):
super(AsyncImageService, self).__init__("robot_image_service", client, logger,
period_sec=1.0/max(rate, 1.0))
self._callback = None
if rate > 0.0:
self._callback = callback
self._image_requests = image_requests
def _start_query(self):
if self._callback:
callback_future = self._client.get_image_async(self._image_requests)
callback_future.add_done_callback(self._callback)
return callback_future
class AsyncIdle(AsyncPeriodicQuery):
"""Class to check if the robot is moving, and if not, command a stand with the set mobility parameters
Attributes:
client: The Client to a service on the robot
logger: Logger object
rate: Rate (Hz) to trigger the query
spot_wrapper: A handle to the wrapper library
"""
def __init__(self, client, logger, rate, spot_wrapper):
super(AsyncIdle, self).__init__("idle", client, logger,
period_sec=1.0/rate)
self._spot_wrapper = spot_wrapper
def _start_query(self):
if self._spot_wrapper._last_stand_command != None:
try:
response = self._client.robot_command_feedback(self._spot_wrapper._last_stand_command)
self._spot_wrapper._is_sitting = False
if (response.feedback.synchronized_feedback.mobility_command_feedback.stand_feedback.status ==
basic_command_pb2.StandCommand.Feedback.STATUS_IS_STANDING):
self._spot_wrapper._is_standing = True
self._spot_wrapper._last_stand_command = None
else:
self._spot_wrapper._is_standing = False
except (ResponseError, RpcError) as e:
self._logger.error("Error when getting robot command feedback: %s", e)
self._spot_wrapper._last_stand_command = None
if self._spot_wrapper._last_sit_command != None:
try:
self._spot_wrapper._is_standing = False
response = self._client.robot_command_feedback(self._spot_wrapper._last_sit_command)
if (response.feedback.synchronized_feedback.mobility_command_feedback.sit_feedback.status ==
basic_command_pb2.SitCommand.Feedback.STATUS_IS_SITTING):
self._spot_wrapper._is_sitting = True
self._spot_wrapper._last_sit_command = None
else:
self._spot_wrapper._is_sitting = False
except (ResponseError, RpcError) as e:
self._logger.error("Error when getting robot command feedback: %s", e)
self._spot_wrapper._last_sit_command = None
is_moving = False
if self._spot_wrapper._last_velocity_command_time != None:
if time.time() < self._spot_wrapper._last_velocity_command_time:
is_moving = True
else:
self._spot_wrapper._last_velocity_command_time = None
if self._spot_wrapper._last_trajectory_command != None:
try:
response = self._client.robot_command_feedback(self._spot_wrapper._last_trajectory_command)
status = response.feedback.synchronized_feedback.mobility_command_feedback.se2_trajectory_feedback.status
# STATUS_AT_GOAL always means that the robot reached the goal. If the trajectory command did not
# request precise positioning, then STATUS_NEAR_GOAL also counts as reaching the goal
if status == basic_command_pb2.SE2TrajectoryCommand.Feedback.STATUS_AT_GOAL or \
(status == basic_command_pb2.SE2TrajectoryCommand.Feedback.STATUS_NEAR_GOAL and
not self._spot_wrapper._last_trajectory_command_precise):
self._spot_wrapper._at_goal = True
# Clear the command once at the goal
self._spot_wrapper._last_trajectory_command = None
elif status == basic_command_pb2.SE2TrajectoryCommand.Feedback.STATUS_GOING_TO_GOAL:
is_moving = True
elif status == basic_command_pb2.SE2TrajectoryCommand.Feedback.STATUS_NEAR_GOAL:
is_moving = True
self._spot_wrapper._near_goal = True
else:
self._spot_wrapper._last_trajectory_command = None
except (ResponseError, RpcError) as e:
self._logger.error("Error when getting robot command feedback: %s", e)
self._spot_wrapper._last_trajectory_command = None
self._spot_wrapper._is_moving = is_moving
if self._spot_wrapper.is_standing and not self._spot_wrapper.is_moving:
self._spot_wrapper.stand(False)
class SpotWrapper():
"""Generic wrapper class to encompass release 1.1.4 API features as well as maintaining leases automatically"""
def __init__(self, username, password, hostname, logger, rates = {}, callbacks = {}):
self._username = username
self._password = password
self._hostname = hostname
self._logger = logger
self._rates = rates
self._callbacks = callbacks
self._keep_alive = True
self._valid = True
self._mobility_params = RobotCommandBuilder.mobility_params()
self._is_standing = False
self._is_sitting = True
self._is_moving = False
self._at_goal = False
self._near_goal = False
self._last_stand_command = None
self._last_sit_command = None
self._last_trajectory_command = None
self._last_trajectory_command_precise = None
self._last_velocity_command_time = None
self._front_image_requests = []
for source in front_image_sources:
self._front_image_requests.append(build_image_request(source, image_format=image_pb2.Image.FORMAT_RAW))
self._side_image_requests = []
for source in side_image_sources:
self._side_image_requests.append(build_image_request(source, image_format=image_pb2.Image.FORMAT_RAW))
self._rear_image_requests = []
for source in rear_image_sources:
self._rear_image_requests.append(build_image_request(source, image_format=image_pb2.Image.FORMAT_RAW))
try:
self._sdk = create_standard_sdk('ros_spot')
except Exception as e:
self._logger.error("Error creating SDK object: %s", e)
self._valid = False
return
self._robot = self._sdk.create_robot(self._hostname)
try:
self._robot.authenticate(self._username, self._password)
self._robot.start_time_sync()
except RpcError as err:
self._logger.error("Failed to communicate with robot: %s", err)
self._valid = False
return
if self._robot:
# Clients
try:
self._robot_state_client = self._robot.ensure_client(RobotStateClient.default_service_name)
self._robot_command_client = self._robot.ensure_client(RobotCommandClient.default_service_name)
self._graph_nav_client = self._robot.ensure_client(GraphNavClient.default_service_name)
self._power_client = self._robot.ensure_client(PowerClient.default_service_name)
self._lease_client = self._robot.ensure_client(LeaseClient.default_service_name)
self._lease_wallet = self._lease_client.lease_wallet
self._image_client = self._robot.ensure_client(ImageClient.default_service_name)
self._estop_client = self._robot.ensure_client(EstopClient.default_service_name)
except Exception as e:
self._logger.error("Unable to create client service: %s", e)
self._valid = False
return
# Store the most recent knowledge of the state of the robot based on rpc calls.
self._current_graph = None
self._current_edges = dict() #maps to_waypoint to list(from_waypoint)
self._current_waypoint_snapshots = dict() # maps id to waypoint snapshot
self._current_edge_snapshots = dict() # maps id to edge snapshot
self._current_annotation_name_to_wp_id = dict()
# Async Tasks
self._async_task_list = []
self._robot_state_task = AsyncRobotState(self._robot_state_client, self._logger, max(0.0, self._rates.get("robot_state", 0.0)), self._callbacks.get("robot_state", lambda:None))
self._robot_metrics_task = AsyncMetrics(self._robot_state_client, self._logger, max(0.0, self._rates.get("metrics", 0.0)), self._callbacks.get("metrics", lambda:None))
self._lease_task = AsyncLease(self._lease_client, self._logger, max(0.0, self._rates.get("lease", 0.0)), self._callbacks.get("lease", lambda:None))
self._front_image_task = AsyncImageService(self._image_client, self._logger, max(0.0, self._rates.get("front_image", 0.0)), self._callbacks.get("front_image", lambda:None), self._front_image_requests)
self._side_image_task = AsyncImageService(self._image_client, self._logger, max(0.0, self._rates.get("side_image", 0.0)), self._callbacks.get("side_image", lambda:None), self._side_image_requests)
self._rear_image_task = AsyncImageService(self._image_client, self._logger, max(0.0, self._rates.get("rear_image", 0.0)), self._callbacks.get("rear_image", lambda:None), self._rear_image_requests)
self._idle_task = AsyncIdle(self._robot_command_client, self._logger, 10.0, self)
self._estop_endpoint = None
self._async_tasks = AsyncTasks(
[self._robot_state_task, self._robot_metrics_task, self._lease_task, self._front_image_task, self._side_image_task, self._rear_image_task, self._idle_task])
self._robot_id = None
self._lease = None
@property
def logger(self):
"""Return logger instance of the SpotWrapper"""
return self._logger
@property
def is_valid(self):
"""Return boolean indicating if the wrapper initialized successfully"""
return self._valid
@property
def id(self):
"""Return robot's ID"""
return self._robot_id
@property
def robot_state(self):
"""Return latest proto from the _robot_state_task"""
return self._robot_state_task.proto
@property
def metrics(self):
"""Return latest proto from the _robot_metrics_task"""
return self._robot_metrics_task.proto
@property
def lease(self):
"""Return latest proto from the _lease_task"""
return self._lease_task.proto
@property
def front_images(self):
"""Return latest proto from the _front_image_task"""
return self._front_image_task.proto
@property
def side_images(self):
"""Return latest proto from the _side_image_task"""
return self._side_image_task.proto
@property
def rear_images(self):
"""Return latest proto from the _rear_image_task"""
return self._rear_image_task.proto
@property
def is_standing(self):
"""Return boolean of standing state"""
return self._is_standing
@property
def is_sitting(self):
"""Return boolean of standing state"""
return self._is_sitting
@property
def is_moving(self):
"""Return boolean of walking state"""
return self._is_moving
@property
def near_goal(self):
return self._near_goal
@property
def at_goal(self):
return self._at_goal
@property
def time_skew(self):
"""Return the time skew between local and spot time"""
return self._robot.time_sync.endpoint.clock_skew
def resetMobilityParams(self):
"""
Resets the mobility parameters used for motion commands to the default values provided by the bosdyn api.
Returns:
"""
self._mobility_params = RobotCommandBuilder.mobility_params()
def robotToLocalTime(self, timestamp):
"""Takes a timestamp and an estimated skew and return seconds and nano seconds in local time
Args:
timestamp: google.protobuf.Timestamp
Returns:
google.protobuf.Timestamp
"""
rtime = Timestamp()
rtime.seconds = timestamp.seconds - self.time_skew.seconds
rtime.nanos = timestamp.nanos - self.time_skew.nanos
if rtime.nanos < 0:
rtime.nanos = rtime.nanos + 1000000000
rtime.seconds = rtime.seconds - 1
# Workaround for timestamps being incomplete
if rtime.seconds < 0:
rtime.seconds = 0
rtime.nanos = 0
return rtime
def claim(self):
"""Get a lease for the robot, a handle on the estop endpoint, and the ID of the robot."""
try:
self._robot_id = self._robot.get_id()
self.getLease()
self.resetEStop()
return True, "Success"
except (ResponseError, RpcError) as err:
self._logger.error("Failed to initialize robot communication: %s", err)
return False, str(err)
def updateTasks(self):
"""Loop through all periodic tasks and update their data if needed."""
try:
self._async_tasks.update()
except Exception as e:
print(f"Update tasks failed with error: {str(e)}")
def resetEStop(self):
"""Get keepalive for eStop"""
self._estop_endpoint = EstopEndpoint(self._estop_client, 'ros', 9.0)
self._estop_endpoint.force_simple_setup() # Set this endpoint as the robot's sole estop.
self._estop_keepalive = EstopKeepAlive(self._estop_endpoint)
def assertEStop(self, severe=True):
"""Forces the robot into eStop state.
Args:
severe: Default True - If true, will cut motor power immediately. If false, will try to settle the robot on the ground first
"""
try:
if severe:
self._estop_keepalive.stop()
else:
self._estop_keepalive.settle_then_cut()
return True, "Success"
except:
return False, "Error"
def disengageEStop(self):
"""Disengages the E-Stop"""
try:
self._estop_keepalive.allow()
return True, "Success"
except:
return False, "Error"
def releaseEStop(self):
"""Stop eStop keepalive"""
if self._estop_keepalive:
self._estop_keepalive.stop()
self._estop_keepalive = None
self._estop_endpoint = None
def getLease(self):
"""Get a lease for the robot and keep the lease alive automatically."""
self._lease = self._lease_client.acquire()
self._lease_keepalive = LeaseKeepAlive(self._lease_client)
def releaseLease(self):
"""Return the lease on the body."""
if self._lease:
self._lease_client.return_lease(self._lease)
self._lease = None
def release(self):
"""Return the lease on the body and the eStop handle."""
try:
self.releaseLease()
self.releaseEStop()
return True, "Success"
except Exception as e:
return False, str(e)
def disconnect(self):
"""Release control of robot as gracefully as posssible."""
if self._robot.time_sync:
self._robot.time_sync.stop()
self.releaseLease()
self.releaseEStop()
def _robot_command(self, command_proto, end_time_secs=None, timesync_endpoint=None):
"""Generic blocking function for sending commands to robots.
Args:
command_proto: robot_command_pb2 object to send to the robot. Usually made with RobotCommandBuilder
end_time_secs: (optional) Time-to-live for the command in seconds
timesync_endpoint: (optional) Time sync endpoint
"""
try:
id = self._robot_command_client.robot_command(lease=None, command=command_proto, end_time_secs=end_time_secs, timesync_endpoint=timesync_endpoint)
return True, "Success", id
except Exception as e:
return False, str(e), None
def stop(self):
"""Stop the robot's motion."""
response = self._robot_command(RobotCommandBuilder.stop_command())
return response[0], response[1]
def self_right(self):
"""Have the robot self-right itself."""
response = self._robot_command(RobotCommandBuilder.selfright_command())
return response[0], response[1]
def sit(self):
"""Stop the robot's motion and sit down if able."""
response = self._robot_command(RobotCommandBuilder.synchro_sit_command())
self._last_sit_command = response[2]
return response[0], response[1]
def stand(self, monitor_command=True):
"""If the e-stop is enabled, and the motor power is enabled, stand the robot up."""
response = self._robot_command(RobotCommandBuilder.synchro_stand_command(params=self._mobility_params))
if monitor_command:
self._last_stand_command = response[2]
return response[0], response[1]
def safe_power_off(self):
"""Stop the robot's motion and sit if possible. Once sitting, disable motor power."""
response = self._robot_command(RobotCommandBuilder.safe_power_off_command())
return response[0], response[1]
def clear_behavior_fault(self, id):
"""Clear the behavior fault defined by id."""
try:
rid = self._robot_command_client.clear_behavior_fault(behavior_fault_id=id, lease=None)
return True, "Success", rid
except Exception as e:
return False, str(e), None
def power_on(self):
"""Enble the motor power if e-stop is enabled."""
try:
power.power_on(self._power_client)
return True, "Success"
except Exception as e:
return False, str(e)
def set_mobility_params(self, mobility_params):
"""Set Params for mobility and movement
Args:
mobility_params: spot.MobilityParams, params for spot mobility commands.
"""
self._mobility_params = mobility_params
def get_mobility_params(self):
"""Get mobility params
"""
return self._mobility_params
def velocity_cmd(self, v_x, v_y, v_rot, cmd_duration=0.125):
"""Send a velocity motion command to the robot.
Args:
v_x: Velocity in the X direction in meters
v_y: Velocity in the Y direction in meters
v_rot: Angular velocity around the Z axis in radians
cmd_duration: (optional) Time-to-live for the command in seconds. Default is 125ms (assuming 10Hz command rate).
"""
end_time=time.time() + cmd_duration
response = self._robot_command(RobotCommandBuilder.synchro_velocity_command(
v_x=v_x, v_y=v_y, v_rot=v_rot, params=self._mobility_params),
end_time_secs=end_time, timesync_endpoint=self._robot.time_sync.endpoint)
self._last_velocity_command_time = end_time
return response[0], response[1]
def trajectory_cmd(self, goal_x, goal_y, goal_heading, cmd_duration, frame_name='odom', precise_position=False):
"""Send a trajectory motion command to the robot.
Args:
goal_x: Position X coordinate in meters
goal_y: Position Y coordinate in meters
goal_heading: Pose heading in radians
cmd_duration: Time-to-live for the command in seconds.
frame_name: frame_name to be used to calc the target position. 'odom' or 'vision'
precise_position: if set to false, the status STATUS_NEAR_GOAL and STATUS_AT_GOAL will be equivalent. If
true, the robot must complete its final positioning before it will be considered to have successfully
reached the goal.
"""
self._at_goal = False
self._near_goal = False
self._last_trajectory_command_precise = precise_position
self._logger.info("got command duration of {}".format(cmd_duration))
end_time=time.time() + cmd_duration
if frame_name == 'vision':
vision_tform_body = frame_helpers.get_vision_tform_body(
self._robot_state_client.get_robot_state().kinematic_state.transforms_snapshot)
body_tform_goal = math_helpers.SE3Pose(x=goal_x, y=goal_y, z=0, rot=math_helpers.Quat.from_yaw(goal_heading))
vision_tform_goal = vision_tform_body * body_tform_goal
response = self._robot_command(
RobotCommandBuilder.synchro_se2_trajectory_point_command(
goal_x=vision_tform_goal.x,
goal_y=vision_tform_goal.y,
goal_heading=vision_tform_goal.rot.to_yaw(),
frame_name=frame_helpers.VISION_FRAME_NAME,
params=self._mobility_params),
end_time_secs=end_time
)
elif frame_name == 'odom':
odom_tform_body = frame_helpers.get_odom_tform_body(
self._robot_state_client.get_robot_state().kinematic_state.transforms_snapshot)
body_tform_goal = math_helpers.SE3Pose(x=goal_x, y=goal_y, z=0, rot=math_helpers.Quat.from_yaw(goal_heading))
odom_tform_goal = odom_tform_body * body_tform_goal
response = self._robot_command(
RobotCommandBuilder.synchro_se2_trajectory_point_command(
goal_x=odom_tform_goal.x,
goal_y=odom_tform_goal.y,
goal_heading=odom_tform_goal.rot.to_yaw(),
frame_name=frame_helpers.ODOM_FRAME_NAME,
params=self._mobility_params),
end_time_secs=end_time
)
else:
raise ValueError('frame_name must be \'vision\' or \'odom\'')
if response[0]:
self._last_trajectory_command = response[2]
return response[0], response[1]
def list_graph(self, upload_path):
"""List waypoint ids of garph_nav
Args:
upload_path : Path to the root directory of the map.
"""
ids, eds = self._list_graph_waypoint_and_edge_ids()
# skip waypoint_ for v2.2.1, skip waypiont for < v2.2
return [v for k, v in sorted(ids.items(), key=lambda id : int(id[0].replace('waypoint_','')))]
def navigate_to(self, upload_path,
navigate_to,
initial_localization_fiducial=True,
initial_localization_waypoint=None):
""" navigate with graph nav.
Args:
upload_path : Path to the root directory of the map.
navigate_to : Waypont id string for where to goal
initial_localization_fiducial : Tells the initializer whether to use fiducials
initial_localization_waypoint : Waypoint id string of current robot position (optional)
"""
# Filepath for uploading a saved graph's and snapshots too.
if upload_path[-1] == "/":
upload_filepath = upload_path[:-1]
else:
upload_filepath = upload_path
# Boolean indicating the robot's power state.
power_state = self._robot_state_client.get_robot_state().power_state
self._started_powered_on = (power_state.motor_power_state == power_state.STATE_ON)
self._powered_on = self._started_powered_on
# FIX ME somehow,,,, if the robot is stand, need to sit the robot before starting garph nav
if self.is_standing and not self.is_moving:
self.sit()
# TODO verify estop / claim / power_on
self._clear_graph()
self._upload_graph_and_snapshots(upload_filepath)
if initial_localization_fiducial:
self._set_initial_localization_fiducial()
if initial_localization_waypoint:
self._set_initial_localization_waypoint([initial_localization_waypoint])
self._list_graph_waypoint_and_edge_ids()
self._get_localization_state()
resp = self._navigate_to([navigate_to])
return resp
## copy from spot-sdk/python/examples/graph_nav_command_line/graph_nav_command_line.py
def _get_localization_state(self, *args):
"""Get the current localization and state of the robot."""
state = self._graph_nav_client.get_localization_state()
self._logger.info('Got localization: \n%s' % str(state.localization))
odom_tform_body = get_odom_tform_body(state.robot_kinematics.transforms_snapshot)
self._logger.info('Got robot state in kinematic odometry frame: \n%s' % str(odom_tform_body))
def _set_initial_localization_fiducial(self, *args):
"""Trigger localization when near a fiducial."""
robot_state = self._robot_state_client.get_robot_state()
current_odom_tform_body = get_odom_tform_body(
robot_state.kinematic_state.transforms_snapshot).to_proto()
# Create an empty instance for initial localization since we are asking it to localize
# based on the nearest fiducial.
localization = nav_pb2.Localization()
self._graph_nav_client.set_localization(initial_guess_localization=localization,
ko_tform_body=current_odom_tform_body)
def _set_initial_localization_waypoint(self, *args):
"""Trigger localization to a waypoint."""
# Take the first argument as the localization waypoint.
if len(args) < 1:
# If no waypoint id is given as input, then return without initializing.
self._logger.error("No waypoint specified to initialize to.")
return
destination_waypoint = graph_nav_util.find_unique_waypoint_id(
args[0][0], self._current_graph, self._current_annotation_name_to_wp_id, self._logger)
if not destination_waypoint:
# Failed to find the unique waypoint id.
return
robot_state = self._robot_state_client.get_robot_state()
current_odom_tform_body = get_odom_tform_body(
robot_state.kinematic_state.transforms_snapshot).to_proto()
# Create an initial localization to the specified waypoint as the identity.
localization = nav_pb2.Localization()
localization.waypoint_id = destination_waypoint
localization.waypoint_tform_body.rotation.w = 1.0
self._graph_nav_client.set_localization(
initial_guess_localization=localization,
# It's hard to get the pose perfect, search +/-20 deg and +/-20cm (0.2m).
max_distance = 0.2,
max_yaw = 20.0 * math.pi / 180.0,
fiducial_init=graph_nav_pb2.SetLocalizationRequest.FIDUCIAL_INIT_NO_FIDUCIAL,
ko_tform_body=current_odom_tform_body)
def _list_graph_waypoint_and_edge_ids(self, *args):
"""List the waypoint ids and edge ids of the graph currently on the robot."""
# Download current graph
graph = self._graph_nav_client.download_graph()
if graph is None:
self._logger.error("Empty graph.")
return
self._current_graph = graph
localization_id = self._graph_nav_client.get_localization_state().localization.waypoint_id
# Update and print waypoints and edges
self._current_annotation_name_to_wp_id, self._current_edges = graph_nav_util.update_waypoints_and_edges(
graph, localization_id, self._logger)
return self._current_annotation_name_to_wp_id, self._current_edges
def _upload_graph_and_snapshots(self, upload_filepath):
"""Upload the graph and snapshots to the robot."""
self._logger.info("Loading the graph from disk into local storage...")
with open(upload_filepath + "/graph", "rb") as graph_file:
# Load the graph from disk.
data = graph_file.read()
self._current_graph = map_pb2.Graph()
self._current_graph.ParseFromString(data)
self._logger.info("Loaded graph has {} waypoints and {} edges".format(
len(self._current_graph.waypoints), len(self._current_graph.edges)))
for waypoint in self._current_graph.waypoints:
# Load the waypoint snapshots from disk.
with open(upload_filepath + "/waypoint_snapshots/{}".format(waypoint.snapshot_id),
"rb") as snapshot_file:
waypoint_snapshot = map_pb2.WaypointSnapshot()
waypoint_snapshot.ParseFromString(snapshot_file.read())
self._current_waypoint_snapshots[waypoint_snapshot.id] = waypoint_snapshot
for edge in self._current_graph.edges:
# Load the edge snapshots from disk.
with open(upload_filepath + "/edge_snapshots/{}".format(edge.snapshot_id),
"rb") as snapshot_file:
edge_snapshot = map_pb2.EdgeSnapshot()
edge_snapshot.ParseFromString(snapshot_file.read())
self._current_edge_snapshots[edge_snapshot.id] = edge_snapshot
# Upload the graph to the robot.
self._logger.info("Uploading the graph and snapshots to the robot...")
self._graph_nav_client.upload_graph(lease=self._lease.lease_proto,
graph=self._current_graph)
# Upload the snapshots to the robot.
for waypoint_snapshot in self._current_waypoint_snapshots.values():
self._graph_nav_client.upload_waypoint_snapshot(waypoint_snapshot)
self._logger.info("Uploaded {}".format(waypoint_snapshot.id))
for edge_snapshot in self._current_edge_snapshots.values():
self._graph_nav_client.upload_edge_snapshot(edge_snapshot)
self._logger.info("Uploaded {}".format(edge_snapshot.id))
# The upload is complete! Check that the robot is localized to the graph,
# and it if is not, prompt the user to localize the robot before attempting
# any navigation commands.
localization_state = self._graph_nav_client.get_localization_state()
if not localization_state.localization.waypoint_id:
# The robot is not localized to the newly uploaded graph.
self._logger.info(
"Upload complete! The robot is currently not localized to the map; please localize", \
"the robot using commands (2) or (3) before attempting a navigation command.")
def _navigate_to(self, *args):
"""Navigate to a specific waypoint."""
# Take the first argument as the destination waypoint.
if len(args) < 1:
# If no waypoint id is given as input, then return without requesting navigation.
self._logger.info("No waypoint provided as a destination for navigate to.")
return
self._lease = self._lease_wallet.get_lease()
destination_waypoint = graph_nav_util.find_unique_waypoint_id(
args[0][0], self._current_graph, self._current_annotation_name_to_wp_id, self._logger)
if not destination_waypoint:
# Failed to find the appropriate unique waypoint id for the navigation command.
return
if not self.toggle_power(should_power_on=True):
self._logger.info("Failed to power on the robot, and cannot complete navigate to request.")
return
# Stop the lease keepalive and create a new sublease for graph nav.
self._lease = self._lease_wallet.advance()
sublease = self._lease.create_sublease()
self._lease_keepalive.shutdown()
# Navigate to the destination waypoint.
is_finished = False
nav_to_cmd_id = -1
while not is_finished:
# Issue the navigation command about twice a second such that it is easy to terminate the
# navigation command (with estop or killing the program).
nav_to_cmd_id = self._graph_nav_client.navigate_to(destination_waypoint, 1.0,
leases=[sublease.lease_proto])
time.sleep(.5) # Sleep for half a second to allow for command execution.
# Poll the robot for feedback to determine if the navigation command is complete. Then sit
# the robot down once it is finished.
is_finished = self._check_success(nav_to_cmd_id)
self._lease = self._lease_wallet.advance()
self._lease_keepalive = LeaseKeepAlive(self._lease_client)
# Update the lease and power off the robot if appropriate.
if self._powered_on and not self._started_powered_on:
# Sit the robot down + power off after the navigation command is complete.
self.toggle_power(should_power_on=False)
status = self._graph_nav_client.navigation_feedback(nav_to_cmd_id)
if status.status == graph_nav_pb2.NavigationFeedbackResponse.STATUS_REACHED_GOAL:
return True, "Successfully completed the navigation commands!"
elif status.status == graph_nav_pb2.NavigationFeedbackResponse.STATUS_LOST:
return False, "Robot got lost when navigating the route, the robot will now sit down."
elif status.status == graph_nav_pb2.NavigationFeedbackResponse.STATUS_STUCK:
return False, "Robot got stuck when navigating the route, the robot will now sit down."
elif status.status == graph_nav_pb2.NavigationFeedbackResponse.STATUS_ROBOT_IMPAIRED:
return False, "Robot is impaired."
else:
return False, "Navigation command is not complete yet."
def _navigate_route(self, *args):
"""Navigate through a specific route of waypoints."""
if len(args) < 1:
# If no waypoint ids are given as input, then return without requesting navigation.
self._logger.error("No waypoints provided for navigate route.")
return
waypoint_ids = args[0]
for i in range(len(waypoint_ids)):
waypoint_ids[i] = graph_nav_util.find_unique_waypoint_id(
waypoint_ids[i], self._current_graph, self._current_annotation_name_to_wp_id, self._logger)
if not waypoint_ids[i]:
# Failed to find the unique waypoint id.
return
edge_ids_list = []
all_edges_found = True
# Attempt to find edges in the current graph that match the ordered waypoint pairs.
# These are necessary to create a valid route.
for i in range(len(waypoint_ids) - 1):
start_wp = waypoint_ids[i]
end_wp = waypoint_ids[i + 1]
edge_id = self._match_edge(self._current_edges, start_wp, end_wp)
if edge_id is not None:
edge_ids_list.append(edge_id)
else:
all_edges_found = False
self._logger.error("Failed to find an edge between waypoints: ", start_wp, " and ", end_wp)
self._logger.error(
"List the graph's waypoints and edges to ensure pairs of waypoints has an edge."
)
break
self._lease = self._lease_wallet.get_lease()
if all_edges_found:
if not self.toggle_power(should_power_on=True):
self._logger.error("Failed to power on the robot, and cannot complete navigate route request.")
return
# Stop the lease keepalive and create a new sublease for graph nav.
self._lease = self._lease_wallet.advance()
sublease = self._lease.create_sublease()
self._lease_keepalive.shutdown()
# Navigate a specific route.
route = self._graph_nav_client.build_route(waypoint_ids, edge_ids_list)
is_finished = False
while not is_finished:
# Issue the route command about twice a second such that it is easy to terminate the
# navigation command (with estop or killing the program).
nav_route_command_id = self._graph_nav_client.navigate_route(
route, cmd_duration=1.0, leases=[sublease.lease_proto])
time.sleep(.5) # Sleep for half a second to allow for command execution.
# Poll the robot for feedback to determine if the route is complete. Then sit
# the robot down once it is finished.
is_finished = self._check_success(nav_route_command_id)
self._lease = self._lease_wallet.advance()
self._lease_keepalive = LeaseKeepAlive(self._lease_client)
# Update the lease and power off the robot if appropriate.
if self._powered_on and not self._started_powered_on:
# Sit the robot down + power off after the navigation command is complete.
self.toggle_power(should_power_on=False)
def _clear_graph(self, *args):
"""Clear the state of the map on the robot, removing all waypoints and edges."""
return self._graph_nav_client.clear_graph(lease=self._lease.lease_proto)
def toggle_power(self, should_power_on):
"""Power the robot on/off dependent on the current power state."""
is_powered_on = self.check_is_powered_on()
if not is_powered_on and should_power_on:
# Power on the robot up before navigating when it is in a powered-off state.
power_on(self._power_client)
motors_on = False
while not motors_on:
future = self._robot_state_client.get_robot_state_async()
state_response = future.result(timeout=10) # 10 second timeout for waiting for the state response.
if state_response.power_state.motor_power_state == robot_state_pb2.PowerState.STATE_ON:
motors_on = True
else:
# Motors are not yet fully powered on.
time.sleep(.25)
elif is_powered_on and not should_power_on:
# Safe power off (robot will sit then power down) when it is in a
# powered-on state.
safe_power_off(self._robot_command_client, self._robot_state_client)
else:
# Return the current power state without change.
return is_powered_on
# Update the locally stored power state.
self.check_is_powered_on()
return self._powered_on
def check_is_powered_on(self):
"""Determine if the robot is powered on or off."""
power_state = self._robot_state_client.get_robot_state().power_state
self._powered_on = (power_state.motor_power_state == power_state.STATE_ON)
return self._powered_on
def _check_success(self, command_id=-1):
"""Use a navigation command id to get feedback from the robot and sit when command succeeds."""
if command_id == -1:
# No command, so we have not status to check.
return False
status = self._graph_nav_client.navigation_feedback(command_id)
if status.status == graph_nav_pb2.NavigationFeedbackResponse.STATUS_REACHED_GOAL:
# Successfully completed the navigation commands!
return True
elif status.status == graph_nav_pb2.NavigationFeedbackResponse.STATUS_LOST:
self._logger.error("Robot got lost when navigating the route, the robot will now sit down.")
return True
elif status.status == graph_nav_pb2.NavigationFeedbackResponse.STATUS_STUCK:
self._logger.error("Robot got stuck when navigating the route, the robot will now sit down.")
return True
elif status.status == graph_nav_pb2.NavigationFeedbackResponse.STATUS_ROBOT_IMPAIRED:
self._logger.error("Robot is impaired.")
return True
else:
# Navigation command is not complete yet.
return False
def _match_edge(self, current_edges, waypoint1, waypoint2):
"""Find an edge in the graph that is between two waypoint ids."""
# Return the correct edge id as soon as it's found.
for edge_to_id in current_edges:
for edge_from_id in current_edges[edge_to_id]:
if (waypoint1 == edge_to_id) and (waypoint2 == edge_from_id):
# This edge matches the pair of waypoints! Add it the edge list and continue.
return map_pb2.Edge.Id(from_waypoint=waypoint2, to_waypoint=waypoint1)
elif (waypoint2 == edge_to_id) and (waypoint1 == edge_from_id):
# This edge matches the pair of waypoints! Add it the edge list and continue.
return map_pb2.Edge.Id(from_waypoint=waypoint1, to_waypoint=waypoint2)
return None