Skip to content

Commit

Permalink
Add request validation test
Browse files Browse the repository at this point in the history
  • Loading branch information
gerardparis committed Oct 4, 2016
1 parent 917143c commit 35bcb0c
Showing 1 changed file with 50 additions and 2 deletions.
52 changes: 50 additions & 2 deletions api/api/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
from django.conf import settings
from django.core.urlresolvers import resolve
from django.test import TestCase, override_settings
from datetime import datetime, timedelta
from rest_framework.test import APIRequestFactory

from .common_utils import get_all_registered_nodes, remove_extra_whitespaces, to_json_bools, rsync_dir_with_nodes
from .common_utils import get_all_registered_nodes, remove_extra_whitespaces, to_json_bools, rsync_dir_with_nodes, is_valid_request
from .exceptions import FileSynchronizationException

# Tests use database=10 instead of 0.
Expand All @@ -14,6 +16,7 @@ class MainTestCase(TestCase):
def setUp(self):
self.r = redis.Redis(connection_pool=settings.REDIS_CON_POOL)
self.create_nodes()
self.factory = APIRequestFactory()

def tearDown(self):
self.r.flushdb()
Expand Down Expand Up @@ -70,6 +73,45 @@ def test_rsync_dir_with_nodes_when_rsync_fails(self, mock_os_system):
with self.assertRaises(FileSynchronizationException):
rsync_dir_with_nodes(settings.WORKLOAD_METRICS_DIR)

@mock.patch('api.common_utils.get_keystone_admin_auth')
def test_is_valid_request_new_valid_token(self, mock_keystone_admin_auth):
not_expired_admin_token = FakeTokenData((datetime.now() + timedelta(hours=2)).strftime('%Y-%m-%dT%H:%M:%SZ'),
{'roles': [{'name': 'admin'}, {'name': '_member_'}]})
mock_keystone_admin_auth.return_value.tokens.validate.return_value = not_expired_admin_token
request = self.factory.get('/')
request.META['HTTP_X_AUTH_TOKEN'] = 'new_not_expired_token'
resp = is_valid_request(request)
self.assertEquals(resp, 'new_not_expired_token')
self.assertTrue(mock_keystone_admin_auth.called)
mock_keystone_admin_auth.reset_mock()

# Successive calls should not invoke keystone
request = self.factory.get('/')
request.META['HTTP_X_AUTH_TOKEN'] = 'new_not_expired_token'
resp = is_valid_request(request)
self.assertEquals(resp, 'new_not_expired_token')
self.assertFalse(mock_keystone_admin_auth.called)

@mock.patch('api.common_utils.get_keystone_admin_auth')
def test_is_valid_request_new_expired_token(self, mock_keystone_admin_auth):
not_expired_admin_token = FakeTokenData((datetime.now() - timedelta(hours=2)).strftime('%Y-%m-%dT%H:%M:%SZ'),
{'roles': [{'name': 'admin'}, {'name': '_member_'}]})
mock_keystone_admin_auth.return_value.tokens.validate.return_value = not_expired_admin_token
request = self.factory.get('/')
request.META['HTTP_X_AUTH_TOKEN'] = 'expired_token'
resp = is_valid_request(request)
self.assertFalse(resp)

@mock.patch('api.common_utils.get_keystone_admin_auth')
def test_is_valid_request_not_admin(self, mock_keystone_admin_auth):
not_expired_admin_token = FakeTokenData((datetime.now() + timedelta(hours=2)).strftime('%Y-%m-%dT%H:%M:%SZ'),
{'roles': [{'name': '_member_'}]})
mock_keystone_admin_auth.return_value.tokens.validate.return_value = not_expired_admin_token
request = self.factory.get('/')
request.META['HTTP_X_AUTH_TOKEN'] = 'not_admin_token'
resp = is_valid_request(request)
self.assertFalse(resp)

#
# URL tests
#
Expand Down Expand Up @@ -111,4 +153,10 @@ def create_nodes(self):
def configure_usernames_and_passwords_for_nodes(self):
self.r.hmset('node:controller', {'ssh_username': 'user1', 'ssh_password': 's3cr3t'})
self.r.hmset('node:storagenode1', {'ssh_username': 'user1', 'ssh_password': 's3cr3t'})
self.r.hmset('node:storagenode2', {'ssh_username': 'user1', 'ssh_password': 's3cr3t'})
self.r.hmset('node:storagenode2', {'ssh_username': 'user1', 'ssh_password': 's3cr3t'})


class FakeTokenData:
def __init__(self, expires, user):
self.expires = expires
self.user = user

0 comments on commit 35bcb0c

Please sign in to comment.