###**Configuración**

In [3]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
!tar xf spark-3.3.1-bin-hadoop3.tgz
!pip install -q findspark

In [4]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.1-bin-hadoop3"

In [5]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

In [6]:
spark

In [7]:
sc

###**1. Cargar datos desde Google Drive**

In [8]:
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [9]:
df=spark.read.csv('gdrive/MyDrive/datasets/covid19/Casos_positivos_de_COVID-19_en_Colombia-100K.csv',inferSchema=True,header=True)

###**2. Análisis exploratorio del dataframe donde cargamos los datos**

####**2.1. Columnas**

In [10]:
df

DataFrame[fecha reporte web: string, ID de caso: int, Fecha de notificación: string, Código DIVIPOLA departamento: int, Nombre departamento: string, Código DIVIPOLA municipio: int, Nombre municipio: string, Edad: int, Unidad de medida de edad: int, Sexo: string, Tipo de contagio: string, Ubicación del caso: string, Estado: string, Código ISO del país: int, Nombre del país: string, Recuperado: string, Fecha de inicio de síntomas: string, Fecha de muerte: string, Fecha de diagnóstico: string, Fecha de recuperación: string, Tipo de recuperación: string, Pertenencia étnica: int, Nombre del grupo étnico: string]

In [11]:
df.columns

['fecha reporte web',
 'ID de caso',
 'Fecha de notificación',
 'Código DIVIPOLA departamento',
 'Nombre departamento',
 'Código DIVIPOLA municipio',
 'Nombre municipio',
 'Edad',
 'Unidad de medida de edad',
 'Sexo',
 'Tipo de contagio',
 'Ubicación del caso',
 'Estado',
 'Código ISO del país',
 'Nombre del país',
 'Recuperado',
 'Fecha de inicio de síntomas',
 'Fecha de muerte',
 'Fecha de diagnóstico',
 'Fecha de recuperación',
 'Tipo de recuperación',
 'Pertenencia étnica',
 'Nombre del grupo étnico']

In [12]:
len(df.columns)

23

####**2.2. Tipos de datos**

In [13]:
df.dtypes

[('fecha reporte web', 'string'),
 ('ID de caso', 'int'),
 ('Fecha de notificación', 'string'),
 ('Código DIVIPOLA departamento', 'int'),
 ('Nombre departamento', 'string'),
 ('Código DIVIPOLA municipio', 'int'),
 ('Nombre municipio', 'string'),
 ('Edad', 'int'),
 ('Unidad de medida de edad', 'int'),
 ('Sexo', 'string'),
 ('Tipo de contagio', 'string'),
 ('Ubicación del caso', 'string'),
 ('Estado', 'string'),
 ('Código ISO del país', 'int'),
 ('Nombre del país', 'string'),
 ('Recuperado', 'string'),
 ('Fecha de inicio de síntomas', 'string'),
 ('Fecha de muerte', 'string'),
 ('Fecha de diagnóstico', 'string'),
 ('Fecha de recuperación', 'string'),
 ('Tipo de recuperación', 'string'),
 ('Pertenencia étnica', 'int'),
 ('Nombre del grupo étnico', 'string')]

####**2.3. Seleccionar algunas columnas**

In [22]:
df.select('ID de caso','Fecha de diagnóstico','Sexo').show(5)

+----------+--------------------+----+
|ID de caso|Fecha de diagnóstico|Sexo|
+----------+--------------------+----+
|         1|    6/3/2020 0:00:00|   F|
|         2|    9/3/2020 0:00:00|   M|
|         3|    9/3/2020 0:00:00|   F|
|         4|   11/3/2020 0:00:00|   M|
|         5|   11/3/2020 0:00:00|   M|
+----------+--------------------+----+
only showing top 5 rows



In [15]:
df.select('Recuperado','Fecha de recuperación').show(5)

+----------+---------------------+
|Recuperado|Fecha de recuperación|
+----------+---------------------+
|Recuperado|    13/3/2020 0:00:00|
|Recuperado|    19/3/2020 0:00:00|
|Recuperado|    15/3/2020 0:00:00|
|Recuperado|    26/3/2020 0:00:00|
|Recuperado|    23/3/2020 0:00:00|
+----------+---------------------+
only showing top 5 rows



####**2.4. Renombrar columnas**

In [16]:
df.withColumnRenamed('ID de caso', 'Id')

