You may find this series of notebooks at https://github.com/databricks-industry-solutions/als-recommender. For more information about this solution accelerator, visit https://www.databricks.com/solutions/accelerators/recommendation-engines

The purpose of this notebook is to prepare the data for use in the ALS recommender. 

## Introduction

In this notebook, we make purchase history data accessible as the basis for a matrix factorization recommender. The dataset we will use is the [Instacart dataset](https://www.kaggle.com/c/instacart-market-basket-analysis), downloadable from the Kaggle website. We will expose the data through a set of queryable tables and then derive implied ratings from it before proceeding to the next notebook.

## Step 1: Data Preparation

The data in the Instacart dataset should be [downloaded](https://www.kaggle.com/c/instacart-market-basket-analysis) and uploaded to cloud storage. The cloud storage location should then be [mounted](https://docs.databricks.com/data/databricks-file-system.html#mount-object-storage-to-dbfs) to the Databricks file system as shown here:

<img src='https://brysmiwasb.blob.core.windows.net/demos/images/instacart_filedownloads.png' width=240>

**NOTE** The name of the mount point, the file locations and the database used are configurable within the *00_Intro & Config* notebook.

The individual files that make up each entity in this dataset can then be presented as a queryable table as part of a database with a high-level schema as follows:

<img src='https://brysmiwasb.blob.core.windows.net/demos/images/instacart_schema2.png' width=300>

We have automated this data preparation step for you in the notebook below and used a `/tmp/instacart_als` storage path throughout this accelerator in place of the mount path. 
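The loading cells later in this notebook read their file locations and target database from a `config` dictionary. As a rough illustration only, such a dictionary might look like the sketch below. The key names ending in `_path` and the `database` key match what the loading cells reference, but the `mount_point` key, the folder layout and the `my_catalog` value are assumptions, not the accelerator's actual settings (those are defined in *00_Intro & Config*).

```python
# Hypothetical configuration sketch; the real values are set in "00_Intro & Config"
config = {}
config['database'] = 'my_catalog'             # assumed name; used as the first part of each table name
config['mount_point'] = '/tmp/instacart_als'  # storage path used in place of a mount

# assumed layout: one folder per entity under the storage path
for entity in ['orders', 'products', 'order_products', 'departments', 'aisles']:
    config[f'{entity}_path'] = f"{config['mount_point']}/{entity}"

# table names are then built the same way the loading cells build them
orders_table = '{0}.als.orders'.format(config['database'])
print(orders_table)
```

Keeping paths and names in one dictionary means the loading code does not need to change when the storage location or target database does.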

In [0]:
%run "./util/data-extract"

Note: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.


<Experiment: artifact_location='dbfs:/databricks/mlflow-tracking/4413013109107832', creation_time=1711965018511, experiment_id='4413013109107832', last_update_time=1711965018511, lifecycle_stage='active', name='/Users/mahendra.v@sapiens.com/als-recommender', tags={'mlflow.experiment.sourceName': '/Users/mahendra.v@sapiens.com/als-recommender',
 'mlflow.experimentType': 'MLFLOW_EXPERIMENT',
 'mlflow.ownerEmail': 'mahendra.v@sapiens.com',
 'mlflow.ownerId': '6670788333455762'}>

Downloading instacart-market-basket-analysis.zip to /databricks/driver




Archive:  instacart-market-basket-analysis.zip
  inflating: aisles.csv.zip          
  inflating: departments.csv.zip     
  inflating: order_products__prior.csv.zip  
  inflating: order_products__train.csv.zip  
  inflating: orders.csv.zip          
  inflating: products.csv.zip        
  inflating: sample_submission.csv.zip  
Archive:  aisles.csv.zip
  inflating: aisles.csv              
   creating: __MACOSX/
  inflating: __MACOSX/._aisles.csv   
Archive:  departments.csv.zip
  inflating: departments.csv         
  inflating: __MACOSX/._departments.csv  
Archive:  order_products__prior.csv.zip
  inflating: order_products__prior.csv  
  inflating: __MACOSX/._order_products__prior.csv  
Archive:  order_products__train.csv.zip
  inflating: order_products__train.csv  
  inflating: __MACOSX/._order_products__train.csv  
Archive:  orders.csv.zip
  inflating: orders.csv              
  inflating: __MACOSX/._orders.csv   
Archive:  products.csv.zip
  inflating: products.csv            
  i

True

In [0]:
from pyspark.sql.types import *
import pyspark.sql.functions as fn
from pyspark.sql import window as w

In [0]:
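def read_data(file_path, schema):
  # read CSV files (with a header row) using an explicit schema
  df = (
    spark
      .read
      .csv(
        file_path,
        header=True,
        schema=schema
        )
    )
  return df

def write_data(df, table_name):
  # persist the DataFrame as a Delta table, replacing any existing data and schema
  _ = (
    df
      .write
      .format('delta')
      .mode('overwrite')
      .option('overwriteSchema','true')
      .saveAsTable(table_name)
    )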

In [0]:
orders_schema = StructType([
  StructField('order_id', IntegerType()),
  StructField('user_id', IntegerType()),
  StructField('eval_set', StringType()),
  StructField('order_number', IntegerType()),
  StructField('order_dow', IntegerType()),
  StructField('order_hour_of_day', IntegerType()),
  StructField('days_since_prior_order', FloatType())
  ])

orders = read_data(config['orders_path'], orders_schema)
write_data( orders, '{0}.als.orders'.format(config['database']))

In [0]:
# products (required by the user_product_purchases view in Step 2)
# ---------------------------------------------------------
products_schema = StructType([
  StructField('product_id', IntegerType()),
  StructField('product_name', StringType()),
  StructField('aisle_id', IntegerType()),
  StructField('department_id', IntegerType())
  ])

products = read_data( config['products_path'], products_schema)
write_data( products, '{0}.als.products'.format(config['database']))
# ---------------------------------------------------------

# order products
# ---------------------------------------------------------
order_products_schema = StructType([
  StructField('order_id', IntegerType()),
  StructField('product_id', IntegerType()),
  StructField('add_to_cart_order', IntegerType()),
  StructField('reordered', IntegerType())
  ])

order_products = read_data( config['order_products_path'], order_products_schema)
write_data( order_products, '{0}.als.order_products'.format(config['database']))
# ---------------------------------------------------------

# departments
# ---------------------------------------------------------
departments_schema = StructType([
  StructField('department_id', IntegerType()),
  StructField('department', StringType())  
  ])

departments = read_data( config['departments_path'], departments_schema)
write_data( departments, '{0}.als.departments'.format(config['database']))
# ---------------------------------------------------------

# aisles
# ---------------------------------------------------------
aisles_schema = StructType([
  StructField('aisle_id', IntegerType()),
  StructField('aisle', StringType())  
  ])

aisles = read_data( config['aisles_path'], aisles_schema)
write_data( aisles, '{0}.als.aisles'.format(config['database']))
# ---------------------------------------------------------

In [0]:
# list the tables in the schema the cells above wrote to
display(
  spark
    .sql('SHOW TABLES IN {0}.als'.format(config['database']))
)



## Step 2: Generate Ratings

The records that make up the Instacart dataset represent grocery purchases. As would be expected in a grocery scenario, there are no explicit ratings provided in this dataset. Explicit ratings are typically found in scenarios where users are significantly invested (either monetarily or in terms of time or social standing) in the items they are purchasing or consuming.  When we are considering apples and bananas purchased to have around the house as a snack or to be dropped in a kid's lunch, most users are just not interested in providing 1 to 5 star ratings on those items.

We therefore need to examine the data for implied ratings (preferences). In a grocery scenario where items are purchased for consumption, repeat purchases may provide a strong signal of preference. [Douglas Oard and Jinmook Kim](https://terpconnect.umd.edu/~oard/pdf/aaai98.pdf) provide a nice discussion of the various ways we might derive implicit ratings in a variety of scenarios, and it is certainly worth considering alternative ways of deriving an input metric. However, for the sake of simplicity, we'll use the fraction of a user's purchase events (orders) that include a particular item as our implied rating:

In [0]:
%sql
DROP VIEW IF EXISTS adv_analytics_poc.als.user_product_purchases;

CREATE VIEW adv_analytics_poc.als.user_product_purchases
AS
  SELECT
    monotonically_increasing_id() as row_id,
    x.user_id,
    x.product_id,
    x.product_purchases / y.purchase_events as rating
  FROM (  -- product purchases
    SELECT
      a.user_id,
      b.product_id,
      COUNT(*) as product_purchases
    FROM adv_analytics_poc.als.orders a
    INNER JOIN adv_analytics_poc.als.order_products b
      ON a.order_id=b.order_id
    INNER JOIN adv_analytics_poc.als.products c
      ON b.product_id=c.product_id
    GROUP BY a.user_id, b.product_id
    ) x 
  INNER JOIN ( -- purchase events
    SELECT 
      user_id, 
      COUNT(DISTINCT order_id) as purchase_events 
    FROM adv_analytics_poc.als.orders 
    GROUP BY user_id
    ) y
    ON x.user_id=y.user_id
    ;
    
SELECT *
FROM adv_analytics_poc.als.user_product_purchases;

row_id,user_id,product_id,rating
0,178520,41276,0.8245614035087719
1,156122,12962,0.0943396226415094
2,22352,15873,0.1111111111111111
3,139016,31506,0.0833333333333333
4,135442,1529,0.12
5,135442,3464,0.4
6,152610,11175,0.074074074074074
7,45082,26878,0.0909090909090909
8,118860,28934,0.0625
9,118860,4656,0.1458333333333333
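To make the rating calculation concrete, the same arithmetic can be worked by hand on a toy example. The sketch below is plain Python rather than Spark, uses made-up order and product IDs, and leaves out the join to the products table that the view performs. It only illustrates the ratio of product purchases to purchase events.

```python
from collections import defaultdict

# Hypothetical toy data: (order_id, user_id) pairs standing in for the orders table
orders = [(1, 100), (2, 100), (3, 100), (4, 200)]
# (order_id, product_id) pairs standing in for the order_products table
order_products = [(1, 10), (1, 11), (2, 10), (3, 10), (4, 11)]

def implied_ratings(orders, order_products):
    """Return {(user_id, product_id): rating}, the share of a user's orders containing the product."""
    user_of_order = {order_id: user_id for order_id, user_id in orders}

    # denominator: distinct orders per user (purchase_events in the view)
    orders_per_user = defaultdict(set)
    for order_id, user_id in orders:
        orders_per_user[user_id].add(order_id)

    # numerator: order lines per (user, product) pair (product_purchases in the view)
    purchases = defaultdict(int)
    for order_id, product_id in order_products:
        if order_id in user_of_order:  # mimic the INNER JOIN on order_id
            purchases[(user_of_order[order_id], product_id)] += 1

    return {
        (user_id, product_id): count / len(orders_per_user[user_id])
        for (user_id, product_id), count in purchases.items()
    }

ratings = implied_ratings(orders, order_products)
print(ratings[(100, 10)])  # user 100 bought product 10 in 3 of 3 orders
print(ratings[(100, 11)])  # user 100 bought product 11 in 1 of 3 orders
```

Provided a product appears at most once per order, the rating stays between 0 and 1, which is consistent with the values shown in the query output above.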


© 2022 Databricks, Inc. All rights reserved. The source in this notebook is provided subject to the Databricks License.