# Data Processing using PySpark

### Configurar Spark y PySpark en Google Colab

In [1]:
# Instalar Java y Spark
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
!tar xf spark-3.3.1-bin-hadoop3.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.1-bin-hadoop3"

In [3]:
# Crear los dos objetos para trabajar en Spark
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("data_processing")\
    .master("local[*]")\
    .config("spark.driver.memory","16G")\
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.kryoserializer.buffer.max", "2000M")\
    .config("spark.jars.packages", "com.amazonaws:aws-java-sdk:1.11.1034,org.apache.hadoop:hadoop-aws:3.3.4")\
    .config('fs.s3a.access.key', 'ASIATP7C6U53LWP66OY3') \
    .config('fs.s3a.secret.key', 'QjpwAUClmNhQ1TUmsIHnDTB6ZuuXnJ3i+IiiHhFK') \
    .config('fs.s3a.session.token','FwoGZXIvYXdzELr//////////wEaDF1VFAKc2ZI980l9pSLFAacdJwga/ao2flLKw6sOB6fPW1YdVRR4awa0EIU2ulaxlIeptdmtQdB6Zh2GOtbAefdMtz4swEgEax4KT04xichsfpu3NSQKiGKQwl2fH+v3nNh4ZVWEzf/s0DpP2mUqByKg8hgw2VD1GcD5/CeL5O059xQN6oXHH3FOLAlwEgnMMDEpG7TN55johumC8d13gH1ptfK2pxURQEJQiLHbIxqNfPUMkqX6BgF5j4NF6FZAsZljJxbmzWC5OvJlUr8eoT79QB00KN7Z9ZsGMi3jBllsmoA/bzLId6J8D7dlxizmE0rKidf5Jg9udivOPSG4jzfPayxlMt/jde4=') \
    .config('fs.s3a.path.style.access', 'true') \
    .config('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem') \
    .config('fs.s3a.endpoint', 's3.amazonaws.com') \
    .getOrCreate()
sc = spark.sparkContext

In [4]:
spark

In [5]:
sc

### Cargar los datos desde Google Drive

In [6]:
# Cargar los datos desde S3 a un dataframe
df=spark.read.csv('s3a://dxninob-p3/datasets/covid19/Casos_positivos_de_COVID-19_en_Colombia-100K.csv',inferSchema=True,header=True)

### Análisis exploratorio de los datos con dataframes

In [7]:
from pyspark.sql.types import StringType,DoubleType,IntegerType

#### 2.1. Columnas

In [8]:
# Ver el nombre de todas las columnas
df.columns

['fecha reporte web',
 'ID de caso',
 'Fecha de notificación',
 'Código DIVIPOLA departamento',
 'Nombre departamento',
 'Código DIVIPOLA municipio',
 'Nombre municipio',
 'Edad',
 'Unidad de medida de edad',
 'Sexo',
 'Tipo de contagio',
 'Ubicación del caso',
 'Estado',
 'Código ISO del país',
 'Nombre del país',
 'Recuperado',
 'Fecha de inicio de síntomas',
 'Fecha de muerte',
 'Fecha de diagnóstico',
 'Fecha de recuperación',
 'Tipo de recuperación',
 'Pertenencia étnica',
 'Nombre del grupo étnico']

#### 2.2. Tipos de datos

In [9]:
# Ver el tipo de dato de todas las columnas
df.dtypes

[('fecha reporte web', 'string'),
 ('ID de caso', 'int'),
 ('Fecha de notificación', 'string'),
 ('Código DIVIPOLA departamento', 'int'),
 ('Nombre departamento', 'string'),
 ('Código DIVIPOLA municipio', 'int'),
 ('Nombre municipio', 'string'),
 ('Edad', 'int'),
 ('Unidad de medida de edad', 'int'),
 ('Sexo', 'string'),
 ('Tipo de contagio', 'string'),
 ('Ubicación del caso', 'string'),
 ('Estado', 'string'),
 ('Código ISO del país', 'int'),
 ('Nombre del país', 'string'),
 ('Recuperado', 'string'),
 ('Fecha de inicio de síntomas', 'string'),
 ('Fecha de muerte', 'string'),
 ('Fecha de diagnóstico', 'string'),
 ('Fecha de recuperación', 'string'),
 ('Tipo de recuperación', 'string'),
 ('Pertenencia étnica', 'int'),
 ('Nombre del grupo étnico', 'string')]