DataFrame[fecha reporte web: string, Id: int, Fecha de notificación: string, Código DIVIPOLA departamento: int, Nombre departamento: string, Código DIVIPOLA municipio: int, Nombre municipio: string, Edad: int, Unidad de medida de edad: int, Sexo: string, Tipo de contagio: string, Ubicación del caso: string, Estado: string, Código ISO del país: int, Nombre del país: string, Recuperado: string, Fecha de inicio de síntomas: string, Fecha de muerte: string, Fecha de diagnóstico: string, Fecha de recuperación: string, Tipo de recuperación: string, Pertenencia étnica: int, Nombre del grupo étnico: string]

In [17]:
df.withColumnRenamed('Nombre departamento', 'Departamento')

DataFrame[fecha reporte web: string, ID de caso: int, Fecha de notificación: string, Código DIVIPOLA departamento: int, Departamento: string, Código DIVIPOLA municipio: int, Nombre municipio: string, Edad: int, Unidad de medida de edad: int, Sexo: string, Tipo de contagio: string, Ubicación del caso: string, Estado: string, Código ISO del país: int, Nombre del país: string, Recuperado: string, Fecha de inicio de síntomas: string, Fecha de muerte: string, Fecha de diagnóstico: string, Fecha de recuperación: string, Tipo de recuperación: string, Pertenencia étnica: int, Nombre del grupo étnico: string]

In [18]:
df.withColumnRenamed('Nombre municipio', 'Municipio')

DataFrame[fecha reporte web: string, ID de caso: int, Fecha de notificación: string, Código DIVIPOLA departamento: int, Nombre departamento: string, Código DIVIPOLA municipio: int, Municipio: string, Edad: int, Unidad de medida de edad: int, Sexo: string, Tipo de contagio: string, Ubicación del caso: string, Estado: string, Código ISO del país: int, Nombre del país: string, Recuperado: string, Fecha de inicio de síntomas: string, Fecha de muerte: string, Fecha de diagnóstico: string, Fecha de recuperación: string, Tipo de recuperación: string, Pertenencia étnica: int, Nombre del grupo étnico: string]

In [19]:
df.withColumnRenamed('Nombre del país', 'País')

DataFrame[fecha reporte web: string, ID de caso: int, Fecha de notificación: string, Código DIVIPOLA departamento: int, Nombre departamento: string, Código DIVIPOLA municipio: int, Nombre municipio: string, Edad: int, Unidad de medida de edad: int, Sexo: string, Tipo de contagio: string, Ubicación del caso: string, Estado: string, Código ISO del país: int, País: string, Recuperado: string, Fecha de inicio de síntomas: string, Fecha de muerte: string, Fecha de diagnóstico: string, Fecha de recuperación: string, Tipo de recuperación: string, Pertenencia étnica: int, Nombre del grupo étnico: string]

In [20]:
df.withColumnRenamed('ID de caso', 'Id').withColumnRenamed('Nombre departamento', 'Departamento').withColumnRenamed('Nombre municipio', 'Municipio').withColumnRenamed('Nombre del país', 'País')

DataFrame[fecha reporte web: string, Id: int, Fecha de notificación: string, Código DIVIPOLA departamento: int, Departamento: string, Código DIVIPOLA municipio: int, Municipio: string, Edad: int, Unidad de medida de edad: int, Sexo: string, Tipo de contagio: string, Ubicación del caso: string, Estado: string, Código ISO del país: int, País: string, Recuperado: string, Fecha de inicio de síntomas: string, Fecha de muerte: string, Fecha de diagnóstico: string, Fecha de recuperación: string, Tipo de recuperación: string, Pertenencia étnica: int, Nombre del grupo étnico: string]

####**2.5. Agregar columnas**

In [29]:
df.withColumn("Mayores de edad",(df["Edad"]>18)).show(5)

+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+----------------+------------------+------+-------------------+---------------+----------+---------------------------+---------------+--------------------+---------------------+--------------------+------------------+-----------------------+---------------+
|fecha reporte web|ID de caso|Fecha de notificación|Código DIVIPOLA departamento|Nombre departamento|Código DIVIPOLA municipio|Nombre municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación del caso|Estado|Código ISO del país|Nombre del país|Recuperado|Fecha de inicio de síntomas|Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|Pertenencia étnica|Nombre del grupo étnico|Mayores de edad|
+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------

In [31]:
df.withColumn("Menores de edad",(df["Edad"]<18)).show(5)

