In [0]:
# === RUTAS ===
VOLUME_NAME = "lab8"  # <-- este es el que sí existe
BASE_DIR = f"/Volumes/workspace/default/{VOLUME_NAME}"
INPUT_XLSX = f"{BASE_DIR}/Bases de datos principales PNC.xlsx"

# Crear la subcarpeta con API de Databricks (NO usar Path.mkdir/os.mkdir en /Volumes)
dbutils.fs.mkdirs(OUT_DIR)

# Verifica que todo exista
display(dbutils.fs.ls(BASE_DIR))
print("Excel path ->", INPUT_XLSX)
print("Output dir ->", OUT_DIR)


In [0]:
pip install openpyxl

In [0]:
from pathlib import Path

VOLUME_NAME = "lab8"
BASE_DIR = Path("/Volumes/workspace/default") / VOLUME_NAME

INPUT_XLSX = (BASE_DIR / "Bases de datos principales PNC.xlsx").as_posix()

USER_SUB = "silvia"
OUT_DIR = (BASE_DIR / USER_SUB).as_posix()

# Crear carpeta en Volumes con dbutils (no Path.mkdir)
dbutils.fs.mkdirs(OUT_DIR)

# Verifica
display(dbutils.fs.ls(BASE_DIR.as_posix()))
print("Excel:", INPUT_XLSX)
print("OUT_DIR:", OUT_DIR)


Mostrar cuántos registros hay en cada tabla (hechos, vehículos, fallecidos, lesionados). Muestre algunos resultados con la función .show(). Genere un describe y summary para aquellas columnas que considere importantes según cada archivo

In [0]:
import re, random

random.seed(42)
files = {f.path.split("/")[-1].lower(): f.path for f in dbutils.fs.ls(str(OUT_DIR))}

def find_csv_for(i: int):
    candidates = [
        f"cuadro {i}.csv",
        f"cuadro_{i}.csv",
        f"Cuadro {i}.csv".lower(),
        f"Cuadro_{i}.csv".lower(),
    ]
    for c in candidates:
        if c.lower() in files:
            return files[c.lower()]
    return None

# --- Función para limpiar nombres de columnas ---
def normalizar_cols(df):
    nuevos = []
    vistos = set()
    for i, c in enumerate(df.columns):
        name = str(c) if c is not None else f"col_{i+1}"
        # quitar espacios adelante/atrás
        name = name.strip()
        # reemplazar espacios por "_"
        name = re.sub(r"\s+", "_", name)
        # quitar caracteres no alfanuméricos
        name = re.sub(r"[^0-9A-Za-z_]", "", name)
        # evitar nombres vacíos
        if not name:
            name = f"col_{i+1}"
        # asegurar que no se repitan nombres
        base = name
        k = 1
        while name in vistos:
            k += 1
            name = f"{base}_{k}"
        vistos.add(name)
        nuevos.append(name)
    return df.toDF(*nuevos)

# 2) Cargar todos los que existan del 1 al 65
dfs = {}         
counts = {}
loaded_names = [] 

for i in range(1, 66):
    name = f"cuadro {i}"
    path = find_csv_for(i)
    if not path:
        print(f"[WARN] No encontré CSV para {name} en {OUT_DIR}")
        continue
    try:
        df = (spark.read.format("csv")
              .option("header","true")
              .option("inferSchema","true")
              .load(path))
        # normalizar columnas
        df = normalizar_cols(df)
        dfs[name] = df
        cnt = df.count()
        counts[name] = cnt
        loaded_names.append(name)
        print(f"[OK] Cargado {name}: {cnt} filas, {len(df.columns)} columnas -> {path}")
    except Exception as e:
        print(f"[ERR] No se pudo cargar {name} desde {path}: {e}")

if not loaded_names:
    displayHTML("<p style='color:#c00'>No se cargó ningún cuadro. Revisa los nombres de archivos en OUT_DIR.</p>")

# 3) Elegir 5 al azar (o menos si hay menos de 5 cargados)
k = min(5, len(loaded_names))
sampled = random.sample(loaded_names, k) if k > 0 else []

# 4) Mostrar 5 tablas aleatorias con título + conteo + vista de 10 filas
for name in sampled:
    df = dfs[name]
    n  = counts[name]
    displayHTML(f"<h3 style='margin:10px 0'>{name.title()}</h3>"
                f"<p style='margin:0 0 8px 0'>Registros: <b>{n}</b></p>")
    display(df.limit(10))

