In [1]:
%pip install fastapi uvicorn

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.3.1 -> 25.1.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [3]:

# ---------------------- Step 1: Basic App ----------------------

from fastapi import FastAPI

# Application instance the route decorators in this cell register against.
app = FastAPI()

# Simple fake database
# In-memory counters: "visits" is incremented by GET / and "users" by POST /register.
app_stats = {"visits": 0, "users": 0}

@app.get("/")
def read_root():
    """Count this request and return a greeting with the running visit total."""
    visit_count = app_stats["visits"] + 1
    app_stats["visits"] = visit_count
    return dict(message="Hello World from FastAPI!", visits=visit_count)

@app.get("/stats")
def get_stats():
    """Return the ``app_stats`` counters (visits and users) as they currently stand."""
    return app_stats

@app.post("/register")
def register_user():
    """Increment the registered-user counter and report the new total."""
    total_users = app_stats["users"] + 1
    app_stats["users"] = total_users
    return dict(message="User registered", total_users=total_users)

import uvicorn
import nest_asyncio
# Patch asyncio so uvicorn's own asyncio.run() call can execute from this cell.
# NOTE(review): presumably needed because the notebook kernel already runs an event loop — confirm.
nest_asyncio.apply()

# Serve the app on localhost:8000. This call does not return until the server
# shuts down, so the cell stays busy until the kernel is interrupted.
uvicorn.run(app, host="127.0.0.1", port=8000)

INFO:     Started server process [6324]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)


INFO:     127.0.0.1:57665 - "GET /docs HTTP/1.1" 200 OK
INFO:     127.0.0.1:57665 - "GET /openapi.json HTTP/1.1" 200 OK
INFO:     127.0.0.1:57665 - "GET / HTTP/1.1" 200 OK
INFO:     127.0.0.1:57666 - "GET /stats HTTP/1.1" 200 OK
INFO:     127.0.0.1:57672 - "POST /register HTTP/1.1" 200 OK


INFO:     Shutting down
INFO:     Waiting for application shutdown.
INFO:     Application shutdown complete.
INFO:     Finished server process [6324]


In [5]:

# ---------------------- Step 2: Path Parameters ----------------------

from fastapi import FastAPI, HTTPException

# Fresh application instance for this step's routes (replaces the previous cell's app).
app = FastAPI()

# Fake database
# In-memory user records keyed by integer id; the /users/{user_id} routes look up this key.
users_db = {
    1: {"name": "Alice", "age": 25, "city": "New York"},
    2: {"name": "Bob", "age": 30, "city": "London"},
    3: {"name": "Charlie", "age": 35, "city": "Paris"}
}

# In-memory product records keyed by integer id; read by /products/{product_id}.
products_db = {
    1: {"name": "Laptop", "price": 1000},
    2: {"name": "Phone", "price": 500},
    3: {"name": "Tablet", "price": 300}
}

@app.get("/users/{user_id}")
def get_user(user_id: int):
    """Return the record stored under ``user_id`` in ``users_db``.

    Raises:
        HTTPException: 404 ("User not found") when the id is not a key of ``users_db``.
    """
    try:
        return users_db[user_id]
    except KeyError:
        raise HTTPException(status_code=404, detail="User not found") from None

@app.get("/products/{product_id}")
def get_product(product_id: int):
    """Return the record stored under ``product_id`` in ``products_db``.

    Raises:
        HTTPException: 404 ("Product not found") when the id is not a key of ``products_db``.
    """
    if product_id in products_db:
        return products_db[product_id]
    raise HTTPException(status_code=404, detail="Product not found")

@app.get("/users/{user_id}/age")
def get_user_age(user_id: int):
    """Return only the age of the user stored under ``user_id``.

    Raises:
        HTTPException: 404 ("User not found") when the id is not a key of ``users_db``.
    """
    if user_id in users_db:
        stored_user = users_db[user_id]
        return dict(user_id=user_id, age=stored_user["age"])
    raise HTTPException(status_code=404, detail="User not found")

