In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, datediff, when, avg, row_number
from pyspark.sql.window import Window
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.sql import functions as F

In [None]:
spark = SparkSession.builder \
    .master("yarn") \
    .config("spark.executor.instances", "4") \
    .config("spark.executor.cores", "4") \
    .config("spark.executor.memory", "16g") \
    .config("spark.driver.memory", "8g") \
    .config("spark.sql.shuffle.partitions", "200") \
    .appName("hospital_stay") \
    .getOrCreate()

spark

## Cargar base SEGCOVID

In [None]:
def cargar_datos(archivo, columnas):
    try:
        return spark.read.parquet(archivo).select(*columnas)
    except Exception as e:
        print(f"Error al procesar {archivo}: {e}")
        return None

In [None]:
dir_segcovid = {
    "df_2020": "hdfs:///rawdata/segcovid/segcovid_parquet_2020",
    "df_2021": "hdfs:///rawdata/segcovid/segcovid_parquet_2021",
    "df_2022": "hdfs:///rawdata/segcovid/segcovid_parquet_2022",
    "df_2023": "hdfs:///rawdata/segcovid/segcovid_parquet_2023"
}

# Columnas de interés
columnas_segcovid = ["PersonaBasicaID", 
                     "AmbitoAtencion", 
                     "FechaIngresoAtencion", 
                     "FechaEgresoAtencion",
                     "DepartamentoAtencion"]

## Análisis para Bogotá

In [None]:
# Cargar todos los DataFrames
dfs_segcovid = []
for nombre, archivo in dir_segcovid.items():
    df = cargar_datos(archivo, columnas_segcovid)
    if df is not None:
        dfs_segcovid.append(df)

# Concatenar
if dfs_segcovid:
    df_segcovid = dfs_segcovid[0]
    for df in dfs_segcovid[1:]:
        df_segcovid = df_segcovid.unionByName(df, allowMissingColumns=True)
        
    # SEGCOVID filtrada por Hospitalización y Bogotá
    df_hosp = df_segcovid.filter(col("AmbitoAtencion") == "Hospitalización").drop_duplicates().persist()
    df_hosp = df_hosp.filter(col("DepartamentoAtencion") == "11 - Bogotá D.C.").drop_duplicates().persist()

else:
    df_segcovid = None
    df_hosp = None

In [None]:
# Cómo es la estructura de la base
total_registros = df_hosp.count()
ids_unicos = df_hosp.select('PersonaBasicaID').distinct().count()
print(f"Total de registros en df_hosp: {total_registros}")
print(f"Total de columnas en df_hosp: {len(df_hosp.columns)}")
print(f"Número de ids únicos en df_hosp: {ids_unicos}")

In [None]:
# Convertir columnas de fecha a tipo fecha en Spark
df_hosp = df_hosp.withColumn("FechaIngresoAtencion", to_date(col("FechaIngresoAtencion"), "yyyy-MM-dd"))
df_hosp = df_hosp.withColumn("FechaEgresoAtencion", to_date(col("FechaEgresoAtencion"), "yyyy-MM-dd"))

In [None]:
# Filtrar registros donde FechaEgresoAtencion >= FechaIngresoAtencion
df_hosp2 = df_hosp.filter(F.col("FechaEgresoAtencion") >= F.col("FechaIngresoAtencion"))

df_hosp2 = df_hosp2.filter(col("FechaIngresoAtencion") >= "2020-01-01")
df_hosp2 = df_hosp2.filter(col("FechaEgresoAtencion") >= "2020-01-01")
df_hosp2 = df_hosp2.filter(col("FechaIngresoAtencion") < "2024-01-01")
df_hosp2 = df_hosp2.filter(col("FechaEgresoAtencion") < "2024-01-01")

In [None]:
# df_hosp2.sort("PersonaBasicaID", "FechaIngresoAtencion", ascending=[True, True]).show()
df_hosp2.sort("PersonaBasicaID", ascending=[True]).show(100)

In [None]:
##################
# Ordenar el DataFrame
window_spec = Window.partitionBy().orderBy("PersonaBasicaID", "FechaIngresoAtencion", "FechaEgresoAtencion")
df_sort = df_hosp2.withColumn("row_num", F.row_number().over(window_spec))

In [None]:
df_sort.show(100)

In [None]:
# Identificar cambios en las fechas de ingreso
window_spec_lag = Window.partitionBy().orderBy("row_num")
df_sort = df_sort.withColumn("Prev_FechaIngresoAtencion", F.lag("FechaIngresoAtencion").over(window_spec_lag))
df_sort = df_sort.withColumn("Prev_PersonaBasicaID", F.lag("PersonaBasicaID").over(window_spec_lag))

