# **System 2: Recommendations Based on Selected Product's Ingredients**

In [1]:
# Mount Google Drive inside the Colab runtime. The dataset paths read later in
# this notebook all live under this mount point (/content/drive).
from google.colab import drive

drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488490 sha256=43346e5e36cdba2d60cf03bd24804fe166070e8f079a0b7bce616599fa90ce3c
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [5]:
from pyspark.ml.feature import Tokenizer, HashingTF, IDF, StringIndexer, VectorAssembler
from pyspark.sql.functions import col, udf
from pyspark.sql.types import FloatType
from pyspark.sql import SparkSession

In [6]:
# Start (or reuse) the Spark session that every later cell runs against.
spark = (
    SparkSession.builder
    .appName("HairProductRecommenderSystem2")
    .getOrCreate()
)

# Leave the session as the last expression so the notebook displays it.
spark

# **Data Preparation**

**Load Datasets**

In [7]:
# Project root on the mounted Google Drive and the sub-folders holding the CSV
# exports. Change these in one place if the data moves.
PROJECT_DIR = '/content/drive/MyDrive/Capstone Project - Summer 2024'
FINAL_DATASETS_DIR = f'{PROJECT_DIR}/Final Datasets'
SCRAPED_PART1_DIR = f'{PROJECT_DIR}/Cleaned Datasets From Web Scraping - Part 1'


def read_product_csv(directory, file_name):
    """Read one CSV export into a Spark DataFrame.

    The first line of the file is used as the header and column types are
    inferred from the data.
    """
    return spark.read.csv(f'{directory}/{file_name}', header=True, inferSchema=True)


# Ulta
ulta_products = read_product_csv(FINAL_DATASETS_DIR, 'ulta_products_final.csv')
ulta_product_description = read_product_csv(FINAL_DATASETS_DIR, 'ulta_product_description_final.csv')
ulta_ingredients = read_product_csv(FINAL_DATASETS_DIR, 'ulta_ingredients_final.csv')

# Target
# NOTE(review): unlike every other table, the Target product table is read from
# the "Part 1" web-scraping folder rather than "Final Datasets" - confirm that
# this is intentional.
target_products = read_product_csv(SCRAPED_PART1_DIR, 'Target_products.csv')
target_product_description = read_product_csv(FINAL_DATASETS_DIR, 'Target_product_description_final.csv')
target_ingredients = read_product_csv(FINAL_DATASETS_DIR, 'Target_ingredients_final.csv')

**Check if the "Brand" and "Product Name" values are correct.**

In [52]:
# Preview the first 5 rows of the Ulta product table.
ulta_products.show(5)

+----------------+--------------------+----------------+------+-----------------+--------------------+------------+----------+
|           Brand|        Product Name|           Price|Rating|Number of Ratings|                Link|Product Type|Product ID|
+----------------+--------------------+----------------+------+-----------------+--------------------+------------+----------+
|NatureLab. Tokyo|Perfect Clean 2-I...|         $19.00 |   4.7|             1027|https://www.ulta....|     Shampoo|        U1|
|       Color Wow|Dream Filter Pre-...|         $24.00 |   4.6|             2172|https://www.ulta....|     Shampoo|        U2|
|        Viviscal|Volumizing Dry Sh...|         $19.99 |   4.5|              225|https://www.ulta....|     Shampoo|        U3|
|         Garnier|Fructis Hair Fill...|          $9.99 |   4.6|              224|https://www.ulta....|     Shampoo|        U4|
|         Biolage|  Color Last Shampoo|$24.00 - $42.00 |   4.5|             3378|https://www.ulta....|     Sham

In [53]:
# Preview the first 5 rows of the Target product table.
target_products.show(5)

+--------------------+------------+---------------+------+-----------------+--------------------+------------+----------+
|        Product Name|       Brand|          Price|Rating|Number of Ratings|                Link|Product Type|Product ID|
+--------------------+------------+---------------+------+-----------------+--------------------+------------+----------+
|Eva NYC Freshen U...|     Eva NYC|         $13.99|   4.5|             3177|https://www.targe...|     Shampoo|        T1|
|Batiste Tropical ...|     Batiste|  $4.39 - $8.99|   4.5|             1303|https://www.targe...|     Shampoo|        T2|
|Batiste Tropical ...|     Batiste|         $12.99|   4.7|              321|https://www.targe...|     Shampoo|        T3|
|Batiste Fresh Bre...|     Batiste|          $8.99|   4.6|              321|https://www.targe...|     Shampoo|        T4|
|Living Proof Perf...|Living Proof|$16.00 - $43.00|   4.5|             2858|https://www.targe...|     Shampoo|        T5|
+--------------------+--

