In [1]:
import aiosqlite
from aiosqlite import Connection
aiosqlite.__version__ # type: ignore

'0.19.0'

In [2]:
import aiosql
aiosql.__version__

'9.0'

In [3]:
from secrets import token_hex
from passlib.context import CryptContext

# bcrypt-only hashing context; the cells below use it to hash new user keys
# (hasher.hash) and to check a presented key against the stored hash (hasher.verify).
hasher = CryptContext(schemes=["bcrypt"], deprecated="auto")

In [4]:
# Open the SQLite database file shared by every cell in this notebook.
con = await aiosqlite.connect("wolfeyauth.db")
# Turn on foreign-key enforcement for this connection.
await con.executescript("PRAGMA foreign_keys = ON")
# Load the SQL queries stored under app/queries through aiosql's "aiosqlite" adapter.
queries = aiosql.from_path("app/queries", "aiosqlite")
# Display the names of the loaded queries (this reads a private aiosql attribute).
queries._available_queries

{'create_access',
 'create_scope',
 'create_user',
 'delete_access',
 'delete_scope',
 'delete_user',
 'read_access',
 'read_access_cursor',
 'read_owned_scopes',
 'read_owned_scopes_cursor',
 'read_scope_existence',
 'read_scope_owner',
 'read_user',
 'update_disabled',
 'update_hashedkey',
 'update_scope_owner'}

Create a User

In [19]:
async def create_user(name: str):
    """Create user ``name`` with a freshly generated random key.

    Only the hash of the key is handed to the ``create_user`` query; the
    plaintext key is returned to the caller. Uses the notebook-level ``con``
    connection.
    """
    plaintext_key = token_hex(32)
    await queries.create_user(con, name=name, hashedkey=hasher.hash(plaintext_key))
    return plaintext_key

# Delete any existing "wolfey" row first (presumably so the cell can be re-run).
await queries.delete_user(con, "wolfey")
wolfey_key = await create_user("wolfey")
# NOTE(review): this prints the plaintext key into the cell output; clear outputs before sharing.
print("key:", wolfey_key)
# Show the row stored for the new user.
print(await queries.read_user(con, "wolfey"))

key: d713f326ab20bd164e20a4788b3ed6b6554cc99390181f2f65ee588c1ea4fb00
('wolfey', '$2b$12$REfxe92HqIOi.5d4e8ft1.fs/CbEpC.ojNhkALgKL3CVv/sJuah2.', None)


Refresh a User's key

In [6]:
async def refresh_key(con: Connection, name: str):
    """Replace the stored key hash of user ``name`` with that of a new random key.

    The new plaintext key is returned; only its hash is passed to the
    ``update_hashedkey`` query.
    """
    new_key = token_hex(32)
    await queries.update_hashedkey(con, name=name, hashedkey=hasher.hash(new_key))
    return new_key

# Rotate wolfey's key; the new plaintext value is reused by the authentication cells below.
wolfey_key = await refresh_key(con, "wolfey")
print("wolfey's new key", wolfey_key)
# Show the row again so the replaced hash is visible.
print(await queries.read_user(con, "wolfey"))

wolfey's new key c0d0071f14a9ba39e7ec7a2b2376a4455d27007759067bfb95e4f77e7fe5d663
('wolfey', '$2b$12$CcDyE2qLPeKtzVl6FLG46OISJwlH/1fIF/vs1ek2nqAuR267hIaEO', None)


In [7]:
async def set_user_disabled(con: Connection, name: str, disabled: bool):
    """Mark user ``name`` as disabled or enabled.

    A truthy ``disabled`` is sent to the ``update_disabled`` query as ``1``;
    a falsy one is sent as ``None``.
    """
    if disabled:
        flag = 1
    else:
        flag = None
    await queries.update_disabled(con, name=name, disabled=flag)

# Disable wolfey and show the row, then re-enable and show it again.
await set_user_disabled(con, "wolfey", True)
print(await queries.read_user(con, "wolfey"))
await set_user_disabled(con, "wolfey", False)
print(await queries.read_user(con, "wolfey"))