import uvicorn
import nest_asyncio
# Patch asyncio so uvicorn's own asyncio.run() call can execute from this cell.
# NOTE(review): presumably needed because the notebook kernel already runs an event loop — confirm.
nest_asyncio.apply()

# Serve the app on localhost:8000; the cell stays busy until the server is stopped.
uvicorn.run(app, host="127.0.0.1", port=8000)

INFO:     Started server process [6324]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)


INFO:     127.0.0.1:57751 - "GET / HTTP/1.1" 404 Not Found
INFO:     127.0.0.1:57751 - "GET /favicon.ico HTTP/1.1" 404 Not Found
INFO:     127.0.0.1:57751 - "GET /docs HTTP/1.1" 200 OK
INFO:     127.0.0.1:57751 - "GET /openapi.json HTTP/1.1" 200 OK
INFO:     127.0.0.1:57752 - "GET /users/1 HTTP/1.1" 200 OK
INFO:     127.0.0.1:57759 - "GET /products/3 HTTP/1.1" 200 OK
INFO:     127.0.0.1:57764 - "GET /users/2/age HTTP/1.1" 200 OK


INFO:     Shutting down
INFO:     Waiting for application shutdown.
INFO:     Application shutdown complete.
INFO:     Finished server process [6324]


In [6]:
# ---------------------- Step 3: Query Parameters ----------------------

from fastapi import FastAPI
from typing import Optional

# Fresh application instance for this step's routes.
app = FastAPI()

# Fake database
# Unlike the previous step this is a list of records; each carries its own "id"
# plus the "name", "price" and "category" fields the handlers below filter on.
products_db = [
    {"id": 1, "name": "Laptop", "price": 1000, "category": "electronics"},
    {"id": 2, "name": "Book", "price": 20, "category": "education"},
    {"id": 3, "name": "Phone", "price": 500, "category": "electronics"},
    {"id": 4, "name": "Pen", "price": 5, "category": "office"},
    {"id": 5, "name": "Tablet", "price": 300, "category": "electronics"}
]

@app.get("/products")
def get_products(
    category: Optional[str] = None,
    min_price: Optional[float] = None,
    max_price: Optional[float] = None,
    limit: int = 10
):
    """List products, optionally filtered by category and price range.

    Args:
        category: Keep only products whose "category" equals this value.
            An empty string is treated the same as no filter.
        min_price: Keep only products priced at or above this value.
        max_price: Keep only products priced at or below this value.
        limit: Maximum number of products to return. Values below zero are
            treated as zero.

    Returns:
        A dict with "products" (at most ``limit`` matches) and "total_found"
        (the number of matches before the limit is applied).
    """
    matches = [
        product
        for product in products_db
        if (not category or product["category"] == category)
        and (min_price is None or product["price"] >= min_price)
        and (max_price is None or product["price"] <= max_price)
    ]

    # A negative limit would make the slice count from the end of the list
    # (e.g. limit=-1 silently drops the last match), so clamp it to zero.
    page_size = max(limit, 0)

    return {
        "products": matches[:page_size],
        "total_found": len(matches)
    }

@app.get("/search")
def search_products(q: str):
    """Case-insensitive substring search over product names only.

    Returns the original query, the matching product records and their count.
    """
    needle = q.lower()
    results = []
    for product in products_db:
        if needle in product["name"].lower():
            results.append(product)
    return dict(query=q, results=results, count=len(results))

import uvicorn
import nest_asyncio
# Patch asyncio so uvicorn's own asyncio.run() call can execute from this cell.
# NOTE(review): presumably needed because the notebook kernel already runs an event loop — confirm.
nest_asyncio.apply()

# Serve the app on localhost:8000; the cell stays busy until the server is stopped.
uvicorn.run(app, host="127.0.0.1", port=8000)

