In [1]:
# Load data

In [2]:
%fs ls FileStore/tables/dict.csv

In [3]:
%sql DROP TABLE IF EXISTS vehicle_process

In [4]:
# Create table

In [5]:
%sql
-- Define the vehicle_process table on top of the uploaded CSV file (first row is the header).
-- NOTE(review): Left/Top/Right/Bottom look like bounding-box coordinates - confirm against the data.

CREATE TABLE vehicle_process (
  Photo_Name STRING,
  Left DOUBLE,
  Top DOUBLE,
  Right DOUBLE,
  Bottom DOUBLE,
  Vehicle_Type STRING)
USING com.databricks.spark.csv
OPTIONS (path "FileStore/tables/dict.csv", header "true")

In [6]:
# Load the vehicle_process table as a Spark DataFrame.
vehicles = spark.table("vehicle_process")
# Remember the original column names; they are re-selected after the feature pipeline runs.
cols = vehicles.columns

In [7]:
# Preview the loaded rows.
display(vehicles)

In [8]:
# Preprocess data

In [9]:
# Using One-Hot Encoding

In [10]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

# Categorical feature columns to index and one-hot encode.
# Vehicle_Type is deliberately NOT listed: it is the prediction target (it is
# indexed into the "label" column), so encoding it into the feature vector
# would leak the answer into every model trained on "features".
# NOTE(review): Photo_Name looks like a per-image identifier - confirm it is a
# meaningful feature rather than a high-cardinality ID.
categoricalColumns = ["Photo_Name"]
stages = []  # pipeline stages; later cells append to this list
for categoricalCol in categoricalColumns:
  indexedCol = categoricalCol + "Index"
  # Map each distinct string value to a numeric index ...
  stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=indexedCol)
  # ... then expand that index into a one-hot vector column.
  encoder = OneHotEncoder(inputCol=indexedCol, outputCol=categoricalCol + "classVec")
  stages += [stringIndexer, encoder]

In [11]:
# Index the target column into a numeric "label" column and register that
# stage with the pipeline.
label_stringIdx = StringIndexer(inputCol="Vehicle_Type", outputCol="label")
stages.append(label_stringIdx)

In [12]:
# Numeric columns that are used as features unchanged.
numericCols = ["Left", "Top", "Right", "Bottom"]
# One-hot vector columns produced by the encoders, followed by the numeric
# columns. A list comprehension is used because map() returns an iterator on
# Python 3, which cannot be concatenated to a list with "+".
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
# Combine all inputs into a single "features" vector column.
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [13]:
# Always start from the raw table so this cell can be re-run safely: fitting
# the pipeline on an already-transformed frame fails because its output
# columns (e.g. "label") already exist.
rawVehicles = spark.table("vehicle_process")

pipeline = Pipeline(stages=stages)

# Fit the indexers/encoders on the raw data, then add the derived columns.
pipelineModel = pipeline.fit(rawVehicles)
preparedVehicles = pipelineModel.transform(rawVehicles)

# Keep only the label, the feature vector and the original columns.
selectedcols = ["label", "features"] + rawVehicles.columns
vehicles = preparedVehicles.select(selectedcols)
display(vehicles)

In [14]:
# Randomly split data into training and test sets. Set a seed for reproducibility.

In [15]:
# Split roughly 70/30 into training and test sets; the fixed seed keeps the
# split reproducible across runs.
(trainingData, testData) = vehicles.randomSplit([0.7, 0.3], seed = 100)
# print() with a single argument behaves the same on Python 2 and Python 3,
# whereas the bare print statement is a SyntaxError on Python 3.
print(trainingData.count())
print(testData.count())

In [16]:
# Fit and Evaluate Models

In [17]:
# Logistic Regression

In [18]:
# Create initial LogisticRegression model

In [19]:
from pyspark.ml.classification import LogisticRegression

# Baseline logistic regression, capped at 10 iterations.
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)

# Fit on the training split.
lrModel = lr.fit(trainingData)

In [20]:
# Score the held-out test split with the fitted model.
predictions = lrModel.transform(testData)

In [21]:
# Show the columns present after scoring.
predictions.printSchema()

In [22]:
# Narrow the output to the model columns plus two original columns for inspection.
selected = predictions.select("label", "prediction", "probability", "Left", "Vehicle_Type")
display(selected)

In [23]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate the test-set predictions from their rawPrediction column.
# NOTE(review): this is a *binary* classification evaluator, but later cells
# read interceptVector/coefficientMatrix from the model - confirm Vehicle_Type
# has exactly two classes; otherwise a multiclass evaluator is likely needed.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)

In [24]:
# Show the name of the metric the evaluator is configured to compute.
evaluator.getMetricName()

In [25]:
# Print the documentation of every LogisticRegression parameter.
# print() keeps this cell valid on both Python 2 and Python 3.
print(lr.explainParams())

In [26]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Hyper-parameter grid for logistic regression:
# 3 regParam x 3 elasticNetParam x 3 maxIter values = 27 candidate settings.
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [1, 5, 10])
             .build())

In [27]:
# Create a 5-fold CrossValidator and run cross-validation

In [28]:
# Each grid candidate is scored with `evaluator` across 5 folds of the training split.
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Runs the full grid search; this is the expensive step of this section.
cvModel = cv.fit(trainingData)

In [29]:
# Score the held-out test split with the cross-validated model.
predictions = cvModel.transform(testData)

In [30]:
# Test-set metric for the tuned model.
evaluator.evaluate(predictions)

In [31]:
# Formatted explicitly so the output is identical on Python 2 and Python 3;
# print(a, b) would print a tuple on Python 2, and the bare print statement is
# a SyntaxError on Python 3.
print('%s %s' % ('Model Intercept: ', cvModel.bestModel.interceptVector))

