<h1>Mount and downloading necessary libraries</h1>

In [1]:
try:
  from google.colab import drive
  drive.mount('/content/drive')

  !apt-get install openjdk-8-jdk-headless -qq > /dev/null
  !wget -q https://dlcdn.apache.org/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
  !tar xf spark-3.2.0-bin-hadoop3.2.tgz

  import os
  os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
  os.environ["SPARK_HOME"] = "/content/spark-3.2.0-bin-hadoop3.2"

  !pip install pyspark
except:
  print("We are not using colab right now")

Mounted at /content/drive
Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 38 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 61.8 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=c1961322666a675cb5583984413b3dce4137f273b4fba3e3ded844595d6ae59a
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


<h1>Loading text and preprocessing</h1>

In [2]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
#Spark context
sc = SparkContext('local[*]')
spark = SparkSession(sc)

In [3]:
from pyspark.sql.types import StructField, StructType, StringType, ArrayType
#schema of the tsv file
schema = StructType([
    StructField("title", StringType()),
    StructField("description", StringType()),
    StructField("location", StringType()),
    StructField("timestamp", StringType()),
    StructField("link", StringType())
])

In [4]:
#multiline is needed because of the newlines that we can find in the descriptions, drop(any) to remove documents if they have null values
path = '/content/drive/MyDrive/Data Mining/HW2_old/jobs.tsv' #Colab directory
try:
  file = spark.read.option("multiline",True).options(delimiter=r'\t').csv(
      path, 
      header=True,
      schema=schema).na.drop(how='any')
  file.show(25)
except: #Not using colab right now, type the path manually
  print("Type the right path to the jobs.tsv file")
  path = input()
  file = spark.read.option("multiline",True).options(delimiter=r'\t').csv(
      path, 
      header=True,
      schema=schema).na.drop(how='any')
  file.show(25)

+--------------------+--------------------+--------------------+-----------------+--------------------+
|               title|         description|            location|        timestamp|                link|
+--------------------+--------------------+--------------------+-----------------+--------------------+
|Addetto sito inte...|Cercasi addetto a...|              Capena|21 ottobre, 17:41|https://www.kijij...|
|Developer esperie...|Advancia Technolo...|           Viareggio|      Oggi, 16:24|https://www.kijij...|
|Lavoro smart fles...|Pascarella Group ...|              Milano|      Oggi, 16:20|https://www.kijij...|
|DotNet Developer ...|I candidati che s...|              Milano|      Oggi, 15:21|https://www.kijij...|
|Web e Software De...|Ciao,\nMi chiamo ...|              Verona|      Oggi, 10:38|https://www.kijij...|
|Commerciale esper...|Commerciale in am...|             Sassari|      Oggi, 10:12|https://www.kijij...|
|Cercarsi tecnico ...|Ricerchiamo n. 1 ...|Città Studi / Lam...|

In [5]:
#How many documents?
number_of_documents = file.count()
number_of_documents

3047

In [6]:
from pyspark.sql.functions import udf, col, lower, regexp_replace
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from nltk.stem.snowball import SnowballStemmer
#Actual preprocessing
# Clean text
df_clean = file.select('description', (lower(regexp_replace('description', "[^a-zA-Z\\s]", "")).alias('cleaned_text')))

# Tokenizazion
tokenizer = Tokenizer(inputCol='cleaned_text', outputCol='tokenized')
df_tokenized = tokenizer.transform(df_clean).select('description', 'tokenized')

# Remove stop words
remover = StopWordsRemover(inputCol='tokenized', outputCol='no_stopwords')
df_stopwords_removed = remover.transform(df_tokenized).select('description', 'no_stopwords')

# Stemming
stemmer = SnowballStemmer(language='italian')
stemmer_udf = udf(lambda tokens: [stemmer.stem(token) for token in tokens], ArrayType(StringType()))
df_stemmed = df_stopwords_removed.withColumn("words_stemmed", stemmer_udf("no_stopwords")).select('description', 'words_stemmed')


In [7]:
from pyspark.sql.functions import row_number,lit
from pyspark.sql.window import Window
#Add docId (=row number)
w = Window().orderBy(lit('A'))
df = df_stemmed.withColumn("docId", row_number().over(w)).select("words_stemmed", "docId")

df.show(5)

