# Machine Learning in Spark

In [1]:
import pyspark
# Reuse the running Spark session if there is one, otherwise start a new one.
spark = (
    pyspark.sql.SparkSession
    .builder
    .getOrCreate()
)
# Keep a handle on the session's SparkContext as well.
sc = spark.sparkContext

In [2]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import feature
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoderEstimator

In [3]:
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

In [4]:
# Load the births CSV; the file's first row supplies the column names.
df = (
    spark.read
    .format('csv')
    .option('header', 'true')
    .load('../../../../../data/births/US_births_2000-2014_SSA.csv')
)

In [6]:
# Preview three rows. Limiting on the Spark side first means only those rows
# are collected to the driver, instead of converting the whole DataFrame to
# pandas and then discarding everything but the head.
df.limit(3).toPandas()

In [7]:
# Show each column's name together with its Spark data type.
df.dtypes

In [8]:
# Cast each of these columns to an integer type, replacing the column in place.
for column_name in ('births', 'day_of_week', 'date_of_month', 'month', 'year'):
    df = df.withColumn(column_name, df[column_name].cast('int'))

In [10]:
# One-hot encode the two calendar columns into 'date_vec' and 'day_vec'.
ohe = OneHotEncoderEstimator(
    inputCols=['date_of_month', 'day_of_week'],
    outputCols=['date_vec', 'day_vec'],
    dropLast=True,
)
# Fit the encoder on df, then apply the fitted encoder to the same data.
one_hot_encoded = ohe.fit(df).transform(df)
one_hot_encoded.head()

Note the 'SparseVector' we've created!

In [11]:
# Input columns for the model. 'date_vec' and 'day_vec' are the one-hot
# encoder's output columns. Listing the raw 'date_of_month' and 'day_of_week'
# columns here instead would leave the encoded vectors computed but unused.
features = ['year', 'month', 'date_vec', 'day_vec']

# Column the model is trained to predict.
target = 'births'

# Pack the input columns into a single 'features' vector column.
vector = VectorAssembler(inputCols=features, outputCol='features')
vectorized_df = vector.transform(one_hot_encoded)

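`VectorAssembler` combines a list of input columns into a single vector column, which is the input format Spark ML estimators expect, so it is usually the last step before fitting a model. [How does the VectorAssembler work?](https://spark.apache.org/docs/2.1.0/ml-features.html#vectorassembler)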

In [12]:
# List the column names of the assembled DataFrame.
vectorized_df.columns

In [13]:
# Fit a random-forest regressor that reads the 'features' vector column,
# learns the 'births' label and writes its output to 'prediction'.
rf_model = (
    RandomForestRegressor(
        featuresCol='features',
        labelCol='births',
        predictionCol="prediction",
    )
    .fit(vectorized_df)
)

In [14]:
# Score the same data the model was fit on, keeping only the label and the
# model's prediction for comparison.
scored = rf_model.transform(vectorized_df)
predictions = scored.select("births", "prediction")
predictions.head(3)

Let's evaluate our model! [Here](https://spark.apache.org/docs/2.2.0/mllib-evaluation-metrics.html) is a reference for the many metrics available in Spark.

In [16]:
# RegressionEvaluator is already imported with the other pyspark.ml imports
# near the top of the notebook, so it is not imported again here.
evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='births')

# The param map overrides the evaluator's metric for this call only: R^2.
evaluator.evaluate(predictions, {evaluator.metricName: "r2"})

In [17]:
# Same evaluator, with the metric overridden for this call to "mae".
evaluator.evaluate(predictions, {evaluator.metricName:"mae"})

In [18]:
# Stage 1: one-hot encode the calendar columns into 'date_vec' and 'day_vec'.
one_hot_encoder = OneHotEncoderEstimator(
    inputCols=['date_of_month', 'day_of_week'],
    outputCols=['date_vec', 'day_vec'],
    dropLast=True,
)

# Stage 2: pack the columns named in `features` into one 'features' vector.
vector_assember = VectorAssembler(inputCols=features, outputCol='features')

# Stage 3: random-forest regressor trained on 'features' to predict 'births'.
random_forest = RandomForestRegressor(featuresCol='features', labelCol='births')

# Chain the three stages, in this order, into a single pipeline.
stages = [one_hot_encoder, vector_assember, random_forest]
pipeline = Pipeline(stages=stages)

Note: The stages in a pipeline can be either *Transformers* or *Estimators*. An Estimator is fit on a DataFrame to produce a Transformer (the fitted model), which can then transform a DataFrame.

In [19]:
# List the parameters exposed by the regressor (candidates for the grid search).
random_forest.params

In [20]:
# Grid over two forest hyperparameters: 3 depths x 3 tree counts = 9 settings.
params = (
    ParamGridBuilder()
    .addGrid(random_forest.maxDepth, [5, 10, 15])
    .addGrid(random_forest.numTrees, [20, 50, 100])
    .build()
)

In [25]:
# Evaluator used for model selection: compares 'prediction' against 'births'
# using the 'mae' metric.
reg_evaluator = RegressionEvaluator(
    predictionCol='prediction',
    labelCol='births',
    metricName='mae',
)

In [26]:
# Cross-validate the whole pipeline over every setting in the parameter grid,
# scoring each one with reg_evaluator and fitting up to 4 models in parallel.
# An explicit seed is passed so the fold split does not depend on a default
# seed. The forest stage has no seed of its own, so the fitted models can
# still differ from run to run.
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=params,
    evaluator=reg_evaluator,
    parallelism=4,
    seed=42,
)

In [22]:
# Print the first five rows of df as a text table.
df.show(n=5)

In [39]:
# A DataFrame restricted to at most 1000 rows; the result is only displayed
# here, not assigned.
df.limit(1000)

In [34]:
# Run the grid search on a cached sample of at most 1000 rows rather than on
# the full DataFrame.
cv_sample = df.limit(1000).cache()
cross_validated_model = cv.fit(cv_sample)

In [35]:
# Average evaluator score across folds, one entry per parameter-grid setting.
cross_validated_model.avgMetrics

In [36]:
# The original line ended at the dot (an unfinished attribute access), which is
# a syntax error. Show the best fitted pipeline chosen by cross-validation.
cross_validated_model.bestModel

## .bestModel

In [37]:
# List the fitted stages of the best pipeline found by cross-validation.
cross_validated_model.bestModel.stages

In [38]:
# Index 2 is the third pipeline stage, i.e. the random forest.
# NOTE(review): getNumTrees is accessed without call parentheses; on PySpark
# tree-ensemble models it appears to be a property — confirm for your version.
cross_validated_model.bestModel.stages[2].getNumTrees