# All imports and Spark loading

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget http://mirror.linux-ia64.org/apache/spark/spark-3.0.0-preview/spark-3.0.0-preview-bin-hadoop2.7.tgz


--2019-12-03 08:29:28--  http://mirror.linux-ia64.org/apache/spark/spark-3.0.0-preview/spark-3.0.0-preview-bin-hadoop2.7.tgz
Resolving mirror.linux-ia64.org (mirror.linux-ia64.org)... 37.193.156.169
Connecting to mirror.linux-ia64.org (mirror.linux-ia64.org)|37.193.156.169|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 233622806 (223M) [application/x-gzip]
Saving to: ‘spark-3.0.0-preview-bin-hadoop2.7.tgz’


2019-12-03 08:33:43 (897 KB/s) - ‘spark-3.0.0-preview-bin-hadoop2.7.tgz’ saved [233622806/233622806]



In [0]:
!tar xf spark-3.0.0-preview-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-preview-bin-hadoop2.7"

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [0]:
import os
from google_drive_downloader import GoogleDriveDownloader as gdd
import pandas as pd
import numpy as np

In [0]:
from pyspark.ml.fpm import FPGrowth

# Load data

In [33]:
if not os.path.exists('./data'):
  os.mkdir('./data')

gdd.download_file_from_google_drive(file_id='1EMS377_Ew2h0esEMdEj54X894J_xU7Fl', dest_path='./data/instacart_online_grocery_shopping_2017_05_01.tar.gz')
gdd.download_file_from_google_drive(file_id='1LdRp0AHS0bkZkdLCOpFs-4SXrFJaERmg', dest_path='./data/basic_recommenders.py')

Downloading 1LdRp0AHS0bkZkdLCOpFs-4SXrFJaERmg into ./data/basic_recommenders.py... Done.


In [10]:
! mkdir ./data/unziped

mkdir: cannot create directory ‘./data/unziped’: File exists


In [0]:
! tar -xzf ./data/instacart_online_grocery_shopping_2017_05_01.tar.gz -C ./data/unziped

In [0]:
import data.basic_recommenders as br

#Metrics

In [0]:
def precision_at_k(predicted, actual, k):
  assert len(predicted) >= k
  intersection = set(predicted[:k]) & set(actual)
  return len(intersection) / k

In [0]:
def average_precision_at_k(predicted, actual, k):
  tmp = 0
  actual_length = min(k,len(actual))
  if len(predicted) < actual_length:
    warnings.warn("Length of predict is less than k")
  for i in range(actual_length):
    if predicted[i] in actual:
      tmp += precision_at_k(predicted[:i+1], actual, i+1)
  return tmp / actual_length

In [0]:
def mean_average_precision_at_k(predicted_list, actual_list, k):
  tmp = 0
  cnt = 0
  for predicted, actual in zip(predicted_list, actual_list):
    if len(actual) != 0:
      tmp += average_precision_at_k(predicted, actual, k)
      cnt += 1
  assert cnt != 0
  return tmp / cnt

# FPGrow

In [0]:
orders_df = pd.read_csv('./data/unziped/instacart_2017_05_01/orders.csv')
orders_df = orders_df.drop(orders_df[orders_df['eval_set'] == 'test'].index)
order_products_prior_df = pd.read_csv('./data/unziped/instacart_2017_05_01/order_products__prior.csv')
order_products_train_df = pd.read_csv('./data/unziped/instacart_2017_05_01/order_products__train.csv')

In [0]:
prior_orders_merged_df = pd.merge(orders_df[orders_df['eval_set'] == 'prior'], order_products_prior_df, on='order_id')
train_orders_merged_df = pd.merge(orders_df[orders_df['eval_set'] == 'train'], order_products_train_df, on='order_id')

In [14]:
prior_orders_merged_df.head()

Unnamed: 0,order_id,user_id,eval_set,order_number,order_dow,order_hour_of_day,days_since_prior_order,product_id,add_to_cart_order,reordered
0,2539329,1,prior,1,2,8,,196,1,0
1,2539329,1,prior,1,2,8,,14084,2,0
2,2539329,1,prior,1,2,8,,12427,3,0
3,2539329,1,prior,1,2,8,,26088,4,0
4,2539329,1,prior,1,2,8,,26405,5,0


