# test_gaesessions.py -- 373 lines (332 loc), 14.7 KB
from base64 import b64decode, b64encode
import logging
import pickle
import time
from google.appengine.ext import db
from nose.tools import assert_equal, assert_not_equal, assert_raises
from main import make_entity
from gaesessions import COOKIE_NAME_PREFIX, SessionMiddleware, SID_LEN, SIG_LEN
from SessionTester import SessionTester
# Tests (each on a variety of configurations):
# 0) Correct session usage and memcache loss
# 1) Session expiration
# 2) Bad cookie data (e.g., sig invalid due to data changed by user)
# 3) API downtime (future work)
# Named logger used by the tests in this module; its level is set to DEBUG so
# that this logger itself does not filter out the debug-level banners.
logger = logging.getLogger('TESTS ')
logger.setLevel(logging.DEBUG)
def test_middleware():
    """Tests that the middleware requires cookie_key when it should.

    Constructing ``SessionMiddleware`` with no key, or with the short key
    ``'blah'``, is expected to raise ``ValueError``.  Longer keys must be
    accepted, both with ``cookie_only_threshold=10`` and with
    ``cookie_only_threshold=0``.
    """
    # Use the module's named logger (not the root logger) so this message is
    # reported the same way as the other tests' messages.
    logger.debug("cookie_key is required and needs to be reasonably long")

    # missing key, then a key that is too short
    assert_raises(ValueError, SessionMiddleware, None, None)
    assert_raises(ValueError, SessionMiddleware, None, cookie_key='blah')

    # sufficiently long keys: construction alone must not raise
    SessionMiddleware(None, cookie_only_threshold=10, cookie_key='blah'*8)
    SessionMiddleware(None, cookie_only_threshold=0, cookie_key="still need a key"*4)
def test_sessions():
    """Generate every session check for every storage configuration.

    Yields ``(check, no_datastore, cookie_only_threshold)`` tuples, one per
    combination of check function, datastore usage (with / without) and
    cookie-only threshold (0, 10KB, 2**30).  A banner describing the
    combination is logged just before each tuple is yielded.
    """
    all_checks = (check_correct_usage, check_expiration, check_bad_cookie, check_various_session_sizes)
    thresholds = (0, 10*1024, 2**30)
    # descriptions for the two extreme thresholds; anything else is formatted
    fixed_cookie_labels = {
        0: 'no data stored in cookies',
        2**30: 'data only stored in cookies',
    }

    for no_datastore in (False, True):
        db_label = 'without' if no_datastore else 'with'
        for cot in thresholds:
            if cot in fixed_cookie_labels:
                cookie_label = fixed_cookie_labels[cot]
            else:
                cookie_label = 'store data in cookies when its encoded size<=%dB' % cot
            for check in all_checks:
                logger.debug('\n\n' + '*'*50)
                logger.debug('Running %s %s datastore and %s' % (check.__name__, db_label, cookie_label))
                yield check, no_datastore, cot
# helper function which checks how many sessions we should have in the db
# given the current test's configuration
def generic_expected_num_sessions_in_db_if_db_used(st, no_datastore, cookie_only_threshold,
                                                   num, num_above_cookie_thresh=0, num_after=None):
    """Verify the number of sessions in the db for the current configuration.

    Delegates to ``st.verify_active_sessions_in_db``:

    * datastore disabled: expects 0 sessions (``num_after`` is not passed);
    * datastore enabled and ``cookie_only_threshold == 0``: expects ``num``;
    * datastore enabled otherwise: expects ``num_above_cookie_thresh``.

    In the two datastore-enabled cases ``num_after`` is forwarded as the
    second argument.
    """
    if no_datastore:
        # cookie or memcache only
        st.verify_active_sessions_in_db(0)
        return

    if cookie_only_threshold == 0:
        expected = num
    else:
        expected = num_above_cookie_thresh
    st.verify_active_sessions_in_db(expected, num_after)
