
# Análisis Dask — **Notebook final del equipo (100% comentado)**
> Usamos `katalog_gempa.csv` (columnas: `tgl`, `ot`, `lat`, `lon`, `depth`, `mag`, `remark`).  
> Construimos `time = to_datetime(tgl + " " + ot)` y hacemos EDA + comparativa Pandas vs Dask.  
> Escrito en **primera persona plural** como estudiantes.


In [None]:

# ==============================================
# IMPORTS Y OPCIONES GLOBALES
# ==============================================
# En esta celda importamos lo mínimo necesario y dejamos opciones
# para que el notebook sea robusto en Windows/VS Code.
# Forzamos a pandas a usar almacenamiento de strings en Python
# (evita requerir 'pyarrow') y dejamos todo listo para graficar.
# ----------------------------------------------

# Importamos librerías base
import os  # Para verificar rutas y crear carpetas de salida
from time import perf_counter  # Para medir tiempos en nuestros experimentos
import pandas as pd  # API de análisis tabular secuencial
pd.options.mode.string_storage = "python"  # <- evitamos backend 'pyarrow' en strings

# Importamos Dask DataFrame y distribución
import dask.dataframe as dd  # API tipo pandas pero en paralelo/perezoso
from dask.distributed import Client, LocalCluster  # Para cluster local

# Importamos matplotlib para gráficos
import matplotlib.pyplot as plt  # Gráficos estáticos


In [None]:

# ==============================================
# INICIAR / REINICIAR CLUSTER DASK (ROBUSTO)
# ==============================================
# Nuestro objetivo aquí es levantar un cluster local estable en Windows,
# sin widgets (evitamos jinja2/bokeh) y con el dashboard en un puerto libre.
# ----------------------------------------------

# 1) Cerramos instancias previas si quedaron vivas (evita choques de puertos)
for obj in ("client", "cluster"):
    try:
        globals().get(obj) and globals()[obj].close()
    except Exception:
        pass

# 2) Creamos un cluster local. Ajustar n_workers/threads_per_worker según CPU.
N_WORKERS = 4            # Podemos cambiar a 2/8 según el equipo real
THREADS_PER_WORKER = 2   # Ídem
cluster = LocalCluster(
    n_workers=N_WORKERS,
    threads_per_worker=THREADS_PER_WORKER,
    processes=True,            # Usamos procesos separados (suele rendir mejor)
    dashboard_address=":0",    # ":0" = Dask elige un puerto libre automáticamente
)
client = Client(cluster)

# 3) NO renderizamos el widget de Client; imprimimos sólo el enlace
print("Dask dashboard:", getattr(client, "dashboard_link", "(no disponible)"))


In [None]:

# ==============================================
# LECTURA DEL CSV + CONSTRUCCIÓN DE COLUMNA 'time'
# ==============================================
# Aquí leemos el archivo 'katalog_gempa.csv' (debe estar junto al notebook
# o usamos ruta absoluta). Evitamos 'parse_dates' porque la fecha real
# viene separada en 'tgl' (YYYY/MM/DD) y 'ot' (HH:MM:SS.sss).
# Luego construimos 'time_str' y de ahí 'time' con to_datetime.
# ----------------------------------------------

# 1) Definimos la ruta del CSV (si está junto al .ipynb, así basta)
CSV_PATH = r"katalog_gempa.csv"  # Cambiar a ruta absoluta si hace falta

# 2) Verificamos que el archivo exista antes de leerlo
print("Archivo existe? ", os.path.exists(CSV_PATH), "→", CSV_PATH)
assert os.path.exists(CSV_PATH), f"No encontramos el CSV en: {CSV_PATH}"

