-
Notifications
You must be signed in to change notification settings - Fork 10
/
userdb.py
121 lines (101 loc) · 3.54 KB
/
userdb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import logging
from ipaddress import IPv4Address, AddressValueError
from flask import current_app
from sqlalchemy import create_engine
from sqlalchemy.exc import OperationalError
from sipa.model.user import BaseUserDB
from sipa.backends.exceptions import InvalidConfiguration
logger = logging.getLogger(__name__)
class UserDB(BaseUserDB):
    """Manage a user's personal database on the ``db_helios`` SQL server.

    Both the database and the SQL account are named after the user's
    login (see `db_name`).  The account is bound to the host pattern
    configured as ``DB_HELIOS_IP_MASK``.  The statements use
    MySQL-style syntax (backtick identifiers, ``mysql.user``).
    """

    def __init__(self, user):
        """Bind to *user* and validate the configured host mask.

        :raises ValueError: if ``DB_HELIOS_IP_MASK`` is unset or is not
            a valid mask (see `test_ipmask_validity`).
        """
        super().__init__(user)
        mask = current_app.config.get('DB_HELIOS_IP_MASK')
        self.test_ipmask_validity(mask)
        self.ip_mask = mask

    @staticmethod
    def test_ipmask_validity(mask):
        """Test whether a valid ip mask (at max one consecutive '%') was given

        This is being done by replacing '%' with the maximum possible
        value ('255').  Thus, everything surrounding the '%' except
        for dots causes an invalid IPv4Address and thus a
        `ValueError`.

        :param mask: host pattern, e.g. ``'10.10.7.%'``
        :raises ValueError: if *mask* is not a string or does not form
            a valid IPv4 address after the substitution.
        """
        if not isinstance(mask, str):
            # Typically the config key is unset (``None``); report that
            # with the documented exception type instead of letting
            # ``mask.replace`` fail with an AttributeError.
            raise ValueError(
                "Mask {!r} is not a valid IP address".format(mask)
            )
        try:
            IPv4Address(mask.replace("%", "255"))
        except AddressValueError as exc:
            raise ValueError(
                "Mask {} is not a valid IP address or contains "
                "more than one consecutive '%' sign".format(mask)
            ) from exc

    @staticmethod
    def _quote_identifier(name):
        """Return *name* as a backtick-quoted SQL identifier.

        Identifiers cannot be passed as bound parameters, so they have
        to be spliced into the statement text.  Embedded backticks are
        doubled so that the name cannot terminate the quoting early.
        """
        return "`{}`".format(str(name).replace("`", "``"))

    @staticmethod
    def sql_query(query, args=()):
        """Prepare and execute a raw sql query.

        'args' is a tuple needed for string replacement.

        :returns: whatever ``conn.execute`` returns; the connection has
            already been closed when the caller receives it.
        """
        database = current_app.extensions['db_helios']
        conn = database.connect()
        try:
            return conn.execute(query, args)
        finally:
            # Close the connection even if the statement fails, so a
            # failing query does not leak it.
            conn.close()

    @property
    def has_db(self):
        """Tell whether a schema named after the user exists.

        :returns: ``True`` / ``False`` depending on whether the schema
            was found, or ``None`` if the lookup failed with an
            `OperationalError` (which is logged as critical).
        """
        try:
            userdb = self.sql_query(
                "SELECT SCHEMA_NAME "
                "FROM INFORMATION_SCHEMA.SCHEMATA "
                "WHERE SCHEMA_NAME = %s",
                (self.db_name(),),
            ).fetchone()
        except OperationalError:
            logger.critical("User db of user %s unreachable", self.db_name())
            return None
        return userdb is not None

    def create(self, password):
        """Create the user's database and set up the account.

        The database is only created if it does not exist yet;
        afterwards `change_password` creates or updates the SQL account
        and grants it access.
        """
        self.sql_query(
            "CREATE DATABASE "
            "IF NOT EXISTS {}".format(self._quote_identifier(self.db_name())),
        )
        self.change_password(password)

    def drop(self):
        """Drop the user's database (if present) and the SQL account."""
        self.sql_query(
            "DROP DATABASE "
            "IF EXISTS {}".format(self._quote_identifier(self.db_name())),
        )
        self.sql_query(
            "DROP USER %s@%s",
            (self.db_name(), self.ip_mask),
        )

    def change_password(self, password):
        """Set *password* for the SQL account, creating it if necessary.

        In either case the account is (re-)granted the standard set of
        privileges on the user's own database.
        """
        account = (self.db_name(), self.ip_mask)

        # The existence check is by user name only, not by host.
        user = self.sql_query(
            "SELECT user "
            "FROM mysql.user "
            "WHERE user = %s",
            (self.db_name(),),
        ).fetchall()

        if not user:
            self.sql_query(
                "CREATE USER %s@%s "
                "IDENTIFIED BY %s",
                account + (password,),
            )
        else:
            # NOTE(review): the ``PASSWORD()`` form is not accepted by
            # MySQL 8.0 -- confirm against the deployed server version.
            self.sql_query(
                "SET PASSWORD "
                "FOR %s@%s = PASSWORD(%s)",
                account + (password,),
            )

        # Only the identifier is formatted in here; the ``%s``
        # placeholders are left for the driver to bind.
        self.sql_query(
            "GRANT SELECT, INSERT, UPDATE, DELETE, "
            "ALTER, CREATE, DROP, INDEX, LOCK TABLES "
            "ON {}.* "
            "TO %s@%s".format(self._quote_identifier(self.db_name())),
            account,
        )

    def db_name(self):
        """Return the name used for both database and SQL account.

        This is the value of the user's login.
        """
        return self.user.login.value
def register_userdb_extension(app):
    """Create the helios database engine and attach it to *app*.

    The engine is stored as ``app.extensions['db_helios']``, which is
    where `UserDB.sql_query` looks it up.

    :param app: application object providing ``config`` and
        ``extensions`` mappings; the config must contain
        ``DB_HELIOS_URI`` and ``SQL_TIMEOUT``.
    :raises InvalidConfiguration: if a `KeyError` occurs while setting
        up the engine, e.g. because a config key is missing.  The
        original error is kept as the cause.
    """
    try:
        app.extensions['db_helios'] = create_engine(
            app.config['DB_HELIOS_URI'],
            echo=False,
            connect_args={'connect_timeout': app.config['SQL_TIMEOUT']},
        )
    except KeyError as exception:
        raise InvalidConfiguration(*exception.args) from exception