<a href="https://colab.research.google.com/github/aarongonzalezoxed/OTCC-CERTIFICACION-BECAR-1.0/blob/main/SCRIPT_CERTIFICATION_BECAR.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# OTCC CERTIFICATION BECAR – Integrador de reportes (CSV/XLSX) por Data Management

!pip -q install openpyxl

import re
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from openpyxl import load_workbook

# Colab helpers (descarga automática)
try:
    from google.colab import files
    IN_COLAB = True
except Exception:
    IN_COLAB = False

# -------------------------
# Config
# -------------------------
# CEFR levels from lowest to highest; a level's position in this list is its sort rank.
LEVELS_ORDER = ["A1", "A2", "B1", "B2", "C1", "C2"]
LEVEL_RANK = dict(zip(LEVELS_ORDER, range(len(LEVELS_ORDER))))

# Exact header texts expected in row 4 of every report, keyed by internal field name.
REQ_HEADERS = dict(
    user_id="User ID",
    first_name="First name",
    last_name="Last name",
    use="Part 2- Score(%)",
    reading="Part 3- Score(%)",
    listening="Part 4- Score(%)",
    writing="Part 5- Score(%)",
)

# -------------------------
# Utilidades
# -------------------------
def clean_percent_to_float(x):
    """Convert a percentage-like value to ``float``.

    Numbers are returned as ``float`` unchanged. Strings such as ``'77.22%'``
    or ``'77,22'`` (decimal comma) are cleaned and the first number found in
    them is returned.

    Returns NaN for ``None``, blank strings, the placeholders
    ``nan``/``none``/``null`` (case-insensitive) and text without any digits.
    """
    if x is None:
        return np.nan
    if isinstance(x, (int, float, np.integer, np.floating)):
        return float(x)

    text = str(x).strip()
    if text == "" or text.lower() in {"nan", "none", "null"}:
        return np.nan

    # Drop the percent sign and normalise a decimal comma to a dot.
    text = text.replace("%", "").strip().replace(",", ".")

    # Take the first number in the text. The pattern only matches strings that
    # float() accepts, so no exception handling is needed around the cast.
    match = re.search(r"-?\d+(\.\d+)?", text)
    return float(match.group(0)) if match else np.nan

def parse_b1_title_and_level(b1_value):
    """Split the report title found in cell B1 into test name and level.

    Example: ``'OTCC CERTIFICATION 4A - A1 (S0_0212)'`` gives
    ``('OTCC CERTIFICATION 4A', 'A1')``.

    - Anything in parentheses is discarded first.
    - The test name is the text before the first dash.
    - The level is the first A1/A2/B1/B2/C1/C2 token found anywhere after
      that dash, so titles with extra dashes ('NAME - 4A - B2') still
      resolve. Hyphen, en dash and em dash are all accepted as separators.

    Returns a ``(test_name, applied_level)`` tuple. ``test_name`` falls back
    to ``'UNKNOWN TEST'`` when the value is ``None`` or has no name part;
    ``applied_level`` is ``None`` when no level token is found.
    """
    if b1_value is None:
        return ("UNKNOWN TEST", None)

    # Remove parenthesised codes such as "(S0_0212)".
    title = re.sub(r"\(.*?\)", "", str(b1_value).strip()).strip()

    # Split only once: everything to the right of the first dash is searched
    # for the level, whatever further dashes it contains.
    pieces = re.split(r"[-\u2013\u2014]", title, maxsplit=1)
    test_name = pieces[0].strip()

    applied = None
    if len(pieces) == 2:
        level_match = re.search(r"\b(A1|A2|B1|B2|C1|C2)\b", pieces[1])
        if level_match:
            applied = level_match.group(1)

    return (test_name or "UNKNOWN TEST", applied)

