In [1]:
import numpy as np
from time import time
from itertools import combinations
from pyspark.ml.feature import Tokenizer, StopWordsRemover, IDF, CountVectorizer, Normalizer
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace, col, size, udf, monotonically_increasing_id
from pyspark.sql.types import *

# Document Similarity
The aim of this notebook is to calculate the similarity between documents using an approximation of the cosine similarity metric. 
In order to do this, we will use the simHash approach. 

Let's start by creating a Spark session.

In [2]:
spark = SparkSession.builder \
    .appName("Similar documents") \
    .config("spark.driver.memory", "20g") \
    .config("spark.executor.memory", "20g") \
    .config("spark.driver.maxResultSize", "2g") \
    .getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")

24/05/13 19:31:43 WARN Utils: Your hostname, rog-davide resolves to a loopback address: 127.0.1.1; using 192.168.1.24 instead (on interface wlan0)
24/05/13 19:31:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/13 19:31:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Load the data
We will use two datasets to test the similarity calculation. The first dataset is a list of mails and the second one is a list of news articles.

In [3]:
dataset_mail = spark.read.parquet("data/mail_1.parquet")

dataset_news = spark.read.csv("data/news.csv", header=True).select("summary")
dataset_news = dataset_news.withColumnRenamed("summary", "text")

                                                                                

Choose the dataset you want to use by setting the variable `dataset` to the desired dataset.

In [4]:
dataset = dataset_mail

n = dataset.count()
print("Number of documents: " + str(n))

Number of documents: 231533


## Preprocess the data
The first step is to preprocess the data. We will tokenize the text and remove the stop words.

Remove special characters and numbers from the text.

In [5]:
dataset = dataset.withColumn("text", regexp_replace(col("text"), "[^a-zA-Z\\s]", ""))

Tokenize the text. we will use the `Tokenizer` class from the `pyspark.ml.feature` module and store the result in a new column of the dataframe called `words`.

In [6]:
tokenizer = Tokenizer(inputCol='text', outputCol='words')
df = tokenizer.transform(dataset)

Remove the empty strings from the list of words.

In [7]:
filter_udf = udf(lambda words: [word for word in words if len(word) >= 1], ArrayType(StringType()))
df = df.withColumn('words', filter_udf(col('words')))

Remove the stop words from the list of words.

In [8]:
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
df = remover.transform(df)

Remove the empty lists from the list of words (aka documents with no words).

In [9]:
df = df.filter(size(col("filtered")) > 0)

## Calculate the TF-IDF
The next step is to calculate the TF-IDF of the documents. We will use the `CountVectorizer` and `IDF` classes from the `pyspark.ml.feature` module.

Count the number of times each word appears in each document and store the result in a new column called `rawFeatures`. Then print the number of words in the vocabulary.

In [10]:
cv = CountVectorizer(inputCol="filtered", outputCol="rawFeatures")  # , minDF=0.01)
cv_model = cv.fit(df)
df = cv_model.transform(df)

n_words = len(cv_model.vocabulary)
n_words

                                                                                

262144

Calculate the IDF of the words and store the result in a new column called `features`.

In [11]:
idf = IDF(inputCol="rawFeatures", outputCol="features")  # , minDocFreq=5)
idf_model = idf.fit(df)
df = idf_model.transform(df)

                                                                                

Add a unique id to each document.

In [12]:
df = df.withColumn('id', monotonically_increasing_id())

Show the dataframe.

