In [1]:
%matplotlib inline
import pandas as pd
import numpy as np
import re
import matplotlib.pyplot as plt
import gzip
import copy
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
from pyspark.sql.functions import split

In [2]:
import pyspark.sql as sql
import pyspark.sql.functions as F
from pyspark import SparkContext

In [3]:
import os

def reload(filename, df, force_rewrite=False):
    '''Returns the dataframe stored in filename (located in data/saved), if it exists. Else, it writes the dataframe in filename and immediately reads it.'''
    """filename = os.path.join('data', 'saved', filename)
    if os.path.exists(filename) and not force_rewrite :
        return spark.read.parquet(filename)
    else :
        df.write.mode('overwrite').parquet(filename)
        return spark.read.parquet(filename)"""
    return df

def reload2(filename, df, force_rewrite=False):
    '''Returns the dataframe stored in filename (located in data/saved), if it exists. Else, it writes the dataframe in filename and immediately reads it.'''
    filename = os.path.join('data', 'saved', '2', filename)
    if os.path.exists(filename) and not force_rewrite :
        return spark.read.parquet(filename)
    else :
        df.write.mode('overwrite').parquet(filename)
        return spark.read.parquet(filename)
    

In [4]:
spark = sql.SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Import the datasets 

**Instacart datasets** (provided) : https://www.instacart.com/datasets/grocery-shopping-2017 . After loading the datasets, we verified if there were any missing values

In [4]:
products = pd.read_csv('data/products.csv', sep=',')
products.head()

Unnamed: 0,product_id,product_name,aisle_id,department_id
0,1,Chocolate Sandwich Cookies,61,19
1,2,All-Seasons Salt,104,13
2,3,Robust Golden Unsweetened Oolong Tea,94,7
3,4,Smart Ones Classic Favorites Mini Rigatoni Wit...,38,1
4,5,Green Chile Anytime Sauce,5,13


In [5]:
products.isnull().values.any()

False

In [6]:
aisles = pd.read_csv('data/aisles.csv', sep=',')
aisles.head()

Unnamed: 0,aisle_id,aisle
0,1,prepared soups salads
1,2,specialty cheeses
2,3,energy granola bars
3,4,instant foods
4,5,marinades meat preparation


In [7]:
aisles.isnull().values.any()

False

In [8]:
departments = pd.read_csv('data/departments.csv', sep=',')
departments.head()

Unnamed: 0,department_id,department
0,1,frozen
1,2,other
2,3,bakery
3,4,produce
4,5,alcohol


In [9]:
departments.isnull().values.any()

False

In [10]:
orders = pd.read_csv('data/orders.csv', sep=',')
orders.head()

Unnamed: 0,order_id,user_id,eval_set,order_number,order_dow,order_hour_of_day,days_since_prior_order
0,2539329,1,prior,1,2,8,
1,2398795,1,prior,2,3,7,15.0
2,473747,1,prior,3,3,12,21.0
3,2254736,1,prior,4,4,7,29.0
4,431534,1,prior,5,4,15,28.0


In [11]:
orders.isnull().values.sum()==orders['days_since_prior_order'].isnull().values.sum()

True

The missing values in _orders_ correspond to the date of "previous order", when it happens to be the first one.

In [12]:
history = pd.read_csv('data/order_products__prior.csv', sep=',')
history.head()

Unnamed: 0,order_id,product_id,add_to_cart_order,reordered
0,2,33120,1,1
1,2,28985,2,1
2,2,9327,3,0
3,2,45918,4,1
4,2,30035,5,0


In [13]:
history.isnull().values.any()

False

There are no unexpected missing values in any of the Instacart datasets.

**Create a dataset with all useful information gathered : products_litteral**

Add the names of the aisle and department in the _products_ dataset, instead of the ID, by merging the datasets :

In [14]:
products_litteral = pd.merge(pd.merge(products, aisles, on='aisle_id'), departments, on='department_id').drop(['aisle_id', 'department_id'], axis=1)
products_litteral.head()

Unnamed: 0,product_id,product_name,aisle,department
0,1,Chocolate Sandwich Cookies,cookies cakes,snacks
1,78,Nutter Butter Cookie Bites Go-Pak,cookies cakes,snacks
2,102,Danish Butter Cookies,cookies cakes,snacks
3,172,Gluten Free All Natural Chocolate Chip Cookies,cookies cakes,snacks
4,285,Mini Nilla Wafers Munch Pack,cookies cakes,snacks


