In [1]:
# Importing the necessary libraries

import random

import time
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import matplotlib.ticker as ticker

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, DateType, FloatType, TimestampType, ArrayType
from pyspark.sql.functions import col, sum, isnan, isnull, isnotnull, when, countDistinct, count, regexp_replace, split, month, year, size, element_at, struct, trim, avg, expr, lit
from pyspark.sql.functions import concat, concat_ws, rand, collect_list, struct
from pyspark.sql import functions as F
from pyspark.sql.window import Window


import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

#### File sources:

In [2]:
# Appliances
url_meta = 'https://mcauleylab.ucsd.edu/public_datasets/data/amazon_2023/raw/meta_categories/meta_Appliances.jsonl.gz'
url_ratings = 'https://mcauleylab.ucsd.edu/public_datasets/data/amazon_2023/raw/review_categories/Appliances.jsonl.gz'


# # Electronics
# url_meta = 'https://mcauleylab.ucsd.edu/public_datasets/data/amazon_2023/raw/meta_categories/meta_Electronics.jsonl.gz'
# url_ratings = 'https://mcauleylab.ucsd.edu/public_datasets/data/amazon_2023/raw/review_categories/Electronics.jsonl.gz'

#### Generating reduced size dummy test files

In [3]:
def read_json_sampling(url, chunk_size, sample_size, random_state=None):
    """
    Reads a JSON file in chunks, samples from each chunk, and returns a combined sample.

    Args:
        url (str): JSON file url
        chunk_size (int): Number of rows to read in each chunk.
        sample_size (int): Total number of rows to sample.
        random_state (int, optional): Seed for random sampling. Defaults to None.

    Returns:
        pd.DataFrame: A DataFrame containing the sampled rows.
    """
    if random_state is not None:
        random.seed(random_state)

    sampled_chunks = []
    total_rows = 0

    for chunk in pd.read_json(url, lines=True, chunksize=chunk_size):
       
        chunk_sample_size = min(sample_size - total_rows, len(chunk))
        if chunk_sample_size > 0:
            sampled_chunk = chunk.sample(n=chunk_sample_size, random_state=random_state)
            sampled_chunks.append(sampled_chunk)
            total_rows += chunk_sample_size
        if total_rows >= sample_size:
            break

    return pd.concat(sampled_chunks, ignore_index=True)

#### Reduce samples

In [4]:
# Execute UDF and save dummy files locally

df_reviews = read_json_sampling(url_ratings, 100, 500, 33)
df_meta = read_json_sampling(url_meta, 100, 500, 33)

df_reviews.to_json('./data/reduced_app_reviews.jsonl.gz', orient='records', lines='true', compression='gzip')
df_meta.to_json('./data/reduced_app_meta.jsonl.gz', orient='records', lines='true', compression='gzip')

#### Spark sessions

In [5]:
# Create SparkSession

spark = SparkSession.builder \
    .config("spark.driver.memory", "30g") \
    .appName("RecSys2") \
    .getOrCreate()

your 131072x1 screen size is bogus. expect trouble
25/05/22 10:21:59 WARN Utils: Your hostname, DPC resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/05/22 10:21:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/22 10:22:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [6]:
spark.conf.set('spark.sql.caseSensitive', True)

In [7]:
spark

#### Loading files

###### Via URL

In [8]:
# # df_meta = spark.read.json(url_meta)
# # df_meta = spark.read.json(url_ratings)

###### Locally stored files

In [9]:
# Reduced files

df_meta = spark.read.json('./data/reduced_app_meta.jsonl.gz'  )
df_ratings = spark.read.json('./data/reduced_app_reviews.jsonl.gz' )

#Full files

# df_meta = spark.read.json('data/meta_Appliances.jsonl.gz')
# df_ratings = spark.read.json('data/Appliances.jsonl.gz')


25/05/22 10:22:05 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


### Cleaning Metadata

In [10]:
print(f'Metadata shape: ({df_meta.count()}, {len(df_meta.columns)})')

Metadata shape: (500, 14)


In [12]:
df_meta.printSchema()