In [0]:
# ======================= MUESTRA DINÁMICA (reemplaza las 4 llamadas fijas) =======================
TITULOS = {
    1: "Accidentes de tránsito ocurridos en la República de Guatemala, por año, según departamento. Serie histórica 2020 - 2024.",
    2: "Accidentes de tránsito ocurridos en la República de Guatemala, por mes, según departamento, año 2024.",
    3: "Accidentes de tránsito ocurridos en la República de Guatemala, por día de la semana, según departamento, año 2024.",
    4: "Accidentes de tránsito ocurridos en la República de Guatemala, por año, según mes. Serie histórica 2020 - 2024.",
    5: "Accidentes de tránsito ocurridos en la República de Guatemala, por día de la semana, según mes, año 2024.",
    6: "Accidentes de tránsito ocurridos en la República de Guatemala, por año, según día de la semana. Serie histórica 2020 - 2024.",
    7: "Accidentes de tránsito ocurridos en la República de Guatemala, por día de la semana, según hora de ocurrencia, año 2024.",
    8: "Accidentes de tránsito ocurridos en la República de Guatemala, por tipo de accidente, según departamento, año 2024.",
    9: "Accidentes de tránsito ocurridos en la República de Guatemala, por tipo de accidente, según mes, año 2024.",
    10: "Accidentes de tránsito ocurridos en la República de Guatemala, por tipo de accidente, según día de ocurrencia, año 2024.",
    11: "Accidentes de tránsito ocurridos por atropello en la República de Guatemala, por mes, según departamento, año 2024.",
    12: "Accidentes de tránsito ocurridos por atropello en la República de Guatemala, por día de la semana, según departamento, año 2024.",
    13: "Accidentes de tránsito ocurridos por atropello en la República de Guatemala, por día de la semana, según mes, año 2024.",
    14: "Accidentes de tránsito ocurridos en el municipio de Guatemala, por zona de ocurrencia, según hora, año 2024.",
    15: "Accidentes de tránsito ocurridos en el municipio de Guatemala, por día de la semana, según zona de ocurrencia, año 2024.",
    16: "Accidentes de tránsito ocurridos en el municipio de Guatemala, por tipo de accidente, según zona de ocurrencia, año 2024.",
    17: "Cantidad de vehículos involucrados en accidentes de tránsito ocurridos en la República de Guatemala, por año, según departamento. Serie histórica 2020 -2024.",
    18: "Cantidad de vehículos involucrados en accidentes de tránsito ocurridos en la República de Guatemala, por tipo de accidente, según tipo de vehículo, año 2024.",
    19: "Cantidad de vehículos involucrados en accidentes de tránsito ocurridos en la República de Guatemala, por tipo de accidente, según color de vehículo, año 2024.",
    20: "Cantidad de vehículos involucrados en accidentes de tránsito ocurridos en la República de Guatemala, por tipo de accidente, según modelo de vehículo, año 2024.",
    21: "Cantidad de vehículos involucrados en accidentes de tránsito ocurridos en la República de Guatemala, por tipo de vehículo, según departamento, año 2024.",
    22: "Cantidad de vehículos involucrados en accidentes de tránsito ocurridos en la República de Guatemala, por sexo y condición del conductor, según departamento, año 2024.",
    23: "Cantidad de vehículos involucrados en accidentes de tránsito ocurridos en la República de Guatemala, por sexo y condición del conductor, según mes, año 2024.",
    24: "Cantidad de vehículos involucrados en accidentes de tránsito ocurridos en la República de Guatemala, por día de la semana, según condición del conductor y sexo, año 2024.",
    25: "Cantidad de vehículos involucrados en accidentes de tránsito ocurridos en la República de Guatemala, por sexo y condición del conductor, según hora, año 2024.",
    26: "Cantidad de vehículos involucrados en accidentes de tránsito ocurridos en la República de Guatemala, por sexo y condición del conductor, según grupos de edad, año 2024",
    27: "Cantidad de vehículos involucrados en accidentes de tránsito ocurridos en la República de Guatemala, por tipo de accidente, según tipo de vehículo y sexo, año 2024.",
    28: "Cantidad de vehículos involucrados en accidentes de tránsito ocurridos en el municipio de Guatemala, por tipo de vehículo, según zona de ocurrencia, año 2024.",
    29: "Víctimas por accidentes de tránsito ocurridos en la República de Guatemala por año, según departamento. Serie histórica 2020 - 2024.",
    30: "Víctimas por accidentes de tránsito ocurridos en la República de Guatemala, por sexo y estado del implicado, según departamento, año 2024.",
    31: "Lesionados en accidentes de tránsito ocurridos en la República de Guatemala, por año, según departamento. Serie histórica 2020 - 2024.",
    32: "Lesionados en accidentes de tránsito ocurridos en la República de Guatemala, por mes, según departamento, año 2024.",
    33: "Lesionados en accidentes de tránsito ocurridos en la República de Guatemala, por día de la semana, según departamento, año 2024.",
    34: "Lesionados en accidentes de tránsito ocurridos en la República de Guatemala, por grupos de edad, según departamento, año 2024.",
    35: "Lesionados en accidentes de tránsito ocurridos en la República de Guatemala, por día de la semana, según hora, año 2024.",
    36: "Lesionados en accidentes de tránsito ocurridos en la República de Guatemala, por sexo y tipo de accidente, según tipo de vehículo, año 2024.",
    37: "Lesionados en accidentes de tránsito ocurridos en la República de Guatemala, por día de la semana, según tipo de accidente y sexo, año 2024.",
    38: "Lesionados en accidentes de tránsito ocurridos en la República de Guatemala, por sexo, según grupos de edad, año 2024.",
    39: "Lesionados por atropello en accidentes de tránsito ocurridos en la República de Guatemala, por mes, según departamento, año 2024.",
    40: "Lesionados por atropello en accidentes de tránsito ocurridos en la República de Guatemala, por día de la semana, según departamento, año 2024.",
    41: "Lesionados por atropello en accidentes de tránsito ocurridos en la República de Guatemala, por grupos de edad, según departamento, año 2024.",
    42: "Lesionados por atropello en accidentes de tránsito ocurridos en la República de Guatemala, por día de la semana, según hora, año 2024.",
    43: "Lesionados por atropello en accidentes de tránsito ocurridos en la República de Guatemala, por sexo, según tipo de vehículo, año 2024.",
    44: "Lesionados en accidentes de tránsito ocurridos en el municipio de Guatemala, por grupos de edad, según zona de ocurrencia, año 2024.",
    45: "Lesionados en accidentes de tránsito ocurridos en el municipio de Guatemala, por día de la semana, según zona de ocurrencia, año 2024.",
    46: "Lesionados en accidentes de tránsito ocurridos en el municipio de Guatemala, por sexo, según zona de ocurrencia, año 2024.",
    47: "Fallecidos en accidentes de tránsito ocurridos en la República de Guatemala, por año, según departamento. Serie histórica 2020 - 2024.",
    48: "Fallecidos en accidentes de tránsito ocurridos en la República de Guatemala, por mes, según departamento, año 2024.",
    49: "Fallecidos en accidentes de tránsito ocurridos en la República de Guatemala, por día de la semana, según departamento, año 2024.",
    50: "Fallecidos en accidentes de tránsito ocurridos en la República de Guatemala, por grupos de edad, según departamento, año 2024.",
    51: "Fallecidos en accidentes de tránsito ocurridos en la República de Guatemala, por día de la semana, según hora, año 2024.",
    52: "Fallecidos en accidentes de tránsito ocurridos en la República de Guatemala, por sexo y tipo de accidente, según tipo de vehículo, año 2024.",
    53: "Fallecidos en accidentes de tránsito ocurridos en la República de Guatemala, por día de la semana, según tipo de accidente y sexo, año 2024.",
    54: "Fallecidos en accidentes de tránsito ocurridos en la República de Guatemala, por sexo, según grupos de edad, año 2024.",
    55: "Fallecidos por atropello en accidentes de tránsito ocurridos en la República de Guatemala, por mes, según departamento, año 2024.",
    56: "Fallecidos por atropello en accidentes de tránsito ocurridos en la República de Guatemala, por día de la semana, según departamento, año 2024.",
    57: "Fallecidos por atropello en accidentes de tránsito ocurridos en la República de Guatemala, por grupos de edad, según departamento, año 2024.",
    58: "Fallecidos por atropello en accidentes de tránsito ocurridos en la República de Guatemala, por día de la semana, según hora, año 2024.",
    59: "Fallecidos por atropello en accidentes de tránsito ocurridos en la República de Guatemala, por sexo, según tipo de vehículo, año 2024.",
    60: "Fallecidos en accidentes de tránsito ocurridos en el municipio de Guatemala, por grupos de edad, según zona de ocurrencia, año 2024.",
    61: "Fallecidos en accidentes de tránsito ocurridos en el municipio de Guatemala, por día de la semana, según zona de ocurrencia, año 2024.",
    62: "Fallecidos en accidentes de tránsito ocurridos en el municipio de Guatemala, por sexo, según zona de ocurrencia, año 2024.",
    63: "Tasa de víctimas involucradas en accidentes de tránsito ocurridos en la República de Guatemala por año, según departamento por cada 100,000 habitantes. Serie histórica 2020 - 2024.",
    64: "Tasa de víctimas lesionadas involucradas en accidentes de tránsito ocurridos en la República de Guatemala por año, según departamento por cada 100,000 habitantes. Serie histórica 2020 – 2024.",
    65: "Tasa de víctimas fallecidas involucradas en accidentes de tránsito ocurridos en la República de Guatemala por año, según departamento por cada 100,000 habitantes. Serie histórica 2020 - 2024."
}

import re

def _titulo_para(name: str) -> str:
    # name viene como "cuadro 12", extraemos el índice para buscar en TITULOS
    m = re.search(r"(\d+)$", name.strip())
    if m:
        idx = int(m.group(1))
        return TITULOS.get(idx, f"Cuadro {idx}")
    return name.title()

from pyspark.sql import functions as F

def show_sample(df, titulo, n=10, cols=None, truncate=False):
    displayHTML(f"<h4 style='margin:6px 0'>{titulo} — muestra de {n} filas</h4>")
    # Si nos pasan columnas, filtramos a las que existan
    if cols:
        if isinstance(cols, str):
            cols = [c.strip() for c in cols.split(",")]
        keep = [c for c in cols if c in df.columns]
        if keep:
            df = df.select(*[F.col(c) for c in keep])
    # En Databricks, display(df.limit(n)) es más cómodo que df.show()
    display(df.limit(n))

# Qué mostramos: si ya tienes 'sampled', úsalo; si no, usa hasta 5 de loaded_names
a_mostrar = sampled if sampled else loaded_names[:5]

for name in a_mostrar:
    df = dfs[name]
    titulo = _titulo_para(name)
    show_sample(df, titulo, n=10)
# ================================================================================================

In [0]:
import re

def _idx(name: str) -> int:
    m = re.search(r"(\d+)$", name.strip())
    return int(m.group(1)) if m else 10**9

