import sqlalchemy as sa
from aiohttp_security.abc import AbstractAuthorizationPolicy
from passlib.hash import sha256_crypt
from . import db
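class DBAuthorizationPolicy(AbstractAuthorizationPolicy):
    """Authorization policy that resolves identities and permissions from the database.

    Both lookups filter on ``login == identity`` and ``NOT disabled``, so an
    account that is disabled stops being authorized on the next call even if
    the caller still holds a previously issued identity.
    """

    def __init__(self, dbengine):
        """Keep the engine from which connections are acquired per call."""
        self.dbengine = dbengine

    async def authorized_userid(self, identity):
        """Return *identity* if an enabled user with that login exists, else ``None``."""
        async with self.dbengine.acquire() as conn:
            where = sa.and_(db.users.c.login == identity,
                            sa.not_(db.users.c.disabled))
            query = db.users.count().where(where)
            ret = await conn.scalar(query)
            # A zero (or NULL) count means no enabled user matches.
            return identity if ret else None

    async def permits(self, identity, permission, context=None):
        """Return ``True`` if *identity* may exercise *permission*.

        Denies when *identity* is ``None`` or no enabled user matches it.
        A user flagged as superuser is granted every permission; otherwise
        the user's rows in ``db.permissions`` are compared by ``perm_name``.
        *context* is accepted for interface compatibility and is not used.
        """
        if identity is None:
            return False

        async with self.dbengine.acquire() as conn:
            where = sa.and_(db.users.c.login == identity,
                            sa.not_(db.users.c.disabled))
            query = db.users.select().where(where)
            ret = await conn.execute(query)
            user = await ret.fetchone()
            if user is None:
                # Unknown or disabled account: fail closed.
                return False

            # NOTE(review): columns are read by position. This assumes index 0
            # is the user id and index 3 the superuser flag in db.users (table
            # definition not visible here); a reordered schema would silently
            # change who is treated as superuser. Confirm, or read by name.
            user_id = user[0]
            is_superuser = user[3]
            if is_superuser:
                return True

            where = db.permissions.c.user_id == user_id
            query = db.permissions.select().where(where)
            ret = await conn.execute(query)
            result = await ret.fetchall()
            # The previous `if ret is not None` guard tested the cursor, which
            # had already been dereferenced by fetchall(), so it could never
            # be false; the rows are scanned directly instead.
            return any(record.perm_name == permission for record in result)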
async def check_credentials(db_engine, username, password):
    """Verify *password* for the enabled user whose login is *username*.

    Returns the result of ``sha256_crypt.verify`` against the stored hash,
    or ``False`` when no enabled user with that login exists.
    """
    async with db_engine.acquire() as conn:
        enabled_match = sa.and_(db.users.c.login == username,
                                sa.not_(db.users.c.disabled))
        lookup = db.users.select().where(enabled_match)
        cursor = await conn.execute(lookup)
        row = await cursor.fetchone()
        if row is not None:
            # NOTE(review): the hash is read by position; assumes index 2 of
            # db.users is the password-hash column (table definition not
            # visible here). Confirm, or read it by column name.
            stored_hash = row[2]
            return sha256_crypt.verify(password, stored_hash)
    # NOTE(review): this path returns without any hashing work, while a
    # matching login pays for a full sha256_crypt verification, so response
    # time differs between existing and non-existing (or disabled) logins.
    # Consider a verification against a fixed dummy hash here and
    # rate-limiting login attempts at the caller.
    return False