-
Notifications
You must be signed in to change notification settings - Fork 832
/
gate.py
126 lines (110 loc) · 4.37 KB
/
gate.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import gevent
import gipc
import greenlet
import logging
import os
import pickle
import signal
import aj
from aj.gate.stream import GateStreamServerEndpoint, GateStreamWorkerEndpoint
from aj.gate.worker import Worker
from aj.util import BroadcastQueue
class WorkerGate(object):
def __init__(self, session, gateway_middleware, name=None, log_tag=None, restricted=False,
initial_identity=None):
self.session = session
self.process = None
self.stream = None
self.stream_reader = None
self.worker = None
self.name = name
self.log_tag = log_tag
self.restricted = restricted
self.gateway_middleware = gateway_middleware
self.initial_identity = initial_identity
self.q_http_replies = BroadcastQueue()
self.q_socket_messages = BroadcastQueue()
def start(self):
pipe_parent, pipe_child = gipc.pipe(
duplex=True,
encoder=lambda x: pickle.dumps(x, 2),
)
self.stream = GateStreamServerEndpoint(pipe_parent)
stream_child = GateStreamWorkerEndpoint(pipe_child)
self.process = gipc.start_process(
target=self._target,
kwargs={
'stream': stream_child,
'_pipe': pipe_child,
}
)
logging.debug('Started child process %s', self.process.pid)
self.stream_reader = gevent.spawn(self._stream_reader)
def stop(self):
logging.debug('Stopping child process %s', self.process.pid)
try:
os.killpg(self.process.pid, signal.SIGTERM)
except OSError as e:
logging.debug('Child process %s is already dead: %s', self.process.pid, e)
return
self.process.terminate()
self.process.join(0.125)
try:
os.kill(self.process.pid, 0)
logging.debug('Child process %s did not stop, killing', self.process.pid)
os.kill(self.process.pid, signal.SIGKILL)
os.killpg(self.process.pid, signal.SIGKILL)
os.kill(self.process.pid, 0)
logging.error('Child process %s did not stop after SIGKILL!', self.process.pid)
except OSError:
pass
self.stream.destroy()
self.stream_reader.kill(block=False)
def send_config_data(self):
logging.debug('Sending a config update to %s', self.name)
self.stream.send({
'type': 'config-data',
'data': aj.config.data,
})
def send_sessionlist(self):
logging.debug('Sending a session list update to %s', self.name)
self.stream.send({
'type': 'session-list',
'data': [session.serialize() for key,session in self.gateway_middleware.sessions.items()],
})
def _stream_reader(self):
try:
while True:
resp = self.stream.buffer_single_response(None)
if not resp:
return
self.stream.ack_response(resp.id)
if resp.object['type'] == 'socket':
self.q_socket_messages.broadcast(resp)
if resp.object['type'] == 'http':
self.q_http_replies.broadcast(resp)
if resp.object['type'] == 'terminate':
self.session.deactivate()
if resp.object['type'] == 'restart-master':
aj.restart()
if resp.object['type'] == 'update-sessionlist':
self.gateway_middleware.broadcast_sessionlist()
if resp.object['type'] == 'reload-config':
aj.config.load()
aj.config.ensure_structure()
self.gateway_middleware.broadcast_config_data()
if resp.object['type'] == 'log':
method = {
'info': logging.info,
'warn': logging.warn,
'debug': logging.debug,
'error': logging.error,
'critical': logging.critical,
}.get(resp.object['method'], None)
if method:
method('%s', resp.object['message'], extra=resp.object['kwargs'])
except greenlet.GreenletExit:
pass
def _target(self, stream=None, _pipe=None):
self.worker = Worker(stream, self)
self.worker.run()