In [1]:
# Instalar SDK Java 8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [2]:
# Descargar Spark 3.4.3

!wget -q https://archive.apache.org/dist/spark/spark-3.4.3/spark-3.4.3-bin-hadoop3.tgz

In [3]:
# Descomprimir el archivo descargado de Spark

!tar xf spark-3.4.3-bin-hadoop3.tgz

In [4]:
# establecer las variables de entorno
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.3-bin-hadoop3"

In [5]:
# Instalar la librería findspark

!pip install -q findspark

In [6]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

Ejercicios

Los datos adjuntos a esta lección forman parte de la base de datos [NeurIPS 2020] Data Science for COVID-19 (DS4C) disponible en Kaggle. Estos datos hacen referencia a los casos de contagio de covid-19 en Corea del Sur.

El archivo csv Case contiene los casos reportados y el archivo csv PatientInfo contiene la información de los pacientes.

1.- A partir del archivo csv Case, determine las tres ciudades con más casos confirmados de la enfermedad. La salida debe contener tres columnas: provincia, ciudad y casos confirmados. El resultado debe contener exactamente los tres nombre de ciudades con más casos confirmados ya que no se admiten otros valores.

2.- Cree un dataframe a partir del archivo csv PatientInfo. Asegúrese de que su dataframe no contenga pacientes duplicados.

a) ¿Cuántos pacientes tienen informado por quién se contagiaron(columna infected_by)? Obtenga solo los pacientes que tengan informado por quién se contagiaron.

b) A partir de la salida del inciso anterior obtenga solo los pacientes femeninos. La salida no debe contener las columnas released_date y deceased_date.

c) Establezca el número de particiones del dataframe resultante del inciso anterior en dos. Escriba el dataframe resultante en un archivo parquet. La salida debe estar particionada por la provincia y el modo de escritura debe ser overwrite.

In [7]:
sc = spark.sparkContext

In [8]:
df = spark.read.option('header','true').csv('./data/Case.csv')

In [9]:
df.show(10)

+--------+--------+---------------+-----+--------------------+---------+---------+----------+
| case_id|province|           city|group|      infection_case|confirmed| latitude| longitude|
+--------+--------+---------------+-----+--------------------+---------+---------+----------+
| 1000001|   Seoul|     Yongsan-gu| TRUE|       Itaewon Clubs|      139|37.538621|126.992652|
| 1000002|   Seoul|      Gwanak-gu| TRUE|             Richway|      119| 37.48208|126.901384|
| 1000003|   Seoul|        Guro-gu| TRUE| Guro-gu Call Center|       95|37.508163|126.884387|
| 1000004|   Seoul|   Yangcheon-gu| TRUE|Yangcheon Table T...|       43|37.546061|126.874209|
| 1000005|   Seoul|      Dobong-gu| TRUE|     Day Care Center|       43|37.679422|127.044374|
| 1000006|   Seoul|        Guro-gu| TRUE|Manmin Central Ch...|       41|37.481059|126.894343|
| 1000007|   Seoul|from other city| TRUE|SMR Newly Planted...|       36|        -|         -|
| 1000008|   Seoul|  Dongdaemun-gu| TRUE|       Dongan Churc

In [10]:
from pyspark.sql.functions import col
df.select('group').distinct().show()

+-----+
|group|
+-----+
|FALSE|
| TRUE|
+-----+



In [11]:
df.printSchema()

root
 |--  case_id: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- group: string (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- confirmed: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)



In [12]:
from pyspark.sql.types import StructType , StructField , StringType, IntegerType, BooleanType, FloatType

schema = StructType(
    [
        StructField('case_id', StringType() , True),
        StructField('province', StringType() , True),
        StructField('city', StringType() , True),
        StructField('group', BooleanType() , True),
        StructField('infection_case', StringType() , True),
        StructField('confirmed', IntegerType() , True),
        StructField('latitude', FloatType() , True),
        StructField('longitude', FloatType() , True)

    ]
)

In [13]:
df1 = spark.read.option('header','true').schema(schema).csv('./data/Case.csv')

In [14]:
df1.show(10)

+-------+--------+---------------+-----+--------------------+---------+---------+----------+
|case_id|province|           city|group|      infection_case|confirmed| latitude| longitude|
+-------+--------+---------------+-----+--------------------+---------+---------+----------+
|1000001|   Seoul|     Yongsan-gu| true|       Itaewon Clubs|      139| 37.53862| 126.99265|
|1000002|   Seoul|      Gwanak-gu| true|             Richway|      119| 37.48208| 126.90138|
|1000003|   Seoul|        Guro-gu| true| Guro-gu Call Center|       95|37.508163|126.884384|
|1000004|   Seoul|   Yangcheon-gu| true|Yangcheon Table T...|       43|37.546062| 126.87421|
|1000005|   Seoul|      Dobong-gu| true|     Day Care Center|       43| 37.67942| 127.04437|
|1000006|   Seoul|        Guro-gu| true|Manmin Central Ch...|       41| 37.48106| 126.89434|
|1000007|   Seoul|from other city| true|SMR Newly Planted...|       36|     null|      null|
|1000008|   Seoul|  Dongdaemun-gu| true|       Dongan Church|       17

In [17]:
from pyspark.sql.functions import desc,col
df1.orderBy(desc('confirmed')).select('province','city','confirmed').where((col('city')!='-') & (col('city')!='from other city')).show(3)

+--------+------------+---------+
|province|        city|confirmed|
+--------+------------+---------+
|   Daegu|      Nam-gu|     4511|
|   Daegu|Dalseong-gun|      196|
|   Seoul|  Yongsan-gu|      139|
+--------+------------+---------+
only showing top 3 rows



In [18]:
dfej2 = spark.read.option('header','true').csv('./data/PatientInfo.csv')