('wolfey', '$2b$12$CcDyE2qLPeKtzVl6FLG46OISJwlH/1fIF/vs1ek2nqAuR267hIaEO', 1)
('wolfey', '$2b$12$CcDyE2qLPeKtzVl6FLG46OISJwlH/1fIF/vs1ek2nqAuR267hIaEO', None)


Create a Scope

In [21]:
async def create_scope_and_access(con, name: str, owner: str):
    """Create scope ``name`` owned by ``owner``, then grant the owner access to it.

    Returns whatever the ``create_access`` query returns.
    """
    await queries.create_scope(con, name, owner)
    access_result = await queries.create_access(con, username=owner, scopename=name)
    return access_result

# Remove any existing "wolfey_scope" first, then create it with wolfey as owner.
await queries.delete_scope(con, "wolfey_scope")
await create_scope_and_access(con, "wolfey_scope", "wolfey")

async def read_scopes(con, user: str) -> list[str]:
    """Return the first column of every row the ``read_scopes`` query yields for ``user``.

    The callers in this notebook treat the result as the list of scope names
    the user can access.
    """
    # NOTE(review): the query list displayed after loading app/queries shows
    # `read_access` but no `read_scopes` -- confirm the query name against the .sql files.
    rows = await queries.read_scopes(con, username=user)
    scope_names = []
    for row in rows:
        scope_names.append(row[0])
    return scope_names

# Display the scope names returned for wolfey.
await read_scopes(con, "wolfey")

['wolfey_scope']

Read owned scopes

In [22]:
# Display the rows returned by the read_owned_scopes query for owner "wolfey".
await queries.read_owned_scopes(con, owner="wolfey")

[('wolfey_scope', 'wolfey')]

Read scope owner

In [10]:
# Display the owner recorded for "wolfey_scope".
await queries.read_scope_owner(con, name="wolfey_scope")

'wolfey'

Check an authorized access

In [11]:
try:
    await queries.create_access(con, username="wolfey", scopename="wolfey_scope")
except:
    pass

wolfey_access = await read_scopes(con, "wolfey")
if wolfey_access.count("wolfey_scope") != 0:
    print("wolfey has access!")
else:
    print("wolfey does NOT have access!")

wolfey has access!


Check an unauthorized access

In [12]:
try:
    await create_user("jana")
except:
    pass
access = await read_scopes(con, "jana")
print(access)
if access.count("wolfey_scope") != 0:
    print("jana has access!")
else:
    print("jana does NOT have access!")

[]
jana does NOT have access!


Update scope owner

In [13]:
async def update_owner_and_access(con: Connection, scope_name: str, new_owner: str):
    """Make ``new_owner`` the owner of ``scope_name`` and grant them access to it.

    Runs the ``update_scope_owner`` query first, then ``create_access``;
    returns whatever ``create_access`` returns.
    """
    await queries.update_scope_owner(con, name=scope_name, owner=new_owner)
    access_result = await queries.create_access(
        con, username=new_owner, scopename=scope_name
    )
    return access_result

try:
    await update_owner_and_access("wolfey_scope", "jana")
except:
    pass

wolfey_scope_owner = await queries.read_scope_owner(con, name="wolfey_scope")
print(wolfey_scope_owner, "owns wolfey_scope")

access = await read_scopes(con, "jana")
if access.count("wolfey_scope") != 0:
    print("jana has access!")
else:
    print("jana does NOT have access!")

wolfey owns wolfey_scope
jana does NOT have access!


Create and delete an access

In [14]:
await queries.delete_access(con, username="wolfey", scopename="wolfey_scope")
try:
    await queries.create_access(con, username="wolfey", scopename="wolfey_scope")
except:
    pass

wolfey_access = await read_scopes(con, "wolfey")
if wolfey_access.count("wolfey_scope") != 0:
    print("wolfey has access!")
else:
    print("wolfey does NOT have access!")

await queries.delete_access(con, username="wolfey", scopename="wolfey_scope")

if await queries.read_scopes(con, username="wolfey", scopename="wolfey_scope"):
    print("wolfey has access!")
