Skip to content

Commit

Permalink
added support for querystring in redirection uris
Browse files Browse the repository at this point in the history
  • Loading branch information
synasius committed Jan 15, 2015
1 parent c0b27fb commit d17d3ea
Show file tree
Hide file tree
Showing 4 changed files with 200 additions and 4 deletions.
4 changes: 2 additions & 2 deletions oauth2_provider/compat.py
Expand Up @@ -10,9 +10,9 @@

# urlparse in python3 has been renamed to urllib.parse
try:
from urlparse import urlparse, parse_qs, urlunparse
from urlparse import urlparse, parse_qs, parse_qsl, urlunparse
except ImportError:
from urllib.parse import urlparse, parse_qs, urlunparse
from urllib.parse import urlparse, parse_qs, parse_qsl, urlunparse

try:
from urllib import urlencode, unquote_plus
Expand Down
18 changes: 16 additions & 2 deletions oauth2_provider/models.py
Expand Up @@ -14,7 +14,7 @@
from django.core.exceptions import ImproperlyConfigured

from .settings import oauth2_settings
from .compat import AUTH_USER_MODEL
from .compat import AUTH_USER_MODEL, parse_qsl, urlparse
from .generators import generate_client_secret, generate_client_id
from .validators import validate_uris

Expand Down Expand Up @@ -94,7 +94,21 @@ def redirect_uri_allowed(self, uri):
:param uri: Url to check
"""
return uri in self.redirect_uris.split()
for allowed_uri in self.redirect_uris.split():
parsed_allowed_uri = urlparse(allowed_uri)
parsed_uri = urlparse(uri)

if (parsed_allowed_uri.scheme == parsed_uri.scheme and
parsed_allowed_uri.netloc == parsed_uri.netloc and
parsed_allowed_uri.path == parsed_uri.path):

aqs_set = set(parse_qsl(parsed_allowed_uri.query))
uqs_set = set(parse_qsl(parsed_uri.query))

if aqs_set.issubset(uqs_set):
return True

return False

def clean(self):
from django.core.exceptions import ValidationError
Expand Down
142 changes: 142 additions & 0 deletions oauth2_provider/tests/test_authorization_code.py
Expand Up @@ -403,6 +403,46 @@ def test_code_post_auth_deny_custom_redirect_uri_scheme(self):
self.assertIn('custom-scheme://example.com?', response['Location'])
self.assertIn("error=access_denied", response['Location'])

def test_code_post_auth_redirection_uri_with_querystring(self):
"""
Tests that a redirection uri with query string is allowed
and query string is retained on redirection.
See http://tools.ietf.org/html/rfc6749#section-3.1.2
"""
self.client.login(username="test_user", password="123456")

form_data = {
'client_id': self.application.client_id,
'state': 'random_state_string',
'scope': 'read write',
'redirect_uri': 'http://example.com?foo=bar',
'response_type': 'code',
'allow': True,
}

response = self.client.post(reverse('oauth2_provider:authorize'), data=form_data)
self.assertEqual(response.status_code, 302)
self.assertIn("http://example.com?foo=bar", response['Location'])
self.assertIn("code=", response['Location'])

def test_code_post_auth_fails_when_redirect_uri_path_is_invalid(self):
"""
Tests that a redirection uri is matched using scheme + netloc + path
"""
self.client.login(username="test_user", password="123456")

form_data = {
'client_id': self.application.client_id,
'state': 'random_state_string',
'scope': 'read write',
'redirect_uri': 'http://example.com/a?foo=bar',
'response_type': 'code',
'allow': True,
}

response = self.client.post(reverse('oauth2_provider:authorize'), data=form_data)
self.assertEqual(response.status_code, 400)


class TestAuthorizationCodeTokenView(BaseTest):
def get_auth(self):
Expand Down Expand Up @@ -759,6 +799,108 @@ def test_malicious_redirect_uri(self):
response = self.client.post(reverse('oauth2_provider:token'), data=token_request_data)
self.assertEqual(response.status_code, 401)

def test_code_exchange_succeed_when_redirect_uri_match(self):
"""
Tests code exchange succeed when redirect uri matches the one used for code request
"""
self.client.login(username="test_user", password="123456")

# retrieve a valid authorization code
authcode_data = {
'client_id': self.application.client_id,
'state': 'random_state_string',
'scope': 'read write',
'redirect_uri': 'http://example.it?foo=bar',
'response_type': 'code',
'allow': True,
}
response = self.client.post(reverse('oauth2_provider:authorize'), data=authcode_data)
query_dict = parse_qs(urlparse(response['Location']).query)
authorization_code = query_dict['code'].pop()

# exchange authorization code for a valid access token
token_request_data = {
'grant_type': 'authorization_code',
'code': authorization_code,
'redirect_uri': 'http://example.it?foo=bar'
}
auth_headers = self.get_basic_auth_header(self.application.client_id, self.application.client_secret)

response = self.client.post(reverse('oauth2_provider:token'), data=token_request_data, **auth_headers)
self.assertEqual(response.status_code, 200)

content = json.loads(response.content.decode("utf-8"))
self.assertEqual(content['token_type'], "Bearer")
self.assertEqual(content['scope'], "read write")
self.assertEqual(content['expires_in'], oauth2_settings.ACCESS_TOKEN_EXPIRE_SECONDS)

def test_code_exchange_fails_when_redirect_uri_does_not_match(self):
"""
Tests code exchange fails when redirect uri does not match the one used for code request
"""
self.client.login(username="test_user", password="123456")

# retrieve a valid authorization code
authcode_data = {
'client_id': self.application.client_id,
'state': 'random_state_string',
'scope': 'read write',
'redirect_uri': 'http://example.it?foo=bar',
'response_type': 'code',
'allow': True,
}
response = self.client.post(reverse('oauth2_provider:authorize'), data=authcode_data)
query_dict = parse_qs(urlparse(response['Location']).query)
authorization_code = query_dict['code'].pop()

# exchange authorization code for a valid access token
token_request_data = {
'grant_type': 'authorization_code',
'code': authorization_code,
'redirect_uri': 'http://example.it?foo=baraa'
}
auth_headers = self.get_basic_auth_header(self.application.client_id, self.application.client_secret)

response = self.client.post(reverse('oauth2_provider:token'), data=token_request_data, **auth_headers)
self.assertEqual(response.status_code, 401)

def test_code_exchange_succeed_when_redirect_uri_match_with_multiple_query_params(self):
"""
Tests code exchange succeed when redirect uri matches the one used for code request
"""
self.client.login(username="test_user", password="123456")
self.application.redirect_uris = "http://localhost http://example.com?foo=bar"
self.application.save()

# retrieve a valid authorization code
authcode_data = {
'client_id': self.application.client_id,
'state': 'random_state_string',
'scope': 'read write',
'redirect_uri': 'http://example.com?bar=baz&foo=bar',
'response_type': 'code',
'allow': True,
}
response = self.client.post(reverse('oauth2_provider:authorize'), data=authcode_data)
query_dict = parse_qs(urlparse(response['Location']).query)
authorization_code = query_dict['code'].pop()

# exchange authorization code for a valid access token
token_request_data = {
'grant_type': 'authorization_code',
'code': authorization_code,
'redirect_uri': 'http://example.com?bar=baz&foo=bar'
}
auth_headers = self.get_basic_auth_header(self.application.client_id, self.application.client_secret)

response = self.client.post(reverse('oauth2_provider:token'), data=token_request_data, **auth_headers)
self.assertEqual(response.status_code, 200)

content = json.loads(response.content.decode("utf-8"))
self.assertEqual(content['token_type'], "Bearer")
self.assertEqual(content['scope'], "read write")
self.assertEqual(content['expires_in'], oauth2_settings.ACCESS_TOKEN_EXPIRE_SECONDS)


class TestAuthorizationCodeProtectedResource(BaseTest):
def test_resource_access_allowed(self):
Expand Down
40 changes: 40 additions & 0 deletions oauth2_provider/tests/test_implicit.py
Expand Up @@ -185,6 +185,46 @@ def test_token_post_auth_deny(self):
self.assertEqual(response.status_code, 302)
self.assertIn("error=access_denied", response['Location'])

def test_implicit_redirection_uri_with_querystring(self):
"""
Tests that a redirection uri with query string is allowed
and query string is retained on redirection.
See http://tools.ietf.org/html/rfc6749#section-3.1.2
"""
self.client.login(username="test_user", password="123456")

form_data = {
'client_id': self.application.client_id,
'state': 'random_state_string',
'scope': 'read write',
'redirect_uri': 'http://example.com?foo=bar',
'response_type': 'token',
'allow': True,
}

response = self.client.post(reverse('oauth2_provider:authorize'), data=form_data)
self.assertEqual(response.status_code, 302)
self.assertIn("http://example.com?foo=bar", response['Location'])
self.assertIn("access_token=", response['Location'])

def test_implicit_fails_when_redirect_uri_path_is_invalid(self):
"""
Tests that a redirection uri is matched using scheme + netloc + path
"""
self.client.login(username="test_user", password="123456")

form_data = {
'client_id': self.application.client_id,
'state': 'random_state_string',
'scope': 'read write',
'redirect_uri': 'http://example.com/a?foo=bar',
'response_type': 'code',
'allow': True,
}

response = self.client.post(reverse('oauth2_provider:authorize'), data=form_data)
self.assertEqual(response.status_code, 400)


class TestImplicitTokenView(BaseTest):
def test_resource_access_allowed(self):
Expand Down

0 comments on commit d17d3ea

Please sign in to comment.