# **Labs 1 and 2 PySpark:**

In these labs we will be using the "[[NeurIPS 2020] Data Science for COVID-19 (DS4C)](https://www.kaggle.com/datasets/kimjihoo/coronavirusdataset?select=PatientInfo.csv)" dataset, retrieved from [Kaggle](https://www.kaggle.com/) on 1/6/2022, for educational non commercial purpose, License
[CC BY-NC-SA 4.0
](https://creativecommons.org/licenses/by-nc-sa/4.0/)


The csv file that we will be using in this lab is **PatientInfo**.

## PatientInfo.csv

**patient_id**
the ID of the patient

**sex**
the sex of the patient

**age**
the age of the patient

**country**
the country of the patient

**province**
the province of the patient

**city**
the city of the patient

**infection_case**
the case of infection

**infected_by**
the ID of who infected the patient


**contact_number**
the number of contacts with people

**symptom_onset_date**
the date of symptom onset

**confirmed_date**
the date of being confirmed

**released_date**
the date of being released

**deceased_date**
the date of being deceased

**state**
isolated / released / deceased

**Install pyspark**

In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 43 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 60.4 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=feeca2ca1cbe29a48f605eff701a634569ccd27c472336c35c3948deb5a9f998
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


### Import the pyspark and check it's version

In [2]:
import pyspark
!pyspark --version

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.3.0
      /_/
                        
Using Scala version 2.12.15, OpenJDK 64-Bit Server VM, 11.0.15
Branch HEAD
Compiled by user ubuntu on 2022-06-09T19:58:58Z
Revision f74867bddfbcdd4d08076db36851e88b15e66556
Url https://github.com/apache/spark
Type --help for more information.


### Import and create SparkSession

In [3]:
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder.appName('PySpark DataFrame From RDD').getOrCreate()

### Load the PatientInfo.csv file and show the first 5 rows

In [5]:
from IPython.display import display, HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [13]:
df=spark.read.csv('/content/PatientInfo.csv',inferSchema=True,header=True)
df.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------+--------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|     confirmed_date|      released_date|deceased_date|   state|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|2020-01-23 00:00:00|2020-02-05 00:00:00|         null|released|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       null|            31|              null|2020-01-30 00:00:00|2020-03-02 00:00:00|         null|released|
|1000000003|  male|50s|  Korea|   Seoul|   Jongno-gu|contact with patient| 20020

### Display the schema of the dataset

In [7]:
df.printSchema()

root
 |-- patient_id: long (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: string (nullable = true)
 |-- country: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- infected_by: string (nullable = true)
 |-- contact_number: string (nullable = true)
 |-- symptom_onset_date: string (nullable = true)
 |-- confirmed_date: timestamp (nullable = true)
 |-- released_date: timestamp (nullable = true)
 |-- deceased_date: timestamp (nullable = true)
 |-- state: string (nullable = true)



### Display the statistical summary

In [9]:
df.describe().show()

+-------+--------------------+------+----+----------+--------+--------------+--------------------+--------------------+--------------------+------------------+--------+
|summary|          patient_id|   sex| age|   country|province|          city|      infection_case|         infected_by|      contact_number|symptom_onset_date|   state|
+-------+--------------------+------+----+----------+--------+--------------+--------------------+--------------------+--------------------+------------------+--------+
|  count|                5165|  4043|3785|      5165|    5165|          5071|                4246|                1346|                 791|               690|    5165|
|   mean|2.8636345618679576E9|  null|null|      null|    null|          null|                null|2.2845944015643125E9|1.6772572523506988E7|              null|    null|
| stddev| 2.074210725277473E9|  null|null|      null|    null|          null|                null|1.5265072953383324E9| 3.093097580985502E8|              n

### Using the state column.
### How many people survived (released), and how many didn't survive (isolated/deceased)?

In [23]:
df.filter(df['state']=='released').count()

2929

In [27]:
df.filter(df['state']=='deceased').count()

78

In [28]:
df.filter(df['state']=='isolated').count()

2158

### Display the number of null values in each column

In [26]:
from pyspark.sql.functions import isnull
null={col:df.filter(df[col].isNull()).count() for col in df.columns}
null

{'age': 1380,
 'city': 94,
 'confirmed_date': 3,
 'contact_number': 4374,
 'country': 0,
 'deceased_date': 5099,
 'infected_by': 3819,
 'infection_case': 919,
 'patient_id': 0,
 'province': 0,
 'released_date': 3578,
 'sex': 1122,
 'state': 0,
 'symptom_onset_date': 4475}

## Data preprocessing

### Fill the nulls in the deceased_date with the released_date. 
- You can use <b>coalesce</b> function

In [30]:
from pyspark.sql.functions import coalesce
filled_df=df.withColumn("deceased_date",coalesce(df.deceased_date,df.released_date))
filled_df.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------------+--------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|     confirmed_date|      released_date|      deceased_date|   state|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------------+--------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|2020-01-23 00:00:00|2020-02-05 00:00:00|2020-02-05 00:00:00|released|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       null|            31|              null|2020-01-30 00:00:00|2020-03-02 00:00:00|2020-03-02 00:00:00|released|
|1000000003|  male|50s|  Korea|   Seoul|   Jongno-

### Add a column named no_days which is difference between the deceased_date and the confirmed_date then show the top 5 rows. Print the schema.
- <b> Hint: You need to typecast these columns as date first <b>

In [38]:
from pyspark.sql.functions import datediff
df_new=filled_df.withColumn("deceased_date",filled_df["deceased_date"].cast('date'))
df_new=filled_df.withColumn("confirmed_date",filled_df["confirmed_date"].cast('date'))
df_new=filled_df.withColumn("no_days",datediff(filled_df.confirmed_date,filled_df.deceased_date))
df_new.show(3)

+----------+----+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------------+--------+-------+
|patient_id| sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|     confirmed_date|      released_date|      deceased_date|   state|no_days|
+----------+----+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------------+--------+-------+
|1000000001|male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|2020-01-23 00:00:00|2020-02-05 00:00:00|2020-02-05 00:00:00|released|    -13|
|1000000002|male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       null|            31|              null|2020-01-30 00:00:00|2020-03-02 00:00:00|2020-03-02 00:00:00|released|    -32|
|1000000003|male|50s|  Ko

### Add a is_male column if male then it should yield true, else then False

In [119]:
from pyspark.sql import functions as f
df_new=df_new.withColumn("is_male",f.when(df_new["sex"]=="male","true").otherwise("false"))
df_new.show(10)

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------------+--------+-------+-------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|     confirmed_date|      released_date|      deceased_date|   state|no_days|is_male|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------------+--------+-------+-------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|2020-01-23 00:00:00|2020-02-05 00:00:00|2020-02-05 00:00:00|released|    -13|   true|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       null|            31|              null|2020-01-30 00:00:00|2020-03-02 00:00:00|2020-03-02 00:00:00|r

### Add a is_dead column if patient state is not released then it should yield true, else then False

- Use <b>UDF</b> to perform this task. 
- However, UDF is not recommended there is no built in function can do the required operation.
- UDF is slower than built in functions.

In [188]:
from pyspark.sql.functions import udf,col
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType,DoubleType,BooleanType

def check_death(state):
    return 1 if state!="released" else 0


In [189]:
convertUDF = udf(check_death,StringType())
df_udf=df_new.withColumn("is_dead",convertUDF(col("state")))
df_udf.show(15)


+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------------+--------+-------+-------+-------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|     confirmed_date|      released_date|      deceased_date|   state|no_days|is_male|is_dead|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------------+--------+-------+-------+-------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|2020-01-23 00:00:00|2020-02-05 00:00:00|2020-02-05 00:00:00|released|    -13|   true|      0|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       null|            31|              null|2020-01-30 00:00:00|2020

In [196]:
df_udf=df_udf.withColumn("is_dead", f.when(col("state")!="released",1).otherwise(0))

### Change the ages to bins from 10s, 0s, 10s, 20s,.etc to 0,10, 20

In [197]:
from pyspark.sql.functions import regexp_replace
df_udf=df_udf.withColumn('age',regexp_replace("age",'s',""))
df_udf.show(5)

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------------+--------+-------+-------+-------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|     confirmed_date|      released_date|      deceased_date|   state|no_days|is_male|is_dead|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------------+--------+-------+-------+-------+
|1000000001|  male| 50|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|2020-01-23 00:00:00|2020-02-05 00:00:00|2020-02-05 00:00:00|released|    -13|   true|      0|
|1000000002|  male| 30|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       null|            31|              null|2020-01-30 00:00:00|2020-03-0

### Change age, and no_days  to be typecasted as Double

In [199]:
df_udf=df_udf.withColumn("no_days",df_udf.no_days.cast(DoubleType()))
df_udf=df_udf.withColumn("age",df_udf.age.cast(DoubleType()))
df_udf.show(5)

+----------+------+----+-------+--------+-----------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------------+--------+-------+-------+-------+
|patient_id|   sex| age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|     confirmed_date|      released_date|      deceased_date|   state|no_days|is_male|is_dead|
+----------+------+----+-------+--------+-----------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------------+--------+-------+-------+-------+
|1000000001|  male|50.0|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|2020-01-23 00:00:00|2020-02-05 00:00:00|2020-02-05 00:00:00|released|  -13.0|   true|      0|
|1000000002|  male|30.0|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       null|            31|              null|2020-01-30 00:00:00|2020

### Drop the columns
["patient_id","sex","infected_by","contact_number","released_date","state",
"symptom_onset_date","confirmed_date","deceased_date","country","no_days",
"city","infection_case"]

In [200]:
df_udf=df_udf.drop("patient_id","sex","infected_by","contact_number","released_date","state", "symptom_onset_date","confirmed_date","deceased_date","country","no_days", "city","infection_case")
df_udf.show(10)

+----+--------+-------+-------+
| age|province|is_male|is_dead|
+----+--------+-------+-------+
|50.0|   Seoul|   true|      0|
|30.0|   Seoul|   true|      0|
|50.0|   Seoul|   true|      0|
|20.0|   Seoul|   true|      0|
|20.0|   Seoul|  false|      0|
|50.0|   Seoul|  false|      0|
|20.0|   Seoul|   true|      0|
|20.0|   Seoul|   true|      0|
|30.0|   Seoul|   true|      0|
|60.0|   Seoul|  false|      0|
+----+--------+-------+-------+
only showing top 10 rows



### Recount the number of nulls now

In [66]:
null_again={col:df_udf.filter(df_udf[col].isNull()).count() for col in df_udf.columns}
null_again

{'age': 1380, 'is_dead': 0, 'is_male': 0, 'province': 0}

## Now do the same but using SQL select statement

### From the original Patient DataFrame, Create a temporary view (table).

In [68]:
df.createOrReplaceTempView("temp_table")

### Use SELECT statement to select all columns from the dataframe and show the output.

In [69]:
spark.sql("SELECT * FROM temp_table").show(10)

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------+--------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|     confirmed_date|      released_date|deceased_date|   state|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|2020-01-23 00:00:00|2020-02-05 00:00:00|         null|released|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       null|            31|              null|2020-01-30 00:00:00|2020-03-02 00:00:00|         null|released|
|1000000003|  male|50s|  Korea|   Seoul|  Jongno-gu|contact with patient| 2002000001|

### *Using SQL commands*, limit the output to only 5 rows 

In [71]:
spark.sql("SELECT * FROM temp_table LIMIT 5").show()

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------+--------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|     confirmed_date|      released_date|deceased_date|   state|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|2020-01-23 00:00:00|2020-02-05 00:00:00|         null|released|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       null|            31|              null|2020-01-30 00:00:00|2020-03-02 00:00:00|         null|released|
|1000000003|  male|50s|  Korea|   Seoul|  Jongno-gu|contact with patient| 2002000001|

### Select the count of males and females in the dataset

In [82]:
spark.sql("SELECT COUNT(sex) AS COUNT_Male FROM temp_table WHERE sex=='male' ").show()
spark.sql("SELECT COUNT(sex) AS COUNT_FeMale FROM temp_table WHERE sex=='female' ").show()

+----------+
|COUNT_Male|
+----------+
|      1825|
+----------+

+------------+
|COUNT_FeMale|
+------------+
|        2218|
+------------+



### How many people did survive, and how many didn't?

In [93]:
spark.sql("SELECT COUNT(STATE) as survived FROM temp_table WHERE state=='released' ").show()
spark.sql("SELECT COUNT(STATE) as  notsurvive FROM temp_table WHERE state !='released' ").show()

+--------+
|survived|
+--------+
|    2929|
+--------+

+----------+
|notsurvive|
+----------+
|      2236|
+----------+



### Now, let's perform some preprocessing using SQL:
1. Convert *age* column to double after removing the 's' at the end -- *hint: check SUBSTRING method*
2. Select only the following columns: `['sex', 'age', 'province', 'state']`
3. Store the result of the query in a new dataframe

In [103]:
df1=spark.sql("SELECT SUBSTRING(age,0,2) as age,SEX,PROVINCE,STATE FROM temp_table")
df1.show()

+---+------+--------+--------+
|age|   SEX|PROVINCE|   STATE|
+---+------+--------+--------+
| 50|  male|   Seoul|released|
| 30|  male|   Seoul|released|
| 50|  male|   Seoul|released|
| 20|  male|   Seoul|released|
| 20|female|   Seoul|released|
| 50|female|   Seoul|released|
| 20|  male|   Seoul|released|
| 20|  male|   Seoul|released|
| 30|  male|   Seoul|released|
| 60|female|   Seoul|released|
| 50|female|   Seoul|released|
| 20|  male|   Seoul|released|
| 80|  male|   Seoul|deceased|
| 60|female|   Seoul|released|
| 70|  male|   Seoul|released|
| 70|  male|   Seoul|released|
| 70|  male|   Seoul|released|
| 20|  male|   Seoul|released|
| 70|female|   Seoul|released|
| 70|female|   Seoul|released|
+---+------+--------+--------+
only showing top 20 rows



In [201]:
df_udf.printSchema()

root
 |-- age: double (nullable = true)
 |-- province: string (nullable = true)
 |-- is_male: string (nullable = false)
 |-- is_dead: integer (nullable = false)



## Machine Learning 
### Create a pipeline model to predict is_dead and evaluate the performance.
- Use <b>StringIndexer</b> to transform <b>string</b> data type to indices.
- Use <b>OneHotEncoder</b> to deal with categorical values.
- Use <b>Imputer</b> to fill missing data with mean.

In [191]:
from pyspark.ml.feature import StringIndexer , OneHotEncoder , Imputer

#handle strings then convert the categories to 1 hot encoding
Indexer1 = StringIndexer(inputCol="province", outputCol="province_indexed")
encoder1 = OneHotEncoder(inputCol="province_indexed", outputCol="province_enc")
Imputer = Imputer(inputCol="age", outputCol="age_I") #age is the only one with nulls
Indexer2 = StringIndexer(inputCol="is_male", outputCol="is_male_indexed")
encoder2 = OneHotEncoder(inputCol="is_male_indexed", outputCol="is_male_enc")

In [202]:
trainDF, testDF = df_udf.randomSplit([.8,.2],seed=42)
print(f"There are {trainDF.count()} rows in the training set, and {testDF.count()} in the test set")

There are 4166 rows in the training set, and 999 in the test set


In [212]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier,LinearSVC
from pyspark.ml import Pipeline

In [204]:
vecAssembler = VectorAssembler(inputCols=["province_enc","is_male_enc","age_I"],outputCol='features') #compress all feACTURES IN 1 VECTOR

In [216]:
estimator = RandomForestClassifier(featuresCol='features', numTrees=15, maxDepth=15, labelCol='is_dead', seed=42)
#estimator1=LinearSVC(featuresCol="features",labelCol='is_dead')

In [220]:
#CREATE PIPELINE
pipeline =Pipeline(stages = [Indexer1, encoder1,Indexer2,encoder2, Imputer ,vecAssembler , estimator])
model=pipeline.fit(trainDF)
pred=model.transform(testDF)
pred.show(5)

+----+-----------+-------+-------+----------------+---------------+---------------+-------------+------------------+--------------------+--------------------+--------------------+----------+
| age|   province|is_male|is_dead|province_indexed|   province_enc|is_male_indexed|  is_male_enc|             age_I|            features|       rawPrediction|         probability|prediction|
+----+-----------+-------+-------+----------------+---------------+---------------+-------------+------------------+--------------------+--------------------+--------------------+----------+
|null| Gangwon-do|  false|      0|            10.0|(16,[10],[1.0])|            0.0|(1,[0],[1.0])|40.085978835978835|(18,[10,16,17],[1...|[7.94600132136560...|[0.52973342142437...|       0.0|
|null|Gyeonggi-do|  false|      1|             2.0| (16,[2],[1.0])|            0.0|(1,[0],[1.0])|40.085978835978835|(18,[2,16,17],[1....|[0.58892903575105...|[0.03926193571673...|       1.0|
|null|Gyeonggi-do|  false|      1|           

In [221]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
eval = BinaryClassificationEvaluator(labelCol='is_dead')
eval.evaluate(pred)

0.9079802438170314