+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+----------------+------------------+------+-------------------+---------------+----------+---------------------------+---------------+--------------------+---------------------+--------------------+------------------+-----------------------+---------------+
|fecha reporte web|ID de caso|Fecha de notificación|Código DIVIPOLA departamento|Nombre departamento|Código DIVIPOLA municipio|Nombre municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación del caso|Estado|Código ISO del país|Nombre del país|Recuperado|Fecha de inicio de síntomas|Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|Pertenencia étnica|Nombre del grupo étnico|Menores de edad|
+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------

####**2.6. Borrar columnas**

In [32]:
df.drop('Unidad de medida de edad')

DataFrame[fecha reporte web: string, ID de caso: int, Fecha de notificación: string, Código DIVIPOLA departamento: int, Nombre departamento: string, Código DIVIPOLA municipio: int, Nombre municipio: string, Edad: int, Sexo: string, Tipo de contagio: string, Ubicación del caso: string, Estado: string, Código ISO del país: int, Nombre del país: string, Recuperado: string, Fecha de inicio de síntomas: string, Fecha de muerte: string, Fecha de diagnóstico: string, Fecha de recuperación: string, Tipo de recuperación: string, Pertenencia étnica: int, Nombre del grupo étnico: string]

In [33]:
df.drop('Pertenencia étnica')

DataFrame[fecha reporte web: string, ID de caso: int, Fecha de notificación: string, Código DIVIPOLA departamento: int, Nombre departamento: string, Código DIVIPOLA municipio: int, Nombre municipio: string, Edad: int, Unidad de medida de edad: int, Sexo: string, Tipo de contagio: string, Ubicación del caso: string, Estado: string, Código ISO del país: int, Nombre del país: string, Recuperado: string, Fecha de inicio de síntomas: string, Fecha de muerte: string, Fecha de diagnóstico: string, Fecha de recuperación: string, Tipo de recuperación: string, Nombre del grupo étnico: string]

In [34]:
df.drop('Nombre del grupo étnico')

DataFrame[fecha reporte web: string, ID de caso: int, Fecha de notificación: string, Código DIVIPOLA departamento: int, Nombre departamento: string, Código DIVIPOLA municipio: int, Nombre municipio: string, Edad: int, Unidad de medida de edad: int, Sexo: string, Tipo de contagio: string, Ubicación del caso: string, Estado: string, Código ISO del país: int, Nombre del país: string, Recuperado: string, Fecha de inicio de síntomas: string, Fecha de muerte: string, Fecha de diagnóstico: string, Fecha de recuperación: string, Tipo de recuperación: string, Pertenencia étnica: int]

In [35]:
df.drop('Pertenencia étnica','Nombre del grupo étnico')

DataFrame[fecha reporte web: string, ID de caso: int, Fecha de notificación: string, Código DIVIPOLA departamento: int, Nombre departamento: string, Código DIVIPOLA municipio: int, Nombre municipio: string, Edad: int, Unidad de medida de edad: int, Sexo: string, Tipo de contagio: string, Ubicación del caso: string, Estado: string, Código ISO del país: int, Nombre del país: string, Recuperado: string, Fecha de inicio de síntomas: string, Fecha de muerte: string, Fecha de diagnóstico: string, Fecha de recuperación: string, Tipo de recuperación: string]

####**2.7. Filtrar datos**

In [38]:
df.filter(df['Nombre del país']=='ITALIA').show(5)

+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+----------------+------------------+------+-------------------+---------------+----------+---------------------------+---------------+--------------------+---------------------+--------------------+------------------+-----------------------+
|fecha reporte web|ID de caso|Fecha de notificación|Código DIVIPOLA departamento|Nombre departamento|Código DIVIPOLA municipio|Nombre municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación del caso|Estado|Código ISO del país|Nombre del país|Recuperado|Fecha de inicio de síntomas|Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|Pertenencia étnica|Nombre del grupo étnico|
+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+---------

In [39]:
df.filter(df['Sexo']=='F').filter(df['Edad']>18).show(5)

+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+----------------+------------------+------+-------------------+--------------------+----------+---------------------------+---------------+--------------------+---------------------+--------------------+------------------+-----------------------+
|fecha reporte web|ID de caso|Fecha de notificación|Código DIVIPOLA departamento|Nombre departamento|Código DIVIPOLA municipio|Nombre municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación del caso|Estado|Código ISO del país|     Nombre del país|Recuperado|Fecha de inicio de síntomas|Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|Pertenencia étnica|Nombre del grupo étnico|
+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----

