forked from PythonFreeCourse/calendar
/
user.py
94 lines (71 loc) · 2.31 KB
/
user.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
from typing import List
from pydantic import BaseModel, Field
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session
from app.dependencies import get_db
from app.database.models import Event, User, UserEvent
from app.internal.utils import save
from fastapi import APIRouter, Depends
# Router that groups the user endpoints defined in this module: every route
# registered on it is served under the "/user" prefix and tagged "user".
router = APIRouter(
    tags=["user"],
    prefix="/user",
    responses={404: {"description": "Not found"}},
)
class UserModel(BaseModel):
    """Request schema describing the fields needed to create a user."""

    username: str
    password: str
    # Must look like "<non-space>@<non-space>.<non-space>".
    email: str = Field(regex=r'^\S+@\S+\.\S+$')
    language: str
    language_id: int
@router.get("/list")
async def get_all_users(session=Depends(get_db)):
    """Return every ``User`` row found through the injected session."""
    # NOTE(review): the rows are returned as-is; confirm that sensitive
    # columns are not exposed by the response serialisation.
    all_users = session.query(User).all()
    return all_users
@router.get("/")
async def get_user(id: int, session=Depends(get_db)):
    """Return the first ``User`` whose ``id`` equals *id*.

    The result of ``.first()`` is returned unchanged, so a missing user
    yields ``None`` rather than an error.
    """
    matching_users = session.query(User).filter_by(id=id)
    return matching_users.first()
@router.post("/")
def manually_create_user(user: UserModel, session=Depends(get_db)):
    """Create a user from the posted ``UserModel`` and confirm it by name.

    Args:
        user: The validated request payload.
        session: Database session injected through ``get_db``.

    Returns:
        A confirmation message containing the new user's username.
    """
    # ``UserModel`` also carries a ``language`` field, which ``create_user``
    # does not accept. Unpacking ``user.dict()`` wholesale therefore raised
    # ``TypeError`` (unexpected keyword argument); forward only the fields
    # ``create_user`` declares.
    create_user(
        username=user.username,
        password=user.password,
        email=user.email,
        language_id=user.language_id,
        session=session,
    )
    return f'User {user.username} successfully created'
def create_user(username: str,
                password: str,
                email: str,
                language_id: int,
                session: Session) -> User:
    """Build a ``User`` from the given fields, save it and return it.

    Args:
        username: Value for the user's ``username``.
        password: Value for the user's ``password``; passed on unchanged.
        email: Value for the user's ``email``.
        language_id: Value for the user's ``language_id``.
        session: Session handed to ``save`` together with the new user.

    Returns:
        The newly constructed ``User`` instance.
    """
    user_fields = {
        "username": username,
        "password": password,
        "email": email,
        "language_id": language_id,
    }
    new_user = User(**user_fields)
    save(session, new_user)
    return new_user
def get_users(session: Session, **param):
    """Return a list of the users matching the keyword filters in *param*.

    The keyword arguments are forwarded to ``filter_by``. If the query raises
    ``SQLAlchemyError``, an empty list is returned instead.
    """
    try:
        matching = session.query(User).filter_by(**param)
        return list(matching)
    except SQLAlchemyError:
        return []
def does_user_exist(
        session: Session,
        *, user_id=None,
        username=None, email=None
):
    """Tell whether exactly one user matches the given identifier.

    Accepts one of three keyword-only identifiers. They are examined in the
    order ``user_id``, ``username``, ``email``, and only the first truthy one
    is used for the lookup. Returns ``False`` when none of them is truthy.
    """
    lookups = (
        ("id", user_id),
        ("username", username),
        ("email", email),
    )
    for column, value in lookups:
        if value:
            found = get_users(session=session, **{column: value})
            return len(found) == 1
    return False
def get_all_user_events(session: Session, user_id: int) -> List[Event]:
    """Return every event the given user participates in.

    Events are joined with ``UserEvent`` and kept only where
    ``UserEvent.user_id`` equals *user_id*.
    """
    joined = session.query(Event).join(UserEvent)
    for_user = joined.filter(UserEvent.user_id == user_id)
    return for_user.all()