In [1]:
import os

# Manual installation of the Java/Spark requirements (uncomment the lines below if the setup fails with an error)

# !apt-get install openjdk-11-jdk-headless -qq > /dev/null
# !wget -q https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
# !tar -xzf spark-3.3.2-bin-hadoop3.tgz
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.3.2-bin-hadoop3"

In [2]:
import pkg_resources

# Check if the necessary packages are installed
installed_packages = {pkg.key for pkg in pkg_resources.working_set}

# Install pyspark
if 'pyspark' not in installed_packages:
    !pip install -q pyspark

# Install findspark
if 'findspark' not in installed_packages:
    !pip install -q findspark

# Install kaggle CLI
if 'kaggle' not in installed_packages:
    !pip install -q kaggle

In [3]:
from google.colab import userdata

# Copy the Kaggle credentials from the Secrets section of Colab into the
# process environment. Both KAGGLE_USERNAME and KAGGLE_KEY must be set there.
for secret_name in ('KAGGLE_USERNAME', 'KAGGLE_KEY'):
    os.environ[secret_name] = userdata.get(secret_name)

In [4]:
# Download and unzip the dataset
if not os.path.exists('Books_rating.csv'):
    !kaggle datasets download -d mohamedbakhet/amazon-books-reviews
    !unzip -o amazon-books-reviews.zip
else:
    print("The file 'Books_rating.csv' already exists.")

Dataset URL: https://www.kaggle.com/datasets/mohamedbakhet/amazon-books-reviews
License(s): CC0-1.0
Downloading amazon-books-reviews.zip to /content
 98% 1.04G/1.06G [00:05<00:00, 200MB/s]
100% 1.06G/1.06G [00:05<00:00, 202MB/s]
Archive:  amazon-books-reviews.zip
  inflating: Books_rating.csv        
  inflating: books_data.csv          


In [5]:
# Number of processing units available to this runtime
!nproc

2


In [6]:
!cat /proc/cpuinfo | grep "model name"

model name	: Intel(R) Xeon(R) CPU @ 2.20GHz
model name	: Intel(R) Xeon(R) CPU @ 2.20GHz


In [7]:
!cat /proc/meminfo | grep MemTotal

MemTotal:       13289424 kB


In [8]:
import findspark

# Initialize findspark before the pyspark imports below.
# NOTE(review): presumably this is what makes the pyspark package importable
# in this runtime — confirm against the findspark documentation.
findspark.init()

# Spark session entry point, text-preprocessing transformers, the `size`
# column function and the hashing / MinHash LSH estimators used by the detector
from pyspark.sql import SparkSession
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover
from pyspark.sql.functions import size
from pyspark.ml.feature import HashingTF, MinHashLSH

