/
edgeauth.py
193 lines (161 loc) · 6.53 KB
/
edgeauth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
# -*- coding: utf-8 -*-
import binascii
import hashlib
import hmac
import os
import re
import sys
import time
if sys.version_info[0] >= 3:
from urllib.parse import quote_plus
else:
from urllib import quote_plus
# Force the local timezone to be GMT.
os.environ['TZ'] = 'GMT'
class EdgeAuthError(Exception):
def __init__(self, text):
self._text = text
def __str__(self):
return 'EdgeAuthError:{0}'.format(self._text)
def _getText(self):
return str(self)
text = property(_getText, None, None,
'Formatted error text.')
class EdgeAuth:
def __init__(self, token_type=None, token_name='__token__',
key=None, algorithm='sha256', salt=None,
ip=None, payload=None, session_id=None,
start_time=None, end_time=None, window_seconds=None,
field_delimiter='~', acl_delimiter='!',
escape_early=False, verbose=False):
if key is None or len(key) <= 0:
raise EdgeAuthError('You must provide a secret in order to '
'generate a new token.')
self.token_type = token_type
self.token_name = token_name
self.key = key
self.algorithm = algorithm
self.salt = salt
self.ip = ip
self.payload = payload
self.session_id = session_id
self.start_time = start_time
self.end_time = end_time
self.window_seconds = window_seconds
self.field_delimiter = field_delimiter
self.acl_delimiter = acl_delimiter
self.escape_early = escape_early
self.verbose = verbose
def _escape_early(self, text):
if self.escape_early:
def toLower(match):
return match.group(1).lower()
return re.sub(r'(%..)', toLower, quote_plus(text))
else:
return text
def _generate_token(self, path, is_url):
start_time = self.start_time
end_time = self.end_time
if str(start_time).lower() == 'now':
start_time = int(time.mktime(time.gmtime()))
elif start_time:
try:
if int(start_time) <= 0:
raise EdgeAuthError('start_time must be ( > 0 )')
except:
raise EdgeAuthError('start_time must be numeric or now')
if end_time:
try:
if int(end_time) <= 0:
raise EdgeAuthError('end_time must be ( > 0 )')
except:
raise EdgeAuthError('end_time must be numeric')
if self.window_seconds:
try:
if int(self.window_seconds) <= 0:
raise EdgeAuthError('window_seconds must be ( > 0 )')
except:
raise EdgeAuthError('window_seconds must be numeric')
if end_time is None:
if self.window_seconds:
if start_time is None:
# If we have a window_seconds without a start time,
# calculate the end time starting from the current time.
end_time = int(time.mktime(time.gmtime())) + \
self.window_seconds
else:
end_time = start_time + self.window_seconds
else:
raise EdgeAuthError('You must provide an expiration time or '
'a duration window ( > 0 )')
if start_time and (end_time <= start_time):
raise EdgeAuthError('Token will have already expired.')
if self.verbose:
print('''
Akamai Token Generation Parameters
Token Type : {0}
Token Name : {1}
Key/Secret : {2}
Algo : {3}
Salt : {4}
IP : {5}
Payload : {6}
Session ID : {7}
Start Time : {8}
End Time : {9}
Window(seconds) : {10}
Field Delimiter : {11}
ACL Delimiter : {12}
Escape Early : {13}
PATH : {14}
Generating token...'''.format(self.token_type if self.token_type else '',
self.token_name if self.token_name else '',
self.key if self.key else '',
self.algorithm if self.algorithm else '',
self.salt if self.salt else '',
self.ip if self.ip else '',
self.payload if self.payload else '',
self.session_id if self.session_id else '',
start_time if start_time else '',
end_time if end_time else '',
self.window_seconds if self.window_seconds else '',
self.field_delimiter if self.field_delimiter else '',
self.acl_delimiter if self.acl_delimiter else '',
self.escape_early if self.escape_early else '',
('url: ' if is_url else 'acl: ') + path))
hash_source = []
new_token = []
if self.ip:
new_token.append('ip={0}'.format(self._escape_early(self.ip)))
if start_time:
new_token.append('st={0}'.format(start_time))
new_token.append('exp={0}'.format(end_time))
if not is_url:
new_token.append('acl={0}'.format(path))
if self.session_id:
new_token.append('id={0}'.format(self._escape_early(self.session_id)))
if self.payload:
new_token.append('data={0}'.format(self._escape_early(self.payload)))
hash_source = list(new_token)
if is_url:
hash_source.append('url={0}'.format(self._escape_early(path)))
if self.salt:
hash_source.append('salt={0}'.format(self.salt))
if self.algorithm.lower() not in ('sha256', 'sha1', 'md5'):
raise EdgeAuthError('Unknown algorithm')
token_hmac = hmac.new(
binascii.a2b_hex(self.key.encode()),
self.field_delimiter.join(hash_source).encode(),
getattr(hashlib, self.algorithm.lower())).hexdigest()
new_token.append('hmac={0}'.format(token_hmac))
return self.field_delimiter.join(new_token)
def generate_acl_token(self, acl):
if not acl:
raise EdgeAuthError('You must provide acl')
elif isinstance(acl, list):
acl = self.acl_delimiter.join(acl)
return self._generate_token(acl, False)
def generate_url_token(self, url):
if not url:
raise EdgeAuthError('You must provide url')
return self._generate_token(url, True)