In [0]:
grouped_by_user_df = prior_orders_merged_df.groupby('user_id').agg(items=pd.NamedAgg(column='product_id', aggfunc=lambda x: list(set(x))))
grouped_by_user_df.reset_index(inplace=True)
grouped_by_user_df.rename(columns={'user_id':'id'},inplace=True)

In [0]:
df = spark.createDataFrame(grouped_by_user_df)

In [17]:
df.show()

+---+--------------------+
| id|               items|
+---+--------------------+
|  1|[17122, 196, 2640...|
|  2|[45066, 2573, 189...|
|  3|[17668, 44683, 48...|
|  4|[21573, 17769, 35...|
|  5|[28289, 11777, 40...|
|  6|[40992, 27521, 20...|
|  7|[11520, 35333, 51...|
|  8|[11136, 8193, 177...|
|  9|[8834, 38277, 337...|
| 10|[36865, 20995, 13...|
| 11|[17794, 8197, 308...|
| 12|[45056, 11520, 17...|
| 13|[41351, 41480, 37...|
| 14|[8193, 17923, 184...|
| 15|[11266, 37059, 19...|
| 16|[15872, 28289, 17...|
| 17|[36736, 45190, 18...|
| 18|[2826, 25997, 220...|
| 19|[27138, 21001, 34...|
| 20|[13575, 6184, 938...|
+---+--------------------+
only showing top 20 rows



In [19]:
%%time
fpGrowth = FPGrowth(itemsCol="items", minSupport=0.1, minConfidence=0.6)
model = fpGrowth.fit(df)

# Display frequent itemsets.
model.freqItemsets.show()

+--------------+-----+
|         items| freq|
+--------------+-----+
|       [21616]|23031|
|       [47766]|42771|
|[47766, 21903]|20992|
|[47766, 24852]|23514|
|       [31717]|26421|
|       [47626]|46402|
|[47626, 24852]|25509|
|       [40706]|29025|
|       [30391]|24341|
|       [45066]|21951|
|       [45007]|32658|
|       [46979]|25698|
|       [24852]|73956|
|        [4920]|27512|
|       [34126]|23480|
|       [26209]|44859|
|[26209, 21137]|20661|
|[26209, 47626]|22000|
|[26209, 21903]|20695|
|[26209, 24852]|23209|
+--------------+-----+
only showing top 20 rows

CPU times: user 15.1 ms, sys: 2.68 ms, total: 17.8 ms
Wall time: 45.9 s


In [0]:
freq_itemsets = model.freqItemsets

In [26]:
freq_itemsets.show()

+--------------+-----+
|         items| freq|
+--------------+-----+
|       [21616]|23031|
|       [47766]|42771|
|[47766, 21903]|20992|
|[47766, 24852]|23514|
|       [31717]|26421|
|       [47626]|46402|
|[47626, 24852]|25509|
|       [40706]|29025|
|       [30391]|24341|
|       [45066]|21951|
|       [45007]|32658|
|       [46979]|25698|
|       [24852]|73956|
|        [4920]|27512|
|       [34126]|23480|
|       [26209]|44859|
|[26209, 21137]|20661|
|[26209, 47626]|22000|
|[26209, 21903]|20695|
|[26209, 24852]|23209|
+--------------+-----+
only showing top 20 rows



In [0]:
freq_itemsets_df = freq_itemsets.select("*").toPandas()

We need only itemset at least with 2 elements

In [0]:
freq_itemsets_df.drop(freq_itemsets_df[freq_itemsets_df['items'].apply(lambda x: len(x)) == 1].index, inplace=True)

In [41]:
freq_itemsets_df

Unnamed: 0,items,freq
2,"[47766, 21903]",20992
3,"[47766, 24852]",23514
6,"[47626, 24852]",25509
16,"[26209, 21137]",20661
17,"[26209, 47626]",22000
18,"[26209, 21903]",20695
19,"[26209, 24852]",23209
25,"[39275, 21137]",21395
31,"[21137, 13176]",29812
32,"[21137, 24852]",27065


In [0]:
freq_itemsets_df.sort_values(by='freq', ascending=False, inplace=True)

Convert to list

In [0]:
freq_itemsets_list = []

In [0]:
for index, row in freq_itemsets_df.iterrows():
    freq_itemsets_list.append((tuple(row['items']),row['freq']))

In [53]:
freq_itemsets_list[0]

((21137, 13176), 29812)

Train most popular - it will help us if association rules recommendations not enough

In [0]:
mp = br.MostPopularRecommender()
mp.fit(prior_orders_merged_df)

Build recommendation for 1 user: iterate through all products in cart and all frequent itemsets. If not enough recommendations -> add from most popular, if too much recommendations -> remove things that customer has never bought before.

In [0]:
def build_rec_for_user(products, itemsets, prior_user_products, top_n=5):
  recommendation = set()
  for product in products:
    for itemset in itemsets:
      if product in itemset[0]:
        recommendation.update(itemset[0])
  # remove products which are in this order already
  recommendation = list(recommendation.difference(set(products)))
  # truncate recommendation
  trunc_recommendation = []
  if len(recommendation) > top_n:
    for rec in recommendation:
      if rec in prior_user_products:
        trunc_recommendation.append(rec)
        if len(trunc_recommendation) == top_n:
          break
  recommendation = trunc_recommendation
  if len(recommendation) < top_n:
    recommendation.extend(mp.most_popular[:top_n-len(recommendation)])
  

  return recommendation

In [0]:
def eval_results(actual, predicted,k):

  def collect(x):
    return list(x)
  
  ground_truth = actual.groupby('user_id').agg({'product_id':collect}).reset_index()
  return mean_average_precision_at_k(list(predicted['product_id']), list(ground_truth['product_id']), k)

Calc number of products in order

In [0]:
def calc_prod_in_cart(df):
  return df.groupby('order_id').agg(num_of_prods_in_cart=pd.NamedAgg(column='add_to_cart_order', aggfunc='max')).reset_index()['num_of_prods_in_cart']

In [0]:
number_of_products_in_cart = train_orders_merged_df.groupby('order_id').agg(num_of_prods_in_cart=pd.NamedAgg(column='add_to_cart_order', aggfunc='max')).reset_index()

We make prediction only on orders with number of products > 10

In [0]:
full_df = pd.merge(train_orders_merged_df, number_of_products_in_cart.query('num_of_prods_in_cart > 10'), on='order_id')

First 5 products in train, all that remains to test

In [0]:
train_df, test_df = full_df.query('add_to_cart_order <= 5'), full_df.query('add_to_cart_order > 5')

In [59]:
train_df.shape, test_df.shape

((264240, 11), (690968, 11))

In [0]:
grouped_by_user_test = test_df.groupby('user_id').agg({'product_id':lambda x: list(x)})
grouped_by_user_train = train_df.groupby('user_id').agg({'product_id':lambda x: list(x)})
grouped_by_user_prior = prior_orders_merged_df.groupby('user_id').agg({'product_id':lambda x: list(x)})

In [61]:
grouped_by_user_test

Unnamed: 0_level_0,product_id
user_id,Unnamed: 1_level_1
1,"[10258, 13032, 26088, 27845, 49235, 46149]"
2,"[22825, 13640, 24852, 45066, 9387, 5450, 24838..."
8,"[4853, 27104, 7058, 41259, 37803, 48230, 47766..."
9,"[12075, 8467, 38988, 30252, 18926, 24954, 4057..."
14,"[37434, 3808, 15172, 8744, 29509, 42284]"
...,...
206198,"[39021, 21709, 47107, 10411, 44142, 14897, 159..."
206199,"[38341, 43821, 38930, 12127, 49198, 22242, 770..."
206200,"[27451, 15592, 47209, 22312, 23484, 8955, 4697..."
206203,"[14050, 26384, 3765, 36929, 23153, 3957, 31915..."


In [0]:
predicted = pd.DataFrame(columns=('user_id', 'product_id'))
predicted['user_id'] = list(test_df['user_id'].unique())
predicted['product_id'] = [build_rec_for_user(grouped_by_user_train.loc[user,'product_id'], freq_itemsets_list, grouped_by_user_prior.loc[user, 'product_id'],3)\
 for user in predicted['user_id'].unique()]

In [64]:
predicted

Unnamed: 0,user_id,product_id
0,1,"[24852, 13176, 21137]"
1,2,"[24852, 13176, 21137]"
2,8,"[24852, 13176, 21137]"
3,9,"[24852, 13176, 21137]"
4,14,"[24852, 13176, 21137]"
...,...,...
52843,206198,"[24852, 13176, 21137]"
52844,206199,"[24852, 13176, 21137]"
52845,206200,"[24852, 13176, 21137]"
52846,206203,"[24852, 13176, 21137]"


In [69]:
eval_results(test_df, predicted, 3)

0.07071433377065266