-
Notifications
You must be signed in to change notification settings - Fork 0
/
example_read_robot_positions.py
executable file
·52 lines (42 loc) · 2.04 KB
/
example_read_robot_positions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#!/usr/bin/python3
# This example shows how to read periodically the joint angles and the position of the robot.
# Use Ctrl+C to stop the program
import websocket # websocket-client https://github.com/websocket-client/websocket-client
import json # default supported by python
# Each message which is received through the WebSocket is processed in this function
def on_message(ws, message):
msg = json.loads(message) # Convert json to python dictionary for easier usage
if msg["op"] == "publish":
if msg["topic"] == "/joint_states":
any_robot_interface_payload = msg["msg"]
print("Joint positions in radians: " + str(any_robot_interface_payload["position"]))
elif msg["topic"] == "/tool_frame":
any_robot_interface_payload = msg["msg"]
print("Robot position as Euler Intrinsic ZYX frame (m, radians): " + str(any_robot_interface_payload))
def on_error(ws, error):
print(error)
def on_close(ws, close_status_code, close_msg):
print("Closed connection to AnyRobot Interface")
def on_open(ws):
print("Opened connection to AnyRobot Interface")
# Activate subscription to joint_states topic for getting the joint values
msg_subscribe_joint_command = {
"op": "subscribe",
"topic": "/joint_states"}
ws.send(json.dumps(msg_subscribe_joint_command)) # Send command to AnyRobot Interface
# Activate subscription to get robot pose
msg_subscribe_joint_command = {
"op": "subscribe",
"topic": "/tool_frame"}
ws.send(json.dumps(msg_subscribe_joint_command)) # Send command to AnyRobot Interface
if __name__ == "__main__":
connection_ip = "127.0.0.1" # Loopback localhost address, assuming running in same computer
connection_port = 9091
connection_uri = "ws://" + connection_ip + ":" + str(connection_port)
websocket.enableTrace(False)
ws = websocket.WebSocketApp(connection_uri,
on_open=on_open,
on_message=on_message,
on_error=on_error,
on_close=on_close)
ws.run_forever()