In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive.
from google.colab import drive
drive.mount('/content/drive')
# Switch the working directory to the Drive root so that relative paths used
# later in the notebook (e.g. 'heart.csv') resolve against it.
%cd /content/drive/My Drive

In [None]:
!pip install pyspark py4j

In [None]:
from pyspark.sql import SparkSession

# Create (or reuse) a Spark session running locally on all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('PySpark_Tutorial')
    .getOrCreate()
)


In [None]:
from pyspark.sql import functions as F
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import numpy as np
from pyspark.ml.tuning import CrossValidator
import plotly.graph_objects as go

In [None]:
# Load the heart-disease CSV: the first row is the header and column types are
# inferred from the values. Then preview the first five rows.
data = spark.read.csv('heart.csv', header=True, inferSchema=True)
data.show(5)

In [None]:
# Number of rows in the loaded Spark DataFrame.
data.count()

In [None]:
# Print the column names and the types Spark inferred for them.
data.printSchema()
    

In [None]:
# Same schema information as a list of (column name, type string) pairs.
data.dtypes

"""
Feature Information:

1. age: The person’s age in years

2. sex: The person’s sex (1 = male, 0 = female)

3. cp: The chest pain experienced (0 = typical angina, 1 = atypical angina, 2 = non-anginal pain, 3 = asymptomatic)

4. trestbps: The person’s resting blood pressure (mm Hg on admission to the hospital)

5. chol: The person’s cholesterol measurement in mg/dl

6. fbs: Whether the person’s fasting blood sugar exceeds 120 mg/dl (1 = true; 0 = false)

7. restecg: Resting electrocardiographic measurement (0 = normal, 1 = having ST-T wave abnormality, 2 = showing probable or definite left ventricular hypertrophy by Estes’ criteria)

8. thalach: The person’s maximum heart rate achieved

9. exang: Exercise induced angina (1 = yes; 0 = no)

10. oldpeak: ST depression induced by exercise relative to rest

11. slope: The slope of the peak exercise ST segment (0 = upsloping, 1 = flat, 2 = downsloping)

12. ca: The number of major vessels (0–4)

13. thal: A blood disorder called thalassemia (3 = normal; 6 = fixed defect; 7 = reversible defect)

14. target: Heart disease (0 = no, 1 = yes)
"""
     

In [None]:
# NOTE: this import makes `sum` refer to Spark's column aggregate, shadowing
# Python's built-in `sum` for the rest of the notebook.
from pyspark.sql.functions import col,sum

# Count the null values in every column of the Spark DataFrame.
# The frame is `data`; the name `df` is not defined at this point in the
# notebook, so referring to it fails on a fresh kernel.
data.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in data.columns)).show()

# Peek at the first three values of the label column.
data.select('target').show(3)

In [None]:
# Collect the Spark DataFrame to the driver as a pandas DataFrame for plotting.
pd_data = data.toPandas()
# Show a preview rather than the whole frame.
pd_data.head()

In [None]:
# Row count per value of the label column.
data.groupBy('target').count().show()

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

# Bar chart of the number of rows per target class.
# The column is passed by keyword: on seaborn >= 0.12 the only positional
# parameter of countplot is `data`, so a positional Series is not treated as `x`.
sns.countplot(x='target', data=pd_data)
plt.show()

In [None]:
# Row count per value of the `sex` column.
data.groupBy('sex').count().show()

In [None]:
# Bar chart of the number of rows per sex (0 = female, 1 = male).
# The column is passed by keyword: on seaborn >= 0.12 the only positional
# parameter of countplot is `data`, so a positional Series is not treated as `x`.
sns.countplot(x='sex', data=pd_data)
plt.xticks([0,1],['Female', 'Male'])
plt.show()

In [None]:
# Number of rows per sex, split into one bar per target class.
sns.countplot(data=pd_data, x='sex', hue="target")
# Relabel the tick positions: 1 -> Male, 0 -> Female.
plt.xticks(ticks=[1, 0], labels=['Male', 'Female'])
plt.legend(labels=['No-Disease', 'Disease'])
plt.show()

In [None]:
import pandas as pd

# Decade-wide, left-closed age bins: [10, 20), [20, 30), ..., [80, 90).
bins = list(range(10, 100, 10))

# One integer label per bin (2 for [10, 20) up to 9 for [80, 90)).
labels = list(range(2, 10))
pd_data['AgeGroup'] = pd.cut(
    pd_data['age'],
    bins=bins,
    labels=labels,
    right=False,
)