# -------------------------
# FIX REAL: Passing Criteria (con decimales correctos)
# -------------------------
def passing_criteria(applied_level, score):
    """Return the "Passing Criteria" label (report column J).

    Returns an empty string when ``applied_level`` is not one of
    ``LEVELS_ORDER`` or ``score`` is missing (NaN/None).

    Bands are contiguous so decimal scores never fall between two of them:

    - Levels other than C2: >=91 DISTINCTION, >=81 PROFICIENT, >=70 PASS,
      >=60 PASS 1LB, otherwise PARTICIPATION CERTIFICATE.
    - C2: >=95 DISTINCTION, >=89 PROFICIENT, >=80 PASS, >=70 PASS C1,
      >=60 PASS B2, otherwise PARTICIPATION CERTIFICATE.

    The top band has no upper limit: an out-of-range score above 100 is
    reported as DISTINCTION instead of dropping to PARTICIPATION CERTIFICATE,
    which keeps this label coherent with ``level_achieved`` (it grants the
    applied level for the same score).
    """
    if applied_level not in LEVELS_ORDER or pd.isna(score):
        return ""

    value = float(score)

    # (lower bound, label) pairs, checked from the highest band down.
    if applied_level == "C2":
        bands = (
            (95, "DISTINCTION"),
            (89, "PROFICIENT"),
            (80, "PASS"),
            (70, "PASS C1"),
            (60, "PASS B2"),
        )
    else:
        bands = (
            (91, "DISTINCTION"),
            (81, "PROFICIENT"),
            (70, "PASS"),
            (60, "PASS 1LB"),
        )

    for lower_bound, label in bands:
        if value >= lower_bound:
            return label
    return "PARTICIPATION CERTIFICATE"

def level_achieved(applied_level, score):
    """Return the "Level Achieved" value (report column I).

    Returns an empty string when ``applied_level`` is not one of
    ``LEVELS_ORDER`` or ``score`` is missing (NaN/None).

    Rules:

    - Below 60 (including out-of-range negative scores): PARTICIPATION
      CERTIFICATE.
    - Levels other than C2: 60 to <70 gives the level just below the applied
      one (A1 has none, so PARTICIPATION CERTIFICATE); 70 and above gives the
      applied level.
    - C2: 60 to <70 gives B2, 70 to <80 gives C1, 80 and above gives C2.
    """
    if applied_level not in LEVELS_ORDER or pd.isna(score):
        return ""

    value = float(score)

    # No lower bound on purpose: a negative score must not be awarded a level.
    if value < 60:
        return "PARTICIPATION CERTIFICATE"

    if applied_level == "C2":
        if value < 70:
            return "B2"
        if value < 80:
            return "C1"
        return "C2"

    if value < 70:
        if applied_level == "A1":
            return "PARTICIPATION CERTIFICATE"
        # A1 was handled above, so a lower level always exists here.
        return LEVELS_ORDER[LEVEL_RANK[applied_level] - 1]

    return applied_level

def find_header_indices(header_row):
    """Locate the required report columns in a header row.

    ``header_row`` is a sequence of cell values. ``None`` cells are ignored,
    other values are compared as stripped strings, and when a header text
    appears more than once the last occurrence wins.

    Returns a dict mapping each key of ``REQ_HEADERS`` to the 0-based position
    of its header text.

    Raises ``ValueError`` naming the first required header that is missing.
    """
    positions = {
        str(cell).strip(): position
        for position, cell in enumerate(header_row)
        if cell is not None
    }

    resolved = {}
    for key, header_text in REQ_HEADERS.items():
        position = positions.get(header_text)
        if position is None:
            raise ValueError(f"No encontré la columna requerida en fila 4: '{header_text}'")
        resolved[key] = position
    return resolved

