# Shefu — YOLOv8 + Regresión Multi‑Entrada (Imagen + Binarios)

Este cuaderno **integra** el detector **YOLOv8** (para recortar el completo) con un **regresor multi‑entrada** (imagen recortada + 4 banderas de `motivo`) para producir una **nota 0–100**.

## Resumen del flujo
1. **Detector (YOLOv8)**: foto completa → **ROI** del completo.
2. **Pre‑procesamiento**: recorte del ROI, resize a 224×224 y normalización de píxeles a [0, 1].
3. **Multi‑input**: embeddings visuales + 4 flags.
4. **Regresión**: nota 0–100.


In [None]:
# (Opcional) Instalar deps si tu entorno no las tiene.
# !pip install ultralytics==8.2.103 tensorflow==2.15.0 pandas==2.1.4 numpy==1.26.4 scikit-learn==1.3.2 matplotlib==3.8.0

from ultralytics import YOLO
import os, json, ast
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.applications import MobileNetV3Small
from sklearn.model_selection import train_test_split
import cv2
import matplotlib.pyplot as plt

# Report the runtime so a missing GPU is noticed before training starts.
gpu_devices = tf.config.list_physical_devices('GPU')
print('TensorFlow:', tf.__version__)
print('GPU devices:', gpu_devices)


In [None]:
# =====================
# Configuración
# =====================
# Input/output locations. The first three must be adjusted to the local setup.
DATA_CSV_PATH = '/mnt/data/project-2-at-2025-09-12-06-46-b36c7e7f.csv'  # <- ADJUST
IMAGES_ROOT = '/mnt/data/images'  # <- ADJUST
YOLO_WEIGHTS = '/mnt/data/completos_detector.pt'  # <- ADJUST
CROPS_DIR = '/mnt/data/crops_yolo'

# Training hyper-parameters.
IMG_SIZE = 224
BATCH_SIZE = 16
EPOCHS = 25
VAL_SPLIT = 0.2
RANDOM_STATE = 42

# The crop cache directory must exist before any crop is written.
os.makedirs(CROPS_DIR, exist_ok=True)

# Echo the effective paths so a wrong location is spotted early.
for _label, _value in (
    ('CSV:', DATA_CSV_PATH),
    ('IMAGES_ROOT:', IMAGES_ROOT),
    ('YOLO_WEIGHTS:', YOLO_WEIGHTS),
    ('CROPS_DIR:', CROPS_DIR),
):
    print(_label, _value)


In [None]:
# =====================
# Parsing score y motivo
# =====================
# Errors that decoding or numeric conversion of an annotation may raise;
# any of them means "this decoder could not produce a score".
_SCORE_DECODE_ERRORS = (
    ValueError,
    TypeError,
    SyntaxError,
    OverflowError,
    MemoryError,
    RecursionError,
)


def _score_from_decoded(data):
    """Return the score held by an already-decoded annotation, or None."""
    # bool is a subclass of int; a boolean is never a valid score.
    if isinstance(data, (bool, np.bool_)):
        return None
    if isinstance(data, (int, float, np.integer, np.floating)):
        value = float(data)
        return value if np.isfinite(value) else None
    if isinstance(data, list) and data:
        first = data[0]
        if isinstance(first, dict) and 'number' in first:
            return float(first['number'])
    return None


def parse_score_field(x):
    """Extract a numeric score from one raw cell of the ``score`` column.

    Accepted forms:

    * a JSON or Python-literal string holding a non-empty list whose first
      item is a mapping with a ``'number'`` key, e.g. ``'[{"number": 85}]'``;
    * a string holding a bare number, e.g. ``'85'``;
    * a value that is already a number or an already-decoded list.

    Args:
        x: Raw cell value (string, number, list, or a missing value).

    Returns:
        The score as ``float``, or ``np.nan`` when the value is missing or
        cannot be interpreted.
    """
    if isinstance(x, (str, bytes, bytearray)):
        # JSON first (double quotes), then Python literal syntax (single quotes).
        decoders = (json.loads, ast.literal_eval)
    else:
        # Non-text values are inspected as they are.
        decoders = (lambda raw: raw,)

    for decode in decoders:
        try:
            value = _score_from_decoded(decode(x))
        except _SCORE_DECODE_ERRORS:
            continue
        if value is not None:
            return value
    return np.nan