Task exception was never retrieved
future: <Task finished name='Task-33' coro=<Server.serve() done, defined at c:\Users\PC\AppData\Local\Programs\Python\Python312\Lib\site-packages\uvicorn\server.py:67> exception=KeyboardInterrupt()>
Traceback (most recent call last):
  File "c:\Users\PC\AppData\Local\Programs\Python\Python312\Lib\site-packages\uvicorn\main.py", line 579, in run
    server.run()
  File "c:\Users\PC\AppData\Local\Programs\Python\Python312\Lib\site-packages\uvicorn\server.py", line 65, in run
    return asyncio.run(self.serve(sockets=sockets))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\PC\AppData\Roaming\Python\Python312\site-packages\nest_asyncio.py", line 30, in run
    return loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\PC\AppData\Roaming\Python\Python312\site-packages\nest_asyncio.py", line 92, in run_until_complete
    self._run_once()
  File "C:\Users\PC\AppData\Roaming\Python\Python312\site-packa

INFO:     127.0.0.1:57827 - "GET / HTTP/1.1" 404 Not Found
INFO:     127.0.0.1:57827 - "GET /favicon.ico HTTP/1.1" 404 Not Found
INFO:     127.0.0.1:57827 - "GET /docs HTTP/1.1" 200 OK
INFO:     127.0.0.1:57827 - "GET /openapi.json HTTP/1.1" 200 OK
INFO:     127.0.0.1:57828 - "GET /products?category=electronics&min_price=20&max_price=1000&limit=10 HTTP/1.1" 200 OK
INFO:     127.0.0.1:57842 - "GET /search?q=electroni HTTP/1.1" 200 OK
INFO:     127.0.0.1:57843 - "GET /search?q=electronics HTTP/1.1" 200 OK
INFO:     127.0.0.1:57846 - "GET /search?q=Laptop HTTP/1.1" 200 OK


INFO:     Shutting down
INFO:     Waiting for application shutdown.
INFO:     Application shutdown complete.
INFO:     Finished server process [6324]


In [None]:
# ---------------------- Step 4: Request Body (POST) ----------------------

from typing import Optional

from fastapi import FastAPI
from pydantic import BaseModel

# Fresh application instance for this step's routes.
app = FastAPI()

class Item(BaseModel):
    """Request body accepted by ``POST /items``."""
    name: str
    price: float
    # Declared Optional so the annotation agrees with the None default.
    # Pydantic v2 does not infer nullability from a None default, so with a
    # plain `str` an explicit JSON null for this field is rejected.
    description: Optional[str] = None

class User(BaseModel):
    """Request body accepted by ``POST /users``; all three fields are required."""
    username: str
    # Plain string: no e-mail format validation is applied by this model.
    email: str
    age: int

# Fake databases
# In-memory lists; the POST handlers append to them and nothing in this cell removes entries.
items_db = []
users_db = []
# Next id to hand out for each store; the create handlers read it and then increment it.
item_id_counter = 1
user_id_counter = 1

@app.post("/items")
def create_item(item: Item):
    """Store a new item under the next free id and echo the stored record back."""
    global item_id_counter
    assigned_id = item_id_counter
    record = dict(
        id=assigned_id,
        name=item.name,
        price=item.price,
        description=item.description,
    )
    items_db.append(record)
    item_id_counter = assigned_id + 1
    return dict(message="Item created", item=record)

@app.post("/users")
def create_user(user: User):
    """Store a new user under the next free id and echo the stored record back."""
    global user_id_counter
    assigned_id = user_id_counter
    record = dict(
        id=assigned_id,
        username=user.username,
        email=user.email,
        age=user.age,
    )
    users_db.append(record)
    user_id_counter = assigned_id + 1
    return dict(message="User created", user=record)

@app.get("/items")
def get_all_items():
    """Return every stored item together with the item count."""
    item_count = len(items_db)
    return dict(items=items_db, total=item_count)

@app.get("/users")
def get_all_users():
    """Return every stored user together with the user count."""
    user_count = len(users_db)
    return dict(users=users_db, total=user_count)

import uvicorn
import nest_asyncio
# Patch asyncio so uvicorn's own asyncio.run() call can execute from this cell.
# NOTE(review): presumably needed because the notebook kernel already runs an event loop — confirm.
nest_asyncio.apply()

# Serve the app on localhost:8000; the cell stays busy until the server is stopped.
uvicorn.run(app, host="127.0.0.1", port=8000)


In [None]:
# ---------------------- Step 5: Response Models ----------------------

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List

