In [None]:
#configuración en google colab de spark y pyspark
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:
#instalar java y spark
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz
!tar xf spark-3.5.3-bin-hadoop3.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.3-bin-hadoop3"

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

In [None]:
spark

In [None]:
sc

In [None]:
# Load csv Dataset
df=spark.read.csv('gdrive/MyDrive/st0263-242/bigdata/datasets/covid19/Casos_positivos_de_COVID-19_en_Colombia-100K.csv',inferSchema=True,header=True)

In [None]:
df.columns

['fecha reporte web',
 'ID de caso',
 'Fecha de notificación',
 'Código DIVIPOLA departamento',
 'Nombre departamento',
 'Código DIVIPOLA municipio',
 'Nombre municipio',
 'Edad',
 'Unidad de medida de edad',
 'Sexo',
 'Tipo de contagio',
 'Ubicación del caso',
 'Estado',
 'Código ISO del país',
 'Nombre del país',
 'Recuperado',
 'Fecha de inicio de síntomas',
 'Fecha de muerte',
 'Fecha de diagnóstico',
 'Fecha de recuperación',
 'Tipo de recuperación',
 'Pertenencia étnica',
 'Nombre del grupo étnico']

In [None]:
df.printSchema()

root
 |-- fecha reporte web: string (nullable = true)
 |-- ID de caso: integer (nullable = true)
 |-- Fecha de notificación: string (nullable = true)
 |-- Código DIVIPOLA departamento: integer (nullable = true)
 |-- Nombre departamento: string (nullable = true)
 |-- Código DIVIPOLA municipio: integer (nullable = true)
 |-- Nombre municipio: string (nullable = true)
 |-- Edad: integer (nullable = true)
 |-- Unidad de medida de edad: integer (nullable = true)
 |-- Sexo: string (nullable = true)
 |-- Tipo de contagio: string (nullable = true)
 |-- Ubicación del caso: string (nullable = true)
 |-- Estado: string (nullable = true)
 |-- Código ISO del país: integer (nullable = true)
 |-- Nombre del país: string (nullable = true)
 |-- Recuperado: string (nullable = true)
 |-- Fecha de inicio de síntomas: string (nullable = true)
 |-- Fecha de muerte: string (nullable = true)
 |-- Fecha de diagnóstico: string (nullable = true)
 |-- Fecha de recuperación: string (nullable = true)
 |-- Tipo de r

In [None]:
df.select('fecha reporte web', 'ID de caso', 'Sexo', 'Tipo de contagio').show(10, False)

+-----------------+----------+----+----------------+
|fecha reporte web|ID de caso|Sexo|Tipo de contagio|
+-----------------+----------+----+----------------+
|6/3/2020 0:00:00 |1         |F   |Importado       |
|9/3/2020 0:00:00 |2         |M   |Importado       |
|9/3/2020 0:00:00 |3         |F   |Importado       |
|11/3/2020 0:00:00|4         |M   |Relacionado     |
|11/3/2020 0:00:00|5         |M   |Relacionado     |
|11/3/2020 0:00:00|6         |F   |Relacionado     |
|11/3/2020 0:00:00|7         |F   |Importado       |
|11/3/2020 0:00:00|8         |F   |Importado       |
|11/3/2020 0:00:00|9         |F   |Importado       |
|12/3/2020 0:00:00|10        |F   |Importado       |
+-----------------+----------+----+----------------+
only showing top 10 rows



In [None]:
df=df.withColumnRenamed('fecha reporte web', 'fecha_reporte')

In [None]:
df=df.withColumnsRenamed({'ID de caso': 'ID', 'Nombre del país': 'pais', 'Nombre departamento': 'departamento', 'Nombre municipio': 'municipio', 'Ubicación del caso': 'Ubicación'})

In [None]:
df=df.withColumn('Edad_mas_10', df['Edad'] + 10)
df.show(5, False)

