In [None]:
# Pyspark ML Heart and Advertisement Data Analysis
# Author: Ashutosh Kumar

In [1]:
# start a Spark session:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PySpark ML Heart Data Analysis").getOrCreate()

In [2]:
#Create dataframe by calling read() method on SparkSession/spark object:

heart = spark.read.format("csv").option("header", "true").option("inferSchema", "true")\
             .load("/common_folder/heart.csv")

# used option header true to ignore the first row in csv which is header

In [5]:
# Verify that the data is loaded in the dataframe :

heart.show(5)

# Pyspark way of checking:
#heart.take(5)

+---+---+---------+---+----------+---+-----------+--------------+---------------+-------------+--------+--------------------+----+-------------+----+
|age|sex|pain type| BP|cholestrol|fbs|resting ecg|max heart rate|exercise angina|ST depression|ST slope|flouroscopy coloured|thal|heart disease|_c14|
+---+---+---------+---+----------+---+-----------+--------------+---------------+-------------+--------+--------------------+----+-------------+----+
| 70|  1|        4|130|       322|  0|          2|           109|              0|          2.4|       2|                   3|   3|            2|null|
| 67|  0|        3|115|       564|  0|          2|           160|              0|          1.6|       2|                   0|   7|            1|null|
| 57|  1|        2|124|       261|  0|          0|           141|              0|          0.3|       1|                   0|   7|            2|null|
| 64|  1|        4|128|       263|  0|          0|           105|              1|          0.2|     

In [None]:
# Performing basic analysis:

In [6]:
# What is the maximum and minimum age?

# importing required functions for the same:
from pyspark.sql.functions import min, max

heart.select(min("age"), max("age")).show()

+--------+--------+
|min(age)|max(age)|
+--------+--------+
|      29|      77|
+--------+--------+



In [9]:
# create a new DataFrame with only the 'age' column from the original DataFrame. Save it to a variable, say 'heart_age'.:

heart_age= heart[["age"]]
heart_age.show()

+---+
|age|
+---+
| 70|
| 67|
| 57|
| 64|
| 74|
| 65|
| 56|
| 59|
| 60|
| 63|
| 59|
| 53|
| 44|
| 61|
| 57|
| 71|
| 46|
| 53|
| 64|
| 40|
+---+
only showing top 20 rows



In [15]:
# Bucket age groups to four different buckets:

# Importing the required method :
from pyspark.ml.feature import Bucketizer
bucketBorders =  [29, 40, 50, 60, 70, 80]

bucketer = Bucketizer().setSplits(bucketBorders).setInputCol("age").setOutputCol("bucket")
bucketer.transform(heart_age).show()

+---+------+
|age|bucket|
+---+------+
| 70|   4.0|
| 67|   3.0|
| 57|   2.0|
| 64|   3.0|
| 74|   4.0|
| 65|   3.0|
| 56|   2.0|
| 59|   2.0|
| 60|   3.0|
| 63|   3.0|
| 59|   2.0|
| 53|   2.0|
| 44|   1.0|
| 61|   3.0|
| 57|   2.0|
| 71|   4.0|
| 46|   1.0|
| 53|   2.0|
| 64|   3.0|
| 40|   1.0|
+---+------+
only showing top 20 rows



In [19]:
# What is the count of observations for each bucket?

# Importing sql functions of PySpark:
from pyspark.sql.functions import count,expr

bucketed = bucketer.transform(heart_age)
bucketed.groupBy("bucket").agg(
    count("age").alias("count"),
    expr("count(age)")).show()


+------+-----+----------+
|bucket|count|count(age)|
+------+-----+----------+
|   0.0|   12|        12|
|   1.0|   67|        67|
|   4.0|   10|        10|
|   3.0|   74|        74|
|   2.0|  107|       107|
+------+-----+----------+



In [24]:
# Let's import other data to build PySpark based ML model:

#Create dataframe by calling read() method on SparkSession/spark object:

advertisementDF = spark.read.format("csv").option("header", "true").option("inferSchema", "true")\
             .load("/common_folder/Advertising.csv")

# used option header true to ignore the first row in csv which is header

In [25]:
# Checking the data is loaded:

advertisementDF.show(5)

+-----+-----+---------+-----+
|   TV|Radio|Newspaper|Sales|
+-----+-----+---------+-----+
|230.1| 37.8|     69.2| 22.1|
| 44.5| 39.3|     45.1| 10.4|
| 17.2| 45.9|     69.3|  9.3|
|151.5| 41.3|     58.5| 18.5|
|180.8| 10.8|     58.4| 12.9|
+-----+-----+---------+-----+
only showing top 5 rows



In [33]:
# Now, to run linear regression, we need to first divide the data into feature and label columns, i.e., X and Y columns.

# Prepare data for Machine Learning.
# We need a single feature vector in order to train various ML models. We can do this using VectorAssembler.
# And we need two columns only — features and label(“Sales”):

# The VectorAssembler is a tool you’ll use in nearly every single pipeline you generate. It helps concatenate all your features
# into one big vector you can then pass into an estimator. It takes as input a number of columns of Boolean, Double, or Vector.

# Importing the required feature:
from pyspark.ml.feature import VectorAssembler

vectorAssembler  = VectorAssembler().setInputCols(["TV", "Radio", "Newspaper"]).setOutputCol("features")
#va.transform(advertisementDF).show()
vadvertisementDF = vectorAssembler.transform(advertisementDF)
vadvertisementDF = vadvertisementDF.select(['features', 'Sales'])
vadvertisementDF.show(5)

+-----------------+-----+
|         features|Sales|
+-----------------+-----+
|[230.1,37.8,69.2]| 22.1|
| [44.5,39.3,45.1]| 10.4|
| [17.2,45.9,69.3]|  9.3|
|[151.5,41.3,58.5]| 18.5|
|[180.8,10.8,58.4]| 12.9|
+-----------------+-----+
only showing top 5 rows



In [34]:
# Once the feature and label columns are ready, we need to fit a linear regression model. 

from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='Sales', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(vadvertisementDF)
print(lr.explainParams())


aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0, current: 0.8)
epsilon: The shape parameter to control the amount of robustness. Must be > 1.0. Only valid when loss is huber (default: 1.35)
featuresCol: features column name. (default: features, current: features)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label, current: Sales)
loss: The loss function to be optimized. Supported options: squaredError, huber. (default: squaredError)
maxIter: max number of iterations (>= 0). (default: 100, current: 10)
predictionCol: prediction column name. (default: prediction)
regParam: regularization parameter (>= 0). (default: 0.0, current: 0.3)
solver: The solver algorithm for optimization. Supported options: auto, normal, l-bfgs. (default: auto)
standardiza

In [35]:
# Checking the model coefficient and intercepts:
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [0.04262382354938965,0.1708116439330275,0.0]
Intercept: 3.7812243412809186
