In [1]:
#/FileStore/tables/8norpmcw1472208802508/SAheart.csv


###Read the data from csv file

In [3]:
# Location of the SAheart CSV on the file store.
heart_csv_path = '/FileStore/tables/8norpmcw1472208802508/SAheart.csv'

# Read the CSV through the spark-csv reader: the first row supplies the
# column names and the column types are inferred from the data.
HeartDF = (sqlContext.read
           .format('com.databricks.spark.csv')
           .options(header='true', inferSchema='true')
           .load(heart_csv_path))


In [4]:
# Print the first four rows as a quick check of the loaded data.
HeartDF.show(n=4)

###Find out the relationship between family history and coronary heart disease

In [6]:
# Number of rows for every (famhist, chd) combination.
chd_count = HeartDF.groupBy('famhist', 'chd').count()
chd_count.show()

# Convert the aggregate to a pandas frame for plotting; the bare name on
# the last line displays it as the cell output.
chd_count_pd = chd_count.toPandas()
chd_count_pd

In [7]:
import matplotlib.pyplot as plt  # pyplot interface, used for figure handling
import seaborn as sn             # statistical plots drawn on matplotlib figures

# Grouped bar chart: one group per famhist value, one bar per chd value,
# bar height taken from the 'count' column.
fig = plt.figure(figsize=(10, 8))
sn.barplot(data=chd_count_pd, x='famhist', y='count', hue='chd')

# Close the current pyplot figure, then pass the figure object to the
# notebook's display() helper.
plt.close()
display(fig)

Observation: Family history increases the chances of having coronary disease

###Finding relationship between alcohol consumption and coronary heart disease

In [10]:
# Pull only the alcohol and chd columns into a pandas frame.
chd_alcohol_pd = HeartDF.select('alcohol', 'chd').toPandas()
chd_alcohol_pd.head()

In [11]:
# distplot draws the distribution of a single variable. Overlay the alcohol
# curves (histogram disabled) of the two chd groups:
# red for chd == 1, green for chd == 0.
fig_ac = plt.figure(figsize=(10, 8))

for chd_value, curve_color in ((1, 'r'), (0, 'g')):
    alcohol_values = chd_alcohol_pd[chd_alcohol_pd.chd == chd_value].alcohol
    sn.distplot(alcohol_values, hist=False, color=curve_color)

display(fig_ac)
plt.close()

Observation: Alcohol consumption does not seem to be a differentiating factor between people who have coronary heart disease and people who do not.

####Relationship between age and coronary heart disease

In [14]:
# Relationship between age and coronary heart disease

# Pull only the age and chd columns into a pandas frame.
chd_age_pd = HeartDF.select('age', 'chd').toPandas()

# Ages of the rows with chd == 1 and with chd == 0 respectively.
age_with_chd = chd_age_pd[chd_age_pd.chd == 1].age
age_without_chd = chd_age_pd[chd_age_pd.chd == 0].age

# Overlay both curves (histogram disabled): red for chd == 1, blue for chd == 0.
fig_agc = plt.figure(figsize=(10, 8))
sn.distplot(age_with_chd, hist=False, color='r')
sn.distplot(age_without_chd, hist=False, color='b')
display(fig_agc)
plt.close()


Observation: Age seems to be a differentiating factor between people who have coronary heart disease and people who do not.

####Find correlations between variables

In [17]:
# Draw scatterplots for joint relationships and histograms for univariate
# distributions.

# Columns to compare pairwise, as a pandas frame.
chd_pair_pd = HeartDF.select(HeartDF['age'],
                             HeartDF['sbp'],
                             HeartDF['obesity'],
                             HeartDF['ldl']).toPandas()

# pairplot() builds its own figure and returns a grid object that owns it.
# A figure created beforehand with plt.figure() stays empty, so saving or
# displaying that one would produce a blank image. Use the grid's figure.
pair_grid = sn.pairplot(data=chd_pair_pd)
fig_ex = pair_grid.fig

fig_ex.savefig("sns_pair_plot.png")
display(fig_ex)
plt.close(fig_ex)


###Find relationship between obesity and ldl

In [19]:
# Scatter of ldl (y) against obesity (x) with a fitted regression line.
fig_oidl = plt.figure(figsize=(10, 8))
sn.regplot(x="obesity", y="ldl", data=chd_pair_pd)
display(fig_oidl)
plt.close()

"Observation: There is a positive correlation between ldl and obesity"

####Drawing boxplots to understand distributions