# -------------------------
# Lectura por archivo
# -------------------------
def process_xlsx(path):
    """Read one Excel report and return its participant rows as a DataFrame.

    Layout read from the workbook's active worksheet: cell B1 holds the test
    title (parsed into test name and applied level), row 4 holds the column
    headers and data starts at row 5. Rows whose "User ID" cell is empty are
    skipped.

    The result has the columns "User ID", "Full name", "Applied level", the
    four skill scores as floats (NaN when missing) and "_test_name".

    Raises ``ValueError`` (from ``find_header_indices``) when a required
    header is missing from row 4.
    """
    sheet = load_workbook(path, data_only=True).active

    test_name, applied_level = parse_b1_title_and_level(sheet["B1"].value)
    level_text = applied_level if applied_level else ""

    # Row 4 carries the header texts.
    header_values = [
        sheet.cell(row=4, column=col).value
        for col in range(1, sheet.max_column + 1)
    ]
    positions = find_header_indices(header_values)

    def cell_value(row_number, key):
        # Header positions are 0-based; openpyxl columns are 1-based.
        return sheet.cell(row=row_number, column=positions[key] + 1).value

    def as_text(value):
        return "" if value is None else str(value).strip()

    records = []
    for row_number in range(5, sheet.max_row + 1):
        raw_id = cell_value(row_number, "user_id")
        if raw_id is None or str(raw_id).strip() == "":
            continue

        first_text = as_text(cell_value(row_number, "first_name"))
        last_text = as_text(cell_value(row_number, "last_name"))

        records.append({
            "User ID": str(raw_id).strip(),
            "Full name": f"{first_text} {last_text}".strip(),
            "Applied level": level_text,
            "Use of English": clean_percent_to_float(cell_value(row_number, "use")),
            "Reading": clean_percent_to_float(cell_value(row_number, "reading")),
            "Listening": clean_percent_to_float(cell_value(row_number, "listening")),
            "Writing": clean_percent_to_float(cell_value(row_number, "writing")),
            "_test_name": test_name,
        })

    return pd.DataFrame(records)

def process_csv(path):
    """Read one CSV report and return its participant rows as a DataFrame.

    Layout: the second field of the first row (cell B1) holds the test title,
    the fourth row holds the column headers and data starts at the fifth row.
    Blank lines are not counted as rows. Rows whose "User ID" field is empty
    are skipped; fields missing from a short row are treated as empty.

    The result has the columns "User ID", "Full name", "Applied level", the
    four skill scores as floats (NaN when missing) and "_test_name".

    Raises ``ValueError`` when the file has fewer than four rows or when a
    required header is missing from the fourth row.
    """
    import csv  # local import keeps this function self-contained

    # Parse with the stdlib reader rather than pandas.read_csv(header=None):
    # read_csv sizes the table from the first line, so a report whose title
    # row is shorter than its header row fails with a tokenizing error.
    with open(path, newline="", encoding="utf-8-sig") as handle:
        table = [
            fields
            for fields in csv.reader(handle)
            # Skip blank lines, as read_csv did by default, so the title and
            # header rows keep the same positions as before.
            if fields and not (len(fields) == 1 and fields[0].strip() == "")
        ]

    title = table[0][1] if table and len(table[0]) > 1 else None
    test_name, applied_level = parse_b1_title_and_level(title)
    level_text = applied_level if applied_level else ""

    if len(table) < 4:
        raise ValueError("CSV demasiado corto: no existe la fila 4 de headers.")
    positions = find_header_indices(table[3])

    def field(fields, key, default=""):
        # Rows may be shorter than the header row; missing fields use the default.
        position = positions[key]
        return fields[position] if position < len(fields) else default

    records = []
    for fields in table[4:]:
        user_id = str(field(fields, "user_id")).strip()
        if user_id == "":
            continue

        first_text = str(field(fields, "first_name")).strip()
        last_text = str(field(fields, "last_name")).strip()

        records.append({
            "User ID": user_id,
            "Full name": f"{first_text} {last_text}".strip(),
            "Applied level": level_text,
            "Use of English": clean_percent_to_float(field(fields, "use", None)),
            "Reading": clean_percent_to_float(field(fields, "reading", None)),
            "Listening": clean_percent_to_float(field(fields, "listening", None)),
            "Writing": clean_percent_to_float(field(fields, "writing", None)),
            "_test_name": test_name,
        })

    return pd.DataFrame(records)

def process_file(path):
    """Dispatch a report file to the reader that matches its extension.

    The extension check is case-insensitive. ``.xlsx``, ``.xlsm``, ``.xltx``
    and ``.xltm`` go to ``process_xlsx``; ``.csv`` goes to ``process_csv``.

    Raises ``ValueError`` for any other extension.
    """
    name = path.lower()
    if name.endswith((".xlsx", ".xlsm", ".xltx", ".xltm")):
        return process_xlsx(path)
    if name.endswith(".csv"):
        return process_csv(path)
    raise ValueError(f"Extensión no soportada: {path}")

