## DATA 612 Final Project: Instacart Recommendations Using Spark

Instacart is one of the most popular grocery ordering and delivery apps. Customers browse and select products in the app, and personal shoppers then review the orders, do the in-store shopping, and deliver the groceries. Because of the large number of available retailers, the size of the inventory, and the diversity of the customer base, it is very important for Instacart to help customers navigate the wealth of available options with useful, personalized recommendations. With this in mind, I will build a recommender system using the Instacart Online Grocery Shopping Dataset 2017, which Instacart has made publicly available.

### The System

Explicit ratings for products were not provided, so the recommender will be based on **implicit ratings**: the number of times each user purchased each product. A collaborative filtering approach will be used, meaning that products are recommended to a user based on the purchase behavior of other users. The main algorithm will be **Alternating Least Squares (ALS)**, because it is a proven method for decomposing a very large user-item matrix into lower-dimensional user factors and item factors, and because it is optimized for Spark's distributed, in-memory processing. The goal is to minimize the **RMSE** by tuning the parameters of the ALS model. Once suitable parameters have been determined, the model will be built on an **EC2** instance, and a web application using **Python's Flask** framework will be built to demonstrate its use in an online setting.
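For reference, Spark's implicit ALS is based on the approach of Hu, Koren and Volinsky (2008), *Collaborative Filtering for Implicit Feedback Datasets*. Sketched in textbook form, each purchase count $r_{ui}$ is turned into a binary preference $p_{ui}$ and a confidence weight $c_{ui}$, and the user factors $x_u$ and item factors $y_i$ minimize a confidence-weighted squared error. Here $\alpha$ scales the confidence and $\lambda$ is the regularization weight; Spark's exact scaling of the regularization term may differ from this form.

$$
p_{ui} = \begin{cases} 1 & r_{ui} > 0 \\ 0 & r_{ui} = 0 \end{cases}, \qquad c_{ui} = 1 + \alpha\, r_{ui}
$$

$$
\min_{x_*,\, y_*} \sum_{u,i} c_{ui}\left(p_{ui} - x_u^\top y_i\right)^2 + \lambda\left(\sum_u \lVert x_u \rVert^2 + \sum_i \lVert y_i \rVert^2\right)
$$

ALS alternates between holding every $y_i$ fixed and solving an independent ridge-regression problem for each $x_u$, then the reverse; because each of those solves is independent, the work parallelizes well across a cluster. The score $x_u^\top y_i$ estimates the preference $p_{ui}$, not the raw purchase count.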

### Data Exploration

The dataset is anonymized and contains a sample of over **3 million** grocery orders from more than 200,000 Instacart users. The data is a relational database separated into 6 tables describing customers' orders over time. For each user, between 4 and 100 of their orders are provided, with the sequence of products purchased in each order. 

The dataset is provided as-is for non-commercial use, and may be accessed from https://www.instacart.com/datasets/grocery-shopping-2017, or from the Kaggle competition "Instacart Market Basket Analysis" at https://www.kaggle.com/c/instacart-market-basket-analysis/overview. The data will be cited as specified.

We will start by loading the data into Spark.


#### Connecting to Spark and Loading Libraries

The dataset is quite large, so we initialize the Spark session with a healthy amount of RAM.

```python
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StructField
from pyspark.conf import SparkConf
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
from pyspark.ml.evaluation import RegressionEvaluator
import pandas as pd
import numpy as np
import math

# Build the configuration first: memory settings such as spark.driver.memory
# only take effect if they are set before the session (and its JVM) is created
conf = SparkConf().setAll([('spark.executor.memory', '20G'),
                           ('spark.app.name', 'Spark Updated Conf'),
                           ('spark.executor.cores', '4'),
                           ('spark.cores.max', '4'),
                           ('spark.driver.memory', '20G'),
                           ('spark.executor.memoryOverhead', '20G')])

# Create the Spark session with that configuration
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext
```


#### Load Data

Here we load the data files and save Parquet copies of them so that we can access them in the future without having to re-parse the CSV files.

```python
from pyspark.sql.utils import AnalysisException

def read_csv(name):
    # Raw Instacart CSV file with a header row
    return spark.read.options(header='true', inferSchema='true').csv('data/%s.csv' % name)

try:
    # Reuse the Parquet copies written on a previous run, if they exist
    aisles_df = spark.read.parquet("aisles.parquet")
    departments_df = spark.read.parquet("departments.parquet")
    order_products_df = spark.read.parquet("order_products.parquet")
    orders_df = spark.read.parquet("orders.parquet")
    products_df = spark.read.parquet("products.parquet")
except AnalysisException:
    # If not, load the CSV files; prior and train order lines are combined
    aisles_df = read_csv('aisles')
    departments_df = read_csv('departments')
    order_products_df = read_csv('order_products__prior').union(read_csv('order_products__train'))
    orders_df = read_csv('orders')
    products_df = read_csv('products')
    # Persist Parquet copies for future access
    for df, name in [(aisles_df, "aisles"), (departments_df, "departments"),
                     (order_products_df, "order_products"), (orders_df, "orders"),
                     (products_df, "products")]:
        df.write.parquet(name + ".parquet", mode="overwrite")

# One row per distinct user
users = orders_df.select("user_id").distinct()
```

#### Data Exploration

```python
# Most frequently ordered product
top_products = (order_products_df.groupBy("product_id").count()
                .sort("count", ascending=False)
                .limit(1)
                .join(products_df, on="product_id"))
top_product = top_products.collect()[0]['product_name']

print("The number of users are " + str(users.count()))
print("The number of products are " + str(products_df.count()))
print("The number of orders are " + str(orders_df.count()))
print("The number of aisles are " + str(aisles_df.count()))
print("The number of departments are " + str(departments_df.count()))
print("The top product is " + str(top_product))
```

```
The number of users are 206209
The number of products are 49688
The number of orders are 3421083
The number of aisles are 134
The number of departments are 21
The top product is Banana
```


As you can see, there are over **200,000 users, almost 50,000 products from a total of 134 aisles, and over 3.4 million orders**. Based on the number of orders, the most popular product is the **banana**. Now let's calculate the implicit ratings for user-product interactions, based on the number of times each user purchased each product.
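Conceptually, the implicit rating is a join followed by a count: each order line in `order_products` is attached to its user through `order_id` in `orders`, and the lines are then counted per `(user_id, product_id)`. A minimal pure-Python sketch of that logic on a few hypothetical rows (the Spark version operates on the full DataFrames; `implicit_ratings` here is an illustrative function, not the notebook's variable):

```python
from collections import Counter

# Hypothetical stand-ins for the orders and order_products tables
orders = [(1, 10), (2, 10), (3, 20)]  # (order_id, user_id)
order_products = [(1, 24852), (1, 13176), (2, 24852), (3, 13176)]  # (order_id, product_id)

def implicit_ratings(orders, order_products):
    """Join order lines to their user, then count purchases per (user, product)."""
    user_of_order = dict(orders)  # order_id -> user_id, the join key
    # Order lines whose order_id is unknown are dropped, as in an inner join
    return Counter((user_of_order[o], p) for o, p in order_products
                   if o in user_of_order)

print(sorted(implicit_ratings(orders, order_products).items()))
# [((10, 13176), 1), ((10, 24852), 2), ((20, 13176), 1)]
```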

```python
# Implicit rating = number of times each user purchased each product
implicit_ratings = (order_products_df
                    .join(orders_df, on="order_id")
                    .groupBy("user_id", "product_id")
                    .count())
print("There are " + str(implicit_ratings.count()) + " ratings.")
implicit_ratings.show(20)
```

```
There are 13863746 ratings.
+-------+----------+-----+
|user_id|product_id|count|
+-------+----------+-----+
|  22352|     15873|    1|
| 152610|     11175|    2|
| 118860|     46979|   18|
|   5430|     15424|   16|
| 106387|     33768|    1|
| 175918|     30850|    2|
| 167393|     43692|    2|
|  40286|     23001|    1|
|   4076|     47049|    9|
| 184598|     27086|    4|
| 171199|     17896|    1|
|  46527|     24497|    3|
| 185432|     24852|   20|
| 185432|     43014|   22|
|  91926|     11182|    4|
| 176010|     32134|   12|
|  58707|     32373|    5|
| 143707|     13176|   18|
|  85359|     21137|   32|
|  41502|     29487|    3|
+-------+----------+-----+
only showing top 20 rows
```



As you can see, there are over **13 million ratings**! The first 20 are shown above. Now let's train ALS models with different ranks and select the one with the lowest RMSE for the final model. To reduce computation time, we first take a sample of the data and then split it into training, validation, and test sets. Because we are using implicit ratings, we use the *trainImplicit* function from PySpark's MLlib. Part of this code was inspired by the example __[here](https://www.codementor.io/jadianes/building-a-recommender-with-apache-spark-python-example-app-part1-du1083qbw)__.
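The validation RMSE in the training loop is computed by keying both the actual counts and the predicted scores by `(user_id, product_id)`, joining them, and averaging the squared differences. A minimal pure-Python sketch of the same calculation on hypothetical values, where an inner join on dictionary keys stands in for the RDD `join` (pairs the model cannot score simply drop out). Note also that `randomSplit` normalizes its weights, so `[6, 2, 2]` means a 60/20/20 split.

```python
import math

def rmse(actual, predicted):
    """RMSE over the (user, product) keys present in both dicts (an inner join)."""
    common = actual.keys() & predicted.keys()
    if not common:
        raise ValueError("no overlapping (user, product) pairs")
    squared = [(actual[k] - predicted[k]) ** 2 for k in common]
    return math.sqrt(sum(squared) / len(squared))

# Hypothetical validation counts and model scores keyed by (user_id, product_id)
actual = {(1, 100): 3.0, (1, 200): 1.0, (2, 100): 5.0}
predicted = {(1, 100): 1.0, (1, 200): 1.0, (3, 300): 0.5}
print(rmse(actual, predicted))  # sqrt((2**2 + 0**2) / 2) ≈ 1.4142
```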

```python
# Sample roughly 10% of the ratings to reduce computation time
# (the limit is only a cap; a 10% sample of ~13.9M rows stays well below it)
sample_ratings = implicit_ratings.sample(False, 0.1).limit(11500000)

# Create training, validation, and test sets (weights are normalized to 60/20/20)
training, validation, test = sample_ratings.rdd.randomSplit([6, 2, 2])
training.cache()
validation_for_prediction = validation.map(lambda x: (x[0], x[1]))

# Parameters: fixed number of iterations, candidate ranks to compare
iterations = 20
ranks = [4, 8, 12]
errors = []

min_error = float('inf')
best_rank = -1
for rank in ranks:
    model = ALS.trainImplicit(training, rank, seed=5, iterations=iterations)
    # Key predictions and actual counts by (user, product), then join them
    predictions = model.predictAll(validation_for_prediction).map(lambda r: ((r[0], r[1]), r[2]))
    rates_predictions = validation.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(rates_predictions.map(lambda r: (r[1][0] - r[1][1]) ** 2).mean())
    errors.append(error)
    print('For rank %s the RMSE is %s' % (rank, error))
    if error < min_error:
        min_error = error
        best_rank = rank

print('The best model was trained with rank %s' % best_rank)
```

```
For rank 4 the RMSE is 4.3713949667434555
For rank 8 the RMSE is 4.3716101198856645
For rank 12 the RMSE is 4.371843867487587
The best model was trained with rank 4
```


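The best model had a rank of 4, so we will use this value to train the full model and make recommendations. The differences between ranks are very small (the RMSE changes only in the fourth decimal place), and because *trainImplicit* predicts a preference score rather than a purchase count, RMSE against raw counts is only a rough guide for choosing the rank.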
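One alternative, not what this notebook uses, is to score predictions against the preference the implicit model actually fits: 1 if the product was bought and 0 otherwise, weighted by a confidence of `1 + alpha * count` as in Hu, Koren and Volinsky's formulation. A minimal sketch; `alpha=1.0` and the `(count, score)` pairs are illustrative assumptions, and both helper names are hypothetical. Since a validation set built only from purchased pairs contains no zero preferences, ranking metrics such as precision at k are another common choice for implicit feedback.

```python
import math

def to_preference_confidence(count, alpha=1.0):
    """Map a purchase count to (preference, confidence) as in implicit ALS."""
    preference = 1.0 if count > 0 else 0.0
    confidence = 1.0 + alpha * count
    return preference, confidence

def weighted_rmse(pairs, alpha=1.0):
    """Confidence-weighted RMSE of predicted scores against binary preferences.

    pairs: iterable of (purchase_count, predicted_score) tuples.
    """
    num = den = 0.0
    for count, score in pairs:
        p, c = to_preference_confidence(count, alpha)
        num += c * (p - score) ** 2
        den += c
    if den == 0:
        raise ValueError("no pairs to evaluate")
    return math.sqrt(num / den)

# Hypothetical (count, score) pairs; alpha=1.0 is illustrative, not a tuned value
pairs = [(18, 0.9), (1, 0.4), (0, 0.1)]
print(weighted_rmse(pairs))
```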

### Make Recommendations

We will now train the **implicit ALS** model using a rank of 4 and 20 iterations. The model is saved for future use, so that the web application only needs to retrain it when no saved model is available.
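The "load if saved, otherwise train and save" pattern can be sketched independently of Spark by injecting the load, train, and save steps. `load_or_train` is a hypothetical helper, and `os.path.exists` only works for a local path; a model saved to HDFS or S3 would need a different existence check.

```python
import os

def load_or_train(path, load_fn, train_fn, save_fn):
    """Return the model saved at `path`, training and saving it only if absent.

    load_fn(path) -> model, train_fn() -> model, save_fn(model, path) -> None.
    """
    if os.path.exists(path):
        return load_fn(path)
    model = train_fn()
    save_fn(model, path)
    return model
```

With MLlib the three steps could be wired as `load_or_train("als_final", lambda p: MatrixFactorizationModel.load(sc, p), lambda: ALS.trainImplicit(implicit_ratings.rdd, 4, iterations=20), lambda m, p: m.save(sc, p))`.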

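```python
# Train the final model on all implicit ratings
rank = 4
numIterations = 20
final_model = ALS.trainImplicit(implicit_ratings.rdd, rank, iterations=numIterations)
# Save for the web application (MLlib raises an error if "als_final" already exists)
final_model.save(sc, "als_final")
```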

The function below accepts a user ID and the number of recommendations to make for that user. The recommended product IDs are then mapped to their product, aisle, and department names and returned. I wrote the function for easy integration into the web application, to facilitate AJAX requests, which is why it returns a dictionary containing the status of the request, the products as a plain list (`webdata`) for the browser, and the products as a pandas DataFrame (`data`) for display in this notebook.
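Under the hood, a matrix factorization model scores a user-product pair as the dot product of the user's factor vector and the product's factor vector, and a top-N recommendation is the N highest-scoring products. A minimal pure-Python sketch with tiny hypothetical rank-2 factors (the real model uses rank 4); `top_n_products` is an illustrative helper, not part of MLlib, and this sketch does not exclude products the user has already bought.

```python
import heapq

def top_n_products(user_factors, item_factors, user_id, n):
    """Return the n (product_id, score) pairs with the highest dot-product score.

    user_factors: dict user_id -> sequence of k floats
    item_factors: dict product_id -> sequence of k floats
    """
    u = user_factors[user_id]
    # Predicted preference for every product = <user factors, product factors>
    scores = ((pid, sum(a * b for a, b in zip(u, v)))
              for pid, v in item_factors.items())
    # Keep the n best without sorting the whole catalogue
    return heapq.nlargest(n, scores, key=lambda pair: pair[1])

# Hypothetical rank-2 factors
user_factors = {25: [1.0, 0.0]}
item_factors = {1: [0.9, 0.1], 2: [0.1, 0.9], 3: [0.5, 0.5]}
print(top_n_products(user_factors, item_factors, 25, 2))  # [(1, 0.9), (3, 0.5)]
```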

```python
# Make product recommendations to a user
def recommend(user_id, num_products):
    # Top-N Rating objects (user, product, rating) for this user
    recommendations = final_model.recommendProducts(int(user_id), num_products)
    # Extract the product IDs from the Rating objects
    product_ids = [rec.product for rec in recommendations]

    # Map product IDs to products and attach aisle and department names
    products = (products_df.filter(products_df["product_id"].isin(product_ids))
                .join(aisles_df, on="aisle_id")
                .join(departments_df, on="department_id")
                .drop("aisle_id", "department_id"))

    # Convert to pandas once; 'webdata' is the JSON-friendly list for AJAX responses
    products_pd = products.toPandas()
    return {"success": True, "webdata": products_pd.values.tolist(), "data": products_pd}
```

Let's get 20 recommendations each for users 25 and 678.

```python
user25 = recommend(25, 20)
print("User 25's Recommendations:")
print(user25['data'].to_string(index=False))

print("\n\n")

user678 = recommend(678, 20)
print("User 678's Recommendations:")
print(user678['data'].to_string(index=False))
```



```
User 25's Recommendations:
product_id                   product_name                       aisle department
      5876                  Organic Lemon                fresh fruits    produce
      8277       Apple Honeycrisp Organic                fresh fruits    produce
      8518              Organic Red Onion            fresh vegetables    produce
     10749        Organic Red Bell Pepper            fresh vegetables    produce
     13176         Bag of Organic Bananas                fresh fruits    produce
     17794                        Carrots            fresh vegetables    produce
     21137           Organic Strawberries                fresh fruits    produce
     21903           Organic Baby Spinach  packaged vegetables fruits    produce
     22935           Organic Yellow Onion            fresh vegetables    produce
     24964                 Organic Garlic            fresh vegetables    produce
     27104              Fresh Cauliflower            fresh vegetables    produce
```

The recommendations seem quite interesting. User 25 appears to favor healthy fruits and vegetables, which is not surprising given the current emphasis on health and fitness. User 678, on the other hand, gets a fair share of fruits and vegetables but also buys from several other departments, such as dips from the deli and mayonnaise from the pantry. Now let's wrap this into a web application.
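The impression that a user's list is dominated by one department can be checked directly by counting departments in the returned rows. A minimal sketch, assuming the rows have the `(product_id, product_name, aisle, department)` shape that `recommend()` returns under `webdata`; `department_share` is a hypothetical helper.

```python
from collections import Counter

def department_share(rows):
    """Fraction of recommended products in each department, most common first.

    rows: iterable of (product_id, product_name, aisle, department) tuples.
    """
    counts = Counter(dept for *_, dept in rows)
    total = sum(counts.values())
    return {dept: n / total for dept, n in counts.most_common()}

# First rows of user 25's printed recommendations
rows = [(5876, "Organic Lemon", "fresh fruits", "produce"),
        (8277, "Apple Honeycrisp Organic", "fresh fruits", "produce")]
print(department_share(rows))  # {'produce': 1.0}
```

For all eleven user 25 rows printed above, every department is `produce`, so the share is likewise 1.0.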

### Web Application

The recommender above will be turned into a web application using **Flask**. This is a fairly complex application for readers who are new to web development, because it relies on AJAX requests, which allow the interface to be updated without refreshing the page. It uses **JavaScript** to produce the charts and returns JSON in response to **HTTP** requests. I created an **object-oriented** class to serve the application from Python; that Python code is below. It is only one part of the application and needs the accompanying **HTML, CSS, and JavaScript** files to run. Note that I have used this HTML template in a group setting before and adapted it for this ALS recommender. If you are not familiar with these topics, you will need to research them, as the code below is for demonstration purposes only. You can access the web application at http://35.170.73.218:5000/dashboard.
All the code and other web files may be found at https://github.com/albert-gilharry/DATA-612/tree/master/Final%20Project. 
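The AJAX round trip behind the recommendations page can be sketched without Flask: the browser POSTs a form field named `user` to `/getRecommendations`, and the server answers with a JSON object of the form `{"success": ..., "data": ...}`. The helper name `handle_recommendation_request` and the validation rule (numeric IDs only) are assumptions for illustration; in the Flask app, `request.form` does the parsing that `parse_qs` does here.

```python
import json
from urllib.parse import parse_qs

def handle_recommendation_request(body, recommend_fn):
    """Turn a form-encoded POST body such as 'user=25' into a JSON reply.

    recommend_fn(user_id) should return a JSON-serializable dict like
    {"success": True, "data": [...]}; bad input yields {"success": False, ...}.
    """
    values = parse_qs(body).get("user", [])
    # Reject missing or non-numeric IDs instead of letting int() raise
    if not values or not values[0].isdigit():
        return json.dumps({"success": False, "error": "missing or invalid 'user'"})
    return json.dumps(recommend_fn(int(values[0])))

def fake_recommend(user_id):
    # Hypothetical stand-in for Recommender.recommend
    return {"success": True, "data": [[13176, "Bag of Organic Bananas"]]}

print(handle_recommendation_request("user=25", fake_recommend))
# {"success": true, "data": [[13176, "Bag of Organic Bananas"]]}
```

Validating the ID up front means a malformed request gets a `success: false` reply the JavaScript can display, rather than a server error from `int(user_id)`.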

```python
# -*- coding: utf-8 -*-
"""
DATA 612 Final Project: Instacart Recommender
Authors:
    Albert Gilharry
"""

import json

import findspark
findspark.init()

from flask import Flask, render_template, request
from pyspark import SparkConf
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException

app = Flask(__name__)

MODEL_PATH = "als_final"


class Recommender:

    def __init__(self):
        self.department_dist = {"departments": [], "orders": []}
        self.model = None  # loaded (or trained) on the first recommendation request
        # Initialize the Spark session; memory settings only take effect
        # if they are set before the session is created
        conf = SparkConf().setAll([('spark.executor.memory', '20G'),
                                   ('spark.app.name', 'Spark Updated Conf'),
                                   ('spark.executor.cores', '4'),
                                   ('spark.cores.max', '4'),
                                   ('spark.driver.memory', '20G'),
                                   ('spark.executor.memoryOverhead', '20G')])
        self.spark = SparkSession.builder.config(conf=conf).getOrCreate()
        self.sc = self.spark.sparkContext
        self.loadData()

    def readCsv(self, name):
        # Raw Instacart CSV file with a header row
        return self.spark.read.options(header='true', inferSchema='true').csv('data/%s.csv' % name)

    def loadData(self):
        # Check whether Parquet copies were written on a previous run
        try:
            self.aisles_df = self.spark.read.parquet("aisles.parquet")
            self.departments_df = self.spark.read.parquet("departments.parquet")
            self.order_products_df = self.spark.read.parquet("order_products.parquet")
            self.orders_df = self.spark.read.parquet("orders.parquet")
            self.products_df = self.spark.read.parquet("products.parquet")
        except AnalysisException:
            # If not, load the CSV files (prior and train order lines combined)
            self.aisles_df = self.readCsv('aisles')
            self.departments_df = self.readCsv('departments')
            self.order_products_df = self.readCsv('order_products__prior').union(
                self.readCsv('order_products__train'))
            self.orders_df = self.readCsv('orders')
            self.products_df = self.readCsv('products')
            # Persist Parquet copies for future access
            self.aisles_df.write.parquet("aisles.parquet", mode="overwrite")
            self.departments_df.write.parquet("departments.parquet", mode="overwrite")
            self.order_products_df.write.parquet("order_products.parquet", mode="overwrite")
            self.orders_df.write.parquet("orders.parquet", mode="overwrite")
            self.products_df.write.parquet("products.parquet", mode="overwrite")
        self.implicitRatings()

    def implicitRatings(self):
        # Users table: one row per distinct user
        self.users = self.orders_df.select("user_id").distinct()
        self.num_users = self.users.count()
        # Ratings table: number of times each user purchased each product
        self.implicit_ratings = (self.order_products_df
                                 .join(self.orders_df, on="order_id")
                                 .groupBy("user_id", "product_id")
                                 .count())

    # Load the saved ALS model once, training and saving it only if it is not available
    def trainALS(self):
        if self.model is not None:
            return
        try:
            self.model = MatrixFactorizationModel.load(self.sc, MODEL_PATH)
        except Exception:
            # A missing model directory surfaces as a Java-side (Py4J) error rather
            # than RuntimeError/TypeError/NameError, so catch broadly here
            rank = 4
            numIterations = 20
            self.model = ALS.trainImplicit(self.implicit_ratings.rdd, rank, iterations=numIterations)
            self.model.save(self.sc, MODEL_PATH)

    # Create dashboard visuals
    def getVisuals(self):
        # Ten most popular aisles, cached as Parquet after the first computation
        aisles_dist = {"success": True, "data": []}
        try:
            self.aisles_distribution = self.spark.read.parquet("aisles_distribution.parquet")
        except AnalysisException:
            aisles_distribution = (self.implicit_ratings
                                   .join(self.products_df, on="product_id")
                                   .join(self.aisles_df, on="aisle_id")
                                   .groupBy("aisle").count()
                                   .sort("count", ascending=False))
            self.aisles_distribution = aisles_distribution.limit(10)
            self.aisles_distribution.write.parquet("aisles_distribution.parquet", mode="overwrite")
        aisles_dist['data'] = self.aisles_distribution.toPandas().values.tolist()

        # Orders by hour of day (converted to pandas once so both lists share one order)
        hourly_pd = self.orders_df.groupBy("order_hour_of_day").count().sort("count").toPandas()
        hour_dist = {"success": True,
                     "hour": hourly_pd[["order_hour_of_day"]].values.tolist(),
                     "orders": hourly_pd[["count"]].values.tolist()}

        # Orders by day of week, with day numbers mapped to names (0 taken as Sunday)
        weekly_orders_pd = self.orders_df.groupBy("order_dow").count().sort("count").toPandas()
        day_names = {0: "Sunday", 1: "Monday", 2: "Tuesday", 3: "Wednesday",
                     4: "Thursday", 5: "Friday", 6: "Saturday"}
        # Map only the day column so that counts are never replaced by day names
        weekly_orders_pd["order_dow"] = weekly_orders_pd["order_dow"].map(day_names)
        dow_dist = {"success": True, "data": weekly_orders_pd.values.tolist()}

        # Other stats
        num_orders = self.orders_df.count()
        num_products = self.products_df.count()
        top_products = (self.order_products_df.groupBy("product_id").count()
                        .sort("count", ascending=False)
                        .limit(1)
                        .join(self.products_df, on="product_id"))
        top_product = top_products.collect()[0]['product_name']

        return {"aisles": aisles_dist, "doweek": dow_dist, "hour_of_day": hour_dist,
                "num_orders": num_orders,
                "num_users": self.num_users,
                "num_products": num_products,
                "top_product": top_product}

    # Get a sample of users to reduce load on the interface
    def sampleUsers(self):
        return {"success": True,
                "data": self.users.sample(False, 0.1).limit(200).toPandas().values.tolist()}

    # Send recommendations along with additional product information to the browser
    def recommend(self, user_id):
        self.trainALS()
        recommendations = self.model.recommendProducts(int(user_id), 20)
        product_ids = [rec.product for rec in recommendations]
        products = (self.products_df.filter(self.products_df["product_id"].isin(product_ids))
                    .join(self.aisles_df, on="aisle_id")
                    .join(self.departments_df, on="department_id")
                    .drop("aisle_id", "department_id"))
        return {"success": True, "data": products.toPandas().values.tolist()}


recommender = Recommender()

@app.route("/")
def main():
    return render_template('index.html')

@app.route("/dashboard")
def dashboard():
    return render_template('index.html')

@app.route("/recommendations")
def recommendations():
    return render_template('recommendations.html')

@app.route("/getGraphics", methods=['GET'])
def getGraphics():
    graphics = recommender.getVisuals()
    return json.dumps(graphics)

@app.route("/sampleUsers", methods=['GET'])
def sampleUsers():
    sample = recommender.sampleUsers()
    return json.dumps(sample)

@app.route("/getRecommendations", methods=['POST'])
def getRecommendations():
    user_id = request.form['user']
    recommendations = recommender.recommend(user_id)
    return json.dumps(recommendations)


if __name__ == "__main__":
    app.run(host='0.0.0.0', port=5000)
```
