<a href="https://colab.research.google.com/github/YousraAshour/PySpark/blob/main/Practical_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Labs 1 and 2 PySpark:**

In these labs we will be using the "[[NeurIPS 2020] Data Science for COVID-19 (DS4C)](https://www.kaggle.com/datasets/kimjihoo/coronavirusdataset?select=PatientInfo.csv)" dataset, retrieved from [Kaggle](https://www.kaggle.com/) on 1/6/2022, for educational non commercial purpose, License
[CC BY-NC-SA 4.0
](https://creativecommons.org/licenses/by-nc-sa/4.0/)


The csv file that we will be using in this lab is **PatientInfo**.

## PatientInfo.csv

**patient_id**
the ID of the patient

**sex**
the sex of the patient

**age**
the age of the patient

**country**
the country of the patient

**province**
the province of the patient

**city**
the city of the patient

**infection_case**
the case of infection

**infected_by**
the ID of who infected the patient


**contact_number**
the number of contacts with people

**symptom_onset_date**
the date of symptom onset

**confirmed_date**
the date of being confirmed

**released_date**
the date of being released

**deceased_date**
the date of being deceased

**state**
isolated / released / deceased

### Import the pyspark and check it's version

In [1]:
! pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Using cached pyspark-3.3.0-py2.py3-none-any.whl
Collecting py4j==0.10.9.5
  Using cached py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


### Import and create SparkSession

In [2]:
import pyspark

In [3]:
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder.getOrCreate()

### Load the PatientInfo.csv file and show the first 5 rows

In [5]:
from IPython.display import display, HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [6]:
df = spark.read.csv("/content/PatientInfo.csv", header=True, inferSchema=True)
df.head(5)

[Row(patient_id=1000000001, sex='male', age='50s', country='Korea', province='Seoul', city='Gangseo-gu', infection_case='overseas inflow', infected_by=None, contact_number='75', symptom_onset_date='2020-01-22', confirmed_date=datetime.datetime(2020, 1, 23, 0, 0), released_date=datetime.datetime(2020, 2, 5, 0, 0), deceased_date=None, state='released'),
 Row(patient_id=1000000002, sex='male', age='30s', country='Korea', province='Seoul', city='Jungnang-gu', infection_case='overseas inflow', infected_by=None, contact_number='31', symptom_onset_date=None, confirmed_date=datetime.datetime(2020, 1, 30, 0, 0), released_date=datetime.datetime(2020, 3, 2, 0, 0), deceased_date=None, state='released'),
 Row(patient_id=1000000003, sex='male', age='50s', country='Korea', province='Seoul', city='Jongno-gu', infection_case='contact with patient', infected_by='2002000001', contact_number='17', symptom_onset_date=None, confirmed_date=datetime.datetime(2020, 1, 30, 0, 0), released_date=datetime.datetime

### Display the schema of the dataset

In [7]:
df.printSchema()

root
 |-- patient_id: long (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: string (nullable = true)
 |-- country: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- infected_by: string (nullable = true)
 |-- contact_number: string (nullable = true)
 |-- symptom_onset_date: string (nullable = true)
 |-- confirmed_date: timestamp (nullable = true)
 |-- released_date: timestamp (nullable = true)
 |-- deceased_date: timestamp (nullable = true)
 |-- state: string (nullable = true)



### Display the statistical summary

In [8]:
df.summary().show()

+-------+--------------------+------+----+----------+--------+--------------+--------------------+--------------------+--------------------+------------------+--------+
|summary|          patient_id|   sex| age|   country|province|          city|      infection_case|         infected_by|      contact_number|symptom_onset_date|   state|
+-------+--------------------+------+----+----------+--------+--------------+--------------------+--------------------+--------------------+------------------+--------+
|  count|                5165|  4043|3785|      5165|    5165|          5071|                4246|                1346|                 791|               690|    5165|
|   mean|2.8636345618679576E9|  null|null|      null|    null|          null|                null|2.2845944015643125E9|1.6772572523506988E7|              null|    null|
| stddev| 2.074210725277473E9|  null|null|      null|    null|          null|                null|1.5265072953383324E9| 3.093097580985502E8|              n

### Using the state column.
### How many people survived (released), and how many didn't survive (isolated/deceased)?

In [9]:
df.groupBy("state").count().show()

+--------+-----+
|   state|count|
+--------+-----+
|isolated| 2158|
|released| 2929|
|deceased|   78|
+--------+-----+



### Display the number of null values in each column

In [10]:
from pyspark.sql.functions import col,isnan,when,count
df.select([count(when(col(c).isNull(),c)).alias(c)
                    for c in df.columns]).show()

+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+
|patient_id| sex| age|country|province|city|infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|state|
+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+
|         0|1122|1380|      0|       0|  94|           919|       3819|          4374|              4475|             3|         3578|         5099|    0|
+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+



## Data preprocessing

### Fill the nulls in the deceased_date with the released_date. 
- You can use <b>coalesce</b> function

In [11]:
from pyspark.sql.functions import coalesce
df = df.withColumn('deceased_date',coalesce(df["released_date"],
                                               df["deceased_date"]))

### Add a column named no_days which is difference between the deceased_date and the confirmed_date then show the top 5 rows. Print the schema.
- <b> Hint: You need to typecast these columns as date first <b>

In [12]:
from pyspark.sql.types import DateType

In [13]:
df = df.withColumn("deceased_date",col("deceased_date").cast(DateType()))
df = df.withColumn("confirmed_date",col("confirmed_date").cast(DateType()))

In [14]:
df.select("deceased_date").show()

+-------------+
|deceased_date|
+-------------+
|   2020-02-05|
|   2020-03-02|
|   2020-02-19|
|   2020-02-15|
|   2020-02-24|
|   2020-02-19|
|   2020-02-10|
|   2020-02-24|
|   2020-02-21|
|   2020-02-29|
|   2020-02-29|
|   2020-02-27|
|         null|
|   2020-03-12|
|         null|
|   2020-03-11|
|   2020-03-01|
|         null|
|   2020-03-08|
|         null|
+-------------+
only showing top 20 rows



In [15]:
#df.withColumn('no_days',(df["deceased_date_updated"] - df["confirmed_date"])).show()
from pyspark.sql.functions import datediff 
df = df.withColumn("no_days", datediff(col("deceased_date"),col("confirmed_date")))
df.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------------+-------------+--------+-------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|      released_date|deceased_date|   state|no_days|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------------+-------------+--------+-------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|2020-02-05 00:00:00|   2020-02-05|released|     13|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|2020-03-02 00:00:00|   2020-03-02|released|     32|
|1000000003|  male|50s|  Korea|   Seoul|   Jongno-gu|contact with

### Add a is_male column if male then it should yield true, else then False

In [16]:
from pyspark.sql.functions import when
df = df.withColumn("is_male",when(col("sex") == 'male','true').otherwise('false'))
df.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------------+-------------+--------+-------+-------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|      released_date|deceased_date|   state|no_days|is_male|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------------+-------------+--------+-------+-------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|2020-02-05 00:00:00|   2020-02-05|released|     13|   true|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|2020-03-02 00:00:00|   2020-03-02|released|     32|   true|
|1000000003|  male|50s|  

### Add a is_dead column if patient state is not released then it should yield true, else then False

- Use <b>UDF</b> to perform this task. 
- However, UDF is not recommended there is no built in function can do the required operation.
- UDF is slower than built in functions.

In [17]:
def statrel(stat):
  x=False
  if stat != 'released':
    x=True
  return x

In [18]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
convertUDF = udf(lambda z: statrel(z),StringType())

In [19]:
df = df.withColumn("is_dead",convertUDF(col("state")))
df.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------------+-------------+--------+-------+-------+-------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|      released_date|deceased_date|   state|no_days|is_male|is_dead|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------------+-------------+--------+-------+-------+-------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|2020-02-05 00:00:00|   2020-02-05|released|     13|   true|  false|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|2020-03-02 00:00:00|   2020-03-02|released|     32|  

### Change the ages to bins from 10s, 0s, 10s, 20s,.etc to 0,10, 20

In [20]:
from pyspark.sql.functions import translate
from pyspark.sql.types import IntegerType
df = df.withColumn('age', translate('age','s',''))
df.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------------+-------------+--------+-------+-------+-------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|      released_date|deceased_date|   state|no_days|is_male|is_dead|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------------+-------------+--------+-------+-------+-------+
|1000000001|  male| 50|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|2020-02-05 00:00:00|   2020-02-05|released|     13|   true|  false|
|1000000002|  male| 30|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|2020-03-02 00:00:00|   2020-03-02|released|     32|  

### Change age, and no_days  to be typecasted as Double

In [21]:
from pyspark.sql.types import IntegerType , DoubleType
df = df.withColumn('age', col('age').cast(DoubleType()))
df = df.withColumn('no_days', col('no_days').cast(DoubleType()))
df.show()

+----------+------+----+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------------+-------------+--------+-------+-------+-------+
|patient_id|   sex| age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|      released_date|deceased_date|   state|no_days|is_male|is_dead|
+----------+------+----+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------------+-------------+--------+-------+-------+-------+
|1000000001|  male|50.0|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|2020-02-05 00:00:00|   2020-02-05|released|   13.0|   true|  false|
|1000000002|  male|30.0|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|2020-03-02 00:00:00|   2020-03-02|released|   32

### Drop the columns
["patient_id","sex","infected_by","contact_number","released_date","state",
"symptom_onset_date","confirmed_date","deceased_date","country","no_days",
"city","infection_case"]

In [22]:
cols = ("patient_id","sex","infected_by","contact_number","released_date","state", "symptom_onset_date","confirmed_date","deceased_date","country","no_days", "city","infection_case")
df = df.drop(*cols)
df.show()

+----+--------+-------+-------+
| age|province|is_male|is_dead|
+----+--------+-------+-------+
|50.0|   Seoul|   true|  false|
|30.0|   Seoul|   true|  false|
|50.0|   Seoul|   true|  false|
|20.0|   Seoul|   true|  false|
|20.0|   Seoul|  false|  false|
|50.0|   Seoul|  false|  false|
|20.0|   Seoul|   true|  false|
|20.0|   Seoul|   true|  false|
|30.0|   Seoul|   true|  false|
|60.0|   Seoul|  false|  false|
|50.0|   Seoul|  false|  false|
|20.0|   Seoul|   true|  false|
|80.0|   Seoul|   true|   true|
|60.0|   Seoul|  false|  false|
|70.0|   Seoul|   true|  false|
|70.0|   Seoul|   true|  false|
|70.0|   Seoul|   true|  false|
|20.0|   Seoul|   true|  false|
|70.0|   Seoul|  false|  false|
|70.0|   Seoul|  false|  false|
+----+--------+-------+-------+
only showing top 20 rows



### Recount the number of nulls now

In [23]:
df.select([count(when(col(c).isNull(),c)).alias(c)
                    for c in df.columns])\
                    .show()

+----+--------+-------+-------+
| age|province|is_male|is_dead|
+----+--------+-------+-------+
|1380|       0|      0|      0|
+----+--------+-------+-------+



## Now do the same but using SQL select statement

### From the original Patient DataFrame, Create a temporary view (table).

### Use SELECT statement to select all columns from the dataframe and show the output.

### *Using SQL commands*, limit the output to only 5 rows 

### Select the count of males and females in the dataset

### How many people did survive, and how many didn't?

### Now, let's perform some preprocessing using SQL:
1. Convert *age* column to double after removing the 's' at the end -- *hint: check SUBSTRING method*
2. Select only the following columns: `['sex', 'age', 'province', 'state']`
3. Store the result of the query in a new dataframe

## Machine Learning 
### Create a pipeline model to predict is_dead and evaluate the performance.
- Use <b>StringIndexer</b> to transform <b>string</b> data type to indices.
- Use <b>OneHotEncoder</b> to deal with categorical values.
- Use <b>Imputer</b> to fill missing data with mean.

In [24]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

In [25]:
# indexer = StringIndexer().setInputCol("is_dead").setOutputCol("is_dead1")
# df = indexer.fit(df).transform(df)
# df.show()

In [26]:
# df = StringIndexer().setInputCol("is_male").setOutputCol("is_male1").fit(df).transform(df)
# df.show()

In [27]:
# df = StringIndexer().setInputCol("province").setOutputCol("province1").fit(df).transform(df)
# df.show()

In [28]:
# cols = ("province","is_male",'is_dead')
# df = df.drop(*cols)
# df.show()

In [29]:
cols = ["province","is_male"]

indexers = [
    StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
    for c in cols
]

encoders = [
    OneHotEncoder(
        inputCol=indexer.getOutputCol(),
        outputCol="{0}_encoded".format(indexer.getOutputCol())) 
    for indexer in indexers
]

assembler = VectorAssembler(
    inputCols=[encoder.getOutputCol() for encoder in encoders],
    outputCol="features"
)


pipeline = Pipeline(stages=indexers + encoders + [assembler])
pipeline.fit(df).transform(df).show()

+----+--------+-------+-------+----------------+---------------+------------------------+-----------------------+--------------------+
| age|province|is_male|is_dead|province_indexed|is_male_indexed|province_indexed_encoded|is_male_indexed_encoded|            features|
+----+--------+-------+-------+----------------+---------------+------------------------+-----------------------+--------------------+
|50.0|   Seoul|   true|  false|             0.0|            1.0|          (16,[0],[1.0])|              (1,[],[])|      (17,[0],[1.0])|
|30.0|   Seoul|   true|  false|             0.0|            1.0|          (16,[0],[1.0])|              (1,[],[])|      (17,[0],[1.0])|
|50.0|   Seoul|   true|  false|             0.0|            1.0|          (16,[0],[1.0])|              (1,[],[])|      (17,[0],[1.0])|
|20.0|   Seoul|   true|  false|             0.0|            1.0|          (16,[0],[1.0])|              (1,[],[])|      (17,[0],[1.0])|
|20.0|   Seoul|  false|  false|             0.0|       