# -------------------------
# Carga de archivos en Colab
# -------------------------
print("Sube tus archivos (CSV/XLSX). Puedes seleccionar varios a la vez.")

if IN_COLAB:
    # The keys of the upload result are used as the paths of the files to read.
    uploaded = files.upload()
    input_paths = list(uploaded.keys())
else:
    # Without the Colab upload helper there is nothing to process.
    uploaded = None
    raise RuntimeError("Este script está pensado para ejecutarse en Google Colab.")

# -------------------------
# Integración
# -------------------------
# Parse every uploaded file in order and keep only the non-empty results.
all_frames = [frame for frame in map(process_file, input_paths) if not frame.empty]

# Distinct test titles of each kept file, in file order.
test_names = [
    name
    for frame in all_frames
    for name in frame["_test_name"].dropna().unique().tolist()
]

if not all_frames:
    raise ValueError("No se encontró data válida en los archivos subidos.")

df = pd.concat(all_frames, ignore_index=True)

# Numeric overall score: mean of the four skills, NaN when any skill is missing.
df["Score Obtained (num)"] = df.loc[
    :, ["Use of English", "Reading", "Listening", "Writing"]
].mean(axis="columns", skipna=False)

# Text version with a percent sign (column H); blank when the score is missing.
df["Score Obtained"] = [
    "" if pd.isna(value) else f"{value:.2f}%"
    for value in df["Score Obtained (num)"]
]

# Passing criteria (column J).
df["Passing Criteria"] = [
    passing_criteria(level, value)
    for level, value in zip(df["Applied level"], df["Score Obtained (num)"])
]

# Level achieved (column I).
df["Level Achieved"] = [
    level_achieved(level, value)
    for level, value in zip(df["Applied level"], df["Score Obtained (num)"])
]

# Sort by applied level (A1 -> C2, unknown levels last), then by User ID.
df["_level_rank"] = [LEVEL_RANK.get(level, 999) for level in df["Applied level"]]
df = df.sort_values(by=["_level_rank", "User ID"]).reset_index(drop=True)

# Columns of the exported report, in output order.
final_cols = [
    "User ID", "Full name", "Applied level",
    "Use of English", "Reading", "Listening", "Writing",
    "Score Obtained", "Level Achieved", "Passing Criteria",
]
out_df = df.loc[:, final_cols].copy()

# Render the four skill columns (positions 3 to 6 of final_cols) as
# percentages with two decimals; missing scores become blank cells.
for col in final_cols[3:7]:
    out_df[col] = [
        "" if pd.isna(score) else f"{float(score):.2f}%"
        for score in out_df[col]
    ]

# -------------------------
# Nombre de la prueba para títulos/archivos
# -------------------------
# Keep only non-blank string names, then choose the label used in titles and
# file names: the single test name, a generic label when several tests are
# mixed, or a fallback when no name was found.
test_names = [name for name in test_names if isinstance(name, str) and name.strip()]
if not test_names:
    test_name_main = "INTEGRATED REPORT"
elif len(set(test_names)) > 1:
    test_name_main = "MULTIPLE TESTS"
else:
    test_name_main = test_names[0].strip()

def safe_filename(s):
    """Return ``s`` made usable as a file name.

    Each run of the characters ``\\ / : * ? " < > |`` becomes a single
    underscore; each run of whitespace becomes a single space, and leading
    and trailing whitespace is removed.
    """
    without_reserved = re.sub(r"[\\/:*?\"<>|]+", "_", s)
    return re.sub(r"\s+", " ", without_reserved).strip()

base_name = safe_filename(test_name_main)

# Export the integrated table as CSV, named after the test.
output_csv = base_name + " - Integrated Report.csv"
out_df.to_csv(path_or_buf=output_csv, encoding="utf-8-sig", index=False)
print("CSV generado: " + output_csv)

# -------------------------
# Gráfico 1
# -------------------------
# Bars shown left to right: the four skills, followed by the overall average.
skill_order = ["Listening", "Reading", "Use of English", "Writing"]