#### 2.3. Seleccionar algunas Columnas

In [10]:
# Seleccionar la columna de la fecha de notificación
df.select('Fecha de notificación').show(5)

+---------------------+
|Fecha de notificación|
+---------------------+
|     2/3/2020 0:00:00|
|     6/3/2020 0:00:00|
|     7/3/2020 0:00:00|
|     9/3/2020 0:00:00|
|     9/3/2020 0:00:00|
+---------------------+
only showing top 5 rows



In [11]:
# Seleccionar las columnas del id de caso, fecha de diagnóstico y fecha de recuperación
df.select('ID de caso','Fecha de diagnóstico', 'Fecha de recuperación').show(5)

+----------+--------------------+---------------------+
|ID de caso|Fecha de diagnóstico|Fecha de recuperación|
+----------+--------------------+---------------------+
|         1|    6/3/2020 0:00:00|    13/3/2020 0:00:00|
|         2|    9/3/2020 0:00:00|    19/3/2020 0:00:00|
|         3|    9/3/2020 0:00:00|    15/3/2020 0:00:00|
|         4|   11/3/2020 0:00:00|    26/3/2020 0:00:00|
|         5|   11/3/2020 0:00:00|    23/3/2020 0:00:00|
+----------+--------------------+---------------------+
only showing top 5 rows



In [12]:
# Seleccionar las columnas del sexo y de recuperado
df.select('Sexo', 'Recuperado').show(5)

+----+----------+
|Sexo|Recuperado|
+----+----------+
|   F|Recuperado|
|   M|Recuperado|
|   F|Recuperado|
|   M|Recuperado|
|   M|Recuperado|
+----+----------+
only showing top 5 rows



#### 2.4. Renombrar columnas

In [13]:
# Renombrar columna ID de caso a id
df.withColumnRenamed('ID de caso', 'id').show(5)

+-----------------+---+---------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+----------------+------------------+------+-------------------+---------------+----------+---------------------------+---------------+--------------------+---------------------+--------------------+------------------+-----------------------+
|fecha reporte web| id|Fecha de notificación|Código DIVIPOLA departamento|Nombre departamento|Código DIVIPOLA municipio|Nombre municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación del caso|Estado|Código ISO del país|Nombre del país|Recuperado|Fecha de inicio de síntomas|Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|Pertenencia étnica|Nombre del grupo étnico|
+-----------------+---+---------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+

In [14]:
# Renombrar columna fecha reporte web a Report Date y ID de caso a Case ID
df.withColumnRenamed('fecha reporte web', 'Report Date').withColumnRenamed('ID de caso', 'Case ID').show(5)

+-----------------+-------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+----------------+------------------+------+-------------------+---------------+----------+---------------------------+---------------+--------------------+---------------------+--------------------+------------------+-----------------------+
|      Report Date|Case ID|Fecha de notificación|Código DIVIPOLA departamento|Nombre departamento|Código DIVIPOLA municipio|Nombre municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación del caso|Estado|Código ISO del país|Nombre del país|Recuperado|Fecha de inicio de síntomas|Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|Pertenencia étnica|Nombre del grupo étnico|
+-----------------+-------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------

#### 2.5. Agregar columnas

In [15]:
# Agregar una columna que incremente la edad de las personas 10 años
df.withColumn("Edad + 10 años",(df["Edad"]+10)).show(5)

+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+----------------+------------------+------+-------------------+---------------+----------+---------------------------+---------------+--------------------+---------------------+--------------------+------------------+-----------------------+--------------+
|fecha reporte web|ID de caso|Fecha de notificación|Código DIVIPOLA departamento|Nombre departamento|Código DIVIPOLA municipio|Nombre municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación del caso|Estado|Código ISO del país|Nombre del país|Recuperado|Fecha de inicio de síntomas|Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|Pertenencia étnica|Nombre del grupo étnico|Edad + 10 años|
+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+-

In [16]:
# Agregar una fila que contenga el id del caso en un String
df.withColumn('ID caso string',df["ID de caso"].cast(StringType())).dtypes