numeric_primitives = ("byte", "short", "int", "bigint", "long", "float", "double")

for name in sorted(loaded_names, key=_idx):
    df = dfs[name]

    # Detect numeric columns (including decimal with any precision/scale)
    num_cols = [
        c for c, t in df.dtypes
        if t in numeric_primitives or t.lower().startswith("decimal")
    ]

    displayHTML(f"<h4 style='margin:10px 0'>{name.title()} — columnas numéricas</h4>")
    if not num_cols:
        displayHTML("<p style='margin:0 0 8px 0'><i>Sin columnas numéricas detectadas.</i></p>")
        continue

    # Ensure all columns exist in DataFrame
    existing_num_cols = [c for c in num_cols if c in df.columns]
    if not existing_num_cols:
        displayHTML("<p style='margin:0 0 8px 0'><i>No valid numeric columns found in DataFrame.</i></p>")
        continue

    # Show statistical summary of numeric columns
    displayHTML("<p style='margin:6px 0 4px 0'><i>Resumen (describe):</i></p>")
    display(
        df.select(*existing_num_cols).describe()
    )

In [0]:
from pyspark.sql import Row

percentiles = [0.25, 0.5, 0.75]
rel_tol = 0.01  # tolerancia de approxQuantile

for name in sorted(loaded_names, key=_idx):
    df = dfs[name]
    num_cols = [c for c, t in df.dtypes
                if t in numeric_primitives or t.lower().startswith("decimal")]

    if not num_cols:
        continue

    displayHTML(f"<h4 style='margin:10px 0'>{name.title()} — percentiles</h4>")
    rows = []
    for c in num_cols:
        qs = df.approxQuantile(c, percentiles, rel_tol)
        rows.append(Row(col=c, p25=qs[0], p50=qs[1], p75=qs[2]))
    spark.createDataFrame(rows).show(truncate=False)

#2. Identificar los años disponibles en cada tabla y validar si coinciden.

In [0]:
# ===================== DETECCIÓN DE AÑOS EN TODAS LAS TABLAS (1–65) =====================
def _idx(name: str) -> int:
    m = re.search(r"(\d+)$", name.strip())
    return int(m.group(1)) if m else 10**9

YEAR_COL_REGEX = re.compile(
    r"^(anio|ano|a[o]?|year|anio_?hecho|ano_?hecho|anio_?ocurrencia|ano_?ocurrencia|anio_?registro)$",
    re.I
)

YEAR_MIN, YEAR_MAX = 1990, 2035

def _years_from_candidate_cols(df):
    years = set()
    candidates = [c for c in df.columns if YEAR_COL_REGEX.search(c)]
    for c in candidates:
        ydf = (df
               .select(F.col(c).cast("int").alias("y"))
               .where(F.col("y").isNotNull() & (F.col("y") >= YEAR_MIN) & (F.col("y") <= YEAR_MAX))
               .select("y").distinct())
        years.update([r["y"] for r in ydf.collect()])
    return years

def _years_from_headers(df):
    """Si la tabla está en formato ancho (años como columnas), detecta encabezados 'YYYY' válidos."""
    years = set()
    for c in df.columns:
        cc = c.strip()
        if re.fullmatch(r"\d{4}", cc):
            y = int(cc)
            if YEAR_MIN <= y <= YEAR_MAX:
                years.add(y)
    return years

def detectar_anios(df):
    """Intenta por columnas de año; si no hay, intenta por encabezados."""
    y1 = _years_from_candidate_cols(df)
    if y1:
        return sorted(y1)
    y2 = _years_from_headers(df)
    if y2:
        return sorted(y2)
    return []

displayHTML("<h3 style='margin:8px 0'>Años disponibles por tabla</h3>")

anios_por_tabla = {}
problemas_fuera_de_rango = []

for name in sorted(loaded_names, key=_idx):
    df = dfs[name]
    anios = detectar_anios(df)
    anios_por_tabla[name] = anios if anios else None
    lista = ", ".join(map(str, anios)) if anios else "<i>No se encontraron años</i>"
    displayHTML(f"<p style='margin:4px 0'><b>{name.title()}:</b> {lista}</p>")

# --- Validación global ---
validas = [set(v) for v in anios_por_tabla.values() if v]

if validas:
    all_equal = all(s == validas[0] for s in validas)
    inter = sorted(list(set.intersection(*validas))) if len(validas) > 1 else sorted(list(validas[0]))
    uni = sorted(list(set.union(*validas)))        if len(validas) > 1 else sorted(list(validas[0]))

    displayHTML("<h3 style='margin:12px 0 4px 0'>Validación</h3>")
    displayHTML(f"<p style='margin:0'><b>¿Todos iguales?</b> {'Sí' if all_equal else 'No'}</p>")
    displayHTML(f"<p style='margin:0'><b>Intersección:</b> {', '.join(map(str, inter)) if inter else '—'}</p>")
    displayHTML(f"<p style='margin:0'><b>Unión:</b> {', '.join(map(str, uni)) if uni else '—'}</p>")
else:
    displayHTML("<p style='margin:8px 0'><i>No hay tablas con años detectados para comparar.</i></p>")
#==============================================================================

# 3. Mostrar los valores distintos de tipo de accidente.

In [0]:
# ===================== TIPOS DE ACCIDENTE (valores distintos) =====================
import re
from pyspark.sql import functions as F

# --- 1) identificar qué cuadros "van de tipo de accidente" a partir del diccionario ---
diccionario = TITULOS  
cuadros_tipo = sorted([i for i, desc in diccionario.items() if "tipo de accidente" in desc.lower()])

# --- 2) normalizador simple de columnas (igual al que usamos antes) ---
import re as _re
def normalizar_cols(df):
    nuevos, vistos = [], set()
    for i, c in enumerate(df.columns):
        name = str(c) if c is not None else f"col_{i+1}"
        name = name.strip()
        name = _re.sub(r"\s+", "_", name)
        name = _re.sub(r"[^0-9A-Za-z_]", "", name)
        if not name:
            name = f"col_{i+1}"
        base, k = name, 1
        while name in vistos:
            k += 1
            name = f"{base}_{k}"
        vistos.add(name)
        nuevos.append(name)
    return df.toDF(*nuevos)

# --- 3) helpers de detección ---
RE_TIPO_COL = re.compile(r"(tipo.*acciden|acciden.*tipo)", re.I)

# columnas de dimensión / medidas a excluir cuando la tabla esté "ancha"
EXCLUDE_HEADERS = set([
    "departamento","depto","municipio","zona","mes","dia","hora",
    "anio","ano","ao","anio_hecho","ano_hecho","anio_ocurrencia","ano_ocurrencia","anio_registro",
    "sexo","condicion","estado","grupos_de_edad","grupo_de_edad","grupo_edad","edad",
    "tipo_de_vehiculo","tipo_vehiculo","vehiculo","color","modelo",
    "total","cantidad","conteo","victimas","lesionados","fallecidos"
])

YEAR_MIN, YEAR_MAX = 1990, 2035

def _is_year_col(name: str) -> bool:
    return bool(re.fullmatch(r"\d{4}", name)) and (YEAR_MIN <= int(name) <= YEAR_MAX)

def _idx(name: str) -> int:
    m = re.search(r"(\d+)$", name.strip())
    return int(m.group(1)) if m else 10**9

# --- 4) recorrer y mostrar valores distintos ---
displayHTML("<h3 style='margin:8px 0'>Valores distintos de <i>tipo de accidente</i></h3>")

tipos_global = set()

