### Prepare the Data

First, import the libraries you will need and prepare the training and test data:

In [2]:
# Import Spark SQL and Spark ML libraries
from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression , RandomForestClassifier , DecisionTreeClassifier , MultilayerPerceptronClassifier
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [3]:
from pyspark.sql import SparkSession

# @hidden_cell
# This function sets up Spark's access to your Object Storage. The credentials are
# read from environment variables when the function is called, so no secret is
# stored in the notebook itself.
def set_hadoop_config_with_credentials_b52a0b3d(name):
    """Set the Hadoop configuration so Spark can access data from Bluemix
    Object Storage.

    Every setting is written to the Hadoop configuration of the global
    SparkContext ``sc`` under the key prefix ``fs.swift.service.<name>``.

    The tenant, username and password are taken from the environment
    variables ``OBJECT_STORAGE_TENANT``, ``OBJECT_STORAGE_USERNAME`` and
    ``OBJECT_STORAGE_PASSWORD`` instead of being hardcoded here.

    Args:
        name: Label appended to ``fs.swift.service.`` to build the
            configuration key prefix.

    Raises:
        RuntimeError: If any of the three environment variables is unset
            or empty.
    """
    import os  # local import keeps this cell runnable on its own

    # (configuration key suffix, environment variable holding its value)
    secret_sources = (
        ('.tenant', 'OBJECT_STORAGE_TENANT'),
        ('.username', 'OBJECT_STORAGE_USERNAME'),
        ('.password', 'OBJECT_STORAGE_PASSWORD'),
    )

    # Fail before touching the configuration so it is never left half-filled.
    missing = [var for _, var in secret_sources if not os.environ.get(var)]
    if missing:
        raise RuntimeError(
            'Object Storage credentials are not configured; set the '
            'environment variable(s): ' + ', '.join(missing))

    prefix = 'fs.swift.service.' + name
    # NOTE: `sc` is not defined in this cell; it must already exist as a global.
    hconf = sc._jsc.hadoopConfiguration()
    hconf.set(prefix + '.auth.url', 'https://identity.open.softlayer.com' + '/v3/auth/tokens')
    hconf.set(prefix + '.auth.endpoint.prefix', 'endpoints')
    for suffix, var in secret_sources:
        hconf.set(prefix + suffix, os.environ[var])
    hconf.setInt(prefix + '.http.port', 8080)
    hconf.set(prefix + '.region', 'dallas')
    hconf.setBoolean(prefix + '.public', False)

# Label for the Swift service; it becomes part of the Hadoop configuration key
# prefix ('fs.swift.service.' + name). You can choose any name.
name = 'keystone'
# Apply the Object Storage settings to the Hadoop configuration.
set_hadoop_config_with_credentials_b52a0b3d(name)

# Obtain the SparkSession, creating one if none exists yet.
# NOTE(review): the visible cells below query through `sqlContext` and never use
# `spark` - confirm whether this session object is still needed.
spark = SparkSession.builder.getOrCreate()

In [4]:
# Pull every column of the `nymc_csv` table and preview the first two rows.
# NOTE(review): neither `sqlContext` nor the `nymc_csv` table is created in the
# visible cells - presumably both come from the notebook environment; confirm.
csv = sqlContext.sql("Select * from nymc_csv")
csv.show(2)


In [5]:
# Build one row per record: the borough plus an "Incidents" total obtained by
# adding the injured/killed count columns together.
# NOTE(review): the sum has no NUMBEROFCYCLISTKILLED term although the
# cyclist-injured and both motorist columns are present - confirm the omission
# is intended. The table `ppp` is not registered in any visible cell either.
incidents_sql = "select Borough, NUMBEROFPERSONSINJURED + NUMBEROFPERSONSKILLED + NUMBEROFPEDESTRIANSINJURED + NUMBEROFPEDESTRIANSKILLED +NUMBEROFCYCLISTINJURED + NUMBEROFMOTORISTINJURED + NUMBEROFMOTORISTKILLED as Incidents from ppp"
data = sqlContext.sql(incidents_sql)

# Encode the borough string as a numeric index column ("indx_borough").
indexer = StringIndexer(inputCol="Borough", outputCol="indx_borough")
indexer_model = indexer.fit(data)
indexed = indexer_model.transform(data)

# Keep only the encoded borough and the incident total, exposed as "label".
label_column = col("Incidents").alias("label")
indx_feat = indexed.select("indx_borough", label_column)



In [6]:
# Split the indexed data into training (70%) and test (30%) sets.
# The fixed seed makes the split - and every metric computed from it -
# repeatable when the notebook is re-run on the same input.
splits = indx_feat.randomSplit([0.7, 0.3], seed=42)
train = splits[0]
# Rename the test label so it stays distinct from the "label" column the
# classifiers are trained on.
test = splits[1].withColumnRenamed("label", "trueLabel")

