In [1]:
# si spark no está disponible, crea session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("COVID_Colombia_Analysis").getOrCreate()
sc = spark.sparkContext

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
6,application_1763434461835_0008,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
path_s3 = "s3://notebooksnath/data/covid19/Casos_positivos_de_COVID-19_en_Colombia-100K.csv"

df = spark.read.option("header", True).option("inferSchema", True).csv(path_s3)
df.show(5)
df.printSchema()


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+----------------+------------------+------+-------------------+---------------+----------+---------------------------+---------------+--------------------+---------------------+--------------------+------------------+-----------------------+
|fecha reporte web|ID de caso|Fecha de notificación|Código DIVIPOLA departamento|Nombre departamento|Código DIVIPOLA municipio|Nombre municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación del caso|Estado|Código ISO del país|Nombre del país|Recuperado|Fecha de inicio de síntomas|Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|Pertenencia étnica|Nombre del grupo étnico|
+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+---------

In [3]:
from pyspark.sql.functions import col, year, to_date

df2 = df.withColumn("fecha", to_date(col("fecha reporte web"), "yyyy-MM-dd")) \
        .withColumn("anio", year(col("fecha"))) \
        .drop("fecha reporte web")

df2.show(5)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+----------------+------------------+------+-------------------+---------------+----------+---------------------------+---------------+--------------------+---------------------+--------------------+------------------+-----------------------+-----+----+
|ID de caso|Fecha de notificación|Código DIVIPOLA departamento|Nombre departamento|Código DIVIPOLA municipio|Nombre municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación del caso|Estado|Código ISO del país|Nombre del país|Recuperado|Fecha de inicio de síntomas|Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|Pertenencia étnica|Nombre del grupo étnico|fecha|anio|
+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+-----------

In [4]:
from pyspark.sql.functions import col

# renombrar espacios / mayúsculas a minúsculas:
for c in df.columns:
    df = df.withColumnRenamed(c, c.strip().lower().replace(" ", "_"))

df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- fecha_reporte_web: string (nullable = true)
 |-- id_de_caso: integer (nullable = true)
 |-- fecha_de_notificación: string (nullable = true)
 |-- código_divipola_departamento: integer (nullable = true)
 |-- nombre_departamento: string (nullable = true)
 |-- código_divipola_municipio: integer (nullable = true)
 |-- nombre_municipio: string (nullable = true)
 |-- edad: integer (nullable = true)
 |-- unidad_de_medida_de_edad: integer (nullable = true)
 |-- sexo: string (nullable = true)
 |-- tipo_de_contagio: string (nullable = true)
 |-- ubicación_del_caso: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- código_iso_del_país: integer (nullable = true)
 |-- nombre_del_país: string (nullable = true)
 |-- recuperado: string (nullable = true)
 |-- fecha_de_inicio_de_síntomas: string (nullable = true)
 |-- fecha_de_muerte: string (nullable = true)
 |-- fecha_de_diagnóstico: string (nullable = true)
 |-- fecha_de_recuperación: string (nullable = true)
 |-- tipo_de_r

In [5]:
from pyspark.sql.functions import to_date, col, when

# transformar campo fecha si existe 'fecha_diagnostico' o 'fecha'
# intenta varios nombres comunes
date_cols = [c for c in df.columns if 'fecha' in c]
date_cols

df = df.withColumn("fecha_evento", to_date(col(date_cols[0]), "yyyy-MM-dd"))
# crear columna binaria recommended_flag como ejemplo general
df = df.withColumn("is_severe", when(col("estado") == "Fallecido", 1).otherwise(0))  


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
# casos en 2020
df_2020 = df.filter(col("fecha_evento").between("2020-01-01","2020-12-31"))

