-
Notifications
You must be signed in to change notification settings - Fork 38
/
fd_server.py
executable file
·116 lines (88 loc) · 2.92 KB
/
fd_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#!/usr/bin/env python
"""
fd_server.py
Publishes FDObject at PATH on BUSN (see code), demonstrating server side
implementation of methods with open UNIX file descriptors as arguments
(type 'h' as per the dbus spec).
NOTE::
Passing open UNIX file descriptors across RPC / IPC mechanisms such as dbus
requires the underlying transport to be a UNIX domain socket.
"""
import functools
import os

import twisted
from twisted.internet import defer, reactor
from txdbus import client, interface, objects
def trace_method_call(method):
    """
    Decorator that prints every call to ``method`` along with its result.

    Each call produces one line of the form
    ``handling <name>(<args>) = <result>``. The first positional argument is
    omitted from the printed tuple (it is ``self`` for the methods decorated
    in this file). Keyword arguments are forwarded to ``method`` but are not
    printed.

    The returned wrapper keeps the metadata of ``method`` (``__name__``,
    ``__doc__``, attributes) so introspection of the decorated method still
    sees the original.
    """
    @functools.wraps(method)
    def wrapper(*args, **kwargs):
        # The call is printed before the method runs; the result is appended
        # to the same output line once the method has returned.
        print(f'handling {method.__name__}{args[1:]!r}', end=' = ')
        result = method(*args, **kwargs)
        print(repr(result))
        return result

    return wrapper
class FDObject(objects.DBusObject):
    """
    DBus object whose methods take open UNIX file descriptors as arguments.

    The entries of ``_methods`` are passed to ``interface.DBusInterface`` to
    build ``dbusInterfaces``. 'readBytesTwoFDs' is only added to that list
    when the running Twisted version is at least 17.1.0.

    Every method takes ownership of the descriptors it receives and closes
    them before returning, including when reading fails.
    """

    _methods = [
        'org.example.FDInterface',
        interface.Method('lenFD', arguments='h', returns='t'),
        interface.Method('readBytesFD', arguments='ht', returns='ay'),
    ]

    @trace_method_call
    def dbus_lenFD(self, fd):
        """
        Returns the byte count after reading till EOF.

        The descriptor is closed on exit, even if reading raises.
        """
        with os.fdopen(fd, 'rb') as f:
            return len(f.read())

    @trace_method_call
    def dbus_readBytesFD(self, fd, byte_count):
        """
        Reads byte_count bytes from fd and returns them as a bytearray.

        The descriptor is closed on exit, even if reading raises.
        """
        with os.fdopen(fd, 'rb') as f:
            return bytearray(f.read(byte_count))

    @trace_method_call
    def dbus_readBytesTwoFDs(self, fd1, fd2, byte_count):
        """
        Reads byte_count from fd1 and fd2. Returns concatenation.

        Both descriptors are wrapped before any data is read, so a failure
        while reading fd1 still closes fd2 instead of leaking it.
        """
        result = bytearray()
        with os.fdopen(fd1, 'rb') as f1, os.fdopen(fd2, 'rb') as f2:
            # Read order is fd1 first, then fd2.
            for f in (f1, f2):
                result.extend(f.read(byte_count))
        return result

    # Only export 'readBytesTwoFDs' if we're running Twisted >= 17.1.0 which
    # is required to handle multiple UNIX FD arguments.
    _minTxVersion = type(twisted.version)('twisted', 17, 1, 0)
    if twisted.version >= _minTxVersion:
        _methods.append(
            interface.Method('readBytesTwoFDs', arguments='hht', returns='ay')
        )
    else:
        print('Twisted version < {}, not exposing {!r}'.format(
            _minTxVersion.base(),
            'readBytesTwoFDs'
        ))
    # Drop the temporary so it doesn't remain as a class attribute.
    del _minTxVersion

    dbusInterfaces = [interface.DBusInterface(*_methods)]
@defer.inlineCallbacks
def main(reactor):
    """
    Connects to dbus, exports an FDObject at PATH and requests the name BUSN.

    Progress is reported on stdout. If connecting to the bus or requesting
    the bus name fails, the error is printed and the reactor is stopped.
    """
    PATH = '/path/to/FDObject'
    BUSN = 'org.example'

    try:
        bus = yield client.connect(reactor)
    except Exception as e:
        print(f'failed connecting to dbus: {e}')
        reactor.stop()
        return

    print('connected to dbus')

    fd_obj = FDObject(PATH)
    bus.exportObject(fd_obj)

    try:
        yield bus.requestBusName(BUSN)
    except Exception as e:
        # Without the bus name clients cannot address this server; report
        # the failure and shut down rather than keep running unnoticed.
        print(f'failed requesting bus name {BUSN!r}: {e}')
        reactor.stop()
        return

    print(f'exported {fd_obj.__class__.__name__!r} on {BUSN!r} at {PATH!r}')
if __name__ == '__main__':
    # When executed as a script: register main() to be called with the
    # reactor once the reactor is running, then start the reactor.
    reactor.callWhenRunning(main, reactor)
    reactor.run()