def parse_motivo_to_flags(motivo):
    """Turn one raw ``motivo`` cell into four 0/1 indicator flags.

    The cell is either free text or a JSON / Python-literal mapping with a
    ``'choices'`` entry. Each candidate string is lower-cased and searched
    for four fixed phrases; every phrase found sets its flag to 1.

    Args:
        motivo: Raw cell value; a missing value yields all-zero flags.

    Returns:
        Dict with keys ``pan_quemado``, ``falta_ingrediente``,
        ``desordenado`` and ``buen_balance_visual`` (in that order).
    """
    # (phrase searched in the lower-cased text, flag it switches on)
    phrase_to_flag = (
        ('pan quemado', 'pan_quemado'),
        ('falta de ingrediente', 'falta_ingrediente'),
        ('desordenado', 'desordenado'),
        ('buen balance visual', 'buen_balance_visual'),
    )
    flags = {flag_name: 0 for _, flag_name in phrase_to_flag}
    if pd.isna(motivo):
        return flags

    raw = str(motivo)

    # Only try to decode when the text looks like a {"choices": ...} mapping.
    decoded = None
    if '"choices"' in raw or "'choices'" in raw:
        for decode in (json.loads, ast.literal_eval):
            try:
                decoded = decode(raw)
            except Exception:
                decoded = None
            else:
                break

    selected = None
    if isinstance(decoded, dict) and 'choices' in decoded:
        selected = decoded['choices']

    # Without a usable choices entry, scan the whole raw text instead.
    if selected is None:
        candidates = (raw,)
    else:
        candidates = (str(item) for item in selected)

    for candidate in candidates:
        lowered = candidate.lower()
        for phrase, flag_name in phrase_to_flag:
            if phrase in lowered:
                flags[flag_name] = 1
    return flags


In [None]:
# =====================
# YOLOv8 → crop con cache
# =====================
# The detector is loaded once and shared by every crop request.
detector = YOLO(YOLO_WEIGHTS)


def yolo_crop_to_file(img_path, save_dir=CROPS_DIR, conf=0.25):
    """Crop the first detected box out of an image and cache it on disk.

    The crop is written to ``save_dir`` under the same file name as the
    source image. If that file already exists it is returned immediately
    without running the detector, so ``conf`` has no effect on cached crops
    and two sources sharing a file name share one cache entry.

    Args:
        img_path: Path of the full image.
        save_dir: Directory holding the crops; created if missing.
        conf: Confidence threshold forwarded to the detector.

    Returns:
        Path of the crop file, or ``None`` when the image cannot be read,
        nothing is detected, the clamped box is empty, or the crop cannot
        be written.
    """
    base = os.path.basename(img_path)
    crop_path = os.path.join(save_dir, base)
    if os.path.exists(crop_path):
        return crop_path

    # Check readability first: no point running the detector on a file
    # that cannot be cropped afterwards.
    img = cv2.imread(img_path)
    if img is None:
        return None

    results = detector(img_path, conf=conf, verbose=False)
    if len(results) == 0:
        return None
    boxes = results[0].boxes.xyxy.cpu().numpy().astype(int)
    if boxes.shape[0] == 0:
        return None

    # Only the first box is used.
    # NOTE(review): presumably boxes come sorted by confidence - confirm.
    x1, y1, x2, y2 = boxes[0]

    # Clamp the box to the image bounds before slicing.
    h, w = img.shape[:2]
    x1, y1 = max(0, x1), max(0, y1)
    x2, y2 = min(w, x2), min(h, y2)
    crop = img[y1:y2, x1:x2]
    if crop.size == 0:
        return None

    os.makedirs(save_dir, exist_ok=True)
    # imwrite signals failure by returning False (or raising cv2.error for
    # an extension it has no writer for); never report a path not written.
    try:
        written = cv2.imwrite(crop_path, crop)
    except cv2.error:
        written = False
    if not written:
        return None
    return crop_path