In [None]:
df_sort.select("PersonaBasicaID", "FechaIngresoAtencion", "FechaEgresoAtencion", "row_num", "Prev_FechaIngresoAtencion", "Prev_PersonaBasicaID").show(100)

In [None]:
df_sort = df_sort.withColumn(
    "CambioHospitalizacion",
    F.sum(F.when((F.col("FechaIngresoAtencion") != F.col("Prev_FechaIngresoAtencion")) |
                 (F.col("PersonaBasicaID") != F.col("Prev_PersonaBasicaID")), 1).otherwise(0))
    .over(window_spec_lag.rowsBetween(Window.unboundedPreceding, Window.currentRow))
)

In [None]:
df_sort.select("PersonaBasicaID", "FechaIngresoAtencion", "row_num", "Prev_FechaIngresoAtencion", "Prev_PersonaBasicaID", "CambioHospitalizacion").show(1000)

In [None]:
# Agrupar por hospitalizaciones +  Fecha inicio y final
hospitalizaciones = df_sort.groupBy("CambioHospitalizacion").agg(
    F.first("PersonaBasicaID").alias("PersonaBasicaID"),
    F.first("FechaIngresoAtencion").alias("FechaIngresoAtencion"),
    F.last("FechaEgresoAtencion").alias("FechaEgresoAtencion")
)

# Calcular la diferencia de días "Hosp_stay"
hospitalizaciones = hospitalizaciones.withColumn("Hosp_stay", datediff(col("FechaEgresoAtencion"), col("FechaIngresoAtencion")))

In [None]:
hospitalizaciones.show()

In [None]:
# Cómo es la estructura de la base
total_registros = hospitalizaciones.count()
ids_unicos = hospitalizaciones.select('PersonaBasicaID').distinct().count()
print(f"Total de registros en hospitalizaciones: {total_registros}")
print(f"Total de columnas en hospitalizaciones: {len(hospitalizaciones.columns)}")
print(f"Número de ids únicos en hospitalizaciones: {ids_unicos}")

In [None]:
# Seleccionar las columnas necesarias para el análisis de olas
df_para_olas = hospitalizaciones.select("PersonaBasicaID", "FechaIngresoAtencion", "FechaEgresoAtencion", "Hosp_stay")

# Crear columna de ola
expr_ola = when(col("FechaIngresoAtencion").between("2020-02-26", "2020-09-25"), "Wave 1") \
          .when(col("FechaIngresoAtencion").between("2020-11-01", "2021-03-01"), "Wave 2") \
          .when(col("FechaIngresoAtencion").between("2021-03-01", "2021-09-14"), "Wave 3") \
          .when(col("FechaIngresoAtencion").between("2021-11-20", "2022-03-24"), "Wave 4") \
          .otherwise("Fuera_Ola")

# Asignar la ola correspondiente a cada registro
df_para_olas = df_para_olas.withColumn("Ola_COVID", expr_ola)
df_para_olas = df_para_olas.filter((col("Ola_COVID") != 'Fuera_Ola'))

# Aplicar filtros para considerar Hosp_stay entre 0 y 100 días
df_para_olas = df_para_olas.filter((col("Hosp_stay") >= 0) & (col("Hosp_stay") <= 100))

# Calcular el promedio de Hosp_stay por cada ola
df_avg = df_para_olas.groupBy("Ola_COVID").agg(avg("Hosp_stay").alias("AVG_Hosp_stay")).sort("Ola_COVID")

In [None]:
df_avg.show()

In [None]:
proms = df_avg.sort("Ola_COVID").select('AVG_Hosp_stay').rdd.flatMap(lambda x: x).collect()

In [None]:
promedios = proms
promedios

In [None]:
waves = ['Wave 1', 'Wave 2', 'Wave 3', 'Wave 4']
colors = ['#6b6ca3', '#87bcbd', '#6f9954', '#b1615c']

# Grafica 
plt.figure(figsize=(4, 5))
plt.bar(waves, promedios, color=colors, width=1.0)
# plt.xlabel('COVID-19 waves')
plt.ylabel('Avg value of delay time (Days)', fontsize=14)
plt.title('Hospital stay Bogotá')
plt.yticks([0.0, 2.5, 5.0, 7.5, 10.0, 12.5, 15.0, 17.5])

for i, promedio in enumerate(promedios):
    plt.text(i, promedio + 0.2, f'{promedio:.2f}', ha='center', fontsize=9)

plt.show()

In [None]:
df_para_olas.groupBy("Ola_COVID").count().sort("Ola_COVID").show()

---
---
---
---
---
---

## Análisis Nacional - no se eliminan duplicados