+-----------------+---+---------------------+----------------------------+------------+-------------------------+---------+----+------------------------+----+----------------+---------+------+-------------------+------+----------+---------------------------+---------------+--------------------+---------------------+--------------------+-----------+-----------+
|fecha_reporte    |ID |Fecha de notificación|Código DIVIPOLA departamento|departamento|Código DIVIPOLA municipio|municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación|Estado|Código ISO del país|pais  |Recuperado|Fecha de inicio de síntomas|Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|age_group  |Edad_mas_10|
+-----------------+---+---------------------+----------------------------+------------+-------------------------+---------+----+------------------------+----+----------------+---------+------+-------------------+------+----------+---------------------------+---------------+

In [None]:
columns_to_remove=['Nombre del grupo étnico', 'Pertenencia étnica']
df=df.drop(*columns_to_remove)

In [None]:
df.printSchema()

root
 |-- fecha_reporte: string (nullable = true)
 |-- ID: integer (nullable = true)
 |-- Fecha de notificación: string (nullable = true)
 |-- Código DIVIPOLA departamento: integer (nullable = true)
 |-- departamento: string (nullable = true)
 |-- Código DIVIPOLA municipio: integer (nullable = true)
 |-- municipio: string (nullable = true)
 |-- Edad: integer (nullable = true)
 |-- Unidad de medida de edad: integer (nullable = true)
 |-- Sexo: string (nullable = true)
 |-- Tipo de contagio: string (nullable = true)
 |-- Ubicación: string (nullable = true)
 |-- Estado: string (nullable = true)
 |-- Código ISO del país: integer (nullable = true)
 |-- pais: string (nullable = true)
 |-- Recuperado: string (nullable = true)
 |-- Fecha de inicio de síntomas: string (nullable = true)
 |-- Fecha de muerte: string (nullable = true)
 |-- Fecha de diagnóstico: string (nullable = true)
 |-- Fecha de recuperación: string (nullable = true)
 |-- Tipo de recuperación: string (nullable = true)
 |-- age

In [None]:
filtered_df=df.filter(df.Sexo == 'F')
filtered_df.show(5, False)

+-----------------+---+---------------------+----------------------------+------------+-------------------------+---------+----+------------------------+----+----------------+---------+------+-------------------+-------------------------+----------+---------------------------+---------------+--------------------+---------------------+--------------------+-----------+-----------+
|fecha_reporte    |ID |Fecha de notificación|Código DIVIPOLA departamento|departamento|Código DIVIPOLA municipio|municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación|Estado|Código ISO del país|pais                     |Recuperado|Fecha de inicio de síntomas|Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|age_group  |Edad_mas_10|
+-----------------+---+---------------------+----------------------------+------------+-------------------------+---------+----+------------------------+----+----------------+---------+------+-------------------+------------------------

In [None]:
filtered2_df=df.filter("Sexo=='F' and Edad>20 and Recuperado like '%Recuperado%' and municipio like 'MEDELLIN'")
filtered2_df.show()

+-----------------+---+---------------------+----------------------------+------------+-------------------------+---------+----+------------------------+----+----------------+---------+------+-------------------+--------------------+----------+---------------------------+---------------+--------------------+---------------------+--------------------+-----------+-----------+
|    fecha_reporte| ID|Fecha de notificación|Código DIVIPOLA departamento|departamento|Código DIVIPOLA municipio|municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación|Estado|Código ISO del país|                pais|Recuperado|Fecha de inicio de síntomas|Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|  age_group|Edad_mas_10|
+-----------------+---+---------------------+----------------------------+------------+-------------------------+---------+----+------------------------+----+----------------+---------+------+-------------------+--------------------+----------+--

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType,DoubleType,IntegerType

In [None]:
age_udf=udf(lambda age: "kid" if age < 18 else ('young adult' if age<=30 else 'senior'), StringType())
df=df.withColumn("age_group", age_udf(df.Edad))
df.filter(df['age_group']=='young adult').orderBy('Edad', ascending=False).show(10, False)

