In [1]:
import pdb
import scipy
import scipy.sparse
import numpy as np
import time
import sys
import itertools

In [2]:
filename='review/sample.txt'

In [3]:
def get_ngrams(tokens,n_grams):
    return zip(*[tokens[i:] for i in range(n_grams)])

In [4]:
def get_hash_func():
    a = np.random.randint(-100, 100)
    b = np.random.randint(-100, 100)
    c = 4294967311
    def hash_func(x):
        return (a*x+b) % c
    return hash_func

In [5]:
def shingling(filename, n_grams, hash_size):
    lines=len([1 for line in open(filename,'r',encoding='utf-8')])
    matrix=scipy.sparse.lil_matrix((hash_size,lines),dtype=bool)
    
    for i,line in enumerate(open(filename,'r',encoding='utf-8')):
        tokens=[t for t in line[:-1].split(' ') if t!='']
        tokens=get_ngrams(tokens,n_grams)
        for token in tokens:
            bucket=hash(token)%hash_size
            matrix[bucket,i]=1
            
    matrix=scipy.sparse.csc_matrix(matrix)
    return matrix

In [6]:
def minhashing(matrix, hash_funcs):
    k=len(hash_funcs)
    n_cols=matrix.shape[1]
    M=np.zeros((k,n_cols))
    for i in range(n_cols):
        rows=matrix[:,i].indices
        for j,hash_func in enumerate(hash_funcs):
            vhash_list=[hash_func(v) for v in rows]
            vhash_min=min(vhash_list)
            M[j,i]=vhash_min
    return M

In [7]:
def LSH(M,b=20,r=5):
    n_cols=M.shape[1]
    k=n_cols
    bucket_list=[[[] for i in range(k)] for j in range(b)]
    for c in range(n_cols):
        for band in range(b):
            row_start=band*r
            bucket=tuple(M[row_start:(row_start+r),c])
            vbucket=hash(bucket)%k
            bucket_list[band][vbucket].append(c)
    return bucket_list

In [8]:
def get_candidates(bucket_list, query=None):
    candidates=set()
    count=0
    for band in bucket_list:
        for vbucket in band:
            if query!=None:
                if query in vbucket:
                    candidates=candidates.union(set(vbucket))
            else:
                candidates=candidates.union(list(itertools.combinations(vbucket,2)))
        count+=1
#         print(f"Band {count} completed, {len(candidates)} candidate pairs found")
    
    return candidates

In [9]:
def find_similar_item(bucket_list, matrix, query, SIM=0.8):
    candidates=get_candidates(bucket_list,query)
    
    sims=[]
    c1=set(matrix[:,query].indices)
    for candidate in candidates:
        c2=set(matrix[:,candidate].indices)
        sim=len(c1 & c2)/len(c1 | c2)
        if sim>=SIM:
            sims.append((candidate,sim))
    sims=sorted(sims,key=lambda x:x[1],reverse=True)           
    
    return sims

In [10]:
start_time=time.time()

k=100
# find and hash n-grams, create sparse matrix
n_grams=5
hash_size=2**16
matrix=shingling(filename, n_grams, hash_size)

# create hash functions
hash_funcs=[get_hash_func() for i in range(k)]

# minhashing and create M
M=minhashing(matrix, hash_funcs)

# LSH and create bucket_list
bucket_list=LSH(M,b=20,r=5)

# find similar pairs
sims=find_similar_item(bucket_list, matrix, 467, 0.5)

print(sims)

end_time=time.time()
elapsed=end_time-start_time
print(f"No Spark solution time elapsed: {int(elapsed//60)} minutes {int(elapsed%60)} seconds")

[(467, 1.0), (574, 0.5673076923076923), (466, 0.5673076923076923), (603, 0.5673076923076923)]
No Spark solution time elapsed: 3 minutes 9 seconds


### Partial Spark Solution

In [11]:
from pyspark import SparkContext

try:
    sc
except NameError:
    sc=SparkContext("local[*]","Similar Items")

In [12]:
def get_sim(pair,matrix):
    i1,i2=pair
    c1=set(matrix[:,i1].indices)
    c2=set(matrix[:,i2].indices)
    sim=len(c1 & c2)/len(c1 | c2)
    return (pair, sim)

def get_sim_other(item,query,matrix):
    c1=set(matrix[:,query].indices)
    c2=set(matrix[:,item].indices)
    sim=len(c1 & c2)/len(c1 | c2)
    return (item, sim)

In [13]:
start_time=time.time()

k=100
n_grams=5
hash_size=2**16
hash_funcs=[get_hash_func() for i in range(k)]

# find and hash n-grams, create sparse matrix
matrix=shingling(filename, n_grams, hash_size)

# minhashing and create M
M=minhashing(matrix, hash_funcs)

# LSH and create bucket_list
bucket_list=LSH(M,b=20,r=5)
    
# Given a query, get similar item
query=467
SIM=0.5
sims=sc.parallelize(bucket_list).flatMap(lambda x:[i for i in x if query in i]).\
flatMap(lambda x:[i for i in x]).distinct().\
map(lambda x: get_sim_other(x,query,matrix)).filter(lambda x:x[1]>SIM).sortBy(lambda x:x[1],False).collect()

print(sims)

