# Lab 2: PySpark

### Machine Learning

#### But first, let's do some SQL :)

**First, install PySpark and import SparkSession**

In [1]:
! pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
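# Explicit imports avoid shadowing Python built-ins such as sum, max and round
from pyspark.sql.functions import col, count, lit, regexp_replace, when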


**Now, download the dataset**

In [3]:
!gdown https://drive.google.com/uc?id=1PB6wBDVTM_eocxOyi0lWlLBQOlH0rLe_ -O PatientInfo.csv

Downloading...
From: https://drive.google.com/uc?id=1PB6wBDVTM_eocxOyi0lWlLBQOlH0rLe_
To: /content/PatientInfo.csv


**Create a SparkSession object and name the app "Lab2"**

In [4]:
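spark = SparkSession.builder.appName("Lab2").getOrCreate()
sc = spark.sparkContext
# SQLContext is deprecated since Spark 3.0; SparkSession exposes the same
# .read and .sql entry points, so the later cells keep using this name
sqlContext = spark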



**1. Read the file PatientInfo.csv into a dataframe**

In [5]:
patient = sqlContext.read.csv("PatientInfo.csv", header=True, inferSchema=True)

**Show the first 5 lines of the dataframe**

In [6]:
patient.show(5)

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|   2020-02-05|         null|released|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|   2020-03-02|         null|released|
|1000000003|  male|50s|  Korea|   Seoul|  Jongno-gu|contact with patient| 2002000001|            17|              null|    2020-01-30|   202

**Now do the same, but using a SQL SELECT statement**

1. Create a temporary view (table) called `patient_view`

In [7]:
patient.createOrReplaceTempView('patient_view')

2. Use a SELECT statement to select all columns from the view

In [8]:
data = sqlContext.sql("""
  select * from patient_view;
""")
# patient.select(patient.columns).show()
data.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|   2020-02-05|         null|released|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|   2020-03-02|         null|released|
|1000000003|  male|50s|  Korea|   Seoul|   Jongno-gu|contact with patient| 2002000001|            17|              null|    2020-01-30|

3. Limit the output to only 5 rows *using SQL commands*

In [9]:
# patient.select(patient.columns).limit(5).show()
data_limit = sqlContext.sql("""
  select * from patient_view limit 5;
""")
data_limit.show()

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|   2020-02-05|         null|released|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|   2020-03-02|         null|released|
|1000000003|  male|50s|  Korea|   Seoul|  Jongno-gu|contact with patient| 2002000001|            17|              null|    2020-01-30|   202

4. Select the count of males and females in the dataset

In [10]:
# patient.where(col("sex") == "male").agg(count(col("sex")).alias("Male")).show()
# patient.where(col("sex") == "female").agg(count(col("sex")).alias("Female")).show()
# patient.groupBy(col("sex")).agg(count(col("sex"))).show()

sex_count = sqlContext.sql("""
  select sex,count(sex) from patient_view group by sex;
""")
male_count = sqlContext.sql("""
  select count(sex) from patient_view where sex == "male";
""")
female_count = sqlContext.sql("""
  select count(sex) from patient_view where sex == "female";
""")

sex_count.show()
male_count.show()
female_count.show()

+------+----------+
|   sex|count(sex)|
+------+----------+
|  null|         0|
|female|      2218|
|  male|      1825|
+------+----------+

+----------+
|count(sex)|
+----------+
|      1825|
+----------+

+----------+
|count(sex)|
+----------+
|      2218|
+----------+
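
`count(sex)` ignores nulls, which is why the `null` group above shows 0 even though that group exists; `count(*)` counts every row in the group. The sketch below uses Python's built-in `sqlite3` only as a lightweight stand-in for Spark SQL (standard SQL gives `COUNT` the same null handling), on a few made-up rows. For this dataset, the step 6 state counts add up to 5,165 rows, which implies 1,122 patients with no recorded `sex`. Running `select sex, count(*) from patient_view group by sex` should confirm that.

```python
import sqlite3

# Made-up rows: two males, one female, two with unknown sex
rows = [("male",), ("male",), ("female",), (None,), (None,)]

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE patient_view (sex TEXT)")
con.executemany("INSERT INTO patient_view VALUES (?)", rows)

# COUNT(sex) counts only non-null values; COUNT(*) counts every row in the group
query = """
    SELECT sex, COUNT(sex) AS non_null, COUNT(*) AS all_rows
    FROM patient_view
    GROUP BY sex
    ORDER BY sex
"""
result = con.execute(query).fetchall()
for row in result:
    print(row)
```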



5. Select the count of males and females *as a percentage* (what percentage of the data is male, and what percentage is female?)

In [11]:
male_percentage = sqlContext.sql("""
  select count(sex)/(select count(sex) from patient_view) *100 as male from patient_view where sex == "male";
""")
female_percentage = sqlContext.sql("""
  select count(sex)/(select count(sex) from patient_view) *100 as female from patient_view where sex == "female";
""")
male_percentage.show()
female_percentage.show()

+-----------------+
|             male|
+-----------------+
|45.13974771209498|
+-----------------+

+-----------------+
|           female|
+-----------------+
|54.86025228790502|
+-----------------+
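
The denominator is `count(sex)`, so these percentages are relative to the 4,043 patients with a recorded sex (1,825 + 2,218), not to every row. Measured against all 5,165 rows, males would be about 35.3% and females about 42.9%. A single query with conditional aggregation can return both shares at once. The sketch below again uses `sqlite3` as a stand-in on made-up rows. The SELECT uses only standard `CASE`/`SUM`/`COUNT`, so the same statement should also work through `sqlContext.sql` against `patient_view`, though that is an assumption to check.

```python
import sqlite3

# Made-up rows: three males, one female, one unknown
rows = [("male",), ("male",), ("male",), ("female",), (None,)]
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE patient_view (sex TEXT)")
con.executemany("INSERT INTO patient_view VALUES (?)", rows)

# 100.0 forces floating-point division; COUNT(sex) excludes nulls, COUNT(*) does not
query = """
    SELECT
        100.0 * SUM(CASE WHEN sex = 'male' THEN 1 ELSE 0 END) / COUNT(sex)   AS male_pct_known,
        100.0 * SUM(CASE WHEN sex = 'female' THEN 1 ELSE 0 END) / COUNT(sex) AS female_pct_known,
        100.0 * SUM(CASE WHEN sex = 'male' THEN 1 ELSE 0 END) / COUNT(*)     AS male_pct_all
    FROM patient_view
"""
male_known, female_known, male_all = con.execute(query).fetchone()
print(male_known, female_known, male_all)  # 75.0 25.0 60.0
```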



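6. How many people survived, and how many didn't? (Here `released` counts as survived and `deceased` as not; patients whose state is still `isolated` fall into neither group.)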

In [12]:
state_count = sqlContext.sql("""
  select state,count(state) from patient_view group by state;
""")
survive_count = sqlContext.sql("""
  select count(state) as Survived from patient_view where state == "released" ;
""")
unsurvive_count = sqlContext.sql("""
  select count(state) as Unsurvived from patient_view where state =='deceased';
""")

state_count.show()
survive_count.show()
unsurvive_count.show()

+--------+------------+
|   state|count(state)|
+--------+------------+
|isolated|        2158|
|released|        2929|
|deceased|          78|
+--------+------------+

+--------+
|Survived|
+--------+
|    2929|
+--------+

+----------+
|Unsurvived|
+----------+
|        78|
+----------+



Now, let's perform some preprocessing using SQL:

1. Convert the *age* column to double after removing the trailing 's' (*hint: check the SUBSTRING function*)
2. Select only the following columns: `['sex', 'age', 'province', 'state']`
3. Store the result of the query in a new dataframe

In [13]:
from pyspark.sql.types import DoubleType
# patient.createOrReplaceTempView('patient_view_ml')
patient_view_ml = patient.withColumn("age",regexp_replace(col("age"),'s','')).withColumn("age",col("age").cast(DoubleType()))
patient_view_ml = patient_view_ml.select("sex","age","province","state")
patient_view_ml.printSchema()

root
 |-- sex: string (nullable = true)
 |-- age: double (nullable = true)
 |-- province: string (nullable = true)
 |-- state: string (nullable = true)
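
The cell above uses the DataFrame API (`regexp_replace`) instead of SQL. A SQL version that follows the SUBSTRING hint could drop the last character with `SUBSTR(age, 1, LENGTH(age) - 1)` and then cast the result. The sketch below runs that query through `sqlite3` as a stand-in, on made-up rows in the same format as `PatientInfo.csv`. Passing the same statement to `sqlContext.sql` against `patient_view` should give the same four columns. Treat that as an assumption and verify it with `printSchema()`.

```python
import sqlite3

# Made-up rows mirroring the CSV format: age is text such as '50s', or NULL
rows = [
    ("male", "50s", "Seoul", "released"),
    ("female", "100s", "Daegu", "deceased"),
    (None, None, "Gyeonggi-do", "isolated"),
]
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE patient_view (sex TEXT, age TEXT, province TEXT, state TEXT)")
con.executemany("INSERT INTO patient_view VALUES (?, ?, ?, ?)", rows)

# Strip the trailing 's' with SUBSTR/LENGTH, then cast to a floating-point number;
# a NULL age stays NULL because each function here propagates NULL
query = """
    SELECT sex,
           CAST(SUBSTR(age, 1, LENGTH(age) - 1) AS DOUBLE) AS age,
           province,
           state
    FROM patient_view
"""
patient_ml = con.execute(query).fetchall()
for row in patient_ml:
    print(row)
```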



Now view the new dataframe to make sure everything is alright

In [14]:
patient_view_ml.show()

+------+----+--------+--------+
|   sex| age|province|   state|
+------+----+--------+--------+
|  male|50.0|   Seoul|released|
|  male|30.0|   Seoul|released|
|  male|50.0|   Seoul|released|
|  male|20.0|   Seoul|released|
|female|20.0|   Seoul|released|
|female|50.0|   Seoul|released|
|  male|20.0|   Seoul|released|
|  male|20.0|   Seoul|released|
|  male|30.0|   Seoul|released|
|female|60.0|   Seoul|released|
|female|50.0|   Seoul|released|
|  male|20.0|   Seoul|released|
|  male|80.0|   Seoul|deceased|
|female|60.0|   Seoul|released|
|  male|70.0|   Seoul|released|
|  male|70.0|   Seoul|released|
|  male|70.0|   Seoul|released|
|  male|20.0|   Seoul|released|
|female|70.0|   Seoul|released|
|female|70.0|   Seoul|released|
+------+----+--------+--------+
only showing top 20 rows



**Now, let's get back to spark operations**

Please copy the following operations from your solution in Lab 1
___

Add an **is_dead** column: if the patient's state is not `released`, it should be true (1), otherwise false (0)

In [15]:
patient_view_ml = patient_view_ml.withColumn("is_dead",when((col("state") == 'deceased'),lit(1)).otherwise(lit(0)))
patient_view_ml.show(5)

+------+----+--------+--------+-------+
|   sex| age|province|   state|is_dead|
+------+----+--------+--------+-------+
|  male|50.0|   Seoul|released|      0|
|  male|30.0|   Seoul|released|      0|
|  male|50.0|   Seoul|released|      0|
|  male|20.0|   Seoul|released|      0|
|female|20.0|   Seoul|released|      0|
+------+----+--------+--------+-------+
only showing top 5 rows
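
The cell above flags only `deceased` patients. The task statement flags every state other than `released`, which also includes `isolated`. The two definitions produce very different class balances. With only about 1.5% positives, a classifier can look accurate just by predicting 0 for everyone. The quick plain-Python check below uses the state counts from step 6. In PySpark, the task's definition would be `when(col("state") != "released", 1).otherwise(0)`.

```python
# State counts copied from the step 6 output
state_counts = {"isolated": 2158, "released": 2929, "deceased": 78}
total = sum(state_counts.values())

# Definition used in the cell above: only 'deceased' is positive
deceased_only = state_counts["deceased"]

# Definition from the task text: any state other than 'released' is positive
not_released = total - state_counts["released"]

print(total, deceased_only, not_released)  # 5165 78 2236
print(round(100 * deceased_only / total, 2), round(100 * not_released / total, 2))  # 1.51 43.29
```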



**Please split the data into train and test dataframes**

*Ratio: 80:20 - Seed=42*

In [16]:
trainDF,testDF = patient_view_ml.randomSplit([0.8,0.2],seed = 42)

In [17]:
testDF.show(5)

+----+----+-----------+--------+-------+
| sex| age|   province|   state|is_dead|
+----+----+-----------+--------+-------+
|null|null|Gyeonggi-do|isolated|      0|
|null|null|Gyeonggi-do|isolated|      0|
|null|null|Gyeonggi-do|isolated|      0|
|null|null|Gyeonggi-do|isolated|      0|
|null|null|Gyeonggi-do|isolated|      0|
+----+----+-----------+--------+-------+
only showing top 5 rows
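
`randomSplit` does not cut the data at exactly 80/20. Each row is assigned to a split independently by a random draw, so the split sizes only land near 4,132/1,033. The same seed reproduces the same split only while the data and its partitioning stay unchanged. The plain-Python sketch below imitates that per-row assignment with `random.Random(42)`. It will not reproduce Spark's actual split; it only illustrates the sampling idea.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to a split by drawing one uniform number per row,
    so split sizes are only approximately proportional to the weights."""
    total = sum(weights)
    # Cumulative upper bounds of each split in [0, 1), e.g. [0.8, 1.0]
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        x = rng.random()
        for i, upper in enumerate(bounds):
            # The last split also catches any floating-point leftovers
            if x < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train, test = random_split(range(5165), [0.8, 0.2], seed=42)
print(len(train), len(test))  # close to 4132 / 1033, not guaranteed to match exactly
```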



**Now, let's import RandomForestClassifier and start our ML pipeline**

In [18]:
from pyspark.ml.classification import RandomForestClassifier


**Create a pipeline that contains the following stages:**

- Imputer: replace null values in the `age` column with the column mean
- StringIndexer: convert `sex` to `is_male` and `province` to `province_index` as numerical values
- OneHotEncoder: perform one-hot encoding on both `is_male` and `province_index`
- VectorAssembler: assemble the feature vector from `age` and the one-hot encoded `is_male` and `province_index` columns
- RandomForestClassifier: final estimator
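
This explains where the 18-slot `features` vector in the predictions comes from. StringIndexer's default `stringOrderType='frequencyDesc'` gives index 0 to the most frequent label. OneHotEncoder's default `dropLast=True` drops the last category, so a column with k labels becomes a vector of length k - 1. The predictions show 16-slot `province_index_encoded` vectors, which points to 17 provinces in the training data. With 2 sexes, that gives 1 + 1 + 16 = 18 slots. Because females (2,218) outnumber males (1,825), female gets index 0, so `is_male` happens to be 0 for females and 1 for males. Note also that `handleInvalid='skip'` makes the fitted indexer drop rows whose `sex` or `province` is null. That is likely why the null-`sex` rows at the top of `testDF` do not appear in the predictions. The plain-Python sketch below mimics only the two ordering and encoding defaults. It is not Spark's implementation, and the alphabetical tie-breaking is an assumption.

```python
from collections import Counter

def string_index(values):
    """Map labels to indices by descending frequency (ties broken alphabetically,
    an assumption here), mimicking StringIndexer's default ordering."""
    counts = Counter(v for v in values if v is not None)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

def one_hot_drop_last(index, num_categories):
    """Return a sparse (size, [positions], [values]) triple like OneHotEncoder
    with dropLast=True: the last category becomes the all-zero vector."""
    size = num_categories - 1
    i = int(index)
    if i < size:
        return (size, [i], [1.0])
    return (size, [], [])

# Sex counts from step 4: 2,218 females and 1,825 males
sexes = ["female"] * 2218 + ["male"] * 1825
sex_index = string_index(sexes)
print(sex_index)                                  # {'female': 0.0, 'male': 1.0}
print(one_hot_drop_last(sex_index["female"], 2))  # (1, [0], [1.0])
print(one_hot_drop_last(sex_index["male"], 2))    # (1, [], [])

# 1 slot for age + (2 - 1) for sex + (17 - 1) for province
print(1 + (2 - 1) + (17 - 1))                     # 18
```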

In [26]:

from pyspark.ml.feature import Imputer,StringIndexer,OneHotEncoder,VectorAssembler
imputer = Imputer(inputCol='age',outputCol='age').setStrategy("mean")
indexer = StringIndexer(inputCols =['sex','province'],outputCols=['is_male','province_index'], handleInvalid='skip')
encoder = OneHotEncoder(inputCols=['is_male','province_index'],outputCols=['is_male_encoded','province_index_encoded'])
assembler = VectorAssembler(inputCols=['age','is_male_encoded','province_index_encoded'],outputCol='features')
rf = RandomForestClassifier(featuresCol='features', labelCol = 'is_dead')

Fit the pipeline to the train dataframe

In [28]:
from pyspark.ml import Pipeline 
pipe = Pipeline(stages=[imputer,indexer,encoder,assembler,rf])
training = pipe.fit(trainDF)
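
`pipe.fit(trainDF)` fits each stage in order on the training data and returns a fitted pipeline model. `transform(testDF)` then reuses the fitted values. That is why every null test age in the predictions is filled with the same training mean (40.0494...), not a mean computed on the test set. Below is a plain-Python sketch of that fit/transform split for mean imputation. `MeanImputer` is a hypothetical toy class and the ages are made up.

```python
class MeanImputer:
    """Toy stand-in for Imputer(strategy='mean'): learns the mean of the
    non-null values in fit, then fills nulls with it in transform."""

    def fit(self, values):
        known = [v for v in values if v is not None]
        self.mean_ = sum(known) / len(known)
        return self

    def transform(self, values):
        return [self.mean_ if v is None else v for v in values]

train_ages = [20.0, 30.0, None, 70.0]  # made-up training ages
test_ages = [None, 50.0]               # made-up test ages

imputer = MeanImputer().fit(train_ages)
print(imputer.mean_)                   # 40.0
print(imputer.transform(test_ages))    # [40.0, 50.0]
```

Learning the statistic on the training split alone keeps test-set information from leaking into the model.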

Now transform the test DF to get predictions

In [30]:
pred = training.transform(testDF)

Show the final predictions DF

In [31]:
pred.show()

+------+----------------+----------------+--------+-------+-------+--------------+---------------+----------------------+--------------------+--------------------+--------------------+----------+
|   sex|             age|        province|   state|is_dead|is_male|province_index|is_male_encoded|province_index_encoded|            features|       rawPrediction|         probability|prediction|
+------+----------------+----------------+--------+-------+-------+--------------+---------------+----------------------+--------------------+--------------------+--------------------+----------+
|female|40.0494233937397|Gyeongsangbuk-do|released|      0|    0.0|           1.0|  (1,[0],[1.0])|        (16,[1],[1.0])|(18,[0,1,3],[40.0...|[19.7698683020076...|[0.98849341510038...|       0.0|
|female|40.0494233937397|Gyeongsangbuk-do|released|      0|    0.0|           1.0|  (1,[0],[1.0])|        (16,[1],[1.0])|(18,[0,1,3],[40.0...|[19.7698683020076...|[0.98849341510038...|       0.0|
|female|40.049423393

In [32]:
pred.select('features','is_dead','prediction').show(5,truncate=False)

+---------------------------------------+-------+----------+
|features                               |is_dead|prediction|
+---------------------------------------+-------+----------+
|(18,[0,1,3],[40.0494233937397,1.0,1.0])|0      |0.0       |
|(18,[0,1,3],[40.0494233937397,1.0,1.0])|0      |0.0       |
|(18,[0,1,5],[40.0494233937397,1.0,1.0])|0      |0.0       |
|(18,[0,1,5],[40.0494233937397,1.0,1.0])|0      |0.0       |
|(18,[0,1,5],[40.0494233937397,1.0,1.0])|0      |0.0       |
+---------------------------------------+-------+----------+
only showing top 5 rows



**Model Evaluation**

Now let's evaluate our model, starting with the area under the ROC curve (AUC) from `BinaryClassificationEvaluator`

In [36]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
Binary_Evaluator= BinaryClassificationEvaluator(rawPredictionCol='prediction',
                                         labelCol='is_dead',metricName='areaUnderROC')
Binary_Evaluator.evaluate(pred)

0.5
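
An AUC of exactly 0.5 is what you get when the scores cannot separate the classes at all, for example when every row receives the same prediction. The cell above sets `rawPredictionCol='prediction'`, which feeds the evaluator hard 0/1 labels instead of the model's scores, so only a single threshold exists. With `rawPredictionCol='rawPrediction'` (the evaluator's default), the evaluator can rank rows by the classifier's scores instead. For the accuracy this step asks for, `MulticlassClassificationEvaluator(labelCol='is_dead', predictionCol='prediction', metricName='accuracy')` is the matching evaluator. The plain-Python sketch below computes ROC AUC as the probability that a random positive scores above a random negative, counting ties as half. It uses made-up labels and scores.

```python
def roc_auc(labels, scores):
    """ROC AUC as P(score of a random positive > score of a random negative),
    counting ties as 0.5."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))

labels = [0, 0, 0, 1, 0, 1]              # made-up ground truth
hard_preds = [0.0] * 6                   # model predicts 0 for everyone
probs = [0.1, 0.2, 0.35, 0.8, 0.4, 0.3]  # made-up P(is_dead = 1)

print(roc_auc(labels, hard_preds))  # 0.5: all ties, no ranking information
print(roc_auc(labels, probs))       # 0.75: 6 of 8 positive/negative pairs ranked correctly
```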

An AUC of 0.5 is no better than random guessing, so the model is not yet separating the two classes. Now let's generate the confusion matrix of our predictions

*Hint: we can use `scikit-learn`'s `confusion_matrix` and `classification_report`. You will need to convert the predictions into a pandas DataFrame first (for example with `toPandas()`)*
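
On the Spark side, one way to do this is `pdf = pred.select('is_dead', 'prediction').toPandas()`. You would then pass `pdf['is_dead']` and `pdf['prediction']` to `confusion_matrix` and `classification_report` from `sklearn.metrics`. To show what those numbers mean without extra dependencies, here is a plain-Python sketch. It computes the confusion matrix, overall accuracy and per-class F1 on made-up labels. The helper names are hypothetical and only mirror sklearn's layout, where rows are true classes and columns are predicted classes.

```python
def confusion_matrix(y_true, y_pred, classes=(0, 1)):
    """Rows are true classes, columns are predicted classes.
    Spark's 0.0/1.0 doubles hit the same keys, since 0.0 == 0 and 1.0 == 1."""
    index = {c: i for i, c in enumerate(classes)}
    matrix = [[0] * len(classes) for _ in classes]
    for t, p in zip(y_true, y_pred):
        matrix[index[t]][index[p]] += 1
    return matrix

def per_class_f1(matrix):
    """F1 = 2PR / (P + R) per class, with precision and recall read off the matrix."""
    scores = []
    for k in range(len(matrix)):
        tp = matrix[k][k]
        predicted = sum(row[k] for row in matrix)  # column sum
        actual = sum(matrix[k])                    # row sum
        precision = tp / predicted if predicted else 0.0
        recall = tp / actual if actual else 0.0
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        scores.append(f1)
    return scores

y_true = [0, 0, 0, 0, 1, 1, 1, 0]  # made-up is_dead labels
y_pred = [0, 0, 0, 1, 1, 1, 0, 0]  # made-up predictions

cm = confusion_matrix(y_true, y_pred)
accuracy = sum(cm[k][k] for k in range(len(cm))) / len(y_true)
print(cm)                # [[4, 1], [1, 2]]
print(accuracy)          # 0.75
print(per_class_f1(cm))  # approximately [0.8, 0.667]
```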

## Great Job!

**If you followed the instructions correctly, you should get an overall accuracy of 89% and per-class F1 scores of 92% and 85%**

**Do you think you can improve this accuracy? Let's see what you can do :)**

___
If you have any questions you can reach out to me:

### Omar Hammad
#### Software Engineer
##### Email: ommar365@gmail.com
##### Phone: 01144070145
##### Linkedin: https://www.linkedin.com/in/omar-a-hammad