# Data Processing using Pyspark

In [1]:
#configuración en google colab de spark y pyspark
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [2]:
#instalar java y spark
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz
!tar xf spark-3.5.3-bin-hadoop3.tgz
!pip install -q findspark

In [3]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.3-bin-hadoop3"

In [4]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

In [5]:
spark

In [6]:
sc

In [7]:
# Load csv Dataset
#df=spark.read.csv('s3://<bucket/dir>/sample_data.csv',inferSchema=True,header=True)
df=spark.read.csv('gdrive/MyDrive/st0263-labs/datasets/Casos_positivos_de_COVID-19_en_Colombia-1K.csv',inferSchema=True,header=True)

In [8]:
#columns of dataframe
df.columns

['fecha reporte web',
 'ID de caso',
 'Fecha de notificación',
 'Código DIVIPOLA departamento',
 'Nombre departamento',
 'Código DIVIPOLA municipio',
 'Nombre municipio',
 'Edad',
 'Unidad de medida de edad',
 'Sexo',
 'Tipo de contagio',
 'Ubicación del caso',
 'Estado',
 'Código ISO del país',
 'Nombre del país',
 'Recuperado',
 'Fecha de inicio de síntomas',
 'Fecha de muerte',
 'Fecha de diagnóstico',
 'Fecha de recuperación',
 'Tipo de recuperación',
 'Pertenencia étnica',
 'Nombre del grupo étnico']

In [9]:
#check number of columns
len(df.columns)

23

In [10]:
#number of records in dataframe
df.count()

1000

In [11]:
#shape of dataset
print((df.count(),len(df.columns)))

(1000, 23)


In [12]:
#printSchema
df.printSchema()

root
 |-- fecha reporte web: string (nullable = true)
 |-- ID de caso: integer (nullable = true)
 |-- Fecha de notificación: string (nullable = true)
 |-- Código DIVIPOLA departamento: integer (nullable = true)
 |-- Nombre departamento: string (nullable = true)
 |-- Código DIVIPOLA municipio: integer (nullable = true)
 |-- Nombre municipio: string (nullable = true)
 |-- Edad: integer (nullable = true)
 |-- Unidad de medida de edad: integer (nullable = true)
 |-- Sexo: string (nullable = true)
 |-- Tipo de contagio: string (nullable = true)
 |-- Ubicación del caso: string (nullable = true)
 |-- Estado: string (nullable = true)
 |-- Código ISO del país: integer (nullable = true)
 |-- Nombre del país: string (nullable = true)
 |-- Recuperado: string (nullable = true)
 |-- Fecha de inicio de síntomas: string (nullable = true)
 |-- Fecha de muerte: string (nullable = true)
 |-- Fecha de diagnóstico: string (nullable = true)
 |-- Fecha de recuperación: string (nullable = true)
 |-- Tipo de r

In [13]:
#fisrt few rows of dataframe
df.show(5)

+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+----------------+------------------+------+-------------------+---------------+----------+---------------------------+---------------+--------------------+---------------------+--------------------+------------------+-----------------------+
|fecha reporte web|ID de caso|Fecha de notificación|Código DIVIPOLA departamento|Nombre departamento|Código DIVIPOLA municipio|Nombre municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación del caso|Estado|Código ISO del país|Nombre del país|Recuperado|Fecha de inicio de síntomas|Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|Pertenencia étnica|Nombre del grupo étnico|
+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+---------

In [14]:
#select only 2 columns
df.select('Edad','Nombre municipio').show(5)

+----+----------------+
|Edad|Nombre municipio|
+----+----------------+
|  19|          BOGOTA|
|  34|            BUGA|
|  50|        MEDELLIN|
|  55|        MEDELLIN|
|  25|        MEDELLIN|
+----+----------------+
only showing top 5 rows



In [15]:
#info about dataframe
df.describe().show()