for i in cuadros_tipo:
    name = f"cuadro {i}"
    if name not in dfs:
        displayHTML(f"<p><b>{name.title()}:</b> <i>No cargado.</i></p>")
        continue

    df_raw = dfs[name]
    df = normalizar_cols(df_raw)

    # a) intentar columna explícita
    tipo_col = None
    for c in df.columns:
        if RE_TIPO_COL.search(c):
            tipo_col = c
            break

    tipos = []
    if tipo_col:
        # columna encontrada -> extraer valores distintos (string limpio, sin nulos)
        vals = (df.select(F.col(tipo_col).cast("string").alias("tipo"))
                  .where(F.col("tipo").isNotNull() & (F.length(F.trim("tipo")) > 0))
                  .select(F.trim("tipo").alias("tipo")).distinct()
                  .orderBy("tipo"))
        tipos = [r["tipo"] for r in vals.collect()]
    else:
        # b) formato ancho: los encabezados son los tipos
        # candidatos = columnas que NO sean año, NO estén en EXCLUDE_HEADERS y NO sean 4 dígitos
        candidatos = []
        for c in df.columns:
            cl = c.lower()
            if _is_year_col(c):
                continue
            if cl in EXCLUDE_HEADERS:
                continue
            # si parece claramente métrica numérica genérica, saltar (opcional)
            if cl in ("valor","valores","monto","cantidad_total"):
                continue
            candidatos.append(c)

        # si la tabla es bien ancha, estos suelen ser los tipos
        # Validación ligera: si hay muy pocos candidatos y además ninguna col de "tipo", mostramos aviso
        tipos = sorted(candidatos)

    if tipos:
        tipos_global.update(tipos)
        lista = ", ".join(tipos)
        displayHTML(f"<p style='margin:4px 0'><b>{name.title()}:</b> {lista}</p>")
    else:
        displayHTML(f"<p style='margin:4px 0'><b>{name.title()}:</b> <i>No se detectaron tipos</i></p>")

# --- 5) resumen global ---
if tipos_global:
    displayHTML("<h4 style='margin:12px 0 6px 0'>Resumen global (únicos)</h4>")
    displayHTML("<p style='margin:0'>" + ", ".join(sorted(tipos_global)) + "</p>")
else:
    displayHTML("<p style='margin:12px 0'><i>No se detectaron tipos de accidente en los cuadros identificados.</i></p>")
# =============================================================================================

#4. Calcular cuántos departamentos únicos aparecen en las bases.

In [0]:
# ===================== DEPARTAMENTOS ÚNICOS (filtrando por TITULOS) =====================
import re
from pyspark.sql import functions as F

# 1) Identificar qué cuadros mencionan "departamento" en el título/descripción
cuadros_con_departamento = sorted(
    [i for i, txt in TITULOS.items() if re.search(r"departament", txt, re.I)]
)

# 2) Helper: localizar la columna de departamento en cada DF
DEPTO_COL_REGEX = re.compile(r"^(depto|departamento|departament|dpto|dep)$", re.I)

def encontrar_col_departamento(df):
    for c in df.columns:
        if DEPTO_COL_REGEX.search(c.strip()):
            return c
    return None

# 3) Recorrer solo esos cuadros y reunir valores únicos
departamentos_global = set()

for i in cuadros_con_departamento:
    name = f"cuadro {i}"
    if name not in dfs:
        print(f"[WARN] {name} no está cargado; se omite.")
        continue

    df = dfs[name]
    col = encontrar_col_departamento(df)
    if not col:
        print(f"[INFO] {name} no tiene columna de departamento detectable; se omite.")
        continue

    vals = (df.select(F.col(col).cast("string").alias("depto"))
              .where(F.col("depto").isNotNull() & (F.length(F.trim("depto")) > 0))
              .select(F.trim("depto").alias("depto")).distinct())
    departamentos_global.update([r["depto"] for r in vals.collect()])

# 4) Mostrar resultados
lista_departamentos = sorted(departamentos_global)
displayHTML("<h3 style='margin:8px 0'>Departamentos únicos (solo cuadros que mencionan “departamento”)</h3>")
displayHTML(f"<p style='margin:0 0 6px 0'><b>Total:</b> {len(lista_departamentos)}</p>")
displayHTML("<p>" + ", ".join(lista_departamentos) + "</p>")
# =========================================================================================

# 5. ¿Cuál es el total de accidentes por año y departamento? Apóyese de la función groupBy. Investigue la función display que tiene Databricks y muestre su resultado en formato de gráfico de barras.

In [0]:
from pyspark.sql import functions as F
import re

df = dfs["cuadro 1"]

dept_col = next((c for c in df.columns if re.search(r"(depto|depart|departamento)", c, re.I)), df.columns[0])
year_cols = [c for c in df.columns if re.fullmatch(r"\d{4}", c)]

df_total = df.filter(F.lower(F.col(dept_col)).like("total%"))
if df_total.count() == 0:
    df_total = df.groupBy().agg(*[F.sum(F.col(c)).alias(c) for c in year_cols])

df_total_long = (df_total
    .select(
        F.explode(
            F.arrays_zip(
                F.array(*[F.lit(c) for c in year_cols]),
                F.array(*[F.col(c) for c in year_cols])
            )
        ).alias("kv")
    )
    .select(F.col("kv.0").alias("año"), F.col("kv.1").cast("int").alias("total_accidentes"))
    .orderBy("año"))

display(df_total_long)

Databricks visualization. Run in Databricks to view.

In [0]:
df_dept = df.filter(~F.lower(F.col(dept_col)).like("total%"))

df_long = (df_dept
    .select(
        F.col(dept_col),
        F.explode(
            F.arrays_zip(
                F.array(*[F.lit(c) for c in year_cols]),
                F.array(*[F.col(c) for c in year_cols])
            )
        ).alias("kv")
    )
    .select(
        F.col(dept_col).alias("departamento"),
        F.col("kv.0").alias("anio"),
        F.col("kv.1").cast("int").alias("accidentes")
    ))

df_grouped = (df_long.groupBy("anio", "departamento")
                       .agg(F.sum("accidentes").alias("total_accidentes"))
                       .orderBy("anio", "departamento"))

# Mostrar en Databricks
display(df_grouped)


Databricks visualization. Run in Databricks to view.

# 6. ¿Qué día de la semana registra más accidentes en 2024? Graficar con display en un gráfico de columnas

In [0]:
from pyspark.sql import functions as F
import re, unicodedata

df = dfs["cuadro 3"]  # ajusta el nombre si difiere

def strip_accents(s: str) -> str:
    return "".join(c for c in unicodedata.normalize("NFD", s or "") if unicodedata.category(c) != "Mn")

def norm(s: str) -> str:
    s = strip_accents(s).lower()
    return re.sub(r"[^a-z]", "", s)

# Variantes por día (para capturar 'Mircoles', 'Sbado', abreviaturas, etc.)
day_variants = {
    "Lunes":      ["lun"],
    "Martes":     ["mar"],
    "Miercoles":  ["mie","mir"],   # 'mircoles' sin 'e'
    "Jueves":     ["jue"],
    "Viernes":    ["vier","vie"],
    "Sabado":     ["sab","sb"],     # 'sbado' sin 'a'
    "Domingo":    ["dom"],
}

# Detectar columna real por día
colmap = {}
for c in df.columns:
    nk = norm(c)
    for day, roots in day_variants.items():
        if any((nk.startswith(r) or r in nk) for r in roots):
            # Si hay empate, quédate con el match más largo (más específico)
            if day not in colmap or len(nk) > len(norm(colmap[day])):
                colmap[day] = c

faltan = [d for d in day_variants if d not in colmap]
if faltan:
    raise ValueError(f"No encontré todas las columnas de días. Faltan: {faltan}. Detectadas: {colmap}")

# Quitar la fila Total si existe
col_dep = next((c for c in df.columns if "depar" in norm(c)), None)
df_work = df.where(F.lower(F.col(col_dep).cast("string")) != "total") if col_dep else df

