# Laboratorio 8 - Transformaciones con Spark

#### Edwin Ortega 22305
#### Esteban Zambrano 22119
#### Diego García 22404

%md
### Carga de Datos y Análisis Exploratorio

%md
##### Instalación a tener en cuenta

In [0]:
%pip install openpyxl

%md
### Convertir Excel a un CSV por hoja
##### Rutas, imports y utilidades 

In [0]:
import os, re
import numpy as np
import pandas as pd

# Rutas
SOURCE_XLSX = "/Volumes/workspace/default/lab8/Principales_resultados_PNC_24.xlsx"

# Detectar carpeta del repo
ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
nb_path = ctx.notebookPath().get()
parts   = nb_path.split("/")
# raíz del repo
REPO_ROOT = "/Workspace/" + "/".join(parts[:4])
OUT_DIR   = os.path.join(REPO_ROOT, "Data")
os.makedirs(OUT_DIR, exist_ok=True)

# Funciones auxiliares
def slug(s: str) -> str:
    s = re.sub(r"\s+", "_", s.strip())
    s = s.replace("á","a").replace("é","e").replace("í","i").replace("ó","o").replace("ú","u").replace("ñ","n")
    s = re.sub(r"[^A-Za-z0-9_]+", "", s)
    return s.lower()

def clean_number(x):
    # Convierte '1,234', '-', '–', '' a numérico
    if pd.isna(x): return np.nan
    if isinstance(x, str):
        x = x.strip()
        if x in {"-", "–", "—", ""}: return 0
        x = x.replace(",", "")
        # algunos libros traen espacios finos o NBSP
        x = x.replace("\u202f","").replace("\xa0","")
    try:
        return float(x)
    except:
        return np.nan

# Palabras clave para encontrar la fila de encabezados útil
HEADER_HINTS = [
    "Departamento", "Mes de ocurrencia", "Día de la semana", "Dia de la semana",
    "Día de ocurrencia", "Dia de ocurrencia", "Hora de ocurrencia",
    "Tipo de accidente", "Año de ocurrencia", "Ano de ocurrencia"
]

MESES = ["Enero","Febrero","Marzo","Abril","Mayo","Junio","Julio","Agosto","Septiembre","Octubre","Noviembre","Diciembre"]
DIAS  = ["Lunes","Martes","Miércoles","Miercoles","Jueves","Viernes","Sábado","Sabado","Domingo"]
ANIOS = ["2020","2021","2022","2023","2024"]
TIPOS = ["Colisión","Colision","Colisi\u00f3n","Atropello","Derrape","Choque","Vuelco","Embarrancó","Embarranco","Encuentó","Encuento","Caída","Caida","Ignorado"]

NO_TABULARES = {"Portada-Presentación", "Índice", "Indice", "Directorio", "Ficha técnica", "Nota técnica"}

def find_header_row(raw_df: pd.DataFrame):
    # busca la primera fila que contenga un hint de encabezado
    for i, row in raw_df.iterrows():
        joined = " | ".join([str(x) for x in row.values if not pd.isna(x)])
        for hint in HEADER_HINTS:
            if hint.lower() in joined.lower():
                return i
    # fallback: fila donde la primera col diga "Total"
    first_col = raw_df.iloc[:,0].astype(str).str.strip().str.lower()
    idx = first_col[first_col.eq("total")].index
    if len(idx)>0:
        # suele ser una fila de datos; probamos la fila anterior como encabezado
        return max(0, idx[0]-1)
    return None

def normalize_columns(cols):
    # rellena vacíos y duplicados
    out, seen = [], {}
    for c in cols:
        c = "" if pd.isna(c) else str(c).strip()
        c = c.replace("\n"," ").replace("\r"," ")
        c = re.sub(r"\s+"," ", c)
        if c == "" or c.lower().startswith("unnamed:"):
            c = "col"
        # tildes coherentes
        c = (c
             .replace("Miércoles","Miércoles").replace("Miercoles","Miércoles")
             .replace("Sábado","Sábado").replace("Sabado","Sábado")
             .replace("Colision","Colisión").replace("Caida","Caída")
             .replace("Embarranco","Embarrancó").replace("Encuento","Encuentó"))
        # duplicados
        base = c
        k = seen.get(base, 0)
        if k:
            c = f"{base}_{k+1}"
        seen[base] = k+1
        out.append(c)
    return out

