## Fast Api

In [None]:
# Get request

@app.get("/")
def read_root():
    return {"Hello": "World"}

## Post Request 

In [None]:
# Firstly when we use post reqest it help us to create a new data in the server.
# Simple post request which have no data than it behave like get but we need to send and receive data 
# that's why se use post request. So we send the data in the body of the request and we receive the data in the body of the response.

@app.post("/items/")
def create_item():
    return {"hello" : "world"}


Data Extract from the body of the payload

In [None]:
@app.post("/add")
def create_post(payload: dict = Body(...)):
    print(payload)
    return {"message": "Data received"}

Here we extract the data and send it back from the request

In [None]:
# how to send data in a body of a post request and also extract that data from the body.
# normally we store that data in the database.
@app.post("/new_post")
def create_post(payload: dict = Body(...)):
    print(payload)
    return {"new_post": f"title {payload['title']} content: {payload['content']}"}

# Here issue is that client send any data they want to send. So we need to validate the data before storing it in the database.
# that's why we use pydantic model to validate the data.

Why we need schema 

It's pain to get all the values from the body. <br>
The client can send whatever data they want. <br>
The data isn't getting validated. <br>
We ultimately want to force the client to send data in a schema that we expect. <br>

For that we use library called pydantic it validate our data .

In [None]:
class Post(BaseModel):
    title: str
    content: str
    published: bool = True
    rating: Optional[int] = None

@app.post("/new_post")
def create_post(new_post:Post):
    print(new_post)
    return{"new" :"post_created"}

We use pydantic model_dump/ dict to convert the body data into dictionary

In [None]:
# Now work with api to retrieve post 
# In fast api when we send array data to it it gonna serialize it on way means it convert 
# array into a json format.

new_posts = [{"title": "post1", "content": "content1","id":1},
             {"title": "post2", "content": "content2","id":2}]

@app.get("/posts")
def get_posts():
    return {"posts": new_posts}

In [None]:
# This method is used to update the post in the database. Here we have no db that's why we 
# use list to append it with unique id.

@app.post("/new_posts")
def create_new_post(post: Post):
    post_dict = post.dict()
    post_dict['id'] = randrange(0,100000)
    new_posts.append(post_dict)
    print(new_posts)
    return {"data": post_dict}

### Path parameter

In [None]:

@app.get("/posts/{post_id}")
def get_post(post_id: int):
    return {"post": f"here is post {post_id}"}

In [None]:
# no need to hard code the error value use http exception to handle the error.


@app.get("/posts/{post_id}")
def get_post(post_id: int , response: Response):
    post = find_post(post_id)
    if not post:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, 
                            detail=f"post with id {post_id} not found")
        # response.status_code = status.HTTP_404_NOT_FOUND
        # return {"detail": f"post with id {post_id} not found"}
    return {"post": f"here is post {post}"}

## Delete Post

In [None]:
@app.delete("/posts/{post_id}")
def post_delete(post_id: int):
    index = find_index_post(post_id)    
    new_posts.pop(index) # type: ignore
    return {"message": "post deleted successfully"}

here we use http 204 status code to show that the post is deleted successfully. But this error msg
is not returned any message.

@app.delete("/posts/{post_id}")
def post_delete(post_id: int):
    index = find_index_post(post_id)    
    new_posts.pop(index) # type: ignore
    return Response(status_code=status.HTTP_204_NO_CONTENT)

## Update Method (PUT)

In [None]:
# In this put mehtod we receive data from the front end we store it in post variable and then 
# we find the index of the post and then we update the post in the list.

@app.put("/posts/{post_id}")
def update_post(post_id: int, post: Post):
    index = find_index_post(post_id)
    if index is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, 
                            detail=f"post with id: {post_id} not found")
    post_dict = post.dict()
    post_dict['id'] = post_id
    new_posts[index] = post_dict
    return {"message": "post updated successfully"}


## Database (postgres)



Here we can established a new connection to the database.

In [None]:
# This is the hard code value which is given which is not a good practice so we can convert in to 
# dynamic value.

    try:
        conn = psycopg2.connect(host="localhost",database="fastapi",user="postgres",
                                password="Pk135430",cursor_factory=RealDictCursor)
        curr = conn.cursor()
        print("Database connected successfully")
        break
    except Exception as e:
        print("Database connection error")
        print(f"Error: {e}")
        time.sleep(5)


