diff --git a/google/oauth2/challenges.py b/google/oauth2/challenges.py index c55796323..6468498bc 100644 --- a/google/oauth2/challenges.py +++ b/google/oauth2/challenges.py @@ -22,12 +22,19 @@ from google.auth import _helpers from google.auth import exceptions +from google.oauth2 import webauthn_handler_factory +from google.oauth2.webauthn_types import ( + AuthenticationExtensionsClientInputs, + GetRequest, + PublicKeyCredentialDescriptor, +) REAUTH_ORIGIN = "https://accounts.google.com" SAML_CHALLENGE_MESSAGE = ( "Please run `gcloud auth login` to complete reauthentication with SAML." ) +WEBAUTHN_TIMEOUT_MS = 120000 # Two minute timeout def get_user_password(text): @@ -110,6 +117,17 @@ def is_locally_eligible(self): @_helpers.copy_docstring(ReauthChallenge) def obtain_challenge_input(self, metadata): + # Check if there is an available Webauthn Handler, if not use pyu2f + try: + factory = webauthn_handler_factory.WebauthnHandlerFactory() + webauthn_handler = factory.get_handler() + if webauthn_handler is not None: + sys.stderr.write("Please insert and touch your security key\n") + return self._obtain_challenge_input_webauthn(metadata, webauthn_handler) + except Exception: + # Attempt pyu2f if exception in webauthn flow + pass + try: import pyu2f.convenience.authenticator # type: ignore import pyu2f.errors # type: ignore @@ -173,6 +191,66 @@ def obtain_challenge_input(self, metadata): sys.stderr.write("No security key found.\n") return None + def _obtain_challenge_input_webauthn(self, metadata, webauthn_handler): + sk = metadata.get("securityKey") + if sk is None: + raise exceptions.InvalidValue("securityKey is None") + challenges = sk.get("challenges") + application_id = sk.get("applicationId") + relying_party_id = sk.get("relyingPartyId") + if challenges is None or len(challenges) < 1: + raise exceptions.InvalidValue("challenges is None or empty") + if application_id is None: + raise exceptions.InvalidValue("application_id is None") + if relying_party_id is None: + raise exceptions.InvalidValue("relying_party_id is None") + + allow_credentials = [] + for challenge in challenges: + kh = challenge.get("keyHandle") + if kh is None: + raise exceptions.InvalidValue("keyHandle is None") + key_handle = self._unpadded_urlsafe_b64recode(kh) + allow_credentials.append(PublicKeyCredentialDescriptor(id=key_handle)) + + extension = AuthenticationExtensionsClientInputs(appid=application_id) + + challenge = challenges[0].get("challenge") + if challenge is None: + raise exceptions.InvalidValue("challenge is None") + + get_request = GetRequest( + origin=REAUTH_ORIGIN, + rpid=relying_party_id, + challenge=self._unpadded_urlsafe_b64recode(challenge), + timeout_ms=WEBAUTHN_TIMEOUT_MS, + allow_credentials=allow_credentials, + user_verification="required", + extensions=extension, + ) + + try: + get_response = webauthn_handler.get(get_request) + except Exception as e: + sys.stderr.write("Webauthn Error: {}.\n".format(e)) + raise e + + response = { + "clientData": get_response.response.client_data_json, + "authenticatorData": get_response.response.authenticator_data, + "signatureData": get_response.response.signature, + "applicationId": application_id, + "keyHandle": get_response.id, + "securityKeyReplyType": 2, + } + return {"securityKey": response} + + def _unpadded_urlsafe_b64recode(self, s): + """Converts standard b64 encoded string to url safe b64 encoded string + with no padding.""" + b = base64.urlsafe_b64decode(s) + return base64.urlsafe_b64encode(b).decode().rstrip("=") + class SamlChallenge(ReauthChallenge): """Challenge that asks the users to browse to their ID Providers. diff --git a/google/oauth2/webauthn_handler_factory.py b/google/oauth2/webauthn_handler_factory.py new file mode 100644 index 000000000..184329fed --- /dev/null +++ b/google/oauth2/webauthn_handler_factory.py @@ -0,0 +1,16 @@ +from typing import List, Optional + +from google.oauth2.webauthn_handler import PluginHandler, WebAuthnHandler + + +class WebauthnHandlerFactory: + handlers: List[WebAuthnHandler] + + def __init__(self): + self.handlers = [PluginHandler()] + + def get_handler(self) -> Optional[WebAuthnHandler]: + for handler in self.handlers: + if handler.is_available(): + return handler + return None diff --git a/tests/oauth2/test_challenges.py b/tests/oauth2/test_challenges.py index a06f55283..4116b913a 100644 --- a/tests/oauth2/test_challenges.py +++ b/tests/oauth2/test_challenges.py @@ -15,6 +15,7 @@ """Tests for the reauth module.""" import base64 +import os import sys import mock @@ -23,6 +24,13 @@ from google.auth import exceptions from google.oauth2 import challenges +from google.oauth2.webauthn_types import ( + AuthenticationExtensionsClientInputs, + AuthenticatorAssertionResponse, + GetRequest, + GetResponse, + PublicKeyCredentialDescriptor, +) def test_get_user_password(): @@ -54,6 +62,8 @@ def test_security_key(): # Test the case that security key challenge is passed with applicationId and # relyingPartyId the same. + os.environ.pop('"GOOGLE_AUTH_WEBAUTHN_PLUGIN"', None) + with mock.patch("pyu2f.model.RegisteredKey", return_value=mock_key): with mock.patch( "pyu2f.convenience.authenticator.CompositeAuthenticator.Authenticate" @@ -70,6 +80,19 @@ def test_security_key(): print_callback=sys.stderr.write, ) + # Test the case that webauthn plugin is available + os.environ["GOOGLE_AUTH_WEBAUTHN_PLUGIN"] = "plugin" + + with mock.patch( + "google.oauth2.challenges.SecurityKeyChallenge._obtain_challenge_input_webauthn", + return_value={"securityKey": "security key response"}, + ): + + assert challenge.obtain_challenge_input(metadata) == { + "securityKey": "security key response" + } + os.environ.pop('"GOOGLE_AUTH_WEBAUTHN_PLUGIN"', None) + # Test the case that security key challenge is passed with applicationId and # relyingPartyId different, first call works. metadata["securityKey"]["relyingPartyId"] = "security_key_relying_party_id" @@ -173,6 +196,136 @@ def test_security_key(): assert excinfo.match(r"pyu2f dependency is required") +def test_security_key_webauthn(): + metadata = { + "status": "READY", + "challengeId": 2, + "challengeType": "SECURITY_KEY", + "securityKey": { + "applicationId": "security_key_application_id", + "challenges": [ + { + "keyHandle": "some_key", + "challenge": base64.urlsafe_b64encode( + "some_challenge".encode("ascii") + ).decode("ascii"), + } + ], + "relyingPartyId": "security_key_application_id", + }, + } + + challenge = challenges.SecurityKeyChallenge() + + sk = metadata["securityKey"] + sk_challenges = sk["challenges"] + + application_id = sk["applicationId"] + + allow_credentials = [] + for sk_challenge in sk_challenges: + allow_credentials.append( + PublicKeyCredentialDescriptor(id=sk_challenge["keyHandle"]) + ) + + extension = AuthenticationExtensionsClientInputs(appid=application_id) + + get_request = GetRequest( + origin=challenges.REAUTH_ORIGIN, + rpid=application_id, + challenge=challenge._unpadded_urlsafe_b64recode(sk_challenge["challenge"]), + timeout_ms=challenges.WEBAUTHN_TIMEOUT_MS, + allow_credentials=allow_credentials, + user_verification="required", + extensions=extension, + ) + + assertion_resp = AuthenticatorAssertionResponse( + client_data_json="clientDataJSON", + authenticator_data="authenticatorData", + signature="signature", + user_handle="userHandle", + ) + get_response = GetResponse( + id="id", + response=assertion_resp, + authenticator_attachment="authenticatorAttachment", + client_extension_results="clientExtensionResults", + ) + response = { + "clientData": get_response.response.client_data_json, + "authenticatorData": get_response.response.authenticator_data, + "signatureData": get_response.response.signature, + "applicationId": "security_key_application_id", + "keyHandle": get_response.id, + "securityKeyReplyType": 2, + } + + mock_handler = mock.Mock() + mock_handler.get.return_value = get_response + + # Test success case + assert challenge._obtain_challenge_input_webauthn(metadata, mock_handler) == { + "securityKey": response + } + mock_handler.get.assert_called_with(get_request) + + # Test exceptions + + # Missing Values + sk = metadata["securityKey"] + metadata["securityKey"] = None + with pytest.raises(exceptions.InvalidValue): + challenge._obtain_challenge_input_webauthn(metadata, mock_handler) + metadata["securityKey"] = sk + + c = metadata["securityKey"]["challenges"] + metadata["securityKey"]["challenges"] = None + with pytest.raises(exceptions.InvalidValue): + challenge._obtain_challenge_input_webauthn(metadata, mock_handler) + metadata["securityKey"]["challenges"] = [] + with pytest.raises(exceptions.InvalidValue): + challenge._obtain_challenge_input_webauthn(metadata, mock_handler) + metadata["securityKey"]["challenges"] = c + + aid = metadata["securityKey"]["applicationId"] + metadata["securityKey"]["applicationId"] = None + with pytest.raises(exceptions.InvalidValue): + challenge._obtain_challenge_input_webauthn(metadata, mock_handler) + metadata["securityKey"]["applicationId"] = aid + + rpi = metadata["securityKey"]["relyingPartyId"] + metadata["securityKey"]["relyingPartyId"] = None + with pytest.raises(exceptions.InvalidValue): + challenge._obtain_challenge_input_webauthn(metadata, mock_handler) + metadata["securityKey"]["relyingPartyId"] = rpi + + kh = metadata["securityKey"]["challenges"][0]["keyHandle"] + metadata["securityKey"]["challenges"][0]["keyHandle"] = None + with pytest.raises(exceptions.InvalidValue): + challenge._obtain_challenge_input_webauthn(metadata, mock_handler) + metadata["securityKey"]["challenges"][0]["keyHandle"] = kh + + ch = metadata["securityKey"]["challenges"][0]["challenge"] + metadata["securityKey"]["challenges"][0]["challenge"] = None + with pytest.raises(exceptions.InvalidValue): + challenge._obtain_challenge_input_webauthn(metadata, mock_handler) + metadata["securityKey"]["challenges"][0]["challenge"] = ch + + # Handler Exceptions + mock_handler.get.side_effect = exceptions.MalformedError + with pytest.raises(exceptions.MalformedError): + challenge._obtain_challenge_input_webauthn(metadata, mock_handler) + + mock_handler.get.side_effect = exceptions.InvalidResource + with pytest.raises(exceptions.InvalidResource): + challenge._obtain_challenge_input_webauthn(metadata, mock_handler) + + mock_handler.get.side_effect = exceptions.ReauthFailError + with pytest.raises(exceptions.ReauthFailError): + challenge._obtain_challenge_input_webauthn(metadata, mock_handler) + + @mock.patch("getpass.getpass", return_value="foo") def test_password_challenge(getpass_mock): challenge = challenges.PasswordChallenge() diff --git a/tests/oauth2/test_webauthn_handler_factory.py b/tests/oauth2/test_webauthn_handler_factory.py new file mode 100644 index 000000000..47890ce4b --- /dev/null +++ b/tests/oauth2/test_webauthn_handler_factory.py @@ -0,0 +1,29 @@ +import mock +import pytest # type: ignore + +from google.oauth2 import webauthn_handler +from google.oauth2 import webauthn_handler_factory + + +@pytest.fixture +def os_get_stub(): + with mock.patch.object( + webauthn_handler.os.environ, + "get", + return_value="gcloud_webauthn_plugin", + name="fake os.environ.get", + ) as mock_os_environ_get: + yield mock_os_environ_get + + +# Check that get_handler returns a value when env is set, +# that type is PluginHandler, and that no value is returned +# if env not set. +def test_WebauthHandlerFactory_get(os_get_stub): + factory = webauthn_handler_factory.WebauthnHandlerFactory() + assert factory.get_handler() is not None + + assert isinstance(factory.get_handler(), webauthn_handler.PluginHandler) + + os_get_stub.return_value = None + assert factory.get_handler() is None