1. Importar librerías y definir funciones base

In [0]:
%pip install openpyxl
%pip install pyreadstat

In [0]:
%restart_python

In [0]:
# Databricks notebook source
# ===========================
# Lab CC3066 – Spark DataFrames (INE/PNC 2020–2024)
# Ruta de trabajo del Excel (Databricks Volumes)
EXCEL_PATH = "/Volumes/workspace/default/pncc/20250527162011hk9xzLjtlLyIqA5fF0FY3udjjRUQlTkq.xlsx"

import os
import re
import numpy as np
import pandas as pd
import pyreadstat
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, NumericType
from pyspark.sql import functions as F

spark = SparkSession.getActiveSession() or SparkSession.builder.appName("Lab_CC3066").getOrCreate()

# ==============
# 1) Utilidades
# ==============

HEADER_HINTS = [
    "departamento", "vehículo", "vehiculo", "mes de ocurrencia", "día de la semana",
    "día de ocurrencia", "hora de ocurrencia", "condición del conductor y sexo",
    "grupos de edad", "tipo de vehículo y sexo de la persona", "zona de ocurrencia",
    "tipo de accidente y sexo", "tipo", "clase", "marca", "modelo de vehículo",
    "sexo", "grupo", "edad", "tipo de vehículo", "clases de vehículos", "color de vehículo"
]

def safe_col(name: str) -> str:
    n = str(name).strip()
    n = re.sub(r'\.0$', '', n)              # 2021.0 -> 2021
    n = n.replace('%', 'pct')
    n = re.sub(r'\s+', '_', n)              # espacios -> _
    n = re.sub(r'[^0-9a-zA-Z_]', '_', n)    # símbolos raros -> _
    if re.match(r'^\d', n):                 # si inicia con dígito, prefijo
        n = f'y{n}'
    return n

def make_unique(cols):
    seen = {}
    out = []
    for c in cols:
        base = (c or "col")
        if base not in seen:
            seen[base] = 0
            out.append(base)
        else:
            seen[base] += 1
            out.append(f"{base}_{seen[base]}")
    return out

def to_scalar(x):
    # Convierte contenedores a texto; deja escalares tal cual
    if isinstance(x, (list, dict, tuple, set)):
        return str(x) if len(x) else None
    return x

# Detectar fila de encabezados buscando palabras clave
def find_header_row(raw: pd.DataFrame) -> int | None:
    for i in range(len(raw)):
        row = raw.iloc[i].astype(str).str.lower()
        if any(word in row.tolist() for word in HEADER_HINTS):
            return i
    return None

NOTE_PAT = re.compile(r"(fuente|nota|serie|el mes que no aparece|no se presentaron|sin datos)", re.IGNORECASE)

def drop_noise_rows(df: pd.DataFrame) -> pd.DataFrame:
    # descarta si la PRIMERA columna parece nota
    mask1 = df.iloc[:,0].astype(str).str.contains(NOTE_PAT, na=False)
    # y descarta filas donde CUALQUIER columna tenga frases típicas de nota
    mask_any = df.apply(lambda r: r.astype(str).str.contains(NOTE_PAT, na=False).any(), axis=1)
    return df.loc[~(mask1 | mask_any)].copy()