# **Feature Extraction and Recommender System Implementation**

**Combine product data with descriptions and ingredients for each store.**

In [8]:
# Attach the description table and the ingredient table to each store's product
# table. Every join is an inner join keyed on "Product ID".
ulta_combined = (
    ulta_products
    .join(ulta_product_description, on="Product ID", how="inner")
    .join(ulta_ingredients, on="Product ID", how="inner")
)
target_combined = (
    target_products
    .join(target_product_description, on="Product ID", how="inner")
    .join(target_ingredients, on="Product ID", how="inner")
)

**Check if the "Brand" and "Product Name" values are correct.**

In [62]:
# Preview the first 10 rows of the joined Ulta table (products + descriptions + ingredients).
ulta_combined.show(10)

+----------+----------------+--------------------+----------------+------+-----------------+--------------------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|Product ID|           Brand|        Product Name|           Price|Rating|Number of Ratings|                Link|Product Type|         Picture URL|         Description|        Health Facts|          Highlights| Product Ingredients|
+----------+----------------+--------------------+----------------+------+-----------------+--------------------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|        U1|NatureLab. Tokyo|Perfect Clean 2-I...|         $19.00 |   4.7|             1027|https://www.ulta....|     Shampoo|https://media.ult...|The NatureLab. To...|Cruelty Free, Sus...| A refreshingly l...|Sucrose, Glycerin...|
|        U2|       Color Wow|Dream Filter Pre-...|         $24.00 |   4.

In [57]:
# Preview the first 5 rows of the joined Target table (products + descriptions + ingredients).
target_combined.show(5)

+----------+--------------------+--------+-------------+------+-----------------+--------------------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|Product ID|        Product Name|   Brand|        Price|Rating|Number of Ratings|                Link|Product Type|         Picture URL|         Description|        Health Facts|          Highlights| Product Ingredients|
+----------+--------------------+--------+-------------+------+-----------------+--------------------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|        T1|Eva NYC Freshen U...| Eva NYC|       $13.99|   4.5|             3177|https://www.targe...|     Shampoo|https://target.sc...|Eva NYC Freshen U...|                NULL|                NULL|Hydrofluorocarbon...|
|        T2|Batiste Tropical ...| Batiste|$4.39 - $8.99|   4.5|             1303|https://www.targe...|     Shampoo|h

**Combining, Tokenizing, and Feature Engineering Product Data**

In [9]:
# RegexTokenizer is only needed by this cell, so it is imported here.
from pyspark.ml.feature import RegexTokenizer

# Width of the hashed ingredient vector. HashingTF maps each token to a column
# with a simple modulo, so Spark recommends a power of two; a very small width
# makes unrelated ingredients collide in the same column.
NUM_INGREDIENT_FEATURES = 4096

# Ensure both DataFrames have the same column order before union
# (DataFrame.union matches columns by position, not by name).
ordered_columns = ["Product ID", "Brand", "Product Name", "Price", "Rating", "Number of Ratings", "Link", "Product Type", "Picture URL", "Description", "Health Facts", "Highlights", "Product Ingredients"]

# Select columns in the same order
ulta_combined_ordered = ulta_combined.select(*ordered_columns)
target_combined_ordered = target_combined.select(*ordered_columns)

# Union the data from both stores
combined_data = ulta_combined_ordered.union(target_combined_ordered)

# Directly using DataFrame API to get distinct brands
distinct_brands = combined_data.select("Brand").distinct().collect()
distinct_product_types = combined_data.select("Product Type").distinct().collect()

# Converting rows to list of strings for easier use in application
list_of_brands = [row['Brand'] for row in distinct_brands if row['Brand'] is not None]
list_of_product_types = [row['Product Type'] for row in distinct_product_types if row['Product Type'] is not None]

# Tokenize ingredients.
# The ingredient column is a comma-separated list, so split on commas (with any
# surrounding whitespace) instead of on whitespace: each token is then one whole
# ingredient name, lower-cased, without trailing punctuation.
# Rows without an ingredient list are dropped first; the tokenizer cannot
# process a null string and such products have nothing to compare on.
ingredient_data = combined_data.na.drop(subset=["Product Ingredients"])
tokenizer = RegexTokenizer(inputCol="Product Ingredients", outputCol="words", pattern=r"\s*,\s*")
words_data = tokenizer.transform(ingredient_data)

# Compute TF-IDF for ingredient lists
hashing_tf = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=NUM_INGREDIENT_FEATURES)
featurized_data = hashing_tf.transform(words_data)
idf = IDF(inputCol="rawFeatures", outputCol="ingredient_features")
idf_model = idf.fit(featurized_data)
rescaled_data = idf_model.transform(featurized_data)

