# 02 · Preprocesamiento — AVSI
**Artificial Vision Stacking Inspection** · *2025-10-22*

Este notebook implementa el **pipeline de preprocesamiento** para AVSI:
- Limpieza y verificación de imágenes.
- Redimensionamiento y normalización.
- Data augmentation (rotaciones, flips, jitter, brillo/contraste).
- División estratificada **train/val/test**.
- Exportación a `data/processed/` con `labels.csv`.


## 1. Configuración

In [None]:

from pathlib import Path
import os, random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import cv2
from PIL import Image
from sklearn.model_selection import train_test_split

# Render every figure in this notebook at 120 dpi.
plt.rcParams.update({'figure.dpi': 120})

# Seed both random generators used by the augmentation helpers.
np.random.seed(42)
random.seed(42)

# Directory layout, anchored at the current working directory.
ROOT = Path('.').resolve()
DATA_RAW = ROOT.joinpath('data', 'raw')
DATA_PROC = ROOT.joinpath('data', 'processed')

DS_SMALL = DATA_RAW.joinpath('dataset_100')
DS_LARGE = DATA_RAW.joinpath('dataset_1000')

# Folder names accepted as class labels; adjust to the project.
EXPECTED_CLASSES = ['good_stack', 'bad_stack']

IMG_SIZE = (224, 224)   # target size handed to cv2.resize
AUG_PER_IMAGE = 1       # augmented copies generated per image, besides the resized original
TEST_SIZE = 0.15        # fraction held out for the test split
VAL_SIZE = 0.15         # fraction of the remaining train+val pool held out for validation
MIN_WIDTH = MIN_HEIGHT = 32   # images smaller than this on either side are rejected

print('RAW:', DATA_RAW)
print('PROC:', DATA_PROC)


## 2. Funciones auxiliares

In [None]:

# File extensions (lower-case, with leading dot) treated as images.
IMG_EXTS = {'.jpg', '.jpeg', '.png', '.bmp', '.tif', '.tiff'}

def list_images(folder: Path) -> list:
    """Return every image file found recursively under ``folder``.

    A path counts as an image when it is a regular file whose suffix,
    compared case-insensitively, is in ``IMG_EXTS``.

    Args:
        folder: Directory to scan.

    Returns:
        A sorted list of ``Path`` objects; empty when ``folder`` is not an
        existing directory.
    """
    if not folder.is_dir():
        return []
    # rglob yields entries in filesystem order, which differs between machines.
    # Sorting keeps the seeded augmentation and split steps reproducible.
    # is_file() drops directories that merely happen to end in an image suffix.
    return sorted(
        p for p in folder.rglob('*')
        if p.is_file() and p.suffix.lower() in IMG_EXTS
    )

def read_image(path: Path):
    """Load the file at ``path`` with ``cv2.imread`` and return its result unchanged.

    The path is converted to ``str`` before the call. Callers are expected
    to check the result for ``None`` (``is_valid`` does this).
    """
    return cv2.imread(str(path))

def is_valid(img):
    """Tell whether ``img`` is a loaded image that meets the minimum size.

    Returns ``False`` for ``None``; otherwise ``True`` only when the width is
    at least ``MIN_WIDTH`` and the height is at least ``MIN_HEIGHT``.
    """
    if img is None:
        return False
    height, width = img.shape[:2]
    too_small = width < MIN_WIDTH or height < MIN_HEIGHT
    return not too_small

def resize_normalize(img, size=(224,224)):
    """Resize ``img`` to ``size`` using area interpolation.

    Despite the name, no pixel-value normalization is applied here; the
    function only calls ``cv2.resize`` with ``cv2.INTER_AREA``.
    """
    resized = cv2.resize(img, size, interpolation=cv2.INTER_AREA)
    return resized

def to_rgb(img):
    """Return ``img`` with a channel axis, expanding 2-D grayscale input.

    A two-dimensional array is converted with ``cv2.COLOR_GRAY2BGR`` (so the
    result is in BGR channel order, not RGB); any other input is returned
    as-is.
    """
    is_grayscale = img.ndim == 2
    if not is_grayscale:
        return img
    return cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)

def aug_flip(img):
    """Return ``img`` flipped with ``cv2.flip`` using flip code 1."""
    flipped = cv2.flip(img, 1)
    return flipped

