This notebook shows the implementation of different algorithms related to finding textually similar documents. Based on Jaccard similarity, we implement shingling, minhashing, and locality-sensitive hashing (LHS). 

Using a collection of NSF research award abstracts obtained from https://archive.ics.uci.edu/ml/datasets/NSF+Research+Award+Abstracts+1990-2003 we produce a collection of vectors containing hashed shingles based on k-grams of size 5 hashed using MurmurHash. We then use minhash with 100 different hashing algorithms by randomly generating coefficients. 

In [1]:
//input
val fileName1 = "dataset_abstracts/a9000038.txt"
val fileName2 = "dataset_abstracts/a9000040.txt"

val rows1 = sc.textFile(fileName1).map(line=>line.trim().replaceAll("(\\s)+", " ")).cache()
val text1 = rows1.reduce(_ + _)

val rows2 = sc.textFile(fileName2).map(line=>line.trim().replaceAll("(\\s)+", " ")).cache()
val text2 = rows2.reduce(_ + _)

fileName1 = dataset_abstracts/a9000038.txt
fileName2 = dataset_abstracts/a9000040.txt
rows1 = MapPartitionsRDD[2] at map at <console>:31
text1 = principal investigator and associates. Topics in the followingareas are to be considered: (1) controlled Markov diffusionsand nonlinear PDEs; (2) asymptotic properties of nearlydeterministic Markov processes; (3) financial economicsapplications; (4) singular stochastic control; (5) computationalmethods in stochastic control; (6) stochastic calculus ofvariations; (7) nonlinear estimation. Analytical methods basedon viscosity solution techniques for nonlinear differentialequations as well as probabilistic methods will be studied.These theoretical studies are the basis for applied problemsr...


principal investigator and associates. Topics in the followingareas are to be considered: (1) controlled Markov diffusionsand nonlinear PDEs; (2) asymptotic properties of nearlydeterministic Markov processes; (3) financial economicsapplications; (4) singular stochastic control; (5) computationalmethods in stochastic control; (6) stochastic calculus ofvariations; (7) nonlinear estimation. Analytical methods basedon viscosity solution techniques for nonlinear differentialequations as well as probabilistic methods will be studied.These theoretical studies are the basis for applied problemsr...

In [2]:
//k-gram
val k = 5
val k_gram1 = text1.split("").sliding(k).toList.map(x => (scala.util.hashing.MurmurHash3.arrayHash(x), ""))
val kgramRDD1 = sc.parallelize(k_gram1)
val filtered1 = kgramRDD1.distinct()

val k_gram2 = text2.split("").sliding(k).toList.map(x => (scala.util.hashing.MurmurHash3.arrayHash(x), ""))
val kgramRDD2 = sc.parallelize(k_gram2)
val filtered2 = kgramRDD2.distinct()


