<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Problem-Description" data-toc-modified-id="Problem-Description-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Problem Description</a></span><ul class="toc-item"><li><ul class="toc-item"><li><span><a href="#Goal" data-toc-modified-id="Goal-1.0.1"><span class="toc-item-num">1.0.1&nbsp;&nbsp;</span>Goal</a></span></li><li><span><a href="#Challenge-Description" data-toc-modified-id="Challenge-Description-1.0.2"><span class="toc-item-num">1.0.2&nbsp;&nbsp;</span>Challenge Description</a></span></li></ul></li></ul></li><li><span><a href="#Resources" data-toc-modified-id="Resources-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Resources</a></span></li><li><span><a href="#Imports" data-toc-modified-id="Imports-3"><span class="toc-item-num">3&nbsp;&nbsp;</span>Imports</a></span></li><li><span><a href="#Load-the-data" data-toc-modified-id="Load-the-data-4"><span class="toc-item-num">4&nbsp;&nbsp;</span>Load the data</a></span></li><li><span><a href="#Qn1:-the-customer-who-bought-the-most-items-overall-in-her-lifetime" data-toc-modified-id="Qn1:-the-customer-who-bought-the-most-items-overall-in-her-lifetime-5"><span class="toc-item-num">5&nbsp;&nbsp;</span>Qn1: the customer who bought the most items overall in her lifetime</a></span></li><li><span><a href="#Qn2:-for-each-item,-the-customer-who-bought-that-product-the-most" data-toc-modified-id="Qn2:-for-each-item,-the-customer-who-bought-that-product-the-most-6"><span class="toc-item-num">6&nbsp;&nbsp;</span>Qn2: for each item, the customer who bought that product the most</a></span></li><li><span><a href="#Qn3:-Modelling-Clustering-of-Similar-Items" data-toc-modified-id="Qn3:-Modelling-Clustering-of-Similar-Items-7"><span class="toc-item-num">7&nbsp;&nbsp;</span>Qn3: Modelling Clustering of Similar Items</a></span></li></ul></div>

<div class="alert alert-block alert-success">
<b>Kernel Author:</b>  <br>
<a href="https://bhishanpdl.github.io/" , target="_blank">Bhishan Poudel,  Data Scientist, Ph.D Astrophysics</a> .
</div>

# Problem Description

### Goal

Online shops often sell tons of different items and this can become very messy very quickly!

Data science can be extremely useful to automatically organize the products in categories so that they can be easily found by the customers.

The goal of this challenge is to look at user purchase history and create categories of items that are likely to be bought together and, therefore, should belong to the same section.

### Challenge Description

Company XYZ is an online grocery store. In the current version of the website, they have manually grouped the items into a few categories based on their experience.

However, they now have a lot of data about user purchase history. Therefore, they would like to
put the data into use!

This is what they asked you to do:
1. The company founder wants to meet with some of the best customers to go through a focus group with them. You are asked to send the ID of the following customers to the founder:

a. The customer who bought the most items overall in her lifetime

b. For each item, the customer who bought that product the most

2. Cluster items based on user co-purchase history. That is, create clusters of products that have the highest probability of being bought together. The goal of this is to replace the old/manually created categories with these new ones. Each item can belong to just one cluster.

# Resources

- https://spark.apache.org/docs/latest/mllib-clustering.html
- https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.clustering.KMeansModel
- https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.evaluation.ClusteringEvaluator
- https://spark.apache.org/docs/latest/api/python/_modules/pyspark/ml/evaluation.html


# Imports

In [1]:
import numpy as np
import pandas as pd
import seaborn as sns
sns.set(color_codes=True)

import matplotlib
import matplotlib.pyplot as plt
%matplotlib inline

import os
import time

# random state
random_state=100
np.random.seed(random_state) # we need this in each cell

# Jupyter notebook settings for pandas
pd.set_option('display.max_columns', 200)
pd.set_option('display.max_rows', 100) # None for all the rows
pd.set_option('display.max_colwidth', 50)

