In [51]:
import pyspark

In [3]:
# import packages
from pyspark.ml import linalg
from pyspark.mllib.stat import Statistics

from pyspark.ml import feature

from pyspark.ml import regression
from pyspark.ml import classification
from pyspark.ml import clustering

from pyspark.ml import evaluation
from pyspark.ml import param
from pyspark.ml import util
from pyspark.ml import tuning

import numpy as np
import pandas as pd

In [4]:
from pyspark import SparkContext, SparkConf
sc = SparkContext()

## Basicm statistics

In [5]:
from pyspark.mllib.stat import Statistics

In [6]:
# create RDD of Vectors
sample = sc.parallelize([np.array([1.0, 10.0, 100.0]), np.array([2.0, 20.0, 200.0]), np.array([3.0, 30.0, 300.0])])
sample

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:480

colStats() returns an instance of MultivariateStatisticalSummary, which contains the column-wise max, min, mean, variance, and number of nonzeros, as well as the total count.


In [9]:
statics = Statistics.colStats(sample)
statics

<pyspark.mllib.stat._statistics.MultivariateStatisticalSummary at 0x112611588>

In [10]:
# a dense vector containing the mean value for each column
print(statics.mean())

# column-wise variance
print(statics.variance())

# number of nonzeros in each column
print(statics.numNonzeros())

[   2.   20.  200.]
[  1.00000000e+00   1.00000000e+02   1.00000000e+04]
[ 3.  3.  3.]


In [11]:
print(statics.min())

[   1.   10.  100.]


In [12]:
print(statics.count())

3


### Correlation

Compute the correlation (matrix) for the input RDD(s) using the specified method. Methods currently supported: pearson (default), spearman

In [13]:
seriesX = sc.parallelize([1.0, 2.0, 3.0, 3.0, 5.0])
seriesY = sc.parallelize([11.0, 22.0, 33.0, 33.0, 555.0])
print("Correlation is: " + str(Statistics.corr(seriesX, seriesY, method="pearson")))

Correlation is: 0.8500286768773007


In [14]:
#vectors
from pyspark.mllib.linalg import Vectors

rdd = sc.parallelize([Vectors.dense([1, 0, 0, -2]), Vectors.dense([4, 5, 0, 3]),Vectors.dense([6, 7, 0,  8]), Vectors.dense([9, 0, 0, 1])])
pearsonCorr = Statistics.corr(rdd)

In [15]:
print(str(pearsonCorr))

[[ 1.          0.05564149         nan  0.40047142]
 [ 0.05564149  1.                 nan  0.91359586]
 [        nan         nan  1.                 nan]
 [ 0.40047142  0.91359586         nan  1.        ]]


### Import Iris Data 

In [16]:
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [17]:
# define data struct
struct = StructType([
        StructField('Id', IntegerType(), True),
        StructField('SepalLengthCm', DoubleType(), True),
        StructField('SepalWidthCm', DoubleType(), True),
        StructField('PetalLengthCm', DoubleType(), True),
        StructField('PetalWidthCm', DoubleType(), True),
        StructField('Species', StringType(), True)
    ])


In [18]:
from pyspark.sql import SQLContext, Row 
sqlContext = SQLContext(sc)

In [19]:
# load data
df_iris = sqlContext.read.load('../iris.csv',
                               format='com.databricks.spark.csv', 
                               header='true', 
                               schema= struct)


# Or load data from hdfs cluster
# df_iris =spark.read.csv('hdfs:///user/aadsyanw/data/iris.csv', header=True, schema=struct)

In [20]:
df_iris.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- SepalLengthCm: double (nullable = true)
 |-- SepalWidthCm: double (nullable = true)
 |-- PetalLengthCm: double (nullable = true)
 |-- PetalWidthCm: double (nullable = true)
 |-- Species: string (nullable = true)



In [21]:
# top 5
df_iris.show(5)

+---+-------------+------------+-------------+------------+-------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|Species|
+---+-------------+------------+-------------+------------+-------+
|  1|          5.1|         3.5|          1.4|         0.2| Setosa|
|  2|          4.9|         3.0|          1.4|         0.2| Setosa|
|  3|          4.7|         3.2|          1.3|         0.2| Setosa|
|  4|          4.6|         3.1|          1.5|         0.2| Setosa|
|  5|          5.0|         3.6|          1.4|         0.2| Setosa|
+---+-------------+------------+-------------+------------+-------+
only showing top 5 rows



### Basic Statistics

In [22]:

df_iris.describe().show()

