# **Labs 1 and 2 PySpark:**

In these labs we will use the "[[NeurIPS 2020] Data Science for COVID-19 (DS4C)](https://www.kaggle.com/datasets/kimjihoo/coronavirusdataset?select=PatientInfo.csv)" dataset, retrieved from [Kaggle](https://www.kaggle.com/) on 1/6/2022, for educational, non-commercial purposes. License: [CC BY-NC-SA 4.0](https://creativecommons.org/licenses/by-nc-sa/4.0/)


The CSV file that we will use in these labs is **PatientInfo.csv**.

## PatientInfo.csv

**patient_id**
the ID of the patient

**sex**
the sex of the patient

**age**
the age of the patient

**country**
the country of the patient

**province**
the province of the patient

**city**
the city of the patient

**infection_case**
the case of infection

**infected_by**
the ID of who infected the patient


**contact_number**
the number of contacts with people

**symptom_onset_date**
the date of symptom onset

**confirmed_date**
the date of being confirmed

**released_date**
the date of being released

**deceased_date**
the date of being deceased

**state**
isolated / released / deceased

In [None]:
# install java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# unzip the spark file to the current folder
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

# set your spark folder to your system path environment. 
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

# install findspark using pip
!pip install -q findspark

!pip install pyspark

import findspark
findspark.init()



### Import PySpark and check its version

In [None]:
# Import PySpark
import pyspark
from pyspark.sql import SparkSession

### Import and create SparkSession

In [None]:
# Create SparkSession
spark = SparkSession.builder.master("local[4]") \
                    .appName('Practica_Day') \
                    .getOrCreate()
print('PySpark Version :'+spark.version)

PySpark Version :3.0.0


### Load the PatientInfo.csv file and show the first 5 rows

In [None]:
from IPython.display import display, HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [None]:
df = spark.read.csv("PatientInfo.csv", header=True, inferSchema=True)

In [None]:
df1 = df

### Display the schema of the dataset

In [None]:
df.printSchema()

root
 |-- patient_id: long (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: string (nullable = true)
 |-- country: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- infected_by: string (nullable = true)
 |-- contact_number: string (nullable = true)
 |-- symptom_onset_date: string (nullable = true)
 |-- confirmed_date: string (nullable = true)
 |-- released_date: string (nullable = true)
 |-- deceased_date: string (nullable = true)
 |-- state: string (nullable = true)



### Display the statistical summary

In [None]:
df.summary().show()

+-------+--------------------+------+----+----------+--------+--------------+--------------------+--------------------+--------------------+------------------+--------------+-------------+-------------+--------+
|summary|          patient_id|   sex| age|   country|province|          city|      infection_case|         infected_by|      contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+-------+--------------------+------+----+----------+--------+--------------+--------------------+--------------------+--------------------+------------------+--------------+-------------+-------------+--------+
|  count|                5165|  4043|3785|      5165|    5165|          5071|                4246|                1346|                 791|               690|          5162|         1587|           66|    5165|
|   mean|2.8636345618679576E9|  null|null|      null|    null|          null|                null|2.2845944015643125E9|1.6772572523506988E7|            

### Using the state column.
### How many people survived (released), and how many didn't survive (isolated/deceased)?

In [None]:
df.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|   2020-02-05|         null|released|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|   2020-03-02|         null|released|
|1000000003|  male|50s|  Korea|   Seoul|   Jongno-gu|contact with patient| 2002000001|            17|              null|    2020-01-30|

In [None]:
from pyspark.sql.functions import *

# Number of people who survived (state == 'released')
df.createOrReplaceTempView('MyDF')
df.select('state').where(df.state=='released').agg(count('*').alias('NumberOfReleased')).show()

+----------------+
|NumberOfReleased|
+----------------+
|            2929|
+----------------+



In [None]:
df.select('state').where(df.state=='deceased').agg(count('*').alias('NumberOfdeceased')).show()

+----------------+
|NumberOfdeceased|
+----------------+
|              78|
+----------------+



### Display the number of null values in each column

In [None]:
df.select([count(when(col(c).isNull(),c)).alias(c) for c in df.columns]).show()

+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+
|patient_id| sex| age|country|province|city|infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|state|
+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+
|         0|1122|1380|      0|       0|  94|           919|       3819|          4374|              4475|             3|         3578|         5099|    0|
+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+





## Data preprocessing

### Fill the nulls in the deceased_date with the released_date. 
- You can use the <b>coalesce</b> function.
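
As a rough illustration of what `coalesce` does for each row, the plain-Python sketch below returns the first argument that is not null. The helper name `coalesce_py` and the sample dates are made up for illustration; Spark's built-in applies the same rule to whole columns.

```python
def coalesce_py(*values):
    """Return the first value that is not None, or None if every value is None."""
    for value in values:
        if value is not None:
            return value
    return None

# (deceased_date, released_date) pairs; the dates are illustrative only
rows = [
    (None, "2020-02-05"),   # released patient: falls back to released_date
    ("2020-02-23", None),   # deceased patient: keeps deceased_date
    (None, None),           # neither date known: stays null
]
filled = [coalesce_py(deceased, released) for deceased, released in rows]
print(filled)
```

Rows where both dates are missing remain null after the fill, so a later date difference on them is null as well.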

In [None]:
df = df.withColumn("deceased_date",coalesce(df.deceased_date,df.released_date))

### Add a column named no_days, the difference in days between deceased_date and confirmed_date, then show the top 5 rows and print the schema
- <b>Hint: You need to typecast these columns as date first</b>
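
To see why the cast matters, here is a small plain-Python analogue of a date difference: the strings have to be parsed into dates before they can be subtracted. The helper `days_between` is a hypothetical name, and the dates are those of the first two patients in the dataset preview.

```python
from datetime import date

def days_between(end_iso, start_iso):
    """Whole days from start to end; None if either date is missing."""
    if end_iso is None or start_iso is None:
        return None
    # Parse the ISO strings into date objects before subtracting
    return (date.fromisoformat(end_iso) - date.fromisoformat(start_iso)).days

first = days_between("2020-02-05", "2020-01-23")
second = days_between("2020-03-02", "2020-01-30")
missing = days_between(None, "2020-01-30")
print(first, second, missing)
```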

In [None]:
import pyspark.sql.functions as fn
df = df.withColumn('deceased_date',df.deceased_date.cast('Date')).withColumn('confirmed_date',df.confirmed_date.cast('Date'))
df.printSchema()

root
 |-- patient_id: long (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: string (nullable = true)
 |-- country: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- infected_by: string (nullable = true)
 |-- contact_number: string (nullable = true)
 |-- symptom_onset_date: string (nullable = true)
 |-- confirmed_date: date (nullable = true)
 |-- released_date: string (nullable = true)
 |-- deceased_date: date (nullable = true)
 |-- state: string (nullable = true)



In [None]:
df = df.withColumn('no_days', datediff(col("deceased_date"), col("confirmed_date")))
df.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|no_days|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|   2020-02-05|   2020-02-05|released|     13|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|   2020-03-02|   2020-03-02|released|     32|
|1000000003|  male|50s|  Korea|   Seoul|   Jongno-gu|contact with patient| 2002000001|         

### Add an is_male column: it should yield True if the patient is male and False otherwise

In [None]:
df = df.withColumn('is_male', when((df.sex == 'male'), lit("True")).otherwise(lit("False")))
df.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|no_days|is_male|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|   2020-02-05|   2020-02-05|released|     13|   True|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|   2020-03-02|   2020-03-02|released|     32|   True|
|1000000003|  male|50s|  Korea|   Seoul|   Jongno-gu|co

### Add an is_dead column: it should yield True if the patient's state is not released and False otherwise

- Use a <b>UDF</b> to perform this task.
- However, UDFs are not recommended unless no built-in function can perform the required operation.
- UDFs are slower than built-in functions.

In [None]:
def Myfun(statecol):
  # Any state other than 'released' (isolated or deceased) counts as dead here
  return statecol != 'released'

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType,BooleanType,DateType

mudf = udf(Myfun, BooleanType())
df = df.withColumn("is_dead", mudf(df.state))
df.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+-------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|no_days|is_male|is_dead|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+-------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|   2020-02-05|   2020-02-05|released|     13|   True|  false|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|   2020-03-02|   2020-03-02|released|     32|   True|  false|
|1000000003|  m

### Change the age bins from strings such as 0s, 10s, 20s, etc. to the numbers 0, 10, 20, etc.
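
The conversion can be sketched in plain Python as stripping the trailing `s` and parsing what remains. The helper `age_bin_to_number` is a hypothetical name; anchoring the pattern with `$` is a cautious choice so that only the final character is removed.

```python
import re

def age_bin_to_number(label):
    """Turn a label such as '50s' into 50.0; missing labels stay None."""
    if label is None:
        return None
    # Remove only the trailing 's', then parse the remaining digits
    return float(re.sub(r"s$", "", label))

labels = ["0s", "10s", "50s", "100s", None]
ages = [age_bin_to_number(label) for label in labels]
print(ages)
```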

In [None]:
from pyspark.sql.functions import regexp_replace
df = df.withColumn('age', regexp_replace('age', 's', ''))
df.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+-------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|no_days|is_male|is_dead|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+-------+
|1000000001|  male| 50|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|   2020-02-05|   2020-02-05|released|     13|   True|  false|
|1000000002|  male| 30|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|   2020-03-02|   2020-03-02|released|     32|   True|  false|
|1000000003|  m

### Typecast age and no_days as Double

In [None]:
df = df.withColumn('age',df.age.cast('Double')).withColumn('no_days',df.no_days.cast('Double'))
df.printSchema()

root
 |-- patient_id: long (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: double (nullable = true)
 |-- country: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- infected_by: string (nullable = true)
 |-- contact_number: string (nullable = true)
 |-- symptom_onset_date: string (nullable = true)
 |-- confirmed_date: date (nullable = true)
 |-- released_date: string (nullable = true)
 |-- deceased_date: date (nullable = true)
 |-- state: string (nullable = true)
 |-- no_days: double (nullable = true)
 |-- is_male: string (nullable = false)
 |-- is_dead: boolean (nullable = true)



In [None]:
df.show()

+----------+------+----+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+-------+
|patient_id|   sex| age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|no_days|is_male|is_dead|
+----------+------+----+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+-------+
|1000000001|  male|50.0|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|   2020-02-05|   2020-02-05|released|   13.0|   True|  false|
|1000000002|  male|30.0|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|   2020-03-02|   2020-03-02|released|   32.0|   True|  false|
|100000000

### Drop the columns
["patient_id","sex","infected_by","contact_number","released_date","state",
"symptom_onset_date","confirmed_date","deceased_date","country","no_days",
"city","infection_case"]

In [None]:
df = df.drop("patient_id","sex","infected_by","contact_number","released_date","state", "symptom_onset_date","confirmed_date","deceased_date","country","no_days","city","infection_case")
df.show()

+----+--------+-------+-------+
| age|province|is_male|is_dead|
+----+--------+-------+-------+
|50.0|   Seoul|   True|  false|
|30.0|   Seoul|   True|  false|
|50.0|   Seoul|   True|  false|
|20.0|   Seoul|   True|  false|
|20.0|   Seoul|  False|  false|
|50.0|   Seoul|  False|  false|
|20.0|   Seoul|   True|  false|
|20.0|   Seoul|   True|  false|
|30.0|   Seoul|   True|  false|
|60.0|   Seoul|  False|  false|
|50.0|   Seoul|  False|  false|
|20.0|   Seoul|   True|  false|
|80.0|   Seoul|   True|   true|
|60.0|   Seoul|  False|  false|
|70.0|   Seoul|   True|  false|
|70.0|   Seoul|   True|  false|
|70.0|   Seoul|   True|  false|
|20.0|   Seoul|   True|  false|
|70.0|   Seoul|  False|  false|
|70.0|   Seoul|  False|  false|
+----+--------+-------+-------+
only showing top 20 rows



### Recount the number of nulls now

In [None]:
df.select([count(when(col(c).isNull(),c)).alias(c) for c in df.columns]).show()

+----+--------+-------+-------+
| age|province|is_male|is_dead|
+----+--------+-------+-------+
|1380|       0|      0|      0|
+----+--------+-------+-------+



## Now do the same but using SQL select statement

### From the original Patient DataFrame, Create a temporary view (table).

In [None]:
df1.createOrReplaceTempView('MyTempView')

### Use SELECT statement to select all columns from the dataframe and show the output.

In [None]:
spark.sql("select * from MyTempView").show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|   2020-02-05|         null|released|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|   2020-03-02|         null|released|
|1000000003|  male|50s|  Korea|   Seoul|   Jongno-gu|contact with patient| 2002000001|            17|              null|    2020-01-30|

### *Using SQL commands*, limit the output to only 5 rows 

In [None]:
spark.sql("select * from MyTempView limit 5").show()

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|   2020-02-05|         null|released|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|   2020-03-02|         null|released|
|1000000003|  male|50s|  Korea|   Seoul|  Jongno-gu|contact with patient| 2002000001|            17|              null|    2020-01-30|   202

### Select the count of males and females in the dataset

In [None]:
spark.sql("select sex, count(sex) from MyTempView group by sex").show()

+------+----------+
|   sex|count(sex)|
+------+----------+
|  null|         0|
|female|      2218|
|  male|      1825|
+------+----------+



### How many people survived, and how many didn't?

In [None]:
spark.sql("select state, count(*) as n_patients from MyTempView group by state").show()



### Now, let's perform some preprocessing using SQL:
1. Convert *age* column to double after removing the 's' at the end -- *hint: check SUBSTRING method*
2. Select only the following columns: `['sex', 'age', 'province', 'state']`
3. Store the result of the query in a new dataframe

In [None]:
df2 = spark.sql("SELECT sex, cast(SUBSTRING(age, 1, LENGTH(age) - 1) as double) as age, province, state from MyTempView")

In [None]:
df2.show()

+------+----+--------+--------+
|   sex| age|province|   state|
+------+----+--------+--------+
|  male|50.0|   Seoul|released|
|  male|30.0|   Seoul|released|
|  male|50.0|   Seoul|released|
|  male|20.0|   Seoul|released|
|female|20.0|   Seoul|released|
|female|50.0|   Seoul|released|
|  male|20.0|   Seoul|released|
|  male|20.0|   Seoul|released|
|  male|30.0|   Seoul|released|
|female|60.0|   Seoul|released|
|female|50.0|   Seoul|released|
|  male|20.0|   Seoul|released|
|  male|80.0|   Seoul|deceased|
|female|60.0|   Seoul|released|
|  male|70.0|   Seoul|released|
|  male|70.0|   Seoul|released|
|  male|70.0|   Seoul|released|
|  male|20.0|   Seoul|released|
|female|70.0|   Seoul|released|
|female|70.0|   Seoul|released|
+------+----+--------+--------+
only showing top 20 rows



## Machine Learning 
### Create a pipeline model to predict is_dead and evaluate the performance.
- Use <b>StringIndexer</b> to transform <b>string</b> data type to indices.
- Use <b>OneHotEncoder</b> to deal with categorical values.
- Use <b>Imputer</b> to fill missing data with mean.
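
The three preprocessing steps above can be sketched in plain Python to show what each one does conceptually. This is not how Spark implements them: the function names are hypothetical, the data is a toy example, ties in label frequency are broken alphabetically as a simplifying assumption, and the one-hot vectors are dense and keep every category, whereas Spark's `OneHotEncoder` produces sparse vectors and drops the last category by default.

```python
from collections import Counter

def string_index(values):
    """Map each label to an index, most frequent label first."""
    counts = Counter(values)
    # Assumption: ties in frequency are broken alphabetically
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: idx for idx, label in enumerate(ordered)}

def one_hot(index, size):
    """Dense one-hot vector with a single 1.0 at position `index`."""
    return [1.0 if i == index else 0.0 for i in range(size)]

def impute_mean(values):
    """Replace None with the mean of the observed values."""
    observed = [v for v in values if v is not None]
    mean = sum(observed) / len(observed)
    return [mean if v is None else v for v in values]

provinces = ["Seoul", "Busan", "Seoul", "Daegu"]   # toy data
ages = [50.0, None, 30.0, 20.0]                     # toy data

mapping = string_index(provinces)
encoded = [one_hot(mapping[p], len(mapping)) for p in provinces]
filled_ages = impute_mean(ages)
print(mapping)
print(encoded)
print(filled_ages)
```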

In [None]:
df = df.withColumn('is_dead', when((df.is_dead == True), lit(1)).otherwise(lit(0)))

In [None]:
df.show()

+----+--------+-------+-------+
| age|province|is_male|is_dead|
+----+--------+-------+-------+
|50.0|   Seoul|   True|      0|
|30.0|   Seoul|   True|      0|
|50.0|   Seoul|   True|      0|
|20.0|   Seoul|   True|      0|
|20.0|   Seoul|  False|      0|
|50.0|   Seoul|  False|      0|
|20.0|   Seoul|   True|      0|
|20.0|   Seoul|   True|      0|
|30.0|   Seoul|   True|      0|
|60.0|   Seoul|  False|      0|
|50.0|   Seoul|  False|      0|
|20.0|   Seoul|   True|      0|
|80.0|   Seoul|   True|      1|
|60.0|   Seoul|  False|      0|
|70.0|   Seoul|   True|      0|
|70.0|   Seoul|   True|      0|
|70.0|   Seoul|   True|      0|
|20.0|   Seoul|   True|      0|
|70.0|   Seoul|  False|      0|
|70.0|   Seoul|  False|      0|
+----+--------+-------+-------+
only showing top 20 rows



In [None]:
from pyspark.ml.feature import StringIndexer,OneHotEncoder,VectorAssembler, Imputer

categoricalCols = [f for (f, d) in df.dtypes if d == 'string' and f != 'is_dead']
indexOutputCols = [x + "_Index" for x in categoricalCols]
oheOutputCols = [x + "_OHE" for x in categoricalCols]

stringIndexer = StringIndexer(inputCols=categoricalCols,outputCols=indexOutputCols,handleInvalid='keep')

oheEncoder = OneHotEncoder(inputCols=indexOutputCols,outputCols=oheOutputCols)

In [None]:
numericCols = [f for (f, d) in df.dtypes if d == 'double' and f != 'is_dead']
imputedCols =  [x + "imputed" for x in numericCols]

In [None]:
imputer = Imputer(strategy='mean', inputCols=numericCols, outputCols=imputedCols)

In [None]:
assemblerInputs = oheOutputCols + imputedCols
assemblerInputs

['province_OHE', 'is_male_OHE', 'ageimputed']

In [None]:
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol='features')

In [None]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol='features',
                      labelCol='is_dead',
                      predictionCol='prediction')

In [None]:
from pyspark.ml import Pipeline

trainDF, testDF = df.randomSplit([0.8,0.2],seed=42)

myStages = [stringIndexer, oheEncoder, imputer, vecAssembler,lr]
pipeline = Pipeline(stages=myStages)
pipelineModel = pipeline.fit(trainDF)
predDF = pipelineModel.transform(testDF)

In [None]:
predDF.show(10)

+----+-----------+-------+-------+--------------+-------------+---------------+-------------+------------------+--------------------+--------------------+--------------------+----------+
| age|   province|is_male|is_dead|province_Index|is_male_Index|   province_OHE|  is_male_OHE|        ageimputed|            features|       rawPrediction|         probability|prediction|
+----+-----------+-------+-------+--------------+-------------+---------------+-------------+------------------+--------------------+--------------------+--------------------+----------+
|null| Gangwon-do|  False|      0|          10.0|          0.0|(17,[10],[1.0])|(2,[0],[1.0])|40.085978835978835|(20,[10,17,19],[1...|[0.74231215165995...|[0.67750125309139...|       0.0|
|null|Gyeonggi-do|  False|      1|           2.0|          0.0| (17,[2],[1.0])|(2,[0],[1.0])|40.085978835978835|(20,[2,17,19],[1....|[-3.1704499900426...|[0.04029301093980...|       1.0|
|null|Gyeonggi-do|  False|      1|           2.0|          0.0| (

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
accuracy_eval = MulticlassClassificationEvaluator(predictionCol='prediction',labelCol='is_dead', metricName = 'accuracy')

In [None]:
accuracy_eval.evaluate(predDF)

0.8268268268268268
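
The accuracy metric is the fraction of test rows whose predicted class equals the label. A minimal plain-Python sketch of that definition, using made-up labels and predictions rather than the model's actual output:

```python
def accuracy(labels, predictions):
    """Fraction of predictions that equal the true label."""
    if len(labels) != len(predictions):
        raise ValueError("labels and predictions must have the same length")
    correct = sum(1 for y, y_hat in zip(labels, predictions) if y == y_hat)
    return correct / len(labels)

# Illustrative values only: integer labels, float predictions as Spark emits them
labels = [0, 0, 1, 1, 0]
predictions = [0.0, 0.0, 1.0, 0.0, 0.0]
score = accuracy(labels, predictions)
print(score)
```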