## PatientInfo.csv

**patient_id**
the ID of the patient

**sex**
the sex of the patient

**age**
the age of the patient

**country**
the country of the patient

**province**
the province of the patient

**city**
the city of the patient

**infection_case**
the case of infection

**infected_by**
the ID of who infected the patient


**contact_number**
the number of contacts with people

**symptom_onset_date**
the date of symptom onset

**confirmed_date**
the date of being confirmed

**released_date**
the date of being released

**deceased_date**
the date of being deceased

**state**
isolated / released / deceased

### Import the pyspark and check it's version

In [196]:
import pyspark
sc

### Import and create SparkSession

In [197]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

### Load the PatientInfo.csv file and show the first 5 rows

In [198]:
from IPython.display import display, HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [199]:
from pyspark.sql.functions import *
import pyspark.sql.functions as fn
from pyspark.sql.types import *

In [274]:
df = spark.read.csv('PatientInfo.csv',header=True,inferSchema=True)
df.show(20)

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|         NULL|released|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|         NULL|released|
|1000000003|  male|50s|  Korea|   Seoul|   Jongno-gu|contact with patient| 2002000001|            17|              NULL|    2020-01-30|

### Display the schema of the dataset

In [201]:
df.printSchema()

root
 |-- patient_id: long (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: string (nullable = true)
 |-- country: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- infected_by: string (nullable = true)
 |-- contact_number: string (nullable = true)
 |-- symptom_onset_date: string (nullable = true)
 |-- confirmed_date: date (nullable = true)
 |-- released_date: date (nullable = true)
 |-- deceased_date: date (nullable = true)
 |-- state: string (nullable = true)



### Display the statistical summary

In [202]:
df.describe().show()

[Stage 306:>                                                        (0 + 1) / 1]

+-------+--------------------+------+----+----------+--------+--------------+--------------------+--------------------+--------------------+------------------+--------+
|summary|          patient_id|   sex| age|   country|province|          city|      infection_case|         infected_by|      contact_number|symptom_onset_date|   state|
+-------+--------------------+------+----+----------+--------+--------------+--------------------+--------------------+--------------------+------------------+--------+
|  count|                5165|  4043|3785|      5165|    5165|          5071|                4246|                1346|                 791|               690|    5165|
|   mean|2.8636345618679576E9|  NULL|NULL|      NULL|    NULL|          NULL|                NULL|2.2845944015643125E9|1.6772572523506988E7|              NULL|    NULL|
| stddev| 2.074210725277473E9|  NULL|NULL|      NULL|    NULL|          NULL|                NULL|1.5265072953383324E9| 3.093097580985502E8|              N

                                                                                

### Using the state column.
### How many people survived (released), and how many didn't survive (isolated/deceased)?

In [203]:
df.groupBy('state').count().show()

+--------+-----+
|   state|count|
+--------+-----+
|isolated| 2158|
|released| 2929|
|deceased|   78|
+--------+-----+



### Display the number of null values in each column

In [204]:
df.select([fn.count(when(col(x).isNull(), x)).alias(x) for x in df.columns]).show()

+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+
|patient_id| sex| age|country|province|city|infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|state|
+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+
|         0|1122|1380|      0|       0|  94|           919|       3819|          4374|              4475|             3|         3578|         5099|    0|
+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+



## Data preprocessing

### Fill the nulls in the deceased_date with the released_date. 
- You can use <b>coalesce</b> function

In [275]:
filled_df = df.withColumn('deceased_date', fn.coalesce(col('deceased_date'),\
                                                       col('released_date')))
filled_df.select('deceased_date','released_date').show()

+-------------+-------------+
|deceased_date|released_date|
+-------------+-------------+
|   2020-02-05|   2020-02-05|
|   2020-03-02|   2020-03-02|
|   2020-02-19|   2020-02-19|
|   2020-02-15|   2020-02-15|
|   2020-02-24|   2020-02-24|
|   2020-02-19|   2020-02-19|
|   2020-02-10|   2020-02-10|
|   2020-02-24|   2020-02-24|
|   2020-02-21|   2020-02-21|
|   2020-02-29|   2020-02-29|
|   2020-02-29|   2020-02-29|
|   2020-02-27|   2020-02-27|
|         NULL|         NULL|
|   2020-03-12|   2020-03-12|
|         NULL|         NULL|
|   2020-03-11|   2020-03-11|
|   2020-03-01|   2020-03-01|
|         NULL|         NULL|
|   2020-03-08|   2020-03-08|
|         NULL|         NULL|
+-------------+-------------+
only showing top 20 rows



### Add a column named no_days which is difference between the deceased_date and the confirmed_date then show the top 5 rows. Print the schema.
- <b> Hint: You need to typecast these columns as date first <b>

In [276]:
type_cast_df = filled_df.withColumn('deceased_date', fn.to_date('deceased_date', 'yyyy-MM-dd')) \
                        .withColumn('confirmed_date', fn.to_date('confirmed_date', 'yyyy-MM-dd'))

diff_df = type_cast_df.withColumn('no_days', fn.datediff('deceased_date', 'confirmed_date'))

diff_df.select('deceased_date','confirmed_date','no_days').show(5)
diff_df.printSchema()

+-------------+--------------+-------+
|deceased_date|confirmed_date|no_days|
+-------------+--------------+-------+
|   2020-02-05|    2020-01-23|     13|
|   2020-03-02|    2020-01-30|     32|
|   2020-02-19|    2020-01-30|     20|
|   2020-02-15|    2020-01-30|     16|
|   2020-02-24|    2020-01-31|     24|
+-------------+--------------+-------+
only showing top 5 rows

root
 |-- patient_id: long (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: string (nullable = true)
 |-- country: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- infected_by: string (nullable = true)
 |-- contact_number: string (nullable = true)
 |-- symptom_onset_date: string (nullable = true)
 |-- confirmed_date: date (nullable = true)
 |-- released_date: date (nullable = true)
 |-- deceased_date: date (nullable = true)
 |-- state: string (nullable = true)
 |-- no_days: integer (nullable = true)



### Remove null values of sex column.
### Add a is_male column if male then it should yield true, else (Female) then False

In [277]:
filtered_df = diff_df.filter(col('sex').isNotNull())
sex_df = filtered_df.withColumn('is_male', fn.when(col('sex') == 'male', True).otherwise(False))
sex_df.select('sex','is_male').show(5)

+------+-------+
|   sex|is_male|
+------+-------+
|  male|   true|
|  male|   true|
|  male|   true|
|  male|   true|
|female|  false|
+------+-------+
only showing top 5 rows



### Add a is_dead column if patient state is not released then it should yield true, else then False

- Use <b>UDF</b> to perform this task. 
- However, UDF is not recommended there is no built in function can do the required operation.
- UDF is slower than built in functions.

In [254]:
from pyspark.sql.functions import udf
def map_patient_state(state):
    return state != "released"

In [255]:
map_patient_state_udf=udf(map_patient_state,BooleanType())

In [256]:
map_patient_state('abdo')

True

In [266]:
df_with_mapped_state = diff_df.withColumn("is_not_released", map_patient_state(col("state")))
df_with_mapped_state.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+---------------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|no_days|is_not_released|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+---------------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|   2020-02-05|released|     13|          false|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|   2020-03-02|released|     32|          false|
|1000000003|  m

In [278]:
dead_df = sex_df.withColumn('is_dead', fn.when(col('state') == 'released', True).otherwise(False))
dead_df.select('state','is_dead').show(15)

+--------+-------+
|   state|is_dead|
+--------+-------+
|released|   true|
|released|   true|
|released|   true|
|released|   true|
|released|   true|
|released|   true|
|released|   true|
|released|   true|
|released|   true|
|released|   true|
|released|   true|
|released|   true|
|deceased|  false|
|released|   true|
|released|   true|
+--------+-------+
only showing top 15 rows



### Change the ages to bins from 10s, 0s, 10s, 20s,.etc to 0,10, 20

In [279]:
suffix_adj_df = dead_df.withColumn('age', fn.regexp_replace(col('age'), 's$', ''))
suffix_adj_df.select('age').show(5)

+---+
|age|
+---+
| 50|
| 30|
| 50|
| 20|
| 20|
+---+
only showing top 5 rows



### Change age, and no_days  to be typecasted as Double

In [280]:
types_changed_df = suffix_adj_df.withColumn('age', col('age').cast(DoubleType())) \
                                .withColumn('no_days', col('no_days').cast(DoubleType()))
types_changed_df.printSchema()

root
 |-- patient_id: long (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: double (nullable = true)
 |-- country: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- infected_by: string (nullable = true)
 |-- contact_number: string (nullable = true)
 |-- symptom_onset_date: string (nullable = true)
 |-- confirmed_date: date (nullable = true)
 |-- released_date: date (nullable = true)
 |-- deceased_date: date (nullable = true)
 |-- state: string (nullable = true)
 |-- no_days: double (nullable = true)
 |-- is_male: boolean (nullable = false)
 |-- is_dead: boolean (nullable = false)



### Drop the columns
["patient_id","sex","infected_by","contact_number","released_date","state",
"symptom_onset_date","confirmed_date","deceased_date","country","no_days",
"city","infection_case"]

In [325]:
drop_columns = ["patient_id","sex","infected_by","contact_number","released_date","state", "symptom_onset_date","confirmed_date","deceased_date","country","no_days", "city","infection_case"]
df_drop_cols = types_changed_df.drop(*drop_columns)
df_drop_cols.printSchema()

root
 |-- age: double (nullable = true)
 |-- province: string (nullable = true)
 |-- is_male: boolean (nullable = false)
 |-- is_dead: boolean (nullable = false)



### Recount the number of nulls now

In [326]:
df_drop_cols.select([fn.count(when(col(x).isNull(), x)).alias(x) for x in df_drop_cols.columns]).show()

+---+--------+-------+-------+
|age|province|is_male|is_dead|
+---+--------+-------+-------+
|261|       0|      0|      0|
+---+--------+-------+-------+



## Now do the same but using SQL select statement

### From the original Patient DataFrame, Create a temporary view (table).

In [213]:
df.createOrReplaceTempView("patient")

### Use SELECT statement to select all columns from the dataframe and show the output.

In [214]:
spark.sql("""SELECT *
            from patient
          """).show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|         NULL|released|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|         NULL|released|
|1000000003|  male|50s|  Korea|   Seoul|   Jongno-gu|contact with patient| 2002000001|            17|              NULL|    2020-01-30|

### *Using SQL commands*, limit the output to only 5 rows 

In [215]:
spark.sql("""SELECT *
            from patient
            limit 5
          """).show()

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|         NULL|released|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|         NULL|released|
|1000000003|  male|50s|  Korea|   Seoul|  Jongno-gu|contact with patient| 2002000001|            17|              NULL|    2020-01-30|   202

### Select the count of males and females in the dataset

In [216]:
spark.sql("""SELECT sex,count(sex) gender_count
            from patient
            where sex is not null
            group by sex
          """).show()

+------+------------+
|   sex|gender_count|
+------+------------+
|female|        2218|
|  male|        1825|
+------+------------+



### How many people did survive, and how many didn't?

In [267]:
 spark.sql("""SELECT state,count(state) survival_count
            from patient
            where state is not null
            group by state
          """).show()

+--------+--------------+
|   state|survival_count|
+--------+--------------+
|isolated|          2158|
|released|          2929|
|deceased|            78|
+--------+--------------+



### Now, let's perform some preprocessing using SQL:
1. Convert *age* column to double after removing the 's' at the end -- *hint: check SUBSTRING method*
2. Select only the following columns: `['sex', 'age', 'province', 'state']`
3. Store the result of the query in a new dataframe

In [270]:
df_sql_write = spark.sql("""SELECT sex, age, province, state, substring(age,1,len(age)-1) as age_trimmed
            from patient
          """)
df_sql_write.write.csv('Mycsv',mode='overwrite')
df_sql_write.show()

+------+---+--------+--------+-----------+
|   sex|age|province|   state|age_trimmed|
+------+---+--------+--------+-----------+
|  male|50s|   Seoul|released|         50|
|  male|30s|   Seoul|released|         30|
|  male|50s|   Seoul|released|         50|
|  male|20s|   Seoul|released|         20|
|female|20s|   Seoul|released|         20|
|female|50s|   Seoul|released|         50|
|  male|20s|   Seoul|released|         20|
|  male|20s|   Seoul|released|         20|
|  male|30s|   Seoul|released|         30|
|female|60s|   Seoul|released|         60|
|female|50s|   Seoul|released|         50|
|  male|20s|   Seoul|released|         20|
|  male|80s|   Seoul|deceased|         80|
|female|60s|   Seoul|released|         60|
|  male|70s|   Seoul|released|         70|
|  male|70s|   Seoul|released|         70|
|  male|70s|   Seoul|released|         70|
|  male|20s|   Seoul|released|         20|
|female|70s|   Seoul|released|         70|
|female|70s|   Seoul|released|         70|
+------+---

## Machine Learning 
### Create a pipeline model to predict is_dead and evaluate the performance.
- Use <b>StringIndexer</b> to transform <b>string</b> data type to indices.
- Use <b>OneHotEncoder</b> to deal with categorical values.
- Use <b>Imputer</b> to fill missing data with mean.

In [336]:
df_drop_cols.printSchema()

root
 |-- age: double (nullable = true)
 |-- province: string (nullable = true)
 |-- is_male: double (nullable = false)
 |-- is_dead: double (nullable = false)



In [354]:
from pyspark.ml.feature import Imputer
from pyspark.ml.feature import VectorAssembler,StringIndexer,OneHotEncoder
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline

In [335]:
df_drop_cols = df_drop_cols.withColumn('is_dead', col('is_dead').cast(DoubleType())) \
                           .withColumn('is_male', col('is_male').cast(DoubleType()))

In [337]:
df_drop_cols.show()

+----+--------+-------+-------+
| age|province|is_male|is_dead|
+----+--------+-------+-------+
|50.0|   Seoul|    1.0|    1.0|
|30.0|   Seoul|    1.0|    1.0|
|50.0|   Seoul|    1.0|    1.0|
|20.0|   Seoul|    1.0|    1.0|
|20.0|   Seoul|    0.0|    1.0|
|50.0|   Seoul|    0.0|    1.0|
|20.0|   Seoul|    1.0|    1.0|
|20.0|   Seoul|    1.0|    1.0|
|30.0|   Seoul|    1.0|    1.0|
|60.0|   Seoul|    0.0|    1.0|
|50.0|   Seoul|    0.0|    1.0|
|20.0|   Seoul|    1.0|    1.0|
|80.0|   Seoul|    1.0|    0.0|
|60.0|   Seoul|    0.0|    1.0|
|70.0|   Seoul|    1.0|    1.0|
|70.0|   Seoul|    1.0|    1.0|
|70.0|   Seoul|    1.0|    1.0|
|20.0|   Seoul|    1.0|    1.0|
|70.0|   Seoul|    0.0|    1.0|
|70.0|   Seoul|    0.0|    1.0|
+----+--------+-------+-------+
only showing top 20 rows



In [333]:
imputer = Imputer(inputCols=['age'], outputCols=['age_imputed'])
imputer.setStrategy("mean")

Imputer_935369b1f57e

In [338]:
df_drop_cols.select([fn.count(when(col(x).isNull(), x)).alias(x) for x in df_drop_cols.columns]).show()

+---+--------+-------+-------+
|age|province|is_male|is_dead|
+---+--------+-------+-------+
|261|       0|      0|      0|
+---+--------+-------+-------+



In [365]:
col_types = df_impute.dtypes
string_cols = [k for k, v in col_types if ((v == 'string') & (k != 'is_dead'))]  
string_cols_out = [k + '_Index' for k in string_cols]
OHE_cols_out = [k + '_OHE' for k in string_cols]
all_cols = OHE_cols_out +['age_imputed']

In [366]:
StrInd = StringIndexer(inputCols=string_cols, outputCols=string_cols_out)
OHE = OneHotEncoder(inputCols=string_cols_out, outputCols=OHE_cols_out)
vecAssemb = VectorAssembler(inputCols=all_cols, outputCol='features')

In [359]:
all_cols

['province_OHE', 'is_male_OHE', 'age_imputed']

In [368]:
df_train, df_test = df_drop_cols.randomSplit([0.8,0.2],seed=42)

In [369]:
lr = LogisticRegression(featuresCol='features', labelCol='is_dead', predictionCol='prediction')

In [370]:
stages = [imputer, label_indexer, StrInd, OHE, vecAssemb, lr]
pl = Pipeline(stages=stages)

In [371]:
pl_Model = pl.fit(df_train)

                                                                                

In [372]:
pred_test = pl_Model.transform(df_test)

In [373]:
ac = MulticlassClassificationEvaluator(labelCol='is_dead', metricName='accuracy')

In [374]:
ac.evaluate(pred_test)

                                                                                

0.8773333333333333