# Databases and ORM

## Relational Database

A key in the same table that is a primary key, if we reference this key on another table, then that is a foreign key.


![blog](https://raw.githubusercontent.com/azataiot/images/master/2021/12/22/blog.png)


### NoSQL databases

- key-value : Redis
- graph : Neo4j
- document oriented : MongoDB

![CleanShot2021-12-22at17.58.41@2x](https://raw.githubusercontent.com/azataiot/images/master/2021/12/22/CleanShot%202021-12-22%20at%2017.58.41@2x.png)

one collection for users, another one for posts.


## Communication with a SQL database with SQLAlchemy

![CleanShot2021-12-22at18.03.27](https://raw.githubusercontent.com/azataiot/images/master/2021/12/22/CleanShot%202021-12-22%20at%2018.03.27.png)

In [4]:

!pip install 'databases[sqlite]'



![CGLn1w](https://raw.githubusercontent.com/azataiot/images/master/2021/12/22/CGLn1w.png)

In [1]:
# models.py

from datetime import datetime
from typing import Optional
import sqlalchemy
from pydantic import BaseModel, Field


class PostBase(BaseModel):
    title: str
    content: str
    publication_date: datetime = Field(default_factory=datetime.now)


class PostPatch(BaseModel):
    title: Optional[str] = None
    content: Optional[str] = None


class PostCreate(PostBase):
    pass


class PostDB(PostBase):
    id: int


""" metadata object: It's role is to keep all the information of a database schema together.
This is why you should create it only once in your whole project and always use the same one throughout.
"""
metadata = sqlalchemy.MetaData()

# we will define a table using the Table class
posts = sqlalchemy.Table(
    #  first argument is the name of the table, followed by the metadata object.
    "posts",
    metadata,
    # we list all the columns that should be defined in our table
    # name of the column, followed by its type and a certain number of options.
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True, autoincrement=True),
    sqlalchemy.Column("publication_date", sqlalchemy.DateTime(), nullable=False),
    sqlalchemy.Column("title", sqlalchemy.String(length=255), nullable=False),
    sqlalchemy.Column("content", sqlalchemy.Text(), nullable=False)
)

In [12]:
metadata

MetaData()

## Connecting to a database

In [2]:
from databases import Database

# database.py

# database engine, followed by authentication information and the hostname of the database server.
DATABASE_URL = "sqlite:///./db.data"
# we instantiate a Database instance using this URL.
# This is the connection layer provided by databases that will allow us to perform asynchronous queries.
database = Database(DATABASE_URL)
# We also define sqlalchemy_engine, which is the standard synchronous connection object provided by SQLAlchemy
sqlalchemy_engine = sqlalchemy.create_engine(DATABASE_URL)


# we define a simple function whose role is to simply return the database instance.


NameError: name 'sqlalchemy' is not defined

In [7]:
def get_database() -> Database:
    return database

In [8]:
# app.py
from typing import Optional, Tuple, List
from fastapi import FastAPI, status, Depends, Query, HTTPException

app = FastAPI()


# Decorating functions with the on_event decorators allows us to trigger some useful logic when FastAPI starts or stops.
@app.on_event("startup")
async def startup():
    # This will ensure that the database connection is open and ready to process requests.
    await get_database().connect()
    # The goal of this method is to create the table's schema inside our database.
    # This method is designed to work with a standard SQLAlchemy engine; this is why we instantiated it earlier.
    metadata.create_all(sqlalchemy_engine)


@app.on_event("shutdown")
async def shutdown():
    await get_database().disconnect()


async def get_pagination(
        skip: int = Query(0, ge=0),
        limit: int = Query(10, ge=10)
) -> Tuple[int, int]:
    capped_limit = min(100, limit)
    return skip, capped_limit


async def get_post_or_404(
        post_id: int, database: Database = Depends(get_database)
) -> PostDB:
    select_query = posts.select().where(posts.c.id == post_id)
    raw_post = await database.fetch_one(select_query)

    if raw_post is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
    return PostDB(**raw_post)


@app.get("/posts")
async def list_posts(
        pagination: Tuple[int, int] = Depends(get_pagination),
        database: Database = Depends(get_database),
) -> List[PostDB]:
    skip, limit = pagination
    select_query = posts.select().offset(skip).limit(limit)
    rows = await database.fetch_all(select_query)
    results = [PostDB(**row) for row in rows]
    return results


@app.get("/posts/{id}")
async def get_post(post: PostDB = Depends(get_post_or_404)) -> PostDB:
    return post


@app.post("/posts", response_model=PostDB, status_code=status.HTTP_201_CREATED)
async def create_post(
        post: PostCreate,
        database: Database = Depends(get_database),
) -> PostDB:
    # we rely on the SQLAlchemy expression language, which consists of chained method calls
    # This query is built directly from the posts object, which is the Table instance that we defined earlier.
    insert_query = posts.insert().values(post.dict())
    # Thanks to database, we can execute it asynchronously.
    post_id = await database.execute(insert_query)  #An INSERT query will return the id of the newly inserted row.
    post_db = await get_post_or_404(post_id, database)
    return post_db


@app.patch("/posts/{id}", response_model=PostDB)
async def update_post(
        post_update: PostPatch,
        post: PostDB = Depends(get_post_or_404),
        database: Database = Depends(get_database)
) -> PostDB:
    update_query = (
        posts.update().where(posts.id == id).values(post_update.dict(exclude_unset=True))
    )
    post_id = await database.execute(update_query)
    post_db = await get_post_or_404(post_id, database)
    return post_db


@app.delete("/posts/{id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_post(post: PostDB = Depends(get_post_or_404), database: Database = Depends(get_database)):
    delete_query = posts.delete().where(posts.c.id == post.id)
    await database.execute(delete_query)

NameError: name 'PostDB' is not defined

## Making Select Queries

In [9]:
@app.get("/posts")
async def list_posts(
        pagination: Tuple[int, int] = Depends(get_pagination),
        database: Database = Depends(get_database),
) -> List[PostDB]:
    skip, limit = pagination
    select_query = posts.select().offset(skip).limit(limit)
    # we execute this query with the fetch_all method of database. This method will return a list of rows that match our query
    # Each row is returned to the form of a dictionary that associates column names and their values.
    rows = await database.fetch_all(select_query)

    results = [PostDB(**row) for row in rows]
    return results

NameError: name 'PostDB' is not defined

In [None]:
# get a single object

@app.get("posts/{id}")
async def get_post(post: PostDB = Depends(get_post_or_404)) -> PostDB:
    return post

In [None]:
async def get_post_or_404(
        # you have to provide every argument manually since you are outside the dependency injection context.
        post_id: int, database: Database = Depends(get_database)
) -> PostDB:
    # The first part is to set the actual column we want to compare.
    # Each column is accessible via its name from the c attribute of the table object, that is, posts.c.id
    select_query = posts.select().where(posts.c.id == post_id)
    # Then, we simply call fetch_one on the database object.
    # It's a convenient shortcut when we only expect one row at most.
    raw_post = await database.fetch_one(select_query)

    if raw_post is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
    return PostDB(**raw_post)

## Making Update and delete Queries

In [None]:
@app.patch("/posts/{id}", response_model=PostDB)
async def update_post(
        post_update: PostPatch,
        post: PostDB = Depends(get_post_or_404),
        database: Database = Depends(get_database)
) -> PostDB:
    update_query = (
        # we add a WHERE clause to only match the post we want to update.
        # since we are doing a partial update here,
        # you can see that we use the exclude_unset option to only get the values to update.
        posts.update().where(posts.id == id).values(post_update.dict(exclude_unset=True))
    )
    post_id = await database.execute(update_query)
    post_db = await get_post_or_404(post_id, database)
    return post_db

## Adding relationships