# Spark - Classification - Naive Bayes Classifier ML Pipelines

## Algorithm Summary
- Task : Classification with binary or multiclass labels
- Input : Label (binary or multiclass, 0-based indexed), feature vectors (discrete)
- Smoothing : Additive smoothing; the default smoothing parameter is 1.0
- Model type : Multinomial (default) or Bernoulli. To use Bernoulli, convert the feature vectors to 0-1 vectors and set modelType to "bernoulli"
- Assumptions:
    - Independence between every pair of features
    - Feature values are nonnegative, such as counts
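To make the summary concrete, here is a minimal, dependency-free sketch of multinomial naive Bayes with additive smoothing. It smooths both the class prior, (N_c + λ)/(N + Kλ), and the per-feature weights, (S_cj + λ)/(Σ_j S_cj + Dλ), where N_c is the row count of class c, S_cj the summed value of feature j in class c, K the number of classes and D the number of features. This is an illustration, not Spark's implementation. The function names `fit_multinomial_nb` and `predict` are hypothetical. Unlike Spark, which accepts a smoothing of 0, this sketch rejects it so that it never takes log(0).

```python
import math
from collections import defaultdict


def fit_multinomial_nb(X, y, smoothing=1.0):
    """Fit multinomial naive Bayes with additive smoothing (illustrative sketch).

    X: list of equal-length feature vectors with nonnegative values (e.g. counts).
    y: list of class labels, one per row.
    Returns (log_prior, log_theta), both keyed by class label.
    """
    if smoothing <= 0:
        # Logs of smoothed counts are taken below; zero smoothing could hit log(0).
        raise ValueError("this sketch requires smoothing > 0")
    n_features = len(X[0])
    labels = sorted(set(y))
    class_count = defaultdict(float)
    feature_sum = {c: [0.0] * n_features for c in labels}
    for x, c in zip(X, y):
        if any(v < 0 for v in x):
            raise ValueError("multinomial naive Bayes needs nonnegative features")
        class_count[c] += 1.0
        for j, v in enumerate(x):
            feature_sum[c][j] += v

    n_rows, n_labels = len(y), len(labels)
    log_prior, log_theta = {}, {}
    for c in labels:
        # Smoothed class prior: (N_c + lambda) / (N + K * lambda)
        log_prior[c] = math.log(class_count[c] + smoothing) - math.log(n_rows + n_labels * smoothing)
        # Smoothed feature weights: (S_cj + lambda) / (sum_j S_cj + D * lambda)
        denom = math.log(sum(feature_sum[c]) + n_features * smoothing)
        log_theta[c] = [math.log(s + smoothing) - denom for s in feature_sum[c]]
    return log_prior, log_theta


def predict(model, x):
    """Return the class with the highest log prior + sum_j x_j * log theta_cj."""
    log_prior, log_theta = model
    scores = {c: log_prior[c] + sum(w * v for w, v in zip(log_theta[c], x)) for c in log_prior}
    return max(scores, key=scores.get)
```

Working in log space keeps products of many small probabilities from underflowing. Scoring is also a simple dot product between the log weights and the feature vector.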


## Data Analysis Example
- <a href="https://github.com/vincentarelbundock/Rdatasets/blob/master/csv/datasets/iris.csv">Iris dataset</a>
- Connect to a Spark cluster
- Dataset Review
- Load Data & Data preprocessing
- Explore the data
- Create and evaluate a multiclass naive Bayes classifier
- Experimenting with Various Smoothing Parameters

## Dataset Review
The dataset contains 50 instances of each of three iris species: Setosa, Versicolor and Virginica. In this example, we try to predict the species from its four flower measurements.

Feature Information:
1. Sepal Length in cm
2. Sepal Width in cm
3. Petal Length in cm
4. Petal Width in cm

Target and Label :
- Species
    - Setosa
    - Versicolor
    - Virginica

In [24]:
import os
import sys

# Point Python at a local Spark 2.4.0 install; change spark_path to match your machine
spark_path = '/Users/pradmishra/Downloads/spark-2.4.0-bin-hadoop2.7'
os.environ['SPARK_HOME'] = spark_path
os.environ['HADOOP_HOME'] = spark_path

# Make the pyspark package and its bundled py4j importable
sys.path.append(spark_path + '/python')
sys.path.append(spark_path + '/python/lib/py4j-0.10.7-src.zip')

from pyspark import SparkConf, SparkContext

In [2]:
# Import the PySpark classes used in this notebook
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from pyspark.ml.feature import StringIndexer

In [26]:
# Configure the Spark environment so the notebook can start PySpark itself
import os
import sys

# Make sure spark-submit is told to launch a PySpark shell
pyspark_submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "")
if "pyspark-shell" not in pyspark_submit_args:
    pyspark_submit_args += " pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args

spark_home = os.environ.get('SPARK_HOME')
if spark_home is None:
    raise RuntimeError("SPARK_HOME is not set")
sys.path.insert(0, os.path.join(spark_home, "python"))

# Add py4j to the path.
# You may need to change the version number to match your install
sys.path.insert(0, os.path.join(spark_home, "python/lib/py4j-0.10.7-src.zip"))

# Initialize PySpark: shell.py creates the `spark` SparkSession and the `sc` SparkContext
exec(open(os.path.join(spark_home, "python/pyspark/shell.py")).read())

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Python version 3.6.6 (default, Jun 28 2018 11:07:29)
SparkSession available as 'spark'.
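The cells above hard-code `py4j-0.10.7-src.zip`. That file name matches this Spark 2.4.0 install, but the bundled py4j version changes between Spark releases. Below is a minimal sketch that locates the zip by pattern instead. It assumes the standard binary-distribution layout (`$SPARK_HOME/python/lib/py4j-*-src.zip`), and the helper name `add_pyspark_to_path` is hypothetical. If you would rather not maintain this by hand, the third-party `findspark` package (`findspark.init()`) does similar path setup.

```python
import glob
import os
import sys


def add_pyspark_to_path(spark_home):
    """Prepend Spark's Python sources and its bundled py4j zip to sys.path.

    Globbing for py4j-*-src.zip avoids hard-coding the py4j version.
    Returns (python_dir, py4j_zip) so callers can inspect what was added.
    """
    python_dir = os.path.join(spark_home, "python")
    matches = sorted(glob.glob(os.path.join(python_dir, "lib", "py4j-*-src.zip")))
    if not matches:
        raise FileNotFoundError("no py4j-*-src.zip under " + python_dir)
    py4j_zip = matches[-1]  # if several are present, take the last in sorted order
    for path in (py4j_zip, python_dir):
        if path not in sys.path:
            sys.path.insert(0, path)  # python_dir ends up first
    return python_dir, py4j_zip
```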


In [27]:
spark

In [3]:
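# Create (or reuse) a SparkContext and wrap it in a SQLContext.
# getOrCreate returns an already-running SparkContext if there is one,
# in which case the settings in this conf are not applied.
conf = SparkConf()
#conf.setMaster("spark://sparklab-master:7077")
conf.setMaster("local[4]")  # run locally with 4 worker threads
conf.setAppName("Spark Classification with Naive Bayes - Iris Datasets")
sc = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sc)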

## Load Data and Data Preprocessing

In [4]:
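import pandas as pd

# Download the iris CSV and drop its unnamed row-number column
data_file = "https://raw.githubusercontent.com/vincentarelbundock/Rdatasets/master/csv/datasets/iris.csv"
df = pd.read_csv(data_file)
del df['Unnamed: 0']
df.columns = ['SepalLength', 'SepalWidth', 'PetalLength', 'PetalWidth', 'Species']
# to_csv also writes the pandas index as an unnamed first column, which Spark reads back as `_c0`
df.to_csv('iris.csv')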


We load the local CSV file into a Spark DataFrame through `sqlContext`.

In [5]:
# Create a Spark DataFrame from the local iris.csv written by pandas above
data = sqlContext.read.format('csv').options(header='true', inferSchema='true').load('iris.csv')
data.take(5)

[Row(_c0=0, SepalLength=5.1, SepalWidth=3.5, PetalLength=1.4, PetalWidth=0.2, Species='setosa'),
 Row(_c0=1, SepalLength=4.9, SepalWidth=3.0, PetalLength=1.4, PetalWidth=0.2, Species='setosa'),
 Row(_c0=2, SepalLength=4.7, SepalWidth=3.2, PetalLength=1.3, PetalWidth=0.2, Species='setosa'),
 Row(_c0=3, SepalLength=4.6, SepalWidth=3.1, PetalLength=1.5, PetalWidth=0.2, Species='setosa'),
 Row(_c0=4, SepalLength=5.0, SepalWidth=3.6, PetalLength=1.4, PetalWidth=0.2, Species='setosa')]

## Explore data

In [6]:
# Index the string Species column into a numeric label column (the most frequent species gets 0.0)
labelIndexer = StringIndexer(inputCol="Species", outputCol="label")
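By default, `StringIndexer` uses `stringOrderType="frequencyDesc"`, so index 0.0 goes to the most frequent string in the data it is fit on. This notebook uses the indexer as a pipeline stage, so it is fit on `trainData`, and the species-to-label mapping depends on the class counts in that split. A stdlib sketch of the ordering rule follows. The name `fit_string_indexer` is hypothetical, and the alphabetical tie-break is this sketch's own choice, not something to rely on in Spark 2.4.

```python
from collections import Counter


def fit_string_indexer(values):
    """Map each distinct string to a float index, most frequent first.

    Ties are broken alphabetically here (an assumption of this sketch).
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {v: float(i) for i, v in enumerate(ordered)}
```

For example, `[mapping[s] for s in species]` then turns a list of species names into numeric labels.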

Select and show only the SepalLength and Species columns:

In [7]:
data.select("SepalLength","Species").show()

+-----------+-------+
|SepalLength|Species|
+-----------+-------+
|        5.1| setosa|
|        4.9| setosa|
|        4.7| setosa|
|        4.6| setosa|
|        5.0| setosa|
|        5.4| setosa|
|        4.6| setosa|
|        5.0| setosa|
|        4.4| setosa|
|        4.9| setosa|
|        5.4| setosa|
|        4.8| setosa|
|        4.8| setosa|
|        4.3| setosa|
|        5.8| setosa|
|        5.7| setosa|
|        5.4| setosa|
|        5.1| setosa|
|        5.7| setosa|
|        5.1| setosa|
+-----------+-------+
only showing top 20 rows



In [8]:
# Look at the first raw row of the iris DataFrame
data.take(1)

[Row(_c0=0, SepalLength=5.1, SepalWidth=3.5, PetalLength=1.4, PetalWidth=0.2, Species='setosa')]

In [9]:
# Randomly split the data into a training set (70%) and a test set (30%);
# the fixed seed makes the split reproducible
trainData, testData = data.randomSplit([0.7, 0.3], seed = 100)

In [10]:
trainData.show(10)

+---+-----------+----------+-----------+----------+-------+
|_c0|SepalLength|SepalWidth|PetalLength|PetalWidth|Species|
+---+-----------+----------+-----------+----------+-------+
|  0|        5.1|       3.5|        1.4|       0.2| setosa|
|  1|        4.9|       3.0|        1.4|       0.2| setosa|
|  2|        4.7|       3.2|        1.3|       0.2| setosa|
|  3|        4.6|       3.1|        1.5|       0.2| setosa|
|  5|        5.4|       3.9|        1.7|       0.4| setosa|
|  6|        4.6|       3.4|        1.4|       0.3| setosa|
| 10|        5.4|       3.7|        1.5|       0.2| setosa|
| 11|        4.8|       3.4|        1.6|       0.2| setosa|
| 12|        4.8|       3.0|        1.4|       0.1| setosa|
| 13|        4.3|       3.0|        1.1|       0.1| setosa|
+---+-----------+----------+-----------+----------+-------+
only showing top 10 rows



In [11]:
testData.show(10)

+---+-----------+----------+-----------+----------+-------+
|_c0|SepalLength|SepalWidth|PetalLength|PetalWidth|Species|
+---+-----------+----------+-----------+----------+-------+
|  4|        5.0|       3.6|        1.4|       0.2| setosa|
|  7|        5.0|       3.4|        1.5|       0.2| setosa|
|  8|        4.4|       2.9|        1.4|       0.2| setosa|
|  9|        4.9|       3.1|        1.5|       0.1| setosa|
| 15|        5.7|       4.4|        1.5|       0.4| setosa|
| 18|        5.7|       3.8|        1.7|       0.3| setosa|
| 19|        5.1|       3.8|        1.5|       0.3| setosa|
| 21|        5.1|       3.7|        1.5|       0.4| setosa|
| 25|        5.0|       3.0|        1.6|       0.2| setosa|
| 26|        5.0|       3.4|        1.6|       0.4| setosa|
+---+-----------+----------+-----------+----------+-------+
only showing top 10 rows



In [12]:
print("Counting Training Data : {}".format(trainData.count()))

Counting Training Data : 103


In [13]:
print("Counting Test Data : {}".format(testData.count()))

Counting Test Data : 47
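The split is 103/47 rather than exactly 105/45 because `randomSplit` samples each row independently instead of cutting at a fixed count. The weights are normalized into cumulative ranges, and each row lands in the range its random draw falls into. Below is a simplified stdlib sketch of that idea. It is not Spark's partition-wise sampler, so the same seed will not reproduce Spark's split, and `random_split` is a hypothetical name.

```python
import bisect
import random


def random_split(rows, weights, seed):
    """Assign every row to exactly one split using one uniform draw per row.

    Weights are normalized into cumulative bounds, e.g. [0.7, 0.3] -> [0.0, 0.7, 1.0],
    so split sizes are only approximately proportional to the weights.
    """
    total = float(sum(weights))
    bounds = [0.0]
    for w in weights:
        bounds.append(bounds[-1] + w / total)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # uniform in [0.0, 1.0)
        i = min(bisect.bisect_right(bounds, u) - 1, len(weights) - 1)
        splits[i].append(row)
    return splits
```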


In [14]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler

In [15]:
vecAssembler = VectorAssembler(inputCols=["SepalLength", "SepalWidth", "PetalLength", "PetalWidth"], outputCol="features")

In [29]:
# Standalone VectorAssembler example (not the iris data): scalar and vector columns are concatenated into one feature vector
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

dataset = spark.createDataFrame([(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)],["id", "hour", "mobile", "userFeatures", "clicked"])

assembler = VectorAssembler(
    inputCols=["hour", "mobile", "userFeatures"],
    outputCol="features")

output = assembler.transform(dataset)
print("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(truncate=False)

Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'
+-----------------------+-------+
|features               |clicked|
+-----------------------+-------+
|[18.0,1.0,0.0,10.0,0.5]|1.0    |
+-----------------------+-------+
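The output shows how `VectorAssembler` flattens its inputs. Scalar columns contribute one value each and vector columns contribute all of their elements, in the order given by `inputCols`. Here is a plain-Python sketch of that concatenation rule. It only produces dense lists and ignores nulls, and the name `assemble` is hypothetical.

```python
def assemble(row, input_cols):
    """Concatenate scalar and list-valued columns into one flat list of floats,
    in the order given by input_cols."""
    features = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, (list, tuple)):
            features.extend(float(v) for v in value)  # vector column: all elements
        else:
            features.append(float(value))  # scalar column: one element
    return features
```

Applied to an iris row with `["SepalLength", "SepalWidth", "PetalLength", "PetalWidth"]`, the sketch yields the four measurements in that order, which is what `vecAssembler` feeds to the classifier.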



In [30]:
# Define a multinomial naive Bayes estimator with additive smoothing of 1.0
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")

# Chain labelIndexer, vecAssembler and nb in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, vecAssembler, nb])

# Run the stages in order on the training data and fit the model
model = pipeline.fit(trainData)
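The Spark ML Pipelines guide describes `Pipeline.fit()` as running the stages in order. Estimator stages are fit on the current data, and every fitted stage except the last transforms the data before it reaches the next stage. The returned `PipelineModel` then replays all fitted stages in `transform()`. Below is a simplified sketch of those semantics. All class names are hypothetical (prefixed `Mini` to avoid shadowing PySpark's `Pipeline`), and `MeanCenter` is a toy estimator standing in for stages like `StringIndexer`.

```python
class Transformer:
    """Anything with transform(rows) -> rows."""
    def transform(self, rows):
        raise NotImplementedError


class Estimator:
    """Anything with fit(rows) -> Transformer."""
    def fit(self, rows):
        raise NotImplementedError


class MiniPipelineModel(Transformer):
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:  # replay every fitted stage in order
            rows = stage.transform(rows)
        return rows


class MiniPipeline(Estimator):
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for i, stage in enumerate(self.stages):
            # Estimators are fit on the output of the earlier stages;
            # plain transformers are used as-is.
            model = stage.fit(rows) if isinstance(stage, Estimator) else stage
            fitted.append(model)
            if i < len(self.stages) - 1:
                rows = model.transform(rows)  # feed the next stage
        return MiniPipelineModel(fitted)


class _Shift(Transformer):
    def __init__(self, col, offset):
        self.col, self.offset = col, offset

    def transform(self, rows):
        return [dict(r, **{self.col: r[self.col] - self.offset}) for r in rows]


class MeanCenter(Estimator):
    """Toy estimator: learns the mean of one column, then subtracts it."""
    def __init__(self, col):
        self.col = col

    def fit(self, rows):
        mean = sum(r[self.col] for r in rows) / len(rows)
        return _Shift(self.col, mean)
```

The key consequence for this notebook is that anything a stage learns comes from `trainData` only. When the fitted model transforms `testData`, it reuses what it learned from the training split.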

In [31]:
# Make predictions on testData so we can measure the accuracy of our model on new data
predictions = model.transform(testData)
print("Show the predictions")
predictions.select("label","prediction","probability").show()

Show the predictions
+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|  2.0|       2.0|[0.16435741351710...|
|  2.0|       2.0|[0.18667884202571...|
|  2.0|       2.0|[0.22632018815338...|
|  2.0|       2.0|[0.19345806995097...|
|  2.0|       2.0|[0.13852460169332...|
|  2.0|       2.0|[0.17860399418640...|
|  2.0|       2.0|[0.17324670108747...|
|  2.0|       2.0|[0.19628177721965...|
|  2.0|       2.0|[0.22520036100353...|
|  2.0|       2.0|[0.23035185472860...|
|  2.0|       2.0|[0.11991695747320...|
|  2.0|       2.0|[0.21022032654016...|
|  2.0|       2.0|[0.17317576418211...|
|  2.0|       2.0|[0.17763221141908...|
|  2.0|       2.0|[0.19497376441884...|
|  2.0|       2.0|[0.22725681450445...|
|  2.0|       2.0|[0.18439734839081...|
|  0.0|       0.0|[0.50347518799337...|
|  0.0|       0.0|[0.47461382207661...|
|  0.0|       0.0|[0.48299790615630...|
+-----+----------+--------------------+
only showing top 20 rows
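For naive Bayes, the `probability` column is the normalized exponential of the per-class log scores in the `rawPrediction` column, and `prediction` is the class with the largest score. Here is a minimal stdlib sketch of that normalization, assuming the raw scores are unnormalized log probabilities. The name `raw_to_probability` is hypothetical.

```python
import math


def raw_to_probability(raw_scores):
    """Turn per-class log scores into probabilities that sum to 1.

    Subtracting the maximum before exponentiating avoids underflow when the
    log scores are large negative numbers.
    """
    m = max(raw_scores)
    exps = [math.exp(s - m) for s in raw_scores]
    total = sum(exps)
    return [e / total for e in exps]
```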

In [32]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", 
                                              predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Model Accuracy : {}".format(accuracy))

Model Accuracy : 0.9361702127659575


In [33]:
evaluator.explainParam("metricName")

'metricName: metric name in evaluation (f1|weightedPrecision|weightedRecall|accuracy) (default: f1, current: accuracy)'

In [34]:
from pyspark.mllib.evaluation import MulticlassMetrics

# Create an RDD of (prediction, label) pairs, as MulticlassMetrics expects
predictionAndLabel = predictions.select("prediction", "label").rdd

# Confusion matrix: rows are true labels, columns are predicted labels (ascending order)
metrics = MulticlassMetrics(predictionAndLabel)
print(metrics.confusionMatrix())

DenseMatrix([[13.,  2.,  0.],
             [ 1., 14.,  0.],
             [ 0.,  0., 17.]])
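`MulticlassMetrics.confusionMatrix()` places true labels in rows and predicted labels in columns, both ordered by ascending label. Reading the matrix above that way, the diagonal holds 13 + 14 + 17 = 44 correct predictions out of 47 test rows. That is exactly the 0.936 accuracy reported by the evaluator. Below is a stdlib sketch that rebuilds the matrix from (prediction, label) pairs and derives accuracy and per-class recall and precision. The function names are hypothetical.

```python
def confusion_matrix(pairs, n_classes):
    """Rows are true labels, columns are predicted labels (both 0-based)."""
    m = [[0] * n_classes for _ in range(n_classes)]
    for prediction, label in pairs:
        m[int(label)][int(prediction)] += 1
    return m


def accuracy(matrix):
    """Fraction of rows on the diagonal (correct predictions)."""
    correct = sum(matrix[i][i] for i in range(len(matrix)))
    total = sum(sum(row) for row in matrix)
    return correct / total


def recall_per_class(matrix):
    """For each true class: correct / all rows of that class (row sum)."""
    return [row[i] / sum(row) if sum(row) else 0.0 for i, row in enumerate(matrix)]


def precision_per_class(matrix):
    """For each predicted class: correct / all predictions of that class (column sum)."""
    n = len(matrix)
    col_sums = [sum(matrix[r][c] for r in range(n)) for c in range(n)]
    return [matrix[c][c] / col_sums[c] if col_sums[c] else 0.0 for c in range(n)]
```

With the matrix above, recall is 13/15 and 14/15 for labels 0.0 and 1.0, so almost all errors come from confusing those two classes. Label 2.0 is classified perfectly.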


## Experimenting with Various Smoothing Parameters

In [35]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid and Evaluator for Cross Validation
paramGrid = ParamGridBuilder().addGrid(nb.smoothing, [0.0, 0.2, 0.4, 0.6, 0.8, 1.0]).build()
cvEvaluator = MulticlassClassificationEvaluator(metricName="accuracy")

In [36]:
# Run cross-validation over the smoothing grid (CrossValidator defaults to numFolds=3)
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=cvEvaluator)
cvModel = cv.fit(trainData)
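For each smoothing value in the grid, `CrossValidator` trains on k-1 folds and scores the held-out fold with `cvEvaluator`. It averages the k scores (available afterwards as `cvModel.avgMetrics`) and finally refits the pipeline on all of `trainData` with the best value. Below is a simplified sketch of that selection step. Spark assigns rows to folds by random sampling, so fold sizes and membership here will differ. `train_and_score` is a hypothetical callback, and larger scores are treated as better, as with accuracy.

```python
import random


def k_fold_indices(n, k, seed=0):
    """Shuffle row indices and deal them into k roughly equal folds."""
    idx = list(range(n))
    random.Random(seed).shuffle(idx)
    return [idx[i::k] for i in range(k)]


def grid_search_cv(n_rows, param_values, train_and_score, k=3, seed=0):
    """Return (best_param, avg_metrics).

    avg_metrics[i] is the mean validation score of param_values[i] over the
    k folds; train_and_score(param, train_idx, valid_idx) -> float.
    """
    folds = k_fold_indices(n_rows, k, seed)
    avg_metrics = []
    for p in param_values:
        scores = []
        for i, valid_idx in enumerate(folds):
            # Train on every fold except the i-th, validate on the i-th
            train_idx = [j for f, fold in enumerate(folds) if f != i for j in fold]
            scores.append(train_and_score(p, train_idx, valid_idx))
        avg_metrics.append(sum(scores) / k)
    best = max(range(len(param_values)), key=lambda i: avg_metrics[i])
    return param_values[best], avg_metrics
```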

In [37]:
# Make predictions on testData. cvModel uses the bestModel.
cvPredictions = cvModel.transform(testData)

In [38]:
#select results to view
cvPredictions.select("label", "prediction", "probability").show()

+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|  2.0|       2.0|[0.16457861889794...|
|  2.0|       2.0|[0.18720068084740...|
|  2.0|       2.0|[0.22738378398297...|
|  2.0|       2.0|[0.19300467575986...|
|  2.0|       2.0|[0.14017982615653...|
|  2.0|       2.0|[0.18004482681025...|
|  2.0|       2.0|[0.17460923737806...|
|  2.0|       2.0|[0.19908569126593...|
|  2.0|       2.0|[0.22624604045935...|
|  2.0|       2.0|[0.23378946673763...|
|  2.0|       2.0|[0.11963122871765...|
|  2.0|       2.0|[0.21104925904871...|
|  2.0|       2.0|[0.17342576291261...|
|  2.0|       2.0|[0.17899828109252...|
|  2.0|       2.0|[0.19560457704350...|
|  2.0|       2.0|[0.22942331067819...|
|  2.0|       2.0|[0.18485341056025...|
|  0.0|       0.0|[0.50577482872555...|
|  0.0|       0.0|[0.47919072043808...|
|  0.0|       0.0|[0.48565715223632...|
+-----+----------+--------------------+
only showing top 20 rows



In [39]:
# Evaluate bestModel found from Cross Validation
evaluator.evaluate(cvPredictions)

0.9361702127659575