In [2]:
pip install fastapi

Collecting fastapi
  Downloading fastapi-0.115.8-py3-none-any.whl.metadata (27 kB)
Collecting starlette<0.46.0,>=0.40.0 (from fastapi)
  Downloading starlette-0.45.3-py3-none-any.whl.metadata (6.3 kB)
Downloading fastapi-0.115.8-py3-none-any.whl (94 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m94.8/94.8 kB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading starlette-0.45.3-py3-none-any.whl (71 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m71.5/71.5 kB[0m [31m5.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: starlette, fastapi
Successfully installed fastapi-0.115.8 starlette-0.45.3


In [3]:
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel, Field
from typing import List, Dict, Optional
import uuid

# Application object; every route decorator below registers its handler on it.
app: FastAPI = FastAPI()

# Simulated in-memory database: plain module-level dicts, so all stored data is
# lost whenever this cell is re-run or the kernel restarts.
users_db: Dict[str, "User"] = {}      # keyed by user email (see create_user)
houses_db: Dict[str, "House"] = {}    # keyed by house id
rooms_db: Dict[str, "Room"] = {}      # keyed by room id
devices_db: Dict[str, "Device"] = {}  # keyed by device id

# --------------------------- Models ---------------------------
class User(BaseModel):
    """A user account as accepted and stored by the user endpoints."""

    # Random UUID4 string generated per instance when the caller supplies no id.
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    name: str
    # Plain string; no email-format validation is applied by this model.
    email: str
    # Stored as plain text here. In a real system, passwords should be hashed.
    password: str

class House(BaseModel):
    """A house record as accepted and stored by the house endpoints."""

    # Random UUID4 string generated per instance when the caller supplies no id.
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    # Presumably the id of the owning User; nothing in this model enforces it.
    owner_id: str
    address: str
    # Free-form string-to-string map; defaults to an empty dict, None is also accepted.
    metadata: Optional[Dict[str, str]] = {}

class Room(BaseModel):
    """A room record as accepted and stored by the room endpoints."""

    # Random UUID4 string generated per instance when the caller supplies no id.
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    # Presumably the id of the containing House; nothing in this model enforces it.
    house_id: str
    name: str
    # Free-form string-to-string map; defaults to an empty dict, None is also accepted.
    metadata: Optional[Dict[str, str]] = {}

class Device(BaseModel):
    """A device record as accepted and stored by the device endpoints."""

    # Random UUID4 string generated per instance when the caller supplies no id.
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    # Presumably the id of the containing Room; nothing in this model enforces it.
    room_id: str
    type: str
    # Free-form string (e.g. ON, OFF, ERROR); the model does not restrict the values.
    status: str
    # Free-form string-to-string map; defaults to an empty dict, None is also accepted.
    metadata: Optional[Dict[str, str]] = {}

# --------------------------- API Endpoints ---------------------------

# User Endpoints
@app.post("/users/", response_model=User, response_model_exclude={"password"})
def create_user(user: User):
    """Register a new user, stored in ``users_db`` under the user's email.

    Args:
        user: Request body parsed into a ``User``; ``id`` is auto-generated
            when the client omits it.

    Returns:
        The stored user. The ``password`` field is removed from the response
        (``response_model_exclude``) so credentials are never echoed back.

    Raises:
        HTTPException: 400 if a user with the same email is already stored.
    """
    if user.email in users_db:
        raise HTTPException(status_code=400, detail="A user with this email already exists.")
    users_db[user.email] = user
    return user

@app.get("/users/{user_id}", response_model=User, response_model_exclude={"password"})
def get_user(user_id: str):
    """Look up a user by id.

    Args:
        user_id: The ``id`` of the user to fetch (path parameter).

    Returns:
        The first stored user whose ``id`` matches. The ``password`` field is
        removed from the response (``response_model_exclude``) so credentials
        are not exposed to API clients.

    Raises:
        HTTPException: 404 if no stored user has that id.
    """
    # users_db is keyed by email, so an id lookup has to scan the stored values.
    found = next((u for u in users_db.values() if u.id == user_id), None)
    if found is None:
        raise HTTPException(status_code=404, detail="User not found.")
    return found

# House Endpoints
@app.post("/houses/", response_model=House)
def create_house(house: House):
    """Store a new house in ``houses_db`` under its id.

    Args:
        house: Request body parsed into a ``House``; ``id`` is auto-generated
            when the client omits it.

    Returns:
        The stored house.

    Raises:
        HTTPException: 400 if a house with the same id is already stored.
    """
    # A client-supplied id must not silently replace an existing record;
    # replacing is what PUT /houses/{house_id} is for.
    if house.id in houses_db:
        raise HTTPException(status_code=400, detail="A house with this id already exists.")
    # NOTE(review): owner_id is not checked against users_db here.
    houses_db[house.id] = house
    return house

@app.get("/houses/{house_id}", response_model=House)
def get_house(house_id: str):
    """Return the house stored under ``house_id``.

    Raises:
        HTTPException: 404 if ``houses_db`` has no entry for that id.
    """
    try:
        return houses_db[house_id]
    except KeyError:
        raise HTTPException(status_code=404, detail="House not found.") from None

@app.put("/houses/{house_id}", response_model=House)
def update_house(house_id: str, house: House):
    """Replace the house stored under ``house_id`` with the request body.

    Args:
        house_id: Id of the house to replace (path parameter).
        house: Full replacement record parsed from the request body.

    Returns:
        The stored house, whose ``id`` always equals ``house_id``.

    Raises:
        HTTPException: 404 if ``houses_db`` has no entry for that id.
    """
    if house_id not in houses_db:
        raise HTTPException(status_code=404, detail="House not found.")
    # The path id is authoritative. Without this, a body that omits ``id``
    # (a fresh UUID is then generated) or carries a different one would be
    # stored under house_id while reporting another id.
    house.id = house_id
    houses_db[house_id] = house
    return house

@app.delete("/houses/{house_id}")
def delete_house(house_id: str):
    """Remove the house stored under ``house_id`` and return a confirmation message.

    Raises:
        HTTPException: 404 if ``houses_db`` has no entry for that id.
    """
    try:
        del houses_db[house_id]
    except KeyError:
        raise HTTPException(status_code=404, detail="House not found.") from None
    return {"message": "House deleted successfully."}

# Room Endpoints
@app.post("/rooms/", response_model=Room)
def create_room(room: Room):
    """Store a new room in ``rooms_db`` under its id.

    Args:
        room: Request body parsed into a ``Room``; ``id`` is auto-generated
            when the client omits it.

    Returns:
        The stored room.

    Raises:
        HTTPException: 400 if a room with the same id is already stored.
    """
    # A client-supplied id must not silently replace an existing record;
    # replacing is what PUT /rooms/{room_id} is for.
    if room.id in rooms_db:
        raise HTTPException(status_code=400, detail="A room with this id already exists.")
    # NOTE(review): house_id is not checked against houses_db here.
    rooms_db[room.id] = room
    return room

@app.get("/rooms/{room_id}", response_model=Room)
def get_room(room_id: str):
    """Return the room stored under ``room_id``.

    Raises:
        HTTPException: 404 if ``rooms_db`` has no entry for that id.
    """
    try:
        return rooms_db[room_id]
    except KeyError:
        raise HTTPException(status_code=404, detail="Room not found.") from None

@app.put("/rooms/{room_id}", response_model=Room)
def update_room(room_id: str, room: Room):
    """Replace the room stored under ``room_id`` with the request body.

    Args:
        room_id: Id of the room to replace (path parameter).
        room: Full replacement record parsed from the request body.

    Returns:
        The stored room, whose ``id`` always equals ``room_id``.

    Raises:
        HTTPException: 404 if ``rooms_db`` has no entry for that id.
    """
    if room_id not in rooms_db:
        raise HTTPException(status_code=404, detail="Room not found.")
    # The path id is authoritative. Without this, a body that omits ``id``
    # (a fresh UUID is then generated) or carries a different one would be
    # stored under room_id while reporting another id.
    room.id = room_id
    rooms_db[room_id] = room
    return room

@app.delete("/rooms/{room_id}")
def delete_room(room_id: str):
    """Remove the room stored under ``room_id`` and return a confirmation message.

    Raises:
        HTTPException: 404 if ``rooms_db`` has no entry for that id.
    """
    try:
        del rooms_db[room_id]
    except KeyError:
        raise HTTPException(status_code=404, detail="Room not found.") from None
    return {"message": "Room deleted successfully."}

# Device Endpoints
@app.post("/devices/", response_model=Device)
def create_device(device: Device):
    """Store a new device in ``devices_db`` under its id.

    Args:
        device: Request body parsed into a ``Device``; ``id`` is
            auto-generated when the client omits it.

    Returns:
        The stored device.

    Raises:
        HTTPException: 400 if a device with the same id is already stored.
    """
    # A client-supplied id must not silently replace an existing record;
    # replacing is what PUT /devices/{device_id} is for.
    if device.id in devices_db:
        raise HTTPException(status_code=400, detail="A device with this id already exists.")
    # NOTE(review): room_id is not checked against rooms_db here.
    devices_db[device.id] = device
    return device

@app.get("/devices/{device_id}", response_model=Device)
def get_device(device_id: str):
    """Return the device stored under ``device_id``.

    Raises:
        HTTPException: 404 if ``devices_db`` has no entry for that id.
    """
    try:
        return devices_db[device_id]
    except KeyError:
        raise HTTPException(status_code=404, detail="Device not found.") from None

@app.put("/devices/{device_id}", response_model=Device)
def update_device(device_id: str, device: Device):
    """Replace the device stored under ``device_id`` with the request body.

    Args:
        device_id: Id of the device to replace (path parameter).
        device: Full replacement record parsed from the request body.

    Returns:
        The stored device, whose ``id`` always equals ``device_id``.

    Raises:
        HTTPException: 404 if ``devices_db`` has no entry for that id.
    """
    if device_id not in devices_db:
        raise HTTPException(status_code=404, detail="Device not found.")
    # The path id is authoritative. Without this, a body that omits ``id``
    # (a fresh UUID is then generated) or carries a different one would be
    # stored under device_id while reporting another id.
    device.id = device_id
    devices_db[device_id] = device
    return device

@app.delete("/devices/{device_id}")
def delete_device(device_id: str):
    """Remove the device stored under ``device_id`` and return a confirmation message.

    Raises:
        HTTPException: 404 if ``devices_db`` has no entry for that id.
    """
    try:
        del devices_db[device_id]
    except KeyError:
        raise HTTPException(status_code=404, detail="Device not found.") from None
    return {"message": "Device deleted successfully."}