def detect_value_axis(columns):
    cols = [str(c) for c in columns]
    sets = {
        "mes": [c for c in cols if c in MESES],
        "dia_semana": [c for c in cols if c in DIAS],
        "anio": [c for c in cols if c in ANIOS],
        "tipo_accidente": [c for c in cols if c in TIPOS],
    }
    # elegir el que tenga más coincidencias
    winner, win_len = None, 0
    for k, v in sets.items():
        if len(v) > win_len:
            winner, win_len = k, len(v)
    return winner, sets.get(winner, [])

def tidy_if_possible(df: pd.DataFrame, sheet: str):
    # convierte a long si detecta ejes conocidos
    axis, value_cols = detect_value_axis(df.columns)
    if axis and len(value_cols) >= 3:
        # id_vars = todas menos las de valores y 'Total'
        id_vars = [c for c in df.columns if c not in set(value_cols) and c != "Total"]
        long_df = df.melt(id_vars=id_vars, value_vars=value_cols,
                          var_name=axis, value_name="valor")
        # cast numérico
        long_df["valor"] = long_df["valor"].map(clean_number)
        long_df.insert(0, "hoja", sheet)
        return long_df, "long"
    # si no, limpiamos números en las columnas numéricas detectadas
    for c in df.columns:
        if c == "Total" or re.fullmatch(r"\d{4}", str(c)) or c in MESES or c in DIAS or c in TIPOS:
            df[c] = df[c].map(clean_number)
    df.insert(0, "hoja", sheet)
    return df, "wide"

def is_non_tabular(name: str, raw_df: pd.DataFrame):
    if name in NO_TABULARES:
        return True
    # Si prácticamente todo es NaN
    after_drop = raw_df.dropna(how="all")
    return after_drop.shape[0] < 3 or after_drop.shape[1] < 2

# Proceso por hoja
xl = pd.ExcelFile(SOURCE_XLSX)
resumen = []

print("Hojas detectadas:")
print(" - " + "\n - ".join(xl.sheet_names))
print()

for sheet in xl.sheet_names:
    raw = xl.parse(sheet, header=None, dtype=object)
    if is_non_tabular(sheet, raw):
        print(f"SKIP hoja no tabular: {sheet}")
        continue

    hdr = find_header_row(raw)
    if hdr is None:
        print(f"(Aviso) {sheet}: no pude localizar encabezados. Exporto tal cual.")
        tmp = raw.dropna(how="all", axis=0).dropna(how="all", axis=1)
        tmp.columns = normalize_columns(tmp.iloc[0].tolist())
        tmp = tmp.iloc[1:].reset_index(drop=True)
        tidy, form = tidy_if_possible(tmp, sheet)
    else:
        body = raw.iloc[hdr:].copy()
        # primera fila como encabezado
        body.columns = normalize_columns(body.iloc[0].tolist())
        body = body.iloc[1:].reset_index(drop=True)
        # cortar “Fuente:” si aparece
        first_col_name = body.columns[0]
        if first_col_name:
            stop_idx = body[first_col_name].astype(str).str.startswith("Fuente").fillna(False)
            if stop_idx.any():
                body = body.loc[:stop_idx.idxmax()-1]
        # eliminar filas/cols completamente vacías
        body = body.dropna(how="all", axis=0).dropna(how="all", axis=1)
        tidy, form = tidy_if_possible(body, sheet)

    # Guardar CSV
    fname = f"PRPNC24_{slug(sheet)}.csv"
    out_path = os.path.join(OUT_DIR, fname)
    # Para escribir en /Workspace/...
    tidy.to_csv(out_path, index=False, encoding="utf-8-sig")
    print(f"OK → {sheet} -> {os.path.basename(out_path)}  ({form}, {len(tidy)} filas)")

    resumen.append({
        "hoja": sheet,
        "csv": os.path.basename(out_path),
        "formato": form,
        "filas": len(tidy)
    })

# Resumen final
res = pd.DataFrame(resumen).sort_values(["hoja"])
print("\nResumen de CSV generados:")    
try:
    display(res)
except:
    print(res.to_string(index=False))

print(f"\nArchivos guardados en:\n{OUT_DIR}")


##### Unir por grupo (hechos, vehículos, víctimas)

In [0]:
import os, glob, re, unicodedata, pandas as pd
from pyspark.sql import functions as F

