Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added db/__init__.py
Empty file.
8 changes: 8 additions & 0 deletions db/database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from sqlalchemy.engine import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

SQL_ALCHEMY_URL = "sqlite:///sqlalchemy.sqlite.todo_info"
engine = create_engine(SQL_ALCHEMY_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
27 changes: 27 additions & 0 deletions db/database_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from sqlalchemy import Column, String, Integer, Boolean, ForeignKey
from sqlalchemy.orm import relationship

from db.database import Base


class User(Base):
__tablename__ = "user"
id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True)
username = Column(String, unique=True)
hashed_password = Column(String)
first_name = Column(String)
last_name = Column(String)
is_active = Column(Boolean)
todo = relationship("Todo", back_populates="owner")


class Todo(Base):
__tablename__ = "todo"
id = Column(Integer, primary_key=True, index=True)
title = Column(String)
description = Column(String)
priority = Column(Integer)
complete = Column(Boolean, default=False)
owner_id = Column(Integer, ForeignKey("user.id"))
owner = relationship("User", back_populates="todo")
20 changes: 20 additions & 0 deletions exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from fastapi import HTTPException
from fastapi import status


def unauthorized_token_exception():
exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid Bearer Token",
headers={"WWW-Authenticate": "Bearer"}
)
return exception


def authenticate_credentials_exceptions():
exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Username Or password Doesn't Match",
headers={"WWW-Authenticate": "Bearer"}
)
return exception
Empty file added logics/__init__.py
Empty file.
11 changes: 11 additions & 0 deletions logics/bcrypt_process.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from passlib.context import CryptContext

bcrypt_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def get_password_hash(password: str):
return bcrypt_context.hash(password)


def verify_password(password: str, hashed_password: str):
return bcrypt_context.verify(password, hashed_password)
74 changes: 74 additions & 0 deletions logics/database_operation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
from sqlalchemy.orm import Session
from sqlalchemy import exists
from db import database_models
import schemas
from fastapi import Form
from logics.bcrypt_process import get_password_hash


def get_all_todo_list(db: Session, user_id: int):
todo_list = list(map(
lambda x: {"id": x.id, "title": x.title, "description": x.description, "priority": x.priority,
"complete": x.complete},
db.query(database_models.Todo).filter_by(owner_id=user_id).all()))
return todo_list


def save_todo_in_database(db: Session, todo_info: schemas.TodoInfoWithoutComplete,
user_id: int):
todo_model = database_models.Todo()
todo_model.title = todo_info.title
todo_model.description = todo_info.description
todo_model.priority = todo_info.priority
todo_model.owner_id = user_id
db.add(todo_model)
db.commit()


def get_todo_by_id_from_database(db: Session, _id, user_id: int):
todo = db.query(database_models.Todo).filter_by(id=_id, owner_id=user_id).first()
if todo:
return {"title": todo.title, "description": todo.description, "priority": todo.priority,
"complete": todo.complete}
else:
return None


def update_todo_status_in_database(db: Session, todo: schemas.TodoInfo, user_id: int):
valid_todo_params = {}
for key, value in todo.dict().items():
if key == "id":
continue
if value is not None:
valid_todo_params[key] = value
if len(valid_todo_params) == 0:
return 0
update_status = db.query(database_models.Todo).filter_by(id=todo.id, owner_id=user_id).update(
valid_todo_params
)
db.commit()
return update_status


def is_already_exist_in_database(db: Session, search_colum, search_value):
if search_colum == "username":
return db.query(exists().where(database_models.User.username == search_value)).scalar()
elif search_colum == "email":
return db.query(exists().where(database_models.User.email == search_value)).scalar()


def save_user_in_database(db: Session, form_data: Form):
user_table_row = database_models.User()
if is_already_exist_in_database(db, "email", form_data.email):
return False
user_table_row.email = form_data.email
if is_already_exist_in_database(db, "username", form_data.username):
return False
user_table_row.username = form_data.username
user_table_row.hashed_password = get_password_hash(form_data.password)
user_table_row.first_name = form_data.first_name
user_table_row.last_name = form_data.last_name
user_table_row.is_active = form_data.is_active
db.add(user_table_row)
db.commit()
return True
23 changes: 23 additions & 0 deletions logics/json_web_token.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from datetime import timedelta, datetime
from typing import Optional

from jose import jwt
from fastapi.security import OAuth2PasswordBearer

SECRET_KEY = "KRWVCMPAUTFDJSLL"
ALGORITHM = "HS256"

oauth2_bearer = OAuth2PasswordBearer(tokenUrl="token")


def create_jwt_access_token(username: str, user_id: int, expires_delta: Optional[timedelta]):
encode = {
"sub": username,
"id": user_id,
}
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
encode["exp"] = expire
return jwt.encode(encode, SECRET_KEY, algorithm=ALGORITHM)
11 changes: 11 additions & 0 deletions main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from fastapi import FastAPI

from db import database_models
from db.database import engine
from routers import auth, signup, todos