# Unpivot a formato largo
pairs = []
ordered_days = ["Lunes","Martes","Miercoles","Jueves","Viernes","Sabado","Domingo"]
for d in ordered_days:
    pairs += [f"'{d}'", f"`{colmap[d]}`"]
stack_expr = f"stack(7, {', '.join(pairs)}) as (dia, accidentes)"

df_long = (df_work.selectExpr(stack_expr)
                   .select("dia", F.col("accidentes").cast("double")))

# Sumar 2024 por día
df_dia = (df_long.groupBy("dia")
                 .agg(F.sum(F.coalesce("accidentes", F.lit(0))).alias("total_accidentes"))
                 .orderBy(F.desc("total_accidentes")))

# Mostrar tabla (en la UI selecciona Bar chart: X=dia, Y=total_accidentes)
display(df_dia)

Databricks visualization. Run in Databricks to view.

# 7. Mostrar la distribución de accidentes por hora del día en el municipio de Guatemala. Graficar en un histograma.

In [0]:
from pyspark.sql import functions as F

# 1) Tomamos el cuadro 14
df = dfs["cuadro 14"]

# 2) Quitamos columnas 'Total' e 'Ignorada' si existen
cols_to_drop = [c for c in ["Total", "Ignorada"] if c in df.columns]
df_clean = df.drop(*cols_to_drop)

# 3) (Por si venían como filas) quitamos filas 'Ignorada' y 'Total'
df_clean = df_clean.filter(~F.col("Hora_de_ocurrencia").isin(["Ignorada", "Total"]))

# 4) Sumamos todas las columnas numéricas por franja horaria
num_cols = [c for c in df_clean.columns if c != "Hora_de_ocurrencia"]
df_horas = (
    df_clean
    .withColumn(
        "total_accidentes",
        sum(F.coalesce(F.col(c).cast("double"), F.lit(0.0)) for c in num_cols)
    )
    .select("Hora_de_ocurrencia", "total_accidentes")
)

# 5) Ordenamos por la hora inicial (00, 01, 02, …)
df_horas = (
    df_horas
    .withColumn("start_h", F.regexp_extract("Hora_de_ocurrencia", r"^(\d{2})", 1).cast("int"))
    .orderBy("start_h")
    .drop("start_h")
)

# 6) Mostrar y graficar con display (en la UI: Plot -> Bar -> Keys=Hora_de_ocurrencia, Values=total_accidentes)
display(df_horas)

Databricks visualization. Run in Databricks to view.

# 8. Unir la tabla de hechos de tránsito con la de vehículos usando una llavecompuesta por año, mes, departamento y tipo de accidente. ¿Cuántos registros combinados se logran?

In [0]:

# hechos_long: [anio, mes, departamento (NULL), tipo_de_accidente, accidentes]
# vehiculos_long: [anio, mes=NULL, departamento=NULL, tipo_de_accidente, num_vehiculos]

joined_min = hechos_long.join(
    vehiculos_long.select("anio","tipo_de_accidente","num_vehiculos"),
    on=["anio","tipo_de_accidente"], how="inner"
)

conteo_min = joined_min.count()
print(f"Registros combinados por ['anio','tipo_de_accidente']: {conteo_min}")
joined_min.orderBy("anio","tipo_de_accidente").show(20, truncate=False)


9. De la unión anterior, calcular el promedio de vehículos por accidente en cada
departamento. Guardar este resultado en formato Parquet. Luego, vuelva a cargarlo y
grafique los 10 departamentos con más vehículos/accidente

In [0]:
from pyspark.sql import functions as F
import re

def _norm_cols(df):
    try:
        return normalizar_cols(df)
    except NameError:
        d = df
        for c in d.columns:
            d = d.withColumnRenamed(c, c.strip().lower().replace(" ", "_"))
        return d

def _numeric_cols(df):
    prim = ("byte","short","int","bigint","long","float","double")
    return [c for c,t in df.dtypes if t in prim or t.lower().startswith("decimal")]

# Accidentes por departamento (2024) desde cuadro 1
c1 = _norm_cols(dfs["cuadro 1"])
dept_c1 = next((c for c in c1.columns if re.search(r"depto|depart", c, re.I)), None)
acc_2024 = (c1.select(F.col(dept_c1).alias("departamento"), F.col("2024").cast("double").alias("accidentes"))
              .where(~F.col("departamento").like("total%"))
              .withColumn("departamento", F.lower(F.trim(F.col("departamento")))))

# Vehículos por departamento (sumando todos los tipos) desde cuadro 8
c8 = _norm_cols(dfs["cuadro 8"])
dept_c8 = next((c for c in c8.columns if re.search(r"depto|depart", c, re.I)), None)
num_cols_c8 = [c for c in _numeric_cols(c8) if c.lower() != "total"]
veh_depto = (c8.select(F.col(dept_c8).alias("departamento"), *num_cols_c8)
               .where(~F.col(dept_c8).like("total%")))
veh_depto = (veh_depto
             .withColumn("departamento", F.lower(F.trim(F.col("departamento"))))
             .withColumn("vehiculos", sum([F.col(c).cast("double") for c in num_cols_c8])))

# Join por departamento y ratio
joined_dep = acc_2024.join(veh_depto.select("departamento","vehiculos"), on="departamento", how="inner")

res_dep = (joined_dep
           .select("departamento", (F.col("vehiculos")/F.col("accidentes")).alias("vehiculos_por_acc"))
           .orderBy(F.col("vehiculos_por_acc").desc()))

# Guardar & mostrar
out_path = str(BASE_DIR / "Output/vehiculos_por_accidente.parquet")
res_dep.write.mode("overwrite").parquet(out_path)
res_reload = spark.read.parquet(out_path)

print(f"✅ Guardado en Parquet: {out_path}")
display(res_reload.orderBy(F.col("vehiculos_por_acc").desc()).limit(10))  # grafícalo como barras en la UI


10. Encontrar el top 5 de colores de vehículos más involucrados en accidentes.



In [0]:
# === PUNTO 10: Top 5 de colores de vehículos ===
from pyspark.sql import functions as F

# Cargar el cuadro que contiene la distribución de vehículos por color
# Ajusta el número de cuadro si tu archivo tiene diferente orden
vehiculos_color = _norm_cols(dfs["cuadro 19"])

# Detectar columna de color
color_col = next((c for c in vehiculos_color.columns if "color" in c.lower()), None)
if not color_col:
    raise RuntimeError("No se encontró una columna 'Color' en el cuadro de vehículos por color. Verifica el número de cuadro.")

# Convertir todas las demás columnas numéricas y sumar
num_cols = [c for c,t in vehiculos_color.dtypes if t in ("int","double","float","bigint") and c != color_col]
vehiculos_color = vehiculos_color.withColumnRenamed(color_col, "color")

vehiculos_color_total = vehiculos_color.select(
    "color",
    sum([F.coalesce(F.col(c).cast("double"), F.lit(0)) for c in num_cols]).alias("total_accidentes")
)

# Agrupar por color por si hay filas repetidas
vehiculos_color_total = vehiculos_color_total.groupBy("color").agg(F.sum("total_accidentes").alias("total_accidentes"))

# Ordenar descendente y limitar a 5
top5_colores = vehiculos_color_total.orderBy(F.col("total_accidentes").desc()).limit(5)

display(top5_colores)  # en Databricks, usa gráfico de barras


**11**. (5 pts) Calcular cuántos lesionados por atropello hubo en 2024, por mes. Graficar en serie
temporal (línea).


In [0]:
import pandas as pd, re
from pyspark.sql import functions as F

excel_path = "/Volumes/workspace/default/lab8/Bases de datos principales PNC.xlsx"

