# Data Processing using Pyspark

In [None]:
#configuración en google colab de spark y pyspark
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:
#instalar java y spark
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
!tar xf spark-3.3.1-bin-hadoop3.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.1-bin-hadoop3"

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

In [None]:
spark

In [None]:
sc

In [None]:
# Load csv Dataset 
#df=spark.read.csv('s3://<bucket/dir>/sample_data.csv',inferSchema=True,header=True)
df=spark.read.csv('/content/gdrive/MyDrive/st0263-2266/bigdata/datasets/covid19/Casos_positivos_de_COVID-19_en_Colombia-100K.csv',inferSchema=True,header=True)

In [None]:
#columns of dataframe
df.columns

['fecha reporte web',
 'ID de caso',
 'Fecha de notificación',
 'Código DIVIPOLA departamento',
 'Nombre departamento',
 'Código DIVIPOLA municipio',
 'Nombre municipio',
 'Edad',
 'Unidad de medida de edad',
 'Sexo',
 'Tipo de contagio',
 'Ubicación del caso',
 'Estado',
 'Código ISO del país',
 'Nombre del país',
 'Recuperado',
 'Fecha de inicio de síntomas',
 'Fecha de muerte',
 'Fecha de diagnóstico',
 'Fecha de recuperación',
 'Tipo de recuperación',
 'Pertenencia étnica',
 'Nombre del grupo étnico']

In [None]:
#printSchema
df.printSchema()

root
 |-- fecha reporte web: string (nullable = true)
 |-- ID de caso: integer (nullable = true)
 |-- Fecha de notificación: string (nullable = true)
 |-- Código DIVIPOLA departamento: integer (nullable = true)
 |-- Nombre departamento: string (nullable = true)
 |-- Código DIVIPOLA municipio: integer (nullable = true)
 |-- Nombre municipio: string (nullable = true)
 |-- Edad: integer (nullable = true)
 |-- Unidad de medida de edad: integer (nullable = true)
 |-- Sexo: string (nullable = true)
 |-- Tipo de contagio: string (nullable = true)
 |-- Ubicación del caso: string (nullable = true)
 |-- Estado: string (nullable = true)
 |-- Código ISO del país: integer (nullable = true)
 |-- Nombre del país: string (nullable = true)
 |-- Recuperado: string (nullable = true)
 |-- Fecha de inicio de síntomas: string (nullable = true)
 |-- Fecha de muerte: string (nullable = true)
 |-- Fecha de diagnóstico: string (nullable = true)
 |-- Fecha de recuperación: string (nullable = true)
 |-- Tipo de r

In [None]:
df.select('fecha reporte web','Fecha de notificación','Ubicación del caso','Fecha de muerte').show(5)

+-----------------+---------------------+------------------+---------------+
|fecha reporte web|Fecha de notificación|Ubicación del caso|Fecha de muerte|
+-----------------+---------------------+------------------+---------------+
| 6/3/2020 0:00:00|     2/3/2020 0:00:00|              Casa|           null|
| 9/3/2020 0:00:00|     6/3/2020 0:00:00|              Casa|           null|
| 9/3/2020 0:00:00|     7/3/2020 0:00:00|              Casa|           null|
|11/3/2020 0:00:00|     9/3/2020 0:00:00|              Casa|           null|
|11/3/2020 0:00:00|     9/3/2020 0:00:00|              Casa|           null|
+-----------------+---------------------+------------------+---------------+
only showing top 5 rows



In [None]:
df.select('Fecha de notificación','Recuperado','Edad').withColumn("Edad despues de 10 años",(df["Edad"]+10)).show(10,False)

+---------------------+----------+----+-----------------------+
|Fecha de notificación|Recuperado|Edad|Edad despues de 10 años|
+---------------------+----------+----+-----------------------+
|2/3/2020 0:00:00     |Recuperado|19  |29                     |
|6/3/2020 0:00:00     |Recuperado|34  |44                     |
|7/3/2020 0:00:00     |Recuperado|50  |60                     |
|9/3/2020 0:00:00     |Recuperado|55  |65                     |
|9/3/2020 0:00:00     |Recuperado|25  |35                     |
|10/3/2020 0:00:00    |Recuperado|27  |37                     |
|8/3/2020 0:00:00     |Recuperado|85  |95                     |
|9/3/2020 0:00:00     |Recuperado|22  |32                     |
|8/3/2020 0:00:00     |Recuperado|28  |38                     |
|12/3/2020 0:00:00    |Recuperado|36  |46                     |
+---------------------+----------+----+-----------------------+
only showing top 10 rows



In [None]:
df.select('ID de caso','Fecha de notificación','Recuperado','Edad').drop('Fecha de notificación').show(5)

+----------+----------+----+
|ID de caso|Recuperado|Edad|
+----------+----------+----+
|         1|Recuperado|  19|
|         2|Recuperado|  34|
|         3|Recuperado|  50|
|         4|Recuperado|  55|
|         5|Recuperado|  25|
+----------+----------+----+
only showing top 5 rows