In [None]:
# -------------------------------
df_hosp_global = (df_segcovid.filter(F.col("AmbitoAtencion") == "Hospitalización")
                  .dropDuplicates()
                  .persist())

# Convertir columnas de fecha a tipo fecha en Spark
df_hosp_global = df_hosp_global.withColumn("FechaIngresoAtencion", to_date(col("FechaIngresoAtencion"), "yyyy-MM-dd"))
df_hosp_global = df_hosp_global.withColumn("FechaEgresoAtencion", to_date(col("FechaEgresoAtencion"), "yyyy-MM-dd"))

# Filtrar registros donde FechaEgresoAtencion >= FechaIngresoAtencion
df_hosp_global = df_hosp_global.filter(F.col("FechaEgresoAtencion") >= F.col("FechaIngresoAtencion"))

df_hosp_global = df_hosp_global.filter((F.col("FechaIngresoAtencion").between("2020-01-01", "2023-12-31")) &
                                       (F.col("FechaEgresoAtencion").between("2020-01-01", "2023-12-31")))

window_spec = Window.orderBy("PersonaBasicaID", "FechaIngresoAtencion", "FechaEgresoAtencion")

# Calcular columnas previas para identificar cambios de hospitalización y generar el grupo
df_sorted = (df_hosp_global
    .withColumn("prev_ingreso", F.lag("FechaIngresoAtencion").over(window_spec))
    .withColumn("prev_persona", F.lag("PersonaBasicaID").over(window_spec))
    .withColumn(
        "CambioHospitalizacion",
        F.sum(
            F.when(
                (F.col("FechaIngresoAtencion") != F.col("prev_ingreso")) |
                (F.col("PersonaBasicaID") != F.col("prev_persona")), 1
            ).otherwise(0)
        ).over(window_spec.rowsBetween(Window.unboundedPreceding, Window.currentRow))
    )
)

# Agrupar por cambio en hospitalización para obtener la fecha de inicio y fin de cada evento, y calcular la duración
hospitalizaciones = (df_sorted
    .groupBy("CambioHospitalizacion")
    .agg(
        F.first("PersonaBasicaID").alias("PersonaBasicaID"),
        F.first("FechaIngresoAtencion").alias("FechaIngresoAtencion"),
        F.last("FechaEgresoAtencion").alias("FechaEgresoAtencion")
    )
    .withColumn("Hosp_stay", F.datediff(F.col("FechaEgresoAtencion"), F.col("FechaIngresoAtencion")))
)

# Seleccionar las columnas necesarias para el análisis de olas
df_para_olas = hospitalizaciones.select("PersonaBasicaID", "FechaIngresoAtencion", "FechaEgresoAtencion", "Hosp_stay")

# Crear columna de ola
expr_ola = when(col("FechaIngresoAtencion").between("2020-02-26", "2020-09-25"), "Ola_1") \
          .when(col("FechaIngresoAtencion").between("2020-11-01", "2021-03-01"), "Ola_2") \
          .when(col("FechaIngresoAtencion").between("2021-03-01", "2021-09-14"), "Ola_3") \
          .when(col("FechaIngresoAtencion").between("2021-11-20", "2022-03-24"), "Ola_4") \
          .otherwise("Fuera_Ola")

# Asignar la ola correspondiente a cada registro
df_para_olas = df_para_olas.withColumn("Ola_COVID", expr_ola)
df_para_olas = df_para_olas.filter((col("Ola_COVID") != 'Fuera_Ola'))

# Aplicar filtros para considerar Onset_icu entre 0 y 100 días
df_para_olas = df_para_olas.filter((col("Hosp_stay") >= 0) & (col("Hosp_stay") <= 100))

# Calcular el promedio de Stay_hosp por cada ola
df_avg = df_para_olas.groupBy("Ola_COVID").agg(avg("Hosp_stay").alias("AVG_Hosp_stay")).sort("Ola_COVID")

In [None]:
df_avg.show()

In [None]:
promedios = df_avg.sort("Ola_COVID").select('AVG_Hosp_stay').rdd.flatMap(lambda x: x).collect()

In [None]:
promedios

In [None]:
waves = ['Ola 1', 'Ola 2', 'Ola 3', 'Ola 4']
colors = ['#6a5acd', '#66c2a5', '#4daf4a', '#d95f02']

# Grafica 
plt.figure(figsize=(6, 4))
plt.bar(waves, promedios, color=colors, width=0.7, zorder=3)
# plt.xlabel('COVID-19 waves')
plt.ylabel('Promedio (días)', fontsize=12)
plt.title('Duración estancia hospitalaria', fontsize=14)
plt.yticks([0, 5, 10, 15, 20, 25], fontsize=10)

