# PySpark ML Tutorial

## Section 1: Getting Started with ML Pipeline

The first step is to create the `sc` (`SparkContext`) and `spark` (`SparkSession`) objects and load the data.

In [51]:
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
import pyspark.sql.functions as F
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

import pandas as pd

In [2]:
conf = SparkConf().setMaster('local').setAppName('SparkBeginner')
sc = SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()

In [3]:
# load the MovieLens tables as DataFrames and register each one as a temporary view
links_df = spark.read.csv('./input/ml-latest-small/links.csv', header=True)
movies_df = spark.read.csv('./input/ml-latest-small/movies.csv', header=True)
ratings_df = spark.read.csv('./input/ml-latest-small/ratings.csv', header=True)
tags_df = spark.read.csv('./input/ml-latest-small/tags.csv', header=True)
links_df.createOrReplaceTempView('links')
movies_df.createOrReplaceTempView('movies')
ratings_df.createOrReplaceTempView('ratings')
tags_df.createOrReplaceTempView('tags')

spark.catalog.listTables()

[Table(name='links', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='movies', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='ratings', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='tags', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

Next, before diving into the ML pipeline, we prepare a joined Spark DataFrame that combines data from `tags`, `ratings`, and `movies`.

In [40]:
ml_df = tags_df.withColumnRenamed('timestamp', 'tag_timestamp').join(
    ratings_df.withColumnRenamed('timestamp', 'rating_timestamp'), on=['userId', 'movieId'], how='full_outer').join(
    movies_df, on='movieId', how='left_outer')

### Using `.withColumn()` and `.cast()` to force numeric data type

Spark ML only accepts **numeric** values as model input.

When a feature is numeric but was read in with a dtype of "string" (as happens here, because the CSV files were loaded without schema inference), we need to convert it back to a numeric type, either `"integer"` or `"double"`. To do this, use the `.cast()` method in combination with the `.withColumn()` method. Note that `.cast()` works on columns, while `.withColumn()` works on DataFrames. The only argument `.cast()` needs is the target type, given as a string: `"integer"` for integers and `"double"` for decimal numbers.

Calling `spark_column.cast()` inside `spark_dataframe.withColumn()` with the name of an existing column overwrites that column, for instance:

```python
df = df.withColumn('col', df.col.cast('double'))
```

In [41]:
ml_df

DataFrame[movieId: string, userId: string, tag: string, tag_timestamp: string, rating: string, rating_timestamp: string, title: string, genres: string]

In [42]:
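# cast rating to a numeric type and keep only the columns needed downstream
# (MovieLens ratings use half-star steps, so 'integer' cannot keep the fractional part; use 'double' to preserve it)
ml_df = ml_df.withColumn('rating', ml_df.rating.cast('integer')).select('movieId', 'userId', 'tag', 'rating', 'title', 'genres')

# split the genres string on "|" (or ","); inside [...] the pipe is a literal character, not regex alternation
genre_cols = F.split(ml_df.genres, '[,|]')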

# take the first 3 genres from the "genres" list for each movie review
ml_df = ml_df.withColumn(
    'genre_1', genre_cols.getItem(0)).withColumn(
    'genre_2', genre_cols.getItem(1)).withColumn(
    'genre_3', genre_cols.getItem(2)).drop('title', 'genres')

ml_df.show()

+-------+------+----+------+---------+---------+--------+
|movieId|userId| tag|rating|  genre_1|  genre_2| genre_3|
+-------+------+----+------+---------+---------+--------+
| 117529|   103|null|     4|   Action|Adventure|   Drama|
|   2161|   103|null|     3|Adventure| Children| Fantasy|
|   2502|   104|null|     3|   Comedy|    Crime|    null|
|    356|   104|null|     4|   Comedy|    Drama| Romance|
|    616|   104|null|     3|Animation| Children|    null|
|   1201|   105|null|     4|   Action|Adventure| Western|
|  55247|   105|null|     5|   Action|Adventure|   Drama|
|   5618|   105|null|     4|Adventure|Animation| Fantasy|
|   5878|   105|null|     3|    Drama|  Romance|    null|
|    608|   105|null|     4|   Comedy|    Crime|   Drama|
|  61323|   105|null|     3|   Comedy|    Crime|   Drama|
|  64839|   105|null|     4|    Drama|     null|    null|
|  70286|   105|null|     4|  Mystery|   Sci-Fi|Thriller|
|  72641|   105|null|     4|    Drama|     null|    null|
|  81847|   10
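
`F.split()` treats its pattern as a Java regular expression, where a bare `|` means alternation, which is why the pattern is written as the character class `[,|]`. Python's `re` module handles this pattern the same way, so the sketch below uses it as a stand-in to show the behavior without a Spark session. The helper `first_three` is a hypothetical name introduced only for this illustration.

```python
import re

# Inside [...] the pipe is a literal character, so this splits on "|" or ",".
print(re.split('[,|]', 'Action|Adventure|Drama'))   # ['Action', 'Adventure', 'Drama']

# Escaping the pipe is an equivalent way to match it literally.
print(re.split(r'\|', 'Action|Adventure|Drama'))    # ['Action', 'Adventure', 'Drama']


def first_three(genres):
    """Mimic getItem(0), getItem(1), getItem(2): missing positions become None."""
    parts = re.split('[,|]', genres)
    return [parts[i] if i < len(parts) else None for i in range(3)]


print(first_three('Comedy|Crime'))   # ['Comedy', 'Crime', None]
```

The `None` in the last result corresponds to the `null` values in `genre_2` and `genre_3` for movies that list fewer than three genres.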

## Building a `Pipeline` object for PySpark machine learning

### Using `StringIndexer()` and `OneHotEncoder()` to transform the string-based categorical features

As displayed above, the `rating` column is now of integer type. The `genre_n` columns are strings, so they need to be one-hot encoded instead. This takes two steps:

1. Use `StringIndexer()` from the `pyspark.ml.feature` module. It is an `Estimator` that learns a mapping from each distinct string to a numeric index; fitting it returns a `Transformer` that writes those indices to a new column.
2. Use `OneHotEncoder()` from the same module, which takes the numeric indices produced by the `StringIndexer()` and turns them into a one-hot encoded vector column.

**NOTE:**
The transformations above are stages in a PySpark `Pipeline`. Because the same preprocessing must be applied both when training the model and when deploying it, it is important that the pipeline stays the same and is reusable.

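Use `.describe()` to obtain descriptive statistics for every column, or pass column names to restrict the output. Note that `movieId` and `userId` are still strings at this point, so their `min` and `max` are compared lexicographically (which is why `userId` shows a `max` of 99).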

In [46]:
ml_df.describe().show()

+-------+------------------+------------------+-----------+------------------+------------------+---------+---------+
|summary|           movieId|            userId|        tag|            rating|           genre_1|  genre_2|  genre_3|
+-------+------------------+------------------+-----------+------------------+------------------+---------+---------+
|  count|            102884|            102884|       3683|            102677|            102884|    86171|    57197|
|   mean|19732.228918004745|328.01602775941836|       null| 3.363966613749915|              null|     null|     null|
| stddev|35870.571562184814|183.15834507456145|       null|1.0903566711483974|              null|     null|     null|
|    min|                 1|                 1|"""artsy"""|                 0|(no genres listed)|Adventure|Animation|
|    max|             99992|                99|    zombies|                 5|           Western|  Western|  Western|
+-------+------------------+------------------+---------

Let's create a `StringIndexer()` and a `OneHotEncoder()` for each `genre_n` feature. Both objects take two main parameters:

1. `inputCol` is the name of the column you want to index or encode
2. `outputCol` is the name of the new column that the fitted `Transformer` should create

In [56]:
# create a StringIndexer and a OneHotEncoder for each genre feature
# handleInvalid='skip' drops rows whose genre is null or unseen; 'keep' would put them in a separate bucket instead
g1_indexer = StringIndexer(inputCol='genre_1', outputCol='genre_1_index', handleInvalid='skip')
g1_encoder = OneHotEncoder(inputCol='genre_1_index', outputCol='genre_1_fact')
g2_indexer = StringIndexer(inputCol='genre_2', outputCol='genre_2_index', handleInvalid='skip')
g2_encoder = OneHotEncoder(inputCol='genre_2_index', outputCol='genre_2_fact')
g3_indexer = StringIndexer(inputCol='genre_3', outputCol='genre_3_index', handleInvalid='skip')
g3_encoder = OneHotEncoder(inputCol='genre_3_index', outputCol='genre_3_fact')

### Create a final ML-ready data wrapper step for the `Pipeline`

The last step in the `Pipeline` is to combine all of the columns containing our features **into a single column**. This has to be done **before modeling can take place** because every Spark modeling routine expects the data to be in this form.

You can do this by storing each of the values from a column as an entry in a vector. Then, from the model's point of view, every observation is a vector that contains all of the information about it and a label that tells the modeler what value that observation corresponds to. Because of this, the `pyspark.ml.feature` submodule contains a class called `VectorAssembler`. This `Transformer` takes all of the columns you specify and combines them into a new vector column.

In [57]:
vec_assembler = VectorAssembler(inputCols=['genre_1_fact', 'genre_2_fact', 'genre_3_fact'],
                                outputCol='features')

### Create a `Pipeline` to wrap all the `Transformer`s and `Estimator`s together

`Pipeline` is a class in the `pyspark.ml` module that chains all the `Estimator`s and `Transformer`s created above into a single object. This lets us reuse the same preprocessing over and over again by wrapping it up in one simple object.

In [58]:
movies_pipe = Pipeline(stages=[g1_indexer, g1_encoder,
                               g2_indexer, g2_encoder,
                               g3_indexer, g3_encoder,
                               vec_assembler])

In [59]:
# .fit() fits every Estimator stage and returns a PipelineModel; .transform() then outputs the result df
ml_df_transformed = movies_pipe.fit(ml_df).transform(ml_df)

In [60]:
ml_df_transformed.show()

+-------+------+----+------+---------+---------+--------+-------------+--------------+-------------+---------------+-------------+---------------+--------------------+
|movieId|userId| tag|rating|  genre_1|  genre_2| genre_3|genre_1_index|  genre_1_fact|genre_2_index|   genre_2_fact|genre_3_index|   genre_3_fact|            features|
+-------+------+----+------+---------+---------+--------+-------------+--------------+-------------+---------------+-------------+---------------+--------------------+
| 117529|   103|null|     4|   Action|Adventure|   Drama|          0.0|(18,[0],[1.0])|          1.0| (17,[1],[1.0])|          3.0| (16,[3],[1.0])|(51,[0,19,38],[1....|
|   2161|   103|null|     3|Adventure| Children| Fantasy|          3.0|(18,[3],[1.0])|          9.0| (17,[9],[1.0])|          4.0| (16,[4],[1.0])|(51,[3,27,39],[1....|
|    356|   104|null|     4|   Comedy|    Drama| Romance|          1.0|(18,[1],[1.0])|          0.0| (17,[0],[1.0])|          2.0| (16,[2],[1.0])|(51,[1,18,37],
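
The `features` column uses Spark's sparse vector notation `(size, [indices], [values])`. `VectorAssembler` concatenates the three encoded vectors, so each index is shifted by the combined size of the vectors before it. The plain-Python sketch below reproduces the indices from the output above; `assembled_indices` is a hypothetical helper written only for this illustration.

```python
def assembled_indices(sizes, positions):
    """Return where each one-hot position lands after the vectors are concatenated."""
    result = []
    offset = 0
    for size, position in zip(sizes, positions):
        result.append(offset + position)
        offset += size   # the next vector starts after this one
    return result


sizes = [18, 17, 16]   # sizes of genre_1_fact, genre_2_fact, genre_3_fact in the output above

print(sum(sizes))                            # 51, the size of the features vector
print(assembled_indices(sizes, [0, 1, 3]))   # [0, 19, 38]
print(assembled_indices(sizes, [3, 9, 4]))   # [3, 27, 39]
print(assembled_indices(sizes, [1, 0, 2]))   # [1, 18, 37]
```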

### The standard train/test split should be done **after** the transformations

Use the DataFrame method `.randomSplit()` to split the transformed DataFrame into two pieces. The list of weights (say `[0.6, 0.4]`) sets the approximate share of records that lands in each piece. Rows are assigned at random, so the resulting sizes will be close to these ratios but not exact.

In [61]:
train, test = ml_df_transformed.randomSplit([0.6, 0.4])

In [64]:
train.describe().show()

+-------+-----------------+------------------+-----------+------------------+-------+---------+---------+------------------+------------------+------------------+
|summary|          movieId|            userId|        tag|            rating|genre_1|  genre_2|  genre_3|     genre_1_index|     genre_2_index|     genre_3_index|
+-------+-----------------+------------------+-----------+------------------+-------+---------+---------+------------------+------------------+------------------+
|  count|            34581|             34581|       1212|             34533|  34581|    34581|    34581|             34581|             34581|             34581|
|   mean|21200.52265695035|  324.878979786588|       null|3.3959111574436047|   null|     null|     null|1.5649923368323646|3.7205690986379802|3.5989994505653393|
| stddev|35756.71951384241|183.80604503118502|       null|1.0770596153504473|   null|     null|     null|2.0795986324372713| 3.703135382337923| 3.538601170942013|
|    min|             

In [65]:
test.describe().show()

+-------+------------------+------------------+--------------------+------------------+-------+---------+---------+------------------+------------------+------------------+
|summary|           movieId|            userId|                 tag|            rating|genre_1|  genre_2|  genre_3|     genre_1_index|     genre_2_index|     genre_3_index|
+-------+------------------+------------------+--------------------+------------------+-------+---------+---------+------------------+------------------+------------------+
|  count|             22616|             22616|                 859|             22577|  22616|    22616|    22616|             22616|             22616|             22616|
|   mean|21208.063760169793| 325.8613813229572|                null| 3.399787394250786|   null|     null|     null|1.5554032543332155|3.7286876547576937|3.6040856031128405|
| stddev|35964.071936322696|182.78807694002515|                null|1.0752086594646937|   null|     null|     null| 2.078884074985521| 

## Section 2: Modeling Steps after ML-Ready Data is Prepared

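We use logistic regression as the example model to walk through the modeling process. By default, `LogisticRegression` reads its inputs from a column named `features` and its target from a column named `label`. The DataFrame prepared in Section 1 has a `features` column but no `label` column, so a binary label has to be added before the model can be fit.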

In [66]:
from pyspark.ml.classification import LogisticRegression

# Create a LogisticRegression Estimator
lr = LogisticRegression()

In [67]:
# Import the evaluation submodule
import pyspark.ml.evaluation as evals

# Create a BinaryClassificationEvaluator
evaluator = evals.BinaryClassificationEvaluator(metricName='areaUnderROC')

In [69]:
# Import the tuning submodule
import pyspark.ml.tuning as tune
import numpy as np

# Create the parameter grid
grid = tune.ParamGridBuilder()

# Add the hyperparameters to search over: regularization strength and the L2 (0) vs. L1 (1) mix
grid = grid.addGrid(lr.regParam, np.arange(0, .1, .01))
grid = grid.addGrid(lr.elasticNetParam, [0, 1])

# Build the grid
grid = grid.build()
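
The grid is the Cartesian product of the value lists, so its size grows multiplicatively. The sketch below counts the combinations with plain Python and NumPy rather than Spark, and multiplies by 3, which is the default `numFolds` of `CrossValidator`.

```python
from itertools import product

import numpy as np

reg_values = np.arange(0, .1, .01)   # same call as in the grid above
enet_values = [0, 1]

# Every regParam value is paired with every elasticNetParam value.
combos = list(product(reg_values, enet_values))
num_folds = 3   # CrossValidator's default numFolds

print(len(reg_values), len(combos))   # 10 20
print(len(combos) * num_folds)        # 60 model fits during cross-validation
```

After the search, the cross-validator fits the best combination once more on the full training set, so the total number of fits is one higher.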

In [70]:
# Create the CrossValidator
cv = tune.CrossValidator(estimator=lr,
                         estimatorParamMaps=grid,
                         evaluator=evaluator)

In [None]:
# Fit cross validation models
models = cv.fit(train)

# Extract the best model
best_lr = models.bestModel