# **Labs 1 and 2 PySpark:**

In these labs we will be using the "[[NeurIPS 2020] Data Science for COVID-19 (DS4C)](https://www.kaggle.com/datasets/kimjihoo/coronavirusdataset?select=PatientInfo.csv)" dataset, retrieved from [Kaggle](https://www.kaggle.com/) on 1/6/2022, for educational non commercial purpose, License
[CC BY-NC-SA 4.0
](https://creativecommons.org/licenses/by-nc-sa/4.0/)


The csv file that we will be using in this lab is **PatientInfo**.

## PatientInfo.csv

**patient_id**
the ID of the patient

**sex**
the sex of the patient

**age**
the age of the patient

**country**
the country of the patient

**province**
the province of the patient

**city**
the city of the patient

**infection_case**
the case of infection

**infected_by**
the ID of who infected the patient


**contact_number**
the number of contacts with people

**symptom_onset_date**
the date of symptom onset

**confirmed_date**
the date of being confirmed

**released_date**
the date of being released

**deceased_date**
the date of being deceased

**state**
isolated / released / deceased

### Import the pyspark and check it's version

In [2]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=36f14b7ad65c616b46e3a3e6b5798c3e6e6aa504ffe3be6646de85be7c56c701
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


### Import and create SparkSession

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

### Load the PatientInfo.csv file and show the first 5 rows

In [4]:
from IPython.display import display, HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [6]:
df = spark.read.csv("/content/PatientInfo.csv"
                          ,inferSchema=True
                          , header=True)

In [16]:
df.show(5)

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|         NULL|released|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|         NULL|released|
|1000000003|  male|50s|  Korea|   Seoul|  Jongno-gu|contact with patient| 2002000001|            17|              NULL|    2020-01-30|   202

### Display the schema of the dataset

Examine your data to see what the columns look like. Are they of the correct types? Do any of them need to be converted to different types? Do they have null values?

In [11]:
df.printSchema()

root
 |-- patient_id: long (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: string (nullable = true)
 |-- country: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- infected_by: string (nullable = true)
 |-- contact_number: string (nullable = true)
 |-- symptom_onset_date: string (nullable = true)
 |-- confirmed_date: date (nullable = true)
 |-- released_date: date (nullable = true)
 |-- deceased_date: date (nullable = true)
 |-- state: string (nullable = true)



### Display the statistical summary

In [15]:
df_sum = df.summary()
df_sum.show()

+-------+--------------------+------+----+----------+--------+--------------+--------------------+--------------------+--------------------+------------------+--------+
|summary|          patient_id|   sex| age|   country|province|          city|      infection_case|         infected_by|      contact_number|symptom_onset_date|   state|
+-------+--------------------+------+----+----------+--------+--------------+--------------------+--------------------+--------------------+------------------+--------+
|  count|                5165|  4043|3785|      5165|    5165|          5071|                4246|                1346|                 791|               690|    5165|
|   mean|2.8636345618679576E9|  NULL|NULL|      NULL|    NULL|          NULL|                NULL|2.2845944015643125E9|1.6772572523506988E7|              NULL|    NULL|
| stddev| 2.074210725277473E9|  NULL|NULL|      NULL|    NULL|          NULL|                NULL|1.5265072953383324E9| 3.093097580985502E8|              N

### Using the state column.
### How many people survived (released), and how many didn't survive (isolated/deceased)?

In [12]:
from pyspark.sql import functions as F

In [17]:
survival_counts = df.groupBy('state').count()
survival_counts.show()

+--------+-----+
|   state|count|
+--------+-----+
|isolated| 2158|
|released| 2929|
|deceased|   78|
+--------+-----+



### Display the number of null values in each column

In [11]:
from pyspark.sql.functions import *

In [23]:
null_dict = {col:df.filter(df[col].isNull()).count() for col in df.columns}
null_dict

{'patient_id': 0,
 'sex': 1122,
 'age': 1380,
 'country': 0,
 'province': 0,
 'city': 94,
 'infection_case': 919,
 'infected_by': 3819,
 'contact_number': 4374,
 'symptom_onset_date': 4475,
 'confirmed_date': 3,
 'released_date': 3578,
 'deceased_date': 5099,
 'state': 0}

## Data preprocessing

### Fill the nulls in the deceased_date with the released_date.
- You can use <b>coalesce</b> function

In [17]:
dec_df = df.withColumn('deceased_date', coalesce(df['deceased_date'], df['released_date']))
dec_df.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|   2020-02-05|released|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|   2020-03-02|released|
|1000000003|  male|50s|  Korea|   Seoul|   Jongno-gu|contact with patient| 2002000001|            17|              NULL|    2020-01-30|

In [18]:
null_dict = {col:dec_df.filter(dec_df[col].isNull()).count() for col in dec_df.columns}
null_dict

{'patient_id': 0,
 'sex': 1122,
 'age': 1380,
 'country': 0,
 'province': 0,
 'city': 94,
 'infection_case': 919,
 'infected_by': 3819,
 'contact_number': 4374,
 'symptom_onset_date': 4475,
 'confirmed_date': 3,
 'released_date': 3578,
 'deceased_date': 3514,
 'state': 0}

### Add a column named no_days which is difference between the deceased_date and the confirmed_date then show the top 5 rows. Print the schema.
- <b> Hint: You need to typecast these columns as date first <b>

In [19]:
typec_df = dec_df.withColumn('confirmed_date', to_date(col('confirmed_date')))
typec_df = dec_df.withColumn('deceased_date', to_date(col('deceased_date')))

In [20]:
df_no_days = typec_df.withColumn('no_days', datediff(col('deceased_date'), col('confirmed_date')))
df_no_days.show(5)

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|no_days|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|   2020-02-05|released|     13|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|   2020-03-02|released|     32|
|1000000003|  male|50s|  Korea|   Seoul|  Jongno-gu|contact with patient| 2002000001|            17|

In [29]:
df_no_days.printSchema()

root
 |-- patient_id: long (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: string (nullable = true)
 |-- country: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- infected_by: string (nullable = true)
 |-- contact_number: string (nullable = true)
 |-- symptom_onset_date: string (nullable = true)
 |-- confirmed_date: date (nullable = true)
 |-- released_date: date (nullable = true)
 |-- deceased_date: date (nullable = true)
 |-- state: string (nullable = true)
 |-- no_days: integer (nullable = true)



### Remove null values of sex column.
### Add a is_male column if male then it should yield true, else (Female) then False

In [21]:
df_filt = df_no_days.filter(df['sex'].isNotNull())
male_df = df_filt.withColumn('is_male', when(col('sex') == 'male', True).otherwise(False))

male_df.show(5)

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|no_days|is_male|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|   2020-02-05|released|     13|   true|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|   2020-03-02|released|     32|   true|
|1000000003|  male|50s|  Korea|   Seoul|  Jongno-gu|contact 

### Add a is_dead column if patient state is not released then it should yield true, else then False

- Use <b>UDF</b> to perform this task.
- However, UDF is not recommended there is no built in function can do the required operation.
- UDF is slower than built in functions.

In [8]:
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

In [22]:
def is_dead(state):
    return state != 'released'
is_dead_udf = udf(is_dead, BooleanType())
dead_df = male_df.withColumn('is_dead', is_dead_udf(df['state']))
dead_df.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+-------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|no_days|is_male|is_dead|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+-------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|   2020-02-05|released|     13|   true|  false|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|   2020-03-02|released|     32|   true|  false|
|1000000003|  m

### Change the ages to bins from 10s, 0s, 10s, 20s,.etc to 0,10, 20

In [24]:
age_df = dead_df.withColumn('age', (substring(col('age'), 1, 2).cast('int')))
age_df.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+-------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|no_days|is_male|is_dead|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+-------+
|1000000001|  male| 50|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|   2020-02-05|released|     13|   true|  false|
|1000000002|  male| 30|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|   2020-03-02|released|     32|   true|  false|
|1000000003|  m

### Change age, and no_days  to be typecasted as Double

In [25]:
df_dbl = age_df.withColumn('age', col('age').cast('double'))
df_dbl = age_df.withColumn('no_days', col('no_days').cast('double'))

### Drop the columns
["patient_id","sex","infected_by","contact_number","released_date","state",
"symptom_onset_date","confirmed_date","deceased_date","country","no_days",
"city","infection_case"]

In [26]:
columns_to_drop =["patient_id","sex","infected_by","contact_number","released_date","state", "symptom_onset_date","confirmed_date","deceased_date","country","no_days", "city","infection_case"]
final_df = df_dbl.drop(*columns_to_drop)

final_df.show()


+---+--------+-------+-------+
|age|province|is_male|is_dead|
+---+--------+-------+-------+
| 50|   Seoul|   true|  false|
| 30|   Seoul|   true|  false|
| 50|   Seoul|   true|  false|
| 20|   Seoul|   true|  false|
| 20|   Seoul|  false|  false|
| 50|   Seoul|  false|  false|
| 20|   Seoul|   true|  false|
| 20|   Seoul|   true|  false|
| 30|   Seoul|   true|  false|
| 60|   Seoul|  false|  false|
| 50|   Seoul|  false|  false|
| 20|   Seoul|   true|  false|
| 80|   Seoul|   true|   true|
| 60|   Seoul|  false|  false|
| 70|   Seoul|   true|  false|
| 70|   Seoul|   true|  false|
| 70|   Seoul|   true|  false|
| 20|   Seoul|   true|  false|
| 70|   Seoul|  false|  false|
| 70|   Seoul|  false|  false|
+---+--------+-------+-------+
only showing top 20 rows



### Recount the number of nulls now

In [27]:
null_dict = {col:final_df.filter(final_df[col].isNull()).count() for col in final_df.columns}
null_dict

{'age': 327, 'province': 0, 'is_male': 0, 'is_dead': 0}

## Now do the same but using SQL select statement

spark.sql("SELECT * FROM myTableName")

### From the original Patient DataFrame, Create a temporary view (table).

In [29]:
df.createOrReplaceTempView("patients_table")

### Use SELECT statement to select all columns from the dataframe and show the output.

In [30]:
all_cols_df = spark.sql("SELECT * FROM patients_table")

In [31]:
all_cols_df.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|         NULL|released|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|         NULL|released|
|1000000003|  male|50s|  Korea|   Seoul|   Jongno-gu|contact with patient| 2002000001|            17|              NULL|    2020-01-30|

### *Using SQL commands*, limit the output to only 5 rows

In [32]:
rows_df = spark.sql("SELECT * FROM patients_table LIMIT 5")

In [33]:
rows_df.show()

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|         NULL|released|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|         NULL|released|
|1000000003|  male|50s|  Korea|   Seoul|  Jongno-gu|contact with patient| 2002000001|            17|              NULL|    2020-01-30|   202

### Select the count of males and females in the dataset

In [35]:
gender_df = spark.sql("SELECT sex, count(sex) from patients_table GROUP BY sex")
gender_df.show()

+------+----------+
|   sex|count(sex)|
+------+----------+
|  NULL|         0|
|female|      2218|
|  male|      1825|
+------+----------+



### How many people did survive, and how many didn't?

In [36]:
survived_df = spark.sql("SELECT state, count(*) from patients_table GROUP BY state")
survived_df.show()

+--------+--------+
|   state|count(1)|
+--------+--------+
|isolated|    2158|
|released|    2929|
|deceased|      78|
+--------+--------+



### Now, let's perform some preprocessing using SQL:
1. Convert *age* column to double after removing the 's' at the end -- *hint: check SUBSTRING method*
2. Select only the following columns: `['sex', 'age', 'province', 'state']`
3. Store the result of the query in a new dataframe

In [39]:
rows_df.show(5)

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|         NULL|released|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|         NULL|released|
|1000000003|  male|50s|  Korea|   Seoul|  Jongno-gu|contact with patient| 2002000001|            17|              NULL|    2020-01-30|   202

In [48]:
age_df = spark.sql("select sex, SUBSTRING(age, 1, 2), province, state from patients_table")
age_df.show(5)

+------+--------------------+--------+--------+
|   sex|substring(age, 1, 2)|province|   state|
+------+--------------------+--------+--------+
|  male|                  50|   Seoul|released|
|  male|                  30|   Seoul|released|
|  male|                  50|   Seoul|released|
|  male|                  20|   Seoul|released|
|female|                  20|   Seoul|released|
+------+--------------------+--------+--------+
only showing top 5 rows



## Machine Learning
### Create a pipeline model to predict is_dead and evaluate the performance.
- Use <b>StringIndexer</b> to transform <b>string</b> data type to indices.
- Use <b>OneHotEncoder</b> to deal with categorical values.
- Use <b>Imputer</b> to fill missing data with mean.

In [49]:
final_df.show(5)

+---+--------+-------+-------+
|age|province|is_male|is_dead|
+---+--------+-------+-------+
| 50|   Seoul|   true|  false|
| 30|   Seoul|   true|  false|
| 50|   Seoul|   true|  false|
| 20|   Seoul|   true|  false|
| 20|   Seoul|  false|  false|
+---+--------+-------+-------+
only showing top 5 rows



In [50]:
trainDF, testDF = final_df.randomSplit([.8,.2],seed=42)
print(f"There are {trainDF.count()} rows in the training set, and {testDF.count()} in the test set")

There are 3293 rows in the training set, and 750 in the test set


In [51]:
trainDF.cache()

DataFrame[age: int, province: string, is_male: boolean, is_dead: boolean]

In [99]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer, OneHotEncoder, Imputer
from pyspark.ml import Pipeline

In [78]:
from pyspark.ml.classification import LogisticRegression

In [56]:
trainDF.dtypes

[('age', 'int'),
 ('province', 'string'),
 ('is_male', 'boolean'),
 ('is_dead', 'boolean')]

In [100]:
trainDF = trainDF.withColumn('is_dead', trainDF['is_dead'].cast('int'))
trainDF = trainDF.withColumn('is_male', trainDF['is_male'].cast('int'))

In [101]:
trainDF.dtypes

[('age', 'int'),
 ('province', 'string'),
 ('is_male', 'int'),
 ('is_dead', 'int')]

In [103]:
categoricalCols = [field for (field, dataType) in trainDF.dtypes
                  if dataType == 'string']

In [104]:
categoricalCols

['province']

In [105]:
indexOutputCols = [x + "_Index" for x in categoricalCols]
indexOutputCols

['province_Index']

In [106]:
oheOutputCols = [x + "_OHE" for x in categoricalCols]
oheOutputCols

['province_OHE']

In [107]:
stringIndexer = StringIndexer(inputCols=categoricalCols,
                              outputCols=indexOutputCols,
                             handleInvalid='skip')

oheEncoder = OneHotEncoder(inputCols=indexOutputCols,
                          outputCols=oheOutputCols)

In [108]:
imputer = Imputer(inputCols=["age"], outputCols=["age_imputed"], strategy='mean')

In [114]:
vecAssembler = VectorAssembler(inputCols=['age_imputed','is_male']+oheOutputCols,outputCol='features')
vecAssembler

VectorAssembler_ad8fb675536c

In [110]:
['age','is_male']+oheOutputCols

['age', 'is_male', 'province_OHE']

In [115]:
lr = LogisticRegression(featuresCol='features',
                      labelCol='is_dead',
                      predictionCol='prediction')

In [116]:
pipeline = Pipeline(stages=[stringIndexer,oheEncoder, imputer,
                           vecAssembler,lr])

In [117]:
pipelineModel = pipeline.fit(trainDF)

Model evaluation

In [124]:
testDF = testDF.withColumn('is_dead', testDF['is_dead'].cast('int'))
testDF = testDF.withColumn('is_male', testDF['is_male'].cast('int'))

In [125]:
predDF = pipelineModel.transform(testDF)

In [119]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [130]:
evaluator = BinaryClassificationEvaluator(labelCol="is_dead")
accuracy = evaluator.evaluate(predDF)

print(f"accuracy: {accuracy}")

accuracy: 0.9144198646972432