Here we can enter data into the database and than commit it.

In [None]:
@app.post("/new_posts", status_code= status.HTTP_201_CREATED)
def create_new_post(post: Post):
    
    # This is work but this can cause a sql injection probelem which attacker is used.
    # curr.execute(f"""INSERT INTO post (title, content) VALUES ('{post.title}', '{post.content}')""")
    # so we use
    curr.execute("""INSERT INTO post (title, content,published) VALUES (%s, %s,%s) 
                 RETURNING *""",(post.title, post.content,post.published))
    new_post = curr.fetchone()
    conn.commit()
    return {"new_post": new_post}
    

Fetching an individual post by an id

In [None]:
@app.post("/fetch_one/{post_id}")
def get_onepost(post_id: int):
    curr.execute(""" SELECT * FROM post where id = 1 """)
    test_post = curr.fetchone()
    print(test_post)
    post = find_post(post_id)
    if not post:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, 
                            detail=f"post with id: {post_id} not found")
    return {"post": post}

In [3]:
# This is also the same but it doesn't take hard code value it take dynamic value.

@app.post("/fetch_one/{post_id}")
def get_onepost(post_id: int):
    curr.execute(""" SELECT * FROM post where id = %s """,(str(post_id)))
    post = curr.fetchone()
    if not post:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, 
                            detail=f"post with id: {post_id} not found")
    return {"post": post}

### Delete Post 

In [None]:
@app.delete("/delete_post/{post_id}")
def del_post(post_id:int):
    curr.execute(""" DELETE FROM post where id = %s returning * """,(str(post_id),))
    delete_post = curr.fetchone()
    conn.commit()
    if delete_post == None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, 
                            detail=f"post with id: {post_id} not found")
    return Response(status_code = status.HTTP_204_NO_CONTENT)


### SqlAlchemy

 In sql alchemy we have 3 type of code's<br>
 1. Database : In data base we have all the connection related database <br>
 2. Model : In this we define class as a table name we call it models<br>
 3. Main : Here is all the code which is fast api and run  <br>

In [1]:
# Database 

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

# SQLALCHEMY_DATABASE_URL = 'postgresql://<username>:<password>@localhost/<database>'
SQLALCHEMY_DATABASE_URL = 'postgresql://postgres:Pk135430@localhost/fastapi'

engine = create_engine(SQLALCHEMY_DATABASE_URL)

SessionLocal = sessionmaker(autocommit=False,autoflush=False,bind=engine)

Base = declarative_base()

#dependency
def get_db(): 
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close() 

In [None]:
#model 



from database import Base 
from sqlalchemy import Column, Integer, String, Boolean
from sqlalchemy.sql.sqltypes import TIMESTAMP
from sqlalchemy.sql.expression import text

class Post(Base):
    __tablename__ = 'posts'
    id = Column(Integer, primary_key=True, nullable=False)
    title = Column(String, nullable=False)
    content = Column(String, nullable=False)
    published = Column(Boolean, server_default = "True", nullable=False)
    created_at = Column(TIMESTAMP(timezone=True), nullable=False, server_default=text('now()'))
 

In [None]:
# main 

from typing import Optional
from fastapi import FastAPI , Body , Response ,status ,HTTPException , Depends
from pydantic import BaseModel
from random import randrange
import psycopg2
from psycopg2.extras import RealDictCursor
import time
from sqlalchemy.orm import Session
import models
from models import *
from database import engine , get_db

models.Base.metadata.create_all(bind=engine)

app = FastAPI()

class Post(BaseModel):
    title: str
    content: str
    published: bool = True
    # rating: Optional[int] = None
    # ratings: int | None = None

while True:
    try:
        conn = psycopg2.connect(host="localhost",database="fastapi",user="postgres",
                                password="Pk135430",cursor_factory=RealDictCursor)
        curr = conn.cursor()
        print("Database connected successfully")
        break
    except Exception as e:
        print("Database connection error")
        print(f"Error: {e}")
        time.sleep(5)



In [2]:
# Retreive data from the database


@app.get("/sqlalchemy")
def test_post(db: Session = Depends(get_db)):
    return {"status": "ok"}