**Remove 'missing' values**

In [15]:
missing = products_litteral[(products_litteral.department =='missing') | (products_litteral.aisle=='missing')]
missing.head(50)

Unnamed: 0,product_id,product_name,aisle,department
42813,38,Ultra Antibacterial Dish Liquid,missing,missing
42814,72,Organic Honeycrisp Apples,missing,missing
42815,110,Uncured Turkey Bologna,missing,missing
42816,297,"Write Bros Ball Point Pens, Cap-Pen, Medium (1...",missing,missing
42817,417,Classics Baby Binks Easter Chocolate Bunny,missing,missing
42818,437,Strawberry Cheesecake Nonfat Yogurt,missing,missing
42819,440,Crossovers Olive Thyme Almond,missing,missing
42820,472,Fancy Diced Hot Chilies,missing,missing
42821,491,Pompelmo Water,missing,missing
42822,556,Bake & Break Cheese Garlic Loaf,missing,missing


We can see many products don't have any name for the aisle and/or department

In [16]:
print("There are %d products with 'missing' aisle/department. This represent %0.1f%% of the products." %(missing.shape[0], missing.shape[0]/products_litteral.shape[0]*100))

There are 1258 products with 'missing' aisle/department. This represent 2.5% of the products.


The unlabelled products seem to come from many different categories, and represent a small fraction of our dataset. We decided to simply remove them.

In [17]:
products_litteral = products_litteral.drop(missing.index)

# First Statistical analysis

**Number of products per Aisle and Department**

Visualize the products per aisle and per department to detect any inconsistency

The 'personal care' department has the most choice of products, jsut followed by the 'snacks' one.

'Candy chocolate' and 'ice cream ice' contains the most numerous choice of products. We can see none of the 10 first aisles is in the 'Personal care' departement. Thus this department contains the most products because it contains a diverse choice of aisles of products, not one aisle with many products, as 'Snacks'.

**Number of orders**

Let's complete our 'useful' dataset with the informtion about the number of orders per product

In [18]:
number_of_order_per_product = history[['order_id','product_id']].groupby('product_id').count().sort_values(by='order_id', ascending=False)
number_of_order_per_product = number_of_order_per_product.rename(columns={"order_id" : "number_of_orders"})
number_of_order_per_product.head()

Unnamed: 0_level_0,number_of_orders
product_id,Unnamed: 1_level_1
24852,472565
13176,379450
21137,264683
21903,241921
47209,213584


In [19]:
products_litteral = pd.merge(products_litteral, number_of_order_per_product, on='product_id')
products_litteral.head()

Unnamed: 0,product_id,product_name,aisle,department,number_of_orders
0,1,Chocolate Sandwich Cookies,cookies cakes,snacks,1852
1,78,Nutter Butter Cookie Bites Go-Pak,cookies cakes,snacks,11
2,102,Danish Butter Cookies,cookies cakes,snacks,185
3,172,Gluten Free All Natural Chocolate Chip Cookies,cookies cakes,snacks,97
4,285,Mini Nilla Wafers Munch Pack,cookies cakes,snacks,156


**Number of orders per product**

# Try on Amazon dataset

**Amazon dataset** : _metadata_ and _reviews_ of Grocery and Gourmet Food products : https://cseweb.ucsd.edu/~jmcauley/datasets.html?fbclid=IwAR39s5O83nqUYkRBD4jol3OFu0FmcH-4dCzUtSjutOsmSd9LMZOHSGGtNxw#amazon_reviews


In [20]:
products_litteral.head()

Unnamed: 0,product_id,product_name,aisle,department,number_of_orders
0,1,Chocolate Sandwich Cookies,cookies cakes,snacks,1852
1,78,Nutter Butter Cookie Bites Go-Pak,cookies cakes,snacks,11
2,102,Danish Butter Cookies,cookies cakes,snacks,185
3,172,Gluten Free All Natural Chocolate Chip Cookies,cookies cakes,snacks,97
4,285,Mini Nilla Wafers Munch Pack,cookies cakes,snacks,156