for i, promedio in enumerate(promedios):
    plt.text(i, promedio + 0.2, f'{promedio:.1f}', ha='center', fontsize=9)

plt.grid(True, linestyle='--', alpha=0.6, zorder=0)    
plt.show()

In [None]:
df_para_olas.groupBy("Ola_COVID").count().sort("Ola_COVID").show()

---
---
---
# Análisis para todos los departamentos

In [None]:
from pyspark.sql import Window
from pyspark.sql.functions import col, to_date, datediff, when, row_number, avg
import pyspark.sql.functions as F
import pandas as pd

# -------------------------------
# 1. Cargar y unir los DataFrames de SEGCOVID
dfs_segcovid = []
for nombre, archivo in dir_segcovid.items():
    df = cargar_datos(archivo, columnas_segcovid)
    if df is not None:
        dfs_segcovid.append(df)

if dfs_segcovid:
    df_segcovid = dfs_segcovid[0]
    for df in dfs_segcovid[1:]:
        df_segcovid = df_segcovid.unionByName(df, allowMissingColumns=True)
else:
    df_segcovid = None
    
df_segcovid = df_segcovid.filter(col("DepartamentoAtencion") != "-1 - NO DEFINIDO")

df_segcovid_v2 = df_segcovid.withColumn(
    'DepartamentoAtencion',
     when(df_segcovid['DepartamentoAtencion'] == '11 - Bogotá D.C.', 'DC')
    .when(df_segcovid['DepartamentoAtencion'] == '05 - Antioquia', 'ANT')
    .when(df_segcovid['DepartamentoAtencion'] == '76 - Valle del Cauca', 'VAC')
    .when(df_segcovid['DepartamentoAtencion'] == '08 - Atlántico', 'ATL')
    .when(df_segcovid['DepartamentoAtencion'] == '25 - Cundinamarca', 'CUN')
    .when(df_segcovid['DepartamentoAtencion'] == '68 - Santander', 'SAN')
    .when(df_segcovid['DepartamentoAtencion'] == '13 - Bolívar', 'BOL')
    .when(df_segcovid['DepartamentoAtencion'] == '47 - Magdalena', 'MAG')
    .when(df_segcovid['DepartamentoAtencion'] == '15 - Boyacá', 'BOY')
    .when(df_segcovid['DepartamentoAtencion'] == '17 - Caldas', 'CAL')
    .when(df_segcovid['DepartamentoAtencion'] == '52 - Nariño', 'NAR')
    .when(df_segcovid['DepartamentoAtencion'] == '20 - Cesar', 'CES')
    .when(df_segcovid['DepartamentoAtencion'] == '23 - Córdoba', 'COR')
    .when(df_segcovid['DepartamentoAtencion'] == '73 - Tolima', 'TOL')
    .when(df_segcovid['DepartamentoAtencion'] == '50 - Meta', 'MET')
    .when(df_segcovid['DepartamentoAtencion'] == '41 - Huila', 'HUI')
    .when(df_segcovid['DepartamentoAtencion'] == '54 - Norte de Santander', 'NSA')
    .when(df_segcovid['DepartamentoAtencion'] == '66 - Risaralda', 'RIS')
    .when(df_segcovid['DepartamentoAtencion'] == '19 - Cauca', 'CAU')
    .when(df_segcovid['DepartamentoAtencion'] == '44 - La Guajira', 'LAG')
    .when(df_segcovid['DepartamentoAtencion'] == '70 - Sucre', 'SUC')
    .when(df_segcovid['DepartamentoAtencion'] == '63 - Quindio', 'QUI')
    .when(df_segcovid['DepartamentoAtencion'] == '85 - Casanare', 'CAS')
    .when(df_segcovid['DepartamentoAtencion'] == '18 - Caquetá', 'CAQ')
    .when(df_segcovid['DepartamentoAtencion'] == '86 - Putumayo', 'PUT')
    .when(df_segcovid['DepartamentoAtencion'] == '27 - Chocó', 'CHO')
    .when(df_segcovid['DepartamentoAtencion'] == '81 - Arauca', 'ARA')
    .when(df_segcovid['DepartamentoAtencion'] == '91 - Amazonas', 'AMA')
    .when(df_segcovid['DepartamentoAtencion'] == '88 - Archipiélago de San Andrés Providencia y Santa Catalina', 'SAP')
    .when(df_segcovid['DepartamentoAtencion'] == '95 - Guaviare', 'GUV')
    .when(df_segcovid['DepartamentoAtencion'] == '99 - Vichada', 'VIC')
    .when(df_segcovid['DepartamentoAtencion'] == '94 - Guainía', 'GUA')
    .when(df_segcovid['DepartamentoAtencion'] == '97 - Vaupés', 'VAU')
    .otherwise(df_segcovid['DepartamentoAtencion'])
)