# ----------------- utilidades -----------------
MES_MAP = {
    "enero":1,"febrero":2,"marzo":3,"abril":4,"mayo":5,"junio":6,
    "julio":7,"agosto":8,"septiembre":9,"setiembre":9,"octubre":10,"noviembre":11,"diciembre":12,
    "ene":1,"feb":2,"mar":3,"abr":4,"may":5,"jun":6,"jul":7,"ago":8,"sep":9,"oct":10,"nov":11,"dic":12,
    "1":1,"2":2,"3":3,"4":4,"5":5,"6":6,"7":7,"8":8,"9":9,"10":10,"11":11,"12":12,
}
MES_KEYS = set(MES_MAP.keys())

def s(x):
    if not isinstance(x,str): x = "" if pd.isna(x) else str(x)
    x = x.strip()
    x = (x.replace("Á","A").replace("É","E").replace("Í","I").replace("Ó","O").replace("Ú","U")
           .replace("á","a").replace("é","e").replace("í","i").replace("ó","o").replace("ú","u")
           .replace("Ñ","N").replace("ñ","n"))
    return x.lower()

def is_month_token(tok): 
    return s(tok) in MES_KEYS

def month_to_num(tok):
    return MES_MAP.get(s(tok))

def first_col_with(df, regex):
    for c in df.columns:
        col = df[c].astype(str).map(s)
        if col.str.contains(regex, regex=True, na=False).any():
            return c
    return None

# ----------------- escaneo robusto -----------------
xls = pd.ExcelFile(excel_path)
cuadros = [sh for sh in xls.sheet_names if sh.lower().startswith("cuadro")]

result = None
used_sheet = None
for sh in cuadros:
    try:
        raw = pd.read_excel(excel_path, sheet_name=sh, header=None, dtype=str)
        if raw.empty or raw.shape[1] < 2: 
            continue

        # --- Caso A: MESES COMO COLUMNAS (hay una fila con muchos nombres de mes) ---
        header_row = None
        for i in range(min(20, len(raw))):
            row = [s(x) for x in raw.iloc[i].tolist()]
            months_here = [c for c in row if is_month_token(c)]
            if len(months_here) >= 6:
                header_row = i
                break
        if header_row is not None:
            pdf = pd.read_excel(excel_path, sheet_name=sh, header=header_row)
            # normalizar encabezados simples
            pdf.columns = [re.sub(r"\s+","_", s(c)) for c in pdf.columns]
            first_col = pdf.columns[0]
            # detectar fila "atropello"
            mask_atr = pdf[first_col].astype(str).map(s).str.contains(r"\batropello\b", regex=True, na=False)
            if mask_atr.any():
                month_cols = [c for c in pdf.columns if is_month_token(c)]
                if month_cols:
                    longpdf = pdf.loc[mask_atr, [first_col]+month_cols].melt(
                        id_vars=[first_col], var_name="mes", value_name="lesionados"
                    )
                    longpdf["mes_num"] = longpdf["mes"].map(month_to_num)
                    longpdf["lesionados"] = pd.to_numeric(longpdf["lesionados"], errors="coerce")
                    longpdf = longpdf.dropna(subset=["mes_num"])
                    if not longpdf.empty:
                        used_sheet = (sh, "meses_en_columnas")
                        result = longpdf[["mes_num","lesionados"]].copy()
                        break  # listo

        # --- Caso B: MESES COMO FILAS (hay una columna con muchos nombres de mes) ---
        # Busca una columna con >=6 tokens de mes
        month_col = None
        for c in raw.columns:
            colvals = [s(v) for v in raw[c].tolist()[:200]]
            if sum(1 for v in colvals if is_month_token(v)) >= 6:
                month_col = c
                break
        if month_col is not None and result is None:
            # Releer con header en la primera fila de datos no ayuda aquí; trabajamos "as is"
            df = raw.copy()
            df.columns = [f"col_{i}" for i in range(df.shape[1])]
            # columna de meses:
            meses_series = df[f"col_{month_col}"].astype(str).map(s)
            # detectar la COLUMNA de "Atropello" en encabezados (una de las primeras ~10 filas tiene los headers)
            # Heurística: toma la 1a fila no-nula como encabezado tentativo y también prueba hasta 10 filas
            atrop_col_index = None
            for hdr_row in range(min(10, len(df))):
                headers = [s(x) for x in df.iloc[hdr_row].tolist()]
                try:
                    atrop_col_index = next((j for j,h in enumerate(headers) if re.search(r"\batropello\b", h)), None)
                except Exception:
                    atrop_col_index = None
                if atrop_col_index is not None:
                    # datos empiezan después de esa fila
                    data = df.iloc[hdr_row+1:].copy()
                    data.columns = headers  # aplicar encabezados
                    # limpiar filas vacías
                    data = data.dropna(how="all", axis=0)
                    # identificar columna de meses en los headers reales
                    mes_header = next((h for h in data.columns if is_month_token(h)), None)
                    if mes_header is None:
                        # si no se reconoce por header, usa la serie original "meses_series"
                        data["__mes"] = meses_series.iloc[hdr_row+1:].values
                        mes_header = "__mes"
                    # tomar la columna de atropello
                    atrop_col_name = data.columns[atrop_col_index] if atrop_col_index < len(data.columns) else None
                    if atrop_col_name is None:
                        continue
                    sub = data[[mes_header, atrop_col_name]].rename(columns={mes_header:"mes", atrop_col_name:"lesionados"})
                    sub["mes_num"] = sub["mes"].map(month_to_num)
                    sub["lesionados"] = pd.to_numeric(sub["lesionados"], errors="coerce")
                    sub = sub.dropna(subset=["mes_num"])
                    if not sub.empty:
                        used_sheet = (sh, "meses_en_filas", f"header_row={hdr_row}")
                        result = sub[["mes_num","lesionados"]].copy()
                        break
            if result is not None:
                break

    except Exception:
        continue

if result is None or result.empty:
    raise RuntimeError("No pude extraer la serie mensual de 'Atropello'. Abre visualmente 1–2 'cuadro X' donde veas Atropello y meses, y te adapto el lector exacto.")

# -------- a Spark y gráfico --------
les_spark = spark.createDataFrame(result)
lesionados_mes = (les_spark
                  .groupBy("mes_num")
                  .agg(F.sum(F.col("lesionados")).alias("total_lesionados"))
                  .orderBy("mes_num"))

print("Usando hoja:", used_sheet)
display(lesionados_mes)  # En el chart: Line • X=mes_num • Y=total_lesionados


In [0]:
from pyspark.sql import functions as F

todos = spark.createDataFrame([(i,) for i in range(1,13)], ["mes_num"])
lesionados_full = (todos
    .join(lesionados_mes, on="mes_num", how="left")
    .na.fill({"total_lesionados": 0})
    .orderBy("mes_num"))

display(lesionados_full)  # luego elige Line chart: X=mes_num, Y=total_lesionados


In [0]:
from pyspark.sql import functions as F

# Partimos de lesionados_full con columnas: mes_num, total_lesionados

# Construir pares planos (key, value, key, value, ...)
pairs = []
nombres = ["Enero","Febrero","Marzo","Abril","Mayo","Junio",
           "Julio","Agosto","Septiembre","Octubre","Noviembre","Diciembre"]
for i, nombre in enumerate(nombres, start=1):
    pairs += [F.lit(str(i)), F.lit(nombre)]

mes_nombre = F.create_map(*pairs)

lesionados_labeled = lesionados_full.withColumn(
    "mes",
    mes_nombre[F.col("mes_num").cast("string")]
)

display(lesionados_labeled.select("mes_num","mes","total_lesionados"))
# En el gráfico, puedes usar X = mes_num (o mes) y Y = total_lesionados