In [None]:
# Creaet a post using ORM 

@app.post("/postss", status_code= status.HTTP_201_CREATED)
def create_postss(post:Post , db:Session = Depends(get_db)):
    new_post = models.Post(title = post.title, content = post.content,
                            published = post.published)
    db.add(new_post)
    db.commit()
    db.refresh(new_post)
    
    return {"new_post": new_post}

# The above code is used to create a post in the database using ORM.But in this code their
# is a little catch (title = post.title, content = post.content,published = post.published)
# When we have a lot of fields we manually assign the value to the fields so this is not a good
# practice. So we use dictionary unpacking to assign the value to the fields.

@app.post("/postss", status_code= status.HTTP_201_CREATED)
def create_postss(post:Post , db:Session = Depends(get_db)):
    new_post = models.Post(**post.dict())
    db.add(new_post)
    db.commit()
    db.refresh(new_post)
    
    return {"new_post": new_post}

# Above two code is do the same work but the second code is more efficient than the first one.

In [None]:
# Here we filter out the specifc post from the database.

@app.get("/posts/{post_id}")
def get_post(post_id: int , db:Session = Depends(get_db)):
    post = db.query(models.Post).filter(models.Post.id == post_id).first()
    if not post:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, 
                            detail=f"post with id: {post_id} not found")
    return {"post": post}


In [None]:
# Delete a post  from the database.

@app.delete("/delete_post/{post_id}", status_code= status.HTTP_204_NO_CONTENT)
def del_post(post_id:int , db:Session = Depends(get_db)):
#     curr.execute(""" DELETE FROM post where id = %s returning * """,(str(post_id),))
#     delete_post = curr.fetchone()
#     conn.commit()
#     if delete_post == None:
    delete_post = db.query(models.Post).filter(models.Post.id == post_id)
    print(delete_post)
    if delete_post.first() == None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, 
                            detail=f"post with id: {post_id} not found")
    
    delete_post.delete(synchronize_session=False)
    db.commit()
    return {"message": "post deleted successfully"}

In [None]:
# Updated a Post in the database.

@app.put("/update_post/{post_id}")
def updated_post(post_id:int, post:Post, db:Session = Depends(get_db)):
    post_query = db.query(models.Post).filter(models.Post.id == post_id)
    updated_post = post_query.first()
    if updated_post == None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, 
                            detail=f"post with id: {post_id} not found")
    post_query.update(post.model_dump(), synchronize_session=False)
    db.commit()
    return {"data": post_query.first()}

### create a new file called schemas.py <br>
so we add schema in to schemas file. <br>


Pydantic Models Deep Dive <br>
We create user pydantic model in which user can register their self. <br>
Response Model <br>
In sql alchemy model we create request model and response model.In request model usercreate we accept email and password and if it fulfill it than give back a response of id and email.<br>
Creating Users Table <br>
User Registration Path Operation <br>

In [None]:
# Above we are using email vaidator so to validate the email. We use pydantic email validator.

# Now i'm working with password hashing technique which uses brypt algorithm to hash the password.

from passlib.context import CryptContext

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# this is use to tell that which algo is used to bcrypt the password 

@app.post("/users/" ,status_code=status.HTTP_201_CREATED,response_model=schemas.UserOut)
def create_user(user: schemas.UserCreate, db:Session = Depends(get_db)):

    # hash the password
    hashed_password = pwd_context.hash(user.password)
    user.password = hashed_password

    # new_user = models.User(email = user.email, password = user.password)
    new_user = models.User(**user.model_dump())
    db.add(new_user)
    db.commit()
    db.refresh(new_user)
    return new_user

In [None]:
# User get by their id 

@app.get("/users/{user_id}", response_model=schemas.UserOut)
def get_user(user_id:int , db:Session =Depends(get_db)):
    user = db.query(models.User).filter(models.User.id == user_id).first()
    if not user:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, 
                            detail=f"user with id: {user_id} not found")
    return user

In [2]:
Now here we are seperate every route in the routers folder,
we create a post and user router. 

also use ApiRouter from fast api to set the router in the main file.
ApiRouter is used to set the prefix so we don't need to write the prefix in every route.



In [None]:
from fastapi import  APIRouter

