<a href="https://colab.research.google.com/github/aarongonzalezoxed/OTCC-CERTIFICATION-ADULTOS-1.1/blob/main/LTE_OTCC_CERTIFICATION_ADULTOS_UA_14_ENE_26.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# OTCC CERTIFICATION — Integrador (5 skills) por Data Management

import io, re
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

AUTO_DOWNLOAD = True
try:
    from google.colab import files
except Exception:
    files = None

print("Sube los reportes de AssessmentQ (puedes seleccionar varios; .csv, .xlsx o .xls).")
if files:
    uploaded = files.upload()
else:
    raise RuntimeError("Ejecuta esto en Google Colab.")

# ----- MaxScores (CERTIFICATION) -----
max_scores_map = {
    ('A1','SPEAKING'): 100, ('A1','USE OF ENGLISH'): 400, ('A1','READING'): 200, ('A1','LISTENING'): 200, ('A1','WRITING'): 200,
    ('A2','SPEAKING'): 100, ('A2','USE OF ENGLISH'): 400, ('A2','READING'): 200, ('A2','LISTENING'): 200, ('A2','WRITING'): 200,
    ('B1','SPEAKING'): 100, ('B1','USE OF ENGLISH'): 400, ('B1','READING'): 280, ('B1','LISTENING'): 240, ('B1','WRITING'): 200,
    ('B2','SPEAKING'): 100, ('B2','USE OF ENGLISH'): 400, ('B2','READING'): 280, ('B2','LISTENING'): 240, ('B2','WRITING'): 200,
    ('C1','SPEAKING'): 100, ('C1','USE OF ENGLISH'): 400, ('C1','READING'): 200, ('C1','LISTENING'): 240, ('C1','WRITING'): 300,
    ('C2','SPEAKING'): 100, ('C2','USE OF ENGLISH'): 400, ('C2','READING'): 200, ('C2','LISTENING'): 320, ('C2','WRITING'): 300,
}

# ----- Utils -----
def read_any_table(fname: str, data: bytes) -> pd.DataFrame:
    lower = fname.lower()
    if lower.endswith((".xlsx", ".xls")):
        return pd.read_excel(io.BytesIO(data), header=None, dtype=str, engine="openpyxl")
    try:
        return pd.read_csv(io.BytesIO(data), header=None, sep=None, engine="python", encoding="utf-8", dtype=str)
    except Exception:
        return pd.read_csv(io.BytesIO(data), header=None, sep=None, engine="python", encoding="latin-1", dtype=str)

def extract_exam_label_B1(df_raw: pd.DataFrame) -> str:
    try:
        return str(df_raw.iat[0,1]).strip()
    except Exception:
        return ""

def parse_exam_fields(exam_label: str):
    """
    exam_label ejemplo: 'OTCC CERTIFICATION 4A - A1 (S0_0212)'
    - test_name: todo antes del guión
    - applied_level: token MCER después del guión
    """
    test_name = "UNKNOWN TEST"
    applied_level = None

    if exam_label:
        s = re.sub(r"\(.*?\)", "", exam_label).strip()
        parts = [p.strip() for p in s.split("-")]
        if parts and parts[0]:
            test_name = parts[0].strip()
        if len(parts) >= 2:
            m = re.search(r"\b(A1|A2|B1|B2|C1|C2)\b", parts[1].upper())
            if m:
                applied_level = m.group(1)

    return test_name, applied_level

def set_header_row4(df_raw: pd.DataFrame):
    if df_raw.shape[0] < 4:
        raise ValueError("El archivo no trae al menos 4 filas (cabecera en fila 4).")
    header = df_raw.iloc[3].astype(str).tolist()
    df = df_raw.iloc[4:].copy()
    df.columns = header
    return df

PARTS = {"SPEAKING": 2, "USE OF ENGLISH": 3, "READING": 4, "LISTENING": 5, "WRITING": 6}
COL_SCORE   = "Part {n}- Score"
COL_PERCENT = "Part {n}- Score(%)"

ID_COLS = {
    "user_id": ["User Id","User ID","UserId","Userid"],
    "first":   ["First Name","Firts Name","Firstname","First name"],
    "last":    ["Last Name","Lastname","Last name"],
}
def find_first(df_cols, cands):
    for c in cands:
        if c in df_cols: return c
    low = {c.lower(): c for c in df_cols}
    for c in cands:
        if c.lower() in low: return low[c.lower()]
    return None

