### Classification Using PySpark
I will demonstrate three classification algorithms.

1.NaiveBayes Classification

2.Multi-Layer Perceptron Classification

3.Decision Trees Classification

We explore the supervised classification algorithms using IRIS data

In [52]:
from pyspark.sql import *
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import  StringIndexer
import pandas as pd

In [14]:

spark = SparkSession.builder.appName("Classification").getOrCreate()

In [46]:
# Read the iris data
df_iris = pd.read_csv("https://raw.githubusercontent.com/amjadraza/blogs-data/master/spark_ml/iris.csv",header=None)

In [47]:
df_iris.head()

Unnamed: 0,0,1,2,3,4
0,5.1,3.5,1.4,0.2,Iris-setosa
1,4.9,3.0,1.4,0.2,Iris-setosa
2,4.7,3.2,1.3,0.2,Iris-setosa
3,4.6,3.1,1.5,0.2,Iris-setosa
4,5.0,3.6,1.4,0.2,Iris-setosa


In [64]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, FloatType
#creating columns
schema = StructType([ \
    StructField("sepal_length",FloatType(),True), \
    StructField("sepal_width",FloatType(),True), \
    StructField("petal_length",FloatType(),True), \
    StructField("petal_width", FloatType(), True), \
    StructField("species", StringType(), True) \
  ])


In [65]:
iris_df = spark.createDataFrame(df_iris, schema=schema)

In [66]:
iris_df.show(2)

+------------+-----------+------------+-----------+-----------+
|sepal_length|sepal_width|petal_length|petal_width|    species|
+------------+-----------+------------+-----------+-----------+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|
|         4.9|        3.0|         1.4|        0.2|Iris-setosa|
+------------+-----------+------------+-----------+-----------+
only showing top 2 rows



In [67]:
# Converting the columns into features
vectorAssembler = VectorAssembler(inputCols = ["sepal_length", "sepal_width", "petal_length", "petal_width"],outputCol = "features")


In [69]:
viris_df = vectorAssembler.transform(iris_df)

In [70]:
viris_df.show(2)

