# **Labs 1 and 2 PySpark:**

In these labs we will be using the "[[NeurIPS 2020] Data Science for COVID-19 (DS4C)](https://www.kaggle.com/datasets/kimjihoo/coronavirusdataset?select=PatientInfo.csv)" dataset, retrieved from [Kaggle](https://www.kaggle.com/) on 1/6/2022, for educational non commercial purpose, License
[CC BY-NC-SA 4.0
](https://creativecommons.org/licenses/by-nc-sa/4.0/)


The csv file that we will be using in this lab is **PatientInfo**.

## PatientInfo.csv

**patient_id**
the ID of the patient

**sex**
the sex of the patient

**age**
the age of the patient

**country**
the country of the patient

**province**
the province of the patient

**city**
the city of the patient

**infection_case**
the case of infection

**infected_by**
the ID of who infected the patient


**contact_number**
the number of contacts with people

**symptom_onset_date**
the date of symptom onset

**confirmed_date**
the date of being confirmed

**released_date**
the date of being released

**deceased_date**
the date of being deceased

**state**
isolated / released / deceased

### Import the pyspark and check it's version

In [1]:
import findspark 

In [2]:
findspark.init()

### Import and create SparkSession

In [3]:
from pyspark.sql import SparkSession

In [7]:
from pyspark.sql.functions import col, regexp_replace

In [4]:
spark = SparkSession.builder.getOrCreate()

24/08/28 11:04:11 WARN Utils: Your hostname, DESKTOP-3PA0MV5 resolves to a loopback address: 127.0.1.1; using 192.168.238.222 instead (on interface eth2)
24/08/28 11:04:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/28 11:04:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
spark.version

'3.5.2'

### Load the PatientInfo.csv file and show the first 5 rows

In [None]:
from IPython.display import display, HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [5]:
df = spark.read.csv('PatientInfo.csv',header =True,inferSchema=True)

                                                                                

In [8]:
df.show(5)

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|         NULL|released|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|         NULL|released|
|1000000003|  male|50s|  Korea|   Seoul|  Jongno-gu|contact with patient| 2002000001|            17|              NULL|    2020-01-30|   202

### Display the schema of the dataset

In [9]:
df.printSchema()

root
 |-- patient_id: long (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: string (nullable = true)
 |-- country: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- infected_by: string (nullable = true)
 |-- contact_number: string (nullable = true)
 |-- symptom_onset_date: string (nullable = true)
 |-- confirmed_date: date (nullable = true)
 |-- released_date: date (nullable = true)
 |-- deceased_date: date (nullable = true)
 |-- state: string (nullable = true)



In [8]:
df = df.withColumn("age", regexp_replace(col("age"), "[^0-9]", "").cast("int"))

In [11]:
df = df.withColumn("symptom_onset_date", col("symptom_onset_date").cast("date"))

In [12]:
df.printSchema()

root
 |-- patient_id: long (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- infected_by: string (nullable = true)
 |-- contact_number: string (nullable = true)
 |-- symptom_onset_date: date (nullable = true)
 |-- confirmed_date: date (nullable = true)
 |-- released_date: date (nullable = true)
 |-- deceased_date: date (nullable = true)
 |-- state: string (nullable = true)



### Display the statistical summary

In [13]:
df.describe().show()

24/08/28 11:12:44 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 4:>                                                          (0 + 1) / 1]

+-------+--------------------+------+------------------+----------+--------+--------------+--------------------+--------------------+--------------------+--------+
|summary|          patient_id|   sex|               age|   country|province|          city|      infection_case|         infected_by|      contact_number|   state|
+-------+--------------------+------+------------------+----------+--------+--------------+--------------------+--------------------+--------------------+--------+
|  count|                5165|  4043|              3785|      5165|    5165|          5071|                4246|                1346|                 791|    5165|
|   mean|2.8636345618679576E9|  NULL|  40.3672391017173|      NULL|    NULL|          NULL|                NULL|2.2845944015643125E9|1.6772572523506988E7|    NULL|
| stddev| 2.074210725277473E9|  NULL|20.197162016016495|      NULL|    NULL|          NULL|                NULL|1.5265072953383324E9| 3.093097580985502E8|    NULL|
|    min|       

                                                                                

### Using the state column.
### How many people survived (released), and how many didn't survive (isolated/deceased)?

In [14]:
df.groupBy('state').count().show()

+--------+-----+
|   state|count|
+--------+-----+
|isolated| 2158|
|released| 2929|
|deceased|   78|
+--------+-----+



### Display the number of null values in each column

In [15]:
columns=df.columns

In [16]:
from pyspark.sql.functions import *

In [17]:
for column in columns:
    x=df.select([sum(col(column).isNull().cast('int'))])
    x.show()

+--------------------------------------+
|sum(CAST((patient_id IS NULL) AS INT))|
+--------------------------------------+
|                                     0|
+--------------------------------------+

+-------------------------------+
|sum(CAST((sex IS NULL) AS INT))|
+-------------------------------+
|                           1122|
+-------------------------------+

+-------------------------------+
|sum(CAST((age IS NULL) AS INT))|
+-------------------------------+
|                           1380|
+-------------------------------+

+-----------------------------------+
|sum(CAST((country IS NULL) AS INT))|
+-----------------------------------+
|                                  0|
+-----------------------------------+

+------------------------------------+
|sum(CAST((province IS NULL) AS INT))|
+------------------------------------+
|                                   0|
+------------------------------------+

+--------------------------------+
|sum(CAST((city IS NULL) AS IN

## Data preprocessing

### Fill the nulls in the deceased_date with the released_date. 
- You can use <b>coalesce</b> function

In [18]:
df_replaced_null = df.withColumn('deceased_date', coalesce(col('deceased_date'), col('released_date')))

### Add a column named no_days which is difference between the deceased_date and the confirmed_date then show the top 5 rows. Print the schema.
- <b> Hint: You need to typecast these columns as date first <b>

In [19]:
df_new = df_replaced_null.withColumn('deceased_date', to_date(col('deceased_date'))) \
                         .withColumn('confirmed_date', to_date(col('confirmed_date')))

In [22]:
df_with_on_days=df_new.withColumn('no_days', datediff(col('deceased_date'), col('confirmed_date')))

In [23]:
df_with_on_days.show(5)

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|no_days|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+
|1000000001|  male| 50|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|   2020-02-05|released|     13|
|1000000002|  male| 30|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|   2020-03-02|released|     32|
|1000000003|  male| 50|  Korea|   Seoul|  Jongno-gu|contact with patient| 2002000001|            17|

### Add a is_male column if male then it should yield true, else then False

In [46]:
df_with_is_male=df_with_on_days.withColumn('is_male',when(col('sex') == 'male','true') .when(col('sex') == 'female', 'false')
.otherwise('other'))

In [42]:
#df_with_is_male=df_with_on_days.withColumn('is_male', when(col('sex') == 'Male', True).otherwise(False))

In [49]:
df_with_is_male.show(5)

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|no_days|is_male|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+
|1000000001|  male| 50|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|   2020-02-05|released|     13|   true|
|1000000002|  male| 30|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|   2020-03-02|released|     32|   true|
|1000000003|  male| 50|  Korea|   Seoul|  Jongno-gu|contact 

### Add a is_dead column if patient state is not released then it should yield true, else then False

- Use <b>UDF</b> to perform this task. 
- However, UDF is not recommended there is no built in function can do the required operation.
- UDF is slower than built in functions.

In [26]:
def is_dead_function(state):
    return state != 'released'

In [27]:
is_dead_udf = udf(is_dead_function)

In [28]:
df_is_dead = df_with_is_male.withColumn('is_dead', is_dead_udf(col('state')))

### Change the ages to bins from 10s, 0s, 10s, 20s,.etc to 0,10, 20

In [36]:
df_binned_age = df_with_is_male.withColumn(
    'age_bin',
    when(col('age')< 10, 0)
    .when(col('age')< 20, 10)
    .when(col('age')< 30, 20)
    .when(col('age')< 40, 30)
    .when(col('age')< 50, 40)
    .when(col('age')< 60, 50)
    .otherwise(60)
)

In [37]:
df_binned_age.show(5)

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+-------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|no_days|is_male|age_bin|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+-------+
|1000000001|  male| 50|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|   2020-02-05|released|     13|  false|     50|
|1000000002|  male| 30|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|   2020-03-02|released|     32|  false|     30|
|1000000003|  male| 

### Change age, and no_days  to be typecasted as Double

In [38]:
df_casted = df_binned_age.withColumn('age', col('age').cast('double')) \
              .withColumn('no_days', col('no_days').cast('double'))

In [39]:
df_casted.printSchema()

root
 |-- patient_id: long (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: double (nullable = true)
 |-- country: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- infected_by: string (nullable = true)
 |-- contact_number: string (nullable = true)
 |-- symptom_onset_date: date (nullable = true)
 |-- confirmed_date: date (nullable = true)
 |-- released_date: date (nullable = true)
 |-- deceased_date: date (nullable = true)
 |-- state: string (nullable = true)
 |-- no_days: double (nullable = true)
 |-- is_male: boolean (nullable = false)
 |-- age_bin: integer (nullable = false)



### Drop the columns
["patient_id","sex","infected_by","contact_number","released_date","state",
"symptom_onset_date","confirmed_date","deceased_date","country","no_days",
"city","infection_case"]

In [40]:
df_dropped = df_casted.drop('patient_id', 'sex', 'infected_by', 'contact_number', 'released_date', 'state', 
    'symptom_onset_date', 'confirmed_date', 'deceased_date', 'country', 'no_days', 
    'city', 'infection_case')

### Recount the number of nulls now

In [41]:
columns2=df_dropped.columns
for column in columns2:
    x=df_dropped.select([sum(col(column).isNull().cast('int'))])
    x.show()

+-------------------------------+
|sum(CAST((age IS NULL) AS INT))|
+-------------------------------+
|                           1380|
+-------------------------------+

+------------------------------------+
|sum(CAST((province IS NULL) AS INT))|
+------------------------------------+
|                                   0|
+------------------------------------+

+-----------------------------------+
|sum(CAST((is_male IS NULL) AS INT))|
+-----------------------------------+
|                                  0|
+-----------------------------------+

+-----------------------------------+
|sum(CAST((age_bin IS NULL) AS INT))|
+-----------------------------------+
|                                  0|
+-----------------------------------+



## Now do the same but using SQL select statement

### From the original Patient DataFrame, Create a temporary view (table).

In [77]:
df.createOrReplaceTempView('patient_view')

### Use SELECT statement to select all columns from the dataframe and show the output.

In [79]:
all_data=spark.sql(""" select * from patient_view
""")

In [81]:
all_data.show(5)

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|         NULL|released|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|         NULL|released|
|1000000003|  male|50s|  Korea|   Seoul|  Jongno-gu|contact with patient| 2002000001|            17|              NULL|    2020-01-30|   202

### *Using SQL commands*, limit the output to only 5 rows 

In [82]:
limited=spark.sql(""" select * from patient_view
limit 5
""")

In [83]:
limited.show()

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|         NULL|released|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|         NULL|released|
|1000000003|  male|50s|  Korea|   Seoul|  Jongno-gu|contact with patient| 2002000001|            17|              NULL|    2020-01-30|   202

### Select the count of males and females in the dataset

In [86]:
count_gender=spark.sql(""" select count(patient_id),sex from patient_view
where sex in ('male','female')
group by sex
""")

In [87]:
count_gender.show()

+-----------------+------+
|count(patient_id)|   sex|
+-----------------+------+
|             2218|female|
|             1825|  male|
+-----------------+------+



### How many people did survive, and how many didn't?

In [88]:
servived=spark.sql("""
select state ,count(patient_id) from df_view
group by state
""")

In [89]:
servived.show()

+--------+-----------------+
|   state|count(patient_id)|
+--------+-----------------+
|isolated|             2158|
|released|             2929|
|deceased|               78|
+--------+-----------------+



### Now, let's perform some preprocessing using SQL:
1. Convert *age* column to double after removing the 's' at the end -- *hint: check SUBSTRING method*
2. Select only the following columns: `['sex', 'age', 'province', 'state']`
3. Store the result of the query in a new dataframe

In [97]:
convert_age=spark.sql("""SELECT sex,CAST(SUBSTRING(age, 1, LENGTH(age) - 1) AS DOUBLE) AS age ,province, state
FROM df_view
""")

In [98]:
convert_age.show()

+------+----+--------+--------+
|   sex| age|province|   state|
+------+----+--------+--------+
|  male|50.0|   Seoul|released|
|  male|30.0|   Seoul|released|
|  male|50.0|   Seoul|released|
|  male|20.0|   Seoul|released|
|female|20.0|   Seoul|released|
|female|50.0|   Seoul|released|
|  male|20.0|   Seoul|released|
|  male|20.0|   Seoul|released|
|  male|30.0|   Seoul|released|
|female|60.0|   Seoul|released|
|female|50.0|   Seoul|released|
|  male|20.0|   Seoul|released|
|  male|80.0|   Seoul|deceased|
|female|60.0|   Seoul|released|
|  male|70.0|   Seoul|released|
|  male|70.0|   Seoul|released|
|  male|70.0|   Seoul|released|
|  male|20.0|   Seoul|released|
|female|70.0|   Seoul|released|
|female|70.0|   Seoul|released|
+------+----+--------+--------+
only showing top 20 rows

