<a href="https://colab.research.google.com/github/BenjaminGMC/sic_ai_2025_jun/blob/main/proyectoConMnistBenjamin_sandoval.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Proyecto Integrador: AI Engineering con Fashion-MNIST

**Fecha de creación:** 2025-08-14T23:04:01.174268Z

Este notebook implementa un sistema E2E (End-to-End) con **Fashion-MNIST** que incluye:
- Clasificación con **CNN** (Keras/TensorFlow).
- Control de calidad con **Autoencoder** (AE).
- **Simulación de desbalance** y corrección con **aumento de datos** (opcional, vía GAN).
- **Despliegue** de inferencia con **FastAPI** y **Gradio**.
- **Monitoreo** de métricas de producción (latencia, confianza, drift).

> Ejecuta las celdas en orden. Recomendado usar **GPU** en Colab (Runtime → Change runtime type → GPU).

## 1) Configuración del Entorno
Instalamos dependencias (si hace falta en Colab) y fijamos semilla para reproducibilidad.

In [None]:
# (Puede tomar unos minutos en la primera ejecución)
!pip -q install tensorflow tensorflow-datasets gradio fastapi uvicorn pyngrok scikit-learn scipy matplotlib

import os, random, numpy as np, tensorflow as tf
import tensorflow_datasets as tfds

# Seed every random number generator used in the notebook so that repeated
# runs are comparable.
SEED = 42
for seed_fn in (random.seed, np.random.seed, tf.random.set_seed):
    seed_fn(SEED)

# Report the runtime so results can be tied to a TF version / accelerator.
print("TF version:", tf.__version__)
gpu_devices = tf.config.list_physical_devices('GPU')
print("GPU available:", len(gpu_devices) > 0)

## 2) Carga y Preparación de Datos
- Cargamos **Fashion-MNIST** desde `tensorflow_datasets`.
- Normalizamos a `[0,1]` y separamos train/val/test.
- Implementamos una función para **simular desbalance** de una clase.

In [None]:
# Human-readable names, indexed by class id (used to label predictions).
CLASS_NAMES = [
    "T-shirt/top",
    "Trouser",
    "Pullover",
    "Dress",
    "Coat",
    "Sandal",
    "Shirt",
    "Sneaker",
    "Bag",
    "Ankle boot",
]

# Load the train and test splits as (image, label) pairs, together with the
# dataset metadata object.
_loaded_splits, ds_info = tfds.load(
    "fashion_mnist",
    split=["train", "test"],
    as_supervised=True,
    with_info=True,
)
ds_train_full, ds_test = _loaded_splits

IMG_SIZE = 28
NUM_CLASSES = ds_info.features["label"].num_classes

# Normalization
def normalize_img(image, label):
    """Cast an image to float32, divide it by 255 and ensure a channel axis.

    Args:
        image: Image tensor, either (H, W) or (H, W, C).
        label: Label associated with the image; returned unchanged.

    Returns:
        Tuple ``(image, label)`` where ``image`` is float32 and has a
        trailing channel axis.
    """
    image = tf.cast(image, tf.float32) / 255.0
    # TFDS yields Fashion-MNIST images already shaped (28, 28, 1). Adding an
    # axis unconditionally would produce (28, 28, 1, 1), which no longer
    # matches the (28, 28, 1) model inputs or the NumPy code downstream.
    if image.shape.rank == 2:
        image = tf.expand_dims(image, -1)  # (H, W) -> (H, W, 1)
    return image, label

# Apply the normalization to both splits.
ds_train_full, ds_test = (
    split.map(normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
    for split in (ds_train_full, ds_test)
)

# Train/validation split: the first VAL_SPLIT fraction of the training data
# becomes the validation set, the remainder is used for training.
VAL_SPLIT = 0.1
train_count = ds_info.splits["train"].num_examples
val_count = int(train_count * VAL_SPLIT)

ds_train = ds_train_full.skip(val_count)
ds_val = ds_train_full.take(val_count)

# Simulate class imbalance for a single class
def make_imbalanced(ds, target_class=6, minority_ratio=0.2):
    """Return a copy of ``ds`` in which one class has been down-sampled.

    The whole dataset is materialised in memory, grouped by label, and only
    ``minority_ratio`` of the ``target_class`` samples (at least one) are
    kept. The surviving samples of all classes are then shuffled together.

    Args:
        ds: Unbatched dataset of ``(image, label)`` pairs.
        target_class: Label to down-sample. Defaults to 6 (Shirt).
        minority_ratio: Fraction of ``target_class`` samples to keep.

    Returns:
        A ``tf.data.Dataset`` of ``(image, int64 label)`` slices.
    """
    buckets = {label: [] for label in range(NUM_CLASSES)}
    for image, raw_label in tfds.as_numpy(ds):
        label = int(raw_label)
        buckets[label].append((image, label))

    # Randomly choose which target-class samples survive.
    minority = buckets[target_class]
    quota = max(1, int(len(minority) * minority_ratio))
    random.shuffle(minority)
    buckets[target_class] = minority[:quota]

    # Pool the classes in label order, then shuffle the pooled samples.
    pooled = [pair for label in range(NUM_CLASSES) for pair in buckets[label]]
    random.shuffle(pooled)

    images = np.stack([image for image, _ in pooled], axis=0)
    labels = np.array([label for _, label in pooled], dtype=np.int64)
    return tf.data.Dataset.from_tensor_slices((images, labels))

TARGET_MINORITY_CLASS = 6  # "Shirt"
MINORITY_RATIO = 0.2
BATCH_SIZE = 128


def _batched(ds):
    """Batch a dataset with BATCH_SIZE and prefetch the batches."""
    return ds.batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)


# Training data: down-sample the minority class, then shuffle and batch.
ds_train_imbalanced = _batched(
    make_imbalanced(
        ds_train,
        target_class=TARGET_MINORITY_CLASS,
        minority_ratio=MINORITY_RATIO,
    ).shuffle(10_000, seed=SEED)
)
# Validation and test data are only batched (no shuffling).
ds_val = _batched(ds_val)
ds_test_batched = _batched(ds_test)

print("Imbalanced train ready. Minority class:", CLASS_NAMES[TARGET_MINORITY_CLASS])

## 3) Modelado
### CNN (Clasificación)
Arquitectura: 2 capas Conv2D + MaxPool + Dropout + Dense.

### Autoencoder (Control de Calidad)
Se entrena únicamente con imágenes de una clase considerada **normal** (por defecto, la clase 0 = T-shirt/top).
El **error de reconstrucción** (MSE) se usa como métrica de calidad.

In [None]:
from tensorflow.keras import layers, models

def build_cnn(input_shape=(IMG_SIZE, IMG_SIZE, 1), num_classes=NUM_CLASSES):
    """Build and compile the CNN classifier.

    The network is two Conv2D + MaxPooling stages, dropout, a 128-unit dense
    layer, dropout again, and a softmax output layer.

    Args:
        input_shape: Shape of one input image, without the batch axis.
        num_classes: Number of units in the softmax output layer.

    Returns:
        A compiled Keras model named ``cnn_classifier`` (Adam optimizer,
        sparse categorical cross-entropy loss, accuracy metric).
    """
    inputs = layers.Input(shape=input_shape)
    stack = [
        layers.Conv2D(32, 3, activation="relu", padding="same"),
        layers.MaxPooling2D(),
        layers.Conv2D(64, 3, activation="relu", padding="same"),
        layers.MaxPooling2D(),
        layers.Dropout(0.25),
        layers.Flatten(),
        layers.Dense(128, activation="relu"),
        layers.Dropout(0.5),
        layers.Dense(num_classes, activation="softmax"),
    ]

    # Thread the input tensor through the layers in order.
    tensor = inputs
    for layer in stack:
        tensor = layer(tensor)

    model = models.Model(inputs, tensor, name="cnn_classifier")
    model.compile(
        optimizer="adam",
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
    )
    return model

def build_autoencoder(input_shape=(IMG_SIZE, IMG_SIZE, 1), latent_dim=32):
    """Build and compile a convolutional autoencoder.

    The encoder is two Conv2D + MaxPooling stages followed by a dense
    bottleneck; the decoder mirrors it with two UpSampling + Conv2DTranspose
    stages and a sigmoid output.

    Args:
        input_shape: ``(height, width, channels)`` of one image. Height and
            width must be divisible by 4 so the decoder can restore the
            original size after the two 2x2 poolings.
        latent_dim: Number of units in the dense bottleneck.

    Returns:
        A compiled Keras model named ``autoencoder`` (Adam optimizer, MSE
        loss) whose output has the same shape as its input.

    Raises:
        ValueError: If height or width is not divisible by 4.
    """
    height, width, channels = input_shape
    if height % 4 or width % 4:
        raise ValueError(
            f"input height and width must be divisible by 4, got {input_shape}"
        )
    # Decoder grid size is derived from `input_shape` rather than the global
    # IMG_SIZE, so a non-default `input_shape` still yields a matching output.
    dec_h, dec_w = height // 4, width // 4

    inp = layers.Input(shape=input_shape)
    x = layers.Conv2D(32, 3, activation="relu", padding="same")(inp)
    x = layers.MaxPooling2D()(x)
    x = layers.Conv2D(64, 3, activation="relu", padding="same")(x)
    x = layers.MaxPooling2D()(x)
    x = layers.Flatten()(x)
    latent = layers.Dense(latent_dim, activation="relu")(x)

    x = layers.Dense(dec_h * dec_w * 64, activation="relu")(latent)
    x = layers.Reshape((dec_h, dec_w, 64))(x)
    x = layers.UpSampling2D()(x)
    x = layers.Conv2DTranspose(32, 3, activation="relu", padding="same")(x)
    x = layers.UpSampling2D()(x)
    # Sigmoid keeps reconstructions in [0, 1]; one output map per input channel.
    out = layers.Conv2DTranspose(channels, 3, activation="sigmoid", padding="same")(x)

    ae = models.Model(inp, out, name="autoencoder")
    ae.compile(optimizer="adam", loss="mse")
    return ae

# Instantiate both models with their default settings and print the
# layer-by-layer summary of each.
cnn = build_cnn()
ae = build_autoencoder()
for model in (cnn, ae):
    model.summary()

## 4) Entrenamiento
- Entrenamos la **CNN** con el conjunto **desbalanceado**.
- Entrenamos el **Autoencoder** sólo con imágenes de la clase **normal**.

In [None]:
EPOCHS_CNN = 8
EPOCHS_AE = 8

# CNN training: fit on the imbalanced training set and track metrics on the
# validation set after every epoch.
cnn_fit_options = dict(validation_data=ds_val, epochs=EPOCHS_CNN)
history_cnn = cnn.fit(ds_train_imbalanced, **cnn_fit_options)

# Keep only the "normal" class for the autoencoder (class 0 by default)
AE_NORMAL_CLASS = 0


def filter_class(ds, class_id):
    """Select one class and turn each sample into an autoencoder pair.

    Args:
        ds: Dataset of ``(image, label)`` pairs.
        class_id: Label to keep.

    Returns:
        Dataset of ``(image, image)`` pairs for the samples whose label
        equals ``class_id`` (the image is both the input and the target).
    """
    def _has_label(_image, label):
        return tf.equal(label, class_id)

    def _as_reconstruction_pair(image, _label):
        return (image, image)

    return ds.filter(_has_label).map(_as_reconstruction_pair)

# Train the autoencoder on "normal"-class images only.
# `ds_train` is already normalized, so normalize_img must not be mapped again:
# a second pass would divide the pixels by 255 once more. Using `ds_train`
# (instead of `ds_train_full`) also keeps the validation hold-out unseen.
ds_train_normal = filter_class(ds_train, AE_NORMAL_CLASS)
ds_train_normal = ds_train_normal.batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)

history_ae = ae.fit(ds_train_normal, epochs=EPOCHS_AE)

## 5) Evaluación
- Precisión global y **matriz de confusión** para la CNN.
- **AUC** del Autoencoder para distinguir clase normal vs. resto.
- Histogramas del **error de reconstrucción**.

In [None]:
from sklearn.metrics import confusion_matrix, accuracy_score, roc_auc_score
import numpy as np
import matplotlib.pyplot as plt

# CNN: accuracy and confusion matrix on the test split.
# y_conf holds the top-class probability of every prediction.
y_true, y_pred, y_conf = [], [], []

for x_batch, y_batch in ds_test_batched:
    batch_probs = cnn.predict(x_batch, verbose=0)
    y_true += y_batch.numpy().tolist()
    y_pred += batch_probs.argmax(axis=1).tolist()
    y_conf += batch_probs.max(axis=1).tolist()

cm = confusion_matrix(y_true, y_pred)
acc = accuracy_score(y_true, y_pred)

print("Test Accuracy (CNN):", acc)
print("Confusion Matrix:\n", cm)

# AE: reconstruction error
def reconstruction_error(ae, imgs):
    """Return the mean squared reconstruction error of each image.

    Args:
        ae: Model exposing ``predict(batch, verbose=0)`` and returning
            reconstructions with the same shape as ``batch``.
        imgs: Batch of images as an eager tensor or a NumPy array. It must
            be 4-D, since the error is averaged over axes 1, 2 and 3.

    Returns:
        1-D NumPy array with one error value per image in the batch.
    """
    recon = ae.predict(imgs, verbose=0)
    # np.asarray handles eager tensors and NumPy arrays alike, whereas
    # `.numpy()` only exists on tensors.
    originals = np.asarray(imgs)
    return np.mean((recon - originals) ** 2, axis=(1, 2, 3))

# Score every test image with the autoencoder and flag whether it belongs to
# the "normal" class (1) or to any other class (0).
errs, labels_normal_vs_rest = [], []
for test_images, test_labels in ds_test_batched:
    errs += reconstruction_error(ae, test_images).tolist()
    labels_normal_vs_rest += (
        (test_labels.numpy() == AE_NORMAL_CLASS).astype(int).tolist()
    )

errs = np.asarray(errs)
labels_normal_vs_rest = np.asarray(labels_normal_vs_rest)

# The score is the negated error: lower error => more "normal".
auc = roc_auc_score(labels_normal_vs_rest, -errs)
print("AE AUC (normal vs resto):", auc)

# Visualization: overlaid histograms of the reconstruction error for the
# "normal" class and for all remaining classes.
plt.figure()
histogram_groups = (
    (1, f"Normal (clase {AE_NORMAL_CLASS})"),
    (0, "Resto"),
)
for group_flag, group_label in histogram_groups:
    plt.hist(
        errs[labels_normal_vs_rest == group_flag],
        bins=40,
        alpha=0.6,
        label=group_label,
    )
plt.title("Error de reconstrucción (AE)")
plt.xlabel("MSE")
plt.ylabel("Frecuencia")
plt.legend()
plt.show()

## 6) Lógica de Producción
Combinamos **confianza del clasificador** y **calidad (AE)** con umbrales:
- `MIN_CONFIDENCE = 0.7`
- `MAX_RECON_ERROR` = percentil 95 del error en la clase normal.

In [None]:
# Calibrate MAX_RECON_ERROR as the 95th percentile of the reconstruction error
# over the "normal"-class images of the validation split. The test split is
# deliberately not used to pick the threshold, so it stays an unbiased
# evaluation set.
normal_errs = []
for x_batch, y_batch in ds_val:
    mask = (y_batch.numpy() == AE_NORMAL_CLASS)
    # Skip batches without normal-class images: there is nothing to score.
    if np.any(mask):
        normal_batch = tf.boolean_mask(x_batch, mask)
        normal_errs.extend(reconstruction_error(ae, normal_batch).tolist())

# Fall back to a fixed threshold when no normal-class image was found.
MAX_RECON_ERROR = float(np.percentile(normal_errs, 95)) if normal_errs else 0.02
MIN_CONFIDENCE = 0.7

print("MAX_RECON_ERROR (p95 normal):", MAX_RECON_ERROR)
print("MIN_CONFIDENCE:", MIN_CONFIDENCE)

def decide(image_tensor):
    """Classify one image and decide whether the prediction is accepted.

    The prediction is accepted only when the classifier confidence is at
    least ``MIN_CONFIDENCE`` and the autoencoder reconstruction error is at
    most ``MAX_RECON_ERROR``.

    Args:
        image_tensor: A single image without batch axis, as an eager tensor
            or NumPy array (the batch axis is added here).

    Returns:
        Dict with keys ``accepted``, ``pred_class``, ``pred_label``,
        ``confidence``, ``reconstruction_error`` and ``reason``.
    """
    # Accept tensors and arrays alike, and build the 1-image batch only once.
    image = np.asarray(image_tensor, dtype="float32")
    batch = image[np.newaxis, ...]

    probs = cnn.predict(batch, verbose=0)[0]
    pred = int(np.argmax(probs))
    conf = float(np.max(probs))

    rec = ae.predict(batch, verbose=0)[0]
    recon_error = float(np.mean((rec - image) ** 2))

    # Collect every failed check so the caller sees all rejection causes.
    reasons = []
    if conf < MIN_CONFIDENCE:
        reasons.append("Baja confianza")
    if recon_error > MAX_RECON_ERROR:
        reasons.append("Alta anomalía / baja calidad")

    return {
        "accepted": not reasons,
        "pred_class": pred,
        "pred_label": CLASS_NAMES[pred],
        "confidence": conf,
        "reconstruction_error": recon_error,
        "reason": ", ".join(reasons) if reasons else "OK",
    }

# Demo: run the decision logic on the first image of the first test batch.
for demo_images, _demo_labels in ds_test_batched.take(1):
    sample = demo_images[0]
    demo_result = decide(sample)
    print(demo_result)

## 7) Despliegue del Modelo
### Opción A: **FastAPI** + **Ngrok** (opcional)
Guarda modelos y ejecuta el servidor. En Colab puedes tunelar el puerto 8000 con `pyngrok`.

In [None]:
import os, numpy as np, tensorflow as tf
MODEL_DIR = "/content/models"
os.makedirs(MODEL_DIR, exist_ok=True)

# Persist both models to MODEL_DIR.
for _model, _filename in ((cnn, "cnn.keras"), (ae, "ae.keras")):
    _model.save(os.path.join(MODEL_DIR, _filename))

# Store the calibrated anomaly threshold next to the models, as a
# one-element array.
_threshold_path = os.path.join(MODEL_DIR, "max_recon_error.npy")
np.save(_threshold_path, np.array([MAX_RECON_ERROR]))
print("Modelos guardados en", MODEL_DIR)

# Source of the standalone FastAPI inference service. It is written to disk
# below so it can be launched as a separate process.
FASTAPI_CODE = """
import io, base64, numpy as np, uvicorn, tensorflow as tf
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from PIL import Image

CLASS_NAMES = [
    "T-shirt/top", "Trouser", "Pullover", "Dress", "Coat",
    "Sandal", "Shirt", "Sneaker", "Bag", "Ankle boot"
]
MIN_CONFIDENCE = 0.7

cnn = tf.keras.models.load_model('/content/models/cnn.keras', compile=False)
ae  = tf.keras.models.load_model('/content/models/ae.keras', compile=False)
try:
    # The threshold is stored as a 1-element array; take the element
    # explicitly instead of relying on array-to-float conversion.
    MAX_RECON_ERROR = float(np.load('/content/models/max_recon_error.npy').reshape(-1)[0])
except (OSError, ValueError, IndexError):
    # Missing or unreadable threshold file: fall back to a fixed default.
    MAX_RECON_ERROR = 0.02

app = FastAPI()

class PredictRequest(BaseModel):
    image_b64: str

def preprocess(image_bytes):
    img = Image.open(io.BytesIO(image_bytes)).convert('L').resize((28,28))
    arr = np.array(img).astype('float32')/255.0
    arr = np.expand_dims(arr, -1)
    return tf.convert_to_tensor(arr)

@app.get('/health')
def health():
    return {'status': 'ok'}

@app.post('/predict')
def predict(req: PredictRequest):
    # A malformed payload is a client error (400), not a server crash (500).
    try:
        image_bytes = base64.b64decode(req.image_b64)
        x = preprocess(image_bytes)
    except (ValueError, OSError) as exc:
        raise HTTPException(
            status_code=400,
            detail='Imagen inválida: se espera una imagen codificada en base64',
        ) from exc
    batch = tf.expand_dims(x, 0)
    probs = cnn.predict(batch, verbose=0)[0]
    pred = int(np.argmax(probs))
    conf = float(np.max(probs))
    rec = ae.predict(batch, verbose=0)[0]
    recon_error = float(np.mean((rec - x.numpy())**2))
    accept = (conf >= MIN_CONFIDENCE) and (recon_error <= MAX_RECON_ERROR)
    reason = []
    if conf < MIN_CONFIDENCE: reason.append('Baja confianza')
    if recon_error > MAX_RECON_ERROR: reason.append('Alta anomalía')
    reason = ', '.join(reason) if reason else 'OK'
    return {
        'accepted': bool(accept),
        'pred_class': int(pred),
        'pred_label': CLASS_NAMES[pred],
        'confidence': conf,
        'reconstruction_error': recon_error,
        'reason': reason
    }

if __name__ == '__main__':
    uvicorn.run(app, host='0.0.0.0', port=8000)
"""
# The source contains non-ASCII characters, so the encoding is fixed to UTF-8
# (the encoding Python assumes when it reads a source file).
with open("/content/app_fastapi.py", "w", encoding="utf-8") as f:
    f.write(FASTAPI_CODE)

print("Escribe en una celda aparte: !python app_fastapi.py")

### Opción B: **Gradio**

In [None]:
import gradio as gr, numpy as np, tensorflow as tf
from PIL import Image

# Reload the saved models from disk, as a deployed app would.
loaded_cnn = tf.keras.models.load_model("/content/models/cnn.keras", compile=False)
loaded_ae  = tf.keras.models.load_model("/content/models/ae.keras", compile=False)
CLASS_NAMES = [
    "T-shirt/top", "Trouser", "Pullover", "Dress", "Coat",
    "Sandal", "Shirt", "Sneaker", "Bag", "Ankle boot"
]
try:
    # The threshold is stored as a 1-element array; take the element
    # explicitly instead of relying on array-to-float conversion.
    MAX_RECON_ERROR = float(np.load("/content/models/max_recon_error.npy").reshape(-1)[0])
except (OSError, ValueError, IndexError):
    # Keep the best-effort default, but make the fallback visible.
    MAX_RECON_ERROR = 0.02
    print("No se pudo cargar max_recon_error.npy; usando umbral por defecto:", MAX_RECON_ERROR)
MIN_CONFIDENCE = 0.7

def classify_and_qc(img: Image.Image):
    """Classify an uploaded image and run the autoencoder quality check.

    The image is converted to grayscale, resized to 28x28 and scaled to
    [0, 1] before being passed to both models.

    Args:
        img: Uploaded PIL image, or ``None`` when nothing was uploaded.

    Returns:
        Dict with the predicted label, the classifier confidence, the
        reconstruction error and whether the prediction is accepted.

    Raises:
        gr.Error: If no image was provided.
    """
    if img is None:
        # Without an image there is nothing to classify; report it in the UI
        # instead of failing with an AttributeError.
        raise gr.Error("Sube una imagen antes de enviar.")

    gray = img.convert("L").resize((28, 28))
    arr = np.array(gray).astype("float32") / 255.0
    arr = np.expand_dims(arr, -1)   # (28, 28) -> (28, 28, 1)
    batch = arr[np.newaxis, ...]    # single-image batch, built once

    probs = loaded_cnn.predict(batch, verbose=0)[0]
    pred = int(np.argmax(probs))
    conf = float(np.max(probs))

    rec = loaded_ae.predict(batch, verbose=0)[0]
    recon_error = float(np.mean((rec - arr) ** 2))

    accept = (conf >= MIN_CONFIDENCE) and (recon_error <= MAX_RECON_ERROR)
    return {
        "Predicción": CLASS_NAMES[pred],
        "Confianza": conf,
        "Recon_error": recon_error,
        "Aceptado": bool(accept),
    }

# classify_and_qc returns a dict that mixes a string, floats and a boolean.
# The "label" component expects class -> confidence floats, so the result is
# rendered with the JSON component instead.
demo = gr.Interface(
    fn=classify_and_qc,
    inputs=gr.Image(type="pil"),
    outputs="json",
    title="Fashion-MNIST: Clasificación + Control de Calidad (AE)",
    description="Sube una imagen de prenda (28x28 escala de grises funciona mejor)."
)
print("Ejecuta: demo.launch(share=True)")

## 8) Monitoreo en Producción (Simulado)
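Calculamos métricas: latencia media y p95 (por lote), confianza media y percentil 10, y la divergencia KL(train || prod) entre la distribución de etiquetas de entrenamiento y la distribución de clases predichas en producción.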

In [None]:
import time
from scipy.stats import entropy
import numpy as np

# Estimate the training label distribution from the labels of the first 5000
# examples drawn from ds_train_imbalanced.
train_labels = [
    int(label.numpy())
    for _image, label in ds_train_imbalanced.unbatch().take(5000)
]
# The small constant keeps every class probability strictly positive.
train_hist = np.bincount(train_labels, minlength=NUM_CLASSES) + 1e-9
train_dist = train_hist / train_hist.sum()

# Simulated production traffic: run the classifier over the test batches and
# record, per batch, the prediction latency plus every predicted class and
# its confidence.
confs_prod, preds_prod, latencies = [], [], []

for x_batch, y_batch in ds_test_batched:
    # perf_counter is the monotonic, high-resolution clock meant for timing
    # intervals; time.time() can jump when the system clock is adjusted.
    t0 = time.perf_counter()
    probs = cnn.predict(x_batch, verbose=0)
    latencies.append(time.perf_counter() - t0)
    preds = probs.argmax(axis=1)
    preds_prod.extend(preds.tolist())
    confs_prod.extend(probs.max(axis=1).tolist())

# Latency statistics (seconds per batch).
lat_mean = float(np.mean(latencies))
lat_p95 = float(np.percentile(latencies, 95))

# Confidence statistics over all predictions.
conf_mean = float(np.mean(confs_prod))
conf_p10 = float(np.percentile(confs_prod, 10))

# Drift: compare the training label distribution with the distribution of
# predicted classes. The small constant avoids zero probabilities.
prod_hist = np.bincount(preds_prod, minlength=NUM_CLASSES) + 1e-9
prod_dist = prod_hist / prod_hist.sum()
kl_div = float(entropy(train_dist, prod_dist))  # KL(train || prod)

monitoring_report = {
    "lat_mean_s": lat_mean,
    "lat_p95_s": lat_p95,
    "conf_mean": conf_mean,
    "conf_p10": conf_p10,
    "KL(train||prod)": kl_div,
}
print(monitoring_report)

## 9) Aumento con DCGAN (Opcional)
Entrenamos una **DCGAN** simple para la clase minoritaria y reentrenamos la CNN con datos sintéticos.

In [None]:
from tensorflow.keras import layers, models, optimizers
import numpy as np, tensorflow as tf

MINORITY_CLASS = TARGET_MINORITY_CLASS

# Collect the minority-class images that are present in the imbalanced
# training set (at most 60000 examples are scanned).
minority_imgs = np.array(
    [
        image.numpy()
        for image, label in ds_train_imbalanced.unbatch().take(60000)
        if int(label.numpy()) == MINORITY_CLASS
    ],
    dtype="float32",
)
print("Minority images:", minority_imgs.shape)

LATENT_DIM = 64


def build_generator():
    """Build the DCGAN generator.

    Maps a latent vector of size ``LATENT_DIM`` to a 28x28x1 image: a dense
    projection reshaped to 7x7x128, two stride-2 transposed convolutions
    (7 -> 14 -> 28), and a final sigmoid convolution.

    Returns:
        An uncompiled Keras ``Sequential`` model.
    """
    generator = models.Sequential()
    generator.add(layers.Input(shape=(LATENT_DIM,)))
    generator.add(layers.Dense(7*7*128))
    generator.add(layers.Reshape((7, 7, 128)))
    for n_filters in (64, 32):
        generator.add(
            layers.Conv2DTranspose(
                n_filters, 4, strides=2, padding="same", activation="relu"
            )
        )
    generator.add(layers.Conv2D(1, 3, padding="same", activation="sigmoid"))
    return generator

def build_discriminator():
    """Build and compile the DCGAN discriminator.

    Takes a 28x28x1 image through two stride-2 convolutions and a single
    sigmoid unit that scores the image.

    Returns:
        A Keras ``Sequential`` model compiled with Adam (learning rate 1e-4)
        and binary cross-entropy loss.
    """
    discriminator = models.Sequential()
    discriminator.add(layers.Input(shape=(28, 28, 1)))
    for n_filters in (32, 64):
        discriminator.add(
            layers.Conv2D(
                n_filters, 3, strides=2, padding="same", activation="relu"
            )
        )
    discriminator.add(layers.Flatten())
    discriminator.add(layers.Dense(1, activation="sigmoid"))

    discriminator.compile(
        optimizer=optimizers.Adam(1e-4),
        loss="binary_crossentropy",
    )
    return discriminator

# Instantiate the generator and the discriminator (the discriminator is
# already compiled inside build_discriminator).
gen = build_generator()
disc = build_discriminator()
disc.trainable = True

# Combined model: latent noise -> generator -> discriminator score.
gan_input = layers.Input(shape=(LATENT_DIM,))
gan_output = disc(gen(gan_input))
gan = models.Model(gan_input, gan_output)
# The discriminator is flagged non-trainable before the combined model is
# compiled. NOTE(review): presumably so that generator updates through `gan`
# leave the discriminator weights untouched; how `trainable` interacts with
# compile() depends on the Keras version - confirm.
disc.trainable = False
gan.compile(optimizer=optimizers.Adam(1e-4), loss="binary_crossentropy")

EPOCHS_GAN = 3
BATCH_GAN = 64
steps_per_epoch = max(1, len(minority_imgs)//BATCH_GAN)

# With no minority images there is nothing to train on. Skipping the loop also
# avoids printing d_loss / g_loss values that were never computed.
if len(minority_imgs) > 0:
    # Targets are constant, so they are built once: 1 = real, 0 = generated.
    disc_targets = np.concatenate(
        [np.ones((BATCH_GAN, 1)), np.zeros((BATCH_GAN, 1))], axis=0
    )
    y_gan = np.ones((BATCH_GAN, 1))

    for epoch in range(EPOCHS_GAN):
        for step in range(steps_per_epoch):
            # Discriminator step: real minority images vs. generated images.
            idx = np.random.randint(0, len(minority_imgs), BATCH_GAN)
            real = minority_imgs[idx]
            z = np.random.normal(size=(BATCH_GAN, LATENT_DIM)).astype("float32")
            fake = gen.predict(z, verbose=0)
            x = np.concatenate([real, fake], axis=0)
            disc.trainable = True
            d_loss = disc.train_on_batch(x, disc_targets)
            # Generator step: train the combined model to make the
            # discriminator output "real" for generated images.
            z = np.random.normal(size=(BATCH_GAN, LATENT_DIM)).astype("float32")
            disc.trainable = False
            g_loss = gan.train_on_batch(z, y_gan)
        print(f"Epoch {epoch+1}/{EPOCHS_GAN} - d_loss: {float(d_loss):.4f} - g_loss: {float(g_loss):.4f}")

# Generate synthetic minority samples and retrain a fresh CNN on the
# augmented training set.
N_SYN = min(2000, 5*len(minority_imgs)) if len(minority_imgs)>0 else 0
if N_SYN > 0:
    z = np.random.normal(size=(N_SYN, LATENT_DIM)).astype("float32")
    synth = gen.predict(z, verbose=0)
    X_aug = np.concatenate([minority_imgs, synth], axis=0)
    y_aug = np.full((X_aug.shape[0],), MINORITY_CLASS, dtype="int32")

    # All non-minority examples of the training split. `ds_train` is unbatched
    # (so no unbatch() is needed), already normalized (so no second division
    # by 255), and excludes the validation hold-out used in fit() below.
    X_rest, y_rest = [], []
    for x, y in tfds.as_numpy(ds_train):
        if int(y) != MINORITY_CLASS:
            X_rest.append(x)
            y_rest.append(int(y))
    X_rest = np.array(X_rest, dtype="float32")
    y_rest = np.array(y_rest, dtype="int32")

    X_train_bal = np.concatenate([X_rest, X_aug], axis=0)
    y_train_bal = np.concatenate([y_rest, y_aug], axis=0)

    def build_cnn_local():
        """Build a fresh classifier with the same architecture as the base CNN."""
        return build_cnn(input_shape=(28, 28, 1), num_classes=10)

    # The minority samples sit at the end of the arrays, so the shuffle buffer
    # must span the whole dataset for them to be mixed into every part of an
    # epoch.
    ds_train_bal = (
        tf.data.Dataset.from_tensor_slices((X_train_bal, y_train_bal))
        .shuffle(len(X_train_bal), seed=SEED)
        .batch(128)
        .prefetch(tf.data.AUTOTUNE)
    )

    cnn_bal = build_cnn_local()
    cnn_bal.fit(
        ds_train_bal,
        epochs=4,
        validation_data=ds_val
    )

    # Comparative evaluation
    from sklearn.metrics import accuracy_score
    def eval_model(m):
        """Return the accuracy of model `m` on the batched test split."""
        y_true, y_pred = [], []
        for xb, yb in ds_test_batched:
            probs = m.predict(xb, verbose=0)
            y_true.extend(yb.numpy().tolist())
            y_pred.extend(probs.argmax(axis=1).tolist())
        return accuracy_score(y_true, y_pred)

    # Evaluate both models the same way instead of relying on an `acc` value
    # left over from an earlier cell.
    base_acc = eval_model(cnn)
    bal_acc = eval_model(cnn_bal)
    print({"base_acc": base_acc, "augmented_acc": bal_acc})
else:
    print("No se pudo entrenar DCGAN o no hay suficientes muestras minoritarias.")

## 10) Guardado de Modelos

In [None]:
# The target directory is otherwise only created by the optional deployment
# cell, so make sure it exists before saving.
os.makedirs("/content/models", exist_ok=True)
cnn.save("/content/models/cnn_final.keras")
ae.save("/content/models/ae_final.keras")
print("Modelos guardados en /content/models")