print([(x.__name__,x.__version__) for x in [np, pd,sns,matplotlib]])

[('numpy', '1.17.5'), ('pandas', '1.0.5'), ('seaborn', '0.10.1'), ('matplotlib', '3.2.2')]


In [2]:
%%javascript
IPython.OutputArea.auto_scroll_threshold = 9999;

<IPython.core.display.Javascript object>

In [3]:
# pyspark
import pyspark
spark = pyspark.sql.SparkSession.builder.appName('bhishan').getOrCreate()
print([(x.__name__,x.__version__) for x in [np, pd, pyspark]])

[('numpy', '1.17.5'), ('pandas', '1.0.5'), ('pyspark', '3.0.0')]


In [4]:
# pyspark sql
from pyspark.sql.functions import col
from pyspark.sql.functions import udf # @udf("integer") def myfunc(x,y): return x - y
from pyspark.sql import functions as F # stddev format_number date_format, dayofyear, when
from pyspark.sql import Window
from pyspark.sql.types import StructField, StringType, IntegerType, FloatType, StructType, DateType

In [5]:
# pyspark ml feature
from pyspark.ml import feature as pml_feature

In [6]:
# classifiers
from pyspark.ml.clustering import KMeans

In [7]:
# cross validation
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import CrossValidatorModel

In [8]:
# model evaluation
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import ClusteringEvaluator

In [9]:
sc = spark.sparkContext
sqlContext = pyspark.SQLContext(sc) # spark_df = sqlContext.createDataFrame(pandas_df)
sc.setLogLevel("INFO")

In [10]:
import sys

sys.path.append("/Users/poudel/Dropbox/a00_Bhishan_Modules/bhishan")

from bhishan import bp

# Load the data

In [11]:
%%bash

cd ../Datasets/Business_Projects/Clustering_Grocery_Items/
pwd
ls
cd -

/Volumes/Media/github/Datasets/Business_Projects/Clustering_Grocery_Items
item_to_id.csv
purchase_history.csv
/Volumes/Media/github/Project_Clustering_Grocery_Items


In [12]:
%%bash
head -3 /Volumes/Media/github/Datasets/Business_Projects/Clustering_Grocery_Items/item_to_id.csv

"Item_name","Item_id"
"coffee",43
"tea",23


In [13]:
%%bash
head -3 /Volumes/Media/github/Datasets/Business_Projects/Clustering_Grocery_Items/purchase_history.csv

"user_id","id"
222087,"27,26"
1343649,"6,47,17"


In [14]:
p = "/Volumes/Media/github/Datasets/Business_Projects/Clustering_Grocery_Items"

df_item = spark.read.csv(p + '/item_to_id.csv', header=True, inferSchema=True).cache()

print('nrows = ', df_item.count(), 'ncols = ', len(df_item.columns))
df_item.limit(5).toPandas()

nrows =  48 ncols =  2


Unnamed: 0,Item_name,Item_id
0,coffee,43
1,tea,23
2,juice,38
3,soda,9
4,sandwich loaves,39


In [15]:
df_item.printSchema()

root
 |-- Item_name: string (nullable = true)
 |-- Item_id: integer (nullable = true)



In [16]:
print(df_item.columns)

['Item_name', 'Item_id']


In [17]:
p = "/Volumes/Media/github/Datasets/Business_Projects/Clustering_Grocery_Items"

df = spark.read.csv(p + '/purchase_history.csv', header=True, inferSchema=True).cache()

print('nrows = ', df.count(), 'ncols = ', len(df.columns))
df.limit(5).toPandas()

nrows =  39474 ncols =  2


Unnamed: 0,user_id,id
0,222087,2726
1,1343649,64717
2,404134,1812232227433820351
3,1110200,923220264737
4,224107,"31,18,5,13,1,21,48,16,26,2,44,32,20,37,42,35,4..."


In [18]:
df.select('id').show(1)