In [23]:
Amazon_meta = spark.read.json("data/meta_Grocery_and_Gourmet_Food.json.gz")
#Amazon_meta = reload2('Amazon_meta.parquet', Amazon_meta)
Amazon_meta.show(2)

+--------------------+--------------------+----------+--------------+--------------------+----+--------------------+-------+-------+----+--------------------+--------+------+--------------------+------------+-----+--------------------+
|            also_buy|           also_view|      asin|         brand|            category|date|         description|details|feature| fit|               image|main_cat| price|                rank|similar_item|tech1|               title|
+--------------------+--------------------+----------+--------------+--------------------+----+--------------------+-------+-------+----+--------------------+--------+------+--------------------+------------+-----+--------------------+
|                null|[B0000D9MYM, B000...|0681727810|Ariola Imports|[Grocery & Gourme...|null|[BEEMSTER GOUDA C...|   null|   null|null|                null| Grocery|$41.91|165,181inGroceryG...|        null| null|Beemster Gouda - ...|
|[B01898YHXK, B01B...|                null|0853347867|  

In [24]:
Amazon_meta_light = Amazon_meta.drop('also_view','description','details','feature','fit', 'image','similar_item', 'tech1')
Amazon_meta_light = reload2('Amazon_meta_light.parquet', Amazon_meta_light)

In [25]:
Amazon_meta_light.printSchema()

root
 |-- also_buy: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- asin: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- category: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- date: string (nullable = true)
 |-- main_cat: string (nullable = true)
 |-- price: string (nullable = true)
 |-- rank: string (nullable = true)
 |-- title: string (nullable = true)



In [26]:
for row in Amazon_meta_light.take(20):
    print(row.category, 'for main_cat', row.main_cat)

['Grocery & Gourmet Food', 'Dairy, Cheese & Eggs', 'Cheese', 'Gouda'] for main_cat Grocery
['Grocery & Gourmet Food', 'Cooking & Baking', 'Sugar Substitutes', 'Xylitol'] for main_cat Grocery
['Grocery & Gourmet Food', 'Cooking & Baking', 'Frosting, Icing & Decorations', 'Cake & Cupcake Toppers', 'Cake Toppers'] for main_cat Amazon Home
['Grocery & Gourmet Food', 'Cooking & Baking', 'Frosting, Icing & Decorations', 'Cake & Cupcake Toppers', 'Cake Toppers'] for main_cat Amazon Home
['Grocery & Gourmet Food', 'Cooking & Baking', 'Frosting, Icing & Decorations', 'Cake & Cupcake Toppers', 'Cake Toppers'] for main_cat Amazon Home
['Grocery & Gourmet Food', 'Cooking & Baking', 'Frosting, Icing & Decorations', 'Cake & Cupcake Toppers', 'Cake Toppers'] for main_cat Amazon Home
['Grocery & Gourmet Food', 'Cooking & Baking', 'Frosting, Icing & Decorations', 'Cake & Cupcake Toppers', 'Cake Toppers'] for main_cat Amazon Home
['Grocery & Gourmet Food', 'Cooking & Baking', 'Frosting, Icing & Decorati

In [27]:
for row in Amazon_meta_light.filter(udf(lambda x: x=='Iguana Mean Green Jalapeno Pepper Sauce, 5 oz bottle', sql.types.BooleanType())('title')).take(10):
    print(row.title, row.category, sep=' | ')

Iguana Mean Green Jalapeno Pepper Sauce, 5 oz bottle | ['Grocery & Gourmet Food', 'Sauces, Gravies & Marinades', 'Hot Sauce']


In [29]:
#Amazon_reviews = spark.read.json("data/Grocery_and_Gourmet_Food.json.gz")
#Amazon_reviews = reload2('Amazon_reviews.parquet', Amazon_reviews)
#Amazon_reviews.show(2)

KeyboardInterrupt: 

In [None]:
#Amazon_reviews.printSchema()

**Connect the two datasets**

Use only a sample of the all Amazon datasets for the tests

In [30]:
Amazon1 = Amazon_meta_light.sample(False, 0.1)
Amazon1 = Amazon1.filter(~F.col('title').contains('{'))

In [31]:
Amazon1.count() #number of rows in this test sample Amazon dataset

28177

In [32]:
products_lit_spark = spark.createDataFrame(products_litteral)#.limit(10000) #convert Instacart panda df to spark df
products_lit_spark = products_lit_spark.withColumn('product_name', F.regexp_replace('product_name', '\\\"', ''))
products_lit_spark = reload2('products_lit_spark.parquet', products_lit_spark)

In [33]:
products_lit_spark.count()

48422

In [34]:
products_lit_spark.show()

+----------+--------------------+-----+----------+----------------+
|product_id|        product_name|aisle|department|number_of_orders|
+----------+--------------------+-----+----------+----------------+
|     36629|Lucky Irish Break...|  tea| beverages|               9|
|     36638|Scottish Breakfas...|  tea| beverages|              85|
|     36694|White Tea Unsweet...|  tea| beverages|              28|
|     36771|    Twig Kukicha Tea|  tea| beverages|              62|
|     36890|Healthy Cycle Her...|  tea| beverages|              18|
|     36891|Superfruit Pomegr...|  tea| beverages|               5|
|     36902|RX Stress Herbal ...|  tea| beverages|              22|
|     36951|   Organic White Tea|  tea| beverages|               6|
|     37071|Herbal Tea, Caffe...|  tea| beverages|              41|
|     37102|Honey Lemon Ginse...|  tea| beverages|              18|
|     37118|Peach, Natural Bl...|  tea| beverages|              24|
|     37154|Herbal Tea, Lemon...|  tea| beverage

In [35]:
for row in Amazon_meta_light.take(10):
    print(row.title)
print()
for row in products_lit_spark.take(10):
    print(row.product_name)

Beemster Gouda - Aged 18/24 Months - App. 1.5 Lbs
Trim Healthy Mama Xylitol
Letter C - Swarovski Crystal Monogram Wedding Cake Topper Letter
Letter H - Swarovski Crystal Monogram Wedding Cake Topper Letter
Letter S - Swarovski Crystal Monogram Wedding Cake Topper Letter
Letter J - Swarovski Crystal Monogram Wedding Cake Topper Letter
1 X Fully Covered in Crystal Monogram Wedding Cake Topper Letter - Letter O
Fully Covered in Crystal Monogram Wedding Cake Topper Letter - Letter R
Letter L - Swarovski Crystal Monogram Wedding Cake Topper Letter
Fully Covered in Crystal Monogram Wedding Cake Topper Letter - Letter C

Lucky Irish Breakfast Pot Of Gold Tea
Scottish Breakfast Tea
White Tea Unsweetened Hint O'Mint
Twig Kukicha Tea
Healthy Cycle Herbal Tea Bags, Caffeine Free
Superfruit Pomegranate Green Tea
RX Stress Herbal Iced Tea
Organic White Tea
Herbal Tea, Caffeine Free, True Blueberry
Honey Lemon Ginseng Green Tea Bags


**first try** Try to join by the name of the products 

In [36]:
df = Amazon1.join(products_lit_spark, F.col('title')== F.col('product_name'))

In [37]:
df.count()

28

Not working, never more than a few fits (1 or 2) over the 2876 Amazon reviews and 49688 Instacart products

**second try** Try to join on : if Instacart name is in the Amazon title

Weird associations (still because of the non specific Instacart title). I think that is because I have a little Amazon sample, otherwise it would just drop almost every row

**Third try** : only join the categories 

In [38]:
Amazon3 = Amazon1.select('title','category')

In [39]:
Amazon3_cat_exploded = Amazon3.select('title', F.explode('category').alias('category'))
Amazon3_cat_exploded = Amazon3_cat_exploded.filter(udf(lambda x: x!='Grocery & Gourmet Food', sql.types.BooleanType())('category'))

Amazon3_cat_exploded = reload2('Amazon3_cat_exploded.parquet', Amazon3_cat_exploded)

In [40]:
Amazon3_cat_exploded.show(5)

+--------------------+--------------------+
|               title|            category|
+--------------------+--------------------+
|Frontier Co-op Or...|Herbs, Spices & S...|
|Frontier Co-op Or...|Single Herbs & Sp...|
|Frontier Co-op Or...|              Cloves|
|Frontier Pumpkin ...|Herbs, Spices & S...|
|Frontier Pumpkin ...|Mixed Spices & Se...|
+--------------------+--------------------+
only showing top 5 rows



In [41]:
Amazon3_cat_exploded.printSchema()
products_lit_spark.printSchema()

root
 |-- title: string (nullable = true)
 |-- category: string (nullable = true)

root
 |-- product_id: long (nullable = true)
 |-- product_name: string (nullable = true)
 |-- aisle: string (nullable = true)
 |-- department: string (nullable = true)
 |-- number_of_orders: long (nullable = true)



In [42]:
df3 = Amazon3_cat_exploded.join(products_lit_spark, products_lit_spark.aisle==Amazon3_cat_exploded.category)

In [43]:
df3.count()

0

Not the same exact names of categories, so nto working

In [44]:
len(Amazon3_cat_exploded.select('category').distinct().collect())

983

There are too many categories for us to pair them by hand.

**fourth try** : joindre les categories Amazon et les aisles Instacart avec des mots en commun

In [45]:
#separate Instacart aisle name, because names are too specific (to match cookies cakes with snack cakes for instance)
split_col = split(products_lit_spark['aisle'], ' ')
split_aisle = products_lit_spark.withColumn('aisle1', split_col.getItem(0))
split_aisle = split_aisle.withColumn('aisle2', split_col.getItem(1))
split_aisle = reload2('split_aisle.parquet', split_aisle)
split_aisle.show()
split_aisles = products_lit_spark.withColumn('aisle_words', split_col)
split_aisles = reload2('split_aisles.parquet', split_aisles)
split_aisles.show()

+----------+--------------------+-----+----------+----------------+------+------+
|product_id|        product_name|aisle|department|number_of_orders|aisle1|aisle2|
+----------+--------------------+-----+----------+----------------+------+------+
|     36629|Lucky Irish Break...|  tea| beverages|               9|   tea|  null|
|     36638|Scottish Breakfas...|  tea| beverages|              85|   tea|  null|
|     36694|White Tea Unsweet...|  tea| beverages|              28|   tea|  null|
|     36771|    Twig Kukicha Tea|  tea| beverages|              62|   tea|  null|
|     36890|Healthy Cycle Her...|  tea| beverages|              18|   tea|  null|
|     36891|Superfruit Pomegr...|  tea| beverages|               5|   tea|  null|
|     36902|RX Stress Herbal ...|  tea| beverages|              22|   tea|  null|
|     36951|   Organic White Tea|  tea| beverages|               6|   tea|  null|
|     37071|Herbal Tea, Caffe...|  tea| beverages|              41|   tea|  null|
|     37102|Hone

In [46]:
#see if aisle words are included in the category name Amazon 
df4 = Amazon3_cat_exploded.select('category').distinct().join(split_aisle.select('aisle1', 'aisle2').distinct(), F.col('category').contains(F.col('aisle1')) | F.col('category').contains(F.col('aisle2')))
df4 = reload2('df4.parquet', df4)

In [47]:
df4.show(20)
for row in df4.take(20):
    print(row.category)

+--------------------+--------+-----------+
|            category|  aisle1|     aisle2|
+--------------------+--------+-----------+
|This brown sugar ...|   first|        aid|
|This brown sugar ...|     red|      wines|
|This brown sugar ...|    more|  household|
|-Irish Coffee Fud...|     eye|        ear|
|-Irish Coffee Fud...|     soy|lactosefree|
|-Irish Coffee Fud...|    nuts|      seeds|
|-Irish Coffee Fud...|    milk|       null|
|20 easy prefilled...|    eggs|       null|
|20 easy prefilled...|   candy|  chocolate|
|To best care for ...|  facial|       care|
|To best care for ...|   water|    seltzer|
|To best care for ...|    skin|       care|
|To best care for ...|     dry|      pasta|
|To best care for ...|    hair|       care|
|To best care for ...|feminine|       care|
|To best care for ...|    cold|        flu|
|Animal fans will ...|    eggs|       null|
|Powdered Drink Mi...|     red|      wines|
|2 bags of edible ...|   trash|       bags|
|2 bags of edible ...|   trail| 

ça ne marche pas du tout : il faut faire matcher les mots entier, pas juste "contains" !

In [48]:
#avec pyspark, necessité de passer par les fonctions python udf
mots_udf = udf(lambda x: list(set(re.compile("\\W+").split(x))), sql.types.ArrayType(sql.types.StringType()))

In [49]:
Amazon4 = Amazon3_cat_exploded.withColumn('mots', mots_udf(Amazon3_cat_exploded.category))
Amazon4 = reload2('Amazon4.parquet', Amazon4)

In [50]:
Amazon4.show(5)

+--------------------+--------------------+--------------------+
|               title|            category|                mots|
+--------------------+--------------------+--------------------+
|Frontier Co-op Or...|Herbs, Spices & S...|[Herbs, Spices, S...|
|Frontier Co-op Or...|Single Herbs & Sp...|[Herbs, Spices, S...|
|Frontier Co-op Or...|              Cloves|            [Cloves]|
|Frontier Pumpkin ...|Herbs, Spices & S...|[Herbs, Spices, S...|
|Frontier Pumpkin ...|Mixed Spices & Se...|[Seasonings, Spic...|
+--------------------+--------------------+--------------------+
only showing top 5 rows



In [51]:
#see if aisle words are included in the category name Amazon 
#df4_2 = Amazon4.select('mots').distinct().join(split_aisle.select('aisle1', 'aisle2').distinct(), F.array_contains('mots', 'aisle1') or F.array_contains('mots', 'aisle2'))

contains_udf = udf(lambda x,y: F.array_contains(x, y), sql.types.BooleanType())

#df4_2 = Amazon4.select('mots').distinct().join(split_aisle.select('aisle1', 'aisle2').distinct(), contains_udf(Amazon4.mots, split_aisle.aisle1))
#df4_2 = reload('df4_2.parquet', df4_2)
#df4_2.show()
#testdf = Amazon4.withColumn('contains', contains_udf(Amazon4.mots, split_aisle.aisle1))

In [52]:
#df?? = Amazon3_cat_exploded.join(products_lit_spark, products_lit_spark.aisle==Amazon3_cat_exploded.category)

**Essayons de regarder comment se passe le join sur les noms**

In [53]:
Amazon_title_as_words = Amazon1.withColumn('title_words', mots_udf('title'))
print(Amazon_title_as_words.count())
Amazon_title_as_words = Amazon_title_as_words.withColumn('str_wds', F.col('title_words').cast('string'))
Amazon_title_as_words = Amazon_title_as_words.dropDuplicates(subset=['str_wds']).drop('str_wds')
print(Amazon_title_as_words.count())
Amazon_title_as_words = reload2('Amazon_title_as_words.parquet', Amazon_title_as_words)
for row in Amazon_title_as_words.take(10):
    print(row.title_words)

28177
28109
['', '14', 'Mango', 'Langers', 'Lite', 'Fresca', 'Ounce', 'Pack', '12', 'Aqua', 'of', 'Juice']
['', '3', '1oz', 'Mars', 'in', 'Bag', '88gr', 'Maltesers']
['', '3', 'Blend', 'Nguyen', 'Pack', 'Premium', 'Oz', 'of', '15', 'Trung']
['', '3', 'Pepper', 'Habanero', 'Jerky', 'Ounce', 'with', '4', 'Package', 'Kickin', 'Pack', 'Beef', 'of', 'Ass']
['', '32', 'of', 'Red', 'Ounce', 'Pack', '12', 'Fine', 'Burghul', 'Ziyad']
['', '365', 'Mayonnaise', 'Pack', 'Everyday', '12', 'Organic', 'of', 'Value']
['', '5', '3', 'St', 'Organics', 's', '1', 'Bundle', 'Tin', 'of', 'Spearmints', 'Claire', 'oz']
['', '5', 'Chocolate', '3', 'Black', 'Mix', 'Ounce', 'Hot', 'Pack', '4', 'Organic', 'Canisters', 'of', 'Green']
['', '5', 'Chocolate', 'Imported', '750', 'Duca', 'Riserva', 'Dark', 'Panettone', 'Chip', '26', 'Ounces', 'grams', 'Italy', 'from']
['', '5', 'Fruit', 'Marzipan', 'Assortment', 'Oz', '8']


In [54]:
# we consider that products whose name is only one word are not precise enough (eg 'Water')
products_words = products_lit_spark.withColumn('name_words', mots_udf('product_name'))
print(products_words.count())
products_words = products_words.withColumn('str_wds', F.col('name_words').cast('string'))
products_words = products_words.dropDuplicates(subset=['str_wds']).drop('str_wds')
print(products_words.count())

products_words.show(300)

48422
47125
+----------+--------------------+--------------------+---------------+----------------+--------------------+
|product_id|        product_name|               aisle|     department|number_of_orders|          name_words|
+----------+--------------------+--------------------+---------------+----------------+--------------------+
|     35417| Reduced Fat Milk 2%|                milk|     dairy eggs|            2268|[, Fat, 2, Milk, ...|
|     14214|Gentle Iron 28mg ...|vitamins supplements|  personal care|               7|[28mg, Gentle, Ir...|
|     29847|Wrigley's 5 React...|            mint gum|         snacks|              18|[5, Mint, Unique,...|
|     42502|ProActive Health ...|       dog food care|           pets|             161|[Adult, Food, Pro...|
|     19942|Aged Balsamic Vin...|       oils vinegars|         pantry|              43|[Aged, Modena, Of...|
|      9960|Moving And Storag...|      more household|      household|              34|[And, Moving, Tap...|
|      

In [55]:
len_filter = udf(lambda x: len(x) == 1, sql.types.BooleanType())

In [56]:
real_products = products_words.filter(~len_filter('name_words'))
real_products = reload2('real_products.parquet', real_products)
real_products.show()

+----------+--------------------+--------------------+-------------+----------------+--------------------+
|product_id|        product_name|               aisle|   department|number_of_orders|          name_words|
+----------+--------------------+--------------------+-------------+----------------+--------------------+
|     37281|White & Dark in R...| frozen meat seafood|       frozen|               4|[, 2115, White, T...|
|       716|Mouthwash Clean M...|        oral hygiene|personal care|              48|[, Mint, Mouthwas...|
|     48444|Tofu, Medium Firm...|tofu meat alterna...|         deli|             173|[, Tofu, Medium, ...|
|     41554|Ken Davis 2 Carb ...|             spreads|       pantry|              14|[2, Carb, Ken, Cl...|
|      4903|2 In 1 Ocean Lift...|           hair care|personal care|              20|[2, Lift, 1, Ocea...|
|      6421|4.5 Oz. Special K...|            crackers|       snacks|               7|[5, Chips, Popcor...|
|     22895|         Quantum 9 V|    

In [57]:
match_udf = udf(lambda title, name, titles, names: name in title and all(w in x for w in y), sql.types.BooleanType())

In [58]:
spark.conf.set("spark.sql.crossJoin.enabled", True)
#df_withwords = Amazon_title_as_words.join(real_products, match_udf('title', 'product_name', 'title_words', 'name_words'))
#df_withwords = reload('df_withwords.parquet', df_withwords, True)
#df_withwords.show()

In [59]:
real_word_pattern = r'^(.* )?{}( .*)?$'
print(real_word_pattern)
print(real_word_pattern.format('Re Al'))
def myudffun(title, name):
    pattern = real_word_pattern.format(name)
    try:
        res = re.match(real_word_pattern.format(name), title)
    except:
        return name, title
    return res is not None
    
#match_udf2 = udf(lambda title, name: re.match(real_word_pattern.format(name), title)!=None, sql.types.BooleanType())
match_udf2 = udf(myudffun, sql.types.BooleanType())


^(.* )?{}( .*)?$
^(.* )?Re Al( .*)?$


In [5]:
#Amazon_expl = Amazon_title_as_words.withColumn('title_word', F.explode('title_words').alias('title_word'))
Amazon_expl = reload2('Amazon_expl.parquet', True)

In [6]:
for row in Amazon_title_as_words.filter(F.col('title')=='Traverse Bay Fruit Chocolate Covered Dried Cherries, 2 Pound').take(50):
    print(row.title, row.title_words)

NameError: name 'Amazon_title_as_words' is not defined

In [6]:
#products_expl = real_products.withColumn('name_word', F.explode('name_words').alias('name_word'))
products_expl = reload2('products_expl.parquet', True)

In [7]:
df_withwords3 = Amazon_expl.join(products_expl, Amazon_expl.title_word==products_expl.name_word)
#df_withwords3 = reload2('df_withwords3.parquet', df_withwords3)
df_withwords3.show()

+--------+----------+---------------+--------------------+----+--------+-----+--------------------+--------------------+--------------------+----------+----------+--------------------+--------------------+-------------+----------------+--------------------+---------+
|also_buy|      asin|          brand|            category|date|main_cat|price|                rank|               title|         title_words|title_word|product_id|        product_name|               aisle|   department|number_of_orders|          name_words|name_word|
+--------+----------+---------------+--------------------+----+--------+-----+--------------------+--------------------+--------------------+----------+----------+--------------------+--------------------+-------------+----------------+--------------------+---------+
|    null|B004U7UG30|123 Gluten Free|[Grocery & Gourme...|null| Grocery| null|911,709inGroceryG...|123 Gluten Free D...|[, 123, of, 72, B...|          |     19209|Onion Bagels, 6 P...|    breakfas

In [8]:
regrouped = df_withwords3.groupBy(['product_name', 'title']).agg(F.count('title_word').alias('nb_common_words'), F.collect_set('name_words').getItem(0).alias('name_words'))
#regrouped = reload2('regrouped.parquet', regrouped)
#regrouped.show()

In [9]:
full_match_udf = udf(lambda nb, name_words: len(name_words)==nb, sql.types.BooleanType())
full_match = regrouped.filter(full_match_udf('nb_common_words', 'name_words'))
#full_match = reload2('full_match.parquet', full_match)
full_match.show()
for row in full_match.take(100):
    print(row.product_name, ': ', row.title, '\n', row.name_words, ' |-| ', row.nb_common_words, '\n'+'_'*100+'\n')

Py4JJavaError: An error occurred while calling o60.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 5.0 failed 1 times, most recent failure: Lost task 1.0 in stage 5.0 (TID 12, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
	at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
	at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
	at org.apache.spark.io.ReadAheadInputStream.<init>(ReadAheadInputStream.java:105)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.<init>(UnsafeSorterSpillReader.java:81)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.getReader(UnsafeSorterSpillWriter.java:156)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.getSortedIterator(UnsafeExternalSorter.java:477)
	at org.apache.spark.sql.execution.UnsafeKVExternalSorter.sortedIterator(UnsafeKVExternalSorter.java:204)
	at org.apache.spark.sql.execution.aggregate.SortBasedAggregator$$anon$1.<init>(ObjectAggregationIterator.scala:240)
	at org.apache.spark.sql.execution.aggregate.SortBasedAggregator.destructiveIterator(ObjectAggregationIterator.scala:239)
	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:198)
	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:78)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:114)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:105)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$12.apply(RDD.scala:823)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$12.apply(RDD.scala:823)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3389)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: Java heap space
	at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
	at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
	at org.apache.spark.io.ReadAheadInputStream.<init>(ReadAheadInputStream.java:105)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.<init>(UnsafeSorterSpillReader.java:81)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.getReader(UnsafeSorterSpillWriter.java:156)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.getSortedIterator(UnsafeExternalSorter.java:477)
	at org.apache.spark.sql.execution.UnsafeKVExternalSorter.sortedIterator(UnsafeKVExternalSorter.java:204)
	at org.apache.spark.sql.execution.aggregate.SortBasedAggregator$$anon$1.<init>(ObjectAggregationIterator.scala:240)
	at org.apache.spark.sql.execution.aggregate.SortBasedAggregator.destructiveIterator(ObjectAggregationIterator.scala:239)
	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:198)
	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:78)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:114)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:105)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$12.apply(RDD.scala:823)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$12.apply(RDD.scala:823)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [11]:
# pb: Traverse Bay Fruit Chocolate Covered Dried Cherries, 2 Pound

In [None]:
name_in_title3 = full_match.groupBy('product_name').agg(F.collect_list('title').alias('titles'), F.count('title').alias('nb_titles'))
#name_in_title3 = reload2('name_in_title3.parquet', name_in_title3)
name_in_title3.show()
for row in name_in_title3.take(100):
    print(row.product_name, ': ', row.titles, '\n')

In [None]:
unique_match3 = name_in_title3.filter(len_filter('titles'))
unique_match3 = reload2('unique_match3.parquet', unique_match3)
for row in unique_match3.take(100):
    print(row.product_name, ': ', row.titles, '\n')

In [None]:
#for row in df_withwords2.take(50):
#    print(row.product_name, 'matched to', row.title)

In [None]:
real_products.count()

In [None]:
unique_match3.count()