This repository has been archived by the owner on Apr 18, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 124
/
connection_manager.py
195 lines (142 loc) · 7.33 KB
/
connection_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
import logging
import select as select_lib
import gearman.util
from gearman.connection_poller import GearmanConnectionPoller
from gearman.connection import GearmanConnection
from gearman.constants import _DEBUG_MODE_
from gearman.errors import ConnectionError, ServerUnavailable
from gearman.job import GearmanJob, GearmanJobRequest
from gearman import compat
gearman_logger = logging.getLogger(__name__)
class DataEncoder(object):
@classmethod
def encode(cls, encodable_object):
raise NotImplementedError
@classmethod
def decode(cls, decodable_string):
raise NotImplementedError
class NoopEncoder(DataEncoder):
"""Provide common object dumps for all communications over gearman"""
@classmethod
def _enforce_byte_string(cls, given_object):
if type(given_object) != str:
raise TypeError("Expecting byte string, got %r" % type(given_object))
@classmethod
def encode(cls, encodable_object):
cls._enforce_byte_string(encodable_object)
return encodable_object
@classmethod
def decode(cls, decodable_string):
cls._enforce_byte_string(decodable_string)
return decodable_string
class GearmanConnectionManager(object):
"""Abstract base class for any Gearman-type client that needs to connect/listen to multiple connections
Mananges and polls a group of gearman connections
Forwards all communication between a connection and a command handler
The state of a connection is represented within the command handler
Automatically encodes all 'data' fields as specified in protocol.py
"""
command_handler_class = None
connection_class = GearmanConnection
connection_poller_class = GearmanConnectionPoller
job_class = GearmanJob
job_request_class = GearmanJobRequest
data_encoder = NoopEncoder
def __init__(self, host_list=None):
assert self.command_handler_class is not None, 'GearmanClientBase did not receive a command handler class'
self._address_to_connection_map = {}
self._connection_to_handler_map = {}
self._handler_to_connection_map = {}
self._connection_poller = self.connection_poller_class()
host_list = host_list or []
for hostport_tuple in host_list:
self.connect_to_host(hostport_tuple)
def shutdown(self):
# Shutdown all our connections one by one
for gearman_connection in self._connection_to_handler_map.iterkeys():
try:
gearman_connection.close()
except ConnectionError:
pass
###################################
# Connection management functions #
###################################
def connect_to_host(self, hostport_tuple):
"""Add a new connection to this connection manager"""
gearman_host, gearman_port = gearman.util.disambiguate_server_parameter(hostport_tuple)
current_connection = self.connection_class(host=gearman_host, port=gearman_port)
# Establish a connection immediately - check for socket exceptions like: "host not found"
current_connection.connect()
client_connection = self.register_connection(current_connection)
return client_connection
def register_connection(self, current_connection):
# Once we have a socket, register this connection with the poller
current_connection.set_connection_manager(self)
self._connection_poller.add_connection(current_connection)
connection_address = current_connection.getpeername()
self._address_to_connection_map[connection_address] = current_connection
# Setup this CommandHandler
current_handler = self.command_handler_class(connection_manager=self)
self._setup_handler(current_handler)
# Setup bi-directional maps so we can forward events between these two calsses
self._connection_to_handler_map[current_connection] = current_handler
self._handler_to_connection_map[current_handler] = current_connection
return current_connection
def unregister_connection(self, current_connection):
# Immediately signal a disconnect!
current_handler = self.on_disconnect(current_connection)
self._handler_to_connection_map.pop(current_handler)
self._connection_to_handler_map.pop(current_connection)
connection_address = current_connection.getpeername()
self._address_to_connection_map.pop(connection_address)
self._connection_poller.remove_connection(current_connection)
current_connection.set_connection_manager(None)
return current_connection
@property
def connection_list(self):
return self._connection_to_handler_map.keys()
def _setup_handler(self, current_handler):
return current_handler
def poll_connections_until_stopped(self, continue_polling_callback, timeout=None):
return self._connection_poller.poll_until_stopped(continue_polling_callback, timeout=timeout)
def establish_connections(self):
for current_connection in self.connection_list:
if current_connection.disconnected:
current_connection.connect()
def wait_until_connection_established(self, poll_timeout=None):
# Poll to make sure we send out our request for a status update
self.establish_connections()
def continue_while_not_connected():
return not compat.any(current_connection.connected for current_connection in self.connection_list)
self.poll_connections_until_stopped(continue_while_not_connected, timeout=poll_timeout)
if not compat.any(current_connection.connected for current_connection in self.connection_list):
raise ServerUnavailable(self.connection_list)
def wait_until_connection_lost(self, poll_timeout=None):
def continue_while_connections_alive():
return compat.all([current_connection.connected for current_connection in self.connection_list])
self.poll_connections_until_stopped(continue_while_connections_alive, timeout=poll_timeout)
###################################
# Connection management functions #
###################################
### Handlers to take care of Connection Events
def on_recv_command(self, current_connection, cmd_type, cmd_args):
"""Forwarding connection communication on to the respect handler"""
current_handler = self._connection_to_handler_map[current_connection]
current_handler.recv_command(cmd_type, cmd_args)
return current_handler
def on_connect(self, current_connection):
current_handler = self._connection_to_handler_map[current_connection]
current_handler.on_connect()
return current_handler
def on_disconnect(self, current_connection):
current_handler = self._connection_to_handler_map[current_connection]
current_handler.on_disconnect()
return current_handler
# Handlers to take care of CommandHandlerEvents
def on_send_command(self, current_handler, cmd_type, cmd_args):
current_connection = self._handler_to_connection_map[current_handler]
current_connection.send_command(cmd_type, cmd_args)
return current_connection
def on_gearman_error(self, current_handler):
current_connection = self._handler_to_connection_map[current_handler]
return current_connection