-
Notifications
You must be signed in to change notification settings - Fork 0
/
config.py
152 lines (118 loc) · 4.26 KB
/
config.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
from argon2 import PasswordHasher
from datetime import datetime
from fastapi import HTTPException, Request, Security
from fastapi.security.api_key import APIKeyQuery
import jwt
import mysql.connector as database
import os
from pyjarowinkler.distance import get_jaro_distance
import random
import string
import unicodedata
# Module-wide MySQL connection, opened as a side effect of importing this
# module. Credentials and target are read from the zythogora_db_* environment
# variables; a missing variable raises KeyError at import time.
# get_api_key() pings this connection (with reconnect) before using it.
connection = database.connect(
user = os.environ["zythogora_db_username"],
password = os.environ["zythogora_db_password"],
host = os.environ["zythogora_db_host"],
database = os.environ["zythogora_db_database"]
)
# Security scheme that reads the API key from the "api-key" query parameter.
# It is used as the default Security(...) dependency of get_api_key().
# NOTE(review): auto_error=False presumably hands a missing key through as a
# falsy value instead of rejecting the request; get_api_key() checks for that
# itself -- confirm against the FastAPI version in use.
key = APIKeyQuery(name="api-key", auto_error=False)
def _escape_like(fragment: str) -> str:
    """Escape MySQL LIKE metacharacters so *fragment* only matches literally."""
    # Backslash first, otherwise the escapes added for % and _ get re-escaped.
    return (
        fragment.replace("\\", "\\\\")
        .replace("%", "\\%")
        .replace("_", "\\_")
    )


async def get_api_key(request: Request, key: str = Security(key)):
    """Authenticate a request by JWT or by API key.

    If the request carries an ``Authorization`` header, its raw value is
    decoded as an HS512 JWT and the API-key path is never tried. Otherwise
    the ``api-key`` query parameter is looked up in ``API_Keys``, verified
    against the stored Argon2 hash, and the request is logged to
    ``API_Requests``.

    Args:
        request: The incoming request (headers, method, path, client address).
        key: API key injected by the ``api-key`` query-parameter scheme;
            falsy when the parameter is absent.

    Returns:
        A dict with ``"user"`` (token ``client_id`` or key owner) and
        ``"exp"`` (token ``exp`` claim or key expiry).

    Raises:
        HTTPException: 401 when the JWT is invalid/expired or no credential
            was supplied; 403 when the API key is unknown, fails hash
            verification, or has expired.
    """
    # JWT Auth
    jwt_header = request.headers.get("Authorization")
    if jwt_header:
        try:
            # FIXME: EXPIRATION TIME IS UNCHECKED UNTIL THE MIGRATION IS PERFORMED
            token = jwt.decode(
                jwt_header,
                os.environ["zythogora_jwt_secret"],
                algorithms=["HS512"],
                options={"verify_exp": False},
            )
            return {
                "user": token["client_id"],
                "exp": token["exp"]
            }
        except jwt.ExpiredSignatureError as err:
            # Kept for when expiry verification is switched back on.
            raise HTTPException(
                status_code=401,
                detail={
                    "error": "AUTH_EXPIRED_ACCESS_TOKEN",
                    "message": "Your access token has expired."
                }
            ) from err
        except Exception as err:
            # Deliberately broad: malformed tokens, bad signatures and missing
            # claims all map to the same 401. `Exception` (not a bare except)
            # lets task cancellation and interpreter-exit signals propagate.
            raise HTTPException(
                status_code=401,
                detail={
                    "error": "AUTH_INVALID_ACCESS_TOKEN",
                    "message": "That access token is invalid."
                }
            ) from err

    # API Key Auth
    # Reject a missing key before touching the database at all.
    if not key:
        raise HTTPException(status_code=401, detail="Unauthorized")

    connection.ping(reconnect=True)
    with connection.cursor(prepared=True) as cursor:
        # Candidate rows are pre-filtered by the key's first 4 characters and
        # its tail (from index 31). The fragments are escaped so a key made of
        # LIKE wildcards cannot select every row and force one Argon2
        # verification per stored key.
        cursor.execute("""
            SELECT id, user, key_hash, permissions, is_dev, iat, exp
            FROM API_Keys
            WHERE key_help LIKE %s
        """, (_escape_like(key[:4]) + "%" + _escape_like(key[31:]),))
        candidates = cursor.fetchall()

        if not candidates:
            raise HTTPException(status_code=403, detail="Invalid API Key.")

        hasher = PasswordHasher()
        api_key = None
        for row in candidates:
            try:
                if hasher.verify(row[2], key):
                    api_key = row
                    break
            except Exception:
                # A mismatch or an unreadable stored hash just means this row
                # is not the caller's key; keep trying the other candidates.
                continue

        if not api_key or api_key[6] < datetime.now():
            raise HTTPException(status_code=403, detail="Invalid API Key.")

        # request.client can be None (e.g. some test clients / transports);
        # log NULLs in that case instead of failing with AttributeError.
        # NOTE(review): assumes API_Requests.ip / port accept NULL -- confirm.
        client = request.client
        cursor.execute("""
            INSERT INTO API_Requests
            (api_key, method, endpoint, ip, port)
            VALUES (%s, %s, %s, %s, %s)
        """, (
            api_key[0],
            request.method,
            request.url.path,
            client.host if client else None,
            client.port if client else None
        ))
        connection.commit()

        return {
            "user": api_key[1],
            "exp": api_key[6]
        }