root
 |-- average_rating: double (nullable = true)
 |-- bought_together: string (nullable = true)
 |-- categories: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- description: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- details: struct (nullable = true)
 |    |-- Access Location: string (nullable = true)
 |    |-- Age Range (Description): string (nullable = true)
 |    |-- Air Flow Capacity: string (nullable = true)
 |    |-- Airflow Displacement: string (nullable = true)
 |    |-- Alarm: string (nullable = true)
 |    |-- Amperage Capacity: string (nullable = true)
 |    |-- Annual Energy Consumption: string (nullable = true)
 |    |-- Are Batteries Included: string (nullable = true)
 |    |-- Assembly required: string (nullable = true)
 |    |-- Backlight: string (nullable = true)
 |    |-- Batteries: string (nullable = true)
 |    |-- Batteries Included?: string (nullable = true)
 |    |-- Batteries Required?: string (nul

In [13]:
df_meta.columns

['average_rating',
 'bought_together',
 'categories',
 'description',
 'details',
 'features',
 'images',
 'main_category',
 'parent_asin',
 'price',
 'rating_number',
 'store',
 'title',
 'videos']

In [14]:
# Drop columns in meta that are not needed in the analysis

cols_to_drop = ['images', 'videos', 'bought_together', 'price', 'rating_number', 'average_rating']

df_meta = df_meta.drop(*cols_to_drop)

In [15]:
# Count null or nan columns that don't contain arrays

cols_non_arr = [c for c, t in df_meta.dtypes if (t.startswith('array')==False) and (t.startswith('struct') == False) ] 

df_meta.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in cols_non_arr]).show()

+-------------+-----------+-----+-----+
|main_category|parent_asin|store|title|
+-------------+-----------+-----+-----+
|            1|          0|    6|    0|
+-------------+-----------+-----+-----+



In [16]:
# Calculate % of blanks in columns

# for c in cols_non_arr:
    
#     print(f'Percentage of blank rows in {c}: {round(100 * df_meta.where(col(c).isNull() | isnan(c)).count() / df_meta.count(), 3)}%')

### Cleaning Ratings

In [18]:
print(f'Ratings shape: ({df_ratings.count()}, {len(df_ratings.columns)})')

Ratings shape: (500, 10)


In [19]:
df_ratings.columns

['asin',
 'helpful_vote',
 'images',
 'parent_asin',
 'rating',
 'text',
 'timestamp',
 'title',
 'user_id',
 'verified_purchase']

In [20]:
df_ratings.printSchema()