app = FastAPI()

class Item(BaseModel):
    """Shape of an item as returned by the /items routes (used as their ``response_model``)."""
    id: int
    name: str
    price: float
    in_stock: bool

class ItemCreate(BaseModel):
    """Request body for ``POST /items``; it has no ``id`` because the server assigns one."""
    name: str
    price: float
    # New items are considered in stock unless the client says otherwise.
    in_stock: bool = True

# Fake database
# Seed records; each dict carries the same keys as the Item response model.
items_db = [
    {"id": 1, "name": "Laptop", "price": 1000.0, "in_stock": True},
    {"id": 2, "name": "Phone", "price": 500.0, "in_stock": True},
    {"id": 3, "name": "Tablet", "price": 300.0, "in_stock": False}
]
# Next id to assign; starts at 4 because ids 1-3 are seeded above.
item_id_counter = 4

@app.get("/items", response_model=List[Item])
def get_items():
    """Return every record in ``items_db``; the response is declared as a list of ``Item``."""
    return items_db

@app.get("/items/{item_id}", response_model=Item)
def get_item(item_id: int):
    """Return the first stored item whose "id" equals ``item_id``.

    Raises:
        HTTPException: 404 ("Item not found") when no stored item has that id.
    """
    found = next((entry for entry in items_db if entry["id"] == item_id), None)
    if found is None:
        raise HTTPException(status_code=404, detail="Item not found")
    return found

@app.post("/items", response_model=Item)
def create_item(item: ItemCreate):
    """Store a new item under the next free id and return the stored record."""
    global item_id_counter
    assigned_id = item_id_counter
    record = dict(
        id=assigned_id,
        name=item.name,
        price=item.price,
        in_stock=item.in_stock,
    )
    items_db.append(record)
    item_id_counter = assigned_id + 1
    return record

@app.get("/items/category/available", response_model=List[Item])
def get_available_items():
    """Return only the stored items whose "in_stock" flag is truthy."""
    available = []
    for entry in items_db:
        if entry["in_stock"]:
            available.append(entry)
    return available

import uvicorn
import nest_asyncio
# Patch asyncio so uvicorn's own asyncio.run() call can execute from this cell.
# NOTE(review): presumably needed because the notebook kernel already runs an event loop — confirm.
nest_asyncio.apply()

# Serve the app on localhost:8000; the cell stays busy until the server is stopped.
uvicorn.run(app, host="127.0.0.1", port=8000)

In [None]:
# ---------------------- Step 6: Complete CRUD ----------------------

from typing import List, Optional

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()

class Task(BaseModel):
    """Shape of a task as returned by the /tasks routes (used as their ``response_model``)."""
    id: int
    title: str
    completed: bool = False
    # Free-form string; the seed data uses "high", "medium" and "low" but nothing enforces that set.
    priority: str = "medium"

class TaskCreate(BaseModel):
    """Request body for ``POST /tasks``; it has no ``id`` or ``completed`` because the server sets both."""
    title: str
    priority: str = "medium"

class TaskUpdate(BaseModel):
    """Partial-update body for ``PUT /tasks/{task_id}``.

    Every field is optional; a field left as ``None`` means "leave unchanged".
    """
    # Declared Optional so the annotations agree with the None defaults.
    # Pydantic v2 does not infer nullability from a None default, so with the
    # bare types an explicit JSON null for any of these fields is rejected.
    title: Optional[str] = None
    completed: Optional[bool] = None
    priority: Optional[str] = None

# Fake database
# Seed records; each dict carries the same keys as the Task response model.
tasks_db = [
    {"id": 1, "title": "Buy groceries", "completed": False, "priority": "high"},
    {"id": 2, "title": "Walk the dog", "completed": True, "priority": "medium"},
    {"id": 3, "title": "Read a book", "completed": False, "priority": "low"}
]
# Next id to assign; starts at 4 because ids 1-3 are seeded above.
task_id_counter = 4

