# Comics Rx
## [A comic book recommendation system](https://github.com/MangrobanGit/comics_rx)
<img src="https://images.unsplash.com/photo-1514329926535-7f6dbfbfb114?ixlib=rb-1.2.1&ixid=eyJhcHBfaWQiOjEyMDd9&auto=format&fit=crop&w=2850&q=80" width="400" align='left'>

---

# 4 - ALS Model - Reduced Data

This time, as explored in the EDA notebook, let's remove customers whose purchase counts are too low or too high for them to influence the model in the intended way.

Examples:
- Too few - Customers who have only bought 1 comic (series).
- Too many - Customers with > 1000 series (e.g., a single account number that all eBay sales are rolled into).
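A minimal sketch of this filter in plain Python, assuming the transactions are `(account_num, comic_title)` pairs. The function name `filter_accounts`, its thresholds (`min_series=2` and `max_series=1000`, taken from the examples above) and the toy rows are illustrative assumptions; the actual filtering for this notebook happened upstream, before the `trans_filtered_floor.json` file loaded below was written.

```python
from collections import defaultdict


def filter_accounts(transactions, min_series=2, max_series=1000):
    """Keep only rows whose account bought between min_series and
    max_series distinct comic series (inclusive).

    transactions: iterable of (account_num, comic_title) pairs.
    Hypothetical helper for illustration, not part of data_fcns.
    """
    rows = list(transactions)

    # Count distinct series per account (repeat purchases of a series count once)
    series_per_account = defaultdict(set)
    for account, title in rows:
        series_per_account[account].add(title)

    keep = {
        account
        for account, titles in series_per_account.items()
        if min_series <= len(titles) <= max_series
    }
    return [(account, title) for account, title in rows if account in keep]


# Toy rows: acct_1 bought a single series, so only acct_2's rows survive
sample = [
    ("acct_1", "Royal Historian of Oz (SLG)"),
    ("acct_2", "Royal Historian of Oz (SLG)"),
    ("acct_2", "Afterlife With Archie (Archie)"),
]
print(filter_accounts(sample))
```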

# Libraries

In [None]:
%matplotlib inline
%load_ext autoreload
# Mode 1 would reload only the modules registered with %aimport
%autoreload 2
#%aimport data_fcns

import pandas as pd # dataframes
import os
import time
import numpy as np

# Data storage
from sqlalchemy import create_engine # SQL helper
import psycopg2 as psql #PostgreSQL DBs

# import necessary libraries
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, DoubleType

import pyspark.sql.functions as F
from pyspark.sql.functions import col, explode, lit, isnan, when, count
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.ml.evaluation import RegressionEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit

In [6]:
import sys

In [7]:
sys.path.append('..')

In [8]:
# Custom
import data_fcns as dfc
import keys  # Custom keys lib
import comic_recs as cr

In [9]:
# instantiate SparkSession object
spark = pyspark.sql.SparkSession.builder.master("local[*]").getOrCreate()
# spark = SparkSession.builder.master("local").getOrCreate()

## Import the data

There is a way to query PostgreSQL directly through JDBC, but I don't know how to do that yet. As a workaround, I saved the candidate dataset to JSON and will use that as the input to Spark.
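For reference, a minimal sketch of that JDBC route, assuming the PostgreSQL JDBC driver jar is on Spark's classpath. `spark.read.format('jdbc')` with the `url`, `dbtable`, `user`, `password` and `driver` options is Spark's built-in JDBC source; the helper names and the connection values in the example (`localhost`, `comics`, `transactions`, the credentials) are hypothetical placeholders.

```python
def build_jdbc_options(host, port, database, table, user, password):
    """Assemble the options Spark's JDBC data source needs for PostgreSQL."""
    return {
        "url": f"jdbc:postgresql://{host}:{port}/{database}",
        "dbtable": table,
        "user": user,
        "password": password,
        "driver": "org.postgresql.Driver",
    }


def read_postgres_table(spark, options):
    """Load a PostgreSQL table as a Spark DataFrame over JDBC.

    The PostgreSQL JDBC driver jar must be on Spark's classpath
    (for example via the spark.jars config) or load() will fail.
    """
    return spark.read.format("jdbc").options(**options).load()


# Hypothetical connection values; real credentials would come from config
opts = build_jdbc_options("localhost", 5432, "comics", "transactions",
                          "analyst", "secret")
print(opts["url"])  # jdbc:postgresql://localhost:5432/comics
# trans = read_postgres_table(spark, opts)  # needs a live SparkSession + DB
```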


In [None]:
# We have previously created a version of the transactions table and filtered it down.
trans = spark.read.json('raw_data/trans_filtered_floor.json')

In [None]:
# Persist the data
trans.persist()

In [5]:
print(trans.count(), len(trans.columns))

327839 7


In [6]:
# check schema
trans.printSchema()

root
 |-- account_num: string (nullable = true)
 |-- comic_title: string (nullable = true)
 |-- date_sold: long (nullable = true)
 |-- item_id: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- qty_sold: long (nullable = true)
 |-- title_and_num: string (nullable = true)



### More exploration/testing

We won't be using pandas dataframes in the matrix factorization through Spark, but let's cast to one anyway as it will be easier to work with for EDA.

In [7]:
# cast to Pandas dataframe to turn timestamp data to datetime and check nulls. 
trans_df = trans.select('*').toPandas()
trans_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 327839 entries, 0 to 327838
Data columns (total 7 columns):
account_num      327839 non-null object
comic_title      327839 non-null object
date_sold        327839 non-null int64
item_id          327839 non-null object
publisher        327839 non-null object
qty_sold         327839 non-null int64
title_and_num    327839 non-null object
dtypes: int64(2), object(5)
memory usage: 17.5+ MB


In [8]:
# Let's double check the data is how we expect it
trans_df.head()

| | account_num | comic_title | date_sold | item_id | publisher | qty_sold | title_and_num |
|---|---|---|---|---|---|---|---|
| 0 | 399 | Royal Historian of Oz (SLG) | 1279136980000 | DCD416182 | Amaze Ink Slave Labor Graphics | 1 | Royal Historian of Oz #1 |
| 1 | 327 | Royal Historian of Oz (SLG) | 1288543119000 | DCD416182 | Amaze Ink Slave Labor Graphics | 1 | Royal Historian of Oz #1 |
| 2 | 327 | Royal Historian of Oz (SLG) | 1288543119000 | DCD423794 | Amaze Ink Slave Labor Graphics | 1 | Royal Historian of Oz #2 |
| 3 | 1065 | Warlord of Io & Other Storie (SLG) | 1412166247000 | DCD390709 | Amaze Ink Slave Labor Graphics | 1 | Warlord of Io & Other Stories |
| 4 | 1033 | Afterlife With Archie (Archie) | 1390505789000 | DCD630105 | Archie Comics | 1 | Afterlife With Archie #1 2nd P |


In [9]:
trans_df['dt'] = pd.to_datetime(trans_df['date_sold'], unit='ms')

Yes: cross-checked against the original transactions dataframe in the other notebook, and this datetime conversion is correct.
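`date_sold` holds milliseconds since the Unix epoch, which is why `unit='ms'` is needed. A stdlib cross-check of the first row shown above, assuming the timestamps are UTC (which is also how `pd.to_datetime` interprets epoch values); `epoch_ms_to_utc` is a hypothetical helper:

```python
from datetime import datetime, timezone


def epoch_ms_to_utc(ms):
    """Convert a Unix epoch timestamp in milliseconds to an aware UTC datetime."""
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc)


# date_sold of the first row in trans_df
print(epoch_ms_to_utc(1279136980000))  # 2010-07-14 19:49:40+00:00
```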

### Data Prep for ALS

Let's aggregate the data down to the columns we need:
- `account_num` - The identifier for individual customers.
- `comic_title` - The comic. Represents individual volumes/runs of a comic.
- `score` - We need to decide what to use as the `score`. If these were Amazon items, review scores would be a natural fit, but we don't have those. We could use a binary `bought`/`not bought` flag, or we could use `qty_sold`. The latter might capture some interesting behavior from comic 'collectors/speculators'. Since this is a first pass, I'm curious what `qty_sold` might do!
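Whichever column we pick, with `implicitPrefs=True` (as set later in this notebook) Spark's ALS does not try to reproduce the score itself. Following the implicit-feedback formulation of Hu, Koren and Volinsky, it splits each score into a binary preference (did the customer buy it?) and a confidence weight that grows with the score, of the form `1 + alpha * score`. A small sketch of that mapping, using the `alpha=40` chosen below; `implicit_weights` is illustrative, not Spark's internal code:

```python
def implicit_weights(score, alpha=40.0):
    """Map a raw implicit score to (preference, confidence).

    preference: 1 for any positive score, else 0.
    confidence: grows linearly with the score's magnitude.
    """
    preference = 1 if score > 0 else 0
    confidence = 1 + alpha * abs(score)
    return preference, confidence


# bought flag: every purchase carries the same confidence
print(implicit_weights(1))   # (1, 41.0)
# qty_sold: a customer who bought 11 copies of a series weighs far more
print(implicit_weights(11))  # (1, 441.0)
```

So a binary `bought` flag treats every purchase equally, while `qty_sold` lets heavy buyers of a series pull the factors harder.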


We only care about `account_num`, `comic_title` and `qty_sold`.

In [10]:
comics_sold = trans[['account_num', 'comic_title', 'qty_sold']]
comics_sold.persist()

DataFrame[account_num: string, comic_title: string, qty_sold: bigint]

In [11]:
comics_sold = comics_sold.withColumn('bought', lit(1))

In [12]:
comics_sold.show(10)

+-----------+--------------------+--------+------+
|account_num|         comic_title|qty_sold|bought|
+-----------+--------------------+--------+------+
|      00399|Royal Historian o...|       1|     1|
|      00327|Royal Historian o...|       1|     1|
|      00327|Royal Historian o...|       1|     1|
|      01065|Warlord of Io & O...|       1|     1|
|      01033|Afterlife With Ar...|       1|     1|
|      01333|Afterlife With Ar...|       1|     1|
|      00946|Afterlife With Ar...|       1|     1|
|      01278|Afterlife With Ar...|       1|     1|
|      01212|Afterlife With Ar...|       1|     1|
|      00877|Afterlife With Ar...|       1|     1|
+-----------+--------------------+--------+------+
only showing top 10 rows



In [13]:
comics_sold = trans[['account_num', 'comic_title', 'qty_sold']]
comics_sold.persist()

DataFrame[account_num: string, comic_title: string, qty_sold: bigint]

In [14]:
total_comics_sold = comics_sold.groupBy(['account_num', 'comic_title']).agg({'qty_sold':'sum'})
total_comics_sold.persist()

DataFrame[account_num: string, comic_title: string, sum(qty_sold): bigint]

Ok, let's take a look at the results.

In [15]:
total_comics_sold.show(10)

+-----------+--------------------+-------------+
|account_num|         comic_title|sum(qty_sold)|
+-----------+--------------------+-------------+
|      02247|Bubblegun VOL 2 (...|            1|
|      00487|Captain Swing (Av...|            2|
|      00029|God Is Dead (Avatar)|            7|
|      01260| Providence (Avatar)|            1|
|      00172|   Supergod (Avatar)|            3|
|      02493|       Abbott (Boom)|            3|
|      00052|Adventure Time Ma...|            6|
|      00032|Big Trouble In Li...|           11|
|      01149| Broken World (Boom)|            2|
|      01489|Jim Henson Labyri...|            1|
+-----------+--------------------+-------------+
only showing top 10 rows



In [16]:
print(total_comics_sold.count(), len(total_comics_sold.columns))

61871 3


In [17]:
total_comics_sold = total_comics_sold.withColumn('bought', lit(1))

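I don't like the default aggregate column name, `sum(qty_sold)`. Since we're using the binary `bought` flag as the score, we'll drop that column below rather than rename it back to `qty_sold`.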

In [18]:
total_comics_sold.show(10)

+-----------+--------------------+-------------+------+
|account_num|         comic_title|sum(qty_sold)|bought|
+-----------+--------------------+-------------+------+
|      02247|Bubblegun VOL 2 (...|            1|     1|
|      00487|Captain Swing (Av...|            2|     1|
|      00029|God Is Dead (Avatar)|            7|     1|
|      01260| Providence (Avatar)|            1|     1|
|      00172|   Supergod (Avatar)|            3|     1|
|      02493|       Abbott (Boom)|            3|     1|
|      00052|Adventure Time Ma...|            6|     1|
|      00032|Big Trouble In Li...|           11|     1|
|      01149| Broken World (Boom)|            2|     1|
|      01489|Jim Henson Labyri...|            1|     1|
+-----------+--------------------+-------------+------+
only showing top 10 rows



In [19]:
total_comics_sold = total_comics_sold[['account_num', 'comic_title', 'bought']]

In [20]:
print(total_comics_sold.count(), len(total_comics_sold.columns))

61871 3
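The `groupBy`/`sum` above collapses individual transactions into one row per (account, series) pair, which is how 327,839 transaction rows became 61,871 pairs. A plain-Python sketch of the same reduction, with the constant `bought` flag alongside; the function name `to_interactions` is hypothetical, and the rows are the first three from the `comics_sold.show()` output above:

```python
from collections import Counter


def to_interactions(rows):
    """Collapse (account_num, comic_title, qty_sold) transaction rows into
    one record per (account, series): summed quantity plus bought = 1."""
    totals = Counter()
    for account, title, qty in rows:
        totals[(account, title)] += qty
    return {pair: {"qty_sold": qty, "bought": 1} for pair, qty in totals.items()}


rows = [
    ("00399", "Royal Historian of Oz (SLG)", 1),
    ("00327", "Royal Historian of Oz (SLG)", 1),  # issue #1
    ("00327", "Royal Historian of Oz (SLG)", 1),  # issue #2
]
for pair, record in to_interactions(rows).items():
    print(pair, record)
```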


### Formatting

Sooooooo, I forgot that ALS needs the user and item ids to be integers. So we need to fix that.

#### Convert `account_num` to integer `account_id`

In [21]:
to_int_udf = F.udf(dfc.make_int, IntegerType())

In [22]:
account_num_col = total_comics_sold['account_num']

In [23]:
total_comics_sold = total_comics_sold.withColumn('account_id'
                                        ,to_int_udf(account_num_col))
total_comics_sold.persist()

DataFrame[account_num: string, comic_title: string, bought: int, account_id: int]

In [24]:
total_comics_sold.show(10)

+-----------+--------------------+------+----------+
|account_num|         comic_title|bought|account_id|
+-----------+--------------------+------+----------+
|      02247|Bubblegun VOL 2 (...|     1|      2247|
|      00487|Captain Swing (Av...|     1|       487|
|      00029|God Is Dead (Avatar)|     1|        29|
|      01260| Providence (Avatar)|     1|      1260|
|      00172|   Supergod (Avatar)|     1|       172|
|      02493|       Abbott (Boom)|     1|      2493|
|      00052|Adventure Time Ma...|     1|        52|
|      00032|Big Trouble In Li...|     1|        32|
|      01149| Broken World (Boom)|     1|      1149|
|      01489|Jim Henson Labyri...|     1|      1489|
+-----------+--------------------+------+----------+
only showing top 10 rows



In [25]:
print(total_comics_sold.count(), len(total_comics_sold.columns))

61871 4
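`dfc.make_int` lives in `data_fcns.py`, which isn't shown here. Judging by the output above, it parses zero-padded strings such as `'02247'` into integers. A hypothetical stand-in with that behavior (returning `None` on bad input is an assumption):

```python
def make_int(value):
    """Hypothetical stand-in for dfc.make_int: parse a zero-padded id string
    such as '02247' into an int, returning None when parsing fails."""
    if value is None:
        return None
    try:
        return int(str(value).strip())
    except ValueError:
        return None


print(make_int("02247"))  # 2247
print(make_int("N/A"))    # None
```

Spark can also do this without a Python UDF: `F.col('account_num').cast('int')` is a built-in cast that avoids shipping each value out to Python. Depending on the `spark.sql.ansi.enabled` setting, strings that can't be parsed either become null or raise an error.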


Now I need to assign ids to each `comic_title`. It's a bit clunky, but I have a version of the big table in PostgreSQL, so I can build an ID table there as the source of truth. I could do this on the PySpark side, but I'd want to save the result somewhere (e.g., the DB) anyway, so I might as well do it at the source.
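The `comics` table below appears to number titles alphabetically from 1. A sketch of that scheme in plain Python, assuming sorted order is how the PostgreSQL ids were generated (the SQL itself isn't shown); `build_id_table` is a hypothetical name:

```python
def build_id_table(titles, start=1):
    """Map each distinct title to an integer id, numbered in sorted order."""
    return {title: i for i, title in enumerate(sorted(set(titles)), start=start)}


ids = build_id_table(["Supergod (Avatar)", "Abbott (Boom)", "Supergod (Avatar)"])
print(ids)  # {'Abbott (Boom)': 1, 'Supergod (Avatar)': 2}
```

One argument for persisting the table rather than recomputing ids in Spark: `pyspark.ml.feature.StringIndexer` orders labels by frequency by default (`stringOrderType='frequencyDesc'`), so its ids can shift as the data changes, while a stored table stays fixed.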

#### Get `comic_id`

In [26]:
comics = spark.read.json('raw_data/comics.json')
comics.persist()

DataFrame[comic_id: bigint, comic_title: string]

In [27]:
comics.count()

7202

In [28]:
comics.show(10)

+--------+--------------------+
|comic_id|         comic_title|
+--------+--------------------+
|       1|0Secret Wars (Mar...|
|       2|100 Bullets Broth...|
|       3|100 Penny Press L...|
|       4|100 Penny Press S...|
|       5|100 Penny Press T...|
|       6|100 Penny Press T...|
|       7|100th Anniversary...|
|       8|12 Reasons To Die...|
|       9|    13 Coins (Other)|
|      10|13th Artifact One...|
+--------+--------------------+
only showing top 10 rows



In [29]:
print(comics.count(), len(comics.columns))

7202 2


Now we need to join this back into `total_comics_sold`.

In [30]:
# Set aliases
tot = total_comics_sold.alias('tot')
com = comics.alias('com')

In [31]:
tot_sold_ids_only = tot.join(com.select('comic_id','comic_title')
                      ,tot.comic_title==com.comic_title).select('account_id'
                                                                , 'comic_id'
                                                                , 'bought')
tot_sold_ids_only.persist()
tot_sold_ids_only.show(10)

+----------+--------+------+
|account_id|comic_id|bought|
+----------+--------+------+
|      2247|     995|     1|
|       487|    1102|     1|
|        29|    2680|     1|
|      1260|    4870|     1|
|       172|    6023|     1|
|      2493|      66|     1|
|        52|     116|     1|
|        32|     755|     1|
|      1149|     971|     1|
|      1489|    3503|     1|
+----------+--------+------+
only showing top 10 rows



In [32]:
tot_sold_ids_only.printSchema()

root
 |-- account_id: integer (nullable = true)
 |-- comic_id: long (nullable = true)
 |-- bought: integer (nullable = false)



In [33]:
print(tot_sold_ids_only.count(), len(tot_sold_ids_only.columns))

61871 3
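An inner join silently drops rows whose `comic_title` has no match in `comics`, so the row count matching before and after the join (61,871 both times) is what confirms nothing was lost. A plain-Python sketch that makes unmatched titles explicit; `attach_ids` is hypothetical, the two matched pairs come from the output above, and `"Unknown Title"` is made up:

```python
def attach_ids(pairs, id_table):
    """Inner-join (account_id, comic_title) pairs to a title -> comic_id table.

    Returns the joined (account_id, comic_id, bought) rows plus the set of
    titles that had no id and were therefore dropped.
    """
    joined, unmatched = [], set()
    for account_id, title in pairs:
        comic_id = id_table.get(title)
        if comic_id is None:
            unmatched.add(title)
        else:
            joined.append((account_id, comic_id, 1))
    return joined, unmatched


ids = {"Abbott (Boom)": 66, "God Is Dead (Avatar)": 2680}
rows, missing = attach_ids([(2493, "Abbott (Boom)"), (29, "God Is Dead (Avatar)"),
                            (1, "Unknown Title")], ids)
print(rows)     # [(2493, 66, 1), (29, 2680, 1)]
print(missing)  # {'Unknown Title'}
```

In Spark, the equivalent check is a left anti join, e.g. `tot.join(com, on='comic_title', how='left_anti').count()`, which should return 0.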


## Save this intermediate table.

To save rework later, if needed.

In [34]:
als_input_df = tot_sold_ids_only.toPandas()

In [35]:
als_input_df.shape

(61871, 3)

In [36]:
als_input_df.to_json('raw_data/als_input_filtered.json', orient='records'
                     ,lines=True)

In [37]:
!head raw_data/als_input_filtered.json

{"account_id":2247,"comic_id":995,"bought":1}
{"account_id":487,"comic_id":1102,"bought":1}
{"account_id":29,"comic_id":2680,"bought":1}
{"account_id":1260,"comic_id":4870,"bought":1}
{"account_id":172,"comic_id":6023,"bought":1}
{"account_id":2493,"comic_id":66,"bought":1}
{"account_id":52,"comic_id":116,"bought":1}
{"account_id":32,"comic_id":755,"bought":1}
{"account_id":1149,"comic_id":971,"bought":1}
{"account_id":1489,"comic_id":3503,"bought":1}


### ALS Model

Let's start with a train/test split.

In [38]:
# Split data into training and test set
(train, test) = tot_sold_ids_only.randomSplit([.8, .2], seed=41916)

Make sure the shapes make sense.

In [39]:
print(train.count(), len(train.columns))

44294 3


In [40]:
print(test.count(), len(test.columns))

11048 3
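`randomSplit` should put every row on exactly one side, so the two counts ought to add up to the input. Here they don't quite: 44,294 + 11,048 = 55,342, versus the 61,871 rows in `tot_sold_ids_only`, which may be worth re-checking before trusting the evaluation. A plain-Python sketch of the invariant, drawing one uniform number per row; this mirrors the idea, not Spark's exact sampling, and `random_split` is hypothetical:

```python
import random


def random_split(rows, weights=(0.8, 0.2), seed=41916):
    """Assign each row to exactly one split using one uniform draw per row,
    so the split sizes always add up to len(rows)."""
    rng = random.Random(seed)
    total = float(sum(weights))
    # Cumulative upper bounds, e.g. (0.8, 1.0) for an 80/20 split
    bounds = []
    cumulative = 0.0
    for w in weights:
        cumulative += w / total
        bounds.append(cumulative)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(bounds):
            # The last split also catches any float rounding shortfall
            if u < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train_rows, test_rows = random_split(range(61871))
print(len(train_rows) + len(test_rows))  # 61871
```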


In [41]:
# Create ALS instance and fit model
als = ALS(maxIter=20,
          rank=10,
          userCol='account_id',
          itemCol='comic_id',
          ratingCol='bought',
          implicitPrefs=True,
          alpha=40,
          seed=41916)
model = als.fit(train)

In [42]:
now = time.ctime(int(time.time()))
print("Completed on {}.".format(now))

Completed on Fri Jun 28 10:52:49 2019.


### Evaluation on Test

In [43]:
# Generate predictions on TEST
predictions = model.transform(test)
predictions.persist()

DataFrame[account_id: int, comic_id: bigint, bought: int, prediction: float]

In [44]:
predictions.show(10)

+----------+--------+------+-----------+
|account_id|comic_id|bought| prediction|
+----------+--------+------+-----------+
|      1842|     471|     1|   0.323991|
|      1489|     496|     1|-0.13128927|
|       690|     496|     1|0.062074617|
|      3006|     833|     1| 0.25017345|
|      1089|     833|     1| 0.36356872|
|      2777|     833|     1|  0.3026107|
|      2770|     833|     1| 0.16230237|
|      2408|     833|     1| 0.48116648|
|      2123|     833|     1| 0.64031804|
|      1645|    1342|     1| 0.08538671|
+----------+--------+------+-----------+
only showing top 10 rows



`BinaryClassificationEvaluator` only likes doubles for `rawPredictionCol`, so cast it.

In [45]:
predictions = predictions.withColumn("prediction", predictions["prediction"].cast(DoubleType()))

In [46]:
predictions.show(10)

+----------+--------+------+-------------------+
|account_id|comic_id|bought|         prediction|
+----------+--------+------+-------------------+
|      1842|     471|     1|0.32399100065231323|
|      1489|     496|     1|-0.1312892735004425|
|       690|     496|     1|0.06207461655139923|
|      3006|     833|     1| 0.2501734495162964|
|      1089|     833|     1| 0.3635687232017517|
|      2777|     833|     1|0.30261069536209106|
|      2770|     833|     1|0.16230237483978271|
|      2408|     833|     1| 0.4811664819717407|
|      2123|     833|     1| 0.6403180360794067|
|      1645|    1342|     1|0.08538670837879181|
+----------+--------+------+-------------------+
only showing top 10 rows



### Initial Evaluation

Based on our first swing of the bat:
- `maxIter` = 20
- `rank` = 10
- `alpha` = 40

In [47]:
# Evaluate the model by computing the RMSE on the test data
eval_reg = RegressionEvaluator(metricName="rmse", labelCol="bought",
                                predictionCol="prediction")

In [48]:
rmse = eval_reg.evaluate(predictions)

In [49]:
print("RMSE= " + str(rmse))

RMSE= nan


Oh boy, some predictions are NaN. Better remove them (for now).

evaluator = BinaryClassificationEvaluator(metricName='areaUnderROC'
                                          ,labelCol='bought'
                                          ,rawPredictionCol='prediction'
                                          )

auc = evaluator.evaluate(predictions)

print("Area Under the Curve = " + str(auc))

## Check NaNs

In [50]:
# Convert to pandas dataframe
pred_df = predictions.select('*').toPandas()

# Check nulls
pred_num_nulls = pred_df['prediction'].isna().sum()

# Num rows
preds_attempted = pred_df.shape[0]

In [51]:
print("There are {} nulls out of {}.".format(pred_num_nulls, preds_attempted))

There are 342 nulls out of 11048.


In [52]:
print("So {:.2f}% are nulls.".format(pred_num_nulls/preds_attempted))

So 0.03% are nulls.


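These nulls come from the cold-start problem: customers or comics in the test split that never appeared in the training split have no learned factors, and ALS's default `coldStartStrategy` of `nan` (see the `explainParams()` output further down) returns NaN for them. Since they're a small share of the test set (about 3%), let's just remove them for now. (Setting `coldStartStrategy='drop'` on the ALS estimator would drop them automatically.)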

In [53]:
predictions.count()

11048

In [54]:
# Convert back to spark dataframe
predictions = spark.createDataFrame(pred_df)

In [55]:
predictions.select([count(when(isnan(c), c)).alias(c) for c in predictions.columns]).show()

+----------+--------+------+----------+
|account_id|comic_id|bought|prediction|
+----------+--------+------+----------+
|         0|       0|     0|       342|
+----------+--------+------+----------+



In [56]:
pred_no_na = predictions.na.drop()

In [57]:
pred_no_na.persist()

DataFrame[account_id: bigint, comic_id: bigint, bought: bigint, prediction: double]

In [58]:
pred_no_na.count()

10706

In [59]:
# Evaluate the model by computing the rmse on the test data
rmse2 = eval_reg.evaluate(pred_no_na)

print("RMSE = " + str(rmse2))

RMSE = 0.49250295110493914


OK, so that gives us a baseline. Let's see what a little grid search can do.

### Get Top N recommendations for Single User

Let's make a reference list of `account_id`s for testing purposes.

In [61]:
n_to_test = 5

users = (tot_sold_ids_only.select(als.getUserCol())
                          .sample(False
                                  ,n_to_test/tot_sold_ids_only.count()
                                  ,41916)
        )
users.persist()
users.show()

+----------+
|account_id|
+----------+
|      2185|
|      1663|
|         5|
|       775|
|       604|
+----------+



We wrote this functionality out to a function in `comic_recs.py`.

### Testing the function!

- The function asks for an `account_id`.
- It returns the top n recommendations as a pandas dataframe, where n is set by the `topn` parameter.
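Under the hood, ALS scores a (customer, comic) pair as the dot product of their learned factor vectors, and a top-n list is just the n highest scores. A minimal pure-Python sketch with toy two-dimensional factors; `top_n_items` is hypothetical and is not the code in `comic_recs.py`, which isn't shown. In Spark, the learned vectors live in `model.userFactors` and `model.itemFactors`, and `ALSModel.recommendForUserSubset(users, n)` does this ranking at scale, though it does not filter out comics the customer already bought.

```python
def top_n_items(user_vector, item_vectors, n=5, exclude=()):
    """Rank items for one user by the dot product of latent factor vectors.

    item_vectors: dict of item_id -> factor list; exclude: ids to skip,
    e.g. comics the customer already bought.
    """
    excluded = set(exclude)
    scored = [
        (sum(u * v for u, v in zip(user_vector, vec)), item_id)
        for item_id, vec in item_vectors.items()
        if item_id not in excluded
    ]
    # Highest score first; ties broken by the smaller item id
    scored.sort(key=lambda s: (-s[0], s[1]))
    return [item_id for _, item_id in scored[:n]]


user = [1.0, 0.0]
items = {1: [0.9, 0.1], 2: [0.1, 0.9], 3: [0.5, 0.5]}
print(top_n_items(user, items, n=2))               # [1, 3]
print(top_n_items(user, items, n=2, exclude={1}))  # [3, 2]
```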

In [62]:
top_n_df = cr.get_top_n_recs_for_user(spark=spark, model=model, topn=5)
top_n_df

2185


| | comic_title |
|---|---|
| 1 | Bitch Planet Triple Feature (Image) |
| 2 | Redlands (Image) |
| 3 | Death Or Glory (Image) |
| 4 | Isola (Image) |
| 5 | Unsound (Boom) |


In [63]:
top_n_df = cr.get_top_n_recs_for_user(spark=spark, model=model, topn=5)
top_n_df

604


| | comic_title |
|---|---|
| 1 | Amazing Spider-Man Annual (Marvel) |
| 2 | Astonishing X-Men Annual (Marvel) |
| 3 | Hulk (Marvel) |
| 4 | Deadpool Annual (Marvel) |
| 5 | Astonishing X-Men (Marvel) |


In [89]:
top_n_df = cr.get_top_n_recs_for_user(spark=spark, model=model, topn=20)
top_n_df

161


| | comic_title |
|---|---|
| 1 | Doom Patrol (DC) |
| 2 | Deadpool (Marvel) |
| 3 | Punisher (Marvel) |
| 4 | Mighty Thor (Marvel) |
| 5 | Beef (Image) |
| 6 | Invincible (Image) |
| 7 | X-Men Grand Design (Marvel) |
| 8 | Moon Knight (Marvel) |
| 9 | Reborn (Image) |
| 10 | Space Riders Galaxy of Brutal (Other) |


## Conclusions after single run
- Do the results seem realistic? With only three tests it's hard to say, but the recommendations do look 'individualized': there is no overlap between the three sets (albeit from small samples).

---

## Grid Search

In [77]:
# Create a parameter grid
params = (ParamGridBuilder()
         # .addGrid(als.regParam, [1, 0.1])
          .addGrid(als.maxIter, [20])
          .addGrid(als.rank, [10])
          .addGrid(als.alpha, [40])).build()

cv = TrainValidationSplit(estimator=als
                          , evaluator=eval_reg
                          , estimatorParamMaps=params
                          , trainRatio=0.8)
#cv = CrossValidator(estimator=als, estimatorParamMaps=params, evaluator=eval_reg)
model_implicit = cv.fit(train)
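`ParamGridBuilder` expands the cartesian product of the listed values. With one value per parameter, as here, the grid holds a single combination, so the search just re-estimates the baseline settings. A plain-Python sketch of the expansion, with the commented-out `regParam` values switched back on for illustration (`expand_grid` is hypothetical):

```python
from itertools import product


def expand_grid(grid):
    """Return one dict per combination of parameter values (cartesian product)."""
    names = list(grid)
    return [dict(zip(names, values))
            for values in product(*(grid[name] for name in names))]


grid = {"maxIter": [20], "rank": [10], "alpha": [40], "regParam": [1, 0.1]}
for combo in expand_grid(grid):
    print(combo)
```

Each combination is fit once per fold: with `CrossValidator`'s default `numFolds=3`, this two-combination grid means 2 × 3 = 6 fits, plus one final refit of the best parameters on the full input.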

In [78]:
### Cross validate for best hyperparameters
cv = CrossValidator(estimator=als
                    ,estimatorParamMaps=params
                    ,evaluator=eval_reg
                    ,parallelism=4
                   )

In [79]:
now = time.ctime(int(time.time()))
print("Completed on {}.".format(now))

Completed on Fri Jun 28 14:05:20 2019.


In [80]:
# Fit on the training split; the test split stays held out for evaluation
best_model = cv.fit(train)

In [81]:
als_model = best_model.bestModel

In [85]:
print(als_model.explainParams())

coldStartStrategy: strategy for dealing with unknown or new users/items at prediction time. This may be useful in cross-validation or production scenarios, for handling user/item ids the model has not seen in the training data. Supported values: nan,drop. (default: nan)
itemCol: column name for item ids. Ids must be within the integer value range. (default: item, current: comic_id)
predictionCol: prediction column name (default: prediction)
userCol: column name for user ids. Ids must be within the integer value range. (default: user, current: account_id)


In [88]:
!ls

LICENSE       archive	       comics_rx-1_data_prep.ipynb	   data_fcns.py
README.md     assets	       comics_rx-2_eda.ipynb		   keys.py
__pycache__   code_archive.py  comics_rx-3_als_all_data.ipynb	   raw_data
als_filtered  comic_recs.py    comics_rx-4_als_reduced_data.ipynb  scratch


In [87]:
als_model.save('als_filtered')

In [82]:
now = time.ctime(int(time.time()))
print("Completed on {}.".format(now))

Completed on Fri Jun 28 14:06:20 2019.


---

## Retrieving Saved Model

In [1]:
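# The kernel was restarted, so re-import ALSModel (and re-run the SparkSession
# and custom-import cells above) before loading the saved model
from pyspark.ml.recommendation import ALSModel
comic_rec_model = ALSModel.load('als_filtered')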

In [None]:
top_n_df = cr.get_top_n_recs_for_user(spark=spark, model=comic_rec_model, topn=50)
top_n_df