# 3) Leemos de forma perezosa en Dask con tipos explícitos para evitar sorpresas
t0 = perf_counter()
df = dd.read_csv(
    CSV_PATH,
    dtype={
        "tgl": "object",   # fecha en texto
        "ot": "object",    # hora en texto
        "lat": "float64",
        "lon": "float64",
        "depth": "float64",
        "mag": "float64",  # magnitud numérica
        "remark": "object" # descripción/lugar en texto
    },
    assume_missing=True,   # Permite NaN cuando hay vacíos en columnas numéricas
    blocksize="64MB",      # Particionado razonable para equipos personales
    # No usamos on_bad_lines para máxima compatibilidad de pandas
)
t1 = perf_counter()
print(f"Particiones: {df.npartitions} | tiempo lectura (metadatos): {t1 - t0:.2f}s")

# 4) Mostramos columnas y una muestra pequeña (head() sí computa una porción)
print("Columnas disponibles:", list(df.columns))
print("\\nMuestra (5 filas):")
display(df.head(5))

# 5) Construimos 'time' desde 'tgl' + 'ot' y definimos los nombres estándar
#    para usarlos el resto del notebook.
df['time_str'] = df['tgl'].astype('string').str.strip() + " " + df['ot'].astype('string').str.strip()
df['time'] = dd.to_datetime(df['time_str'], errors='coerce')  # naive (sin tz)
TIME_COL, MAG_COL, PLACE_COL = 'time', 'mag', 'remark'


In [None]:

# ==============================================
# LIMPIEZA Y TIPOS
# ==============================================
# En esta celda garantizamos que:
# - la magnitud sea numérica (coerción convierte texto inválido a NaN),
# - el lugar/descripción esté normalizado como texto,
# - y descartamos filas sin 'time' o sin 'mag' para análisis consistentes.
# ----------------------------------------------

# 1) Aseguramos tipos (si algo vino raro del CSV, lo corregimos)
df[MAG_COL] = dd.to_numeric(df[MAG_COL], errors='coerce')
df[PLACE_COL] = df[PLACE_COL].astype('string').str.strip()

# 2) Eliminamos filas que no tengan tiempo o magnitud (datos esenciales)
df = df.dropna(subset=[TIME_COL, MAG_COL])

# 3) Mostramos tipos y un head tras limpieza para validar que todo tenga sentido
print("dtypes (lazy):")
print(df.dtypes)
print("\nHead tras limpieza:")
display(df.head(5))


In [None]:

# ==============================================
# DESCRIPTIVOS, TOTAL DE EVENTOS Y TOP UBICACIONES
# ==============================================
# Nuestro objetivo aquí es producir:
# - Estadísticos básicos de 'mag' con describe()
# - El total de eventos válidos (filas)
# - El Top 10 de ubicaciones según 'remark'
# ----------------------------------------------

t0 = perf_counter()

# 1) Descriptivos de magnitud (Pandas Series tras compute)
desc = df[MAG_COL].describe().compute()

# 2) Total de eventos limpios
total_events = df.shape[0].compute()

# 3) Top lugares: IMPORTANTE: primero compute(), luego head(10)
top_places = (
    df[PLACE_COL]
    .value_counts(split_every=8)  # ayuda a escalar cuando hay muchas categorías
    .compute()                    # Dask -> Pandas
    .head(10)                     # Tomamos los 10 más frecuentes
)

t1 = perf_counter()

print("Descriptivos de 'mag':\n", desc)
print(f"\nTotal de eventos: {total_events:,}")
print("\nTop 10 ubicaciones:\n", top_places)
print(f"\nTiempo (descriptivos+conteos): {t1 - t0:.2f}s")


In [None]:

# ==============================================
# HISTOGRAMA DE MAGNITUDES
# ==============================================
# Para graficar usamos matplotlib, que trabaja con Pandas. Por eso traemos
# la Serie de magnitudes a memoria con compute() y dibujamos el histograma.
# ----------------------------------------------

# 1) Serie de magnitudes en Pandas
mags = df[MAG_COL].dropna().compute()

# 2) Histograma
plt.figure()
plt.hist(mags, bins=30, edgecolor="black")  # No fijamos colores para seguir la guía del profe
plt.title("Distribución de magnitudes sísmicas")
plt.xlabel("Magnitud")
plt.ylabel("Frecuencia")
plt.grid(True)
plt.tight_layout()
plt.show()


In [None]:

