# **Labs 1 and 2 PySpark:**

In these labs we will be using the "[[NeurIPS 2020] Data Science for COVID-19 (DS4C)](https://www.kaggle.com/datasets/kimjihoo/coronavirusdataset?select=PatientInfo.csv)" dataset, retrieved from [Kaggle](https://www.kaggle.com/) on 1/6/2022, for educational non commercial purpose, License
[CC BY-NC-SA 4.0
](https://creativecommons.org/licenses/by-nc-sa/4.0/)


The csv file that we will be using in this lab is **PatientInfo**.

## PatientInfo.csv

**patient_id**
the ID of the patient

**sex**
the sex of the patient

**age**
the age of the patient

**country**
the country of the patient

**province**
the province of the patient

**city**
the city of the patient

**infection_case**
the case of infection

**infected_by**
the ID of who infected the patient


**contact_number**
the number of contacts with people

**symptom_onset_date**
the date of symptom onset

**confirmed_date**
the date of being confirmed

**released_date**
the date of being released

**deceased_date**
the date of being deceased

**state**
isolated / released / deceased

### Import the pyspark and check it's version

In [1]:
import findspark 
import numpy 
import pandas
findspark.init()
import pyspark
from  pyspark.sql import SparkSession 
import pyspark.sql.functions as F # for aggregation funs || min ,max ...

### Import and create SparkSession

In [2]:
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [3]:
spark

### Load the PatientInfo.csv file and show the first 5 rows

In [4]:
from IPython.display import display, HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

### Display the schema of the dataset

In [5]:
data = spark.read.format("csv").\
option("inferschema","true").\
option("header" ,"true").\
load("PatientInfo.csv")

In [6]:
data.show(3)

+----------+----+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id| sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+----+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|   2020-02-05|         null|released|
|1000000002|male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|   2020-03-02|         null|released|
|1000000003|male|50s|  Korea|   Seoul|  Jongno-gu|contact with patient| 2002000001|            17|              null|    2020-01-30|   2020-02-19|    

In [7]:
data.printSchema()

root
 |-- patient_id: long (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: string (nullable = true)
 |-- country: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- infected_by: string (nullable = true)
 |-- contact_number: string (nullable = true)
 |-- symptom_onset_date: string (nullable = true)
 |-- confirmed_date: date (nullable = true)
 |-- released_date: date (nullable = true)
 |-- deceased_date: date (nullable = true)
 |-- state: string (nullable = true)



### Display the statistical summary

In [8]:
data.describe().show()

+-------+--------------------+------+----+----------+--------+--------------+--------------------+--------------------+--------------------+------------------+--------+
|summary|          patient_id|   sex| age|   country|province|          city|      infection_case|         infected_by|      contact_number|symptom_onset_date|   state|
+-------+--------------------+------+----+----------+--------+--------------+--------------------+--------------------+--------------------+------------------+--------+
|  count|                5165|  4043|3785|      5165|    5165|          5071|                4246|                1346|                 791|               690|    5165|
|   mean|2.8636345618679576E9|  null|null|      null|    null|          null|                null|2.2845944015643125E9|1.6772572523506988E7|              null|    null|
| stddev| 2.074210725277473E9|  null|null|      null|    null|          null|                null|1.5265072953383324E9| 3.093097580985502E8|              n

### Using the state column.
### How many people survived (released), and how many didn't survive (isolated/deceased)?

In [9]:
data.groupBy("state").count().show()

+--------+-----+
|   state|count|
+--------+-----+
|isolated| 2158|
|released| 2929|
|deceased|   78|
+--------+-----+



### Display the number of null values in each column

In [10]:
from pyspark.sql.functions import col,count ,sum , isnan
#non_null_counts = [count(col(c)).alias(c) for c in data.columns]
null_counts = [sum(col(c).isNull().cast("integer")).alias(c) for c in data.columns]
data.agg(*null_counts).show()


+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+
|patient_id| sex| age|country|province|city|infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|state|
+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+
|         0|1122|1380|      0|       0|  94|           919|       3819|          4374|              4475|             3|         3578|         5099|    0|
+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+



## Data preprocessing

### Fill the nulls in the deceased_date with the released_date. 
- You can use <b>coalesce</b> function

In [11]:
from pyspark.sql.functions import coalesce 
new_data = data.withColumn("deceased_date", coalesce(col("deceased_date"), col("released_date")))
null_counts = [sum(col(c).isNull().cast("integer")).alias(c) for c in new_data.columns]
new_data.agg(*null_counts).show()


+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+
|patient_id| sex| age|country|province|city|infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|state|
+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+
|         0|1122|1380|      0|       0|  94|           919|       3819|          4374|              4475|             3|         3578|         3514|    0|
+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+



### Add a column named no_days which is difference between the deceased_date and the confirmed_date then show the top 5 rows. Print the schema.
- <b> Hint: You need to typecast these columns as date first <b>

In [12]:
from pyspark.sql.functions import datediff
new_data_2 = new_data.withColumn("no_days", datediff(col("deceased_date"), col("confirmed_date")))
new_data_2.printSchema()

root
 |-- patient_id: long (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: string (nullable = true)
 |-- country: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- infected_by: string (nullable = true)
 |-- contact_number: string (nullable = true)
 |-- symptom_onset_date: string (nullable = true)
 |-- confirmed_date: date (nullable = true)
 |-- released_date: date (nullable = true)
 |-- deceased_date: date (nullable = true)
 |-- state: string (nullable = true)
 |-- no_days: integer (nullable = true)



In [13]:
new_data_2.select("deceased_date","confirmed_date","no_days").show(5)

+-------------+--------------+-------+
|deceased_date|confirmed_date|no_days|
+-------------+--------------+-------+
|   2020-02-05|    2020-01-23|     13|
|   2020-03-02|    2020-01-30|     32|
|   2020-02-19|    2020-01-30|     20|
|   2020-02-15|    2020-01-30|     16|
|   2020-02-24|    2020-01-31|     24|
+-------------+--------------+-------+
only showing top 5 rows



### Add a is_male column if male then it should yield true, else then False

In [14]:
from pyspark.sql.functions import when 
new_data_3 = new_data_2.withColumn("is_male", when(col("sex") == "male", True).otherwise(False))


In [15]:
new_data_3.select("sex","is_male").show(5)

+------+-------+
|   sex|is_male|
+------+-------+
|  male|   true|
|  male|   true|
|  male|   true|
|  male|   true|
|female|  false|
+------+-------+
only showing top 5 rows



### Add a is_dead column if patient state is not released then it should yield true, else then False

- Use <b>UDF</b> to perform this task. 
- However, UDF is not recommended there is no built in function can do the required operation.
- UDF is slower than built in functions.

In [16]:
new_data_4 = new_data_3.withColumn("is_dead", when(col("state") == "released", True).otherwise(False))

In [17]:
new_data_4.select("state","is_dead").show()

+--------+-------+
|   state|is_dead|
+--------+-------+
|released|   true|
|released|   true|
|released|   true|
|released|   true|
|released|   true|
|released|   true|
|released|   true|
|released|   true|
|released|   true|
|released|   true|
|released|   true|
|released|   true|
|deceased|  false|
|released|   true|
|released|   true|
|released|   true|
|released|   true|
|released|   true|
|released|   true|
|released|   true|
+--------+-------+
only showing top 20 rows



### Change the ages to bins from 10s, 0s, 10s, 20s,.etc to 0,10, 20

In [18]:
new_data_4.select("age").show(5)

+---+
|age|
+---+
|50s|
|30s|
|50s|
|20s|
|20s|
+---+
only showing top 5 rows



In [19]:
from pyspark.sql.functions import regexp_replace

new_data_5 = new_data_4.withColumn("age", regexp_replace(col("age"), "s", ""))

In [20]:
new_data_5.select("age").show(5)

+---+
|age|
+---+
| 50|
| 30|
| 50|
| 20|
| 20|
+---+
only showing top 5 rows



### Change age, and no_days  to be typecasted as Double

In [21]:
new_data_5.printSchema()

root
 |-- patient_id: long (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: string (nullable = true)
 |-- country: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- infected_by: string (nullable = true)
 |-- contact_number: string (nullable = true)
 |-- symptom_onset_date: string (nullable = true)
 |-- confirmed_date: date (nullable = true)
 |-- released_date: date (nullable = true)
 |-- deceased_date: date (nullable = true)
 |-- state: string (nullable = true)
 |-- no_days: integer (nullable = true)
 |-- is_male: boolean (nullable = false)
 |-- is_dead: boolean (nullable = false)



In [22]:
new_data_6 = new_data_5.withColumn("age", col("age").cast("double"))
new_data_6 = new_data_6.withColumn("no_days", col("no_days").cast("double"))

In [23]:
new_data_6.printSchema()

root
 |-- patient_id: long (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: double (nullable = true)
 |-- country: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- infected_by: string (nullable = true)
 |-- contact_number: string (nullable = true)
 |-- symptom_onset_date: string (nullable = true)
 |-- confirmed_date: date (nullable = true)
 |-- released_date: date (nullable = true)
 |-- deceased_date: date (nullable = true)
 |-- state: string (nullable = true)
 |-- no_days: double (nullable = true)
 |-- is_male: boolean (nullable = false)
 |-- is_dead: boolean (nullable = false)



### Drop the columns
["patient_id","sex","infected_by","contact_number","released_date","state",
"symptom_onset_date","confirmed_date","deceased_date","country","no_days",
"city","infection_case"]

In [24]:
last_data = new_data_6.drop("patient_id","sex","infected_by","contact_number","released_date","state",
"symptom_onset_date","confirmed_date","deceased_date","country","no_days",
"city","infection_case"
)

In [25]:
last_data.printSchema()

root
 |-- age: double (nullable = true)
 |-- province: string (nullable = true)
 |-- is_male: boolean (nullable = false)
 |-- is_dead: boolean (nullable = false)



### Recount the number of nulls now

In [26]:
null_counts = [sum(col(c).isNull().cast("integer")).alias(c) for c in last_data.columns]
last_data.agg(*null_counts).show()

+----+--------+-------+-------+
| age|province|is_male|is_dead|
+----+--------+-------+-------+
|1380|       0|      0|      0|
+----+--------+-------+-------+



## Now do the same but using SQL select statement

### From the original Patient DataFrame, Create a temporary view (table).

In [27]:
data_sql=spark.read.csv('PatientInfo.csv',header=True,inferSchema=True)
data_sql.createOrReplaceTempView("patient_info")

### Use SELECT statement to select all columns from the dataframe and show the output.

In [28]:
spark.sql("select * from patient_info ").show()


+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|   2020-02-05|         null|released|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|   2020-03-02|         null|released|
|1000000003|  male|50s|  Korea|   Seoul|   Jongno-gu|contact with patient| 2002000001|            17|              null|    2020-01-30|

### *Using SQL commands*, limit the output to only 5 rows 

In [29]:
spark.sql('''select * FROM patient_info 
LIMIT 5''').show()

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|   2020-02-05|         null|released|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|   2020-03-02|         null|released|
|1000000003|  male|50s|  Korea|   Seoul|  Jongno-gu|contact with patient| 2002000001|            17|              null|    2020-01-30|   202

### Select the count of males and females in the dataset

In [30]:
spark.sql('''SELECT sex, COUNT(*) AS count 
           FROM patient_info 
           GROUP BY sex''').show()

+------+-----+
|   sex|count|
+------+-----+
|  null| 1122|
|female| 2218|
|  male| 1825|
+------+-----+



### How many people did survive, and how many didn't?

In [31]:
spark.sql('''SELECT state, count(*) AS count 
           FROM patient_info 
           GROUP BY state''').show()

+--------+-----+
|   state|count|
+--------+-----+
|isolated| 2158|
|released| 2929|
|deceased|   78|
+--------+-----+



### Now, let's perform some preprocessing using SQL:
1. Convert *age* column to double after removing the 's' at the end -- *hint: check SUBSTRING method*
2. Select only the following columns: `['sex', 'age', 'province', 'state']`
3. Store the result of the query in a new dataframe

In [32]:
spark.sql('''SELECT *, CAST(SUBSTRING(age, 1, LENGTH(age) - 1) AS double) AS age_double 
                     FROM patient_info 
                     WHERE age LIKE '%s' ''').withColumn("age", col("age_double")).drop("age_double").show(5)

+----------+------+----+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex| age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+----+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|  male|50.0|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|   2020-02-05|         null|released|
|1000000002|  male|30.0|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|   2020-03-02|         null|released|
|1000000003|  male|50.0|  Korea|   Seoul|  Jongno-gu|contact with patient| 2002000001|            17|              null|    2020-01-30|

In [33]:
spark.sql('''SELECT  sex, CAST(SUBSTRING(age, 1, LENGTH(age) - 1) AS double) as age, province, state 
                      FROM patient_info''').show(5)

+------+----+--------+--------+
|   sex| age|province|   state|
+------+----+--------+--------+
|  male|50.0|   Seoul|released|
|  male|30.0|   Seoul|released|
|  male|50.0|   Seoul|released|
|  male|20.0|   Seoul|released|
|female|20.0|   Seoul|released|
+------+----+--------+--------+
only showing top 5 rows



## Machine Learning 
### Create a pipeline model to predict is_dead and evaluate the performance.
- Use <b>StringIndexer</b> to transform <b>string</b> data type to indices.
- Use <b>OneHotEncoder</b> to deal with categorical values.
- Use <b>Imputer</b> to fill missing data with mean.

In [34]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, Imputer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator,MulticlassClassificationEvaluator


In [35]:
last_data = last_data.withColumn("is_male",col("is_male").cast("integer"))

In [36]:
last_data = last_data.withColumn("is_dead",col("is_dead").cast("integer"))

In [37]:
data_types = last_data.dtypes
data_types

[('age', 'double'),
 ('province', 'string'),
 ('is_male', 'int'),
 ('is_dead', 'int')]

In [38]:
last_data.show()

+----+--------+-------+-------+
| age|province|is_male|is_dead|
+----+--------+-------+-------+
|50.0|   Seoul|      1|      1|
|30.0|   Seoul|      1|      1|
|50.0|   Seoul|      1|      1|
|20.0|   Seoul|      1|      1|
|20.0|   Seoul|      0|      1|
|50.0|   Seoul|      0|      1|
|20.0|   Seoul|      1|      1|
|20.0|   Seoul|      1|      1|
|30.0|   Seoul|      1|      1|
|60.0|   Seoul|      0|      1|
|50.0|   Seoul|      0|      1|
|20.0|   Seoul|      1|      1|
|80.0|   Seoul|      1|      0|
|60.0|   Seoul|      0|      1|
|70.0|   Seoul|      1|      1|
|70.0|   Seoul|      1|      1|
|70.0|   Seoul|      1|      1|
|20.0|   Seoul|      1|      1|
|70.0|   Seoul|      0|      1|
|70.0|   Seoul|      0|      1|
+----+--------+-------+-------+
only showing top 20 rows



In [39]:
cat_col = [v for (v,t) in data_types if (t != "double") & (v != "is_dead")  ]
cat_col

['province', 'is_male']

In [40]:
cat_col_indx = [s+'_index' for s in cat_col]
cat_col_indx

['province_index', 'is_male_index']

In [41]:
cat_col_OHE = [s+'_OHE' for s in cat_col]
cat_col_OHE

['province_OHE', 'is_male_OHE']

In [42]:
num_col = [v for (v,t) in data_types if (t == "double") & (v != "is_dead") ]
num_col

['age']

In [43]:
AllDataCol = num_col + cat_col_OHE
AllDataCol

['age', 'province_OHE', 'is_male_OHE']

In [44]:
stringIndexer = StringIndexer(inputCols=["province"], outputCols=["provinceIndex"])
oneHotEncoder = OneHotEncoder(inputCols=["provinceIndex"], outputCols=["province_OHE"])
imputer = Imputer(inputCols=["age"], outputCols=["age_imputed"], strategy="mean")
assembler = VectorAssembler(inputCols=["province_OHE", "is_male", "age_imputed"], outputCol="features")

lgr = LogisticRegression(featuresCol='features',labelCol='is_dead',predictionCol='prediction')

In [45]:
trainDF,testDf = last_data.randomSplit([0.8,0.2],seed=42)

In [46]:
pl = Pipeline(stages=[stringIndexer, oneHotEncoder, imputer, assembler, lgr])


In [47]:
plModel = pl.fit(trainDF)

### EVALUATION 

In [48]:
train_eval = MulticlassClassificationEvaluator(labelCol="is_dead", predictionCol="prediction")
predictionAndTarget_train = plModel.transform(trainDF).select("is_dead", "prediction")
test_eval = MulticlassClassificationEvaluator(labelCol="is_dead", predictionCol="prediction")
predictionAndTarget_test= plModel.transform(testDf).select("is_dead", "prediction")


In [49]:
acc_train = train_eval.evaluate(predictionAndTarget_train, {train_eval.metricName: "accuracy"})
acc_test = test_eval.evaluate(predictionAndTarget_test, {test_eval.metricName: "accuracy"})


In [50]:
print("Training Acc:", acc_train)
print("Test Acc:", acc_test)

Training Acc: 0.8377340374459914
Test Acc: 0.8478478478478478


In [51]:
pred_test = plModel.transform(testDf)
test_evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="is_dead")
test_auc = test_evaluator.evaluate(pred_test)

In [52]:
pred_train = plModel.transform(trainDF)
train_evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="is_dead")
train_auc = train_evaluator.evaluate(pred_train)






In [53]:
print("Training Auc:", train_auc)
print("Test Auc:", test_auc)

Training Auc: 0.8901646562766864
Test Auc: 0.8867182977456541