# -------------------------------
# 2. Filtrar SEGCOVID para "Cuidado Intensivo" (sin filtrar por departamento aún)
df_hosp_global = df_segcovid_v2.filter(col("AmbitoAtencion") == "Hospitalización") \
                           .drop_duplicates() \
                           .persist()

# Obtener la lista única de departamentos (columna DepartamentoAtencion)
departamentos = [row[0] for row in df_segcovid_v2.select("DepartamentoAtencion").distinct().collect()]

# Lista para almacenar resultados finales
results = []
conteos = []

# Crear columna de ola
expr_ola = when(col("FechaIngresoAtencion").between("2020-02-26", "2020-09-25"), "Wave 1") \
          .when(col("FechaIngresoAtencion").between("2020-11-01", "2021-03-01"), "Wave 2") \
          .when(col("FechaIngresoAtencion").between("2021-03-01", "2021-09-14"), "Wave 3") \
          .when(col("FechaIngresoAtencion").between("2021-11-20", "2022-03-24"), "Wave 4") \
          .otherwise("Fuera_Ola")

# -------------------------------
# 3. Iterar sobre cada departamento
for dept in departamentos:
    # Filtrar df_icu para el departamento actual
    df_hosp_dep = df_hosp_global.filter(col("DepartamentoAtencion") == dept) \
                              .drop_duplicates() \
                              .persist()
    
    # Convertir columnas de fecha a tipo fecha en Spark
    df_hosp_dep = df_hosp_dep.withColumn("FechaIngresoAtencion", to_date(col("FechaIngresoAtencion"), "yyyy-MM-dd"))
    df_hosp_dep = df_hosp_dep.withColumn("FechaEgresoAtencion", to_date(col("FechaEgresoAtencion"), "yyyy-MM-dd"))

    # Filtrar registros donde FechaEgresoAtencion >= FechaIngresoAtencion
    df_hosp_dep = df_hosp_dep.filter(F.col("FechaEgresoAtencion") >= F.col("FechaIngresoAtencion"))

    df_hosp_dep = df_hosp_dep.filter((F.col("FechaIngresoAtencion").between("2020-01-01", "2023-12-31")) &
                                     (F.col("FechaEgresoAtencion").between("2020-01-01", "2023-12-31")))


    window_spec = Window.orderBy("PersonaBasicaID", "FechaIngresoAtencion", "FechaEgresoAtencion")

    # Calcular columnas previas para identificar cambios de hospitalización y generar el grupo
    df_sorted = (df_hosp_dep
        .withColumn("prev_ingreso", F.lag("FechaIngresoAtencion").over(window_spec))
        .withColumn("prev_persona", F.lag("PersonaBasicaID").over(window_spec))
        .withColumn(
            "CambioHospitalizacion",
            F.sum(
                F.when(
                    (F.col("FechaIngresoAtencion") != F.col("prev_ingreso")) |
                    (F.col("PersonaBasicaID") != F.col("prev_persona")), 1
                ).otherwise(0)
            ).over(window_spec.rowsBetween(Window.unboundedPreceding, Window.currentRow))
        )
    )

    # Agrupar por cambio en hospitalización para obtener la fecha de inicio y fin de cada evento, y calcular la duración
    hospitalizaciones = (df_sorted
        .groupBy("CambioHospitalizacion")
        .agg(
            F.first("PersonaBasicaID").alias("PersonaBasicaID"),
            F.first("FechaIngresoAtencion").alias("FechaIngresoAtencion"),
            F.last("FechaEgresoAtencion").alias("FechaEgresoAtencion")
        )
        .withColumn("Hosp_stay", F.datediff(F.col("FechaEgresoAtencion"), F.col("FechaIngresoAtencion")))
    )

    # Seleccionar las columnas necesarias para el análisis de olas
    df_para_olas = hospitalizaciones.select("PersonaBasicaID", "FechaIngresoAtencion", "FechaEgresoAtencion", "Hosp_stay")

    # Asignar la ola correspondiente a cada registro
    df_para_olas = df_para_olas.withColumn("Ola_COVID", expr_ola)
    df_para_olas = df_para_olas.filter((col("Ola_COVID") != 'Fuera_Ola'))

    # Aplicar filtros para considerar Onset_icu entre 0 y 100 días
    df_para_olas = df_para_olas.filter((col("Hosp_stay") >= 0) & (col("Hosp_stay") <= 100))

    # Calcular el promedio de Stay_hosp por cada ola
    df_avg = df_para_olas.groupBy("Ola_COVID").agg(avg("Hosp_stay").alias("AVG_Hosp_stay")).sort("Ola_COVID")
     
    # Guardar en el diccionario final usando el departamento como clave
    for row in df_avg.collect():
        results.append({
            'Departamento': dept,
            'Wave': row["Ola_COVID"],
            'MeanDelay': row["AVG_Hosp_stay"]
        })

    for row in df_para_olas.groupBy("Ola_COVID").count().sort("Ola_COVID").collect():
        conteos.append({
            'Departamento': dept,
            'Wave': row["Ola_COVID"],
            'Registros': row["count"]
        })