+-----+
|   id|
+-----+
|27,26|
+-----+
only showing top 1 row



In [19]:
# we may also cast as <int> but later we will make them columns
# so, I will keep them string type.
df = df.withColumn(
    "id",
    F.split(F.col("id"), ",\s*").cast("array<string>").alias("id")
)

df.show(2)

+-------+-----------+
|user_id|         id|
+-------+-----------+
| 222087|   [27, 26]|
|1343649|[6, 47, 17]|
+-------+-----------+
only showing top 2 rows



In [20]:
df = df.withColumn(
    "id",
    F.explode(F.col("id")).alias("id")
)

df.show(6)

+-------+---+
|user_id| id|
+-------+---+
| 222087| 27|
| 222087| 26|
|1343649|  6|
|1343649| 47|
|1343649| 17|
| 404134| 18|
+-------+---+
only showing top 6 rows



In [21]:
df = df.crosstab('user_id','id')

df.limit(2).toPandas()

Unnamed: 0,user_id_id,1,10,11,12,13,14,15,16,17,18,19,2,20,21,22,23,24,25,26,27,28,29,3,30,31,32,33,34,35,36,37,38,39,4,40,41,42,43,44,45,46,47,48,5,6,7,8,9
0,508452,0,1,0,0,0,0,1,0,0,0,0,1,0,0,0,1,0,1,0,0,0,0,1,0,0,2,0,0,0,0,0,1,0,1,2,2,0,0,0,0,1,1,1,1,0,0,2,0
1,253101,0,0,0,0,0,1,1,1,0,1,0,1,1,1,1,0,1,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,0,1,1,1,2,0,1,0,0,0


In [22]:
df.select("user_id_id","1","10").show(4)

+----------+---+---+
|user_id_id|  1| 10|
+----------+---+---+
|    508452|  0|  1|
|    253101|  0|  0|
|    507200|  0|  0|
|    576533|  0|  0|
+----------+---+---+
only showing top 4 rows



In [23]:
df_item.sort('Item_id').limit(11).toPandas()

Unnamed: 0,Item_name,Item_id
0,sugar,1
1,lettuce,2
2,pet items,3
3,baby items,4
4,waffles,5
5,poultry,6
6,sandwich bags,7
7,butter,8
8,soda,9
9,carrots,10


In [24]:
item_name = [i.asDict()['Item_name'] for i in df_item.select('Item_name').collect()]
item_id = [ str(i.asDict()['item_id']) for i in df_item.select('item_id').collect()]

item_name[:5], item_id[:5] 

(['coffee', 'tea', 'juice', 'soda', 'sandwich loaves'],
 ['43', '23', '38', '9', '39'])

In [25]:
mapping = dict(zip(item_id, item_name))
new_names = [df.columns[0]] + [F.col(c).alias(mapping.get(c, c)) for c in df.columns if c in item_id]

df = df.select(new_names)

df.select(df.columns[:5]).limit(2).toPandas()

Unnamed: 0,user_id_id,sugar,carrots,cereals,shampoo
0,508452,0,1,0,0
1,253101,0,0,0,0


In [26]:
vals = df.select(F.concat(F.lit("user_"), F.col("user_id_id")))
vals.limit(2).toPandas()

Unnamed: 0,"concat(user_, user_id_id)"
0,user_508452
1,user_253101


In [27]:
# we can create new column with user_ + id name like this.
# df.withColumn("user_id",
#              F.concat(F.lit("user_"), F.col("user_id_id"))
#              ).show(2)

# Qn1: the customer who bought the most items overall in her lifetime

In [28]:
df2 = df.withColumn('row_total', sum(df[c] for c in df.columns if c not in ['user_id_id']))

# expression = ' + '.join([ '`' + i + '`' for i in df.columns if i not in ['user_id_id']])
# df = df.withColumn('row_total', F.expr(expression))


df2.limit(2).toPandas()

