# **Labs 1 and 2 PySpark:**

In these labs we will be using the "[[NeurIPS 2020] Data Science for COVID-19 (DS4C)](https://www.kaggle.com/datasets/kimjihoo/coronavirusdataset?select=PatientInfo.csv)" dataset, retrieved from [Kaggle](https://www.kaggle.com/) on 1/6/2022, for educational non commercial purpose, License
[CC BY-NC-SA 4.0
](https://creativecommons.org/licenses/by-nc-sa/4.0/)


The csv file that we will be using in this lab is **PatientInfo**.

## PatientInfo.csv

**patient_id**
the ID of the patient

**sex**
the sex of the patient

**age**
the age of the patient

**country**
the country of the patient

**province**
the province of the patient

**city**
the city of the patient

**infection_case**
the case of infection

**infected_by**
the ID of who infected the patient


**contact_number**
the number of contacts with people

**symptom_onset_date**
the date of symptom onset

**confirmed_date**
the date of being confirmed

**released_date**
the date of being released

**deceased_date**
the date of being deceased

**state**
isolated / released / deceased

### Import the pyspark and check it's version

In [2]:
! pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 44 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 55.2 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=cd7a2a4d27ffec4110cea637a34445c7dcba126f8285c0cb0635a714e402ab9b
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [3]:

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
#ordinary important

In [None]:
spark.version

'3.3.0'

### Import and create SparkSession

### Load the PatientInfo.csv file and show the first 5 rows

In [None]:
from IPython.display import display, HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [None]:
df = spark.read.csv('PatientInfo.csv',header=True,inferSchema=True)

### Display the schema of the dataset

In [None]:
df.printSchema()

root
 |-- patient_id: long (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: string (nullable = true)
 |-- country: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- infected_by: string (nullable = true)
 |-- contact_number: string (nullable = true)
 |-- symptom_onset_date: string (nullable = true)
 |-- confirmed_date: timestamp (nullable = true)
 |-- released_date: timestamp (nullable = true)
 |-- deceased_date: timestamp (nullable = true)
 |-- state: string (nullable = true)



### Display the statistical summary

In [None]:
df.describe().show()

+-------+--------------------+------+----+----------+--------+--------------+--------------------+--------------------+--------------------+------------------+--------+
|summary|          patient_id|   sex| age|   country|province|          city|      infection_case|         infected_by|      contact_number|symptom_onset_date|   state|
+-------+--------------------+------+----+----------+--------+--------------+--------------------+--------------------+--------------------+------------------+--------+
|  count|                5165|  4043|3785|      5165|    5165|          5071|                4246|                1346|                 791|               690|    5165|
|   mean|2.8636345618679576E9|  null|null|      null|    null|          null|                null|2.2845944015643125E9|1.6772572523506988E7|              null|    null|
| stddev| 2.074210725277473E9|  null|null|      null|    null|          null|                null|1.5265072953383324E9| 3.093097580985502E8|              n

### Using the state column.
### How many people survived (released), and how many didn't survive (isolated/deceased)?

In [None]:
# df.createOrReplaceTempView('table1')

In [None]:
df.groupBy("state").count().show()
#spark.sql("select count(state) from table1 where state = 'released' ").show()


+--------+-----+
|   state|count|
+--------+-----+
|isolated| 2158|
|released| 2929|
|deceased|   78|
+--------+-----+



### Display the number of null values in each column

In [None]:
# spark.sql("SELECT COUNT(IFNULL(col, 1)) FROM table1").show()
#spark.sql("select count(state) from table1 where state != 'released' ").show()
# Find Count of Null, None, NaN of All DataFrame Columns
from pyspark.sql.functions import col,isnan, when, count
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+
|patient_id| sex| age|country|province|city|infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|state|
+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+
|         0|1122|1380|      0|       0|  94|           919|       3819|          4374|              4475|             3|         3578|         5099|    0|
+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+



## Data preprocessing

### Fill the nulls in the deceased_date with the released_date. 
- You can use <b>coalesce</b> function

In [None]:
#PySpark Coalesce is a function in PySpark that is used to work with the partition data in a PySpark DF
#The Coalesce method is used to decrease the number of partition in a Data Frame
# from spark import coalesce
from pyspark.sql.types import FloatType
from pyspark.sql.functions import *
df = df.withColumn('deceased_date', coalesce(df['released_date'], df['deceased_date']))
df.show()
#coalesce(df['deceased_date'], df['released_date'])

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------------+--------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|     confirmed_date|      released_date|      deceased_date|   state|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------------+--------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|2020-01-23 00:00:00|2020-02-05 00:00:00|2020-02-05 00:00:00|released|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       null|            31|              null|2020-01-30 00:00:00|2020-03-02 00:00:00|2020-03-02 00:00:00|released|
|1000000003|  male|50s|  Korea|   Seoul|   Jongno-

In [None]:
# #deal with the null values in deceased_date
# df.na.fill("released_date","deceased_date").show()

### Add a column named no_days which is difference between the deceased_date and the confirmed_date then show the top 5 rows. Print the schema.
- <b> Hint: You need to typecast these columns as date first <b>

In [None]:
from pyspark.sql.types import DateType
df = df.withColumn("deceased_date",col("deceased_date").cast(DateType()))
df = df.withColumn("confirmed_date",col("confirmed_date").cast(DateType()))

In [None]:
df.select("deceased_date").show()

+-------------+
|deceased_date|
+-------------+
|   2020-02-05|
|   2020-03-02|
|   2020-02-19|
|   2020-02-15|
|   2020-02-24|
|   2020-02-19|
|   2020-02-10|
|   2020-02-24|
|   2020-02-21|
|   2020-02-29|
|   2020-02-29|
|   2020-02-27|
|         null|
|   2020-03-12|
|         null|
|   2020-03-11|
|   2020-03-01|
|         null|
|   2020-03-08|
|         null|
+-------------+
only showing top 20 rows



In [None]:

df =df.withColumn("no_days", datediff(col("deceased_date"),col("confirmed_date")))
df = df.withColumn("is_male",(when(col("sex") == 'male', 'true').otherwise('false')))
# df.withColumn("no_days", df['deceased_date']- df['confirmed_date']).show()
# cola = tmp.n
df.show(truncate=False)

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------------+-------------+--------+-------+-------+
|patient_id|sex   |age|country|province|city        |infection_case      |infected_by|contact_number|symptom_onset_date|confirmed_date|released_date      |deceased_date|state   |no_days|is_male|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------------+-------------+--------+-------+-------+
|1000000001|male  |50s|Korea  |Seoul   |Gangseo-gu  |overseas inflow     |null       |75            |2020-01-22        |2020-01-23    |2020-02-05 00:00:00|2020-02-05   |released|13     |true   |
|1000000002|male  |30s|Korea  |Seoul   |Jungnang-gu |overseas inflow     |null       |31            |null              |2020-01-30    |2020-03-02 00:00:00|2020-03-02   |released|32     |true   |
|1000000003|male  |50s|Ko

### Add a is_male column if male then it should yield true, else then False

In [None]:
#df = df.withColumn("is_male",(when(col("sex") == 'male', 'true').otherwise('false'))).show()

### Add a is_dead column if patient state is not released then it should yield true, else then False

- Use <b>UDF</b> to perform this task. 
- However, UDF is not recommended there is no built in function can do the required operation.
- UDF is slower than built in functions.

In [None]:
def state_change(state):
  s = False
  if s != 'released':
    s = True
  return s
  


In [None]:
from os import stat
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
#StringType” is used to represent string values, To create a string type use either DataTypes.StringType or
# StringType(), both of these returns object of String type.
udf_1 = udf(lambda state: state_change(state),StringType())

In [None]:
from pyspark.sql.functions import udf
df =df.withColumn("is_dead",(udf_1(col('state'))))
df.show(truncate=False)

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------------+-------------+--------+-------+-------+-------+
|patient_id|sex   |age|country|province|city        |infection_case      |infected_by|contact_number|symptom_onset_date|confirmed_date|released_date      |deceased_date|state   |no_days|is_male|is_dead|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------------+-------------+--------+-------+-------+-------+
|1000000001|male  |50s|Korea  |Seoul   |Gangseo-gu  |overseas inflow     |null       |75            |2020-01-22        |2020-01-23    |2020-02-05 00:00:00|2020-02-05   |released|13     |true   |true   |
|1000000002|male  |30s|Korea  |Seoul   |Jungnang-gu |overseas inflow     |null       |31            |null              |2020-01-30    |2020-03-02 00:00:00|2020-03-02   |released|32     |tr

### Change the ages to bins from 10s, 0s, 10s, 20s,.etc to 0,10, 20

In [None]:
from pyspark.sql.functions import translate
from pyspark.sql.types import IntegerType
#translate is used to literally translate one character table to another character table. It doesn't care about the context,
#it doesn't use regular expressions, it only considers the character at hand
df = df.withColumn('age', translate('age','s',''))
df.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------------+-------------+--------+-------+-------+-------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|      released_date|deceased_date|   state|no_days|is_male|is_dead|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------------+-------------+--------+-------+-------+-------+
|1000000001|  male| 50|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|2020-02-05 00:00:00|   2020-02-05|released|     13|   true|   true|
|1000000002|  male| 30|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|2020-03-02 00:00:00|   2020-03-02|released|     32|  

### Change age, and no_days  to be typecasted as Double

In [None]:
from pyspark.sql.types import IntegerType , DoubleType
df = df.withColumn('age', col('age').cast(DoubleType()))
df = df.withColumn('no_days', col('no_days').cast(DoubleType()))

In [None]:
df.select("age","no_days").show()

+----+-------+
| age|no_days|
+----+-------+
|50.0|   13.0|
|30.0|   32.0|
|50.0|   20.0|
|20.0|   16.0|
|20.0|   24.0|
|50.0|   19.0|
|20.0|   10.0|
|20.0|   22.0|
|30.0|   16.0|
|60.0|   24.0|
|50.0|   23.0|
|20.0|   20.0|
|80.0|   null|
|60.0|   25.0|
|70.0|   null|
|70.0|   21.0|
|70.0|   10.0|
|20.0|   null|
|70.0|   17.0|
|70.0|   null|
+----+-------+
only showing top 20 rows



### Drop the columns
["patient_id","sex","infected_by","contact_number","released_date","state",
"symptom_onset_date","confirmed_date","deceased_date","country","no_days",
"city","infection_case"]

In [None]:
df = df.drop("patient_id","sex","infected_by","contact_number","released_date","state", "symptom_onset_date","confirmed_date","deceased_date","country","no_days", "city","infection_case")
df.show()

+----+--------+-------+-------+
| age|province|is_male|is_dead|
+----+--------+-------+-------+
|50.0|   Seoul|   true|   true|
|30.0|   Seoul|   true|   true|
|50.0|   Seoul|   true|   true|
|20.0|   Seoul|   true|   true|
|20.0|   Seoul|  false|   true|
|50.0|   Seoul|  false|   true|
|20.0|   Seoul|   true|   true|
|20.0|   Seoul|   true|   true|
|30.0|   Seoul|   true|   true|
|60.0|   Seoul|  false|   true|
|50.0|   Seoul|  false|   true|
|20.0|   Seoul|   true|   true|
|80.0|   Seoul|   true|   true|
|60.0|   Seoul|  false|   true|
|70.0|   Seoul|   true|   true|
|70.0|   Seoul|   true|   true|
|70.0|   Seoul|   true|   true|
|20.0|   Seoul|   true|   true|
|70.0|   Seoul|  false|   true|
|70.0|   Seoul|  false|   true|
+----+--------+-------+-------+
only showing top 20 rows



### Recount the number of nulls now

In [None]:
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+----+--------+-------+-------+
| age|province|is_male|is_dead|
+----+--------+-------+-------+
|1380|       0|      0|      0|
+----+--------+-------+-------+



## Now do the same but using SQL select statement

### From the original Patient DataFrame, Create a temporary view (table).

In [4]:
df1= spark.read.csv('PatientInfo.csv',header=True,inferSchema=True)

In [None]:
df1.createGlobalTempView("Patients")

### Use SELECT statement to select all columns from the dataframe and show the output.

In [7]:
spark.sql("SELECT * FROM global_temp.Patients").show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------+--------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|     confirmed_date|      released_date|deceased_date|   state|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|2020-01-23 00:00:00|2020-02-05 00:00:00|         null|released|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       null|            31|              null|2020-01-30 00:00:00|2020-03-02 00:00:00|         null|released|
|1000000003|  male|50s|  Korea|   Seoul|   Jongno-gu|contact with patient| 20020

### *Using SQL commands*, limit the output to only 5 rows 

In [8]:
spark.sql("SELECT * FROM global_temp.Patients LIMIT 5").show()

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------+--------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|     confirmed_date|      released_date|deceased_date|   state|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|2020-01-23 00:00:00|2020-02-05 00:00:00|         null|released|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       null|            31|              null|2020-01-30 00:00:00|2020-03-02 00:00:00|         null|released|
|1000000003|  male|50s|  Korea|   Seoul|  Jongno-gu|contact with patient| 2002000001|

### Select the count of males and females in the dataset

In [None]:
df1.groupBy("sex").count().show()

+------+-----+
|   sex|count|
+------+-----+
|  null| 1122|
|female| 2218|
|  male| 1825|
+------+-----+



In [9]:
spark.sql("SELECT sex as sex, count(*) as count from global_temp.Patients group by sex").show()

+------+-----+
|   sex|count|
+------+-----+
|  null| 1122|
|female| 2218|
|  male| 1825|
+------+-----+



### How many people did survive, and how many didn't?

In [10]:
spark.sql("SELECT state as state, count(*) as count from global_temp.Patients group by state").show()

+--------+-----+
|   state|count|
+--------+-----+
|isolated| 2158|
|released| 2929|
|deceased|   78|
+--------+-----+



### Now, let's perform some preprocessing using SQL:
1. Convert *age* column to double after removing the 's' at the end -- *hint: check SUBSTRING method*
2. Select only the following columns: `['sex', 'age', 'province', 'state']`
3. Store the result of the query in a new dataframe

In [None]:
# col = ("patient_id","infected_by","contact_number","released_date","state", "symptom_onset_date","confirmed_date","deceased_date","country","no_days", "city","infection_case")

In [42]:
df1 = spark.sql("SELECT sex, province, state, Double(substring(age, 1,2)) as age from global_temp.Patients")
df1.show()


+------+--------+--------+----+
|   sex|province|   state| age|
+------+--------+--------+----+
|  male|   Seoul|released|50.0|
|  male|   Seoul|released|30.0|
|  male|   Seoul|released|50.0|
|  male|   Seoul|released|20.0|
|female|   Seoul|released|20.0|
|female|   Seoul|released|50.0|
|  male|   Seoul|released|20.0|
|  male|   Seoul|released|20.0|
|  male|   Seoul|released|30.0|
|female|   Seoul|released|60.0|
|female|   Seoul|released|50.0|
|  male|   Seoul|released|20.0|
|  male|   Seoul|deceased|80.0|
|female|   Seoul|released|60.0|
|  male|   Seoul|released|70.0|
|  male|   Seoul|released|70.0|
|  male|   Seoul|released|70.0|
|  male|   Seoul|released|20.0|
|female|   Seoul|released|70.0|
|female|   Seoul|released|70.0|
+------+--------+--------+----+
only showing top 20 rows



## Machine Learning 
### Create a pipeline model to predict is_dead and evaluate the performance.
- Use <b>StringIndexer</b> to transform <b>string</b> data type to indices.
- Use <b>OneHotEncoder</b> to deal with categorical values.
- Use <b>Imputer</b> to fill missing data with mean.

In [None]:

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import LinearRegression 

In [None]:
#we want to prepare our features ("province","is_male")
#A label indexer that maps a string column of labels to an ML column of label indices.
# If the input column is numeric, we cast it to string and index the string values.

SInd = [StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))for c in ["province","is_male"]]

[StringIndexer_40e8d2bba079, StringIndexer_f189cb421424]

In [None]:
#A one-hot encoder that maps a column of category indices to a column of binary vectors,
# with at most a single one-value per row that indicates the input category index
#This is different from scikit-learn's OneHotEncoder, which keeps all categories. The output vectors are sparse
encoders = [OneHotEncoder(inputCol=SI.getOutputCol(),outputCol="{0}_encoded".format(SI.getOutputCol())) for SI in SInd]


In [None]:
#A feature transformer that merges multiple columns into a vector column
assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders],outputCol="features")