[('fecha reporte web', 'string'),
 ('ID de caso', 'int'),
 ('Fecha de notificación', 'string'),
 ('Código DIVIPOLA departamento', 'int'),
 ('Nombre departamento', 'string'),
 ('Código DIVIPOLA municipio', 'int'),
 ('Nombre municipio', 'string'),
 ('Edad', 'int'),
 ('Unidad de medida de edad', 'int'),
 ('Sexo', 'string'),
 ('Tipo de contagio', 'string'),
 ('Ubicación del caso', 'string'),
 ('Estado', 'string'),
 ('Código ISO del país', 'int'),
 ('Nombre del país', 'string'),
 ('Recuperado', 'string'),
 ('Fecha de inicio de síntomas', 'string'),
 ('Fecha de muerte', 'string'),
 ('Fecha de diagnóstico', 'string'),
 ('Fecha de recuperación', 'string'),
 ('Tipo de recuperación', 'string'),
 ('Pertenencia étnica', 'int'),
 ('Nombre del grupo étnico', 'string'),
 ('ID caso string', 'string')]

In [17]:
# Agregar la columna con la suma del ID del caso de la persona con su edad
df.withColumn("Ubicación",df["ID de caso"]+ df["Edad"]).show(5)

+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+----------------+------------------+------+-------------------+---------------+----------+---------------------------+---------------+--------------------+---------------------+--------------------+------------------+-----------------------+---------+
|fecha reporte web|ID de caso|Fecha de notificación|Código DIVIPOLA departamento|Nombre departamento|Código DIVIPOLA municipio|Nombre municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación del caso|Estado|Código ISO del país|Nombre del país|Recuperado|Fecha de inicio de síntomas|Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|Pertenencia étnica|Nombre del grupo étnico|Ubicación|
+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+-----------

#### 2.6. Borrar columnas

In [18]:
# Borrar la columna fecha reporte web
df.drop('fecha reporte web').show(5)

+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+----------------+------------------+------+-------------------+---------------+----------+---------------------------+---------------+--------------------+---------------------+--------------------+------------------+-----------------------+
|ID de caso|Fecha de notificación|Código DIVIPOLA departamento|Nombre departamento|Código DIVIPOLA municipio|Nombre municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación del caso|Estado|Código ISO del país|Nombre del país|Recuperado|Fecha de inicio de síntomas|Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|Pertenencia étnica|Nombre del grupo étnico|
+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+----------------+----------------

In [19]:
# Borrar las columnas fecha reporte web,  ID de caso y Fecha de notificación
df.drop('fecha reporte web', 'ID de caso', 'Fecha de notificación').show(5)

+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+----------------+------------------+------+-------------------+---------------+----------+---------------------------+---------------+--------------------+---------------------+--------------------+------------------+-----------------------+
|Código DIVIPOLA departamento|Nombre departamento|Código DIVIPOLA municipio|Nombre municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación del caso|Estado|Código ISO del país|Nombre del país|Recuperado|Fecha de inicio de síntomas|Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|Pertenencia étnica|Nombre del grupo étnico|
+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+----------------+------------------+------+-------------------+---------------+----------+---------------------------+--------------

#### 2.7. Filtrar datos

In [20]:
# Filtrar a las personas con tipo de contagio relacionado
df.filter(df['Tipo de contagio']=='Relacionado').show(5)

+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+----------------+------------------+------+-------------------+---------------+----------+---------------------------+---------------+--------------------+---------------------+--------------------+------------------+-----------------------+
|fecha reporte web|ID de caso|Fecha de notificación|Código DIVIPOLA departamento|Nombre departamento|Código DIVIPOLA municipio|Nombre municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación del caso|Estado|Código ISO del país|Nombre del país|Recuperado|Fecha de inicio de síntomas|Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|Pertenencia étnica|Nombre del grupo étnico|
+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+---------

In [21]:
# Filtrar las mujeres de Medellín que se recuperaron por tiempo
df.filter(df['Sexo']=='F').filter(df['Nombre municipio']=='MEDELLIN').filter(df['Tipo de recuperación']=='Tiempo').show(5)

+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+----------------+------------------+------+-------------------+---------------+----------+---------------------------+---------------+--------------------+---------------------+--------------------+------------------+-----------------------+
|fecha reporte web|ID de caso|Fecha de notificación|Código DIVIPOLA departamento|Nombre departamento|Código DIVIPOLA municipio|Nombre municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación del caso|Estado|Código ISO del país|Nombre del país|Recuperado|Fecha de inicio de síntomas|Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|Pertenencia étnica|Nombre del grupo étnico|
+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+---------

In [22]:
# Filtrar la personas con tipo de contagio importado ubicadas en Ecuador
df.filter((df['Tipo de contagio']=='Importado')&(df['Nombre del país']=='ECUADOR')).show(5)

