Skip to content

Commit

Permalink
feat: Enable webauthn plugin for security keys (#1528)
Browse files Browse the repository at this point in the history
* feat: Enable webauthn handling when plugin is installed.

* Minor code cleanup.

* feat: Enable webauthn plugin for security keys

Move key press prompt and remove TODO question.

* feat: Enable webauthn plugin for security keys

Fix lint and mypy errors.

* feat: Enable webauthn plugin for security keys

Check dict accesses for None.
Remove commented out line.

* feat: Enable webauthn plugin for security keys

Change _urlsafe_b64recode to _unpadded_urlsafe_b64recode for clarity.

* feat: Enable webauthn plugin for security keys

Fix broken test and add test clauses to bring coverage to 100%.

---------

Co-authored-by: arithmetic1728 <58957152+arithmetic1728@users.noreply.github.com>
  • Loading branch information
cpisunyer and arithmetic1728 committed Jun 6, 2024
1 parent adb94f7 commit e2d5e63
Show file tree
Hide file tree
Showing 4 changed files with 276 additions and 0 deletions.
78 changes: 78 additions & 0 deletions google/oauth2/challenges.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,19 @@

from google.auth import _helpers
from google.auth import exceptions
from google.oauth2 import webauthn_handler_factory
from google.oauth2.webauthn_types import (
AuthenticationExtensionsClientInputs,
GetRequest,
PublicKeyCredentialDescriptor,
)


REAUTH_ORIGIN = "https://accounts.google.com"
SAML_CHALLENGE_MESSAGE = (
"Please run `gcloud auth login` to complete reauthentication with SAML."
)
WEBAUTHN_TIMEOUT_MS = 120000 # Two minute timeout


def get_user_password(text):
Expand Down Expand Up @@ -110,6 +117,17 @@ def is_locally_eligible(self):

@_helpers.copy_docstring(ReauthChallenge)
def obtain_challenge_input(self, metadata):
# Check if there is an available Webauthn Handler, if not use pyu2f
try:
factory = webauthn_handler_factory.WebauthnHandlerFactory()
webauthn_handler = factory.get_handler()
if webauthn_handler is not None:
sys.stderr.write("Please insert and touch your security key\n")
return self._obtain_challenge_input_webauthn(metadata, webauthn_handler)
except Exception:
# Attempt pyu2f if exception in webauthn flow
pass

try:
import pyu2f.convenience.authenticator # type: ignore
import pyu2f.errors # type: ignore
Expand Down Expand Up @@ -173,6 +191,66 @@ def obtain_challenge_input(self, metadata):
sys.stderr.write("No security key found.\n")
return None

def _obtain_challenge_input_webauthn(self, metadata, webauthn_handler):
sk = metadata.get("securityKey")
if sk is None:
raise exceptions.InvalidValue("securityKey is None")
challenges = sk.get("challenges")
application_id = sk.get("applicationId")
relying_party_id = sk.get("relyingPartyId")
if challenges is None or len(challenges) < 1:
raise exceptions.InvalidValue("challenges is None or empty")
if application_id is None:
raise exceptions.InvalidValue("application_id is None")
if relying_party_id is None:
raise exceptions.InvalidValue("relying_party_id is None")

allow_credentials = []
for challenge in challenges:
kh = challenge.get("keyHandle")
if kh is None:
raise exceptions.InvalidValue("keyHandle is None")
key_handle = self._unpadded_urlsafe_b64recode(kh)
allow_credentials.append(PublicKeyCredentialDescriptor(id=key_handle))

extension = AuthenticationExtensionsClientInputs(appid=application_id)

challenge = challenges[0].get("challenge")
if challenge is None:
raise exceptions.InvalidValue("challenge is None")

get_request = GetRequest(
origin=REAUTH_ORIGIN,
rpid=relying_party_id,
challenge=self._unpadded_urlsafe_b64recode(challenge),
timeout_ms=WEBAUTHN_TIMEOUT_MS,
allow_credentials=allow_credentials,
user_verification="required",
extensions=extension,
)

try:
get_response = webauthn_handler.get(get_request)
except Exception as e:
sys.stderr.write("Webauthn Error: {}.\n".format(e))
raise e

response = {
"clientData": get_response.response.client_data_json,
"authenticatorData": get_response.response.authenticator_data,
"signatureData": get_response.response.signature,
"applicationId": application_id,
"keyHandle": get_response.id,
"securityKeyReplyType": 2,
}
return {"securityKey": response}

def _unpadded_urlsafe_b64recode(self, s):
"""Converts standard b64 encoded string to url safe b64 encoded string
with no padding."""
b = base64.urlsafe_b64decode(s)
return base64.urlsafe_b64encode(b).decode().rstrip("=")


class SamlChallenge(ReauthChallenge):
"""Challenge that asks the users to browse to their ID Providers.
Expand Down
16 changes: 16 additions & 0 deletions google/oauth2/webauthn_handler_factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from typing import List, Optional

from google.oauth2.webauthn_handler import PluginHandler, WebAuthnHandler


class WebauthnHandlerFactory:
handlers: List[WebAuthnHandler]

def __init__(self):
self.handlers = [PluginHandler()]

def get_handler(self) -> Optional[WebAuthnHandler]:
for handler in self.handlers:
if handler.is_available():
return handler
return None
153 changes: 153 additions & 0 deletions tests/oauth2/test_challenges.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"""Tests for the reauth module."""

import base64
import os
import sys

import mock
Expand All @@ -23,6 +24,13 @@

from google.auth import exceptions
from google.oauth2 import challenges
from google.oauth2.webauthn_types import (
AuthenticationExtensionsClientInputs,
AuthenticatorAssertionResponse,
GetRequest,
GetResponse,
PublicKeyCredentialDescriptor,
)


def test_get_user_password():
Expand Down Expand Up @@ -54,6 +62,8 @@ def test_security_key():

# Test the case that security key challenge is passed with applicationId and
# relyingPartyId the same.
os.environ.pop('"GOOGLE_AUTH_WEBAUTHN_PLUGIN"', None)

with mock.patch("pyu2f.model.RegisteredKey", return_value=mock_key):
with mock.patch(
"pyu2f.convenience.authenticator.CompositeAuthenticator.Authenticate"
Expand All @@ -70,6 +80,19 @@ def test_security_key():
print_callback=sys.stderr.write,
)

# Test the case that webauthn plugin is available
os.environ["GOOGLE_AUTH_WEBAUTHN_PLUGIN"] = "plugin"

with mock.patch(
"google.oauth2.challenges.SecurityKeyChallenge._obtain_challenge_input_webauthn",
return_value={"securityKey": "security key response"},
):

assert challenge.obtain_challenge_input(metadata) == {
"securityKey": "security key response"
}
os.environ.pop('"GOOGLE_AUTH_WEBAUTHN_PLUGIN"', None)

# Test the case that security key challenge is passed with applicationId and
# relyingPartyId different, first call works.
metadata["securityKey"]["relyingPartyId"] = "security_key_relying_party_id"
Expand Down Expand Up @@ -173,6 +196,136 @@ def test_security_key():
assert excinfo.match(r"pyu2f dependency is required")


def test_security_key_webauthn():
metadata = {
"status": "READY",
"challengeId": 2,
"challengeType": "SECURITY_KEY",
"securityKey": {
"applicationId": "security_key_application_id",
"challenges": [
{
"keyHandle": "some_key",
"challenge": base64.urlsafe_b64encode(
"some_challenge".encode("ascii")
).decode("ascii"),
}
],
"relyingPartyId": "security_key_application_id",
},
}

challenge = challenges.SecurityKeyChallenge()

sk = metadata["securityKey"]
sk_challenges = sk["challenges"]

application_id = sk["applicationId"]

allow_credentials = []
for sk_challenge in sk_challenges:
allow_credentials.append(
PublicKeyCredentialDescriptor(id=sk_challenge["keyHandle"])
)

extension = AuthenticationExtensionsClientInputs(appid=application_id)

get_request = GetRequest(
origin=challenges.REAUTH_ORIGIN,
rpid=application_id,
challenge=challenge._unpadded_urlsafe_b64recode(sk_challenge["challenge"]),
timeout_ms=challenges.WEBAUTHN_TIMEOUT_MS,
allow_credentials=allow_credentials,
user_verification="required",
extensions=extension,
)

assertion_resp = AuthenticatorAssertionResponse(
client_data_json="clientDataJSON",
authenticator_data="authenticatorData",
signature="signature",
user_handle="userHandle",
)
get_response = GetResponse(
id="id",
response=assertion_resp,
authenticator_attachment="authenticatorAttachment",
client_extension_results="clientExtensionResults",
)
response = {
"clientData": get_response.response.client_data_json,
"authenticatorData": get_response.response.authenticator_data,
"signatureData": get_response.response.signature,
"applicationId": "security_key_application_id",
"keyHandle": get_response.id,
"securityKeyReplyType": 2,
}

mock_handler = mock.Mock()
mock_handler.get.return_value = get_response

# Test success case
assert challenge._obtain_challenge_input_webauthn(metadata, mock_handler) == {
"securityKey": response
}
mock_handler.get.assert_called_with(get_request)

# Test exceptions

# Missing Values
sk = metadata["securityKey"]
metadata["securityKey"] = None
with pytest.raises(exceptions.InvalidValue):
challenge._obtain_challenge_input_webauthn(metadata, mock_handler)
metadata["securityKey"] = sk

c = metadata["securityKey"]["challenges"]
metadata["securityKey"]["challenges"] = None
with pytest.raises(exceptions.InvalidValue):
challenge._obtain_challenge_input_webauthn(metadata, mock_handler)
metadata["securityKey"]["challenges"] = []
with pytest.raises(exceptions.InvalidValue):
challenge._obtain_challenge_input_webauthn(metadata, mock_handler)
metadata["securityKey"]["challenges"] = c

aid = metadata["securityKey"]["applicationId"]
metadata["securityKey"]["applicationId"] = None
with pytest.raises(exceptions.InvalidValue):
challenge._obtain_challenge_input_webauthn(metadata, mock_handler)
metadata["securityKey"]["applicationId"] = aid

rpi = metadata["securityKey"]["relyingPartyId"]
metadata["securityKey"]["relyingPartyId"] = None
with pytest.raises(exceptions.InvalidValue):
challenge._obtain_challenge_input_webauthn(metadata, mock_handler)
metadata["securityKey"]["relyingPartyId"] = rpi

kh = metadata["securityKey"]["challenges"][0]["keyHandle"]
metadata["securityKey"]["challenges"][0]["keyHandle"] = None
with pytest.raises(exceptions.InvalidValue):
challenge._obtain_challenge_input_webauthn(metadata, mock_handler)
metadata["securityKey"]["challenges"][0]["keyHandle"] = kh

ch = metadata["securityKey"]["challenges"][0]["challenge"]
metadata["securityKey"]["challenges"][0]["challenge"] = None
with pytest.raises(exceptions.InvalidValue):
challenge._obtain_challenge_input_webauthn(metadata, mock_handler)
metadata["securityKey"]["challenges"][0]["challenge"] = ch

# Handler Exceptions
mock_handler.get.side_effect = exceptions.MalformedError
with pytest.raises(exceptions.MalformedError):
challenge._obtain_challenge_input_webauthn(metadata, mock_handler)

mock_handler.get.side_effect = exceptions.InvalidResource
with pytest.raises(exceptions.InvalidResource):
challenge._obtain_challenge_input_webauthn(metadata, mock_handler)

mock_handler.get.side_effect = exceptions.ReauthFailError
with pytest.raises(exceptions.ReauthFailError):
challenge._obtain_challenge_input_webauthn(metadata, mock_handler)


@mock.patch("getpass.getpass", return_value="foo")
def test_password_challenge(getpass_mock):
challenge = challenges.PasswordChallenge()
Expand Down
29 changes: 29 additions & 0 deletions tests/oauth2/test_webauthn_handler_factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import mock
import pytest # type: ignore

from google.oauth2 import webauthn_handler
from google.oauth2 import webauthn_handler_factory


@pytest.fixture
def os_get_stub():
with mock.patch.object(
webauthn_handler.os.environ,
"get",
return_value="gcloud_webauthn_plugin",
name="fake os.environ.get",
) as mock_os_environ_get:
yield mock_os_environ_get


# Check that get_handler returns a value when env is set,
# that type is PluginHandler, and that no value is returned
# if env not set.
def test_WebauthHandlerFactory_get(os_get_stub):
factory = webauthn_handler_factory.WebauthnHandlerFactory()
assert factory.get_handler() is not None

assert isinstance(factory.get_handler(), webauthn_handler.PluginHandler)

os_get_stub.return_value = None
assert factory.get_handler() is None

0 comments on commit e2d5e63

Please sign in to comment.