In [None]:
df.filter(df['Edad']>30).show(5)

+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+----------------+------------------+------+-------------------+--------------------+----------+---------------------------+---------------+--------------------+---------------------+--------------------+------------------+-----------------------+
|fecha reporte web|ID de caso|Fecha de notificación|Código DIVIPOLA departamento|Nombre departamento|Código DIVIPOLA municipio|Nombre municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación del caso|Estado|Código ISO del país|     Nombre del país|Recuperado|Fecha de inicio de síntomas|Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|Pertenencia étnica|Nombre del grupo étnico|
+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----

In [None]:
df.select('Nombre departamento','Recuperado').withColumnRenamed('Recuperado','sano').show(5)

+-------------------+----------+
|Nombre departamento|      sano|
+-------------------+----------+
|             BOGOTA|Recuperado|
|              VALLE|Recuperado|
|          ANTIOQUIA|Recuperado|
|          ANTIOQUIA|Recuperado|
|          ANTIOQUIA|Recuperado|
+-------------------+----------+
only showing top 5 rows



In [None]:
from pyspark.sql.functions import udf

In [None]:
from pyspark.sql.types import StringType,DoubleType,IntegerType

In [None]:
def realizar_cuarentena(dep):
  if dep == "Recuperado":
    return "No cuarentena"
  else:
    return "Realizar cuarentena"

In [None]:
obj_udf=udf(realizar_cuarentena,StringType())
df.select('ID de caso','Edad','Recuperado').withColumn('Realizar cuarentena',obj_udf(df['Recuperado'])).show(10)

+----------+----+----------+-------------------+
|ID de caso|Edad|Recuperado|Realizar cuarentena|
+----------+----+----------+-------------------+
|         1|  19|Recuperado|      No cuarentena|
|         2|  34|Recuperado|      No cuarentena|
|         3|  50|Recuperado|      No cuarentena|
|         4|  55|Recuperado|      No cuarentena|
|         5|  25|Recuperado|      No cuarentena|
|         6|  27|Recuperado|      No cuarentena|
|         7|  85|Recuperado|      No cuarentena|
|         8|  22|Recuperado|      No cuarentena|
|         9|  28|Recuperado|      No cuarentena|
|        10|  36|Recuperado|      No cuarentena|
+----------+----+----------+-------------------+
only showing top 10 rows



In [None]:
#Respuestas a las preguntas de la sección 3

In [75]:
#Dirección 
write_uri1='s3a://datamazapataj/Preguntas3/3_1'

In [76]:
#Departamentos con mayor casos de covid (dataframe)

df.groupBy('Nombre departamento').count().orderBy('count',ascending=False).show(10,False)

+-------------------+-----+
|Nombre departamento|count|
+-------------------+-----+
|BOGOTA             |30016|
|BARRANQUILLA       |13065|
|ATLANTICO          |10994|
|VALLE              |10404|
|CARTAGENA          |8333 |
|ANTIOQUIA          |4554 |
|NARIÑO             |3520 |
|CUNDINAMARCA       |2827 |
|AMAZONAS           |2317 |
|CHOCO              |1636 |
+-------------------+-----+
only showing top 10 rows



In [None]:
#Municipio con mayor casos de covid (dataframe)

df.groupBy('Nombre municipio').count().orderBy('count',ascending=False).show(10,False)

+----------------+-----+
|Nombre municipio|count|
+----------------+-----+
|BOGOTA          |30016|
|BARRANQUILLA    |13065|
|CARTAGENA       |8333 |
|CALI            |7747 |
|SOLEDAD         |6233 |
|LETICIA         |2194 |
|MEDELLIN        |2137 |
|TUMACO          |1501 |
|BUENAVENTURA    |1453 |
|QUIBDO          |1367 |
+----------------+-----+
only showing top 10 rows



In [None]:
#Fechas en las que hay mayor casos de covid (dataframe)

df.groupBy('Fecha de notificación').count().orderBy('count',ascending=False).show(10,False)

+---------------------+-----+
|Fecha de notificación|count|
+---------------------+-----+
|18/6/2020 0:00:00    |3477 |
|19/6/2020 0:00:00    |3328 |
|17/6/2020 0:00:00    |3318 |
|16/6/2020 0:00:00    |3232 |
|23/6/2020 0:00:00    |3230 |
|11/6/2020 0:00:00    |2747 |
|20/6/2020 0:00:00    |2684 |
|12/6/2020 0:00:00    |2679 |
|10/6/2020 0:00:00    |2650 |
|24/6/2020 0:00:00    |2599 |
+---------------------+-----+
only showing top 10 rows



In [None]:
#Distribución de casos por edades (dataframe)

df.groupBy('Edad').count().orderBy('Edad',ascending=True).show()