# ==============================================
# SERIE TEMPORAL: EVENTOS POR DÍA
# ==============================================
# Aquí construimos la serie diaria (conteos por día).
# Usamos floor('D') para truncar a día, agrupamos y computamos.
# ----------------------------------------------

daily = (
    df[[TIME_COL]]
    .assign(day=df[TIME_COL].dt.floor("D"))
    .groupby("day")
    .size()
    .compute()
    .sort_index()
)

# Gráfico de la serie diaria
plt.figure()
plt.plot(daily.index, daily.values)
plt.title("Eventos sísmicos por día")
plt.xlabel("Fecha")
plt.ylabel("Cantidad de eventos")
plt.tight_layout()
plt.show()


In [None]:

# ==============================================
# GRÁFICO: TOP 10 UBICACIONES
# ==============================================
# Graficamos el top de 'remark' que ya calculamos antes (top_places).
# ----------------------------------------------

if len(top_places) > 0:
    top10 = top_places.sort_values(ascending=True)  # barras horizontales de menor a mayor
    plt.figure()
    plt.barh(top10.index.astype(str), top10.values)
    plt.title("Top 10 ubicaciones con más sismos")
    plt.xlabel("Conteo de sismos")
    plt.ylabel("Ubicación")
    plt.tight_layout()
    plt.show()
else:
    print("No hay ubicaciones disponibles para graficar (top_places vacío).")


In [None]:

# ==============================================
# COMPARATIVA DE RENDIMIENTO: PANDAS VS DASK (EVENTOS POR DÍA)
# ==============================================
# Replicamos el cálculo de la serie diaria con Pandas y lo comparamos con Dask.
# Medimos tiempos con perf_counter y verificamos que los totales coinciden.
# ----------------------------------------------

# 1) Pandas
tp0 = perf_counter()
pdf = pd.read_csv("katalog_gempa.csv")  # usamos la misma ruta relativa
# Construimos 'time' de la misma forma que en Dask
pdf["time_str"] = pdf["tgl"].astype(str).str.strip() + " " + pdf["ot"].astype(str).str.strip()
pdf[TIME_COL] = pd.to_datetime(pdf["time_str"], errors="coerce")
# Limpiamos igual que en Dask
pdf = pdf.dropna(subset=[TIME_COL, MAG_COL])
# Serie diaria en Pandas
pandas_daily = pdf[TIME_COL].dt.floor("D").value_counts().sort_index()
tp1 = perf_counter()

# 2) Dask (ya tenemos 'daily' arriba). Lo referenciamos directamente.
td0 = perf_counter()
dask_daily = daily  # ya computado
td1 = perf_counter()

# 3) Reportamos tiempos y consistencia
print(f"Pandas tiempo: {tp1 - tp0:.2f} s | Filas: {len(pdf):,}")
print(f"Dask   tiempo: {td1 - td0:.2f} s | Particiones: {df.npartitions}")
print(f"Total eventos (Pandas vs Dask): {int(pandas_daily.sum())} vs {int(dask_daily.sum())}")


In [None]:

# ==============================================
# GUARDAR RESULTADOS Y FIGURAS
# ==============================================
# Persistimos algunas salidas para el informe:
# - CSV con descriptivos de magnitud
# - CSV con eventos por día (Dask)
# - PNG con histograma de magnitudes
# ----------------------------------------------

# 1) Aseguramos carpetas
os.makedirs("results", exist_ok=True)
os.makedirs("figures", exist_ok=True)

# 2) Guardamos descriptivos y serie diaria
desc.to_csv("results/descriptivos_magnitud.csv")
daily.to_frame(name="count").to_csv("results/eventos_por_dia_dask.csv")

# 3) Guardamos el histograma (re-render simple)
plt.figure()
plt.hist(mags, bins=30, edgecolor="black")
plt.title("Distribución de magnitudes sísmicas")
plt.xlabel("Magnitud")
plt.ylabel("Frecuencia")
plt.tight_layout()
plt.savefig("figures/hist_magnitudes.png", dpi=150)
plt.close()

print("Artefactos guardados en ./results y ./figures")