In [None]:
# Disease counts per age group for ALL patients. No sex filter is applied
# here, so the legend must not mention a sex.
sns.countplot(x='AgeGroup',hue="target",data=pd_data)
plt.legend(labels = ['No-Disease','Disease'])
plt.show()

In [None]:
# Disease counts per age group for men only. `sex == 1` encodes male, so the
# filter selects the rows that the legend describes.
sns.countplot(x='AgeGroup',hue="target",data=pd_data.loc[pd_data['sex']==1])
plt.legend(labels = ['men No-Disease','men Disease'])
plt.show()

In [None]:
# Disease counts per age group for women only. `sex == 0` encodes female, so
# the filter selects the rows that the legend describes.
sns.countplot(x='AgeGroup',hue="target",data=pd_data.loc[pd_data['sex']==0])
plt.legend(labels = ['women No-Disease','women Disease'])
plt.show()

In [None]:
# Row count for every (target, chest-pain type) combination.
data.groupBy('target', 'cp').count().show()

In [None]:
# NOTE: `sum` below is Spark's column aggregate and shadows the Python built-in.
from pyspark.sql.functions import col,sum

# One aggregate per column: cast the "is null" flag to 0/1 and add it up,
# yielding the number of nulls in that column under the column's own name.
null_counts = [sum(col(c).isNull().cast('int')).alias(c) for c in data.columns]
data.select(*null_counts).show()

In [None]:
# Summary statistics for the columns of the Spark DataFrame.
data.describe().show()

In [None]:
# Rows per target class, counted through the non-null `age` values, as a
# two-column frame: target, counts.
df = (
    pd_data.groupby('target')['age']
    .count()
    .reset_index(name='counts')
)
colors = ['gold', 'mediumturquoise', 'darkorange', 'lightgreen']

# Pie chart of the class balance.
fig = go.Figure(data=[go.Pie(labels=df.target, values=df.counts)])
fig.update_traces(
    hoverinfo='label+percent',
    textinfo='value+percent',
    textfont_size=20,
    textfont_color='black',
    marker=dict(colors=colors, line=dict(color='#000000', width=2)),
)
fig.update_layout(title='Heart Disease vs. Absence of Heart Disease', title_x=0.5)

In [None]:
# Pairwise correlation of the numeric columns, drawn as an annotated heatmap.
# The frame is restricted to numeric dtypes explicitly: `pd_data` also carries
# the categorical `AgeGroup` column added earlier, and from pandas 2.0 on
# `DataFrame.corr()` no longer drops non-numeric columns by default.
corr = pd_data.select_dtypes(include='number').corr()
fig = go.Figure(data=go.Heatmap(
    z=corr.values,
    # z[i][j] is row i / column j of the matrix, so columns go on x and the index on y.
    x=corr.columns.values,
    y=corr.index.values,
    text=np.round(corr.values, 2),
    texttemplate='%{text}',
))
# The original title dict configured font and position but carried no text.
fig.update_layout(title=dict(text='Feature correlation matrix', font=dict(size=20), x=0.5))

In [None]:
from pyspark.ml.feature import OneHotEncoder

# Columns to one-hot encode; each gets a companion output column "<name>Vec".
categorical_cols = ['sex', 'cp', 'fbs', 'restecg', 'exang', 'slope', 'ca', 'thal']
encoder = OneHotEncoder(
    inputCols=categorical_cols,
    outputCols=[f'{name}Vec' for name in categorical_cols],
)
encoded = encoder.fit(data).transform(data)

# Keep only the encoded vectors; the raw columns are dropped.
data_encoded = encoded.drop(*categorical_cols)
data_encoded.show()
     

In [None]:
from pyspark.ml.feature import VectorAssembler

# Raw columns first, then the one-hot vectors, concatenated into a single
# "features" vector column.
numeric_cols = ['age', 'trestbps', 'chol', 'thalach', 'oldpeak']
encoded_cols = ['sexVec', 'cpVec', 'fbsVec', 'restecgVec', 'exangVec', 'slopeVec', 'caVec', 'thalVec']
assembler = VectorAssembler(
    inputCols=numeric_cols + encoded_cols,
    outputCol="features",
)
assembled = assembler.transform(data_encoded)

# Only the feature vector and the label are needed for modelling.
data_asb = assembled.select("features", "target")
data_asb.show()