+-------+------------------+------------------+-------------------+------------------+------------------+---------+
|summary|                Id|     SepalLengthCm|       SepalWidthCm|     PetalLengthCm|      PetalWidthCm|  Species|
+-------+------------------+------------------+-------------------+------------------+------------------+---------+
|  count|               150|               150|                150|               150|               150|      150|
|   mean|              75.5| 5.843333333333335|  3.057333333333334|3.7580000000000027| 1.199333333333334|     null|
| stddev|43.445367992456916|0.8280661279778637|0.43586628493669793|1.7652982332594662|0.7622376689603467|     null|
|    min|                 1|               4.3|                2.0|               1.0|               0.1|   Setosa|
|    max|               150|               7.9|                4.4|               6.9|               2.5|Virginica|
+-------+------------------+------------------+-------------------+-----

### VectorAssembler 

A transformer that combines a given list of columns into a single vector column for modeling.

In [23]:
vecAssembler = VectorAssembler(inputCols=["SepalLengthCm", "SepalWidthCm", "PetalLengthCm"], outputCol="features")
df_features = vecAssembler.transform(df_iris)
df_features.show(5)

+---+-------------+------------+-------------+------------+-------+-------------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|Species|     features|
+---+-------------+------------+-------------+------------+-------+-------------+
|  1|          5.1|         3.5|          1.4|         0.2| Setosa|[5.1,3.5,1.4]|
|  2|          4.9|         3.0|          1.4|         0.2| Setosa|[4.9,3.0,1.4]|
|  3|          4.7|         3.2|          1.3|         0.2| Setosa|[4.7,3.2,1.3]|
|  4|          4.6|         3.1|          1.5|         0.2| Setosa|[4.6,3.1,1.5]|
|  5|          5.0|         3.6|          1.4|         0.2| Setosa|[5.0,3.6,1.4]|
+---+-------------+------------+-------------+------------+-------+-------------+
only showing top 5 rows



In [24]:
vecAssembler = VectorAssembler(inputCols=["SepalLengthCm", "SepalWidthCm", "PetalLengthCm", "PetalWidthCm"], outputCol="features")
df_features = vecAssembler.transform(df_iris)
df_features.show(5)

+---+-------------+------------+-------------+------------+-------+-----------------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|Species|         features|
+---+-------------+------------+-------------+------------+-------+-----------------+
|  1|          5.1|         3.5|          1.4|         0.2| Setosa|[5.1,3.5,1.4,0.2]|
|  2|          4.9|         3.0|          1.4|         0.2| Setosa|[4.9,3.0,1.4,0.2]|
|  3|          4.7|         3.2|          1.3|         0.2| Setosa|[4.7,3.2,1.3,0.2]|
|  4|          4.6|         3.1|          1.5|         0.2| Setosa|[4.6,3.1,1.5,0.2]|
|  5|          5.0|         3.6|          1.4|         0.2| Setosa|[5.0,3.6,1.4,0.2]|
+---+-------------+------------+-------------+------------+-------+-----------------+
only showing top 5 rows



In [25]:
# unique values in column
df_features.select('Species').distinct().show()

+----------+
|   Species|
+----------+
| Virginica|
|    Setosa|
|Versicolor|
+----------+



## Label Encoding

In [28]:
# transfer species into number

# StringIndexer encodes a string column of labels to a column of label indices.
stringIndexer = StringIndexer(inputCol="Species", outputCol="label")
df_features_lable = stringIndexer.fit(df_features).transform(df_features)
df_features_lable.show(5)


+---+-------------+------------+-------------+------------+-------+-----------------+-----+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|Species|         features|label|
+---+-------------+------------+-------------+------------+-------+-----------------+-----+
|  1|          5.1|         3.5|          1.4|         0.2| Setosa|[5.1,3.5,1.4,0.2]|  2.0|
|  2|          4.9|         3.0|          1.4|         0.2| Setosa|[4.9,3.0,1.4,0.2]|  2.0|
|  3|          4.7|         3.2|          1.3|         0.2| Setosa|[4.7,3.2,1.3,0.2]|  2.0|
|  4|          4.6|         3.1|          1.5|         0.2| Setosa|[4.6,3.1,1.5,0.2]|  2.0|
|  5|          5.0|         3.6|          1.4|         0.2| Setosa|[5.0,3.6,1.4,0.2]|  2.0|
+---+-------------+------------+-------------+------------+-------+-----------------+-----+
only showing top 5 rows



In [29]:
df_features_lable.select('label').distinct().show()

+-----+
|label|
+-----+
|  0.0|
|  1.0|
|  2.0|
+-----+



## Training Data/ Test data Split

In [30]:
df_train, df_test = df_features_lable.randomSplit([.8, .2])
df_train.count()

df_train.show(3)

+---+-------------+------------+-------------+------------+-------+-----------------+-----+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|Species|         features|label|
+---+-------------+------------+-------------+------------+-------+-----------------+-----+
|  1|          5.1|         3.5|          1.4|         0.2| Setosa|[5.1,3.5,1.4,0.2]|  2.0|
|  2|          4.9|         3.0|          1.4|         0.2| Setosa|[4.9,3.0,1.4,0.2]|  2.0|
|  3|          4.7|         3.2|          1.3|         0.2| Setosa|[4.7,3.2,1.3,0.2]|  2.0|
+---+-------------+------------+-------------+------------+-------+-----------------+-----+
only showing top 3 rows