def read_cuadro(excel_path: str, sheet_num: int, categoria: str) -> pd.DataFrame | None:
    sheet = f"cuadro {sheet_num}"
    raw = pd.read_excel(excel_path, sheet_name=sheet, header=None)

    header_idx = find_header_row(raw)
    if header_idx is None or header_idx + 1 >= len(raw):
        return None

    header_row = raw.iloc[header_idx].tolist()
    next_row   = raw.iloc[header_idx + 1].tolist()

    combined_header = []
    prefer_detail = {"año de ocurrencia","mes de ocurrencia","tipo de accidente","día de la semana",
                     "total","hombre","mujer","ignorado","grupos de edad"}
    for h, n in zip(header_row, next_row):
        if pd.isna(h) or str(h).strip() == "":
            combined_header.append(n)
        elif str(h).strip().lower() in prefer_detail:
            combined_header.append(n)  # usa detalle de la segunda fila
        else:
            combined_header.append(h)

    df = raw.iloc[header_idx + 2:].copy()
    df = df.dropna(how="all")
    df = drop_noise_rows(df)

    # Remueve notas/fuentes, etc.
    df = df[~df.iloc[:,0].astype(str).str.contains("fuente|nota|cuadro|serie", case=False, na=False)]
    df.columns = [str(c).strip() for c in combined_header]
    df["categoria"] = categoria
    df["cuadro"] = sheet_num

    # Filas con primera columna vacía fuera
    df = df[df.iloc[:,0].notna() & (df.iloc[:,0].astype(str).str.strip() != "")]
    # Quita columnas completamente vacías
    df = df.dropna(axis=1, how="all")
    return df

def build_dict(excel_path: str, rango: range, categoria: str) -> dict[str, pd.DataFrame]:
    x = {}
    for i in rango:
        try:
            df = read_cuadro(excel_path, i, categoria)
            if df is not None:
                x[f"{categoria}_{i}"] = df
        except Exception as e:
            print(f"[WARN] Error leyendo {categoria} cuadro {i}: {e}")
    return x

## Carga desde archivo y construcción dicts

In [0]:
directories = [
    "/Volumes/workspace/default/fallecidos_lesionados/",
    "/Volumes/workspace/default/hechos_transito/",
    "/Volumes/workspace/default/vehiculos_involucrados/"
]

for directory in directories:
    try:
        files = [f.name for f in dbutils.fs.ls(directory)]
        
        for file in files:
            lower_file = file.lower()
            file_path = os.path.join(directory, file)
            
            # XLSX
            if lower_file.endswith(".xlsx"):
                csv_path = file_path.replace(".xlsx", ".csv")
                try:
                    df = pd.read_excel(file_path)
                    df.to_csv(csv_path, index=False)
                    print(f"Convertido XLSX → CSV: {file_path} → {csv_path}")
                    
                    # Borrar original
                    dbutils.fs.rm(file_path)
                    print(f"Borrado XLSX: {file_path}")
                except Exception as e:
                    print(f"Error con XLSX {file_path}: {e}")
            
            # SAV
            elif lower_file.endswith(".sav"):
                csv_path = file_path.replace(".sav", ".csv")
                try:
                    df, meta = pyreadstat.read_sav(file_path)
                    df.to_csv(csv_path, index=False)
                    print(f"Convertido SAV → CSV: {file_path} → {csv_path}")
                    
                    # Borrar original
                    dbutils.fs.rm(file_path)
                    print(f"Borrado SAV: {file_path}")
                except Exception as e:
                    print(f"Error con SAV {file_path}: {e}")
                    
    except Exception as e:
        print(f"Error al procesar directorio {directory}: {e}")


hechos_dfs     = build_dict(EXCEL_PATH, range(1, 17),  "hechos")
vehiculos_dfs  = build_dict(EXCEL_PATH, range(17, 29), "vehiculos")
lesionados_dfs = build_dict(EXCEL_PATH, range(31, 47), "lesionados")
fallecidos_dfs = build_dict(EXCEL_PATH, range(47, 63), "fallecidos")

print("Hechos cargados    :", list(hechos_dfs.keys())[:5], "...")
print("Vehículos cargados :", list(vehiculos_dfs.keys())[:5], "...")
print("Lesionados cargados:", list(lesionados_dfs.keys())[:5], "...")
print("Fallecidos cargados:", list(fallecidos_dfs.keys())[:5], "...")

Función: pandas DF -> Spark DF con tipado robusto

