## PatientInfo.csv

**patient_id**
the ID of the patient

**sex**
the sex of the patient

**age**
the age of the patient

**country**
the country of the patient

**province**
the province of the patient

**city**
the city of the patient

**infection_case**
the case of infection

**infected_by**
the ID of who infected the patient


**contact_number**
the number of contacts with people

**symptom_onset_date**
the date of symptom onset

**confirmed_date**
the date of being confirmed

**released_date**
the date of being released

**deceased_date**
the date of being deceased

**state**
isolated / released / deceased

In [None]:
from getpass import getpass
import os
user = "asmaamohy"
key = "109bae6fb0b390dcca9d9690f10d5b2e"

if '.kaggle' not in os.listdir('/root'):
    !mkdir ~/.kaggle
!touch /root/.kaggle/kaggle.json
!chmod 666 /root/.kaggle/kaggle.json
with open('/root/.kaggle/kaggle.json', 'w') as f:
    f.write('{"username":"%s","key":"%s"}' % (user, key))
!chmod 600 /root/.kaggle/kaggle.json

In [None]:
!kaggle datasets download -d kimjihoo/coronavirusdataset

Downloading coronavirusdataset.zip to /content
  0% 0.00/7.00M [00:00<?, ?B/s]
100% 7.00M/7.00M [00:00<00:00, 85.5MB/s]


In [None]:
!unzip /content/coronavirusdataset.zip

Archive:  /content/coronavirusdataset.zip
  inflating: Case.csv                
  inflating: PatientInfo.csv         
  inflating: Policy.csv              
  inflating: Region.csv              
  inflating: SearchTrend.csv         
  inflating: SeoulFloating.csv       
  inflating: Time.csv                
  inflating: TimeAge.csv             
  inflating: TimeGender.csv          
  inflating: TimeProvince.csv        
  inflating: Weather.csv             


In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


### Import the pyspark and check it's version

In [None]:
import pyspark
pyspark.__version__

'3.3.0'

### Import and create SparkSession

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

### Load the PatientInfo.csv file and show the first 5 rows

In [None]:
from IPython.display import display, HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [None]:
df= spark.read.option("header",True).csv('/content/PatientInfo.csv',inferSchema =True)
df.show(5)

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------+--------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|     confirmed_date|      released_date|deceased_date|   state|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|2020-01-23 00:00:00|2020-02-05 00:00:00|         null|released|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       null|            31|              null|2020-01-30 00:00:00|2020-03-02 00:00:00|         null|released|
|1000000003|  male|50s|  Korea|   Seoul|  Jongno-gu|contact with patient| 2002000001|

### Display the schema of the dataset

In [None]:
df.printSchema()