def check_correct_usage(no_datastore, cookie_only_threshold):
    """Checks correct usage of session including in the face of memcache data loss.

    Runs a fixed sequence of mini-tests against ``SessionTester`` objects
    configured with the given ``no_datastore`` / ``cookie_only_threshold``
    settings.  The mini-tests build on each other's state, so their order
    matters.
    """
    def log_section(title):
        # banner separating the mini-tests in the debug log
        logger.debug('\n\n' + '-'*50)
        logger.debug(title + ' (nd=%s cot=%s)' % (no_datastore, cookie_only_threshold))

    st = SessionTester(no_datastore=no_datastore, cookie_only_threshold=cookie_only_threshold)

    def expect_db_sessions(a, b=0):
        # shorthand which fills in this check's configuration
        return generic_expected_num_sessions_in_db_if_db_used(
            st, no_datastore, cookie_only_threshold, a, b)

    def data_by_sid(tester):
        # fetch the session data which the test app returns for the tester's sid
        resp = tester.get_url('/get_by_sid?sid=%s' % tester.ss.sid)
        return pickle.loads(b64decode(resp.body))

    st.verify_active_sessions_in_db(0)

    log_section('try doing nothing (no session should be started)')
    st.noop()
    st.verify_active_sessions_in_db(0)

    log_section('start a session with a single write')
    st.start_request()
    str(st)
    assert st.get_expiration()==0, "no session yet => no expiration yet"
    assert st.is_active() is False
    st['x'] = 7
    assert st.is_active() is True
    st.finish_request_and_check()
    expect_db_sessions(1)

    log_section('start another session')
    st2 = SessionTester(st=st)
    st2.start_request()
    assert not st2.is_active()
    assert st2.get('x') is None, "shouldn't get other session's data"
    assert not st2.is_active(), "still shouldn't be active - nothing set yet"
    st2['x'] = 'st2x'
    assert st2.is_active()
    st2.finish_request_and_check()
    expect_db_sessions(2)

    log_section('each session should get a unique sid')
    assert st2.ss.sid != st.ss.sid

    log_section('we should still have the values we set earlier')
    st.start_request()
    str(st)
    assert_equal(st['x'], 7)
    st.finish_request_and_check()
    st2.start_request()
    assert_equal(st2['x'], 'st2x')
    st2.finish_request_and_check()

    log_section("check get session by sid, save(True), and terminate()")
    if cookie_only_threshold == 0:
        data1 = st.ss.data
        data2 = st2.ss.data
    else:
        # data is being stored in cookie-only form => won't be in the db
        data1 = data2 = {}
    assert_equal(data_by_sid(st), data1)
    assert_equal(data_by_sid(st2), data2)
    expect_db_sessions(2)
    st.start_request()
    st['y'] = 9    # make the session dirty
    st.save(True)  # force it to persist to the db even though it normally wouldn't
    st.finish_request_and_check()

    # now the data should be in the db
    assert_equal(data_by_sid(st), st.ss.data)
    expect_db_sessions(2, 1)
    st.start_request()
    st.terminate()  # remove it from the db
    st.finish_request_and_check()
    expect_db_sessions(1)

    log_section("should be able to terminate() and then start a new session all in one request")
    st.start_request()
    st['y'] = 'yy'
    assert_equal(st.get('y'), 'yy')
    st.terminate()
    assert_raises(KeyError, st.__getitem__, 'y')
    st['x'] = 7
    st.finish_request_and_check()
    expect_db_sessions(2)

    log_section("regenerating SID test")
    sid_before = st.ss.sid
    st.start_request()
    expir_before = st.get_expiration()
    st.regenerate_id()
    assert_equal(st['x'], 7, "data should not be affected")
    st.finish_request_and_check()
    assert_not_equal(sid_before, st.ss.sid, "regenerated sid should be different")
    assert_equal(expir_before, st._get_expiration(), "expiration should not change")
    st.start_request()
    assert_equal(st['x'], 7, "data should not be affected")
    st.finish_request_and_check()
    expect_db_sessions(2)

    log_section("regenerating SID test w/new expiration time")
    sid_before = st.ss.sid
    st.start_request()
    expir_before = st.get_expiration()
    new_expir = expir_before + 120  # something new
    st.regenerate_id(expiration_ts=new_expir)
    assert_equal(st['x'], 7, "data should not be affected")
    st.finish_request_and_check()
    assert_not_equal(sid_before, st.ss.sid, "regenerated sid should be different")
    assert_equal(new_expir, st._get_expiration(), "expiration should be what we asked for")
    st.start_request()
    assert_equal(st['x'], 7, "data should not be affected")
    st.finish_request_and_check()
    expect_db_sessions(2)

    log_section("check basic dictionary operations")
    st.start_request()
    st['s'] = 'aaa'
    st['i'] = 99
    st['f'] = 4.37
    assert_equal(st.pop('s'), 'aaa')
    assert_equal(st.pop('s'), None)
    assert_equal(st.pop('s', 'nil'), 'nil')
    assert st.has_key('i')
    assert not st.has_key('s')
    assert_equal(st.get('i'), 99)
    assert_equal(st.get('ii'), None)
    assert_equal(st.get('iii', 3), 3)
    assert_equal(st.get('f'), st['f'])
    del st['f']
    assert_raises(KeyError, st.__getitem__, 'f')
    assert 'f' not in st
    assert 'i' in st
    assert_equal(st.get('x'), 7)
    st.clear()
    assert 'i' not in st
    assert 'x' not in st
    st.finish_request_and_check()

    log_section("add complex data (models and objects) to the session")
    st.start_request()
    st['model'] = make_entity(0)
    st['dict'] = dict(a='alpha', c='charlie', e='echo')
    st['list'] = ['b', 'd', 'f']
    st['set'] = set([2, 3, 5, 7, 11, 13, 17, 19])
    st['tuple'] = (7, 7, 1985)
    st.finish_request_and_check()
    st.start_request()
    st.clear()
    st.finish_request_and_check()

    log_section("test quick methods: basic usage")
    st.start_request()
    st.set_quick('msg', 'mc only!')
    assert_equal('mc only!', st['msg'])
    st.finish_request_and_check()
    st.start_request()
    assert_equal('mc only!', st.pop_quick('msg'))
    assert_raises(KeyError, st.__getitem__, 'msg')
    st.finish_request_and_check()

    log_section("test quick methods: flush memcache (value will be lost if not using cookies)")
    st.start_request()
    st.set_quick('a', 1)
    st.set_quick('b', 2)
    st.finish_request_and_check()
    st.flush_memcache()
    st.start_request()
    if cookie_only_threshold > 0:
        # values are expected to survive the flush when cookies are in use
        assert_equal(st['a'], 1)
        assert_equal(st['b'], 2)
    else:
        assert_raises(KeyError, st.__getitem__, 'a')
        assert_raises(KeyError, st.__getitem__, 'b')
    st.finish_request_and_check()

    log_section("test quick methods: flush memcache should have no impact if another mutator is also used (and this ISNT memcache-only)")
    st.start_request()
    st['x'] = 24
    st.set_quick('a', 1)
    st.finish_request_and_check()
    st.flush_memcache()
    st.start_request()
    if no_datastore and cookie_only_threshold == 0:
        # neither datastore nor cookies => both values are expected to be gone
        assert_raises(KeyError, st.__getitem__, 'a')
        assert_raises(KeyError, st.__getitem__, 'x')
    else:
        assert_equal(st['a'], 1)
        assert_equal(st['x'], 24)
    st.set_quick('msg', 'hello')
    st['z'] = 99
    st.finish_request_and_check()
