# **Labs 1 and 2: PySpark**

In these labs we will be using the "[[NeurIPS 2020] Data Science for COVID-19 (DS4C)](https://www.kaggle.com/datasets/kimjihoo/coronavirusdataset?select=PatientInfo.csv)" dataset, retrieved from [Kaggle](https://www.kaggle.com/) on 1/6/2022 for educational, non-commercial purposes under the [CC BY-NC-SA 4.0](https://creativecommons.org/licenses/by-nc-sa/4.0/) license.


The csv file that we will be using in this lab is **PatientInfo**.

## PatientInfo.csv

**patient_id**
the ID of the patient

**sex**
the sex of the patient

**age**
the age of the patient

**country**
the country of the patient

**province**
the province of the patient

**city**
the city of the patient

**infection_case**
the source of infection (e.g. overseas inflow, contact with patient)

**infected_by**
the ID of the patient who infected this patient

**contact_number**
the number of people the patient was in contact with

**symptom_onset_date**
the date symptoms began

**confirmed_date**
the date the infection was confirmed

**released_date**
the date the patient was released

**deceased_date**
the date the patient died

**state**
the patient's state: isolated / released / deceased

### Import PySpark and check its version

In [1]:
import pyspark
print(pyspark.__version__)

### Import and create SparkSession

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()  # reuse the active session or start a new one

22/06/25 12:08:10 WARN Utils: Your hostname, mohand-Lenovo-ideapad-310-15IKB resolves to a loopback address: 127.0.1.1; using 192.168.43.218 instead (on interface wlp2s0)
22/06/25 12:08:10 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/06/25 12:08:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/06/25 12:08:11 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


### Load the PatientInfo.csv file and show the first 5 rows

In [3]:
from IPython.display import display, HTML
# Keep wide DataFrame.show() tables on one line instead of wrapping them
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [4]:
data = spark.read.csv('PatientInfo.csv', header=True)  # no inferSchema, so every column is read as a string
data.show(5)

### Display the schema of the dataset

In [5]:
data.printSchema()

root
 |-- patient_id: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: string (nullable = true)
 |-- country: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- infected_by: string (nullable = true)
 |-- contact_number: string (nullable = true)
 |-- symptom_onset_date: string (nullable = true)
 |-- confirmed_date: string (nullable = true)
 |-- released_date: string (nullable = true)
 |-- deceased_date: string (nullable = true)
 |-- state: string (nullable = true)



### Display the statistical summary

In [345]:
data.describe().show()

+-------+--------------------+------+----+----------+--------+--------------+--------------------+--------------------+--------------------+------------------+--------------+-------------+-------------+--------+
|summary|          patient_id|   sex| age|   country|province|          city|      infection_case|         infected_by|      contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+-------+--------------------+------+----+----------+--------+--------------+--------------------+--------------------+--------------------+------------------+--------------+-------------+-------------+--------+
|  count|                5165|  4043|3785|      5165|    5165|          5071|                4246|                1346|                 791|               690|          5162|         1587|           66|    5165|
|   mean|2.8636345618679576E9|  null|null|      null|    null|          null|                null|2.2845944015643125E9|1.6772572523506988E7|            

### Using the state column, how many people survived (released), and how many did not (isolated/deceased)?

In [346]:
from pyspark.sql.functions import *

In [347]:
data.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|   2020-02-05|         null|released|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|   2020-03-02|         null|released|
|1000000003|  male|50s|  Korea|   Seoul|   Jongno-gu|contact with patient| 2002000001|            17|              null|    2020-01-30|

In [348]:
count_survived = data.where(col('state') == 'released').count()
count_not_survived = data.where(col('state') != 'released').count()  # isolated or deceased
count_survived

2929

### Display the number of null values in each column

In [349]:
data.select([count(when(isnull(c), c)).alias(c) for c in data.columns]).show()

+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+
|patient_id| sex| age|country|province|city|infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|state|
+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+
|         0|1122|1380|      0|       0|  94|           919|       3819|          4374|              4475|             3|         3578|         5099|    0|
+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+



## Data preprocessing

### Fill the nulls in deceased_date with released_date
- You can use the <b>coalesce</b> function.

In [350]:

# coalesce() returns its first non-null argument, so deceased_date must come first
new_data = data.withColumn('deceased_date', coalesce(data["deceased_date"], data["released_date"]))

In [351]:
new_data.select(col('deceased_date')).show()

+-------------+
|deceased_date|
+-------------+
|   2020-02-05|
|   2020-03-02|
|   2020-02-19|
|   2020-02-15|
|   2020-02-24|
|   2020-02-19|
|   2020-02-10|
|   2020-02-24|
|   2020-02-21|
|   2020-02-29|
|   2020-02-29|
|   2020-02-27|
|         null|
|   2020-03-12|
|         null|
|   2020-03-11|
|   2020-03-01|
|         null|
|   2020-03-08|
|         null|
+-------------+
only showing top 20 rows



### Add a column named no_days holding the difference in days between deceased_date and confirmed_date, then show the top 5 rows and print the schema.
- <b>Hint: you need to cast these columns to date first.</b>

In [352]:
new_data = new_data.withColumn("no_days", datediff(to_date(col('deceased_date')), to_date(col('confirmed_date'))))
new_data.show(5)
new_data.printSchema()

### Add an is_male column that is True if the patient is male, otherwise False

In [353]:
new_data=new_data.withColumn('is_male',when(col('sex')=='male',True).otherwise(False))

### Add an is_dead column that is True (1) if the patient's state is not released, otherwise False (0)

- Use a <b>UDF</b> to perform this task.
- In general, UDFs are not recommended unless no built-in function can do the required operation.
- Python UDFs are slower than built-in functions, because every row has to be shipped from the JVM to a Python worker and back.

In [354]:
def add_is_dead(state):
    # Any state other than 'released' (isolated, deceased, or null) is flagged as 1
    if state != 'released':
        return 1
    else:
        return 0

In [355]:
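from pyspark.sql.types import IntegerType
UDF = udf(add_is_dead, IntegerType())  # without a return type, udf() defaults to StringType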

In [356]:
new_data=new_data.withColumn('is_dead',UDF(col('state')))

### Convert the age bins from 0s, 10s, 20s, etc. to 0, 10, 20, etc.

In [357]:
new_data=new_data.withColumn('age',split(col('age'),'s').getItem(0))

### Cast age and no_days to Double

In [358]:
new_data=new_data.withColumn('age',new_data.age.cast('Double'))
new_data=new_data.withColumn('no_days',new_data.no_days.cast('Double'))

In [359]:
# StringIndexer expects string or numeric input, so turn the boolean into a string
new_data = new_data.withColumn('is_male', new_data.is_male.cast('String'))

### Drop the columns
["patient_id","sex","infected_by","contact_number","released_date","state",
"symptom_onset_date","confirmed_date","deceased_date","country","no_days",
"city","infection_case"]

In [360]:
new_data=new_data.drop("patient_id","sex","infected_by","contact_number","released_date","state", "symptom_onset_date","confirmed_date","deceased_date","country","no_days", "city","infection_case")

In [361]:
new_data.show()

+----+--------+-------+-------+
| age|province|is_male|is_dead|
+----+--------+-------+-------+
|50.0|   Seoul|   true|      0|
|30.0|   Seoul|   true|      0|
|50.0|   Seoul|   true|      0|
|20.0|   Seoul|   true|      0|
|20.0|   Seoul|  false|      0|
|50.0|   Seoul|  false|      0|
|20.0|   Seoul|   true|      0|
|20.0|   Seoul|   true|      0|
|30.0|   Seoul|   true|      0|
|60.0|   Seoul|  false|      0|
|50.0|   Seoul|  false|      0|
|20.0|   Seoul|   true|      0|
|80.0|   Seoul|   true|      1|
|60.0|   Seoul|  false|      0|
|70.0|   Seoul|   true|      0|
|70.0|   Seoul|   true|      0|
|70.0|   Seoul|   true|      0|
|20.0|   Seoul|   true|      0|
|70.0|   Seoul|  false|      0|
|70.0|   Seoul|  false|      0|
+----+--------+-------+-------+
only showing top 20 rows



### Recount the number of nulls now

In [362]:
new_data.select([count(when(isnull(c), c)).alias(c) for c in new_data.columns]).show()

+----+--------+-------+-------+
| age|province|is_male|is_dead|
+----+--------+-------+-------+
|1380|       0|      0|      0|
+----+--------+-------+-------+



## Now do the same using SQL SELECT statements

### From the original patient DataFrame, create a temporary view (table).

In [6]:
data.createOrReplaceTempView("data_sql")

### Use a SELECT statement to select all columns from the view and show the output.

In [7]:
spark.sql('select * from data_sql').show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|   2020-02-05|         null|released|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|   2020-03-02|         null|released|
|1000000003|  male|50s|  Korea|   Seoul|   Jongno-gu|contact with patient| 2002000001|            17|              null|    2020-01-30|

### *Using SQL commands*, limit the output to only 5 rows 

In [8]:
spark.sql('select * from data_sql limit 5').show()

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|   2020-02-05|         null|released|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|   2020-03-02|         null|released|
|1000000003|  male|50s|  Korea|   Seoul|  Jongno-gu|contact with patient| 2002000001|            17|              null|    2020-01-30|   202

### Select the count of males and females in the dataset

In [18]:
spark.sql("select count(*) as men from data_sql where sex ='male'").show()

+----+
| men|
+----+
|1825|
+----+



In [19]:
spark.sql("select count(*) as female from data_sql where sex ='female'").show()

+------+
|female|
+------+
|  2218|
+------+



### How many people survived (state is not deceased), and how many did not (deceased)?

In [23]:
spark.sql("select count(*) as survived from data_sql where state!='deceased'").show()

+--------+
|survived|
+--------+
|    5087|
+--------+



In [25]:
spark.sql("select count(*) as dead from data_sql where state='deceased'").show()

+----+
|dead|
+----+
|  78|
+----+



### Now, let's perform some preprocessing using SQL:
1. Convert the *age* column to double after removing the trailing 's' (*hint: check the SUBSTRING function*)
2. Select only the following columns: `['sex', 'age', 'province', 'state']`
3. Store the result of the query in a new dataframe

In [33]:
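# LENGTH(age) - 1 drops only the trailing 's', so three-digit bins such as '100s' stay intact
spark.sql("SELECT CAST(SUBSTRING(age, 1, LENGTH(age) - 1) AS DOUBLE) AS age FROM data_sql").show(5)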


+----+
| age|
+----+
|50.0|
|30.0|
|50.0|
|20.0|
|20.0|
+----+
only showing top 5 rows



In [36]:
data_2=spark.sql('select sex,age,province,state from data_sql')

In [37]:
data_2.show()

+------+---+--------+--------+
|   sex|age|province|   state|
+------+---+--------+--------+
|  male|50s|   Seoul|released|
|  male|30s|   Seoul|released|
|  male|50s|   Seoul|released|
|  male|20s|   Seoul|released|
|female|20s|   Seoul|released|
|female|50s|   Seoul|released|
|  male|20s|   Seoul|released|
|  male|20s|   Seoul|released|
|  male|30s|   Seoul|released|
|female|60s|   Seoul|released|
|female|50s|   Seoul|released|
|  male|20s|   Seoul|released|
|  male|80s|   Seoul|deceased|
|female|60s|   Seoul|released|
|  male|70s|   Seoul|released|
|  male|70s|   Seoul|released|
|  male|70s|   Seoul|released|
|  male|20s|   Seoul|released|
|female|70s|   Seoul|released|
|female|70s|   Seoul|released|
+------+---+--------+--------+
only showing top 20 rows



## Machine Learning 
### Create a pipeline model to predict is_dead and evaluate the performance.
- Use <b>StringIndexer</b> to transform <b>string</b> data type to indices.
- Use <b>OneHotEncoder</b> to deal with categorical values.
- Use <b>Imputer</b> to fill missing data with the mean.

In [363]:
from pyspark.ml.feature import *

In [364]:
new_data=new_data.withColumn('is_dead',new_data.is_dead.cast('Double'))

In [365]:
new_data=new_data.withColumn('is_male',new_data.is_male.cast('String'))

In [366]:
trainDF, testDF = new_data.randomSplit([.8,.2],seed=42)
print(f"There are {trainDF.count()} rows in the training set, and {testDF.count()} in the test set")

There are 4166 rows in the training set, and 999 in the test set


In [367]:
categorical_features=['province','is_male']

In [368]:
categorical_features_indexed=['province_indexed','is_male_indexed']

In [369]:
categorical_features_ohe=['province_ohe','is_male_ohe']

In [370]:
numerical_features=['age',]
numerical_imputed=['age_imputed']

In [371]:
imputer=Imputer(inputCols=numerical_features,outputCols=numerical_imputed,strategy='mean')

In [372]:
str_indxer=StringIndexer(inputCols=categorical_features,outputCols=categorical_features_indexed,handleInvalid='keep')

In [373]:
ohe=OneHotEncoder(inputCols=categorical_features_indexed,outputCols=categorical_features_ohe)

In [374]:
all_features=categorical_features_ohe+numerical_imputed

In [375]:
vect_assm=VectorAssembler(inputCols=all_features,outputCol='features')

In [376]:
from pyspark.ml.classification import LogisticRegression

In [377]:
Lr=LogisticRegression(featuresCol='features',labelCol='is_dead',predictionCol='predictions')

In [378]:
from pyspark.ml import Pipeline

In [379]:
pip=Pipeline(stages=[imputer,str_indxer,ohe,vect_assm,Lr])

In [381]:
model=pip.fit(trainDF)

In [383]:
preds=model.transform(testDF)

In [390]:
preds.select('predictions','is_dead').show()

+-----------+-------+
|predictions|is_dead|
+-----------+-------+
|        0.0|    0.0|
|        1.0|    1.0|
|        1.0|    1.0|
|        1.0|    1.0|
|        1.0|    1.0|
|        1.0|    1.0|
|        1.0|    1.0|
|        1.0|    1.0|
|        1.0|    1.0|
|        1.0|    1.0|
|        1.0|    1.0|
|        1.0|    1.0|
|        1.0|    1.0|
|        1.0|    1.0|
|        1.0|    1.0|
|        1.0|    1.0|
|        1.0|    1.0|
|        1.0|    1.0|
|        1.0|    1.0|
|        1.0|    1.0|
+-----------+-------+
only showing top 20 rows



In [400]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [402]:
RE=BinaryClassificationEvaluator(rawPredictionCol='predictions',labelCol='is_dead',metricName='areaUnderROC')

In [405]:
RE.evaluate(preds)

0.812747868433268
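The evaluator above receives the hard 0/1 `predictions` column as `rawPredictionCol`. With only two distinct score values the ROC curve has a single point between (0, 0) and (1, 1), so the trapezoid area reduces to (TPR + TNR) / 2, i.e. balanced accuracy, rather than a ranking-based AUC. `LogisticRegression` also writes continuous scores to its `rawPrediction` column, which is the default `rawPredictionCol` of both the model and `BinaryClassificationEvaluator`, so evaluating on that column would measure the ranking instead (not re-run here, so no value is given). The pure-Python sketch below uses made-up labels and scores to show the difference; it avoids the name `sum` because the wildcard `pyspark.sql.functions` import shadows it:

```python
import math


def roc_auc(labels, scores):
    """Trapezoidal area under the ROC curve.

    Sweeps a threshold over every distinct score (highest first); rows with
    score >= threshold are predicted positive at that threshold.
    """
    n_pos = labels.count(1)
    n_neg = labels.count(0)
    points = [(0.0, 0.0)]
    for t in sorted(set(scores), reverse=True):
        predicted_pos = [y for y, s in zip(labels, scores) if s >= t]
        # (FPR, TPR); the lowest threshold always gives (1, 1)
        points.append((predicted_pos.count(0) / n_neg, predicted_pos.count(1) / n_pos))
    return math.fsum((x1 - x0) * (y0 + y1) / 2 for (x0, y0), (x1, y1) in zip(points, points[1:]))


# Made-up labels (1 = dead) and predicted probabilities
labels = [1, 1, 1, 0, 0, 0, 0, 0]
scores = [0.9, 0.8, 0.3, 0.6, 0.2, 0.1, 0.1, 0.05]
hard = [1.0 if s >= 0.5 else 0.0 for s in scores]  # analogous to the 'predictions' column

tpr = [p for y, p in zip(labels, hard) if y == 1].count(1.0) / labels.count(1)
tnr = [p for y, p in zip(labels, hard) if y == 0].count(0.0) / labels.count(0)

auc_scores = roc_auc(labels, scores)
auc_hard = roc_auc(labels, hard)
print(f"AUC on scores: {auc_scores:.4f}")
print(f"AUC on hard labels: {auc_hard:.4f}, balanced accuracy: {(tpr + tnr) / 2:.4f}")
```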

# Clustering

In [2]:
from pyspark.ml.clustering import KMeans

In [4]:
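from pyspark.sql import SparkSession
# Get (or create) a session and reuse its SparkContext; constructing a second SparkContext raises an error
spark = SparkSession.builder.master("local").appName("Simple App").getOrCreate()
sc = spark.sparkContext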

22/07/18 10:30:54 WARN Utils: Your hostname, mohand-Lenovo-ideapad-310-15IKB resolves to a loopback address: 127.0.1.1; using 192.168.1.17 instead (on interface wlp2s0)
22/07/18 10:30:54 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/07/18 10:30:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/07/18 10:30:56 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [5]:
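# 'features' is KMeans' default input column and the VectorAssembler output name used in the classification pipeline
kmeans = KMeans(featuresCol='features', seed=2, k=2)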

In [None]:
pip2=