app = FastAPI()
database_models.Base.metadata.create_all(engine)
app.include_router(signup.router)
app.include_router(auth.router)
app.include_router(todos.router)
5 changes: 5 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
fastapi==0.81.0
jose==1.0.0
passlib==1.7.4
pydantic==1.7.4
SQLAlchemy==1.3.22
Empty file added routers/__init__.py
Empty file.
56 changes: 56 additions & 0 deletions routers/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from datetime import timedelta

from fastapi import APIRouter, Depends
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from jose import jwt, JWTError
from db import database_models
from exceptions import authenticate_credentials_exceptions, unauthorized_token_exception
from logics.bcrypt_process import verify_password
from logics.json_web_token import create_jwt_access_token, oauth2_bearer, SECRET_KEY, ALGORITHM
from db.database import SessionLocal

router = APIRouter(
prefix="/auth",
tags=["auth"]
)


def get_db():
db = None
try:
db = SessionLocal()
yield db
finally:
db.close()


def authenticate_user(user_name: str, password: str, db: Session):
query_result = db.query(database_models.User).filter_by(username=user_name).first()
if query_result:
if verify_password(password, query_result.hashed_password):
return query_result
return None


@router.post("/token")
async def get_access_token_by_login(form_data: OAuth2PasswordRequestForm = Depends(),
db: Session = Depends(get_db)):
authenticated_user = authenticate_user(form_data.username, form_data.password, db)
if authenticated_user:
token_expires = timedelta(minutes=20)
token = create_jwt_access_token(authenticated_user.username, authenticated_user.id, token_expires)
return {"token": token}
raise authenticate_credentials_exceptions()


def get_current_user_from_jwt_token(token: str = Depends(oauth2_bearer)):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
user_id: int = payload.get("id")
if username is None or user_id is None:
raise unauthorized_token_exception()
return {"username": username, "id": user_id}
except JWTError:
raise unauthorized_token_exception()
30 changes: 30 additions & 0 deletions routers/signup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from fastapi import APIRouter, Depends
import schemas
from sqlalchemy.orm import Session

from logics.database_operation import save_user_in_database
from db.database import SessionLocal

router = APIRouter(
prefix="/signup",
tags=["signup"]
)


def get_db():
db = None
try:
db = SessionLocal()
yield db
finally:
db.close()


@router.post("/email")
async def save_user(form_data: schemas.CreateUser = Depends(schemas.CreateUser.as_form),
db: Session = Depends(get_db)):
response = save_user_in_database(db, form_data)
if response:
return {"message": "User Added Successfully"}
else:
return {"message": "Username Or Email Already In Use"}
71 changes: 71 additions & 0 deletions routers/todos.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from typing import List, Optional, Union, Dict

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session

import schemas
from db.database import SessionLocal
from logics.database_operation import get_all_todo_list, save_todo_in_database, get_todo_by_id_from_database, \
update_todo_status_in_database
from routers.auth import get_current_user_from_jwt_token, unauthorized_token_exception

router = APIRouter(
prefix="/todo",
tags=["todo"]
)


def get_db():
db = None
try:
db = SessionLocal()
yield db
finally:
db.close()


@router.get("/")
async def root():
return {"message": "Welcome To Heaven"}


@router.get("/db", response_model=Union[Dict[str, List[schemas.TodoInfo]], Dict[str, schemas.TodoInfo], Dict[str, str]])
async def get_todo(_id: Optional[int] = None, db: Session = Depends(get_db),
user: Dict[str, Union[str, int]] = Depends(get_current_user_from_jwt_token)):
if not user:
raise unauthorized_token_exception()
if _id:
todo = get_todo_by_id_from_database(db, _id, user["id"])
if todo:
return {"data": todo}
else:
raise HTTPException(status_code=422, detail="No Todo Found On This ID")
try:
todo_lists = get_all_todo_list(db, user["id"])
if not todo_lists:
raise HTTPException(status_code=421, detail="No Data Found On Database")
return {"data": todo_lists}
except Exception:
raise HTTPException(status_code=400, detail="No Database Found")


@router.post("/save")
async def save_todo(todo: schemas.TodoInfoWithoutComplete, db: Session = Depends(get_db),
user: Dict[str, Union[str, int]] = Depends(get_current_user_from_jwt_token)):
if user:
save_todo_in_database(db, todo, user["id"])
return {"message": "Todo Created Successfully"}
raise unauthorized_token_exception()


@router.put("/update")
async def update_todo(todo: schemas.TodoInfo = Depends(), db: Session = Depends(get_db),
user: Dict[str, Union[str, int]] = Depends(get_current_user_from_jwt_token)):
# print(todo.dict())
if user:
update_success = update_todo_status_in_database(db, todo, user["id"])
if not update_success:
raise HTTPException(status_code=420, detail="Update Failed.. Check Url Params")
return {"message": "Successfully Update Data"}
else:
raise unauthorized_token_exception()
Loading