In [19]:
dfej2.show(10)

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|   2020-02-05|         null|released|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|   2020-03-02|         null|released|
|1000000003|  male|50s|  Korea|   Seoul|  Jongno-gu|contact with patient| 2002000001|            17|              null|    2020-01-30|   202

In [20]:
dfej2.printSchema()

root
 |-- patient_id: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: string (nullable = true)
 |-- country: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- infected_by: string (nullable = true)
 |-- contact_number: string (nullable = true)
 |-- symptom_onset_date: string (nullable = true)
 |-- confirmed_date: string (nullable = true)
 |-- released_date: string (nullable = true)
 |-- deceased_date: string (nullable = true)
 |-- state: string (nullable = true)



In [21]:
dfej2.count()

5165

In [22]:
from pyspark.sql.types import StructType , StructField , StringType, IntegerType, BooleanType, FloatType,DateType

schema2 = StructType(
    [
        StructField('patient_id', StringType() , True),
        StructField('sex', StringType() , True),
        StructField('age', IntegerType() , True),
        StructField('country', StringType() , True),
        StructField('province', StringType() , True),
        StructField('city', StringType() , True),
        StructField('infection_case', StringType() , True),
        StructField('infected_by', StringType() , True),
        StructField('contact_number', IntegerType() , True),
        StructField('symptom_onset_date', DateType() , True),
        StructField('confirmed_date', DateType() , True),
        StructField('released_date', DateType() , True),
        StructField('deceased_date', DateType() , True),
        StructField('state', StringType() , True)


    ]
)

In [23]:
dfestructurado = spark.read.schema(schema2).option('header','true').csv('./data/PatientInfo.csv')

In [24]:
dfestructurado.count()

5165

In [25]:
from pyspark.sql.functions import col

dfsin_duplicados = dfestructurado.dropDuplicates(['patient_id'])

In [26]:
dfsin_duplicados.count()

5164

In [27]:
dfsin_duplicados.show()

+----------+------+----+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex| age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+----+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|  male|null|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|   2020-02-05|         null|released|
|1000000002|  male|null|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|   2020-03-02|         null|released|
|1000000003|  male|null|  Korea|   Seoul|   Jongno-gu|contact with patient| 2002000001|            17|              null|    2020-

In [28]:
df_sinNulos = dfsin_duplicados.na.drop(subset=['infected_by'])

In [29]:
df_sinNulos.count()

1346

In [32]:
df_femenino = df_sinNulos.filter((col('sex')=='female')).na.drop(subset=['sex'])

In [33]:
df_femenino.show()

+----------+------+----+-------+--------+-------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex| age|country|province|         city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+----+-------+--------+-------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000005|female|null|  Korea|   Seoul|  Seongbuk-gu|contact with patient| 1000000002|             2|              null|    2020-01-31|   2020-02-24|         null|released|
|1000000006|female|null|  Korea|   Seoul|    Jongno-gu|contact with patient| 1000000003|            43|              null|    2020-01-31|   2020-02-19|         null|released|
|1000000010|female|null|  Korea|   Seoul|  Seongbuk-gu|contact with patient| 1000000003|             6|              null|   

In [34]:
df_sinfechas = df_femenino.drop('released_date','deceased_date')

In [35]:
df_sinfechas.show()

+----------+------+----+-------+--------+-------------+--------------------+-----------+--------------+------------------+--------------+--------+
|patient_id|   sex| age|country|province|         city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|   state|
+----------+------+----+-------+--------+-------------+--------------------+-----------+--------------+------------------+--------------+--------+
|1000000005|female|null|  Korea|   Seoul|  Seongbuk-gu|contact with patient| 1000000002|             2|              null|    2020-01-31|released|
|1000000006|female|null|  Korea|   Seoul|    Jongno-gu|contact with patient| 1000000003|            43|              null|    2020-01-31|released|
|1000000010|female|null|  Korea|   Seoul|  Seongbuk-gu|contact with patient| 1000000003|             6|              null|    2020-02-05|released|
|1000000014|female|null|  Korea|   Seoul|    Jongno-gu|contact with patient| 1000000013|            27|        2020-02

In [36]:
df_sinfechas.coalesce(2).write.partitionBy('province').mode('overwrite').parquet('./data/parquet/df_sinfechas.parquet')

+-----------------+
|         province|
+-----------------+
|           Sejong|
|            Ulsan|
|Chungcheongbuk-do|
|          Gwangju|
| Gyeongsangbuk-do|
|            Daegu|
| Gyeongsangnam-do|
|          Incheon|
|          Jeju-do|
|      Gyeonggi-do|
|            Busan|
|          Daejeon|
|            Seoul|
|Chungcheongnam-do|
|     Jeollabuk-do|
|     Jeollanam-do|
+-----------------+



# Sugerencias

In [None]:
casos =spark.read.option('header','true').option('inferSchema','true').csv('./data/Case.csv')

pacientes_info = spark.read.option('header','true').option('inferSchema','true').csv('./data/PatientInfo.csv')

In [None]:
from pyspark.sql.functions import col,desc
casos.filter((col('city')!='-') &(col('city')!='from other city')).ordrerBy(desc('confirmed')).select('province','city','confirmed').show(3)


In [None]:
pacientes_info.select(col('patient_id')).show()

In [None]:
pacientes_info.select(col('patient_id')).distinct().show()

In [None]:
pacientes_info = pacientes_info.dropDuplicates(['patient_id'])

In [None]:
from pyspark.sql.functions import count
pacientes_info.select(count('patient_id')).alias('conteo').show()

In [None]:
pacientes_info_contagios = pacientes_info.na.drop(subset=['infected_by'])

In [None]:
pacientes_info_contagios.filter(col('sex')=='female'|col('sex')=='male')