# **Labs 1 and 2 PySpark:**

In these labs we will be using the "[[NeurIPS 2020] Data Science for COVID-19 (DS4C)](https://www.kaggle.com/datasets/kimjihoo/coronavirusdataset?select=PatientInfo.csv)" dataset, retrieved from [Kaggle](https://www.kaggle.com/) on 1/6/2022, for educational non commercial purpose, License
[CC BY-NC-SA 4.0
](https://creativecommons.org/licenses/by-nc-sa/4.0/)


The csv file that we will be using in this lab is **PatientInfo**.

## PatientInfo.csv

**patient_id**
the ID of the patient

**sex**
the sex of the patient

**age**
the age of the patient

**country**
the country of the patient

**province**
the province of the patient

**city**
the city of the patient

**infection_case**
the case of infection

**infected_by**
the ID of who infected the patient


**contact_number**
the number of contacts with people

**symptom_onset_date**
the date of symptom onset

**confirmed_date**
the date of being confirmed

**released_date**
the date of being released

**deceased_date**
the date of being deceased

**state**
isolated / released / deceased

### Import the pyspark and check it's version

In [1]:
import pyspark
from pyspark.sql.functions import *
from pyspark.sql.types import *
print("Spark version:", pyspark.__version__)
from pyspark.ml.feature import StringIndexer, OneHotEncoder, Imputer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

Spark version: 3.2.1


### Import and create SparkSession

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Practical_Day_1_DF_SQL_ML").getOrCreate()

### Load the PatientInfo.csv file and show the first 5 rows

In [3]:
from IPython.display import display, HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [4]:
data = spark.read.csv("PatientInfo.csv", header=True, inferSchema=True)
data.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|   2020-02-05|         null|released|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|   2020-03-02|         null|released|
|1000000003|  male|50s|  Korea|   Seoul|   Jongno-gu|contact with patient| 2002000001|            17|              null|    2020-01-30|

### Display the schema of the dataset

In [5]:
data.printSchema()

root
 |-- patient_id: long (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: string (nullable = true)
 |-- country: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- infected_by: string (nullable = true)
 |-- contact_number: string (nullable = true)
 |-- symptom_onset_date: string (nullable = true)
 |-- confirmed_date: string (nullable = true)
 |-- released_date: string (nullable = true)
 |-- deceased_date: string (nullable = true)
 |-- state: string (nullable = true)



### Display the statistical summary

In [6]:
data.describe().show()

+-------+--------------------+------+----+----------+--------+--------------+--------------------+--------------------+--------------------+------------------+--------------+-------------+-------------+--------+
|summary|          patient_id|   sex| age|   country|province|          city|      infection_case|         infected_by|      contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+-------+--------------------+------+----+----------+--------+--------------+--------------------+--------------------+--------------------+------------------+--------------+-------------+-------------+--------+
|  count|                5165|  4043|3785|      5165|    5165|          5071|                4246|                1346|                 791|               690|          5162|         1587|           66|    5165|
|   mean|2.8636345618679576E9|  null|null|      null|    null|          null|                null|2.2845944015643125E9|1.6772572523506988E7|            

### Using the state column.
### How many people survived (released), and how many didn't survive (isolated/deceased)?

In [7]:
data.groupBy("state").count().show()

+--------+-----+
|   state|count|
+--------+-----+
|isolated| 2158|
|released| 2929|
|deceased|   78|
+--------+-----+



### Display the number of null values in each column

In [8]:
data.select([count(when(col(c).isNull(), c)).alias(c) for c in data.columns]).show()

+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+
|patient_id| sex| age|country|province|city|infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|state|
+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+
|         0|1122|1380|      0|       0|  94|           919|       3819|          4374|              4475|             3|         3578|         5099|    0|
+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+



## Data preprocessing

### Fill the nulls in the deceased_date with the released_date. 
- You can use <b>coalesce</b> function

In [9]:
datacopy = data.withColumn("deceased_date", coalesce(data.deceased_date, data.released_date))

### Add a column named no_days which is difference between the deceased_date and the confirmed_date then show the top 5 rows. Print the schema.
- <b> Hint: You need to typecast these columns as date first <b>

In [10]:
sdata = datacopy.withColumn("no_days", datediff(datacopy.deceased_date, datacopy.confirmed_date).cast("int"))

### Add a is_male column if male then it should yield true, else then False

In [11]:
sdata.select(col("sex") == "male").show()

+------------+
|(sex = male)|
+------------+
|        true|
|        true|
|        true|
|        true|
|       false|
|       false|
|        true|
|        true|
|        true|
|       false|
|       false|
|        true|
|        true|
|       false|
|        true|
|        true|
|        true|
|        true|
|       false|
|       false|
+------------+
only showing top 20 rows



In [12]:
wismale = sdata.withColumn("is_male", col("sex") == "male")
wismale.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|no_days|is_male|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|   2020-02-05|   2020-02-05|released|     13|   true|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|   2020-03-02|   2020-03-02|released|     32|   true|
|1000000003|  male|50s|  Korea|   Seoul|   Jongno-gu|co

### Add a is_dead column if patient state is not released then it should yield true, else then False

- Use <b>UDF</b> to perform this task. 
- However, UDF is not recommended there is no built in function can do the required operation.
- UDF is slower than built in functions.

In [13]:
isdead = wismale.withColumn("is_dead", col("state") != "released")
isdead.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+-------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|no_days|is_male|is_dead|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+-------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|   2020-02-05|   2020-02-05|released|     13|   true|  false|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|   2020-03-02|   2020-03-02|released|     32|   true|  false|
|1000000003|  m

### Change the ages to bins from 10s, 0s, 10s, 20s,.etc to 0,10, 20

In [14]:
@udf
def age_bins(age):
    if age is None or age is None:
        return age
    age = age.split("s")[0]
    return age
afterages = isdead.withColumn("age", age_bins(isdead.age))
afterages.show(5)

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+-------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|no_days|is_male|is_dead|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+-------+
|1000000001|  male| 50|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|   2020-02-05|   2020-02-05|released|     13|   true|  false|
|1000000002|  male| 30|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|   2020-03-02|   2020-03-02|released|     32|   true|  false|
|1000000003|  male| 

### Change age, and no_days  to be typecasted as Double

In [15]:
datap = afterages.withColumn("age", col("age").cast("double")).withColumn("no_days", col("no_days").cast("double"))
datap.show(5)

+----------+------+----+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+-------+
|patient_id|   sex| age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|no_days|is_male|is_dead|
+----------+------+----+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+-------+
|1000000001|  male|50.0|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|   2020-02-05|   2020-02-05|released|   13.0|   true|  false|
|1000000002|  male|30.0|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|   2020-03-02|   2020-03-02|released|   32.0|   true|  false|
|1000000003|  m

### Drop the columns
["patient_id","sex","infected_by","contact_number","released_date","state",
"symptom_onset_date","confirmed_date","deceased_date","country","no_days",
"city","infection_case"]

In [16]:
ddata = datap.drop("patient_id", "sex", "infected_by", "contact_number","released_date","state","symptom_onset_date","confirmed_date","deceased_date","country","no_days", "city","infection_case")
ddata.show(5)

+----+--------+-------+-------+
| age|province|is_male|is_dead|
+----+--------+-------+-------+
|50.0|   Seoul|   true|  false|
|30.0|   Seoul|   true|  false|
|50.0|   Seoul|   true|  false|
|20.0|   Seoul|   true|  false|
|20.0|   Seoul|  false|  false|
+----+--------+-------+-------+
only showing top 5 rows



### Recount the number of nulls now

In [17]:
ddata.select([count(when(col(c).isNull(), c)).alias(c) for c in ddata.columns]).show()

+----+--------+-------+-------+
| age|province|is_male|is_dead|
+----+--------+-------+-------+
|1380|       0|   1122|      0|
+----+--------+-------+-------+



## Now do the same but using SQL select statement

### From the original Patient DataFrame, Create a temporary view (table).

In [18]:
SQLdata = data.createOrReplaceTempView("SQLdata")

### Use SELECT statement to select all columns from the dataframe and show the output.

In [19]:
Selectdata = spark.sql("SELECT * FROM SQLdata")

### *Using SQL commands*, limit the output to only 5 rows 

In [20]:
Selectdata.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|   2020-02-05|         null|released|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|   2020-03-02|         null|released|
|1000000003|  male|50s|  Korea|   Seoul|   Jongno-gu|contact with patient| 2002000001|            17|              null|    2020-01-30|

### Select the count of males and females in the dataset

In [21]:
spark.sql("SELECT sex, COUNT(patient_id)\
            from SQLdata\
            group by sex").show()

+------+-----------------+
|   sex|count(patient_id)|
+------+-----------------+
|  null|             1122|
|female|             2218|
|  male|             1825|
+------+-----------------+



### How many people did survive, and how many didn't?

In [22]:
spark.sql("SELECT state ,count(patient_id)\
            from SQLdata\
            group by 1").show()

+--------+-----------------+
|   state|count(patient_id)|
+--------+-----------------+
|isolated|             2158|
|released|             2929|
|deceased|               78|
+--------+-----------------+



### Now, let's perform some preprocessing using SQL:
1. Convert *age* column to double after removing the 's' at the end -- *hint: check SUBSTRING method*
2. Select only the following columns: `['sex', 'age', 'province', 'state']`
3. Store the result of the query in a new dataframe

In [23]:
ppdata = spark.sql("SELECT sex, SUBSTRING(age, 1, 2) as age, province, state\
            from SQLdata")
ppdata.show()

+------+---+--------+--------+
|   sex|age|province|   state|
+------+---+--------+--------+
|  male| 50|   Seoul|released|
|  male| 30|   Seoul|released|
|  male| 50|   Seoul|released|
|  male| 20|   Seoul|released|
|female| 20|   Seoul|released|
|female| 50|   Seoul|released|
|  male| 20|   Seoul|released|
|  male| 20|   Seoul|released|
|  male| 30|   Seoul|released|
|female| 60|   Seoul|released|
|female| 50|   Seoul|released|
|  male| 20|   Seoul|released|
|  male| 80|   Seoul|deceased|
|female| 60|   Seoul|released|
|  male| 70|   Seoul|released|
|  male| 70|   Seoul|released|
|  male| 70|   Seoul|released|
|  male| 20|   Seoul|released|
|female| 70|   Seoul|released|
|female| 70|   Seoul|released|
+------+---+--------+--------+
only showing top 20 rows



## Machine Learning 
### Create a pipeline model to predict is_dead and evaluate the performance.
- Use <b>StringIndexer</b> to transform <b>string</b> data type to indices.
- Use <b>OneHotEncoder</b> to deal with categorical values.
- Use <b>Imputer</b> to fill missing data with mean.

In [24]:
@udf
def boolean_to_int(boolean_):
    if boolean_ is None:
        return 0
    if boolean_ is True:
        return 1
    else:
        return 0


In [25]:
MLdata = ddata.withColumn("is_male", boolean_to_int(ddata.is_male).cast("int"))
MLdata1 = MLdata.withColumn("is_dead", boolean_to_int(MLdata.is_dead).cast("int"))
MLdata1.dtypes

[('age', 'double'),
 ('province', 'string'),
 ('is_male', 'int'),
 ('is_dead', 'int')]

In [26]:
MLdata1.select([count(when(col(c).isNull(), c)).alias(c) for c in MLdata1.columns]).show()

+----+--------+-------+-------+
| age|province|is_male|is_dead|
+----+--------+-------+-------+
|1380|       0|      0|      0|
+----+--------+-------+-------+



In [27]:
si = StringIndexer(inputCol="province", outputCol="province_index")
ohe = OneHotEncoder(inputCol="is_male", outputCol="is_male_vec")
imputer = Imputer(inputCol= "age", outputCol="age" )
features = ["age","province_index", "is_male_vec"]
va = VectorAssembler(inputCols=features, outputCol="vec_features")
lr = LinearRegression(featuresCol="vec_features", labelCol="is_dead")
dt = RandomForestClassifier(featuresCol="vec_features", labelCol="is_dead",numTrees=5, maxDepth=5)  

In [28]:
training, testing = MLdata1.randomSplit([0.8, 0.2])

In [30]:
pipe = Pipeline(stages=[si, ohe, imputer,va, lr])
spipe = Pipeline(stages=[si, ohe, imputer,va, dt])
model = pipe.fit(training)
smodel = spipe.fit(testing)
fmpred= model.transform(testing)
smpred= smodel.transform(testing)
fmpred.describe().show()
smpred.describe().show()

+-------+------------------+--------+-------------------+-------------------+-----------------+-------------------+
|summary|               age|province|            is_male|            is_dead|   province_index|         prediction|
+-------+------------------+--------+-------------------+-------------------+-----------------+-------------------+
|  count|              1049|    1049|               1049|               1049|             1049|               1049|
|   mean| 40.36162094317742|    null|0.34604385128693993| 0.4470924690181125|2.470924690181125|0.42975442983248285|
| stddev|16.992454886025808|    null| 0.4759342780327213|0.49743006723039307|3.124234746462307|0.04820052563240537|
|    min|               0.0|   Busan|                  0|                  0|              0.0|0.29816930221785304|
|    max|              90.0|   Ulsan|                  1|                  1|             16.0| 0.5777087400804028|
+-------+------------------+--------+-------------------+---------------

In [31]:
ev= RegressionEvaluator(predictionCol="prediction", labelCol="is_dead", metricName="rmse")
sev = BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol="is_dead")
ev.evaluate(fmpred)

0.492833009909779

In [32]:
sev.evaluate(smpred)

0.854372840232336