In [None]:
lr = LinearRegression(featuresCol='featurs', labelCol = 'label', predictionCol='prediction')

In [None]:
pipeline =Pipeline(stages = SInd+encoders+[assembler])
pipelineModel = pipeline.fit(df)
predDF = pipelineModel.transform(df)
predDF.show()


+----+--------+-------+-------+----------------+---------------+------------------------+-----------------------+--------------------+
| age|province|is_male|is_dead|province_indexed|is_male_indexed|province_indexed_encoded|is_male_indexed_encoded|            features|
+----+--------+-------+-------+----------------+---------------+------------------------+-----------------------+--------------------+
|50.0|   Seoul|   true|   true|             0.0|            1.0|          (16,[0],[1.0])|              (1,[],[])|      (17,[0],[1.0])|
|30.0|   Seoul|   true|   true|             0.0|            1.0|          (16,[0],[1.0])|              (1,[],[])|      (17,[0],[1.0])|
|50.0|   Seoul|   true|   true|             0.0|            1.0|          (16,[0],[1.0])|              (1,[],[])|      (17,[0],[1.0])|
|20.0|   Seoul|   true|   true|             0.0|            1.0|          (16,[0],[1.0])|              (1,[],[])|      (17,[0],[1.0])|
|20.0|   Seoul|  false|   true|             0.0|       