Unnamed: 0,user_id_id,sugar,carrots,cereals,shampoo,bagels,eggs,aluminum foil,milk,beef,laundry detergent,shaving cream,lettuce,grapefruit,cheeses,frozen vegetables,tea,paper towels,cherries,spaghetti sauce,dishwashing,canned vegetables,hand soap,pet items,flour,pasta,apples,toilet paper,tortillas,soap,ice cream,dinner rolls,juice,sandwich loaves,baby items,berries,ketchup,cucumbers,coffee,broccoli,cauliflower,bananas,pork,yogurt,waffles,poultry,sandwich bags,butter,soda,row_total
0,508452,0,1,0,0,0,0,1,0,0,0,0,1,0,0,0,1,0,1,0,0,0,0,1,0,0,2,0,0,0,0,0,1,0,1,2,2,0,0,0,0,1,1,1,1,0,0,2,0,20
1,253101,0,0,0,0,0,1,1,1,0,1,0,1,1,1,1,0,1,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,0,1,1,1,2,0,1,0,0,0,20


In [29]:
df2.sort(F.desc('row_total')).select('user_id_id', 'row_total').show(5)

+----------+---------+
|user_id_id|row_total|
+----------+---------+
|    269335|       72|
|    367872|       70|
|    397623|       64|
|    599172|       64|
|    377284|       63|
+----------+---------+
only showing top 5 rows



# Qn2: for each item, the customer who bought that product the most

In [30]:
df.limit(2).toPandas()

Unnamed: 0,user_id_id,sugar,carrots,cereals,shampoo,bagels,eggs,aluminum foil,milk,beef,laundry detergent,shaving cream,lettuce,grapefruit,cheeses,frozen vegetables,tea,paper towels,cherries,spaghetti sauce,dishwashing,canned vegetables,hand soap,pet items,flour,pasta,apples,toilet paper,tortillas,soap,ice cream,dinner rolls,juice,sandwich loaves,baby items,berries,ketchup,cucumbers,coffee,broccoli,cauliflower,bananas,pork,yogurt,waffles,poultry,sandwich bags,butter,soda
0,508452,0,1,0,0,0,0,1,0,0,0,0,1,0,0,0,1,0,1,0,0,0,0,1,0,0,2,0,0,0,0,0,1,0,1,2,2,0,0,0,0,1,1,1,1,0,0,2,0
1,253101,0,0,0,0,0,1,1,1,0,1,0,1,1,1,1,0,1,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,0,1,1,1,2,0,1,0,0,0


In [31]:
df.createOrReplaceTempView("grocery")

# spark.sql('select * from grocery limit 1').show()

In [32]:
# look for bad column names: eg leading/trailing spaces


for c in df.columns:
    if 'dish' in c:
        print(f"`{c}`")

`dishwashing `


In [33]:
# we see that column name `dishwasing ` has a trailing space in the end.
# we need to rename this.

In [34]:
# trim spaces both sides

# bad column is `dishwashing\xa0`
# df = df.select([F.trim(c) for c in df.columns]) # this did not work.

df = df.select([F.col(c).alias(c.strip()) for c in df.columns]) # this works.


print([f"`{c}`" for c in df2.columns]) # make sure there are not bad columns 

['`user_id_id`', '`sugar`', '`carrots`', '`cereals`', '`shampoo`', '`bagels`', '`eggs`', '`aluminum foil`', '`milk`', '`beef`', '`laundry detergent`', '`shaving cream`', '`lettuce`', '`grapefruit`', '`cheeses`', '`frozen vegetables`', '`tea`', '`paper towels`', '`cherries`', '`spaghetti sauce`', '`dishwashing\xa0`', '`canned vegetables`', '`hand soap`', '`pet items`', '`flour`', '`pasta`', '`apples`', '`toilet paper`', '`tortillas`', '`soap`', '`ice cream`', '`dinner rolls`', '`juice`', '`sandwich loaves`', '`baby items`', '`berries`', '`ketchup`', '`cucumbers`', '`coffee`', '`broccoli`', '`cauliflower`', '`bananas`', '`pork`', '`yogurt`', '`waffles`', '`poultry`', '`sandwich bags`', '`butter`', '`soda`', '`row_total`']