In [0]:
import matplotlib.pyplot as plt

# Convertir a Pandas para graficar fácilmente
df_plot = lesionados_labeled.orderBy("mes_num").toPandas()

plt.figure(figsize=(8,5))
plt.plot(df_plot["mes_num"], df_plot["total_lesionados"], marker="o", linewidth=2)
plt.title("Lesionados por atropello en 2024 (serie mensual)")
plt.xlabel("Mes")
plt.ylabel("Total de lesionados")
plt.xticks(df_plot["mes_num"], df_plot["mes"], rotation=45)
plt.grid(True)
plt.show()


12. Relacionar accidentes con fallecidos usando llaves (año, mes, departamento, tipo
de accidente). Calcular el total de fallecidos por cada tipo de accidente. Graficar en barras
horizontales

In [0]:
# === Diagnóstico y extracción de FALLECIDOS (rápido y determinista) ===
import pandas as pd, re
from pyspark.sql import functions as F

path = "/Volumes/workspace/default/lab8/Bases de datos principales PNC.xlsx"

# --- utilidades de texto/mes ---
MES = {
    "enero":1,"febrero":2,"marzo":3,"abril":4,"mayo":5,"junio":6,
    "julio":7,"agosto":8,"septiembre":9,"setiembre":9,"octubre":10,"noviembre":11,"diciembre":12,
    "ene":1,"feb":2,"mar":3,"abr":4,"may":5,"jun":6,"jul":7,"ago":8,"sep":9,"oct":10,"nov":11,"dic":12,
    "1":1,"2":2,"3":3,"4":4,"5":5,"6":6,"7":7,"8":8,"9":9,"10":10,"11":11,"12":12,
}
def norm(s):
    if not isinstance(s,str): return "" if pd.isna(s) else str(s)
    s = s.strip()
    s = (s.replace("Á","A").replace("É","E").replace("Í","I").replace("Ó","O").replace("Ú","U")
           .replace("á","a").replace("é","e").replace("í","i").replace("ó","o").replace("ú","u")
           .replace("Ñ","N").replace("ñ","n")).lower()
    return s
def is_month(x): return norm(x) in MES
def m2n(x): return MES.get(norm(x))

# --- 1) buscar hojas candidatas con 'fallecid' y meses ---
xls = pd.ExcelFile(path)
sheets = [s for s in xls.sheet_names if s.lower().startswith("cuadro")]
candidates = []

for sh in sheets:
    try:
        raw = pd.read_excel(path, sheet_name=sh, header=None, dtype=str)
        if raw.empty: continue

        # meses como columnas
        header_row = None
        for i in range(min(30, len(raw))):
            row = [norm(x) for x in raw.iloc[i].tolist()]
            if sum(1 for v in row if v in MES)>=6:
                header_row = i; break

        score = 0
        has_fal_title = raw.head(8).astype(str).apply(lambda c: c.str.contains("fallecid", case=False, na=False)).any().any()
        if has_fal_title: score += 1
        if header_row is not None: score += 1

        # meses como filas
        month_col = None
        for c in raw.columns:
            colvals = [norm(v) for v in raw[c].tolist()[:200]]
            if sum(1 for v in colvals if v in MES)>=6:
                month_col = c; break
        if month_col is not None: score += 1

        if score>=2:
            candidates.append((sh, score, header_row, month_col))
    except Exception:
        pass

print("Candidatos de 'Fallecidos' (sheet, score, header_row, month_col):")
for c in candidates[:10]:
    print("  ", c)

if not candidates:
    raise RuntimeError("No encontré ninguna hoja con 'Fallecid*' y meses. Abre el Excel y dime qué 'cuadro X' lo contiene.")

# --- 2) usar el mejor candidato y extraer a largo ---
best = candidates[0][0]   # tomamos el primero (mayor score)
raw = pd.read_excel(path, sheet_name=best, header=None, dtype=str)

# detectar encabezado si meses son columnas
header_row = None
for i in range(min(30, len(raw))):
    row = [norm(x) for x in raw.iloc[i].tolist()]
    if sum(1 for v in row if v in MES)>=6: header_row = i; break

if header_row is not None:
    pdf = pd.read_excel(path, sheet_name=best, header=header_row)
    pdf.columns = [norm(c).replace(" ","_") for c in pdf.columns]
    first = pdf.columns[0]
    month_cols = [c for c in pdf.columns if norm(c) in MES]
    if not month_cols:
        raise RuntimeError(f"{best}: no identifiqué columnas de meses.")
    # ¿hay columna explícita de fallecidos? si no, asumimos que cada mes es 'fallecidos'
    id_vars = [first] + [c for c in pdf.columns if c not in month_cols and c not in [first]]
    long = pdf.melt(id_vars=id_vars, value_vars=month_cols, var_name="mes", value_name="valor")
    long["mes_num"] = long["mes"].map(m2n)
    long["tipo_accidente"] = pdf[first]
    # intenta detectar 'fallecid' en nombres
    if not any("fallecid" in col for col in pdf.columns):
        long.rename(columns={"valor":"fallecidos"}, inplace=True)
    else:
        # si existiera columna específica, úsala
        falcol = [c for c in pdf.columns if "fallecid" in c]
        if falcol:
            long["fallecidos"] = pd.to_numeric(pdf[falcol[0]], errors="coerce")
        else:
            long["fallecidos"] = pd.to_numeric(long["valor"], errors="coerce")
else:
    # meses como filas: encontrar columna de meses y columna con fallecidos
    # asumimos que en alguna de las primeras 10 filas están los headers
    hdr = None
    for r in range(min(10,len(raw))):
        head = [norm(x) for x in raw.iloc[r].tolist()]
        if any("fallecid" in h for h in head):
            hdr = r; break
    if hdr is None:
        raise RuntimeError(f"{best}: no encontré fila de encabezado con 'Fallecid*'.")
    pdf = pd.read_excel(path, sheet_name=best, header=hdr, dtype=str)
    pdf.columns = [norm(c).replace(" ","_") for c in pdf.columns]
    mcol = None
    for c in pdf.columns:
        if pdf[c].astype(str).map(is_month).sum()>=6:
            mcol = c; break
    falcol = next((c for c in pdf.columns if "fallecid" in c), None)
    tcol  = next((c for c in pdf.columns if "tipo" in c), None)
    if mcol is None or falcol is None:
        raise RuntimeError(f"{best}: identifiqué meses/fallecidos incompletos (mes={mcol}, fallecidos={falcol}).")
    long = pd.DataFrame({
        "mes_num": pdf[mcol].map(m2n),
        "fallecidos": pd.to_numeric(pdf[falcol], errors="coerce"),
        "tipo_accidente": pdf[tcol] if tcol in pdf.columns else "na"
    })

# limpiar y suponer 2024 si no hay año explícito
long = long.dropna(subset=["mes_num"])
long["anio"] = 2024
long["tipo_accidente"] = long["tipo_accidente"].astype(str).str.strip().str.lower()

# a Spark y agregar por tipo (inciso 12 pide barras por tipo)
fal_s_simple = spark.createDataFrame(long[["anio","mes_num","tipo_accidente","fallecidos"]])
res_12_simple = (fal_s_simple
    .filter(F.col("anio")==2024)
    .groupBy("tipo_accidente")
    .agg(F.sum("fallecidos").alias("total_fallecidos"))
    .orderBy(F.desc("total_fallecidos")))

print("Usando hoja de fallecidos:", best)
display(res_12_simple)  # luego elige: Bar -> Values(X)=total_fallecidos, Keys(Y)=tipo_accidente, Orientation=Horizontal





In [0]:
import matplotlib.pyplot as plt
import pandas as pd