+------------+-----------+------------+-----------+-----------+--------------------+
|sepal_length|sepal_width|petal_length|petal_width|    species|            features|
+------------+-----------+------------+-----------+-----------+--------------------+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|[5.09999990463256...|
|         4.9|        3.0|         1.4|        0.2|Iris-setosa|[4.90000009536743...|
+------------+-----------+------------+-----------+-----------+--------------------+
only showing top 2 rows



In [71]:
indexer = StringIndexer(inputCol="species", outputCol = "label")

In [75]:
ivris_df = indexer.fit(viris_df).transform(viris_df)

In [80]:
ivris_df.show(10)

+------------+-----------+------------+-----------+-----------+--------------------+-----+
|sepal_length|sepal_width|petal_length|petal_width|    species|            features|label|
+------------+-----------+------------+-----------+-----------+--------------------+-----+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|[5.09999990463256...|  0.0|
|         4.9|        3.0|         1.4|        0.2|Iris-setosa|[4.90000009536743...|  0.0|
|         4.7|        3.2|         1.3|        0.2|Iris-setosa|[4.69999980926513...|  0.0|
|         4.6|        3.1|         1.5|        0.2|Iris-setosa|[4.59999990463256...|  0.0|
|         5.0|        3.6|         1.4|        0.2|Iris-setosa|[5.0,3.5999999046...|  0.0|
|         5.4|        3.9|         1.7|        0.4|Iris-setosa|[5.40000009536743...|  0.0|
|         4.6|        3.4|         1.4|        0.3|Iris-setosa|[4.59999990463256...|  0.0|
|         5.0|        3.4|         1.5|        0.2|Iris-setosa|[5.0,3.4000000953...|  0.0|

### Naive Bayes Classification
Once the data is prepared, we are ready to apply the first classification algorithm.


In [81]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [84]:
# Create the traing and test splits
splits = ivris_df.randomSplit([0.6,0.4], 1)
train_df = splits[0]
test_df = splits[1]
# Apply the Naive bayes classifier
nb = NaiveBayes(modelType="multinomial")
nbmodel = nb.fit(train_df)
predictions_df = nbmodel.transform(test_df)

predictions_df.show(1, False)

+------------+-----------+------------+-----------+-----------+-------------------------------------------------------------+-----+-----------------------------------------------------------+-----------------------------------------------------------+----------+
|sepal_length|sepal_width|petal_length|petal_width|species    |features                                                     |label|rawPrediction                                              |probability                                                |prediction|
+------------+-----------+------------+-----------+-----------+-------------------------------------------------------------+-----+-----------------------------------------------------------+-----------------------------------------------------------+----------+
|4.3         |3.0        |1.1         |0.1        |Iris-setosa|[4.300000190734863,3.0,1.100000023841858,0.10000000149011612]|0.0  |[-9.75709373042928,-11.533008646025936,-11.892055622741834]|[0.7766522467004511,

In [85]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
nbaccuracy = evaluator.evaluate(predictions_df)
nbaccuracy

0.5873015873015873

### Decision Trees Classification

In [87]:

from pyspark.ml.classification import DecisionTreeClassifier
# Define the DT Classifier 
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)
# Evaluate the DT Classifier
dt_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
dt_accuracy = dt_evaluator.evaluate(dt_predictions)
dt_accuracy

0.9365079365079365

# Regression using PySpark

In this section, we explore the Machine learning models for regression problems using pyspark. Regression models are helpful in predicting future values using past data.

In [88]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

In [89]:
# Read the iris data
df_ccpp = pd.read_csv("https://raw.githubusercontent.com/amjadraza/blogs-data/master/spark_ml/ccpp.csv")
pp_df = spark.createDataFrame(df_ccpp)
pp_df.show(2, False)

+-----+-----+-------+-----+------+
|AT   |V    |AP     |RH   |PE    |
+-----+-----+-------+-----+------+
|14.96|41.76|1024.07|73.17|463.26|
|25.18|62.96|1020.04|59.08|444.37|
+-----+-----+-------+-----+------+
only showing top 2 rows



In [90]:
# Create the feature column using VectorAssembler class
vectorAssembler = VectorAssembler(inputCols =["AT", "V", "AP", "RH"], outputCol = "features")
vpp_df = vectorAssembler.transform(pp_df)
vpp_df.show(2, False)

+-----+-----+-------+-----+------+---------------------------+
|AT   |V    |AP     |RH   |PE    |features                   |
+-----+-----+-------+-----+------+---------------------------+
|14.96|41.76|1024.07|73.17|463.26|[14.96,41.76,1024.07,73.17]|
|25.18|62.96|1020.04|59.08|444.37|[25.18,62.96,1020.04,59.08]|
+-----+-----+-------+-----+------+---------------------------+
only showing top 2 rows



### Linear Regression
We start with the simplest regression technique i.e. Linear Regression.

In [100]:
# Define and fit Linear Regression
lr = LinearRegression(featuresCol="features", labelCol="PE")
lr_model = lr.fit(vpp_df)

In [102]:
# Print and save the Model output
lr_model.coefficients
lr_model.intercept
lr_model.summary.r2

#lr_model.save()

0.9286960898122537

### Decision Tree Regression
In this section, we explore the Decision Tree Regression commonly used in Machine learning.

In [98]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
vpp_df.show(2, False)


# Define train and test data split
splits = vpp_df.randomSplit([0.7,0.3])
train_df = splits[0]
test_df = splits[1]
# Define the Decision Tree Model 
dt = DecisionTreeRegressor(featuresCol="features", labelCol="PE")
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)
dt_predictions.show(1, False)


# Evaluate the Model
dt_evaluator = RegressionEvaluator(labelCol="PE", predictionCol="prediction", metricName="r2")
dt_rmse = dt_evaluator.evaluate(dt_predictions)
print("The R Square of Decision Tree regression Model is {}".format(dt_rmse))


+-----+-----+-------+-----+------+---------------------------+
|AT   |V    |AP     |RH   |PE    |features                   |
+-----+-----+-------+-----+------+---------------------------+
|14.96|41.76|1024.07|73.17|463.26|[14.96,41.76,1024.07,73.17]|
|25.18|62.96|1020.04|59.08|444.37|[25.18,62.96,1020.04,59.08]|
+-----+-----+-------+-----+------+---------------------------+
only showing top 2 rows

+----+-----+------+-----+------+-------------------------+------------------+
|AT  |V    |AP    |RH   |PE    |features                 |prediction        |
+----+-----+------+-----+------+-------------------------+------------------+
|3.73|39.42|1024.4|82.42|488.58|[3.73,39.42,1024.4,82.42]|486.64627027027035|
+----+-----+------+-----+------+-------------------------+------------------+
only showing top 1 row

The R Square of Decision Tree regression Model is 0.9305138819663558


### Gradient Boosting Decision Tree Regression
Gradient Boosting is another common choice among ML professionals. Let us try the GBM in this section.

In [103]:
from pyspark.ml.regression import GBTRegressor
# Define the GBT Model
gbt = GBTRegressor(featuresCol="features", labelCol="PE")
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)
# Evaluate the GBT Model
gbt_evaluator = RegressionEvaluator(labelCol="PE", predictionCol="prediction", metricName="r2")
gbt_rmse = gbt_evaluator.evaluate(gbt_predictions)
print("The R square of GBT Tree regression Model is {}".format(gbt_rmse))

The R square of GBT Tree regression Model is 0.9454720080073261