In [None]:
# =====================
# Construcción del dataframe con crops
# =====================
# Load the annotation export and derive the numeric target.
df = pd.read_csv(DATA_CSV_PATH)
df['score_clean'] = df['score'].apply(parse_score_field)

# One dict of 0/1 indicators per row, expanded into one column per indicator.
flags = df['motivo'].apply(parse_motivo_to_flags)
for flag_name in ('pan_quemado', 'falta_ingrediente', 'desordenado', 'buen_balance_visual'):
    df[flag_name] = [row_flags[flag_name] for row_flags in flags]

def resolve_original_path(p):
    """Map a stored image reference to a file under ``IMAGES_ROOT``.

    Only the base name of ``p`` is kept; any directory part is discarded.

    Args:
        p: Raw value from the dataframe; may be missing.

    Returns:
        ``IMAGES_ROOT`` joined with the base name, or ``None`` when ``p``
        is missing.
    """
    if not pd.isna(p):
        return os.path.join(IMAGES_ROOT, os.path.basename(str(p)))
    return None

df['orig_path'] = df['data'].apply(resolve_original_path)

# Crop every original that exists on disk; count the ones that do not.
crop_paths = []
missing = 0
for original in df['orig_path'].tolist():
    if original is not None and os.path.exists(original):
        crop_paths.append(yolo_crop_to_file(original, save_dir=CROPS_DIR, conf=0.25))
    else:
        crop_paths.append(None)
        missing += 1

df['crop_path'] = crop_paths

# Keep only rows that have both a parsed score and a crop present on disk.
df = df.dropna(subset=['score_clean', 'crop_path'])
crop_on_disk = df['crop_path'].apply(os.path.exists)
df = df[crop_on_disk].reset_index(drop=True)

print('Total con crop válido:', len(df))
if missing:
    print(f'Advertencia: {missing} imágenes originales no se encontraron en IMAGES_ROOT.')
df.head(5)


In [None]:
# =====================
# tf.data (crops + flags)
# =====================
# Hold out VAL_SPLIT of the rows for validation; the seed is fixed through
# RANDOM_STATE.
train_df, val_df = train_test_split(
    df,
    test_size=VAL_SPLIT,
    shuffle=True,
    random_state=RANDOM_STATE,
)

def load_and_resize(img_path):
    """Read an image file and return it as a model-ready tensor.

    Args:
        img_path: Path of the image file (string or string tensor).

    Returns:
        Float32 tensor of shape ``(IMG_SIZE, IMG_SIZE, 3)`` with pixel
        values divided by 255.
    """
    raw_bytes = tf.io.read_file(img_path)
    pixels = tf.image.decode_jpeg(raw_bytes, channels=3)
    resized = tf.image.resize(pixels, (IMG_SIZE, IMG_SIZE))
    return tf.cast(resized, tf.float32) / 255.0

def augment(img):
    """Apply light random augmentation to one training image.

    A random horizontal flip is followed by small brightness and contrast
    jitter.

    Args:
        img: Image tensor scaled to [0, 1], as produced by
            ``load_and_resize``.

    Returns:
        The augmented image, clipped back to [0, 1].
    """
    img = tf.image.random_flip_left_right(img)
    img = tf.image.random_brightness(img, max_delta=0.1)
    img = tf.image.random_contrast(img, 0.9, 1.1)
    # Brightness/contrast jitter can leave [0, 1]; clip so training inputs
    # stay in the range used for validation and inference.
    return tf.clip_by_value(img, 0.0, 1.0)

# Columns holding the four binary indicators, in model input order.
BIN_COLS = ['pan_quemado','falta_ingrediente','desordenado','buen_balance_visual']