def to_num(x):
    try:
        return float(str(x).replace('%','').replace(',','').strip())
    except:
        return np.nan

def pct2_or_na(x):
    if x is None or (isinstance(x,float) and np.isnan(x)): return "N/A"
    return f"{round(float(x),2):.2f}%"

def safe_file(name):
    return re.sub(r'[\\/*?:\"<>|]+','_', name).strip()

# Nombre de salida solicitado:
def unified_name(group_name_before_dash: str):
    base = safe_file(group_name_before_dash) if group_name_before_dash else "OTCC CERTIFICATION"
    return f"{base} - Integrated report.csv"

# ----- Reglas (decimales) -----
LEVELS_ORDER = ["A1","A2","B1","B2","C1","C2"]
LEVEL_RANK = {lv:i for i,lv in enumerate(LEVELS_ORDER)}

def passing_criteria(applied_level, score_num):
    if not applied_level or score_num is None or (isinstance(score_num,float) and np.isnan(score_num)):
        return "N/A"
    s = float(score_num)

    if applied_level != "C2":
        if 91 <= s <= 100: return "DISTINCTION"
        if 81 <= s < 91:   return "PROFICIENT"
        if 70 <= s < 81:   return "PASS"
        if 60 <= s < 70:   return "PASS 1LB"
        return "PARTICIPATION CERTIFICATE"

    if 95 <= s <= 100: return "DISTINCTION"
    if 89 <= s < 95:   return "PROFICIENT"
    if 80 <= s < 89:   return "PASS"
    if 70 <= s < 80:   return "PASS C1"
    if 60 <= s < 70:   return "PASS B2"
    return "PARTICIPATION CERTIFICATE"

def level_achieved(applied_level, score_num):
    if not applied_level or score_num is None or (isinstance(score_num,float) and np.isnan(score_num)):
        return "N/A"
    s = float(score_num)

    if s < 60:
        return "PARTICIPATION CERTIFICATE"

    if applied_level != "C2":
        if 60 <= s < 70:
            if applied_level == "A1":
                return "PARTICIPATION CERTIFICATE"
            return LEVELS_ORDER[LEVEL_RANK[applied_level] - 1]
        return applied_level

    if 60 <= s < 70:
        return "B2"
    if 70 <= s < 80:
        return "C1"
    return "C2"

# ----- Acumuladores -----
invalid_files, fallback_missing_max = [], []
rows_out = []
acc = {"SPEAKING": [], "USE OF ENGLISH": [], "READING": [], "LISTENING": [], "WRITING": [], "OVERALL": []}
level_counts_achieved = {}

#nombre tomado del primer archivo válido
group_name_main = None