# casos en hombres adultos (>40)
df_hombres_adultos = df.filter((col("sexo") == "M") & (col("edad") > 40))


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
df.createOrReplaceTempView("covid")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
# Los 10 ciudades con mas casos:
top_ciudades = df.groupBy("nombre_municipio").count().orderBy(col("count").desc()).limit(10)
top_ciudades.show(10,truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------------+-----+
|nombre_municipio|count|
+----------------+-----+
|BOGOTA          |30016|
|BARRANQUILLA    |13065|
|CARTAGENA       |8333 |
|CALI            |7747 |
|SOLEDAD         |6233 |
|LETICIA         |2194 |
|MEDELLIN        |2137 |
|TUMACO          |1501 |
|BUENAVENTURA    |1453 |
|QUIBDO          |1367 |
+----------------+-----+

In [9]:
# Los 10 departamentos con mas casos:
top_depart = df.groupBy("nombre_departamento").count().orderBy(col("count").desc()).limit(10)
top_depart.show(10,truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------+-----+
|nombre_departamento|count|
+-------------------+-----+
|BOGOTA             |30016|
|BARRANQUILLA       |13065|
|ATLANTICO          |10994|
|VALLE              |10404|
|CARTAGENA          |8333 |
|ANTIOQUIA          |4554 |
|NARIÑO             |3520 |
|CUNDINAMARCA       |2827 |
|AMAZONAS           |2317 |
|CHOCO              |1636 |
+-------------------+-----+

In [10]:
# Los 10 días con mas casos:
top_dias = df.groupBy("fecha_evento").count().orderBy(col("count").desc()).limit(10)
top_dias.show(10,False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+------+
|fecha_evento|count |
+------------+------+
|null        |100000|
+------------+------+

In [11]:
# Distribución de casos por edades
from pyspark.sql.functions import when

df_age = df.withColumn("age_group",
    when(col("edad") < 10, "<10")
    .when(col("edad").between(10,19), "10-19")
    .when(col("edad").between(20,29), "20-29")
    .when(col("edad").between(30,39), "30-39")
    .when(col("edad").between(40,49), "40-49")
    .when(col("edad").between(50,59), "50-59")
    .when(col("edad") >= 60, "60+")
    .otherwise("unknown")
)

dist_ages = df_age.groupBy("age_group").count().orderBy("age_group")
dist_ages.show(truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------+-----+
|age_group|count|
+---------+-----+
|10-19    |7341 |
|20-29    |21937|
|30-39    |23029|
|40-49    |15828|
|50-59    |12857|
|60+      |15035|
|<10      |3973 |
+---------+-----+

In [12]:
# Mi pregunta: Casos por sexo y departamento
cases_sex_dept = df.groupBy("nombre_departamento","sexo").count().orderBy("nombre_departamento","sexo")
cases_sex_dept.show(20,False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------+----+-----+
|nombre_departamento|sexo|count|
+-------------------+----+-----+
|AMAZONAS           |F   |894  |
|AMAZONAS           |M   |1423 |
|ANTIOQUIA          |F   |1712 |
|ANTIOQUIA          |M   |2842 |
|ARAUCA             |F   |9    |
|ARAUCA             |M   |70   |
|ATLANTICO          |F   |5005 |
|ATLANTICO          |M   |5989 |
|BARRANQUILLA       |F   |5897 |
|BARRANQUILLA       |M   |7168 |
|BOGOTA             |F   |15052|
|BOGOTA             |M   |14964|
|BOLIVAR            |F   |462  |
|BOLIVAR            |M   |536  |
|BOYACA             |F   |183  |
|BOYACA             |M   |195  |
|CALDAS             |F   |154  |
|CALDAS             |M   |108  |
|CAQUETA            |F   |14   |
|CAQUETA            |M   |25   |
+-------------------+----+-----+
only showing top 20 rows

In [13]:
out_base = "s3://notebooksnath/data/covid19/Casos_Covid_100K.csv/output-covid"

# 1. top_depart
top_depart.write.mode("overwrite").option("header", True).csv(out_base + "top_departamentos_csv")
top_depart.write.mode("overwrite").parquet(out_base + "top_departamentos_parquet")

# 2. top_ciudades
top_ciudades.write.mode("overwrite").option("header", True).csv(out_base + "top_ciudades_csv")
top_ciudades.write.mode("overwrite").parquet(out_base + "top_ciudades_parquet")

# 3. top_dias
top_dias.write.mode("overwrite").option("header", True).csv(out_base + "top_dias_csv")
top_dias.write.mode("overwrite").parquet(out_base + "top_dias_parquet")

# 4. dist_ages
dist_ages.write.mode("overwrite").option("header", True).csv(out_base + "dist_edades_csv")
dist_ages.write.mode("overwrite").parquet(out_base + "dist_edades_parquet")

# 5. tu consulta
cases_sex_dept.write.mode("overwrite").option("header", True).csv(out_base + "cases_sex_dept_csv")
cases_sex_dept.write.mode("overwrite").parquet(out_base + "cases_sex_dept_parquet")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…