/
local.py
210 lines (179 loc) · 8.29 KB
/
local.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
from __future__ import (
absolute_import,
unicode_literals,
)
from collections import deque
from typing import (
Any,
Deque,
Dict,
Hashable,
Mapping,
Optional,
Type,
Union,
)
from conformity import fields
from pymetrics.recorders.base import MetricsRecorder
import six
from pysoa.common.transport.base import (
ClientTransport,
ReceivedMessage,
ServerTransport,
)
from pysoa.server.server import Server
__all__ = (
'LocalClientTransport',
'LocalClientTransportSchema',
'LocalServerTransport',
'LocalServerTransportSchema',
)
_server_settings = fields.SchemalessDictionary(
key_type=fields.UnicodeString(),
description='A dictionary of settings for the server (which will further validate them).',
)
class LocalClientTransportSchema(fields.Dictionary):
contents = {
# Server class can be an import path or a class object
'server_class': fields.Any(
fields.TypePath(
description='The importable Python path to the `Server`-extending class.',
base_classes=Server,
),
fields.TypeReference(
description='A reference to the `Server`-extending class',
base_classes=Server,
),
description='The path to the `Server` class to use locally (as a library), or a reference to the '
'`Server`-extending class/type itself.',
),
# No deeper validation than "schemaless dictionary" because the Server will perform its own validation
'server_settings': fields.Any(
fields.PythonPath(
value_schema=_server_settings,
description='The importable Python path to the settings dict, in the format "module.name:VARIABLE".',
),
_server_settings,
description='The settings to use when instantiating the `server_class`.',
),
}
description = 'The constructor kwargs for the local client transport.'
@fields.ClassConfigurationSchema.provider(LocalClientTransportSchema())
class LocalClientTransport(ClientTransport, ServerTransport):
"""A transport that incorporates a server for running a service and client in a single thread."""
def __init__(
self,
service_name, # type: six.text_type
metrics, # type: MetricsRecorder
server_class, # type: Union[six.text_type, Type[Server]]
server_settings # type: Union[six.text_type, Dict[six.text_type, Any]]
):
# type: (...) -> None
"""
:param service_name: The service name
:param metrics: The metrics recorder
:param server_class: The server class for which this transport will serve as a client
:param server_settings: The server settings that will be passed to the server class on instantiation
"""
super(LocalClientTransport, self).__init__(service_name, metrics)
# If the server is specified as a path, resolve it to a class
if isinstance(server_class, six.string_types):
try:
server_class = fields.PythonPath.resolve_python_path(server_class)
except (ValueError, ImportError, AttributeError) as e:
raise type(e)('Could not resolve server class path {}: {!r}'.format(server_class, e))
if not isinstance(server_class, type) or not issubclass(server_class, Server):
raise TypeError('server_class must be or extend Server')
# Make sure the client and the server match names
if server_class.service_name != service_name:
raise Exception('Server {} service name "{}" does not match "{}"'.format(
server_class,
server_class.service_name,
service_name,
))
# See if the server settings is actually a string to the path for settings
if isinstance(server_settings, six.string_types):
try:
settings_dict = fields.PythonPath.resolve_python_path(server_settings) # type: Dict[six.text_type, Any]
except (ValueError, ImportError, AttributeError) as e:
raise type(e)('Could not resolve settings path {}: {!r}'.format(server_settings, e))
if not isinstance(settings_dict, dict):
raise TypeError('Imported settings path {} is not a dictionary.'.format(server_settings))
else:
settings_dict = server_settings
# Patch settings_dict to use LocalServerTransport, temporarily, to prevent recursive construction (actual
# transport will be set to `self` below).
settings_dict['transport'] = {
'path': 'pysoa.common.transport.local:LocalServerTransport',
}
# Set an empty queued request; we'll use this later
self._current_request = None # type: Optional[ReceivedMessage]
# Set up a deque for responses for just this client
self.response_messages = deque() # type: Deque[ReceivedMessage]
# Create and setup Server instance
self.server_settings = server_class.settings_class(settings_dict)
self.server = server_class(self.server_settings) # type: Server
self.server.transport = self
self.server.setup()
def send_request_message(self, request_id, meta, body, _=None):
# type: (int, Dict[six.text_type, Any], Dict[six.text_type, Any], Optional[int]) -> None
"""
Receives a request from the client and handles and dispatches in in-thread. `message_expiry_in_seconds` is not
supported. Messages do not expire, as the server handles the request immediately in the same thread before
this method returns. This method blocks until the server has completed handling the request.
"""
self._current_request = ReceivedMessage(request_id, meta, body)
try:
self.server.handle_next_request()
finally:
self._current_request = None
def receive_request_message(self):
# type: () -> ReceivedMessage
"""
Gives the server the current request (we are actually inside the stack of send_request_message so we know this
is OK).
"""
if self._current_request:
try:
return self._current_request
finally:
self._current_request = None
else:
raise RuntimeError('Local server tried to receive message more than once')
def send_response_message(self, request_id, meta, body):
# type: (int, Dict[six.text_type, Any], Dict[six.text_type, Any]) -> None
"""
Add the response to the deque.
"""
self.response_messages.append(ReceivedMessage(request_id, meta, body))
def receive_response_message(self, _=None):
# type: (Optional[int]) -> ReceivedMessage
"""
Receives a message from the deque. `receive_timeout_in_seconds` is not supported. Receive does not time out,
because by the time the thread calls this method, a response is already available in the deque, or something
happened and a response will never be available. This method does not wait and returns immediately.
"""
if self.response_messages:
return self.response_messages.popleft()
return ReceivedMessage(None, None, None)
class LocalServerTransportSchema(fields.Dictionary):
contents = {} # type: Mapping[Hashable, fields.Base]
description = 'The local server transport takes no constructor kwargs.'
@fields.ClassConfigurationSchema.provider(LocalServerTransportSchema())
class LocalServerTransport(ServerTransport):
"""
Empty class that we use as an import stub for local transport before we swap in the Client transport instance to do
double duty.
"""
def receive_request_message(self):
"""
Does nothing, because this will never be called (the same-named method on the `LocalClientTransport` is called,
instead).
"""
raise TypeError('The LocalServerTransport cannot be used directly; it is a stub.')
def send_response_message(self, request_id, meta, body):
"""
Does nothing, because this will never be called (the same-named method on the `LocalClientTransport` is called,
instead).
"""
raise TypeError('The LocalServerTransport cannot be used directly; it is a stub.')