+-------+-----------------+-----------------+---------------------+----------------------------+-------------------+-------------------------+----------------+-----------------+------------------------+----+----------------+------------------+---------+-------------------+---------------+----------+---------------------------+----------------+--------------------+---------------------+--------------------+------------------+-----------------------+
|summary|fecha reporte web|       ID de caso|Fecha de notificación|Código DIVIPOLA departamento|Nombre departamento|Código DIVIPOLA municipio|Nombre municipio|             Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación del caso|   Estado|Código ISO del país|Nombre del país|Recuperado|Fecha de inicio de síntomas| Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|Pertenencia étnica|Nombre del grupo étnico|
+-------+-----------------+-----------------+---------------------+---------------------------

In [16]:
# Seleccionamos algunas columnas del dataset, imprimimos 10 filas.
nuevo_df = df.select('ID de caso', 'Edad','Nombre municipio', 'Tipo de contagio', 'Estado')
nuevo_df.show(10)

+----------+----+----------------+----------------+------+
|ID de caso|Edad|Nombre municipio|Tipo de contagio|Estado|
+----------+----+----------------+----------------+------+
|         1|  19|          BOGOTA|       Importado|  Leve|
|         2|  34|            BUGA|       Importado|  Leve|
|         3|  50|        MEDELLIN|       Importado|  Leve|
|         4|  55|        MEDELLIN|     Relacionado|  Leve|
|         5|  25|        MEDELLIN|     Relacionado|  Leve|
|         6|  27|          ITAGUI|     Relacionado|  Leve|
|         7|  85|       CARTAGENA|       Importado|  Leve|
|         8|  22|          BOGOTA|       Importado|  Leve|
|         9|  28|          BOGOTA|       Importado|  Leve|
|        10|  36|          BOGOTA|       Importado|  Leve|
+----------+----+----------------+----------------+------+
only showing top 10 rows



In [17]:
# Renombramos dos columnas 'Nombre municipio', 'Tipo de contagio'
df_rename=nuevo_df.withColumnRenamed('Nombre municipio','Municipio').withColumnRenamed('Tipo de contagio','Contagio')
df_rename.show(10)

+----------+----+---------+-----------+------+
|ID de caso|Edad|Municipio|   Contagio|Estado|
+----------+----+---------+-----------+------+
|         1|  19|   BOGOTA|  Importado|  Leve|
|         2|  34|     BUGA|  Importado|  Leve|
|         3|  50| MEDELLIN|  Importado|  Leve|
|         4|  55| MEDELLIN|Relacionado|  Leve|
|         5|  25| MEDELLIN|Relacionado|  Leve|
|         6|  27|   ITAGUI|Relacionado|  Leve|
|         7|  85|CARTAGENA|  Importado|  Leve|
|         8|  22|   BOGOTA|  Importado|  Leve|
|         9|  28|   BOGOTA|  Importado|  Leve|
|        10|  36|   BOGOTA|  Importado|  Leve|
+----------+----+---------+-----------+------+
only showing top 10 rows



In [18]:
from pyspark.sql.types import StringType,DoubleType,IntegerType

In [19]:
#with column
df_rename.withColumn("Edad mas 10",(df["Edad"]+10)).show(10,False)

+----------+----+---------+-----------+------+-----------+
|ID de caso|Edad|Municipio|Contagio   |Estado|Edad mas 10|
+----------+----+---------+-----------+------+-----------+
|1         |19  |BOGOTA   |Importado  |Leve  |29         |
|2         |34  |BUGA     |Importado  |Leve  |44         |
|3         |50  |MEDELLIN |Importado  |Leve  |60         |
|4         |55  |MEDELLIN |Relacionado|Leve  |65         |
|5         |25  |MEDELLIN |Relacionado|Leve  |35         |
|6         |27  |ITAGUI   |Relacionado|Leve  |37         |
|7         |85  |CARTAGENA|Importado  |Leve  |95         |
|8         |22  |BOGOTA   |Importado  |Leve  |32         |
|9         |28  |BOGOTA   |Importado  |Leve  |38         |
|10        |36  |BOGOTA   |Importado  |Leve  |46         |
+----------+----+---------+-----------+------+-----------+
only showing top 10 rows



In [20]:
df_rename.withColumn('Edad tipo double',df['Edad'].cast(DoubleType())).show(10,False)