In [40]:
df.filter(df['Sexo']=='M').filter(df['Recuperado']=='Recuperado').show(5)

+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+----------------+------------------+------+-------------------+---------------+----------+---------------------------+---------------+--------------------+---------------------+--------------------+------------------+-----------------------+
|fecha reporte web|ID de caso|Fecha de notificación|Código DIVIPOLA departamento|Nombre departamento|Código DIVIPOLA municipio|Nombre municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación del caso|Estado|Código ISO del país|Nombre del país|Recuperado|Fecha de inicio de síntomas|Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|Pertenencia étnica|Nombre del grupo étnico|
+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+---------

####**2.8. Ejecutar alguna función UDF o lambda sobre alguna columna creando una nueva**

In [41]:
from pyspark.sql.functions import udf

In [45]:
nacionalidad_udf = udf(lambda pais: "Colombiano" if pais == 'COLOMBIA' else "Extranjero")

df.withColumn('Nacionalidad',nacionalidad_udf(df['Nombre del país'])).show(5)

+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+----------------+------------------+------+-------------------+---------------+----------+---------------------------+---------------+--------------------+---------------------+--------------------+------------------+-----------------------+------------+
|fecha reporte web|ID de caso|Fecha de notificación|Código DIVIPOLA departamento|Nombre departamento|Código DIVIPOLA municipio|Nombre municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación del caso|Estado|Código ISO del país|Nombre del país|Recuperado|Fecha de inicio de síntomas|Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|Pertenencia étnica|Nombre del grupo étnico|Nacionalidad|
+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+-----

###**3. Contestar las siguientes preguntas sobre los datos de covid**

In [46]:
df.createOrReplaceTempView("dataset_covid_sql")

####**3.1. Los 10 departamentos con más casos de covid en Colombia ordenados de mayor a menor**

In [47]:
df.groupBy('Nombre departamento').count().orderBy('count',ascending=False).show(10)

+-------------------+-----+
|Nombre departamento|count|
+-------------------+-----+
|             BOGOTA|30016|
|       BARRANQUILLA|13065|
|          ATLANTICO|10994|
|              VALLE|10404|
|          CARTAGENA| 8333|
|          ANTIOQUIA| 4554|
|             NARIÑO| 3520|
|       CUNDINAMARCA| 2827|
|           AMAZONAS| 2317|
|              CHOCO| 1636|
+-------------------+-----+
only showing top 10 rows



In [48]:
spark.sql("""SELECT `Nombre departamento`,
COUNT(*)
FROM dataset_covid_sql
GROUP BY `Nombre departamento`
ORDER BY COUNT(*)
DESC""").show(10)

+-------------------+--------+
|Nombre departamento|count(1)|
+-------------------+--------+
|             BOGOTA|   30016|
|       BARRANQUILLA|   13065|
|          ATLANTICO|   10994|
|              VALLE|   10404|
|          CARTAGENA|    8333|
|          ANTIOQUIA|    4554|
|             NARIÑO|    3520|
|       CUNDINAMARCA|    2827|
|           AMAZONAS|    2317|
|              CHOCO|    1636|
+-------------------+--------+
only showing top 10 rows



####**3.2. Las 10 ciudades con más casos de covid en Colombia ordenados de mayor a menor**

In [49]:
df.groupBy('Nombre municipio').count().orderBy('count',ascending=False).show(10)

+----------------+-----+
|Nombre municipio|count|
+----------------+-----+
|          BOGOTA|30016|
|    BARRANQUILLA|13065|
|       CARTAGENA| 8333|
|            CALI| 7747|
|         SOLEDAD| 6233|
|         LETICIA| 2194|
|        MEDELLIN| 2137|
|          TUMACO| 1501|
|    BUENAVENTURA| 1453|
|          QUIBDO| 1367|
+----------------+-----+
only showing top 10 rows



In [50]:
spark.sql("""SELECT `Nombre municipio`,
COUNT(*)
FROM dataset_covid_sql
GROUP BY `Nombre municipio`
ORDER BY COUNT(*)
DESC""").show(10)

+----------------+--------+
|Nombre municipio|count(1)|
+----------------+--------+
|          BOGOTA|   30016|
|    BARRANQUILLA|   13065|
|       CARTAGENA|    8333|
|            CALI|    7747|
|         SOLEDAD|    6233|
|         LETICIA|    2194|
|        MEDELLIN|    2137|
|          TUMACO|    1501|
|    BUENAVENTURA|    1453|
|          QUIBDO|    1367|
+----------------+--------+
only showing top 10 rows



