[Reference](https://medium.com/@lautisuarez081/fastapi-best-practices-and-design-patterns-building-quality-python-apis-31774ff3c28a)

In [2]:
!pip install fastapi

Collecting fastapi
  Downloading fastapi-0.115.0-py3-none-any.whl.metadata (27 kB)
Collecting starlette<0.39.0,>=0.37.2 (from fastapi)
  Downloading starlette-0.38.5-py3-none-any.whl.metadata (6.0 kB)
Downloading fastapi-0.115.0-py3-none-any.whl (94 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m94.6/94.6 kB[0m [31m5.8 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading starlette-0.38.5-py3-none-any.whl (71 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m71.4/71.4 kB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: starlette, fastapi
Successfully installed fastapi-0.115.0 starlette-0.38.5


In [3]:
from fastapi import APIRouter
from app.models.user import UserCreate, UserRead
from app.db import database

router = APIRouter()

@router.post("/users", response_model=UserRead)
async def create_user(user: UserCreate):
    """Create a user from the posted payload and return the stored row.

    Steps: reject a payload whose email or password is empty, reject an
    email that already has a row in ``users``, insert the new row, then
    read it back by id.

    Args:
        user: Request body; only ``email`` and ``password`` are read here.

    Returns:
        Whatever ``database.fetch_one`` yields for the new row; FastAPI
        serializes it through ``response_model=UserRead``.

    Raises:
        ValueError: If email or password is empty, or the email is taken.
    """
    # Data validation
    if not user.email or not user.password:
        raise ValueError("Email and password are required.")

    # Check if the user already exists.
    # The database calls are awaited: with an async driver an un-awaited call
    # yields a coroutine object, which is always truthy, so this check would
    # reject every request.
    # NOTE(review): assumes `database` exposes coroutine methods, as the
    # repository code in this notebook awaits the same calls -- confirm.
    existing_user = await database.fetch_one(
        "SELECT * FROM users WHERE email = :email",
        {"email": user.email},
    )
    if existing_user:
        raise ValueError("User already exists.")

    # Create a new user in the database.
    # NOTE(review): the password is persisted exactly as received; hash it
    # before storing in real code.
    # NOTE(review): the statement has no RETURNING clause, so the id used
    # below is whatever `execute` reports for an insert -- verify per backend.
    new_user_id = await database.execute(
        "INSERT INTO users (email, password) VALUES (:email, :password)",
        {"email": user.email, "password": user.password},
    )

    # Get new user details
    new_user = await database.fetch_one(
        "SELECT * FROM users WHERE id = :id",
        {"id": new_user_id},
    )

    return new_user

ModuleNotFoundError: No module named 'app'

In [5]:
# ------- REPOSITORY FILE -------
from app.models.user import UserCreate, UserDB
from app.db import database

class UserRepository:
    """Data-access wrapper around the ``users`` table.

    Every query goes through the ``db_session`` passed to the constructor;
    this class only supplies the SQL text and the bound parameters.
    """

    def __init__(self, db_session):
        # Stored as given; the methods below await its `fetch_one` and `execute`.
        self.db_session = db_session

    async def get_user_by_email(self, email: str) -> UserDB:
        """Fetch one ``users`` row whose email equals ``email``."""
        return await self.db_session.fetch_one(
            "SELECT * FROM users WHERE email = :email",
            {"email": email},
        )

    async def add_user(self, user_data: UserCreate) -> int:
        """Insert a row built from ``user_data`` and return ``execute``'s result.

        The statement ends in ``RETURNING id``; the value handed back is
        whatever ``db_session.execute`` produces for it.
        """
        params = {"email": user_data.email, "password": user_data.password}
        return await self.db_session.execute(
            "INSERT INTO users (email, password) VALUES (:email, :password) RETURNING id",
            params,
        )

    async def get_user_by_id(self, user_id: int) -> UserDB:
        """Fetch one ``users`` row whose id equals ``user_id``."""
        return await self.db_session.fetch_one(
            "SELECT * FROM users WHERE id = :id",
            {"id": user_id},
        )


# ------- SERVICE FILE -------
from app.models.user import UserCreate, UserRead
from app.repositories.user_repository import UserRepository

class UserService:
    """Sign-up rules layered on top of a ``UserRepository``."""

    def __init__(self, user_repository: UserRepository):
        self.user_repository = user_repository

    async def validate_user_data(self, user_data: UserCreate) -> None:
        """Raise ``ValueError`` unless both email and password are truthy."""
        if not (user_data.email and user_data.password):
            raise ValueError("Email and password are required.")

    async def check_user_exists(self, email: str) -> None:
        """Raise ``ValueError`` when the repository returns a truthy row for ``email``."""
        if await self.user_repository.get_user_by_email(email):
            raise ValueError("User already exists.")

    async def create_user(self, user_data: UserCreate) -> UserRead:
        """Validate, check for a duplicate email, insert, then read the row back.

        Raises:
            ValueError: Propagated from either check above.
        """
        # Both checks must pass before anything is written.
        await self.validate_user_data(user_data)
        await self.check_user_exists(user_data.email)

        created_id = await self.user_repository.add_user(user_data)
        created_user = await self.user_repository.get_user_by_id(created_id)
        return created_user


# ------- USER ROUTER FILE -------
from fastapi import APIRouter, Depends
from app.models.user import UserCreate, UserRead
from app.services.user_service import UserService
from app.routers.dependencies import get_user_service

router = APIRouter()

@router.post("/users", response_model=UserRead)
async def create_user(
    user: UserCreate,
    user_service: UserService = Depends(get_user_service),
):
    """POST /users: pass the payload to the injected service and return its result."""
    created_user = await user_service.create_user(user)
    return created_user

In [7]:
from abc import ABC, abstractmethod
from app.models.user import UserCreate, UserRead

class IUserRepository(ABC):
    """Abstract contract for user persistence.

    All three coroutines are abstract, so a subclass has to override each of
    them before it can be instantiated.
    """

    @abstractmethod
    async def get_user_by_email(self, email: str) -> UserRead:
        """Look up a user by e-mail address."""

    @abstractmethod
    async def add_user(self, user_data: UserCreate) -> int:
        """Persist ``user_data`` and return an integer for the new user."""

    @abstractmethod
    async def get_user_by_id(self, user_id: int) -> UserRead:
        """Look up a user by id."""

In [8]:
from app.models.user import UserCreate, UserRead
from app.db import database
from app.repositories.user_repository_interface import IUserRepository

class UserRepository(IUserRepository):
    """``IUserRepository`` implementation that issues SQL through ``db_session``."""

    # SQL text used by the methods below, kept in one place.
    _SELECT_BY_EMAIL = "SELECT * FROM users WHERE email = :email"
    _INSERT_USER = "INSERT INTO users (email, password) VALUES (:email, :password) RETURNING id"
    _SELECT_BY_ID = "SELECT * FROM users WHERE id = :id"

    def __init__(self, db_session):
        # Stored as given; the methods below await its `fetch_one` and `execute`.
        self.db_session = db_session

    async def get_user_by_email(self, email: str) -> UserRead:
        """Fetch one ``users`` row whose email equals ``email``."""
        return await self.db_session.fetch_one(self._SELECT_BY_EMAIL, {"email": email})

    async def add_user(self, user_data: UserCreate) -> int:
        """Insert a row built from ``user_data`` and return ``execute``'s result."""
        bound = {"email": user_data.email, "password": user_data.password}
        return await self.db_session.execute(self._INSERT_USER, bound)

    async def get_user_by_id(self, user_id: int) -> UserRead:
        """Fetch one ``users`` row whose id equals ``user_id``."""
        return await self.db_session.fetch_one(self._SELECT_BY_ID, {"id": user_id})

In [9]:
from app.models.user import UserCreate, UserRead
from app.repositories.user_repository_interface import IUserRepository

class UserService:
    """Sign-up rules written against the ``IUserRepository`` abstraction."""

    def __init__(self, user_repository: IUserRepository):
        self.user_repository = user_repository

    async def validate_user_data(self, user_data: UserCreate) -> None:
        """Raise ``ValueError`` unless both email and password are truthy."""
        if not (user_data.email and user_data.password):
            raise ValueError("Email and password are required.")

    async def check_user_exists(self, email: str) -> None:
        """Raise ``ValueError`` when the repository returns a truthy row for ``email``."""
        if await self.user_repository.get_user_by_email(email):
            raise ValueError("User already exists.")

    async def create_user(self, user_data: UserCreate) -> UserRead:
        """Run both checks, insert the user, and return the row read back by id.

        Raises:
            ValueError: Propagated from ``validate_user_data`` or
                ``check_user_exists``.
        """
        await self.validate_user_data(user_data)
        await self.check_user_exists(user_data.email)
        created_id = await self.user_repository.add_user(user_data)
        return await self.user_repository.get_user_by_id(created_id)

In [10]:
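# 1. Data Access Object (DAO)
# NOTE(review): the class in this cell is the UserService that consumes the
# data-access layer through IUserRepository, not a DAO implementation itself.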
from app.models.user import UserCreate, UserRead
from app.repositories.user_repository_interface import IUserRepository

class UserService:
    """User sign-up service that depends only on ``IUserRepository``.

    NOTE(review): this notebook defines ``UserService`` in more than one
    cell; whichever definition runs last is the one bound to the name.
    """

    def __init__(self, user_repository: IUserRepository):
        self.user_repository = user_repository

    async def validate_user_data(self, user_data: UserCreate) -> None:
        """Require a truthy email and a truthy password, else ``ValueError``."""
        if not (user_data.email and user_data.password):
            raise ValueError("Email and password are required.")

    async def check_user_exists(self, email: str) -> None:
        """Raise ``ValueError`` if a lookup by ``email`` returns something truthy."""
        match = await self.user_repository.get_user_by_email(email)
        if not match:
            return
        raise ValueError("User already exists.")

    async def create_user(self, user_data: UserCreate) -> UserRead:
        """Validate the payload, reject a duplicate email, insert, and re-read.

        Raises:
            ValueError: Propagated from either of the two checks.
        """
        # Checks first: nothing is written unless both pass.
        await self.validate_user_data(user_data)
        await self.check_user_exists(user_data.email)

        inserted_id = await self.user_repository.add_user(user_data)
        return await self.user_repository.get_user_by_id(inserted_id)

In [11]:
# 2. Service Layer
from app.models.user import UserCreate, UserRead
from app.repositories.user_repository_interface import IUserRepository

class UserService:
    """Service layer: user-creation rules on top of an ``IUserRepository``.

    NOTE(review): this notebook defines ``UserService`` in more than one
    cell; whichever definition runs last is the one bound to the name.
    """

    def __init__(self, user_repository: IUserRepository):
        self.user_repository = user_repository

    async def validate_user_data(self, user_data: UserCreate) -> None:
        """Raise ``ValueError`` when email or password is missing or empty."""
        if user_data.email and user_data.password:
            return
        raise ValueError("Email and password are required.")

    async def check_user_exists(self, email: str) -> None:
        """Raise ``ValueError`` when the repository already has ``email``."""
        if await self.user_repository.get_user_by_email(email):
            raise ValueError("User already exists.")

    async def create_user(self, user_data: UserCreate) -> UserRead:
        """Create a user and return the row the repository reads back.

        Order of operations: payload check, duplicate-email check, insert,
        fetch by the id the insert produced.

        Raises:
            ValueError: Propagated from either check.
        """
        await self.validate_user_data(user_data)
        await self.check_user_exists(user_data.email)
        fresh_id = await self.user_repository.add_user(user_data)
        fresh_user = await self.user_repository.get_user_by_id(fresh_id)
        return fresh_user