Skip to content

Commit

Permalink
Merge branch 'sigv4-opt-in' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
toastdriven committed Jan 29, 2014
2 parents b9dbaad + 1d02cc7 commit 686cfa9
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 3 deletions.
7 changes: 7 additions & 0 deletions boto/auth.py
Expand Up @@ -36,6 +36,7 @@
import datetime
from email.utils import formatdate
import hmac
import os
import sys
import time
import urllib
Expand Down Expand Up @@ -903,6 +904,12 @@ def _wrapper(self):

def detect_potential_s3sigv4(func):
def _wrapper(self):
if os.environ.get('S3_USE_SIGV4', False):
return ['hmac-v4-s3']

if boto.config.get('s3', 'use-sigv4', False):
return ['hmac-v4-s3']

if hasattr(self, 'host'):
if '.cn-' in self.host:
return ['hmac-v4-s3']
Expand Down
23 changes: 22 additions & 1 deletion boto/s3/connection.py
Expand Up @@ -148,6 +148,16 @@ class Location(object):
CNNorth1 = 'cn-north-1'


class NoHostProvided(object):
# An identifying object to help determine whether the user provided a
# ``host`` or not. Never instantiated.
pass


class HostRequiredError(BotoClientError):
pass


class S3Connection(AWSAuthConnection):

DefaultHost = boto.config.get('s3', 'host', 's3.amazonaws.com')
Expand All @@ -157,11 +167,15 @@ class S3Connection(AWSAuthConnection):
def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
is_secure=True, port=None, proxy=None, proxy_port=None,
proxy_user=None, proxy_pass=None,
host=DefaultHost, debug=0, https_connection_factory=None,
host=NoHostProvided, debug=0, https_connection_factory=None,
calling_format=DefaultCallingFormat, path='/',
provider='aws', bucket_class=Bucket, security_token=None,
suppress_consec_slashes=True, anon=False,
validate_certs=None, profile_name=None):
no_host_provided = False
if host is NoHostProvided:
no_host_provided = True
host = self.DefaultHost
if isinstance(calling_format, basestring):
calling_format=boto.utils.find_class(calling_format)()
self.calling_format = calling_format
Expand All @@ -174,6 +188,13 @@ def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
path=path, provider=provider, security_token=security_token,
suppress_consec_slashes=suppress_consec_slashes,
validate_certs=validate_certs, profile_name=profile_name)
# We need to delay until after the call to ``super`` before checking
# to see if SigV4 is in use.
if no_host_provided:
if 'hmac-v4-s3' in self._required_auth_capability():
raise HostRequiredError(
"When using SigV4, you must specify a 'host' parameter."
)

@detect_potential_s3sigv4
def _required_auth_capability(self):
Expand Down
34 changes: 34 additions & 0 deletions tests/unit/__init__.py
Expand Up @@ -4,6 +4,7 @@
import unittest
import httplib

import mock
from mock import Mock


Expand Down Expand Up @@ -77,3 +78,36 @@ def set_http_response(self, status_code, reason='', header=[], body=None):

def default_body(self):
return ''


class MockServiceWithConfigTestCase(AWSMockServiceTestCase):
def setUp(self):
super(MockServiceWithConfigTestCase, self).setUp()
self.environ = {}
self.config = {}
self.config_patch = mock.patch('boto.provider.config.get',
self.get_config)
self.has_config_patch = mock.patch('boto.provider.config.has_option',
self.has_config)
self.environ_patch = mock.patch('os.environ', self.environ)
self.config_patch.start()
self.has_config_patch.start()
self.environ_patch.start()

def tearDown(self):
self.config_patch.stop()
self.has_config_patch.stop()
self.environ_patch.stop()

def has_config(self, section_name, key):
try:
self.config[section_name][key]
return True
except KeyError:
return False

def get_config(self, section_name, key, default=None):
try:
return self.config[section_name][key]
except KeyError:
return None
47 changes: 46 additions & 1 deletion tests/unit/auth/test_sigv4.py
Expand Up @@ -20,11 +20,14 @@
# IN THE SOFTWARE.
#
import copy
import mock
from mock import Mock
from tests.unit import unittest
import os
from tests.unit import unittest, MockServiceWithConfigTestCase

from boto.auth import HmacAuthV4Handler
from boto.auth import S3HmacAuthV4Handler
from boto.auth import detect_potential_s3sigv4
from boto.connection import HTTPRequest


Expand Down Expand Up @@ -431,3 +434,45 @@ def test_canonical_request(self):
request = self.auth.mangle_path_and_params(request)
authed_req = self.auth.canonical_request(request)
self.assertEqual(authed_req, expected)


class FakeS3Connection(object):
def __init__(self, *args, **kwargs):
self.host = kwargs.pop('host', None)

@detect_potential_s3sigv4
def _required_auth_capability(self):
return ['nope']

def _mexe(self, *args, **kwargs):
pass


class TestSigV4OptIn(MockServiceWithConfigTestCase):
connection_class = FakeS3Connection

def test_sigv4_opt_out(self):
# Default is opt-out.
fake = FakeS3Connection(host='s3.amazonaws.com')
self.assertEqual(fake._required_auth_capability(), ['nope'])

def test_sigv4_non_optional(self):
# Default is opt-out.
fake = FakeS3Connection(host='s3.cn-north-1.amazonaws.com.cn')
self.assertEqual(fake._required_auth_capability(), ['hmac-v4-s3'])

def test_sigv4_opt_in_config(self):
# Opt-in via the config.
self.config = {
's3': {
'use-sigv4': True,
},
}
fake = FakeS3Connection()
self.assertEqual(fake._required_auth_capability(), ['hmac-v4-s3'])

def test_sigv4_opt_in_env(self):
# Opt-in via the ENV.
self.environ['S3_USE_SIGV4'] = True
fake = FakeS3Connection(host='s3.amazonaws.com')
self.assertEqual(fake._required_auth_capability(), ['hmac-v4-s3'])
48 changes: 47 additions & 1 deletion tests/unit/s3/test_connection.py
Expand Up @@ -19,10 +19,13 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
#
import mock

from tests.unit import unittest
from tests.unit import AWSMockServiceTestCase
from tests.unit import MockServiceWithConfigTestCase

from boto.s3.connection import S3Connection
from boto.s3.connection import S3Connection, HostRequiredError


class TestSignatureAlteration(AWSMockServiceTestCase):
Expand All @@ -46,6 +49,49 @@ def test_switched(self):
)


class TestSigV4HostError(MockServiceWithConfigTestCase):
connection_class = S3Connection

def test_historical_behavior(self):
self.assertEqual(
self.service_connection._required_auth_capability(),
['s3']
)
self.assertEqual(self.service_connection.host, 's3.amazonaws.com')

def test_sigv4_opt_in(self):
# Switch it at the config, so we can check to see how the host is
# handled.
self.config = {
's3': {
'use-sigv4': True,
}
}

with self.assertRaises(HostRequiredError):
# No host+SigV4 == KABOOM
self.connection_class(
aws_access_key_id='less',
aws_secret_access_key='more'
)

# Ensure passing a ``host`` still works.
conn = self.connection_class(
aws_access_key_id='less',
aws_secret_access_key='more',
host='s3.cn-north-1.amazonaws.com.cn'
)
self.assertEqual(
conn._required_auth_capability(),
['hmac-v4-s3']
)
self.assertEqual(
conn.host,
's3.cn-north-1.amazonaws.com.cn'
)



class TestUnicodeCallingFormat(AWSMockServiceTestCase):
connection_class = S3Connection

Expand Down

0 comments on commit 686cfa9

Please sign in to comment.