# **AMD Project: Finding Similar Items**

*   **Author:** Matteo Onger
*   **Date:** July 2023

The following code implements the MinHash-LSH technique to find similar items.\
The code is organized in three main classes: one computes characteristic vectors, one computes signatures and one applies the LSH technique. Six global variables customize the algorithm: four of them (*N*, *B*, *R*, *F*) are passed to the constructors of these classes, *K* sets the length of the k-grams and *J* the number of reviews to read. Four functions (*words*, *nonStopWords*, *joinStopWords* and *kgrams*) compute the shingles in four different ways, and another function preprocesses the raw data.\
The dataset is organized as a Spark RDD, so its standard transformations (such as *map* and *flatMap*) are used to apply the previously defined functions and methods to the entire dataset. Since Spark runs these transformations in parallel over the partitions of the RDD, this approach scales better than processing the reviews one by one in plain Python.
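As a minimal illustration of the idea behind MinHash (a standalone toy, not part of the notebook's pipeline; `jaccard` and `minhash_estimate` are hypothetical names): under a random permutation of the shingles, two sets have the same minimum element with probability equal to their Jaccard similarity, so the fraction of agreeing permutations estimates it. This sketch uses explicit permutations of a tiny universe, which the notebook replaces with hash functions.

```python
import random

def jaccard(a: set, b: set) -> float:
    """Exact Jaccard similarity: len(A & B) / len(A | B)."""
    return len(a & b) / len(a | b)

def minhash_estimate(a: set, b: set, universe: list, num_perm: int, seed: int = 0) -> float:
    """Fraction of random permutations of `universe` under which A and B share the same minimum."""
    rng = random.Random(seed)
    agree = 0
    for _ in range(num_perm):
        perm = universe[:]
        rng.shuffle(perm)
        rank = {x: i for i, x in enumerate(perm)}  # position of each shingle in this permutation
        if min(a, key=rank.__getitem__) == min(b, key=rank.__getitem__):
            agree += 1
    return agree / num_perm

doc_a = {"food", "good", "service", "slow", "waitstaff"}
doc_b = {"food", "good", "service", "fast", "friendly"}
universe = sorted(doc_a | doc_b)
print(jaccard(doc_a, doc_b))                           # 3/7, about 0.4286
print(minhash_estimate(doc_a, doc_b, universe, 2000))  # close to 0.43; the exact value depends on the seed
```

The minimum of A and of B coincide exactly when the minimum of A | B falls in A & B, which happens with probability len(A & B) / len(A | B).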

## Main Code
---
**Note:** It is possible to edit the parameters to customize the execution of the algorithm (7th cell).\
**Note:** KAGGLE_USERNAME and KAGGLE_KEY must be entered to download the dataset (8th cell).



In [1]:
# ---- INSTALL LIBRARIES ----
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
# ---- LIBRARIES ----
import json
import nltk
import numpy as np
import os
import pyspark
import pyspark.sql.types as tp
import random
import re
import sympy
import time

from collections.abc import Callable
from operator import add
from os.path import isfile
from pyspark.sql import SparkSession

In [3]:
# ---- PREPROCESSING FUNCS ----
def preprocessing(text :str) -> str:
  """
  Return the string obtained by replacing all the non-alphanumeric characters
  of ``text`` with a whitespace. All the letters are made lowercase and multiple
  consecutive whitespaces are collapsed into a single whitespace.
  """
  # Raw strings avoid the invalid escape sequence warning for '\s'
  text = re.sub(r'[^A-Za-z0-9 ]+', ' ', text.lower())
  text = re.sub(r'\s{2,}', ' ', text)
  return text


# ---- SHINGLES FUNCS ----
def words(text :str) -> set[str]:
  """
  Return a set of shingles, which, in this case, are words.
  So ``text`` is split according to whitespaces.
  """
  res = set(text.split(' '))
  res.discard('')
  return res


def nonStopWords(text :str) -> set[str]:
  """
  Return a set of shingles, which, in this case, are words, but excluding stopwords.
  ``stopWordsEn`` is a set and must be a global variable containing all the desired stopwords.
  """
  res = set(text.split(' '))
  res.discard('')
  res.difference_update(stopWordsEn)
  return res


def joinStopWords(text :str) -> set[str]:
  """
  Return a set of shingles: every word that is not a stopword, plus every stopword
  concatenated with the (up to) two words that follow it; those words are then skipped.
  ``stopWordsEn`` is a set and must be a global variable containing all desired stopwords.
  """
  res = set()
  # Drop the empty strings produced by leading/trailing whitespaces
  tokens = [t for t in text.split(' ') if t != '']
  w = 0
  # A while loop is needed to actually skip the words joined to a stopword
  while w < len(tokens):
    if tokens[w] in stopWordsEn:
      res.add(''.join(tokens[w:w+3]))
      w += 3
    else:
      res.add(tokens[w])
      w += 1
  return res


def kgrams(text :str) -> set[str]:
  """
  Return a set of shingles, which, in this case, are K-grams (substrings of ``K`` characters).
  ``K`` is a global variable; texts shorter than ``K`` are right-padded with whitespaces.
  """
  if len(text) < K:
    text += (K-len(text)) * ' '
  grams = set()
  for i in range(0, len(text)-K+1):
    grams.add(text[i:i+K])
  return grams
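For quick experiments outside Spark, the shingling schemes can also be written without globals, passing the k-gram length and the stopword set as arguments. The helpers below (`kgram_shingles` and `join_stop_word_shingles`, hypothetical names) are a sketch that mirrors *kgrams* and, as an assumption about its intended behaviour, *joinStopWords*: each stopword is concatenated with the next two words, which are then skipped.

```python
def kgram_shingles(text: str, k: int) -> set[str]:
    """Character k-grams of `text`; texts shorter than k are right-padded with spaces."""
    text = text.ljust(k)
    return {text[i:i + k] for i in range(len(text) - k + 1)}

def join_stop_word_shingles(text: str, stop_words: set[str]) -> set[str]:
    """Non-stopwords are kept as shingles; a stopword is joined to the next two words, which are skipped."""
    tokens = [t for t in text.split(' ') if t]
    res, i = set(), 0
    while i < len(tokens):
        if tokens[i] in stop_words:
            res.add(''.join(tokens[i:i + 3]))
            i += 3
        else:
            res.add(tokens[i])
            i += 1
    return res

print(sorted(kgram_shingles("good food", 4)))
# [' foo', 'd fo', 'food', 'good', 'od f', 'ood ']
print(sorted(join_stop_word_shingles("we have tried it multiple times", {"we", "have", "it"})))
# ['itmultipletimes', 'wehavetried']
```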

In [4]:
# ---- CHARACTERISTIC MATRIX CLASS ----
class CharacteristicMatrix:
  """
  The following class implements methods to compute characteristic vectors
  and the Jaccard index.
  """

  def __init__(self, maxNumShingles :int, computeShingles :Callable[[str], set[str]], verbose :bool=False) -> None:
    """
    Create a new object to compute characteristic vectors.
    Shingles will be computed using the function ``computeShingles`` and
    they will be stored as integers in [0, ``maxNumShingles``), so distinct
    shingles may be mapped to the same index (collisions); collisions are
    unavoidable once the number of distinct shingles exceeds ``maxNumShingles``.
    """
    self.maxNumShingles = maxNumShingles
    self._computeShingles = computeShingles
    self._verb = verbose

    if self._verb:
      print('New characteristic matrix created:')
      print(' - Max num shingles -> ' + str(self.maxNumShingles) + "\n")
    return


  def computeCharVect(self, doc :str, key :str='') -> dict[str, str|set[int]]:
    """
    Compute the characteristic vector of the document ``doc``.

    Parameters
    ----------
    ``key``: string, optional
      ID of the passed document ``doc``. Empty string as default value.
    ``doc``: string
      Text to analyze.

    Return
    ----------
    Return a dictionary ``y`` that contains the key (unchanged) of the given document and
    a set ``s`` of integers representing the shingles of ``doc``.
    ``y``: dict
      {'key':``key``, 'shin':``s``}
    """
    shingles = set()
    for s in self._computeShingles(doc):
      idx = hash(s) % self.maxNumShingles
      shingles.add(idx)
    return {'key':key, 'shin':shingles}


  def js(self, a :dict[str, str|set[int]], b :dict[str, str|set[int]]) -> dict[str, str|float]:
    """
    Compute the Jaccard similarity coefficient of the docs ``a`` and ``b``, given their characteristic vectors.

    Parameters
    ----------
    ``a``, ``b``: dict
      The field 'shin' must contain the characteristic vector,
      while the field 'key' (optional) can contain the doc's id.
      If this field doesn't exist, an empty string is used as key.

    Return
    ----------
    Return a dictionary ``y`` that contains the keys (unchanged) of the given documents and
    a float ``f`` that is the Jaccard index.
    ``y``: dict
      {'keyA':``a['key']``, 'keyB':``b['key']``, 'JS':``f``}
    """
    keyA = a['key'] if 'key' in a else ''
    keyB = b['key'] if 'key' in b else ''
    a = a['shin']
    b = b['shin']

    return {'keyA':keyA, 'keyB':keyB, 'JS':len(a.intersection(b)) / len(a.union(b))}
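One caveat about *computeCharVect*: Python's built-in `hash()` of a `str` is salted with a random value per interpreter process unless the `PYTHONHASHSEED` environment variable is set, so the same shingle can get different indexes in different processes (for instance the notebook kernel and Spark's Python workers), depending on how `PYTHONHASHSEED` is configured. The sketch below (hypothetical helpers `shingle_index` and `char_vector`) uses `zlib.crc32`, which is deterministic everywhere.

```python
import zlib

def shingle_index(shingle: str, max_num_shingles: int) -> int:
    """Map a shingle to [0, max_num_shingles) with CRC32, which gives the same value in every process."""
    return zlib.crc32(shingle.encode('utf-8')) % max_num_shingles

def char_vector(shingles: set[str], max_num_shingles: int) -> set[int]:
    """Row indexes of the characteristic matrix set to 1 for this document (collisions merge shingles)."""
    return {shingle_index(s, max_num_shingles) for s in shingles}

v = char_vector({"food", "good", "service"}, 250000)
print(sorted(v))
```

CRC32 is not a cryptographic hash, but bucketing shingles only needs a deterministic, well-spread function.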

In [5]:
# ---- SIGNATURE MATRIX CLASS ----
class SignatureMatrix:
  """
  The following class implements methods to compute signatures
  and the estimated Jaccard index.
  """

  def __init__(self, numMinHashes :int, charMat :CharacteristicMatrix, verbose :bool=False) -> None:
    """
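    Create a new object that computes ``numMinHashes`` MinHashes per doc,
    using ``charMat`` to get the characteristic vector of each doc.
    The i-th MinHash simulates a random permutation of the rows with
    h(x) = ((seedsA[i]*x + seedsB[i]) mod seedsC[i]) mod ``maxNumShingles``, where seedsC[i]
    is a random prime in [``maxNumShingles``, 2*``maxNumShingles``).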
    """
    self.nMH = numMinHashes
    self._charMat = charMat
    # seedsA starts from 1: a zero multiplier would make the hash function constant
    self._seedsA = np.array([random.randint(1, self._charMat.maxNumShingles) for i in range(0, self.nMH)])
    self._seedsB = np.array([random.randint(0, self._charMat.maxNumShingles) for i in range(0, self.nMH)])
    self._seedsC = np.array([sympy.randprime(self._charMat.maxNumShingles, 2*self._charMat.maxNumShingles) for i in range(0, self.nMH)])
    self._verb = verbose

    if self._verb:
      print('New signature matrix created:')
      print(' - Num MinHashes -> ' + str(self.nMH))
      print(' - SeedsA -> ' + str(self._seedsA))
      print(' - SeedsB -> ' + str(self._seedsB))
      print(' - SeedsC -> ' + str(self._seedsC) + "\n")
    return


  def computeSignature(self, doc :str, key :str='') -> dict[str, str|tuple[int]]:
    """
    Compute the signature of the document ``doc``.

    Parameters
    ----------
    ``key``: string, optional
      ID of the passed document ``doc``. Empty string as default value.
    ``doc``: string
      Text to analyze.

    Return
    ----------
    Return a dictionary ``y`` that contains the key (unchanged) of the given document and
    a tuple ``t`` of integers: the signature of ``doc``.
    ``y``: dict
      {'key':``key``, 'sign':``t``}
    """
    shingles = self._charMat.computeCharVect(doc)['shin']
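    # Start from maxNumShingles, which is greater than any hash value
    signature = np.full(self.nMH, self._charMat.maxNumShingles)
    # Row index in the original char mat
    for idx in shingles:
      # Row indexes in the simulated permutations: ((seedsA*idx + seedsB) mod seedsC) mod N
      hashI = np.mod(np.mod(np.add(idx * self._seedsA, self._seedsB), self._seedsC), self._charMat.maxNumShingles)
      # For each permutation, keep the smallest row index seen so far
      np.minimum(signature, hashI, out=signature)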
    return {'key':key, 'sign':tuple(signature)}


  def ejs(self, a :dict[str, str|tuple[int]], b :dict[str, str|tuple[int]]) -> dict[str, str|float]:
    """
    Compute the estimated Jaccard similarity coefficient of the docs ``a`` and ``b``, given their signatures.

    Parameters
    ----------
    ``a``, ``b``: dict
      The field 'sign' must contain the signature,
      while the field 'key' (optional) can contain the doc's id.
      If this field doesn't exist, an empty string is used as key.

    Return
    ----------
    Return a dictionary ``y`` that contains the keys (unchanged) of the given documents and
    a float ``f`` that is the estimated Jaccard index (fraction of equal MinHashes).
    ``y``: dict
      {'keyA':``a['key']``, 'keyB':``b['key']``, 'EJS':``f``}
    """
    keyA = a['key'] if 'key' in a else ''
    keyB = b['key'] if 'key' in b else ''
    a = a['sign']
    b = b['sign']
    return {'keyA':keyA, 'keyB':keyB, 'EJS':sum(x == y for x, y in zip(a, b)) / len(a)}
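The MinHash computation in *computeSignature* can also be vectorized over all the shingles of a doc at once. The sketch below is a hypothetical variant, not the notebook's implementation: instead of drawing a random prime per MinHash with sympy, it uses the single Mersenne prime 2^31 - 1 for every hash h(x) = ((a*x + b) mod P) mod n, and it assumes the doc has at least one shingle (the class above handles empty docs with its sentinel value).

```python
import numpy as np

P = 2_147_483_647  # Mersenne prime 2^31 - 1, larger than any shingle index

def make_hash_params(num_hashes: int, seed: int = 0) -> tuple[np.ndarray, np.ndarray]:
    """Coefficients a in [1, P) and b in [0, P) of h(x) = ((a*x + b) mod P) mod n."""
    rng = np.random.default_rng(seed)
    return rng.integers(1, P, num_hashes), rng.integers(0, P, num_hashes)

def minhash_signature(shingles: np.ndarray, coef_a: np.ndarray, coef_b: np.ndarray, n: int) -> np.ndarray:
    """Minimum of each hash function over the doc's shingle indexes (needs at least one shingle)."""
    # Matrix of shape (num_shingles, num_hashes); int64 suffices because indexes are below n and n * P << 2^63
    hashes = (np.outer(shingles, coef_a) + coef_b) % P % n
    return hashes.min(axis=0)

def estimate_js(sig_a: np.ndarray, sig_b: np.ndarray) -> float:
    """Estimated Jaccard similarity: fraction of positions where the signatures agree."""
    return float(np.mean(sig_a == sig_b))

n_rows = 250000
rng = np.random.default_rng(42)
pool = rng.choice(n_rows, size=1500, replace=False)
doc_a, doc_b = pool[:1000], pool[500:]  # true Jaccard similarity = 500 / 1500
coef_a, coef_b = make_hash_params(200, seed=1)
sig_a = minhash_signature(doc_a, coef_a, coef_b, n_rows)
sig_b = minhash_signature(doc_b, coef_a, coef_b, n_rows)
print(estimate_js(sig_a, sig_b))  # roughly 0.33
```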

In [6]:
# ---- LSH CLASS ----
class LSH:
  """
  The following class implements methods to apply the LSH technique.
  """

  def __init__(self, nBands :int, nRows :int, signMat :SignatureMatrix, verbose :bool=False) -> None:
    """
    Create a new object to apply the LSH technique using ``nBands`` bands,
    each composed of ``nRows`` rows.
    If necessary, signatures are computed using ``signMat``.
    """
    self.B = nBands
    self.R = nRows
    self._signMat = signMat
    self._verb = verbose

    if self._verb:
      print('New LSH:')
      print(' - Num bands -> ' + str(self.B))
      print(' - Num rows x band -> ' + str(self.R))
      print(' - Threshold set: %.2f\n' % ((1/self.B)**(1/self.R)))
    return


  def computeBuckets(self, x :tuple[int]|str, key :str='') -> list[tuple[tuple[int,int], dict[str, str|tuple[int]]]]:
    """
    Assign the document ``x`` to a bucket for each band.

    Parameters
    ----------
    ``key``: string, optional
      ID of the passed document ``x``. Empty string as default value.
    ``x``: tuple|str
      The document, or its signature, to be assigned to a bucket for each band.

    Return
    ----------
    Return a list of tuples ``y``, one for each band.
    Each inner tuple contains the considered band ``i`` and the bucket ``b`` to which the document was assigned,
    in addition to the key and the signature of the processed document.
    ``y``: list
      [((``i``,``b``),{'key':``key``,'sign':sign}), ...]
    """
    bucks = list()
    if isinstance(x, str):
      x = self._signMat.computeSignature(x)
      sign = x['sign']
    else:
      sign = x
    for i in range(0, self.B):
      bucks.append(((i, hash(sign[self.R*i:(self.R*i)+self.R])), {'key':key, 'sign':sign}))
    return bucks


  def ejsAllpairs(self, x :list[dict[str, str|tuple[int]]]) -> list[dict[str, str|float]]:
    """
    Compute the EJS for all the pairs of documents assigned to the same bucket of a band.
    The index is computed using the method ``.ejs(...)`` of the signature matrix passed to the constructor.

    Parameters
    ----------
    ``x``: list
      It is a list of dictionaries. Each of them must contain the key 'sign' linked to the signature
      and optionally the key 'key' that contains the id of the considered document.
      The empty string is used as default id.

    Return
    ----------
    ``ris``: list
      List of dict returned by ``SignatureMatrix.ejs(...)``.
    """
    ris = list()
    for i in range(0, len(x)-1):
      for j in range(i+1, len(x)):
        ris.append(self._signMat.ejs(x[i], x[j]))
    return ris
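The banding step can be sketched in plain Python as follows (hypothetical helpers `band_buckets` and `candidate_pairs`; a dictionary stands in for Spark's *groupByKey*, and buckets are keyed by the band slice itself rather than by its hash, which rules out accidental collisions between different slices). A pair that agrees on several bands is produced once per band, which is why duplicate pairs appear later in the notebook.

```python
from collections import defaultdict
from itertools import combinations

def band_buckets(sig: tuple[int, ...], num_bands: int, rows: int) -> list[tuple[int, tuple[int, ...]]]:
    """One (band index, band slice) key per band; equal keys mean the same bucket."""
    return [(i, tuple(sig[i * rows:(i + 1) * rows])) for i in range(num_bands)]

def candidate_pairs(signatures: dict[str, tuple[int, ...]], num_bands: int, rows: int) -> list[tuple[str, str]]:
    """All pairs of docs sharing a bucket in some band; a pair appears once per shared band."""
    buckets = defaultdict(list)
    for key, sig in signatures.items():
        for bucket in band_buckets(sig, num_bands, rows):
            buckets[bucket].append(key)
    return [pair for keys in buckets.values() for pair in combinations(keys, 2)]

sigs = {
    'd1': (1, 2, 3, 4, 5, 6),
    'd2': (1, 2, 3, 4, 5, 6),  # identical to d1: shares both buckets
    'd3': (1, 2, 3, 9, 9, 9),  # shares only the band-0 bucket with d1 and d2
}
print(candidate_pairs(sigs, num_bands=2, rows=3))
# [('d1', 'd2'), ('d1', 'd3'), ('d2', 'd3'), ('d1', 'd2')]
```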

In [7]:
# ---- PARAMETERS ----
# Num of reviews to read
J = 100000
# Size of the shingle index space (shingles are hashed into [0, N))
N = 250000
# Num of bands
B = 5
# Num of rows per band
R = 10
# Func used to compute shingles
F = nonStopWords
# Len of K-grams (necessary only if F == kgrams)
K = 4

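# Approximate LSH threshold: pairs with JS above T are likely to become candidate pairs, pairs below it are not
# (at JS == T the probability of becoming a candidate pair is 1-(1-1/B)^B, about 0.67 for B = 5)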
T = ((1/B)**(1/R))
# MinHashes computed per doc
M = B * R

print('LSH threshold set: %.2f' % T)
print('MinHashes  per doc: ' + str(M))

# ---- FILES PATH ----
pathZipDs = '/content/yelp-dataset.zip'
pathDs = '/content/yelp_academic_dataset_review.json'

LSH threshold set: 0.85
MinHashes  per doc: 50
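The threshold T printed above comes from the S-curve of the banding technique: with B bands of R rows, two docs with Jaccard similarity s become a candidate pair with probability 1 - (1 - s^R)^B. The short sketch below (hypothetical helper `candidate_probability`) evaluates it for the parameters of this run.

```python
def candidate_probability(s: float, bands: int, rows: int) -> float:
    """Probability that two docs with Jaccard similarity s share a bucket in at least one band."""
    return 1 - (1 - s ** rows) ** bands

bands, rows = 5, 10
threshold = (1 / bands) ** (1 / rows)  # same formula as T above
for s in (0.5, 0.7, 0.8, threshold, 0.9, 0.95):
    print('JS = %.3f -> P(candidate) = %.3f' % (s, candidate_probability(s, bands, rows)))
```

At s = T the probability is 1 - (1 - 1/B)^B (0.672 for B = 5), which tends to 1 - 1/e as B grows, so T marks approximately where the curve rises most steeply rather than an exact 50% cut-off.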


In [8]:
# ---- SETUP ----
# Set os environment variables
os.environ['KAGGLE_USERNAME'] = '#########'
os.environ['KAGGLE_KEY'] = '#########'

# Yelp dataset
if not isfile(pathZipDs):
  !kaggle datasets download -d yelp-dataset/yelp-dataset
if not isfile(pathDs):
  !unzip {pathZipDs} -d /content/

# Stopwords dataset
nltk.download('stopwords')
stopWordsEn = set(nltk.corpus.stopwords.words('english'))

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


**Note:** In the following cells the dataset is loaded and processed in the form of RDD, using the previously implemented methods and functions. The whole process is broken down into different cells to show the intermediate results, but it could be summarized in a few lines of code without implementing any new functions.

In [9]:
# Instantiate the necessary objects
cm = CharacteristicMatrix(N, F)
sm = SignatureMatrix(M, cm)
lsh = LSH(B,R, sm)

In [10]:
# Create a new spark session
spark = SparkSession.builder.appName('LSH').master('local[*]').getOrCreate()
# Get spark context
sc = spark.sparkContext

# Dataframe schema
schema = tp.StructType().add('review_id', tp.StringType(), False).add('text', tp.StringType(), False)

In [11]:
# Read json file (J lines and only the fields in schema)
rawDataRDD = spark.read.schema(schema).json(pathDs).limit(J).rdd
rawDataRDD.first()

Row(review_id='KU_O5udG6zpxOg-VcAEodg', text="If you decide to eat here, just be aware it is going to take about 2 hours from beginning to end. We have tried it multiple times, because I want to like it! I have been to it's other locations in NJ and never had a bad experience. \n\nThe food is good, but it takes a very long time to come out. The waitstaff is very young, but usually pleasant. We have just had too many experiences where we spent way too long waiting. We usually opt for another diner or restaurant on the weekends, in order to be done quicker.")

In [12]:
startTime = time.time()

In [13]:
# Data preprocessing
dataRDD = rawDataRDD.map(lambda x: (x[0],preprocessing(x[1]))).cache()
dataRDD.first()

('KU_O5udG6zpxOg-VcAEodg',
 'if you decide to eat here just be aware it is going to take about 2 hours from beginning to end we have tried it multiple times because i want to like it i have been to it s other locations in nj and never had a bad experience the food is good but it takes a very long time to come out the waitstaff is very young but usually pleasant we have just had too many experiences where we spent way too long waiting we usually opt for another diner or restaurant on the weekends in order to be done quicker ')

In [14]:
# Compute signature
signRDD = dataRDD.map(lambda x: sm.computeSignature(x[1],x[0])).cache()
signRDD.first()

{'key': 'KU_O5udG6zpxOg-VcAEodg',
 'sign': (1003,
  6427,
  1030,
  1802,
  11336,
  1081,
  821,
  25,
  7865,
  13924,
  159,
  880,
  7935,
  4480,
  6471,
  27568,
  3025,
  21298,
  896,
  19652,
  191,
  7708,
  2070,
  4914,
  8099,
  2006,
  4415,
  10203,
  5992,
  2807,
  11109,
  5911,
  1419,
  4247,
  1943,
  2521,
  3350,
  393,
  678,
  9107,
  2383,
  1210,
  1013,
  4293,
  750,
  3186,
  9186,
  72,
  1719,
  717)}

In [15]:
# Assign every doc to a bucket for each band
# key: (band, bucket), value: {'key':id_doc, 'sign':sign_doc}
buckRDD = signRDD.flatMap(lambda x: lsh.computeBuckets(x['sign'], x['key']))
buckRDD.first()

((0, -8302861642663643509),
 {'key': 'KU_O5udG6zpxOg-VcAEodg',
  'sign': (1003,
   6427,
   1030,
   1802,
   11336,
   1081,
   821,
   25,
   7865,
   13924,
   159,
   880,
   7935,
   4480,
   6471,
   27568,
   3025,
   21298,
   896,
   19652,
   191,
   7708,
   2070,
   4914,
   8099,
   2006,
   4415,
   10203,
   5992,
   2807,
   11109,
   5911,
   1419,
   4247,
   1943,
   2521,
   3350,
   393,
   678,
   9107,
   2383,
   1210,
   1013,
   4293,
   750,
   3186,
   9186,
   72,
   1719,
   717)})

In [16]:
# Buckets that contain more than one doc (a set, for fast membership tests in the next cell)
filteredBuck = set(buckRDD.map(lambda x: (x[0], 1)).reduceByKey(add).filter(lambda x: x[1]>1).keys().collect())

print('Buckets with more than one doc: ' + str(len(filteredBuck)))

Buckets with more than one doc: 353


In [17]:
# Group the docs that share a bucket in some band (every pair inside a group is a candidate pair)
candRDD = buckRDD.filter(lambda x: x[0] in filteredBuck).groupByKey().map(lambda x: list(x[1]))
candRDD.first()

[{'key': 'Sx8TMOWLNuJBWer-0pcmoA',
  'sign': (352,
   1325,
   53,
   1292,
   5510,
   4518,
   1691,
   4311,
   9042,
   1715,
   1418,
   4392,
   4068,
   1703,
   919,
   3935,
   3919,
   3605,
   587,
   8097,
   3437,
   413,
   5804,
   7888,
   8754,
   2006,
   2969,
   2349,
   5671,
   427,
   5050,
   1092,
   184,
   52,
   2860,
   5530,
   1537,
   2437,
   8890,
   5340,
   2604,
   9230,
   1158,
   3310,
   7055,
   771,
   3016,
   682,
   11093,
   4589)},
 {'key': 'DK-hGw3XSTJHT26yjzeG3Q',
  'sign': (352,
   1325,
   53,
   1292,
   5510,
   4518,
   1691,
   2338,
   9042,
   1715,
   1418,
   4392,
   4068,
   1703,
   919,
   3935,
   3919,
   3605,
   587,
   8097,
   3437,
   413,
   5804,
   7888,
   8754,
   2006,
   2969,
   2349,
   5671,
   427,
   5050,
   1092,
   184,
   52,
   2860,
   2654,
   1537,
   2437,
   8890,
   5340,
   2604,
   9230,
   1158,
   3310,
   7055,
   771,
   3016,
   682,
   11093,
   4589)}]

In [18]:
# Compute EJS for all candidate pairs
res = candRDD.flatMap(lambda x: lsh.ejsAllpairs(x))

print('Number of pairs: ' + str(res.count()) + "\n")
res.take(5)

Number of pairs: 355



[{'keyA': 'Sx8TMOWLNuJBWer-0pcmoA',
  'keyB': 'DK-hGw3XSTJHT26yjzeG3Q',
  'EJS': 0.96},
 {'keyA': 'Sx8TMOWLNuJBWer-0pcmoA',
  'keyB': 'DK-hGw3XSTJHT26yjzeG3Q',
  'EJS': 0.96},
 {'keyA': 'Sx8TMOWLNuJBWer-0pcmoA',
  'keyB': 'DK-hGw3XSTJHT26yjzeG3Q',
  'EJS': 0.96},
 {'keyA': 'Zb_27vX8weaYyDn-_2ZhVA',
  'keyB': '-crxsnaKPE07GLVPvX0MMQ',
  'EJS': 1.0},
 {'keyA': 'Zb_27vX8weaYyDn-_2ZhVA',
  'keyB': '-crxsnaKPE07GLVPvX0MMQ',
  'EJS': 1.0}]

In [19]:
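# Remove duplicate pairs: the same pair can share a bucket in several bands,
# possibly in either order, so the sorted pair of ids is used as key
res = res.groupBy(lambda x: tuple(sorted((x['keyA'], x['keyB'])))).map(lambda x: list(x[1])[0])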

print('Number of similar docs: ' + str(res.count()) + "\n")
res.take(5)

Number of similar docs: 98



[{'keyA': 'Sx8TMOWLNuJBWer-0pcmoA',
  'keyB': 'DK-hGw3XSTJHT26yjzeG3Q',
  'EJS': 0.96},
 {'keyA': 'Zb_27vX8weaYyDn-_2ZhVA',
  'keyB': '-crxsnaKPE07GLVPvX0MMQ',
  'EJS': 1.0},
 {'keyA': 'RZq-EioVPJpBgwbN2aFmVw',
  'keyB': 'RQY_P1raDRzMiYl1WtdK8w',
  'EJS': 0.84},
 {'keyA': 'RZq-EioVPJpBgwbN2aFmVw',
  'keyB': 'Jjby0Vu3NaCJEHRmWvOwQw',
  'EJS': 0.8},
 {'keyA': 'RQY_P1raDRzMiYl1WtdK8w',
  'keyB': 'Jjby0Vu3NaCJEHRmWvOwQw',
  'EJS': 0.96}]
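The deduplication and an optional final filter can be sketched in plain Python (hypothetical helper `dedup_pairs`; the filter by threshold is an extra step that the notebook does not perform). LSH candidates include pairs whose estimate is below the threshold T (for example the pair with EJS = 0.8 above), so a common last step is to keep only the pairs with EJS >= T.

```python
def dedup_pairs(results: list[dict]) -> list[dict]:
    """Keep one result per unordered pair of keys."""
    unique = {}
    for r in results:
        pair = tuple(sorted((r['keyA'], r['keyB'])))  # (A, B) and (B, A) map to the same key
        unique.setdefault(pair, r)
    return list(unique.values())

results = [
    {'keyA': 'x', 'keyB': 'y', 'EJS': 0.96},
    {'keyA': 'y', 'keyB': 'x', 'EJS': 0.96},  # same pair, found in another band in the other order
    {'keyA': 'x', 'keyB': 'z', 'EJS': 0.80},
]
threshold = (1 / 5) ** (1 / 10)  # about 0.85, as T in the parameters cell
unique = dedup_pairs(results)
print(len(unique))  # 2
print([r for r in unique if r['EJS'] >= threshold])  # [{'keyA': 'x', 'keyB': 'y', 'EJS': 0.96}]
```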

In [20]:
endTime = time.time()
print("Execution time: " + str(endTime - startTime))

Execution time: 98.3564965724945


## Testing
---
**Note:** The following cells are for testing purposes only. They collect a large amount of data on the driver and have a high space and time complexity, so they may fail (for example, by running out of memory).

**JS vs EJS**
> This cell computes the mean absolute difference between the Jaccard similarity (JS) and its estimate (EJS) over the similar pairs found above. The parameters, in particular the number of MinHashes per doc *M*, strongly affect the result.



In [21]:
lst = res.collect()
txt = dataRDD.collectAsMap()
delta = 0
for i in lst:
  idD1 = i['keyA']
  idD2 = i['keyB']

  shinD1 = cm.computeCharVect(txt[idD1])['shin']
  shinD2 = cm.computeCharVect(txt[idD2])['shin']

  ejs = i['EJS']
  try:
    js = cm.js({'key':idD1, 'shin':shinD1}, {'key':idD2, 'shin':shinD2})['JS']
  except ZeroDivisionError:
    js = 0

  delta += abs(ejs-js)

delta/len(lst)

0.027229965953625774
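As a rough yardstick for the value above (an idealized model, assuming the M MinHashes behave like independent draws, which the hash family used here only approximates): each MinHash of a pair with Jaccard similarity J agrees with probability J, so EJS is a binomial proportion with standard deviation sqrt(J(1-J)/M). The hypothetical helper `ejs_std` evaluates it for M = 50.

```python
import math

def ejs_std(j: float, m: int) -> float:
    """Std. deviation of EJS if the m MinHash agreements were independent Bernoulli(j) trials."""
    return math.sqrt(j * (1 - j) / m)

for j in (0.8, 0.9, 0.95):
    print('JS = %.2f, M = 50 -> std(EJS) = %.3f' % (j, ejs_std(j, 50)))
```

Under this model, quadrupling M halves the expected error.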

**A Simple Comparison**
> The following cell computes the estimated Jaccard distance using objects and methods from the *ml.feature* module of PySpark. The code is adapted from the [official documentation](https://spark.apache.org/docs/2.2.3/ml-features.html#minhash-for-jaccard-distance). The cell is wrapped in a string literal, so it is not executed; remove the quotes to run it.

In [22]:
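'''
from pyspark.ml.feature import MinHashLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col

# MinHashLSH expects the characteristic vectors (shingle indexes), not the MinHash signatures,
# and rejects empty vectors, so docs without shingles are dropped
rows = (dataRDD.map(lambda x: cm.computeCharVect(x[1], x[0]))
               .filter(lambda x: len(x['shin']) > 0)
               .map(lambda x: (x['key'], Vectors.sparse(N, {v: 1.0 for v in x['shin']}))))
dfA = spark.createDataFrame(rows, ["id", "features"])

mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5)
model = mh.fit(dfA)

print("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()

print("Approximately self-joining dfA on Jaccard distance smaller than 0.2:")
(model.approxSimilarityJoin(dfA, dfA, 0.2, distCol="JaccardDistance")
    # Keep each pair once and drop the trivial matches of a doc with itself
    .filter(col("datasetA.id") < col("datasetB.id"))
    .select(col("datasetA.id").alias("idA"),
            col("datasetB.id").alias("idB"),
            col("JaccardDistance"))
    .show())
'''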