In [35]:
# replace space by _
df = df.select([F.col(c).alias(c.replace(' ', '_')) for c in df.columns])

# exprs = [col(c).alias(c.replace(' ', '_')) for c in df.columns]
# df3 = df.select(*exprs)

In [36]:
print([f"`{c}`" for c in df.columns]) # make sure there are not bad columns here.

['`user_id_id`', '`sugar`', '`carrots`', '`cereals`', '`shampoo`', '`bagels`', '`eggs`', '`aluminum_foil`', '`milk`', '`beef`', '`laundry_detergent`', '`shaving_cream`', '`lettuce`', '`grapefruit`', '`cheeses`', '`frozen_vegetables`', '`tea`', '`paper_towels`', '`cherries`', '`spaghetti_sauce`', '`dishwashing`', '`canned_vegetables`', '`hand_soap`', '`pet_items`', '`flour`', '`pasta`', '`apples`', '`toilet_paper`', '`tortillas`', '`soap`', '`ice_cream`', '`dinner_rolls`', '`juice`', '`sandwich_loaves`', '`baby_items`', '`berries`', '`ketchup`', '`cucumbers`', '`coffee`', '`broccoli`', '`cauliflower`', '`bananas`', '`pork`', '`yogurt`', '`waffles`', '`poultry`', '`sandwich_bags`', '`butter`', '`soda`']


In [37]:
# rename first column
df = df.withColumnRenamed('user_id_id','user_id')

df.limit(2).toPandas()

Unnamed: 0,user_id,sugar,carrots,cereals,shampoo,bagels,eggs,aluminum_foil,milk,beef,laundry_detergent,shaving_cream,lettuce,grapefruit,cheeses,frozen_vegetables,tea,paper_towels,cherries,spaghetti_sauce,dishwashing,canned_vegetables,hand_soap,pet_items,flour,pasta,apples,toilet_paper,tortillas,soap,ice_cream,dinner_rolls,juice,sandwich_loaves,baby_items,berries,ketchup,cucumbers,coffee,broccoli,cauliflower,bananas,pork,yogurt,waffles,poultry,sandwich_bags,butter,soda
0,508452,0,1,0,0,0,0,1,0,0,0,0,1,0,0,0,1,0,1,0,0,0,0,1,0,0,2,0,0,0,0,0,1,0,1,2,2,0,0,0,0,1,1,1,1,0,0,2,0
1,253101,0,0,0,0,0,1,1,1,0,1,0,1,1,1,1,0,1,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,0,1,1,1,2,0,1,0,0,0


In [38]:
df2 = df.selectExpr(
    'user_id',
    f'stack({len(df.columns[1:])}, ' + ', '.join(["'%s', `%s`" % (c, c) for c in df.columns[1:]]) + ') as (item, values)'
).withColumn(
    'rn',
    F.rank().over(Window.partitionBy('item').orderBy(F.desc('values')))
).filter('rn = 1').groupBy('item').agg(
    F.collect_list('user_id').alias('max_user'),
    F.max('values').alias('max_count')
).orderBy('item')

df2.show(5)

