/
router.py
110 lines (93 loc) · 3.07 KB
/
router.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
from sqlalchemy import literal
from fastapi import APIRouter, Depends, Query
from typing import List
from boucanpy.core import logger, abort, only
from boucanpy.core.security import ScopedTo, TokenPayload
from boucanpy.core import SortQS, PaginationQS
from boucanpy.core.http_server import HttpServerRepo
from boucanpy.core.zone import ZoneRepo
from boucanpy.core.http_request import (
HttpRequestRepo,
HttpRequestsResponse,
HttpRequestResponse,
HttpRequestData,
HttpRequestCreateForm,
)
# Registration options exported next to the router. The empty "prefix" is
# presumably read by whatever mounts this module's routes -- the consumer is
# not visible in this file, so confirm before changing the key or value.
options = {"prefix": ""}

# Router that the HTTP-request endpoints below attach themselves to through
# the @router.get / @router.post decorators.
router = APIRouter()
@router.get(
    "/http-request", name="http_request.index", response_model=HttpRequestsResponse
)
async def index(
    sort_qs: SortQS = Depends(SortQS),
    pagination: PaginationQS = Depends(PaginationQS),
    http_request_repo: HttpRequestRepo = Depends(HttpRequestRepo()),
    token: TokenPayload = Depends(ScopedTo("http-request:list")),
):
    # List HTTP requests as one sorted, paginated page.
    #
    # sort_qs / pagination: sort and paging settings resolved by FastAPI from
    #   the SortQS and PaginationQS dependencies.
    # http_request_repo: repository the query is built on.
    # token: never read in the body; declaring it makes FastAPI run the
    #   ScopedTo("http-request:list") dependency for this route (presumably
    #   the scope check -- confirm in boucanpy.core.security).
    #
    # This is a `#` comment rather than a docstring on purpose: FastAPI
    # publishes a handler's docstring as the OpenAPI operation description.

    # Build the query one step at a time; "http_server" is handed to both
    # loads() and includes(), exactly as the repo API is used elsewhere here.
    query = http_request_repo.loads("http_server")
    query = query.sort(sort_qs)
    query = query.includes("http_server")
    query = query.paginate(pagination)

    # data() yields a (pagination, items) pair for a paginated query.
    page_info, http_requests = query.data()
    return HttpRequestsResponse(pagination=page_info, http_requests=http_requests)
@router.post(
    "/http-request", name="http_request.store", response_model=HttpRequestResponse
)
async def store(
    form: HttpRequestCreateForm,
    http_request_repo: HttpRequestRepo = Depends(HttpRequestRepo()),
    zone_repo: ZoneRepo = Depends(ZoneRepo()),
    http_server_repo: HttpServerRepo = Depends(HttpServerRepo()),
    token: TokenPayload = Depends(ScopedTo("http-request:create")),
):
    # Record a captured HTTP request and return it wrapped in a response model.
    #
    # form: request body describing the captured HTTP request.
    # http_request_repo / zone_repo / http_server_repo: repositories used to
    #   create the record and to resolve its foreign keys.
    # token: never read in the body; declaring it makes FastAPI run the
    #   ScopedTo("http-request:create") dependency for this route. Annotated
    #   as TokenPayload to agree with the other endpoints in this module (the
    #   annotation is inert here because the dependency callable is explicit).
    #
    # This is a `#` comment rather than a docstring on purpose: FastAPI
    # publishes a handler's docstring as the OpenAPI operation description.

    # Resolve the reporting server by its lowercased name. first_or_fail is
    # expected to abort the request when no server matches -- confirm in the
    # repo base class.
    http_server_id = (
        http_server_repo.first_or_fail(name=form.http_server_name.lower()).results().id
    )

    # Look for a zone whose "domain" value occurs inside the lowercased
    # request name. A missing zone is allowed and leaves zone_id as None.
    # NOTE(review): contains() is a substring test, not a suffix test, so a
    # name such as "notexample.com" would also match a zone "example.com" --
    # confirm this is intended.
    zone = (
        zone_repo.filter(literal(form.name.lower()).contains(zone_repo.label("domain")))
        .first()
        .results()
    )
    zone_id = zone.id if zone else None

    # Keep only the form fields that are stored on the HTTP request record.
    data = only(
        dict(form),
        [
            "name",
            "path",
            "source_address",
            "source_port",
            "type",
            "protocol",
            "raw_request",
        ],
    )

    # Normalise case before persisting: names lowercase, types uppercase.
    data["name"] = data["name"].lower()
    data["type"] = data["type"].upper()

    data["http_server_id"] = http_server_id
    data["zone_id"] = zone_id

    logger.info("store@router.py - Creating HTTP Request")

    http_request = http_request_repo.create(data).data()
    return HttpRequestResponse(http_request=http_request)
@router.get(
    "/http-request/{http_request_id}",
    name="http_request.show",
    response_model=HttpRequestResponse,
)
async def show(
    http_request_id: int,
    http_request_repo: HttpRequestRepo = Depends(HttpRequestRepo()),
    token: TokenPayload = Depends(ScopedTo("http-request:show")),
    includes: List[str] = Query(None),
):
    # Fetch a single HTTP request by id, optionally with related records.
    #
    # http_request_id: path parameter identifying the record.
    # http_request_repo: repository the lookup runs against.
    # token: never read in the body; declaring it makes FastAPI run the
    #   ScopedTo("http-request:show") dependency for this route.
    # includes: relation names from the query string; defaults to None when
    #   the parameter is absent.
    #
    # This is a `#` comment rather than a docstring on purpose: FastAPI
    # publishes a handler's docstring as the OpenAPI operation description.

    # TODO: includes are probably a way to bypass scope checks on the related
    # records; an easy way to scope each include is still needed.
    # For now the requested names are reduced to the two relations below.
    permitted = only(includes, ["http_server", "zone"], values=True)

    # Same permitted list goes to loads() and includes(); get_or_fail is
    # expected to abort the request when the id does not exist -- confirm in
    # the repo base class.
    query = http_request_repo.loads(permitted)
    query = query.get_or_fail(http_request_id)
    query = query.includes(permitted)

    return HttpRequestResponse(http_request=query.data())