In [26]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import concat_ws, col
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.sql.functions import udf, lit
from pyspark.ml.linalg import Vectors, VectorUDT

import numpy as np
import os

In [2]:
spark = SparkSession.builder \
    .appName("TF-IDF Example") \
    .config("spark.driver.memory", "2g") \
    .config("spark.executor.memory", "2g") \
    .config("spark.executor.cores", "2") \
    .config("spark.cores.max", "2") \
    .getOrCreate()

spark

In [19]:
path = os.path.join(os.getcwd(), "..", "Data", "output_search_DB_embedded.csv")
spark_df = spark.read.csv(path, header=True, inferSchema=True)
spark_df = spark_df.drop("embedding", "text")
spark_df.show(5, False)

+--------+-------------------+---------------------------------+------------------------------------------------------+
|order_id|order_customer_name|product_name                     |part_type_name                                        |
+--------+-------------------+---------------------------------+------------------------------------------------------+
|2200006 |得意先_1           |A　2023年3月号 定期演奏会        |['本文1']                                             |
|2107551 |得意先_7           |アーティストリスト2022年         |['本文1', '本文1', '本文2', '本文2', '表紙1', '表紙1']|
|2200898 |得意先_8           |ミュージアムリーフレット         |['本文']                                              |
|2202767 |得意先_148         |A小学校　2024学校案内パンフレット|['本文1', '本文2', '表紙1', '表紙2']                  |
|2203087 |得意先_14          |A社統合報告書2022（英文）        |['本文1', '表紙1']                                    |
+--------+-------------------+---------------------------------+------------------------------------------------------+
only showing top 5

In [20]:
columns = ['order_id', 'order_customer_name', 'product_name', 'part_type_name']
sample_row = [(2204908, '得意先_8', '「ABS」展B2ポスター', '[本文1, 本文2, 表紙1, 表紙2]')]
sample_df = spark.createDataFrame(sample_row, columns)

spark_df = spark_df.union(sample_df)

spark_df = spark_df.withColumn("text", concat_ws(" ", "product_name", "part_type_name"))

In [21]:
tokenizer = Tokenizer(inputCol="text", outputCol="words")
words_data = tokenizer.transform(spark_df)

In [22]:
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=100)
featurized_data = hashingTF.transform(words_data)

idf = IDF(inputCol="rawFeatures", outputCol="tfidf_features")
idf_model = idf.fit(featurized_data)
rescaled_data = idf_model.transform(featurized_data)

In [24]:
rescaled_data.show(truncate=False)

+--------+-------------------+-------------------------------------------------------------+------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|order_id|order_customer_name|product_name                                                 |part_type_name                                                                            |text                                                                                         

In [27]:
sample_input_vector = rescaled_data.filter(col("order_id") == 2204908).select("tfidf_features").collect()[0]["tfidf_features"]

sample_input_vector_bc = spark.sparkContext.broadcast(sample_input_vector)

In [28]:
def cosine_similarity(v1, v2):
    arr1 = np.array(v1.toArray())
    arr2 = np.array(v2.toArray())
    dot_product = np.dot(arr1, arr2)
    norm1 = np.linalg.norm(arr1)
    norm2 = np.linalg.norm(arr2)
    if norm1 == 0 or norm2 == 0:
        return float(0.0)
    return float(dot_product / (norm1 * norm2))

In [29]:
cosine_udf = udf(lambda v: cosine_similarity(sample_input_vector_bc.value, v), "double")

similarity_df = rescaled_data.withColumn("cosine_similarity", cosine_udf(col("tfidf_features")))

In [30]:
similarity_df.filter(col("order_id") != 2204908)\
    .select("order_id", "order_customer_name", "product_name", "part_type_name", "cosine_similarity")\
    .orderBy(col("cosine_similarity").desc())\
    .show(truncate=False)

+--------+-------------------+-----------------------------------------------------------------------------------+--------------------------------------------------------+-------------------+
|order_id|order_customer_name|product_name                                                                       |part_type_name                                          |cosine_similarity  |
+--------+-------------------+-----------------------------------------------------------------------------------+--------------------------------------------------------+-------------------+
|2204728 |得意先_169         |A社家具メーカー様　取扱説明書「グランサリー2モーター」【修正増刷】管理No,07-●●●●●●●|['表紙本文']                                            |0.293743129381092  |
|2200006 |得意先_1           |A　2023年3月号 定期演奏会                                                          |['本文1']                                               |0.288431675149842  |
|2204779 |得意先_179         |プロ列伝Ⅳ 巻き三つ折り                                                        