In [0]:
from pyspark.context import SparkContext

In [0]:
sc=SparkContext.getOrCreate()

In [0]:

#Load the CSV file into a RDD
irisData = sc.textFile("/FileStore/tables/Stores-4.csv")
irisData.persist()


Out[9]: /FileStore/tables/Stores-4.csv MapPartitionsRDD[159] at textFile at NativeMethodAccessorImpl.java:0

In [0]:
#Remove the first line (contains headers)
dataLines = irisData.filter(lambda x: "Store_Sales" not in x)
dataLines.count()

Out[10]: 896

In [0]:
#Convert the RDD into a Dense Vector. As a part of this exercise
import math
from pyspark.ml.linalg import Vectors, VectorUDT
#from pyspark.ml.linalg import Vectors

In [0]:
# Change labels to numeric ones

def transformToNumeric( inputStr) :
    attList=inputStr.split(",")
    
    #Set default to setosa
    irisValue=1.0
    if attList[4] == "versicolor":
        irisValue=2.0
    if attList[4] == "virginica":
        irisValue=3.0
       
    #Filter out columns not wanted at this stage
    values= Vectors.dense([ irisValue, \
                     float(attList[0]),  \
                     float(attList[1]),  \
                     float(attList[2]),  \
                     float(attList[3])  \
                     ])
    return values

In [0]:
#Change to a Vector
irisVectors = dataLines.map(transformToNumeric)
irisVectors.collect()