+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+----------------+------------------+------+-------------------+--------------------+----------+---------------------------+---------------+--------------------+---------------------+--------------------+------------------+-----------------------+-----------+
|fecha reporte web|ID de caso|Fecha de notificación|Código DIVIPOLA departamento|Nombre departamento|Código DIVIPOLA municipio|Nombre municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación del caso|Estado|Código ISO del país|Nombre del país     |Recuperado|Fecha de inicio de síntomas|Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|Pertenencia étnica|Nombre del grupo étnico|age_group  |
+-----------------+----------+---------------------+----------------------------+-------------------+-----------------------

In [None]:
df.groupBy('departamento').count().orderBy('count', ascending=False).show(10, False)

+------------+-----+
|departamento|count|
+------------+-----+
|BOGOTA      |30016|
|BARRANQUILLA|13065|
|ATLANTICO   |10994|
|VALLE       |10404|
|CARTAGENA   |8333 |
|ANTIOQUIA   |4554 |
|NARIÑO      |3520 |
|CUNDINAMARCA|2827 |
|AMAZONAS    |2317 |
|CHOCO       |1636 |
+------------+-----+
only showing top 10 rows



In [None]:
df.groupBy('municipio').count().orderBy('count', ascending=False).show(10, False)

+------------+-----+
|municipio   |count|
+------------+-----+
|BOGOTA      |30016|
|BARRANQUILLA|13065|
|CARTAGENA   |8333 |
|CALI        |7747 |
|SOLEDAD     |6233 |
|LETICIA     |2194 |
|MEDELLIN    |2137 |
|TUMACO      |1501 |
|BUENAVENTURA|1453 |
|QUIBDO      |1367 |
+------------+-----+
only showing top 10 rows



In [28]:
df.dropna(subset=['Fecha de inicio de síntomas']).groupBy('Fecha de inicio de síntomas').count().orderBy('count', ascending=False).show(10, False)

+---------------------------+-----+
|Fecha de inicio de síntomas|count|
+---------------------------+-----+
|10/6/2020 0:00:00          |2731 |
|16/6/2020 0:00:00          |2558 |
|18/6/2020 0:00:00          |2479 |
|12/6/2020 0:00:00          |2452 |
|1/6/2020 0:00:00           |2429 |
|8/6/2020 0:00:00           |2390 |
|17/6/2020 0:00:00          |2344 |
|5/6/2020 0:00:00           |2266 |
|9/6/2020 0:00:00           |2224 |
|19/6/2020 0:00:00          |2162 |
+---------------------------+-----+
only showing top 10 rows



In [30]:
df.select('edad', 'age_group', 'ID').groupBy('edad', 'age_group').count().orderBy('count', ascending=False).show()

+----+-----------+-----+
|edad|  age_group|count|
+----+-----------+-----+
|  30|young adult| 2735|
|  29|young adult| 2611|
|  31|     senior| 2569|
|  28|young adult| 2540|
|  27|young adult| 2494|
|  26|young adult| 2436|
|  33|     senior| 2371|
|  32|     senior| 2362|
|  25|young adult| 2335|
|  34|     senior| 2310|
|  35|     senior| 2292|
|  24|young adult| 2214|
|  36|     senior| 2175|
|  37|     senior| 2132|
|  38|     senior| 2098|
|  40|     senior| 2050|
|  23|young adult| 2041|
|  39|     senior| 1985|
|  22|young adult| 1879|
|  41|     senior| 1783|
+----+-----------+-----+
only showing top 20 rows



In [31]:
from pyspark.sql.functions import col
df.filter(col('Fecha de muerte').isNotNull()).groupBy('age_group').count().orderBy('count', ascending=False).show()

+-----------+-----+
|  age_group|count|
+-----------+-----+
|     senior| 5435|
|young adult|  151|
|        kid|   47|
+-----------+-----+



In [32]:
df.filter(col('Fecha de muerte').isNotNull()).show()