def normalize_str(input_str: str) -> str:
    """Return *input_str* lower-cased and reduced to plain ASCII.

    The text is decomposed (NFD) so that accented letters split into a base
    letter plus combining marks; every non-ASCII code point is then dropped.
    Characters with no ASCII base letter disappear entirely.
    """
    decomposed = unicodedata.normalize('NFD', input_str.lower())
    ascii_only = decomposed.encode('ASCII', 'ignore')
    return ascii_only.decode("utf-8")
async def search(search_term: str, term_query, count: int = 10, page: int = 1):
    """Rank candidate rows against *search_term* and return one page of ids.

    Ranking tiers, best first:
      1. exact matches on the normalised label (each new one goes to the front),
      2. labels starting with the search term,
      3. labels containing the search term,
      4. everything else, ordered by descending Jaro-Winkler similarity.

    Args:
        search_term: Raw user query; normalised with ``normalize_str``.
        term_query: Sized sequence of rows where ``row[0]`` is the identifier
            returned to the caller and ``row[1]`` is the label to match.
        count: Page size.
        page: 1-based page number.

    Returns:
        The list of identifiers for the requested page (possibly empty).
    """
    search_term = normalize_str(search_term)

    first = (page - 1) * count
    last = page * count
    if len(term_query) <= first:
        return []

    res = []
    contains = []
    fuzzy_candidates = []

    for el in term_query:
        term = normalize_str(el[1])
        if term == search_term:
            res.insert(0, el[0])
        elif term.startswith(search_term):
            res.append(el[0])
        elif search_term in term:
            contains.append(el[0])
        else:
            # Scoring is deferred: it is only needed when the stronger tiers
            # do not already fill the requested page.
            fuzzy_candidates.append((el[0], term))

    fuzzy = []
    if len(res) + len(contains) < last:
        scores = {}
        for ident, term in fuzzy_candidates:
            # get_jaro_distance raises on an empty string. A label can
            # normalise to "" (e.g. only non-ASCII characters), so such rows
            # get the lowest score instead of aborting the search.
            # (search_term cannot be empty here: "" is a prefix of every
            # label, so those rows never reach this tier.)
            if term:
                scores[ident] = get_jaro_distance(search_term, term, winkler=True)
            else:
                scores[ident] = 0.0
        # Ascending stable sort followed by reverse(), to keep the established
        # ordering among equally-scored rows.
        fuzzy = sorted(scores, key=scores.get)
        fuzzy.reverse()

    return (res + contains + fuzzy)[first:last]
def get_random_string(length):
    """Return *length* random ASCII letters and digits.

    Characters are drawn with ``random.SystemRandom``, i.e. from the
    operating system's entropy source rather than the default PRNG.
    """
    alphabet = string.ascii_lowercase + string.ascii_uppercase + string.digits
    rng = random.SystemRandom()
    picked = [rng.choice(alphabet) for _ in range(length)]
    return ''.join(picked)