### Import stuff and setup

In [1]:
sc.stop()

NameError: name 'sc' is not defined

In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *

# First setup
conf = SparkConf().setAppName("BDProject").setMaster("local")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

### Random forest

In [35]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

def TrainRF(trainingData,testData,numTrees=10):
    # Train a RandomForest model.
    rf = RandomForestClassifier(numTrees=numTrees)
    model = rf.fit(trainingData)
    # Make predictions.
    predictions = model.transform(testData)
    # Select (prediction, true label) and compute test error
    evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
    accuracy = evaluator.evaluate(predictions)
    print("Test Error = %g, accuracy = %g" % (1.0 - accuracy,accuracy))
    
    return model

### Multi-layer perceptron

In [36]:
from pyspark.ml.classification import MultilayerPerceptronClassifier

def TrainMLP(trainingData,testData,layers):    
    # specify layers for the neural network:
    # input layer of size (features), two intermediate layers
    # and output of size (classes)

    # create the trainer and set its parameters
    mlp = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128)

    # train the model
    model = mlp.fit(trainingData)

    # Make predictions.
    predictions = model.transform(testData)

    # Select (prediction, true label) and compute test error
    evaluator = MulticlassClassificationEvaluator(
        labelCol="label", predictionCol="prediction", metricName="accuracy")
    accuracy = evaluator.evaluate(predictions)
    print("Test Error = %g, accuracy = %g" % (1.0 - accuracy,accuracy))
    
    return model

### Read and prepare data, run models

In [6]:
from utils import *

Covertype

In [23]:
# Load and parse the data file, converting it to a DataFrame.
covertype = sqlContext.read.load('Datasets/Covertype/covtype.data', 
                          format='com.databricks.spark.csv', 
                          header='false', 
                          inferSchema='true')

In [37]:
labelcol = "_c54"
readycols = covertype.columns[0:-1]
dataset = covertype
nclasses = 7
dataset_name = 'covertype'

trainingData,testData = Prepare(dataset,labelcol,readycols,[])

In [None]:
model = TrainRF(trainingData,testData)
model.save('Models/'+dataset_name+'_RF')

In [None]:
layers = [len(trainingData.select('features').take(1)[0][0]), 100, 100, nclasses]
model = TrainMLP(trainingData,testData,layers)
model.save('Models/'+dataset_name+'_MLP')

Wearable

In [7]:
# Load and parse the data file, converting it to a DataFrame.
wearable = sqlContext.read.load('Datasets/Wearable/dataset-har-PUC-Rio-ugulino.csv', 
                          format='com.databricks.spark.csv', 
                          header='true', 
                          delimiter=';',
                          inferSchema='true')

In [8]:
strtodouble = ['how_tall_in_meters','body_mass_index','z4']
for col in strtodouble:
    wearable = wearable.withColumn(col, regexp_replace(col, ',', '.'))
    wearable = wearable.withColumn(col, wearable[col].cast("double"))

In [9]:
wearable = wearable.na.drop(subset=["z4"])

In [10]:
labelcol = "class"
readycols = [k for k,v in wearable.dtypes if v in ['int','double']]
categoricalColumns = [k for k,v in wearable.dtypes if v == 'string' and k!= labelcol]
dataset = wearable
nclasses = 5
dataset_name = 'wearable'

trainingData,testData = Prepare(dataset,labelcol,readycols,categoricalColumns)

In [68]:
accuracy, model = TrainRF(trainingData,testData)
model.save('Models/'+dataset_name+'_RF')

Test Error = 0.126227, accuracy = 0.873773


In [None]:
layers = [len(trainingData.select('features').take(1)[0][0]), 100, 100, nclasses]
accuracy, model = TrainMLP(trainingData,testData,layers)
model.save('Models/'+dataset_name+'_MLP')