+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+----------------+------------------+------+-------------------+---------------+----------+---------------------------+---------------+--------------------+---------------------+--------------------+------------------+-----------------------+
|fecha reporte web|ID de caso|Fecha de notificación|Código DIVIPOLA departamento|Nombre departamento|Código DIVIPOLA municipio|Nombre municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación del caso|Estado|Código ISO del país|Nombre del país|Recuperado|Fecha de inicio de síntomas|Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|Pertenencia étnica|Nombre del grupo étnico|
+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+---------

In [23]:
# Filtrar los bebes de genero masculino menores a 5 años que fallecieron por COVID
df.filter(df['Edad']<5).filter(df['Sexo']=='M').filter(df['Recuperado']=='Fallecido').show(5)

+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+--------------------+----+------------------------+----+----------------+------------------+---------+-------------------+---------------+----------+---------------------------+-----------------+--------------------+---------------------+--------------------+------------------+-----------------------+
|fecha reporte web|ID de caso|Fecha de notificación|Código DIVIPOLA departamento|Nombre departamento|Código DIVIPOLA municipio|    Nombre municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación del caso|   Estado|Código ISO del país|Nombre del país|Recuperado|Fecha de inicio de síntomas|  Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|Pertenencia étnica|Nombre del grupo étnico|
+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+-------------

#### 2.8. Ejecutar alguna función UDF o lambda sobre alguna columna creando una nueva

In [24]:
# Importar UDF y Pandas UDF
from pyspark.sql.functions import udf, pandas_udf, PandasUDFType

Ejemplo 1 - Agregar una columna según el rango de edad de la persona

In [25]:
# Función común
def price_range(edad): 
    if edad < 18:
      return 'Niño'
    elif edad < 35:
        return 'Joven'
    elif edad < 65:
        return 'Adulto'
    else:
        return 'Adulto mayor'

# Crear el UDF usando la función común
Edad_udf = udf(price_range, StringType())

# Aplicar el UDF en el dataframe
df.withColumn('Rango de edad',Edad_udf(df['Edad'])).show(5)

+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+----------------+------------------+------+-------------------+---------------+----------+---------------------------+---------------+--------------------+---------------------+--------------------+------------------+-----------------------+-------------+
|fecha reporte web|ID de caso|Fecha de notificación|Código DIVIPOLA departamento|Nombre departamento|Código DIVIPOLA municipio|Nombre municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación del caso|Estado|Código ISO del país|Nombre del país|Recuperado|Fecha de inicio de síntomas|Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|Pertenencia étnica|Nombre del grupo étnico|Rango de edad|
+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+---

In [26]:
# Crear el UDF usando la función lambda
Edad_udf2 = udf(lambda edad: "Niño" if edad < 18 else "Joven" if edad < 35 else "Adulto" if edad < 65 else "Adulto mayor", StringType())

# Aplicar el UDF en el dataframe
df.withColumn('Rango de edad',Edad_udf2(df['Edad'])).show(5)

+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+----------------+------------------+------+-------------------+---------------+----------+---------------------------+---------------+--------------------+---------------------+--------------------+------------------+-----------------------+-------------+
|fecha reporte web|ID de caso|Fecha de notificación|Código DIVIPOLA departamento|Nombre departamento|Código DIVIPOLA municipio|Nombre municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación del caso|Estado|Código ISO del país|Nombre del país|Recuperado|Fecha de inicio de síntomas|Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|Pertenencia étnica|Nombre del grupo étnico|Rango de edad|
+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+---

Ejemplo 2 - Agregar una columna de la ubicación de la persona, la cual indique su  departamento y municipio

In [27]:
# Función común
def ubicacion(dpto, mpio):
    ubicacion = dpto + ", " + mpio
    return ubicacion

# Crear el UDF usando la función común con Pandas UDF
ubicacion_udf = pandas_udf(ubicacion, StringType())

# Aplicar el UDF en el dataframe
df.withColumn('Ubicacion',ubicacion_udf(df['Nombre departamento'], df['Nombre municipio'])).show(5)

+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+----------------+------------------+------+-------------------+---------------+----------+---------------------------+---------------+--------------------+---------------------+--------------------+------------------+-----------------------+-------------------+
|fecha reporte web|ID de caso|Fecha de notificación|Código DIVIPOLA departamento|Nombre departamento|Código DIVIPOLA municipio|Nombre municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación del caso|Estado|Código ISO del país|Nombre del país|Recuperado|Fecha de inicio de síntomas|Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|Pertenencia étnica|Nombre del grupo étnico|          Ubicacion|
+-----------------+----------+---------------------+----------------------------+-------------------+-----------------

