main.py
import json
from http import HTTPStatus
from typing import Any, Optional
from fastapi import FastAPI
from fastapi_jwt_auth import AuthJWT # type: ignore
from fastapi_jwt_auth.exceptions import AuthJWTException # type: ignore
from starlette.requests import Request
from starlette.responses import JSONResponse
from antarest.core.config import Config
from antarest.core.interfaces.eventbus import DummyEventBusService, IEventBus
from antarest.core.utils.fastapi_sqlalchemy import db
from antarest.login.ldap import LdapService
from antarest.login.repository import BotRepository, GroupRepository, RoleRepository, UserLdapRepository, UserRepository
from antarest.login.service import LoginService
from antarest.login.web import create_login_api
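def build_login(
    application: Optional[FastAPI],
    config: Config,
    service: Optional[LoginService] = None,
    event_bus: IEventBus = DummyEventBusService(),
) -> LoginService:
    """
    Wire the login module: repositories, LDAP, service, JWT hooks and routes.

    Args:
        application: FastAPI application that receives the JWT exception
            handler and the login router. When None, nothing is attached to
            an application; the token denylist callback is still registered.
        config: server configuration
        service: used by tests to inject a mock. Leave None to build the real
            LoginService from the repositories.
        event_bus: event bus handed to the LoginService built here. The
            default is one DummyEventBusService instance created when this
            function is defined, so every call that omits the argument
            shares it.

    Returns: user facade service
    """
    if service is None:
        group_repo = GroupRepository()
        role_repo = RoleRepository()
        service = LoginService(
            user_repo=UserRepository(config),
            bot_repo=BotRepository(),
            group_repo=group_repo,
            role_repo=role_repo,
            ldap=LdapService(
                config=config,
                users=UserLdapRepository(),
                groups=group_repo,
                roles=role_repo,
            ),
            event_bus=event_bus,
        )

    if application:

        @application.exception_handler(AuthJWTException)
        def authjwt_exception_handler(
            request: Request, exc: AuthJWTException
        ) -> Any:
            # Every JWT failure is reported as 401; the library's message is
            # returned to the client as the "detail" field.
            return JSONResponse(
                status_code=HTTPStatus.UNAUTHORIZED,
                content={"detail": exc.message},
            )

    # NOTE(review): the callback is registered on the AuthJWT class rather
    # than on `application`, so it presumably applies process-wide and a
    # later build_login() call would replace it. Confirm against the
    # fastapi_jwt_auth version in use.
    @AuthJWT.token_in_denylist_loader  # type: ignore
    def check_if_token_is_revoked(decrypted_token: Any) -> bool:
        """Return True when the token must be rejected as revoked."""
        try:
            subject = json.loads(decrypted_token["sub"])
            user_id = subject["id"]
            token_type = subject["type"]
        except (KeyError, TypeError, ValueError):
            # Fail closed: a subject that is missing, is not JSON, or lacks
            # "id"/"type" cannot be checked, so the token is treated as
            # revoked instead of raising an unhandled exception from the
            # auth path.
            return True
        with db():
            # Only "bots" tokens are ever reported as revoked here, and only
            # once the bot no longer exists. Every other subject type gets
            # False, so this callback provides no revocation for it.
            return (
                token_type == "bots"
                and service is not None
                and not service.exists_bot(user_id)
            )

    if application:
        application.include_router(create_login_api(service, config))
    return service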