written by **`Chul Sung`**, Data Scientist @ IBM Cloud
**LinkedIn:** https://www.linkedin.com/in/chul-sung-8b073923 **Twitter:** `@csung7`


<span style="color: #0000A0; font-size: 18pt; font-weight: bold;">
Welcome to IBM `Apache Spark` Service!
</span>

<span style="font-family: monospace; font-size: 12pt;">
**Recommendation Systems for Implicit Feedback**
</span>

---
Through this tutorial you will answer the following questions:
* What Are Apache Spark DataFrames? 
* How are Window Functions different from Aggregates or user-defined functions (UDFs)?
* How to Analyze Data with Spark DataFrames?
* How to Build a Machine Learning Model with Spark ML?
* How to Evaluate the Model?
---

<span style="color: #0000A0; font-size: 18pt; font-weight: bold;">
What Are `Spark DataFrames`?
</span>
    
<span style="font-family: monospace; font-size: 12pt;">
**Spark DataFrames** are conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. Once created, a DataFrame can be inspected and manipulated with a variety of built-in functions.
</span>

<span style="font-family: monospace; font-size: 12pt;">
Spark DataFrames have two types of functions: **transformations** and **actions**. A function that returns another dataframe is a **transformation**, such as select() or filter(), whereas a function that returns **something else** is an **action**, such as count() or collect(). The distinction matters because all transformations in Spark are **lazy**: Spark does not begin computing the partitions until an action is called.
</span>
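
<span style="font-family: monospace; font-size: 12pt;">
To make the lazy behavior concrete, here is a minimal plain-Python analogy (not Spark code). A generator plays the role of a transformation and a counting function plays the role of an action. The names `lazy_filter`, `count`, and `log` are made up for this sketch.
</span>

```python
# Plain-Python analogy only: a generator stands in for a lazy transformation.
log = []  # records each time the filter actually touches a row

def lazy_filter(predicate, rows):
    """'Transformation': returns a generator, so nothing runs yet."""
    for row in rows:
        log.append(row)
        if predicate(row):
            yield row

def count(rows):
    """'Action': consumes the pipeline and returns a plain value."""
    return sum(1 for _ in rows)

rows = [1, 2, 3, 4, 5, 6]
evens = lazy_filter(lambda x: x % 2 == 0, rows)  # no work done yet
work_before_action = len(log)                    # nothing has been processed
n_evens = count(evens)                           # the action triggers the work
work_after_action = len(log)                     # every row has now been seen

print(work_before_action, n_evens, work_after_action)
```

<span style="font-family: monospace; font-size: 12pt;">
Spark goes further than this analogy, because it can also optimize the whole chain of transformations before running it.
</span>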

<span style="font-size: 12pt; font-weight: bold;">
[References]
</span>
* [High Performance Spark - Publisher: O'Reilly Media](http://shop.oreilly.com/product/0636920046967.do)

---

<span style="font-size: 18pt; font-weight: bold;">
Then Why **DataFrames** instead of RDD?
</span>
<img src="https://databricks.com/wp-content/uploads/2015/02/Screen-Shot-2015-02-16-at-9.46.39-AM.png">
<span style="font-family: monospace; font-size: 12pt;">
The above chart compares the runtime performance of running group-by-aggregation on 10 million integer pairs on a single machine. Through the new DataFrame API, **Python programs** can achieve the **same level of performance** as JVM programs because the Catalyst optimizer compiles DataFrame operations into JVM bytecode. Indeed, performance sometimes beats hand-written Scala code.
</span>


<span style="font-size: 12pt; font-weight: bold;">
[References]
</span>
* [Recent performance improvements in Apache Spark: SQL, Python, DataFrames, and More](https://databricks.com/blog/2015/04/24/recent-performance-improvements-in-apache-spark-sql-python-dataframes-and-more.html)
* [Pyspark SQL DataFrame Docs](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame)

<span style="color: #0000A0; font-size: 18pt; font-weight: bold;">
Spark applications
</span>

<span style="font-family: monospace; font-size: 12pt;">
Each Spark application runs independently on a cluster, coordinated by the SparkContext object in your main program (called the **driver** program). Once the SparkContext connects to worker nodes through the cluster manager, Spark acquires **executors** on nodes in the cluster, which are processes that run computations and store data for your application. 
</span>

<span style="font-family: monospace; font-size: 12pt;">
When you create a notebook, SparkContext is already created for you, with the variable name **`sc`**.
</span>

<img src="http://spark.apache.org/docs/latest/img/cluster-overview.png">

<span style="font-size: 12pt; font-weight: bold;">
[References]
</span>
* [SparkContext - Entry Point to Spark](https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sparkcontext.html)
* [Cluster Mode Overview](http://spark.apache.org/docs/latest/cluster-overview.html)

<span style="font-family: monospace; font-size: 12pt;">
With SparkContext **`sc`** you can check your Spark version and see how many resources are allotted to the driver and to each executor:
</span>

<span style="font-size: 12pt; font-weight: bold;">
[References]
</span>
* [Available Properties](http://spark.apache.org/docs/latest/configuration.html#available-properties)

In [1]:
print(sc.version)

1.6.0


In [2]:
print(sc._conf.get('spark.driver.cores'))
print(sc._conf.get('spark.driver.memory'))

print(sc._conf.get('spark.executor.cores'))
print(sc._conf.get('spark.executor.memory'))

None
1512M
None
6G


<span style="color: #0000A0; font-size: 18pt; font-weight: bold;">
Let's Get Started with `Recommendation Systems for Implicit Feedback with PySpark`.
</span>

<span style="color: #0000A0; font-size: 18pt; font-weight: bold;">
Create `Spark DataFrames` from your database.
</span>

<span style="font-family: monospace; font-size: 12pt;">
Just as the SparkContext is the entry point for all Spark applications, the **`HiveContext`** and **`SQLContext`** objects serve as the entry points for Spark SQL. They let you connect to your database through a JDBC connector and create a Spark dataframe. In Spark < 2.0, **`HiveContext`** is a superset of `SQLContext`. One key difference is that `HiveContext` supports the **window function** feature. Spark 2.0 introduces `SparkSession` as the new entry point, which combines SQLContext and HiveContext (and, in the future, StreamingContext).
</span>

<span style="font-size: 12pt; font-weight: bold;">
[References]
</span>
* [Understanding Spark’s SparkConf, SparkContext, SQLContext and HiveContext](https://blogs.msdn.microsoft.com/bigdatasupport/2015/09/14/understanding-sparks-sparkconf-sparkcontext-sqlcontext-and-hivecontext/)
* [Difference between Spark HiveContext and SQLContext](http://www.openkb.info/2016/02/difference-between-spark-hivecontext.html)
* [What is the difference between Apache Spark SQLContext vs HiveContext?](http://stackoverflow.com/questions/33666545/what-is-the-difference-between-apache-spark-sqlcontext-vs-hivecontext)
* [Introduction to Spark 2.0 - Part 1 : Spark Session API](http://blog.madhukaraphatak.com/introduction-to-spark-two-part-1/)

<span style="color: #0000A0; font-size: 18pt; font-weight: bold;">
What are `Window Functions`?
</span>

<span style="font-family: monospace; font-size: 12pt;">
**Window functions** are complementary to existing DataFrame operations: **aggregates**, such as sum and avg, and **user-defined functions (UDFs)**. **Aggregates** return **one result**, such as a sum or average, for each group of records, while **UDFs** return one result for **each record** based only on the data in that record. In contrast, **window functions** return one result for **each record** based on **a window of records**. A **window spec definition** has three components: **partition by**, **order by**, and **frame** (or boundaries). In this tutorial, I will show you an example of **window functions**.
</span>
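
<span style="font-family: monospace; font-size: 12pt;">
The difference between the three kinds of operations can be sketched in plain Python (not Spark code). The toy `records` list and all variable names below are hypothetical; the window part mimics a row number computed per customer, ordered by quantity in descending order.
</span>

```python
from collections import defaultdict

# Hypothetical toy records: (customer, quantity)
records = [("A", 3), ("A", 4), ("B", 5), ("B", 1), ("B", 2)]

# Aggregate: one result per group of records.
totals = defaultdict(int)
for customer, qty in records:
    totals[customer] += qty

# UDF-style: one result per record, using only that record.
doubled = [(customer, qty * 2) for customer, qty in records]

# Window-style: one result per record, computed from the record's window
# (partition by customer, order by quantity descending).
partitions = defaultdict(list)
for customer, qty in records:
    partitions[customer].append(qty)

row_numbers = []
for customer in sorted(partitions):
    ordered = sorted(partitions[customer], reverse=True)
    for position, qty in enumerate(ordered, start=1):
        row_numbers.append((customer, qty, position))

print(dict(totals))
print(doubled)
print(row_numbers)
```

<span style="font-family: monospace; font-size: 12pt;">
The aggregate collapses five records into two results, while the UDF-style and window-style computations both keep five results. Only the window-style result depends on the other records in the same partition.
</span>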

<span style="font-size: 12pt; font-weight: bold;">
[References]
</span>
* [Spark Window Functions for DataFrames and SQL](http://xinhstechblog.blogspot.com/2016/04/spark-window-functions-for-dataframes.html)
* [UDFs — User-Defined Functions](https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-udfs.html)

<span style="font-family: monospace; font-size: 12pt;">
Import `HiveContext` module.
</span>

In [3]:
from pyspark.sql import SQLContext, HiveContext

#sqlContext = SQLContext(sc)
sqlContext = HiveContext(sc)

<span style="font-family: monospace; font-size: 12pt;">
To connect to our database and create a dataframe for our data with the Spark SQL instance **`sqlContext`**, we need two key parameters: (1) our database JDBC URL (including Host name, Port number, Database name, User ID, and Password) and (2) a specific table name.
</span>

<span style="font-family: monospace; font-size: 12pt;">
**dashDB:** When you load your data into dashDB, in the **Connection Information** page it will show you **JDBC URL string**.
This is my dashDB JDBC URL: **`
jdbc:db2://awh-yp-small03.services.dal.bluemix.net:50001/BLUDB:user=dash107595;password=U4A5pYgThdbo;sslConnection=true;
`**
</span>
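
<span style="font-family: monospace; font-size: 12pt;">
If you prefer to assemble the URL from its parts, a small helper along these lines may be convenient. It is only a sketch: the function name `build_db2_jdbc_url` and all argument values are placeholders, and the string layout simply copies the shape of the dashDB URL shown above.
</span>

```python
def build_db2_jdbc_url(host, port, database, user, password, ssl=True):
    """Assemble a JDBC URL in the same shape as the dashDB string in this tutorial."""
    url = "jdbc:db2://{0}:{1}/{2}:user={3};password={4};".format(
        host, port, database, user, password)
    if ssl:
        url += "sslConnection=true;"
    return url

# Placeholder values, not real credentials.
example_url = build_db2_jdbc_url("example-host.example.com", 50001, "BLUDB",
                                 "my_user", "my_password")
print(example_url)
```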

In [4]:
# kocsea dashDB
dashdb_url = 'jdbc:db2://awh-yp-small02.services.dal.bluemix.net:50001/BLUDB:user=dash104260;password=24GDWl5tgJ63;sslConnection=true;'
dashdb_table = 'DASH104260.T_ONLINE_RETAIL'

In [5]:
df_retail_data = sqlContext.read.format('jdbc').options(url=dashdb_url, dbtable=dashdb_table).load()

<span style="font-family: monospace; font-size: 12pt;">
The Spark dataframe **`printSchema()`** method shows us the schema of a DataFrame.
</span>

<span style="font-size: 12pt; font-weight: bold;">
[References]
</span>
* [JSON Datasets](http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets)

In [6]:
df_retail_data.printSchema()

root
 |-- INVOICENO: string (nullable = true)
 |-- STOCKCODE: string (nullable = true)
 |-- DESCRIPTION: string (nullable = true)
 |-- QUANTITY: integer (nullable = true)
 |-- INVOICEDATE: timestamp (nullable = true)
 |-- UNITPRICE: decimal(8,3) (nullable = true)
 |-- CUSTOMERID: integer (nullable = true)
 |-- COUNTRY: string (nullable = true)



<span style="font-family: monospace; font-size: 12pt;">
Now that the online retail data has been loaded into a Spark dataframe, we can see what is in it using the dataframe **`show()`** function. It takes two optional parameters: **`n`**, the number of rows to print (20 by default), and **`truncate`**, which controls whether long values are cut off (`True` by default). For example:
```
df_retail_data.show(n=3, truncate=False)
df_retail_data.show(truncate=False)
df_retail_data.show(n=3)
```
</span>

<span style="font-size: 12pt; font-weight: bold;">
[References]
</span>
* [Creating DataFrames](http://spark.apache.org/docs/latest/sql-programming-guide.html#creating-dataframes)

In [7]:
df_retail_data.show(3)

+---------+---------+--------------------+--------+--------------------+---------+----------+--------------+
|INVOICENO|STOCKCODE|         DESCRIPTION|QUANTITY|         INVOICEDATE|UNITPRICE|CUSTOMERID|       COUNTRY|
+---------+---------+--------------------+--------+--------------------+---------+----------+--------------+
|   536796|    21967|PACK OF 12 SKULL ...|       1|2010-12-02 15:46:...|    0.290|     15574|United Kingdom|
|   536796|    21984|PACK OF 12 PINK P...|       1|2010-12-02 15:46:...|    0.290|     15574|United Kingdom|
|   536796|    21982|PACK OF 12 SUKI T...|       1|2010-12-02 15:46:...|    0.290|     15574|United Kingdom|
+---------+---------+--------------------+--------+--------------------+---------+----------+--------------+
only showing top 3 rows



<span style="font-family: monospace; font-size: 12pt;">
The dataframe includes the invoice number (`INVOICENO`) for different purchases, along with the stock code (`STOCKCODE`, used as an item ID), an item description (`DESCRIPTION`), the number purchased (`QUANTITY`), the date of purchase (`INVOICEDATE`), the price of the items (`UNITPRICE`), a customer ID (`CUSTOMERID`), and the country of origin for the customer (`COUNTRY`).
</span>
-----

<span style="font-family: monospace; font-size: 12pt;">
To collect summary statistics for the DataFrame, use the built-in DataFrame **`describe()`** function. It returns the number of non-null entries (count), mean, standard deviation, minimum, and maximum for each numerical column.
</span>
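
<span style="font-family: monospace; font-size: 12pt;">
As a rough illustration of what these statistics mean, here is a plain-Python sketch (not Spark code) for a single column. The helper name `describe` and the sample values are hypothetical. It assumes the sample standard deviation (dividing by n - 1) and a column with at least two non-null values.
</span>

```python
import math

def describe(values):
    """Summary statistics over the non-null entries of one numeric column."""
    present = [v for v in values if v is not None]
    n = len(present)
    mean = sum(present) / float(n)
    # Assumption: sample standard deviation (n - 1 in the denominator).
    variance = sum((v - mean) ** 2 for v in present) / float(n - 1)
    return {"count": n, "mean": mean, "stddev": math.sqrt(variance),
            "min": min(present), "max": max(present)}

# Hypothetical column with one missing value.
quantity = [1, 1, 6, None, 12]
summary = describe(quantity)
print(summary)
```

<span style="font-family: monospace; font-size: 12pt;">
The missing value is skipped, so the count is 4 rather than 5. This is why a count lower than the row total signals missing data in a column.
</span>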

In [8]:
df_retail_data.describe().show()

+-------+-----------------+---------------+------------------+
|summary|         QUANTITY|      UNITPRICE|        CUSTOMERID|
+-------+-----------------+---------------+------------------+
|  count|           541909|         541909|            406829|
|   mean| 9.55224954743324|      4.6111136|15287.690570239585|
| stddev|218.0811578502368|96.759853061181|1713.6003033215864|
|    min|           -80995|     -11062.060|             12346|
|    max|            80995|      38970.000|             18287|
+-------+-----------------+---------------+------------------+



<span style="font-family: monospace; font-size: 12pt;">
It can also be limited to certain columns:
</span>

In [9]:
df_retail_data.describe('STOCKCODE','CUSTOMERID').show()

+-------+------------------+------------------+
|summary|         STOCKCODE|        CUSTOMERID|
+-------+------------------+------------------+
|  count|            541909|            406829|
|   mean|27623.240210938104|15287.690570239585|
| stddev|16799.737628427672|1713.6003033216086|
|    min|             10002|             12346|
|    max|                 m|             18287|
+-------+------------------+------------------+



<span style="font-family: monospace; font-size: 12pt;">
Most columns have no missing values, but the customer ID (**`CUSTOMERID`**) is missing in many rows: its count is 406829, compared with 541909 for the other columns. If the customer ID is missing, we don't know who bought the item, so we should drop these rows from our data first. We can use the DataFrame **`dropna()`** function to drop the rows with missing data, specifically in the Customer ID column.
</span>

In [10]:
df_cleaned_retail = df_retail_data.dropna(subset='CUSTOMERID')

<span style="font-family: monospace; font-size: 12pt;">
When the subset contains more than one column, the **`how`** parameter controls which rows are dropped: with **any**, a row is dropped if any of the subset columns is NA; with **all**, a row is dropped only if all of them are NA.
</span>
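
<span style="font-family: monospace; font-size: 12pt;">
The two settings can be sketched in plain Python (not Spark code). The `dropna` helper and the sample rows below are hypothetical and only mirror the semantics described above.
</span>

```python
def dropna(rows, subset, how="any"):
    """Drop a row if any (how='any') or all (how='all') subset columns are None."""
    check = any if how == "any" else all
    return [row for row in rows
            if not check(row[col] is None for col in subset)]

# Hypothetical rows
rows = [
    {"STOCKCODE": "21967", "CUSTOMERID": 15574},   # complete
    {"STOCKCODE": "21984", "CUSTOMERID": None},    # one value missing
    {"STOCKCODE": None, "CUSTOMERID": None},       # both values missing
]
subset = ["STOCKCODE", "CUSTOMERID"]

kept_any = dropna(rows, subset, how="any")   # keeps only the complete row
kept_all = dropna(rows, subset, how="all")   # drops only the fully empty row
print(len(kept_any), len(kept_all))
```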

In [11]:
df_retail_data.dropna(subset=['STOCKCODE','CUSTOMERID'], how='all')

DataFrame[INVOICENO: string, STOCKCODE: string, DESCRIPTION: string, QUANTITY: int, INVOICEDATE: timestamp, UNITPRICE: decimal(8,3), CUSTOMERID: int, COUNTRY: string]

In [12]:
df_cleaned_retail.describe().show()

+-------+------------------+-----------------+------------------+
|summary|          QUANTITY|        UNITPRICE|        CUSTOMERID|
+-------+------------------+-----------------+------------------+
|  count|            406829|           406829|            406829|
|   mean| 12.06130339774205|        3.4604710|15287.690570239585|
| stddev|248.69337001882195|69.31516172321467|1713.6003033216004|
|    min|            -80995|            0.000|             12346|
|    max|             80995|        38970.000|             18287|
+-------+------------------+-----------------+------------------+



<span style="color: #0000A0; font-size: 18pt; font-weight: bold;">Mission 1. Create  `Item IDs` with Integer Type.</span>

<span style="font-family: monospace; font-size: 12pt;">
As you can see, **STOCKCODE** values consist of numbers and letters so the column type is **string** but we need the item IDs with a numerical type. To successfully complete this mission:</span>

---
1. Create a dataframe of unique STOCKCODE values from the **`df_cleaned_retail`** dataframe with the **`dropDuplicates()`** function.
2. Create sequential numbers for the unique STOCKCODE values with the **`row_number()`** window function, saving them into a new `ITEM_ID` column. Save the output dataframe into **`df_unique_item_ids_int`**.
3. Left join **`df_cleaned_retail`** with your new **`df_unique_item_ids_int`** dataframe on the **`STOCKCODE`** column, and create a new dataframe named **`df_retail_data_w_item_ids`** that includes the **`STOCKCODE, ITEM_ID, QUANTITY, CUSTOMERID`** columns.
---
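
<span style="font-family: monospace; font-size: 12pt;">
Before writing the Spark version, it may help to see the three steps on a handful of rows in plain Python (not Spark code). The sample `purchases` and all variable names are hypothetical.
</span>

```python
# Hypothetical purchase rows: (STOCKCODE, QUANTITY, CUSTOMERID)
purchases = [("85099B", 2, 12431), ("21791", 1, 12431),
             ("21791", 4, 15574), ("10002", 6, 15574)]

# 1. Unique stock codes (the role of dropDuplicates()).
unique_codes = set(code for code, _, _ in purchases)

# 2. Sequential integers over the ordered codes (the role of row_number()).
#    The codes are strings, so the order is lexicographic.
item_ids = {code: i for i, code in enumerate(sorted(unique_codes), start=1)}

# 3. Left join: every purchase row is kept and gains its ITEM_ID.
with_item_ids = [(code, item_ids.get(code), qty, customer)
                 for code, qty, customer in purchases]

print(item_ids)
print(with_item_ids)
```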

<span style="font-size: 12pt; font-weight: bold;">
[References]
</span>
* [DataFrame.dropDuplicates()](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html)
* [pyspark.sql.functions.row_number()](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html)
* [DataFrame.join(other, on=None, how=None)](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html)

In [13]:
from pyspark.sql import functions as sqlfunc
from pyspark.sql.window import Window

In [14]:
# 1. Create a unique item list.
df_unique_item_ids = df_cleaned_retail.select(['STOCKCODE']).dropDuplicates()

# 2. And then with the `row_number` window function, assign the sequential numbers to the unique item IDs,
# creating a new column named `ITEM_ID`.
wSpec = Window.orderBy(df_unique_item_ids.STOCKCODE)
df_unique_item_ids_int = df_unique_item_ids.withColumn("ITEM_ID", sqlfunc.row_number().over(wSpec).alias("ITEM_ID"))

# 3. Combine the unique item list.
df_retail_data_w_item_ids = df_cleaned_retail.join(df_unique_item_ids_int, df_cleaned_retail.STOCKCODE == df_unique_item_ids_int.STOCKCODE, how='left').select(df_cleaned_retail.STOCKCODE,'ITEM_ID','QUANTITY','CUSTOMERID')

<span style="font-family: monospace; font-size: 12pt;">
For future reference, we are going to create an item lookup dataframe that includes the item description (**`DESCRIPTION`**).
</span>

In [15]:
df_item_lookup = df_cleaned_retail.join(df_unique_item_ids_int, df_cleaned_retail.STOCKCODE == df_unique_item_ids_int.STOCKCODE, how='left').select(df_cleaned_retail.STOCKCODE,'ITEM_ID','DESCRIPTION').dropDuplicates(['STOCKCODE', 'ITEM_ID'])

-----

<span style="color: #0000A0; font-size: 18pt; font-weight: bold;">Mission 2. Group **`Purchase Quantity`** by Stock Code and Customer ID.</span>

<span style="font-family: monospace; font-size: 12pt;">
We need to combine the purchase quantities for the same stock code from the same customer. To do that, we group the rows by stock code and customer ID, **`['STOCKCODE','ITEM_ID','CUSTOMERID']`**. For example, if customer A first ordered 3 units of product P and later ordered 4 more, there are two rows for that customer and product. We need to combine them into a single row with a quantity of 7.
</span>

---
1. Group purchase quantity together by stock code and customer ID **`['STOCKCODE','ITEM_ID','CUSTOMERID']`** with **`groupBy()`** function and save the output dataframe into **`df_retail_data_w_item_ids_sum`**.
2. Once you grouped the quantities, change any sums that equal zero to one with **`when() and otherwise()`** function and save the output dataframe into **`df_retail_data_w_item_ids_sum`**.
3. Only include customers with a positive purchase total to eliminate possible errors with **`filter()`** function and save the output dataframe into **`df_purchased_retail_data`**.
---
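
<span style="font-family: monospace; font-size: 12pt;">
The three steps can be traced on a few rows in plain Python (not Spark code). The sample rows are hypothetical. They include the "3 then 4" example from above, a purchase that was fully returned, and a customer who returned more than was bought.
</span>

```python
from collections import defaultdict

# Hypothetical rows: (STOCKCODE, ITEM_ID, CUSTOMERID, QUANTITY)
rows = [("P", 1, "A", 3), ("P", 1, "A", 4),    # two orders of the same product
        ("Q", 2, "A", 5), ("Q", 2, "A", -5),   # purchase fully returned
        ("R", 3, "B", 2), ("R", 3, "B", -6)]   # more returned than bought

# 1. Sum quantities per (STOCKCODE, ITEM_ID, CUSTOMERID).
sums = defaultdict(int)
for code, item_id, customer, qty in rows:
    sums[(code, item_id, customer)] += qty

# 2. Replace sums that equal zero with one.
adjusted = {key: (1 if total == 0 else total) for key, total in sums.items()}

# 3. Keep only positive totals.
purchased = {key: total for key, total in adjusted.items() if total > 0}
print(sorted(purchased.items()))
```

<span style="font-family: monospace; font-size: 12pt;">
Setting a zero sum to one keeps a record that the customer did purchase the item at some point, while the negative total for product R is filtered out.
</span>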

<span style="font-size: 12pt; font-weight: bold;">
[References]
</span>
* [DataFrame.groupBy()](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.GroupedData)
* [pyspark.sql.functions.when()](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.when)
* [DataFrame.filter()](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html)

In [16]:
# 1. Group purchase quantity together by stock code ('STOCKCODE','ITEM_ID') and customer ID ('CUSTOMERID').
df_retail_data_w_item_ids_sum = df_retail_data_w_item_ids.groupBy(['STOCKCODE','ITEM_ID','CUSTOMERID']).sum("QUANTITY").withColumnRenamed("sum(QUANTITY)", "QUANTITY")

# 2. Change any sums that equal zero to one.
df_retail_data_w_item_ids_sum = df_retail_data_w_item_ids_sum.withColumn("QUANTITY", sqlfunc.when(df_retail_data_w_item_ids_sum.QUANTITY == 0, 1).otherwise(df_retail_data_w_item_ids_sum.QUANTITY))

# 3. Only include customers with a positive purchase total to eliminate possible errors.
df_purchased_retail_data = df_retail_data_w_item_ids_sum.filter(df_retail_data_w_item_ids_sum.QUANTITY > 0)

-----

<span style="color: #0000A0; font-size: 18pt; font-weight: bold;">Create a **`Training and Test Set`**.</span>

<span style="font-family: monospace; font-size: 12pt;">
In machine learning applications, we need to test whether the model trained on the training set performs well on a new test set. We could randomly split the rows of the feature matrix into two groups, one for training and the other for testing. However, purchases by the same user could then end up in both the training and test sets. To avoid this, we randomly split the customers into two groups and build the training and test sets from those two groups.
</span>

---
1. Randomly split the customer list into **two groups**.
2. Using the first customer group, pull all products they ordered and create a complete **training set**.
3. Using the second customer group, pull all products they ordered and create a complete **test set**.
---
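
<span style="font-family: monospace; font-size: 12pt;">
A plain-Python sketch of a customer-level split (not Spark code) might look like this. The data, the seed, and the 80/20 threshold are assumptions made for illustration.
</span>

```python
import random

# Hypothetical (CUSTOMERID, ITEM_ID, QUANTITY) rows: 100 customers, 3 items each.
rows = [(c, item, 1) for c in range(100) for item in (1, 2, 3)]

rng = random.Random(42)   # fixed seed so the sketch is repeatable

# 1. Randomly split the customers into two groups (roughly 80% / 20%).
customers = sorted(set(c for c, _, _ in rows))
train_customers = set(c for c in customers if rng.random() < 0.8)
test_customers = set(customers) - train_customers

# 2. and 3. Pull every row of each group's customers.
train_rows = [r for r in rows if r[0] in train_customers]
test_rows = [r for r in rows if r[0] in test_customers]
print(len(train_rows), len(test_rows))
```

<span style="font-family: monospace; font-size: 12pt;">
Because the split is made on customers rather than on rows, every row of a given customer lands on the same side.
</span>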

In [17]:
# 1. Randomly split the customers into **two groups** for training and test set.
df_user_lookup = df_purchased_retail_data.select(['CUSTOMERID']).dropDuplicates()
df_user_splits = df_user_lookup.randomSplit([8.0, 2.0])

# 2. Create a complete **training set** with the first customer list.
df_training_data = df_purchased_retail_data.join(df_user_splits[0], df_purchased_retail_data.CUSTOMERID == df_user_splits[0].CUSTOMERID,
                                             how='inner').select(df_purchased_retail_data.CUSTOMERID,'ITEM_ID','STOCKCODE','QUANTITY')

# 3. Create a complete **test set** with the second customer list.
df_test_data = df_purchased_retail_data.join(df_user_splits[1], df_purchased_retail_data.CUSTOMERID == df_user_splits[1].CUSTOMERID,
                                             how='inner').select(df_purchased_retail_data.CUSTOMERID,'ITEM_ID','STOCKCODE','QUANTITY')

-----

<span style="color: #0000A0; font-size: 18pt; font-weight: bold;">Implement **`Model-based Collaborative Filtering for Implicit Feedback`**.</span>

<span style="font-family: monospace; font-size: 12pt;">
Collaborative filtering is commonly used for recommender systems. These techniques aim to fill in the missing entries of a user-item association matrix. **Spark ML** (**`pyspark.ml`**) is a package of machine learning and statistics algorithms written with Spark, built on top of Spark SQL dataframes. And **Spark ML** currently supports **model-based collaborative filtering**, in which users and products are described by a small set of latent factors that can be used to predict missing entries. **`Spark ML`** uses the **alternating least squares (ALS) algorithm** to learn these latent factors. In the **`Spark ML`** the **ALS algorithm** is able to handle **explicit feedback** or **implicit feedback** data.
</span>

<span style="font-family: monospace; font-size: 12pt;">
In this tutorial we are using **implicit feedback** data, so I will show you how to model an implicit-feedback recommender system. This approach was developed in the paper by [Hu, Koren, and Volinsky](http://yifanhu.net/PUB/cf.pdf).
Basically we need to turn the ratings matrix into a confidence matrix with the following equation:
$$C_{ui}=1+\alpha r_{ui}$$ where $C_{ui}$ is the confidence matrix for our users $u$ and our items $i$. The $\alpha$ term represents a linear scaling of the rating preferences (in our case number of purchases) and the $r$ term is our original matrix of purchases. The paper suggests 40 as a good starting point for $\alpha$.
</span>
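
<span style="font-family: monospace; font-size: 12pt;">
As a small worked example of the equation, the plain-Python sketch below computes the confidence for a few purchase counts with $\alpha = 40$. The function names are made up for this sketch. The binary preference (1 if anything was purchased, otherwise 0) follows the paper's formulation.
</span>

```python
alpha = 40.0   # starting point suggested in the paper

def confidence(r_ui, alpha=alpha):
    """c_ui = 1 + alpha * r_ui for an observed purchase count r_ui."""
    return 1.0 + alpha * r_ui

def preference(r_ui):
    """Binary preference: 1 if any purchase was observed, else 0."""
    return 1 if r_ui > 0 else 0

for quantity in (0, 1, 7):
    print(quantity, preference(quantity), confidence(quantity))
```

<span style="font-family: monospace; font-size: 12pt;">
An item that was never purchased keeps a minimal confidence of 1, one purchase raises it to 41, and seven purchases raise it to 281. The preference stays binary; only the confidence grows with the purchase count.
</span>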

---
<span style="font-family: monospace; font-size: 12pt;">
For more details on collaborative filtering, see the notes below. The implementation in **spark.ml** has the following parameters:
* **rank** is the number of latent factors in the model (defaults to 10). The number of latent features in the user/item feature vectors. The paper recommends varying this between 20-200. Increasing the number of features may overfit but could reduce bias.
* **maxIter** is the maximum number of iterations to run (defaults to 10). The number of times to alternate between both user feature vector and item feature vector in alternating least squares. More iterations will allow better convergence at the cost of increased computation. The authors found 10 iterations was sufficient, but more may be required to converge.
* **regParam** specifies the regularization parameter in ALS (defaults to 0.1 in the `pyspark.ml` `ALS` constructor). Increasing this value may increase bias but decrease variance.
    * **Bias** is high if the concept class cannot model the true data distribution well, and does not depend on training set size (underfitting: when you have high bias).
    * **Variance** depends on the training set size. It decreases with more training data, and increases with more complicated classifiers (overfitting: when you have extra high variance).
* **implicitPrefs** specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data (defaults to false which means using explicit feedback). Implicit weighted ALS from Hu, Koren, and Volinsky 2008. Designed for alternating least squares and implicit feedback based collaborative filtering.
* **alpha** is a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations (defaults to 1.0). It is the parameter associated with the confidence matrix discussed in the paper, where $C_{ui} = 1 + \alpha r_{ui}$. The paper found a value of 40 most effective. Decreasing this will decrease the variability in confidence between various ratings.
    

**Explicit vs. implicit feedback**
The standard approach to matrix factorization based collaborative filtering treats the entries in the user-item matrix as explicit preferences given by the user to the item, for example, users giving ratings to movies.

It is common in many real-world use cases to only have access to **implicit feedback** (e.g. views, clicks, purchases, likes, shares etc.). The approach used in **spark.ml** to deal with such data is taken from [Hu, Koren, and Volinsky](http://yifanhu.net/PUB/cf.pdf). Essentially, instead of trying to model the matrix of ratings directly, this approach treats the data as numbers representing the strength in observations of user actions (such as the number of clicks, or the cumulative duration someone spent viewing a movie). Those numbers are then related to the level of confidence in observed user preferences, rather than explicit ratings given to items. The model then tries to find latent factors that can be used to predict the expected preference of a user for an item.

**[ALS.scala code](https://github.com/apache/spark/blob/d6dc12ef0146ae409834c78737c116050961f350/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala)**
```
if (implicitPrefs) {
  // Extension to the original paper to handle b < 0. confidence is a function of |b|
  // instead so that it is never negative. c1 is confidence - 1.0.
  val c1 = alpha * math.abs(rating)
  // For rating <= 0, the corresponding preference is 0. So the term below is only added
  // for rating > 0. Because YtY is already added, we need to adjust the scaling here.
  if (rating > 0) {
    numExplicits += 1
    ls.add(srcFactor, (c1 + 1.0) / c1, c1)
  }
}
```
</span>

In [18]:
from pyspark.ml.recommendation import ALS

'''
If the rating matrix is derived from another source of information (i.e. it is inferred from other signals),
you can set implicitPrefs to True to get better results:
'''

rank = 25
implicitPrefs=True
iterations = 15
regularization_parameter = 0.1
alpha = 40.0
#seed = 5L

als = ALS(rank=rank, maxIter=iterations, regParam=regularization_parameter, alpha=alpha, implicitPrefs=implicitPrefs,
          userCol="CUSTOMERID", itemCol="ITEM_ID", ratingCol="QUANTITY")

# Training the ALS model.
als_model = als.fit(df_training_data)

-----

<span style="color: #0000A0; font-size: 18pt; font-weight: bold;">Predict **`Confidence`** for Test Set.</span>

<span style="font-family: monospace; font-size: 12pt;">
**`transform()`** adds a new column named **`prediction`** that holds the recommendation confidence scores.
</span>

In [19]:
df_predictions = als_model.transform(df_test_data)

-----

<span style="color: #0000A0; font-size: 18pt; font-weight: bold;">Mission 3. Evaluate the **`ALS Recommender System`**.</span>


<span style="font-family: monospace; font-size: 12pt;">
[Hu, Koren, and Volinsky](http://yifanhu.net/PUB/cf.pdf) describe how to evaluate an ALS recommender system for implicit feedback. Their data set consists of TV programs, so the passage below speaks of programs and shows where this tutorial has products: We denote by $rank_{ui}$ the **percentile-ranking of product** $i$ within the ordered list of all products prepared **for user $u$**. This way, $rank_{ui}$ = 0% would mean that product $i$ is predicted to be the most desirable for user $u$, thus preceding all other products in the list. On the other hand, $rank_{ui}$ = 100% indicates that product $i$ is predicted to be the least preferred for user $u$, thus placed at the end of the list. (We opted for using percentile-ranks rather than absolute ranks in order to make our discussion general and independent of the number of programs.) Our basic quality measure is the expected percentile ranking of a watching unit in the test period, which is: $$\overline{rank}=\frac{\sum_{u,i}r_{ui}rank_{ui}}{\sum_{u,i}r_{ui}}$$ Lower values of $\overline{rank}$ are more desirable, as they indicate ranking actually watched shows closer to the top of the recommendation lists. Notice that for random predictions, the expected value of $rank_{ui}$ is 50% (placing $i$ in the middle of the sorted list). Thus, $\overline{rank}\geqslant 50\%$ indicates an algorithm no better than random.
</span>

---
1. Filter out the prediction results for which the model could not calculate a confidence score and returned NaN. Use the **`filter()`** and **`isnan()`** functions and save the output dataframe into **`df_predictions_filtered`**.
2. Convert the confidence scores of each customer to percentile ranks with the **`percent_rank()`** window function. In the **`windowSpecRank`** window spec definition, partition the data by customer and order it by prediction in descending order. Then calculate the percentile rank, saving it into the new **`PRANK`** column, and save the output dataframe to **`df_predictions_pranks`**.
3. Multiply the new **`PRANK`** column by the **`QUANTITY`** column, saving the result into the new **`RRANK`** column, and save the output dataframe to **`df_pred_cooked`**. Then divide the sum of the **`RRANK`** column (numerator) by the sum of the **`QUANTITY`** column (denominator) with the **`sum()`** function.
---
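
<span style="font-family: monospace; font-size: 12pt;">
To check your understanding of the measure, the plain-Python sketch below (not Spark code) computes the expected percentile ranking for a few hypothetical predictions. It assumes no tied scores within a customer and assigns a percentile rank of 0 when a customer has a single item.
</span>

```python
from collections import defaultdict

# Hypothetical (CUSTOMERID, ITEM_ID, QUANTITY, prediction) rows.
predictions = [(1, "a", 5, 0.9), (1, "b", 1, 0.5), (1, "c", 2, 0.1),
               (2, "a", 3, 0.2), (2, "b", 4, 0.8)]

# Partition by customer.
by_customer = defaultdict(list)
for customer, item, qty, score in predictions:
    by_customer[customer].append((score, qty))

numerator = 0.0
denominator = 0.0
for customer, scored in by_customer.items():
    scored.sort(reverse=True)                 # highest prediction first
    n = len(scored)
    for position, (score, qty) in enumerate(scored):
        # Percentile rank: 0.0 for the top item, 1.0 for the last one.
        prank = position / float(n - 1) if n > 1 else 0.0
        numerator += qty * prank              # r_ui * rank_ui
        denominator += qty                    # r_ui

expected_rank = numerator / denominator
print(expected_rank)
```

<span style="font-family: monospace; font-size: 12pt;">
Here the numerator is 5.5 and the denominator is 15, so the expected percentile ranking is about 0.37. Items bought in larger quantities weigh more, which is why ranking them near the top lowers the score.
</span>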

<span style="font-size: 12pt; font-weight: bold;">
[References]
</span>
* [pyspark.sql.functions.isnan()](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.isnan)
* [pyspark.sql.functions.percent_rank()](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.percent_rank)
* [pyspark.sql.functions.sum()](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.sum)
* [DataFrame.filter()](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html)

In [20]:
# 1. Filter out the products which the model is not able to calculate the confidence score.
df_predictions_filtered = df_predictions.filter(~sqlfunc.isnan("prediction"))

# 2. Convert the confidence scores to percentile-rank.
from pyspark.sql.window import Window

windowSpecRank = \
  Window.partitionBy(df_predictions_filtered['CUSTOMERID']) \
    .orderBy(df_predictions_filtered['prediction'].desc())
    
df_predictions_pranks = df_predictions_filtered.withColumn("PRANK", sqlfunc.percent_rank().over(windowSpecRank))

# 3. Calculate the numerator and denominator.
df_pred_cooked = df_predictions_pranks.withColumn("RRANK", (df_predictions_pranks['QUANTITY']*df_predictions_pranks.PRANK).alias('RRANK'))
df_pred_cooked.select((sqlfunc.sum("RRANK")/sqlfunc.sum("QUANTITY")).alias('RRANK_FINAL')).show()

+-------------------+
|        RRANK_FINAL|
+-------------------+
|0.35321920032051574|
+-------------------+


---
<span style="color: #0000A0; font-size: 18pt; font-weight: bold;">A Recommendation Example.</span>

<span style="font-family: monospace; font-size: 12pt;">
Check item lookup dataframe:
</span>

In [21]:
# The contents will not truncate.
df_item_lookup.show(5, False)

+---------+-------+-----------------------------------+
|STOCKCODE|ITEM_ID|DESCRIPTION                        |
+---------+-------+-----------------------------------+
|17174    |107    |ASSTD RASTA KEY-CHAINS             |
|21340    |503    |CLASSIC METAL BIRDCAGE PLANT HOLDER|
|21458    |586    |2 PICTURE BOOK EGGS EASTER BUNNY   |
|21791    |793    |VINTAGE HEADS AND TAILS CARD GAME  |
|22187    |1083   |GREEN CHRISTMAS TREE CARD HOLDER   |
+---------+-------+-----------------------------------+
only showing top 5 rows



<span style="font-family: monospace; font-size: 12pt;">
Select one of the customers in the training set:
</span>

In [22]:
df_training_data.select('CUSTOMERID').dropDuplicates().show(5, False)

+----------+
|CUSTOMERID|
+----------+
|12431     |
|12631     |
|13431     |
|13631     |
|13831     |
+----------+
only showing top 5 rows



In [23]:
my_customer = 12431

<span style="font-family: monospace; font-size: 12pt;">
Create a list of all the products my customer purchased:
</span>

In [24]:
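# Go through .rdd so the row flattening also works on Spark 2.x, where DataFrame.flatMap() was removed.
my_product_list = df_cleaned_retail.filter(df_cleaned_retail['CUSTOMERID'] == my_customer).select('STOCKCODE').dropDuplicates().rdd.flatMap(lambda x: x).collect()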

In [25]:
print my_product_list

[u'84378', u'21791', u'85099B', u'23079', u'85232D', u'22846', u'22730', u'48129', u'21507', u'21731', u'21239', u'23355', u'22466', u'22629', u'23240', u'22467', u'20685', u'84380', u'21622', u'22352', u'22965', u'23243', u'22966', u'22191', u'22354', u'23245', u'22192', u'23083', u'23084', u'22193', u'22356', u'22194', u'22690', u'22195', u'22196', u'22692', u'48138', u'22859', u'22697', u'22698', u'22699', u'22131', u'21240', u'22907', u'21080', u'21242', u'21243', u'35004B', u'21244', u'35004C', u'22631', u'21245', u'22138', u'35004G', u'21745', u'22971', u'22027', u'22975', u'22029', u'22976', u'22977', u'23300', u'23301', u'22411', u'22413', u'22090', u'21524', u'23306', u'21527', u'84945', u'21094', u'85184C', u'79191C', u'22037', u'21481', u'22988', u'23268', u'22423', u'21533', u'23316', u'23155', u'21537', u'23158', u'23040', u'23202', u'23159', u'23206', u'21428', u'22492', u'22204', u'22045', u'22992', u'23389', u'22382', u'22383', u'79067', u'22385', u'21706', u'21707', u'

<span style="font-family: monospace; font-size: 12pt;">
Using the **`Column.isin()`** function inside a DataFrame filter, pull the descriptions of the products in the list.
</span>

<span style="font-size: 12pt; font-weight: bold;">
[References]
</span>
* [Column.isin()](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html)

In [26]:
df_my_products = df_item_lookup.filter(df_item_lookup.STOCKCODE.isin(my_product_list))

In [27]:
df_my_products.show(truncate=False)

+---------+-------+---------------------------------+
|STOCKCODE|ITEM_ID|DESCRIPTION                      |
+---------+-------+---------------------------------+
|21791    |793    |VINTAGE HEADS AND TAILS CARD GAME|
|84378    |2851   |SET OF 3 HEART COOKIE CUTTERS    |
|23079    |1919   |TOADSTOOL BEDSIDE LIGHT          |
|85099B   |3236   |JUMBO BAG RED RETROSPOT          |
|22846    |1696   |BREAD BIN DINER STYLE RED        |
|85232D   |3353   |SET/3 DECOUPAGE STACKING TINS    |
|22730    |1584   |ALARM CLOCK BAKELIKE IVORY       |
|48129    |2624   |DOORMAT TOPIARY                  |
|21507    |618    |ELEPHANT, BIRTHDAY CARD,         |
|21731    |757    |RED TOADSTOOL LED NIGHT LIGHT    |
|21239    |442    |PINK  POLKADOT CUP               |
|23355    |2178   |HOT WATER BOTTLE KEEP CALM       |
|22466    |1333   |FAIRY TALE COTTAGE NIGHT LIGHT   |
|20685    |147    |DOORMAT RED RETROSPOT            |
|22467    |1334   |GUMBALL COAT RACK                |
|22629    |1487   |SPACEBOY 

In [28]:
df_my_products.count()

163

<span style="font-family: monospace; font-size: 12pt;">
Negating the same **`isin()`** condition with `~`, pull the products which the customer did not purchase.
</span>

In [29]:
df_products_wo_mine = df_item_lookup.filter(~df_item_lookup.STOCKCODE.isin(my_product_list))

In [30]:
df_products_wo_mine.count()

3521

<span style="font-family: monospace; font-size: 12pt;">
Add my customer ID to the DataFrame of products which the customer did not purchase. 
Because **`withColumn()`** expects a Column expression, a constant value has to be wrapped with the **`pyspark.sql.functions.lit()`** function.
</span>

<span style="font-size: 12pt; font-weight: bold;">
[References]
</span>
* [pyspark.sql.functions.lit()](https://spark.apache.org/docs/1.6.1/api/python/pyspark.sql.html#pyspark.sql.functions.lit)

In [31]:
df_products_wo_mine_w_my_cus_id = df_products_wo_mine.withColumn("CUSTOMERID", sqlfunc.lit(my_customer))

<span style="font-family: monospace; font-size: 12pt;">
Predict new products for my customer:
</span>

In [32]:
df_my_new_products_predict = als_model.transform(df_products_wo_mine_w_my_cus_id)

<span style="font-family: monospace; font-size: 12pt;">
The 10 most highly recommended products:
</span>

In [33]:
# 1. Filter out the products for which the model could not compute a confidence score (NaN predictions).
df_my_new_products_predict_filtered = df_my_new_products_predict.filter(~sqlfunc.isnan("prediction"))
# 2. Sort by predicted confidence, highest first, and show the top 10 without truncating the descriptions.
df_my_new_products_predict_filtered.sort(df_my_new_products_predict_filtered.prediction.desc()).select(['STOCKCODE','DESCRIPTION']).show(10, False)

+---------+---------------------------------+
|STOCKCODE|DESCRIPTION                      |
+---------+---------------------------------+
|37370    |RETRO COFFEE MUGS ASSORTED       |
|22357    |KINGS CHOICE BISCUIT TIN         |
|21875    |KINGS CHOICE MUG                 |
|22358    |KINGS CHOICE TEA CADDY           |
|21992    |VINTAGE PAISLEY STATIONERY SET   |
|22364    |GLASS JAR DIGESTIVE BISCUITS     |
|22361    |GLASS JAR DAISY FRESH COTTON WOOL|
|22807    |SET OF 6 T-LIGHTS TOADSTOOLS     |
|22042    |CHRISTMAS CARD SINGING ANGEL     |
|21135    |VICTORIAN  METAL POSTCARD SPRING |
+---------+---------------------------------+
only showing top 10 rows
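
<span style="font-family: monospace; font-size: 12pt;">
The recommendation steps above (exclude what the customer already bought, score the rest, drop the items without a score, sort by score) can be mirrored in plain Python. This is only a sketch under stated assumptions: the `catalogue`, `purchased`, and `scores` values are hypothetical stand-ins for the item lookup DataFrame, the purchase list, and the model output, and `recommend` is an illustrative helper rather than part of Spark.
</span>

```python
from __future__ import print_function  # keeps the sketch valid on Python 2 and 3

import math

# Hypothetical catalogue and purchase history (not taken from the retail data).
catalogue = {"A1": "RED MUG", "B2": "BLUE MUG", "C3": "TEA TIN", "D4": "CARD SET"}
purchased = {"A1"}

# Hypothetical scores standing in for the model output; NaN marks an item
# the model could not score (for example, one absent from the training data).
scores = {"A1": 0.95, "B2": 0.40, "C3": 0.75, "D4": float("nan")}


def recommend(catalogue, purchased, scores, top_n):
    """Return up to top_n (code, description) pairs the customer has not bought."""
    # Same role as filtering with ~isin(my_product_list).
    candidates = [code for code in catalogue if code not in purchased]
    # Same role as filtering with ~isnan("prediction").
    scored = [(code, scores[code]) for code in candidates
              if not math.isnan(scores[code])]
    # Same role as sorting by prediction in descending order.
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return [(code, catalogue[code]) for code, _ in scored[:top_n]]


top_items = recommend(catalogue, purchased, scores, top_n=2)
print(top_items)
```

<span style="font-family: monospace; font-size: 12pt;">
Here the already purchased item A1 is excluded even though it has the highest score, D4 is dropped because its score is NaN, and the remaining items come back ordered as C3 then B2.
</span>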