In [28]:
# Crear el UDF usando la función lambda con Pandas UDF
ubicacion_udf2 = pandas_udf(lambda dpto, mpio: dpto + ', ' + mpio, StringType())

# Aplicar el UDF en el dataframe
df.withColumn('Ubicacion',ubicacion_udf2(df['Nombre departamento'],df['Nombre municipio'])).show(5)

+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+----------------+------------------+------+-------------------+---------------+----------+---------------------------+---------------+--------------------+---------------------+--------------------+------------------+-----------------------+-------------------+
|fecha reporte web|ID de caso|Fecha de notificación|Código DIVIPOLA departamento|Nombre departamento|Código DIVIPOLA municipio|Nombre municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación del caso|Estado|Código ISO del país|Nombre del país|Recuperado|Fecha de inicio de síntomas|Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|Pertenencia étnica|Nombre del grupo étnico|          Ubicacion|
+-----------------+----------+---------------------+----------------------------+-------------------+-----------------

### Preguntas adicionales

In [29]:
df.createOrReplaceTempView("dataset_covid_sql")

#### 3.1. Los 10 departamentos con más casos de covid en Colombia ordenados de mayor a menor

In [30]:
punto31 = df.groupBy('Nombre departamento').count().orderBy('count',ascending=False)
punto31.show(10)

+-------------------+-----+
|Nombre departamento|count|
+-------------------+-----+
|             BOGOTA|30016|
|       BARRANQUILLA|13065|
|          ATLANTICO|10994|
|              VALLE|10404|
|          CARTAGENA| 8333|
|          ANTIOQUIA| 4554|
|             NARIÑO| 3520|
|       CUNDINAMARCA| 2827|
|           AMAZONAS| 2317|
|              CHOCO| 1636|
+-------------------+-----+
only showing top 10 rows



In [31]:
spark.sql("""SELECT `Nombre departamento`,
COUNT(*)
FROM dataset_covid_sql
GROUP BY `Nombre departamento`
ORDER BY COUNT(*)
DESC""").show(10)

+-------------------+--------+
|Nombre departamento|count(1)|
+-------------------+--------+
|             BOGOTA|   30016|
|       BARRANQUILLA|   13065|
|          ATLANTICO|   10994|
|              VALLE|   10404|
|          CARTAGENA|    8333|
|          ANTIOQUIA|    4554|
|             NARIÑO|    3520|
|       CUNDINAMARCA|    2827|
|           AMAZONAS|    2317|
|              CHOCO|    1636|
+-------------------+--------+
only showing top 10 rows



#### 3.2. Las 10 ciudades con más casos de covid en Colombia ordenados de mayor a menor

In [32]:
punto32 = df.groupBy('Nombre municipio').count().orderBy('count',ascending=False)
punto32.show(10)

+----------------+-----+
|Nombre municipio|count|
+----------------+-----+
|          BOGOTA|30016|
|    BARRANQUILLA|13065|
|       CARTAGENA| 8333|
|            CALI| 7747|
|         SOLEDAD| 6233|
|         LETICIA| 2194|
|        MEDELLIN| 2137|
|          TUMACO| 1501|
|    BUENAVENTURA| 1453|
|          QUIBDO| 1367|
+----------------+-----+
only showing top 10 rows



In [33]:
spark.sql("""SELECT `Nombre municipio`,
COUNT(*)
FROM dataset_covid_sql
GROUP BY `Nombre municipio`
ORDER BY COUNT(*)
DESC""").show(10)

+----------------+--------+
|Nombre municipio|count(1)|
+----------------+--------+
|          BOGOTA|   30016|
|    BARRANQUILLA|   13065|
|       CARTAGENA|    8333|
|            CALI|    7747|
|         SOLEDAD|    6233|
|         LETICIA|    2194|
|        MEDELLIN|    2137|
|          TUMACO|    1501|
|    BUENAVENTURA|    1453|
|          QUIBDO|    1367|
+----------------+--------+
only showing top 10 rows



#### 3.3. Los 10 días con más casos de covid en Colombia ordenados de mayor a menor

In [34]:
punto33 = df.groupBy('Fecha de diagnóstico').count().orderBy('count',ascending=False)
punto33.show(10)

