-
Notifications
You must be signed in to change notification settings - Fork 23
/
http.py
232 lines (200 loc) · 7.86 KB
/
http.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
import urllib
import ssl
from socket import (
gaierror,
error as socket_error,
)
from time import time
from urlparse import urlparse
from aplus import Promise
from futile.caching import LRUCache
from geventhttpclient.client import HTTPClient
from geventhttpclient.response import HTTPResponse
from openmtc.exc import (
OpenMTCNetworkError,
ConnectionFailed,
)
from openmtc_onem2m.exc import (
get_error_class,
get_response_status,
ERROR_MIN,
)
from openmtc_onem2m.model import (
ResourceTypeE,
get_short_attribute_name,
get_short_member_name,
)
from openmtc_onem2m.serializer.util import (
decode_onem2m_content,
encode_onem2m_content,
)
from openmtc_onem2m.transport import (
OneM2MOperation,
OneM2MResponse,
OneM2MErrorResponse,
)
from . import (
OneM2MClient,
normalize_path,
)
# HTTP method used on the wire for each OneM2M operation.
# Both create and notify are sent as POST.
_method_map_to_http = {
    OneM2MOperation.retrieve: 'GET',
    OneM2MOperation.create: 'POST',
    OneM2MOperation.notify: 'POST',
    OneM2MOperation.update: 'PUT',
    OneM2MOperation.delete: 'DELETE',
}

# Client instances already built by get_client(), keyed by (m2m_ep, use_xml).
_clients = LRUCache(threadsafe=False)
_query_params = frozenset(['rt', 'rp', 'rcn', 'da', 'drt', 'rids', 'tids', 'ltids', 'tqi'])
_header_to_field_map = {
'X-M2M-ORIGIN': 'originator',
'X-M2M-RI': 'rqi',
'X-M2M-GID': 'gid',
'X-M2M-OT': 'ot',
'X-M2M-RST': 'rset',
'X-M2M-RET': 'rqet',
'X-M2M-OET': 'oet',
'X-M2M-EC': 'ec',
'X-M2M-RVI': 'rvi',
'X-M2M-VSI': 'vsi',
}
def get_client(m2m_ep, use_xml=False, ca_certs=None, cert_file=None, key_file=None,
               insecure=False):
    """
    Returns the cached client for an endpoint, creating it on first use

    :param m2m_ep: URL of the OneM2M endpoint
    :param use_xml: part of the cache key; handed to the client constructor
    :param ca_certs: handed to the client constructor
    :param cert_file: handed to the client constructor
    :param key_file: handed to the client constructor
    :param insecure: handed to the client constructor
    :return: the OneM2MHTTPClient instance for (m2m_ep, use_xml)
    """
    # NOTE(review): only (m2m_ep, use_xml) identify a cached client. A later
    # call with different ca_certs/cert_file/key_file/insecure values gets the
    # client that was created first -- confirm this is intended.
    cache_key = (m2m_ep, use_xml)
    try:
        client = _clients[cache_key]
    except KeyError:
        # TODO: make connection_timeout and concurrency configurable
        client = OneM2MHTTPClient(m2m_ep, use_xml, ca_certs, cert_file, key_file, insecure)
        _clients[cache_key] = client
    return client