In [0]:
def pandas_to_spark(df_pd: pd.DataFrame):
    # 1) elimina filas 'total' en cualquier columna
    mask_total = df_pd.apply(lambda r: r.astype(str).str.strip().str.lower().eq('total').any(), axis=1)
    dfc = df_pd.loc[~mask_total].copy()

    # 2) normaliza nombres y unicidad
    dfc.columns = [safe_col(c) for c in dfc.columns]
    dfc = dfc.dropna(axis=1, how='all')
    dfc.columns = make_unique(dfc.columns)

    # 3) contenedores -> cadena/None
    dfc = dfc.applymap(to_scalar)

    # 4) tipificación 80%
    col_is_numeric = {}
    for c in dfc.columns:
        ser = pd.to_numeric(dfc[c], errors="coerce")
        ratio = ser.notna().mean() if len(ser) else 0.0
        if ratio >= 0.8:
            dfc[c] = ser.astype(float)
            col_is_numeric[c] = True
        else:
            dfc[c] = dfc[c].where(pd.notnull(dfc[c]), None).astype("string")
            col_is_numeric[c] = False

    # 5) NaN -> None
    dfc = dfc.where(pd.notnull(dfc), None)

    # 6) schema explícito
    schema = StructType([
        StructField(c, DoubleType() if col_is_numeric.get(c, False) else StringType(), True)
        for c in dfc.columns
    ])

    sdf = spark.createDataFrame(dfc.astype(object), schema=schema)
    return sdf

Construir Spark DFs por categoría (dicts de SDFs)

In [0]:
def dict_pandas_to_spark(dict_pandas: dict) -> dict:
    out = {}
    for key, df in dict_pandas.items():
        out[key] = pandas_to_spark(df)
    return out

hechos_sdfs     = dict_pandas_to_spark(hechos_dfs)
vehiculos_sdfs  = dict_pandas_to_spark(vehiculos_dfs)
lesionados_sdfs = dict_pandas_to_spark(lesionados_dfs)
fallecidos_sdfs = dict_pandas_to_spark(fallecidos_dfs)

In [0]:
def choose_tipo_vehiculo_col(columns: list[str]) -> str | None:
    # busca "tipo" o "clase" + "vehicul"
    cands = [c for c in columns if ("vehicul" in c.lower()) and ("tipo" in c.lower() or "clase" in c.lower())]
    if not cands:
        # plan B: columnas con nombres conocidos
        cands = [c for c in columns if c.lower() in {"tipo_de_vehículo","tipo_de_vehiculo","clases_de_vehículos","clases_de_vehiculos"}]
    return cands[0] if cands else None

def distinct_tipo_vehiculo(dict_sdfs: dict, titulo: str, limit=200):
    print(f"\n===== Distinct 'tipo de vehículo' :: {titulo} =====")
    seen = set()
    cols_elegidas = []
    for key, sdf in dict_sdfs.items():
        col = choose_tipo_vehiculo_col(sdf.columns)
        if not col:
            continue
        if not is_mostly_string(sdf, col):
            continue
        cols_elegidas.append((key, col))
        vals = (sdf
                .select(F.col(col).cast("string").alias("tipo_vehiculo"))
                .where(F.col("tipo_vehiculo").isNotNull() & (F.col("tipo_vehiculo") != ""))
                .distinct()
                .limit(limit)
                .toPandas()["tipo_vehiculo"])
        for v in vals:
            seen.add(v.strip())
    if not cols_elegidas:
        print("No se encontró columna elegible para 'tipo de vehículo'.")
        return
    print("Columnas elegidas:", cols_elegidas[:6], "..." if len(cols_elegidas)>6 else "")
    print(f"{len(seen)} valores distintos:")
    for v in sorted(seen):
        print("-", v)


### Preguntas a responder
1. Contar registros por tabla (long)

## #1 – Conteos, .show(), describe y summary (por tabla)