In [9]:
class TextSimilarityDetector:
    """Detect near-duplicate texts in a CSV file using Spark's MinHash LSH.

    On construction the CSV is read, optionally projected and renamed, cleaned
    (rows containing nulls and exact duplicate rows are dropped) and, when
    ``sample_fraction`` is truthy, down-sampled. ``find_similar_pairs`` then
    tokenizes the text column, hashes the tokens into feature vectors and runs
    an approximate similarity self-join on them.
    """

    def __init__(self, csv_file_path, text_input_column_name, columns_to_select=None, column_names_mapping=None, sample_fraction=0.01):
        """Validate the arguments, start Spark, then load (and sample) the data.

        Args:
            csv_file_path: Path of the CSV file to read (a header row is expected).
            text_input_column_name: Name of the text column as it appears in the
                CSV, i.e. before any renaming.
            columns_to_select: Optional list of CSV columns to keep. When given,
                it must contain ``text_input_column_name``.
            column_names_mapping: Optional ``{csv_name: new_name}`` mapping. When
                given, it must contain ``text_input_column_name`` and, if
                ``columns_to_select`` is also given, every selected column.
            sample_fraction: Fraction of rows to sample without replacement.
                A falsy value (``0`` or ``None``) disables sampling.

        Raises:
            ValueError: If the column arguments are inconsistent with each other.
        """
        # `None` defaults avoid sharing one mutable list/dict between instances.
        if columns_to_select is None:
            columns_to_select = []
        if column_names_mapping is None:
            column_names_mapping = {}

        # Validate before touching Spark so a bad call fails fast and cheaply.
        if columns_to_select and (text_input_column_name not in columns_to_select):
            raise ValueError(f"The specified text input column '{text_input_column_name}' is not in the columns to select: {columns_to_select}")

        if column_names_mapping and (text_input_column_name not in column_names_mapping.keys()):
            raise ValueError(f"The specified text input column '{text_input_column_name}' is not in the column names mapping: {column_names_mapping}")

        if columns_to_select and column_names_mapping:
            for col in columns_to_select:
                if col not in column_names_mapping.keys():
                    raise ValueError(f"The column '{col}' in columns to select is not present in the column names mapping: {column_names_mapping}")

        self.spark = SparkSession.builder.master("local[*]").appName("SimilarityDetector").getOrCreate()
        print("Spark session created successfully.")
        print("Spark Default Parallelism Configuration: ", self.spark.sparkContext.defaultParallelism)

        self.csv_file_path = csv_file_path
        self.column_names_mapping = column_names_mapping
        # From here on the text column is addressed by its (possibly) renamed name.
        self.text_input_column_name = self.column_names_mapping.get(text_input_column_name, text_input_column_name)
        self.columns_to_select = columns_to_select
        self.sample_fraction = sample_fraction

        self._load_data()

        if self.sample_fraction:
            self._sample_data()

    def _load_data(self):
        """Read the CSV into ``self.df``, applying selection, renaming and cleaning."""
        df = self.spark.read.csv(self.csv_file_path, header=True)

        if self.columns_to_select:
            df = df.select(*self.columns_to_select)

        for old_name, new_name in self.column_names_mapping.items():
            df = df.withColumnRenamed(old_name, new_name)

        # Drop rows with any null value and exact duplicate rows, then cache
        # because the frame is counted here and reused by later steps.
        self.df = df.dropna().dropDuplicates().cache()
        print(f"{self.df.count()} number of records loaded")

    def _sample_data(self):
        """Store a seeded random sample of ``self.df`` in ``self.df_sampled``."""
        self.df_sampled = self.df.sample(withReplacement=False, fraction=self.sample_fraction, seed=42)
        self.df_sampled.cache()
        print(f"Sampled {self.df_sampled.count()} records from data.\nUsing sampled data for further processing.")

    def _tokenize(self, pattern, remove_stopwords):
        """Tokenize the text column into ``self.df_tokenized``.

        The result always has a ``text_words`` column (raw tokenizer output)
        and a ``tokens`` column (the words actually used downstream). Rows
        whose ``tokens`` array is empty are removed.

        Args:
            pattern: Regular expression handed to ``RegexTokenizer``.
            remove_stopwords: Whether to strip stop words from the tokens.
        """
        print("Initiating tokenization process...")
        tokenizer = RegexTokenizer(inputCol=self.text_input_column_name, outputCol="text_words", pattern=pattern)
        if self.sample_fraction:
            print("Using sampled data for tokenization.")
            self.df_with_words = tokenizer.transform(self.df_sampled)
        else:
            print("Using full data for tokenization.")
            self.df_with_words = tokenizer.transform(self.df)
        self.df_with_words.cache()

        if remove_stopwords:
            self.df_tokenized = self._remove_stopwords()
        else:
            # Without stop-word removal no "tokens" column would exist, which
            # broke the filter below and the hashing step in find_similar_pairs.
            # Expose the raw words under that name so both paths share a schema.
            self.df_tokenized = self.df_with_words.withColumn("tokens", self.df_with_words["text_words"])

        self.df_tokenized = self.df_tokenized.filter(size("tokens") > 0)
        self.df_tokenized.cache()
        print(f"Tokenization complete. {self.df_tokenized.count()} records with non-empty tokens.")
        print("\n--------------------------------------------------------------------------------\n")
        print("Sample of tokenized data:")
        self.df_tokenized.show(5, truncate=True)

    def _remove_stopwords(self):
        """Return ``self.df_with_words`` with a stop-word-free ``tokens`` column added."""
        remover = StopWordsRemover(inputCol="text_words", outputCol="tokens")
        df_tokenized = remover.transform(self.df_with_words)
        df_tokenized.cache()
        return df_tokenized

    def find_similar_pairs(self, record_id_column_name, pattern="\\W", remove_stopwords=True, num_features=10000, num_hash_tables=5, Jaccard_threshold=0.4):
        """Return pairs of records whose texts are within a Jaccard distance.

        Args:
            record_id_column_name: Column (after renaming) used to discard
                pairs that join a record with itself. Pairs of distinct rows
                that share the same value in this column are discarded too.
            pattern: Regular expression handed to ``RegexTokenizer``.
            remove_stopwords: Whether to strip stop words before hashing.
            num_features: Number of features of the ``HashingTF`` vectors.
            num_hash_tables: Number of hash tables used by ``MinHashLSH``.
            Jaccard_threshold: Maximum Jaccard *distance* passed as the
                ``threshold`` of the approximate similarity join.

        Returns:
            A cached DataFrame with ``datasetA``, ``datasetB`` and
            ``JaccardDistance`` columns. The self-join emits each pair in both
            orders, (A, B) and (B, A).

        Raises:
            ValueError: If ``record_id_column_name`` is not a column of the
                tokenized data.
        """
        self._tokenize(pattern=pattern, remove_stopwords=remove_stopwords)

        if record_id_column_name not in self.df_tokenized.columns:
            raise ValueError(f"The specified record ID column '{record_id_column_name}' is not in the DataFrame columns: {self.df_tokenized.columns}")

        print("Initiating MinHash LSH model fitting...")
        hashingTF = HashingTF(inputCol="tokens", outputCol="features", numFeatures=num_features)
        df_hashed = hashingTF.transform(self.df_tokenized)
        df_hashed.cache()

        mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=num_hash_tables)
        model = mh.fit(df_hashed)

        print("MinHash LSH model fitted successfully.")

        print(f"Finding similar pairs using Jaccard distance (Jaccard distance < {Jaccard_threshold})...")

        similar_pairs = model.approxSimilarityJoin(
            df_hashed, df_hashed, threshold=Jaccard_threshold, distCol="JaccardDistance"
        ).filter(f"datasetA.{record_id_column_name} <> datasetB.{record_id_column_name}")  # filter out self-joins

        similar_pairs.cache()
        print(f"Number of similar pairs found: {similar_pairs.count()}")

        return similar_pairs