+--------------------+-----+
|       words_stemmed|docId|
+--------------------+-----+
|[cercas, addett, ...|    1|
|[advanc, technolo...|    2|
|[pascarell, group...|    3|
|[candid, che, sup...|    4|
|[cia, mi, chiam, ...|    5|
+--------------------+-----+
only showing top 5 rows



In [8]:
from pyspark.sql.functions import concat_ws
#To obtain shingles I need to pass from lists of stemmed words to strings of stemmed words
df = df.withColumn("description", concat_ws(" ", "words_stemmed")).select("docId", "description")
df.show(5)

+-----+--------------------+
|docId|         description|
+-----+--------------------+
|    1|cercas addett al ...|
|    2|advanc technology...|
|    3|pascarell group r...|
|    4|candid che super ...|
|    5|cia mi chiam giac...|
+-----+--------------------+
only showing top 5 rows



In [9]:
#Mapping of descriptions to relative docId 
#To map I need a rdd -> df.rdd
data = df.rdd.map(lambda row: (row.description,row.docId))
data.take(5)

[('cercas addett al sit internet con buon esperit nell gestion degl articol inser e indicizz sui principal  motor di ricerc dei nuov articol e tutt cio che conc la gestion dell shop onlin trasfer sol personal di zon limitrof monterotond capen fian rom alleg cvita',
  1),
 ('advanc technology ricerc num  svilupp back end per ampli dellorgan  la ricerc  rivolt agli iscritt al colloc mir per gli appartenent alle categor protett second la ex l   l avvi dell attivit avverr modalit remot o mist  requis  esperit di  anno circ nell programm ad oggett back end  attitudin al lavor team  conosct di uno dei seguent linguagg di programm   jav see   net   python  invi la propr candidatur attravers il sit includ sia una brev present che cop del cv complet di vot di diplom di laure e autorizz second il d lgs  e reg ue   loffert  da intend nel rispett dell norm sull parit di tratt mater di occup e di condizion di lavor l l dlgs   e reg ue  accord con lintrodu del gdpr dat forn sarann utilizz dal grupp 

In [10]:
number_of_documents = data.count()
number_of_documents

3047

<h1>Shingling</h1>

In [11]:
#Function proposed by the text of the exercise, I will use it for shingling, minHash and LSH

# Implement a family of hash functions. It hashes strings and takes an
# integer to define the member of the family.
# Return a hash function parametrized by i
import hashlib
def hashFamily(i):
  resultSize = 8
  # how many bytes we want back
  maxLen = 20
  # how long can our i be (in decimal)
  salt = str(i).zfill(maxLen)[-maxLen:] 
  def hashMember(x):
    sequence = x + salt
    return hashlib.sha1(sequence.encode("utf-8")).digest()[-resultSize:]
  return hashMember

In [12]:
#We want shingles of length 10
k=10
shingles = data.map(lambda shingle: (shingle[1], [shingle[0][i:i+k] for i in range(len(shingle[0])-k+1)]))

shingles.take(13)

[(1,
  ['cercas add',
   'ercas adde',
   'rcas addet',
   'cas addett',
   'as addett ',
   's addett a',
   ' addett al',
   'addett al ',
   'ddett al s',
   'dett al si',
   'ett al sit',
   'tt al sit ',
   't al sit i',
   ' al sit in',
   'al sit int',
   'l sit inte',
   ' sit inter',
   'sit intern',
   'it interne',
   't internet',
   ' internet ',
   'internet c',
   'nternet co',
   'ternet con',
   'ernet con ',
   'rnet con b',
   'net con bu',
   'et con buo',
   't con buon',
   ' con buon ',
   'con buon e',
   'on buon es',
   'n buon esp',
   ' buon espe',
   'buon esper',
   'uon esperi',
   'on esperit',
   'n esperit ',
   ' esperit n',
   'esperit ne',
   'sperit nel',
   'perit nell',
   'erit nell ',
   'rit nell g',
   'it nell ge',
   't nell ges',
   ' nell gest',
   'nell gesti',
   'ell gestio',
   'll gestion',
   'l gestion ',
   ' gestion d',
   'gestion de',
   'estion deg',
   'stion degl',
   'tion degl ',
   'ion degl a',
   'on degl ar',
   'n deg

In [13]:
#Hash the shingles
hash_fn = hashFamily(100)
hashed_shingles = shingles.map(lambda h: (h[0], [hash_fn(h[1][i]) for i in range(len(h[1]))]))

hashed_shingles.take(10)

[(1,
  [b'H\xe2\xa3\x84\xd9|B\xdc',
   b'.\xeb\xa4\x11\x94\xf1\x03\x15',
   b'I\xb3\xeb>X\x99\xa9\x8a',
   b'u\xe9oT\xf1"\xecH',
   b'\xee\xde\x1e\x11-\xc5\xac\x91',
   b';\x85\xb8<\x1dtiE',
   b'\xee\x83Kp\xd8l\x9c\x17',
   b'*O\x91\xb9\x0f 2e',
   b'\xb9f\x13\x14\xfe \x81\x87',
   b'\xae|\xe3]\xd3Y\xff\x97',
   b'E0\x0c\xe9\xf4\xb1Z\xd9',
   b'\\O\x84_Wd\\]',
   b'\xad\x0c\x9c\xf5K\x1b\x12F',
   b'\xc1\x1f\r\xfc{\xc42s',
   b'Z\x8cG\x1a\xa4\xad\xce\r',
   b' \xd1\xfb<jX1&',
   b'\xbb\xc8\x80\xfe\xddo\xad\x8a',
   b'%b\xa7\x1f\xe2!\xee\x0e',
   b'\xafU\xc5\xa1\xa9(b\x1c',
   b'\xf3\xcb\\6\xdap\x19\xb6',
   b'd\xa1\xe5\x17\xe7n\xb9\x84',
   b'\x1cx3\xfc\xd0NtD',
   b'ce\x1c\x126T\x99R',
   b'\x19\xba\x93\x8dXs\xf5\xa6',
   b'\xe2T\xb9\xf9\x9f\x10&\xb8',
   b'<\xe0KT\xcf\xceG9',
   b'\x9b\xa5\xf0\x1d\x9a\x875\x0c',
   b'\xfb\xbc70\x05\xe5\t\xe1',
   b'0\xb1\xe3\xc5\x0f\xa6)/',
   b'M-\xcbtr\xbb3\r',
   b'\xab4\xd6\xad\rrf\x0f',
   b'\xe9\x851\x02\xff3\x0b0',
   b'xevo~9\xdf\xfc',
   b'h

In [14]:
#Save the hashed shingles into a dataframe so that they can be safely read later
deptColumns = ["docId","hashed_shingles"]
df_shingles = hashed_shingles.toDF(deptColumns)
df_shingles.printSchema()
df_shingles.show(15)

root
 |-- docId: long (nullable = true)
 |-- hashed_shingles: array (nullable = true)
 |    |-- element: binary (containsNull = true)

+-----+---------------------+
|docId|      hashed_shingles|
+-----+---------------------+
|    1| [H⣄�|B�, .���,...|
|    2| [6����$, d�H�_...|
|    3| [��ѧ��, �,ɪɘ7,...|
|    4| [:�@��.s�, �M���...|
|    5| [�A��V, 5�_�:...|
|    6| ['�`�ʸ�0, h\n���...|
|    7| [�q&x��[V, �^�' ...|
|    8| [\f��=ӕ, ���^w�...|
|    9| [�}lsQ_!�, PO�G�...|
|   10| [�}lsQ_!�, PO�G�...|
|   11|[hvoIZ��2, $A�肑<�...|
|   12| [�m&-�٘, ic�&�*\t...|
|   13| [�}lsQ_!�, PO�G�...|
|   14| [����\a�b, ��qw��...|
|   15| [�}lsQ_!�, PO�G�...|
+-----+---------------------+
only showing top 15 rows



In [15]:
number_of_documents = hashed_shingles.count()
number_of_documents

3047

<h1>Minhashing<h1>

In [16]:
import time
start = time.time()

In [17]:
#Given a list of shingles, compute the hash for every shingle and keep the smallest value
import sys
def minHash(shingles:list, i:int) -> int:
  hash_fn = hashFamily(i)
  minSignature = float('inf') #big number
  for s in shingles:
    #some casts are needed
    hashSignature = int.from_bytes(hash_fn(str(s)), sys.byteorder)
    if hashSignature < minSignature:
        minSignature = hashSignature
  return minSignature

In [18]:
#For every document compute the minHash for k times to build the signature matrix
k=100
signature = hashed_shingles.map(lambda sign: (sign[0], [minHash(sign[1],i) for i in range(k)]))

signature.take(1)

[(1,
  [85657798787422750,
   141065245902151870,
   291870001698537642,
   135276584715898323,
   246477686563128297,
   16092962851347172,
   11405924817492680,
   11821240613099548,
   231586632579970977,
   128844553038212005,
   224607985852311032,
   455132730155119493,
   141687257862743708,
   10518711573983171,
   27704842023839055,
   243612564594863708,
   78566540917788022,
   83094867813813236,
   204186990656639378,
   45719010181933508,
   18495174060202070,
   3385964768032789,
   30132643392410775,
   13032932605564078,
   104404405948775307,
   187948055796039942,
   37793045977709989,
   109028578978594035,
   32870667568459941,
   179752613564932136,
   50649530091458850,
   174766584686612157,
   210462579366453486,
   166831431610958704,
   63077826666735886,
   15704192786178090,
   71909768563907846,
   539428945705893381,
   34581633287840775,
   79530660647877130,
   59155829179713951,
   75123012945754531,
   85833874634111244,
   138454108495571149,
   81390

In [19]:
end = time.time()
print("Time spent for MinHashing: {}".format(end-start))

Time spent for MinHashing: 0.29157471656799316


<H1>LSH</H1>

In [20]:
start = time.time()

In [21]:
#Creating bands into the signature matrix
b=20 #b->#bands
r=5 #r->#rows per band

band_lsh = signature.map(lambda band: (band[0],[[band[1][i] for i in range(j*r,j*r+r)]for j in range(b)]))

band_lsh.take(2)

[(1,
  [[85657798787422750,
    141065245902151870,
    291870001698537642,
    135276584715898323,
    246477686563128297],
   [16092962851347172,
    11405924817492680,
    11821240613099548,
    231586632579970977,
    128844553038212005],
   [224607985852311032,
    455132730155119493,
    141687257862743708,
    10518711573983171,
    27704842023839055],
   [243612564594863708,
    78566540917788022,
    83094867813813236,
    204186990656639378,
    45719010181933508],
   [18495174060202070,
    3385964768032789,
    30132643392410775,
    13032932605564078,
    104404405948775307],
   [187948055796039942,
    37793045977709989,
    109028578978594035,
    32870667568459941,
    179752613564932136],
   [50649530091458850,
    174766584686612157,
    210462579366453486,
    166831431610958704,
    63077826666735886],
   [15704192786178090,
    71909768563907846,
    539428945705893381,
    34581633287840775,
    79530660647877130],
   [59155829179713951,
    75123012945754531,
   

In [22]:
#Hashing the bands
hashed_bands = band_lsh.flatMap(lambda band: [(minHash(tuple(band[1][j]),j), band[0]) for j in range(b)])

hashed_bands.take(10)

[(10063554265426660082, 1),
 (184691700705687972, 1),
 (4078970926576244757, 1),
 (2061092132324116383, 1),
 (6834075960186870958, 1),
 (4077290146969431272, 1),
 (15805638678853887, 1),
 (356140045934811843, 1),
 (35567402953732130, 1),
 (6785735728799302458, 1)]

In [23]:
#Build the buckets and keep the ones with collisions
buckets = hashed_bands.map(lambda bucket: (bucket[0], [bucket[1]])).reduceByKey(lambda doc1, doc2 : doc1 + doc2).filter(lambda collision: len(collision[1])>1)

buckets.collect()

[(10063554265426660082,
  [1,
   22,
   43,
   62,
   82,
   103,
   124,
   145,
   166,
   187,
   208,
   229,
   250,
   253,
   271,
   292,
   313,
   334,
   355,
   376,
   397,
   418,
   438,
   458,
   479,
   500,
   520,
   541,
   561,
   582,
   601,
   622,
   643,
   664,
   678,
   691,
   712,
   733,
   754,
   774,
   795,
   816,
   837,
   853,
   867,
   888,
   909,
   930,
   951,
   972,
   992,
   1013,
   1034,
   1055,
   1076,
   1096,
   1113,
   1134,
   1155,
   1176,
   1197,
   1218,
   1239,
   1260,
   1281,
   1301,
   1322,
   1343,
   1364,
   1385,
   1406,
   1423,
   1444,
   1464,
   1485,
   1506,
   1527,
   1547,
   1568,
   1589,
   1610,
   1630,
   1651,
   1672,
   1693,
   1712,
   1733,
   1754,
   1775,
   1794,
   1815,
   1836,
   1857,
   1877,
   1898,
   1919,
   1940,
   1959,
   1980,
   2001,
   2022,
   2042,
   2062,
   2083,
   2104,
   2125,
   2146,
   2167,
   2188,
   2208,
   2228,
   2249,
   2270,
   2291,
   2312

In [24]:
#Get the pairs from a list
import itertools

def get_pairs(collisions_list:list) -> tuple:
  pair_list=[]
  for pair in itertools.combinations(collisions_list,2):
    pair_list.append(pair)
  return tuple(pair_list)

In [25]:
#Get pairs of (possible) near-duplicates
duplicates = buckets.flatMap(lambda pairs: get_pairs(pairs[1])).distinct()

duplicates.take(10)

[(1, 22),
 (1, 43),
 (1, 62),
 (1, 82),
 (1, 103),
 (1, 124),
 (1, 145),
 (1, 166),
 (1, 187),
 (1, 208)]

In [26]:
end = time.time()
print("Time spent for LSH: {}".format(end-start))

Time spent for LSH: 223.25113987922668


In [27]:
#How many near-duplicates do we have?
number_of_duplicates = duplicates.count()
number_of_duplicates 

1379681

<h1>Jaccard</h1>

In [28]:
#Load the hashed shingles from the dataframe to the rdd (and cast from bytearray to bytes)
hashed_shingles = df_shingles.rdd.map(lambda row: (row.docId, row.hashed_shingles)).map(lambda b: (b[0], [bytes(i) for i in b[1]]))
hashed_shingles.take(15)

[(1,
  [b'H\xe2\xa3\x84\xd9|B\xdc',
   b'.\xeb\xa4\x11\x94\xf1\x03\x15',
   b'I\xb3\xeb>X\x99\xa9\x8a',
   b'u\xe9oT\xf1"\xecH',
   b'\xee\xde\x1e\x11-\xc5\xac\x91',
   b';\x85\xb8<\x1dtiE',
   b'\xee\x83Kp\xd8l\x9c\x17',
   b'*O\x91\xb9\x0f 2e',
   b'\xb9f\x13\x14\xfe \x81\x87',
   b'\xae|\xe3]\xd3Y\xff\x97',
   b'E0\x0c\xe9\xf4\xb1Z\xd9',
   b'\\O\x84_Wd\\]',
   b'\xad\x0c\x9c\xf5K\x1b\x12F',
   b'\xc1\x1f\r\xfc{\xc42s',
   b'Z\x8cG\x1a\xa4\xad\xce\r',
   b' \xd1\xfb<jX1&',
   b'\xbb\xc8\x80\xfe\xddo\xad\x8a',
   b'%b\xa7\x1f\xe2!\xee\x0e',
   b'\xafU\xc5\xa1\xa9(b\x1c',
   b'\xf3\xcb\\6\xdap\x19\xb6',
   b'd\xa1\xe5\x17\xe7n\xb9\x84',
   b'\x1cx3\xfc\xd0NtD',
   b'ce\x1c\x126T\x99R',
   b'\x19\xba\x93\x8dXs\xf5\xa6',
   b'\xe2T\xb9\xf9\x9f\x10&\xb8',
   b'<\xe0KT\xcf\xceG9',
   b'\x9b\xa5\xf0\x1d\x9a\x875\x0c',
   b'\xfb\xbc70\x05\xe5\t\xe1',
   b'0\xb1\xe3\xc5\x0f\xa6)/',
   b'M-\xcbtr\xbb3\r',
   b'\xab4\xd6\xad\rrf\x0f',
   b'\xe9\x851\x02\xff3\x0b0',
   b'xevo~9\xdf\xfc',
   b'h

In [29]:
start = time.time()

In [30]:
#Cartesian product to have possible pairs
cartesian_product = hashed_shingles.cartesian(hashed_shingles).filter(lambda tup: tup[0][0]!=tup[1][0])

cartesian_product.take(10)

[((1,
   [b'H\xe2\xa3\x84\xd9|B\xdc',
    b'.\xeb\xa4\x11\x94\xf1\x03\x15',
    b'I\xb3\xeb>X\x99\xa9\x8a',
    b'u\xe9oT\xf1"\xecH',
    b'\xee\xde\x1e\x11-\xc5\xac\x91',
    b';\x85\xb8<\x1dtiE',
    b'\xee\x83Kp\xd8l\x9c\x17',
    b'*O\x91\xb9\x0f 2e',
    b'\xb9f\x13\x14\xfe \x81\x87',
    b'\xae|\xe3]\xd3Y\xff\x97',
    b'E0\x0c\xe9\xf4\xb1Z\xd9',
    b'\\O\x84_Wd\\]',
    b'\xad\x0c\x9c\xf5K\x1b\x12F',
    b'\xc1\x1f\r\xfc{\xc42s',
    b'Z\x8cG\x1a\xa4\xad\xce\r',
    b' \xd1\xfb<jX1&',
    b'\xbb\xc8\x80\xfe\xddo\xad\x8a',
    b'%b\xa7\x1f\xe2!\xee\x0e',
    b'\xafU\xc5\xa1\xa9(b\x1c',
    b'\xf3\xcb\\6\xdap\x19\xb6',
    b'd\xa1\xe5\x17\xe7n\xb9\x84',
    b'\x1cx3\xfc\xd0NtD',
    b'ce\x1c\x126T\x99R',
    b'\x19\xba\x93\x8dXs\xf5\xa6',
    b'\xe2T\xb9\xf9\x9f\x10&\xb8',
    b'<\xe0KT\xcf\xceG9',
    b'\x9b\xa5\xf0\x1d\x9a\x875\x0c',
    b'\xfb\xbc70\x05\xe5\t\xe1',
    b'0\xb1\xe3\xc5\x0f\xa6)/',
    b'M-\xcbtr\xbb3\r',
    b'\xab4\xd6\xad\rrf\x0f',
    b'\xe9\x851\x02\xff3\x0

In [31]:
#Compute Jaccard similarity for each pair
jaccard = cartesian_product.map(lambda x: (x[0][0], x[1][0], float(len(set(x[0][1]).intersection(x[1][1])))/float(len(set(x[0][1]).union(x[1][1])))))

jaccard.take(10)

[(1, 2, 0.002976190476190476),
 (1, 3, 0.001658374792703151),
 (1, 4, 0.002105263157894737),
 (1, 5, 0.0),
 (1, 6, 0.0),
 (1, 7, 0.0013679890560875513),
 (1, 8, 0.0),
 (1, 9, 0.0),
 (1, 10, 0.0),
 (1, 11, 0.008695652173913044)]

In [32]:
#Filter pairs with similarity < 0.80 and delete repeated tuples, i.e. I keep just one between (i,j) and (j,i)  
similar_pairs = jaccard.filter(lambda p: p[2]>0.8).map(lambda t: (t[0],t[1])).map(lambda s: tuple(sorted(s))).distinct()

similar_pairs.take(10)

[(1, 22),
 (1, 43),
 (1, 62),
 (1, 82),
 (1, 103),
 (1, 124),
 (1, 145),
 (1, 166),
 (1, 187),
 (1, 208)]

In [33]:
end = time.time()
print("Time spent for brute force comparisons: {}".format(end-start))

Time spent for brute force comparisons: 1002.0721197128296


In [34]:
#How many near-duplicates do we have?
dim_of_jaccard = similar_pairs.count()
dim_of_jaccard 

1333048

<h1>Intersection</h1>

In [35]:
#Compute the intersection between the results given by the two approaches
intersection = similar_pairs.intersection(duplicates).distinct()
intersection.take(10)

[(1, 2083),
 (10, 54),
 (13, 83),
 (13, 89),
 (15, 199),
 (9, 231),
 (10, 244),
 (9, 275),
 (10, 294),
 (13, 293)]

In [36]:
#How many pairs are in common?
dim_of_intersection = intersection.count()
dim_of_intersection 

1333048

In [37]:
#False positive (FP/(FP+TN)) and false negative (FN/(FN+TP)) rate
false_negative = dim_of_jaccard - dim_of_intersection
true_positive = dim_of_jaccard
false_positive = number_of_duplicates - dim_of_intersection
true_negative = number_of_documents*(number_of_documents-1)/2

print("False positive rate: {}\nFalse negative rate: {}".format(false_positive/(false_positive+true_negative), false_negative/(false_negative+true_positive)))

False positive rate: 0.009948980353787986
False negative rate: 0.0