# Size in bytes (59KB, before encoding) used by check_various_session_sizes as
# the largest payload it attempts for sessions that would be stored in cookies.
MAX_COOKIE_ONLY_SIZE_TO_TEST = 59 * 1024
def check_various_session_sizes(no_datastore, cookie_only_threshold):
    """Try to save sessions holding payloads of roughly 1KB up to 2MB.

    For each power of two from 2**10 to 2**21 bytes a fresh ``SessionTester``
    stores a string of about that size.  Saving is expected to fail only for
    the 2**21 byte payload.  A payload which would be kept in cookies
    (``cookie_only_threshold`` is at least its size) is skipped when it is
    larger than ``MAX_COOKIE_ONLY_SIZE_TO_TEST``.
    """
    for log2_data_sz_bytes in range(10, 22):
        if log2_data_sz_bytes == 20:
            # maximum data size is 1MB *including* overhead, so just try up to
            # about the maximum size not including overhead (overhead is based
            # on the actual data, but for this test it looks like about 5%).
            data_sz_bytes = 2**log2_data_sz_bytes - 50*1024
        elif log2_data_sz_bytes == 16 and cookie_only_threshold > 2**15:
            # the minimum recommended for cookie storage is 20 cookies of 4KB
            # each.  64KB of data plus overhead is just a notch above this, so
            # shrink this test just a tad for cookie-only sessions to get about
            # 20 full cookies - about 59KB of raw data for this test.
            data_sz_bytes = MAX_COOKIE_ONLY_SIZE_TO_TEST
        else:
            data_sz_bytes = 2**log2_data_sz_bytes

        # log through the module's named logger, like the other checks do
        logger.info("trying session with %dB (~%.1fKB) (before encoding)" % (data_sz_bytes, data_sz_bytes/1024.0))

        if cookie_only_threshold < data_sz_bytes or data_sz_bytes <= MAX_COOKIE_ONLY_SIZE_TO_TEST:
            st = SessionTester(no_datastore=no_datastore, cookie_only_threshold=cookie_only_threshold)
            st.start_request()
            st['v'] = 'x' * data_sz_bytes
            expect_fail = (log2_data_sz_bytes >= 21)
            st.finish_request_and_check(expect_failure=expect_fail)
        else:
            logger.info("skipped - too big for cookie-only data")