skill_means = [df[s].mean() for s in skill_order]
overall_mean = df["Score Obtained (num)"].mean()

vals = skill_means + [overall_mean]
labels = skill_order + ["Overall"]

fig1, ax1 = plt.subplots(figsize=(11, 4.5))
cmap = plt.cm.Blues
# One shade of blue per bar, darkest first.
colors = [cmap(0.90), cmap(0.80), cmap(0.70), cmap(0.60), cmap(0.45)]
bars = ax1.bar(labels, vals, color=colors, edgecolor="none")

title1 = f"{test_name_main} - Average per skill and overall group average"
ax1.set_title(title1, fontsize=13)
ax1.set_ylim(0, 100)
ax1.set_ylabel("Average (%)")

# Write each average just above its bar.
for b, v in zip(bars, vals):
    if pd.isna(v):
        # No scores for this bar: skip the label rather than drawing "nan%"
        # at a non-finite position.
        continue
    ax1.text(b.get_x() + b.get_width()/2, v + 1.0, f"{v:.2f}%", ha="center", va="bottom", fontsize=10)

ax1.grid(axis="y", alpha=0.25)
plt.tight_layout()

# The chart title doubles as the PNG file name.
output_png1 = f"{safe_filename(title1)}.png"
plt.savefig(output_png1, dpi=200)
plt.close(fig1)
print(f"Gráfico 1 generado: {output_png1}")

# -------------------------
# Gráfico 2
# -------------------------
# Number of rows per applied level, in A1 -> C2 order, keeping only the levels
# that actually occur (levels absent from the data are NaN after reindexing
# and fail the "> 0" test).
counts = df["Applied level"].value_counts().reindex(LEVELS_ORDER)
counts = counts[counts > 0]
total = int(counts.sum())

fig2, ax2 = plt.subplots(figsize=(9, 6.2))

labels2 = list(counts.index)
sizes = counts.to_numpy(dtype=float)

# Shades of blue spread evenly from 0.30 to 0.90 across the slices.
colors2 = [
    plt.cm.Blues(0.30 + 0.60 * (position / max(1, len(labels2) - 1)))
    for position in range(len(labels2))
]

# Donut chart: a pie whose wedges are narrower than the radius.
wedges, texts, autotexts = ax2.pie(
    sizes,
    labels=labels2,
    colors=colors2,
    startangle=90,
    counterclock=False,
    wedgeprops={"width": 0.38, "edgecolor": "white"},
    labeldistance=1.12,
    autopct=lambda p: f"{p:.1f}%",
    pctdistance=0.76,
)

# Make the slice labels and percentages easier to read.
for level_label in texts:
    level_label.set_fontsize(11)
for percent_label in autotexts:
    percent_label.set_fontsize(10)

# Total count in the middle of the donut.
ax2.text(0, 0, f"Total\n{total}", ha="center", va="center", fontsize=13)

title2 = f"{test_name_main} - Distribution by English language level"
ax2.set_title(title2, fontsize=13)

# Legend-style box to the right of the chart with the count per level.
lines = [f"{lv} = {int(n)} people at this level" for lv, n in counts.items()]
box_text = "\n".join(lines)
ax2.text(
    1.10, 0.10, box_text,
    transform=ax2.transAxes,
    ha="left", va="bottom",
    fontsize=10,
    bbox={"boxstyle": "round,pad=0.4", "fc": "white", "ec": "lightgray", "alpha": 0.97},
)

ax2.set_aspect("equal")
fig2.tight_layout()

# The chart title doubles as the PNG file name; saved at a higher DPI than chart 1.
output_png2 = f"{safe_filename(title2)}.png"
fig2.savefig(output_png2, dpi=300)
plt.close(fig2)
print(f"Gráfico 2 generado: {output_png2}")

# -------------------------
# Descargas automáticas (Colab)
# -------------------------
if IN_COLAB:
    # Hand the three generated files to the Colab download helper, CSV first.
    for produced_file in (output_csv, output_png1, output_png2):
        files.download(produced_file)

print("\n✅ Listo equipo OPS: CSV integrado + 2 PNG generados y descargados.")