def make_ds(frame, bs, training=True):
    """Build a batched ``tf.data`` pipeline from a dataframe.

    Each element is ``((image, flags), target)`` where ``image`` comes from
    ``load_and_resize`` (plus ``augment`` when training), ``flags`` are the
    ``BIN_COLS`` values and ``target`` is ``score_clean / 100``.

    Args:
        frame: Dataframe with ``crop_path``, ``score_clean`` and the
            ``BIN_COLS`` columns.
        bs: Batch size.
        training: When true, shuffle every epoch and augment the images.

    Returns:
        A batched, prefetched ``tf.data.Dataset``.
    """
    img_paths = frame['crop_path'].values
    y = (frame['score_clean'].values / 100.0).astype(np.float32)
    xbin = frame[BIN_COLS].values.astype(np.float32)

    ds = tf.data.Dataset.from_tensor_slices((img_paths, xbin, y))

    # Shuffle the lightweight (path, flags, target) records before decoding,
    # so the shuffle buffer never holds decoded images. The guard avoids an
    # invalid zero-sized buffer on an empty frame.
    if training and len(frame) > 0:
        ds = ds.shuffle(buffer_size=len(frame), reshuffle_each_iteration=True)

    def _load_example(path, flags_vec, target):
        img = load_and_resize(path)
        if training:
            img = augment(img)
        return (img, flags_vec), target

    ds = ds.map(_load_example, num_parallel_calls=tf.data.AUTOTUNE)
    return ds.batch(bs).prefetch(tf.data.AUTOTUNE)


train_ds = make_ds(train_df, BATCH_SIZE, training=True)
val_ds   = make_ds(val_df,   BATCH_SIZE, training=False)


In [None]:
# =====================
# Modelo multi‑entrada
# =====================
# Image branch ---------------------------------------------------------
inp_img = layers.Input(shape=(IMG_SIZE, IMG_SIZE, 3), name='img')

# The data pipeline feeds pixels scaled to [0, 1]. MobileNetV3's built-in
# preprocessing expects [0, 255], so it is disabled and the [0, 1] -> [-1, 1]
# mapping the backbone needs is applied explicitly here.
scaled_img = layers.Rescaling(scale=2.0, offset=-1.0, name='img_to_pm1')(inp_img)
backbone = MobileNetV3Small(
    include_top=False,
    weights='imagenet',
    input_tensor=scaled_img,
    include_preprocessing=False,
)
# Phase 1 trains only the new head; the pretrained backbone stays frozen.
backbone.trainable = False
x1 = layers.GlobalAveragePooling2D()(backbone.output)
x1 = layers.Dense(128, activation='relu')(x1)

# Binary-flags branch ----------------------------------------------------
inp_bin = layers.Input(shape=(4,), name='bin')
x2 = layers.Dense(16, activation='relu')(inp_bin)

# Fusion and regression head ---------------------------------------------
x  = layers.Concatenate()([x1, x2])
x  = layers.Dense(64, activation='relu')(x)
x  = layers.Dropout(0.3)(x)
# Linear output: the target is score / 100, clipped to [0, 1] at inference.
out = layers.Dense(1, activation='linear', name='score')(x)

model = models.Model(inputs=[inp_img, inp_bin], outputs=out)
model.compile(optimizer=tf.keras.optimizers.Adam(1e-3), loss='mse', metrics=['mae'])
model.summary()


In [None]:
# =====================
# Entrenamiento Fase 1
# =====================
# Stop when validation loss stalls (restoring the best weights) and halve
# the learning rate on shorter plateaus.
callbacks = [
    tf.keras.callbacks.EarlyStopping(
        monitor='val_loss',
        patience=5,
        restore_best_weights=True,
    ),
    tf.keras.callbacks.ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.5,
        patience=2,
        min_lr=1e-6,
        verbose=1,
    ),
]

history1 = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=EPOCHS,
    callbacks=callbacks,
)

# One figure per metric: training curve against validation curve.
for plot_title, train_key, val_key in (
    ('MSE', 'loss', 'val_loss'),
    ('MAE', 'mae', 'val_mae'),
):
    plt.figure()
    plt.plot(history1.history[train_key], label='train')
    plt.plot(history1.history[val_key], label='val')
    plt.title(plot_title)
    plt.legend()
    plt.show()