router = APIRouter(
    prefix="/posts",
    tags=["posts"]
)

### JWT Token Autherization.

In jwt we have three things. <br>
1:- Header <br>
2:- Payload <br>
3:- Secret <br>

Pass these three to the api for given permission. Header and payload is visible so we use secret to generate a new signature so only authorize will login in this.

In [None]:
# For login is firstly make a auth rout in the router folder 
# and then create a login route in the auth router.

# than make schema inside the login_user inside the schema.

# for to verify the password we use verify_password method from the passlib library.in utils folder. 

# pass the route to the main file and import the router in the main file.

# than chk this user api accordingly in postman. 

In [None]:
@router.post("/login")
def login(login_user:Userlogin,db: Session = Depends(get_db)):

    user = db.query(models.User).filter(models.User.email == login_user.email).first()

    if not user:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Invalid Credentials")
    
    if not utils.verify(login_user.password, user.password):
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Incorrect password")
    
    return {"message": "Successfully logged in"}

In [None]:
# schema for the login user
class Userlogin(BaseModel):
    email: EmailStr
    password: str

In [None]:
# utils folder
def verify(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)

In [None]:
# JWT way 

# First we install the pyjwt library to generate the token or python-jose[cryptography].

In [None]:
@router.post("/login")
def login(login_user:Userlogin,db: Session = Depends(get_db)):

    user = db.query(models.User).filter(models.User.email == login_user.email).first()

    if not user:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Invalid Credentials")
    
    if not utils.verify(login_user.password, user.password):
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Incorrect password")
    

    access_token = oauth2.create_access_token(data={"user_id": user.id})


    return {"access_token": access_token, "token_type": "bearer"}

In [None]:
we create a folder called oauth2 and add this in it.

from jose import JWTError, jwt
from datetime import datetime, timedelta
#secret key
#Algorithm
#expires time

SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

def create_access_token(data: dict):
    to_encode = data.copy()

    expire = datetime.now() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

    return encoded_jwt

In [None]:
# Now we use fastapi lib 
# from fastapi.security.oauth2 import OAuth2PasswordRequestForm

# by usiing this we don't need to create a schema for the login user.

# it acutomatically create the schema for the login user.

# it can create like 

# {
#     "username": "string",
#     "password": "string"
# }

In [None]:
@router.post("/login")
def login(user_credentials: OAuth2PasswordRequestForm = Depends(),
          db: Session = Depends(get_db)):

    user = db.query(models.User).filter(models.User.email == user_credentials.username).first()

    if not user:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Invalid Credentials")
    
    if not utils.verify(user_credentials.password, user.password):
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Incorrect password")
    

    access_token = oauth2.create_access_token(data={"user_id": user.id})


    return {"access_token": access_token, "token_type": "bearer"}

In [None]:
for this above we use form data to send post request in the postman.
not in the body raw.

In [15]:
# def fact(n):
#     if n == 1:
#         return 1
#     return n * fact(n-1)

# print(fact(5))

120


In [None]:
# Now i try to verify the token either it is present or not. 

def verify_token(token: str , credentials_exception):

    try:
        payload = jwt.decode(token , SECRET_KEY, ALGORITHM)
        print(payload)
        id: str = payload.get("users_id") # type: ignore
        if id is None:
            raise credentials_exception
        
        token_data = schemas.TokenData(id=id)
    except JWTError:
        raise credentials_exception

In [None]:
# Two schema is created in the schema folder one is token and the other is token data.
class Token(BaseModel):
    access_token: str
    token_type: str

class TokenData(BaseModel):
    id: Optional[str] = None

#### Now i want to get the current user So we try to verify the token and get the current user id with the help of token.




In [None]:

from fastapi.security import OAuth2PasswordBearer
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")

def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,
                            detail="Could not validate credentials"
                            ,headers={"WWW-Authenticate": "Bearer"})

    return verify_token(token, credentials_exception)       

In [None]:
# than add the Dependency injection in route file so every time we chk it authorization.

# user_id: int = Depends(oauth2.get_current_user)  addd this in the every route 
# where we need to check the authorization.

In [None]:
# Access the login token by using postman it generate the token data and tahan pass this 
# to the authntication bearer in the header of the postman. than use form data to give the
# id and password than create a new post in the postman.