In [None]:
# Install PySpark into the kernel's own environment (%pip, not !pip) and pin
# the version this notebook was last run with so re-runs are reproducible.
%pip install -q pyspark==3.5.1

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=5044a87da9967eabeb5b235b12cb28b11aadf62fc9cf1881751edfae17b4bad5
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

from pyspark.sql.functions import col, regexp_replace
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.ml import Pipeline

# ---------------------------------------------------------------------------
# Spark setup
# ---------------------------------------------------------------------------
conf = SparkConf().setAppName("Project").setMaster("local")

sc = SparkContext.getOrCreate(conf=conf)
# SparkSession.builder is the documented way to obtain a session; getOrCreate()
# reuses the context created above, so re-running this cell stays idempotent.
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# ---------------------------------------------------------------------------
# Load the raw recipes
# ---------------------------------------------------------------------------
data = spark.read.csv("recipes_combined.csv", header=True, inferSchema=True)
data.show()

# Column names as inferred from the CSV header
Col = data.columns
print(Col)

# ---------------------------------------------------------------------------
# Keep only the recipe name and the 'ingredients' text
# ---------------------------------------------------------------------------
# Unnamed columns '_c2' .. '_c15' produced by the CSV reader.
# NOTE(review): the captured output shows ingredient fragments in '_c2', so these
# columns may hold ingredient text split on commas — confirm dropping them is intended.
overflow_columns = [f'_c{i}' for i in range(2, 16)]

columns_to_drop = [*overflow_columns, 'all_ingredients']
Filter_data = data.drop(*columns_to_drop)
Filter_data.show(truncate=False)

# Normalise the ingredient text: '//' becomes a space, and the characters
# '(', '|' and ')' are removed.
filtered_data = (
    Filter_data
    .withColumn('ingredients', regexp_replace('ingredients', '//', ' '))
    .withColumn('ingredients', regexp_replace('ingredients', '[(|)]', ''))
)
filtered_data.show(truncate=False)

# ---------------------------------------------------------------------------
# TF-IDF features: tokenize -> hashed term frequencies -> IDF weighting
# ---------------------------------------------------------------------------
tokenizer = Tokenizer(inputCol="ingredients", outputCol="words")
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures")
idf = IDF(inputCol="rawFeatures", outputCol="features")

pipeline = Pipeline(stages=[tokenizer, hashingTF, idf])
pipeline_model = pipeline.fit(filtered_data)
transformed_data = pipeline_model.transform(filtered_data)

transformed_data.show()

# ---------------------------------------------------------------------------
# Keep only what the similarity step needs (recipe name + 'features')
# ---------------------------------------------------------------------------
# The overflow columns and 'all_ingredients' were already removed above;
# DataFrame.drop ignores names that are not in the schema, so listing them
# again is harmless.
columns_to_drop = ['ingredients', *overflow_columns, 'all_ingredients', 'words', 'rawFeatures']
Filter_data = transformed_data.drop(*columns_to_drop)
Filter_data.show(truncate=False)

from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# Define a function to calculate cosine similarity between two vectors
def cosine_similarity(vec1, vec2):
    """Return the cosine similarity of two vectors as a Python float.

    Args:
        vec1: Vector-like object exposing ``dot(other)`` and ``norm(p)``.
        vec2: Vector-like object with the same interface as ``vec1``.

    Returns:
        ``dot(vec1, vec2) / (||vec1||_2 * ||vec2||_2)``, or ``0.0`` when either
        vector has zero L2 norm (the ratio is undefined there, and raising
        inside a UDF would fail the whole Spark job).
    """
    norm_product = float(vec1.norm(2)) * float(vec2.norm(2))
    if norm_product == 0.0:
        # At least one vector is all zeros: no direction, so no similarity.
        return 0.0
    return float(vec1.dot(vec2)) / norm_product

# Register the cosine similarity function as a User Defined Function (UDF)
# Wrap the Python function as a Spark UDF that returns a double
cosine_similarity_udf = udf(cosine_similarity, DoubleType())

# Pair every recipe with every recipe (Cartesian product of the frame with itself)
joined_data = Filter_data.alias("df1").crossJoin(Filter_data.alias("df2"))

# Score each pair. Rows whose two recipe names are equal are removed first,
# while the df1/df2 qualifiers are still part of the schema. The two name
# columns are then renamed positionally so the result no longer carries two
# columns both called 'recipeNames' (which made by-name access ambiguous).
cosine_similarity_result = (
    joined_data
    .filter("df1.recipeNames != df2.recipeNames")
    .select(
        "df1.recipeNames",
        "df2.recipeNames",
        cosine_similarity_udf("df1.features", "df2.features").alias("cosine_similarity"),
    )
    .toDF("recipeNames_1", "recipeNames_2", "cosine_similarity")
)