# ----- Parseo -----
for fname, content in uploaded.items():
    try:
        raw = read_any_table(fname, content)
        exam_label = extract_exam_label_B1(raw)

        if not exam_label or not (exam_label.upper().startswith("OTCC CERTIFICATION") or exam_label.upper().startswith("OTCC KIDS CERTIFICATION")):
            invalid_files.append(fname); continue

        test_name_before_dash, applied_level_from_name = parse_exam_fields(exam_label)
        if group_name_main is None and test_name_before_dash:
            group_name_main = test_name_before_dash

        df = set_header_row4(raw)

        uid = find_first(df.columns, ID_COLS["user_id"])
        fn  = find_first(df.columns, ID_COLS["first"])
        ln  = find_first(df.columns, ID_COLS["last"])
        if not (uid and fn and ln):
            raise ValueError("Faltan columnas de identidad (User/First/Last).")

        outc = 0
        for _, r in df.iterrows():
            user_id = str(r.get(uid,"")).strip()
            first_name = str(r.get(fn,"")).strip()
            last_name = str(r.get(ln,"")).strip()

            if user_id == "" and first_name == "" and last_name == "":
                continue

            full_name = (first_name + " " + last_name).strip()

            # % por habilidad (calc con max scores; fallback a Score(%) si no hay)
            skill_calc = {}
            for skill, pn in PARTS.items():
                sc_col = COL_SCORE.format(n=pn)
                pc_col = COL_PERCENT.format(n=pn)
                raw_pts = to_num(r.get(sc_col, np.nan))
                raw_pct = to_num(r.get(pc_col, np.nan))

                calc = np.nan
                key = (applied_level_from_name, skill)
                if applied_level_from_name and key in max_scores_map and not np.isnan(raw_pts):
                    max_sc = max_scores_map[key]
                    if max_sc:
                        calc = raw_pts/max_sc*100.0

                if np.isnan(calc):
                    calc = raw_pct
                    if applied_level_from_name and key not in max_scores_map:
                        fallback_missing_max.append((fname, applied_level_from_name, skill))

                skill_calc[skill] = None if (isinstance(calc,float) and np.isnan(calc)) else float(round(calc,4))

            any_skill = any(v is not None for v in skill_calc.values())

            if not any_skill:
                listening = reading = uoe = writing = speaking = "N/A"
                overall_num = None
                overall_str = "N/A"
                lvl_ach = "N/A"
                passc = "N/A"
            else:
                listening = pct2_or_na(skill_calc.get("LISTENING"))
                reading   = pct2_or_na(skill_calc.get("READING"))
                uoe       = pct2_or_na(skill_calc.get("USE OF ENGLISH"))
                writing   = pct2_or_na(skill_calc.get("WRITING"))
                speaking  = pct2_or_na(skill_calc.get("SPEAKING"))

                vals = [v for v in skill_calc.values() if v is not None]
                overall_num = round(float(np.mean(vals)),4) if vals else None
                overall_str = pct2_or_na(overall_num)

                lvl_ach = level_achieved(applied_level_from_name, overall_num)
                passc   = passing_criteria(applied_level_from_name, overall_num)

                for sk in ["SPEAKING","USE OF ENGLISH","READING","LISTENING","WRITING"]:
                    if skill_calc.get(sk) is not None:
                        acc[sk].append(skill_calc[sk])
                if overall_num is not None:
                    acc["OVERALL"].append(overall_num)

            level_counts_achieved[lvl_ach] = level_counts_achieved.get(lvl_ach, 0) + 1

            rows_out.append({
                "User ID": user_id,
                "Test name": test_name_before_dash,
                "Full name": full_name,
                "Applied level": applied_level_from_name if applied_level_from_name else "N/A",
                "Listening": listening,
                "Reading": reading,
                "Use of English": uoe,
                "Writing": writing,
                "Speaking": speaking,
                "Score Obtained": overall_str,
                "Level Achieved": lvl_ach,
                "Passing Criteria": passc,
            })
            outc += 1

        print(f"OK: {fname} → {outc} filas integradas. [{exam_label}]")

    except Exception as e:
        print(f"ERROR en {fname}: {e}")
        invalid_files.append(fname)

if not rows_out:
    print("\nSin filas válidas. Revisa los archivos.")
    raise SystemExit

df_master = pd.DataFrame(rows_out)

# Orden: por Level Achieved (A1→C2), N/A al final, otros textos antes de N/A
def rank_level_ach(x):
    if x in LEVEL_RANK:
        return LEVEL_RANK[x]
    if x == "N/A":
        return 9999
    return 9998

df_master["_rank_level_ach"] = df_master["Level Achieved"].apply(rank_level_ach)
df_master = df_master.sort_values(by=["_rank_level_ach", "User ID"], ascending=[True, True]).drop(columns=["_rank_level_ach"])

OUT_COLS = [
    "User ID","Test name","Full name","Applied level",
    "Listening","Reading","Use of English","Writing","Speaking",
    "Score Obtained","Level Achieved","Passing Criteria"
]
df_master = df_master[OUT_COLS]

#Nombre de archivo basado en el nombre antes del guion
out_csv = unified_name(group_name_main or "OTCC CERTIFICATION")
df_master.to_csv(out_csv, index=False, encoding='utf-8-sig')
print(f"\nCSV generado: {out_csv} — {len(df_master)} filas.")
if files and AUTO_DOWNLOAD:
    try: files.download(out_csv)
    except Exception as e: print(f"No se pudo descargar {out_csv}: {e}")

# ----- Gráficos amarillos -----
def save_and_download(fig, fname, dpi=300):
    fig.tight_layout()
    fig.savefig(fname, dpi=dpi, bbox_inches="tight")
    plt.close(fig)
    if files and AUTO_DOWNLOAD:
        try: files.download(fname)
        except Exception as e: print(f"No se pudo descargar {fname}: {e}")

