# TALLER 2
## MANUELA ARANGO, MARIA PAULA GASCA, MAKSIM NAGI

# FidelizaBot

In [None]:
# ============================================
# CELDA 1: Importaciones + Ruta de DB + Esquema
# ============================================
from __future__ import annotations

import os
import sys
import sqlite3
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, List, Tuple, Type

# --- Utilidad: verificar si un directorio es escribible (crea un archivo temporal) ---
def _is_writable_dir(path: str) -> bool:
    try:
        os.makedirs(path, exist_ok=True)
        testfile = os.path.join(path, ".wtest")
        with open(testfile, "w", encoding="utf-8") as f:
            f.write("ok")
        os.remove(testfile)
        return True
    except Exception:
        return False

# --- Carpeta de datos por plataforma (sin librerías externas tipo 'appdirs') ---
def _user_data_dir(app_name: str = "FidelizaBot") -> str:
    home = os.path.expanduser("~")
    if os.name == "nt":  # Windows
        base = os.environ.get("APPDATA", os.path.join(home, "AppData", "Roaming"))
        return os.path.join(base, app_name)
    elif sys.platform == "darwin":  # macOS
        return os.path.join(home, "Library", "Application Support", app_name)
    else:  # Linux/Unix
        return os.path.join(home, ".local", "share", app_name)

# --- Resolución robusta de ruta de la base (evita 'OperationalError: unable to open database file') ---
def resolve_db_path(filename: str = "fidelizabot.db") -> str:
    # 1) Permite override por variable de entorno (laboratorios, servidores)
    env_path = os.environ.get("FIDELIZABOT_DB")
    if env_path:
        try:
            db_dir = os.path.dirname(env_path) or "."
            os.makedirs(db_dir, exist_ok=True)
            # Probar escritura rápidamente
            with open(os.path.join(db_dir, ".wtest"), "w", encoding="utf-8") as f:
                f.write("ok")
            os.remove(os.path.join(db_dir, ".wtest"))
            return env_path
        except Exception:
            print("[WARN] FIDELIZABOT_DB no es escribible. Probando alternativas...", file=sys.stderr)

    # 2) Intentar en ./data (directorio de trabajo del notebook)
    project_dir = os.getcwd()  # en notebooks no hay __file__, usamos cwd
    data_dir = os.path.join(project_dir, "data")
    if _is_writable_dir(data_dir):
        return os.path.join(data_dir, filename)

    # 3) Carpeta de datos del usuario (APPDATA/Library/~/.local/share)
    udir = _user_data_dir()
    if _is_writable_dir(udir):
        return os.path.join(udir, filename)

    # 4) Directorio de trabajo actual como último intento "persistente"
    if _is_writable_dir(os.getcwd()):
        return os.path.join(os.getcwd(), filename)

    # 5) Fallback en memoria (no persistente). Muy raro llegar acá.
    print("[WARN] Sin carpetas escribibles. Usando DB en memoria (no persiste).", file=sys.stderr)
    return ":memory:"

DB_PATH = resolve_db_path()

def get_conn() -> sqlite3.Connection:
    """
    Abre una conexión SQLite configurada para acceder a columnas por nombre.
    Notar: cada operación usa 'with get_conn()' para asegurar cierre/commit.
    """
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn

def init_db(seed: bool = True) -> None:
    """
    Crea el esquema si no existe (idempotente) y, opcionalmente, siembra recompensas.
    """
    with get_conn() as conn:
        cur = conn.cursor()

        cur.execute("""
            CREATE TABLE IF NOT EXISTS clientes (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                nombre TEXT NOT NULL,
                email TEXT UNIQUE,
                puntos INTEGER NOT NULL DEFAULT 0,
                nivel TEXT NOT NULL CHECK(nivel IN ('BRONCE','PLATA','ORO')),
                fecha_registro TEXT NOT NULL
            );
        """)

        cur.execute("""
            CREATE TABLE IF NOT EXISTS transacciones (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                cliente_id INTEGER NOT NULL,
                fecha TEXT NOT NULL,
                monto_cop INTEGER NOT NULL,
                tarjeta_aliada INTEGER NOT NULL DEFAULT 0,
                puntos_ganados INTEGER NOT NULL DEFAULT 0,
                puntos_redimidos INTEGER NOT NULL DEFAULT 0,
                descripcion TEXT,
                FOREIGN KEY (cliente_id) REFERENCES clientes(id)
            );
        """)

        cur.execute("""
            CREATE TABLE IF NOT EXISTS recompensas (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                nombre TEXT NOT NULL,
                costo_puntos INTEGER NOT NULL,
                descripcion TEXT
            );
        """)

        if seed:
            cur.execute("SELECT COUNT(*) AS c FROM recompensas;")
            if int(cur.fetchone()["c"]) == 0:
                cur.executemany(
                    "INSERT INTO recompensas (nombre, costo_puntos, descripcion) VALUES (?, ?, ?);",
                    [
                        ("Café pequeño", 120, "Café filtrado tamaño pequeño."),
                        ("Capuchino mediano", 250, "Capuchino en vaso mediano."),
                        ("Sandwich del día", 400, "Sandwich simple de la casa."),
                        ("Combo café + postre", 550, "Café y postre a elección."),
                        ("Merch oficial", 900, "Taza/termo/bolsa de la marca."),
                    ],
                )
        conn.commit()

