In [4]:
from pyspark.sql import Row
from pyspark import SparkConf
from pyspark.context import SparkContext
from pyspark import sql
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))
sqlContext = sql.SQLContext(sc)

file_location = "./0"
df = sqlContext.read.csv(file_location, header=True, mode="DROPMALFORMED")
df = df.dropDuplicates(["path"])
table = df.filter(~ df.path.contains(".html")).filter(df.path.isNotNull()).filter(df.Abstract.isNotNull()).cache()

n_el = 20
subset = table.limit(n_el)
testDf = sc.parallelize([Row("test.txt", "", "The principal investigator will develop and improve methods  for understanding the stability of nonlinear waves as solutions  of partial differential equations which arise in a number of  scientific fields. In one particular case, he will use ideas he  has recently developed to establish the exponential decay rate  for nonlinear perturbations in solitary wave problems for KdV-  type equations.  The qualitative analysis of nonlinear waves for such systems  is of significant scientific interest in view of the wide spread  occurrence of such phenomena in real world situations.", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "")]).toDF()
subset = subset.union(testDf)


In [13]:
import math
import random
import itertools

def shingle(text, shingle_length):
    shingles = set(text[i:i+shingle_length] for i in range(len(text)-shingle_length+1))
    return shingles

class Shingling:
    def __init__(self, rdd, k = 10):
        self.rdd = rdd
        self.shingle_length = k
    
    def is_prime(self, x):
        for i in range(3, int(math.sqrt(x)) + 1, 2):
            if x % i == 0:
                return False
        return True

    
    def generate_prime(self, treshold):
        if treshold == 2 or treshold == 1:
            return treshold+1
        
        if treshold%2 == 0:
            treshold+=1
        
        while not self.is_prime(treshold):
            treshold +=2
        
        return treshold
    
    def generate_random_vals(self, n_func, tot_shingles):#tot_shingles = maximum shingle value
        '''
        * The permutations are represented by randomized hash functions: ax + b % p.
        * p is a prime such that p >= n where n is the number of terms in the collection.
        * If this constant isn’t prime, the random hash produces a lot of collisions, and the algorithm doesn’t work well
        * a and b are chosen uniformly at random from {1,2,...,p-1} (indip.).
        '''
        self.a = random.sample(range(1, tot_shingles), n_func)
        self.b = random.sample(range(1, tot_shingles), n_func)
        self.c = self.generate_prime(tot_shingles)
    
    def multiple_shingle(self, field = "Abstract", merged = False):#shingle a text
        l = self.shingle_length # avoid references to "self" in the spark command, otherwise spark will serialize the class
    #(https://stackoverflow.com/questions/46178161/not-able-to-access-class-methods-from-pyspark-rdds-map-method) 
    
        shingles = self.rdd.select("path",field).rdd.flatMap(lambda r: ((shingle, r["path"]) for shingle in shingle(r[field], l))).cache()#(shingle, document id)
        shingles_set = shingles.map(lambda x: x[0]).distinct().zipWithIndex()#hashed set of shingles (text, hash)
        if merged:
            result = shingles_set.join(shingles).map(lambda x: (x[1][1], set(x[1][0])))#(text, (hash, document_id)) -> (document_id, {hash})
            result = result.reduceByKey(lambda x,y: x.union(y)) #document_id, {hashes of shingles}
        else:
            result = shingles_set.join(shingles).map(lambda x: (x[1][1], x[1][0]))#returns [(document_id, hash),...]
        self.shingle_rdd = result
        return result
    
    def compare_docs(self, rdd1, rdd2, merged = False):#if merged : (id_doc, [set of shingle ids]), otherwise [(document_id, hash),...]
        reverse_rdd = self.shingle_rdd.filter(lambda x:x[0] == rdd1).union(self.shingle_rdd.filter(lambda x:x[0] == rdd2))#rdd with 2 docs hashes
        if (merged):
            reverse_rdd = reverse_rdd.flatMap(lambda x: ((i, 1) for i in x[1])).cache() #(hash, 1)
        else:
            reverse_rdd = reverse_rdd.map(lambda x: (x[1], 1)).cache() #(hash, 1)
        n_hashes = reverse_rdd.map(lambda x: x[0]).distinct().count()
        common_items = reverse_rdd.reduceByKey(lambda x,y: x+y).map(lambda x: x[1]).filter(lambda x: x == 2).count()#if same hash twice = common hash
        return float(common_items)/float(n_hashes)
    
    def set_max_index(self, n_func, merged = False):#(key, set_of_values)
        if merged:
            val = self.shingle_rdd.map(lambda x: max(x[1])).max()+1
        else:
            val = self.shingle_rdd.count()#map(lambda x: x[1]).max
        self.generate_random_vals(n_func, val)
        
    def min_hashing(self, n_func, merged = False):
        self.signature_length = n_func
        self.set_max_index(n_func, merged)
        if merged:
            rdd = self.shingle_rdd.flatMap(lambda x: (x[0], v) for v in v[1])
        a=self.a
        b=self.b
        c=self.c
        self.minhash = self.shingle_rdd.flatMap(lambda x: [((x[0],i), ((a[i]*x[1])+b[i])%c) for i in range(0, len(a))]).reduceByKey(lambda x,y: min(x,y))#((doc id,sign id), val), ...
        return self.minhash
      
    def compare_signatures(self, rdd1, rdd2):
        rdd = self.minhash.filter(lambda x:x[0][0] == rdd1).union(self.minhash.filter(lambda x:x[0][0] == rdd2)).map(lambda x: (x[0][1], x[1]))#(func, value)
        rdd = rdd.reduceByKey(lambda x,y: 1 if x==y else 0).cache()#same sign id, same value
        common = rdd.filter(lambda x: x[1] == 1).count()
        total = rdd.map(lambda x:x[0]).distinct().count()
        return float(common)/float(total)
    
    def spark_lsh(self, band_size):
        threshold = (1/band_size)**(band_size/self.signature_length)
        print("threshold: ", threshold)
        global_rdd = self.minhash.map(lambda x: ((x[0][0], x[0][1]//band_size), {x[0][1]%band_size : x[1]})).reduceByKey(lambda x,y: {**x, **y})
        global_rdd = global_rdd.map(lambda x:(x[0], hash(tuple([x[1][i] for i in range(0, len(x[1]))])))).map(lambda x: ((x[0][1],x[1]), x[0][0]))
        global_rdd = global_rdd.groupByKey().flatMap(lambda x: ((frozenset(i), 1) for i in itertools.combinations(x[1], 2)))
        global_rdd = global_rdd.reduceByKey(lambda x,y: x+y).filter(lambda x: x[1] >= threshold).collect()
        return global_rdd
                                                     

In [14]:
shingling = Shingling(subset, 8)
s = shingling.multiple_shingle()
c = shingling.compare_docs("test.txt","dbfs:/data-mining/nsf-abstract/original/awd_1992_01/a9201869.txt")

l_sign = 5000
band_size = l_sign/5

d = shingling.min_hashing(l_sign)
e = shingling.compare_signatures("test.txt", "dbfs:/data-mining/nsf-abstract/original/awd_1992_01/a9201869.txt")

print("exac", c)
print("approx", e)

shingling.spark_lsh(band_size)


exac 1.0
approx 1.0
threshold:  0.251188643150958


[(frozenset({'dbfs:/data-mining/nsf-abstract/original/awd_1992_01/a9201869.txt',
             'test.txt'}),
  5)]

In [9]:
'''
import numpy as np 
import itertools

def lsh(M,b,r): #M = matrice of signatures, b = nb of bands, r = nb of rows
  
  

  
  nrows, ncols = np.shape(M)
  
  if b*r != nrows:
    print('Error branding')
    
  # computing the threshold
  threshold = (1/b)**(1/r)
  
  # the different bands
  bands = np.array([M[i:i+r, :] for i in range(b)])
  
  #print(bands)

  #initialisation
  all_candidate_pairs=[]
  buckets = []
  
  for i in range(len(bands)): 
    # initialisation 
    band =bands[i]
    candidate_pairs_band =[] # the list of candidate pairs according to this band
    
    # the different portions : r-vectors of the band
    portions = np.array([band[:,j] for j in range(ncols)])
    
    print(portions)
    # a bucket for this band that hashes every portion
    bucket_array = np.array([hashing(portion) for portion in portions]) 
    
    # go through the different unique values of hashes and find the similar vectors that hashed to this value
    for hash_value in np.unique(bucket_array):
      
      indexes_of_portions_with_same_hash = np.where(bucket_array == hash_value)[0]
      candidate_pairs_value = list(itertools.combinations(indexes_of_portions_with_same_hash,2))

      if (candidate_pairs_value != []):
        candidate_pairs_band +=(candidate_pairs_value)
        all_candidate_pairs += (candidate_pairs_value)
        
    # Different buckets to different bands
    buckets.append(bucket_array)
  
  all_candidate_pairs = np.unique(all_candidate_pairs,return_counts=True)
  return(all_candidate_pairs, threshold)


def hashing(array): # computes the hash of a portion to a fixed range of values, with k > nb of documents and large
  hash_out = hash(tuple(array))
  return hash_out

def check(M,all_candidate_pairs, t): # Check each candidate pair’s signatures if the fraction of  components in which they agree is at least t , i.e. if they  are similar at least  t
  
  nrows, ncols = np.shape(M)
  print ('the threshold is ', t)
  similar_pairs = []
  
  for pair in all_candidate_pairs[0]:
    
    similarity = compareSignatures(M[:,pair[0]],M[:,pair[1]])
    
    if similarity >= t :
      similar_pairs.append([tuple(pair), similarity])
      print('the pair ', pair, ' has similar signatures ', similarity)
      
    if similarity <t:
      print('the pair ', pair," is not similar enough")
      
      
      
  return similar_pairs
  '''