In [None]:
df_avg.show()

In [None]:
# Crear un DataFrame con los resultados
df_results = pd.DataFrame(results)
pivot_table = df_results.pivot(index="Departamento", columns="Wave", values="MeanDelay")
pivot_table

In [None]:
df_conteos = pd.DataFrame(conteos)
tabla_conteos = df_conteos.pivot(index="Departamento", columns="Wave", values="Registros")
tabla_conteos

In [None]:
import seaborn as sns

# Grafica heatmap
plt.figure(figsize=(3, len(pivot_table)*0.25)) 
sns.heatmap(pivot_table, 
            annot=True, 
            cmap="coolwarm", #RdYlGr coolwarm summer
            fmt=".2f",
            annot_kws={"size": 8})
plt.title("Avg Delay \nHospital Stay", fontsize=12)
plt.xlabel("")
plt.xticks(rotation=60)
plt.ylabel("Department")
plt.tight_layout()
plt.show()

In [None]:
plt.figure(figsize=(6, 5))
sns.boxplot(x='Wave', y='MeanDelay', data=df_results, palette="Set3", hue='Wave', legend=False, showfliers=False)
sns.swarmplot(x='Wave', y='MeanDelay', data=df_results, color=".25")
plt.title("Hospital Stay")
plt.xlabel("")
plt.ylabel("Avg value of delay time (Days)")
plt.tight_layout()
plt.show()

---
---
---
## Análisis para todos los departamentos POR REGIONES

In [None]:
from pyspark.sql import Window
from pyspark.sql.functions import col, to_date, datediff, when, row_number, avg
import pyspark.sql.functions as F
import pandas as pd

# -------------------------------
# 1. Cargar y unir los DataFrames de SEGCOVID
dfs_segcovid = []
for nombre, archivo in dir_segcovid.items():
    df = cargar_datos(archivo, columnas_segcovid)
    if df is not None:
        dfs_segcovid.append(df)

if dfs_segcovid:
    df_segcovid = dfs_segcovid[0]
    for df in dfs_segcovid[1:]:
        df_segcovid = df_segcovid.unionByName(df, allowMissingColumns=True)
else:
    df_segcovid = None
    
df_segcovid = df_segcovid.filter(col("DepartamentoAtencion") != "-1 - NO DEFINIDO")

df_segcovid_v2 = df_segcovid.withColumn(
    'DepartamentoAtencion',
     when(df_segcovid['DepartamentoAtencion'] == '11 - Bogotá D.C.', 'Andina')
    .when(df_segcovid['DepartamentoAtencion'] == '05 - Antioquia', 'Andina')
    .when(df_segcovid['DepartamentoAtencion'] == '76 - Valle del Cauca', 'Pacífica')
    .when(df_segcovid['DepartamentoAtencion'] == '08 - Atlántico', 'Caribe e Insular')
    .when(df_segcovid['DepartamentoAtencion'] == '25 - Cundinamarca', 'Andina')
    .when(df_segcovid['DepartamentoAtencion'] == '68 - Santander', 'Andina')
    .when(df_segcovid['DepartamentoAtencion'] == '13 - Bolívar', 'Caribe e Insular')
    .when(df_segcovid['DepartamentoAtencion'] == '47 - Magdalena', 'Caribe e Insular')
    .when(df_segcovid['DepartamentoAtencion'] == '15 - Boyacá', 'Andina')
    .when(df_segcovid['DepartamentoAtencion'] == '17 - Caldas', 'Andina')
    .when(df_segcovid['DepartamentoAtencion'] == '52 - Nariño', 'Pacífica')
    .when(df_segcovid['DepartamentoAtencion'] == '20 - Cesar', 'Caribe e Insular')
    .when(df_segcovid['DepartamentoAtencion'] == '23 - Córdoba', 'Caribe e Insular')
    .when(df_segcovid['DepartamentoAtencion'] == '73 - Tolima', 'Andina')
    .when(df_segcovid['DepartamentoAtencion'] == '50 - Meta', 'Orinoquía')
    .when(df_segcovid['DepartamentoAtencion'] == '41 - Huila', 'Andina')
    .when(df_segcovid['DepartamentoAtencion'] == '54 - Norte de Santander', 'Andina')
    .when(df_segcovid['DepartamentoAtencion'] == '66 - Risaralda', 'Andina')
    .when(df_segcovid['DepartamentoAtencion'] == '19 - Cauca', 'Pacífica')
    .when(df_segcovid['DepartamentoAtencion'] == '44 - La Guajira', 'Caribe e Insular')
    .when(df_segcovid['DepartamentoAtencion'] == '70 - Sucre', 'Caribe e Insular')
    .when(df_segcovid['DepartamentoAtencion'] == '63 - Quindio', 'Andina')
    .when(df_segcovid['DepartamentoAtencion'] == '85 - Casanare', 'Orinoquía')
    .when(df_segcovid['DepartamentoAtencion'] == '18 - Caquetá', 'Amazónica')
    .when(df_segcovid['DepartamentoAtencion'] == '86 - Putumayo', 'Amazónica')
    .when(df_segcovid['DepartamentoAtencion'] == '27 - Chocó', 'Pacífica')
    .when(df_segcovid['DepartamentoAtencion'] == '81 - Arauca', 'Orinoquía')
    .when(df_segcovid['DepartamentoAtencion'] == '91 - Amazonas', 'Amazónica')
    .when(df_segcovid['DepartamentoAtencion'] == '88 - Archipiélago de San Andrés Providencia y Santa Catalina', 'Caribe e Insular')
    .when(df_segcovid['DepartamentoAtencion'] == '95 - Guaviare', 'Amazónica')
    .when(df_segcovid['DepartamentoAtencion'] == '99 - Vichada', 'Orinoquía')
    .when(df_segcovid['DepartamentoAtencion'] == '94 - Guainía', 'Amazónica')
    .when(df_segcovid['DepartamentoAtencion'] == '97 - Vaupés', 'Amazónica')
    .otherwise(df_segcovid['DepartamentoAtencion'])
)