def aug_rotate(img, angle=10):
    """Rotate ``img`` by ``angle`` about its center, keeping the original size.

    The rotation matrix uses scale 1.0, and border pixels are filled with
    ``cv2.BORDER_REFLECT``.
    """
    height, width = img.shape[:2]
    center = (width / 2, height / 2)
    matrix = cv2.getRotationMatrix2D(center, angle, 1.0)
    return cv2.warpAffine(
        img, matrix, (width, height), borderMode=cv2.BORDER_REFLECT
    )

def aug_jitter(img, alpha=10, beta=10):
    """Apply a random linear gain and offset to ``img`` via ``cv2.convertScaleAbs``.

    The gain is ``1 + (alpha/255) * u`` with ``u`` drawn uniformly from
    [-1, 1); the offset is drawn uniformly from [-beta, beta). Both draws
    use NumPy's global random state, gain first.
    """
    gain = 1 + (alpha / 255.0) * np.random.uniform(-1, 1)
    offset = np.random.uniform(-beta, beta)
    return cv2.convertScaleAbs(img, alpha=gain, beta=offset)

def make_augs(img):
    """Return ``img`` followed by ``AUG_PER_IMAGE`` randomly augmented copies.

    Each copy is produced by one transform picked at random among flip,
    rotate and jitter; rotations use an angle picked at random from a fixed
    set. Every transform is applied to the input image, not chained.
    """
    candidates = (aug_flip, aug_rotate, aug_jitter)
    variants = [img]
    for _ in range(AUG_PER_IMAGE):
        transform = random.choice(candidates)
        if transform != aug_rotate:
            variants.append(transform(img))
            continue
        # Rotation is the only transform that takes an extra random parameter.
        angle = random.choice([-12,-8,-5,5,8,12])
        variants.append(transform(img, angle=angle))
    return variants

def ensure_dir(path: Path) -> None:
    """Create ``path`` and any missing parents; do nothing if it already exists."""
    path.mkdir(exist_ok=True, parents=True)


## 3. Inventario y selección de origen

In [None]:

# Count the images available in each raw dataset folder.
count_small, count_large = (len(list_images(ds)) for ds in (DS_SMALL, DS_LARGE))
print('Imágenes dataset_100:', count_small)
print('Imágenes dataset_1000:', count_large)

# Prefer the large dataset whenever it contains at least one image.
if count_large > 0:
    SOURCE_BASE = DS_LARGE
else:
    SOURCE_BASE = DS_SMALL
print('Origen seleccionado:', SOURCE_BASE if SOURCE_BASE.exists() else '(no encontrado)')


## 4. Catálogo de imágenes y etiquetas

In [None]:

# Columns of the catalog; declared explicitly so an empty catalog still has them.
CATALOG_COLUMNS = ['path', 'fname', 'label']

def build_catalog(base: Path):
    """Build a table with one row per image found under ``base``.

    The label is the name of the image's immediate parent folder when that
    name is listed in ``EXPECTED_CLASSES``, and ``'unknown'`` otherwise.

    Args:
        base: Root folder scanned with ``list_images``.

    Returns:
        A DataFrame with columns ``path`` (str), ``fname`` and ``label``.
        The columns are present even when no image is found.
    """
    rows = [
        {
            'path': str(p),
            'fname': p.name,
            'label': p.parent.name if p.parent.name in EXPECTED_CLASSES else 'unknown',
        }
        for p in list_images(base)
    ]
    return pd.DataFrame(rows, columns=CATALOG_COLUMNS)

# Keep the same columns when the source folder is missing, so the
# summaries below work on an empty catalog instead of raising KeyError.
df = build_catalog(SOURCE_BASE) if SOURCE_BASE.exists() else pd.DataFrame(columns=CATALOG_COLUMNS)
print('Total imágenes:', len(df))
display(df.head())
print(df['label'].value_counts(dropna=False))


## 5. Procesamiento, augmentations y preparación

In [None]:

# Output folders, one per split.
TRAIN_DIR, VAL_DIR, TEST_DIR = (DATA_PROC / name for name in ('train', 'val', 'test'))

# Create each split folder together with one sub-folder per label present in the catalog.
labels_unique = [] if df.empty else sorted(df['label'].unique().tolist())
for split_dir in (TRAIN_DIR, VAL_DIR, TEST_DIR):
    ensure_dir(split_dir)
    for class_name in labels_unique:
        ensure_dir(split_dir / class_name)

