Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ repos:
- id: mixed-line-ending
exclude: ^.*\.(lock)$
- id: detect-private-key
exclude: api/src/authentication/mock_token_generator.py
exclude: api/src/tests/integration/mock_authentication.py
- id: no-commit-to-branch
args: [--branch, main, --branch, master]
stages: [commit-msg]
Expand Down
54 changes: 2 additions & 52 deletions api/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion api/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ license = ""
cachetools = "^5.3.0"
python = "^3.10"
fastapi = "^0.101.0"
python-jose = {extras = ["cryptography"], version = "^3.3.0"}
pyjwt = "^2.8.0"
uvicorn = {extras = ["standard"], version = "^0.21.1"}
pymongo = "4.1.1"
certifi = "^2023.7.22"
Expand Down
23 changes: 6 additions & 17 deletions api/src/authentication/authentication.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import httpx
import jwt
from cachetools import TTLCache, cached
from fastapi import Security
from fastapi.security import OAuth2AuthorizationCodeBearer
from jose import JWTError, jwt

from authentication.mock_token_generator import mock_rsa_public_key
from authentication.models import User
from common.exceptions import credentials_exception
from common.logger import logger
from config import config, default_user
from config import config

oauth2_scheme = OAuth2AuthorizationCodeBearer(
authorizationUrl=config.OAUTH_AUTH_ENDPOINT,
Expand All @@ -18,39 +17,29 @@


@cached(cache=TTLCache(maxsize=32, ttl=86400))
def fetch_openid_configuration() -> dict[str, str]:
def get_JWK_client() -> jwt.PyJWKClient:
try:
oid_conf_response = httpx.get(config.OAUTH_WELL_KNOWN)
oid_conf_response.raise_for_status()
oid_conf = oid_conf_response.json()
json_web_key_set_response = httpx.get(oid_conf["jwks_uri"])
json_web_key_set_response.raise_for_status()
return {
"authorization_endpoint": oid_conf["authorization_endpoint"],
"token_endpoint": oid_conf["token_endpoint"],
"jwks": json_web_key_set_response.json()["keys"],
}
return jwt.PyJWKClient(oid_conf["jwks_uri"])
except Exception as error:
logger.error(f"Failed to fetch OpenId Connect configuration for '{config.OAUTH_WELL_KNOWN}': {error}")
raise credentials_exception


def auth_with_jwt(jwt_token: str = Security(oauth2_scheme)) -> User:
if not config.AUTH_ENABLED:
return default_user
if not jwt_token:
raise credentials_exception
# If TEST_TOKEN is true, we are running tests. Use the self-signed keys. If not, get keys from auth server.
key = mock_rsa_public_key if config.TEST_TOKEN else {"keys": fetch_openid_configuration()["jwks"]}

key = get_JWK_client().get_signing_key_from_jwt(jwt_token).key
try:
payload = jwt.decode(jwt_token, key, algorithms=["RS256"], audience=config.OAUTH_AUDIENCE)
if config.MICROSOFT_AUTH_PROVIDER in payload["iss"]:
# Azure AD uses an oid string to uniquely identify users. Each user has a unique oid value.
user = User(user_id=payload["oid"], **payload)
else:
user = User(user_id=payload["sub"], **payload)
except JWTError as error:
except jwt.exceptions.InvalidTokenError as error:
logger.warning(f"Failed to decode JWT: {error}")
raise credentials_exception

Expand Down
9 changes: 4 additions & 5 deletions api/src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,13 @@ class Config(BaseSettings):
raise ValueError("Authentication was enabled, but some auth configuration parameters are missing")

if not config.AUTH_ENABLED:
print("\n")
print("################ WARNING ################")
print("# Authentication is disabled #")
print("################ WARNING ################\n")

default_user: User = User(
**{
"user_id": "nologin",
"full_name": "Not Authenticated",
"email": "nologin@example.com",
}
user_id="nologin",
full_name="Not Authenticated",
email="nologin@example.com",
)
3 changes: 3 additions & 0 deletions api/src/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
from starlette.testclient import TestClient

from app import create_app
from authentication.authentication import auth_with_jwt
from config import config
from data_providers.clients.mongodb.mongo_database_client import MongoDatabaseClient
from features.todo.repository.todo_repository import TodoRepository, get_todo_repository
from tests.integration.mock_authentication import mock_auth_with_jwt


@pytest.fixture(scope="function")
Expand Down Expand Up @@ -39,6 +41,7 @@ def use_todo_repository_mock():
return TodoRepository(client=test_client)

app.dependency_overrides[get_todo_repository] = use_todo_repository_mock
app.dependency_overrides[auth_with_jwt] = mock_auth_with_jwt
yield client


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
from starlette.status import HTTP_200_OK
from starlette.testclient import TestClient

from authentication.mock_token_generator import generate_mock_token
from authentication.models import User
from config import config
from tests.integration.mock_authentication import get_mock_jwt_token

pytestmark = pytest.mark.integration

Expand All @@ -14,7 +14,7 @@ def test_whoami(self, test_app: TestClient):
config.AUTH_ENABLED = True
config.TEST_TOKEN = True
user = User(user_id="1", email="foo@bar.baz", roles=["a"])
headers = {"Authorization": f"Bearer {generate_mock_token(user)}"}
headers = {"Authorization": f"Bearer {get_mock_jwt_token(user)}"}
response = test_app.get("/whoami", headers=headers)
data = response.json()
assert response.status_code == HTTP_200_OK
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
from jose import jwt
import jwt
from fastapi import Security

from authentication.authentication import oauth2_scheme
from authentication.models import User
from config import default_user
from common.exceptions import credentials_exception
from config import config, default_user

# Generated with: 'openssl req -nodes -new -x509 -keyout server.key -out server.cert'
mock_rsa_private_key = """

def get_mock_rsa_private_key() -> str:
"""
Used for testing.
Generated with: 'openssl req -nodes -new -x509 -keyout server.key -out server.cert'.
"""
return """
-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDfsOW9ih/oBUwl
LEH4t2C2GZeq3/dEXCkK54CNPZv979rir0nQQ5pLVcoohoVFe+QwC746xg8t7/YP
Expand Down Expand Up @@ -35,9 +43,13 @@
-----END PRIVATE KEY-----
"""

# Python-jose require public keys instead of x509 certs.
# Convert cert to pub key with: 'openssl x509 -pubkey -noout < server.cert'
mock_rsa_public_key = """

def get_mock_rsa_public_key() -> str:
"""
Used for testing.
Convert cert to pub key with: 'openssl x509 -pubkey -noout < server.cert'
"""
return """
-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA37DlvYof6AVMJSxB+Ldg
thmXqt/3RFwpCueAjT2b/e/a4q9J0EOaS1XKKIaFRXvkMAu+OsYPLe/2D3Fh8HB1
Expand All @@ -50,7 +62,7 @@
"""


def generate_mock_token(user: User = default_user) -> str:
def get_mock_jwt_token(user: User = default_user) -> str:
"""
This function is for testing purposes only
Used for behave testing
Expand All @@ -59,9 +71,25 @@ def generate_mock_token(user: User = default_user) -> str:
payload = {
"name": user.full_name,
"preferred_username": user.email,
"scp": "FoR_test_scope",
"scp": "testing",
"sub": user.user_id,
"roles": user.roles,
"iss": "mock-auth-server",
"aud": "TEST",
}
return jwt.encode(payload, mock_rsa_private_key, algorithm="RS256")
# This absolutely returns a str, so this is possibly a mypy bug
return jwt.encode(payload, get_mock_rsa_private_key(), algorithm="RS256") # type: ignore[no-any-return]


def mock_auth_with_jwt(jwt_token: str = Security(oauth2_scheme)) -> User:
if not config.AUTH_ENABLED:
return default_user
try:
payload = jwt.decode(jwt_token, get_mock_rsa_public_key(), algorithms=["RS256"], audience="TEST")
print(payload)
user = User(user_id=payload["sub"], **payload)
except jwt.exceptions.InvalidTokenError as error:
raise credentials_exception from error
if user is None:
raise credentials_exception
return user