-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.py
executable file
·350 lines (304 loc) · 10.4 KB
/
index.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
#! /bin/env python
#coding=utf-8
# from Crypto.Cipher import AES
import os
import base64
import urllib
import hashlib
import hmac
import json
import logging
import datetime
import requests
import binascii
import random
import string
import urllib2
# Assume this is the list of device IDs that ship with the devices themselves.
WhiteList = {111222, 222333, 666777, 888999}

# Error codes passed to ErrorInfo by the signing code.
IAM_NOHEADERS = 400
IAM_HEADERS_NOHOST = 401
IAM_HEADERS_NOSIGNED = 402
IAM_NOMETHOD = 420
IAM_NOURI = 440
IAM_SIGNINGKEY = 460
IAM_SIGNATURE = 480

# Message text looked up by ErrorInfo for each error code.
errmessage = {
    IAM_NOHEADERS: '(IAM) missing required headers',
    IAM_HEADERS_NOHOST: '(IAM) headers lack host',
    IAM_HEADERS_NOSIGNED: '(IAM) header to singed no exist',
    IAM_NOMETHOD: '(IAM) missing required method',
    IAM_NOURI: '(IAM) uri is empty',
    IAM_SIGNINGKEY: '(IAM) signing key error',
    IAM_SIGNATURE: '(IAM) signature error',
}
class ErrorInfo(object):
    """An error code together with its stock message and optional detail."""

    def __init__(self, code, info=None):
        self._code = code
        self._info = info
        # Codes that are not in ``errmessage`` get an empty message.
        self._message = errmessage.get(code, '')

    def __str__(self):
        return self.tostring()

    def tostring(self):
        """Render the error as text.

        Returns:
            "<message> [Errno <code>]", followed by a space and the extra
            info when a truthy ``info`` was supplied.
        """
        text = '%s [Errno %d]' % (self._message, self._code)
        if not self._info:
            return text
        return text + " %s" % self._info
def serialize_authorization(auth):
    """Flatten an authorization dict into its "/"-separated string form.

    The fields are emitted in the order version, access, timestamp, period,
    signed headers (joined with ";") and signature; the joined text is then
    passed through ``BceSigner.get_utf8_value``.
    """
    fields = [
        auth['version'],
        auth['access'],
        auth['timestamp'],
        auth['period'],
        ";".join(auth['signedheaders']),
        auth['signature'],
    ]
    return BceSigner.get_utf8_value("/".join(fields))
def build_authorization(accesskey, signedheaders, period=1800, timestamp=None):
    """Assemble the authorization dict (without a signature).

    Args:
        accesskey: stored under ``'access'``.
        signedheaders: stored as-is under ``'signedheaders'``.
        period: stored under ``'period'`` after conversion with ``str``.
        timestamp: stored under ``'timestamp'``; when falsy, the current UTC
            time formatted as ``%Y-%m-%dT%H:%M:%SZ`` is used instead.

    Returns:
        A new dict with the keys version, access, timestamp, period and
        signedheaders.
    """
    if not timestamp:
        timestamp = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
    return {
        'version': "bce-auth-v1",
        'access': accesskey,
        'timestamp': timestamp,
        'period': str(period),
        'signedheaders': signedheaders,
    }
def is_utc_timestamp(timestamp):
    """Tell whether ``timestamp`` matches the ``%Y-%m-%dT%H:%M:%SZ`` format.

    Args:
        timestamp: value to check.

    Returns:
        True when ``timestamp`` parses with that format, False otherwise.
        Non-string values (for example ``None``) yield False instead of
        letting ``strptime``'s TypeError escape.
    """
    try:
        datetime.datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%SZ')
    except (ValueError, TypeError):
        return False
    return True
def is_integer(value):
    """Tell whether ``value`` can be converted with ``int()``.

    Args:
        value: value to check.

    Returns:
        True when ``int(value)`` succeeds, False when it raises ValueError,
        TypeError (e.g. ``None`` or a list) or OverflowError (e.g. infinity).
    """
    try:
        int(value)
    except (ValueError, TypeError, OverflowError):
        return False
    return True