# Load, validate, resize and augment every catalogued image, keeping the
# resulting arrays in memory together with their label and origin.
valid_rows = []
for entry in df.itertuples(index=False):
    frame = read_image(Path(entry.path))
    if is_valid(frame):
        frame = resize_normalize(to_rgb(frame), IMG_SIZE)
        valid_rows.extend(
            {'label': entry.label, 'img': variant, 'src': entry.path, 'aug_idx': idx}
            for idx, variant in enumerate(make_augs(frame))
        )

print('Total procesadas (incluye augmentations):', len(valid_rows))


## 6. Split estratificado y guardado

In [None]:

if not valid_rows:
    print('[Aviso] No hay imágenes válidas. Revisa data/raw/.')
else:
    df_split = pd.DataFrame([{'label': r['label'], 'src': r['src']} for r in valid_rows])

    # Split on source images rather than on processed rows. Otherwise an
    # original and its augmented copy can land in different splits, leaking
    # near-duplicates of training images into validation/test.
    df_src = df_split.drop_duplicates(subset='src').reset_index(drop=True)

    # Split test
    src_trainval, src_test = train_test_split(
        df_src, test_size=TEST_SIZE, stratify=df_src['label'], random_state=42)
    # Split val (VAL_SIZE is a fraction of the train+val pool)
    src_train, src_val = train_test_split(
        src_trainval, test_size=VAL_SIZE, stratify=src_trainval['label'], random_state=42)

    def rows_for(sources):
        """Return the positions in ``valid_rows`` whose source is in ``sources``."""
        return df_split.index[df_split['src'].isin(sources['src'])].to_numpy()

    train_idx = rows_for(src_train)
    val_idx = rows_for(src_val)
    test_idx = rows_for(src_test)

    print('Split -> train:', len(train_idx), 'val:', len(val_idx), 'test:', len(test_idx))

    records = []
    written = set()  # output paths used in this run, to detect name clashes

    def save_set(indices, base_dir: Path):
        """Write the rows at ``indices`` under ``base_dir/<label>/`` as JPEG.

        One metadata record is appended to ``records`` per file actually
        written; rows whose write fails are reported and left out.
        """
        for j in indices:
            r = valid_rows[j]
            label = r['label']
            stem = f"{Path(r['src']).stem}_a{r['aug_idx']}"
            out_path = base_dir / label / f"{stem}.jpg"
            if out_path in written:
                # Two sources share a stem (different sub-folder or extension):
                # add the row index instead of overwriting the earlier file.
                out_path = base_dir / label / f"{stem}_{j}.jpg"
            written.add(out_path)
            # cv2.imwrite reports failure through its return value.
            if not cv2.imwrite(str(out_path), r['img']):
                print('[Aviso] No se pudo guardar:', out_path)
                continue
            records.append({'split': base_dir.name, 'path': str(out_path), 'label': label, 'fname': out_path.name, 'source': r['src'], 'aug_idx': r['aug_idx']})

    save_set(train_idx, TRAIN_DIR)
    save_set(val_idx, VAL_DIR)
    save_set(test_idx, TEST_DIR)

    df_meta = pd.DataFrame(records)
    df_meta.to_csv(DATA_PROC / 'labels.csv', index=False)
    display(df_meta.head())
    print('Guardado en:', DATA_PROC)


## 7. Verificación visual simple

In [None]:

# Two quick bar charts from the saved metadata: images per split and images per class.
if 'df_meta' not in globals() or df_meta.empty:
    print('[Aviso] No hay df_meta para graficar.')
else:
    counts = df_meta.groupby(['split','label']).size().reset_index(name='n')
    # Totals per split, kept in order of first appearance.
    split_totals = counts.groupby('split', sort=False)['n'].sum()
    plt.figure()
    for split_name, total in split_totals.items():
        plt.bar(split_name, total)
    plt.title('Conteo por split')
    plt.xlabel('Split')
    plt.ylabel('Imágenes')
    plt.show()

    cls_counts = df_meta.groupby(['label']).size().reset_index(name='n')
    plt.figure()
    for class_name, n_images in zip(cls_counts['label'], cls_counts['n']):
        plt.bar(class_name, n_images)
    plt.title('Conteo por clase (total)')
    plt.xlabel('Clase')
    plt.ylabel('Imágenes')
    plt.show()


## 8. Siguientes pasos
- Ajusta `EXPECTED_CLASSES` si tus carpetas de clase difieren.
- Modifica `IMG_SIZE`, `AUG_PER_IMAGE`, `TEST_SIZE` y `VAL_SIZE` según rendimiento.
- Continúa con **03_modelado.ipynb** (carga desde `data/processed/`).