else:
    print("wolfey does NOT have access!")

wolfey has access!
wolfey does NOT have access!


Authenticate User

In [15]:
from enum import Enum

class AuthenticateResult(Enum):
    """Possible outcomes of authenticate_user."""
    SUCCESS = 0  # user exists, is not disabled, and the key verified
    USER_NOT_FOUND = 1  # the read_user query returned None
    INVALID_KEY = 2  # hasher.verify rejected the presented key
    USER_DISABLED = 3  # the third column of the user row is not None

async def authenticate_user(name: str, key: str):
    """Check ``key`` against the stored hash of user ``name``.

    Uses the notebook-level ``con`` connection. Returns an
    ``AuthenticateResult``: ``USER_NOT_FOUND`` when no row exists,
    ``USER_DISABLED`` when the row's third column is set (checked before the
    key), ``INVALID_KEY`` when verification fails, otherwise ``SUCCESS``.
    """
    row = await queries.read_user(con, name=name)
    if row is None:
        return AuthenticateResult.USER_NOT_FOUND

    disabled_flag = row[2]
    if disabled_flag is not None:
        return AuthenticateResult.USER_DISABLED

    key_matches = hasher.verify(key, row[1])
    return AuthenticateResult.SUCCESS if key_matches else AuthenticateResult.INVALID_KEY

# Exercise each AuthenticateResult in turn: unknown user, wrong key,
# disabled user (with the correct key), and finally a successful check.
print(await authenticate_user("joe_mama", "some_invalid_key"))
print(await authenticate_user("wolfey", "some_invalid_key"))
await set_user_disabled(con, "wolfey", True)
print(await authenticate_user("wolfey", wolfey_key))
# Re-enable the user so the remaining cells can log in.
await set_user_disabled(con, "wolfey", False)
print(await authenticate_user("wolfey", wolfey_key))

AuthenticateResult.USER_NOT_FOUND
AuthenticateResult.INVALID_KEY
AuthenticateResult.USER_DISABLED
AuthenticateResult.SUCCESS


Obtain a JWT

In [16]:
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
from dotenv import load_dotenv
from datetime import datetime, timedelta, timezone
import os
import jwt
# Load environment variables from a .env file; PRIVATE_KEY_PASSWORD and SCOPE_NAME
# are read from os.environ further down in this cell.
load_dotenv()

def get_private_key():
    """Load the password-protected private key and return it as unencrypted PEM.

    Reads ``cert/private_key.pem``, decrypts it with the UTF-8 encoded
    ``PRIVATE_KEY_PASSWORD`` environment variable, and re-serialises the key
    as PKCS8 PEM without encryption.
    """
    with open("cert/private_key.pem", "rb") as pem_file:
        encrypted_pem = pem_file.read()
        passphrase = bytes(os.environ["PRIVATE_KEY_PASSWORD"], encoding="utf-8")
        loaded_key = serialization.load_pem_private_key(encrypted_pem, password=passphrase)
        return loaded_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption(),
        )
    
def get_public_key():
    """Load ``cert/public_key.pub`` and return it as SubjectPublicKeyInfo PEM."""
    with open("cert/public_key.pub", "rb") as pem_file:
        loaded_key = serialization.load_pem_public_key(pem_file.read())
        return loaded_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo,
        )
        
# Load both keys once: create_token signs with private_key, verify_token decodes with public_key.
private_key = get_private_key()
public_key = get_public_key()
# Minutes added to the current time to form the token's "exp" claim.
TOKEN_LIFETIME_MINUTES = 30
# Issuer ("iss") value for tokens, read from the SCOPE_NAME environment variable.
AUTH_SCOPE = os.environ["SCOPE_NAME"]

def create_token(user: str, scopes: list[str]):
    """Return an RS256-signed JWT for ``user``.

    Claims: ``sub`` is the user, ``iss`` is ``AUTH_SCOPE``, ``aud`` is the
    given scope list, and ``exp`` is now (UTC) plus ``TOKEN_LIFETIME_MINUTES``.
    """
    now_utc = datetime.now(tz=timezone.utc)
    expiry = now_utc + timedelta(minutes=TOKEN_LIFETIME_MINUTES)
    claims = {
        "sub": user,
        "iss": AUTH_SCOPE,
        "aud": scopes,
        "exp": expiry,
    }
    return jwt.encode(claims, private_key, algorithm="RS256")