In [0]:
def ae_conteos_describe_summary(nombre: str, sdfs: dict, n_show: int = 5):
    print(f"\n===== {nombre.upper()} :: Conteos por hoja =====")
    total_registros = 0
    for key, sdf in sdfs.items():
        c = sdf.count()
        total_registros += c
        print(f"{key:20s} -> {c:6d} registros")
    print(f"TOTAL {nombre}: {total_registros}")

    # Muestra de una hoja representativa (la primera)
    if sdfs:
        first_key = list(sdfs.keys())[0]
        print(f"\n--- Ejemplo .show() :: {first_key} ---")
        sdfs[first_key].show(n_show, truncate=False)

        # describe/summary de columnas numéricas
        num_cols = [f.name for f in sdfs[first_key].schema.fields if isinstance(f.dataType, NumericType)]
        if num_cols:
            print(f"\n--- describe(numéricas) :: {first_key} ---")
            sdfs[first_key].select(*num_cols).describe().show(truncate=False)

            print(f"\n--- summary(numéricas) :: {first_key} ---")
            sdfs[first_key].select(*num_cols).summary("count","mean","stddev","min","25%","50%","75%","max").show(truncate=False)
        else:
            print(f"\n--- {first_key}: no hay columnas numéricas detectadas ---")

ae_conteos_describe_summary("hechos", hechos_sdfs)
ae_conteos_describe_summary("vehiculos", vehiculos_sdfs)
ae_conteos_describe_summary("lesionados", lesionados_sdfs)
ae_conteos_describe_summary("fallecidos", fallecidos_sdfs)

## #2 – Años disponibles por tabla y validación

In [0]:
EXPECTED_YEARS = set(range(2020, 2025))  # 2020..2024
YEAR_RX = re.compile(r'(?<!\d)(20\d{2})(?:\.0)?(?!\d)')

def extract_year_from_col(colname: str) -> int | None:
    m = YEAR_RX.search(str(colname))
    if not m: return None
    try:
        return int(m.group(1))
    except:
        return None

def detect_years_in_pdf(df: pd.DataFrame) -> set[int]:
    years = set()
    for c in df.columns:
        y = extract_year_from_col(c)
        if y: years.add(y)
    return years

def report_years_dict(dict_pd: dict, titulo: str):
    print(f"\n===== Verificación de años: {titulo} =====")
    all_years = set()
    for key, df in dict_pd.items():
        found = detect_years_in_pdf(df)
        all_years |= found
        missing = sorted(EXPECTED_YEARS - found)
        outside = sorted(y for y in found if y not in EXPECTED_YEARS)
        print(f"[{key}] encontrados: {sorted(found) if found else '—'} | faltantes vs 2020–2024: {missing if missing else 'ninguno'} | fuera de rango: {outside if outside else 'ninguno'}")
    print(f"\nAños agregados (union) en {titulo}: {sorted(all_years) if all_years else '—'}")
    return all_years

years_hechos     = report_years_dict(hechos_dfs, "hechos")
years_vehiculos  = report_years_dict(vehiculos_dfs, "vehiculos")
years_lesionados = report_years_dict(lesionados_dfs, "lesionados")
years_fallecidos = report_years_dict(fallecidos_dfs, "fallecidos")

print("\n¿Coinciden los conjuntos de años (intersección)?")
intersection_all = years_hechos & years_vehiculos & years_lesionados & years_fallecidos
print("Intersección común:", sorted(intersection_all) if intersection_all else "—")

### #3 – Valores distintos de 'tipo de accidente'   (buscamos columnas candidatas por nombre aproximado)

In [0]:
from pyspark.sql.types import NumericType

def choose_tipo_accidente_col(columns: list[str]) -> str | None:
    cols_lower = {c.lower(): c for c in columns}
    # 1) candidatos con "tipo" y "accid"
    cands = [c for c in columns if ("tipo" in c.lower() and "accid" in c.lower())]
    # 2) excluir "vehicul"
    cands = [c for c in cands if "vehicul" not in c.lower()]
    # 3) fallback suave: algo que suene a clasificación de accidente
    if not cands:
        cands = [c for c in columns if ("clasific" in c.lower() and "accid" in c.lower())
                 and "vehicul" not in c.lower()]
    if not cands:
        return None
    return cands[0]

