# Machine Learning in Spark

In [None]:
import pyspark

# Get the active SparkSession, creating one if none exists yet (needed when
# running locally), and keep a handle on its SparkContext.
session_builder = pyspark.sql.SparkSession.builder
spark = session_builder.getOrCreate()
sc = spark.sparkContext

In [None]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import feature
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

This example assumes that we have a holdout validation dataset somewhere else, so we don't need to perform a train-test split; we only need to perform cross-validation.

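Follow [these instructions](https://docs.databricks.com/data/data.html#import-data-1) to import `US_births_2000-2014_SSA.csv` into Databricks. If you are running Spark locally instead, the next cell unpacks `data.tar`, and the load cell after it reads the CSV from the `data/` directory.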

In [None]:
!tar -zxvf data.tar

In [None]:
# Read the births CSV, treating its first line as the column names.
# The path below is for a local Spark run; on Databricks the uploaded file
# is loaded from FileStore instead, e.g.
#   load('/FileStore/tables/US_births_2000_2014_SSA-daa0e.csv')
births_csv_path = 'data/US_births_2000-2014_SSA.csv'
csv_reader = spark.read.format('csv').option('header', 'true')
df = csv_reader.load(births_csv_path)

In [None]:
# Preview the first three rows. Limit on the Spark side first so only those
# rows are collected to the driver, instead of converting the entire
# DataFrame to pandas just to look at its head.
df.limit(3).toPandas()

In [None]:
# Show each column's name together with its Spark data type.
df.dtypes

In [None]:
# Convert each of these columns to an integer type, one column at a time,
# replacing the column in place under the same name.
int_columns = ['births', 'day_of_week', 'date_of_month', 'month', 'year']
for column_name in int_columns:
    df = df.withColumn(column_name, df[column_name].cast('int'))

In [None]:
# One-hot encode the day-of-month and day-of-week integer columns into the
# vector columns `date_vec` and `day_vec`.
ohe = feature.OneHotEncoder(
    inputCols=['date_of_month', 'day_of_week'],
    outputCols=['date_vec', 'day_vec'],
    dropLast=True,
)

# The encoder is an estimator: fit it on the data first, then apply the
# fitted model to add the encoded columns.
ohe_model = ohe.fit(df)
one_hot_encoded = ohe_model.transform(df)
one_hot_encoded.head()

Note the 'SparseVector' we've created!

In [None]:
# Columns that make up the model's feature vector. Use the one-hot encoded
# `date_vec` / `day_vec` columns rather than the raw integer columns they
# were derived from; otherwise the encoding step is computed but never
# reaches the model. This list is reused by the pipeline's assembler below.
features = ['year', 'month', 'date_vec', 'day_vec']

# Name of the label column.
target = 'births'

# Concatenate the feature columns into a single vector column `features`.
vector = VectorAssembler(inputCols=features, outputCol='features')
vectorized_df = vector.transform(one_hot_encoded)

The Vector Assembler is often what we want when we're building a model in Spark. [How does the VectorAssembler work?](https://spark.apache.org/docs/2.1.0/ml-features.html#vectorassembler)

In [None]:
# List the column names of the assembled DataFrame.
vectorized_df.columns

In [None]:
# Configure a random forest that reads the assembled `features` vector and
# learns to predict `births`, then fit it on the vectorized data.
rf_estimator = RandomForestRegressor(
    featuresCol='features',
    labelCol='births',
    predictionCol="prediction",
)
rf_model = rf_estimator.fit(vectorized_df)

In [None]:
# Score the same DataFrame the model was fitted on and keep only the
# observed and predicted values.
scored_df = rf_model.transform(vectorized_df)
predictions = scored_df.select("births", "prediction")
predictions.head(3)

Let's evaluate our model! [Here](https://spark.apache.org/docs/2.2.0/mllib-evaluation-metrics.html) is a reference for the many metrics available in Spark.

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# Evaluator that compares the `prediction` column with the observed `births`.
evaluator = RegressionEvaluator(labelCol='births', predictionCol='prediction')

# Override the metric for this call only. Note that `predictions` was
# produced on the rows the forest was trained on, so this is a
# training-set score.
r2_override = {evaluator.metricName: "r2"}
evaluator.evaluate(predictions, r2_override)

In [None]:
# Same evaluator and predictions, with the metric overridden to "mae" for
# this call.
mae_override = {evaluator.metricName: "mae"}
evaluator.evaluate(predictions, mae_override)

In [None]:
# Stage 1: one-hot encode the day-of-month and day-of-week columns.
one_hot_encoder = OneHotEncoder(
    inputCols=['date_of_month', 'day_of_week'],
    outputCols=['date_vec', 'day_vec'],
    dropLast=True,
)

# Stage 2: pack the columns named in `features` into one vector column.
vector_assember = VectorAssembler(outputCol='features', inputCols=features)

# Stage 3: the regressor that predicts `births` from the assembled vector.
random_forest = RandomForestRegressor(labelCol='births', featuresCol='features')

# Chain the three stages so they are fitted and applied in this order.
stages = [one_hot_encoder, vector_assember, random_forest]
pipeline = Pipeline(stages=stages)

Note: The stages in a pipeline can be either *Transformers* or *Estimators*. An Estimator is fit on a DataFrame to produce a Transformer.

In [None]:
# List the parameters exposed by the random forest estimator.
random_forest.params

In [None]:
# Build the search grid: every combination of tree depth and forest size
# listed below (3 x 3 candidates).
grid_builder = ParamGridBuilder()
grid_builder = grid_builder.addGrid(random_forest.maxDepth, [5, 10, 15])
grid_builder = grid_builder.addGrid(random_forest.numTrees, [20, 50, 100])
params = grid_builder.build()

In [None]:
# Evaluator used to score cross-validation candidates: the 'mae' metric
# between the `prediction` column and the observed `births`.
reg_evaluator = RegressionEvaluator(
    metricName='mae',
    labelCol='births',
    predictionCol='prediction',
)

In [None]:
# Cross-validated grid search: fit `pipeline` once per entry in `params` and
# per fold, scoring each candidate with `reg_evaluator`. Up to 4 candidate
# models are trained in parallel.
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=params,
    evaluator=reg_evaluator,
    parallelism=4,
    # Fix the seed used for the random fold assignment so that repeated runs
    # on the same input are comparable.
    seed=42,
)

In [None]:
# Print the first five rows of the DataFrame.
df.show(n=5)

In [None]:
# Build a DataFrame restricted to 1000 rows; the result is only displayed
# here, not assigned to a name.
df.limit(1000)

In [None]:
# Run the grid search on a cached 1000-row subset of the data rather than
# the full DataFrame.
cv_sample = df.limit(1000).cache()
cross_validated_model = cv.fit(cv_sample)

In [None]:
# Cross-validation metric for each entry in `params`, in the same order
# (the evaluator passed to the CrossValidator uses metricName 'mae').
cross_validated_model.avgMetrics

In [None]:
# The fitted pipeline selected by the cross-validation search. (The original
# cell ended in a dangling `.`, which is a syntax error.)
cross_validated_model.bestModel

## .bestModel

In [None]:
# List the fitted stages of the best pipeline.
cross_validated_model.bestModel.stages

In [None]:
# stages[2] is the third pipeline stage, i.e. the fitted random forest.
# NOTE(review): getNumTrees is accessed without parentheses — this looks
# intentional if the fitted model exposes it as a property; confirm for the
# Spark version in use.
cross_validated_model.bestModel.stages[2].getNumTrees

## Challenge

Look at [this documentation](https://docs.databricks.com/data/databricks-datasets.html) to find large datasets that come pre-loaded on DBFS (Databricks file system).  Choose one, and build an ML model based on it.

In [None]:
# List the top-level entries under /databricks-datasets.
# `display` and `dbutils` are not imported in this notebook; they are
# expected to be provided by the Databricks notebook environment.
display(dbutils.fs.ls("/databricks-datasets"))

In [None]:
# List the contents of the Rdatasets folder (Databricks environment only,
# since it relies on `display` and `dbutils`).
display(dbutils.fs.ls("/databricks-datasets/Rdatasets/"))

In [None]:
# Python's open() works on the driver's local filesystem, where DBFS is
# exposed under the /dbfs mount point. The path therefore needs the /dbfs
# prefix here, unlike the dbutils.fs calls, which take the bare
# /databricks-datasets/... form.
with open("/dbfs/databricks-datasets/Rdatasets/README.md") as f:
    x = f.read()

print(x)

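^ This fails when the path is given as `/databricks-datasets/...`: Python's `open()` reads from the driver's local filesystem, where DBFS is mounted under `/dbfs`, so the path has to start with `/dbfs/databricks-datasets/...`.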