In [1]:
import findspark
findspark.init()
findspark.find()
import pyspark
findspark.find()

'C:\\BigData\\spark'

In [2]:
from pyspark.sql import SQLContext
from pyspark import SparkContext
from pyspark import SparkFiles
sc = SparkContext()
sc.addFile("heart.csv")
sqlContext = SQLContext(sc)

In [3]:
df = sqlContext.read.csv(SparkFiles.get("heart.csv"), header=True, inferSchema= True)

In [4]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- sex: integer (nullable = true)
 |-- cp: integer (nullable = true)
 |-- trestbps: integer (nullable = true)
 |-- chol: integer (nullable = true)
 |-- fbs: integer (nullable = true)
 |-- restecg: integer (nullable = true)
 |-- thalach: integer (nullable = true)
 |-- exang: integer (nullable = true)
 |-- oldpeak: double (nullable = true)
 |-- slope: integer (nullable = true)
 |-- ca: integer (nullable = true)
 |-- thal: integer (nullable = true)
 |-- target: integer (nullable = true)



In [5]:
df.show(5, truncate = False)

+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
|age|sex|cp |trestbps|chol|fbs|restecg|thalach|exang|oldpeak|slope|ca |thal|target|
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
|63 |1  |3  |145     |233 |1  |0      |150    |0    |2.3    |0    |0  |1   |1     |
|37 |1  |2  |130     |250 |0  |1      |187    |0    |3.5    |0    |0  |2   |1     |
|41 |0  |1  |130     |204 |0  |0      |172    |0    |1.4    |2    |0  |2   |1     |
|56 |1  |1  |120     |236 |0  |1      |178    |0    |0.8    |2    |0  |2   |1     |
|57 |0  |0  |120     |354 |0  |1      |163    |1    |0.6    |2    |0  |2   |1     |
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
only showing top 5 rows



In [7]:
df.select('age', 'target').show()

+---+------+
|age|target|
+---+------+
| 63|     1|
| 37|     1|
| 41|     1|
| 56|     1|
| 57|     1|
| 57|     1|
| 56|     1|
| 44|     1|
| 52|     1|
| 57|     1|
| 54|     1|
| 48|     1|
| 49|     1|
| 64|     1|
| 58|     1|
| 50|     1|
| 58|     1|
| 66|     1|
| 43|     1|
| 69|     1|
+---+------+
only showing top 20 rows



In [8]:
df.groupBy("target").count().sort("count",ascending=True).show()

+------+-----+
|target|count|
+------+-----+
|     0|  138|
|     1|  165|
+------+-----+



In [9]:
df.describe().show()

+-------+------------------+-------------------+------------------+------------------+------------------+-------------------+-----------------+------------------+-------------------+------------------+------------------+------------------+------------------+------------------+
|summary|               age|                sex|                cp|          trestbps|              chol|                fbs|          restecg|           thalach|              exang|           oldpeak|             slope|                ca|              thal|            target|
+-------+------------------+-------------------+------------------+------------------+------------------+-------------------+-----------------+------------------+-------------------+------------------+------------------+------------------+------------------+------------------+
|  count|               303|                303|               303|               303|               303|                303|              303|               303|    

In [10]:
df.describe('age').show()

+-------+------------------+
|summary|               age|
+-------+------------------+
|  count|               303|
|   mean|54.366336633663366|
| stddev|  9.08210098983786|
|    min|                29|
|    max|                77|
+-------+------------------+



In [11]:
df.crosstab('age', 'target').sort("age_target").show()

+----------+---+---+
|age_target|  0|  1|
+----------+---+---+
|        29|  0|  1|
|        34|  0|  2|
|        35|  2|  2|
|        37|  0|  2|
|        38|  1|  2|
|        39|  1|  3|
|        40|  2|  1|
|        41|  1|  9|
|        42|  1|  7|
|        43|  3|  5|
|        44|  3|  8|
|        45|  2|  6|
|        46|  3|  4|
|        47|  2|  3|
|        48|  3|  4|
|        49|  2|  3|
|        50|  3|  4|
|        51|  3|  9|
|        52|  4|  9|
|        53|  2|  6|
+----------+---+---+
only showing top 20 rows



In [12]:
df.filter(df.age > 0).count()

303

In [13]:
df.groupby('target').agg({'age': 'mean'}).show()

+------+-----------------+
|target|         avg(age)|
+------+-----------------+
|     1| 52.4969696969697|
|     0|56.60144927536232|
+------+-----------------+



In [14]:
from pyspark.sql.functions import *

In [15]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
CATE_FEATURES = ['age', 'cp', 'trestbps', 'chol', 'fbs', 'restecg', 'thalach', 'exang', 'oldpeak', 'slope', 'ca', 
                 'thal']
stages = []
for categoricalCol in CATE_FEATURES:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()],
                                     outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]