Out[13]: [DenseVector([1.0, 1.0, 1659.0, 1961.0, 530.0]),
 DenseVector([1.0, 2.0, 1461.0, 1752.0, 210.0]),
 DenseVector([1.0, 3.0, 1340.0, 1609.0, 720.0]),
 DenseVector([1.0, 4.0, 1451.0, 1748.0, 620.0]),
 DenseVector([1.0, 5.0, 1770.0, 2111.0, 450.0]),
 DenseVector([1.0, 6.0, 1442.0, 1733.0, 760.0]),
 DenseVector([1.0, 7.0, 1542.0, 1858.0, 1030.0]),
 DenseVector([1.0, 8.0, 1261.0, 1507.0, 1020.0]),
 DenseVector([1.0, 9.0, 1090.0, 1321.0, 680.0]),
 DenseVector([1.0, 10.0, 1030.0, 1235.0, 1130.0]),
 DenseVector([1.0, 11.0, 1187.0, 1439.0, 1090.0]),
 DenseVector([1.0, 12.0, 1751.0, 2098.0, 720.0]),
 DenseVector([1.0, 13.0, 1746.0, 2064.0, 1050.0]),
 DenseVector([1.0, 14.0, 1615.0, 1931.0, 1160.0]),
 DenseVector([1.0, 15.0, 1469.0, 1756.0, 770.0]),
 DenseVector([1.0, 16.0, 1644.0, 1950.0, 790.0]),
 DenseVector([1.0, 17.0, 1578.0, 1907.0, 1440.0]),
 DenseVector([1.0, 18.0, 1703.0, 2045.0, 670.0]),
 DenseVector([1.0, 19.0, 1438.0, 1731.0, 1030.0]),
 DenseVector([1.0, 20.0, 1940.0, 2340.0, 9

In [0]:
#Transform to a Data Frame for input to Machine Learing
#Drop columns that are not required (low correlation)

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

def transformToLabeledPoint(inStr) :
    attList=inStr.split(",")
    lp = ( attList[4], Vectors.dense([attList[0],attList[2],attList[3]]))
    return lp

In [0]:
irisLp = dataLines.map(transformToLabeledPoint)
irisDF = sqlContext.createDataFrame(irisLp,["label", "features"])
irisDF.select("label","features").show(10)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|66490|  [1.0,1961.0,530.0]|
|39820|  [2.0,1752.0,210.0]|
|54010|  [3.0,1609.0,720.0]|
|53730|  [4.0,1748.0,620.0]|
|46620|  [5.0,2111.0,450.0]|
|45260|  [6.0,1733.0,760.0]|
|72240| [7.0,1858.0,1030.0]|
|37720| [8.0,1507.0,1020.0]|
|46310|  [9.0,1321.0,680.0]|
|44150|[10.0,1235.0,1130.0]|
+-----+--------------------+
only showing top 10 rows



In [0]:
#Indexing needed as pre-req for Decision Trees
from pyspark.ml.feature import StringIndexer
stringIndexer = StringIndexer(inputCol="label", outputCol="indexed")
si_model = stringIndexer.fit(irisDF)
td = si_model.transform(irisDF)
td.collect()

Out[17]: [Row(label='66490', features=DenseVector([1.0, 1961.0, 530.0]), indexed=566.0),
 Row(label='39820', features=DenseVector([2.0, 1752.0, 210.0]), indexed=197.0),
 Row(label='54010', features=DenseVector([3.0, 1609.0, 720.0]), indexed=399.0),
 Row(label='53730', features=DenseVector([4.0, 1748.0, 620.0]), indexed=393.0),
 Row(label='46620', features=DenseVector([5.0, 2111.0, 450.0]), indexed=283.0),
 Row(label='45260', features=DenseVector([6.0, 1733.0, 760.0]), indexed=264.0),
 Row(label='72240', features=DenseVector([7.0, 1858.0, 1030.0]), indexed=639.0),
 Row(label='37720', features=DenseVector([8.0, 1507.0, 1020.0]), indexed=169.0),
 Row(label='46310', features=DenseVector([9.0, 1321.0, 680.0]), indexed=281.0),
 Row(label='44150', features=DenseVector([10.0, 1235.0, 1130.0]), indexed=250.0),
 Row(label='71280', features=DenseVector([11.0, 1439.0, 1090.0]), indexed=627.0),
 Row(label='57620', features=DenseVector([12.0, 2098.0, 720.0]), indexed=455.0),
 Row(label='60470', feat

In [0]:

#Split into training and testing data
(trainingData, testData) = td.randomSplit([0.9, 0.1])
trainingData.collect()
#testData.count()
#testData.collect()

Out[18]: [Row(label='102310', features=DenseVector([409.0, 1587.0, 1310.0]), indexed=81.0),
 Row(label='102920', features=DenseVector([433.0, 1638.0, 680.0]), indexed=82.0),
 Row(label='14920', features=DenseVector([32.0, 1508.0, 990.0]), indexed=85.0),
 Row(label='21300', features=DenseVector([353.0, 1686.0, 850.0]), indexed=89.0),
 Row(label='21650', features=DenseVector([373.0, 2254.0, 1340.0]), indexed=91.0),
 Row(label='21750', features=DenseVector([278.0, 1869.0, 1030.0]), indexed=92.0),
 Row(label='21830', features=DenseVector([253.0, 1907.0, 680.0]), indexed=93.0),
 Row(label='24410', features=DenseVector([224.0, 1606.0, 1200.0]), indexed=97.0),
 Row(label='25630', features=DenseVector([273.0, 1571.0, 1190.0]), indexed=99.0),
 Row(label='25820', features=DenseVector([27.0, 1777.0, 1100.0]), indexed=100.0),
 Row(label='25950', features=DenseVector([261.0, 1426.0, 980.0]), indexed=101.0),
 Row(label='26220', features=DenseVector([271.0, 1439.0, 780.0]), indexed=102.0),
 Row(label

In [0]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.linalg import Vectors, VectorUDT
#from pyspark.ml.linalg import Vectors as MLLibVectors
#from pyspark.ml.linalg import VectorUDT
#from pyspark.sql.functions import udf


In [0]:
#Create the model
dtClassifer = DecisionTreeClassifier(maxDepth=2, labelCol="indexed")
dtModel = dtClassifer.fit(trainingData)
dtModel.numNodes
dtModel.depth


Out[20]: 2

In [0]:
#Predict on the test data
predictions = dtModel.transform(trainingData)


In [0]:
predictions.select("prediction","indexed","label","features").collect()

Out[22]: [Row(prediction=1.0, indexed=81.0, label='102310', features=DenseVector([409.0, 1587.0, 1310.0])),
 Row(prediction=1.0, indexed=82.0, label='102920', features=DenseVector([433.0, 1638.0, 680.0])),
 Row(prediction=1.0, indexed=85.0, label='14920', features=DenseVector([32.0, 1508.0, 990.0])),
 Row(prediction=1.0, indexed=89.0, label='21300', features=DenseVector([353.0, 1686.0, 850.0])),
 Row(prediction=71.0, indexed=91.0, label='21650', features=DenseVector([373.0, 2254.0, 1340.0])),
 Row(prediction=1.0, indexed=92.0, label='21750', features=DenseVector([278.0, 1869.0, 1030.0])),
 Row(prediction=1.0, indexed=93.0, label='21830', features=DenseVector([253.0, 1907.0, 680.0])),
 Row(prediction=1.0, indexed=97.0, label='24410', features=DenseVector([224.0, 1606.0, 1200.0])),
 Row(prediction=1.0, indexed=99.0, label='25630', features=DenseVector([273.0, 1571.0, 1190.0])),
 Row(prediction=1.0, indexed=100.0, label='25820', features=DenseVector([27.0, 1777.0, 1100.0])),
 Row(predicti

In [0]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction",labelCol="indexed")
evaluator.evaluate(predictions)

Out[23]: 0.000675613424847584

In [0]:
#Draw a confusion matrix
labelList=predictions.select("indexed","label").distinct().toPandas()
predictions.groupBy("indexed","prediction").count().show()


+-------+----------+-----+
|indexed|prediction|count|
+-------+----------+-----+
|  304.0|       1.0|    1|
|  448.0|       1.0|    1|
|  594.0|       1.0|    1|
|  695.0|       1.0|    1|
|   10.0|       1.0|    2|
|  637.0|       1.0|    1|
|  796.0|       1.0|    1|
|  341.0|       1.0|    1|
|  390.0|       1.0|    1|
|  435.0|       1.0|    1|
|   24.0|       1.0|    2|
|    6.0|       1.0|    2|
|  231.0|       1.0|    1|
|  567.0|       1.0|    1|
|  128.0|       1.0|    1|
|  659.0|       1.0|    1|
|  129.0|       1.0|    1|
|  202.0|      71.0|    1|
|  301.0|       1.0|    1|
|  452.0|       1.0|    1|
+-------+----------+-----+
only showing top 20 rows