root
 |-- patient_id: long (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: string (nullable = true)
 |-- country: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- infected_by: string (nullable = true)
 |-- contact_number: string (nullable = true)
 |-- symptom_onset_date: string (nullable = true)
 |-- confirmed_date: timestamp (nullable = true)
 |-- released_date: timestamp (nullable = true)
 |-- deceased_date: timestamp (nullable = true)
 |-- state: string (nullable = true)



### Display the statistical summary

In [None]:
df.describe().show()

+-------+--------------------+------+----+----------+--------+--------------+--------------------+--------------------+--------------------+------------------+--------+
|summary|          patient_id|   sex| age|   country|province|          city|      infection_case|         infected_by|      contact_number|symptom_onset_date|   state|
+-------+--------------------+------+----+----------+--------+--------------+--------------------+--------------------+--------------------+------------------+--------+
|  count|                5165|  4043|3785|      5165|    5165|          5071|                4246|                1346|                 791|               690|    5165|
|   mean|2.8636345618679576E9|  null|null|      null|    null|          null|                null|2.2845944015643125E9|1.6772572523506988E7|              null|    null|
| stddev| 2.074210725277473E9|  null|null|      null|    null|          null|                null|1.5265072953383324E9| 3.093097580985502E8|              n

### Using the state column.
### How many people survived (released), and how many didn't survive (isolated/deceased)?

In [None]:
df.groupBy("state").count().show()

+--------+-----+
|   state|count|
+--------+-----+
|isolated| 2158|
|released| 2929|
|deceased|   78|
+--------+-----+



### Display the number of null values in each column

In [None]:
import pyspark.sql.functions as fn

In [None]:
df.select([fn.count(fn.when(fn.isnull(c), c)).alias(c) for c in df.columns]).show()

+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+
|patient_id| sex| age|country|province|city|infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|state|
+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+
|         0|1122|1380|      0|       0|  94|           919|       3819|          4374|              4475|             3|         3578|         5099|    0|
+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+



## Data preprocessing

### Fill the nulls in the deceased_date with the released_date. 
- You can use <b>coalesce</b> function

In [None]:
df_fill=df.withColumn('deceased_date', fn.coalesce('deceased_date', 'released_date'))
df_fill.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------------+--------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|     confirmed_date|      released_date|      deceased_date|   state|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------------+--------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|2020-01-23 00:00:00|2020-02-05 00:00:00|2020-02-05 00:00:00|released|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       null|            31|              null|2020-01-30 00:00:00|2020-03-02 00:00:00|2020-03-02 00:00:00|released|
|1000000003|  male|50s|  Korea|   Seoul|   Jongno-

### Add a column named no_days which is difference between the deceased_date and the confirmed_date then show the top 5 rows. Print the schema.
- <b> Hint: You need to typecast these columns as date first <b>

In [None]:
df_fill=df_fill.withColumn('no_days', fn.datediff(df_fill['confirmed_date'],df_fill['deceased_date']))
df_fill.show(5)

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------------+--------+-------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|     confirmed_date|      released_date|      deceased_date|   state|no_days|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------------+--------+-------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|2020-01-23 00:00:00|2020-02-05 00:00:00|2020-02-05 00:00:00|released|    -13|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       null|            31|              null|2020-01-30 00:00:00|2020-03-02 00:00:00|2020-03-02 00:00:00|released|    -32|
|1000000003|  m

### Add a is_male column if male then it should yield true, else then False

In [None]:
# df_fill = df_fill.withColumn("is_male", fn.when(df['sex'] == "male",True).otherwise(False))
# df_fill.show()
from pyspark.sql.types import BooleanType
t = lambda s: s=='male'
   
t=fn.udf(t,BooleanType())
df_fill=df_fill.withColumn('is_male',t('state'))
df_fill.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------------+--------+-------+-------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|     confirmed_date|      released_date|      deceased_date|   state|no_days|is_male|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------------+--------+-------+-------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|2020-01-23 00:00:00|2020-02-05 00:00:00|2020-02-05 00:00:00|released|    -13|  false|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       null|            31|              null|2020-01-30 00:00:00|2020-03-02 00:00:00|2020-03-02 00:00

### Add a is_dead column if patient state is not released then it should yield true, else then False

- Use <b>UDF</b> to perform this task. 
- However, UDF is not recommended there is no built in function can do the required operation.
- UDF is slower than built in functions.

In [None]:
from pyspark.sql.types import BooleanType
f = lambda s: s=='released'
   
f=fn.udf(f,BooleanType())
df_fill=df_fill.withColumn('is_dead',f('state'))
df_fill.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------------+--------+-------+-------+-------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|     confirmed_date|      released_date|      deceased_date|   state|no_days|is_male|is_dead|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------------+--------+-------+-------+-------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|2020-01-23 00:00:00|2020-02-05 00:00:00|2020-02-05 00:00:00|released|    -13|  false|   true|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       null|            31|              null|2020-01-30 00:00:00|2020

### Change the ages to bins from 10s, 0s, 10s, 20s,.etc to 0,10, 20

In [None]:
from pyspark.sql.types import IntegerType
#f2= lambda s : int(s[:-1])
def f2(age):
    try:
      return int(age[:-1])
    except:
      return None
f2=fn.udf(f2,IntegerType())
df_fill=df_fill.withColumn('age',f2('age'))
df_fill.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------------+--------+-------+-------+-------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|     confirmed_date|      released_date|      deceased_date|   state|no_days|is_male|is_dead|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------------+--------+-------+-------+-------+
|1000000001|  male| 50|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|2020-01-23 00:00:00|2020-02-05 00:00:00|2020-02-05 00:00:00|released|    -13|  false|   true|
|1000000002|  male| 30|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       null|            31|              null|2020-01-30 00:00:00|2020

### Change age, and no_days  to be typecasted as Double

In [None]:
from pyspark.sql.types import  DoubleType
df_fill = df_fill.withColumn("age", df_fill.age.cast(DoubleType()))
df_fill = df_fill.withColumn("no_days", df_fill.no_days.cast(DoubleType()))
df_fill.printSchema()

root
 |-- patient_id: long (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: double (nullable = true)
 |-- country: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- infected_by: string (nullable = true)
 |-- contact_number: string (nullable = true)
 |-- symptom_onset_date: string (nullable = true)
 |-- confirmed_date: timestamp (nullable = true)
 |-- released_date: timestamp (nullable = true)
 |-- deceased_date: timestamp (nullable = true)
 |-- state: string (nullable = true)
 |-- no_days: double (nullable = true)
 |-- is_male: boolean (nullable = true)
 |-- is_dead: boolean (nullable = true)



### Drop the columns
["patient_id","sex","infected_by","contact_number","released_date","state",
"symptom_onset_date","confirmed_date","deceased_date","country","no_days",
"city","infection_case"]

In [None]:
cols=["patient_id","sex","infected_by","contact_number","released_date","state","symptom_onset_date","confirmed_date","deceased_date","country","no_days","city","infection_case"]
df_droped =df_fill.drop(*cols)

### Recount the number of nulls now

In [None]:
df_droped.select([fn.count(fn.when(fn.isnull(c), c)).alias(c) for c in df_droped.columns]).show()

+----+--------+-------+-------+
| age|province|is_male|is_dead|
+----+--------+-------+-------+
|1380|       0|      0|      0|
+----+--------+-------+-------+



In [None]:
df_fill.select([fn.count(fn.when(fn.isnull(c), c)).alias(c) for c in df_fill.columns]).show()

+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+-------+-------+-------+
|patient_id| sex| age|country|province|city|infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|state|no_days|is_male|is_dead|
+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+-------+-------+-------+
|         0|1122|1380|      0|       0|  94|           919|       3819|          4374|              4475|             3|         3578|         3514|    0|   3514|      0|      0|
+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+-------+-------+-------+



In [None]:
#df_fill =df_fill.na.drop()

## Now do the same but using SQL select statement

### From the original Patient DataFrame, Create a temporary view (table).

In [None]:
df.createOrReplaceTempView("df_Patient")

### Use SELECT statement to select all columns from the dataframe and show the output.

In [None]:
df.select("*").show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------+--------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|     confirmed_date|      released_date|deceased_date|   state|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|2020-01-23 00:00:00|2020-02-05 00:00:00|         null|released|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       null|            31|              null|2020-01-30 00:00:00|2020-03-02 00:00:00|         null|released|
|1000000003|  male|50s|  Korea|   Seoul|   Jongno-gu|contact with patient| 20020

### *Using SQL commands*, limit the output to only 5 rows 

In [None]:

spark.sql("""SELECT * from df_Patient
            limit 5 """).show()

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------+--------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|     confirmed_date|      released_date|deceased_date|   state|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|2020-01-23 00:00:00|2020-02-05 00:00:00|         null|released|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       null|            31|              null|2020-01-30 00:00:00|2020-03-02 00:00:00|         null|released|
|1000000003|  male|50s|  Korea|   Seoul|  Jongno-gu|contact with patient| 2002000001|

### Select the count of males and females in the dataset

In [None]:
spark.sql("""SELECT sex, count(*) from df_Patient
             group by sex """).show()

+------+--------+
|   sex|count(1)|
+------+--------+
|  null|    1122|
|female|    2218|
|  male|    1825|
+------+--------+



### How many people did survive, and how many didn't?

In [None]:
spark.sql("""SELECT state, count(*) from df_Patient
             group by state """).show()

+--------+--------+
|   state|count(1)|
+--------+--------+
|isolated|    2158|
|released|    2929|
|deceased|      78|
+--------+--------+



### Now, let's perform some preprocessing using SQL:
1. Convert *age* column to double after removing the 's' at the end -- *hint: check SUBSTRING method*
2. Select only the following columns: `['sex', 'age', 'province', 'state']`
3. Store the result of the query in a new dataframe

In [None]:
query_df=spark.sql("""SELECT SUBSTRING(age,1,2) as age, sex, province,state from df_Patient """)
query_df.show()

+---+------+--------+--------+
|age|   sex|province|   state|
+---+------+--------+--------+
| 50|  male|   Seoul|released|
| 30|  male|   Seoul|released|
| 50|  male|   Seoul|released|
| 20|  male|   Seoul|released|
| 20|female|   Seoul|released|
| 50|female|   Seoul|released|
| 20|  male|   Seoul|released|
| 20|  male|   Seoul|released|
| 30|  male|   Seoul|released|
| 60|female|   Seoul|released|
| 50|female|   Seoul|released|
| 20|  male|   Seoul|released|
| 80|  male|   Seoul|deceased|
| 60|female|   Seoul|released|
| 70|  male|   Seoul|released|
| 70|  male|   Seoul|released|
| 70|  male|   Seoul|released|
| 20|  male|   Seoul|released|
| 70|female|   Seoul|released|
| 70|female|   Seoul|released|
+---+------+--------+--------+
only showing top 20 rows



## Machine Learning 
### Create a pipeline model to predict is_dead.
- Use <b>StringIndexer</b> to transform <b>string</b> data type to indices.
- Use <b>OneHotEncoder</b> to deal with categorical values.
- Use <b>Imputer</b> to fill missing data with mean.

In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, Imputer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [None]:
df_fill = df_fill.withColumn("is_male", df_fill.is_male.cast(IntegerType()))
df_fill = df_fill.withColumn("is_dead", df_fill.is_dead.cast(IntegerType()))

In [None]:
# query_df = query_df.withColumn("age", query_df.age.cast(IntegerType()))
# query_df.printSchema()
# split data
trainDF, testDF = df_fill.randomSplit([.8,.2],seed=42)
print(f"There are {trainDF.count()} rows in the training set, and {testDF.count()} in the test set")

There are 4166 rows in the training set, and 999 in the test set


In [None]:
df_fill.printSchema()

root
 |-- patient_id: long (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: double (nullable = true)
 |-- country: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- infected_by: string (nullable = true)
 |-- contact_number: string (nullable = true)
 |-- symptom_onset_date: string (nullable = true)
 |-- confirmed_date: timestamp (nullable = true)
 |-- released_date: timestamp (nullable = true)
 |-- deceased_date: timestamp (nullable = true)
 |-- state: string (nullable = true)
 |-- no_days: double (nullable = true)
 |-- is_male: integer (nullable = true)
 |-- is_dead: integer (nullable = true)



In [None]:
df_fill.select([fn.count(fn.when(fn.isnull(c), c)).alias(c) for c in df_fill.columns]).show()

+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+-------+-------+-------+
|patient_id| sex| age|country|province|city|infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|state|no_days|is_male|is_dead|
+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+-------+-------+-------+
|         0|1122|1380|      0|       0|  94|           919|       3819|          4374|              4475|             3|         3578|         3514|    0|   3514|      0|      0|
+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+-------+-------+-------+



In [None]:
categoricalCols = ['country','province','state']

In [None]:
indexOutputCols = [x + "_Index" for x in categoricalCols]
oheOutputCols = [x + "_OHE" for x in categoricalCols]

# create Indexer and Encoder
stringIndexer = StringIndexer(inputCols=categoricalCols,
                              outputCols=indexOutputCols,
                             handleInvalid='skip')

oheEncoder = OneHotEncoder(inputCols=indexOutputCols,
                          outputCols=oheOutputCols)

In [None]:
numericCols = ['is_male']

In [None]:
assemblerInputs = oheOutputCols + numericCols
# create Assembler
from pyspark.ml.feature import VectorAssembler
vecAssembler = VectorAssembler(inputCols=assemblerInputs,outputCol='features')

In [None]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol='features',
                      labelCol='is_dead',
                      predictionCol='prediction')

In [None]:
# Building the pipeline
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[stringIndexer,oheEncoder,
                           vecAssembler,lr])
pipelineModel = pipeline.fit(trainDF)

In [None]:
predDF = pipelineModel.transform(testDF)
predDF.printSchema()

root
 |-- patient_id: long (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: double (nullable = true)
 |-- country: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- infected_by: string (nullable = true)
 |-- contact_number: string (nullable = true)
 |-- symptom_onset_date: string (nullable = true)
 |-- confirmed_date: timestamp (nullable = true)
 |-- released_date: timestamp (nullable = true)
 |-- deceased_date: timestamp (nullable = true)
 |-- state: string (nullable = true)
 |-- no_days: double (nullable = true)
 |-- is_male: integer (nullable = true)
 |-- is_dead: integer (nullable = true)
 |-- country_Index: double (nullable = false)
 |-- province_Index: double (nullable = false)
 |-- state_Index: double (nullable = false)
 |-- country_OHE: vector (nullable = true)
 |-- province_OHE: vector (nullable = true)
 |-- state_OHE: vector (nullable = true)
 |-- features: vect