In [None]:
# Delete any metastore_db/ directory in the working directory
# (presumably a stale metastore left by an earlier session -- confirm).
!rm -rf metastore_db/
from pyspark.sql import *
# `sc` is not defined in this notebook; it is assumed to be the SparkContext
# provided by the PySpark kernel -- TODO confirm for your environment.
sqlContext = SQLContext(sc)

In [None]:
import numpy as np

from pyspark.ml.linalg import Vectors
from pyspark.ml.param import Param, Params

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier, RandomForestClassificationModel
from pyspark.ml.feature import StringIndexer, IndexToString, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel

import pyspark.mllib.util

In [None]:
!wget https://archive.ics.uci.edu/ml/machine-learning-databases/wine/wine.data

In [None]:
# Read the downloaded file as an RDD of text lines and break every line into
# its comma-separated string fields.
rawdata = (
    sc.textFile("wine.data")
      .map(lambda record: record.split(","))
)

In [None]:
# Peek at the first parsed record (a list of strings) to check the split.
rawdata.take(1)

The first column of each entry is an integer classification. 
This is followed by 13 continuous features.
Construct a dataframe with two columns. 
Name the first column "label" and the second one "features".
The label column will need to be a float.
The features column should contain a dense vector of 13 doubles.

In [None]:
# fix the return value so it behaves as shown in the next cell
def row_to_tuple(array_of_strings):
    """Convert one parsed record into a ``(label, features)`` pair.

    Args:
        array_of_strings: sequence of strings for one row; element 0 is the
            class label and the remaining elements are numeric features.

    Returns:
        A 2-tuple of the label (left as the original string) and a dense
        vector holding the remaining fields converted to floats.

    Raises:
        IndexError: if ``array_of_strings`` is empty.
        ValueError: if a feature field cannot be parsed as a float.
    """
    label = array_of_strings[0]
    # Build a concrete list rather than passing ``map(...)``: on Python 3 map()
    # returns a lazy iterator, which Vectors.dense cannot turn into an array.
    feature_values = [float(field) for field in array_of_strings[1:]]
    return (label, Vectors.dense(feature_values))

In [None]:
# Sanity check: the first record should convert to a string label plus a
# dense vector of its feature values. Evaluates to True when it matches.
row_to_tuple(rawdata.take(1)[0]) == (
    '1',
    Vectors.dense([
        14.23, 1.71, 2.43, 15.6, 127.0, 2.8, 3.06,
        0.28, 2.29, 5.64, 1.04, 3.92, 1065.0,
    ]),
)

In [None]:
# Turn every parsed record into a (label, features) pair and name the two
# resulting DataFrame columns.
dataset = rawdata.map(row_to_tuple).toDF(("label", "features"))

In [None]:
# Display the first 10 rows of the DataFrame.
dataset.show(n=10)

In [None]:
# Indexer that reads the string "label" column and writes to "indexed".
labelIndexer = StringIndexer(
    inputCol="label",
    outputCol="indexed",
)

In [None]:
# Fit the indexer on the full dataset to obtain the fitted indexer model.
li_model = labelIndexer.fit(dataset)

In [None]:
# Apply the fitted indexer to the dataset ("indexed" is the indexer's outputCol).
indexedDataset = li_model.transform(dataset)

In [None]:
# Inspect the first two rows of the indexed DataFrame.
indexedDataset.take(2)

In [None]:
# Randomly partition the indexed rows into a training part and a test part
# (weights 70:30), using a fixed seed.
splits = indexedDataset.randomSplit(weights=[70.0, 30.0], seed=24)

In [None]:
# Row counts of the training and test partitions, in that order.
tuple(part.count() for part in splits)

In [None]:
# Random forest that learns from the "indexed" label column; the seed is fixed.
rf = RandomForestClassifier(
    labelCol="indexed",
    seed=42,
)

In [None]:
# Fit the random forest on the first split (the training set).
model = rf.fit(splits[0])

In [None]:
# Run the fitted model over the second split (the test set) to get predictions.
predictions = model.transform(splits[1])
# Bare expression so the notebook displays the resulting object's repr.
predictions

In [None]:
# Score the test-set predictions with the F1 metric, comparing the
# "prediction" column against the "indexed" label column.
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexed",
    predictionCol="prediction",
    metricName="f1",
)
evaluator.evaluate(predictions)

In [None]:
# Print the model's debug string. print() is written as a function call so
# the cell is valid on Python 3 as well as Python 2 (the bare print statement
# is a SyntaxError on Python 3).
print(model.toDebugString)

In [None]:
# Show the model's feature importances converted to a plain array.
# print() is written as a function call so the cell is valid on Python 3 as
# well as Python 2.
print(model.featureImportances.toArray())

In [None]:
# Print the text returned by the classifier's explainParams().
# print() is written as a function call so the cell is valid on Python 3 as
# well as Python 2.
print(rf.explainParams())

In [None]:
# Display the parameter map extracted from the classifier.
rf.extractParamMap()