class BceSigner(object):
    """Utility class that signs requests with, and verifies, a BCE signature.

    NOTE: the implementation relies on Python 2 names (``unicode``,
    ``urllib.quote``).
    """

    def __init__(self, access_key, secret_key, logger=None):
        """Store the credentials and choose the logger.

        Args:
            access_key: access key; stored as ``access_key.encode()``.
            secret_key: secret key; stored as ``secret_key.encode()``.
            logger: logger to use. When omitted, the root logger is used and
                its level is set to INFO.
        """
        self.access_key = access_key.encode()
        self.secret_key = secret_key.encode()
        if logger:
            self.logger = logger
        else:
            self.logger = logging.getLogger()
            # Attach a stream handler only when the root logger has none.
            # Adding one per instance made every record print once per
            # signer ever created in the process.
            if not self.logger.handlers:
                self.logger.addHandler(logging.StreamHandler())
            self.logger.setLevel(logging.INFO)

    def gen_authorization(self, request, timestamp=None, expire_period=1800):
        """Generate the authorization string for ``request``.

        Every non-empty header name in ``request["headers"]`` is lower-cased,
        sorted and signed.

        Args:
            request: dict with ``method`` and ``uri`` and optionally
                ``headers`` and ``params``.
            timestamp: signing timestamp; the current UTC time is used when
                it is not given.
            expire_period: value placed in the ``period`` field.

        Returns:
            The serialized authorization string.
        """
        # A missing or None "headers" entry means there is nothing to sign.
        request_headers = request.get("headers") or {}
        signedheaders = sorted(
            key.lower() for key in request_headers if key != '')
        authorization = build_authorization(
            self.access_key, signedheaders, expire_period, timestamp)
        signingkey = self._calc_signingkey(authorization)
        # NOTE(review): this writes the derived signing key to the log when
        # DEBUG is enabled -- confirm that is acceptable.
        self.logger.debug("SigningKey: %(signingkey)s", {"signingkey": signingkey})
        signature = self._calc_signature(signingkey, request, signedheaders)
        authorization["signature"] = signature
        return serialize_authorization(authorization)

    def authenticate(self, authorization, request):
        """Authenticate a request.

        Recomputes the signature of ``request`` from the fields of
        ``authorization`` and compares it with ``authorization["signature"]``.

        Returns:
            True when both signatures are equal, False otherwise.
        """
        signingkey = self._calc_signingkey(authorization)
        self.logger.debug("SigningKey: %(signingkey)s", {"signingkey": signingkey})
        signature = self._calc_signature(
            signingkey, request, authorization["signedheaders"])
        return signature == authorization["signature"]

    @staticmethod
    def get_utf8_value(value):
        """Return ``value`` as a UTF-8 encoded byte string.

        Values that are neither ``str`` nor ``unicode`` go through ``str()``
        first; ``unicode`` is encoded as UTF-8; ``str`` is returned as is.
        """
        if not isinstance(value, (str, unicode)):
            value = str(value)
        if isinstance(value, unicode):
            return value.encode('utf-8')
        return value

    @staticmethod
    def canonical_qs(params):
        """Construct the sorted, percent-encoded query string.

        The ``authorization`` parameter is skipped. Keys and values are both
        encoded with ``normalized`` so they follow the same rule: '-', '_',
        '.' and '~' are left as they are, and unicode input is UTF-8 encoded
        before quoting.
        """
        pairs = []
        for key in sorted(params):
            if key == "authorization":
                continue
            pairs.append(
                BceSigner.normalized(key) + '=' + BceSigner.normalized(params[key]))
        return '&'.join(pairs)

    @staticmethod
    def canonical_header_str(headers, signedheaders=None):
        """Calculate the canonicalized header string.

        Header names are lower-cased and values stripped; both are then
        percent-encoded with ``normalized``.

        Args:
            headers: mapping of header name to header value.
            signedheaders: names of the headers to include. When empty or
                None, ``host``, ``content-length``, ``content-type``,
                ``content-md5`` and every ``x-bce-*`` header present are
                included, in sorted order.

        Returns:
            The selected ``name:value`` entries joined with newlines.

        Raises:
            IAMHeaderError: when there is no ``host`` header, or when a name
                in ``signedheaders`` is absent from ``headers``.
        """
        headers_norm_lower = {}
        for name, value in headers.items():
            key_norm_lower = BceSigner.normalized(name.lower())
            headers_norm_lower[key_norm_lower] = BceSigner.normalized(value.strip())

        if "host" not in headers_norm_lower:
            raise IAMHeaderError(ErrorInfo(IAM_HEADERS_NOHOST))

        header_list = []
        if signedheaders:
            for key in signedheaders:
                key = BceSigner.normalized(key.lower())
                if key not in headers_norm_lower:
                    raise IAMHeaderError(ErrorInfo(IAM_HEADERS_NOSIGNED))
                # Explicitly signed headers with an empty value are left out.
                if headers_norm_lower[key]:
                    header_list.append(key + ":" + headers_norm_lower[key])
        else:
            default_signed = ("host", "content-length", "content-type", "content-md5")
            for key in sorted(headers_norm_lower):
                if key.startswith("x-bce-") or key in default_signed:
                    header_list.append(key + ":" + headers_norm_lower[key])
        return '\n'.join(header_list)

    @staticmethod
    def normalized_uri(uri):
        """Percent-encode a URI path, leaving '/' unescaped.

        eg. /json-api/v1/example/ ==> /json-api/v1/example/
        """
        return urllib.quote(BceSigner.get_utf8_value(uri), safe='-_.~/')

    @staticmethod
    def normalized(msg):
        """Percent-encode ``msg``, keeping only '-', '_', '.' and '~' safe."""
        return urllib.quote(BceSigner.get_utf8_value(msg), safe='-_.~')

    def _calc_signingkey(self, auth):
        """Derive the signing key.

        The key is the hex HMAC-SHA256, keyed with the secret key, of
        "version/access/timestamp/period".

        Raises:
            IAMSignatureError: when the HMAC computation fails.
        """
        string_to_sign = "/".join((auth['version'], auth['access'],
                                   auth['timestamp'], auth['period']))
        try:
            return hmac.new(self.secret_key,
                            self.get_utf8_value(string_to_sign),
                            hashlib.sha256).hexdigest()
        except Exception as err:
            raise IAMSignatureError(ErrorInfo(IAM_SIGNINGKEY, str(err)))

    def _calc_signature(self, key, request, signedheaders):
        """Generate the BCE signature string.

        The canonical request is the upper-cased method, the normalized URI,
        the canonical query string and the canonical headers, joined with
        newlines; the signature is its hex HMAC-SHA256 under ``key``.

        Raises:
            IAMMethodError: when ``request`` has no method.
            IAMURIError: when ``request`` has no uri.
            IAMHeaderError: propagated from ``canonical_header_str``.
            IAMSignatureError: when the HMAC computation fails.
        """
        if not request.get('method'):
            raise IAMMethodError(ErrorInfo(IAM_NOMETHOD))
        if not request.get('uri'):
            raise IAMURIError(ErrorInfo(IAM_NOURI))

        # Create canonical request. A key that is present but None is treated
        # like a missing one.
        params = request.get('params') or {}
        headers = request.get('headers') or {}
        cr = "\n".join((request['method'].upper(),
                        self.normalized_uri(request['uri']),
                        self.canonical_qs(params),
                        BceSigner.canonical_header_str(headers, signedheaders)))
        try:
            self.logger.debug("CanonicalRequest: %(request)s", {"request": cr})
            signature = hmac.new(key, cr, hashlib.sha256).hexdigest()
            self.logger.debug("Signature: %(signature)s", {"signature": signature})
            return signature
        except Exception as err:
            raise IAMSignatureError(ErrorInfo(IAM_SIGNATURE, str(err)))