In [10]:
# Usage

# Fraction of the cleaned rows kept for the analysis
sample_fraction = 0.01

# CSV column name -> column name used in the rest of the notebook.
# NOTE(review): the sample output shows ISBN/ASIN-like values under
# "review_id" — confirm that "Id" identifies a review and not a book.
review_column_names = {
    "Id": "review_id",
    "Title": "book_title",
    "User_id": "user_id",
    "review/score": "review_score",
    "review/summary": "review_summary",
    "review/text": "review_text",
}

# The selected columns are exactly the keys of the mapping, in the same order
review_similarity_detector = TextSimilarityDetector(
    "Books_rating.csv",
    "review/text",
    columns_to_select=list(review_column_names),
    column_names_mapping=review_column_names,
    sample_fraction=sample_fraction,
)

Spark session created successfully.
Spark Default Parallelism Configuration:  2
2418791 number of records loaded
Sampled 24359 records from data.
Using sampled data for further processing.


In [11]:
# Tokenize the sampled reviews and join them against themselves with MinHash LSH
similar_pairs = review_similarity_detector.find_similar_pairs(
    record_id_column_name="review_id", # Must be a column of the loaded data (use the renamed name when column_names_mapping is given)
    Jaccard_threshold=0.4, # Maximum Jaccard distance: keeps pairs with distance < 0.4 (presumably similarity > 0.6 — confirm)
    # pattern="\\W",  # default
    # remove_stopwords=True,  # default
    # num_features=10000,  # default
    # num_hash_tables=5  # default
)

