-
Notifications
You must be signed in to change notification settings - Fork 61
/
dependencies.py
118 lines (98 loc) 路 3.9 KB
/
dependencies.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import logging
from datetime import datetime, timedelta
from typing import Optional, Union
from demo_project.core.config import settings
from fastapi import Depends
from fastapi.security.api_key import APIKeyHeader
from fastapi_azure_auth import (
B2CMultiTenantAuthorizationCodeBearer,
MultiTenantAzureAuthorizationCodeBearer,
SingleTenantAzureAuthorizationCodeBearer,
)
from fastapi_azure_auth.exceptions import InvalidAuth
from fastapi_azure_auth.user import User
log = logging.getLogger(__name__)
azure_scheme = SingleTenantAzureAuthorizationCodeBearer(
app_client_id=settings.APP_CLIENT_ID,
scopes={f'api://{settings.APP_CLIENT_ID}/user_impersonation': '**No client secret needed, leave blank**'},
tenant_id=settings.TENANT_ID,
)
async def validate_is_admin_user(user: User = Depends(azure_scheme)) -> None:
"""
Validate that a user is in the `AdminUser` role in order to access the API.
Raises a 401 authentication error if not.
"""
if 'AdminUser' not in user.roles:
raise InvalidAuth('User is not an AdminUser')
class IssuerFetcher:
def __init__(self) -> None:
"""
Example class for multi tenant apps, that caches issuers for an hour
"""
self.tid_to_iss: dict[str, str] = {}
self._config_timestamp: Optional[datetime] = None
async def __call__(self, tid: str) -> str:
"""
Check if memory cache needs to be updated or not, and then returns an issuer for a given tenant
:raises InvalidAuth when it's not a valid tenant
"""
refresh_time = datetime.now() - timedelta(hours=1)
if not self._config_timestamp or self._config_timestamp < refresh_time:
self._config_timestamp = datetime.now()
# logic to find your allowed tenants and it's issuers here
# (This example cache in memory for 1 hour)
self.tid_to_iss = {
'intility_tenant_id': 'https://login.microsoftonline.com/intility_tenant/v2.0',
}
try:
return self.tid_to_iss[tid]
except Exception as error:
log.exception('`iss` not found for `tid` %s. Error %s', tid, error)
raise InvalidAuth('You must be an Intility customer to access this resource')
issuer_fetcher = IssuerFetcher()
azure_scheme_auto_error_false = MultiTenantAzureAuthorizationCodeBearer(
app_client_id=settings.APP_CLIENT_ID,
scopes={
f'api://{settings.APP_CLIENT_ID}/user_impersonation': 'User impersonation',
},
validate_iss=True,
iss_callable=issuer_fetcher,
auto_error=False,
)
azure_scheme_auto_error_false_b2c = B2CMultiTenantAuthorizationCodeBearer(
app_client_id=settings.APP_CLIENT_ID,
openapi_authorization_url=str(settings.AUTH_URL),
openapi_token_url=str(settings.TOKEN_URL),
openid_config_url=str(settings.CONFIG_URL),
scopes={
f'api://{settings.APP_CLIENT_ID}/user_impersonation': 'User impersonation',
},
validate_iss=True,
iss_callable=issuer_fetcher,
auto_error=False,
)
api_key_auth_auto_error_false = APIKeyHeader(name='TEST-API-KEY', auto_error=False)
async def multi_auth(
azure_auth: Optional[User] = Depends(azure_scheme_auto_error_false),
api_key: Optional[str] = Depends(api_key_auth_auto_error_false),
) -> Union[User, str]:
"""
Example implementation.
"""
if azure_auth:
return azure_auth
if api_key == 'JonasIsCool':
return api_key
raise InvalidAuth('You must either provide a valid bearer token or API key')
async def multi_auth_b2c(
azure_auth: Optional[User] = Depends(azure_scheme_auto_error_false_b2c),
api_key: Optional[str] = Depends(api_key_auth_auto_error_false),
) -> Union[User, str]:
"""
Example implementation.
"""
if azure_auth:
return azure_auth
if api_key == 'JonasIsCool':
return api_key
raise InvalidAuth('You must either provide a valid bearer token or API key')