train_rows = train.count()
test_rows = test.count()
# A single pre-formatted string prints the same text under Python 2 and Python 3.
print("Training Rows: %s  Testing Rows: %s" % (train_rows, test_rows))
train.show(10)
test.show(10)

In [7]:
# Two-stage pipeline: pack the indexed borough into a "features" vector, then
# train a random forest on it against the "label" column.
vectorAssembler = VectorAssembler(
    inputCols=["indx_borough"],
    outputCol="features",
)
dt = RandomForestClassifier(
    featuresCol="features",
    labelCol="label",
)
pipeline = Pipeline(stages=[vectorAssembler, dt])

In [8]:
# Fit the pipeline on the training split, then score the test split.
model = pipeline.fit(train)
predictions1 = model.transform(test)

# Preview up to 100 scored rows with all of their columns.
predictions1.show(100)

In [9]:
# Measure how often the predicted class equals the held-out "trueLabel" column.
# The evaluator is configured through constructor keywords: a line that starts
# with ".setLabelCol(...)" is not valid Python and stops the cell from running.
evaluator = MulticlassClassificationEvaluator(
    labelCol="trueLabel",
    predictionCol="prediction",
    metricName="accuracy",
)
# Stage 1 of the fitted pipeline is the classifier; stage 0 is the assembler.
treeModel = model.stages[1]

# Each message is formatted into one string so the output reads the same under
# Python 2 and Python 3.
print("Learned classification tree model: %s" % (treeModel,))
accuracy = evaluator.evaluate(predictions1)
print("Average Accuracy = %s" % accuracy)
print("Test Error =  %s" % (1.0 - accuracy))

In [10]:
# Same two-stage layout as before, this time ending in a logistic regression
# trained on the "features" vector against the "label" column.
vectorAssembler = VectorAssembler(
    inputCols=["indx_borough"],
    outputCol="features",
)
dt = LogisticRegression(
    featuresCol="features",
    labelCol="label",
)
pipeline = Pipeline(stages=[vectorAssembler, dt])


In [11]:
# Fit the current pipeline on the training split, then score the test split.
model = pipeline.fit(train)
predictions = model.transform(test)

# Preview up to 100 scored rows with all of their columns.
predictions.show(100)

In [12]:
# Measure how often the predicted class equals the held-out "trueLabel" column.
# The evaluator is configured through constructor keywords: a line that starts
# with ".setLabelCol(...)" is not valid Python and stops the cell from running.
evaluator = MulticlassClassificationEvaluator(
    labelCol="trueLabel",
    predictionCol="prediction",
    metricName="accuracy",
)
# Stage 1 of the fitted pipeline is the classifier; stage 0 is the assembler.
# The name `treeModel` is kept for compatibility even though the pipeline
# defined for this run ends in a LogisticRegression stage.
treeModel = model.stages[1]

# Each message is formatted into one string so the output reads the same under
# Python 2 and Python 3. The first label now names the model actually trained.
print("Learned logistic regression model: %s" % (treeModel,))
accuracy = evaluator.evaluate(predictions)
print("Average Accuracy = %s" % accuracy)
print("Test Error =  %s" % (1.0 - accuracy))


In [13]:
# Same two-stage layout, this time ending in a single decision tree trained on
# the "features" vector against the "label" column.
vectorAssembler = VectorAssembler(
    inputCols=["indx_borough"],
    outputCol="features",
)
dt = DecisionTreeClassifier(
    featuresCol="features",
    labelCol="label",
)
pipeline = Pipeline(stages=[vectorAssembler, dt])


In [14]:
# Fit the current pipeline on the training split, then score the test split.
model = pipeline.fit(train)
predictions = model.transform(test)

# Preview up to 100 scored rows with all of their columns.
predictions.show(100)

In [15]:
# Measure how often the predicted class equals the held-out "trueLabel" column.
# The evaluator is configured through constructor keywords: a line that starts
# with ".setLabelCol(...)" is not valid Python and stops the cell from running.
evaluator = MulticlassClassificationEvaluator(
    labelCol="trueLabel",
    predictionCol="prediction",
    metricName="accuracy",
)
# Stage 1 of the fitted pipeline is the classifier; stage 0 is the assembler.
treeModel = model.stages[1]

# Each message is formatted into one string so the output reads the same under
# Python 2 and Python 3.
print("Learned classification tree model: %s" % (treeModel,))
accuracy = evaluator.evaluate(predictions)
print("Average Accuracy = %s" % accuracy)
print("Test Error =  %s" % (1 - accuracy))
