# Notebook para el Caso de estudio - Covid19

In [1]:
# Montar Google Drive
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [7]:
#instalar java y spark
!apt-get install openjdk-17-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-4.0.1/spark-4.0.1-bin-hadoop3.tgz
!tar xf spark-4.0.1-bin-hadoop3.tgz
!pip install -q findspark

In [12]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-17-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-4.0.1-bin-hadoop3"

In [9]:
import findspark
findspark.init()

In [13]:
# Crear SparkSession
from pyspark.sql import SparkSession

# spark local con archivos locales:
spark = SparkSession.builder.master("local[*]").getOrCreate()

# Obtener SparkContext
sc = spark.sparkContext

In [14]:
# Ruta del archivo en Google Drive
file_path = "/content/drive/MyDrive/datasets/covid19/Casos_positivos_de_COVID-19_en_Colombia-1K.csv"

# Leer CSV en Spark DataFrame
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Procesamiento básico a nivel de DataFrame

In [15]:
from pyspark.sql.functions import col, when

# Crear columna categórica por rangos de edad (ejemplo)
df = df.withColumn("rango_edad",
                   when(col("edad") < 20, "<20")
                   .when((col("edad") >= 20) & (col("edad") < 40), "20-39")
                   .when((col("edad") >= 40) & (col("edad") < 60), "40-59")
                   .otherwise("60+"))

In [16]:
# Ejemplo: eliminar columnas que no sean necesarias
# df.drop("Unidad de medida de edad")

In [17]:
# Filtrar pacientes mayores de 60 años
df_mayores60 = df.filter(col("Edad") > 60)
df_mayores60.show(5)

+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+----------------+------------------+------+-------------------+--------------------+----------+---------------------------+---------------+--------------------+---------------------+--------------------+------------------+-----------------------+----------+
|fecha reporte web|ID de caso|Fecha de notificación|Código DIVIPOLA departamento|Nombre departamento|Código DIVIPOLA municipio|Nombre municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación del caso|Estado|Código ISO del país|     Nombre del país|Recuperado|Fecha de inicio de síntomas|Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|Pertenencia étnica|Nombre del grupo étnico|rango_edad|
+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------

In [18]:
# Filtrar casos activos
df_activos = df.filter(col("Estado") == "Activo")
df_activos.show(5)

+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+----------------+------------------+------+-------------------+---------------+----------+---------------------------+---------------+--------------------+---------------------+--------------------+------------------+-----------------------+----------+
|fecha reporte web|ID de caso|Fecha de notificación|Código DIVIPOLA departamento|Nombre departamento|Código DIVIPOLA municipio|Nombre municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación del caso|Estado|Código ISO del país|Nombre del país|Recuperado|Fecha de inicio de síntomas|Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|Pertenencia étnica|Nombre del grupo étnico|rango_edad|
+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+---------

In [19]:
# Número de casos por departamento
casos_por_departamento = df.groupBy("Nombre departamento").count().orderBy(col("count").desc())
casos_por_departamento.show(10)

+-------------------+-----+
|Nombre departamento|count|
+-------------------+-----+
|             BOGOTA|  401|
|              VALLE|  148|
|          ANTIOQUIA|  106|
|       CUNDINAMARCA|   49|
|          CARTAGENA|   39|
|          RISARALDA|   34|
|       BARRANQUILLA|   31|
|              HUILA|   30|
|            QUINDIO|   23|
|    NORTE SANTANDER|   19|
+-------------------+-----+
only showing top 10 rows


In [20]:
# Número de casos por sexo
casos_por_sexo = df.groupBy("Sexo").count()
casos_por_sexo.show()

+----+-----+
|Sexo|count|
+----+-----+
|   F|  480|
|   M|  520|
+----+-----+



In [21]:
# Número de casos por rango de edad
casos_por_rango = df.groupBy("rango_edad").count()
casos_por_rango.show()

+----------+-----+
|rango_edad|count|
+----------+-----+
|       <20|   55|
|     20-39|  441|
|       60+|  168|
|     40-59|  336|
+----------+-----+



## Preguntas de negocio usando DataFrame

In [22]:
# DataFrame API

from pyspark.sql.functions import col, desc

In [23]:
# 1. Los 10 departamentos con más casos
df_departamentos = df.groupBy("Nombre departamento").count().orderBy(desc("count"))
df_departamentos.show(10)

+-------------------+-----+
|Nombre departamento|count|
+-------------------+-----+
|             BOGOTA|  401|
|              VALLE|  148|
|          ANTIOQUIA|  106|
|       CUNDINAMARCA|   49|
|          CARTAGENA|   39|
|          RISARALDA|   34|
|       BARRANQUILLA|   31|
|              HUILA|   30|
|            QUINDIO|   23|
|    NORTE SANTANDER|   19|
+-------------------+-----+
only showing top 10 rows


In [24]:
# 2. Las 10 ciudades con más casos
df_ciudades = df.groupBy("Nombre municipio").count().orderBy(desc("count"))
df_ciudades.show(10)

