In [11]:
# install Java8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# download spark3.0.0
!wget -q https://dlcdn.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
# unzip it
!tar xf spark-3.3.1-bin-hadoop3.tgz
# install findspark 
!pip install -q findspark

In [12]:
import os
# Register the Java and Spark install locations as environment variables.
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.3.1-bin-hadoop3",
    }
)

In [13]:
# findspark.init() makes the Spark installation's pyspark importable, so it
# must run before any `pyspark` import below.
import findspark
findspark.init()

import pandas as pd
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType

# Start (or reuse) a Spark session running locally on all available cores.
spark = SparkSession.builder.master("local[*]").getOrCreate()
# Use Arrow for Spark <-> pandas conversions. The Spark 2.x key
# `spark.sql.execution.arrow.enabled` is deprecated since Spark 3.0 in favour
# of the pyspark-specific key below.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
sc = spark.sparkContext

In [39]:
# Print the installed PySpark version so the run records which release
# produced the outputs below.
import pyspark
print(pyspark.__version__)

3.3.1


In [49]:
# Read the Covid-19 CSV into a Spark DataFrame: the first line is treated as
# the header and column types are inferred from the data.
cov19_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('Covid_Data.csv')
)

In [50]:
# Preview the first 10 rows of the Spark DataFrame.
cov19_df.show(n=10)

+-----+------------+---+------------+----------+-------+---------+---+--------+--------+----+------+-------+------------+-------------+--------------+-------+-------------+-------+--------------------+---+
|USMER|MEDICAL_UNIT|SEX|PATIENT_TYPE| DATE_DIED|INTUBED|PNEUMONIA|AGE|PREGNANT|DIABETES|COPD|ASTHMA|INMSUPR|HIPERTENSION|OTHER_DISEASE|CARDIOVASCULAR|OBESITY|RENAL_CHRONIC|TOBACCO|CLASIFFICATION_FINAL|ICU|
+-----+------------+---+------------+----------+-------+---------+---+--------+--------+----+------+-------+------------+-------------+--------------+-------+-------------+-------+--------------------+---+
|    2|           1|  1|           1|03/05/2020|     97|        1| 65|       2|       2|   2|     2|      2|           1|            2|             2|      2|            2|      2|                   3| 97|
|    2|           1|  2|           1|03/06/2020|     97|        1| 72|      97|       2|   2|     2|      2|           1|            2|             2|      1|            1|    

In [51]:
# Print the column names and the types Spark inferred for the raw CSV.
cov19_df.printSchema()

