Skip to content

Commit

Permalink
Updated per dgouldins tip + pull
Browse files Browse the repository at this point in the history
  • Loading branch information
ib-lundgren committed Mar 24, 2012
2 parents bd3691c + 79edcf5 commit 9ab832c
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 42 deletions.
19 changes: 12 additions & 7 deletions oauthlib/oauth.py
Expand Up @@ -24,8 +24,11 @@

class OAuth1aClient(object):
"""A client used to sign OAuth 1.0a requests"""
def __init__(self, client_key, client_secret,
resource_owner_key, resource_owner_secret, callback_uri=None,
def __init__(self, client_key,
client_secret=None,
resource_owner_key=None,
resource_owner_secret=None,
callback_uri=None,
signature_method=SIGNATURE_HMAC,
signature_type=SIGNATURE_TYPE_AUTH_HEADER,
rsa_key=None, verifier=None):
Expand Down Expand Up @@ -78,8 +81,9 @@ def get_oauth_params(self):
(u'oauth_version', u'1.0'),
(u'oauth_signature_method', self.signature_method),
(u'oauth_consumer_key', self.client_key),
(u'oauth_token', self.resource_owner_key),
]
if self.resource_owner_key:
params.append((u'oauth_token', self.resource_owner_key))
if self.callback_uri:
params.append((u'oauth_callback', self.callback_uri))
if self.verifier:
Expand Down Expand Up @@ -174,8 +178,7 @@ def check_request_signature(self, uri, http_method=u'GET', body='',
timestamp = params.get(u'oauth_timestamp')
callback_uri = params.get(u'oauth_callback')
verifier = params.get(u'oauth_verifier')
if not all((signature, client_key, resource_owner_key, nonce,
timestamp)):
if not all((request_signature, client_key, nonce, timestamp)):
return False

# if version is supplied, it must be "1.0"
Expand All @@ -195,8 +198,10 @@ def check_request_signature(self, uri, http_method=u'GET', body='',
else:
signature_type = SIGNATURE_TYPE_QUERY

oauth_client = OAuth1aClient(client_key, client_secret,
resource_owner_key, resource_owner_secret,
oauth_client = OAuth1aClient(client_key,
client_secret=client_secret,
resource_owner_key=resource_owner_key,
resource_owner_secret=resource_owner_secret,
callback_uri=callback_uri,
signature_method=self.signature_method,
signature_type=signature_type,
Expand Down
15 changes: 9 additions & 6 deletions oauthlib/signature.py
Expand Up @@ -86,7 +86,7 @@ def collect_parameters(uri_query='', body='', headers=None,
params = []

if uri_query:
params.extend(urlparse.parse_qsl(uri_query))
params.extend(urlparse.parse_qsl(uri_query, keep_blank_values=True))

if headers:
# look for an authorization header (case-insensitive)
Expand All @@ -96,12 +96,15 @@ def collect_parameters(uri_query='', body='', headers=None,
params.extend(utils.parse_authorization_header(
authorization_header))

if isinstance(body, dict):
if isinstance(body, (str, unicode)):
params.extend(urlparse.parse_qsl(body, keep_blank_values=True))
elif isinstance(body, dict):
params.extend(body.items())
elif isinstance(body, list):
params.extend(body)
elif body:
params.extend(urlparse.parse_qsl(body))
else:
try:
params.extend(body)
except TypeError:
raise ValueError("Body must be a string, dict, or iterable")

# ensure all paramters are unicode and not escaped
unicode_params = []
Expand Down
82 changes: 53 additions & 29 deletions tests/test_signatures.py
Expand Up @@ -9,7 +9,7 @@

class SignatureTests(TestCase):

uri_query = "b5=%3D%253D&a3=a&c%40=&a2=r%20b"
uri_query = "b5=%3D%253D&a3=a&c%40=&a2=r%20b&c2=&a3=2 q"
authorization_header = """OAuth realm="Example",
oauth_consumer_key="9djdj82h48djs9d2",
oauth_token="kkk9d7dh3k39sjv7",
Expand All @@ -23,6 +23,7 @@ class SignatureTests(TestCase):
normalized_encoded_request_parameters = urllib.quote("""OAuth realm="Example",oauth_consumer_key="9djdj82h48djs9d2",oauth_token="kkk9d7dh3k39sjv7",oauth_signature_method="HMAC-SHA1",oauth_timestamp="137131201",oauth_nonce="7d8f3e4a",oauth_signature="bYT5CMsGcbgUdFHObYMEfcx6bsw%3D" """.strip())
client_secret = "ECrDNoq1VYzzzzzzzzzyAK7TwZNtPnkqatqZZZZ"
resource_owner_secret = "just-a-string asdasd"
control_base_string = "POST&http%253A%2F%2Fexample.com%2Frequest%253Fb5%253D%25253D%2525253D%2526a3%253Da%2526c%252540%253D%2526a2%253Dr%252520b&OAuth%2520realm%253D%2522Example%2522%252Coauth_consumer_key%253D%25229djdj82h48djs9d2%2522%252Coauth_token%253D%2522kkk9d7dh3k39sjv7%2522%252Coauth_signature_method%253D%2522HMAC-SHA1%2522%252Coauth_timestamp%253D%2522137131201%2522%252Coauth_nonce%253D%25227d8f3e4a%2522%252Coauth_signature%253D%2522bYT5CMsGcbgUdFHObYMEfcx6bsw%25253D%2522"

def test_construct_base_string(self):
"""
Expand Down Expand Up @@ -50,9 +51,6 @@ def test_construct_base_string(self):
%253D%2522bYT5CMsGcbgUdFHObYMEfcx6bsw%25253D%2522
"""

# This is the string we want to match
control_test_string = "POST&http%253A%2F%2Fexample.com%2Frequest%253Fb5%253D%25253D%2525253D%2526a3%253Da%2526c%252540%253D%2526a2%253Dr%252520b&OAuth%2520realm%253D%2522Example%2522%252Coauth_consumer_key%253D%25229djdj82h48djs9d2%2522%252Coauth_token%253D%2522kkk9d7dh3k39sjv7%2522%252Coauth_signature_method%253D%2522HMAC-SHA1%2522%252Coauth_timestamp%253D%2522137131201%2522%252Coauth_nonce%253D%25227d8f3e4a%2522%252Coauth_signature%253D%2522bYT5CMsGcbgUdFHObYMEfcx6bsw%25253D%2522"

# Create test variables
# Create test variables
# Create test variables
Expand All @@ -63,7 +61,7 @@ def test_construct_base_string(self):

base_string = construct_base_string(unicode(self.http_method), unicode(self.base_string_url), unicode(self.normalized_encoded_request_parameters))

self.assertEqual(control_test_string, base_string)
self.assertEqual(self.control_base_string, base_string)

def test_normalize_base_string_uri(self):
"""
Expand Down Expand Up @@ -99,10 +97,13 @@ def test_collect_parameters(self):

parameters = collect_parameters(uri_query=self.uri_query)

self.assertEquals(len(parameters), 3)
self.assertEquals(len(parameters), 6)
self.assertEquals(parameters[0], ('b5', '=%3D'))
self.assertEquals(parameters[1], ('a3', 'a'))
self.assertEquals(parameters[2], ('a2', 'r b'))
self.assertEquals(parameters[2], ('c@', ''))
self.assertEquals(parameters[3], ('a2', 'r b'))
self.assertEquals(parameters[4], ('c2', ''))
self.assertEquals(parameters[5], ('a3', '2 q'))

# check against authorization header as well
# check against authorization header as well
Expand All @@ -113,15 +114,18 @@ def test_collect_parameters(self):
})

# Redo the checks against all the parameters. Duplicated code but better safety
self.assertEquals(len(parameters), 8)
self.assertEquals(len(parameters), 11)
self.assertEquals(parameters[0], ('b5', '=%3D'))
self.assertEquals(parameters[1], ('a3', 'a'))
self.assertEquals(parameters[2], ('a2', 'r b'))
self.assertEquals(parameters[3], ('oauth_nonce', '7d8f3e4a'))
self.assertEquals(parameters[4], ('oauth_timestamp', '137131201'))
self.assertEquals(parameters[5], ('oauth_consumer_key', '9djdj82h48djs9d2'))
self.assertEquals(parameters[6], ('oauth_signature_method', 'HMAC-SHA1'))
self.assertEquals(parameters[7], ('oauth_token', 'kkk9d7dh3k39sjv7'))
self.assertEquals(parameters[2], ('c@', ''))
self.assertEquals(parameters[3], ('a2', 'r b'))
self.assertEquals(parameters[4], ('c2', ''))
self.assertEquals(parameters[5], ('a3', '2 q'))
self.assertEquals(parameters[6], ('oauth_nonce', '7d8f3e4a'))
self.assertEquals(parameters[7], ('oauth_timestamp', '137131201'))
self.assertEquals(parameters[8], ('oauth_consumer_key', '9djdj82h48djs9d2'))
self.assertEquals(parameters[9], ('oauth_signature_method', 'HMAC-SHA1'))
self.assertEquals(parameters[10], ('oauth_token', 'kkk9d7dh3k39sjv7'))

# Add in the body.
# TODO - add more valid content for the body. Daniel Greenfeld 2012/03/12
Expand All @@ -130,16 +134,19 @@ def test_collect_parameters(self):
body=self.body, headers={
'Authorization': self.authorization_header,
})
self.assertEquals(len(parameters), 9)
self.assertEquals(len(parameters), 12)
self.assertEquals(parameters[0], ('b5', '=%3D'))
self.assertEquals(parameters[1], ('a3', 'a'))
self.assertEquals(parameters[2], ('a2', 'r b'))
self.assertEquals(parameters[3], ('oauth_nonce', '7d8f3e4a'))
self.assertEquals(parameters[4], ('oauth_timestamp', '137131201'))
self.assertEquals(parameters[5], ('oauth_consumer_key', '9djdj82h48djs9d2'))
self.assertEquals(parameters[6], ('oauth_signature_method', 'HMAC-SHA1'))
self.assertEquals(parameters[7], ('oauth_token', 'kkk9d7dh3k39sjv7'))
self.assertEquals(parameters[8], ('content', 'This is being the body of things'))
self.assertEquals(parameters[2], ('c@', ''))
self.assertEquals(parameters[3], ('a2', 'r b'))
self.assertEquals(parameters[4], ('c2', ''))
self.assertEquals(parameters[5], ('a3', '2 q'))
self.assertEquals(parameters[6], ('oauth_nonce', '7d8f3e4a'))
self.assertEquals(parameters[7], ('oauth_timestamp', '137131201'))
self.assertEquals(parameters[8], ('oauth_consumer_key', '9djdj82h48djs9d2'))
self.assertEquals(parameters[9], ('oauth_signature_method', 'HMAC-SHA1'))
self.assertEquals(parameters[10], ('oauth_token', 'kkk9d7dh3k39sjv7'))
self.assertEquals(parameters[11], ('content', 'This is being the body of things'))

def test_normalize_parameters(self):
""" We copy some of the variables from the test method above."""
Expand All @@ -163,21 +170,38 @@ def test_normalize_parameters(self):
index = normalized.index(key)

def test_sign_hmac_sha1(self):
""" TODO: Someone make a better test for this."""
""" Verifying correct HMAC-SHA1 signature against one created by openssl."""

# construct_base_string copied from the test_construct_base_string above
base_string = construct_base_string(unicode(self.http_method), unicode(self.base_string_url), unicode(self.normalized_encoded_request_parameters))
# self.control_base_string saved in <message>, hmac_key in <key>.
# hmac_key = "ECrDNoq1VYzzzzzzzzzyAK7TwZNtPnkqatqZZZZ&just-a-string%20%20%20%20asdasd"
# Control signature created using openssl:
# $ echo -n $(cat <message>) | openssl dgst -binary -hmac <key> | base64
control_signature = "Uau4O9Kpd2k6rvh7UZN/RN+RG7Y="

# check for Unicode
self.assertRaises(ValueError, sign_hmac_sha1, base_string, self.client_secret, self.resource_owner_secret)
self.assertRaises(ValueError, sign_hmac_sha1, self.control_base_string, self.client_secret, self.resource_owner_secret)

# Do the actual test
sign = sign_hmac_sha1(unicode(base_string), unicode(self.client_secret), unicode(self.resource_owner_secret))
sign = sign_hmac_sha1(unicode(self.control_base_string), unicode(self.client_secret), unicode(self.resource_owner_secret))
self.assertEquals(len(sign), 28)
self.assertEquals(sign, control_signature)

def test_sign_rsa_sha1(self):
""" TODO: Someone figure a test for this."""
pass
""" Verify correct RSA-SHA1 signature against one created by openssl."""

base_string = "POST&http%253A%2F%2Fexample.com%2Frequest%253Fb5%253D%25253D%2525253D%2526a3%253Da%2526c%252540%253D%2526a2%253Dr%252520b&OAuth%2520realm%253D%2522Example%2522%252Coauth_consumer_key%253D%25229djdj82h48djs9d2%2522%252Coauth_token%253D%2522kkk9d7dh3k39sjv7%2522%252Coauth_signature_method%253D%2522HMAC-SHA1%2522%252Coauth_timestamp%253D%2522137131201%2522%252Coauth_nonce%253D%25227d8f3e4a%2522%252Coauth_signature%253D%2522bYT5CMsGcbgUdFHObYMEfcx6bsw%25253D%2522"

# Generated using: $ openssl genrsa -out <key>.pem 1024
# PyCrypto requires the key to be concatenated with linebreaks.
private_key = "-----BEGIN RSA PRIVATE KEY-----\nMIICXgIBAAKBgQDk1/bxyS8Q8jiheHeYYp/4rEKJopeQRRKKpZI4s5i+UPwVpupG\nAlwXWfzXwSMaKPAoKJNdu7tqKRniqst5uoHXw98gj0x7zamu0Ck1LtQ4c7pFMVah\n5IYGhBi2E9ycNS329W27nJPWNCbESTu7snVlG8V8mfvGGg3xNjTMO7IdrwIDAQAB\nAoGBAOQ2KuH8S5+OrsL4K+wfjoCi6MfxCUyqVU9GxocdM1m30WyWRFMEz2nKJ8fR\np3vTD4w8yplTOhcoXdQZl0kRoaDzrcYkm2VvJtQRrX7dKFT8dR8D/Tr7dNQLOXfC\nDY6xveQczE7qt7Vk7lp4FqmxBsaaEuokt78pOOjywZoInjZhAkEA9wz3zoZNT0/i\nrf6qv2qTIeieUB035N3dyw6f1BGSWYaXSuerDCD/J1qZbAPKKhyHZbVawFt3UMhe\n542UftBaxQJBAO0iJy1I8GQjGnS7B3yvyH3CcLYGy296+XO/2xKp/d/ty1OIeovx\nC60pLNwuFNF3z9d2GVQAdoQ89hUkOtjZLeMCQQD0JO6oPHUeUjYT+T7ImAv7UKVT\nSuy30sKjLzqoGw1kR+wv7C5PeDRvscs4wa4CW9s6mjSrMDkDrmCLuJDtmf55AkEA\nkmaMg2PNrjUR51F0zOEFycaaqXbGcFwe1/xx9zLmHzMDXd4bsnwt9kk+fe0hQzVS\nJzatanQit3+feev1PN3QewJAWv4RZeavEUhKv+kLe95Yd0su7lTLVduVgh4v5yLT\nGa6FHdjGPcfajt+nrpB1n8UQBEH9ZxniokR/IPvdMlxqXA==\n-----END RSA PRIVATE KEY-----"

# Base string saved in "<message>". Signature obtained using:
# $ echo -n $(cat <message>) | openssl dgst -sign <key>.pem | base64
# where echo -n suppresses the last linebreak.
control_signature = "zV5g8ArdMuJuOXlH8XOqfLHS11XdthfIn4HReDm7jz8JmgLabHGmVBqCkCfZoFJPHdka7tLvCplK/jsV4FUOnftrJOQhbXguuBdi87/hmxOFKLmQYqqlEW7BdXmwKLZckiqq3qE5XziBgKSAFRkxJ4gmJAymvJBtrJYN9728rK8="

sign = sign_rsa_sha1(base_string, private_key)
self.assertEquals(sign, control_signature)

def test_sign_plaintext(self):
""" """
Expand Down

0 comments on commit 9ab832c

Please sign in to comment.