+--------------------+-----+
|Fecha de diagnóstico|count|
+--------------------+-----+
|   26/6/2020 0:00:00| 4390|
|   27/6/2020 0:00:00| 4019|
|   28/6/2020 0:00:00| 3580|
|   25/6/2020 0:00:00| 3381|
|   19/6/2020 0:00:00| 3053|
|   18/6/2020 0:00:00| 3040|
|   23/6/2020 0:00:00| 3031|
|   22/6/2020 0:00:00| 2938|
|   21/6/2020 0:00:00| 2781|
|   24/6/2020 0:00:00| 2564|
+--------------------+-----+
only showing top 10 rows



In [35]:
spark.sql("""SELECT `Fecha de diagnóstico`,
COUNT(*)
FROM dataset_covid_sql
GROUP BY `Fecha de diagnóstico`
ORDER BY COUNT(*)
DESC""").show(10)

+--------------------+--------+
|Fecha de diagnóstico|count(1)|
+--------------------+--------+
|   26/6/2020 0:00:00|    4390|
|   27/6/2020 0:00:00|    4019|
|   28/6/2020 0:00:00|    3580|
|   25/6/2020 0:00:00|    3381|
|   19/6/2020 0:00:00|    3053|
|   18/6/2020 0:00:00|    3040|
|   23/6/2020 0:00:00|    3031|
|   22/6/2020 0:00:00|    2938|
|   21/6/2020 0:00:00|    2781|
|   24/6/2020 0:00:00|    2564|
+--------------------+--------+
only showing top 10 rows



#### 3.4. Distribución de casos por edades de covid en Colombia

In [36]:
punto34 = df.groupBy('Edad').count().orderBy('Edad',ascending=True)
punto34.show()

+----+-----+
|Edad|count|
+----+-----+
|   1|  485|
|   2|  440|
|   3|  449|
|   4|  373|
|   5|  425|
|   6|  431|
|   7|  442|
|   8|  461|
|   9|  467|
|  10|  530|
|  11|  566|
|  12|  562|
|  13|  531|
|  14|  580|
|  15|  560|
|  16|  600|
|  17|  685|
|  18| 1160|
|  19| 1567|
|  20| 1674|
+----+-----+
only showing top 20 rows



In [37]:
spark.sql("""SELECT Edad,
COUNT(*)
FROM dataset_covid_sql
GROUP BY Edad
ORDER BY Edad
ASC""").show()

+----+--------+
|Edad|count(1)|
+----+--------+
|   1|     485|
|   2|     440|
|   3|     449|
|   4|     373|
|   5|     425|
|   6|     431|
|   7|     442|
|   8|     461|
|   9|     467|
|  10|     530|
|  11|     566|
|  12|     562|
|  13|     531|
|  14|     580|
|  15|     560|
|  16|     600|
|  17|     685|
|  18|    1160|
|  19|    1567|
|  20|    1674|
+----+--------+
only showing top 20 rows



#### 3.5. Realice la pregunta de negocio que quiera sobre los datos y respondala con la correspondiente programación en Spark

¿Cual es la edad promedio de los hombres contagiados de la ciudad de Medellín?

In [38]:
punto35v1 = df.filter(df['Nombre municipio']=='MEDELLIN').filter(df['Sexo']=='M').agg({'Edad':'mean'})
punto35v1.show()

+-----------------+
|        avg(Edad)|
+-----------------+
|37.51642335766423|
+-----------------+



In [39]:
spark.sql("""SELECT AVG(Edad)
FROM dataset_covid_sql
WHERE `Nombre municipio`='MEDELLIN'
AND `Sexo`='M'
""").show()

+-----------------+
|        avg(Edad)|
+-----------------+
|37.51642335766423|
+-----------------+



¿Cual es la edad promedio de las mujeres contagiadas distribuidas por departamentos de menor a mayor?

In [40]:
punto35v2 = df.groupBy('Nombre departamento').agg({'Edad':'mean'}).orderBy('avg(Edad)')
punto35v2.show()