In [13]:
df.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+---+
|                text|               words|            filtered|         rawFeatures|            features| id|
+--------------------+--------------------+--------------------+--------------------+--------------------+---+
|My pleasure\n\n\n...|[my, pleasure, ma...|[pleasure, mark, ...|(262144,[1,6,10,1...|(262144,[1,6,10,1...|  0|
|Dear Dana\n\nThan...|[dear, dana, than...|[dear, dana, than...|(262144,[11,15,16...|(262144,[11,15,16...|  1|
|Lew it looks like...|[lew, it, looks, ...|[lew, looks, like...|(262144,[15,20,28...|(262144,[15,20,28...|  2|
|Have you ever see...|[have, you, ever,...|[ever, seen, site...|(262144,[240,635,...|(262144,[240,635,...|  3|
|I am following up...|[i, am, following...|[following, someo...|(262144,[1,2,6,9,...|(262144,[1,2,6,9,...|  4|
| Forwarded by Jud...|[forwarded, by, j...|[forwarded, judy,...|(262144,[1,3,6,9,...|(262144,[1,3,6,9,...|  5|
|

## Calculate the simHashes of the documents
Let's calculate the simHash of the documents. 

Define the number of bits of the simHash and the length of the blocks.

In [14]:
m = 64
p = 32

Let's define a matrix of size `m x n_words` with random values of -1 and 1 and broadcast it to all the nodes.

In [15]:
random_matrix = sc.broadcast(np.random.choice([-1, 1], (m, n_words)))

Define the function that calculates the simHash of a document.

In [16]:
def simhash(features):
    global random_matrix, m

    sim_hash = np.sum(
        [(random_matrix.value[:, i] * features[int(i)]) for i in features.indices],
        axis=0
    )

    # Check if the simHash is not a list
    if not isinstance(sim_hash, np.ndarray):
        return [0] * m

    # Return the simHash as a list of 0s and 1s
    return (sim_hash >= 0).astype(int).tolist()

simhash_udf = udf(simhash, ArrayType(IntegerType()))

Calculate the simHash of the documents and store the result in a new column called `simHash`. Then persist the dataframe.

In [17]:
_time = time()
df = df.withColumn("simHash", simhash_udf('features'))
df.persist()
df.collect()

_time_simhash = time() - _time
print("Time to calculate the simHashes: " + str(_time_simhash))

                                                                                

Time to calculate the simHashes: 164.98927474021912


Show the dataframe.

In [18]:
df.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+---+--------------------+
|                text|               words|            filtered|         rawFeatures|            features| id|             simHash|
+--------------------+--------------------+--------------------+--------------------+--------------------+---+--------------------+
|My pleasure\n\n\n...|[my, pleasure, ma...|[pleasure, mark, ...|(262144,[1,6,10,1...|(262144,[1,6,10,1...|  0|[0, 1, 0, 1, 0, 0...|
|Dear Dana\n\nThan...|[dear, dana, than...|[dear, dana, than...|(262144,[11,15,16...|(262144,[11,15,16...|  1|[1, 1, 0, 1, 0, 1...|
|Lew it looks like...|[lew, it, looks, ...|[lew, looks, like...|(262144,[15,20,28...|(262144,[15,20,28...|  2|[1, 0, 0, 1, 1, 1...|
|Have you ever see...|[have, you, ever,...|[ever, seen, site...|(262144,[240,635,...|(262144,[240,635,...|  3|[0, 0, 1, 0, 0, 0...|
|I am following up...|[i, am, following...|[following, someo...|(262144,[1,2

## Calculate the similarity between the documents
The next step is to calculate the similarity between the documents. 
### Split the simHashes into blocks 
Let's start by dividing the simHashes into blocks of size `p` and store them as integers. This is done in order to reduce the number of comparisons to do later.

In [19]:
def split_blocks(simHash):
    global m, p
    return [int("".join(map(str, simHash[i:i + p])), 2) for i in range(0, m, p)]

split_blocks_udf = udf(split_blocks, ArrayType(IntegerType()))

Calculate the blocks of the simHashes and store the result in a new column called `simHashBlocks`.

In [20]:
df = df.withColumn("simHashBlocks", split_blocks_udf('simHash'))

Show the dataframe.

In [21]:
df.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+---+--------------------+--------------------+
|                text|               words|            filtered|         rawFeatures|            features| id|             simHash|       simHashBlocks|
+--------------------+--------------------+--------------------+--------------------+--------------------+---+--------------------+--------------------+
|My pleasure\n\n\n...|[my, pleasure, ma...|[pleasure, mark, ...|(262144,[1,6,10,1...|(262144,[1,6,10,1...|  0|[0, 1, 0, 1, 0, 0...|[1359601215, -866...|
|Dear Dana\n\nThan...|[dear, dana, than...|[dear, dana, than...|(262144,[11,15,16...|(262144,[11,15,16...|  1|[1, 1, 0, 1, 0, 1...|[-733771619, -352...|
|Lew it looks like...|[lew, it, looks, ...|[lew, looks, like...|(262144,[15,20,28...|(262144,[15,20,28...|  2|[1, 0, 0, 1, 1, 1...|[-1633707351, -26...|
|Have you ever see...|[have, you, ever,...|[ever, seen, site...|(262144,[240,635,.

### Group the simHashes by blocks
Now we will group togheter the blocks that shares at least one blovk in the same position and calculate the similarity between the documents in each group.

Let's create a dataframe with the (id, simHashBlock, position) for each row and group by simHashBlock and position.

In [22]:
_time = time()
block_dataframe = df.select("id", "simHash", "simHashBlocks", "features").rdd.flatMap(
        lambda x: [((x[0], x[1], x[3]), block, i) for i, block in enumerate(x[2])]).toDF(["id", "block", "position"])

grouped_blocks = block_dataframe.groupBy("block", "position").agg({"id": "collect_list"})
grouped_blocks.persist()
grouped_blocks.collect()

_time_blocks = time() - _time
print("Time to group the blocks: " + str(_time_blocks))

                                                                                

Time to group the blocks: 66.72602796554565


Add a unique id to each group.

In [23]:
grouped_blocks = grouped_blocks.withColumn('id', monotonically_increasing_id())

Print the largest group.

In [24]:
largest_group = grouped_blocks.orderBy(size(col("collect_list(id)")).desc()).take(1)
len(largest_group[0])

                                                                                

4

Show the dataframe.

In [25]:
grouped_blocks.show()

+-----------+--------+--------------------+---+
|      block|position|    collect_list(id)| id|
+-----------+--------+--------------------+---+
|-2145894837|       0|[{34359740938, [1...|  0|
|-2144869491|       0|[{111669150344, [...|  1|
|-2140903798|       0|[{74, [1, 0, 0, 0...|  2|
|-2139806157|       0|[{85899355644, [1...|  3|
|-2139551052|       0|[{111669154453, [...|  4|
|-2132269696|       1|[{94489287656, [0...|  5|
|-2131498322|       0|[{17179877741, [1...|  6|
|-2131217808|       1|[{111669160561, [...|  7|
|-2129690228|       1|[{8033, [1, 0, 0,...|  8|
|-2128779444|       1|[{51539616447, [0...|  9|
|-2128453714|       1|[{17179871495, [1...| 10|
|-2128437929|       0|[{128849022735, [...| 11|
|-2126092076|       0|[{17179881839, [1...| 12|
|-2118276431|       1|[{60129548435, [1...| 13|
|-2116252062|       1|[{85899350451, [1...| 14|
|-2115211214|       1|[{25769810950, [0...| 15|
|-2115209948|       1|[{85899358209, [1...| 16|
|-2113396042|       0|[{17179878451, [1.

Let's compute the number of comparisons we need to do before and after grouping the blocks.

In [26]:
tot_comparison = int((n ** 2 - n) / 2)
group_comparison = int(grouped_blocks.filter(size(col("collect_list(id)")) > 1).rdd.map(lambda x: (len(x[2]) ** 2 - len(x[2])) / 2).sum())

print("Percentage of comparisons to do: " + str(group_comparison * 100 / tot_comparison) + "%")



Percentage of comparisons to do: 0.011956528630713118%


                                                                                

# Find the similar documents
Now we will calculate the similarity between the documents in each group and find the similar documents (aka documents with a similarity greater than 0.9).

The similarity between two documents is calculated as 1 - the hamming distance between the simHashes of the documents divided by `m`.

In [27]:
_time = time()
docs_to_compare = grouped_blocks.select("collect_list(id)").rdd \
    .flatMap(lambda x: combinations(x[0], 2)).map(lambda x: (x[0][0], x[1][0], [x[0][1], x[1][1]], [x[0][2], x[1][2]])) \
    .toDF(["id1", "id2", "simHashes", "features"]).dropDuplicates(["id1", "id2"])

similar_docs_rdd = docs_to_compare.rdd \
    .map(lambda x: (x[0], x[1], float(1 - np.sum([1 for i, j in zip(x[2][0], x[2][1]) if i != j]) / m))) \
    .filter(lambda x: x[2] >= 0.9)

similar_docs = similar_docs_rdd.collect()

_time_similarity = time() - _time
print("Time to find similar documents:", _time_similarity)

                                                                                

Time to find similar documents: 139.55639696121216


Now we will print the number of similar documents.

In [28]:
len(similar_docs)

1740803

Print two random similar documents.

In [29]:
el = similar_docs[np.random.randint(0, len(similar_docs))]

print(el)

print("Document 1:")
print(df.filter(col("id") == el[0]).select("text").collect()[0][0])

print("\n\nDocument 2:")
print(df.filter(col("id") == el[1]).select("text").collect()[0][0])

(13008, 8589938829, 1.0)
Document 1:


                                                                                

Start Date  HourAhead hour   No ancillary schedules awarded  No variances detected

    LOG MESSAGES

PARSING FILE  OPortlandWestDeskCalifornia SchedulingISO Final Schedulestxt


Document 2:
Start Date  HourAhead hour   No ancillary schedules awarded  No 
variances detected

    LOG MESSAGES

PARSING FILE  OPortlandWestDeskCalifornia SchedulingISO Final 
Schedulestxt


Stop the Spark session.

In [30]:
spark.stop()