class OneM2MHTTPClient(OneM2MClient):
    """
    OneM2M client that talks to a single endpoint over HTTP(S)

    A OneM2M request is mapped to an HTTP request, sent through a
    geventhttpclient HTTPClient and the HTTP response is mapped back to a
    OneM2MResponse / OneM2MErrorResponse.
    """

    # defaults
    DEF_SSL_VERSION = ssl.PROTOCOL_TLSv1_2

    def __init__(self, m2m_ep, use_xml, ca_certs=None, cert_file=None, key_file=None,
                 insecure=False):
        """
        Prepares the client for the given endpoint (no connection is opened here)

        :param m2m_ep: URL of the endpoint; a scheme ending in "s" selects
                       HTTPS, the URL path becomes the prefix of all request URIs
        :param use_xml: use 'application/xml' instead of 'application/json'
        :param ca_certs: HTTPS only, passed as ssl option 'ca_certs' if set
        :param cert_file: HTTPS only, passed as ssl option 'certfile'
                          (only if key_file is set as well)
        :param key_file: HTTPS only, passed as ssl option 'keyfile'
                         (only if cert_file is set as well)
        :param insecure: passed through unchanged to HTTPClient
        """
        super(OneM2MHTTPClient, self).__init__()

        self.parsed_url = urlparse(m2m_ep)
        # endswith() rather than scheme[-1]: an endpoint without a scheme
        # parses to an empty scheme string, on which indexing raises IndexError.
        is_https = self.parsed_url.scheme.lower().endswith("s")
        port = self.parsed_url.port or (443 if is_https else 80)
        host = self.parsed_url.hostname

        # Base path with exactly one trailing slash, or '' if the URL has no path.
        self.path = self.parsed_url.path.rstrip('/')
        if self.path:
            self.path += '/'

        # TODO(rst): handle IPv6 host here
        # geventhttpclient sets incorrect host header
        # i.e "host: ::1:8000" instead of "host: [::1]:8000
        if is_https:
            ssl_options = {
                'ssl_version': self.DEF_SSL_VERSION
            }
            if ca_certs:
                ssl_options['ca_certs'] = ca_certs
            if cert_file and key_file:
                ssl_options['certfile'] = cert_file
                ssl_options['keyfile'] = key_file
        else:
            ssl_options = None

        def get_http_client():
            # Factory closing over the connection settings computed above;
            # send_onem2m_request() calls it once per request.
            return HTTPClient(host, port, connection_timeout=120.0,
                              concurrency=50, ssl=is_https,
                              ssl_options=ssl_options, insecure=insecure)

        self._get_client = get_http_client

        self.content_type = 'application/' + ('xml' if use_xml else 'json')

    def _handle_network_error(self, exc, p, http_request, t,
                              exc_class=OpenMTCNetworkError):
        """
        Rejects the promise with an error describing the failed HTTP request

        :param exc: the exception that was caught
        :param p: the promise to reject
        :param http_request: the request dict that was being sent
        :param t: time() value taken right before the request was started
        :param exc_class: class of the error the promise is rejected with
        """
        error_str = str(exc)
        # Some exceptions stringify to nothing useful; fall back to repr().
        if error_str in ("", "''"):
            error_str = repr(exc)
        method = http_request["method"]
        path = http_request["request_uri"]
        log_path = "%s://%s/%s" % (self.parsed_url.scheme, self.parsed_url.netloc, path)
        error_msg = "Error during HTTP request: %s. " \
                    "Request was: %s %s (%.4fs)" % (error_str, method, log_path, time() - t)
        p.reject(exc_class(error_msg))

    def map_onem2m_request_to_http_request(self, onem2m_request):
        """
        Maps a OneM2M request to a HTTP request
        :param onem2m_request: OneM2M request to be mapped
        :return: request: the resulting HTTP request, a dict with the keys
                 'method', 'request_uri', 'body' and 'headers'
        """
        self.logger.debug("Mapping OneM2M request to generic request: %s", onem2m_request)

        # Query parameters: every known parameter that is set on the request ...
        params = {}
        for param in _query_params:
            value = getattr(onem2m_request, param)
            if value is not None:
                params[param] = value

        # ... plus the filter criteria, keyed by their short names.
        if onem2m_request.fc is not None:
            filter_criteria = onem2m_request.fc
            params.update({
                (get_short_attribute_name(name) or get_short_member_name(name)): val
                for name, val in filter_criteria.get_values(True).iteritems()
            })

        if onem2m_request.ae_notifying:
            path = ''
        else:
            path = normalize_path(onem2m_request.to)

        if params:
            # doseq=True: sequence values are encoded as repeated parameters
            path += '?' + urllib.urlencode(params, True)

        content_type, data = encode_onem2m_content(onem2m_request.content, self.content_type, path=path)

        # TODO(rst): check again
        # set resource type
        if onem2m_request.operation == OneM2MOperation.create:
            content_type += '; ty=' + str(ResourceTypeE[onem2m_request.resource_type.typename])

        # Headers: every mapped request attribute that is set on the request.
        headers = {}
        for header, field in _header_to_field_map.iteritems():
            value = getattr(onem2m_request, field)
            if value is not None:
                headers[header] = value
        headers['content-type'] = content_type
        headers['accept'] = self.content_type

        self.logger.debug("Added request params: %s", params)

        return {
            'method': _method_map_to_http[onem2m_request.operation],
            'request_uri': self.path + path,
            'body': data,
            'headers': headers,
        }

    def map_http_response_to_onem2m_response(self, onem2m_request, response):
        """
        Maps HTTP response to OneM2M response
        :param onem2m_request: the OneM2M request that created the response
        :param response: the HTTP response
        :return: resulting OneM2MResponse or OneM2MErrorResponse
        """
        if not isinstance(response, HTTPResponse):
            # NOTE(review): the problem is only logged; processing continues
            # with the unexpected object. Consider returning an error response.
            self.logger.error("Not a valid response: %s", response)
        self.logger.debug("Mapping HTTP response for OneM2M response: %s", response)

        # A missing status code header is treated as 5000.
        rsc = response.get("x-m2m-rsc", 5000)
        if int(rsc) >= ERROR_MIN:
            return OneM2MErrorResponse(
                get_error_class(rsc).response_status_code, onem2m_request)

        return OneM2MResponse(
            get_response_status(rsc),
            request=onem2m_request,
            rsc=rsc,
            pc=decode_onem2m_content(response.read(), response.get("content-type"))
        )

    def send_onem2m_request(self, onem2m_request):
        """
        Sends a OneM2M request via HTTP
        :param onem2m_request: the OneM2M request to send
        :return: promise that is fulfilled with the OneM2MResponse, or rejected
                 with a OneM2MErrorResponse, a ConnectionFailed (socket and
                 address lookup errors) or an OpenMTCNetworkError (any other
                 error raised while sending the request)
        """
        with Promise() as p:
            http_request = self.map_onem2m_request_to_http_request(onem2m_request)

            t = time()
            # A fresh HTTP client is created for this request and closed below.
            client = self._get_client()
            try:
                response = client.request(**http_request)
            except (socket_error, gaierror) as exc:
                self._handle_network_error(exc, p, http_request, t, ConnectionFailed)
            except Exception as exc:
                self.logger.exception("Error in HTTP request")
                self._handle_network_error(exc, p, http_request, t)
            else:
                try:
                    onem2m_response = self.map_http_response_to_onem2m_response(onem2m_request, response)
                    if isinstance(onem2m_response, OneM2MErrorResponse):
                        p.reject(onem2m_response)
                    else:
                        p.fulfill(onem2m_response)
                finally:
                    # Release the response even if mapping it failed.
                    response.release()
            finally:
                client.close()

        return p