-
Notifications
You must be signed in to change notification settings - Fork 1
/
auth.py
148 lines (119 loc) · 4.19 KB
/
auth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#!/usr/bin/env python3
import asyncio
import datetime
import json
import os
import requests
import string
from types import FunctionType
import API.api as api
from game import *
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
class Auth:
    """Track logged-in nicks and persist their sessions in an encrypted cache.

    Sessions live in ``self.auth``, a dict keyed by nick (always stored as
    ``str``) whose values hold the ``"character"`` name, the API ``"token"``
    and an ISO-formatted naive-UTC ``"expiration"`` timestamp.  The dict is
    serialized as JSON, AES-CBC encrypted and written to the file named by
    the ``CACHE_FILE`` environment variable (default ``.cache``).
    """

    # Class-level defaults.  ``auth`` is kept for backward compatibility, but
    # ``__init__`` gives every instance its own dict so sessions are never
    # written into this shared mutable object.
    auth = {}
    ssl_verify = True

    # How long a session stays valid after a successful login.
    _SESSION_LIFETIME = datetime.timedelta(days=7)

    def __init__(self):
        """Read ``SSL_VERIFY`` from the environment and load the cache.

        ``SSL_VERIFY`` must be an integer string when set; ``0`` disables TLS
        certificate verification for ``register`` and any other value
        enables it.
        """
        self.auth = {}
        ssl_verify = os.getenv("SSL_VERIFY")
        if ssl_verify:
            self.ssl_verify = bool(int(ssl_verify))
        self.read_cache()

    @staticmethod
    def _utcnow():
        """Return the current UTC time as a naive ``datetime``.

        Naive values are used on purpose so they stay comparable with the
        timestamps already stored in existing cache files.
        """
        return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

    def get_cipher(self):
        """Build the AES-CBC encryptor/decryptor pair for the cache file.

        The key is the ``API_TOKEN`` environment variable repeated 16 times
        and cut to at most 32 bytes; the IV is the first 16 bytes of that key.

        NOTE(review): repeating the token is not a real key derivation and
        the IV is static, so equal plaintexts encrypt to equal ciphertexts.
        This is left unchanged because altering it would make existing cache
        files unreadable; migrate to a KDF plus a random per-file IV.

        Returns:
            A ``(encryptor, decryptor)`` tuple of single-use cipher contexts.

        Raises:
            TypeError: if ``API_TOKEN`` is not set.
        """
        token = bytes(os.getenv("API_TOKEN"), "latin-1")
        key = (token * 16)[:32]
        iv = key[:16]
        cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
        return cipher.encryptor(), cipher.decryptor()

    def write_cache(self):
        """Encrypt ``self.auth`` as JSON and write it to the cache file."""
        filename = os.getenv("CACHE_FILE", ".cache")
        encryptor, _ = self.get_cipher()
        payload = json.dumps(self.auth).encode("utf-8")
        # CBC needs whole 16-byte blocks.  Spaces are used as padding because
        # the JSON parser ignores trailing whitespace, so no unpadding step
        # is required when reading the file back.
        padding = b" " * (16 - len(payload) % 16)
        ciphertext = encryptor.update(payload + padding) + encryptor.finalize()
        # The file is opened only once the ciphertext exists, so a failure
        # while serializing or encrypting cannot truncate a good cache.
        with open(filename, "wb") as f:
            f.write(ciphertext)

    def read_cache(self):
        """Load ``self.auth`` from the cache file when it exists.

        A cache that cannot be decrypted or parsed (truncated file, different
        ``API_TOKEN``, invalid JSON) is reported on stdout and ignored, and
        ``self.auth`` keeps its current value.
        """
        filename = os.getenv("CACHE_FILE", ".cache")
        if not os.path.isfile(filename):
            return
        _, decryptor = self.get_cipher()
        with open(filename, "rb") as f:
            # The ciphertext is arbitrary binary data and must be used
            # verbatim: stripping "whitespace" bytes from it would corrupt
            # any cache that happens to start or end with such a byte.
            ciphertext = f.read()
        try:
            plaintext = decryptor.update(ciphertext) + decryptor.finalize()
            loaded = json.loads(plaintext.decode("utf-8"))
        except ValueError as e:
            # Covers a ciphertext that is not block-aligned as well as
            # UnicodeDecodeError and json.JSONDecodeError (both ValueError
            # subclasses).
            print(e)
            return
        if isinstance(loaded, dict):
            self.auth = loaded

    def login(
        self,
        game: "Game",
        command: FunctionType,
        target: str,
        author: str,
        message: str,
    ):
        """Log ``author`` in through the API and remember the session.

        Awaits ``api.get(game, command, target, message, "auth/token/login")``
        on a private event loop.  A truthy response is expected to provide
        the ``"name"`` and ``"token"`` keys, which are stored for ``author``
        together with an expiration seven days from now.

        Returns:
            The API response on success, otherwise ``{"error": True}``.
        """
        loop = asyncio.new_event_loop()
        try:
            response = loop.run_until_complete(
                api.get(game, command, target, message, "auth/token/login")
            )
        finally:
            # Release the loop's resources even if the request raises.
            loop.close()
        if not response:
            return {"error": True}
        # The response is deliberately not printed: it contains the token.
        expiration = self._utcnow() + self._SESSION_LIFETIME
        self.auth[str(author)] = {
            "character": response["name"],
            "token": response["token"],
            "expiration": str(expiration),
        }
        self.write_cache()
        return response

    def register(self, username: str, password: str) -> bool:
        """Register a new account through the API.

        Posts the credentials to ``{API_HOSTNAME}/api/auth/register`` with
        the bot token in the ``X-Bot-Token`` header.

        NOTE(review): the request has no timeout, so an unresponsive server
        blocks the caller indefinitely.

        Returns:
            True when the server answers with status 200 and a body that is
            valid JSON, False otherwise.  Connection errors raised by
            ``requests.post`` are not caught here.
        """
        headers = {"X-Bot-Token": os.getenv("API_TOKEN")}
        response = requests.post(
            "{}/api/auth/register".format(os.getenv("API_HOSTNAME")),
            data={
                "name": username,
                "password": password,
                "password_confirmation": password,
            },
            verify=self.ssl_verify,
            headers=headers,
        )
        try:
            response.json()  # Only executed to test that the body is valid JSON
        except ValueError:
            # Every JSON decode error raised by requests derives from
            # ValueError.
            return False
        return response.status_code == 200

    def check(self, nick) -> bool:
        """Return True when ``nick`` has a session that has not expired.

        Unknown nicks and entries whose expiration is missing or malformed
        count as not logged in.
        """
        expiration = self.auth.get(str(nick), {}).get("expiration")
        if not expiration:
            return False
        try:
            expires_at = datetime.datetime.fromisoformat(expiration)
        except (TypeError, ValueError):
            return False
        # "expiration" already is login time plus the session lifetime, so
        # it is compared against the clock directly.
        return self._utcnow() < expires_at

    def logout(self, nick):
        """Forget the session of ``nick`` and persist the change.

        Raises:
            KeyError: if ``nick`` has no session.
        """
        del self.auth[str(nick)]
        self.write_cache()

    def rename(self, nick, newnick):
        """Move the session of ``nick`` to ``newnick`` and persist the change.

        Raises:
            KeyError: if ``nick`` has no session.
        """
        # Pop before assigning so renaming a nick to itself keeps the entry.
        self.auth[str(newnick)] = self.auth.pop(str(nick))
        self.write_cache()

    async def get_character(self, nick):
        """Return the character name stored for ``nick``, or None."""
        return self.auth.get(str(nick), {}).get("character", None)

    async def get_token(self, nick):
        """Return the API token stored for ``nick``, or None."""
        return self.auth.get(str(nick), {}).get("token", None)