In [7]:
from datetime import datetime, timedelta, timezone
from typing import Annotated

from fastapi import Depends, FastAPI, HTTPException, status
app = FastAPI()

from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

from fastapi.middleware.cors import CORSMiddleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "http://localhost", "https://localhost",
        "https://dx2102.github.io",
    ],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

from fastapi.testclient import TestClient
client = TestClient(app)

from jose import JWTError, jwt

from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

from pydantic import BaseModel


In [8]:

# Auth: Signup and Login

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE = timedelta(weeks=1)

HTTP_401 = HTTPException(
    401,
    "Failed to authenticate",
)

class User(BaseModel):
    username: str
    hashed_password: str

fake_users_db = {
    "admin": {
        "username": "admin",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
    },
}

class Token(BaseModel):
    # Contains the jwt. Returned by /signup and /login
    access_token: str
    token_type: str

def sign_token(username: str) -> Token:
    return Token(
        access_token=jwt.encode(
            {
                "exp": datetime.now(timezone.utc) + ACCESS_TOKEN_EXPIRE,
                "sub": username,
            },
            SECRET_KEY,
            algorithm=ALGORITHM,
        ),
        token_type="bearer",
    )

async def require_user(token: str = Depends(oauth2_scheme)) -> User:
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise HTTP_401
    username: str = payload.get("sub")
    if username is None:
        raise HTTP_401
    if username not in fake_users_db:
        raise HTTP_401
    return User(**fake_users_db[username])

class SignupForm(BaseModel):
    username: str
    password: str
    confirm_password: str

@app.post("/signup")
async def signup(form: SignupForm):
    if form.password != form.confirm_password:
        raise HTTPException(400, detail="Passwords do not match")
    if form.username in fake_users_db:
        raise HTTPException(400, detail="Username already taken")
    fake_users_db[form.username] = dict(
        username=form.username,
        hashed_password=pwd_context.hash(form.password),
    )
    return sign_token(form.username)

@app.post("/login")
async def login(form: OAuth2PasswordRequestForm = Depends()) -> Token:
    if form.username not in fake_users_db:
        raise HTTP_401
    user = User(**fake_users_db[form.username])
    if not pwd_context.verify(form.password, user.hashed_password):
        raise HTTP_401
    return sign_token(user.username)

@app.get("/users/me/", response_model=User)
async def read_users_me(user: User = Depends(require_user)):
    return user


In [9]:
# test auth

if "test" in fake_users_db:
    del fake_users_db["test"] 

print(client.post("/signup", json={
    "username": "test",
    "password": "test",
    "confirm_password": "test",
}).json())

print(client.post("/signup", json={
    # raises User name already taken
    "username": "test",
    "password": "test",
    "confirm_password": "test",
}).json())

print(res := client.post("/login", data={
    "username": "test",
    "password": "test",
}).json())

print(client.get("/users/me", headers={
    "Authorization": f"Bearer {res['access_token']}",
}).json())



{'access_token': 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE3MDYwNjg5OTcsInN1YiI6InRlc3QifQ.abDYEx87FzodjMG5RL3N0F5jlyXJHjZQvgxpw7lZTko', 'token_type': 'bearer'}
{'detail': 'Username already taken'}
{'access_token': 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE3MDYwNjg5OTgsInN1YiI6InRlc3QifQ.0T5Z5mDI9XBBNc_N_W-xTM8zj53ljaH4IdvsCNriaTU', 'token_type': 'bearer'}
{'username': 'test', 'hashed_password': '$2b$12$Renpih/NB2sH9cjpkRxjXuyQ68cV8LHTtAD86twJI/g2.AdY7T8ja'}


In [10]:
print(res := client.post("/login", data={
    "username": "admin",
    "password": "adl2024",
}).json())

{'detail': 'Failed to authenticate'}


In [11]:
import uvicorn
import nest_asyncio

if __name__ == "__main__":
    nest_asyncio.apply()
    #uvicorn.run(app, host="0.0.0.0", port=80)