root
 |-- USMER: integer (nullable = true)
 |-- MEDICAL_UNIT: integer (nullable = true)
 |-- SEX: integer (nullable = true)
 |-- PATIENT_TYPE: integer (nullable = true)
 |-- DATE_DIED: string (nullable = true)
 |-- INTUBED: integer (nullable = true)
 |-- PNEUMONIA: integer (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- PREGNANT: integer (nullable = true)
 |-- DIABETES: integer (nullable = true)
 |-- COPD: integer (nullable = true)
 |-- ASTHMA: integer (nullable = true)
 |-- INMSUPR: integer (nullable = true)
 |-- HIPERTENSION: integer (nullable = true)
 |-- OTHER_DISEASE: integer (nullable = true)
 |-- CARDIOVASCULAR: integer (nullable = true)
 |-- OBESITY: integer (nullable = true)
 |-- RENAL_CHRONIC: integer (nullable = true)
 |-- TOBACCO: integer (nullable = true)
 |-- CLASIFFICATION_FINAL: integer (nullable = true)
 |-- ICU: integer (nullable = true)



In [52]:
# Report the dataset dimensions (row count triggers a Spark job).
row_count = cov19_df.count()
column_count = len(cov19_df.columns)
print(f"There are {row_count} rows {column_count} columns in the dataset")

There are 1048575 rows 21 columns in the dataset


In [53]:
# Correct the misspelled column header HIPERTENSION -> HYPERTENSION.
cov19_df = cov19_df.withColumnRenamed(existing="HIPERTENSION", new="HYPERTENSION")

In [54]:
# Collect the Spark DataFrame to the driver as a pandas DataFrame for the
# cleaning steps that follow. The dataset (~1M rows x 21 columns) is assumed
# to fit in driver memory.
cov19_pd_df = cov19_df.toPandas()

In [55]:
# Show the pandas dtype of each column after the Spark -> pandas conversion.
cov19_pd_df.dtypes

USMER                    int32
MEDICAL_UNIT             int32
SEX                      int32
PATIENT_TYPE             int32
DATE_DIED               object
INTUBED                  int32
PNEUMONIA                int32
AGE                      int32
PREGNANT                 int32
DIABETES                 int32
COPD                     int32
ASTHMA                   int32
INMSUPR                  int32
HYPERTENSION             int32
OTHER_DISEASE            int32
CARDIOVASCULAR           int32
OBESITY                  int32
RENAL_CHRONIC            int32
TOBACCO                  int32
CLASIFFICATION_FINAL     int32
ICU                      int32
dtype: object

In [56]:
# Show the first 5 rows of the pandas DataFrame (head() defaults to 5 rows).
cov19_pd_df.head()

Unnamed: 0,USMER,MEDICAL_UNIT,SEX,PATIENT_TYPE,DATE_DIED,INTUBED,PNEUMONIA,AGE,PREGNANT,DIABETES,...,ASTHMA,INMSUPR,HYPERTENSION,OTHER_DISEASE,CARDIOVASCULAR,OBESITY,RENAL_CHRONIC,TOBACCO,CLASIFFICATION_FINAL,ICU
0,2,1,1,1,03/05/2020,97,1,65,2,2,...,2,2,1,2,2,2,2,2,3,97
1,2,1,2,1,03/06/2020,97,1,72,97,2,...,2,2,1,2,2,1,1,2,5,97
2,2,1,2,2,09/06/2020,1,2,55,97,1,...,2,2,2,2,2,2,2,2,3,2
3,2,1,1,1,12/06/2020,97,2,53,2,2,...,2,2,2,2,2,2,2,2,7,97
4,2,1,2,1,21/06/2020,97,2,68,97,1,...,2,2,1,2,2,2,2,2,3,97


# Data Processing

In [57]:
# For every column, print its name followed by how many cells equal each of
# the missing-value codes 97, 98 and 99 (one count per line, in that order),
# then a blank separator line.
for column_name in cov19_pd_df.columns:
    print(column_name)
    for missing_code in (97, 98, 99):
        print((cov19_pd_df[column_name] == missing_code).sum())
    print()

USMER
0
0
0

MEDICAL_UNIT
0
0
0

SEX
0
0
0

PATIENT_TYPE
0
0
0

DATE_DIED
0
0
0

INTUBED
848544
0
7325

PNEUMONIA
0
0
16003

AGE
135
124
86

PREGNANT
523511
3754
0

DIABETES
0
3338
0

COPD
0
3003
0

ASTHMA
0
2979
0

INMSUPR
0
3404
0

HYPERTENSION
0
3104
0

OTHER_DISEASE
0
5045
0

CARDIOVASCULAR
0
3076
0

OBESITY
0
3032
0

RENAL_CHRONIC
0
3006
0

TOBACCO
0
3220
0

CLASIFFICATION_FINAL
0
0
0

ICU
848544
0
7488



In [58]:
# Cross-tabulate the PREGNANT codes against SEX to see which sex the
# missing-value codes (97, 98) fall under.
cov19_pd_df.groupby(['PREGNANT', 'SEX'])['SEX'].count()

PREGNANT  SEX
1         1        8131
2         1      513179
97        2      523511
98        1        3754
Name: SEX, dtype: int64

It can be observed that every record with the value '97' in PREGNANT belongs to a man (SEX = 2), for whom pregnancy does not apply. Hence the value '97' in PREGNANT can be replaced with '2' (not pregnant).

In [59]:
# Recode PREGNANT: every 97 becomes 2; all other values are left unchanged.
cov19_pd_df['PREGNANT'] = cov19_pd_df['PREGNANT'].replace({97: 2})

In [60]:
# Verify the recode: PREGNANT should no longer contain the value 97.
cov19_pd_df.PREGNANT.value_counts()

2     1036690
1        8131
98       3754
Name: PREGNANT, dtype: int64

In [61]:
# Cross-tabulate the INTUBED codes against PATIENT_TYPE to see which patient
# type the missing-value codes (97, 99) fall under.
cov19_pd_df.groupby(['INTUBED', 'PATIENT_TYPE'])['PATIENT_TYPE'].count()

INTUBED  PATIENT_TYPE
1        2                33656
2        2               159050
97       1               848544
99       2                 7325
Name: PATIENT_TYPE, dtype: int64

Since we cannot determine whether the patients who returned home were attached to a ventilator, the records with the value '97' or '99' in the INTUBED column are dropped.

In [62]:
# Remove, in place, every row whose INTUBED code is 97 or 99.
intubed_unknown = cov19_pd_df['INTUBED'].isin([97, 99])
cov19_pd_df.drop(index=cov19_pd_df.index[intubed_unknown], inplace=True)

In [63]:
# Verify the drop: only INTUBED codes 1 and 2 should remain.
cov19_pd_df.INTUBED.value_counts()

2    159050
1     33656
Name: INTUBED, dtype: int64

In [64]:
# Cross-tabulate the ICU codes against PATIENT_TYPE to see which rows still
# carry a missing-value code after the INTUBED filtering.
cov19_pd_df.groupby(['ICU', 'PATIENT_TYPE'])['PATIENT_TYPE'].count()

ICU  PATIENT_TYPE
1    2                16858
2    2               175683
99   2                  165
Name: PATIENT_TYPE, dtype: int64

Since it is unknown whether these hospitalised patients were admitted to the ICU, the records with the value '99' in the ICU column are dropped.

In [65]:
# Remove, in place, every row whose ICU code is 99.
icu_unknown_labels = cov19_pd_df.index[cov19_pd_df['ICU'].eq(99)]
cov19_pd_df.drop(index=icu_unknown_labels, inplace=True)

In [66]:
# Verify the drop: only ICU codes 1 and 2 should remain.
cov19_pd_df.ICU.value_counts()

2    175683
1     16858
Name: ICU, dtype: int64

In [67]:
# Drop every row that still carries a missing-value sentinel in one of the
# clinical columns. PNEUMONIA uses 99 as its sentinel; every other column
# listed here uses 98.
MISSING_SENTINELS = {
    'PNEUMONIA': 99,
    'PREGNANT': 98,
    'COPD': 98,
    'DIABETES': 98,
    'ASTHMA': 98,
    'INMSUPR': 98,
    'HYPERTENSION': 98,
    'OTHER_DISEASE': 98,
    'CARDIOVASCULAR': 98,
    'OBESITY': 98,
    'RENAL_CHRONIC': 98,
    'TOBACCO': 98,
}
# Compare each column with its own sentinel (the Series aligns on column
# names) and flag rows where any column matches, so the frame is filtered in
# a single pass instead of twelve separate in-place drops.
has_missing = (
    cov19_pd_df[list(MISSING_SENTINELS)]
    .eq(pd.Series(MISSING_SENTINELS))
    .any(axis=1)
)
cov19_pd_df.drop(index=cov19_pd_df.index[has_missing], inplace=True)

In [68]:
# DIED is the target variable (the risk outcome for Covid-19 patients). It is
# derived from DATE_DIED: 1 means the patient died, 0 means the patient did not.
def died_column(row):
    """Map one DATE_DIED value to the binary DIED label.

    Args:
        row: a single DATE_DIED value (despite the name, not a whole row).

    Returns:
        0 when the value is the placeholder '9999-99-99' (no death recorded),
        1 for any other value.
    """
    return 0 if row == '9999-99-99' else 1

In [69]:
# Derive the DIED label element-wise from the DATE_DIED strings.
cov19_pd_df['DIED'] = cov19_pd_df['DATE_DIED'].map(died_column)

In [70]:
# Class balance of the target: 0 = did not die, 1 = died.
cov19_pd_df.DIED.value_counts()

0    122643
1     66638
Name: DIED, dtype: int64

We can observe that 66638 patients died (DIED = 1) and 122643 did not (DIED = 0).

In [71]:
# Convert the cleaned pandas DataFrame back to a PySpark DataFrame.
# Note: this rebinds `cov19_df`, replacing the raw Spark DataFrame loaded earlier.
cov19_df = spark.createDataFrame(cov19_pd_df)

In [72]:
# Preview the first 5 rows of the cleaned Spark DataFrame (now including DIED).
cov19_df.show(n=5)

+-----+------------+---+------------+----------+-------+---------+---+--------+--------+----+------+-------+------------+-------------+--------------+-------+-------------+-------+--------------------+---+----+
|USMER|MEDICAL_UNIT|SEX|PATIENT_TYPE| DATE_DIED|INTUBED|PNEUMONIA|AGE|PREGNANT|DIABETES|COPD|ASTHMA|INMSUPR|HYPERTENSION|OTHER_DISEASE|CARDIOVASCULAR|OBESITY|RENAL_CHRONIC|TOBACCO|CLASIFFICATION_FINAL|ICU|DIED|
+-----+------------+---+------------+----------+-------+---------+---+--------+--------+----+------+-------+------------+-------------+--------------+-------+-------------+-------+--------------------+---+----+
|    2|           1|  2|           2|09/06/2020|      1|        2| 55|       2|       1|   2|     2|      2|           2|            2|             2|      2|            2|      2|                   3|  2|   1|
|    2|           1|  1|           2|9999-99-99|      2|        1| 40|       2|       2|   2|     2|      2|           2|            2|             2|      

In [73]:
# Check the data type of each column after the pandas round trip
# (DIED arrives as `long`).
cov19_df.printSchema()

root
 |-- USMER: integer (nullable = true)
 |-- MEDICAL_UNIT: integer (nullable = true)
 |-- SEX: integer (nullable = true)
 |-- PATIENT_TYPE: integer (nullable = true)
 |-- DATE_DIED: string (nullable = true)
 |-- INTUBED: integer (nullable = true)
 |-- PNEUMONIA: integer (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- PREGNANT: integer (nullable = true)
 |-- DIABETES: integer (nullable = true)
 |-- COPD: integer (nullable = true)
 |-- ASTHMA: integer (nullable = true)
 |-- INMSUPR: integer (nullable = true)
 |-- HYPERTENSION: integer (nullable = true)
 |-- OTHER_DISEASE: integer (nullable = true)
 |-- CARDIOVASCULAR: integer (nullable = true)
 |-- OBESITY: integer (nullable = true)
 |-- RENAL_CHRONIC: integer (nullable = true)
 |-- TOBACCO: integer (nullable = true)
 |-- CLASIFFICATION_FINAL: integer (nullable = true)
 |-- ICU: integer (nullable = true)
 |-- DIED: long (nullable = true)



In [74]:
# Cast the DIED label from long to a 32-bit integer column.
died_as_int = cov19_df['DIED'].cast('int')
cov19_df = cov19_df.withColumn('DIED', died_as_int)

In [75]:
# Re-check the schema to confirm DIED is now an integer column.
cov19_df.printSchema()

root
 |-- USMER: integer (nullable = true)
 |-- MEDICAL_UNIT: integer (nullable = true)
 |-- SEX: integer (nullable = true)
 |-- PATIENT_TYPE: integer (nullable = true)
 |-- DATE_DIED: string (nullable = true)
 |-- INTUBED: integer (nullable = true)
 |-- PNEUMONIA: integer (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- PREGNANT: integer (nullable = true)
 |-- DIABETES: integer (nullable = true)
 |-- COPD: integer (nullable = true)
 |-- ASTHMA: integer (nullable = true)
 |-- INMSUPR: integer (nullable = true)
 |-- HYPERTENSION: integer (nullable = true)
 |-- OTHER_DISEASE: integer (nullable = true)
 |-- CARDIOVASCULAR: integer (nullable = true)
 |-- OBESITY: integer (nullable = true)
 |-- RENAL_CHRONIC: integer (nullable = true)
 |-- TOBACCO: integer (nullable = true)
 |-- CLASIFFICATION_FINAL: integer (nullable = true)
 |-- ICU: integer (nullable = true)
 |-- DIED: integer (nullable = true)



# Data Analytics

In [76]:
# Pack the 20 predictor columns into a single vector column named 'features'.
# DATE_DIED and the DIED label are deliberately not part of the inputs.
FEATURE_COLUMNS = [
    'USMER', 'MEDICAL_UNIT', 'SEX', 'PATIENT_TYPE', 'INTUBED',
    'PNEUMONIA', 'AGE', 'PREGNANT', 'DIABETES', 'COPD',
    'ASTHMA', 'INMSUPR', 'HYPERTENSION', 'OTHER_DISEASE', 'CARDIOVASCULAR',
    'OBESITY', 'RENAL_CHRONIC', 'TOBACCO', 'CLASIFFICATION_FINAL', 'ICU',
]
assembler = VectorAssembler(inputCols=FEATURE_COLUMNS, outputCol='features')
output = assembler.transform(cov19_df)

In [77]:
# Display the assembled feature vector next to the DIED label (first 5 rows,
# without truncating the vector text).
output.select(['features', 'DIED']).show(n=5, truncate=False)

+----------------------------------------------------------------------------------+----+
|features                                                                          |DIED|
+----------------------------------------------------------------------------------+----+
|[2.0,1.0,2.0,2.0,1.0,2.0,55.0,2.0,1.0,2.0,2.0,2.0,2.0,2.0,2.0,2.0,2.0,2.0,3.0,2.0]|1   |
|[2.0,1.0,1.0,2.0,2.0,1.0,40.0,2.0,2.0,2.0,2.0,2.0,2.0,2.0,2.0,2.0,2.0,2.0,3.0,2.0]|0   |
|[2.0,1.0,1.0,2.0,2.0,2.0,37.0,2.0,1.0,2.0,2.0,2.0,1.0,2.0,2.0,1.0,2.0,2.0,3.0,2.0]|0   |
|[2.0,1.0,1.0,2.0,2.0,2.0,25.0,2.0,2.0,2.0,2.0,2.0,2.0,2.0,2.0,2.0,2.0,2.0,3.0,2.0]|0   |
|[2.0,1.0,2.0,2.0,2.0,2.0,24.0,2.0,2.0,2.0,2.0,2.0,2.0,2.0,2.0,2.0,2.0,2.0,3.0,2.0]|0   |
+----------------------------------------------------------------------------------+----+
only showing top 5 rows



In [78]:
# Randomly partition the assembled data: ~70% for training, ~30% for testing.
# The fixed seed keeps the split reproducible between runs.
train, test = output.randomSplit(weights=[0.7, 0.3], seed=7)

In [79]:
# Preview the first 5 rows of the training split.
train.show(n=5)

+-----+------------+---+------------+----------+-------+---------+---+--------+--------+----+------+-------+------------+-------------+--------------+-------+-------------+-------+--------------------+---+----+--------------------+
|USMER|MEDICAL_UNIT|SEX|PATIENT_TYPE| DATE_DIED|INTUBED|PNEUMONIA|AGE|PREGNANT|DIABETES|COPD|ASTHMA|INMSUPR|HYPERTENSION|OTHER_DISEASE|CARDIOVASCULAR|OBESITY|RENAL_CHRONIC|TOBACCO|CLASIFFICATION_FINAL|ICU|DIED|            features|
+-----+------------+---+------------+----------+-------+---------+---+--------+--------+----+------+-------+------------+-------------+--------------+-------+-------------+-------+--------------------+---+----+--------------------+
|    1|           2|  1|           2|08/05/2020|      1|        2|  0|       2|       2|   2|     2|      2|           2|            2|             2|      2|            2|      2|                   7|  2|   1|[1.0,2.0,1.0,2.0,...|
|    1|           2|  1|           2|9999-99-99|      1|        1| 15|  

In [80]:
# Train a logistic-regression classifier on the training split, using the
# assembled 'features' vector to predict the DIED label.
lr = LogisticRegression(featuresCol='features', labelCol='DIED')
lrmd = lr.fit(dataset=train)

In [81]:
# Apply the fitted model to the held-out test split; the result adds the
# rawPrediction, probability and prediction columns shown in the next cell.
predict_test = lrmd.transform(test)

In [82]:
# Show the true label alongside the model outputs, without truncating columns.
prediction_columns = ['DIED', 'features', 'rawPrediction', 'prediction', 'probability']
predict_test.select(*prediction_columns).show(truncate=False)

+----+----------------------------------------------------------------------------------+----------------------------------------+----------+-----------------------------------------+
|DIED|features                                                                          |rawPrediction                           |prediction|probability                              |
+----+----------------------------------------------------------------------------------+----------------------------------------+----------+-----------------------------------------+
|0   |[1.0,2.0,1.0,2.0,2.0,1.0,0.0,2.0,2.0,2.0,2.0,2.0,2.0,2.0,2.0,2.0,2.0,2.0,7.0,2.0] |[3.3533218907729765,-3.3533218907729765]|0.0       |[0.9662134471447473,0.03378655285525267] |
|0   |[1.0,2.0,1.0,2.0,2.0,1.0,1.0,2.0,2.0,2.0,2.0,2.0,2.0,2.0,2.0,2.0,2.0,2.0,7.0,2.0] |[3.3160620654292616,-3.3160620654292616]|0.0       |[0.9649757423163717,0.03502425768362827] |
|0   |[1.0,2.0,1.0,2.0,2.0,1.0,8.0,2.0,2.0,2.0,2.0,2.0,2.0,2.0,2.0,2.0,2.0,2.0,7

In [300]:
# Score the test-set predictions with the area under the ROC curve.
# The evaluator is named `evaluator` rather than `eval` so that Python's
# built-in eval() is not shadowed for the rest of the session; the metric
# (the evaluator's default) is spelled out for clarity.
evaluator = BinaryClassificationEvaluator(labelCol='DIED', metricName='areaUnderROC')
auc = evaluator.evaluate(predict_test)
print('AUC ROC score:', auc)

AUC ROC score: 0.8127093907759539