####**3.3. Los 10 días con más casos de covid en Colombia ordenados de mayor a menor**

In [51]:
df.groupBy('Fecha de diagnóstico').count().orderBy('count',ascending=False).show(10)

+--------------------+-----+
|Fecha de diagnóstico|count|
+--------------------+-----+
|   26/6/2020 0:00:00| 4390|
|   27/6/2020 0:00:00| 4019|
|   28/6/2020 0:00:00| 3580|
|   25/6/2020 0:00:00| 3381|
|   19/6/2020 0:00:00| 3053|
|   18/6/2020 0:00:00| 3040|
|   23/6/2020 0:00:00| 3031|
|   22/6/2020 0:00:00| 2938|
|   21/6/2020 0:00:00| 2781|
|   24/6/2020 0:00:00| 2564|
+--------------------+-----+
only showing top 10 rows



In [52]:
spark.sql("""SELECT `Fecha de diagnóstico`,
COUNT(*)
FROM dataset_covid_sql
GROUP BY `Fecha de diagnóstico`
ORDER BY COUNT(*)
DESC""").show(10)

+--------------------+--------+
|Fecha de diagnóstico|count(1)|
+--------------------+--------+
|   26/6/2020 0:00:00|    4390|
|   27/6/2020 0:00:00|    4019|
|   28/6/2020 0:00:00|    3580|
|   25/6/2020 0:00:00|    3381|
|   19/6/2020 0:00:00|    3053|
|   18/6/2020 0:00:00|    3040|
|   23/6/2020 0:00:00|    3031|
|   22/6/2020 0:00:00|    2938|
|   21/6/2020 0:00:00|    2781|
|   24/6/2020 0:00:00|    2564|
+--------------------+--------+
only showing top 10 rows



####**3.4. Distribución de casos por edades de covid en Colombia**

In [53]:
df.groupBy('Edad').count().orderBy('Edad',ascending=True).show()

+----+-----+
|Edad|count|
+----+-----+
|   1|  485|
|   2|  440|
|   3|  449|
|   4|  373|
|   5|  425|
|   6|  431|
|   7|  442|
|   8|  461|
|   9|  467|
|  10|  530|
|  11|  566|
|  12|  562|
|  13|  531|
|  14|  580|
|  15|  560|
|  16|  600|
|  17|  685|
|  18| 1160|
|  19| 1567|
|  20| 1674|
+----+-----+
only showing top 20 rows



In [54]:
spark.sql("""SELECT Edad,
COUNT(*)
FROM dataset_covid_sql
GROUP BY Edad
ORDER BY Edad
ASC""").show()

+----+--------+
|Edad|count(1)|
+----+--------+
|   1|     485|
|   2|     440|
|   3|     449|
|   4|     373|
|   5|     425|
|   6|     431|
|   7|     442|
|   8|     461|
|   9|     467|
|  10|     530|
|  11|     566|
|  12|     562|
|  13|     531|
|  14|     580|
|  15|     560|
|  16|     600|
|  17|     685|
|  18|    1160|
|  19|    1567|
|  20|    1674|
+----+--------+
only showing top 20 rows



####**3.5. Realice la pregunta de negocio que quiera sobre los datos y respondala con la correspondiente programación en spark**

Los 5 países con más casos de covid ordenados de mayor a menor

In [55]:
df.groupBy('Nombre del país').count().orderBy('count',ascending=False).show(5)

+--------------------+-----+
|     Nombre del país|count|
+--------------------+-----+
|                null|99088|
|              ESPAÑA|  258|
|ESTADOS UNIDOS DE...|  212|
|              BRASIL|   59|
|             ECUADOR|   59|
+--------------------+-----+
only showing top 5 rows



In [56]:
spark.sql("""SELECT `Nombre del país`,
COUNT(*)
FROM dataset_covid_sql
GROUP BY `Nombre del país`
ORDER BY COUNT(*)
DESC""").show(5)

+--------------------+--------+
|     Nombre del país|count(1)|
+--------------------+--------+
|                null|   99088|
|              ESPAÑA|     258|
|ESTADOS UNIDOS DE...|     212|
|              BRASIL|      59|
|             ECUADOR|      59|
+--------------------+--------+
only showing top 5 rows

