# Data Processing using Pyspark

In [None]:
#configuración en google colab de spark y pyspark
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:
#instalar java y spark
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
!tar xf spark-3.3.1-bin-hadoop3.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.1-bin-hadoop3"

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

In [None]:
spark

In [None]:
sc

In [None]:
# Load csv Dataset 
#df=spark.read.csv('s3://<bucket/dir>/sample_data.csv',inferSchema=True,header=True)
df=spark.read.csv('/content/gdrive/MyDrive/st0263-2266/bigdata/datasets/covid19/Casos_positivos_de_COVID-19_en_Colombia-100K.csv',inferSchema=True,header=True)

In [None]:
#Columnas de la tabla
df.columns

['fecha reporte web',
 'ID de caso',
 'Fecha de notificación',
 'Código DIVIPOLA departamento',
 'Nombre departamento',
 'Código DIVIPOLA municipio',
 'Nombre municipio',
 'Edad',
 'Unidad de medida de edad',
 'Sexo',
 'Tipo de contagio',
 'Ubicación del caso',
 'Estado',
 'Código ISO del país',
 'Nombre del país',
 'Recuperado',
 'Fecha de inicio de síntomas',
 'Fecha de muerte',
 'Fecha de diagnóstico',
 'Fecha de recuperación',
 'Tipo de recuperación',
 'Pertenencia étnica',
 'Nombre del grupo étnico']

In [None]:
#printSchema
df.printSchema()

root
 |-- fecha reporte web: string (nullable = true)
 |-- ID de caso: integer (nullable = true)
 |-- Fecha de notificación: string (nullable = true)
 |-- Código DIVIPOLA departamento: integer (nullable = true)
 |-- Nombre departamento: string (nullable = true)
 |-- Código DIVIPOLA municipio: integer (nullable = true)
 |-- Nombre municipio: string (nullable = true)
 |-- Edad: integer (nullable = true)
 |-- Unidad de medida de edad: integer (nullable = true)
 |-- Sexo: string (nullable = true)
 |-- Tipo de contagio: string (nullable = true)
 |-- Ubicación del caso: string (nullable = true)
 |-- Estado: string (nullable = true)
 |-- Código ISO del país: integer (nullable = true)
 |-- Nombre del país: string (nullable = true)
 |-- Recuperado: string (nullable = true)
 |-- Fecha de inicio de síntomas: string (nullable = true)
 |-- Fecha de muerte: string (nullable = true)
 |-- Fecha de diagnóstico: string (nullable = true)
 |-- Fecha de recuperación: string (nullable = true)
 |-- Tipo de r

In [58]:
#Seleccion de columnas
df.select('Nombre municipio', 'Código ISO del país', 'Pertenencia étnica').show(7)

+----------------+-------------------+------------------+
|Nombre municipio|Código ISO del país|Pertenencia étnica|
+----------------+-------------------+------------------+
|          BOGOTA|                380|                 6|
|            BUGA|                724|                 5|
|        MEDELLIN|                724|                 6|
|        MEDELLIN|               null|                 6|
|        MEDELLIN|               null|                 6|
|          ITAGUI|               null|                 6|
|       CARTAGENA|                840|                 6|
+----------------+-------------------+------------------+
only showing top 7 rows



In [59]:
#Cambio de nombre de columna
df.select('Nombre departamento').withColumnRenamed('Nombre Departamento', 'Dep').show(6)

+---------+
|      Dep|
+---------+
|   BOGOTA|
|    VALLE|
|ANTIOQUIA|
|ANTIOQUIA|
|ANTIOQUIA|
|ANTIOQUIA|
+---------+
only showing top 6 rows



In [60]:
#Añadir una columna
df.select('ID de caso', 'Fecha de notificación', 'Edad').withColumn("Edad luego 3 años",(df["Edad"]+3)).show(7,False)

+----------+---------------------+----+-----------------+
|ID de caso|Fecha de notificación|Edad|Edad luego 3 años|
+----------+---------------------+----+-----------------+
|1         |2/3/2020 0:00:00     |19  |22               |
|2         |6/3/2020 0:00:00     |34  |37               |
|3         |7/3/2020 0:00:00     |50  |53               |
|4         |9/3/2020 0:00:00     |55  |58               |
|5         |9/3/2020 0:00:00     |25  |28               |
|6         |10/3/2020 0:00:00    |27  |30               |
|7         |8/3/2020 0:00:00     |85  |88               |
+----------+---------------------+----+-----------------+
only showing top 7 rows



In [61]:
#Eliminar una columna
df.select('Pertenencia étnica', 'ID de caso', 'Edad', 'Sexo').drop('Pertenencia étnica', 'Sexo').show(9)


