In [1]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import DecisionTreeClassifier , RandomForestClassifier , MultilayerPerceptronClassifier
from pyspark.ml.feature import VectorAssembler, VectorIndexer
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, TrainValidationSplit

In [2]:
# Aggregate the rest_vio table per (NAME, ACTIVITY, grade, score) group:
# the number of violation codes and the summed violation points, with the
# score exposed as the numeric `label` column.
# The threshold compares CAST(score AS DOUBLE) against the number 60. Comparing
# the raw column against the string '60' is lexicographic when `score` is stored
# as a string (e.g. '100' < '60'), which would silently drop the best scores.
data = sqlContext.sql("""
    SELECT ACTIVITY,
           NAME,
           CAST(count(VIOLATION_CODE) AS DOUBLE) AS Total_violations,
           grade,
           CAST(score AS DOUBLE) AS label,
           CAST(sum(points) AS DOUBLE) AS Violation_points
    FROM rest_vio
    WHERE CAST(score AS DOUBLE) >= 60
    GROUP BY NAME, ACTIVITY, grade, score
""")
data.show()

In [3]:
# Exploratory view: inspection groups with at most 21 violations.
# The bound is a numeric literal so the comparison against the DOUBLE column
# does not depend on implicit string-to-number coercion.
# NOTE(review): the modelling cells below index and split `data`, not `dat`,
# so this filter does not reach the models -- confirm whether that is intended.
dat = (data
       .select("Total_violations", "label", "ACTIVITY", "NAME", "grade", "Violation_points")
       .where(col("Total_violations") <= 21))
display(dat)

In [4]:
# Select features and label.
# The target is the letter grade, encoded to a numeric index by StringIndexer;
# the two violation aggregates are the features.
# NOTE(review): this uses the unfiltered `data`, not the `dat` view -- confirm intended.
indexer = StringIndexer(inputCol="grade", outputCol="Grade_Index")
indexed = indexer.fit(data).transform(data)
indexed_features = indexed.select("Total_violations",
                                  col("Grade_Index").alias("label"),
                                  "Violation_points")
ind_features = indexed_features.selectExpr("cast(Total_violations as int) Total_violations",
                                           "cast(label as int) label",
                                           "cast(Violation_points as int) Violation_points")

# Split the data 70/30. Without an explicit seed every run draws a different
# split, so the accuracies reported later would not be reproducible.
SPLIT_SEED = 42
splits = ind_features.randomSplit([0.7, 0.3], seed=SPLIT_SEED)
train = splits[0]
# The test label is renamed to `trueLabel`; the evaluators read it under that name.
test = splits[1].withColumnRenamed("label", "trueLabel")
# Single-argument print(...) works on both Python 2 and Python 3 kernels.
print("We have %d training examples and %d test examples." % (train.count(), test.count()))

In [5]:
# Recover the grade -> index mapping that StringIndexer stored in the metadata
# of the Grade_Index column, so predicted label indices can be read back as grades.
# (Only the last expression of a cell is displayed, so the intermediate
# `meta[0]` line that used to sit here had no effect and was removed.)
meta = [f.metadata for f in indexed.schema.fields if f.name == "Grade_Index"]
dict(enumerate(meta[0]["ml_attr"]["vals"]))

In [6]:
# Model 1 - Decision Tree.
# Pack the two numeric columns into one "features" vector, then train a
# decision tree on it; both stages run in order inside a single Pipeline.
vectorAssembler = VectorAssembler(
    inputCols=["Total_violations", "Violation_points"],
    outputCol="features",
)
dt = DecisionTreeClassifier(featuresCol="features", labelCol="label")
pipeline = Pipeline(stages=[vectorAssembler, dt])

In [7]:
# Fit both pipeline stages (assembler, then decision tree) on the training split.
model = pipeline.fit(dataset=train)

In [8]:
# Score the held-out split with the fitted decision-tree pipeline.
predictions = model.transform(test)
# Show a sample of inputs, true vs. predicted grade index, and class
# probabilities instead of dumping up to 1000 rows into the notebook.
predictions.select("Total_violations", "Violation_points",
                   "trueLabel", "prediction", "probability").show(20)

In [9]:
# Evaluate the decision tree on the held-out split by comparing the
# `prediction` column against `trueLabel` with the "accuracy" metric.
evaluator = MulticlassClassificationEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
# Single-argument print(...) works on both Python 2 and Python 3 kernels.
print("Average Accuracy = %s" % accuracy)
print("Test Error = %s" % (1 - accuracy))

# The fitted tree is the second pipeline stage (after the VectorAssembler).
treeModel = model.stages[1]
# Prints a summary only; treeModel.toDebugString gives the full tree.
print(treeModel)

In [10]:
# Model 2 - Random Forest of 30 trees on the same two numeric features.
# Random forests are trained stochastically; an explicit seed makes the
# fitted forest (and its reported accuracy) reproducible across runs.
RF_SEED = 42
vectorAssembler2 = VectorAssembler(inputCols=["Total_violations", "Violation_points"], outputCol="features")
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=30, seed=RF_SEED)
pipeline2 = Pipeline(stages=[vectorAssembler2, rf])

In [11]:
# Train the random-forest pipeline, then score the held-out split with it.
model2 = pipeline2.fit(train)
predictions2 = model2.transform(test)

# Peek at the first few scored rows (all columns).
predictions2.show(5)

In [12]:
# Evaluate the random forest on the held-out split, using the same
# "accuracy" metric as the decision tree so the two numbers are comparable.
evaluator2 = MulticlassClassificationEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="accuracy")
accuracy2 = evaluator2.evaluate(predictions2)
# Single-argument print(...) works on both Python 2 and Python 3 kernels.
print("Average Accuracy = %s" % accuracy2)
print("Test Error = %s" % (1 - accuracy2))

# The fitted forest is the second pipeline stage (after the VectorAssembler).
rfModel = model2.stages[1]
print(rfModel)  # summary only