diff --git a/google/auth/_service_account_info.py b/google/auth/_service_account_info.py index 989101113..dd39ea7b2 100644 --- a/google/auth/_service_account_info.py +++ b/google/auth/_service_account_info.py @@ -51,7 +51,7 @@ def from_dict(data, require=None): 'fields {}.'.format(', '.join(missing))) # Create a signer. - signer = crypt.Signer.from_service_account_info(data) + signer = crypt.RSASigner.from_service_account_info(data) return signer diff --git a/google/auth/app_engine.py b/google/auth/app_engine.py index e6e84d26a..6dc871256 100644 --- a/google/auth/app_engine.py +++ b/google/auth/app_engine.py @@ -26,6 +26,7 @@ from google.auth import _helpers from google.auth import credentials +from google.auth import crypt try: from google.appengine.api import app_identity @@ -33,42 +34,25 @@ app_identity = None -class Signer(object): +class Signer(crypt.Signer): """Signs messages using the App Engine App Identity service. This can be used in place of :class:`google.auth.crypt.Signer` when running in the App Engine standard environment. - - .. warning:: - The App Identity service signs bytes using Google-managed keys. - Because of this it's possible that the key used to sign bytes will - change. In some cases this change can occur between successive calls - to :attr:`key_id` and :meth:`sign`. This could result in a signature - that was signed with a different key than the one indicated by - :attr:`key_id`. It's recommended that if you use this in your code - that you account for this behavior by building in retry logic. """ @property def key_id(self): """Optional[str]: The key ID used to identify this private key. - .. note:: - This makes a request to the App Identity service. + .. warning:: + This is always ``None``. The key ID used by App Engine can not + be reliably determined ahead of time. """ - key_id, _ = app_identity.sign_blob(b'') - return key_id - - @staticmethod - def sign(message): - """Signs a message. - - Args: - message (Union[str, bytes]): The message to be signed. + return None - Returns: - bytes: The signature of the message. - """ + @_helpers.copy_docstring(crypt.Signer) + def sign(self, message): message = _helpers.to_bytes(message) _, signature = app_identity.sign_blob(message) return signature diff --git a/google/auth/crypt.py b/google/auth/crypt.py index 05839b46f..65bf37f22 100644 --- a/google/auth/crypt.py +++ b/google/auth/crypt.py @@ -24,20 +24,21 @@ valid = crypt.verify_signature(message, signature, cert) If you're going to verify many messages with the same certificate, you can use -:class:`Verifier`:: +:class:`RSAVerifier`:: cert = open('certs.pem').read() - verifier = crypt.Verifier.from_string(cert) + verifier = crypt.RSAVerifier.from_string(cert) valid = verifier.verify(message, signature) -To sign messages use :class:`Signer` with a private key:: +To sign messages use :class:`RSASigner` with a private key:: private_key = open('private_key.pem').read() - signer = crypt.Signer(private_key) + signer = crypt.RSASigner(private_key) signature = signer.sign(message) """ +import abc import io import json @@ -77,23 +78,17 @@ def _bit_list_to_bytes(bit_list): byte_vals = bytearray() for start in six.moves.xrange(0, num_bits, 8): curr_bits = bit_list[start:start + 8] - char_val = sum(val * digit - for val, digit in six.moves.zip(_POW2, curr_bits)) + char_val = sum( + val * digit for val, digit in six.moves.zip(_POW2, curr_bits)) byte_vals.append(char_val) return bytes(byte_vals) +@six.add_metaclass(abc.ABCMeta) class Verifier(object): - """This object is used to verify cryptographic signatures. - - Args: - public_key (rsa.key.PublicKey): The public key used to verify - signatures. - """ - - def __init__(self, public_key): - self._pubkey = public_key + """Abstract base class for crytographic signature verifiers.""" + @abc.abstractmethod def verify(self, message, signature): """Verifies a message against a cryptographic signature. @@ -105,6 +100,24 @@ def verify(self, message, signature): bool: True if message was signed by the private key associated with the public key that this object was constructed with. """ + # pylint: disable=missing-raises-doc,redundant-returns-doc + # (pylint doesn't recognize that this is abstract) + raise NotImplementedError('Verify must be implemented') + + +class RSAVerifier(Verifier): + """Verifies RSA cryptographic signatures using public keys. + + Args: + public_key (rsa.key.PublicKey): The public key used to verify + signatures. + """ + + def __init__(self, public_key): + self._pubkey = public_key + + @_helpers.copy_docstring(Verifier) + def verify(self, message, signature): message = _helpers.to_bytes(message) try: return rsa.pkcs1.verify(message, signature, self._pubkey) @@ -145,7 +158,7 @@ def from_string(cls, public_key): def verify_signature(message, signature, certs): - """Verify a cryptographic signature. + """Verify an RSA cryptographic signature. Checks that the provided ``signature`` was generated from ``bytes`` using the private key associated with the ``cert``. @@ -163,27 +176,22 @@ def verify_signature(message, signature, certs): certs = [certs] for cert in certs: - verifier = Verifier.from_string(cert) + verifier = RSAVerifier.from_string(cert) if verifier.verify(message, signature): return True return False +@six.add_metaclass(abc.ABCMeta) class Signer(object): - """Signs messages with a private key. - - Args: - private_key (rsa.key.PrivateKey): The private key to sign with. - key_id (str): Optional key ID used to identify this private key. This - can be useful to associate the private key with its associated - public key or certificate. - """ + """Abstract base class for cryptographic signers.""" - def __init__(self, private_key, key_id=None): - self._key = private_key - self.key_id = key_id + @abc.abstractproperty + def key_id(self): """Optional[str]: The key ID used to identify this private key.""" + raise NotImplementedError('Key id must be implemented') + @abc.abstractmethod def sign(self, message): """Signs a message. @@ -193,6 +201,32 @@ def sign(self, message): Returns: bytes: The signature of the message. """ + # pylint: disable=missing-raises-doc,redundant-returns-doc + # (pylint doesn't recognize that this is abstract) + raise NotImplementedError('Sign must be implemented') + + +class RSASigner(Signer): + """Signs messages with an RSA private key. + + Args: + private_key (rsa.key.PrivateKey): The private key to sign with. + key_id (str): Optional key ID used to identify this private key. This + can be useful to associate the private key with its associated + public key or certificate. + """ + + def __init__(self, private_key, key_id=None): + self._key = private_key + self._key_id = key_id + + @property + @_helpers.copy_docstring(Signer) + def key_id(self): + return self._key_id + + @_helpers.copy_docstring(Signer) + def sign(self, message): message = _helpers.to_bytes(message) return rsa.pkcs1.sign(message, self._key, 'SHA-256') diff --git a/google/auth/iam.py b/google/auth/iam.py index efa312710..e091e47f3 100644 --- a/google/auth/iam.py +++ b/google/auth/iam.py @@ -25,6 +25,7 @@ from six.moves import http_client from google.auth import _helpers +from google.auth import crypt from google.auth import exceptions _IAM_API_ROOT_URI = 'https://iam.googleapis.com/v1' @@ -32,21 +33,12 @@ _IAM_API_ROOT_URI + '/projects/-/serviceAccounts/{}:signBlob?alt=json') -class Signer(object): +class Signer(crypt.Signer): """Signs messages using the IAM `signBlob API`_. This is useful when you need to sign bytes but do not have access to the credential's private key file. - .. warning:: - The IAM API signs bytes using Google-managed keys. Because of this - it's possible that the key used to sign bytes will change. In some - cases this change can occur between successive calls to :attr:`key_id` - and :meth:`sign`. This could result in a signature that was signed - with a different key than the one indicated by :attr:`key_id`. It's - recommended that if you use this in your code that you account for - this behavior by building in retry logic. - .. _signBlob API: https://cloud.google.com/iam/reference/rest/v1/projects.serviceAccounts /signBlob @@ -98,20 +90,13 @@ def _make_signing_request(self, message): def key_id(self): """Optional[str]: The key ID used to identify this private key. - .. note:: - This makes an API request to the IAM API. + .. warning:: + This is always ``None``. The key ID used by IAM can not + be reliably determined ahead of time. """ - response = self._make_signing_request('') - return response['keyId'] + return None + @_helpers.copy_docstring(crypt.Signer) def sign(self, message): - """Signs a message. - - Args: - message (Union[str, bytes]): The message to be signed. - - Returns: - bytes: The signature of the message. - """ response = self._make_signing_request(message) return base64.b64decode(response['signature']) diff --git a/tests/oauth2/test_service_account.py b/tests/oauth2/test_service_account.py index f40ebf224..8ae545a1f 100644 --- a/tests/oauth2/test_service_account.py +++ b/tests/oauth2/test_service_account.py @@ -44,7 +44,7 @@ @pytest.fixture(scope='module') def signer(): - return crypt.Signer.from_string(PRIVATE_KEY_BYTES, '1') + return crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, '1') class TestCredentials(object): diff --git a/tests/test__service_account_info.py b/tests/test__service_account_info.py index 4caea95d3..546686530 100644 --- a/tests/test__service_account_info.py +++ b/tests/test__service_account_info.py @@ -31,7 +31,7 @@ def test_from_dict(): signer = _service_account_info.from_dict(SERVICE_ACCOUNT_INFO) - assert isinstance(signer, crypt.Signer) + assert isinstance(signer, crypt.RSASigner) assert signer.key_id == SERVICE_ACCOUNT_INFO['private_key_id'] @@ -59,5 +59,5 @@ def test_from_filename(): for key, value in six.iteritems(SERVICE_ACCOUNT_INFO): assert info[key] == value - assert isinstance(signer, crypt.Signer) + assert isinstance(signer, crypt.RSASigner) assert signer.key_id == SERVICE_ACCOUNT_INFO['private_key_id'] diff --git a/tests/test_app_engine.py b/tests/test_app_engine.py index af60bcfdb..d3a79d5c1 100644 --- a/tests/test_app_engine.py +++ b/tests/test_app_engine.py @@ -48,7 +48,7 @@ def test_key_id(self, app_identity_mock): signer = app_engine.Signer() - assert signer.key_id == mock.sentinel.key_id + assert signer.key_id is None def test_sign(self, app_identity_mock): app_identity_mock.sign_blob.return_value = ( diff --git a/tests/test_crypt.py b/tests/test_crypt.py index 9671230c6..56612dae3 100644 --- a/tests/test_crypt.py +++ b/tests/test_crypt.py @@ -69,7 +69,7 @@ def test_verify_signature(): to_sign = b'foo' - signer = crypt.Signer.from_string(PRIVATE_KEY_BYTES) + signer = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES) signature = signer.sign(to_sign) assert crypt.verify_signature( @@ -82,57 +82,57 @@ def test_verify_signature(): def test_verify_signature_failure(): to_sign = b'foo' - signer = crypt.Signer.from_string(PRIVATE_KEY_BYTES) + signer = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES) signature = signer.sign(to_sign) assert not crypt.verify_signature( to_sign, signature, OTHER_CERT_BYTES) -class TestVerifier(object): +class TestRSAVerifier(object): def test_verify_success(self): to_sign = b'foo' - signer = crypt.Signer.from_string(PRIVATE_KEY_BYTES) + signer = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES) actual_signature = signer.sign(to_sign) - verifier = crypt.Verifier.from_string(PUBLIC_KEY_BYTES) + verifier = crypt.RSAVerifier.from_string(PUBLIC_KEY_BYTES) assert verifier.verify(to_sign, actual_signature) def test_verify_unicode_success(self): to_sign = u'foo' - signer = crypt.Signer.from_string(PRIVATE_KEY_BYTES) + signer = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES) actual_signature = signer.sign(to_sign) - verifier = crypt.Verifier.from_string(PUBLIC_KEY_BYTES) + verifier = crypt.RSAVerifier.from_string(PUBLIC_KEY_BYTES) assert verifier.verify(to_sign, actual_signature) def test_verify_failure(self): - verifier = crypt.Verifier.from_string(PUBLIC_KEY_BYTES) + verifier = crypt.RSAVerifier.from_string(PUBLIC_KEY_BYTES) bad_signature1 = b'' assert not verifier.verify(b'foo', bad_signature1) bad_signature2 = b'a' assert not verifier.verify(b'foo', bad_signature2) def test_from_string_pub_key(self): - verifier = crypt.Verifier.from_string(PUBLIC_KEY_BYTES) - assert isinstance(verifier, crypt.Verifier) + verifier = crypt.RSAVerifier.from_string(PUBLIC_KEY_BYTES) + assert isinstance(verifier, crypt.RSAVerifier) assert isinstance(verifier._pubkey, rsa.key.PublicKey) def test_from_string_pub_key_unicode(self): public_key = _helpers.from_bytes(PUBLIC_KEY_BYTES) - verifier = crypt.Verifier.from_string(public_key) - assert isinstance(verifier, crypt.Verifier) + verifier = crypt.RSAVerifier.from_string(public_key) + assert isinstance(verifier, crypt.RSAVerifier) assert isinstance(verifier._pubkey, rsa.key.PublicKey) def test_from_string_pub_cert(self): - verifier = crypt.Verifier.from_string(PUBLIC_CERT_BYTES) - assert isinstance(verifier, crypt.Verifier) + verifier = crypt.RSAVerifier.from_string(PUBLIC_CERT_BYTES) + assert isinstance(verifier, crypt.RSAVerifier) assert isinstance(verifier._pubkey, rsa.key.PublicKey) def test_from_string_pub_cert_unicode(self): public_cert = _helpers.from_bytes(PUBLIC_CERT_BYTES) - verifier = crypt.Verifier.from_string(public_cert) - assert isinstance(verifier, crypt.Verifier) + verifier = crypt.RSAVerifier.from_string(public_cert) + assert isinstance(verifier, crypt.RSAVerifier) assert isinstance(verifier._pubkey, rsa.key.PublicKey) def test_from_string_pub_cert_failure(self): @@ -144,25 +144,25 @@ def test_from_string_pub_cert_failure(self): with load_pem_patch as load_pem: with pytest.raises(ValueError): - crypt.Verifier.from_string(cert_bytes) + crypt.RSAVerifier.from_string(cert_bytes) load_pem.assert_called_once_with(cert_bytes, 'CERTIFICATE') -class TestSigner(object): +class TestRSASigner(object): def test_from_string_pkcs1(self): - signer = crypt.Signer.from_string(PKCS1_KEY_BYTES) - assert isinstance(signer, crypt.Signer) + signer = crypt.RSASigner.from_string(PKCS1_KEY_BYTES) + assert isinstance(signer, crypt.RSASigner) assert isinstance(signer._key, rsa.key.PrivateKey) def test_from_string_pkcs1_unicode(self): key_bytes = _helpers.from_bytes(PKCS1_KEY_BYTES) - signer = crypt.Signer.from_string(key_bytes) - assert isinstance(signer, crypt.Signer) + signer = crypt.RSASigner.from_string(key_bytes) + assert isinstance(signer, crypt.RSASigner) assert isinstance(signer._key, rsa.key.PrivateKey) def test_from_string_pkcs8(self): - signer = crypt.Signer.from_string(PKCS8_KEY_BYTES) - assert isinstance(signer, crypt.Signer) + signer = crypt.RSASigner.from_string(PKCS8_KEY_BYTES) + assert isinstance(signer, crypt.RSASigner) assert isinstance(signer._key, rsa.key.PrivateKey) def test_from_string_pkcs8_extra_bytes(self): @@ -179,28 +179,29 @@ def test_from_string_pkcs8_extra_bytes(self): with decode_patch as decode: with pytest.raises(ValueError): - crypt.Signer.from_string(key_bytes) + crypt.RSASigner.from_string(key_bytes) # Verify mock was called. decode.assert_called_once_with( pem_bytes, asn1Spec=crypt._PKCS8_SPEC) def test_from_string_pkcs8_unicode(self): key_bytes = _helpers.from_bytes(PKCS8_KEY_BYTES) - signer = crypt.Signer.from_string(key_bytes) - assert isinstance(signer, crypt.Signer) + signer = crypt.RSASigner.from_string(key_bytes) + assert isinstance(signer, crypt.RSASigner) assert isinstance(signer._key, rsa.key.PrivateKey) def test_from_string_pkcs12(self): with pytest.raises(ValueError): - crypt.Signer.from_string(PKCS12_KEY_BYTES) + crypt.RSASigner.from_string(PKCS12_KEY_BYTES) def test_from_string_bogus_key(self): key_bytes = 'bogus-key' with pytest.raises(ValueError): - crypt.Signer.from_string(key_bytes) + crypt.RSASigner.from_string(key_bytes) def test_from_service_account_info(self): - signer = crypt.Signer.from_service_account_info(SERVICE_ACCOUNT_INFO) + signer = crypt.RSASigner.from_service_account_info( + SERVICE_ACCOUNT_INFO) assert signer.key_id == SERVICE_ACCOUNT_INFO[ crypt._JSON_FILE_PRIVATE_KEY_ID] @@ -208,12 +209,12 @@ def test_from_service_account_info(self): def test_from_service_account_info_missing_key(self): with pytest.raises(ValueError) as excinfo: - crypt.Signer.from_service_account_info({}) + crypt.RSASigner.from_service_account_info({}) assert excinfo.match(crypt._JSON_FILE_PRIVATE_KEY) def test_from_service_account_file(self): - signer = crypt.Signer.from_service_account_file( + signer = crypt.RSASigner.from_service_account_file( SERVICE_ACCOUNT_JSON_FILE) assert signer.key_id == SERVICE_ACCOUNT_INFO[ diff --git a/tests/test_iam.py b/tests/test_iam.py index 5ac991168..f7767887b 100644 --- a/tests/test_iam.py +++ b/tests/test_iam.py @@ -64,16 +64,12 @@ def test_constructor(self): mock.sentinel.service_account_email) def test_key_id(self): - key_id = '123' - request = make_request(http_client.OK, data={'keyId': key_id}) - credentials = make_credentials() - signer = iam.Signer( - request, credentials, mock.sentinel.service_account_email) + mock.sentinel.request, + mock.sentinel.credentials, + mock.sentinel.service_account_email) - assert signer.key_id == '123' - auth_header = request.call_args[1]['headers']['authorization'] - assert auth_header == 'Bearer token' + assert signer.key_id is None def test_sign_bytes(self): signature = b'DEADBEEF' diff --git a/tests/test_jwt.py b/tests/test_jwt.py index e4a9a0a28..df09ece61 100644 --- a/tests/test_jwt.py +++ b/tests/test_jwt.py @@ -44,7 +44,7 @@ @pytest.fixture def signer(): - return crypt.Signer.from_string(PRIVATE_KEY_BYTES, '1') + return crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, '1') def test_encode_basic(signer): @@ -78,7 +78,7 @@ def factory(claims=None, key_id=None): # False is specified to remove the signer's key id for testing # headers without key ids. if key_id is False: - signer.key_id = None + signer._key_id = None key_id = None return jwt.encode(signer, payload, key_id=key_id) @@ -265,7 +265,7 @@ def test_sign_bytes(self): assert crypt.verify_signature(to_sign, signature, PUBLIC_CERT_BYTES) def test_signer(self): - assert isinstance(self.credentials.signer, crypt.Signer) + assert isinstance(self.credentials.signer, crypt.RSASigner) def test_signer_email(self): assert (self.credentials.signer_email ==