In [None]:
# Random split with weights 0.8 / 0.2 into train and test sets; the seed is fixed at 12.
train_data, test_data = data_asb.randomSplit([.8,.2],seed=12)

In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Binomial logistic regression with elastic-net regularisation.
lr = LogisticRegression(
    featuresCol="features",
    labelCol="target",
    family="binomial",
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)

# Fit on the training split, then score the held-out split.
lr_model = lr.fit(train_data)
lr_predictions = lr_model.transform(test_data)

# Accuracy of the predicted label against `target` on the test split.
evaluator = MulticlassClassificationEvaluator(
    labelCol='target',
    predictionCol='prediction',
    metricName='accuracy',
)
lr_accuracy = evaluator.evaluate(lr_predictions)
print('Test Accuracy = ', lr_accuracy)

In [None]:
from pyspark.ml.classification import NaiveBayes

# Gaussian naive Bayes on the assembled feature vector.
nb = NaiveBayes(
    featuresCol="features",
    labelCol="target",
    modelType="gaussian",
    smoothing=1.0,
)

# Fit on the training split, then score the held-out split.
nb_model = nb.fit(train_data)
nb_predictions = nb_model.transform(test_data)

# Accuracy of the predicted label against `target` on the test split.
evaluator = MulticlassClassificationEvaluator(
    labelCol='target',
    predictionCol='prediction',
    metricName='accuracy',
)
nb_accuracy = evaluator.evaluate(nb_predictions)
print('Test Accuracy = ', nb_accuracy)

In [None]:
from pyspark.ml.classification import RandomForestClassifier

# Random forest on the assembled feature vector.
# The seed is set explicitly (same value as the train/test split) so that the
# forest's random sampling, and therefore the reported accuracy, repeats when
# the notebook is re-run on a fresh kernel.
rf = RandomForestClassifier(labelCol="target", featuresCol="features", seed=12)

# Fit on the training split, then score the held-out split.
rf_model = rf.fit(train_data)
rf_predictions = rf_model.transform(test_data)

# Accuracy of the predicted label against `target` on the test split.
evaluator = MulticlassClassificationEvaluator(labelCol='target',predictionCol='prediction', metricName='accuracy')
rf_accuracy = evaluator.evaluate(rf_predictions)
print('Test Accuracy = ', rf_accuracy)

In [None]:
from pyspark.ml.feature import PCA

pca = PCA(k=13, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(data_asb).transform(data_asb)

data_pca = model.select("pcaFeatures","target")
data_pca.head()

In [None]:
train_pcadata, test_pcadata = data_pca.randomSplit([.8,.2],seed=12)

In [None]:
from pyspark.ml.classification import RandomForestClassifier

# Random forest on the PCA-projected features.
# The seed is set explicitly (same value as the train/test split) so that the
# forest's random sampling, and therefore the reported accuracy, repeats when
# the notebook is re-run on a fresh kernel.
# NOTE: this cell reuses the names rf / rf_model / rf_predictions / rf_accuracy
# and so overwrites the results of the earlier non-PCA random forest.
rf = RandomForestClassifier(labelCol="target", featuresCol="pcaFeatures", seed=12)

# Fit on the projected training split, then score the projected test split.
rf_model = rf.fit(train_pcadata)
rf_predictions = rf_model.transform(test_pcadata)

# Accuracy of the predicted label against `target` on the test split.
evaluator = MulticlassClassificationEvaluator(labelCol='target',predictionCol='prediction', metricName='accuracy')
rf_accuracy = evaluator.evaluate(rf_predictions)
print('Test Accuracy = ', rf_accuracy)

In [None]:
from pyspark.ml.classification import NaiveBayes

# Gaussian naive Bayes on the PCA-projected features.
# NOTE: this cell reuses the names nb / nb_model / nb_predictions / nb_accuracy
# and so overwrites the results of the earlier non-PCA naive Bayes cell.
nb = NaiveBayes(
    featuresCol="pcaFeatures",
    labelCol="target",
    modelType="gaussian",
    smoothing=1.0,
)

# Fit on the projected training split, then score the projected test split.
nb_model = nb.fit(train_pcadata)
nb_predictions = nb_model.transform(test_pcadata)

# Accuracy of the predicted label against `target` on the test split.
evaluator = MulticlassClassificationEvaluator(
    labelCol='target',
    predictionCol='prediction',
    metricName='accuracy',
)
nb_accuracy = evaluator.evaluate(nb_predictions)
print('Test Accuracy = ', nb_accuracy)