## 0.&nbsp;Instalación Spark

In [1]:
# Instalar SDK Java 8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Descargar Spark 3.2.3
!wget -q https://archive.apache.org/dist/spark/spark-3.2.3/spark-3.2.3-bin-hadoop3.2.tgz

# Descomprimir el archivo descargado de Spark
!tar xf spark-3.2.3-bin-hadoop3.2.tgz

# Establecer las variables de entorno
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.3-bin-hadoop3.2"

# Instalar la librería findspark
!pip install -q findspark

# Instalar pyspark
!pip install -q pyspark==3.2.3

## 1.&nbsp;Spark Session

In [2]:
# Find Spark
import findspark
findspark.init()

# Spark Session
from pyspark.sql import SparkSession

# SparkContext
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

## 2.&nbsp;Ejercicios

In [3]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType

In [4]:
"""
Ejercicio 1: A partir del archivo csv Case, determine las tres ciudades con más
casos confirmados de la enfermedad. La salida debe contener tres columnas:
provincia, ciudad y casos confirmados. El resultado debe contener exactamente los
tres nombres de las ciudades con más casos confirmados ya que no se admiten otros valores.
"""
case_df = spark.read.option("header", "true").option("inferSchema", "true").csv("Case.csv")
case_df.show()

+--------+--------+---------------+-----+--------------------+---------+---------+----------+
| case_id|province|           city|group|      infection_case|confirmed| latitude| longitude|
+--------+--------+---------------+-----+--------------------+---------+---------+----------+
| 1000001|   Seoul|     Yongsan-gu| true|       Itaewon Clubs|      139|37.538621|126.992652|
| 1000002|   Seoul|      Gwanak-gu| true|             Richway|      119| 37.48208|126.901384|
| 1000003|   Seoul|        Guro-gu| true| Guro-gu Call Center|       95|37.508163|126.884387|
| 1000004|   Seoul|   Yangcheon-gu| true|Yangcheon Table T...|       43|37.546061|126.874209|
| 1000005|   Seoul|      Dobong-gu| true|     Day Care Center|       43|37.679422|127.044374|
| 1000006|   Seoul|        Guro-gu| true|Manmin Central Ch...|       41|37.481059|126.894343|
| 1000007|   Seoul|from other city| true|SMR Newly Planted...|       36|        -|         -|
| 1000008|   Seoul|  Dongdaemun-gu| true|       Dongan Churc

In [5]:
case_df.printSchema()

