In [1]:
# Bootstrap: findspark.init() runs before the pyspark imports in the next cell,
# presumably so that a local Spark installation's pyspark package is importable
# from this kernel. NOTE(review): depends on the local Spark setup - confirm.
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession, SQLContext
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, IndexToString
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, GBTRegressor
from pyspark.ml.classification import LogisticRegression, GBTClassifier, RandomForestClassifier
from pyspark.ml.evaluation import RegressionEvaluator, MulticlassClassificationEvaluator

In [3]:
# Create (or reuse) the SparkSession used by every cell below.
spark = (
    SparkSession.builder
    .appName("Machine_Learning")
    .getOrCreate()
)

# SQLContext's first parameter is a SparkContext, not a SparkSession, so pass
# the session's SparkContext and bind the legacy context to this same session.
# NOTE: SQLContext is deprecated since Spark 3.0; `spark` can be used directly.
sqlContext = SQLContext(spark.sparkContext, spark)

#### 1. Regression

In this section, I predict the number of rings of an abalone (which indicates its age) from features such as sex, length, diameter, height and weight, using linear regression, decision tree regression and gradient-boosted tree regression. I evaluate each model by its root mean squared error (RMSE) on a held-out test set. As the results below show, the RMSE decreases (i.e., improves) with each successive model.

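The data is already in LIBSVM format (a label followed by `index:value` feature pairs), so Spark can load it directly without further preprocessing. Because LIBSVM stores every feature as a number, the categorical sex feature is numerically encoded. Linear regression therefore treats it as an ordinary numeric variable, while the tree models below use `VectorIndexer` to treat low-cardinality features as categorical.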

In [4]:
# Load the abalone data. The 'libsvm' source yields the `label` and `features`
# columns used by the models below.
regData = spark.read.load('abalone.txt', format='libsvm')

# Sanity-check the size of the loaded DataFrame.
print("Number of rows = {}".format(regData.count()))

Number of rows = 4177


In [5]:
# Hold out 25% of the rows for testing; the fixed seed keeps the split reproducible.
train, test = regData.randomSplit(weights=[0.75, 0.25], seed=0)

In [6]:
# Linear regression with elastic-net regularisation
# (regParam sets the penalty strength, elasticNetParam mixes L1 and L2).
lr = LinearRegression(maxIter=15, regParam=0.5, elasticNetParam=0.3)
lr_model = lr.fit(train)

# Report the fitted parameters.
print("Coefficients: %s" % lr_model.coefficients)
print("Intercept: %s" % lr_model.intercept)

# Score the held-out split and measure root mean squared error.
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse"
)
prediction = lr_model.transform(test)
rmse1 = evaluator.evaluate(prediction)
print("The RMSE of linear regression is " + str(rmse1))

Coefficients: [-0.23336738972975304,0.2482148684290254,2.5261012612260365,16.27060791361096,0.0,-1.4825385837384837,0.0,8.089093572250103]
Intercept: 5.553189074979348
The RMSE of linear regression is 2.5262466917925055


In [7]:
# Decision tree regression.
# VectorIndexer treats features with at most 4 distinct values as categorical.
# It is fitted on the full dataset (as in the Spark ML examples) so that every
# category value that appears in `test` is known to the indexer.
featureIndexer = VectorIndexer(
    inputCol="features", outputCol="indexedFeatures", maxCategories=4
).fit(regData)
dt = DecisionTreeRegressor(featuresCol="indexedFeatures")
pipeline = Pipeline(stages=[featureIndexer, dt])
dt_model = pipeline.fit(train)

# Score the held-out split and measure RMSE.
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse"
)
prediction = dt_model.transform(test)
rmse2 = evaluator.evaluate(prediction)
print("The RMSE of decision tree regression is " + str(rmse2))

The RMSE of decision tree regression is 2.275210428073053


In [8]:
# Gradient-boosted tree regression (15 boosting iterations).
# Reuses the already-fitted `featureIndexer` from the decision-tree cell.
gbt = GBTRegressor(featuresCol="indexedFeatures", maxIter=15)
pipeline = Pipeline(stages=[featureIndexer, gbt])
gbt_model = pipeline.fit(train)

# Score the held-out split and measure RMSE.
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse"
)
prediction = gbt_model.transform(test)
rmse3 = evaluator.evaluate(prediction)
print("The RMSE of gradient-boosted tree regression is " + str(rmse3))

The RMSE of gradient-boosted tree regression is 2.2074058312931144


#### 2. Classification

I performed classification on the diabetes dataset, predicting the `Outcome` column from the other eight columns, using the following methods:

(i) Logistic Regression

(ii) Gradient Boosted Tree

(iii) Random Forest

The classification accuracy on the held-out test set (25% of the rows) improves with each method.

In [9]:
# Read the diabetes CSV with Spark's built-in CSV source (available since
# Spark 2.0, so the external 'com.databricks.spark.csv' package name is not
# needed). The header row supplies column names; inferSchema types the columns.
diabetes = spark.read.csv('diabetes.csv', header=True, inferSchema=True)

In [10]:
# Preview a few rows of the data.
diabetes.show(5)

# Report the DataFrame's dimensions.
print("Number of rows = {}".format(diabetes.count()))
print("Number of columns = {}".format(len(diabetes.columns)))

# Column names (displayed as the cell's output).
diabetes.columns

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
only showing top 5 rows

Number of rows = 768
Number of columns = 9


['Pregnancies',
 'Glucose',
 'BloodPressure',
 'SkinThickness',
 'Insulin',
 'BMI',
 'DiabetesPedigreeFunction',
 'Age',
 'Outcome']

In [11]:
# Hold out 25% of the rows for testing; the fixed seed keeps the split reproducible.
train, test = diabetes.randomSplit(weights=[0.75, 0.25], seed=0)

In [12]:
# Logistic regression on the eight input columns.

# Pack the input columns into a single vector column named 'features'.
vectorAssembler = VectorAssembler(
    inputCols=[
        'Pregnancies', 'Glucose', 'BloodPressure', 'SkinThickness',
        'Insulin', 'BMI', 'DiabetesPedigreeFunction', 'Age',
    ],
    outputCol='features',
)
# Elastic-net regularised classifier predicting the raw Outcome column.
logisticRegression = LogisticRegression(
    labelCol='Outcome', maxIter=15, regParam=0.5, elasticNetParam=0.3
)
pipeline = Pipeline(stages=[vectorAssembler, logisticRegression])

# Fitting the model on the training split.
model = pipeline.fit(train)

# Predict on the test split and compute accuracy against Outcome.
evaluator = MulticlassClassificationEvaluator(
    labelCol="Outcome", predictionCol="prediction", metricName="accuracy"
)
prediction = model.transform(test)
accuracy1 = evaluator.evaluate(prediction)
print("The accuracy of logistic regression is " + str(accuracy1))

The accuracy of logistic regression is 0.6839080459770115


In [13]:
# Gradient-boosted tree classification (15 boosting iterations).
# StringIndexer encodes Outcome into 'indexedLabel'; VectorIndexer treats
# features with at most 4 distinct values as categorical. Both estimators are
# fitted as pipeline stages, i.e. on the training split only.
labelIndexer = StringIndexer(inputCol="Outcome", outputCol="indexedLabel")
featureIndexer = VectorIndexer(
    inputCol="features", outputCol="indexedFeatures", maxCategories=4
)
gbt = GBTClassifier(
    labelCol="indexedLabel", featuresCol="indexedFeatures", maxIter=15
)
pipeline = Pipeline(stages=[vectorAssembler, labelIndexer, featureIndexer, gbt])
model = pipeline.fit(train)

# Predict on the test split; accuracy is measured against the indexed label.
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy"
)
prediction = model.transform(test)
accuracy2 = evaluator.evaluate(prediction)
print("The accuracy of gradient boosted tree classifier is " + str(accuracy2))

The accuracy of gradient boosted tree classifier is 0.7586206896551724


In [14]:
# Random Forest Classification (100 trees).
# Reuses vectorAssembler, labelIndexer and featureIndexer from the previous
# classification cells as (unfitted) pipeline stages.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=100)
pipeline = Pipeline(stages=[vectorAssembler, labelIndexer, featureIndexer, rf])

# Fitting the model on the training split.
model = pipeline.fit(train)

# Build the evaluator here rather than silently reusing whichever `evaluator`
# a previous cell left behind, so this metric does not depend on cell order.
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel",
                                              predictionCol="prediction",
                                              metricName="accuracy")

# Prediction and evaluation on the test split.
prediction = model.transform(test)
accuracy3 = evaluator.evaluate(prediction)
print("The accuracy of random forest classifier is " + str(accuracy3))

The accuracy of random forest classifier is 0.7758620689655172