bar_colors = ["#ffb300", "#ffc107", "#ffd54f", "#ffca28", "#ffb74d", "#ffe082"]
pie_colors = ["#ff8f00", "#ffa000", "#ffb300", "#ffc107", "#ffd54f", "#ffe082"]

spk = np.mean(acc["SPEAKING"]) if acc["SPEAKING"] else np.nan
uoe = np.mean(acc["USE OF ENGLISH"]) if acc["USE OF ENGLISH"] else np.nan
rea = np.mean(acc["READING"]) if acc["READING"] else np.nan
lis = np.mean(acc["LISTENING"]) if acc["LISTENING"] else np.nan
wri = np.mean(acc["WRITING"]) if acc["WRITING"] else np.nan
ovr = np.mean(acc["OVERALL"]) if acc["OVERALL"] else np.nan

cats = ["Speaking","Use of English","Reading","Listening","Writing","Overall"]
vals = [spk,uoe,rea,lis,wri,ovr]

title1 = f"{(group_name_main or 'OTCC CERTIFICATION')} - Average per skill and overall group average"
fig1 = plt.figure(figsize=(12,6))
ax = fig1.gca()
bars = ax.bar(cats, vals, color=bar_colors)
ax.set_ylim(0,100)
ax.set_ylabel("Percentage (%)", fontsize=11)
ax.set_title(title1, fontsize=16, pad=12)
for b,v in zip(bars,vals):
    if v==v:
        ax.text(b.get_x()+b.get_width()/2, b.get_height()+1, f"{round(v,2):.2f}%", ha='center', va='bottom', fontsize=10)
save_and_download(fig1, safe_file(title1 + ".png"))

pairs_unsorted = [(lvl,cnt) for lvl,cnt in level_counts_achieved.items() if cnt > 0]
if pairs_unsorted:
    def ach_order_key(lvl):
        if lvl in LEVEL_RANK: return LEVEL_RANK[lvl]
        if lvl == "N/A": return 999
        return 998

    pairs = sorted(pairs_unsorted, key=lambda x: ach_order_key(x[0]))
    labels, sizes = zip(*pairs)
    total = int(sum(sizes))

    fig2 = plt.figure(figsize=(10,7))
    ax2 = fig2.gca()
    colors_used = [pie_colors[i % len(pie_colors)] for i in range(len(sizes))]

    wedges, _ = ax2.pie(
        sizes,
        startangle=0, counterclock=False,
        colors=colors_used,
        wedgeprops=dict(width=0.50, edgecolor='white', linewidth=1.2)
    )

    ax2.text(0,0,f"Total\n{total}", ha='center', va='center', fontsize=13)
    ax2.axis('equal')

    title2 = f"{(group_name_main or 'OTCC CERTIFICATION')} - Distribution by English language level"
    ax2.set_title(title2, fontsize=16, pad=14)

    inner_r = 1.0 - 0.50
    r_text  = inner_r + 0.25
    for w, lbl, sz in zip(wedges, labels, sizes):
        theta = (w.theta2 + w.theta1) / 2.0
        ang   = np.deg2rad(theta)
        x = r_text * np.cos(ang); y = r_text * np.sin(ang)
        pct = 100.0 * sz / total
        fz = 11 if pct >= 6 else 9
        ax2.text(x, y, f"{lbl} {pct:.1f}%", ha='center', va='center',
                 fontsize=fz, fontweight='bold', color='black')

    legend_text = "\n".join([f"{lvl} = {cnt} persons at this level" for lvl,cnt in pairs])
    fig2.text(0.98, 0.02, legend_text, ha='right', va='bottom',
              bbox=dict(boxstyle="round,pad=0.6", facecolor="white", alpha=0.95, linewidth=0.8),
              fontsize=13)

    save_and_download(fig2, safe_file(title2 + ".png"))
else:
    print("Aviso: no se detectaron niveles logrados; se omitió el donut.")

if invalid_files:
    print("\nDescartados por no ser 'OTCC CERTIFICATION' o con errores:")
    for x in invalid_files:
        print("  -", x)

if fallback_missing_max:
    print("\nAviso: % nativo usado por falta de max_score para:")
    seen=set()
    for f,l,s in fallback_missing_max:
        if (l,s) not in seen:
            print(f"  - level={l}, skill={s}")
            seen.add((l,s))

print("\nListo.")