+----------+----+
|ID de caso|Edad|
+----------+----+
|         1|  19|
|         2|  34|
|         3|  50|
|         4|  55|
|         5|  25|
|         6|  27|
|         7|  85|
|         8|  22|
|         9|  28|
+----------+----+
only showing top 9 rows



In [62]:
#Filtrar las filas
df.filter(df['Nombre municipio']=='MEDELLIN').show(8)

+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+----------------+------------------+------+-------------------+---------------+----------+---------------------------+---------------+--------------------+---------------------+--------------------+------------------+-----------------------+
|fecha reporte web|ID de caso|Fecha de notificación|Código DIVIPOLA departamento|Nombre departamento|Código DIVIPOLA municipio|Nombre municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación del caso|Estado|Código ISO del país|Nombre del país|Recuperado|Fecha de inicio de síntomas|Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|Pertenencia étnica|Nombre del grupo étnico|
+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+---------

In [63]:
# UDF
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
region = udf(lambda reg: "PAISA" if reg == "MEDELLIN" else "RESTO DEL PAIS", StringType())
#apply udf on dataframe
df.withColumn("REGION", region(df['Nombre municipio'])).show(7,False)

+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+----------------+------------------+------+-------------------+-------------------------+----------+---------------------------+---------------+--------------------+---------------------+--------------------+------------------+-----------------------+--------------+
|fecha reporte web|ID de caso|Fecha de notificación|Código DIVIPOLA departamento|Nombre departamento|Código DIVIPOLA municipio|Nombre municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación del caso|Estado|Código ISO del país|Nombre del país          |Recuperado|Fecha de inicio de síntomas|Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|Pertenencia étnica|Nombre del grupo étnico|REGION        |
+-----------------+----------+---------------------+----------------------------+-------------------+-------

In [64]:
#Más victimas por departamento
df.groupBy('Nombre departamento').count().orderBy('count',ascending=False).show(10,False)

+-------------------+-----+
|Nombre departamento|count|
+-------------------+-----+
|BOGOTA             |30016|
|BARRANQUILLA       |13065|
|ATLANTICO          |10994|
|VALLE              |10404|
|CARTAGENA          |8333 |
|ANTIOQUIA          |4554 |
|NARIÑO             |3520 |
|CUNDINAMARCA       |2827 |
|AMAZONAS           |2317 |
|CHOCO              |1636 |
+-------------------+-----+
only showing top 10 rows



In [65]:
#Más victimas por municipio
df.groupBy('Nombre municipio').count().orderBy('count',ascending=False).show(10,False)

+----------------+-----+
|Nombre municipio|count|
+----------------+-----+
|BOGOTA          |30016|
|BARRANQUILLA    |13065|
|CARTAGENA       |8333 |
|CALI            |7747 |
|SOLEDAD         |6233 |
|LETICIA         |2194 |
|MEDELLIN        |2137 |
|TUMACO          |1501 |
|BUENAVENTURA    |1453 |
|QUIBDO          |1367 |
+----------------+-----+
only showing top 10 rows



In [66]:
#Más victimas por día
df.groupBy('Fecha de notificación').count().orderBy('count',ascending=False).show(10,False)

+---------------------+-----+
|Fecha de notificación|count|
+---------------------+-----+
|18/6/2020 0:00:00    |3477 |
|19/6/2020 0:00:00    |3328 |
|17/6/2020 0:00:00    |3318 |
|16/6/2020 0:00:00    |3232 |
|23/6/2020 0:00:00    |3230 |
|11/6/2020 0:00:00    |2747 |
|20/6/2020 0:00:00    |2684 |
|12/6/2020 0:00:00    |2679 |
|10/6/2020 0:00:00    |2650 |
|24/6/2020 0:00:00    |2599 |
+---------------------+-----+
only showing top 10 rows



In [67]:
#Más casos por edad
df.groupBy('Edad').count().orderBy('Edad',ascending=True).show()

+----+-----+
|Edad|count|
+----+-----+
|   1|  485|
|   2|  440|
|   3|  449|
|   4|  373|
|   5|  425|
|   6|  431|
|   7|  442|
|   8|  461|
|   9|  467|
|  10|  530|
|  11|  566|
|  12|  562|
|  13|  531|
|  14|  580|
|  15|  560|
|  16|  600|
|  17|  685|
|  18| 1160|
|  19| 1567|
|  20| 1674|
+----+-----+
only showing top 20 rows



In [34]:
#Numero de casos agrupados por sexo
df.groupBy('Sexo').count().orderBy('count',ascending=True).show()

+----+-----+
|Sexo|count|
+----+-----+
|   F|45902|
|   M|54098|
+----+-----+



In [68]:
# In Python
from pyspark.sql import SparkSession
# Create a SparkSession
spark = (SparkSession
  .builder
  .appName("SparkSQLExampleApp")
  .getOrCreate())
