# **Labs 1 and 2: PySpark**

In these labs we will use the "[[NeurIPS 2020] Data Science for COVID-19 (DS4C)](https://www.kaggle.com/datasets/kimjihoo/coronavirusdataset?select=PatientInfo.csv)" dataset, retrieved from [Kaggle](https://www.kaggle.com/) on 1/6/2022 for non-commercial educational purposes under the [CC BY-NC-SA 4.0](https://creativecommons.org/licenses/by-nc-sa/4.0/) license.


The csv file that we will be using in this lab is **PatientInfo**.

## PatientInfo.csv

**patient_id**
the ID of the patient

**sex**
the sex of the patient

**age**
the age of the patient

**country**
the country of the patient

**province**
the province of the patient

**city**
the city of the patient

**infection_case**
the source of infection (e.g. overseas inflow, contact with patient)

**infected_by**
the ID of the patient who infected this patient

**contact_number**
the number of people the patient was in contact with

**symptom_onset_date**
the date of symptom onset

**confirmed_date**
the date the infection was confirmed

**released_date**
the date the patient was released

**deceased_date**
the date the patient died

**state**
isolated / released / deceased

### Import PySpark and check its version

In [1]:
import pyspark
from pyspark.sql import SparkSession

print(pyspark.__version__)

### Create a SparkSession

In [2]:
spark = SparkSession.builder.getOrCreate()



### Load the PatientInfo.csv file and show the first 5 rows

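In [3]:
from IPython.display import display, HTML
# Keep wide show() tables from wrapping in the notebook output
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [4]:
# header=True reads column names from the first row; inferSchema=True detects column types
df = spark.read.csv("/kaggle/input/patiens/PatientInfo.csv", header=True, inferSchema=True)
df.show(5)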

### Display the schema of the dataset

In [5]:
df.printSchema()

root
 |-- patient_id: long (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: string (nullable = true)
 |-- country: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- infected_by: string (nullable = true)
 |-- contact_number: string (nullable = true)
 |-- symptom_onset_date: string (nullable = true)
 |-- confirmed_date: date (nullable = true)
 |-- released_date: date (nullable = true)
 |-- deceased_date: date (nullable = true)
 |-- state: string (nullable = true)



### Display the statistical summary

In [6]:
df.describe().show()


+-------+--------------------+------+----+----------+--------+--------------+--------------------+--------------------+--------------------+------------------+--------+
|summary|          patient_id|   sex| age|   country|province|          city|      infection_case|         infected_by|      contact_number|symptom_onset_date|   state|
+-------+--------------------+------+----+----------+--------+--------------+--------------------+--------------------+--------------------+------------------+--------+
|  count|                5165|  4043|3785|      5165|    5165|          5071|                4246|                1346|                 791|               690|    5165|
|   mean|2.8636345618679576E9|  NULL|NULL|      NULL|    NULL|          NULL|                NULL|2.2845944015643125E9|1.6772572523506988E7|              NULL|    NULL|
| stddev| 2.074210725277473E9|  NULL|NULL|      NULL|    NULL|          NULL|                NULL|1.5265072953383324E9| 3.093097580985502E8|              N

                                                                                                    

In [7]:
df.summary().show()


+-------+--------------------+------+----+----------+--------+--------------+--------------------+--------------------+--------------------+------------------+--------+
|summary|          patient_id|   sex| age|   country|province|          city|      infection_case|         infected_by|      contact_number|symptom_onset_date|   state|
+-------+--------------------+------+----+----------+--------+--------------+--------------------+--------------------+--------------------+------------------+--------+
|  count|                5165|  4043|3785|      5165|    5165|          5071|                4246|                1346|                 791|               690|    5165|
|   mean|2.8636345618679576E9|  NULL|NULL|      NULL|    NULL|          NULL|                NULL|2.2845944015643125E9|1.6772572523506988E7|              NULL|    NULL|
| stddev| 2.074210725277473E9|  NULL|NULL|      NULL|    NULL|          NULL|                NULL|1.5265072953383324E9| 3.093097580985502E8|              N

                                                                                                    

### Using the state column, how many people survived (released) and how many did not (isolated/deceased)?

In [8]:
from pyspark.sql.functions import col
df.filter(col("state") == "released").count()

2929

In [9]:
df.filter(col("state").isin("isolated", "deceased")).count()

2236
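Each `count()` above launches a separate Spark job. Both numbers can also be obtained in a single pass with conditional aggregation (in PySpark, for example `df.agg(F.sum(F.when(col("state") == "released", 1).otherwise(0)))`). The plain-Python sketch below models that single pass; the sample list of states is hypothetical.

```python
from collections import Counter


def survival_counts(states):
    """Count survivors and non-survivors in one pass over the state values.

    Follows the lab's definition: 'released' counts as survived,
    'isolated' or 'deceased' as not survived; any other value is ignored.
    """
    counts = Counter()
    for state in states:
        if state == "released":
            counts["survived"] += 1
        elif state in ("isolated", "deceased"):
            counts["not_survived"] += 1
    # Counter returns 0 for keys that were never incremented
    return counts["survived"], counts["not_survived"]


# Hypothetical sample; the real state column has 5165 rows
sample_states = ["released", "isolated", "released", "deceased", "released"]
print(survival_counts(sample_states))  # (3, 2)
```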

### Display the number of null values in each column

In [10]:
import pyspark.sql.functions as F

null_counts = df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns])
null_counts.show()

+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+
|patient_id| sex| age|country|province|city|infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|state|
+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+
|         0|1122|1380|      0|       0|  94|           919|       3819|          4374|              4475|             3|         3578|         5099|    0|
+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+
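The expression `F.count(F.when(F.col(c).isNull(), c))` works because `when` without `otherwise` returns null for rows where the condition is false, and `count` skips nulls, so only the null rows are counted. (Passing the string `c` as the value produces a non-null literal for the matching rows.) A plain-Python model of that logic, applied to hypothetical rows stored as dicts:

```python
def count_nulls(rows, columns):
    """Model of F.count(F.when(F.col(c).isNull(), c)) for each column c."""
    result = {}
    for c in columns:
        # when(isNull, c): a non-null marker (the literal column name) for null rows,
        # None for every other row because there is no otherwise()
        marked = [c if row.get(c) is None else None for row in rows]
        # count() ignores None values
        result[c] = sum(1 for m in marked if m is not None)
    return result


rows = [
    {"sex": "male", "age": "50s"},
    {"sex": None, "age": "30s"},
    {"sex": None, "age": None},
]
print(count_nulls(rows, ["sex", "age"]))  # {'sex': 2, 'age': 1}
```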



## Data preprocessing

### Fill the nulls in deceased_date with released_date
- You can use the <b>coalesce</b> function

In [11]:
df_processed_1 = df.withColumn("deceased_date", F.coalesce("deceased_date", "released_date"))

In [12]:
df_processed_1.select("deceased_date", "released_date").show(10)

+-------------+-------------+
|deceased_date|released_date|
+-------------+-------------+
|   2020-02-05|   2020-02-05|
|   2020-03-02|   2020-03-02|
|   2020-02-19|   2020-02-19|
|   2020-02-15|   2020-02-15|
|   2020-02-24|   2020-02-24|
|   2020-02-19|   2020-02-19|
|   2020-02-10|   2020-02-10|
|   2020-02-24|   2020-02-24|
|   2020-02-21|   2020-02-21|
|   2020-02-29|   2020-02-29|
+-------------+-------------+
only showing top 10 rows
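`coalesce` returns its first non-null argument, so `deceased_date` keeps its own value when present and otherwise falls back to `released_date`; if both are null (for example, a patient who is still isolated) the result stays null. A plain-Python sketch of that rule, using `None` for null; the first date pair comes from the output above, the other inputs are hypothetical:

```python
from datetime import date


def coalesce(*values):
    """Return the first argument that is not None, or None if all are None."""
    for v in values:
        if v is not None:
            return v
    return None


# Arguments are (deceased_date, released_date)
print(coalesce(None, date(2020, 2, 5)))              # 2020-02-05
print(coalesce(date(2020, 3, 1), date(2020, 2, 5)))  # 2020-03-01
print(coalesce(None, None))                          # None
```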



### Add a column named no_days holding the difference in days between deceased_date and confirmed_date, then show the top 5 rows and print the schema.
- <b>Hint: you need to typecast these columns as date first</b> (with inferSchema=True they are already read as date, so the cast acts as a safeguard)

In [13]:
df_processed_2 = df_processed_1.withColumn("deceased_date", F.to_date("deceased_date", "yyyy-MM-dd"))
df_processed_3 = df_processed_2.withColumn("confirmed_date", F.to_date("confirmed_date", "yyyy-MM-dd"))

df_processed_4 = df_processed_3.withColumn("no_days", F.datediff("deceased_date", "confirmed_date"))

In [14]:
df_processed_4.select("confirmed_date", "deceased_date", "no_days").show(5)

+--------------+-------------+-------+
|confirmed_date|deceased_date|no_days|
+--------------+-------------+-------+
|    2020-01-23|   2020-02-05|     13|
|    2020-01-30|   2020-03-02|     32|
|    2020-01-30|   2020-02-19|     20|
|    2020-01-30|   2020-02-15|     16|
|    2020-01-31|   2020-02-24|     24|
+--------------+-------------+-------+
only showing top 5 rows
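`F.datediff(end, start)` returns the number of days from `start` to `end`. The same arithmetic with Python's `datetime.date`, checked against the first two rows shown above:

```python
from datetime import date
from typing import Optional


def datediff(end: Optional[date], start: Optional[date]) -> Optional[int]:
    """Days from start to end, like F.datediff(end, start); None if either is missing."""
    if end is None or start is None:
        return None
    return (end - start).days


print(datediff(date(2020, 2, 5), date(2020, 1, 23)))  # 13
print(datediff(date(2020, 3, 2), date(2020, 1, 30)))  # 32 (2020 is a leap year)
```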



### Add an is_male column that is True if the patient is male and False otherwise

In [15]:
df_processed_5 = df_processed_4.withColumn("is_male", df_processed_4.sex == "male")
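In Spark SQL, comparing a null `sex` with `"male"` yields null rather than False, which is why `is_male` ends up with the same 1122 nulls as `sex` in the null recount. A plain-Python model of this three-valued equality, using `None` for null:

```python
def sql_equals(a, b):
    """Model of Spark's `=` operator: any null operand gives null (None)."""
    if a is None or b is None:
        return None
    return a == b


sexes = ["male", "female", None]
print([sql_equals(s, "male") for s in sexes])  # [True, False, None]
```

If missing values should map to False instead, the null-safe comparison `df.sex.eqNullSafe("male")` returns False when `sex` is null.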

### Add an is_dead column that is True if the patient's state is not released and False otherwise

- Use a <b>UDF</b> to perform this task.
- In general, UDFs are not recommended unless no built-in function can do the required operation.
- Python UDFs are slower than built-in functions because every row has to be serialized and sent to a Python worker process.

In [16]:
from pyspark.sql.types import BooleanType

def check_is_dead(state):
    return state != "released"

In [17]:
is_dead_udf = F.udf(check_is_dead, BooleanType())

df_processed_not_used = df_processed_5.withColumn("is_dead", is_dead_udf(df_processed_5.state))

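### A faster alternative using built-in functions

In [18]:
# when/otherwise runs inside the JVM, so no Python serialization is needed
df_processed_6 = df_processed_5.withColumn("is_dead", F.when(df_processed_5.state == "released", False).otherwise(True))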

### Convert the age bins from strings (0s, 10s, 20s, etc.) to integers (0, 10, 20, etc.)

In [19]:
df_processed_7 = df_processed_6.withColumn("age", F.regexp_replace("age", "s", "").cast("int"))
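`regexp_replace("age", "s", "")` removes every `s` in the string, so `"50s"` becomes `"50"`, and the `int` cast turns it into 50; null ages stay null. The sketch below models this in plain Python with `re.sub` standing in for `regexp_replace`; it assumes ANSI mode is disabled (the Spark 3.x default), where an unparseable string casts to null instead of raising an error. The sample values are hypothetical.

```python
import re
from typing import Optional


def age_bin_to_int(age: Optional[str]) -> Optional[int]:
    """Model of F.regexp_replace(age, 's', '').cast('int') with ANSI mode off."""
    if age is None:
        return None
    stripped = re.sub("s", "", age)
    # Non-ANSI cast: strings that are not integers become null
    return int(stripped) if stripped.isdigit() else None


print([age_bin_to_int(a) for a in ["0s", "20s", "100s", None]])  # [0, 20, 100, None]
```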

### Cast age and no_days to double

In [20]:
df_processed_8 = df_processed_7.withColumn("age", F.col("age").cast("double")) \
       .withColumn("no_days", F.col("no_days").cast("double"))

### Drop the following columns
`["patient_id", "sex", "infected_by", "contact_number", "released_date", "state", "symptom_onset_date", "confirmed_date", "deceased_date", "country", "no_days", "city", "infection_case"]`

In [21]:
columns_to_drop = ["patient_id", "sex", "infected_by", "contact_number", "released_date",
                   "state", "symptom_onset_date", "confirmed_date", "deceased_date",
                   "country", "no_days", "city", "infection_case"]

df_processed_9 = df_processed_8.drop(*columns_to_drop)

### Recount the number of nulls now

In [22]:
null_counts_2 = df_processed_9.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df_processed_9.columns])
null_counts_2.show()

+----+--------+-------+-------+
| age|province|is_male|is_dead|
+----+--------+-------+-------+
|1380|       0|   1122|      0|
+----+--------+-------+-------+



## Now do the same using SQL SELECT statements

### From the original Patient DataFrame, create a temporary view (table).

In [23]:
df.createOrReplaceTempView("patient_table")

### Use a SELECT statement to select all columns from the view and show the output.

In [24]:
spark.sql("SELECT * FROM patient_table").show(5)

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|         NULL|released|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|         NULL|released|
|1000000003|  male|50s|  Korea|   Seoul|  Jongno-gu|contact with patient| 2002000001|            17|              NULL|    2020-01-30|   202

### *Using SQL commands*, limit the output to only 5 rows 

In [25]:
spark.sql("SELECT * FROM patient_table LIMIT 5").show()

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|         NULL|released|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|         NULL|released|
|1000000003|  male|50s|  Korea|   Seoul|  Jongno-gu|contact with patient| 2002000001|            17|              NULL|    2020-01-30|   202

### Select the count of males and females in the dataset

In [26]:
query = """
SELECT sex, COUNT(*) AS count
FROM patient_table
GROUP BY sex
"""
spark.sql(query).show()

+------+-----+
|   sex|count|
+------+-----+
|  NULL| 1122|
|female| 2218|
|  male| 1825|
+------+-----+



### How many people did survive, and how many didn't?

In [27]:
query = """
SELECT
  CASE WHEN state = 'released' THEN 'survived' ELSE 'not_survived' END AS survival_status,
  COUNT(*) AS count
FROM patient_table
GROUP BY survival_status
"""
spark.sql(query).show()

+---------------+-----+
|survival_status|count|
+---------------+-----+
|   not_survived| 2236|
|       survived| 2929|
+---------------+-----+



### Now, let's perform some preprocessing using SQL:
1. Convert the *age* column to double after removing the 's' at the end -- *hint: check the SUBSTRING function*
2. Select only the following columns: `['sex', 'age', 'province', 'state']`
3. Store the result of the query in a new dataframe

In [28]:
query = """
SELECT
  sex,
  -- SUBSTRING(col, start_position, number_of_characters); positions start at 1
  CAST(SUBSTRING(age, 1, LENGTH(age) - 1) AS DOUBLE) AS age,
  province,
  state
FROM patient_table
"""

In [29]:
df_preprocessed = spark.sql(query)
df_preprocessed.show(5)

+------+----+--------+--------+
|   sex| age|province|   state|
+------+----+--------+--------+
|  male|50.0|   Seoul|released|
|  male|30.0|   Seoul|released|
|  male|50.0|   Seoul|released|
|  male|20.0|   Seoul|released|
|female|20.0|   Seoul|released|
+------+----+--------+--------+
only showing top 5 rows
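`SUBSTRING(str, pos, len)` in Spark SQL counts positions from 1, so `SUBSTRING(age, 1, LENGTH(age) - 1)` keeps everything except the last character. In Python terms that is `s[pos - 1 : pos - 1 + len]`. The sketch below models only positive positions; Spark also accepts other positions (for example negative ones, counted from the end), which this sketch does not cover.

```python
from typing import Optional


def sql_substring(s: Optional[str], pos: int, length: int) -> Optional[str]:
    """Model of Spark SQL SUBSTRING(s, pos, length) for pos >= 1."""
    if s is None:
        return None
    start = pos - 1  # convert the 1-based SQL position to a 0-based index
    return s[start:start + max(length, 0)]


for age in ["50s", "100s"]:
    # Mirrors CAST(SUBSTRING(age, 1, LENGTH(age) - 1) AS DOUBLE)
    print(float(sql_substring(age, 1, len(age) - 1)))  # 50.0, then 100.0
```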



In [30]:
df_preprocessed.printSchema()

root
 |-- sex: string (nullable = true)
 |-- age: double (nullable = true)
 |-- province: string (nullable = true)
 |-- state: string (nullable = true)



## Machine Learning 
### Create a pipeline model to predict is_dead and evaluate the performance.
- Use <b>StringIndexer</b> to transform <b>string</b> data type to indices.
- Use <b>OneHotEncoder</b> to deal with categorical values.
- Use <b>Imputer</b> to fill missing data with mean.
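`Imputer` with its default mean strategy learns the mean of the non-missing values during `fit` and substitutes it for missing values during `transform`. Because it sits inside the pipeline, the mean is computed from the training split only and reused unchanged on the test split. A plain-Python sketch of that fit/transform split, with `None` standing in for null and hypothetical values:

```python
from statistics import mean
from typing import List, Optional


def fit_mean_imputer(values: List[Optional[float]]) -> float:
    """'fit' step: mean of the non-missing training values."""
    return mean(v for v in values if v is not None)


def apply_imputer(values: List[Optional[float]], fill: float) -> List[float]:
    """'transform' step: replace missing entries with the value learned during fit."""
    return [fill if v is None else v for v in values]


train_ages = [50.0, 30.0, None, 10.0]  # hypothetical training values
test_ages = [None, 60.0]               # hypothetical test values

fill = fit_mean_imputer(train_ages)
print(fill)                            # 30.0
print(apply_imputer(test_ages, fill))  # [30.0, 60.0]
```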

In [31]:
df_preprocessed_1 = df_preprocessed.withColumn("is_dead",F.when((col("state").isin("isolated", "deceased")), 1).otherwise(0))

In [32]:
df_preprocessed_final = df_preprocessed_1.drop("state")
df_preprocessed_final.show(5)

+------+----+--------+-------+
|   sex| age|province|is_dead|
+------+----+--------+-------+
|  male|50.0|   Seoul|      0|
|  male|30.0|   Seoul|      0|
|  male|50.0|   Seoul|      0|
|  male|20.0|   Seoul|      0|
|female|20.0|   Seoul|      0|
+------+----+--------+-------+
only showing top 5 rows



In [33]:
df_preprocessed_final.count()

5165

In [34]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, Imputer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [35]:
traindf,testdf = df_preprocessed_final.randomSplit([0.8, 0.2], seed=42)
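`randomSplit([0.8, 0.2], seed=42)` normalizes the weights and assigns each row independently using a pseudo-random draw, so the two parts are only approximately 80% and 20% of the rows, and the exact split can also depend on how the data is partitioned. The plain-Python model below illustrates this per-row assignment; it is not Spark's implementation and will not reproduce Spark's exact split.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to a split by comparing one uniform draw with cumulative weight bounds."""
    total = sum(weights)
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        x = rng.random()
        for i, upper in enumerate(bounds):
            # The last split also catches any floating-point rounding at the top bound
            if x < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train, test = random_split(range(5165), [0.8, 0.2], seed=42)
print(len(train), len(test))  # close to 4132 and 1033, but not exact
```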

In [36]:
null_counts_after_sql = df_preprocessed_final.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df_preprocessed_final.columns])
null_counts_after_sql.show()

+----+----+--------+-------+
| sex| age|province|is_dead|
+----+----+--------+-------+
|1122|1380|       0|      0|
+----+----+--------+-------+



In [37]:
traindf.dtypes

[('sex', 'string'),
 ('age', 'double'),
 ('province', 'string'),
 ('is_dead', 'int')]

In [38]:
cat_cols = [field for field, dtype in traindf.dtypes if dtype == 'string']

In [39]:
numeric_cols = [field for field, dtype in traindf.dtypes if dtype == 'double' and field != 'is_dead']

In [40]:
# Use c as the loop variable so it is not confused with the imported col() function
indexed_cols = [c + '_index' for c in cat_cols]
OHE_cols = [c + '_OHE' for c in cat_cols]
Imputed = [c + '_imputed' for c in numeric_cols]

In [41]:
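# handleInvalid='skip' filters out rows whose category is null or unseen (e.g. rows with a missing sex)
strind = StringIndexer(inputCols=cat_cols, outputCols=indexed_cols, handleInvalid='skip')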
ohe = OneHotEncoder(inputCols=indexed_cols, outputCols=OHE_cols)
imputer = Imputer(inputCols=numeric_cols, outputCols=Imputed)
assembler = VectorAssembler(inputCols=Imputed + OHE_cols, outputCol='features')
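By default, `StringIndexer` orders labels by descending frequency (`stringOrderType="frequencyDesc"`, with ties broken alphabetically), so the most frequent value gets index 0, and `OneHotEncoder` drops the last category (`dropLast=True`), which is then encoded as an all-zeros vector. The plain-Python sketch below models indexing, `handleInvalid='skip'` and one-hot encoding on hypothetical data; Spark stores indices as doubles and produces sparse vectors, shown here as ints and dense lists.

```python
from collections import Counter
from typing import Dict, List, Optional


def fit_string_indexer(labels: List[Optional[str]]) -> Dict[str, int]:
    """frequencyDesc ordering: most frequent label -> 0; ties broken alphabetically."""
    counts = Counter(lab for lab in labels if lab is not None)
    ordered = sorted(counts, key=lambda lab: (-counts[lab], lab))
    return {lab: i for i, lab in enumerate(ordered)}


def index_and_skip(labels: List[Optional[str]], mapping: Dict[str, int]) -> List[int]:
    """handleInvalid='skip': drop null or unseen labels instead of indexing them."""
    return [mapping[lab] for lab in labels if lab in mapping]


def one_hot(index: int, num_categories: int, drop_last: bool = True) -> List[float]:
    """Dense model of OneHotEncoder; with drop_last the last index maps to all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if index < size:
        vec[index] = 1.0
    return vec


sexes = ["female", "male", "female", None, "male", "female"]
mapping = fit_string_indexer(sexes)
print(mapping)  # {'female': 0, 'male': 1}
indices = index_and_skip(sexes, mapping)
print(indices)  # [0, 1, 0, 1, 0]  (the None row is skipped)
print([one_hot(i, len(mapping)) for i in indices])  # [[1.0], [0.0], [1.0], [0.0], [1.0]]
```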

In [42]:
classifier = LogisticRegression(featuresCol='features', labelCol='is_dead')
pl = Pipeline(stages=[imputer , strind, ohe, assembler, classifier])

In [43]:
pl_model = pl.fit(traindf)

In [44]:
pred_train = pl_model.transform(traindf)

In [45]:
pred = pl_model.transform(testdf)

In [46]:
evaluator = BinaryClassificationEvaluator(labelCol="is_dead", rawPredictionCol="rawPrediction", metricName="areaUnderROC")

In [47]:
auc_train = evaluator.evaluate(pred_train)
auc_test = evaluator.evaluate(pred)

In [48]:
print(f"auc_train : {auc_train}, auc_test : {auc_test} ")

auc_train : 0.9241543712514069, auc_test : 0.9284758128469465 
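`areaUnderROC` can be read as the probability that a randomly chosen positive example (`is_dead = 1`) receives a higher score than a randomly chosen negative one, with ties counting as one half. `BinaryClassificationEvaluator` computes it from the ROC curve rather than by comparing pairs, so its value may differ slightly from this pairwise view when the evaluator down-samples the curve. Only the ordering of scores matters, which is also why margins (such as `LinearSVC`'s raw predictions) work as well as probabilities. A plain-Python sketch with hypothetical scores:

```python
from itertools import product


def auc_pairwise(scores, labels):
    """Probability that a positive outscores a negative; ties count 0.5."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p, n in product(pos, neg):
        if p > n:
            wins += 1.0
        elif p == n:
            wins += 0.5
    return wins / (len(pos) * len(neg))


# Hypothetical positive-class scores and true labels
scores = [0.9, 0.8, 0.4, 0.3, 0.2]
labels = [1, 0, 1, 0, 0]
print(auc_pairwise(scores, labels))  # 0.8333333333333334
```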


In [49]:
preds_LR = pred.select(F.col("prediction").cast("int"), F.col("is_dead").cast("int"))

# Confusion matrix components
TP = preds_LR.filter((col("prediction") == 1) & (col("is_dead") == 1)).count()
TN = preds_LR.filter((col("prediction") == 0) & (col("is_dead") == 0)).count()
FP = preds_LR.filter((col("prediction") == 1) & (col("is_dead") == 0)).count()
FN = preds_LR.filter((col("prediction") == 0) & (col("is_dead") == 1)).count()

# Metrics
accuracy = (TP + TN) / (TP + TN + FP + FN)
precision = TP / (TP + FP) if (TP + FP) != 0 else 0
recall = TP / (TP + FN) if (TP + FN) != 0 else 0
f1_score = 2 * precision * recall / (precision + recall) if (precision + recall) != 0 else 0

print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1_score:.4f}")


Accuracy: 0.8877
Precision: 0.8900
Recall: 0.8215
F1 Score: 0.8544
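The SVM section repeats these metric formulas with new variable names. A small helper, offered here as a suggestion rather than part of the original lab, computes accuracy, precision, recall and F1 from the four confusion-matrix counts with the same zero-division guards. The four counts themselves could also be collected in one Spark job, e.g. with `preds_LR.groupBy("prediction", "is_dead").count()`, instead of four separate `filter(...).count()` calls. The counts below are hypothetical, not the lab's results.

```python
def binary_metrics(tp: int, tn: int, fp: int, fn: int) -> dict:
    """Accuracy, precision, recall and F1 from confusion-matrix counts."""
    total = tp + tn + fp + fn
    accuracy = (tp + tn) / total if total else 0.0
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0
    return {"accuracy": accuracy, "precision": precision, "recall": recall, "f1": f1}


# Hypothetical counts
m = binary_metrics(tp=40, tn=50, fp=10, fn=0)
for name, value in m.items():
    print(f"{name}: {value:.4f}")
# accuracy: 0.9000
# precision: 0.8000
# recall: 1.0000
# f1: 0.8889
```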


### Support Vector Machine

In [50]:
from pyspark.ml.classification import LinearSVC

In [51]:
SVC = LinearSVC(featuresCol='features', labelCol='is_dead')
pl_SVC = Pipeline(stages=[imputer , strind, ohe, assembler, SVC])

In [52]:
pl_model_SVC = pl_SVC.fit(traindf)
pred_train_SVC = pl_model_SVC.transform(traindf)
pred_SVC = pl_model_SVC.transform(testdf)

In [53]:
evaluator_SVC = BinaryClassificationEvaluator(labelCol="is_dead", rawPredictionCol="rawPrediction", metricName="areaUnderROC")

In [54]:
auc_train_SVC = evaluator_SVC.evaluate(pred_train_SVC)
auc_test_SVC = evaluator_SVC.evaluate(pred_SVC)

In [55]:
print(f"auc_train : {auc_train_SVC}, auc_test : {auc_test_SVC} ")

auc_train : 0.9075328418987797, auc_test : 0.9117335448057102 


In [56]:
preds_SVC = pred_SVC.select(F.col("prediction").cast("int"), F.col("is_dead").cast("int"))

# Confusion matrix components
TP_SVC = preds_SVC.filter((col("prediction") == 1) & (col("is_dead") == 1)).count()
TN_SVC = preds_SVC.filter((col("prediction") == 0) & (col("is_dead") == 0)).count()
FP_SVC = preds_SVC.filter((col("prediction") == 1) & (col("is_dead") == 0)).count()
FN_SVC = preds_SVC.filter((col("prediction") == 0) & (col("is_dead") == 1)).count()

# Metrics
accuracy_SVC = (TP_SVC + TN_SVC) / (TP_SVC + TN_SVC + FP_SVC + FN_SVC)
precision_SVC = TP_SVC / (TP_SVC + FP_SVC) if (TP_SVC + FP_SVC) != 0 else 0
recall_SVC = TP_SVC / (TP_SVC + FN_SVC) if (TP_SVC + FN_SVC) != 0 else 0
f1_score_SVC = 2 * precision_SVC * recall_SVC / (precision_SVC + recall_SVC) if (precision_SVC + recall_SVC) != 0 else 0

print(f"Accuracy: {accuracy_SVC:.4f}")
print(f"Precision: {precision_SVC:.4f}")
print(f"Recall: {recall_SVC:.4f}")
print(f"F1 Score: {f1_score_SVC:.4f}")


Accuracy: 0.8827
Precision: 0.8734
Recall: 0.8277
F1 Score: 0.8499