# Rutas
ROOT_DIR = "/Workspace/Users/ort22305@uvg.edu.gt/Lab8_DS" 
DATA_DIR = os.path.join(ROOT_DIR, "Data") 
assert os.path.isdir(DATA_DIR), f"No existe {DATA_DIR}. Genera antes los CSV por hoja."

# Utilidades
MESES = {"Enero","Febrero","Marzo","Abril","Mayo","Junio","Julio","Agosto","Septiembre","Octubre","Noviembre","Diciembre"}
DIAS  = {"Lunes","Martes","Miércoles","Miercoles","Jueves","Viernes","Sábado","Sabado","Domingo"}
TIPO_ACC = {"Colisión","Colision","Atropello","Derrape","Choque","Vuelco","Embarrancó","Embarranco","Encuentó","Encuentro","Caída","Caida","Ignorado"}

def strip_accents(s):
    if pd.isna(s): return s
    return ''.join(c for c in unicodedata.normalize('NFD', str(s)) if unicodedata.category(c)!='Mn')

def detect_group_from_csv(path):
    """Lee las primeras filas del CSV y detecta el grupo por palabras clave del título."""
    head = pd.read_csv(path, nrows=25, header=None, dtype=str, encoding="utf-8-sig")
    txt = ' '.join([strip_accents(x) for x in head.fillna('').values.ravel().tolist()]).lower()
    if "vehiculos involucrados" in txt: return "vehiculos"
    if ("fallecidos" in txt) or ("lesionados" in txt): return "victimas"
    if "accidentes de transito" in txt: return "hechos"
    return "desconocido"

def to_int_or_none(x):
    s = str(x).strip()
    if s in ("", "nan", "None", "-", "–"): 
        return None
    # quitar separador de miles (ej. "8,401")
    s = s.replace(",", "").replace(" ", "")
    try:
        return int(float(s))
    except:
        return None

def guess_dim2_name(cols2: set):
    """Etiqueta de la 2ª dimensión según las columnas detectadas."""
    if cols2 and cols2.issubset(MESES | {"Total"}):
        return "Mes"
    if cols2 and cols2.issubset(DIAS | {"Total"}):
        return "Dia_semana"
    # muchos cuadros de tipo de accidente traen estas columnas
    if cols2 and len(cols2 & TIPO_ACC) >= max(3, int(0.4*len(cols2))):
        return "Tipo_accidente"
    return "Categoria"

def tidy_prpnc24(path, default_year=2024):
    """
    Devuelve un DataFrame pandas en formato largo con columnas:
    [anio, dim1, dim1_val, dim2, dim2_val, valor]
    """
    raw = pd.read_csv(path, header=None, dtype=str, encoding="utf-8-sig")
    # buscar fila de encabezado real
    candidates = {"Departamento","Mes de ocurrencia","Día de la semana","Hora de ocurrencia",
                  "Día de ocurrencia","Tipo de accidente","Sexo","Edad","Rango de edad"}
    header_idx = None
    for i in range(min(50, len(raw))):
        row = raw.iloc[i].fillna("").astype(str).str.strip()
        if any(c in row.values for c in candidates):
            header_idx = i
            break
    if header_idx is None:
        # fallback: intenta detectar la fila anterior a la primera que contiene "Total"
        for i in range(5, min(80, len(raw))):
            if raw.iloc[i].astype(str).str.contains("Total", case=False, na=False).any():
                header_idx = i-1
                break
    if header_idx is None:
        # como último recurso: usa la primera fila no vacía
        header_idx = 0

    # set columnas y recorta encabezado
    df = raw.copy()
    df.columns = df.iloc[header_idx].fillna("").astype(str).str.strip()
    df = df.iloc[header_idx+1:].reset_index(drop=True)

    # eliminar pie ("Fuente:")
    mask_fuente = df.apply(lambda r: r.astype(str).str.contains("Fuente", case=False, na=False).any(), axis=1)
    if mask_fuente.any():
        df = df.loc[~mask_fuente].copy()

    # normaliza nombres de columnas
    df.columns = [strip_accents(c).strip() for c in df.columns]
    # quita columnas completamente vacías
    df = df.loc[:, (df != "").any(axis=0)]
    if df.empty:
        return pd.DataFrame(columns=["anio","dim1","dim1_val","dim2","dim2_val","valor"])

    first_col = df.columns[0]                     # p.ej. "Departamento" / "Mes de ocurrencia" / etc.
    year_cols = [c for c in df.columns if re.fullmatch(r"20\d{2}", c)]
    # columnas para 2ª dimensión (quitar Total y la primera columna)
    cols2 = [c for c in df.columns if c not in [first_col, "Total"] and not re.fullmatch(r"20\d{2}", c)]
    dim2_name = guess_dim2_name(set(cols2))

    if year_cols:
        wide = df[[first_col] + year_cols].copy()
        out = wide.melt(id_vars=[first_col], value_vars=year_cols, var_name="anio", value_name="valor")
        out["dim2"] = pd.NA
        out["dim2_val"] = pd.NA
    else:
        # un año (2024) con columnas de meses, días, tipo_accidente, etc.
        if not cols2:
            # si no hay 2ª dimensión, usa solo la primera
            tmp = df[[first_col]].copy()
            tmp["valor"] = None
            out = tmp.rename(columns={first_col: "dim1_val"})
            out["dim2"] = pd.NA
            out["dim2_val"] = pd.NA
        else:
            long = df.melt(id_vars=[first_col], value_vars=cols2,
                           var_name="dim2_val", value_name="valor")
            out = long.rename(columns={first_col: "dim1_val"})
            out["dim2"] = dim2_name
        out["anio"] = default_year

    out["dim1"] = first_col
    out["valor"] = out["valor"].apply(to_int_or_none)
    out = out.dropna(subset=["valor"]).reset_index(drop=True)
    # ordenar columnas
    out = out[["anio","dim1","dim1_val","dim2","dim2_val","valor"]]
    return out