+----------+----+---------+-----------+------+----------------+
|ID de caso|Edad|Municipio|Contagio   |Estado|Edad tipo double|
+----------+----+---------+-----------+------+----------------+
|1         |19  |BOGOTA   |Importado  |Leve  |19.0            |
|2         |34  |BUGA     |Importado  |Leve  |34.0            |
|3         |50  |MEDELLIN |Importado  |Leve  |50.0            |
|4         |55  |MEDELLIN |Relacionado|Leve  |55.0            |
|5         |25  |MEDELLIN |Relacionado|Leve  |25.0            |
|6         |27  |ITAGUI   |Relacionado|Leve  |27.0            |
|7         |85  |CARTAGENA|Importado  |Leve  |85.0            |
|8         |22  |BOGOTA   |Importado  |Leve  |22.0            |
|9         |28  |BOGOTA   |Importado  |Leve  |28.0            |
|10        |36  |BOGOTA   |Importado  |Leve  |36.0            |
+----------+----+---------+-----------+------+----------------+
only showing top 10 rows



In [21]:
# Eliminar la columna 'Suma_double'.
df_rename.drop('Edad tipo double').show(10)

+----------+----+---------+-----------+------+
|ID de caso|Edad|Municipio|   Contagio|Estado|
+----------+----+---------+-----------+------+
|         1|  19|   BOGOTA|  Importado|  Leve|
|         2|  34|     BUGA|  Importado|  Leve|
|         3|  50| MEDELLIN|  Importado|  Leve|
|         4|  55| MEDELLIN|Relacionado|  Leve|
|         5|  25| MEDELLIN|Relacionado|  Leve|
|         6|  27|   ITAGUI|Relacionado|  Leve|
|         7|  85|CARTAGENA|  Importado|  Leve|
|         8|  22|   BOGOTA|  Importado|  Leve|
|         9|  28|   BOGOTA|  Importado|  Leve|
|        10|  36|   BOGOTA|  Importado|  Leve|
+----------+----+---------+-----------+------+
only showing top 10 rows



In [22]:
#filter the records
df_rename.filter(df_rename['Municipio']=='MEDELLIN').show(10)

+----------+----+---------+-----------+------+
|ID de caso|Edad|Municipio|   Contagio|Estado|
+----------+----+---------+-----------+------+
|         3|  50| MEDELLIN|  Importado|  Leve|
|         4|  55| MEDELLIN|Relacionado|  Leve|
|         5|  25| MEDELLIN|Relacionado|  Leve|
|        20|  26| MEDELLIN|Relacionado|  Leve|
|        21|  28| MEDELLIN|Relacionado|  Leve|
|        32|  55| MEDELLIN|  Importado|  Leve|
|       106|  44| MEDELLIN|  Importado|  Leve|
|       107|  56| MEDELLIN|  Importado|  Leve|
|       108|  57| MEDELLIN|  Importado|  Leve|
|       131|  22| MEDELLIN|  Importado|  Leve|
+----------+----+---------+-----------+------+
only showing top 10 rows



In [23]:
#filter the records
df_rename.filter(df_rename['Municipio']=='MEDELLIN').select('Edad','Municipio','Contagio').show(10)

+----+---------+-----------+
|Edad|Municipio|   Contagio|
+----+---------+-----------+
|  50| MEDELLIN|  Importado|
|  55| MEDELLIN|Relacionado|
|  25| MEDELLIN|Relacionado|
|  26| MEDELLIN|Relacionado|
|  28| MEDELLIN|Relacionado|
|  55| MEDELLIN|  Importado|
|  44| MEDELLIN|  Importado|
|  56| MEDELLIN|  Importado|
|  57| MEDELLIN|  Importado|
|  22| MEDELLIN|  Importado|
+----+---------+-----------+
only showing top 10 rows



In [24]:
#filter the multiple conditions
df_rename.filter(df_rename['Municipio']=='BOGOTA').filter(df_rename['Edad'] <18).show(10)

