forked from tiangolo/full-stack-fastapi-template
-
Notifications
You must be signed in to change notification settings - Fork 0
/
login.py
96 lines (84 loc) · 3.15 KB
/
login.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
from datetime import timedelta
from fastapi import APIRouter, Body, Depends, HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from app import crud
from app.api.utils.db import get_db
from app.api.utils.security import get_current_user
from app.core import config
from app.core.jwt import create_access_token
from app.core.security import get_password_hash
from app.models.user import User as DBUser
from app.schemas.msg import Msg
from app.schemas.token import Token
from app.schemas.user import User
from app.utils import (
generate_password_reset_token,
send_reset_password_email,
verify_password_reset_token,
)
# Router instance that the endpoints defined below register on via @router.post.
router = APIRouter()
@router.post("/login/access-token", response_model=Token, tags=["login"])
def login_access_token(
    db: Session = Depends(get_db), form_data: OAuth2PasswordRequestForm = Depends()
):
    """
    OAuth2 compatible token login: exchange form credentials for an access token.

    The form's ``username`` field is handed to ``crud.user.authenticate`` as the
    email. On success the response carries a token created with the user's id
    and an expiry of ``config.ACCESS_TOKEN_EXPIRE_MINUTES`` minutes.

    Raises:
        HTTPException: 400 when authentication yields no user, or when the
            authenticated user is not active.
    """
    account = crud.user.authenticate(
        db, email=form_data.username, password=form_data.password
    )
    if not account:
        raise HTTPException(status_code=400, detail="Incorrect email or password")
    if not account.is_active:
        raise HTTPException(status_code=400, detail="Inactive user")

    lifetime = timedelta(minutes=config.ACCESS_TOKEN_EXPIRE_MINUTES)
    token = create_access_token(data={"user_id": account.id}, expires_delta=lifetime)
    return {"access_token": token, "token_type": "bearer"}
@router.post("/login/test-token", tags=["login"], response_model=User)
def test_token(current_user: DBUser = Depends(get_current_user)):
    """
    Test an access token by returning the user it resolves to.

    All the work happens in the ``get_current_user`` dependency; this handler
    simply echoes back whatever user that dependency produced.
    """
    return current_user
@router.post("/password-recovery/{email}", tags=["login"], response_model=Msg)
def recover_password(email: str, db: Session = Depends(get_db)):
    """
    Password recovery: send a reset-password email to the given address.

    Looks the user up by ``email``, generates a password reset token for that
    email and passes it to ``send_reset_password_email``.

    Raises:
        HTTPException: 404 when no user is found for ``email``.
    """
    account = crud.user.get_by_email(db, email=email)
    if account:
        reset_token = generate_password_reset_token(email=email)
        send_reset_password_email(
            email_to=account.email, email=email, token=reset_token
        )
        return {"msg": "Password recovery email sent"}

    # NOTE(review): a distinct 404 here lets a caller tell registered emails
    # from unregistered ones -- confirm this is acceptable for the deployment.
    raise HTTPException(
        status_code=404,
        detail="The user with this username does not exist in the system.",
    )
@router.post("/reset-password/", tags=["login"], response_model=Msg)
def reset_password(
    token: str = Body(...),
    new_password: str = Body(...),
    db: Session = Depends(get_db),
):
    """
    Reset password: set a new password for the user identified by a reset token.

    The token is resolved to an email through ``verify_password_reset_token``;
    the matching user's ``hashed_password`` is then replaced with the hash of
    ``new_password`` and the change is committed.

    Raises:
        HTTPException: 400 when the token does not resolve to an email,
            404 when no user matches that email, 400 when the user is inactive.
    """
    email = verify_password_reset_token(token)
    if not email:
        raise HTTPException(status_code=400, detail="Invalid token")

    account = crud.user.get_by_email(db, email=email)
    if not account:
        raise HTTPException(
            status_code=404,
            detail="The user with this username does not exist in the system.",
        )
    if not account.is_active:
        raise HTTPException(status_code=400, detail="Inactive user")

    # Only the hash is stored on the user record; the plain password is not kept.
    account.hashed_password = get_password_hash(new_password)
    db.add(account)
    db.commit()
    return {"msg": "Password updated successfully"}