# -------------------------------
# 2. Filtrar SEGCOVID para "Cuidado Intensivo" (sin filtrar por departamento aún)
df_hosp_global = df_segcovid_v2.filter(col("AmbitoAtencion") == "Hospitalización") \
                           .drop_duplicates() \
                           .persist()

# Obtener la lista única de departamentos (columna DepartamentoAtencion)
departamentos = [row[0] for row in df_segcovid_v2.select("DepartamentoAtencion").distinct().collect()]

# Lista para almacenar resultados finales
results = []
conteos = []

# Crear columna de ola
expr_ola = when(col("FechaIngresoAtencion").between("2020-02-26", "2020-09-25"), "Wave 1") \
          .when(col("FechaIngresoAtencion").between("2020-11-01", "2021-03-01"), "Wave 2") \
          .when(col("FechaIngresoAtencion").between("2021-03-01", "2021-09-14"), "Wave 3") \
          .when(col("FechaIngresoAtencion").between("2021-11-20", "2022-03-24"), "Wave 4") \
          .otherwise("Fuera_Ola")

# -------------------------------
# 3. Iterar sobre cada departamento
for dept in departamentos:
    # Filtrar df_icu para el departamento actual
    df_hosp_dep = df_hosp_global.filter(col("DepartamentoAtencion") == dept) \
                              .drop_duplicates() \
                              .persist()
    
    # Convertir columnas de fecha a tipo fecha en Spark
    df_hosp_dep = df_hosp_dep.withColumn("FechaIngresoAtencion", to_date(col("FechaIngresoAtencion"), "yyyy-MM-dd"))
    df_hosp_dep = df_hosp_dep.withColumn("FechaEgresoAtencion", to_date(col("FechaEgresoAtencion"), "yyyy-MM-dd"))

    # Filtrar registros donde FechaEgresoAtencion >= FechaIngresoAtencion
    df_hosp_dep = df_hosp_dep.filter(F.col("FechaEgresoAtencion") >= F.col("FechaIngresoAtencion"))

    df_hosp_dep = df_hosp_dep.filter((F.col("FechaIngresoAtencion").between("2020-01-01", "2023-12-31")) &
                                     (F.col("FechaEgresoAtencion").between("2020-01-01", "2023-12-31")))


    window_spec = Window.orderBy("PersonaBasicaID", "FechaIngresoAtencion", "FechaEgresoAtencion")

    # Calcular columnas previas para identificar cambios de hospitalización y generar el grupo
    df_sorted = (df_hosp_dep
        .withColumn("prev_ingreso", F.lag("FechaIngresoAtencion").over(window_spec))
        .withColumn("prev_persona", F.lag("PersonaBasicaID").over(window_spec))
        .withColumn(
            "CambioHospitalizacion",
            F.sum(
                F.when(
                    (F.col("FechaIngresoAtencion") != F.col("prev_ingreso")) |
                    (F.col("PersonaBasicaID") != F.col("prev_persona")), 1
                ).otherwise(0)
            ).over(window_spec.rowsBetween(Window.unboundedPreceding, Window.currentRow))
        )
    )

    # Agrupar por cambio en hospitalización para obtener la fecha de inicio y fin de cada evento, y calcular la duración
    hospitalizaciones = (df_sorted
        .groupBy("CambioHospitalizacion")
        .agg(
            F.first("PersonaBasicaID").alias("PersonaBasicaID"),
            F.first("FechaIngresoAtencion").alias("FechaIngresoAtencion"),
            F.last("FechaEgresoAtencion").alias("FechaEgresoAtencion")
        )
        .withColumn("Hosp_stay", F.datediff(F.col("FechaEgresoAtencion"), F.col("FechaIngresoAtencion")))
    )

    # Seleccionar las columnas necesarias para el análisis de olas
    df_para_olas = hospitalizaciones.select("PersonaBasicaID", "FechaIngresoAtencion", "FechaEgresoAtencion", "Hosp_stay")

    # Asignar la ola correspondiente a cada registro
    df_para_olas = df_para_olas.withColumn("Ola_COVID", expr_ola)
    df_para_olas = df_para_olas.filter((col("Ola_COVID") != 'Fuera_Ola'))

    # Aplicar filtros para considerar Onset_icu entre 0 y 100 días
    df_para_olas = df_para_olas.filter((col("Hosp_stay") >= 0) & (col("Hosp_stay") <= 100))

    # Calcular el promedio de Stay_hosp por cada ola
    df_avg = df_para_olas.groupBy("Ola_COVID").agg(avg("Hosp_stay").alias("AVG_Hosp_stay")).sort("Ola_COVID")
     
    # Guardar en el diccionario final usando el departamento como clave
    for row in df_avg.collect():
        results.append({
            'Departamento': dept,
            'Wave': row["Ola_COVID"],
            'MeanDelay': row["AVG_Hosp_stay"]
        })

    for row in df_para_olas.groupBy("Ola_COVID").count().sort("Ola_COVID").collect():
        conteos.append({
            'Departamento': dept,
            'Wave': row["Ola_COVID"],
            'Registros': row["count"]
        })