# Crear DataFrame con tus resultados
data = {
    "tipo_accidente": [
        "guatemala", "escuintla", "alta verapaz", "sacatepéquez", "chimaltenango",
        "petén", "jutiapa", "baja verapaz", "san marcos", "izabal", "santa rosa",
        "suchitepéquez", "el progreso", "quetzaltenango", "retalhuleu", "jalapa",
        "quiché", "sololá", "chiquimula", "huehuetenango", "zacapa", "totonicapán"
    ],
    "total_fallecidos": [
        263, 81, 29, 27, 25, 24, 24, 18, 16, 16, 15, 13, 12, 11, 11, 11, 7, 6, 6, 5, 4, 2
    ]
}

df = pd.DataFrame(data).sort_values("total_fallecidos", ascending=True)

# === Gráfica ===
plt.figure(figsize=(9, 6))
bars = plt.barh(df["tipo_accidente"], df["total_fallecidos"], color="#1f77b4")
plt.title("Total de fallecidos por tipo de accidente (2024)", fontsize=13)
plt.xlabel("Total de fallecidos")
plt.ylabel("Tipo de accidente")
plt.grid(axis="x", linestyle="--", alpha=0.6)

# Etiquetas al final de cada barra
for bar in bars:
    width = bar.get_width()
    plt.text(width + 2, bar.get_y() + bar.get_height()/2,
             f"{int(width)}", va='center', fontsize=9)

plt.tight_layout()
plt.show()


> 13. Usar withColumn para clasificar accidentes en franjas horarias: Mañana [6-12), Tarde [12-18), Noche [18-24), Madrugada [0-6). Mostrar cuántos accidentes ocurren en cada franja

In [0]:
# === Inciso 13: detección automática de horas + clasificación por franja ===
import re
import pandas as pd
from pyspark.sql import functions as F

excel_path = "/Volumes/workspace/default/lab8/Bases de datos principales PNC.xlsx"

def norm(s):
    if not isinstance(s, str):
        s = "" if pd.isna(s) else str(s)
    s = s.strip()
    s = (s.replace("Á","A").replace("É","E").replace("Í","I").replace("Ó","O").replace("Ú","U")
           .replace("á","a").replace("é","e").replace("í","i").replace("ó","o").replace("ú","u")
           .replace("Ñ","N").replace("ñ","n")).lower()
    return s

# ¿Luce como hora (0–23, “06”, “6”, “06:00”, “6-7”, “6–7”, “06 a 07”, etc.)?
H_RE = re.compile(r"^(\d{1,2})(?::\d{2})?$")         # 6, 06, 06:00
RNG_RE = re.compile(r"^(\d{1,2})\s*[-–a]\s*(\d{1,2})")# 6-7, 6–7, 6 a 7

def token_to_hour(tok):
    t = norm(tok)
    m = H_RE.match(t)
    if m:
        h = int(m.group(1))
        return h if 0 <= h <= 23 else None
    m2 = RNG_RE.match(t)
    if m2:
        h = int(m2.group(1))
        return h if 0 <= h <= 23 else None
    return None

def looks_like_hour_row(vals):
    vals = [v for v in vals if str(v).strip() != ""]
    hits = sum(1 for v in vals if token_to_hour(v) is not None)
    return hits >= 6  # una fila con muchas "horas" como columnas

xls = pd.ExcelFile(excel_path)
sheets = [s for s in xls.sheet_names if s.lower().startswith("cuadro")]

hours_long_pd = None
used_sheet = None

for sh in sheets:
    try:
        raw = pd.read_excel(excel_path, sheet_name=sh, header=None, dtype=str)
        if raw.empty: 
            continue

        # --- Caso A: horas como columnas (buscar una fila encabezado con muchas horas)
        header_row = None
        for i in range(min(40, len(raw))):
            if looks_like_hour_row(raw.iloc[i].tolist()):
                header_row = i
                break

        if header_row is not None:
            pdf = pd.read_excel(excel_path, sheet_name=sh, header=header_row)
            pdf.columns = [norm(c).replace(" ", "_") for c in pdf.columns]
            first = pdf.columns[0]
            hour_cols = [c for c in pdf.columns if token_to_hour(c) is not None]
            if hour_cols:
                # melt a largo
                long = pdf.melt(id_vars=[first], value_vars=hour_cols,
                                var_name="hora_tok", value_name="accidentes")
                long["hora_num"] = long["hora_tok"].map(token_to_hour)
                long["accidentes"] = pd.to_numeric(long["accidentes"], errors="coerce")
                long = long.dropna(subset=["hora_num"])
                if not long.empty:
                    hours_long_pd = long[["hora_num","accidentes"]]
                    used_sheet = (sh, "horas_en_columnas", f"header_row={header_row}")
                    break

        # --- Caso B: horas como filas (buscar una columna con muchas horas)
        hour_col_idx = None
        for c in raw.columns:
            colvals = raw[c].tolist()[:200]
            hits = sum(1 for v in colvals if token_to_hour(v) is not None)
            if hits >= 6:
                hour_col_idx = c
                break

        if hours_long_pd is None and hour_col_idx is not None:
            # Adivina encabezado: alguna fila temprana que contenga "total" o algo numérico
            header_guess = None
            for r in range(min(10, len(raw))):
                vals = [norm(x) for x in raw.iloc[r].tolist()]
                if any("total" in v for v in vals) or any(re.search(r"\d", v) for v in vals):
                    header_guess = r
                    break
            if header_guess is None:
                header_guess = 0

            pdf = pd.read_excel(excel_path, sheet_name=sh, header=header_guess, dtype=str)
            pdf.columns = [norm(c).replace(" ", "_") for c in pdf.columns]

            # localizar col con hora y col con accidentes (o total)
            hora_col = None
            for c in pdf.columns:
                if pdf[c].astype(str).map(lambda x: token_to_hour(x) is not None).sum() >= 6:
                    hora_col = c; break

            # heurística de métrica: buscar “acciden”, “hecho”, “event”, “total”
            val_col = next((c for c in pdf.columns if re.search(r"(acciden|hech|event|total)", c)), None)
            if hora_col and val_col:
                tmp = pd.DataFrame({
                    "hora_num": pdf[hora_col].map(token_to_hour),
                    "accidentes": pd.to_numeric(pdf[val_col], errors="coerce")
                }).dropna(subset=["hora_num"])
                if not tmp.empty:
                    hours_long_pd = tmp[["hora_num","accidentes"]]
                    used_sheet = (sh, "horas_en_filas", f"header_row={header_guess}")
                    break
    except Exception:
        continue

if hours_long_pd is None or hours_long_pd.empty:
    raise RuntimeError("No pude localizar horas en los 'cuadro X'. Abre el Excel y dime cuál cuadro tiene los accidentes por hora para ajustarlo exacto.")

print("Usando hoja para horas:", used_sheet)

# --- A Spark ---
df_horas = spark.createDataFrame(hours_long_pd)

# Clasificación por franja con withColumn
df_franjas = (df_horas
    .withColumn(
        "franja_horaria",
        F.when((F.col("hora_num") >= 6) & (F.col("hora_num") < 12), "Mañana")
         .when((F.col("hora_num") >= 12) & (F.col("hora_num") < 18), "Tarde")
         .when((F.col("hora_num") >= 18) & (F.col("hora_num") < 24), "Noche")
         .otherwise("Madrugada")
    )
)

res_13 = (df_franjas
    .groupBy("franja_horaria")
    .agg(F.sum(F.col("accidentes")).alias("total_accidentes"))
    .orderBy(F.expr("CASE franja_horaria WHEN 'Madrugada' THEN 0 WHEN 'Mañana' THEN 1 WHEN 'Tarde' THEN 2 WHEN 'Noche' THEN 3 END"))
)

display(res_13) 

