Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Client for sig_version 4 #159

Merged
merged 3 commits into from
Apr 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 23 additions & 10 deletions duo_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def canon_params(params):
return '&'.join(args)


def canonicalize(method, host, uri, params, date, sig_version):
def canonicalize(method, host, uri, params, date, sig_version, body=None):
"""
Return a canonical string version of the given request attributes.

Expand All @@ -64,6 +64,7 @@ def canonicalize(method, host, uri, params, date, sig_version):
* params: string containing request params
* date: date string for request
* sig_version: signature version integer
* body: request body, must be string for sig_version 4
"""
if sig_version == 1:
canon = [
Expand Down Expand Up @@ -96,20 +97,20 @@ def canonicalize(method, host, uri, params, date, sig_version):
method.upper(),
host.lower(),
uri,
'',
hashlib.sha512(params.encode('utf-8')).hexdigest(),
canon_params(params),
hashlib.sha512(body.encode('utf-8')).hexdigest(),
]
else:
raise ValueError("Unknown signature version: {}".format(sig_version))
return '\n'.join(canon)


def sign(ikey, skey, method, host, uri, date, sig_version, params,
def sign(ikey, skey, method, host, uri, date, sig_version, params, body=None,
digestmod=hashlib.sha1): # noqa: DUO130, HMAC-SHA1 still secure
"""
Return basic authorization header line with a Duo Web API signature.
"""
canonical = canonicalize(method, host, uri, params, date, sig_version)
canonical = canonicalize(method, host, uri, params, date, sig_version, body=body)
if isinstance(skey, six.text_type):
skey = skey.encode('utf-8')
if isinstance(canonical, six.text_type):
Expand Down Expand Up @@ -221,12 +222,24 @@ def api_call(self, method, path, params):
* params: dict mapping from parameter name to stringified value,
or a dict to be converted to json.
"""
params_go_in_body = method in ('POST', 'PUT', 'PATCH')
if self.sig_version in (1, 2):
params = normalize_params(params)
elif self.sig_version in (3, 4):
# v1 and v2 canonicalization don't distinguish between
# params and body. There's no separate body input.
body = None
elif self.sig_version == 3:
# Raises if params are not a dict that can be converted
# to json.
params = self.canon_json(params)
body = params
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit hacky, but since sig v3 is deprecated, we can remove this in a later PR.

Copy link
Contributor

@ben-duo ben-duo Apr 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I thought we had already removed it here. We did remove it from other code bases. We should get on that ASAP. (I misread splitting out this if statement as adding it back before.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've got a diff in progress to do so. All red :)

elif self.sig_version == 4:
if params_go_in_body:
body = self.canon_json(params)
params = {}
else:
body = ''
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the distinction between an empty string for body (here) vs. None (as at line 229) important?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Line 229 is for canonicalization versions where the body must never be concatenated into the canonicalized representation of the request. I like setting body to a non-string value in those cases because future bugs that tried to use the body wrong would raise, making them easier to catch early.

Perhaps body should be a required parameter to canonicalize() -- or default to None -- for that reason?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to make canonicalize backwards compatible, so I didn't make it required, but I will default to None.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the distinction between an empty string for body (here) vs. None (as at line 229) important?

For sig v4, the body must be an empty string for GET and a json string for other request methods. For other sig versions, it's ignored.

params = normalize_params(params)

if self.sig_timezone == 'UTC':
now = email.utils.formatdate()
Expand All @@ -244,7 +257,8 @@ def api_call(self, method, path, params):
now,
self.sig_version,
params,
self.digestmod)
body=body,
digestmod=self.digestmod)
headers = {
'Authorization': auth,
'Date': now,
Expand All @@ -253,10 +267,9 @@ def api_call(self, method, path, params):
if self.user_agent:
headers['User-Agent'] = self.user_agent

if method in ['POST', 'PUT']:
if params_go_in_body:
if self.sig_version in (3,4):
headers['Content-type'] = 'application/json'
body = params
else:
headers['Content-type'] = 'application/x-www-form-urlencoded'
body = six.moves.urllib.parse.urlencode(params, doseq=True)
Expand Down Expand Up @@ -426,7 +439,7 @@ def json_cursor_api_call(self, method, path, params, get_records_func):
:param get_records_func: Function that can be called to extract an
iterable of records from the parsed response
json.

:returns: Generator which will yield records from the api response(s).
"""

Expand Down
78 changes: 76 additions & 2 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,10 @@ def test_v4_with_json(self):
'POST\n'
'foo.bar52.com\n'
'/Foo/BaR2/qux\n\n' + hashed_body)
params = duo_client.client.Client.canon_json(JSON_BODY)
params = {}
actual = duo_client.client.canonicalize(
'POST', 'foO.BaR52.cOm', '/Foo/BaR2/qux', params,
'Tue, 04 Jul 2017 14:12:00', sig_version=4)
'Tue, 04 Jul 2017 14:12:00', sig_version=4, body=JSON_STRING)