# Procesar todos los PRPNC24_cuadro_*.csv
csv_paths = sorted(glob.glob(os.path.join(DATA_DIR, "PRPNC24_cuadro_*.csv")))
registros = []
skipped = []

for p in csv_paths:
    grupo = detect_group_from_csv(p)
    if grupo == "desconocido":
        skipped.append(os.path.basename(p))
        continue
    # número de cuadro
    m = re.search(r"cuadro_(\d+)\.csv$", os.path.basename(p))
    cuadro_num = int(m.group(1)) if m else None

    # algunos cuadros son serie 2020-2024 (4 y 6). Para el resto usamos 2024.
    default_year = 2024
    if cuadro_num in {4, 6}:
        default_year = None

    tidy = tidy_prpnc24(p, default_year=2024 if default_year is None else default_year)
    if tidy.empty:
        skipped.append(os.path.basename(p))
        continue
    tidy["grupo"]  = grupo
    tidy["cuadro"] = cuadro_num
    tidy["csv"]    = os.path.basename(p)
    registros.append(tidy)

# Concatenar todo y separar por grupo
all_long = pd.concat(registros, ignore_index=True) if registros else pd.DataFrame(
    columns=["anio","dim1","dim1_val","dim2","dim2_val","valor","grupo","cuadro","csv"]
)

hechos_pd    = all_long[all_long["grupo"]=="hechos"].reset_index(drop=True)
vehiculos_pd = all_long[all_long["grupo"]=="vehiculos"].reset_index(drop=True)
victimas_pd  = all_long[all_long["grupo"]=="victimas"].reset_index(drop=True)

# Spark
hechos_long_s  = spark.createDataFrame(hechos_pd)    if not hechos_pd.empty    else spark.createDataFrame([], "anio string, dim1 string, dim1_val string, dim2 string, dim2_val string, valor int, grupo string, cuadro int, csv string")
vehic_long_s   = spark.createDataFrame(vehiculos_pd) if not vehiculos_pd.empty else spark.createDataFrame([], "anio string, dim1 string, dim1_val string, dim2 string, dim2_val string, valor int, grupo string, cuadro int, csv string")
victims_long_s = spark.createDataFrame(victimas_pd)  if not victimas_pd.empty  else spark.createDataFrame([], "anio string, dim1 string, dim1_val string, dim2 string, dim2_val string, valor int, grupo string, cuadro int, csv string")

hechos_long_s.createOrReplaceTempView("hechos_long")
vehic_long_s.createOrReplaceTempView("vehiculos_long")
victims_long_s.createOrReplaceTempView("victimas_long")