k = 5
k_gram1 = List((323183978,""), (879426833,""), (207652440,""), (1023262041,""), (1191319030,""), (-1029485329,""), (94755376,""), (798376504,""), (-1163905680,""), (982395043,""), (-487295416,""), (929914478,""), (1998737565,""), (2078865377,""), (-1593058553,""), (-81630221,""), (1675405219,""), (-1975782702,""), (-312516106,""), (-2146848343,""), (899011444,""), (-2008099792,""), (1597990745,""), (1159842749,""), (2046055236,""), (-1557845639,""), (-2100210802,""), (-206169076,""), (-19109906,""), (1127829784,""), (-121854170,""), (772595528,""), (734667765,""), (-1813676078,""), (-708552550,""), (-1949504179,""), (-707004830,""), (179624194,""), (-1471358333,""), (45318084,""), (2014423945,""), (-618699956,""), (1272547154,""), (240336297,""), (9266125...


List((323183978,""), (879426833,""), (207652440,""), (1023262041,""), (1191319030,""), (-1029485329,""), (94755376,""), (798376504,""), (-1163905680,""), (982395043,""), (-487295416,""), (929914478,""), (1998737565,""), (2078865377,""), (-1593058553,""), (-81630221,""), (1675405219,""), (-1975782702,""), (-312516106,""), (-2146848343,""), (899011444,""), (-2008099792,""), (1597990745,""), (1159842749,""), (2046055236,""), (-1557845639,""), (-2100210802,""), (-206169076,""), (-19109906,""), (1127829784,""), (-121854170,""), (772595528,""), (734667765,""), (-1813676078,""), (-708552550,""), (-1949504179,""), (-707004830,""), (179624194,""), (-1471358333,""), (45318084,""), (2014423945,""), (-618699956,""), (1272547154,""), (240336297,""), (9266125...

In [3]:

val join = filtered1.leftOuterJoin(filtered2)
val join_n = join.count().toFloat
val union = filtered1.union(filtered2).distinct()
val union_n = union.count().toFloat

val sim = join_n / union_n
val jacc_dist = 1-sim

println(sim)
println(jacc_dist)


0.6188993
0.3811007


join = MapPartitionsRDD[16] at leftOuterJoin at <console>:28
join_n = 1192.0
union = MapPartitionsRDD[20] at distinct at <console>:30
union_n = 1926.0
sim = 0.6188993
jacc_dist = 0.3811007


0.3811007

Now we work with Min-hashing

In [4]:
import java.io.File

val dir = new File("dataset_abstracts")
val files = dir.listFiles() //379
val maxSize = 10
val docSize = if(files.size < maxSize) files.size else maxSize

val docs = files.map(f => (f.getName(), sc.textFile(f.getPath() ).map(line=>line.replaceAll("(\\s)+", " ")).collect().reduce(_ + _)))
val docsRDD = sc.parallelize(docs).cache()
println(docsRDD.take(1)(0))

(a9000038.txt,Title : Mathematical Sciences: Research on Optimal Stochastic Control and Nonlinear EstimationType : AwardNSF Org : DMS LatestAmendmentDate : April 8, 1992 File : a9000038Award Number: 9000038Award Instr.: Continuing grant Prgm Manager:  DMS DIVISION OF MATHEMATICAL SCIENCES  MPS DIRECT FOR MATHEMATICAL & PHYSICAL SCIENStart Date : July 1, 1990 Expires : December 31, 1993 (Estimated)ExpectedTotal Amt. : $188574 (Estimated)Investigator: Wendell H. Fleming whf@cfm.brown.edu (Principal Investigator current)Sponsor : Brown University 164 Angell Street Providence, RI 02912 401/863-2777NSF Program : 1266 APPLIED MATHEMATICSFld Applictn: 0000099 Other Applications NEC  21 Mathematics Program Ref : Abstract : This research is part of an on-going program by the  principal investigator and associates. Topics in the following  areas are to be considered: (1) controlled Markov diffusions  and nonlinear PDEs; (2) asymptotic properties of nearly  deterministic Markov processes; (3) fin

dir = dataset_abstracts
files = Array(dataset_abstracts/a9000038.txt, dataset_abstracts/a9000040.txt, dataset_abstracts/a9000043.txt, dataset_abstracts/a9000045.txt, dataset_abstracts/a9000046.txt, dataset_abstracts/a9000048.txt, dataset_abstracts/a9000049.txt, dataset_abstracts/a9000050.txt, dataset_abstracts/a9000052.txt, dataset_abstracts/a9000053.txt, dataset_abstracts/a9000054.txt, dataset_abstracts/a9000057.txt, dataset_abstracts/a9000058.txt, dataset_abstracts/a9000060.txt, dataset_abstracts/a9000063.txt, dataset_abstracts/a9000075.txt, dataset_abstracts/a9000089.txt, dataset_abstracts/a9000091.txt, dataset_abstracts/a9000094.txt, dataset_abstracts/a9000099.txt, dataset_abstracts/a9000100.txt, dataset_abstracts/a9000102.txt, ...


Array(dataset_abstracts/a9000038.txt, dataset_abstracts/a9000040.txt, dataset_abstracts/a9000043.txt, dataset_abstracts/a9000045.txt, dataset_abstracts/a9000046.txt, dataset_abstracts/a9000048.txt, dataset_abstracts/a9000049.txt, dataset_abstracts/a9000050.txt, dataset_abstracts/a9000052.txt, dataset_abstracts/a9000053.txt, dataset_abstracts/a9000054.txt, dataset_abstracts/a9000057.txt, dataset_abstracts/a9000058.txt, dataset_abstracts/a9000060.txt, dataset_abstracts/a9000063.txt, dataset_abstracts/a9000075.txt, dataset_abstracts/a9000089.txt, dataset_abstracts/a9000091.txt, dataset_abstracts/a9000094.txt, dataset_abstracts/a9000099.txt, dataset_abstracts/a9000100.txt, dataset_abstracts/a9000102.txt, ...

In [5]:
val abstracts = docsRDD.map(doc => (doc._1,doc._2.split("Abstract :"))).filter(a => a._2.size > 1).map(a => (a._1, a._2(1)))

val k = 5
val k_gram = abstracts.map(a => (a._1, a._2.split("").sliding(k).toList.map(x => scala.util.hashing.MurmurHash3.arrayHash(x))))
println(k_gram)
val documents = k_gram.map(k => (k._1, k._2.distinct))


MapPartitionsRDD[1156] at map at <console>:34


abstracts = MapPartitionsRDD[1155] at map at <console>:30
k = 5
k_gram = MapPartitionsRDD[1156] at map at <console>:34
documents = MapPartitionsRDD[1157] at map at <console>:36


MapPartitionsRDD[1157] at map at <console>:36

In [103]:
//Min-hashing
import Array._
val p = 1073676287
val m = p + 1
def hashThis(a:Long, b:Long, x:Long): Long = {
    ((a*x + b) % p ) % m
}
def generateCo1(i:Int): Long = {
    val r = new scala.util.Random(i)
    val a2 = r.nextInt(p-1)
    val a = if(a2%2==0) a2+1 else a2
    a
}
def generateCo2(i:Int): Long = {
    val r = new scala.util.Random(i)
    val b = r.nextInt(p-1)
    b
}

def findMin(x:Long,y:Long, h:HashAlgo): Long = {
    return hashThis(h.co1, h.co2, x) min hashThis(h.co1, h.co2, y)
}
val n = 100
case class HashAlgo(n_th: Int, co1: Long, co2: Long)
case class DocHash(hash: HashAlgo, n_th: Int, docId: String, shingles: List[Int] )
case class DocMinHash(docId: String, minHash: Long, hashN: Int)


val hash_algos = range(1,n+1).map(i => HashAlgo(i, generateCo1(i), generateCo2(i) ))
//hash_algos.foreach(println)
val prep_docs = documents.map(d => hash_algos.map(h => DocHash(h, h.n_th, d._1, d._2))).flatMap(y => y)
//prep_docs.collect().foreach(println)
val doc_signatures = prep_docs.map(d => DocMinHash(d.docId, d.shingles.map(s => s.toLong).reduce((a,b)=> findMin(a,b,d.hash)), d.n_th ) )
//doc_signatures.collect().foreach(println)


p = 1073676287
m = 1073676288
n = 100
defined class HashAlgo
defined class DocHash
defined class DocMinHash
hash_algos = Array(HashAlgo(1,495872699,495872699), HashAlgo(2,496449823,496449822), HashAlgo(3,496257449,496257448), HashAlgo(4,495295577,495295576), HashAlgo(5,495103201,495103201), HashAlgo(6,495680325,495680325), HashAlgo(7,495487951,495487950), HashAlgo(8,494526079,494526078), HashAlgo(9,494333703,494333703), HashAlgo(10,494910827,494910827), HashAlgo(11,494718453,494718452), HashAlgo(12,493756581,493756580), HashAlgo(13,493564207,493564206), HashAlgo(14,494141329,494141329), HashAlgo(15,49394...


hashThis: (a: Long, b: Long, x: Long)Long
generateCo1: (i: Int)Long
generateCo2: (i: Int)Long
findMin: (x: Long, y: Long, h: HashAlgo)Long


Array(HashAlgo(1,495872699,495872699), HashAlgo(2,496449823,496449822), HashAlgo(3,496257449,496257448), HashAlgo(4,495295577,495295576), HashAlgo(5,495103201,495103201), HashAlgo(6,495680325,495680325), HashAlgo(7,495487951,495487950), HashAlgo(8,494526079,494526078), HashAlgo(9,494333703,494333703), HashAlgo(10,494910827,494910827), HashAlgo(11,494718453,494718452), HashAlgo(12,493756581,493756580), HashAlgo(13,493564207,493564206), HashAlgo(14,494141329,494141329), HashAlgo(15,49394...

now we have a document collection like (docID, minhash, hash_n):

DocMinHash(a9000038.txt,-970556178,1)
DocMinHash(a9000040.txt,-319735484,1)
DocMinHash(a9000043.txt,-921168316,1)
DocMinHash(a9000045.txt,-691100492,1)
DocMinHash(a9000046.txt,-295120213,1)
DocMinHash(a9000048.txt,-527021014,1)
DocMinHash(a9000049.txt,-295120213,1)
DocMinHash(a9000050.txt,-911179693,1)
DocMinHash(a9000052.txt,-474449840,1)
DocMinHash(a9000053.txt,-788660299,1)
.
.
.

So we can map the values by hash value, get the common documents and return them as candidate pairs. For this, we can use DataFrame which has better aggregation functionalities. 

In [101]:
import org.apache.spark.sql.functions.countDistinct
import org.apache.spark.sql.functions.collect_list
import org.apache.spark.sql.functions.count
import org.apache.spark.sql.functions._
import org.apache.spark.sql
import org.apache.spark.sql.types._

val docDF = spark.createDataFrame(doc_signatures)
//docDF.show(200)
//docDF.printSchema
//val group = docDF.groupBy("hashN").agg(collect_set("docId").as("newGroupedListColumn"), collect_set("minHash")).show()
//group.show()

docDF.createOrReplaceTempView("docs")

val sqlContext = new SQLContext(sc)
val dup = sqlContext.sql("SELECT d.docId as d1_id, d.minHash as d1_hash, d.hashN as d1_minhash, d2.docId as d2_id, d2.minHash as d2_hash, d2.hashN as d2_minhash FROM docs d, docs d2 WHERE d.minHash = d2.minHash AND d.hashN = d2.hashN AND d.docId != d2.docId AND d.docId < d2.docId")
dup.show()

+------------+-----------+----------+------------+-----------+----------+
|       d1_id|    d1_hash|d1_minhash|       d2_id|    d2_hash|d2_minhash|
+------------+-----------+----------+------------+-----------+----------+
|a9000138.txt|-1010748709|         7|a9000139.txt|-1010748709|         7|
|a9000138.txt| -937681875|        42|a9000139.txt| -937681875|        42|
|a9000502.txt| -873889065|        99|a9000697.txt| -873889065|        99|
|a9000650.txt| -857341462|        59|a9000826.txt| -857341462|        59|
|a9000287.txt| -800533594|        55|a9000713.txt| -800533594|        55|
|a9000287.txt| -800533594|        55|a9000816.txt| -800533594|        55|
|a9000287.txt| -800533594|        55|a9000825.txt| -800533594|        55|
|a9000713.txt| -800533594|        55|a9000816.txt| -800533594|        55|
|a9000713.txt| -800533594|        55|a9000825.txt| -800533594|        55|
|a9000816.txt| -800533594|        55|a9000825.txt| -800533594|        55|
|a9000187.txt| -763014720|        38|a

docDF = [docId: string, minHash: bigint ... 1 more field]
sqlContext = org.apache.spark.sql.SQLContext@39a54505
dup = [d1_id: string, d1_hash: bigint ... 4 more fields]




[d1_id: string, d1_hash: bigint ... 4 more fields]

In [102]:
val sorted = dup.rdd.map(d => (d(0) + d(3).toString, 1)).reduceByKey(_ + _).map(d => (d._2.toFloat/100, d._1)).sortByKey(false)
sorted.collect().foreach(println)

(1.0,a9000049.txta9000251.txt)
(1.0,a9000046.txta9000251.txt)
(1.0,a9000223.txta9000528.txt)
(1.0,a9000221.txta9000528.txt)
(1.0,a9000246.txta9000463.txt)
(1.0,a9000404.txta9000944.txt)
(1.0,a9000158.txta9000246.txt)
(1.0,a9000528.txta9000944.txt)
(1.0,a9000221.txta9000404.txt)
(1.0,a9000222.txta9000404.txt)
(1.0,a9000343.txta9000463.txt)
(1.0,a9000356.txta9000527.txt)
(1.0,a9000049.txta9000246.txt)
(1.0,a9000396.txta9000528.txt)
(1.0,a9000223.txta9000944.txt)
(1.0,a9000046.txta9000246.txt)
(1.0,a9000049.txta9000158.txt)
(1.0,a9000221.txta9000944.txt)
(1.0,a9000223.txta9000396.txt)
(1.0,a9000221.txta9000223.txt)
(1.0,a9000046.txta9000158.txt)
(1.0,a9000177.txta9000458.txt)
(1.0,a9000057.txta9000915.txt)
(1.0,a9000396.txta9000944.txt)
(1.0,a9000222.txta9000528.txt)
(1.0,a9000158.txta9000463.txt)
(1.0,a9000246.txta9000343.txt)
(1.0,a9000221.txta9000222.txt)
(1.0,a9000223.txta9000404.txt)
(1.0,a9000158.txta9000343.txt)
(1.0,a9000049.txta9000463.txt)
(1.0,a9000046.txta9000049.txt)
(1.0,a90

sorted = ShuffledRDD[1376] at sortByKey at <console>:161


ShuffledRDD[1376] at sortByKey at <console>:161

Finally, we can work with LHS