In [22]:
# Pull the alcohol, tobacco and chd columns into a pandas frame.
chd_alh_tob_pd = HeartDF.select('alcohol', 'tobacco', 'chd').toPandas()

# Boxplot of alcohol for each chd value.
fig_box = plt.figure(figsize=(10, 8))
sn.boxplot(x="chd", y="alcohol", data=chd_alh_tob_pd)
display(fig_box)
plt.close()

In [23]:
# Pull the alcohol, tobacco and chd columns into a pandas frame.
# NOTE(review): this repeats the selection made for the alcohol boxplot;
# the frame could be reused instead of being collected a second time.
chd_alh_tob_pd = HeartDF.select('alcohol', 'tobacco', 'chd').toPandas()

# Boxplot of tobacco for each chd value.
fig_box2 = plt.figure(figsize=(10, 8))
sn.boxplot(x="chd", y="tobacco", data=chd_alh_tob_pd)
display(fig_box2)
plt.close()

In [24]:
# Calculating basic statistics

from pyspark.mllib.stat import Statistics
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint


def getVector(rec):
    """Return a dense vector of (alcohol, tobacco, age, obesity, ldl) for one row."""
    return Vectors.dense(rec.alcohol,
                         rec.tobacco,
                         rec.age,
                         rec.obesity,
                         rec.ldl)


# One dense vector per row of HeartDF.
chd_vec = HeartDF.rdd.map(getVector)

# Column-wise summary statistics over those vectors.
summary = Statistics.colStats(chd_vec)

column_means = summary.mean()
print (" Mean of Summary {0}".format(column_means))

column_variances = summary.variance()
print (" Variance of Summary {0}".format(column_variances))

In [25]:
# Calculating correlations

import numpy as np

# Square root of each column variance, i.e. the per-column standard deviation.
column_std = np.sqrt(summary.variance())
print (" np.sqrt of Summary {0}".format(column_std))

# Single-column DataFrames; only seriesX is used (printed) in this cell.
seriesX = HeartDF.select("obesity")
seriesY = HeartDF.select("ldl")

print (" seriesX : {0}".format(seriesX))

# Pearson correlation matrix across the columns of the chd_vec vectors.
correlation = Statistics.corr(chd_vec, method="pearson")

print ("\n correlation : {0}".format(correlation))

####Creating vectors to represent multidimensional data

In [27]:
def parsePoint(rec):
    """Build a LabeledPoint for one row.

    The label is rec.chd; the features are the dense vector
    (alcohol, tobacco, age, obesity, ldl).
    """
    features = Vectors.dense(rec.alcohol,
                             rec.tobacco,
                             rec.age,
                             rec.obesity,
                             rec.ldl)
    return LabeledPoint(rec.chd, features)


# One LabeledPoint per row of HeartDF.
chd_lp = HeartDF.rdd.map(parsePoint)

chd_lp.take(10)

####Building a predictive model using Logistic Regression

In [29]:
from pyspark.mllib.classification import (LogisticRegressionWithLBFGS,
                                          LogisticRegressionModel)

# Fit a logistic regression on the labeled points using the trainer's
# default settings.
model = LogisticRegressionWithLBFGS.train(data=chd_lp)

###Making predictions using the predictive model

In [31]:
# Pair every true label with the model's prediction for the same features.
# The prediction is cast to float so it is the same type as the label when
# the two are compared later.
labelsAndPreds = chd_lp.map(
    lambda lp: (lp.label, float(model.predict(lp.features))))

# print() with a single argument behaves the same on Python 2 and 3; the
# bare print statement used previously is a SyntaxError on Python 3.
print("predictions using the predictive model")
labelsAndPreds.take(10)

###Calculating the accuracy of the model

In [33]:


# A record counts as a success when the label equals the prediction.
total_count = labelsAndPreds.count()
success_count = labelsAndPreds.filter(lambda rec: rec[0] == rec[1]).count()
print("{0} {1}".format(success_count, total_count))

# float() forces true division: with two ints Python 2 floors the ratio,
# which made the computed value 0. The figure that used to be typed into
# the message is dropped so only the computed value is printed.
# The value is a fraction between 0 and 1.
accuracy = float(success_count) / total_count
print("Successful prediction percentage: " + str(round(accuracy, 2)))

####Adding more variables to the model prediction

In [35]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

###Encoding the categorical variable
####Categorical variables can not be represented as continuous variables.
####Categorical variables need to be converted into a binary representation, which is called one-hot encoding.

In [37]:
# NOTE(review): the month_* names look like leftovers from another example;
# everything in this cell operates on the famhist column.