csv_file = "/content/gdrive/MyDrive/st0263-2266/bigdata/datasets/covid19/Casos_positivos_de_COVID-19_en_Colombia-100K.csv"
df = (spark.read.format("csv")
  .option("inferSchema", "true")
  .option("header", "true")
  .load(csv_file))
df.createOrReplaceTempView("tabla")

In [69]:
#Más victimas por departamento
spark.sql("""SELECT `Nombre departamento`, COUNT(*) FROM tabla GROUP BY `Nombre departamento` ORDER BY 2 DESC""").show()

+-------------------+--------+
|Nombre departamento|count(1)|
+-------------------+--------+
|             BOGOTA|   30016|
|       BARRANQUILLA|   13065|
|          ATLANTICO|   10994|
|              VALLE|   10404|
|          CARTAGENA|    8333|
|          ANTIOQUIA|    4554|
|             NARIÑO|    3520|
|       CUNDINAMARCA|    2827|
|           AMAZONAS|    2317|
|              CHOCO|    1636|
|              SUCRE|    1317|
|               META|    1269|
|              CESAR|    1056|
|     STA MARTA D.E.|    1022|
|             TOLIMA|    1013|
|            BOLIVAR|     998|
|          MAGDALENA|     869|
|            CORDOBA|     807|
|          SANTANDER|     756|
|          RISARALDA|     545|
+-------------------+--------+
only showing top 20 rows



In [70]:
#Más victimas por  Municipio
spark.sql("""SELECT `Nombre municipio`, COUNT(*) FROM tabla GROUP BY `Nombre municipio` ORDER BY 2 DESC""").show()

+----------------+--------+
|Nombre municipio|count(1)|
+----------------+--------+
|          BOGOTA|   30016|
|    BARRANQUILLA|   13065|
|       CARTAGENA|    8333|
|            CALI|    7747|
|         SOLEDAD|    6233|
|         LETICIA|    2194|
|        MEDELLIN|    2137|
|          TUMACO|    1501|
|    BUENAVENTURA|    1453|
|          QUIBDO|    1367|
|         MALAMBO|    1289|
|   VILLAVICENCIO|    1176|
|          SOACHA|    1071|
|       SINCELEJO|    1036|
|     SANTA MARTA|    1022|
|           PASTO|     707|
|          GALAPA|     702|
|      VALLEDUPAR|     653|
|         ESPINAL|     566|
|         ITUANGO|     565|
+----------------+--------+
only showing top 20 rows



In [71]:
#Más victimas por día
spark.sql("""SELECT `Fecha de notificación`, COUNT(*) FROM tabla GROUP BY `Fecha de notificación` ORDER BY 2 DESC""").show()

+---------------------+--------+
|Fecha de notificación|count(1)|
+---------------------+--------+
|    18/6/2020 0:00:00|    3477|
|    19/6/2020 0:00:00|    3328|
|    17/6/2020 0:00:00|    3318|
|    16/6/2020 0:00:00|    3232|
|    23/6/2020 0:00:00|    3230|
|    11/6/2020 0:00:00|    2747|
|    20/6/2020 0:00:00|    2684|
|    12/6/2020 0:00:00|    2679|
|    10/6/2020 0:00:00|    2650|
|    24/6/2020 0:00:00|    2599|
|     8/6/2020 0:00:00|    2454|
|     9/6/2020 0:00:00|    2302|
|    13/6/2020 0:00:00|    2198|
|    25/6/2020 0:00:00|    2030|
|     5/6/2020 0:00:00|    1963|
|    21/6/2020 0:00:00|    1953|
|    22/6/2020 0:00:00|    1908|
|     3/6/2020 0:00:00|    1850|
|     6/6/2020 0:00:00|    1756|
|     4/6/2020 0:00:00|    1739|
+---------------------+--------+
only showing top 20 rows



In [56]:
spark.sql("""SELECT Edad, COUNT(*) FROM tabla GROUP BY Edad  ORDER BY 1 ASC""").show()

+----+--------+
|Edad|count(1)|
+----+--------+
|   1|     485|
|   2|     440|
|   3|     449|
|   4|     373|
|   5|     425|
|   6|     431|
|   7|     442|
|   8|     461|
|   9|     467|
|  10|     530|
|  11|     566|
|  12|     562|
|  13|     531|
|  14|     580|
|  15|     560|
|  16|     600|
|  17|     685|
|  18|    1160|
|  19|    1567|
|  20|    1674|
+----+--------+
only showing top 20 rows



In [57]:
#Numero de casos agrupados por sexo
spark.sql("""SELECT Sexo, COUNT(*) FROM tabla GROUP BY Sexo  ORDER BY 2 DESC""").show()

+----+--------+
|Sexo|count(1)|
+----+--------+
|   M|   54098|
|   F|   45902|
+----+--------+

