## 158: Validación de Roles con Dependencias en FastAPI 

Agregar control de acceso basado en roles (por ejemplo, solo los administradores pueden acceder a ciertas rutas) utilizando el token JWT y dependencias en FastAPI.

🧩 Código completo FastAPI con validación de roles
python
Copiar
Editar
from fastapi import FastAPI, HTTPException, Depends
from fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker, Session
from passlib.context import CryptContext
from jose import jwt, JWTError
from datetime import datetime, timedelta

# --- Configuración Base ---
app = FastAPI()
DATABASE_URL = "sqlite:///./usuarios.db"
SECRET_KEY = "CLAVE_ULTRA_SECRETA"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# --- Configuración de Base de Datos ---
Base = declarative_base()
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(bind=engine, autoflush=False)

class Usuario(Base):
    __tablename__ = "usuarios"
    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True)
    contraseña = Column(String)
    rol = Column(String)

Base.metadata.create_all(bind=engine)

# --- Modelos Pydantic ---
class UsuarioRegistro(BaseModel):
    email: str
    contraseña: str
    rol: str

class UsuarioLogin(BaseModel):
    email: str
    contraseña: str

# --- Utilidades ---
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

def hashear_contraseña(contraseña: str) -> str:
    return pwd_context.hash(contraseña)

def verificar_contraseña(plain: str, hashed: str) -> bool:
    return pwd_context.verify(plain, hashed)

def crear_token(email: str, rol: str) -> str:
    exp = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    return jwt.encode({"sub": email, "rol": rol, "exp": exp}, SECRET_KEY, algorithm=ALGORITHM)

def obtener_usuario_actual(token: str = Depends(oauth2_scheme)) -> dict:
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        return {"email": payload.get("sub"), "rol": payload.get("rol")}
    except JWTError:
        raise HTTPException(status_code=401, detail="Token inválido")

def verificar_rol(rol_requerido: str):
    def dependencia(usuario: dict = Depends(obtener_usuario_actual)):
        if usuario["rol"] != rol_requerido:
            raise HTTPException(status_code=403, detail="No tienes permisos suficientes")
        return usuario
    return dependencia

# --- Endpoints ---
@app.post("/registrar")
def registrar(usuario: UsuarioRegistro, db: Session = Depends(get_db)):
    existe = db.query(Usuario).filter(Usuario.email == usuario.email).first()
    if existe:
        raise HTTPException(status_code=400, detail="El usuario ya existe")
    nuevo_usuario = Usuario(
        email=usuario.email,
        contraseña=hashear_contraseña(usuario.contraseña),
        rol=usuario.rol
    )
    db.add(nuevo_usuario)
    db.commit()
    return {"mensaje": "Usuario registrado exitosamente"}

@app.post("/login")
def login(datos: UsuarioLogin, db: Session = Depends(get_db)):
    usuario = db.query(Usuario).filter(Usuario.email == datos.email).first()
    if not usuario or not verificar_contraseña(datos.contraseña, usuario.contraseña):
        raise HTTPException(status_code=401, detail="Credenciales inválidas")
    token = crear_token(usuario.email, usuario.rol)
    return {"access_token": token, "token_type": "bearer"}

@app.get("/admin/solo-admin")
def solo_admin(usuario=Depends(verificar_rol("admin"))):
    return {"mensaje": f"Hola administrador {usuario['email']}"}

@app.get("/usuario/solo-usuario")
def solo_usuario(usuario=Depends(verificar_rol("usuario"))):
    return {"mensaje": f"Hola usuario {usuario['email']}"}