Skip to content

Commit

Permalink
Normalize datetimes to account for tz
Browse files Browse the repository at this point in the history
This patch makes sure that datetimes in the auth_token middleware
are normalized to account for timezone offsets.

Some of the old tests were changed to ensure that the expires string
stored in the cache is in ISO 8601 format and not a random float.

Fixes bug 1195924

Change-Id: I5917ab728193cd2aa8784c4860a96cdc17f3d43f
  • Loading branch information
BryanSD committed Oct 9, 2013
1 parent e9fb6c7 commit 93793cb
Show file tree
Hide file tree
Showing 2 changed files with 236 additions and 7 deletions.
21 changes: 17 additions & 4 deletions keystoneclient/middleware/auth_token.py
Expand Up @@ -326,10 +326,12 @@ def confirm_token_not_expired(data):
timestamp = data['token']['expires_at']
else:
raise InvalidUserToken('Token authorization failed')
expires = timeutils.parse_isotime(timestamp).strftime('%s')
if time.time() >= float(expires):
expires = timeutils.parse_isotime(timestamp)
expires = timeutils.normalize_time(expires)
utcnow = timeutils.utcnow()
if utcnow >= expires:
raise InvalidUserToken('Token authorization failed')
return expires
return timeutils.isotime(at=expires, subsecond=True)


def safe_quote(s):
Expand Down Expand Up @@ -998,7 +1000,18 @@ def _cache_get(self, token_id, ignore_expires=False):
raise InvalidUserToken('Token authorization failed')

data, expires = cached
if ignore_expires or time.time() < float(expires):

try:
expires = timeutils.parse_isotime(expires)
except ValueError:
# Gracefully handle upgrade of expiration times from *nix
# timestamps to ISO 8601 formatted dates by ignoring old cached
# values.
return

expires = timeutils.normalize_time(expires)
utcnow = timeutils.utcnow()
if ignore_expires or utcnow < expires:
self.LOG.debug('Returning cached token %s', token_id)
return data
else:
Expand Down
222 changes: 219 additions & 3 deletions keystoneclient/tests/test_auth_token_middleware.py
Expand Up @@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.

import calendar
import datetime
import iso8601
import os
Expand Down Expand Up @@ -733,7 +734,9 @@ def test_encrypt_cache_data(self):
}
self.set_middleware(conf=conf)
token = 'my_token'
data = ('this_data', 10e100)
some_time_later = timeutils.utcnow() + datetime.timedelta(hours=4)
expires = timeutils.strtime(some_time_later)
data = ('this_data', expires)
self.middleware._init_cache({})
self.middleware._cache_store(token, data)
self.assertEqual(self.middleware._cache_get(token), data[0])
Expand All @@ -747,7 +750,9 @@ def test_sign_cache_data(self):
}
self.set_middleware(conf=conf)
token = 'my_token'
data = ('this_data', 10e100)
some_time_later = timeutils.utcnow() + datetime.timedelta(hours=4)
expires = timeutils.strtime(some_time_later)
data = ('this_data', expires)
self.middleware._init_cache({})
self.middleware._cache_store(token, data)
self.assertEqual(self.middleware._cache_get(token), data[0])
Expand All @@ -760,7 +765,9 @@ def test_no_memcache_protection(self):
}
self.set_middleware(conf=conf)
token = 'my_token'
data = ('this_data', 10e100)
some_time_later = timeutils.utcnow() + datetime.timedelta(hours=4)
expires = timeutils.strtime(some_time_later)
data = ('this_data', expires)
self.middleware._init_cache({})
self.middleware._cache_store(token, data)
self.assertEqual(self.middleware._cache_get(token), data[0])
Expand Down Expand Up @@ -1242,3 +1249,212 @@ def test_unquoted_token(self):

def test_quoted_token(self):
self.assertEqual('foo%20bar', auth_token.safe_quote('foo%20bar'))


class TokenExpirationTest(BaseAuthTokenMiddlewareTest):
def setUp(self):
super(TokenExpirationTest, self).setUp()
timeutils.set_time_override()
self.now = timeutils.utcnow()
self.delta = datetime.timedelta(hours=1)
self.one_hour_ago = timeutils.isotime(self.now - self.delta,
subsecond=True)
self.one_hour_earlier = timeutils.isotime(self.now + self.delta,
subsecond=True)

def tearDown(self):
super(TokenExpirationTest, self).tearDown()
timeutils.clear_time_override()

def create_v2_token_fixture(self, expires=None):
v2_fixture = {
'access': {
'token': {
'id': 'blah',
'expires': expires or self.one_hour_earlier,
'tenant': {
'id': 'tenant_id1',
'name': 'tenant_name1',
},
},
'user': {
'id': 'user_id1',
'name': 'user_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
},
'serviceCatalog': {}
},
}

return v2_fixture

def create_v3_token_fixture(self, expires=None):

v3_fixture = {
'token': {
'expires_at': expires or self.one_hour_earlier,
'user': {
'id': 'user_id1',
'name': 'user_name1',
'domain': {
'id': 'domain_id1',
'name': 'domain_name1'
}
},
'project': {
'id': 'tenant_id1',
'name': 'tenant_name1',
'domain': {
'id': 'domain_id1',
'name': 'domain_name1'
}
},
'roles': [
{'name': 'role1', 'id': 'Role1'},
{'name': 'role2', 'id': 'Role2'},
],
'catalog': {}
}
}

return v3_fixture