# Show the pairwise cosine similarity scores
cosine_similarity_result.show(truncate=False)

+--------------------+--------------------+--------------------+--------------------+--------------------+----+----+----+----+----+----+----+----+----+----+----+--------------------+
|         recipeNames|         ingredients|                 _c2|                 _c3|                 _c4| _c5| _c6| _c7| _c8| _c9|_c10|_c11|_c12|_c13|_c14|_c15|     all_ingredients|
+--------------------+--------------------+--------------------+--------------------+--------------------+----+----+----+----+----+----+----+----+----+----+----+--------------------+
|Poppy Seed Bread ...|3 cups all-purpos...|                NULL|                NULL|                NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|Poppy Seed Bread ...|
|Czech Christmas H...|1 (0.6 ounce) cak...|beaten // 1 teasp...|                NULL|                NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|Czech Christmas H...|
|  Applesauce Bread I|3 cups all-purpos...|                NULL|                NULL|

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
from pyspark.ml.linalg import DenseVector

# Function to compute centered cosine similarity
def centered_cosine_similarity(vec1, vec2):
    """Return the centered cosine similarity of two vectors as a Python float.

    Each vector has the mean of its own components subtracted before the
    ordinary cosine similarity is taken.

    Args:
        vec1: A vector exposing ``toArray()`` (e.g. a pyspark.ml dense or
            sparse vector) or any array-like of numbers.
        vec2: Same kind of object as ``vec1``.

    Returns:
        The cosine of the angle between the two mean-centered vectors, or
        ``0.0`` when either centered vector has zero L2 norm (an empty,
        all-zero or constant vector), where the ratio is undefined.
    """
    import numpy as np  # local import keeps the function self-contained

    centered = []
    for vec in (vec1, vec2):
        # Densify once up front; iterating a sparse vector element by element
        # in Python is very slow and fails for an all-zero vector.
        dense = np.asarray(vec.toArray() if hasattr(vec, "toArray") else vec, dtype=float)
        if dense.size:
            dense = dense - dense.mean()
        centered.append(dense)

    centered_vec1, centered_vec2 = centered
    norm_product = float(np.linalg.norm(centered_vec1)) * float(np.linalg.norm(centered_vec2))
    if norm_product == 0.0:
        # A centered vector collapsed to all zeros: similarity is undefined.
        return 0.0
    return float(centered_vec1.dot(centered_vec2)) / norm_product

# User-defined function for centered cosine similarity
# Wrap the Python function as a Spark UDF that returns a double
centered_cosine_similarity_udf = udf(centered_cosine_similarity, DoubleType())

# Score each recipe pair with the centered cosine similarity. Rows whose two
# recipe names are equal are removed first, while the df1/df2 qualifiers are
# still part of the schema. The two name columns are then renamed positionally
# so the result no longer carries two columns both called 'recipeNames'.
centered_cosine_similarity_result = (
    joined_data
    .filter("df1.recipeNames != df2.recipeNames")
    .select(
        "df1.recipeNames",
        "df2.recipeNames",
        centered_cosine_similarity_udf("df1.features", "df2.features").alias("centered_cosine_similarity"),
    )
    .toDF("recipeNames_1", "recipeNames_2", "centered_cosine_similarity")
)

centered_cosine_similarity_result.show(truncate=False)


+---------------------------+-----------------------------------+--------------------------+
|recipeNames                |recipeNames                        |centered_cosine_similarity|
+---------------------------+-----------------------------------+--------------------------+
|Poppy Seed Bread with Glaze|Czech Christmas Hoska              |0.0733783858200811        |
|Poppy Seed Bread with Glaze|Applesauce Bread I                 |0.3216976132627152        |
|Poppy Seed Bread with Glaze|Raisin Brown Bread                 |0.1785376466617272        |
|Poppy Seed Bread with Glaze|Applesauce Raisin Bread            |0.3591791595834009        |
|Poppy Seed Bread with Glaze|Apple Raisin Bread                 |0.3639428395025738        |
|Poppy Seed Bread with Glaze|Buttermilk Oatmeal Bread           |0.2897694619299587        |
|Poppy Seed Bread with Glaze|Kolaches II                        |0.22847932165579546       |
|Poppy Seed Bread with Glaze|Whole Wheat Bread II               |0.156