+-----------------+---+---------------------+----------------------------+--------------+-------------------------+------------+----+------------------------+----+----------------+---------+---------+-------------------+--------------------+----------+---------------------------+-----------------+--------------------+---------------------+--------------------+-----------+-----------+
|    fecha_reporte| ID|Fecha de notificación|Código DIVIPOLA departamento|  departamento|Código DIVIPOLA municipio|   municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación|   Estado|Código ISO del país|                pais|Recuperado|Fecha de inicio de síntomas|  Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|  age_group|Edad_mas_10|
+-----------------+---+---------------------+----------------------------+--------------+-------------------------+------------+----+------------------------+----+----------------+---------+---------+-------------------+------

Using SparkSQL

In [33]:
df.createOrReplaceTempView("covid19")

In [34]:
spark.sql("select departamento, count(*) as count from covid19 group by departamento order by count desc").show(10)

+------------+-----+
|departamento|count|
+------------+-----+
|      BOGOTA|30016|
|BARRANQUILLA|13065|
|   ATLANTICO|10994|
|       VALLE|10404|
|   CARTAGENA| 8333|
|   ANTIOQUIA| 4554|
|      NARIÑO| 3520|
|CUNDINAMARCA| 2827|
|    AMAZONAS| 2317|
|       CHOCO| 1636|
+------------+-----+
only showing top 10 rows



In [35]:
spark.sql("select municipio, count(*) as count from covid19 group by municipio order by count desc").show(10)

+------------+-----+
|   municipio|count|
+------------+-----+
|      BOGOTA|30016|
|BARRANQUILLA|13065|
|   CARTAGENA| 8333|
|        CALI| 7747|
|     SOLEDAD| 6233|
|     LETICIA| 2194|
|    MEDELLIN| 2137|
|      TUMACO| 1501|
|BUENAVENTURA| 1453|
|      QUIBDO| 1367|
+------------+-----+
only showing top 10 rows



In [36]:
spark.sql("""select `Fecha de inicio de síntomas` as fecha, count(*) as count from covid19 where
 `Fecha de inicio de síntomas` is not NULL group by fecha order by count desc limit 10""").show()

+-----------------+-----+
|            fecha|count|
+-----------------+-----+
|10/6/2020 0:00:00| 2731|
|16/6/2020 0:00:00| 2558|
|18/6/2020 0:00:00| 2479|
|12/6/2020 0:00:00| 2452|
| 1/6/2020 0:00:00| 2429|
| 8/6/2020 0:00:00| 2390|
|17/6/2020 0:00:00| 2344|
| 5/6/2020 0:00:00| 2266|
| 9/6/2020 0:00:00| 2224|
|19/6/2020 0:00:00| 2162|
+-----------------+-----+



In [37]:
spark.sql('select edad, age_group,count(*) as count from covid19 group by edad, age_group order by count desc').show()

+----+-----------+-----+
|edad|  age_group|count|
+----+-----------+-----+
|  30|young adult| 2735|
|  29|young adult| 2611|
|  31|     senior| 2569|
|  28|young adult| 2540|
|  27|young adult| 2494|
|  26|young adult| 2436|
|  33|     senior| 2371|
|  32|     senior| 2362|
|  25|young adult| 2335|
|  34|     senior| 2310|
|  35|     senior| 2292|
|  24|young adult| 2214|
|  36|     senior| 2175|
|  37|     senior| 2132|
|  38|     senior| 2098|
|  40|     senior| 2050|
|  23|young adult| 2041|
|  39|     senior| 1985|
|  22|young adult| 1879|
|  41|     senior| 1783|
+----+-----------+-----+
only showing top 20 rows



In [None]:
spark.sql('''select age_group, count(*) as count from covid19 where
`Fecha de muerte` is not null group by age_group order by count desc''').show()

+-----------+-----+
|  age_group|count|
+-----------+-----+
|     senior| 5435|
|young adult|  147|
|        kid|   51|
+-----------+-----+

