# **Labs 1 and 2: PySpark**

In these labs we will be using the "[[NeurIPS 2020] Data Science for COVID-19 (DS4C)](https://www.kaggle.com/datasets/kimjihoo/coronavirusdataset?select=PatientInfo.csv)" dataset, retrieved from [Kaggle](https://www.kaggle.com/) on 1/6/2022 for educational, non-commercial purposes under the [CC BY-NC-SA 4.0](https://creativecommons.org/licenses/by-nc-sa/4.0/) license.


The csv file that we will be using in this lab is **PatientInfo**.

### Install PySpark and check its version

In [194]:
! pip install pyspark



In [195]:
import pyspark
print(pyspark.__version__)

In [196]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import *

## PatientInfo.csv

**patient_id**
the ID of the patient

**sex**
the sex of the patient

**age**
the age of the patient

**country**
the country of the patient

**province**
the province of the patient

**city**
the city of the patient

**infection_case**
the source of infection (e.g. overseas inflow, contact with patient)

**infected_by**
the ID of the patient who infected this patient

**contact_number**
the number of people the patient was in contact with

**symptom_onset_date**
the date of symptom onset

**confirmed_date**
the date the infection was confirmed

**released_date**
the date the patient was released

**deceased_date**
the date the patient died

**state**
the patient's status: isolated / released / deceased

### Import and create SparkSession

In [197]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PatientInfo").getOrCreate()

### Load the PatientInfo.csv file and show the first 5 rows

In [198]:
from IPython.display import display, HTML
# Stop Jupyter from wrapping long lines so wide DataFrame.show() tables stay readable
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [199]:
patient_info_df = spark.read.csv("PatientInfo.csv", header=True, inferSchema=True)
patient_info_df.show(5)

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|         NULL|released|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|         NULL|released|
|1000000003|  male|50s|  Korea|   Seoul|  Jongno-gu|contact with patient| 2002000001|            17|              NULL|    2020-01-30|   202

### Display the schema of the dataset

In [200]:
patient_info_df.printSchema()

root
 |-- patient_id: long (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: string (nullable = true)
 |-- country: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- infected_by: string (nullable = true)
 |-- contact_number: string (nullable = true)
 |-- symptom_onset_date: string (nullable = true)
 |-- confirmed_date: date (nullable = true)
 |-- released_date: date (nullable = true)
 |-- deceased_date: date (nullable = true)
 |-- state: string (nullable = true)



### Display the statistical summary

In [201]:
patient_info_df.describe().show()

+-------+--------------------+------+----+----------+--------+--------------+--------------------+--------------------+--------------------+------------------+--------+
|summary|          patient_id|   sex| age|   country|province|          city|      infection_case|         infected_by|      contact_number|symptom_onset_date|   state|
+-------+--------------------+------+----+----------+--------+--------------+--------------------+--------------------+--------------------+------------------+--------+
|  count|                5165|  4043|3785|      5165|    5165|          5071|                4246|                1346|                 791|               690|    5165|
|   mean|2.8636345618679576E9|  NULL|NULL|      NULL|    NULL|          NULL|                NULL|2.2845944015643125E9|1.6772572523506988E7|              NULL|    NULL|
| stddev| 2.074210725277473E9|  NULL|NULL|      NULL|    NULL|          NULL|                NULL|1.5265072953383324E9| 3.093097580985502E8|              N

### Using the state column, how many people survived (released), and how many did not (isolated or deceased)?

In [202]:
survivors = patient_info_df.filter(col("state") == "released").count()
non_survivors = patient_info_df.filter((col("state") == "isolated") | (col("state") == "deceased")).count()
print("Survivors:", survivors)
print("Non-Survivors:", non_survivors)

Survivors: 2929
Non-Survivors: 2236


### Display the number of null values in each column

In [203]:
null_counts = [count(when(isnull(col(c)), c)).alias(c) for c in patient_info_df.columns]
patient_info_df.select(null_counts).show()

+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+
|patient_id| sex| age|country|province|city|infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|state|
+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+
|         0|1122|1380|      0|       0|  94|           919|       3819|          4374|              4475|             3|         3578|         5099|    0|
+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+



## Data preprocessing

### Fill the nulls in deceased_date with the released_date.
- You can use the **coalesce** function.

In [204]:
patient_info_df = patient_info_df.withColumn("deceased_date", coalesce("deceased_date", "released_date"))
patient_info_df.show(5)

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|   2020-02-05|released|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|   2020-03-02|released|
|1000000003|  male|50s|  Korea|   Seoul|  Jongno-gu|contact with patient| 2002000001|            17|              NULL|    2020-01-30|   202
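`coalesce` returns, row by row, the first of its arguments that is not null. The plain-Python sketch below mirrors that rule with a hypothetical `coalesce` helper over made-up rows (not taken from the dataset). It also makes a side effect visible: after this step, `deceased_date` holds the release date for released patients, so it is really an "end of case" date rather than a date of death.

```python
from datetime import date


def coalesce(*values):
    """Return the first argument that is not None (None if all are None).

    Hypothetical helper mirroring the per-row behaviour of
    pyspark.sql.functions.coalesce.
    """
    for value in values:
        if value is not None:
            return value
    return None


# Hypothetical rows: (deceased_date, released_date)
rows = [
    (None, date(2020, 2, 5)),   # released patient: takes the release date
    (date(2020, 3, 1), None),   # deceased patient: keeps the death date
    (None, None),               # still isolated: stays null
]

filled = [coalesce(deceased, released) for deceased, released in rows]
print(filled)
```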

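### Add a column named no_days holding the number of days between confirmed_date and deceased_date, then show the top 5 rows and print the schema.
- **Hint:** these columns need to be of type date first. The schema above shows that `inferSchema` already read them as `date`, so `to_date` is a harmless safeguard here.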

In [205]:
patient_info_df = patient_info_df.withColumn("deceased_date", to_date(col("deceased_date")))
patient_info_df = patient_info_df.withColumn("confirmed_date", to_date(col("confirmed_date")))
patient_info_df = patient_info_df.withColumn("no_days", datediff(col("deceased_date"), col("confirmed_date")))
patient_info_df.show(5)
patient_info_df.printSchema()

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|no_days|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|   2020-02-05|released|     13|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|   2020-03-02|released|     32|
|1000000003|  male|50s|  Korea|   Seoul|  Jongno-gu|contact with patient| 2002000001|            17|
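`datediff(end, start)` returns the number of days from `start` to `end` (negative when `end` is earlier). The same arithmetic with Python's `datetime.date`, wrapped in a hypothetical `datediff` helper and applied to the dates of the first two patients shown above, gives the `no_days` values 13 and 32 (2020 is a leap year, so February has 29 days).

```python
from datetime import date


def datediff(end: date, start: date) -> int:
    """Days from start to end, like Spark SQL's datediff(end, start)."""
    return (end - start).days


# (deceased_date after coalesce, confirmed_date) for patients 1000000001 and 1000000002
print(datediff(date(2020, 2, 5), date(2020, 1, 23)))
print(datediff(date(2020, 3, 2), date(2020, 1, 30)))
```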

### Add an is_male column that is True if the patient is male, and False otherwise

In [206]:
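# The UDF below calls sex.lower(), which fails on nulls, so rows with a missing sex
# are dropped first. Note: this removes 1,122 rows (see the null counts above),
# leaving 4,043 rows for every step that follows.
patient_info_df = patient_info_df.na.drop(subset=["sex"])

def is_male_udf(sex):
    # True for "male", False otherwise (case-insensitive)
    return sex.lower() == 'male'

is_male_udf_spark = udf(is_male_udf, BooleanType())
patient_info_df = patient_info_df.withColumn("is_male", is_male_udf_spark(patient_info_df["sex"]))
patient_info_df.show(5)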

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|no_days|is_male|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|   2020-02-05|released|     13|   true|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|   2020-03-02|released|     32|   true|
|1000000003|  male|50s|  Korea|   Seoul|  Jongno-gu|contact 
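Because `is_male_udf` calls `sex.lower()`, a null `sex` would raise `AttributeError`, which is why those rows are dropped first. A None-safe row function (a sketch; the name `is_male_safe` is hypothetical) keeps the rows and returns null for them instead; in Spark, a UDF that returns `None` produces a null. With built-in functions no UDF is needed at all: `lower(col("sex")) == "male"` already evaluates to null for a null `sex` instead of failing.

```python
from typing import Optional


def is_male_safe(sex: Optional[str]) -> Optional[bool]:
    """Return True/False for a known sex, None when the value is missing.

    Hypothetical None-safe variant of is_male_udf; it could be wrapped with
    udf(is_male_safe, BooleanType()) to use it in Spark.
    """
    if sex is None:
        return None
    return sex.strip().lower() == "male"


for value in ["male", "Female", None]:
    print(value, "->", is_male_safe(value))
```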

### Add an is_dead column that is True if the patient's state is not released, and False otherwise

- Use a **UDF** to perform this task.
- However, UDFs are not recommended unless no built-in function can do the required operation.
- Python UDFs are slower than built-in functions: each row has to be serialized between the JVM and a Python worker process, and Spark's optimizer cannot look inside them.

In [207]:
def is_dead_udf(state):
    return state.lower() != 'released'
is_dead_udf_spark = udf(is_dead_udf, BooleanType())
patient_info_df = patient_info_df.withColumn("is_dead", is_dead_udf_spark(patient_info_df["state"]))
patient_info_df.show(5)

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+-------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|no_days|is_male|is_dead|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+-------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|   2020-02-05|released|     13|   true|  false|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|   2020-03-02|released|     32|   true|  false|
|1000000003|  male|5

### Convert the age bins from 0s, 10s, 20s, etc. to numbers (0, 10, 20, etc.)

In [208]:
patient_info_df = patient_info_df.withColumn("age", regexp_replace("age", "s", ""))
patient_info_df = patient_info_df.withColumn("age", patient_info_df["age"].cast(DoubleType()))
patient_info_df.show()

+----------+------+----+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+-------+
|patient_id|   sex| age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|no_days|is_male|is_dead|
+----------+------+----+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+-------+
|1000000001|  male|50.0|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|   2020-02-05|released|     13|   true|  false|
|1000000002|  male|30.0|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|   2020-03-02|released|     32|   true|  false|
|100000000
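`regexp_replace("age", "s", "")` removes every `s` in the string, which works for values such as `50s`; anchoring the pattern to the end of the string (`"s$"`) states the intent more precisely. A plain-Python sketch of the same parsing rule, using a hypothetical `parse_age_bin` helper, with nulls passing through and unparseable values becoming null (as a Spark cast does when ANSI mode is off):

```python
import re
from typing import Optional


def parse_age_bin(age: Optional[str]) -> Optional[float]:
    """Turn an age bin like '50s' into 50.0; keep None as None.

    Hypothetical helper mirroring regexp_replace(age, 's$', '') followed by
    a cast to double. Values that do not parse become None.
    """
    if age is None:
        return None
    stripped = re.sub(r"s$", "", age)  # drop only a trailing 's'
    try:
        return float(stripped)
    except ValueError:
        return None


print([parse_age_bin(a) for a in ["0s", "50s", "100s", None, "unknown"]])
```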

### Cast age and no_days to Double

In [209]:
patient_info_df = patient_info_df.withColumn("age", col("age").cast(DoubleType()))
patient_info_df = patient_info_df.withColumn("no_days", col("no_days").cast(DoubleType()))

### Drop the following columns:
`["patient_id", "sex", "infected_by", "contact_number", "released_date", "state", "symptom_onset_date", "confirmed_date", "deceased_date", "country", "no_days", "city", "infection_case"]`

In [210]:
columns_to_drop = ["patient_id", "sex", "infected_by", "contact_number", "released_date",
                   "state", "symptom_onset_date", "confirmed_date", "deceased_date",
                   "country", "no_days", "city", "infection_case"]

patient_info_df2 = patient_info_df.drop(*columns_to_drop)
patient_info_df2.show()

+----+--------+-------+-------+
| age|province|is_male|is_dead|
+----+--------+-------+-------+
|50.0|   Seoul|   true|  false|
|30.0|   Seoul|   true|  false|
|50.0|   Seoul|   true|  false|
|20.0|   Seoul|   true|  false|
|20.0|   Seoul|  false|  false|
|50.0|   Seoul|  false|  false|
|20.0|   Seoul|   true|  false|
|20.0|   Seoul|   true|  false|
|30.0|   Seoul|   true|  false|
|60.0|   Seoul|  false|  false|
|50.0|   Seoul|  false|  false|
|20.0|   Seoul|   true|  false|
|80.0|   Seoul|   true|   true|
|60.0|   Seoul|  false|  false|
|70.0|   Seoul|   true|  false|
|70.0|   Seoul|   true|  false|
|70.0|   Seoul|   true|  false|
|20.0|   Seoul|   true|  false|
|70.0|   Seoul|  false|  false|
|70.0|   Seoul|  false|  false|
+----+--------+-------+-------+
only showing top 20 rows



### Recount the number of nulls now

In [211]:
# count(col) alone counts NON-null values; wrapping the column in when(isnull(...))
# makes count() see only the null rows.
null_counts = [count(when(isnull(col(c)), c)).alias(c) for c in patient_info_df2.columns]
patient_info_df2.select(null_counts).show()

+---+--------+-------+-------+
|age|province|is_male|is_dead|
+---+--------+-------+-------+
|261|       0|      0|      0|
+---+--------+-------+-------+
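Spark's `count(column)`, like SQL's `COUNT(column)`, counts only non-null values, so on its own it reports how many values are present, not how many are missing. Counting nulls needs either a conditional count or `COUNT(*) - COUNT(column)`. The sketch below uses Python's built-in `sqlite3` module as a stand-in for Spark SQL, on a few hypothetical rows, to show the three expressions side by side.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE patients (age REAL, province TEXT)")
conn.executemany(
    "INSERT INTO patients VALUES (?, ?)",
    [(50.0, "Seoul"), (None, "Seoul"), (30.0, "Busan"), (20.0, "Busan")],
)

non_null_age, null_age_conditional, null_age_diff = conn.execute(
    """
    SELECT COUNT(age),                                    -- skips NULLs
           SUM(CASE WHEN age IS NULL THEN 1 ELSE 0 END),  -- counts NULLs
           COUNT(*) - COUNT(age)                          -- also counts NULLs
    FROM patients
    """
).fetchone()
print(non_null_age, null_age_conditional, null_age_diff)
conn.close()
```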



## Now do the same using SQL SELECT statements

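### From the Patient DataFrame, create a temporary view (table).

Note that at this point `patient_info_df` is the preprocessed DataFrame rather than the raw CSV: rows with a null `sex` have been dropped, `age` is already a double, and `no_days`, `is_male` and `is_dead` have been added. The queries below therefore run on 4,043 rows instead of 5,165.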

In [212]:
patient_info_df.createOrReplaceTempView("patient_info_table")

### Use a SELECT statement to select all columns from the view and show the output.

In [213]:
selected_df = spark.sql("SELECT * FROM patient_info_table")
selected_df.show()

+----------+------+----+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+-------+
|patient_id|   sex| age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|no_days|is_male|is_dead|
+----------+------+----+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+-------+
|1000000001|  male|50.0|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|   2020-02-05|released|   13.0|   true|  false|
|1000000002|  male|30.0|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|   2020-03-02|released|   32.0|   true|  false|
|100000000

### *Using SQL commands*, limit the output to only 5 rows

In [214]:
limited_df = spark.sql("SELECT * FROM patient_info_table LIMIT 5")
limited_df.show()

+----------+------+----+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+-------+
|patient_id|   sex| age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|no_days|is_male|is_dead|
+----------+------+----+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+-------+
|1000000001|  male|50.0|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|   2020-02-05|released|   13.0|   true|  false|
|1000000002|  male|30.0|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|   2020-03-02|released|   32.0|   true|  false|
|1000000003|  m

### Select the count of males and females in the dataset

In [215]:
gender_count_df = spark.sql("SELECT sex, COUNT(*) as count FROM patient_info_table GROUP BY sex")
gender_count_df.show()

+------+-----+
|   sex|count|
+------+-----+
|female| 2218|
|  male| 1825|
+------+-----+
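Only two groups appear here because rows with a null `sex` were dropped earlier. On unfiltered data, `GROUP BY sex` would also return one extra group whose key is null, since SQL places all nulls in a single group. A small sketch with `sqlite3` standing in for Spark SQL, on hypothetical rows:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE patients (sex TEXT)")
conn.executemany(
    "INSERT INTO patients VALUES (?)",
    [("male",), ("female",), ("female",), (None,), (None,)],
)

# All NULL values of sex end up in one group with key None
counts = conn.execute(
    "SELECT sex, COUNT(*) AS count FROM patients GROUP BY sex"
).fetchall()
print(counts)
conn.close()
```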



### How many people survived, and how many did not?

In [216]:
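# Same question, answered with SQL queries against the temporary view.
# The totals are lower than before (4,043 vs 5,165 rows) because rows with a null sex were dropped.
survivors = spark.sql("SELECT COUNT(*) FROM patient_info_table WHERE state = 'released'").first()[0]
non_survivors = spark.sql("SELECT COUNT(*) FROM patient_info_table WHERE state IN ('isolated', 'deceased')").first()[0]
print("Survivors:", survivors)
print("Non-Survivors:", non_survivors)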

Survivors: 2514
Non-Survivors: 1529
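Both counts can also be computed in a single query with conditional aggregation (`SUM(CASE WHEN ... THEN 1 ELSE 0 END)`), a pattern Spark SQL accepts with the same syntax. The sketch runs it with `sqlite3` as a stand-in for Spark SQL, on hypothetical rows.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE patients (state TEXT)")
conn.executemany(
    "INSERT INTO patients VALUES (?)",
    [("released",), ("released",), ("isolated",), ("deceased",), ("released",)],
)

# One pass over the table, one conditional sum per category
survivors, non_survivors = conn.execute(
    """
    SELECT SUM(CASE WHEN state = 'released' THEN 1 ELSE 0 END),
           SUM(CASE WHEN state IN ('isolated', 'deceased') THEN 1 ELSE 0 END)
    FROM patients
    """
).fetchone()
print("Survivors:", survivors)
print("Non-Survivors:", non_survivors)
conn.close()
```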


### Now, let's perform some preprocessing using SQL:
1. Convert the *age* column to double after removing the trailing 's' (*hint: use the SUBSTRING function*)
2. Select only the following columns: `['sex', 'age', 'province', 'state']`
3. Store the result of the query in a new dataframe

In [217]:
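# Note: in this view, age is already a double (e.g. 50.0, read as the string "50.0"),
# so SUBSTRING drops the trailing "0" rather than an "s"; the CAST still returns 50.0.
# On the raw CSV column ("50s") the same expression removes the "s".
patient_info_df3 = spark.sql("""
    SELECT sex, CAST(SUBSTRING(age, 1, LENGTH(age)-1) AS DOUBLE) AS age, province, state
    FROM patient_info_table
""")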
patient_info_df3.show(5)


+------+----+--------+--------+
|   sex| age|province|   state|
+------+----+--------+--------+
|  male|50.0|   Seoul|released|
|  male|30.0|   Seoul|released|
|  male|50.0|   Seoul|released|
|  male|20.0|   Seoul|released|
|female|20.0|   Seoul|released|
+------+----+--------+--------+
only showing top 5 rows
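On the raw CSV values (strings such as `50s`), `SUBSTRING(age, 1, LENGTH(age) - 1)` drops the final character and the cast turns the remainder into a number, while nulls stay null. The sketch reproduces the expression with `sqlite3` standing in for Spark SQL; SQLite spells the function `SUBSTR`, and `CAST(... AS DOUBLE)` yields a REAL there. The input values are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE patients (age TEXT)")
conn.executemany("INSERT INTO patients VALUES (?)", [("50s",), ("100s",), (None,)])

# Strip the last character ('s'), then cast the rest to a floating-point number
ages = [
    row[0]
    for row in conn.execute(
        "SELECT CAST(SUBSTR(age, 1, LENGTH(age) - 1) AS DOUBLE) FROM patients ORDER BY rowid"
    )
]
print(ages)
conn.close()
```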