+----------+----+---------+-----------+------+
|ID de caso|Edad|Municipio|   Contagio|Estado|
+----------+----+---------+-----------+------+
|       776|   2|   BOGOTA|Relacionado|  Leve|
|       787|   2|   BOGOTA|Relacionado|  Leve|
|       870|   2|   BOGOTA|Relacionado|  Leve|
|       873|  15|   BOGOTA|Relacionado|  Leve|
|       879|  13|   BOGOTA|Relacionado|  Leve|
|       900|   2|   BOGOTA|Relacionado|  Leve|
|       984|  15|   BOGOTA|Relacionado|  Leve|
|       998|  17|   BOGOTA|Comunitaria|  Leve|
+----------+----+---------+-----------+------+



In [25]:
#Distinct Values in a column
df_rename.select('Contagio').distinct().show()

+-----------+
|   Contagio|
+-----------+
|Relacionado|
|  Importado|
|Comunitaria|
| En estudio|
+-----------+



In [26]:
#distinct value count
df_rename.select('Contagio').distinct().count()

4

In [27]:
df_rename.groupBy('Contagio').count().show(5,False)

+-----------+-----+
|Contagio   |count|
+-----------+-----+
|Relacionado|459  |
|Importado  |477  |
|Comunitaria|63   |
|En estudio |1    |
+-----------+-----+



In [28]:
# Value counts
df_rename.groupBy('Contagio').count().orderBy('count',ascending=False).show(4,False)

+-----------+-----+
|Contagio   |count|
+-----------+-----+
|Importado  |477  |
|Relacionado|459  |
|Comunitaria|63   |
|En estudio |1    |
+-----------+-----+



In [29]:
# Value counts
df_rename.groupBy('Contagio').mean().show(4,False)

+-----------+------------------+-----------------+
|Contagio   |avg(ID de caso)   |avg(Edad)        |
+-----------+------------------+-----------------+
|Relacionado|539.8387799564271 |41.17211328976035|
|Importado  |425.77358490566036|42.44863731656184|
|Comunitaria|776.8730158730159 |46.06349206349206|
|En estudio |677.0             |80.0             |
+-----------+------------------+-----------------+



In [30]:
df_rename.groupBy('Contagio').sum().show(4,False)

+-----------+---------------+---------+
|Contagio   |sum(ID de caso)|sum(Edad)|
+-----------+---------------+---------+
|Relacionado|247786         |18898    |
|Importado  |203094         |20248    |
|Comunitaria|48943          |2902     |
|En estudio |677            |80       |
+-----------+---------------+---------+



In [31]:
# Value counts
df_rename.groupBy('Contagio').max().show(4,False)

+-----------+---------------+---------+
|Contagio   |max(ID de caso)|max(Edad)|
+-----------+---------------+---------+
|Relacionado|992            |88       |
|Importado  |994            |87       |
|Comunitaria|1000           |82       |
|En estudio |677            |80       |
+-----------+---------------+---------+



In [32]:
# Value counts
df_rename.groupBy('Contagio').min().show(4,False)

+-----------+---------------+---------+
|Contagio   |min(ID de caso)|min(Edad)|
+-----------+---------------+---------+
|Relacionado|4              |1        |
|Importado  |1              |9        |
|Comunitaria|250            |6        |
|En estudio |677            |80       |
+-----------+---------------+---------+



In [33]:
#Aggregation
df_rename.groupBy('Contagio').agg({'Edad':'sum'}).show(4,False)

+-----------+---------+
|Contagio   |sum(Edad)|
+-----------+---------+
|Relacionado|18898    |
|Importado  |20248    |
|Comunitaria|2902     |
|En estudio |80       |
+-----------+---------+



In [34]:
# UDF
from pyspark.sql.functions import udf


