**Imporitng the Library**

In [None]:
import pandas as pd

**Loading the dataset from GCP using Big Query**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import flatten, udf, col
from pyspark.sql.types import BooleanType
bucket = 'cvd-bucket'
spark = SparkSession.builder\
    .appName("cvd-test")\
    .config("spark.jars", "gs://spark-lib/bigquery/spark-bigquery-latest.jar")\
    .getOrCreate()

df = spark.read \
    .format("bigquery") \
    .load('deft-return-385519.cvd_dataset.cvdtable').cache()
df.show(3)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/03 20:38:13 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
23/05/03 20:38:13 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
23/05/03 20:38:13 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
23/05/03 20:38:13 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator
[Stage 0:>                                                          (0 + 1) / 1]

+-----+-----+------+------+------+-----+-----+-----------+----+-----+----+------+------+
|   id|  age|gender|height|weight|ap_hi|ap_lo|cholesterol|gluc|smoke|alco|active|cardio|
+-----+-----+------+------+------+-----+-----+-----------+----+-----+----+------+------+
|32456|23386|     1|    55|    81|  130|   90|          1|   1|    0|   0|     1|     1|
|95141|18830|     1|    57|    61|  130|   90|          1|   1|    0|   0|     1|     1|
|91523|18426|     1|    59|    58|  125|   67|          1|   1|    0|   0|     0|     0|
+-----+-----+------+------+------+-----+-----+-----------+----+-----+----+------+------+
only showing top 3 rows



                                                                                

**Data Preprocessing**

In [None]:
import pandas as pd
pandasDF = df.toPandas()

In [None]:
pandasDF.head()

Unnamed: 0,id,age,gender,height,weight,ap_hi,ap_lo,cholesterol,gluc,smoke,alco,active,cardio
0,32456,23386,1,55,81,130,90,1,1,0,0,1,1
1,95141,18830,1,57,61,130,90,1,1,0,0,1,1
2,91523,18426,1,59,58,125,67,1,1,0,0,0,0
3,41661,19088,1,60,69,110,70,1,1,0,0,0,0
4,39462,20978,1,64,61,130,70,1,1,0,0,1,0


In [None]:
#dropping columns that are not required
df1= pandasDF.drop('id', axis=1)

In [None]:
df1.head()

Unnamed: 0,age,gender,height,weight,ap_hi,ap_lo,cholesterol,gluc,smoke,alco,active,cardio
0,23386,1,55,81,130,90,1,1,0,0,1,1
1,18830,1,57,61,130,90,1,1,0,0,1,1
2,18426,1,59,58,125,67,1,1,0,0,0,0
3,19088,1,60,69,110,70,1,1,0,0,0,0
4,20978,1,64,61,130,70,1,1,0,0,1,0


In [None]:
#Converting the age column from days to year
df1['age'] = df1['age'].div(365)

In [None]:
df1.head()

Unnamed: 0,age,gender,height,weight,ap_hi,ap_lo,cholesterol,gluc,smoke,alco,active,cardio
0,64.071233,1,55,81,130,90,1,1,0,0,1,1
1,51.589041,1,57,61,130,90,1,1,0,0,1,1
2,50.482192,1,59,58,125,67,1,1,0,0,0,0
3,52.29589,1,60,69,110,70,1,1,0,0,0,0
4,57.473973,1,64,61,130,70,1,1,0,0,1,0


In [1]:
#Null values check
df1.isnull().values.any()

False


In [None]:
#converting to required datatypes
df2 = df1.astype({'age':'int'})

In [None]:
df2.head()

Unnamed: 0,age,gender,height,weight,ap_hi,ap_lo,cholesterol,gluc,smoke,alco,active,cardio
0,64,1,55,81,130,90,1,1,0,0,1,1
1,51,1,57,61,130,90,1,1,0,0,1,1
2,50,1,59,58,125,67,1,1,0,0,0,0
3,52,1,60,69,110,70,1,1,0,0,0,0
4,57,1,64,61,130,70,1,1,0,0,1,0


**Feature Extraction for better Accuracy**

In [None]:
required_features = [
                    'age',
                    'gender',
                    'height',
                    'weight',
                     'ap_hi',
                     'ap_lo',
                     'cholesterol',
                     'gluc',
                     'smoke',
                     'alco',
                     'active'
                   ]

**Feature Transformation using VectorAssembler**

In [None]:
from pyspark.ml.feature import VectorAssembler
df4=spark.createDataFrame(df2) 
assembler = VectorAssembler(inputCols=required_features, outputCol='features')

transformed_data = assembler.transform(df4)

In [None]:
transformed_data.show()

[Stage 2:>                                                          (0 + 1) / 1]

+---+------+------+------+-----+-----+-----------+----+-----+----+------+------+--------------------+
|age|gender|height|weight|ap_hi|ap_lo|cholesterol|gluc|smoke|alco|active|cardio|            features|
+---+------+------+------+-----+-----+-----------+----+-----+----+------+------+--------------------+
| 64|     1|    55|    81|  130|   90|          1|   1|    0|   0|     1|     1|[64.0,1.0,55.0,81...|
| 51|     1|    57|    61|  130|   90|          1|   1|    0|   0|     1|     1|[51.0,1.0,57.0,61...|
| 50|     1|    59|    58|  125|   67|          1|   1|    0|   0|     0|     0|[50.0,1.0,59.0,58...|
| 52|     1|    60|    69|  110|   70|          1|   1|    0|   0|     0|     0|[52.0,1.0,60.0,69...|
| 57|     1|    64|    61|  130|   70|          1|   1|    0|   0|     1|     0|[57.0,1.0,64.0,61...|
| 52|     1|    65|    60|  120|   80|          1|   1|    0|   0|     1|     0|[52.0,1.0,65.0,60...|
| 53|     2|    65|    72|  130|   80|          1|   1|    0|   0|     0|     0|[5

                                                                                

**Splitting into Test data and Training Data Set **

In [None]:
(training_data, test_data) = transformed_data.randomSplit([0.8,0.2])

In [None]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol = 'features', labelCol = 'cardio', maxIter=10)
lrModel = lr.fit(training_data)

23/05/03 20:41:20 WARN com.github.fommil.netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
23/05/03 20:41:20 WARN com.github.fommil.netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

In [None]:
rf_predictions = lrModel.transform(test_data)

**Predicting the accauracy**

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

multi_evaluator = MulticlassClassificationEvaluator(labelCol = 'cardio', metricName = 'accuracy')
print('Random Forest classifier Accuracy:', multi_evaluator.evaluate(rf_predictions))



Random Forest classifier Accuracy: 0.6706237858838765


                                                                                

**Confusion Matrix**

In [4]:
#plot the confusion matrix
df = rf_predictions
tp = df4[(df4.cardio == 1) & (df4.prediction == 1)].count()
tn = df4[(df4.cardio == 0) & (df4.prediction == 0)].count()
fp = df4[(df4.cardio == 0) & (df4.prediction == 1)].count()
fn = df4[(df4.cardio == 1) & (df4.prediction == 0)].count()
print ("True Positives:", tp)
print ("True Negatives:", tn)
print ("False Positives:", fp)
print ("False Negatives:", fn)
print ("Total", df.count())

r = float(tp)/(tp + fn)
print ("Recall", r)

p = float(tp) / (tp + fp)
print ("Precision",p)

True Positives: 1409208 
True Negatives: 541895 
False Positives: 86901 
False Negatives: 0
Total 2650898 
Recall 1.0 
Precision 0.84888099508434645


In [None]:
Confusion Matrix
[1409208 0
  86901  541895]