+----------------+-----+
|Nombre municipio|count|
+----------------+-----+
|          BOGOTA|  401|
|            CALI|  101|
|        MEDELLIN|   63|
|       CARTAGENA|   39|
|    BARRANQUILLA|   31|
|           NEIVA|   27|
|         PEREIRA|   25|
|         PALMIRA|   22|
|      VALLEDUPAR|   16|
|         ARMENIA|   15|
+----------------+-----+
only showing top 10 rows


In [25]:
# 3. Los 10 días con más casos
df_dias = df.groupBy("fecha reporte web").count().orderBy(desc("count"))
df_dias.show(10)

+-----------------+-----+
|fecha reporte web|count|
+-----------------+-----+
|31/3/2020 0:00:00|  107|
|24/3/2020 0:00:00|  105|
|30/3/2020 0:00:00|   96|
| 1/4/2020 0:00:00|   95|
|29/3/2020 0:00:00|   94|
|23/3/2020 0:00:00|   73|
|28/3/2020 0:00:00|   69|
|25/3/2020 0:00:00|   61|
|27/3/2020 0:00:00|   49|
|20/3/2020 0:00:00|   48|
+-----------------+-----+
only showing top 10 rows


In [26]:
# 4. Distribución de casos por edades
df_edades = df.groupBy("Edad").count().orderBy(desc("count"))
df_edades.show(10)

+----+-----+
|Edad|count|
+----+-----+
|  30|   33|
|  28|   31|
|  32|   30|
|  26|   29|
|  29|   28|
|  33|   28|
|  50|   27|
|  42|   24|
|  35|   23|
|  45|   23|
+----+-----+
only showing top 10 rows


In [27]:
# 5. Pregunta propia: Número de casos por sexo
df_sexo = df.groupBy("Sexo").count().orderBy(desc("count"))
df_sexo.show()

+----+-----+
|Sexo|count|
+----+-----+
|   M|  520|
|   F|  480|
+----+-----+



## Preguntas de negocio usando SparkSQL

In [28]:
# SparkSQL

# Registrar DataFrame como tabla temporal
df.createOrReplaceTempView("covid")

In [29]:
# 1. Los 10 departamentos con más casos
spark.sql("""
    SELECT `Nombre departamento`, COUNT(*) AS total_casos
    FROM covid
    GROUP BY `Nombre departamento`
    ORDER BY total_casos DESC
    LIMIT 10
""").show()

+-------------------+-----------+
|Nombre departamento|total_casos|
+-------------------+-----------+
|             BOGOTA|        401|
|              VALLE|        148|
|          ANTIOQUIA|        106|
|       CUNDINAMARCA|         49|
|          CARTAGENA|         39|
|          RISARALDA|         34|
|       BARRANQUILLA|         31|
|              HUILA|         30|
|            QUINDIO|         23|
|    NORTE SANTANDER|         19|
+-------------------+-----------+



In [30]:
# 2. Las 10 ciudades con más casos
spark.sql("""
    SELECT `Nombre municipio`, COUNT(*) AS total_casos
    FROM covid
    GROUP BY `Nombre municipio`
    ORDER BY total_casos DESC
    LIMIT 10
""").show()

+----------------+-----------+
|Nombre municipio|total_casos|
+----------------+-----------+
|          BOGOTA|        401|
|            CALI|        101|
|        MEDELLIN|         63|
|       CARTAGENA|         39|
|    BARRANQUILLA|         31|
|           NEIVA|         27|
|         PEREIRA|         25|
|         PALMIRA|         22|
|      VALLEDUPAR|         16|
|         ARMENIA|         15|
+----------------+-----------+



In [31]:
# 3. Los 10 días con más casos
spark.sql("""
    SELECT `fecha reporte web`, COUNT(*) AS total_casos
    FROM covid
    GROUP BY `fecha reporte web`
    ORDER BY total_casos DESC
    LIMIT 10
""").show()

+-----------------+-----------+
|fecha reporte web|total_casos|
+-----------------+-----------+
|31/3/2020 0:00:00|        107|
|24/3/2020 0:00:00|        105|
|30/3/2020 0:00:00|         96|
| 1/4/2020 0:00:00|         95|
|29/3/2020 0:00:00|         94|
|23/3/2020 0:00:00|         73|
|28/3/2020 0:00:00|         69|
|25/3/2020 0:00:00|         61|
|27/3/2020 0:00:00|         49|
|20/3/2020 0:00:00|         48|
+-----------------+-----------+



In [32]:
# 4. Distribución de casos por edades
spark.sql("""
    SELECT Edad, COUNT(*) AS total_casos
    FROM covid
    GROUP BY Edad
    ORDER BY total_casos DESC
""").show(10)

+----+-----------+
|Edad|total_casos|
+----+-----------+
|  30|         33|
|  28|         31|
|  32|         30|
|  26|         29|
|  29|         28|
|  33|         28|
|  50|         27|
|  42|         24|
|  35|         23|
|  45|         23|
+----+-----------+
only showing top 10 rows