In [None]:
# Crear un DataFrame con los resultados
df_results = pd.DataFrame(results)
pivot_table = df_results.pivot(index="Departamento", columns="Wave", values="MeanDelay")
pivot_table

In [None]:
df_conteos = pd.DataFrame(conteos)
tabla_conteos = df_conteos.pivot(index="Departamento", columns="Wave", values="Registros")
tabla_conteos

In [None]:
import seaborn as sns

# Grafica heatmap
plt.figure(figsize=(4, 3.5)) 
sns.heatmap(pivot_table, 
            annot=True, 
            cmap="coolwarm", #RdYlGr coolwarm summer
            fmt=".2f",
            annot_kws={"size": 9})
plt.title("Avg Delay\n Hospital Stay", fontsize=12)
plt.xticks(rotation=45)
plt.xlabel("")
plt.ylabel("")
plt.tight_layout()
plt.show()

In [None]:
plt.figure(figsize=(6, 4))

# Crear líneas para cada departamento
for reg in pivot_table.index:
    plt.plot(pivot_table.columns, pivot_table.loc[reg], marker='o', linestyle='-', label=reg)

plt.title("Hospital Stay", fontsize=14)
plt.xlabel("")
plt.ylabel('Avg value of delay time (Days)', fontsize=12)
plt.legend(title="Region", loc='upper right', fontsize=9, bbox_to_anchor=(1.34, 1.02)) 
plt.grid(True, linestyle='--', alpha=0.7)

plt.show()

In [None]:
import numpy as np
plt.figure(figsize=(8, 4))

num_regions = len(pivot_table)
num_waves = len(pivot_table.columns)

x = np.arange(num_regions)
width = 0.15  

# Barras
colors = ['#6b6ca3', '#87bcbd', '#6f9954', '#b1615c']
for i, wave in enumerate(pivot_table.columns):
    plt.bar(x + i * width, pivot_table[wave], width=width, label=wave, color=colors[i])

plt.xticks(x + width * (num_waves / 2 - 0.5), pivot_table.index, rotation=0, fontsize=9)
plt.ylabel('Avg value of delay time (Days)')
plt.title("Hospital Stay")
plt.legend(title="COVID-19 Wave", fontsize=9, bbox_to_anchor=(1.25, 1.02))
plt.yticks([0, 2.5, 5.0, 7.5, 10.0, 12.5, 15.0])

plt.tight_layout()
plt.show()