root
 |-- asin: string (nullable = true)
 |-- helpful_vote: long (nullable = true)
 |-- images: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- attachment_type: string (nullable = true)
 |    |    |-- large_image_url: string (nullable = true)
 |    |    |-- medium_image_url: string (nullable = true)
 |    |    |-- small_image_url: string (nullable = true)
 |-- parent_asin: string (nullable = true)
 |-- rating: long (nullable = true)
 |-- text: string (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- title: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- verified_purchase: boolean (nullable = true)



In [21]:
# df_ratings.show(5)

In [22]:
cols_to_drop = ['title', 'text', 'images', 'helpful_vote', 'verified_purchase', 'timestamp' ]
df_ratings = df_ratings.drop(*cols_to_drop)

In [23]:
# cols_non_arr = [c for c, t in df_ratings.dtypes if (t.startswith('array')==False) and (t.startswith('struct') == False) ] 

# for c in cols_non_arr:
#     print(f'Percentage of blank rows in {c}: {round(100 * df_ratings.where(col(c).isNull() | isnan(c)).count() / df_ratings.count(), 3)}%')

In [24]:
# Utility function

def meta_lookup(parent_asin:str):

    return df_meta.filter(col('parent_asin') == parent_asin)

In [25]:
# Utility function

def ratings_lookup(parent_asin:str):

    return df_ratings.filter(col('parent_asin') == parent_asin)

In [26]:
# x= meta_lookup('B00Q4X2FSM')
# x.show()

In [27]:
# # Try
# df_meta.filter(col('parent_asin') == 'B07S9DJ2S2').show()

In [28]:
# # Try: 
# col_custom = list(F.col(f).alias(f) for f in  df_meta.columns) + list(map(lambda f: F.col("details").getItem(f).alias(str(f)), ["Brand", "Manufacturer"]))
# col_custom

In [29]:
# # Try

# df_meta.select(col_custom).show(5)

In [30]:
# Count blank [stores]

df_meta.filter(col('store').isNull()).count()

6

### FE: Create column -> Maker : Extract from Brand/Manufacturer

In [31]:
# UDF for getting brand/manufacturer/distributor from details 

def get_maker(dict_col, key1, key2):
    if dict_col is None:
        return None
    if key1 in dict_col and dict_col[key1]:
        return dict_col[key1]
    elif key2 in dict_col:
        return dict_col[key2]
    else:
        return None
    
get_maker_udf = F.udf(get_maker, StringType())

In [32]:
# Call udf get_maker to fill [details] with brand or manufacturer info

df_meta = df_meta.withColumn(
    "maker",
    get_maker_udf(df_meta["details"], \
                F.lit("Brand"), \
                F.lit("Manufacturer"))
)

In [33]:
# Show null values in maker column

df_meta.where(col('maker').isNull()).count()

                                                                                

60

In [34]:
# Since [store] values will be used for maker, ensure there'e no blank values 
df_meta = df_meta.na.fill({'store':'Unknown'})

# Checkpoint: Assert no remaining nulls in [store]
assert df_meta.filter(col('store').isNull()).count() == 0, 'Blank values in store. Check imputation'

In [35]:
# Fill null columns in [maker] with values from [store]
from pyspark.sql.functions import coalesce
df_meta = df_meta.withColumn('maker', coalesce('maker', 'store') )

# Checkpoint: Assert no remaining nulls in [maker]
assert df_meta.filter(col('maker').isNull()).count() == 0, 'Blank values in maker. Check imputation'

### FE: Create column -> Feature_Group : Concatenation of different columns

In [36]:
# Create a new column with the concatenated values of the columns of interest
cols_of_interest = ['parent_asin', 'main_category', 'maker', 'title' ] # Add desc and details later
df_meta = df_meta.withColumn('feature_group', concat_ws(' ', *cols_of_interest))

### Similarity Search

In [37]:
import re
import nltk
from nltk.tokenize import sent_tokenize
from nltk.stem import WordNetLemmatizer

from nltk.corpus import stopwords
nltk.download('punkt')
nltk.download('stopwords')
nltk.download('wordnet')

lemmatizer = WordNetLemmatizer()
stop_words = stopwords.words('english')

from pyspark.ml.feature import Tokenizer, HashingTF, IDF, MinHashLSH

[nltk_data] Downloading package punkt to /home/edu/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to /home/edu/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to /home/edu/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


In [38]:
pd.set_option('display.max_columns', None) 

In [39]:
# UDF for preprocessing text

def preprocess_text(text):
    
    text = text.lower()
    text = re.sub(r'[^a-zA-Z0-9\s]', ' ', text)
    text = re.sub(r'\s+', ' ', text)
    
    return text

# Register the UDF
preprocess_text_udf = F.udf(preprocess_text, StringType())

In [40]:
# Apply the UDF to the 'text' column
df_meta = df_meta.withColumn('feat_preproc', preprocess_text_udf(df_meta['feature_group']))

#### Tokenizing the text features

In [41]:
# Tokenize the preprocessed text 
tokenizer = Tokenizer(inputCol='feat_preproc', outputCol='feat_tokens')
df_tokenized = tokenizer.transform(df_meta)

#### Add Extra Lemmatization step

In [42]:
# UDF: Lemmatize tokens
import numpy as np
from pyspark.sql.types import ArrayType, StringType


def lemmatize_tokens(tokens):
    """
    Lemmatize the input tokens.
    Args:
        tokens (list): List of tokens to lemmatize.
    Returns:
        list: List of lemmatized tokens.
    """
    if tokens is None:
        return None

    lemmatized_tokens = [lemmatizer.lemmatize(token.lower()) for token in tokens if token not in stop_words]
    return lemmatized_tokens

# Register the UDF
lemmatize_tokens_udf = F.udf(lemmatize_tokens, ArrayType(StringType()))


In [43]:
# Apply the UDF to the 'tokens' column

df_lemmatized = df_tokenized.withColumn('feat_lemma', lemmatize_tokens_udf(df_tokenized['feat_tokens']))


#### Creating Feature Vectors

In [44]:
ctvec_file = None
hashtf_file = None
tf_ctvector = None
tf_vector = None

##### A. Using Hashing Term Frequency

In [45]:
'''
# Hashing Term Frequency to create feature vectors - Similar to TF-IDF but without the IDF component
# The HashingTF class is used to convert a sequence of terms into a feature vector using the hashing trick.
# HashingTF is a feature transformer that maps a sequence of terms to their term frequencies using the hashing trick.
# It is a fast and efficient way to convert text data into numerical feature vectors.
# The numFeatures parameter specifies the number of features to create.
# The output is a sparse vector of term frequencies, where each index corresponds to a hashed term.


hashingTF = HashingTF(inputCol="feat_lemma", outputCol="feat_vectors" ) #, numFeatures=2048)
tf_vector = hashingTF.transform(df_lemmatized)

tf_vector.cache()
tf_vector.select(col('feat_vectors')).show(5)

ctvec_file = None
hashtf_file = './data/similarity_hashtf.csv' 

'''

'\n# Hashing Term Frequency to create feature vectors - Similar to TF-IDF but without the IDF component\n# The HashingTF class is used to convert a sequence of terms into a feature vector using the hashing trick.\n# HashingTF is a feature transformer that maps a sequence of terms to their term frequencies using the hashing trick.\n# It is a fast and efficient way to convert text data into numerical feature vectors.\n# The numFeatures parameter specifies the number of features to create.\n# The output is a sparse vector of term frequencies, where each index corresponds to a hashed term.\n\n\nhashingTF = HashingTF(inputCol="feat_lemma", outputCol="feat_vectors" ) #, numFeatures=2048)\ntf_vector = hashingTF.transform(df_lemmatized)\n\ntf_vector.cache()\ntf_vector.select(col(\'feat_vectors\')).show(5)\n\nctvec_file = None\nhashtf_file = \'./data/similarity_hashtf.csv\' \n\n'

##### B. Using CountVectorizer

In [46]:
from pyspark.ml.feature import CountVectorizer

# Apply CountVectorizer to count the occurrences of each phrase
countvec = CountVectorizer(inputCol="feat_lemma", outputCol="feat_vectors")

countvec_model = countvec.fit(df_lemmatized)
tf_ctvector = countvec_model.transform(df_lemmatized)

hashtf_file = None
ctvec_file = './data/similarity_ctvec.csv'

tf_ctvector.cache()

                                                                                

DataFrame[categories: array<string>, description: array<string>, details: struct<Access Location:string,Age Range (Description):string,Air Flow Capacity:string,Airflow Displacement:string,Alarm:string,Amperage Capacity:string,Annual Energy Consumption:string,Are Batteries Included:string,Assembly required:string,Backlight:string,Batteries:string,Batteries Included?:string,Batteries Required?:string,Batteries required:string,Battery Cell Type:string,Best Sellers Rank:struct<Amazon Renewed:bigint,Appliances:bigint,Beverage Refrigerators:bigint,Chest Freezers:bigint,Clothes Dryer Replacement Lint Screens:bigint,Clothes Dryer Replacement Vents:bigint,Clothes Washer Replacement Doors:bigint,Clothes Washer Replacement Drain Pumps:bigint,Clothes Washing Machines:bigint,Commercial Bag Sealers:bigint,Commercial Shrink Wrappers:bigint,Compact Refrigerators:bigint,Cooktop Parts & Accessories:bigint,Cooktops:bigint,Dishwasher Parts & Accessories:bigint,Dishwasher Replacement Baskets:bigint,Dishwas

In [47]:
tf_ctvector.show()

[Stage 25:>                                                         (0 + 1) / 1]

+--------------------+--------------------+--------------------+----------------------+--------------------+-----------+------------------+--------------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|          categories|         description|             details|              features|       main_category|parent_asin|             store|               title|             maker|       feature_group|        feat_preproc|         feat_tokens|          feat_lemma|        feat_vectors|
+--------------------+--------------------+--------------------+----------------------+--------------------+-----------+------------------+--------------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|[Appliances, Part...|                  []|{NULL, NULL, NULL...|                    []|Tools & Home Impr...| B07CPY3X2X|            UMTELE|Compat

                                                                                

Create IDF for each product

In [48]:
# The IDF class is used to compute the inverse document frequency (IDF) of the terms in the feature vectors.
# The IDF component is used to down-weight the importance of common terms and up-weight the importance of rare terms.

idf = IDF(inputCol="feat_vectors", outputCol="feat_idf")

if tf_ctvector:
        idf_model = idf.fit(tf_ctvector)
        tfidf = idf_model.transform(tf_ctvector)
elif tf_vector:
        idf_model = idf.fit(tf_vector)
        tfidf = idf_model.transform(tf_vector)

                                                                                

In [49]:
# Drop columns in the tfidf vector 
cols_to_drop = ['feat_preproc', 'feat_tokens', 'feat_lemma', 'feat_vectors']
tfidf = tfidf.drop(*cols_to_drop)

Create a database view of the vectorized tfidf

In [50]:
tfidf.createOrReplaceTempView("v_meta_tfidf")

##### Build query in SQL

In [51]:
# UDF: Query builder using SQL 

def query_builder(search_asin = None, search_category = None, search_maker = None, search_title = None):
    """
    Build a SQL query to filter the DataFrame based on the provided search criteria.

    Args:
        search_category (str, optional): Category to search for. Defaults to None.
        search_maker (str, optional): Maker to search for. Defaults to None.
        search_title (str, optional): Title to search for. Defaults to None.

    Returns:
        str: SQL query string.
    """

    conditions = []
    if search_asin:
        conditions.append(f" parent_asin = '{search_asin}'")
    if search_category:
        conditions.append(f" main_category LIKE '%{search_category}%'")
    if search_maker:
        # conditions.append(f" maker LIKE '%{search_maker}%'")
        conditions.append(f" maker = '{search_maker}'")
    if search_title:
        conditions.append(f" title LIKE '%{search_title}%'")
    if not conditions:
        raise ValueError("At least one search criterion must be provided.")
    
    conditions_str = " AND ".join(conditions)

    query = f"SELECT * FROM v_meta_tfidf WHERE ({conditions_str})"
    
    return query

In [52]:
# # Try: Test Cases


# # Test Case #1: Search by category and maker/brand
# query_statement = query_builder(search_asin=None, search_category='Amazon Home', search_maker='Frigidaire' )

# # Test Case #2: Seacrh by category, maker, and title
# # query_statement = query_builder(search_asin= None, search_category='Electronics', search_maker='Samsung', search_title='DC66')

# # print(query_statement)

In [53]:
# Get results from sql query 
# query_results = spark.sql(query_statement)
# print(f'Query results count: {query_results.count()}')

#### Transform Query Vector

In [54]:
# input_query = [input('Enter Product To Search: ') ]

In [None]:
# Validate:
input_query = 'Frigidaire refrigirators'
print( input_query )

Frigidaire refrigirators


In [56]:
# convert string to spark dataframe
input_df = spark.createDataFrame([(input_query,)], ['feat_preproc'])

In [57]:
# Preprocess the input query
query_pp = preprocess_text_udf(input_query)

# Transform input query to a pyspark dataframe
# query_pp_df = spark.createDataFrame([query_pp], StringType()).toDF('feat_preproc')
query_pp_df = spark.createDataFrame([(input_query,)], ['feat_preproc'])

# Tokenize the preprocessed text
query_token = tokenizer.transform(query_pp_df)

# Lemmatize the tokens
query_lemma = query_token.withColumn('feat_lemma', lemmatize_tokens_udf(query_token['feat_tokens']))

# Get term frequency vector for the lemmatized tokens
query_tf = countvec_model.transform(query_lemma)

In [58]:
query_token.show(1)

                                                                                

+--------------------+--------------------+
|        feat_preproc|         feat_tokens|
+--------------------+--------------------+
|Frigidaire refrig...|[frigidaire, refr...|
+--------------------+--------------------+



In [None]:
query_lemma.show()



+--------------------+--------------------+--------------------+
|        feat_preproc|         feat_tokens|          feat_lemma|
+--------------------+--------------------+--------------------+
|Frigidaire refrig...|[frigidaire, refr...|[frigidaire, refr...|
+--------------------+--------------------+--------------------+



                                                                                

In [None]:
query_tf.show()

+--------------------+--------------------+--------------------+-----------------+
|        feat_preproc|         feat_tokens|          feat_lemma|     feat_vectors|
+--------------------+--------------------+--------------------+-----------------+
|Frigidaire refrig...|[frigidaire, refr...|[frigidaire, refr...|(5165,[17],[1.0])|
+--------------------+--------------------+--------------------+-----------------+



In [None]:
# Get the IDF vector for the term frequency vector
idf = IDF(inputCol="feat_vectors", outputCol="query_idf")
idf_model = idf.fit(query_tf)

query_tfidf = idf_model.transform(query_tf)
# query_tfidf.cache()

ERROR:root:Exception while sending command.===>                     (5 + 3) / 8]
Traceback (most recent call last):
  File "/home/edu/DataScience/CapstoneProjects/Spark_RecSys/venv_spark/lib/python3.12/site-packages/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/edu/DataScience/CapstoneProjects/Spark_RecSys/venv_spark/lib/python3.12/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/edu/DataScience/CapstoneProjects/Spark_RecSys/venv_spark/lib/python3.12/site-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
ERROR:root:Excepti

Py4JError: An error occurred while calling o344.fit

### Calculate Cosine Similarity

In [None]:
# Crosstab to self?

# tfidf = tfidf.crossJoin(tfidf.withColumnRenamed("feat_idf", "feat_idf2"))
# tfidf.show(2)
# tfidf.select( 'feat_idf', 'feat_idf2').show(5, truncate=False)
# similarity = tfidf.withColumn("cos_sim", cos_sim(F.col('feat_idf'), F.col('feat_idf2')) )

In [None]:
# UDF: Cosine Similarity

# @F.udf(returnType=FloatType())
def cos_sim(u, v):

  return float(u.dot(v) / (u.norm(2) * v.norm(2)))

# Register the UDF
cos_sim_udf = F.udf(cos_sim, FloatType())

In [None]:
# query_results.select(col('feat_idf')).show(5)

In [None]:
tfidf.show(2)

In [None]:
# # Validate query results from previous call

print(query_tfidf.count())
query_tfidf.show(5, truncate=False)

In [None]:
# Compute cosine similarity between the tfidf vectors and the query results.

similarity = tfidf.withColumn("cos_sim", \
                              cos_sim_udf(tfidf['feat_idf'], query_tfidf['query_idf']))

In [None]:
# Preview the similarity results

similarity.select('parent_asin', 'cos_sim', 'title', 'main_category', 'maker').show(10, truncate=False)

In [None]:
# Validation: Save similarity results to csv. 
# Toggle below codes to save using hasftf or countvectorizer.

n_result = 20


df_cos_sim = similarity.select('parent_asin', 'cos_sim', 'title', 'main_category', 'maker').toPandas().sort_values('cos_sim', ascending=False)
df_cos_sim = df_cos_sim.head(n_result)


if hashtf_file:
    # similarity.select('parent_asin', 'cos_sim', 'title', 'main_category', 'maker').show(10, truncate=False).toPandas().sort_values('cos_sim', ascending=False).head(10)

    df_cos_sim.to_csv(hashtf_file, index=False)

elif ctvec_file:
    
    df_cos_sim.to_csv(ctvec_file, index=False)



In [None]:
# similarity.select('parent_asin', 'cos_sim', 'title', 'main_category', 'maker').show(10, truncate=False).toPandas().sort_values('cos_sim', ascending=False).head(10)
# similarity.select('parent_asin', 'cos_sim', 'title', 'main_category', 'maker').toPandas().sort_values('cos_sim', ascending=False).to_csv('./data/similarity_hashtf.csv')

In [None]:
spark.stop()

In [None]:
# from sklearn.feature_extraction.text import TfidfVectorizer
# from sklearn.metrics.pairwise import cosine_similarity


In [None]:
# similarity = cosine_similarity(idfscores.select('feat_idf').rdd.map(lambda x: x[0]).collect())
# similarity.shape

In [None]:
# # Create and fit the MinHashLSH model to the feature vectors
# # Notes:
# # The MinHashLSH -  creates a locality-sensitive hashing (LSH) model for approximate nearest neighbor search.
# # numHashTable -  number of hash tables to use for the LSH model.
# # Fitted hash results -  transforms the feature vectors into hash values.


# mh = MinHashLSH(inputCol="idf", outputCol="mh_hashes", numHashTables=5)
# mh_model = mh.fit(idfscores)

# # Transform the feature data to include hash values
# transformedData = mh_model.transform(idfscores)
# transformedData.show()

In [None]:
# df_meta.withColumn('c_brand', F.col('details').getItem('Brand')).show(5)

# df_meta.withColumn('c_manufac', F.col('details').getItem('Manufacturer')).show(5)

In [None]:
# df.withColumn('c_brand', F.col('details').getItem('Brand')).show()

In [None]:
# rdd = spark.sparkContext.parallelize([jsonData])