-
Notifications
You must be signed in to change notification settings - Fork 33
/
scl.py
99 lines (74 loc) · 2.78 KB
/
scl.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
"""
__________________________________
| _ _ _ |
| _( )__ _( )__ _( )__ |
| _| _| _| _| _| _| |
| (_ S (_ (_ C (_ (_ L (_ |
| |_( )__| |_( )__| |_( )__| |
| |
| Signaling and Communication Link |
|__________________________________|
SCL Python interface
Copyright (C) 2014 Tobias Simon, Integrated Communication Systems Group, TU Ilmenau
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details. """
import os, zmq
from threading import Thread
from msgpack import loads, dumps
# Base ZeroMQ IPC endpoint prefix located under the user's home directory;
# scl_get_socket appends the socket id to this prefix.
try:
    pp_path = "ipc://" + os.environ["HOME"] + "/.PenguPilot/ipc/"
except KeyError:
    # Only a missing HOME is handled here.  A built-in exception type is
    # raised so the failure carries the intended message instead of a
    # NameError caused by an undefined exception class.
    raise RuntimeError("HOME environment variable is not set")
# Module-level ZeroMQ context, created once at import time; scl_get_socket
# creates all of its sockets from it.
context = zmq.Context()
class SCL_Socket:
    """Thin wrapper around a ZeroMQ socket that exchanges msgpack payloads.

    Objects passed to send() are serialized with msgpack ``dumps`` before
    being handed to the wrapped socket; recv() deserializes whatever the
    wrapped socket returns with msgpack ``loads``.
    """

    def __init__(self, zmq_socket):
        """Remember the underlying socket used for all transfers."""
        self.zmq_socket = zmq_socket

    def send(self, data):
        """Serialize ``data`` and send it through the wrapped socket."""
        packed = dumps(data)
        self.zmq_socket.send(packed)

    def recv(self):
        """Receive one message from the wrapped socket and return it decoded."""
        raw = self.zmq_socket.recv()
        return loads(raw)
def scl_get_socket(id, type_name):
    """Create a ZeroMQ socket for an SCL endpoint and wrap it in SCL_Socket.

    "sub", "req" and "push" sockets get a receive high-water mark of 1 and
    connect to the endpoint; "pub", "rep" and "pull" sockets get a send
    high-water mark of 1 and bind the endpoint.  "sub" sockets additionally
    subscribe with an empty prefix.

    Parameters:
        id: endpoint name; appended to the module-level ``pp_path`` prefix
            to form the socket address.
        type_name: one of "sub", "req", "push", "pub", "rep", "pull".

    Returns:
        An SCL_Socket wrapping the configured ZeroMQ socket.

    Raises:
        KeyError: if ``type_name`` is not one of the names listed above.
    """
    socket_types = {"sub": zmq.SUB, "req": zmq.REQ, "push": zmq.PUSH,
                    "pub": zmq.PUB, "rep": zmq.REP, "pull": zmq.PULL}
    try:
        socket_type = socket_types[type_name]
    except KeyError:
        # Same exception type as the plain dict lookup, with a readable message.
        raise KeyError("unknown socket type: %r" % (type_name,))
    socket_path = pp_path + id
    zmq_socket = context.socket(socket_type)
    try:
        if socket_type in (zmq.SUB, zmq.REQ, zmq.PUSH):
            zmq_socket.setsockopt(zmq.RCVHWM, 1)
            if socket_type == zmq.SUB:
                # Must be bytes: pyzmq refuses unicode option values, so a
                # plain "" fails on Python 3.  b"" works on Python 2 as well.
                zmq_socket.setsockopt(zmq.SUBSCRIBE, b"")
            zmq_socket.connect(socket_path)
        else:
            # Only PUB, REP and PULL remain in the table at this point.
            zmq_socket.setsockopt(zmq.SNDHWM, 1)
            zmq_socket.bind(socket_path)
    except Exception:
        # Do not leak the socket when option setup, connect or bind fails.
        zmq_socket.close()
        raise
    return SCL_Socket(zmq_socket)
class SCL_Reader(Thread):
    """Daemon thread that keeps the latest message of an SCL socket in ``data``.

    The thread is started by the constructor.  Each received message is stored
    in ``self.data`` and, when a truthy ``callback`` was supplied, passed to
    that callback from within the reader thread.
    """

    def __init__(self, name, type, def_data = 0.0, callback = None):
        """Open the socket and start reading.

        name, type -- forwarded to scl_get_socket to create the socket
        def_data   -- value of ``self.data`` until the first message arrives
        callback   -- optional callable invoked with every received message
        """
        super(SCL_Reader, self).__init__()
        self.daemon = True
        self.socket = scl_get_socket(name, type)
        self.data = def_data
        self.callback = callback
        self.start()

    def run(self):
        """Receive messages forever, updating ``data`` and notifying the callback."""
        while True:
            self.data = self.socket.recv()
            if not self.callback:
                continue
            self.callback(self.data)
class SCL_Proxy(Thread):
    """Thread that forwards every message from one SCL socket to another.

    The thread is started by the constructor and loops forever, receiving
    from the input socket and sending the received object to the output
    socket.
    """

    def __init__(self, in_name, in_type, out_name, out_type):
        """Open both sockets via scl_get_socket and start forwarding.

        in_name, in_type   -- name and type of the socket to read from
        out_name, out_type -- name and type of the socket to write to
        """
        super(SCL_Proxy, self).__init__()
        self.in_socket = scl_get_socket(in_name, in_type)
        self.out_socket = scl_get_socket(out_name, out_type)
        self.start()

    def run(self):
        """Relay messages from ``in_socket`` to ``out_socket`` indefinitely."""
        while True:
            message = self.in_socket.recv()
            self.out_socket.send(message)