+-------------+--------------------+---------+
|         item|            max_user|max_count|
+-------------+--------------------+---------+
|aluminum_foil|            [143741]|        3|
|       apples|[1310896, 545108,...|        4|
|   baby_items|[5289, 73071, 432...|        3|
|       bagels|    [653800, 820788]|        4|
|      bananas|  [1269111, 1218645]|        4|
+-------------+--------------------+---------+
only showing top 5 rows



In [39]:
df.createOrReplaceTempView("grocery")

names = ', '.join(["'%s', `%s`" % (c, c) for c in df.columns[1:]])
q = f"""
    select
        item,
        collect_list(user_id) as max_user,
        max(values) as max_count
    from (
        select *,
            rank() over (partition by item order by values desc) as rn
        from (
            select
                user_id,
                stack({len(df.columns[1:])}, {names}) as (item, values)
            from grocery
        )
    )
    where rn = 1 and item is not null
    group by item
    order by item
"""

spark.sql(q).show(5)

+-------------+--------------------+---------+
|         item|            max_user|max_count|
+-------------+--------------------+---------+
|aluminum_foil|            [143741]|        3|
|       apples|[1310896, 545108,...|        4|
|   baby_items|[5289, 73071, 432...|        3|
|       bagels|    [653800, 820788]|        4|
|      bananas|  [1269111, 1218645]|        4|
+-------------+--------------------+---------+
only showing top 5 rows



# Qn3: Modelling Clustering of Similar Items

In [40]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import Normalizer

from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

In [41]:
df.limit(2).toPandas()

Unnamed: 0,user_id,sugar,carrots,cereals,shampoo,bagels,eggs,aluminum_foil,milk,beef,laundry_detergent,shaving_cream,lettuce,grapefruit,cheeses,frozen_vegetables,tea,paper_towels,cherries,spaghetti_sauce,dishwashing,canned_vegetables,hand_soap,pet_items,flour,pasta,apples,toilet_paper,tortillas,soap,ice_cream,dinner_rolls,juice,sandwich_loaves,baby_items,berries,ketchup,cucumbers,coffee,broccoli,cauliflower,bananas,pork,yogurt,waffles,poultry,sandwich_bags,butter,soda
0,508452,0,1,0,0,0,0,1,0,0,0,0,1,0,0,0,1,0,1,0,0,0,0,1,0,0,2,0,0,0,0,0,1,0,1,2,2,0,0,0,0,1,1,1,1,0,0,2,0
1,253101,0,0,0,0,0,1,1,1,0,1,0,1,1,1,1,0,1,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,0,1,1,1,2,0,1,0,0,0


In [43]:
# assemble
assembler = VectorAssembler(inputCols=df.columns[1:], outputCol='features')
df = assembler.transform(df)

# scaling
norm = Normalizer(inputCol='features',outputCol='norm')
df = norm.transform(df)

In [44]:
df.limit(2).toPandas()

Unnamed: 0,user_id,sugar,carrots,cereals,shampoo,bagels,eggs,aluminum_foil,milk,beef,laundry_detergent,shaving_cream,lettuce,grapefruit,cheeses,frozen_vegetables,tea,paper_towels,cherries,spaghetti_sauce,dishwashing,canned_vegetables,hand_soap,pet_items,flour,pasta,apples,toilet_paper,tortillas,soap,ice_cream,dinner_rolls,juice,sandwich_loaves,baby_items,berries,ketchup,cucumbers,coffee,broccoli,cauliflower,bananas,pork,yogurt,waffles,poultry,sandwich_bags,butter,soda,features,norm
0,508452,0,1,0,0,0,0,1,0,0,0,0,1,0,0,0,1,0,1,0,0,0,0,1,0,0,2,0,0,0,0,0,1,0,1,2,2,0,0,0,0,1,1,1,1,0,0,2,0,"(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, ...","(0.0, 0.1889822365046136, 0.0, 0.0, 0.0, 0.0, ..."
1,253101,0,0,0,0,0,1,1,1,0,1,0,1,1,1,1,0,1,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,0,1,1,1,2,0,1,0,0,0,"(0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.21320071635561041,..."


In [45]:
mat = IndexedRowMatrix(
    df.select("user_id", "norm")\
        .rdd.map(lambda row: IndexedRow(row.user_id, row.norm.toArray()))).toBlockMatrix()

dot = mat.multiply(mat.transpose())

X = dot.toLocalMatrix().toArray()



IllegalArgumentException: requirement failed: The length of the values array must be less than Int.MaxValue. Currently numRows * numCols: 2249925000625