+----+-----+
|Edad|count|
+----+-----+
|   1|  485|
|   2|  440|
|   3|  449|
|   4|  373|
|   5|  425|
|   6|  431|
|   7|  442|
|   8|  461|
|   9|  467|
|  10|  530|
|  11|  566|
|  12|  562|
|  13|  531|
|  14|  580|
|  15|  560|
|  16|  600|
|  17|  685|
|  18| 1160|
|  19| 1567|
|  20| 1674|
+----+-----+
only showing top 20 rows



In [None]:
#Cuanta gente se ha recuperado, cuanta ha fallecido (dataframe)

df.groupBy('Recuperado').count().orderBy('count').show()

+----------+-----+
|Recuperado|count|
+----------+-----+
| fallecido|    5|
|       N/A|  505|
| Fallecido| 4658|
|Recuperado|94832|
+----------+-----+



In [None]:
#SparkSQL para las preguntas 3

In [None]:
# In Python
from pyspark.sql import SparkSession
# Create a SparkSession
spark = (SparkSession
  .builder
  .appName("SparkSQLExampleApp")
  .getOrCreate())
# Path to data set
csv_file = "/content/gdrive/MyDrive/st0263-2266/bigdata/datasets/covid19/Casos_positivos_de_COVID-19_en_Colombia-100K.csv"
# Read and create a temporary view
# Infer schema (note that for larger files you 
# may want to specify the schema)
df = (spark.read.format("csv")
  .option("inferSchema", "true")
  .option("header", "true")
  .load(csv_file))
df.createOrReplaceTempView("DbCovid")

In [None]:
#Distribución de casos por edades (SparkSQL)
spark.sql("""SELECT Edad, COUNT(*) FROM DbCovid GROUP BY Edad""").show(10)

+----+--------+
|Edad|count(1)|
+----+--------+
|  31|    2569|
|  85|     224|
|  65|     736|
|  53|    1307|
|  78|     346|
|  34|    2310|
| 101|       1|
|  81|     275|
|  28|    2540|
|  76|     361|
+----+--------+
only showing top 10 rows



In [None]:
#Municipio con mayores casos de covid (SparkSQL)
spark.sql("""SELECT `Nombre municipio`, COUNT(*) FROM DbCovid GROUP BY `Nombre municipio` ORDER BY 2  DESC""").show(10)

+----------------+--------+
|Nombre municipio|count(1)|
+----------------+--------+
|          BOGOTA|   30016|
|    BARRANQUILLA|   13065|
|       CARTAGENA|    8333|
|            CALI|    7747|
|         SOLEDAD|    6233|
|         LETICIA|    2194|
|        MEDELLIN|    2137|
|          TUMACO|    1501|
|    BUENAVENTURA|    1453|
|          QUIBDO|    1367|
+----------------+--------+
only showing top 10 rows



In [None]:
#Departamentos con mayores casos de covid (SparkSQL)
spark.sql("""SELECT `Nombre departamento`, COUNT(*) FROM covid GROUP BY `Nombre departamento` ORDER BY 2  DESC""").show(10)

+-------------------+--------+
|Nombre departamento|count(1)|
+-------------------+--------+
|             BOGOTA|   30016|
|       BARRANQUILLA|   13065|
|          ATLANTICO|   10994|
|              VALLE|   10404|
|          CARTAGENA|    8333|
|          ANTIOQUIA|    4554|
|             NARIÑO|    3520|
|       CUNDINAMARCA|    2827|
|           AMAZONAS|    2317|
|              CHOCO|    1636|
+-------------------+--------+
only showing top 10 rows



In [None]:
#Fechas con mayores casos de covid (SparkSQL)
spark.sql("""SELECT `Fecha de notificación`, COUNT(*) FROM covid GROUP BY `Fecha de notificación` ORDER BY 2  DESC""").show(10)

+---------------------+--------+
|Fecha de notificación|count(1)|
+---------------------+--------+
|    18/6/2020 0:00:00|    3477|
|    19/6/2020 0:00:00|    3328|
|    17/6/2020 0:00:00|    3318|
|    16/6/2020 0:00:00|    3232|
|    23/6/2020 0:00:00|    3230|
|    11/6/2020 0:00:00|    2747|
|    20/6/2020 0:00:00|    2684|
|    12/6/2020 0:00:00|    2679|
|    10/6/2020 0:00:00|    2650|
|    24/6/2020 0:00:00|    2599|
+---------------------+--------+
only showing top 10 rows



In [None]:
#Cuanta gente se ha recuperado, cuanta ha fallecido (SparkSQL)
spark.sql("""SELECT Recuperado, COUNT(*) FROM covid GROUP BY Recuperado ORDER BY 2  DESC""").show(10)

+----------+--------+
|Recuperado|count(1)|
+----------+--------+
|Recuperado|   94832|
| Fallecido|    4658|
|       N/A|     505|
| fallecido|       5|
+----------+--------+