Initiating tokenization process...
Using sampled data for tokenization.
Tokenization complete. 24353 records with non-empty tokens.

--------------------------------------------------------------------------------

Sample of tokenized data:
+----------+--------------------+--------------+------------+--------------------+--------------------+--------------------+--------------------+
| review_id|          book_title|       user_id|review_score|      review_summary|         review_text|          text_words|              tokens|
+----------+--------------------+--------------+------------+--------------------+--------------------+--------------------+--------------------+
|0877794685|Webster's Third N...|A24VUPNUGKF2ER|         2.0|This Review Is Fo...|The CD-Rom is ver...|[the, cd, rom, is...|[cd, rom, disappo...|
|0684190192|         The Hustons| AJJOYSGAAL1JI|         4.0|Huston, not a cit...|Hollywood (and mu...|[hollywood, and, ...|[hollywood, much,...|
|B000LEJ3YY|King of the Wind:

In [47]:
# General view
similar_pairs.select(
    "JaccardDistance", "datasetA.*", "datasetB.*"
).orderBy("JaccardDistance").show(10)

+---------------+----------+--------------------+--------------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+--------------------+--------------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|JaccardDistance| review_id|          book_title|       user_id|review_score|      review_summary|         review_text|          text_words|              tokens|            features|              hashes| review_id|          book_title|       user_id|review_score|      review_summary|         review_text|          text_words|              tokens|            features|              hashes|
+---------------+----------+--------------------+--------------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+------

In [48]:
# Side by side comparison
similar_pairs.select(
    "JaccardDistance", "datasetA.review_id", "datasetB.review_id", "datasetA.user_id", "datasetB.user_id",
    "datasetA.book_title", "datasetB.book_title", "datasetA.review_score",
    "datasetB.review_score", "datasetA.review_summary", "datasetB.review_summary",
    "datasetA.review_text", "datasetB.review_text"
).orderBy("JaccardDistance").show(10)

+---------------+----------+----------+--------------+--------------+--------------------+--------------------+------------+------------+--------------------+--------------------+--------------------+--------------------+
|JaccardDistance| review_id| review_id|       user_id|       user_id|          book_title|          book_title|review_score|review_score|      review_summary|      review_summary|         review_text|         review_text|
+---------------+----------+----------+--------------+--------------+--------------------+--------------------+------------+------------+--------------------+--------------------+--------------------+--------------------+
|            0.0|B000JGTKBK|B000PG2G9M|A1JTG5X4VHJV27|A1JTG5X4VHJV27|     Little Britches|     Little Britches|         4.0|         4.0|Building a Charaa...|Building a Charaa...|"When his New Eng...|"When his New Eng...|
|            0.0|B0006BV6RY|0395051029|A2SD4MYPYSA304|A2SD4MYPYSA304|Wuthering Heights...|Wuthering Heights...| 

In [49]:
# Possible identical reviews (JaccardDistance = 0)
similar_pairs.select(
    "JaccardDistance", "datasetA.review_id", "datasetB.review_id", "datasetA.user_id", "datasetB.user_id",
    "datasetA.book_title", "datasetB.book_title", "datasetA.review_score",
    "datasetB.review_score", "datasetA.review_summary", "datasetB.review_summary",
    "datasetA.review_text", "datasetB.review_text"
).filter("JaccardDistance = 0").show(10)

+---------------+----------+----------+--------------+--------------+--------------------+--------------------+------------+------------+--------------------+--------------------+--------------------+--------------------+
|JaccardDistance| review_id| review_id|       user_id|       user_id|          book_title|          book_title|review_score|review_score|      review_summary|      review_summary|         review_text|         review_text|
+---------------+----------+----------+--------------+--------------+--------------------+--------------------+------------+------------+--------------------+--------------------+--------------------+--------------------+
|            0.0|0451519582|0736605010|A384TL3175MAQC|A384TL3175MAQC|Wuthering Heights...|   Wuthering Heights|         5.0|         5.0|Is it a truly sor...|Is it a truly sor...|Instead of focus ...|Instead of focus ...|
|            0.0|0435126024|1847022251|A29LEKFCAHMI6Y|A29LEKFCAHMI6Y|Jane Eyre (New Wi...|Jane Eyre (Large ...| 

In [54]:
# Possible non-identical reviews (JaccardDistance > 0.2)
similar_pairs.select(
    "JaccardDistance", "datasetA.review_id", "datasetB.review_id", "datasetA.user_id", "datasetB.user_id",
    "datasetA.book_title", "datasetB.book_title", "datasetA.review_score",
    "datasetB.review_score", "datasetA.review_summary", "datasetB.review_summary",
    "datasetA.review_text", "datasetB.review_text"
).filter("JaccardDistance > 0.2").orderBy("JaccardDistance").show(10)

+-------------------+----------+----------+--------------+--------------+--------------------+--------------------+------------+------------+--------------------+--------------------+--------------------+--------------------+
|    JaccardDistance| review_id| review_id|       user_id|       user_id|          book_title|          book_title|review_score|review_score|      review_summary|      review_summary|         review_text|         review_text|
+-------------------+----------+----------+--------------+--------------+--------------------+--------------------+------------+------------+--------------------+--------------------+--------------------+--------------------+
| 0.2142857142857143|0764566334|1864500832|A3DWXVGOE2XZIQ|A3DWXVGOE2XZIQ|Frommer's Road At...|Lonely Planet Out...|         3.0|         5.0|You're going to L...|You're going to L...|I've made &#62;20...|I've spent a year...|
| 0.2142857142857143|1864500832|0764566334|A3DWXVGOE2XZIQ|A3DWXVGOE2XZIQ|Lonely Planet Out...|Fr

In [55]:
# By different users
similar_pairs.select(
    "JaccardDistance", "datasetA.review_id", "datasetB.review_id", "datasetA.user_id", "datasetB.user_id",
    "datasetA.book_title", "datasetB.book_title", "datasetA.review_score",
    "datasetB.review_score", "datasetA.review_summary", "datasetB.review_summary",
    "datasetA.review_text", "datasetB.review_text"
).filter("datasetA.user_id <> datasetB.user_id",).orderBy("JaccardDistance").show(10)

+---------------+----------+----------+--------------+--------------+--------------------+--------------------+------------+------------+--------------------+--------------------+--------------------+--------------------+
|JaccardDistance| review_id| review_id|       user_id|       user_id|          book_title|          book_title|review_score|review_score|      review_summary|      review_summary|         review_text|         review_text|
+---------------+----------+----------+--------------+--------------+--------------------+--------------------+------------+------------+--------------------+--------------------+--------------------+--------------------+
|            0.0|B000MS82OQ|B000HKR9FW|A331SWKETANFZS|A2XKXQ9NSS7B98|   Mere Christianity|   Mere Christianity|         5.0|         5.0|Titles can be mis...|Titles can be mis...|By &quot;mere&quo...|By &quot;mere&quo...|
|            0.0|1594390002|B0007J8XJ4| ADEVBXUS0RSIR|A22XJU523VPBK8|Analysis of Shaol...|The varieties of ...| 

In [57]:
# With different book titles
similar_pairs.select(
    "JaccardDistance", "datasetA.review_id", "datasetB.review_id", "datasetA.user_id", "datasetB.user_id",
    "datasetA.book_title", "datasetB.book_title", "datasetA.review_score",
    "datasetB.review_score"
).filter("datasetA.book_title <> datasetB.book_title",).show(10, truncate=False)

+---------------+----------+----------+--------------+--------------+---------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------+------------+------------+
|JaccardDistance|review_id |review_id |user_id       |user_id       |book_title                                                                                         |book_title                                                                                |review_score|review_score|
+---------------+----------+----------+--------------+--------------+---------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------+------------+------------+
|0.0            |0451519582|0736605010|A384TL3175MAQC|A384TL3175MAQC|Wuthering Heights (Signet classics)                                   