# Comics Rx
## [A comic book recommendation system](https://github.com/MangrobanGit/comics_rx)
<img src="https://images.unsplash.com/photo-1514329926535-7f6dbfbfb114?ixlib=rb-1.2.1&ixid=eyJhcHBfaWQiOjEyMDd9&auto=format&fit=crop&w=2850&q=80" width="400" align='left'>

---

# 6 - ALS Model - Reduced Data: Grid Search + Cross-Validation

This time, following the exploration in the EDA notebook, let's remove customers whose purchase counts are too low or too high to influence the model in the intended way.

Examples:
- Too few: customers who have bought only 1 comic series.
- Too many: customers with more than 1,000 series (for example, an account number under which all eBay customers are rolled up).
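The filtered file was produced earlier and the exact filtering code isn't shown in this notebook. Here is a minimal plain-Python sketch of the rule. It assumes purchases are `(account_id, comic_id)` pairs and takes the cut-offs from the examples above: at least 2 and at most 1,000 distinct series. `filter_accounts` is a hypothetical helper.

```python
from collections import defaultdict


def filter_accounts(pairs, min_series=2, max_series=1000):
    """Keep the (account_id, comic_id) pairs whose account bought between
    min_series and max_series distinct series, inclusive."""
    # Collect the distinct series each account has bought
    series_by_account = defaultdict(set)
    for account_id, comic_id in pairs:
        series_by_account[account_id].add(comic_id)
    # Accounts whose distinct-series count falls inside the allowed range
    keep = {
        acct
        for acct, series in series_by_account.items()
        if min_series <= len(series) <= max_series
    }
    return [(acct, comic) for acct, comic in pairs if acct in keep]


# Toy data: account 1 bought a single series (dropped); account 2 bought three (kept)
pairs = [(1, 10), (2, 10), (2, 11), (2, 12)]
print(filter_accounts(pairs))  # [(2, 10), (2, 11), (2, 12)]
```

Counting *distinct* series matters here: an account that bought many issues of one series still counts as a single series.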

# Libraries

In [13]:
%matplotlib inline
%load_ext autoreload
# %autoreload 1 #would be where you need to specify the files
# %aimport comic_recs

import pandas as pd # dataframes
import os
import pickle

# Data storage
from sqlalchemy import create_engine # SQL helper
#import psycopg2 as psql #PostgreSQL DBs

# import necessary libraries
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
# from pyspark.sql.types import (StructType, StructField, IntegerType
#                                ,FloatType, LongType, StringType)
from pyspark.sql.types import *

import pyspark.sql.functions as F
from pyspark.sql.functions import col, explode, lit, isnan, when, count
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql import DataFrame

# Custom
import lib.data_fcns as dfc
import lib.keys  # Custom keys lib
import lib.comic_recs as cr

import time
import itertools
from functools import reduce
import numpy as np



In [2]:
# instantiate SparkSession object
spark = pyspark.sql.SparkSession.builder.master("local[*]").getOrCreate()
# spark = SparkSession.builder.master("local").getOrCreate()

## Import Data

We previously saved the filtered dataset to a `json` file.

In [3]:
# We have previously created a version of the transactions table and filtered it down.
sold = spark.read.json('raw_data/als_input_filtered.json')

In [4]:
# Persist the data
sold.persist()

DataFrame[account_id: bigint, bought: bigint, comic_id: bigint]

### ALS Model

Let's start with a train/test split.

In [82]:
# Split data into training and test set
(train, test) = sold.randomSplit([.8, .2], seed=41916)

Make sure the shapes make sense.

In [83]:
print(train.count(), len(train.columns))

49402 3


In [84]:
print(test.count(), len(test.columns))

12469 3
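`randomSplit` assigns rows at random instead of cutting at an exact count, so the split is only approximately 80/20. Here, 49,402 of 61,871 rows (about 79.8%) landed in `train`. Below is a minimal plain-Python sketch of weighted random splitting. `random_split` is a hypothetical helper that illustrates the idea; it does not reproduce Spark's exact per-partition sampling.

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row independently to one split, with probability
    proportional to that split's weight."""
    total = sum(weights)
    # Cumulative boundaries in [0, 1], e.g. [0.8, 1.0] for weights [.8, .2]
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, b in enumerate(bounds):
            if u < b:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guard against float rounding near 1.0
    return splits


train_rows, test_rows = random_split(range(10_000), [.8, .2], seed=41916)
print(len(train_rows), len(test_rows))  # close to, but not exactly, 8000 and 2000

# Observed split sizes from the cells above
n_train, n_test = 49402, 12469
print(round(n_train / (n_train + n_test), 3))  # 0.798
```

The same reasoning explains why the later 80/20 split of `train` leaves roughly 64% of the full data for grid-search training and 16% for validation.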


In [85]:
# Evaluate the model by computing the RMSE on the test data
eval_reg = RegressionEvaluator(metricName="rmse"
                               , labelCol="bought"
                               , predictionCol="prediction")
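`RegressionEvaluator(metricName="rmse")` reports the root-mean-squared error between the label column (`bought`) and the `prediction` column. Here is a minimal plain-Python version of that formula with made-up numbers, for intuition only.

```python
import math


def rmse(labels, predictions):
    """Root-mean-squared error between labels and predictions."""
    if not labels or len(labels) != len(predictions):
        raise ValueError("need two non-empty sequences of equal length")
    squared = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared) / len(squared))


# Toy labels (e.g. `bought`) and model predictions
print(round(rmse([1, 0, 1], [0.9, 0.2, 0.6]), 4))  # 0.2646
```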

### Grid Search

In [86]:
# hyper-param config
num_iterations = [10, 20]
ranks = [5, 10, 15, 20]
reg_params = [0.01, 0.1, 0.5]
alphas = [1, 50, 500, 1000, 5000]
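`cr.train_ALS` lives in `comic_recs.py` and isn't shown here. Below is a minimal sketch of an exhaustive grid loop. It assumes the function visits every combination in the same order as the columns used for `gs_df` later (`max_iters`, `reg`, `rank`, `alpha`). `fit_and_score` is a hypothetical stand-in for "fit ALS with these settings and return the validation RMSE", and a toy scorer keeps the sketch runnable without Spark.

```python
import itertools


def grid_search(fit_and_score, num_iterations, reg_params, ranks, alphas):
    """Try every combination; return (best_row, results).

    Each result row is (max_iters, reg, rank, alpha, rmse).
    """
    results = []
    for max_iter, reg, rank, alpha in itertools.product(
            num_iterations, reg_params, ranks, alphas):
        err = fit_and_score(max_iter=max_iter, reg=reg, rank=rank, alpha=alpha)
        results.append((max_iter, reg, rank, alpha, err))
    # Lowest error wins; min() keeps the first row on ties
    best = min(results, key=lambda row: row[-1])
    return best, results


# Toy scorer (hypothetical) so the sketch runs without Spark
def toy_score(max_iter, reg, rank, alpha):
    return 1.0 / (max_iter * rank) + reg / alpha


best, results = grid_search(toy_score, [10, 20], [0.01, 0.1, 0.5],
                            [5, 10, 15, 20], [1, 50, 500, 1000, 5000])
print(len(results))  # 120
print(best[:4])      # (20, 0.01, 20, 5000)
```

The notebook's `cr.train_ALS` returns the fitted final model plus the error list, whereas this sketch returns the best parameter row. The lists above define 2 × 4 × 3 × 5 = 120 combinations, each a full ALS fit, which is why runtimes grow quickly.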

Let's further split the training set into training and validation sets for the grid search.

In [88]:
# Split data into training and validation sets
(gs_train, gs_val) = train.randomSplit([.8, .2], seed=41916)

In [None]:
# grid search and select best model
start_time = time.time()
#final_model, param_errors = tr_ALS(train, test, eval_reg, num_iterations, reg_params, ranks, alphas)
final_model, params_errs = cr.train_ALS(gs_train, gs_val, eval_reg, num_iterations, reg_params, ranks, alphas)

print ('Total Runtime: {:.2f} seconds'.format(time.time() - start_time))

Save the grid-search results.

In [56]:
param_errs_rd_1 = params_errs

In [57]:
with open('support_data/params_errs_rd1.pkl', 'wb') as f:
    pickle.dump(param_errs_rd_1, f)
    
# Example - load pickle
# pickle_in = open("support_data/params_errs_rd1.pkl","rb")
# pe1 = pickle.load(pickle_in)

Hmmm. Let's put `params_errs` into a dataframe and find the model with the lowest error!

In [73]:
gs_cols = ['max_iters', 'reg', 'rank', 'alpha', 'rmse']

In [74]:
gs_df = pd.DataFrame(params_errs, columns=gs_cols)

In [75]:
gs_df.head()

|   | max_iters | reg | rank | alpha | rmse |
|---|---|---|---|---|---|
| 0 | 10 | 0.01 | 5 | 1 | 0.842977 |
| 1 | 10 | 0.01 | 5 | 50 | 0.459353 |
| 2 | 10 | 0.01 | 5 | 500 | 0.298698 |
| 3 | 10 | 0.01 | 5 | 1000 | 0.284917 |
| 4 | 10 | 0.01 | 5 | 5000 | 0.283452 |


In [76]:
min_err = gs_df.rmse.min()

In [77]:
gs_df.loc[gs_df['rmse']==min_err]

|   | max_iters | reg | rank | alpha | rmse |
|---|---|---|---|---|---|
| 123 | 20 | 0.5 | 5 | 1000 | 0.265817 |


### Round 2

In [18]:
# hyper-param config
num_iterations = [15, 20]
ranks = [5, 10]
reg_params = [0.1, 0.5]
alphas = [40]

In [19]:
# grid search and select best model
start_time = time.time()
final_model, params_errs_rd_2 = cr.train_ALS(train, test, eval_reg, num_iterations, reg_params, ranks, alphas)

print ('Total Runtime: {:.2f} seconds'.format(time.time() - start_time))

15 iterations, 5 latent factors, regularization=0.1, and alpha @ 40 : validation error is 0.4830
15 iterations, 10 latent factors, regularization=0.1, and alpha @ 40 : validation error is 0.4779
15 iterations, 5 latent factors, regularization=0.5, and alpha @ 40 : validation error is 0.4858
15 iterations, 10 latent factors, regularization=0.5, and alpha @ 40 : validation error is 0.4780
20 iterations, 5 latent factors, regularization=0.1, and alpha @ 40 : validation error is 0.4822
20 iterations, 10 latent factors, regularization=0.1, and alpha @ 40 : validation error is 0.4767
20 iterations, 5 latent factors, regularization=0.5, and alpha @ 40 : validation error is 0.4844
20 iterations, 10 latent factors, regularization=0.5, and alpha @ 40 : validation error is 0.4761
Total Runtime: 57.86 seconds


In [21]:
with open('support_data/params_errs_rd2.pkl', 'wb') as f:
    pickle.dump(params_errs_rd_2, f)
    
# Example - load pickle
# pickle_in = open("support_data/params_errs_rd1.pkl","rb")
# pe1 = pickle.load(pickle_in)

- Keep 20 iterations and `regParam` = 0.1. Rank 10 beats rank 5 at both regularization values, while 0.1 vs. 0.5 makes little difference.

### Round 3

In [16]:
# hyper-param config
num_iterations = [20,30]
ranks = [5,10]
reg_params = [0.1]
alphas = [40]

In [17]:
# grid search and select best model
start_time = time.time()
final_model, params_errs_rd_3 = cr.train_ALS(train, test, eval_reg, num_iterations, reg_params, ranks, alphas)

print ('Total Runtime: {:.2f} seconds'.format(time.time() - start_time))

20 iterations, 5 latent factors, regularization=0.1, and alpha @ 40 : validation error is 0.4822
20 iterations, 10 latent factors, regularization=0.1, and alpha @ 40 : validation error is 0.4767


Py4JJavaError: An error occurred while calling o725.fit.
: org.apache.spark.SparkException: Job 238 cancelled because SparkContext was shut down

### Round 4

It seems none of the machines I have access to can consistently run past 20 iterations: the 20-iteration parameter sets completed, but the 30-iteration run crashed. Pursuing this further probably has diminishing returns, so let's just select the best model from the parameters I *have* been able to run.

In [23]:
# hyper-param config
num_iterations = [20]
ranks = [5,10,15,20]
reg_params = [0.1]
alphas = [40]

In [24]:
# grid search and select best model
start_time = time.time()
final_model, params_errs_rd_4 = cr.train_ALS(train, test, eval_reg, num_iterations, reg_params, ranks, alphas)

print ('Total Runtime: {:.2f} seconds'.format(time.time() - start_time))

20 iterations, 5 latent factors, regularization=0.1, and alpha @ 40 : validation error is 0.4822
20 iterations, 10 latent factors, regularization=0.1, and alpha @ 40 : validation error is 0.4767
20 iterations, 15 latent factors, regularization=0.1, and alpha @ 40 : validation error is 0.4874
20 iterations, 20 latent factors, regularization=0.1, and alpha @ 40 : validation error is 0.5072
Total Runtime: 32.48 seconds


In [25]:
with open('support_data/params_errs_rd4.pkl', 'wb') as f:
    pickle.dump(params_errs_rd_4, f)
    
# Example - load pickle
# pickle_in = open("support_data/params_errs_rd1.pkl","rb")
# pe1 = pickle.load(pickle_in)

In [45]:
# hyper-param config
num_iterations = [20]
ranks = [10]
reg_params = [0.1]
alphas = [45, 1000, 10000]

In [46]:
# grid search and select best model
start_time = time.time()
final_model, params_errs_rd_4 = cr.train_ALS(train, test, eval_reg, num_iterations, reg_params, ranks, alphas)

print ('Total Runtime: {:.2f} seconds'.format(time.time() - start_time))

20 iterations, 10 latent factors, regularization=0.1, and alpha @ 45 : validation error is 0.4658
20 iterations, 10 latent factors, regularization=0.1, and alpha @ 1000 : validation error is 0.3528
20 iterations, 10 latent factors, regularization=0.1, and alpha @ 10000 : validation error is 0.4089
Total Runtime: 27.23 seconds


**OK**. Let's call it good. 

## Results 
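Based on the grid searches above, these are the parameters we settle on:
- `maxIter` = 20
- `rank` = 10
- `regParam` = 0.1 (the Spark default)
- `alpha` = 40 (note that `alpha` = 1000 gave a lower validation RMSE in the last grid above)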

Let's cross-validate this candidate model.

## Cross Validation

Spark's built-in `CrossValidator` kept failing when I tried to use it, so let's use our own k-fold helper functions from `comic_recs.py`.
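`cr.get_spark_k_folds` and `cr.get_cv_errors` aren't shown in this notebook. Below is a minimal plain-Python sketch of the same k-fold pattern, assuming the folds are random and disjoint. `fit` and `score` are hypothetical stand-ins for `als_cv.fit` and `eval_reg.evaluate`. In PySpark, one way to get k roughly equal random folds is `sold.randomSplit([1.0] * k, seed=random_seed)`, since `randomSplit` normalizes its weights. Each training set is then the union of the other folds, e.g. `reduce(DataFrame.union, other_folds)`.

```python
import random
import statistics


def k_fold_indices(n_rows, k, seed=None):
    """Shuffle row indices and deal them into k disjoint folds of near-equal size."""
    idx = list(range(n_rows))
    random.Random(seed).shuffle(idx)
    return [idx[i::k] for i in range(k)]


def cross_val_errors(rows, k, fit, score, seed=None):
    """For each fold, fit on the other k-1 folds and score on the held-out fold."""
    folds = k_fold_indices(len(rows), k, seed)
    errors = []
    for i, held_out in enumerate(folds):
        train = [rows[j] for f, fold in enumerate(folds) if f != i for j in fold]
        test = [rows[j] for j in held_out]
        model = fit(train)
        errors.append(score(model, test))
    return errors


# Toy "model": predict the mean label seen in training, scored by RMSE
def fit_mean(train):
    return statistics.fmean([y for _, y in train])


def score_rmse(mean_pred, test):
    return statistics.fmean([(y - mean_pred) ** 2 for _, y in test]) ** 0.5


rows = [(i, i % 3) for i in range(30)]
errors = cross_val_errors(rows, k=5, fit=fit_mean, score=score_rmse, seed=41916)
print(len(errors))  # 5
```

Each row is held out exactly once, so the k errors come from disjoint test sets.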

In [28]:
k = 5
random_seed = 41916

In [29]:
folds = cr.get_spark_k_folds(sold, k=k, random_seed=random_seed)

In [40]:
# Create ALS instance for CV with our chosen parameters
# (note: alpha=100 here, whereas the Results above list alpha=40)
als_cv = ALS(maxIter=20,
             rank=10,
             userCol='account_id',
             itemCol='comic_id',
             ratingCol='bought',
             implicitPrefs=True,
             regParam=0.1,
             alpha=100,
             coldStartStrategy='drop',  # drop NaN predictions so every fold yields a finite RMSE
             seed=41916)
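Why `'drop'` matters for CV: with the default `coldStartStrategy='nan'`, any account or comic that appears only in the held-out fold gets a NaN prediction, and a single NaN makes the whole RMSE NaN. Here is a plain-Python illustration with toy numbers. `rmse_pairs` is a hypothetical helper, not part of Spark.

```python
import math


def rmse_pairs(pairs):
    """RMSE over (label, prediction) pairs; any NaN prediction makes the result NaN."""
    return math.sqrt(sum((y - p) ** 2 for y, p in pairs) / len(pairs))


# Toy scores: the third account appears only in the held-out fold,
# so under coldStartStrategy='nan' its prediction is NaN
scored = [(1.0, 0.8), (0.0, 0.1), (1.0, math.nan)]
print(rmse_pairs(scored))  # nan

# What coldStartStrategy='drop' does: discard NaN predictions before evaluating
kept = [(y, p) for y, p in scored if not math.isnan(p)]
print(round(rmse_pairs(kept), 4))  # 0.1581
```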

In [41]:
errors = cr.get_cv_errors(folds, als_cv, eval_reg)

In [42]:
# Make sure that # of errors = k
k == len(errors)

True

In [43]:
print(errors)

[0.40260606290390344, 0.4025822521947809, 0.4041795503970613, 0.40258120410961235, 0.4088074105090246]


In [20]:
avg_rmse = np.mean(errors)
sd_rmse = np.std(errors)
print("The average of the errors : {:.4f}".format(avg_rmse))
print("The standard deviation of the errors : {:.4f}".format(sd_rmse))


The average of the errors : 0.4770
The standard deviation of the errors : 0.0025
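`np.std` defaults to the population standard deviation (`ddof=0`). With only k = 5 folds, the sample standard deviation (`ddof=1`) comes out slightly larger; either works as a rough stability check. Python's `statistics` module makes the distinction explicit. The fold values below are hypothetical, for illustration only.

```python
import statistics

# Hypothetical per-fold RMSEs, for illustration only
fold_rmses = [0.40, 0.41, 0.42, 0.43]

mean = statistics.fmean(fold_rmses)
pop_sd = statistics.pstdev(fold_rmses)    # matches np.std(fold_rmses), i.e. ddof=0
sample_sd = statistics.stdev(fold_rmses)  # matches np.std(fold_rmses, ddof=1)
print(f"mean={mean:.4f} pop_sd={pop_sd:.4f} sample_sd={sample_sd:.4f}")
# mean=0.4150 pop_sd=0.0112 sample_sd=0.0129
```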


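For reference, here is the standard (explicit-feedback) ALS objective. It is minimized over the user factors $x_u$ and item factors $y_i$, with regularization strength $\lambda$:

$$\sum\limits_{u,i} (r_{ui} - x_u^T y_i)^{2} + \lambda \Big( \sum\limits_u \|x_u\|^{2} + \sum\limits_i \|y_i\|^{2} \Big)$$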
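Because `als_cv` and `als` both set `implicitPrefs=True`, Spark fits the implicit-feedback variant of this objective (Hu, Koren & Volinsky, 2008). Write $r_{ui}$ for the `bought` value. In this variant, each squared error is weighted by a confidence $c_{ui}$, and the target becomes a binary preference $p_{ui}$. The sum also runs over all user-comic pairs, not only observed purchases:

$$
\sum\limits_{u,i} c_{ui} \big(p_{ui} - x_u^T y_i\big)^{2} + \lambda \Big( \sum\limits_u \|x_u\|^{2} + \sum\limits_i \|y_i\|^{2} \Big),
\qquad
p_{ui} = \begin{cases} 1 & \text{if } r_{ui} > 0 \\ 0 & \text{otherwise} \end{cases},
\qquad
c_{ui} = 1 + \alpha\,|r_{ui}|
$$

Spark additionally scales $\lambda$ by each user's or item's number of ratings. One practical consequence of this objective is that predictions estimate the preference $p_{ui}$, not the raw `bought` count. The RMSE values against `bought` are therefore probably best read as relative comparisons between settings.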

Looks good! The average RMSE is in line with the best single-split results we've seen, and the small standard deviation across folds suggests the model is stable.

## Candidate Model

In [9]:
# Create ALS instance and fit model
als = ALS(maxIter=25,
          rank=10,
          userCol='account_id',
          itemCol='comic_id',
          ratingCol='bought',
          implicitPrefs=True,
          regParam=0.1,
          alpha=40,
          coldStartStrategy='nan',  # final model: keep NaN predictions so unseen accounts/comics stay visible
          seed=41916)
model_use = als.fit(train)

### Get Top N recommendations for Single User

Let's sample a couple of `account_id`s to use as test cases.

In [11]:
n_to_test = 2

# sample() takes a fraction, so it returns only approximately n_to_test rows
users = (sold.select(als.getUserCol())
             .sample(False, n_to_test / sold.count()))
users.persist()
users.show(2)

+----------+
|account_id|
+----------+
|       192|
|       569|
+----------+



We wrapped this functionality in a function, `get_top_n_new_recs`, in `comic_recs.py`.

### Testing the function

- The function prompts for an `account_id`.
- It returns the top *n* new recommendations as a pandas DataFrame, with *n* set by the `topn` parameter.
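The internals of `get_top_n_new_recs` aren't shown in this notebook. Below is a minimal sketch of one way such a function could work, assuming "new" means comics the account hasn't already bought. ALS scores an account-comic pair as the dot product $x_u^T y_i$ of their latent factors, so we rank every unbought comic by that score. `top_n_new_recs` and the toy factors are hypothetical. In Spark, the learned factors are available as the `model.userFactors` and `model.itemFactors` DataFrames (columns `id`, `features`). `ALSModel.recommendForUserSubset(users, n)` also returns top-n items, though it does not exclude already-bought ones.

```python
def top_n_new_recs(user_factors, item_factors, already_bought, n=5):
    """Score every item by the dot product x_u . y_i and return the n
    highest-scoring items the user has not bought yet."""
    scores = {
        item: sum(a * b for a, b in zip(user_factors, vec))
        for item, vec in item_factors.items()
        if item not in already_bought
    }
    # Sort by score descending; break ties by item id for determinism
    ranked = sorted(scores.items(), key=lambda kv: (-kv[1], kv[0]))
    return [item for item, _ in ranked[:n]]


# Toy rank-2 factors (made-up numbers)
user = [1.0, 0.5]
items = {
    "A": [0.9, 0.1],
    "B": [0.2, 0.8],
    "C": [0.5, 0.5],
    "D": [0.1, 0.1],
}
print(top_n_new_recs(user, items, already_bought={"A"}, n=2))  # ['C', 'B']
```

Item "A" would score highest (0.95), which is exactly why the exclusion step matters for "new" recommendations.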

In [10]:
top_n_df = cr.get_top_n_new_recs(spark=spark, model=model_use, topn=5)
top_n_df

192
Total Runtime: 28.51 seconds


|   | comic_title |
|---|---|
| 1 | Brightest Day (DC) |
| 2 | New Mutants (Marvel) |
| 3 | Fear Itself (Marvel) |
| 4 | Batman Incorporated (DC) |
| 5 | Birds of Prey (DC) |


In [11]:
top_n_df = cr.get_top_n_new_recs(spark=spark, model=model_use, topn=5)
top_n_df

569
Total Runtime: 29.68 seconds


|   | comic_title |
|---|---|
| 1 | Thor (Marvel) |
| 2 | Superior Spider-Man (Marvel) |
| 3 | Invincible (Image) |
| 4 | Image Firsts Saga Curr Ptg (Image) |
| 5 | Superior Iron Man (Marvel) |


In [12]:
top_n_df = cr.get_top_n_new_recs(spark=spark, model=model_use, topn=10)
top_n_df

161
Total Runtime: 21.43 seconds


|   | comic_title |
|---|---|
| 1 | Aliens Dead Orbit (Dark Horse) |
| 2 | Beef (Image) |
| 3 | Vinegar Teeth (Dark Horse) |
| 4 | Leviathan (Image) |
| 5 | Shaolin Cowboy Wholl Stop th (Dark Horse) |
| 6 | Head Lopper (Image) |
| 7 | Crossed Badlands (Avatar) |
| 8 | Punisher (Marvel) |
| 9 | Godzilla Oblivion (IDW) |
| 10 | Nova (Marvel) |


## Conclusions
- The recommendations seem plausible. With only three test accounts the sample is small, but the results look individualized: there is no overlap between the three lists.
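To make the "no overlap" check repeatable as more accounts are tested, here is a small sketch. It computes the pairwise Jaccard overlap between recommendation lists, using the three lists returned above. `jaccard` is a hypothetical helper; for larger tests, the lists would come from `get_top_n_new_recs` instead of being typed in.

```python
from itertools import combinations

# Recommendation lists returned above for accounts 192, 569, and 161
recs = {
    192: ["Brightest Day (DC)", "New Mutants (Marvel)", "Fear Itself (Marvel)",
          "Batman Incorporated (DC)", "Birds of Prey (DC)"],
    569: ["Thor (Marvel)", "Superior Spider-Man (Marvel)", "Invincible (Image)",
          "Image Firsts Saga Curr Ptg (Image)", "Superior Iron Man (Marvel)"],
    161: ["Aliens Dead Orbit (Dark Horse)", "Beef (Image)", "Vinegar Teeth (Dark Horse)",
          "Leviathan (Image)", "Shaolin Cowboy Wholl Stop th (Dark Horse)",
          "Head Lopper (Image)", "Crossed Badlands (Avatar)", "Punisher (Marvel)",
          "Godzilla Oblivion (IDW)", "Nova (Marvel)"],
}


def jaccard(a, b):
    """Share of titles two lists have in common (0 = disjoint, 1 = identical)."""
    a, b = set(a), set(b)
    return len(a & b) / len(a | b)


for (u1, r1), (u2, r2) in combinations(recs.items(), 2):
    print(u1, u2, jaccard(r1, r2))  # 0.0 for every pair here
```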

## Save the Model!

In [13]:
model_use.write().overwrite().save('als_use')  # overwrite so the cell can be re-run

## Retrieving Saved Model

In [14]:
comic_rec_model = ALSModel.load('als_use')

In [15]:
top_n_df = cr.get_top_n_new_recs(spark=spark, model=comic_rec_model, topn=10)
top_n_df

161
Total Runtime: 5.66 seconds


|   | comic_title |
|---|---|
| 1 | Aliens Dead Orbit (Dark Horse) |
| 2 | Beef (Image) |
| 3 | Vinegar Teeth (Dark Horse) |
| 4 | Leviathan (Image) |
| 5 | Shaolin Cowboy Wholl Stop th (Dark Horse) |
| 6 | Head Lopper (Image) |
| 7 | Crossed Badlands (Avatar) |
| 8 | Punisher (Marvel) |
| 9 | Godzilla Oblivion (IDW) |
| 10 | Nova (Marvel) |
