In [1]:
import sys
import os

from pyspark.sql.types import *
import pyspark.sql.functions as func

from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import Bucketizer, OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.regression import GeneralizedLinearRegression, LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder 

import pyspark
from pyspark.sql import SQLContext

#if ('sc' not in locals() or 'sc' not in globals()):
#    os.environ['PYSPARK_PYTHON'] = '/usr/bin/python2'
sc = pyspark.SparkContext('local[*]')
sqlContext = SQLContext(sc)

In [2]:
df = sqlContext.read.csv("bezdekIris.data", inferSchema=True)\
.toDF("sep_len", "sep_wid", "pet_len", "pet_wid", "label")

In [16]:
df.show(150)

+---------------+-----------------+----------+
|          label|         features|labelIndex|
+---------------+-----------------+----------+
|    Iris-setosa|[5.1,3.5,1.4,0.2]|       0.0|
|    Iris-setosa|[4.9,3.0,1.4,0.2]|       0.0|
|    Iris-setosa|[4.7,3.2,1.3,0.2]|       0.0|
|    Iris-setosa|[4.6,3.1,1.5,0.2]|       0.0|
|    Iris-setosa|[5.0,3.6,1.4,0.2]|       0.0|
|    Iris-setosa|[5.4,3.9,1.7,0.4]|       0.0|
|    Iris-setosa|[4.6,3.4,1.4,0.3]|       0.0|
|    Iris-setosa|[5.0,3.4,1.5,0.2]|       0.0|
|    Iris-setosa|[4.4,2.9,1.4,0.2]|       0.0|
|    Iris-setosa|[4.9,3.1,1.5,0.1]|       0.0|
|    Iris-setosa|[5.4,3.7,1.5,0.2]|       0.0|
|    Iris-setosa|[4.8,3.4,1.6,0.2]|       0.0|
|    Iris-setosa|[4.8,3.0,1.4,0.1]|       0.0|
|    Iris-setosa|[4.3,3.0,1.1,0.1]|       0.0|
|    Iris-setosa|[5.8,4.0,1.2,0.2]|       0.0|
|    Iris-setosa|[5.7,4.4,1.5,0.4]|       0.0|
|    Iris-setosa|[5.4,3.9,1.3,0.4]|       0.0|
|    Iris-setosa|[5.1,3.5,1.4,0.3]|       0.0|
|    Iris-set

In [4]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [5]:
vector_assembler = VectorAssembler(\
inputCols=["sep_len", "sep_wid", "pet_len", "pet_wid"],\
outputCol="features")
df_temp = vector_assembler.transform(df)
df_temp.show(3)

+-------+-------+-------+-------+-----------+-----------------+
|sep_len|sep_wid|pet_len|pet_wid|      label|         features|
+-------+-------+-------+-------+-----------+-----------------+
|    5.1|    3.5|    1.4|    0.2|Iris-setosa|[5.1,3.5,1.4,0.2]|
|    4.9|    3.0|    1.4|    0.2|Iris-setosa|[4.9,3.0,1.4,0.2]|
|    4.7|    3.2|    1.3|    0.2|Iris-setosa|[4.7,3.2,1.3,0.2]|
+-------+-------+-------+-------+-----------+-----------------+
only showing top 3 rows



In [6]:
df = df_temp.drop("sep_len").drop("sep_wid").drop("pet_len").drop("pet_wid")
df.show(3)

+-----------+-----------------+
|      label|         features|
+-----------+-----------------+
|Iris-setosa|[5.1,3.5,1.4,0.2]|
|Iris-setosa|[4.9,3.0,1.4,0.2]|
|Iris-setosa|[4.7,3.2,1.3,0.2]|
+-----------+-----------------+
only showing top 3 rows



In [7]:
from pyspark.ml.feature import StringIndexer
l_indexer = StringIndexer(inputCol="label", outputCol="labelIndex")
df = l_indexer.fit(df).transform(df)
df.show(3)

+-----------+-----------------+----------+
|      label|         features|labelIndex|
+-----------+-----------------+----------+
|Iris-setosa|[5.1,3.5,1.4,0.2]|       0.0|
|Iris-setosa|[4.9,3.0,1.4,0.2]|       0.0|
|Iris-setosa|[4.7,3.2,1.3,0.2]|       0.0|
+-----------+-----------------+----------+
only showing top 3 rows



### Decision tree classifier

In [47]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [48]:
dt = DecisionTreeClassifier(labelCol="labelIndex", featuresCol="features")
model = dt.fit(trainingData)

In [49]:
predictions = model.transform(testData)

In [50]:
predictions.select("prediction", "labelIndex").show(150)

+----------+----------+
|prediction|labelIndex|
+----------+----------+
|       0.0|       0.0|
|       0.0|       0.0|
|       0.0|       0.0|
|       0.0|       0.0|
|       0.0|       0.0|
|       1.0|       0.0|
|       0.0|       0.0|
|       0.0|       0.0|
|       0.0|       0.0|
|       0.0|       0.0|
|       0.0|       0.0|
|       1.0|       0.0|
|       0.0|       0.0|
|       0.0|       0.0|
|       0.0|       0.0|
|       1.0|       1.0|
|       1.0|       1.0|
|       1.0|       1.0|
|       1.0|       1.0|
|       1.0|       1.0|
|       1.0|       1.0|
|       1.0|       1.0|
|       1.0|       1.0|
|       2.0|       1.0|
|       1.0|       1.0|
|       1.0|       1.0|
|       1.0|       1.0|
|       2.0|       2.0|
|       2.0|       2.0|
|       2.0|       2.0|
|       2.0|       2.0|
|       2.0|       2.0|
|       2.0|       2.0|
|       2.0|       2.0|
|       2.0|       2.0|
|       2.0|       2.0|
|       2.0|       2.0|
+----------+----------+



In [51]:
evaluator = MulticlassClassificationEvaluator(\
labelCol="labelIndex", predictionCol="prediction",\
metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))

Test Error = 0.0810811 


In [52]:
print(model)

DecisionTreeClassificationModel (uid=DecisionTreeClassifier_42328d8c17c7f89abf91) of depth 4 with 13 nodes


In [8]:
(trainingData, testData) = df.randomSplit([0.7, 0.3])

In [53]:
model.save('./model.pyspark')

Py4JJavaError: An error occurred while calling o313.save.
: java.io.IOException: Path ./model.pyspark already exists. Please use write.overwrite().save(path) to overwrite it.
	at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:110)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)


In [9]:
from pyspark.ml.classification import DecisionTreeClassificationModel
new_model = DecisionTreeClassificationModel.load('./model.pyspark')

In [23]:
from pyspark.ml.linalg import Vectors
d = [{'features': Vectors.dense([5.6,2.9,3.6,1.3]), 'labelIndex' : 0.0}]
data = sqlContext.createDataFrame(d)



In [24]:
predictions = new_model.transform(data)

In [25]:
predictions.select("prediction", "labelIndex").show(2)

+----------+----------+
|prediction|labelIndex|
+----------+----------+
|       1.0|       0.0|
+----------+----------+



In [32]:
print(predictions.select("prediction").collect()[0])

Row(prediction=1.0)


In [34]:
import pyspark.sql.functions as f
print(predictions.select(f.collect_list('prediction')).first()[0])

[1.0]


In [35]:
print(predictions.select('prediction').first()[0])

1.0


In [37]:
labels = ["setosa", "versicolor", "virginica"]
print(labels[int(predictions.select('prediction').first()[0])])

versicolor