In [33]:
# 5. Pregunta propia: Número de casos por tipo de contagio
spark.sql("""
    SELECT `Tipo de contagio`, COUNT(*) AS total_casos
    FROM covid
    GROUP BY `Tipo de contagio`
    ORDER BY total_casos DESC
""").show()

+----------------+-----------+
|Tipo de contagio|total_casos|
+----------------+-----------+
|       Importado|        477|
|     Relacionado|        459|
|     Comunitaria|         63|
|      En estudio|          1|
+----------------+-----------+



## Guardar respuestas:

In [34]:
import os

# Carpeta principal de salida
out_dir = "/content/drive/MyDrive/out"
os.makedirs(out_dir, exist_ok=True)

# Subcarpetas
csv_dir = os.path.join(out_dir, "df_csv")
parquet_dir = os.path.join(out_dir, "df_parquet")
os.makedirs(csv_dir, exist_ok=True)
os.makedirs(parquet_dir, exist_ok=True)

In [35]:
# Departamentos
df_departamentos.coalesce(1).write.format("csv").option("header", "true").mode("overwrite").save(os.path.join(csv_dir, "departamentos"))
df_departamentos.write.format("parquet").mode("overwrite").save(os.path.join(parquet_dir, "departamentos"))

# Ciudades
df_ciudades.coalesce(1).write.format("csv").option("header", "true").mode("overwrite").save(os.path.join(csv_dir, "ciudades"))
df_ciudades.write.format("parquet").mode("overwrite").save(os.path.join(parquet_dir, "ciudades"))

# Días
df_dias.coalesce(1).write.format("csv").option("header", "true").mode("overwrite").save(os.path.join(csv_dir, "dias"))
df_dias.write.format("parquet").mode("overwrite").save(os.path.join(parquet_dir, "dias"))

# Edades
df_edades.coalesce(1).write.format("csv").option("header", "true").mode("overwrite").save(os.path.join(csv_dir, "edades"))
df_edades.write.format("parquet").mode("overwrite").save(os.path.join(parquet_dir, "edades"))

# Casos por sexo
df_sexo.coalesce(1).write.format("csv").option("header", "true").mode("overwrite").save(os.path.join(csv_dir, "sexo"))
df_sexo.write.format("parquet").mode("overwrite").save(os.path.join(parquet_dir, "sexo"))

In [37]:
# 1. Departamentos SQL
departamentos_sql = spark.sql("""
    SELECT `Nombre departamento`, COUNT(*) AS total_casos
    FROM covid
    GROUP BY `Nombre departamento`
    ORDER BY total_casos DESC
    LIMIT 10
""")
departamentos_sql.coalesce(1).write.format("csv").option("header","true").mode("overwrite").save(os.path.join(csv_dir, "departamentos_sql"))
departamentos_sql.write.format("parquet").mode("overwrite").save(os.path.join(parquet_dir, "departamentos_sql"))

# 2. Ciudades SQL
ciudades_sql = spark.sql("""
    SELECT `Nombre municipio`, COUNT(*) AS total_casos
    FROM covid
    GROUP BY `Nombre municipio`
    ORDER BY total_casos DESC
    LIMIT 10
""")
ciudades_sql.coalesce(1).write.format("csv").option("header","true").mode("overwrite").save(os.path.join(csv_dir, "ciudades_sql"))
ciudades_sql.write.format("parquet").mode("overwrite").save(os.path.join(parquet_dir, "ciudades_sql"))

# 3. Días con más casos SQL
dias_sql = spark.sql("""
    SELECT `fecha reporte web`, COUNT(*) AS total_casos
    FROM covid
    GROUP BY `fecha reporte web`
    ORDER BY total_casos DESC
    LIMIT 10
""")
dias_sql.coalesce(1).write.format("csv").option("header","true").mode("overwrite").save(os.path.join(csv_dir, "dias_sql"))
dias_sql.write.format("parquet").mode("overwrite").save(os.path.join(parquet_dir, "dias_sql"))

# 4. Distribución de casos por edades SQL
edades_sql = spark.sql("""
    SELECT Edad, COUNT(*) AS total_casos
    FROM covid
    GROUP BY Edad
    ORDER BY total_casos DESC
""")
edades_sql.coalesce(1).write.format("csv").option("header","true").mode("overwrite").save(os.path.join(csv_dir, "edades_sql"))
edades_sql.write.format("parquet").mode("overwrite").save(os.path.join(parquet_dir, "edades_sql"))

# 5. Pregunta propia: Casos por tipo de contagio SQL
tipo_contagio_sql = spark.sql("""
    SELECT `Tipo de contagio`, COUNT(*) AS total_casos
    FROM covid
    GROUP BY `Tipo de contagio`
    ORDER BY total_casos DESC
""")
tipo_contagio_sql.coalesce(1).write.format("csv").option("header","true").mode("overwrite").save(os.path.join(csv_dir, "tipo_contagio_sql"))
tipo_contagio_sql.write.format("parquet").mode("overwrite").save(os.path.join(parquet_dir, "tipo_contagio_sql"))