# Inicializamos la DB al importar esta celda (seguro e idempotente)
init_db(seed=True)
DB_PATH


In [None]:
# ============================================
# CELDA 2: Dominio (POO: herencia + polimorfismo)
# ============================================

@dataclass
class Cliente:
    """
    Clase base del dominio. Define contrato y lógica común para el cálculo.
    Subclases ajustan el multiplicador del tier (polimorfismo).
    """
    id: Optional[int]
    nombre: str
    email: Optional[str]
    puntos: int
    nivel: str             # 'BRONCE' | 'PLATA' | 'ORO'
    fecha_registro: str

    # Parámetros de negocio (fácil de cambiar si la rúbrica lo pide)
    BASE_POR_MIL: int = 1
    MULT_TARJETA: float = 2.0
    UMBRAL_PLATA: int = 500
    UMBRAL_ORO: int = 1500

    def calcular_puntos(self, monto_cop: int, tarjeta_aliada: bool) -> int:
        """
        Regla base: floor(monto/1000) * multiplicadores.
        Se aplica bonus por tarjeta aliada si corresponde.
        """
        if monto_cop <= 0:
            return 0
        base = (monto_cop // 1000) * self.BASE_POR_MIL
        mult = self._multiplicador_tier()
        if tarjeta_aliada:
            mult *= self.MULT_TARJETA
        return int(base * mult)

    def _multiplicador_tier(self) -> float:
        """Ganancia extra por tier; por defecto 1.0 (subclases sobrescriben)."""
        return 1.0

    def aplicar_upgrade_si_corresponde(self) -> str:
        """Devuelve el nivel correcto según puntos acumulados (upgrade automático)."""
        if self.puntos >= self.UMBRAL_ORO:
            return "ORO"
        if self.puntos >= self.UMBRAL_PLATA:
            return "PLATA"
        return "BRONCE"

    def beneficios(self) -> List[str]:
        """Descripción legible para UI/README."""
        return [
            "1 punto por cada $1.000 COP.",
            "Duplica puntos pagando con tarjeta aliada.",
        ]

class ClienteBronce(Cliente):
    def _multiplicador_tier(self) -> float:
        return 1.0
    def beneficios(self) -> List[str]:
        return super().beneficios() + ["Nivel Bronce: tasa base."]

class ClientePlata(Cliente):
    def _multiplicador_tier(self) -> float:
        return 1.25
    def beneficios(self) -> List[str]:
        return super().beneficios() + ["Nivel Plata: +25% puntos."]

class ClienteOro(Cliente):
    def _multiplicador_tier(self) -> float:
        return 1.5
    def beneficios(self) -> List[str]:
        return super().beneficios() + ["Nivel Oro: +50% puntos y prioridad."]

# Fábrica: mapea string de nivel -> subclase concreta
NIVEL_A_CLASE: dict[str, Type[Cliente]] = {
    "BRONCE": ClienteBronce,
    "PLATA": ClientePlata,
    "ORO": ClienteOro,
}

def cliente_from_row(row: sqlite3.Row) -> Cliente:
    """
    Instancia la subclase adecuada a partir de los datos persistidos (nivel).
    Esto deja ver el polimorfismo en uso real.
    """
    cls = NIVEL_A_CLASE.get(row["nivel"], ClienteBronce)
    return cls(
        id=row["id"],
        nombre=row["nombre"],
        email=row["email"],
        puntos=row["puntos"],
        nivel=row["nivel"],
        fecha_registro=row["fecha_registro"],
    )


In [None]:
# ============================================
# CELDA 3: Repositorios (acceso a datos sin ORM)
# ============================================

class ClienteRepo:
    @staticmethod
    def crear(nombre: str, email: Optional[str]) -> Cliente:
        """
        Crea en nivel BRONCE con 0 puntos. Devuelve la instancia del dominio.
        """
        fecha = datetime.now().isoformat(timespec="seconds")
        with get_conn() as conn:
            cur = conn.cursor()
            cur.execute(
                "INSERT INTO clientes (nombre, email, puntos, nivel, fecha_registro) VALUES (?, ?, 0, 'BRONCE', ?);",
                (nombre.strip(), email, fecha),
            )
            new_id = cur.lastrowid
            cur.execute("SELECT * FROM clientes WHERE id = ?;", (new_id,))
            return cliente_from_row(cur.fetchone())

    @staticmethod
    def obtener(cliente_id: int) -> Optional[Cliente]:
        with get_conn() as conn:
            cur = conn.cursor()
            cur.execute("SELECT * FROM clientes WHERE id = ?;", (cliente_id,))
            row = cur.fetchone()
            return cliente_from_row(row) if row else None

    @staticmethod
    def obtener_por_email(email: str) -> Optional[Cliente]:
        with get_conn() as conn:
            cur = conn.cursor()
            cur.execute("SELECT * FROM clientes WHERE email = ?;", (email,))
            row = cur.fetchone()
            return cliente_from_row(row) if row else None

    @staticmethod
    def actualizar(cliente_id: int, puntos: int, nivel: str) -> None:
        with get_conn() as conn:
            conn.execute("UPDATE clientes SET puntos = ?, nivel = ? WHERE id = ?;", (puntos, nivel, cliente_id))

class TransaccionRepo:
    @staticmethod
    def registrar(
        cliente_id: int,
        monto_cop: int,
        tarjeta_aliada: bool,
        puntos_g: int,
        puntos_r: int,
        desc: str,
    ) -> None:
        """
        Guarda una fila de transacción. Nota: 'monto_cop' puede ser 0 en redenciones.
        """
        fecha = datetime.now().isoformat(timespec="seconds")
        with get_conn() as conn:
            conn.execute(
                """
                INSERT INTO transacciones
                (cliente_id, fecha, monto_cop, tarjeta_aliada, puntos_ganados, puntos_redimidos, descripcion)
                VALUES (?, ?, ?, ?, ?, ?, ?);
                """,
                (cliente_id, fecha, monto_cop, int(tarjeta_aliada), puntos_g, puntos_r, desc),
            )

    @staticmethod
    def historial(cliente_id: int) -> List[sqlite3.Row]:
        with get_conn() as conn:
            cur = conn.cursor()
            cur.execute("SELECT * FROM transacciones WHERE cliente_id = ? ORDER BY id DESC;", (cliente_id,))
            return cur.fetchall()

class RecompensaRepo:
    @staticmethod
    def listar() -> List[sqlite3.Row]:
        with get_conn() as conn:
            cur = conn.cursor()
            cur.execute("SELECT * FROM recompensas ORDER BY costo_puntos ASC;")
            return cur.fetchall()

    @staticmethod
    def obtener(recompensa_id: int) -> Optional[sqlite3.Row]:
        with get_conn() as conn:
            cur = conn.cursor()
            cur.execute("SELECT * FROM recompensas WHERE id = ?;", (recompensa_id,))
            return cur.fetchone()


In [None]:
# ============================================
# CELDA 4: Servicio de aplicación + helpers de salida
# ============================================

class LoyaltyEngine:
    """
    Orquesta casos de uso aplicando reglas de negocio y persistencia.
    Aquí se evidencia el polimorfismo al calcular puntos sin conocer la subclase.
    """

    def registrar_cliente(self, nombre: str, email: Optional[str]) -> Cliente:
        if not nombre or not nombre.strip():
            raise ValueError("El nombre es obligatorio.")
        # Si el email ya existe y el profe re-ejecuta la celda, devolvemos el existente
        if email:
            existente = ClienteRepo.obtener_por_email(email)
            if existente:
                return existente
        return ClienteRepo.crear(nombre, email or None)

    def registrar_compra(
        self,
        cliente_id: int,
        monto_cop: int,
        tarjeta_aliada: bool,
        descripcion: Optional[str] = None
    ) -> Tuple[Cliente, int]:
        c = ClienteRepo.obtener(cliente_id)
        if not c:
            raise ValueError("Cliente no encontrado.")
        if monto_cop <= 0:
            raise ValueError("El monto debe ser positivo.")

        puntos = c.calcular_puntos(monto_cop, tarjeta_aliada)
        TransaccionRepo.registrar(c.id, monto_cop, tarjeta_aliada, puntos, 0, descripcion or "Compra en tienda")
        c.puntos += puntos
        c.nivel = c.aplicar_upgrade_si_corresponde()
        ClienteRepo.actualizar(c.id, c.puntos, c.nivel)
        return ClienteRepo.obtener(c.id), puntos  # releemos por si cambia la subclase

    def redimir(self, cliente_id: int, recompensa_id: int) -> Tuple[Cliente, sqlite3.Row]:
        c = ClienteRepo.obtener(cliente_id)
        if not c:
            raise ValueError("Cliente no encontrado.")
        r = RecompensaRepo.obtener(recompensa_id)
        if not r:
            raise ValueError("Recompensa no encontrada.")
        costo = int(r["costo_puntos"])
        if c.puntos < costo:
            raise ValueError("Puntos insuficientes.")

        TransaccionRepo.registrar(c.id, 0, False, 0, costo, f"Redención: {r['nombre']}")
        c.puntos -= costo
        ClienteRepo.actualizar(c.id, c.puntos, c.nivel)  # en este modelo no hacemos downgrade automático
        return ClienteRepo.obtener(c.id), r

    def ver_cliente(self, cliente_id: int) -> Cliente:
        c = ClienteRepo.obtener(cliente_id)
        if not c:
            raise ValueError("Cliente no encontrado.")
        return c

    def listar_recompensas(self) -> List[sqlite3.Row]:
        return RecompensaRepo.listar()

    def historial(self, cliente_id: int) -> List[sqlite3.Row]:
        return TransaccionRepo.historial(cliente_id)

# --- Helpers para mostrar resultados en consola/notebook (legibles) ---
def print_cliente(c: Cliente) -> None:
    print(f"[Cliente #{c.id}] {c.nombre}")
    print(f"  Email: {c.email or '-'}")
    print(f"  Nivel: {c.nivel}")
    print(f"  Puntos: {c.puntos}")
    print("  Beneficios:")
    for b in c.beneficios():
        print(f"   - {b}")

def print_historial(rows: List[sqlite3.Row]) -> None:
    if not rows:
        print("  (sin transacciones)")
        return
    for r in rows:
        print(
            f"  #{r['id']} | {r['fecha']} | monto=${r['monto_cop']:,.0f} | "
            f"tarjeta={'Sí' if r['tarjeta_aliada'] else 'No'} | "
            f"+{r['puntos_ganados']} | -{r['puntos_redimidos']} | {r['descripcion'] or ''}"
        )


In [None]:
# ============================================
# CELDA 5: DEMOSTRACIÓN (sin input(); re-ejecutable)
# ============================================
# Nota: para evitar colisiones con UNIQUE(email) al re-ejecutar, usamos un sufijo temporal.
from time import time as _now

engine = LoyaltyEngine()

# 1) Registrar (o recuperar) cliente
email_demo = f"ana+{int(_now())}@example.com"
cliente = engine.registrar_cliente("Ana Café", email_demo)
print_cliente(cliente)
print("-" * 60)

# 2) Compras (monto, tarjeta_aliada)
compras = [(18_000, False), (45_500, True), (79_999, False), (120_000, True)]
for monto, tarj in compras:
    cliente, pts = engine.registrar_compra(cliente.id, monto, tarj)
    print(f"Compra ${monto:,.0f} | tarjeta={'Sí' if tarj else 'No'} => +{pts} pts | Total={cliente.puntos} | Nivel={cliente.nivel}")
print("-" * 60)

# 3) Listar recompensas
recs = engine.listar_recompensas()
print("Recompensas disponibles:")
for r in recs:
    print(f"  {r['id']}) {r['nombre']} — {r['costo_puntos']} pts")
print("-" * 60)

# 4) Redimir la primera recompensa que alcance y cueste >= 250 pts (si existe)
objetivo = next((r for r in recs if r["costo_puntos"] <= cliente.puntos and r["costo_puntos"] >= 250), None)
if objetivo:
    cliente, r = engine.redimir(cliente.id, objetivo["id"])
    print(f"Redimido: {r['nombre']} ({r['costo_puntos']} pts) => Saldo: {cliente.puntos} pts")
else:
    print("No hay recompensas que pueda redimir todavía.")
print("-" * 60)

# 5) Historial del cliente
print("Historial de transacciones:")
hist = engine.historial(cliente.id)
print_historial(hist)

# Mostrar dónde quedó la DB para trazabilidad en el taller
print("-" * 60)
print(f"DB_PATH = {DB_PATH}")