# Get all similar pairs
SIM=0.8
pairs=sc.parallelize(bucket_list).flatMap(lambda x:[i for i in x if len(i)>1]).\
flatMap(lambda x:list(itertools.combinations(x,2))).distinct().\
map(lambda x: get_sim(x,matrix)).filter(lambda x:x[1]>SIM).sortBy(lambda x:x[1],False).collect()

print(pairs[-10:])

end_time=time.time()
elapsed=end_time-start_time
print(f"Partial Spark solution time elapsed: {int(elapsed//60)} minutes {int(elapsed%60)} seconds")

[(467, 1.0), (466, 0.5673076923076923), (603, 0.5673076923076923), (574, 0.5673076923076923)]
[((2621, 2656), 1.0), ((8523, 8702), 1.0), ((2943, 2946), 1.0), ((5934, 5959), 0.9318181818181818), ((5958, 5959), 0.9318181818181818), ((2902, 2922), 0.9170731707317074), ((5023, 9620), 0.8461538461538461), ((4928, 4929), 0.8451327433628318), ((2636, 2646), 0.810126582278481), ((2646, 2647), 0.810126582278481)]
Partial Spark solution time elapsed: 3 minutes 26 seconds


### Full Spark Solution

In [14]:
def shingling_spark(line, n_grams, hash_size=2**16):
    mtx=scipy.sparse.lil_matrix((hash_size,1))
    tokens=[t for t in line[:-1].split(' ') if t!='']
    tokens=get_ngrams(tokens,n_grams)
    for token in tokens:
        h=hash(token)%hash_size
        mtx[h,0]=1
    mtx=scipy.sparse.csc_matrix(mtx)
    return mtx

def minhashing_spark(matrix, hash_funcs):
    k=len(hash_funcs)
    M=[0 for i in range(k)]
    rows=matrix.indices
    for j,hash_func in enumerate(hash_funcs):
        vhash_list=[hash_func(v) for v in rows]
        vhash_min=min(vhash_list)
        M[j]=vhash_min
    return M

def LSH_spark(col,sig,b,r):
    bucket_list=[]
    k=2**16
    for band in range(b):
        start=band*r
        bucket=hash(tuple(sig[start:(start+r)]))%k
        bucket_list.append([(band,bucket),col])
    return bucket_list

def get_sim_other_spark(item,query,matrix):
    c1=set(matrix[query].indices)
    c2=set(matrix[item].indices)
    sim=len(c1 & c2)/len(c1 | c2)
    return (item, sim)

def get_sim_spark(pair,matrix):
    i1,i2=pair
    c1=set(matrix[i1].indices)
    c2=set(matrix[i2].indices)
    sim=len(c1 & c2)/len(c1 | c2)
    return (pair, sim)

In [15]:
start_time=time.time()
  
n_grams=5
hash_size=2**16
hash_funcs=[get_hash_func() for i in range(k)]

# Get n_grams and hash
matrix=sc.textFile(filename).map(lambda x: shingling_spark(x, n_grams, hash_size)).collect()

# Minhash
M=sc.parallelize(matrix).map(lambda x: minhashing_spark(x, hash_funcs)).collect()
MM=[(col,sig) for col,sig in enumerate(M)]

# LSH
b=20
r=5
bucket_list=sc.parallelize(MM).flatMap(lambda x: LSH_spark(x[0],x[1],b,r)).groupByKey().\
filter(lambda x: len(x[1])>1).map(lambda x: list(x[1])).collect()

# Given a query, get all similar items
query=467
SIM=0.5
sims=sc.parallelize(bucket_list).filter(lambda x: query in x).\
flatMap(lambda x:[i for i in x]).distinct().\
map(lambda x: get_sim_other_spark(x,query,matrix)).\
filter(lambda x:x[1]>SIM).sortBy(lambda x:x[1],False).collect()

print(sims)

# Get all similar pairs
SIM=0.8
pairs=sc.parallelize(bucket_list).\
flatMap(lambda x:list(itertools.combinations(x,2))).distinct().\
map(lambda x: get_sim_spark(x,matrix)).filter(lambda x:x[1]>SIM).sortBy(lambda x:x[1],False).collect()

print(pairs[-10:])

end_time=time.time()
elapsed=end_time-start_time
print(f"Full Spark solution time elapsed: {int(elapsed//60)} minutes {int(elapsed%60)} seconds")

[(467, 1.0), (466, 0.5623003194888179), (603, 0.5623003194888179), (574, 0.5623003194888179)]
[((2941, 2947), 1.0), ((5927, 5929), 1.0), ((5934, 5958), 1.0), ((2943, 2946), 1.0), ((2621, 2656), 1.0), ((8523, 8702), 1.0), ((5023, 9620), 0.8461538461538461), ((4928, 4929), 0.8451327433628318), ((2636, 2646), 0.8269230769230769), ((2646, 2647), 0.8269230769230769)]
Full Spark solution time elapsed: 4 minutes 57 seconds


In [16]:
sc.stop()

In [17]:
# #takes forever to run
# def find_similar_pairs(bucket_list, matrix, SIM=0.8):
#     candidates=get_candidates(bucket_list)
#     print("Pairs of candidates:", len(candidates))
    
#     similar_list=[]
#     for candidate in candidates:
#         i1,i2=candidate
#         c1=set(matrix[:,i1].indices)
#         c2=set(matrix[:,i2].indices)
#         sim=len(c1 & c2)/len(c1 | c2)
#         if sim>=SIM:
#             similar_list.append((candidate,sim))
                    
#     return similar_list