def check_expiration(no_datastore, cookie_only_threshold):
    """Check that expired sessions are inactive and get cleaned out of the db.

    Starts 20 sessions, five of which are created with an expiration one
    second in the past, then confirms that one of the expired sessions is not
    active when accessed.  When nothing is stored in cookies
    (``cookie_only_threshold == 0``) the db session counts are verified too.
    """
    st = SessionTester(no_datastore=no_datastore, cookie_only_threshold=cookie_only_threshold)

    def expect_db_sessions(a, c):
        # shorthand which fills in this check's configuration
        return generic_expected_num_sessions_in_db_if_db_used(
            st, no_datastore, cookie_only_threshold, a, 0, c)

    # generate some sessions
    num_to_start = 20
    sessions_which_expire_shortly = (1, 3, 8, 9, 11)
    testers = []
    for idx in xrange(num_to_start):
        tester = SessionTester(st=st)
        testers.append(tester)
        tester.start_request()
        if idx in sessions_which_expire_shortly:
            expires_at = time.time()-1    # already expired
        else:
            expires_at = time.time()+600
        tester.start(expiration_ts=expires_at)
        tester.finish_request_and_check()

    # try accessing an expired session
    st_expired = testers[sessions_which_expire_shortly[0]]
    st_expired.start_request()
    assert not st_expired.is_active()
    st_expired.finish_request_and_check()

    if cookie_only_threshold > 0:
        return  # no need to see if cleaning up db works - nothing there for this case

    # check that after cleanup only unexpired ones are left in the db
    num_left = num_to_start - len(sessions_which_expire_shortly)
    expect_db_sessions(num_to_start-1, num_left)  # -1 b/c we manually expired one above
def check_bad_cookie(no_datastore, cookie_only_threshold):
    """Check that a session is not active after its cookie has been corrupted.

    For each cookie-corrupting helper, a new session is created and saved, the
    helper is given the tester and its cookies to tamper with, and the next
    request must then find no active session.
    """
    corrupters = (check_bad_sid, check_manip_cookie_data, check_bogus_data, check_bogus_data2)
    for corrupt in corrupters:
        logger.info('preparing for %s' % corrupt.__name__)
        st = SessionTester(no_datastore=no_datastore, cookie_only_threshold=cookie_only_threshold)
        st.start_request()
        st['x'] = 7
        st.finish_request_and_check()

        logger.info('running %s' % corrupt.__name__)
        corrupt(st, st.get_cookies())

        st.new_session_state()
        st.start_request()
        assert not st.is_active()  # due to invalid sig
        st.finish_request_and_check()
