In [26]:
from __future__ import print_function
# $example on$
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# $example off$
from pyspark.sql import SparkSession

In [27]:
if __name__ == "__main__":
    spark = SparkSession\
        .builder\
        .appName("DecisionTreeClassificationExample")\
        .getOrCreate()

In [28]:
data = spark.read.load("Social_Network_Ads.csv",format="csv", sep=",", inferSchema="true", header="true")
data.show(5)

+--------+------+---+---------------+---------+
| User ID|Gender|Age|EstimatedSalary|Purchased|
+--------+------+---+---------------+---------+
|15624510|  Male| 19|          19000|        0|
|15810944|  Male| 35|          20000|        0|
|15668575|Female| 26|          43000|        0|
|15603246|Female| 27|          57000|        0|
|15804002|  Male| 19|          76000|        0|
+--------+------+---+---------------+---------+
only showing top 5 rows



In [29]:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="Gender", outputCol="GenderIndex")
data = indexer.fit(data).transform(data)
data = data.select("Age","EstimatedSalary","GenderIndex","Purchased")
data.show(5)

+---+---------------+-----------+---------+
|Age|EstimatedSalary|GenderIndex|Purchased|
+---+---------------+-----------+---------+
| 19|          19000|        1.0|        0|
| 35|          20000|        1.0|        0|
| 26|          43000|        0.0|        0|
| 27|          57000|        0.0|        0|
| 19|          76000|        1.0|        0|
+---+---------------+-----------+---------+
only showing top 5 rows



In [31]:
from pyspark.ml.feature import OneHotEncoderEstimator
encoder = OneHotEncoderEstimator(inputCols=["GenderIndex"],outputCols=["Gender"])
model = encoder.fit(data)
data = model.transform(data)
data = data.select("Age","EstimatedSalary","Gender","Purchased")
data.show(5)

+---+---------------+-------------+---------+
|Age|EstimatedSalary|       Gender|Purchased|
+---+---------------+-------------+---------+
| 19|          19000|    (1,[],[])|        0|
| 35|          20000|    (1,[],[])|        0|
| 26|          43000|(1,[0],[1.0])|        0|
| 27|          57000|(1,[0],[1.0])|        0|
| 19|          76000|    (1,[],[])|        0|
+---+---------------+-------------+---------+
only showing top 5 rows



In [32]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=[ "Age", "EstimatedSalary","Gender"],outputCol="features")
data = assembler.transform(data)
data=data.select("features", "Purchased")
data.show(5)

+------------------+---------+
|          features|Purchased|
+------------------+---------+
|[19.0,19000.0,0.0]|        0|
|[35.0,20000.0,0.0]|        0|
|[26.0,43000.0,1.0]|        0|
|[27.0,57000.0,1.0]|        0|
|[19.0,76000.0,0.0]|        0|
+------------------+---------+
only showing top 5 rows



In [33]:
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors
scaler = MinMaxScaler(inputCol="features", outputCol="featuresScaled")
scalerModel = scaler.fit(data)
data = scalerModel.transform(data)
data = data.selectExpr("featuresScaled as features", "Purchased as label")
data.show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[0.02380952380952...|    0|
|[0.40476190476190...|    0|
|[0.19047619047619...|    0|
|[0.21428571428571...|    0|
|[0.02380952380952...|    0|
+--------------------+-----+
only showing top 5 rows



In [34]:
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a DecisionTree model.
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")
model = dt.fit(trainingData)

In [35]:
# Make predictions.
predictions = model.transform(testData) 
predictions.select("prediction", "label", "features").show(5)

+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|       0.0|    0|[0.0,0.2148148148...|
|       0.0|    0|[0.0,0.3925925925...|
|       0.0|    0|[0.02380952380952...|
|       0.0|    0|[0.02380952380952...|
|       0.0|    0|[0.04761904761904...|
+----------+-----+--------------------+
only showing top 5 rows



In [47]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))

Test Error = 0.121951 


In [48]:

    # summary only
print(model)
    # $example off$

 

DecisionTreeClassificationModel (uid=DecisionTreeClassifier_182f76ce415b) of depth 5 with 25 nodes


In [49]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics

# Scala version implements .roc() and .pr()
# Python: https://spark.apache.org/docs/latest/api/python/_modules/pyspark/mllib/common.html
# Scala: https://spark.apache.org/docs/latest/api/java/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.html
class CurveMetrics(BinaryClassificationMetrics):
    def __init__(self, *args):
        super(CurveMetrics, self).__init__(*args)

    def _to_list(self, rdd):
        points = []
        # Note this collect could be inefficient for large datasets 
        # considering there may be one probability per datapoint (at most)
        # The Scala version takes a numBins parameter, 
        # but it doesn't seem possible to pass this from Python to Java
        for row in rdd.collect():
            # Results are returned as type scala.Tuple2, 
            # which doesn't appear to have a py4j mapping
            points += [(float(row._1()), float(row._2()))]
        return points

    def get_curve(self, method):
        rdd = getattr(self._java_model, method)().toJavaRDD()
        return self._to_list(rdd)

In [51]:
import matplotlib.pyplot as plt




# Returns as a list (false positive rate, true positive rate)
preds = predictions.select('label','probability').rdd.map(lambda row: (float(row['probability'][1]), float(row['label'])))
roc = CurveMetrics(preds).get_curve('roc')

plt.figure()

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

In [50]:
x_val = [x[0] for x in points]
y_val = [x[1] for x in points]
plt.title(title)
plt.xlabel(xlabel)
plt.ylabel(ylabel)
plt.plot(x_val, y_val)

NameError: name 'points' is not defined

<Figure size 432x288 with 0 Axes>