In [None]:
# =====================
# Fine‑tuning
# =====================
# Number of trailing backbone layers considered for unfreezing.
N = 20
for layer in backbone.layers[-N:]:
    # BatchNormalization layers stay frozen: unfreezing them would update
    # their moving statistics on small batches and degrade the pretrained
    # features (see the Keras transfer-learning guide).
    if isinstance(layer, layers.BatchNormalization):
        continue
    layer.trainable = True

# Recompile so the new trainable set takes effect; a much lower learning
# rate is used for fine-tuning the pretrained weights.
model.compile(optimizer=tf.keras.optimizers.Adam(1e-5), loss='mse', metrics=['mae'])
history2 = model.fit(train_ds, validation_data=val_ds, epochs=10, callbacks=callbacks)

# One figure per metric: training curve against validation curve.
for plot_title, train_key, val_key in (
    ('MSE Fine‑tuning', 'loss', 'val_loss'),
    ('MAE Fine‑tuning', 'mae', 'val_mae'),
):
    plt.figure()
    plt.plot(history2.history[train_key], label='train_ft')
    plt.plot(history2.history[val_key], label='val_ft')
    plt.title(plot_title)
    plt.legend()
    plt.show()


In [None]:
# =====================
# Guardar modelo y TFLite
# =====================
OUTPUT_DIR = '/mnt/data'
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Keras export (HDF5 file).
keras_path = os.path.join(OUTPUT_DIR, 'shefu_multiinput_yolo.h5')
model.save(keras_path)
print('Guardado Keras:', keras_path)

# TFLite export using the converter's default optimizations.
tflite_path = os.path.join(OUTPUT_DIR, 'shefu_multiinput_yolo.tflite')
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_model = converter.convert()
with open(tflite_path, 'wb') as tflite_file:
    tflite_file.write(tflite_model)
print('Guardado TFLite:', tflite_path)


In [None]:
# =====================
# Inferencia end‑to‑end (YOLO + crop + regresor)
# =====================
def infer_full_image(image_path, flags_vec, conf=0.25):
    """Score one full photo end to end (detector, crop, regressor).

    Args:
        image_path: Path of the full photo.
        flags_vec: The binary indicators, one per entry of ``BIN_COLS`` and
            in that order.
        conf: Confidence threshold forwarded to the detector.

    Returns:
        The estimated score in [0, 100] as ``float``, or ``None`` when no
        crop could be obtained.

    Raises:
        ValueError: If ``flags_vec`` does not have ``len(BIN_COLS)`` values.
    """
    import tempfile  # local import: only this function needs it

    xbin = np.asarray(flags_vec, dtype=np.float32).reshape(1, -1)
    if xbin.shape[1] != len(BIN_COLS):
        raise ValueError(
            f'flags_vec must have {len(BIN_COLS)} values, got {xbin.shape[1]}'
        )

    # Crop into a throw-away directory: the crop cache in CROPS_DIR is keyed
    # by file name only, so going through it could return a stale crop of a
    # different image that happens to share this file name.
    with tempfile.TemporaryDirectory() as tmp_dir:
        cp = yolo_crop_to_file(image_path, save_dir=tmp_dir, conf=conf)
        if cp is None or not os.path.exists(cp):
            print('No se pudo obtener crop del completo.')
            return None
        # Same preprocessing as training; runs eagerly, so the file is fully
        # read before the temporary directory is removed.
        img = tf.expand_dims(load_and_resize(cp), 0)

    pred = model.predict([img, xbin], verbose=0)[0, 0]
    # The regressor is trained on score / 100; clip, then rescale to 0-100.
    score = float(np.clip(pred, 0, 1) * 100.0)
    print(f'Nota estimada: {score:.1f}/100  (flags={flags_vec})')
    return score

# Ejemplo:
# infer_full_image('/mnt/data/images/ejemplo.jpg', [0,0,1,0])
