In [51]:
# --- CELDA 1: RUTAS Y LIBRERÍAS ---

from pathlib import Path
import pandas as pd, numpy as np, re

# Busca la raíz del proyecto (para poder ejecutar desde /notebooks)
def find_root(start=None, max_up=6):
    p = Path.cwd() if start is None else Path(start)
    for _ in range(max_up):
        if (p / "data" / "raw").exists():
            return p
        p = p.parent
    raise FileNotFoundError("No se encontró la carpeta 'data/raw'.")

ROOT      = find_root()
DATA_RAW  = ROOT / "data" / "raw"
DATA_PROC = ROOT / "data" / "processed"
REPORTS   = ROOT / "reports" / "tables"
for d in (DATA_PROC, REPORTS): d.mkdir(parents=True, exist_ok=True)

print("Rutas configuradas correctamente ✅")


Rutas configuradas correctamente ✅


In [53]:
# --- CELDA 2: CARGA DEL EXCEL PERÚ ---

EXCEL_FILE = DATA_RAW / "PERU_PISA2022.xlsx"
assert EXCEL_FILE.exists(), f"No se encontró {EXCEL_FILE}"

per = pd.read_excel(EXCEL_FILE, engine="openpyxl")
per.columns = [c.upper().strip() for c in per.columns]
print("Archivo cargado:", EXCEL_FILE.name)
print("Shape:", per.shape)


Archivo cargado: PERU_PISA2022.xlsx
Shape: (6968, 1278)


In [54]:
# --- CELDA 3: DETECTAR COLUMNAS PV DE MATH ---

def detect_pv_cols(columns, domain="MATH", k=10):
    pat = re.compile(rf"PV(\d+){domain}$", re.IGNORECASE)
    found = [(int(pat.match(c).group(1)), c) for c in columns if pat.match(c)]
    if found:
        found.sort()
        return [c for _, c in found]
    return sorted([c for c in columns if c.startswith("PV") and domain in c])

pv_math = detect_pv_cols(per.columns, "MATH", 10)
assert len(pv_math) >= 5, f"No se detectaron suficientes PVs de MATH. Detectados: {pv_math}"
print("PVs detectados:", pv_math[:3], "...")


PVs detectados: ['PV1MATH', 'PV2MATH', 'PV3MATH'] ...


In [55]:
# --- CELDA 4: PROMEDIO DE PVs Y ETIQUETADO DE NIVELES ---

# 1. Promedio de los 10 plausible values (exactamente como hizo el paper)
per["MATH_MEAN"] = per[pv_math].mean(axis=1)

# 2. Umbrales oficiales PISA 2022 (Matemáticas)
THR = [357.77, 420.07, 482.38, 544.68, 607.03, 669.30]

def score_to_level(s):
    if pd.isna(s): return np.nan
    if s < THR[0]: return 0
    if s < THR[1]: return 1
    if s < THR[2]: return 2
    if s < THR[3]: return 3
    if s < THR[4]: return 4
    if s < THR[5]: return 5
    return 6

def level7_to_3(l):
    if pd.isna(l): return np.nan
    if l in {0,1,2}: return "LOW"
    if l in {3,4}:   return "MED"
    if l in {5,6}:   return "HIGH"
    return np.nan

# 3. Asignar nivel 0–6 y luego Low/Med/High
per["MATH_LEVEL_0_6"]  = per["MATH_MEAN"].apply(score_to_level)
per["MATH_LEVEL_3CAT"] = per["MATH_LEVEL_0_6"].apply(level7_to_3)

# 4. Guardar el resultado como Excel
y_out = per[["MATH_MEAN","MATH_LEVEL_0_6","MATH_LEVEL_3CAT"]].copy()
y_out.to_excel(DATA_PROC / "y_3niveles.xlsx", index=False)

print("✅ Archivo guardado:", DATA_PROC / "y_3niveles.xlsx")
print("\nDistribución de categorías:")
print(y_out["MATH_LEVEL_3CAT"].value_counts(dropna=False))


✅ Archivo guardado: c:\Users\Gerson\Downloads\PISA-ML\data\processed\y_3niveles.xlsx

Distribución de categorías:
MATH_LEVEL_3CAT
LOW     6076
MED      866
HIGH      26
Name: count, dtype: int64