def is_mostly_string(sdf, colname, sample=200, string_ratio=0.6):
    # toma una muestra y estima si la mayoría son valores no numéricos y no vacíos
    pdf = (sdf
           .select(F.col(colname).cast("string").alias("v"))
           .where(F.col("v").isNotNull() & (F.col("v") != ""))
           .limit(sample)
           .toPandas())
    if pdf.empty:
        return False
    ser = pdf["v"]
    # no-numérico si to_numeric da NaN
    non_num = pd.to_numeric(ser, errors="coerce").isna().mean()
    return non_num >= string_ratio

def distinct_tipo_accidente(dict_sdfs: dict, titulo: str, limit=200):
    print(f"\n===== Distinct 'tipo de accidente' :: {titulo} =====")
    seen = set()
    cols_elegidas = []
    for key, sdf in dict_sdfs.items():
        col = choose_tipo_accidente_col(sdf.columns)
        if not col:
            continue
        # evita columnas numéricas mal detectadas
        if not is_mostly_string(sdf, col):
            continue
        cols_elegidas.append((key, col))
        vals = (sdf
                .select(F.col(col).cast("string").alias("tipo_accidente"))
                .where(F.col("tipo_accidente").isNotNull() & (F.col("tipo_accidente") != ""))
                .distinct()
                .limit(limit)
                .toPandas()["tipo_accidente"])
        for v in vals:
            seen.add(v.strip())
    if not cols_elegidas:
        print("No se encontró columna elegible para 'tipo de accidente' en estas hojas.")
        return
    print("Columnas elegidas:", cols_elegidas[:6], "..." if len(cols_elegidas)>6 else "")
    print(f"{len(seen)} valores distintos:")
    for v in sorted(seen):
        print("-", v)


distinct_tipo_accidente(hechos_sdfs, "hechos")
distinct_tipo_accidente(vehiculos_sdfs, "vehiculos")
distinct_tipo_accidente(lesionados_sdfs, "lesionados")
distinct_tipo_accidente(fallecidos_sdfs, "fallecidos")



### #4 – # de departamentos únicos por base(detecta columna 'departamento' aproximada)

In [0]:
def count_departamentos_unique(dict_sdfs: dict, titulo: str,
                               dep_cands=("departamento","departamentos","depto","dept","depart","depar")):
    print(f"\n===== Departamentos únicos :: {titulo} =====")
    deptos = set()
    cols_encontradas = 0

    # Aux: si no tienes ya definida find_first_column, deja esto aquí
    def find_first_column(candidates: list[str], columns: list[str]) -> str | None:
        for col in columns:
            low = col.lower()
            for cand in candidates:
                if cand in low:  # match por substring
                    return col
        return None

    for key, sdf in dict_sdfs.items():
        col = find_first_column(list(dep_cands), sdf.columns)
        if not col:
            continue
        cols_encontradas += 1

        vals = (sdf
                .select(F.col(col).cast("string").alias("departamento"))
                .where(F.col("departamento").isNotNull() & (F.col("departamento") != ""))
                .distinct()
                .toPandas()["departamento"])

        for v in vals:
            if v is not None:
                deptos.add(str(v).strip())

    print(f"Total únicos (unión de hojas): {len(deptos)}")
    print(f"Columnas 'departamento' detectadas en {cols_encontradas} hojas")
    if deptos:
        print("Ejemplos:", sorted(list(deptos))[:15])
count_departamentos_unique(hechos_sdfs, "hechos")
count_departamentos_unique(vehiculos_sdfs, "vehiculos")
count_departamentos_unique(lesionados_sdfs, "lesionados")
count_departamentos_unique(fallecidos_sdfs, "fallecidos")


5. ¿Cuál es el total de accidentes por año y departamento?

6. ¿Qué día de la semana registra más accidentes en 2024?