In [35]:
# Función para clasificar la edad en rangos de 10 en 10
def clasificar_edad(edad):
    if edad is None:
        return "Desconocido"  # Para manejar valores nulos
    rango_inferior = (edad // 10) * 10
    rango_superior = rango_inferior + 9
    return f"{rango_inferior}-{rango_superior}"

In [36]:
#create udf using python function
edades_udf=udf(clasificar_edad,StringType())
#apply udf on dataframe
df_rename.withColumn('Rango edades',edades_udf(df_rename['Edad'])).show(10,False)

+----------+----+---------+-----------+------+------------+
|ID de caso|Edad|Municipio|Contagio   |Estado|Rango edades|
+----------+----+---------+-----------+------+------------+
|1         |19  |BOGOTA   |Importado  |Leve  |10-19       |
|2         |34  |BUGA     |Importado  |Leve  |30-39       |
|3         |50  |MEDELLIN |Importado  |Leve  |50-59       |
|4         |55  |MEDELLIN |Relacionado|Leve  |50-59       |
|5         |25  |MEDELLIN |Relacionado|Leve  |20-29       |
|6         |27  |ITAGUI   |Relacionado|Leve  |20-29       |
|7         |85  |CARTAGENA|Importado  |Leve  |80-89       |
|8         |22  |BOGOTA   |Importado  |Leve  |20-29       |
|9         |28  |BOGOTA   |Importado  |Leve  |20-29       |
|10        |36  |BOGOTA   |Importado  |Leve  |30-39       |
+----------+----+---------+-----------+------+------------+
only showing top 10 rows



In [37]:
# Usando una funcion lamda
edad_udf = udf(lambda age: "Joven" if age <= 25 else "Adulto", StringType())
# Aplicando udf en el dataframe.
df_rename.withColumn("Grupo de edad", edad_udf(df_rename.Edad)).show(10,False)

+----------+----+---------+-----------+------+-------------+
|ID de caso|Edad|Municipio|Contagio   |Estado|Grupo de edad|
+----------+----+---------+-----------+------+-------------+
|1         |19  |BOGOTA   |Importado  |Leve  |Joven        |
|2         |34  |BUGA     |Importado  |Leve  |Adulto       |
|3         |50  |MEDELLIN |Importado  |Leve  |Adulto       |
|4         |55  |MEDELLIN |Relacionado|Leve  |Adulto       |
|5         |25  |MEDELLIN |Relacionado|Leve  |Joven        |
|6         |27  |ITAGUI   |Relacionado|Leve  |Adulto       |
|7         |85  |CARTAGENA|Importado  |Leve  |Adulto       |
|8         |22  |BOGOTA   |Importado  |Leve  |Joven        |
|9         |28  |BOGOTA   |Importado  |Leve  |Adulto       |
|10        |36  |BOGOTA   |Importado  |Leve  |Adulto       |
+----------+----+---------+-----------+------+-------------+
only showing top 10 rows



In [38]:
#pandas udf
from pyspark.sql.functions import pandas_udf, PandasUDFType, col, desc, avg

In [39]:
# Agrupar por departamento y contar los casos
top_departamentos = df.groupBy("Nombre departamento").count().orderBy(desc("count")).limit(10)
top_departamentos.show()

+-------------------+-----+
|Nombre departamento|count|
+-------------------+-----+
|             BOGOTA|  401|
|              VALLE|  148|
|          ANTIOQUIA|  106|
|       CUNDINAMARCA|   49|
|          CARTAGENA|   39|
|          RISARALDA|   34|
|       BARRANQUILLA|   31|
|              HUILA|   30|
|            QUINDIO|   23|
|    NORTE SANTANDER|   19|
+-------------------+-----+



In [40]:
# Agrupar por ciudad (En el este caso, es Municipio.) y contar los casos
top_ciudad = df.groupBy("Nombre municipio").count().orderBy(desc("count")).limit(10)
top_ciudad.show()

+----------------+-----+
|Nombre municipio|count|
+----------------+-----+
|          BOGOTA|  401|
|            CALI|  101|
|        MEDELLIN|   63|
|       CARTAGENA|   39|
|    BARRANQUILLA|   31|
|           NEIVA|   27|
|         PEREIRA|   25|
|         PALMIRA|   22|
|      VALLEDUPAR|   16|
|         ARMENIA|   15|
+----------------+-----+



In [41]:
# Agrupar por Fecha de registro y contar los casos.
top_fecha = df.groupBy("fecha reporte web").count().orderBy(desc("count")).limit(10)
top_fecha.show()

+-----------------+-----+
|fecha reporte web|count|
+-----------------+-----+
|31/3/2020 0:00:00|  107|
|24/3/2020 0:00:00|  105|
|30/3/2020 0:00:00|   96|
| 1/4/2020 0:00:00|   95|
|29/3/2020 0:00:00|   94|
|23/3/2020 0:00:00|   73|
|28/3/2020 0:00:00|   69|
|25/3/2020 0:00:00|   61|
|27/3/2020 0:00:00|   49|
|20/3/2020 0:00:00|   48|
+-----------------+-----+



In [42]:
# Agrupar por Edad y contar los casos.
# Edades diferentes en el dataset.
edades_diferentes = df.select('Edad').distinct().count()
edades = df.groupBy("Edad").count().orderBy(desc("count")).limit(edades_diferentes)
edades.show(edades_diferentes)

+----+-----+
|Edad|count|
+----+-----+
|  30|   33|
|  28|   31|
|  32|   30|
|  26|   29|
|  29|   28|
|  33|   28|
|  50|   27|
|  42|   24|
|  35|   23|
|  45|   23|
|  58|   23|
|  38|   22|
|  25|   22|
|  21|   22|
|  31|   21|
|  24|   21|
|  34|   20|
|  47|   20|
|  54|   20|
|  41|   20|
|  37|   20|
|  51|   20|
|  36|   19|
|  57|   18|
|  39|   18|
|  27|   17|
|  44|   16|
|  61|   16|
|  23|   16|
|  22|   15|
|  48|   15|
|  55|   14|
|  49|   14|
|  40|   13|
|  64|   13|
|  62|   13|
|  46|   13|
|  52|   12|
|  43|   12|
|  56|   12|
|  65|   11|
|  53|   11|
|  19|   11|
|  18|   11|
|  63|   10|
|  72|    9|
|  59|    9|
|  60|    9|
|  68|    9|
|  73|    8|
|  70|    8|
|  16|    6|
|  20|    6|
|  69|    6|
|  80|    6|
|  67|    6|
|  74|    6|
|  15|    5|
|   9|    5|
|  84|    5|
|  66|    5|
|  82|    4|
|   2|    4|
|  81|    3|
|  76|    3|
|  17|    3|
|  83|    3|
|  71|    3|
|  79|    3|
|  85|    2|
|  12|    2|
|   6|    2|
|  88|    2|
|  87|    2|

In [43]:
# Obtener los 5 departamentos con más casos
top_5_departamentos = df.groupBy("Nombre departamento").count().orderBy(desc("count")).limit(5)

# Filtrar el DataFrame original para incluir solo los 5 departamentos principales
df_top_5 = df.join(top_5_departamentos, on="Nombre departamento", how="inner")

promedio_edad_top_5 = df_top_5.groupBy("Nombre departamento").agg(avg("Edad").alias("Promedio_Edad")).orderBy(desc("Promedio_Edad"))
promedio_edad_top_5.show()

+-------------------+------------------+
|Nombre departamento|     Promedio_Edad|
+-------------------+------------------+
|             BOGOTA| 43.01496259351621|
|       CUNDINAMARCA|42.734693877551024|
|              VALLE| 41.99324324324324|
|          ANTIOQUIA|40.386792452830186|
|          CARTAGENA| 40.05128205128205|
+-------------------+------------------+



Con SparkSQL

In [44]:
# Creamos la tabla 'Covid19' con el contenido del dataframe ya establecido antes 'df_with_new_column'.
df.createOrReplaceTempView('Covid19')

In [45]:
# Mostramos el contenido de las 10 primeras entradas de la tabla 'Covid19'.
spark.sql('SELECT * from Covid19').show(10)

+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+----------------+------------------+------+-------------------+--------------------+----------+---------------------------+---------------+--------------------+---------------------+--------------------+------------------+-----------------------+
|fecha reporte web|ID de caso|Fecha de notificación|Código DIVIPOLA departamento|Nombre departamento|Código DIVIPOLA municipio|Nombre municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación del caso|Estado|Código ISO del país|     Nombre del país|Recuperado|Fecha de inicio de síntomas|Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|Pertenencia étnica|Nombre del grupo étnico|
+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----

In [46]:
# Realizamos la query para optener los 10 departamentos con más casos de covid en colombia ordenados de mayor a menor.
spark.sql('SELECT `Nombre departamento`, COUNT(*) as Total_Casos FROM Covid19 GROUP BY `Nombre departamento` ORDER BY Total_Casos DESC LIMIT 10').show()

+-------------------+-----------+
|Nombre departamento|Total_Casos|
+-------------------+-----------+
|             BOGOTA|        401|
|              VALLE|        148|
|          ANTIOQUIA|        106|
|       CUNDINAMARCA|         49|
|          CARTAGENA|         39|
|          RISARALDA|         34|
|       BARRANQUILLA|         31|
|              HUILA|         30|
|            QUINDIO|         23|
|    NORTE SANTANDER|         19|
+-------------------+-----------+



In [47]:
# Realizamos la query para optener las 10 ciudades (municipios) con más casos de covid en Colombia Ordenados de mayor a menor.
spark.sql('SELECT `Nombre municipio`, COUNT(*) as Total_Casos FROM Covid19 GROUP BY `Nombre municipio` ORDER BY Total_Casos DESC LIMIT 10').show()

+----------------+-----------+
|Nombre municipio|Total_Casos|
+----------------+-----------+
|          BOGOTA|        401|
|            CALI|        101|
|        MEDELLIN|         63|
|       CARTAGENA|         39|
|    BARRANQUILLA|         31|
|           NEIVA|         27|
|         PEREIRA|         25|
|         PALMIRA|         22|
|      VALLEDUPAR|         16|
|         ARMENIA|         15|
+----------------+-----------+



In [48]:
# Realizamos la query para optener los 10 días con más casos de covid en Colombia Ordenados de mayor a menor.
spark.sql('SELECT `fecha reporte web`, COUNT(*) as Total_Casos FROM Covid19 GROUP BY `fecha reporte web` ORDER BY Total_Casos DESC LIMIT 10').show()

+-----------------+-----------+
|fecha reporte web|Total_Casos|
+-----------------+-----------+
|31/3/2020 0:00:00|        107|
|24/3/2020 0:00:00|        105|
|30/3/2020 0:00:00|         96|
| 1/4/2020 0:00:00|         95|
|29/3/2020 0:00:00|         94|
|23/3/2020 0:00:00|         73|
|28/3/2020 0:00:00|         69|
|25/3/2020 0:00:00|         61|
|27/3/2020 0:00:00|         49|
|20/3/2020 0:00:00|         48|
+-----------------+-----------+



In [49]:
# Realizar la query para optener la distribución de los casos pro edades de covid en Colombia.
spark.sql('SELECT Edad, COUNT(*) as Total_Casos FROM Covid19 GROUP BY Edad ORDER BY Edad').show()

+----+-----------+
|Edad|Total_Casos|
+----+-----------+
|   1|          1|
|   2|          4|
|   3|          1|
|   4|          1|
|   5|          1|
|   6|          2|
|   8|          1|
|   9|          5|
|  12|          2|
|  13|          1|
|  15|          5|
|  16|          6|
|  17|          3|
|  18|         11|
|  19|         11|
|  20|          6|
|  21|         22|
|  22|         15|
|  23|         16|
|  24|         21|
+----+-----------+
only showing top 20 rows



In [50]:
# Realizar query para optener el promedio de eddes de los casos registados en los 5 departamentos con más casos.
query = """
  SELECT `Nombre departamento`, Promedio_Edad
    FROM (SELECT cd.`Nombre departamento`, COUNT(*) AS total_casos, AVG(cd.Edad) AS Promedio_Edad
    FROM Covid19 cd
    GROUP BY cd.`Nombre departamento`
    ORDER BY total_casos DESC
    LIMIT 5
  ) subquery
"""
result = spark.sql(query)
result.show()

+-------------------+------------------+
|Nombre departamento|     Promedio_Edad|
+-------------------+------------------+
|             BOGOTA| 43.01496259351621|
|              VALLE| 41.99324324324324|
|          ANTIOQUIA|40.386792452830186|
|       CUNDINAMARCA|42.734693877551024|
|          CARTAGENA| 40.05128205128205|
+-------------------+------------------+



In [51]:
#target directory
direc='/content/gdrive/MyDrive/st0263-labs/datasets/lab4/df.csv'

In [97]:
#save the dataframe as single csv
df_rename.coalesce(1).write.format("csv").option("header","true").save(direc)