# CREATE
@app.post("/tasks", response_model=Task)
def create_task(task: TaskCreate):
    """Store a new, not-yet-completed task under the next free id and return it."""
    global task_id_counter
    assigned_id = task_id_counter
    record = dict(
        id=assigned_id,
        title=task.title,
        completed=False,
        priority=task.priority,
    )
    tasks_db.append(record)
    task_id_counter = assigned_id + 1
    return record

# READ (all)
@app.get("/tasks", response_model=List[Task])
def get_tasks():
    """Return every record in ``tasks_db``; the response is declared as a list of ``Task``."""
    return tasks_db

# READ (one)
@app.get("/tasks/{task_id}", response_model=Task)
def get_task(task_id: int):
    """Return the first stored task whose "id" equals ``task_id``.

    Raises:
        HTTPException: 404 ("Task not found") when no stored task has that id.
    """
    found = next((entry for entry in tasks_db if entry["id"] == task_id), None)
    if found is None:
        raise HTTPException(status_code=404, detail="Task not found")
    return found

# UPDATE
@app.put("/tasks/{task_id}", response_model=Task)
def update_task(task_id: int, task_update: TaskUpdate):
    """Apply the non-None fields of ``task_update`` to the matching task, in place.

    Raises:
        HTTPException: 404 ("Task not found") when no stored task has that id.
    """
    for stored in tasks_db:
        if stored["id"] != task_id:
            continue
        # Only fields the client actually supplied (non-None) overwrite stored values.
        for field_name in ("title", "completed", "priority"):
            new_value = getattr(task_update, field_name)
            if new_value is not None:
                stored[field_name] = new_value
        return stored
    raise HTTPException(status_code=404, detail="Task not found")

# DELETE
@app.delete("/tasks/{task_id}")
def delete_task(task_id: int):
    """Remove the first stored task whose "id" equals ``task_id`` and return it.

    Raises:
        HTTPException: 404 ("Task not found") when no stored task has that id.
    """
    position = next(
        (index for index, entry in enumerate(tasks_db) if entry["id"] == task_id),
        None,
    )
    if position is None:
        raise HTTPException(status_code=404, detail="Task not found")
    removed = tasks_db.pop(position)
    return dict(message="Task deleted", task=removed)

import uvicorn
import nest_asyncio
# Patch asyncio so uvicorn's own asyncio.run() call can execute from this cell.
# NOTE(review): presumably needed because the notebook kernel already runs an event loop — confirm.
nest_asyncio.apply()

# Serve the app on localhost:8000; the cell stays busy until the server is stopped.
uvicorn.run(app, host="127.0.0.1", port=8000)

In [None]:
# ---------------------- Step 7: Error Handling ----------------------

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()

class User(BaseModel):
    """Request body for ``POST /users``; all three fields are required."""
    name: str
    age: int
    # Plain string: the only format check is the "@" test done in the handler.
    email: str

class Product(BaseModel):
    """Product schema with name, price and stock level.

    NOTE(review): no route visible in this cell takes or returns this model.
    """
    name: str
    price: float
    stock: int

# Fake databases
# Users keyed by integer id; create_user adds entries and scans "email" for duplicates.
users_db = {
    1: {"name": "Alice", "age": 25, "email": "alice@email.com"},
    2: {"name": "Bob", "age": 30, "email": "bob@email.com"}
}

# Products keyed by integer id; "stock" is decremented by the buy endpoint.
products_db = {
    1: {"name": "Laptop", "price": 1000.0, "stock": 5},
    2: {"name": "Phone", "price": 500.0, "stock": 0}
}

# Next ids to assign; they start at 3 because ids 1 and 2 are seeded above.
user_id_counter = 3
# NOTE(review): product_id_counter is not read by any handler visible in this cell.
product_id_counter = 3