# Step 1: map each distinct famhist string to a numeric index.
month_stringIndexer = StringIndexer(inputCol="famhist", outputCol="famhistIndex")
month_model = month_stringIndexer.fit(HeartDF)
month_indexed = month_model.transform(HeartDF)

# Step 2: one-hot encode the index. dropLast=False keeps a slot for every
# category instead of dropping the last one.
month_encoder = OneHotEncoder(inputCol="famhistIndex",
                              outputCol="famhistVec",
                              dropLast=False)
traindata_final = month_encoder.transform(month_indexed)

traindata_final.show(n=10)

###Combine all variables to create final vectors

In [39]:
def parseNewPoint(rec):
  """Build a LabeledPoint for one row of the encoded data.

  The label is rec.chd. The features are eight numeric columns followed
  by the entries of the one-hot famhistVec vector.
  """
  numeric_features = [rec.sbp,
                      rec.tobacco,
                      rec.ldl,
                      rec.adiposity,
                      rec.typea,
                      rec.obesity,
                      rec.alcohol,
                      rec.age]
  # Turn the encoded vector into a plain list so it can be appended.
  famhist_flags = rec.famhistVec.toArray().tolist()
  return LabeledPoint(rec.chd,
                      Vectors.dense(tuple(numeric_features + famhist_flags)))


# One LabeledPoint per row of the encoded DataFrame.
chd_lp_new = traindata_final.rdd.map(parseNewPoint)

chd_lp_new.take(10)

###Building the model and making predictions

In [41]:
# Retrain on the extended feature vectors (this rebinds `model`).
model = LogisticRegressionWithLBFGS.train(chd_lp_new)

# Pair every true label with the new model's prediction.
labelsAndPreds_new = chd_lp_new.map(
    lambda lp: (lp.label, float(model.predict(lp.features))))

success_count_new = labelsAndPreds_new.filter(
    lambda rec: rec[0] == rec[1]).count()

# Count the scored records here instead of reusing a total computed in an
# earlier cell, so this cell does not depend on that cell's state.
total_count_new = labelsAndPreds_new.count()

# float() forces true division: with two ints Python 2 floors the ratio,
# which made the computed value 0. The figure that used to be typed into
# the message is dropped so only the computed value is printed.
# The value is a fraction between 0 and 1.
accuracy_new = float(success_count_new) / total_count_new
print("Successful prediction percentage: " + str(round(accuracy_new, 2)))

###Split the dataset into train & test
#####Build the model using train dataset

#####Test the model using test dataset

#####Spark provides the randomSplit() method on RDDs to split them randomly. The relative size of each split is passed as a list of weights.

#####In the example below, train data set has 70% observations and test data set has only 30% of the observations.

In [43]:
# Fixed seed so the 70/30 split, and therefore the reported score, is the
# same on every run. The value itself is arbitrary.
trainingData, testData = chd_lp_new.randomSplit([0.7, 0.3], seed=42)

# Fit on the training part only (this rebinds `model`).
model = LogisticRegressionWithLBFGS.train(trainingData)

# Score the held-out part: (true label, predicted label) pairs.
labelsAndPreds_new = testData.map(
    lambda lp: (lp.label, float(model.predict(lp.features))))

success_count_new = labelsAndPreds_new.filter(
    lambda rec: rec[0] == rec[1]).count()
test_count = testData.count()

# float() forces true division: with two ints Python 2 floors the ratio,
# which made the computed value 0. The figure that used to be typed into
# the message is dropped so only the computed value is printed.
# The value is a fraction between 0 and 1.
test_accuracy = float(success_count_new) / test_count
print("Successful prediction percentage: " + str(round(test_accuracy, 2)))

###Final

######Create a confusion matrix to understand true positive and false positive rates

In [45]:
from sklearn import metrics

# Convert the (label, prediction) pairs to a pandas frame. toDF() names the
# two tuple positions _1 (label) and _2 (prediction).
labelsAndPreds_new_df = labelsAndPreds_new.toDF().toPandas()
actual_labels = labelsAndPreds_new_df._1
predicted_labels = labelsAndPreds_new_df._2

# Confusion matrix of actual labels against predicted labels.
cm = metrics.confusion_matrix(actual_labels, predicted_labels)

# Annotated heatmap of the matrix; cell values are shown with two decimals.
f = plt.figure(figsize=(10, 8))
sn.heatmap(cm, annot=True, fmt='.2f')

plt.close()
display(f)