## Modeling

### Naive Bayes model

simple multiclass classification algorithm with the assumption of independence between every pair of features.

class pyspark.ml.classification.NaiveBayes(self, featuresCol="features", labelCol="label", predictionCol="prediction", 
probabilityCol="probability", rawPredictionCol="rawPrediction", smoothing=1.0, modelType="multinomial", thresholds=None)

In [34]:
# train Naive Bayes model
from pyspark.ml.classification import NaiveBayes

nb = NaiveBayes(featuresCol="features", labelCol="label")
nb_model = nb.fit(df_train)

In [35]:
# predict
df_predicted = nb_model.transform(df_test.select('features','label'))
df_predicted.show(3)

+-----------------+-----+--------------------+--------------------+----------+
|         features|label|       rawPrediction|         probability|prediction|
+-----------------+-----+--------------------+--------------------+----------+
|[5.0,3.4,1.5,0.2]|  2.0|[-14.014669680563...|[0.09449191112867...|       2.0|
|[4.8,3.0,1.4,0.1]|  2.0|[-12.795068216499...|[0.10132642496646...|       2.0|
|[5.7,3.8,1.7,0.3]|  2.0|[-15.825390788806...|[0.08502740174948...|       2.0|
+-----------------+-----+--------------------+--------------------+----------+
only showing top 3 rows



In [36]:
# evaluate
evaluator = MulticlassClassificationEvaluator()
evaluator.evaluate(df_predicted)

0.8148148148148148

In [38]:
#apply the model
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

test0 = sc.parallelize([Row(features=Vectors.dense([5.1, 3.5, 1.4, 0.2]))]).toDF()
result = nb_model.transform(test0).head()

In [39]:
result

Row(features=DenseVector([5.1, 3.5, 1.4, 0.2]), rawPrediction=DenseVector([-14.1737, -13.419, -11.9843]), probability=DenseVector([0.0829, 0.1764, 0.7406]), prediction=2.0)

In [40]:
result.prediction

2.0

### Export and import trained model

In [57]:
# export model and import model, i.e. save and load model
output_dir = "hdfs:///user/aadsyanw/models/NaiveBayesModel_Iris"
nb_model.save(output_dir)

In [60]:
from pyspark.ml.classification import NaiveBayesModel
model2 = NaiveBayesModel.load(output_dir)

In [61]:
result2 = model2.transform(test0).head()

In [62]:
result2.prediction

2.0

### Decision Tree

In [44]:
# train the model
from pyspark.ml.classification import DecisionTreeClassifier
dtree = DecisionTreeClassifier(maxDepth=5, featuresCol="features", labelCol="label")
dtree_model = dtree.fit(df_train)
dtree_model.numNodes

15

In [45]:
dtree_model.depth

5

In [46]:
# predict
result = dtree_model.transform(test0).head()

In [47]:
result

Row(features=DenseVector([5.1, 3.5, 1.4, 0.2]), rawPrediction=DenseVector([0.0, 0.0, 40.0]), probability=DenseVector([0.0, 0.0, 1.0]), prediction=2.0)

In [48]:
result.prediction

2.0

In [49]:
# notes of tree
dtree_model.toDebugString

'DecisionTreeClassificationModel (uid=DecisionTreeClassifier_4792a3e28072f894f99d) of depth 5 with 15 nodes\n  If (feature 2 <= 1.9)\n   Predict: 2.0\n  Else (feature 2 > 1.9)\n   If (feature 2 <= 4.8)\n    If (feature 3 <= 1.6)\n     Predict: 1.0\n    Else (feature 3 > 1.6)\n     If (feature 0 <= 5.9)\n      Predict: 1.0\n     Else (feature 0 > 5.9)\n      Predict: 0.0\n   Else (feature 2 > 4.8)\n    If (feature 3 <= 1.6)\n     If (feature 2 <= 4.9)\n      Predict: 1.0\n     Else (feature 2 > 4.9)\n      If (feature 0 <= 6.0)\n       Predict: 0.0\n      Else (feature 0 > 6.0)\n       Predict: 0.0\n    Else (feature 3 > 1.6)\n     Predict: 0.0\n'

### References

https://spark.apache.org/docs/latest/api/scala/#org.apache.spark.ml.classification.DecisionTreeClassificationModel

https://spark.apache.org/docs/2.1.0/ml-features.html#vectorassembler

https://spark.apache.org/docs/1.5.2/ml-features.html
    
https://spark.apache.org/docs/1.2.0/mllib-guide.html