print(f"[RESUMEN] filas -> hechos: {hechos_long_s.count():,} | vehiculos: {vehic_long_s.count():,} | victimas: {victims_long_s.count():,}")
print("Saltadas por forma no reconocida:", skipped[:10], ("..." if len(skipped)>10 else ""))

# Muestras
print("\n[hechos_long] ejemplo:")
display(hechos_long_s.limit(15))

print("\n[vehiculos_long] ejemplo:")
display(vehic_long_s.limit(15))

print("\n[victimas_long] ejemplo:")
display(victims_long_s.limit(15))

# Guardar también como CSV 'planos' para Git/GitHub
out_hechos    = os.path.join(DATA_DIR, "hechos_long.csv")
out_vehiculos = os.path.join(DATA_DIR, "vehiculos_long.csv")
out_victimas  = os.path.join(DATA_DIR, "victimas_long.csv")
hechos_pd.to_csv(out_hechos, index=False, encoding="utf-8-sig")
vehiculos_pd.to_csv(out_vehiculos, index=False, encoding="utf-8-sig")
victimas_pd.to_csv(out_victimas, index=False, encoding="utf-8-sig")
print("\nGuardados CSV normalizados en Data/:")
print(" -", out_hechos)
print(" -", out_vehiculos)
print(" -", out_victimas)


%md
##### Cargar los CSV con Spark

In [0]:
# === RUTA en Volume UC (NO /Workspace) ===
DATA_DIR = "/Volumes/workspace/default/lab8/Data"

# Hechos
spark.sql(f"""
CREATE OR REPLACE TEMP VIEW hechos_long AS
SELECT
  CAST(anio AS INT)                           AS anio,
  dim1, dim1_val, dim2, dim2_val,
  CAST(regexp_replace(valor, '[^0-9-]', '') AS INT) AS valor,
  lower(grupo) AS grupo,
  cuadro, csv
FROM read_files('{DATA_DIR}/hechos_long.csv',
                format => 'csv', header => true)
WHERE lower(coalesce(dim1_val,'')) <> 'total'
  AND lower(coalesce(dim2_val,'')) <> 'total'
""")

# Vehículos
spark.sql(f"""
CREATE OR REPLACE TEMP VIEW vehiculos_long AS
SELECT
  CAST(anio AS INT)                           AS anio,
  dim1, dim1_val, dim2, dim2_val,
  CAST(regexp_replace(valor, '[^0-9-]', '') AS INT) AS valor,
  lower(grupo) AS grupo,
  cuadro, csv
FROM read_files('{DATA_DIR}/vehiculos_long.csv',
                format => 'csv', header => true)
WHERE lower(coalesce(dim1_val,'')) <> 'total'
  AND lower(coalesce(dim2_val,'')) <> 'total'
""")

# Víctimas
spark.sql(f"""
CREATE OR REPLACE TEMP VIEW victimas_long AS
SELECT
  CAST(anio AS INT)                           AS anio,
  dim1, dim1_val, dim2, dim2_val,
  CAST(regexp_replace(valor, '[^0-9-]', '') AS INT) AS valor,
  lower(grupo) AS grupo,
  cuadro, csv
FROM read_files('{DATA_DIR}/victimas_long.csv',
                format => 'csv', header => true)
WHERE lower(coalesce(dim1_val,'')) <> 'total'
  AND lower(coalesce(dim2_val,'')) <> 'total'
""")

# --- Chequeos rápidos ---
display(spark.sql("""
SELECT 'hechos' AS tabla, COUNT(*) AS filas FROM hechos_long
UNION ALL
SELECT 'vehiculos', COUNT(*) FROM vehiculos_long
UNION ALL
SELECT 'victimas', COUNT(*) FROM victimas_long
"""))

display(spark.sql("SELECT DISTINCT anio FROM hechos_long ORDER BY anio"))
display(spark.sql("SELECT DISTINCT anio FROM vehiculos_long ORDER BY anio"))
display(spark.sql("SELECT DISTINCT anio FROM victimas_long ORDER BY anio"))

# Ejemplos
display(spark.sql("SELECT * FROM hechos_long    ORDER BY anio, cuadro LIMIT 15"))
display(spark.sql("SELECT * FROM vehiculos_long ORDER BY anio, cuadro LIMIT 15"))
display(spark.sql("SELECT * FROM victimas_long  ORDER BY anio, cuadro LIMIT 15"))