def check_bad_sid(st, cookies):
    """Reverse the sid portion of the first session cookie, in place.

    The cookie value is treated as ``SIG_LEN`` characters, followed by
    ``SID_LEN`` characters of sid, followed by the rest.  ``st`` is unused.
    """
    name = COOKIE_NAME_PREFIX + '00'
    value = cookies[name]
    sid_end = SIG_LEN + SID_LEN
    head, sid, tail = value[:SIG_LEN], value[SIG_LEN:sid_end], value[sid_end:]
    cookies[name] = head + sid[::-1] + tail
def check_manip_cookie_data(st, cookies):
    """Swap the data portion of the first session cookie for other pickled data.

    Everything after the first ``SIG_LEN + SID_LEN`` characters is replaced by
    a base64-encoded, protocol-2 pickle of ``{'evil': 'fail'}``.  ``st`` is
    unused.
    """
    name = COOKIE_NAME_PREFIX + '00'
    prefix = cookies[name][:SIG_LEN+SID_LEN]
    forged = b64encode(pickle.dumps({'evil': 'fail'}, 2))
    cookies[name] = prefix + forged
def check_bogus_data(st, cookies):
    """Swap the data portion of the first session cookie for invalid "base64".

    The first ``SIG_LEN + SID_LEN`` characters of the cookie are kept.
    ``st`` is unused.
    """
    name = COOKIE_NAME_PREFIX + '00'
    prefix = cookies[name][:SIG_LEN+SID_LEN]
    cookies[name] = prefix + "==34@#K$$;))"  # invalid "base64"
def check_bogus_data2(st, cookies):
    """Overwrite the whole first session cookie with the string "blah".

    ``st`` is unused.
    """
    name = COOKIE_NAME_PREFIX + '00'
    cookies[name] = "blah"
def test_cookies_deleted_when_session_storage_moved_to_backend():
    """Grow a cookie-stored session until it has to move to the backend.

    With a 14KB cookie-only threshold, a 9000 character value is expected to
    leave ``st.ss.in_mc`` false, while replacing it with a 16000 character
    value is expected to make ``st.ss.in_mc`` true.
    """
    small_payload = 'x' * 9000   # fits in the cookie
    large_payload = 'x' * 16000  # does NOT fit in the cookie

    logger.info("make a session with data stored in the cookie")
    st = SessionTester(no_datastore=False, cookie_only_threshold=14*1024)
    st.start_request()
    st['junk'] = small_payload
    st.finish_request_and_check()
    assert not st.ss.in_mc

    logger.info("force the session to be stored on the backend (too big for app engine headers)")
    st.start_request()
    st['junk'] = large_payload
    st.finish_request_and_check()
    assert st.ss.in_mc
def main():
    """Run nose tests and generate a coverage report.

    Output left over from a previous run (the ``./covhtml`` directory and the
    ``./.coverage`` file) is removed first, then the tests are run with
    ``nose.run()``.
    """
    # NOTE(review): coverage is imported but not otherwise used in this
    # function; presumably the report is produced by nose's own configuration.
    import coverage
    import nose
    import os
    from shutil import rmtree

    rmtree('./covhtml', ignore_errors=True)
    try:
        os.remove('./.coverage')
    except OSError:
        # best effort: typically there is simply no old .coverage file
        pass

    # NOTE(review): an earlier comment here said nose runs in its own process
    # because the .coverage file isn't written until the process terminates;
    # nose.run() as called here runs inside this process - confirm the intent.
    nose.run()
# Run the tests via main() only when this file is executed as a script.
if __name__ == '__main__': main()