@app.post("/users")
def create_user(user: User):
    """Validate and store a new user, returning the id it was stored under.

    Raises:
        HTTPException: 400 when ``age`` is under 18 or ``email`` contains no "@";
            409 when another stored user already has the same email.
    """
    global user_id_counter

    # Validation errors
    if user.age < 18:
        raise HTTPException(status_code=400, detail="Age must be 18 or older")

    if "@" not in user.email:
        raise HTTPException(status_code=400, detail="Invalid email format")

    # Check if email already exists
    if any(existing["email"] == user.email for existing in users_db.values()):
        raise HTTPException(status_code=409, detail="Email already exists")

    # Capture the id before incrementing so the response does not have to
    # reconstruct it with "counter - 1".
    new_user_id = user_id_counter

    # model_dump() is the Pydantic v2 method; .dict() is deprecated there and
    # emits a warning. Fall back to .dict() so Pydantic v1 keeps working.
    user_record = user.model_dump() if hasattr(user, "model_dump") else user.dict()

    users_db[new_user_id] = user_record
    user_id_counter += 1
    return {"message": "User created", "user_id": new_user_id}

@app.get("/users/{user_id}")
def get_user(user_id: int):
    """Return the record stored under ``user_id`` in ``users_db``.

    Raises:
        HTTPException: 404 ("User not found") when the id is not a key of ``users_db``.
    """
    if user_id in users_db:
        return users_db[user_id]
    raise HTTPException(status_code=404, detail="User not found")

@app.post("/products/{product_id}/buy")
def buy_product(product_id: int, quantity: int = 1):
    """Buy ``quantity`` units of a product and decrement its stock.

    Args:
        product_id: Key of the product in ``products_db``.
        quantity: Number of units to buy; must be at least 1.

    Returns:
        A dict with the product name, quantity bought, total price and
        remaining stock.

    Raises:
        HTTPException: 404 when the product id is unknown; 400 when
            ``quantity`` is below 1 or exceeds the available stock.
    """
    if product_id not in products_db:
        raise HTTPException(status_code=404, detail="Product not found")

    # Reject zero and negative quantities. A negative value would pass the
    # stock check below, increase the stock and produce a negative total.
    if quantity < 1:
        raise HTTPException(status_code=400, detail="Quantity must be at least 1")

    product = products_db[product_id]

    if product["stock"] < quantity:
        raise HTTPException(
            status_code=400,
            detail=f"Not enough stock. Available: {product['stock']}"
        )

    product["stock"] -= quantity
    total_price = product["price"] * quantity

    return {
        "message": "Purchase successful",
        "product": product["name"],
        "quantity": quantity,
        "total_price": total_price,
        "remaining_stock": product["stock"]
    }

import uvicorn
import nest_asyncio
# Patch asyncio so uvicorn's own asyncio.run() call can execute from this cell.
# NOTE(review): presumably needed because the notebook kernel already runs an event loop — confirm.
nest_asyncio.apply()

# Serve the app on localhost:8000; the cell stays busy until the server is stopped.
uvicorn.run(app, host="127.0.0.1", port=8000)

In [None]:
# ---------------------- Step 8: File Upload ----------------------

from fastapi import FastAPI, File, UploadFile, Form, HTTPException
from typing import List
from datetime import datetime

app = FastAPI()

# Fake database for file metadata
# Only metadata records are kept here; the handlers read the uploaded bytes to
# measure their size but do not store them.
files_db = []
# Next id to assign to an uploaded file record.
file_id_counter = 1

@app.post("/upload")
async def upload_file(file: UploadFile = File(...)):
    """Accept one uploaded file (max 1MB) and record its metadata.

    Raises:
        HTTPException: 400 ("File too large (max 1MB)") when the content exceeds 1 MiB.
    """
    global file_id_counter

    payload = await file.read()
    size_in_bytes = len(payload)

    # Validate file size (max 1MB for demo)
    max_bytes = 1024 * 1024
    if size_in_bytes > max_bytes:
        raise HTTPException(status_code=400, detail="File too large (max 1MB)")

    # Only metadata is stored; the bytes themselves are discarded.
    file_record = dict(
        id=file_id_counter,
        filename=file.filename,
        content_type=file.content_type,
        size=size_in_bytes,
        upload_date=datetime.now().isoformat(),
    )
    files_db.append(file_record)
    file_id_counter += 1

    return dict(message="File uploaded successfully", file_info=file_record)