+-------------------+------------------+
|Nombre departamento|         avg(Edad)|
+-------------------+------------------+
|            VICHADA|              20.0|
|           GUAVIARE| 26.23076923076923|
|             ARAUCA|26.329113924050635|
|               META|35.321513002364064|
|         SAN ANDRES| 35.95652173913044|
|          ANTIOQUIA|36.193895476504174|
|            CAQUETA| 36.23076923076923|
|            GUAINIA| 36.57142857142857|
|             VAUPES| 36.57142857142857|
|           AMAZONAS| 36.72378075097108|
|              CESAR| 37.00757575757576|
|          SANTANDER| 37.12169312169312|
|              CHOCO|37.673594132029336|
|            GUAJIRA|              37.8|
|             TOLIMA| 38.01974333662389|
|             NARIÑO|38.269602272727276|
|       CUNDINAMARCA|38.392642377078175|
|          RISARALDA| 38.56697247706422|
|             BOGOTA|38.625966151385924|
|            BOLIVAR|39.503006012024045|
+-------------------+------------------+
only showing top

In [41]:
spark.sql("""SELECT `Nombre departamento`, AVG(Edad)
FROM dataset_covid_sql
GROUP BY `Nombre departamento`
ORDER BY AVG(Edad)
ASC
""").show()

+-------------------+------------------+
|Nombre departamento|         avg(Edad)|
+-------------------+------------------+
|            VICHADA|              20.0|
|           GUAVIARE| 26.23076923076923|
|             ARAUCA|26.329113924050635|
|               META|35.321513002364064|
|         SAN ANDRES| 35.95652173913044|
|          ANTIOQUIA|36.193895476504174|
|            CAQUETA| 36.23076923076923|
|            GUAINIA| 36.57142857142857|
|             VAUPES| 36.57142857142857|
|           AMAZONAS| 36.72378075097108|
|              CESAR| 37.00757575757576|
|          SANTANDER| 37.12169312169312|
|              CHOCO|37.673594132029336|
|            GUAJIRA|              37.8|
|             TOLIMA| 38.01974333662389|
|             NARIÑO|38.269602272727276|
|       CUNDINAMARCA|38.392642377078175|
|          RISARALDA| 38.56697247706422|
|             BOGOTA|38.625966151385924|
|            BOLIVAR|39.503006012024045|
+-------------------+------------------+
only showing top

### Guardar los datos en un bucket público

In [None]:
# Guardar punto 3.1 en CSV
csv_uri='s3a://dxninob-p3/resultados/punto31_csv'
punto31.coalesce(1).write.format("csv").option("header","true").save(csv_uri)
# Guardar punto 3.1 en parquet
parquet_uri='s3a://dxninob-p3/resultados/punto31_parquet'
punto31.write.format('parquet').save(parquet_uri)

In [None]:
# Guardar punto 3.2 en CSV
csv_uri='s3a://dxninob-p3/resultados/punto32_csv'
punto32.coalesce(1).write.format("csv").option("header","true").save(csv_uri)
# Guardar punto 3.1 en parquet
parquet_uri='s3a://dxninob-p3/resultados/punto32_parquet'
punto32.write.format('parquet').save(parquet_uri)

In [None]:
# Guardar punto 3.3 en CSV
csv_uri='s3a://dxninob-p3/resultados/punto33_csv'
punto33.coalesce(1).write.format("csv").option("header","true").save(csv_uri)
# Guardar punto 3.1 en parquet
parquet_uri='s3a://dxninob-p3/resultados/punto33_parquet'
punto33.write.format('parquet').save(parquet_uri)

In [None]:
# Guardar punto 3.4 en CSV
csv_uri='s3a://dxninob-p3/resultados/punto34_csv'
punto34.coalesce(1).write.format("csv").option("header","true").save(csv_uri)
# Guardar punto 3.1 en parquet
parquet_uri='s3a://dxninob-p3/resultados/punto34_parquet'
punto34.write.format('parquet').save(parquet_uri)

In [None]:
# Guardar punto 3.5 en CSV
# Pregunta 1
csv_uri='s3a://dxninob-p3/resultados/punto35v1_csv'
punto35v1.coalesce(1).write.format("csv").option("header","true").save(csv_uri)
# Guardar punto 3.1 en parquet
parquet_uri='s3a://dxninob-p3/resultados/punto35v1_parquet'
punto35v1.write.format('parquet').save(parquet_uri)

# Pregunta2
csv_uri='s3a://dxninob-p3/resultados/punto35v2_csv'
punto35v2.coalesce(1).write.format("csv").option("header","true").save(csv_uri)
# Guardar punto 3.1 en parquet
parquet_uri='s3a://dxninob-p3/resultados/punto35v2_parquet'
punto35v2.write.format('parquet').save(parquet_uri)