In [56]:
FEATURES_PAPER = [
    # Tabla 5
    "ST001D01T","ST004D01T","ST250Q01JA","ST250Q02JA","ST250Q03JA","ST250Q04JA",
    "ST250Q05JA","ST251Q01JA","ST251Q02JA","ST251Q03JA",
    "ST251Q04JA","ST251Q06JA","ST251Q07JA","ST253Q01JA",
    "ST254Q01JA","ST254Q02JA","ST254Q03JA","ST254Q04JA","ST254Q05JA","ST254Q06JA",
    "ST255Q01JA","ST230Q01JA","ST259Q01JA","ST019AQ01T","ST022Q01TA","ST226Q01JA",
    "ST125Q01NA","ST062Q01TA","ST062Q02TA","ST062Q03TA",
    # Tabla 6
    "ST038Q03NA","ST038Q04NA","ST038Q05NA","ST038Q06NA","ST038Q07NA","ST038Q08NA",
    "ST038Q09JA","ST038Q10JA","ST038Q11JA",
    "ST294Q01JA","ST294Q02JA","ST294Q03JA","ST294Q04JA","ST294Q05JA",
    "ST295Q01JA","ST295Q02JA","ST295Q03JA","ST295Q04JA","ST295Q05JA",
    "ST016Q01NA","ST297Q01JA","ST297Q03JA","ST297Q05JA","ST297Q07JA","ST297Q09JA",
    "REPEAT","LANGN"
]
feat_avail = [c for c in FEATURES_PAPER if c in per.columns]
X0 = per[feat_avail].copy()
print("Variables seleccionadas:", len(X0.columns))


Variables seleccionadas: 57


In [57]:
# --- CELDA 6: LIMPIEZA BÁSICA ---
na_frac_col = X0.isna().mean()
cols_keep = [c for c in X0.columns if na_frac_col[c] < 0.95]
X1 = X0[cols_keep].copy()

nunique_eff = X1.nunique(dropna=True)
cols_keep2 = [c for c in X1.columns if nunique_eff[c] > 1]
X2 = X1[cols_keep2].copy()

row_na_frac = X2.isna().mean(axis=1)
mask_rows = row_na_frac < 0.95
X3 = X2.loc[mask_rows].reset_index(drop=True)

y_aligned = per.loc[mask_rows, ["MATH_MEAN","MATH_LEVEL_0_6","MATH_LEVEL_3CAT"]].reset_index(drop=True)
print("Después de limpieza → X3:", X3.shape, " y:", y_aligned.shape)


Después de limpieza → X3: (6968, 57)  y: (6968, 3)


In [58]:
# --- CELDA 7: TIPOS Y CODIFICACIÓN ---

import numpy as np, pandas as pd

# Reglas: Binarias = ST004, ST250, ST297, REPEAT | Numéricas = ST016, ST259 | Resto = categóricas
BIN_VARS = [c for c in X3.columns if c.startswith(("ST004","ST250","ST297"))] + ([c for c in X3.columns if c == "REPEAT"])
NUM_VARS = [c for c in X3.columns if c.startswith(("ST016","ST259"))]
CAT_VARS = [c for c in X3.columns if c not in set(BIN_VARS + NUM_VARS)]

# Normalizar binarios a 0/1 (maneja 1/2 y 0/1; imputación a 0)
for c in BIN_VARS:
    s = pd.to_numeric(X3[c], errors="coerce")
    uniq = set(s.dropna().unique())
    if uniq <= {0,1}:
        X3[c] = s.fillna(0).astype("int8")
    elif uniq <= {1,2}:
        X3[c] = s.replace({1:1, 2:0}).fillna(0).astype("int8")  # 1 = sí/presente
    else:
        maxv = s.max(skipna=True)
        X3[c] = (s == maxv).fillna(0).astype("int8")

# Numéricas → media
if NUM_VARS:
    X3[NUM_VARS] = X3[NUM_VARS].apply(pd.to_numeric, errors="coerce")
    X3[NUM_VARS] = X3[NUM_VARS].fillna(X3[NUM_VARS].mean())

# Categóricas → one-hot
if CAT_VARS:
    X_cat = pd.get_dummies(X3[CAT_VARS].astype("string"), drop_first=True)
else:
    X_cat = pd.DataFrame(index=X3.index)

# Concatenar final
X_numbin = X3[NUM_VARS + BIN_VARS] if (NUM_VARS or BIN_VARS) else pd.DataFrame(index=X3.index)
X_full = pd.concat([X_numbin, X_cat], axis=1)

print(f"Tipos → bin: {len(BIN_VARS)}, num: {len(NUM_VARS)}, cat: {len(CAT_VARS)}")
print("X_full listo:", X_full.shape)


Tipos → bin: 12, num: 2, cat: 43
X_full listo: (6968, 183)


In [60]:
# --- CELDA 7.1: RECUENTO 0/1 EN BINARIAS (con ruta correcta) ---
from pathlib import Path
import pandas as pd

rep = []
for c in BIN_VARS:
    vc = X3[c].value_counts(dropna=False).to_dict()
    rep.append({"col": c, "zeros": vc.get(0,0), "ones": vc.get(1,0)})