@app.post("/upload-multiple")
async def upload_multiple_files(files: List[UploadFile] = File(...)):
    """Accept up to three uploaded files and record metadata for each.

    Unlike ``/upload``, this route applies no per-file size check.

    Raises:
        HTTPException: 400 ("Maximum 3 files allowed") when more than three
            files are sent.
    """
    global file_id_counter

    if len(files) > 3:
        raise HTTPException(status_code=400, detail="Maximum 3 files allowed")

    uploaded_files = []

    for file in files:
        content = await file.read()

        # Draw ids from the shared counter, as the other upload routes do.
        # Deriving them from len(files_db) + 1 reuses ids once a record has
        # been deleted and leaves the counter out of step with stored ids.
        file_record = {
            "id": file_id_counter,
            "filename": file.filename,
            "size": len(content),
            "upload_date": datetime.now().isoformat()
        }

        files_db.append(file_record)
        file_id_counter += 1
        uploaded_files.append(file_record)

    return {
        "message": f"Uploaded {len(uploaded_files)} files",
        "files": uploaded_files
    }

@app.post("/upload-with-data")
async def upload_with_data(
    title: str = Form(...),
    description: str = Form(...),
    file: UploadFile = File(...)
):
    """Accept a file together with title/description form fields and record the document."""
    global file_id_counter

    payload = await file.read()

    # Create document record
    document = dict(
        id=file_id_counter,
        title=title,
        description=description,
        filename=file.filename,
        size=len(payload),
        upload_date=datetime.now().isoformat(),
    )
    files_db.append(document)
    file_id_counter += 1

    return dict(message="Document uploaded", document=document)

@app.get("/files")
def list_files():
    """Return every stored file record together with the record count."""
    record_count = len(files_db)
    return dict(files=files_db, total=record_count)

@app.delete("/files/{file_id}")
def delete_file(file_id: int):
    """Remove the first stored record whose "id" equals ``file_id`` and return it.

    Raises:
        HTTPException: 404 ("File not found") when no stored record has that id.
    """
    position = next(
        (index for index, entry in enumerate(files_db) if entry["id"] == file_id),
        None,
    )
    if position is None:
        raise HTTPException(status_code=404, detail="File not found")
    removed = files_db.pop(position)
    return dict(message="File deleted", file=removed)

import uvicorn
import nest_asyncio
# Patch asyncio so uvicorn's own asyncio.run() call can execute from this cell.
# NOTE(review): presumably needed because the notebook kernel already runs an event loop — confirm.
nest_asyncio.apply()

# Serve the app on localhost:8000; the cell stays busy until the server is stopped.
uvicorn.run(app, host="127.0.0.1", port=8000)



In [None]:

# ---------------------- Step 9: Background Tasks & Dependencies ----------------------

from fastapi import FastAPI, BackgroundTasks, Depends, Header, HTTPException
from typing import Optional
from datetime import datetime
import time

app = FastAPI()

# Fake databases
# Users keyed by username; "role" is what the admin-only routes check.
users_db = {
    "admin": {"id": 1, "username": "admin", "role": "admin"},
    "user1": {"id": 2, "username": "user1", "role": "user"}
}

# Filled in by the background task functions once their simulated work finishes.
notifications_db = []
tasks_db = []

# Valid API keys
# Maps an X-API-Key header value to the username it authenticates as (a key of users_db).
# NOTE(review): hardcoded demo credentials — do not reuse this pattern outside the tutorial.
API_KEYS = {
    "admin-key": "admin",
    "user-key": "user1"
}

# Dependency: Check API key
def get_current_user(x_api_key: Optional[str] = Header(None)):
    """Dependency that resolves the ``X-API-Key`` header to a user record.

    Raises:
        HTTPException: 401 ("Invalid API Key") when the header is missing,
            empty, or not a key of ``API_KEYS``.
    """
    key_is_known = bool(x_api_key) and x_api_key in API_KEYS
    if not key_is_known:
        raise HTTPException(status_code=401, detail="Invalid API Key")

    return users_db[API_KEYS[x_api_key]]

