-
Notifications
You must be signed in to change notification settings - Fork 133
/
Copy pathsubscriber.py
74 lines (59 loc) · 2.21 KB
/
subscriber.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# Copyright 2020 Unity Technologies
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import rclpy
import socket
import re
from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSReliabilityPolicy
from rclpy.qos import QoSProfile
from .communication import RosReceiver
class RosSubscriber(RosReceiver):
    """
    Listens on a ROS topic and hands every received message to the TCP
    server, which carries it outside of the ROS network.
    """

    def __init__(self, topic, message_class, tcp_server, queue_size=10):
        """
        Args:
            topic:         Name of the topic to subscribe to
            message_class: Message type used for the subscription
            tcp_server:    Object whose send_unity_message(topic, data) is
                           called for each message received on the topic
            queue_size:    Depth of the QoS profile used by the subscription
        """
        # Derive the node name from the topic by dropping every character
        # that is not a letter, a digit or an underscore.
        sanitized_topic = re.sub("[^A-Za-z0-9_]+", "", topic)
        self.node_name = f"{sanitized_topic}_RosSubscriber"
        RosReceiver.__init__(self, self.node_name)

        self.topic = topic
        self.msg = message_class
        self.tcp_server = tcp_server
        self.queue_size = queue_size

        # Register self.send as the callback for messages on the topic.
        profile = QoSProfile(depth=queue_size)
        listener = self.create_subscription(
            self.msg,
            self.topic,
            self.send,
            profile,
        )
        self.subscription = listener
        # Bare attribute reference kept from the original; it has no effect
        # beyond reading the attribute.
        self.subscription

    def send(self, data):
        """
        Subscription callback: pass the received message on to the TCP server.

        Args:
            data: message received on the subscribed topic

        Returns:
            self.msg: the message class given at construction time
                      (not the received message itself)
        """
        server = self.tcp_server
        server.send_unity_message(self.topic, data)
        return self.msg

    def unregister(self):
        """
        Destroy the subscription first and then the node itself.

        Returns:
            None
        """
        subscription = self.subscription
        self.destroy_subscription(subscription)
        self.destroy_node()