# Index product type
indexer = StringIndexer(inputCol="Product Type", outputCol="product_type_index")
indexed_data = indexer.fit(rescaled_data).transform(rescaled_data)

# Combine features
# NOTE(review): product_type_index is an arbitrary category number that enters
# the vector as a plain numeric magnitude - consider one-hot encoding it, or
# filtering by product type, rather than mixing it into the similarity.
assembler = VectorAssembler(inputCols=["ingredient_features", "product_type_index"], outputCol="features")
final_data = assembler.transform(indexed_data)

**Check if the "Brand" and "Product Name" values are correct.**

In [60]:
# Print the physical plan Spark will use for the joined Ulta DataFrame, then
# preview its first 5 rows.
ulta_combined.explain()
ulta_combined.show(5)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [Product ID#3751, Brand#3744, Product Name#3745, Price#3746, Rating#3747, Number of Ratings#3748, Link#3749, Product Type#3750, Picture URL#3778, Description#3779, Health Facts#3780, Highlights#3781, Product Ingredients#3805]
   +- BroadcastHashJoin [Product ID#3751], [Product ID#3804], Inner, BuildRight, false
      :- Project [Product ID#3751, Brand#3744, Product Name#3745, Price#3746, Rating#3747, Number of Ratings#3748, Link#3749, Product Type#3750, Picture URL#3778, Description#3779, Health Facts#3780, Highlights#3781]
      :  +- BroadcastHashJoin [Product ID#3751], [Product ID#3777], Inner, BuildLeft, false
      :     :- BroadcastExchange HashedRelationBroadcastMode(List(input[7, string, false]),false), [plan_id=17590]
      :     :  +- Filter isnotnull(Product ID#3751)
      :     :     +- FileScan csv [Brand#3744,Product Name#3745,Price#3746,Rating#3747,Number of Ratings#3748,Link#3749,Product Type#3750,Produc

In [61]:
# Print the physical plan Spark will use for the joined Target DataFrame, then
# preview its first 5 rows.
target_combined.explain()
target_combined.show(5)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [Product ID#3832, Product Name#3825, Brand#3826, Price#3827, Rating#3828, Number of Ratings#3829, Link#3830, Product Type#3831, Picture URL#3859, Description#3860, Health Facts#3861, Highlights#3862, Product Ingredients#3886]
   +- BroadcastHashJoin [Product ID#3832], [Product ID#3885], Inner, BuildRight, false
      :- Project [Product ID#3832, Product Name#3825, Brand#3826, Price#3827, Rating#3828, Number of Ratings#3829, Link#3830, Product Type#3831, Picture URL#3859, Description#3860, Health Facts#3861, Highlights#3862]
      :  +- BroadcastHashJoin [Product ID#3832], [Product ID#3858], Inner, BuildLeft, false
      :     :- BroadcastExchange HashedRelationBroadcastMode(List(input[7, string, false]),false), [plan_id=17794]
      :     :  +- Filter isnotnull(Product ID#3832)
      :     :     +- FileScan csv [Product Name#3825,Brand#3826,Price#3827,Rating#3828,Number of Ratings#3829,Link#3830,Product Type#3831,Produc

In [10]:
# `final_data` was already assembled at the end of the feature-engineering cell,
# so it is not rebuilt here. Print its schema to verify that the engineered
# columns are present.
final_data.printSchema()

root
 |-- Product ID: string (nullable = true)
 |-- Brand: string (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- Price: string (nullable = true)
 |-- Rating: double (nullable = true)
 |-- Number of Ratings: integer (nullable = true)
 |-- Link: string (nullable = true)
 |-- Product Type: string (nullable = true)
 |-- Picture URL: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Health Facts: string (nullable = true)
 |-- Highlights: string (nullable = true)
 |-- Product Ingredients: string (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- rawFeatures: vector (nullable = true)
 |-- ingredient_features: vector (nullable = true)
 |-- product_type_index: double (nullable = false)
 |-- features: vector (nullable = true)



In [67]:
# Display the distinct, non-null product types found in the combined Ulta + Target data.
list_of_product_types

['Custard',
 'Mousse',
 'Rinse-Out Treatment',
 'Leave-In Treatment',
 'Serum',
 'Styling Leave-In Spray',
 'Leave-In Conditioner',
 'Rinse-Out Conditioner',
 'Co-Wash',
 'Cream',
 'Oil',
 'Hair Mask',
 'Gel',
 'Shampoo',
 'Pomade']

In [26]:
# Display the distinct, non-null brands found in the combined Ulta + Target data.
list_of_brands

['Coco & Eve',
 'KRISTIN ESS HAIR',
 'Innersense Organic Beauty',
 'SheaMoisture',
 'Chi',
 'Philosophy',
 'CURLS',
 'IGK',
 'Bed Head',
 'Bondi Boost',
 'Melanin Haircare',
 'OLAPLEX',
 'BREAD BEAUTY SUPPLY',
 'FEKKAI',
 'Keratin Complex',
 "L'anza",
 'Maui Moisture',
 'Sexy Hair',
 'LolaVie',
 'Curlsmith',
 'Batiste',
 "DONNA'S RECIPE",
 'Biosilk',
 'Pureology',
 'Ouidad',
 'Kitsch',
 'Virtue',
 'AG Care',
 'MONDAY Haircare',
 'DERMA E',
 'Garnier',
 'Verb',
 'Better Not Younger',
 'American Crew',
 'Beachwaver Co.',
 'DevaCurl',
 'Briogeo',
 'Mielle',
 'Viviscal',
 'Sebastian',
 'Andrew Fitzsimons',
 "L'Oréal",
 'Keranique',
 'Matrix',
 'Pacifica',
 'Hot Tools',
 'Joico',
 'Drybar',
 'tgin',
 'Klorane',
 'Kenra Professional',
 'OUAI',
 'Divi',
 "L'ange",
 'Odele',
 'Living Proof',
 "Not Your Mother's",
 'Eva Nyc',
 'Redken',
 'Paul Mitchell',
 'The Ordinary',
 'Blind Barber',
 'Sun Bum',
 'florence by mills',
 'Nexxus',
 'Bumble and bumble',
 'NatureLab. Tokyo',
 'SEEN',
 'UNITE Hai

**Retrieve Recommendations for each Product**

In [17]:
from pyspark.sql.types import StructType, StructField, StringType, ArrayType

# Number of recommendations kept for each product.
TOP_N_RECOMMENDATIONS = 10

# Only the ID and the assembled feature vector are needed for the comparison.
# This frame is scanned once per product in the loop below, so cache it;
# otherwise the CSV reads, joins and TF-IDF steps are recomputed on every scan.
features_df = final_data.select("Product ID", "features").cache()

# Define the schema
schema = StructType([
    StructField("Product ID", StringType(), True),
    StructField("Recommendations", ArrayType(StringType(), True), True)
])


def make_cosine_similarity_udf(reference):
    """Build a UDF that scores a feature vector against one reference vector.

    `reference` is a Spark broadcast variable wrapping the feature vector of
    the product currently being processed. The returned UDF yields the cosine
    similarity, or 0.0 when either vector has zero length.
    """

    @udf(FloatType())
    def cosine_similarity(features):
        dot_product = features.dot(reference.value)
        norm_product = features.norm(2) * reference.value.norm(2)
        # Return a Python float on both branches: a UDF declared as FloatType
        # that returns an int produces null instead of a number.
        return float(dot_product / norm_product) if norm_product != 0 else 0.0

    return cosine_similarity


# Collected (product_id, recommendations) pairs. The DataFrame is built once at
# the end; unioning one single-row DataFrame per product would create a query
# plan as deep as the number of products.
recommendation_rows = []

# Iterate through each product to find its recommendations
for row in features_df.collect():
    product_id = row["Product ID"]
    broadcasted_features = spark.sparkContext.broadcast(row["features"])
    try:
        cosine_similarity = make_cosine_similarity_udf(broadcasted_features)

        # Score every product against the current one, drop the product itself
        # and keep the best matches. "Product ID" is a secondary sort key so
        # that ties in similarity are resolved the same way on every run.
        similarities = features_df.withColumn("Similarity", cosine_similarity(col("features")))
        top_rows = (
            similarities
            .filter(col("Product ID") != product_id)
            .orderBy(col("Similarity").desc(), col("Product ID"))
            .limit(TOP_N_RECOMMENDATIONS)
            .select("Product ID")
            .collect()
        )
    finally:
        # The job above has finished, so the broadcast copy can be released
        # instead of accumulating one per product.
        broadcasted_features.destroy()

    top_recommendations = [r["Product ID"] for r in top_rows]
    recommendation_rows.append((product_id, top_recommendations))

# One row per product: its ID and the ordered list of recommended product IDs.
all_recommendations = spark.createDataFrame(recommendation_rows, schema)

**Download `final_data`, which contains all the product information, and `all_recommendations`, which contains a list of recommendations for each product.**

In [24]:
# Set to True to write both result sets as JSON. Left off by default so that
# running the whole notebook does not write any files.
EXPORT_RESULTS = False

if EXPORT_RESULTS:
    # mode("overwrite") lets this cell be re-run; the default write mode fails
    # when the output path already exists.
    all_recommendations.coalesce(5).write.mode("overwrite").json("System2Recommendations.json")
    final_data.write.mode("overwrite").json("final_data.json")