# Background task functions
def send_email_notification(email: str, message: str):
    """Simulate sending an e-mail, then record it in ``notifications_db``.

    Sleeps for two seconds before the record is appended and a confirmation
    line is printed.
    """
    time.sleep(2)  # Simulate email sending
    notifications_db.append(
        dict(
            id=len(notifications_db) + 1,
            email=email,
            message=message,
            sent_at=datetime.now().isoformat(),
            status="sent",
        )
    )
    print(f"Email sent to {email}: {message}")

def process_data_task(data: dict):
    """Simulate processing ``data``, then record the result in ``tasks_db``.

    Sleeps for three seconds before the record is appended and a confirmation
    line is printed.
    """
    time.sleep(3)  # Simulate data processing
    tasks_db.append(
        dict(
            id=len(tasks_db) + 1,
            data=data,
            processed_at=datetime.now().isoformat(),
            status="completed",
        )
    )
    print(f"Data processed: {data}")

# Protected route with dependency
@app.get("/profile")
def get_profile(current_user: dict = Depends(get_current_user)):
    """Return the authenticated caller's user record with the current timestamp."""
    accessed_at = datetime.now().isoformat()
    return dict(user=current_user, access_time=accessed_at)

# Background task endpoint
@app.post("/send-notification")
def send_notification(
    email: str,
    message: str,
    background_tasks: BackgroundTasks,
    current_user: dict = Depends(get_current_user)
):
    """Queue an e-mail notification as a background task and acknowledge it.

    The acknowledgement is built right after queuing, so it does not wait for
    ``send_email_notification`` to run.
    """
    background_tasks.add_task(send_email_notification, email, message)

    acknowledgement = dict(
        message="Notification queued",
        email=email,
        queued_by=current_user["username"],
        queued_at=datetime.now().isoformat(),
    )
    return acknowledgement

@app.post("/process-data")
def process_data(
    data: dict,
    background_tasks: BackgroundTasks,
    current_user: dict = Depends(get_current_user)
):
    """Queue ``data`` for background processing; restricted to admin callers.

    Raises:
        HTTPException: 403 ("Admin access required") when the caller's role is
            not "admin".
    """
    # Only admin can process data
    caller_is_admin = current_user["role"] == "admin"
    if not caller_is_admin:
        raise HTTPException(status_code=403, detail="Admin access required")

    background_tasks.add_task(process_data_task, data)

    acknowledgement = dict(
        message="Data processing started",
        started_by=current_user["username"],
        started_at=datetime.now().isoformat(),
    )
    return acknowledgement

# Check background task results
@app.get("/notifications")
def get_notifications(current_user: dict = Depends(get_current_user)):
    """Return every recorded notification and the count; any authenticated caller may read them."""
    notification_count = len(notifications_db)
    return dict(notifications=notifications_db, total=notification_count)

@app.get("/tasks")
def get_tasks_status(current_user: dict = Depends(get_current_user)):
    """Return every recorded processing result and the count; restricted to admin callers.

    Raises:
        HTTPException: 403 ("Admin access required") when the caller's role is
            not "admin".
    """
    caller_role = current_user["role"]
    if caller_role != "admin":
        raise HTTPException(status_code=403, detail="Admin access required")

    return dict(tasks=tasks_db, total=len(tasks_db))

# Async endpoint
@app.get("/async-data")
async def get_async_data(current_user: dict = Depends(get_current_user)):
    """Await a one-second simulated lookup, then return a fixed list of items."""
    import asyncio
    await asyncio.sleep(1)  # Simulate async database call

    fetched_items = ["item1", "item2", "item3"]
    return dict(
        data=fetched_items,
        fetched_by=current_user["username"],
        fetched_at=datetime.now().isoformat(),
    )

import uvicorn
import nest_asyncio
# Patch asyncio so uvicorn's own asyncio.run() call can execute from this cell.
# NOTE(review): presumably needed because the notebook kernel already runs an event loop — confirm.
nest_asyncio.apply()

# Test with these API keys:
# Admin access: X-API-Key: admin-key
# User access: X-API-Key: user-key
# Serve the app on localhost:8000; the cell stays busy until the server is stopped.
uvicorn.run(app, host="127.0.0.1", port=8000)