root
 |--  case_id: integer (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- group: boolean (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- confirmed: integer (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)



In [6]:
# Se ajusta el nombre de la columna case_id
case_df = case_df.withColumnRenamed(" case_id", "case_id")
case_df.printSchema()

root
 |-- case_id: integer (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- group: boolean (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- confirmed: integer (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)



In [7]:
from pyspark.sql.functions import col, desc
case_df.orderBy(desc("confirmed")).show()

+-------+-----------------+---------------+-----+--------------------+---------+---------+----------+
|case_id|         province|           city|group|      infection_case|confirmed| latitude| longitude|
+-------+-----------------+---------------+-----+--------------------+---------+---------+----------+
|1200001|            Daegu|         Nam-gu| true|  Shincheonji Church|     4511| 35.84008|  128.5667|
|1200009|            Daegu|              -|false|contact with patient|      917|        -|         -|
|1200010|            Daegu|              -|false|                 etc|      747|        -|         -|
|6000001| Gyeongsangbuk-do|from other city| true|  Shincheonji Church|      566|        -|         -|
|2000020|      Gyeonggi-do|              -|false|     overseas inflow|      305|        -|         -|
|1000036|            Seoul|              -|false|     overseas inflow|      298|        -|         -|
|1200002|            Daegu|   Dalseong-gun| true|Second Mi-Ju Hosp...|      196|35

In [8]:
case_df_top_cities = case_df.\
  where((col("city") != "-") & (col("city") != "from other city")).\
  orderBy(desc("confirmed")).select("province", "city", "confirmed").limit(3)
case_df_top_cities.show()

+--------+------------+---------+
|province|        city|confirmed|
+--------+------------+---------+
|   Daegu|      Nam-gu|     4511|
|   Daegu|Dalseong-gun|      196|
|   Seoul|  Yongsan-gu|      139|
+--------+------------+---------+



In [9]:
"""
Ejercicio 2: Cree un dataframe a partir del archivo csv PatientInfo.
Asegúrese de que su dataframe no contenga pacientes duplicados.
"""
patient_info_df = spark.read.option("header", "true").option("inferSchema", "true").csv("PatientInfo.csv")
patient_info_df.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|   2020-02-05|         null|released|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|   2020-03-02|         null|released|
|1000000003|  male|50s|  Korea|   Seoul|   Jongno-gu|contact with patient| 2002000001|            17|              null|    2020-01-30|

In [10]:
patient_info_df.printSchema()

root
 |-- patient_id: long (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: string (nullable = true)
 |-- country: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- infected_by: string (nullable = true)
 |-- contact_number: string (nullable = true)
 |-- symptom_onset_date: string (nullable = true)
 |-- confirmed_date: string (nullable = true)
 |-- released_date: string (nullable = true)
 |-- deceased_date: string (nullable = true)
 |-- state: string (nullable = true)



In [11]:
# Cantidad de registros en la columna patient_id
patient_info_df.select(col("patient_id")).count()

5165

In [12]:
# Cantidad de registros ÚNICOS en la columna patient_id
patient_info_df.select(col("patient_id")).distinct().count()

5164

In [13]:
# Se infiere que hay un paciente duplicado, por tanto se procede a eliminar el registro con al paciente duplicado
patient_info_df = patient_info_df.dropDuplicates(["patient_id"])
patient_info_df.select(col("patient_id")).count()

5164

In [14]:
"""
a. ¿Cuántos pacientes tienen informado por quién se contagiaron(columna infected_by)?
Obtenga solo los pacientes que tengan informado por quién se contagiaron.
"""
patients_that_reported = patient_info_df.dropna(subset=["infected_by"])
print(patients_that_reported.count())

1346


In [15]:
"""
b. A partir de la salida del inciso anterior obtenga solo los pacientes femeninos.
La salida no debe contener las columnas released_date y deceased_date.
"""
patiens_that_reported_fem = patients_that_reported.where(col("sex") == "female").drop("released_date").drop("deceased_date")
patiens_that_reported_fem.show()

+----------+------+---+-------+--------+-------------+--------------------+-----------+--------------+------------------+--------------+--------+
|patient_id|   sex|age|country|province|         city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|   state|
+----------+------+---+-------+--------+-------------+--------------------+-----------+--------------+------------------+--------------+--------+
|1000000005|female|20s|  Korea|   Seoul|  Seongbuk-gu|contact with patient| 1000000002|             2|              null|    2020-01-31|released|
|1000000006|female|50s|  Korea|   Seoul|    Jongno-gu|contact with patient| 1000000003|            43|              null|    2020-01-31|released|
|1000000010|female|60s|  Korea|   Seoul|  Seongbuk-gu|contact with patient| 1000000003|             6|              null|    2020-02-05|released|
|1000000014|female|60s|  Korea|   Seoul|    Jongno-gu|contact with patient| 1000000013|            27|        2020-02-06|   

In [16]:
# Se confirma que solo esten valores "female" en la columna "sex"
patiens_that_reported_fem.select("sex").distinct().show()

+------+
|   sex|
+------+
|female|
+------+



In [17]:
"""
c. Establezca el número de particiones del dataframe resultante del inciso anterior en dos.
Escriba el dataframe resultante en un archivo parquet.
La salida debe estar particionada por la provincia y el modo de escritura debe ser overwrite.
"""
output_df = patiens_that_reported_fem.repartition(2)
output_df.write.partitionBy("province").format("parquet").mode("overwrite").parquet("./data/output")

In [None]:
from google.colab import drive
drive.mount('/content/drive')