self.assertEqual(actual, expected)

Expand Down Expand Up @@ -432,6 +432,80 @@ def test_json_request(self):
self.assertEqual(response.headers['Content-type'], 'application/json')
self.assertIn('Authorization', response.headers)

class TestRequestsV4(unittest.TestCase):
# usful args for testing GETs
args_in = {
'foo':['bar'],
'baz':['qux', 'quux=quuux', 'foobar=foobar&barbaz=barbaz']}
args_out = dict(
(key, [v for v in val])
for (key, val) in list(args_in.items()))

def setUp(self):
self.client = duo_client.client.Client(
'test_ikey', 'test_akey', 'example.com', sig_timezone='America/Detroit',
digestmod=hashlib.sha512, sig_version=4)
# monkeypatch client's _connect()
self.client._connect = lambda: util.MockHTTPConnection()

def test_get_no_params(self):
(response, dummy) = self.client.api_call('GET', '/foo/bar', {})
self.assertEqual(response.method, 'GET')
self.assertEqual(response.uri, '/foo/bar?')
self.assertIn('Authorization', response.headers)

def test_get_params(self):
(response, dummy) = self.client.api_call(
'GET', '/foo/bar', self.args_in)
self.assertEqual(response.method, 'GET')
(uri, args) = response.uri.split('?')
self.assertEqual(uri, '/foo/bar')
self.assertEqual(util.params_to_dict(args), self.args_out)
self.assertIn('Authorization', response.headers)

def test_json_api_call_get_no_params(self):
response = self.client.json_api_call('GET', '/foo/bar', {})
self.assertEqual(response['method'], 'GET')
self.assertEqual(response['uri'], '/foo/bar?')
self.assertEqual(response['body'], None)
self.assertIn('Authorization', response['headers'])

def test_json_api_call_get_params(self):
response = self.client.json_api_call(
'GET', '/foo/bar', self.args_in)
self.assertEqual(response['method'], 'GET')
(uri, args) = response['uri'].split('?')
self.assertEqual(uri, '/foo/bar')
self.assertEqual(util.params_to_dict(args), self.args_out)
self.assertIn('Authorization', response['headers'])

def test_json_post(self):
(response, dummy) = self.client.api_call('POST', '/foo/bar', JSON_BODY)

self.assertEqual(response.method, 'POST')
self.assertEqual(response.uri, '/foo/bar')
self.assertEqual(response.body, JSON_STRING)

self.assertIn('Content-type', response.headers)
self.assertEqual(response.headers['Content-type'], 'application/json')
self.assertIn('Authorization', response.headers)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are you explicitly testing for the existence of an authorization header here? or alternately, why not test for the authn header in the GET tests above? both kinds of requests have the header IIRC?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be honest, I copied this entire test class from TestJsonRequests, which is all sig v3, and added some cases from TestRequests to ensure all these cases work for sig v4. I'll add the header assertions to all the cases.


def test_json_fails_with_bad_args(self):
with self.assertRaises(ValueError) as e:
(response, dummy) = self.client.api_call('POST', '/foo/bar', '')
self.assertEqual(e.exception.args[0], "JSON request must be an object.")

def test_json_put(self):
(response, dummy) = self.client.api_call('PUT', '/foo/bar', JSON_BODY)

self.assertEqual(response.method, 'PUT')
self.assertEqual(response.uri, '/foo/bar')
self.assertEqual(response.body, JSON_STRING)

self.assertIn('Content-type', response.headers)
self.assertEqual(response.headers['Content-type'], 'application/json')
self.assertIn('Authorization', response.headers)

class TestParseJsonResponse(unittest.TestCase):
APIResponse = collections.namedtuple('APIResponse', 'status reason')

Expand Down