bin_report = pd.DataFrame(rep).sort_values("col")

# como el notebook está en /notebooks, subimos un nivel:
OUT_DIR = Path("../data/processed")
OUT_DIR.mkdir(parents=True, exist_ok=True)

OUT_FILE = OUT_DIR / "binarios_recuento.xlsx"
bin_report.to_excel(OUT_FILE, index=False)

print("✅ Guardado en:", OUT_FILE.resolve())
bin_report.head()


✅ Guardado en: C:\Users\Gerson\Downloads\PISA-ML\data\processed\binarios_recuento.xlsx


Unnamed: 0,col,zeros,ones
11,REPEAT,6109,859
0,ST004D01T,3494,3474
1,ST250Q01JA,2456,4512
2,ST250Q02JA,2902,4066
3,ST250Q03JA,4442,2526


In [64]:
# --- CELDA 7.9: GUARDAR DATASET IMPUTADO Y CODIFICADO ---
from pathlib import Path
import pandas as pd

# Detectar raíz y carpeta /processed
def find_root(start=None, max_up=6):
    p = Path.cwd() if start is None else Path(start)
    for _ in range(max_up):
        if (p / "data").exists():
            return p
        p = p.parent
    raise FileNotFoundError("No encontré carpeta 'data' hacia arriba (buscando 'data').")

ROOT = find_root()
DATA_PROC = ROOT / "data" / "processed"
DATA_PROC.mkdir(parents=True, exist_ok=True)

# Unir X_full (ya imputado y codificado) con y
per_final = pd.concat([X_full, y_aligned], axis=1)

OUT_FILE = DATA_PROC / "PERU_PISA2022_FINAL_IMPUTADO.xlsx"
per_final.to_excel(OUT_FILE, index=False)

print(f"✅ Archivo completo guardado en: {OUT_FILE.name}")
print("Filas:", per_final.shape[0], "| Columnas:", per_final.shape[1])


✅ Archivo completo guardado en: PERU_PISA2022_FINAL_IMPUTADO.xlsx
Filas: 6968 | Columnas: 186


In [65]:
# --- CELDA 8: GUARDADOS ROBUSTOS (X por partes, y y diccionario de tipos) ---
from pathlib import Path
import math
import pandas as pd

def find_root(start=None, max_up=6):
    p = Path.cwd() if start is None else Path(start)
    for _ in range(max_up):
        if (p / "data").exists():
            return p
        p = p.parent
    raise FileNotFoundError("No encontré carpeta 'data' hacia arriba (buscando 'data').")

ROOT = find_root()
DATA_PROC = ROOT / "data" / "processed"
DATA_PROC.mkdir(parents=True, exist_ok=True)

# ---- Guardar X (particionando columnas si hiciera falta)
MAXC = 16000
parts = math.ceil(X_full.shape[1] / MAXC)
for i in range(parts):
    lo, hi = i*MAXC, min((i+1)*MAXC, X_full.shape[1])
    out_x = DATA_PROC / f"peru_X_full__part{i+1}.xlsx"
    X_full = X_full.astype(int)
    X_full.iloc[:, lo:hi].to_excel(out_x, sheet_name=f"X_{lo+1}-{hi}", index=False)
    print(f"✅ Guardé X columnas {lo+1}-{hi} en: {out_x.name}")

# ---- Guardar y alineado
out_y = DATA_PROC / "y_3niveles.xlsx"
y_aligned.to_excel(out_y, index=False)
print(f"✅ Guardé y en: {out_y.name}  (shape={y_aligned.shape})")

# ---- Guardar diccionario de tipos
dicc = pd.DataFrame({
    "col": list(X3.columns),
    "tipo": ["bin" if c in BIN_VARS else ("num" if c in NUM_VARS else "cat") for c in X3.columns]
}).sort_values("col")

out_dic = DATA_PROC / "diccionario_vars.xlsx"
dicc.to_excel(out_dic, index=False)
print(f"✅ Guardé diccionario en: {out_dic.name}")

print("\nResumen final:")
print("X_full:", X_full.shape)
print("bin:", len(BIN_VARS), "| num:", len(NUM_VARS), "| cat:", len(CAT_VARS))
print("Directorio de salida:", DATA_PROC.resolve())


✅ Guardé X columnas 1-183 en: peru_X_full__part1.xlsx
✅ Guardé y en: y_3niveles.xlsx  (shape=(6968, 3))
✅ Guardé diccionario en: diccionario_vars.xlsx

Resumen final:
X_full: (6968, 183)
bin: 12 | num: 2 | cat: 43
Directorio de salida: C:\Users\Gerson\Downloads\PISA-ML\data\processed