def test_no_data(self):
data = {}
self.assertRaises(auth_token.InvalidUserToken,
auth_token.confirm_token_not_expired,
data)

def test_bad_data(self):
data = {'my_happy_token_dict': 'woo'}
self.assertRaises(auth_token.InvalidUserToken,
auth_token.confirm_token_not_expired,
data)

def test_v2_token_not_expired(self):
data = self.create_v2_token_fixture()
expected_expires = data['access']['token']['expires']
actual_expires = auth_token.confirm_token_not_expired(data)
self.assertEqual(actual_expires, expected_expires)

def test_v2_token_expired(self):
data = self.create_v2_token_fixture(expires=self.one_hour_ago)
self.assertRaises(auth_token.InvalidUserToken,
auth_token.confirm_token_not_expired,
data)

def test_v2_token_with_timezone_offset_not_expired(self):
current_time = timeutils.parse_isotime('2000-01-01T00:01:10.000123Z')
current_time = timeutils.normalize_time(current_time)
timeutils.set_time_override(current_time)
data = self.create_v2_token_fixture(
expires='2000-01-01T00:05:10.000123-05:00')
expected_expires = '2000-01-01T05:05:10.000123Z'
actual_expires = auth_token.confirm_token_not_expired(data)
self.assertEqual(actual_expires, expected_expires)

def test_v2_token_with_timezone_offset_expired(self):
current_time = timeutils.parse_isotime('2000-01-01T00:01:10.000123Z')
current_time = timeutils.normalize_time(current_time)
timeutils.set_time_override(current_time)
data = self.create_v2_token_fixture(
expires='2000-01-01T00:05:10.000123+05:00')
data['access']['token']['expires'] = '2000-01-01T00:05:10.000123+05:00'
self.assertRaises(auth_token.InvalidUserToken,
auth_token.confirm_token_not_expired,
data)

def test_v3_token_not_expired(self):
data = self.create_v3_token_fixture()
expected_expires = data['token']['expires_at']
actual_expires = auth_token.confirm_token_not_expired(data)
self.assertEqual(actual_expires, expected_expires)

def test_v3_token_expired(self):
data = self.create_v3_token_fixture(expires=self.one_hour_ago)
self.assertRaises(auth_token.InvalidUserToken,
auth_token.confirm_token_not_expired,
data)

def test_v3_token_with_timezone_offset_not_expired(self):
current_time = timeutils.parse_isotime('2000-01-01T00:01:10.000123Z')
current_time = timeutils.normalize_time(current_time)
timeutils.set_time_override(current_time)
data = self.create_v3_token_fixture(
expires='2000-01-01T00:05:10.000123-05:00')
expected_expires = '2000-01-01T05:05:10.000123Z'

actual_expires = auth_token.confirm_token_not_expired(data)
self.assertEqual(actual_expires, expected_expires)

def test_v3_token_with_timezone_offset_expired(self):
current_time = timeutils.parse_isotime('2000-01-01T00:01:10.000123Z')
current_time = timeutils.normalize_time(current_time)
timeutils.set_time_override(current_time)
data = self.create_v3_token_fixture(
expires='2000-01-01T00:05:10.000123+05:00')
self.assertRaises(auth_token.InvalidUserToken,
auth_token.confirm_token_not_expired,
data)

def test_cached_token_not_expired(self):
token = 'mytoken'
data = 'this_data'
self.set_middleware()
self.middleware._init_cache({})
some_time_later = timeutils.strtime(at=(self.now + self.delta))
expires = some_time_later
self.middleware._cache_put(token, data, expires)
self.assertEqual(self.middleware._cache_get(token), data)

def test_cached_token_not_expired_with_old_style_nix_timestamp(self):
"""Ensure we cannot retrieve a token from the cache.
Getting a token from the cache should return None when the token data
in the cache stores the expires time as a *nix style timestamp.
"""
token = 'mytoken'
data = 'this_data'
self.set_middleware()
self.middleware._init_cache({})
some_time_later = self.now + self.delta
# Store a unix timestamp in the cache.
expires = calendar.timegm(some_time_later.timetuple())
self.middleware._cache_put(token, data, expires)
self.assertIsNone(self.middleware._cache_get(token))

def test_cached_token_expired(self):
token = 'mytoken'
data = 'this_data'
self.set_middleware()
self.middleware._init_cache({})
some_time_earlier = timeutils.strtime(at=(self.now - self.delta))
expires = some_time_earlier
self.middleware._cache_put(token, data, expires)
self.assertIsNone(self.middleware._cache_get(token))

def test_cached_token_with_timezone_offset_not_expired(self):
token = 'mytoken'
data = 'this_data'
self.set_middleware()
self.middleware._init_cache({})
timezone_offset = datetime.timedelta(hours=2)
some_time_later = self.now - timezone_offset + self.delta
expires = timeutils.strtime(some_time_later) + '-02:00'
self.middleware._cache_put(token, data, expires)
self.assertEqual(self.middleware._cache_get(token), data)

def test_cached_token_with_timezone_offset_expired(self):
token = 'mytoken'
data = 'this_data'
self.set_middleware()
self.middleware._init_cache({})
timezone_offset = datetime.timedelta(hours=2)
some_time_earlier = self.now - timezone_offset - self.delta
expires = timeutils.strtime(some_time_earlier) + '-02:00'
self.middleware._cache_put(token, data, expires)
self.assertIsNone(self.middleware._cache_get(token))

0 comments on commit 93793cb

Please sign in to comment.