In [32]:
# coefficientMatrix is a Spark ML Matrix, which cannot be iterated directly
# (its indexing expects an (i, j) pair), so it is converted to a NumPy array
# and flattened row by row before building the table.
weights = cvModel.bestModel.coefficientMatrix

# One single-column row per coefficient so the result renders as a table.
weights = [(float(w),) for w in weights.toArray().ravel()]
weightsDF = sqlContext.createDataFrame(weights, ["Feature Weight"])
display(weightsDF)

In [33]:
# Inspect the tuned logistic regression's test-set predictions.
selected = predictions.select("label", "prediction", "probability", "Left", "Vehicle_Type")
display(selected)

In [34]:
# Decision Trees

In [35]:
# Create Decision Tree model

In [36]:
from pyspark.ml.classification import DecisionTreeClassifier

# Baseline decision tree, limited to depth 3.
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", maxDepth=3)

# Fit on the training split.
dtModel = dt.fit(trainingData)

In [37]:
# Report the size of the fitted tree. Formatted explicitly so the output is
# identical on Python 2 and Python 3 (the bare print statement is a
# SyntaxError on Python 3).
print("%s %s" % ("numNodes = ", dtModel.numNodes))
print("%s %s" % ("depth = ", dtModel.depth))

In [38]:
# Score the held-out test split with the baseline decision tree.
# Note: `predictions` is reused for every model in this notebook.
predictions = dtModel.transform(testData)

In [39]:
# Show the columns present after scoring.
predictions.printSchema()

In [40]:
# Narrow the output to the model columns plus two original columns for inspection.
selected = predictions.select("label", "prediction", "probability", "Left", "Vehicle_Type")
display(selected)

In [41]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Re-create the evaluator with its default settings and score the tree's predictions.
# NOTE(review): binary evaluator - confirm the label really has two classes.
evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(predictions)

In [42]:
# Show the impurity setting configured on the decision tree estimator.
dt.getImpurity()

In [43]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Hyper-parameter grid for the decision tree:
# 4 maxDepth x 3 maxBins values = 12 candidate settings.
paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [1,2,6,10])
             .addGrid(dt.maxBins, [20,40,80])
             .build())

In [44]:
# 5-fold cross-validation over the grid; note that `cv` and `cvModel` replace
# the logistic-regression objects of the same name.
cv = CrossValidator(estimator=dt, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Runs the full grid search on the training split.
cvModel = cv.fit(trainingData)

In [45]:
# Report the size of the best cross-validated tree. Formatted explicitly so
# the output is identical on Python 2 and Python 3 (the bare print statement
# is a SyntaxError on Python 3).
print("%s %s" % ("numNodes = ", cvModel.bestModel.numNodes))
print("%s %s" % ("depth = ", cvModel.bestModel.depth))

In [46]:
# Score the held-out test split with the cross-validated decision tree.
predictions = cvModel.transform(testData)

In [47]:
# Test-set metric for the tuned tree.
evaluator.evaluate(predictions)

In [48]:
# Inspect the tuned tree's test-set predictions.
selected = predictions.select("label", "prediction", "probability", "Left", "Vehicle_Type")
display(selected)

In [49]:
# Random Forest

In [50]:
# Create Random Forest model

In [51]:
from pyspark.ml.classification import RandomForestClassifier

# Baseline random forest with the library's default hyper-parameters.
rf = RandomForestClassifier(labelCol="label", featuresCol="features")

# Fit on the training split.
rfModel = rf.fit(trainingData)

In [52]:
# Score the held-out test split with the baseline forest.
predictions = rfModel.transform(testData)

In [53]:
# Show the columns present after scoring.
predictions.printSchema()

In [54]:
# Narrow the output to the model columns plus two original columns for inspection.
selected = predictions.select("label", "prediction", "probability", "Left", "Vehicle_Type")
display(selected)

In [55]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Re-create the evaluator with its default settings and score the forest's predictions.
# NOTE(review): binary evaluator - confirm the label really has two classes.
evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(predictions)

In [56]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Hyper-parameter grid for the random forest:
# 3 maxDepth x 2 maxBins x 2 numTrees values = 12 candidate settings.
paramGrid = (ParamGridBuilder()
             .addGrid(rf.maxDepth, [2, 4, 6])
             .addGrid(rf.maxBins, [20, 60])
             .addGrid(rf.numTrees, [5, 20])
             .build())

In [57]:
# 5-fold cross-validation over the grid; `cv` and `cvModel` now refer to the
# random-forest search and replace the decision-tree objects of the same name.
cv = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Runs the full grid search on the training split.
cvModel = cv.fit(trainingData)

In [58]:
# Score the held-out test split with the cross-validated forest.
predictions = cvModel.transform(testData)

In [59]:
# Test-set metric for the tuned forest.
evaluator.evaluate(predictions)

In [60]:
# Inspect the tuned forest's test-set predictions.
selected = predictions.select("label", "prediction", "probability", "Left", "Vehicle_Type")
display(selected)

In [61]:
# Make Predictions

In [62]:
# `cvModel` was last assigned by the random-forest cross-validation, so this
# picks the best random forest (not the best model across all three families).
bestModel = cvModel.bestModel

In [63]:
# Score the complete prepared dataset (training and test rows together).
finalPredictions = bestModel.transform(vehicles)

In [64]:
# NOTE(review): `vehicles` includes the rows the model was trained on, so this
# score is not a held-out estimate; use the test-split metric for that.
evaluator.evaluate(finalPredictions)

In [65]:
# Expose the predictions to SQL cells under the name "finalPredictions".
finalPredictions.createOrReplaceTempView("finalPredictions")