class IAMError(Exception):
    """Base class for the IAM signing errors.

    Args:
        message: error description. It may be any object (the signing code
            passes ``ErrorInfo`` instances); it is converted to text with
            ``%s`` formatting, because concatenating a non-string onto the
            prefix raised TypeError and masked the real error.
    """

    def __init__(self, message):
        self.message = "[IAMError] %s" % (message,)
        super(IAMError, self).__init__(self.message)
class IAMHeaderError(IAMError):
    """Raised for header problems: no host header, or a signed header missing."""
class IAMMethodError(IAMError):
    """Raised when the request to sign carries no method."""
class IAMURIError(IAMError):
    """Raised when the request to sign carries no uri."""
class IAMSignatureError(IAMError):
    """Raised when computing the signing key or the signature fails."""
# Everything above defines request signing; below are the authorization check and the outbound request.
def handler(event, context):
    """Create a device for a whitelisted device ID.

    Parses the JSON request body, checks ``deviceId`` against ``WhiteList``,
    then sends a BCE-signed POST to
    ``http://iotdm.gz.baidubce.com/v3/iot/management/device``.

    Environment variables read: ``AK``, ``SK``, ``defaultSchemaId`` and, when
    the body has no ``description``, ``defaultDescription``.

    Args:
        event: dict whose ``body`` is a JSON document, base64-encoded when
            ``isBase64Encoded`` is truthy (a missing flag means "not
            encoded"). The document must contain ``deviceId`` and
            ``deviceName`` and may contain ``description``.
        context: unused.

    Returns:
        An error string when validation fails (these strings are kept
        verbatim from the original implementation); otherwise a dict with
        ``isBase64Encoded``, ``statusCode``, ``headers`` and ``body``, where
        ``body`` is the upstream reply re-serialized as JSON, or its raw text
        when the reply is not JSON.
    """
    # Upper bound, in seconds, on the upstream call so that an unresponsive
    # server cannot block the invocation indefinitely.
    upstream_timeout = 10

    accesskey = os.environ.get("AK")
    secretkey = os.environ.get("SK")
    schemaId = os.environ.get("defaultSchemaId")

    raw_body = event["body"]
    if event.get("isBase64Encoded"):
        raw_body = base64.b64decode(raw_body)
    payload = json.loads(raw_body)

    # Whitelist check
    if "deviceId" not in payload:
        return "{'message':'err,no device Id information'}"
    if payload["deviceId"] not in WhiteList:
        return "{'message':'err,this device is not in the white list'}"
    if "deviceName" not in payload:
        return "{'message':'err,no deviceName'}"
    deviceName = payload["deviceName"]
    if "description" in payload:
        description = payload["description"]
    else:
        description = os.environ.get("defaultDescription")

    headers = {
        'x-bce-date': datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ'),
        'Content-Type': 'application/json',
        'Host': 'iotdm.gz.baidubce.com',
    }
    body = {
        "deviceName": deviceName,
        "description": description,
        "schemaId": schemaId
    }
    request = {
        'method': 'POST',
        'uri': '/v3/iot/management/device',
        'headers': headers
    }
    signer = BceSigner(accesskey, secretkey)
    # The Authorization header is added after signing, so it is not itself
    # part of the signed headers.
    headers['Authorization'] = signer.gen_authorization(request)

    url = 'http://iotdm.gz.baidubce.com/v3/iot/management/device'
    response = requests.request('POST', url, headers=headers,
                                data=json.dumps(body),
                                timeout=upstream_timeout)

    # Build the reply from the upstream response
    try:
        reply_body = json.dumps(response.json())
    except ValueError:
        # The upstream reply was not JSON; pass its text through unchanged.
        reply_body = response.text
    return {
        "isBase64Encoded": False,
        "statusCode": 200,
        "headers": {
            "X-Custom-Header": "headerValue"
        },
        "body": reply_body
    }