In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 5GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/orders/Order.all.20200901_20200930.csv
/kaggle/input/orders/Order.all.20200801_20200831.csv
/kaggle/input/orders/Order.all.20200701_20200731.csv


In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.0.1.tar.gz (204.2 MB)
[K     |████████████████████████████████| 204.2 MB 28 kB/s s eta 0:00:01
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 53.9 MB/s eta 0:00:01
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612244 sha256=80b9981ccfb41b597e8a9ea0e196202ee43d54d30253aaf93d8280de73950ed9
  Stored in directory: /root/.cache/pip/wheels/5e/34/fa/b37b5cef503fc5148b478b2495043ba61b079120b7ff379f9b
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1
You should consider upgrading via the '/opt/conda/bin/python3.7 -m pip install --upgrade pip' command.[0m


In [3]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import col
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

import pandas as pd

In [4]:
spark = SparkSession \
    .builder \
    .appName("sales") \
    .config("spark.driver.maxResultSize", "96g") \
    .config("spark.driver.memory", "96g") \
    .config("spark.executor.memory", "8g") \
    .getOrCreate()

In [5]:
def read_data(csv_file):
  df = spark.read.csv(csv_file, header=True, inferSchema=True)
  return df

In [6]:
# data from July, August and September
sales_07 = read_data('../input/orders/Order.all.20200701_20200731.csv')
sales_08 = read_data('../input/orders/Order.all.20200801_20200831.csv')
sales_09 = read_data('../input/orders/Order.all.20200901_20200930.csv')

In [7]:
# only use the completed orders
def data_cleaning(df):
  completed_orders = df.filter(col('Order Status') == 'Completed')
  completed_orders = completed_orders.select('Username (Buyer)','Product Name','Quantity')
  return completed_orders

In [8]:
df_07 = data_cleaning(sales_07)
df_08 = data_cleaning(sales_08)
df_09 = data_cleaning(sales_09)

In [9]:
df = df_07.unionByName(df_08).unionByName(df_09)

In [10]:
# transform user name to integer id
users = df.select('Username (Buyer)').distinct()
users = users.coalesce(1)
users = users.withColumn(
"userIntId", monotonically_increasing_id()).persist()
# users.show()

In [11]:
# transform product name to integer id
items = df.select('Product Name').distinct()
items = items.coalesce(1)
items = items.withColumn(
"itemIntId", monotonically_increasing_id()).persist()
# items.show()

In [12]:
sales_w_int_ids = df.join(
users, "Username (Buyer)", "left").join(items, "Product Name", "left")
# sales_w_int_ids.show()

In [13]:
sales_data = sales_w_int_ids.select(
                                        col("userIntId").alias("userId"),
                                        col("itemIntId").alias("itemId"),
                                        col("Quantity"))
# sales_data.show()

In [14]:
users = sales_data.select("userId").distinct()
items = sales_data.select("itemId").distinct()

cross_join = users.crossJoin(items).join(sales_data, ["userId", "itemId"], "left").fillna(0).persist()
# cross_join.show()

In [15]:
# Sparsity
sparsity = 1 - sales_data.count()/(users.count()*items.count())

In [16]:
print ("Sparsity: ", sparsity)

Sparsity:  0.9590305330343281


In [17]:
userCol = "userId"
itemCol = "itemId"
ratingCol = "Quantity"

In [18]:
# https://github.com/jamenlong/ALS_expected_percent_rank_cv/blob/master/ROEM_function.py

#Expected percentile rank error metric function
def ROEM(predictions, userCol = userCol, itemCol = itemCol, ratingCol = ratingCol):
  #Creates table that can be queried
  predictions.createOrReplaceTempView("predictions")

  #Sum of total number of purchases of all products
  denominator = predictions.groupBy().sum(ratingCol).collect()[0][0]

  #Calculating rankings of products predictions by user
  spark.sql("SELECT " + userCol + " , " + ratingCol + " , PERCENT_RANK() OVER (PARTITION BY " + userCol + " ORDER BY prediction DESC) AS rank FROM predictions").createOrReplaceTempView("rankings")

  #Multiplies the rank of each product by the number of purchases and adds the products together
  numerator = spark.sql('SELECT SUM(' + ratingCol + ' * rank) FROM rankings').collect()[0][0]

  performance = numerator/denominator

  return performance

In [19]:
(train, test) = cross_join.randomSplit([.8, .2], seed=12)

In [20]:
# Empty list to be filled with models
model_list = []
params_list = []
roems = []


ranks = [4,5,6]
maxIters = [13,14,15]
regParams = [0.01,0.015,0.02]
alphas = [12,13,14]

In [21]:
# For loop will automatically create and store ALS models
for r in ranks:
    for mi in maxIters:
        for rp in regParams:
            for a in alphas:
                params_list.append({'rank': r, 'maxIter': mi, 'regParam': rp, 'alpha': a})
                model_list.append(ALS(userCol= userCol, itemCol= itemCol, ratingCol= ratingCol, 
                                      rank = r, maxIter = mi, regParam = rp, alpha = a, 
                                      coldStartStrategy="drop",nonnegative = True, implicitPrefs = True))

In [22]:
len(params_list)

81

In [None]:
for model in model_list:
  # Fits each model to the training data
  trained_model = model.fit(train)
  # Generates test predictions
  predictions = trained_model.transform(test)
  # Evaluates each model's performance
  roems.append(ROEM(predictions))

In [None]:
roems

In [24]:
df_params = pd.DataFrame(params_list)

In [25]:
df_params['ROEM'] = roems

In [26]:
df_params[:40]

Unnamed: 0,rank,maxIter,regParam,alpha,ROEM
0,4,13,0.01,12,0.041962
1,4,13,0.01,13,0.041962
2,4,13,0.01,14,0.041962
3,4,13,0.015,12,0.041962
4,4,13,0.015,13,0.041962
5,4,13,0.015,14,0.041962
6,4,13,0.02,12,0.041962
7,4,13,0.02,13,0.041962
8,4,13,0.02,14,0.041962
9,4,14,0.01,12,0.041962


In [None]:
df_params.iloc[np.argmin(df_params.ROEM)]

In [27]:
df_params.iloc[np.argmin(df_params.ROEM)]

rank         5.00000
maxIter     14.00000
regParam     0.02000
alpha       14.00000
ROEM         0.01962
Name: 44, dtype: float64

In [28]:
best_model = ALS(userCol= userCol, itemCol= itemCol, ratingCol= ratingCol, 
                                      rank = 5, maxIter = 14, regParam = 0.02, alpha = 14, 
                                      coldStartStrategy="drop",nonnegative = True, implicitPrefs = True)

In [29]:
fit_best_model = best_model.fit(cross_join)
predictions = fit_best_model.transform(cross_join)

In [31]:
predictions.collect()

[Row(userId=148, itemId=28, Quantity=0, prediction=0.0),
 Row(userId=243, itemId=28, Quantity=0, prediction=0.09610066562891006),
 Row(userId=31, itemId=28, Quantity=0, prediction=0.0),
 Row(userId=85, itemId=28, Quantity=0, prediction=0.0),
 Row(userId=137, itemId=28, Quantity=0, prediction=0.09185744822025299),
 Row(userId=251, itemId=28, Quantity=0, prediction=0.0),
 Row(userId=65, itemId=28, Quantity=0, prediction=0.0),
 Row(userId=53, itemId=28, Quantity=0, prediction=0.09185744822025299),
 Row(userId=255, itemId=28, Quantity=0, prediction=4.342777174315415e-06),
 Row(userId=133, itemId=28, Quantity=0, prediction=0.0),
 Row(userId=296, itemId=28, Quantity=0, prediction=0.09185744822025299),
 Row(userId=78, itemId=28, Quantity=0, prediction=0.09185744822025299),
 Row(userId=322, itemId=28, Quantity=0, prediction=0.09185744822025299),
 Row(userId=321, itemId=28, Quantity=0, prediction=0.0),
 Row(userId=362, itemId=28, Quantity=0, prediction=0.09610066562891006),
 Row(userId=108, ite