# **Labs 1 and 2: PySpark**

In these labs we will use the "[[NeurIPS 2020] Data Science for COVID-19 (DS4C)](https://www.kaggle.com/datasets/kimjihoo/coronavirusdataset?select=PatientInfo.csv)" dataset, retrieved from [Kaggle](https://www.kaggle.com/) on 1/6/2022 for educational, non-commercial purposes under the [CC BY-NC-SA 4.0](https://creativecommons.org/licenses/by-nc-sa/4.0/) license.


The CSV file we will use in these labs is **PatientInfo.csv**.

## PatientInfo.csv

**patient_id**
the ID of the patient

**sex**
the sex of the patient

**age**
the age of the patient

**country**
the country of the patient

**province**
the province of the patient

**city**
the city of the patient

**infection_case**
the source or route of infection (e.g. overseas inflow, contact with patient)

**infected_by**
the ID of who infected the patient


**contact_number**
the number of contacts with people

**symptom_onset_date**
the date of symptom onset

**confirmed_date**
the date the infection was confirmed

**released_date**
the date the patient was released

**deceased_date**
the date the patient died

**state**
the patient's state: isolated, released, or deceased

### Import PySpark and check its version

In [None]:
!pip install pyspark
import pyspark
print(pyspark.__version__)

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
3.3.0


In [None]:
import pandas as pd
from sklearn.metrics import classification_report
from pyspark.sql.functions import *
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer,OneHotEncoder,Imputer,VectorAssembler
from pyspark.ml import Pipeline

### Import and create SparkSession

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

### Load the PatientInfo.csv file and show the first 5 rows

In [None]:
from IPython.display import display, HTML
# Stop the notebook from wrapping wide DataFrame.show() tables
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [None]:
df = spark.read.csv("/content/sample_data/PatientInfo.csv", header=True, inferSchema=True)
df.show(5)

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------+--------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|     confirmed_date|      released_date|deceased_date|   state|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|2020-01-23 00:00:00|2020-02-05 00:00:00|         null|released|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       null|            31|              null|2020-01-30 00:00:00|2020-03-02 00:00:00|         null|released|
|1000000003|  male|50s|  Korea|   Seoul|  Jongno-gu|contact with patient| 2002000001|

### Display the schema of the dataset

In [None]:
df.printSchema()

root
 |-- patient_id: long (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: string (nullable = true)
 |-- country: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- infected_by: string (nullable = true)
 |-- contact_number: string (nullable = true)
 |-- symptom_onset_date: string (nullable = true)
 |-- confirmed_date: timestamp (nullable = true)
 |-- released_date: timestamp (nullable = true)
 |-- deceased_date: timestamp (nullable = true)
 |-- state: string (nullable = true)



### Display the statistical summary

In [None]:
df.summary().show()

+-------+--------------------+------+----+----------+--------+--------------+--------------------+--------------------+--------------------+------------------+--------+
|summary|          patient_id|   sex| age|   country|province|          city|      infection_case|         infected_by|      contact_number|symptom_onset_date|   state|
+-------+--------------------+------+----+----------+--------+--------------+--------------------+--------------------+--------------------+------------------+--------+
|  count|                5165|  4043|3785|      5165|    5165|          5071|                4246|                1346|                 791|               690|    5165|
|   mean|2.8636345618679576E9|  null|null|      null|    null|          null|                null|2.2845944015643125E9|1.6772572523506988E7|              null|    null|
| stddev| 2.074210725277473E9|  null|null|      null|    null|          null|                null|1.5265072953383324E9| 3.093097580985502E8|              n

### Using the state column, how many patients were released, and how many were not (isolated or deceased)?

In [None]:
df.select('patient_id', 'state').groupBy('state').agg(count(col('patient_id'))).show()

+--------+-----------------+
|   state|count(patient_id)|
+--------+-----------------+
|isolated|             2158|
|released|             2929|
|deceased|               78|
+--------+-----------------+
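The three states are mutually exclusive, so their counts should add up to the total number of patients. Grouping isolated and deceased patients together as "not released", as the question does, gives the split below. A plain-Python sketch with the counts copied by hand from the output:

```python
# Counts copied from the groupBy('state') output above.
state_counts = {"isolated": 2158, "released": 2929, "deceased": 78}

# Add the groups explicitly (the notebook's wildcard import shadows the built-in sum).
total = state_counts["isolated"] + state_counts["released"] + state_counts["deceased"]
released = state_counts["released"]
not_released = total - released  # isolated + deceased

print(total, released, not_released)  # 5165 2929 2236
print(f"released share: {released / total:.3f}")
```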



### Display the number of null values in each column

In [None]:
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+
|patient_id| sex| age|country|province|city|infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|state|
+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+
|         0|1122|1380|      0|       0|  94|           919|       3819|          4374|              4475|             3|         3578|         5099|    0|
+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+
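Because `summary()` counts only non-null values, each column's `count` in the statistical summary plus its null count here should equal the 5165 rows. A quick plain-Python consistency check, with the numbers copied by hand from the two outputs:

```python
# Numbers copied from df.summary() and the null-count table above.
total_rows = 5165  # patient_id count; patient_id has no nulls

summary_counts = {
    "sex": 4043, "age": 3785, "city": 5071, "infection_case": 4246,
    "infected_by": 1346, "contact_number": 791, "symptom_onset_date": 690,
}
null_counts = {
    "sex": 1122, "age": 1380, "city": 94, "infection_case": 919,
    "infected_by": 3819, "contact_number": 4374, "symptom_onset_date": 4475,
}

# Non-null count + null count should equal the row total for every column.
consistent = {c: summary_counts[c] + null_counts[c] == total_rows for c in summary_counts}
print(consistent)
```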



## Data preprocessing

### Fill the nulls in the deceased_date with the released_date. 
- You can use <b>coalesce</b> function

In [None]:
# coalesce returns its first non-null argument, so only rows with a null deceased_date are filled
df_2 = df.withColumn('deceased_date', coalesce(col('deceased_date'), col('released_date')))
df_2.show(5)

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------------+--------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|     confirmed_date|      released_date|      deceased_date|   state|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------------+--------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|2020-01-23 00:00:00|2020-02-05 00:00:00|2020-02-05 00:00:00|released|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       null|            31|              null|2020-01-30 00:00:00|2020-03-02 00:00:00|2020-03-02 00:00:00|released|
|1000000003|  male|50s|  Korea|   Seoul|  Jongno-gu|con
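Spark's `coalesce(a, b)` returns its first non-null argument, so filling `deceased_date` from `released_date` only changes rows whose `deceased_date` is null, and rows missing both dates stay null. A plain-Python sketch of that rule (`first_non_null` is a hypothetical helper, and the second date pair is made up for illustration):

```python
from datetime import date
from typing import Optional


def first_non_null(*values: Optional[date]) -> Optional[date]:
    """Plain-Python mirror of Spark's coalesce: first argument that is not None."""
    for v in values:
        if v is not None:
            return v
    return None


# (deceased_date, released_date) pairs; None plays the role of a SQL null.
pairs = [
    (None, date(2020, 2, 5)),              # patient 1000000001: released, no deceased_date
    (date(2020, 3, 1), date(2020, 2, 5)),  # hypothetical: deceased_date already set, kept as-is
    (None, None),                          # neither date known: stays null
]
filled = [first_non_null(deceased, released) for deceased, released in pairs]
print(filled)
```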

### Add a column named no_days that is the difference between deceased_date and confirmed_date, then show the top 5 rows. Print the schema.
- <b>Hint: you need to cast these columns to date first.</b>

In [None]:
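# Cast the timestamp columns to date so that datediff counts whole calendar days
for c in ['deceased_date', 'confirmed_date', 'released_date']:
  df_2 = df_2.withColumn(c, to_date(col(c)))
df_3 = df_2.withColumn("no_days", datediff('deceased_date', 'confirmed_date'))
df_3.show(5)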

In [None]:
df_3.printSchema()

root
 |-- patient_id: long (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: string (nullable = true)
 |-- country: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- infected_by: string (nullable = true)
 |-- contact_number: string (nullable = true)
 |-- symptom_onset_date: string (nullable = true)
 |-- confirmed_date: date (nullable = true)
 |-- released_date: date (nullable = true)
 |-- deceased_date: date (nullable = true)
 |-- state: string (nullable = true)
 |-- no_days: integer (nullable = true)
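After the cast to `date`, `datediff(end, start)` returns the number of whole days from `start` to `end`. Python's `date` subtraction gives the same count, so the first two patients can be checked by hand: they were confirmed on 2020-01-23 and 2020-01-30, with filled `deceased_date` values of 2020-02-05 and 2020-03-02, and the results match the `no_days` values 13 and 32 printed in the next step. `days_between` is a hypothetical helper:

```python
from datetime import date


def days_between(end: date, start: date) -> int:
    """Mirror of Spark's datediff(end, start): whole days from start to end."""
    return (end - start).days


print(days_between(date(2020, 2, 5), date(2020, 1, 23)))  # 13, patient 1000000001
print(days_between(date(2020, 3, 2), date(2020, 1, 30)))  # 32, patient 1000000002 (2020 is a leap year)
```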



### Add an is_male column that is true (1) if the patient is male and false (0) otherwise

In [None]:
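# Treat a missing sex as "female" so that is_male is never null (a modeling assumption)
df_3 = df_3.na.fill("female", ["sex"])
# Boolean comparison cast to int: 1 for male, 0 otherwise
df_4 = df_3.withColumn("is_male", (col('sex') == 'male').cast('int'))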
df_4.show(5)

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|no_days|is_male|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|   2020-02-05|   2020-02-05|released|     13|      1|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|   2020-03-02|   2020-03-02|released|     32|      1|
|1000000003|  male|50s|  Korea|   Seoul|  Jongno-gu|contact 
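Filling `sex` before the comparison matters: in Spark, comparing a null to `'male'` evaluates to null rather than false, so without the fill the 1122 rows with a missing `sex` would get a null `is_male`. With the fill they become 0, which quietly assumes that an unknown sex means female. A plain-Python mirror of the two steps (`is_male_flag` is a hypothetical helper):

```python
from typing import Optional


def is_male_flag(sex: Optional[str], fill_value: str = "female") -> int:
    """Mirror of na.fill('female', ['sex']) followed by (col('sex') == 'male').cast('int')."""
    filled = fill_value if sex is None else sex
    return int(filled == "male")


flags = [is_male_flag(s) for s in ["male", "female", None]]
print(flags)  # [1, 0, 0]
```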

### Add an is_dead column that is true if the patient's state is not released and false otherwise

- Use a <b>UDF</b> to perform this task.
- In practice, UDFs are not recommended when a built-in function can do the required operation.
- Python UDFs are slower than built-in functions: each row must be serialized between the JVM and the Python worker, and Spark's optimizer cannot look inside them.

In [None]:
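from pyspark.sql.types import BooleanType

@udf(returnType=BooleanType())
def is_dead(x):
  # Note: this checks released_date, not state. Some patients have state 'released'
  # but a null released_date, so this differs from the task statement above.
  return x is None

df_5 = df_4.withColumn('is_Dead', is_dead(col('released_date')).cast('int'))
df_5.show()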



+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+-------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|no_days|is_male|is_Dead|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+-------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|   2020-02-05|   2020-02-05|released|     13|      1|      0|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|   2020-03-02|   2020-03-02|released|     32|      1|      0|
|1000000003|  m
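The UDF above flags a row when `released_date` is null, whereas the task asks for `state != 'released'`. The counts already printed in this notebook show that these two rules label different numbers of rows as positive. A plain-Python comparison with the numbers copied by hand:

```python
# Numbers copied from earlier outputs in this notebook.
total_rows = 5165
released_state = 2929        # rows with state == 'released'
null_released_date = 3578    # null count of released_date

positives_by_state = total_rows - released_state  # rule in the task statement
positives_by_date = null_released_date            # rule implemented by the UDF

# At least this many rows must receive different labels under the two rules.
min_disagreements = positives_by_date - positives_by_state

print(positives_by_state, positives_by_date, min_disagreements)  # 2236 3578 1342
print(f"positive share: {positives_by_state / total_rows:.3f} vs {positives_by_date / total_rows:.3f}")
```

The validation report at the end has 694 positives out of 999 rows (about 0.695), in line with the date-based rule. A built-in expression such as `(col('state') != 'released').cast('int')` would implement the stated rule without a UDF; switching to it would change the labels, so the counts and scores further down would no longer apply.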

### Convert the age bins from 0s, 10s, 20s, etc. to 0, 10, 20, etc.

In [None]:
# Keep everything before the first 's' ("50s" -> "50"); the result is still a string
df_6 = df_5.withColumn('age', substring_index(col('age'), 's', 1))
df_6.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+-------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|no_days|is_male|is_Dead|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+-------+
|1000000001|  male| 50|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|   2020-02-05|   2020-02-05|released|     13|      1|      0|
|1000000002|  male| 30|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|   2020-03-02|   2020-03-02|released|     32|      1|      0|
|1000000003|  m
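`substring_index(str, delim, 1)` keeps everything before the first occurrence of `delim` and returns the whole string when `delim` does not occur, so it handles bins of any width. A plain-Python mirror for positive counts (`prefix_before` is a hypothetical helper; `"100s"` and `"unknown"` are illustrative inputs showing a three-digit bin and a value without the delimiter):

```python
def prefix_before(s: str, delim: str, count: int) -> str:
    """Mirror of Spark's substring_index for count > 0: text before the count-th delim."""
    return delim.join(s.split(delim)[:count])


bins = ["0s", "10s", "50s", "100s", "unknown"]
parsed = [prefix_before(b, "s", 1) for b in bins]
print(parsed)  # ['0', '10', '50', '100', 'unknown']
```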

### Cast age and no_days to double

In [None]:
for c in ['age','no_days']:
  df_6=df_6.withColumn(c,df_6[c].cast('double'))

In [None]:
df_6.printSchema()

root
 |-- patient_id: long (nullable = true)
 |-- sex: string (nullable = false)
 |-- age: double (nullable = true)
 |-- country: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- infected_by: string (nullable = true)
 |-- contact_number: string (nullable = true)
 |-- symptom_onset_date: string (nullable = true)
 |-- confirmed_date: date (nullable = true)
 |-- released_date: date (nullable = true)
 |-- deceased_date: date (nullable = true)
 |-- state: string (nullable = true)
 |-- no_days: double (nullable = true)
 |-- is_male: integer (nullable = false)
 |-- is_Dead: integer (nullable = true)



### Drop the columns
`["patient_id", "sex", "infected_by", "contact_number", "released_date", "state", "symptom_onset_date", "confirmed_date", "deceased_date", "country", "no_days", "city", "infection_case"]`

In [None]:
df_7 = df_6.drop("patient_id", "sex", "infected_by", "contact_number", "released_date", "state",
                 "symptom_onset_date", "confirmed_date", "deceased_date", "country", "no_days",
                 "city", "infection_case")

In [None]:
df_7.printSchema()

root
 |-- age: double (nullable = true)
 |-- province: string (nullable = true)
 |-- is_male: integer (nullable = false)
 |-- is_Dead: integer (nullable = true)



In [None]:
df_7.show(5)

+----+--------+-------+-------+
| age|province|is_male|is_Dead|
+----+--------+-------+-------+
|50.0|   Seoul|      1|      0|
|30.0|   Seoul|      1|      0|
|50.0|   Seoul|      1|      0|
|20.0|   Seoul|      1|      0|
|20.0|   Seoul|      0|      0|
+----+--------+-------+-------+
only showing top 5 rows



### Recount the number of nulls now

In [None]:
df_7.select([count(when(col(c).isNull(), c)).alias(c) for c in df_7.columns]).show()

+----+--------+-------+-------+
| age|province|is_male|is_Dead|
+----+--------+-------+-------+
|1380|       0|      0|      0|
+----+--------+-------+-------+



## Now do the same using SQL SELECT statements

### From the original patient DataFrame, create a temporary view (table).

In [None]:
df.createOrReplaceTempView('patientData')

### Use a SELECT statement to select all columns from the view and show the output.

In [None]:
spark.sql("SELECT * FROM patientData").show(10)

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------+--------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|     confirmed_date|      released_date|deceased_date|   state|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|2020-01-23 00:00:00|2020-02-05 00:00:00|         null|released|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       null|            31|              null|2020-01-30 00:00:00|2020-03-02 00:00:00|         null|released|
|1000000003|  male|50s|  Korea|   Seoul|  Jongno-gu|contact with patient| 2002000001|

### *Using SQL commands*, limit the output to only 5 rows

In [None]:
spark.sql("SELECT * FROM patientData LIMIT 5").show()

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------+--------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|     confirmed_date|      released_date|deceased_date|   state|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|2020-01-23 00:00:00|2020-02-05 00:00:00|         null|released|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       null|            31|              null|2020-01-30 00:00:00|2020-03-02 00:00:00|         null|released|
|1000000003|  male|50s|  Korea|   Seoul|  Jongno-gu|contact with patient| 2002000001|

### Select the count of males and females in the dataset

In [None]:
spark.sql("SELECT sex, count(patient_id) FROM patientData GROUP BY sex").show(5)

+------+-----------------+
|   sex|count(patient_id)|
+------+-----------------+
|  null|             1122|
|female|             2218|
|  male|             1825|
+------+-----------------+



### How many people survived, and how many did not?

In [None]:
spark.sql("SELECT state, count(patient_id) FROM patientData GROUP BY state").show(5)

+--------+-----------------+
|   state|count(patient_id)|
+--------+-----------------+
|isolated|             2158|
|released|             2929|
|deceased|               78|
+--------+-----------------+



### Now, let's perform some preprocessing using SQL:
1. Convert *age* column to double after removing the 's' at the end -- *hint: check SUBSTRING method*
2. Select only the following columns: `['sex', 'age', 'province', 'state']`
3. Store the result of the query in a new dataframe

In [None]:
df_new = spark.sql("""
  SELECT sex,
         CAST(SUBSTRING(age, 1, LENGTH(age) - 1) AS DOUBLE) AS age,  -- drop the trailing 's'
         province,
         state
  FROM patientData
""")

In [None]:
df_new.show(5)

+------+----+--------+--------+
|   sex| age|province|   state|
+------+----+--------+--------+
|  male|50.0|   Seoul|released|
|  male|30.0|   Seoul|released|
|  male|50.0|   Seoul|released|
|  male|20.0|   Seoul|released|
|female|20.0|   Seoul|released|
+------+----+--------+--------+
only showing top 5 rows



In [None]:
df_new.printSchema()

root
 |-- sex: string (nullable = true)
 |-- age: double (nullable = true)
 |-- province: string (nullable = true)
 |-- state: string (nullable = true)



## Machine Learning 
### Create a pipeline model to predict is_dead and evaluate the performance.
- Use <b>StringIndexer</b> to transform <b>string</b> data type to indices.
- Use <b>OneHotEncoder</b> to deal with categorical values.
- Use <b>Imputer</b> to fill missing data with mean.
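By default `StringIndexer` assigns indices by descending label frequency (the most frequent province gets 0.0), and `OneHotEncoder` turns index *i* into a vector with a 1.0 at position *i*. With its default `dropLast=True`, the vector has one slot fewer than there are categories, and the last category maps to all zeros. This is why the Busan rows in the `head()` output further down have `province_index` 5.0 and a 1.0 at position 5 of `province_index_ohe`. A plain-Python sketch on toy data (helper names are hypothetical; alphabetical tie-breaking is an assumption, and the toy data has no ties):

```python
from collections import Counter


def fit_string_indexer(values):
    """Sketch of StringIndexer (frequencyDesc): most frequent label -> 0.0."""
    counts = Counter(values)
    # Sort by descending count, breaking ties alphabetically (assumed).
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {label: float(i) for i, label in enumerate(ordered)}


def one_hot_vector(index: float, num_categories: int, drop_last: bool = True):
    """Sketch of OneHotEncoder: 1.0 at position `index`; dropLast removes the last slot."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if int(index) < size:
        vec[int(index)] = 1.0
    return vec


provinces = ["Seoul", "Seoul", "Seoul", "Busan", "Busan", "Daegu"]  # toy data
mapping = fit_string_indexer(provinces)
print(mapping)  # {'Seoul': 0.0, 'Busan': 1.0, 'Daegu': 2.0}
print([one_hot_vector(mapping[p], len(mapping)) for p in ["Seoul", "Busan", "Daegu"]])
```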

In [None]:
train_df,val_df=df_7.randomSplit(weights=[0.8,0.2],seed=42)

In [None]:
col_to_index= 'province'
indexed= col_to_index +'_index'
ohe_index=indexed + '_ohe'
col_to_impute='age'
imputed= col_to_impute + '_impute'
cols=['is_male',ohe_index,imputed]
target='is_Dead'


In [None]:
string_indexer= StringIndexer(inputCol=col_to_index,outputCol=indexed)
one_hot=OneHotEncoder(inputCol=indexed,outputCol=ohe_index)
impute= Imputer(inputCol=col_to_impute,outputCol=imputed)
vector_assembler=VectorAssembler(inputCols=cols,outputCol='features')
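`VectorAssembler` concatenates its input columns in the order given by `cols`: `is_male` first, then the one-hot block, then the imputed age. That explains why the Busan row in the `head()` output has its 1.0 at position 5 of `province_index_ohe` but at position 6 of `features`. A plain-Python sketch (the one-hot length of 16 is a hypothetical value; the real length depends on how many provinces the indexer sees):

```python
def assemble(is_male: int, ohe: list, age_impute: float) -> list:
    """Sketch of VectorAssembler(inputCols=['is_male', ohe_index, imputed]): flat concatenation."""
    return [float(is_male)] + list(ohe) + [float(age_impute)]


num_ohe = 16                  # hypothetical one-hot length
busan_ohe = [0.0] * num_ohe
busan_ohe[5] = 1.0            # Busan has province_index 5.0 in the head() output

features = assemble(is_male=0, ohe=busan_ohe, age_impute=40.085979)
print(features.index(1.0), len(features))  # 6 18
```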

In [None]:
model=RandomForestClassifier(labelCol=target)
pipeline=Pipeline(stages=[string_indexer,one_hot,impute,vector_assembler,model])

In [None]:
m= pipeline.fit(train_df)
train= m.transform(train_df)
val=m.transform(val_df)
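`pipeline.fit(train_df)` fits each estimator stage (StringIndexer, OneHotEncoder, Imputer, and the random forest) on the training split only, and `m.transform` reuses what was learned on both splits. For the Imputer, this means missing ages in the validation data are also filled with the training-set mean, which is why the missing ages in the `head()` output all show the same `age_impute` of 40.085979. A plain-Python sketch of that fit/transform split (toy numbers, hypothetical helper names):

```python
import statistics
from typing import List, Optional


def fit_mean_imputer(train_values: List[Optional[float]]) -> float:
    """Sketch of Imputer(strategy='mean').fit: mean of the non-null training values."""
    return statistics.mean(v for v in train_values if v is not None)


def apply_fill(values: List[Optional[float]], fill: float) -> List[float]:
    """Sketch of the fitted model's transform: replace nulls with the learned value."""
    return [fill if v is None else v for v in values]


train_ages = [20.0, 50.0, None, 30.0]  # toy training column
val_ages = [None, 80.0]                # toy validation column

fill = fit_mean_imputer(train_ages)    # learned from training data only
print(fill, apply_fill(val_ages, fill))
```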

In [None]:
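# metricName="f1" (the default) is the support-weighted F1 score across classes
evaluator = MulticlassClassificationEvaluator(labelCol=target, metricName="f1")

In [None]:
evaluator.evaluate(train)

0.8274637311589544

In [None]:
evaluator.evaluate(val)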

0.8106712241684841

In [None]:
train_pd=train.toPandas()
val_pd=val.toPandas()

In [None]:
train_pd.head()

,age,province,is_male,is_Dead,province_index,province_index_ohe,age_impute,features,rawPrediction,probability,prediction
0,,Busan,0,1,5.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, ...",40.085979,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, ...","[3.4734969621318217, 16.526503037868174]","[0.17367484810659112, 0.8263251518934088]",1.0
1,,Busan,0,1,5.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, ...",40.085979,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, ...","[3.4734969621318217, 16.526503037868174]","[0.17367484810659112, 0.8263251518934088]",1.0
2,,Gangwon-do,1,0,10.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",40.085979,"(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","[4.988938395626541, 15.011061604373456]","[0.2494469197813271, 0.750553080218673]",1.0
3,,Gyeonggi-do,0,1,2.0,"(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",40.085979,"(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","[1.9674302636322023, 18.032569736367794]","[0.09837151318161014, 0.9016284868183898]",1.0
4,,Gyeonggi-do,0,1,2.0,"(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",40.085979,"(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","[1.9674302636322023, 18.032569736367794]","[0.09837151318161014, 0.9016284868183898]",1.0
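For Spark's `RandomForestClassifier`, `rawPrediction` is the sum over trees of each tree's class probabilities, `probability` is that vector normalized to sum to 1, and `prediction` is the class with the largest value. The raw values in the first row add up to 20, matching the default `numTrees=20` (the notebook does not set it). A plain-Python check using the first row's numbers:

```python
# rawPrediction of the first row of train_pd.head() above.
raw = [3.4734969621318217, 16.526503037868174]

total = raw[0] + raw[1]                     # ~20.0: one unit per tree
probability = [r / total for r in raw]      # normalized class probabilities
prediction = 1.0 if probability[1] > probability[0] else 0.0  # binary argmax

print(f"{total:.6f}", probability, prediction)
```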


In [None]:
y_val=val_pd['is_Dead']
y_pred= val_pd['prediction']

In [None]:
print(classification_report(y_val, y_pred))

              precision    recall  f1-score   support

           0       0.71      0.66      0.68       305
           1       0.85      0.88      0.87       694

    accuracy                           0.81       999
   macro avg       0.78      0.77      0.77       999
weighted avg       0.81      0.81      0.81       999
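`MulticlassClassificationEvaluator` defaults to `metricName='f1'`, Spark's support-weighted F1, which is the same quantity as the report's "weighted avg" f1-score. Recomputing it from the rounded per-class values gives about 0.812, consistent with the 0.8107 returned by `evaluate(val)`; the small gap comes from the report's two-decimal rounding. Plain Python, with the numbers copied from the report:

```python
# Per-class F1 and support from the classification report (rounded to two decimals).
f1 = {0: 0.68, 1: 0.87}
support = {0: 305, 1: 694}

n = support[0] + support[1]                                # 999 validation rows
weighted_f1 = (f1[0] * support[0] + f1[1] * support[1]) / n

print(n, f"{weighted_f1:.3f}")  # 999 0.812
```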

