# **Labs 1 and 2 PySpark:**

In these labs we will be using the "[[NeurIPS 2020] Data Science for COVID-19 (DS4C)](https://www.kaggle.com/datasets/kimjihoo/coronavirusdataset?select=PatientInfo.csv)" dataset, retrieved from [Kaggle](https://www.kaggle.com/) on 1/6/2022, for educational non commercial purpose, License
[CC BY-NC-SA 4.0
](https://creativecommons.org/licenses/by-nc-sa/4.0/)


The csv file that we will be using in this lab is **PatientInfo**.

## PatientInfo.csv

**patient_id**
the ID of the patient

**sex**
the sex of the patient

**age**
the age of the patient

**country**
the country of the patient

**province**
the province of the patient

**city**
the city of the patient

**infection_case**
the case of infection

**infected_by**
the ID of who infected the patient


**contact_number**
the number of contacts with people

**symptom_onset_date**
the date of symptom onset

**confirmed_date**
the date of being confirmed

**released_date**
the date of being released

**deceased_date**
the date of being deceased

**state**
isolated / released / deceased

### Import the pyspark and check it's version

In [1]:
spark

### Import and create SparkSession

In [2]:
sc

### Load the PatientInfo.csv file and show the first 5 rows

In [3]:
from IPython.display import display, HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [7]:
df = spark.read.csv("PatientInfo.csv",inferSchema= True ,header=True)
df.show(4)

+----------+----+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id| sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+----+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|         NULL|released|
|1000000002|male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|         NULL|released|
|1000000003|male|50s|  Korea|   Seoul|  Jongno-gu|contact with patient| 2002000001|            17|              NULL|    2020-01-30|   2020-02-19|    

### Display the schema of the dataset

In [8]:
df.printSchema()

root
 |-- patient_id: long (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: string (nullable = true)
 |-- country: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- infected_by: string (nullable = true)
 |-- contact_number: string (nullable = true)
 |-- symptom_onset_date: date (nullable = true)
 |-- confirmed_date: date (nullable = true)
 |-- released_date: date (nullable = true)
 |-- deceased_date: date (nullable = true)
 |-- state: string (nullable = true)



### Display the statistical summary

In [10]:
df.summary().show()

23/10/05 12:14:10 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 7:>                                                          (0 + 1) / 1]

+-------+--------------------+------+----+----------+--------+--------------+--------------------+--------------------+--------------------+--------+
|summary|          patient_id|   sex| age|   country|province|          city|      infection_case|         infected_by|      contact_number|   state|
+-------+--------------------+------+----+----------+--------+--------------+--------------------+--------------------+--------------------+--------+
|  count|                5165|  4043|3785|      5165|    5165|          5071|                4246|                1346|                 791|    5165|
|   mean|2.8636345618679576E9|  NULL|NULL|      NULL|    NULL|          NULL|                NULL|2.2845944015643125E9|1.6772572523506988E7|    NULL|
| stddev| 2.074210725277473E9|  NULL|NULL|      NULL|    NULL|          NULL|                NULL|1.5265072953383324E9| 3.093097580985502E8|    NULL|
|    min|          1000000001|female|  0s|Bangladesh|   Busan|     Andong-si|Anyang Gunpo Past...|  

                                                                                

### Using the state column.
### How many people survived (released), and how many didn't survive (isolated/deceased)?

In [13]:
from pyspark.sql.functions import *
import pyspark.sql.functions as F

In [29]:
df1=df.groupBy('state').agg(F.count(col('state')).alias('Count'))

In [33]:
df2=df.filter((col('state')=="isolated") | (col('state')=="deceased") ).count()
df3=df.filter((col('state')=="released") ).count()

In [39]:
dec={
    "dead":df2,
    "survive":df3
}
dataframe = spark.createDataFrame(list(dec.items()),['state','Count'])
dataframe.show()

+-------+-----+
|  state|Count|
+-------+-----+
|   dead| 2236|
|survive| 2929|
+-------+-----+



### Display the number of null values in each column

In [45]:
nulls={}
for col in df.columns:
    nullCount = df.filter(F.col(col).isNull()).count()
    nulls[col] = nullCount
nulls   

{'patient_id': 0,
 'sex': 1122,
 'age': 1380,
 'country': 0,
 'province': 0,
 'city': 94,
 'infection_case': 919,
 'infected_by': 3819,
 'contact_number': 4374,
 'symptom_onset_date': 4476,
 'confirmed_date': 3,
 'released_date': 3578,
 'deceased_date': 5099,
 'state': 0}

## Data preprocessing

### Fill the nulls in the deceased_date with the released_date. 
- You can use <b>coalesce</b> function

In [64]:
df_date=df.withColumn('deceased_date',when(df['deceased_date'].isNull(),df['released_date']))


### Add a column named no_days which is difference between the deceased_date and the confirmed_date then show the top 5 rows. Print the schema.
- <b> Hint: You need to typecast these columns as date first <b>

In [69]:
df_date2=df_date.withColumn('no_days',F.date_diff( to_date(df_date['deceased_date']),to_date(df_date['confirmed_date'])))

In [70]:
df_date2.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|no_days|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|   2020-02-05|released|     13|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|   2020-03-02|released|     32|
|1000000003|  male|50s|  Korea|   Seoul|   Jongno-gu|contact with patient| 2002000001|         

### Add a is_male column if male then it should yield true, else then False

In [78]:
df_male=df_date2.withColumn('is_male',when(df['sex']=="male","true").when(df['sex']=="female","false"))

In [79]:
df_male.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|no_days|is_male|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|   2020-02-05|released|     13|   true|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|   2020-03-02|released|     32|   true|
|1000000003|  male|50s|  Korea|   Seoul|   Jongno-gu|co

### Add a is_dead column if patient state is not released then it should yield true, else then False

- Use <b>UDF</b> to perform this task. 
- However, UDF is not recommended there is no built in function can do the required operation.
- UDF is slower than built in functions.

In [80]:
from pyspark.sql.types import *   

In [100]:
def udf_func(s):
    if s!= 'released':
        return 1
    else:
        return 0
converted_udf= udf(udf_func,BooleanType())

In [101]:
df_dead=df_male.withColumn('is_dead',converted_udf(df_male['state']))
df_dead.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+-------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|no_days|is_male|is_dead|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+-------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|   2020-02-05|released|     13|   true|   NULL|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|   2020-03-02|released|     32|   true|   NULL|
|1000000003|  m

### Change the ages to bins from 10s, 0s, 10s, 20s,.etc to 0,10, 20

In [105]:
df_age=df_dead.withColumn('age',split(df_dead['age'],'s')[0])
df_age.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+-------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|no_days|is_male|is_dead|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+-------+
|1000000001|  male| 50|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|   2020-02-05|released|     13|   true|   NULL|
|1000000002|  male| 30|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|   2020-03-02|released|     32|   true|   NULL|
|1000000003|  m

In [141]:
df_age=df_dead.withColumn('age',df_dead['age'].substr(0,2))
df_age.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+-------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|no_days|is_male|is_dead|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+-------+
|1000000001|  male| 50|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|   2020-02-05|released|     13|   true|   NULL|
|1000000002|  male| 30|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|   2020-03-02|released|     32|   true|   NULL|
|1000000003|  m

### Change age, and no_days  to be typecasted as Double

In [143]:
df_1=df_age.withColumn('age',F.col('age').cast(DoubleType())).withColumn('no_days',F.col('no_days').cast(DoubleType()))

### Drop the columns
["patient_id","sex","infected_by","contact_number","released_date","state",
"symptom_onset_date","confirmed_date","deceased_date","country","no_days",
"city","infection_case"]

In [144]:
df_2=df_1.drop("patient_id","sex","infected_by","contact_number","released_date","state", "symptom_onset_date","confirmed_date","deceased_date","country","no_days", "city","infection_case")

In [145]:
df_2.show()

+----+--------+-------+-------+
| age|province|is_male|is_dead|
+----+--------+-------+-------+
|50.0|   Seoul|   true|   NULL|
|30.0|   Seoul|   true|   NULL|
|50.0|   Seoul|   true|   NULL|
|20.0|   Seoul|   true|   NULL|
|20.0|   Seoul|  false|   NULL|
|50.0|   Seoul|  false|   NULL|
|20.0|   Seoul|   true|   NULL|
|20.0|   Seoul|   true|   NULL|
|30.0|   Seoul|   true|   NULL|
|60.0|   Seoul|  false|   NULL|
|50.0|   Seoul|  false|   NULL|
|20.0|   Seoul|   true|   NULL|
|80.0|   Seoul|   true|   NULL|
|60.0|   Seoul|  false|   NULL|
|70.0|   Seoul|   true|   NULL|
|70.0|   Seoul|   true|   NULL|
|70.0|   Seoul|   true|   NULL|
|20.0|   Seoul|   true|   NULL|
|70.0|   Seoul|  false|   NULL|
|70.0|   Seoul|  false|   NULL|
+----+--------+-------+-------+
only showing top 20 rows



### Recount the number of nulls now

In [146]:
nulls={}
for col in df_2.columns:
    nullCount = df_2.filter(F.col(col).isNull()).count()
    nulls[col] = nullCount
nulls 

{'age': 1446, 'province': 0, 'is_male': 1122, 'is_dead': 5165}

## Now do the same but using SQL select statement

### From the original Patient DataFrame, Create a temporary view (table).

In [147]:
df.createOrReplaceTempView("SelimView")

### Use SELECT statement to select all columns from the dataframe and show the output.

In [148]:
spark.sql(""" select * from SelimView""").show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|         NULL|released|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|         NULL|released|
|1000000003|  male|50s|  Korea|   Seoul|   Jongno-gu|contact with patient| 2002000001|            17|              NULL|    2020-01-30|

### *Using SQL commands*, limit the output to only 5 rows 

In [149]:
spark.sql(""" select * from SelimView limit 5""").show()

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|         NULL|released|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|         NULL|released|
|1000000003|  male|50s|  Korea|   Seoul|  Jongno-gu|contact with patient| 2002000001|            17|              NULL|    2020-01-30|   202

### Select the count of males and females in the dataset

In [159]:
spark.sql(""" select sex,count(*) from SelimView GROUP BY sex""").show()

+------+--------+
|   sex|count(1)|
+------+--------+
|  NULL|    1122|
|female|    2218|
|  male|    1825|
+------+--------+



### How many people did survive, and how many didn't?

In [172]:
spark.sql(""" select count(*) as dead,(select count(*)  
                                       from SelimView
                                       where state in ('released'))as survive

              from SelimView 
              where state in ('isolated','deceased')""").show()

+----+-------+
|dead|survive|
+----+-------+
|2236|   2929|
+----+-------+



### Now, let's perform some preprocessing using SQL:
1. Convert *age* column to double after removing the 's' at the end -- *hint: check SUBSTRING method*
2. Select only the following columns: `['sex', 'age', 'province', 'state']`
3. Store the result of the query in a new dataframe

In [180]:
spark.sql(""" select cast(substring(age,0,2) as double) as age
                from SelimView """).show()

+----+
| age|
+----+
|50.0|
|30.0|
|50.0|
|20.0|
|20.0|
|50.0|
|20.0|
|20.0|
|30.0|
|60.0|
|50.0|
|20.0|
|80.0|
|60.0|
|70.0|
|70.0|
|70.0|
|20.0|
|70.0|
|70.0|
+----+
only showing top 20 rows



In [181]:
spark.sql(""" select sex,age,province,state
                from SelimView """).show()

+------+---+--------+--------+
|   sex|age|province|   state|
+------+---+--------+--------+
|  male|50s|   Seoul|released|
|  male|30s|   Seoul|released|
|  male|50s|   Seoul|released|
|  male|20s|   Seoul|released|
|female|20s|   Seoul|released|
|female|50s|   Seoul|released|
|  male|20s|   Seoul|released|
|  male|20s|   Seoul|released|
|  male|30s|   Seoul|released|
|female|60s|   Seoul|released|
|female|50s|   Seoul|released|
|  male|20s|   Seoul|released|
|  male|80s|   Seoul|deceased|
|female|60s|   Seoul|released|
|  male|70s|   Seoul|released|
|  male|70s|   Seoul|released|
|  male|70s|   Seoul|released|
|  male|20s|   Seoul|released|
|female|70s|   Seoul|released|
|female|70s|   Seoul|released|
+------+---+--------+--------+
only showing top 20 rows



In [182]:
df_final=spark.sql(""" select sex,age,province,state
                from SelimView """)

In [183]:
df_final.show()

+------+---+--------+--------+
|   sex|age|province|   state|
+------+---+--------+--------+
|  male|50s|   Seoul|released|
|  male|30s|   Seoul|released|
|  male|50s|   Seoul|released|
|  male|20s|   Seoul|released|
|female|20s|   Seoul|released|
|female|50s|   Seoul|released|
|  male|20s|   Seoul|released|
|  male|20s|   Seoul|released|
|  male|30s|   Seoul|released|
|female|60s|   Seoul|released|
|female|50s|   Seoul|released|
|  male|20s|   Seoul|released|
|  male|80s|   Seoul|deceased|
|female|60s|   Seoul|released|
|  male|70s|   Seoul|released|
|  male|70s|   Seoul|released|
|  male|70s|   Seoul|released|
|  male|20s|   Seoul|released|
|female|70s|   Seoul|released|
|female|70s|   Seoul|released|
+------+---+--------+--------+
only showing top 20 rows