label_stringIdx =  StringIndexer(inputCol="target", outputCol="newtarget")
stages += [label_stringIdx]
assemblerInputs = [c + "classVec" for c in CATE_FEATURES]
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [16]:
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df)
model = pipelineModel.transform(df)

In [33]:
model.count()

303

In [18]:
from pyspark.ml.linalg import DenseVector
input_data = model.rdd.map(lambda x: (x["newtarget"], DenseVector(x["features"])))

In [19]:
df_train = sqlContext.createDataFrame(input_data, ["target", "features"])

In [34]:
df_train.count()

303

In [21]:
train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)

In [22]:
train_data.groupby('target').agg({'target': 'count'}).show()

+------+-------------+
|target|count(target)|
+------+-------------+
|   0.0|          122|
|   1.0|          110|
+------+-------------+



In [23]:
test_data.groupby('target').agg({'target': 'count'}).show()

+------+-------------+
|target|count(target)|
+------+-------------+
|   0.0|           43|
|   1.0|           28|
+------+-------------+



In [24]:
# Import `LinearRegression`
from pyspark.ml.classification import LogisticRegression

# Initialize `lr`
lr = LogisticRegression(labelCol="target",
                        featuresCol="features",
                        maxIter=10,
                        regParam=0.3)

# Fit the data to the model
linearModel = lr.fit(train_data)

In [25]:
# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(linearModel.coefficients))
print("Intercept: " + str(linearModel.intercept))

Coefficients: [0.2733711339589444,0.19889370472434198,-0.37155645332113174,0.2371931974640684,-0.05990955753690189,-0.24404391056632616,-0.06244503272680768,0.038424665630807875,0.3331812790183561,0.17813325320793047,-0.22676977250114688,-0.026751093433674763,0.07805351877246935,0.10482592079273846,-0.2793010823523223,-0.11224443012249508,-0.322495773473151,-0.3176777522008589,0.059943230540427855,0.40860741576658827,-0.26570534457634026,0.03379077432834721,-0.03664024269119086,0.13808365721611937,-0.003092108117558053,-0.15898024960728815,-0.5493091172259579,0.2630977446451618,0.1213420956530878,0.0009940326568633909,0.2357023450461051,-0.26207455721602313,0.1882235763095168,-0.24281588386400446,-0.5114043925917654,-0.3271238204097514,-0.25202414139320417,-0.36975018177425306,0.0,-0.43693049727664757,0.44776093649373816,-0.27992683876621144,-0.19903692673241064,-0.10057619817402148,-0.04906695549240823,0.1397518471715481,0.22197209603745005,0.12989324778848307,-0.3111816342166846,-0.0

In [26]:
# Make predictions on test data using the transform() method.
predictions = linearModel.transform(test_data)

In [27]:
predictions.printSchema()

root
 |-- target: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [28]:
predictions.show()

+------+--------------------+--------------------+--------------------+----------+
|target|            features|       rawPrediction|         probability|prediction|
+------+--------------------+--------------------+--------------------+----------+
|   0.0|[0.0,0.0,0.0,0.0,...|[0.32632924672382...|[0.58086595889630...|       0.0|
|   0.0|[0.0,0.0,0.0,0.0,...|[2.18843335408418...|[0.89920600293885...|       0.0|
|   0.0|[0.0,0.0,0.0,0.0,...|[1.22651299828303...|[0.77320768475176...|       0.0|
|   0.0|[0.0,0.0,0.0,0.0,...|[2.51014302048823...|[0.92484983142064...|       0.0|
|   0.0|[0.0,0.0,0.0,0.0,...|[1.20188546095681...|[0.76886002669069...|       0.0|
|   0.0|[0.0,0.0,0.0,0.0,...|[1.66327915301517...|[0.84067769937101...|       0.0|
|   0.0|[0.0,0.0,0.0,0.0,...|[1.69024610401177...|[0.84425652220610...|       0.0|
|   0.0|[0.0,0.0,0.0,0.0,...|[-0.0235492502494...|[0.49411295949842...|       1.0|
|   0.0|[0.0,0.0,0.0,0.0,...|[1.87235305007669...|[0.86673031026567...|       0.0|
|   

In [29]:
cm = predictions.select("target", "prediction")

In [30]:
cm.groupby('target').agg({'target': 'count'}).show()

+------+-------------+
|target|count(target)|
+------+-------------+
|   0.0|           43|
|   1.0|           28|
+------+-------------+



In [31]:
cm.groupby('prediction').agg({'prediction': 'count'}).show()

+----------+-----------------+
|prediction|count(prediction)|
+----------+-----------------+
|       0.0|               44|
|       1.0|               27|
+----------+-----------------+



In [32]:
acc = (cm.filter(cm.target == cm.prediction).count() / cm.count())*100
print("Model accuracy: %.2f%%" % (acc))

Model accuracy: 78.87%