async def login(con: Connection, user: str, key: str):
    """Authenticate ``user`` and, on success, issue a token for their scopes.

    Returns the token produced by ``create_token`` when authentication
    succeeds; otherwise returns the non-success ``AuthenticateResult``
    unchanged. The user's scopes are printed before the token is built.
    """
    outcome = await authenticate_user(user, key)
    if outcome == AuthenticateResult.SUCCESS:
        scopes = await read_scopes(con, user)
        print(f'{user} has scopes: {scopes}')
        return create_token(user, scopes)
    return outcome

def verify_token(token: str, scope: str):
    """Decode ``token`` with the public key and return the result of ``jwt.decode``.

    Decoding is restricted to RS256 and is given ``AUTH_SCOPE`` as the
    expected issuer and ``scope`` as the expected audience.
    """
    return jwt.decode(
        bytes(token, encoding="utf-8"),
        public_key,
        algorithms=["RS256"],
        audience=scope,
        issuer=AUTH_SCOPE,
    )

# Grant wolfey access to wolfey_scope again before logging in.
await queries.create_access(con, username="wolfey", scopename="wolfey_scope")
wolfey_token = await login(con, "wolfey", wolfey_key)
# NOTE(review): this prints the full signed token into the cell output.
print(wolfey_token)

# login returns an AuthenticateResult rather than a str when authentication fails.
if isinstance(wolfey_token, str):
    decoded_token = verify_token(wolfey_token, "wolfey_scope")
    print("Login success!")
    print(decoded_token)

wolfey has scopes: ['wolfey_scope']
eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJ3b2xmZXkiLCJpc3MiOiJ3b2xmZXlfYXV0aCIsImF1ZCI6WyJ3b2xmZXlfc2VydmljZSJdLCJleHAiOjE2OTU1MjIzNDR9.A-_7HCCGrg9JV8_q_P5Wt1ouDwEV9fLqe3YFNrBRU4yngIFuJJIfNMDT8NbPii6m_MoZUP7HV8K4KkUgo75UAWDsTJyUTygGWl2QIbrRkfIq72Ptk-uHI62Vjpz1coq1OjC71qVAy_NRZv3jJUJBd0ZN-Ly6wMwaHY8Z6AN3KaV-0xiOFK5H928t8sDG2axKs8h91iUWHC48zw91cXvJYwbcdoLrymnO5yC7j351P3fuHJYzZeUKMcMkyHZ-qz1T599XqpLZSypcyazPnu_-gRPss_p_ZnZvQzT0ePuLPB3W-k2FadcS0HwuGwNBL8NTSeGB5cu0kcuBDj7z7k7Hbw
Login success!
{'sub': 'wolfey', 'iss': 'wolfey_auth', 'aud': ['wolfey_scope'], 'exp': 1695522344}


Delete a scope

In [17]:
# Report whether wolfey_scope exists before the delete...
if await queries.read_scope_existence(con, name="wolfey_scope"):
    print("wolfey_scope exists!")
else:
    print("wolfey_scope does not exist!")

await queries.delete_scope(con, name="wolfey_scope")

# ...and again afterwards, using the same check.
if await queries.read_scope_existence(con, name="wolfey_scope"):
    print("wolfey_scope exists!")
else:
    print("wolfey_scope does not exist!")

wolfey_scope exists!
wolfey_scope does not exist!


Delete a User

In [18]:
print(await con.execute_fetchall('SELECT name FROM user WHERE name = "wolfey" OR name = "jana"'))

import asyncio

await asyncio.gather(
    queries.delete_user(con, "wolfey"),
    queries.delete_user(con, "jana")
)

await con.execute_fetchall('SELECT * FROM user WHERE name = "wolfey" OR name = "jana"')

[('jana',), ('wolfey',)]


[]