# Locality Sensitive Hashing

In this assignment you will implement localitiy sensitive hashing step-by-step and apply that on detecting almost duplicate names. You are required to implement the functions as instructed. Do not change the signatures of the functions.

## Submission instruction

Submit a text file with extension .py containing the functions you have implemented. You can simply use the "download as" option from Jupyter notebook to export the notebook as a .py file. The filename should be LSH_{YOUR_ROLL_NO}.py where {YOUR_ROLL_NO} is your 6 letter roll number. For example, a sample file name would be LSH_CS1702.py. Please take this seriously and name the file correctly. 

## Important: do not change the function signatures

You need to implement the functions given in this notebook. You may define more functions at your will if you need to, but make sure you implement the functions given and do not change their signatures. Before submitting, remove unnecessary print / collect statements in order to avoid too much console output. 

## Setup environment

We need to setup pyspark and pydrive. 

In [2]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

openjdk-8-jdk-headless is already the newest version (8u272-b10-0ubuntu1~18.04).
0 upgraded, 0 newly installed, 0 to remove and 11 not upgraded.


Follow the interactive instructions for accessing the file in Google drive. 

In [3]:
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

# Authenticate and create the PyDrive client
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

### Load the appropriate file

The files are shared on Google drive, with the following ids given. Choose to keep the appropriate line, use the corresponding filename and comment the others. After executing, you should be able to see the file in the files section on the left panel. 

<b>You may face troubles trying to run your code on the full data, so it is totally okay to run it on one of the smaller files.</b>

In [4]:
#id='1aoZykfz5GLGGw3lRA86ogd7yCsgkgHoT' # for titles-small.txt
id='1IPssUa3m-zfWmvVbLtI-lKhJbeZjIgBg' # for titles-10k.txt (with 10k titles, mid sized file) #taking this dataset for this assignment
# id='1RzTA4iOfH3bOiyG2kDaxweUuj4AEcMda' # 
downloaded = drive.CreateFile({'id': id})
downloaded.GetContentFile('titles-10k.txt')

## Parameters

Let us set our parameters. You are free to change these around and experiment. However, do not hardcode them anywhere else in the code. 

In [5]:
#The following tunable parameters are considered for the assignment 
# Number of hash functions
n = 20
# size of each shingle
k = 3
# Number of bands 
b = 4

$\textbf{Note:}$ Please open the notebook in Google colab otherwise it might show indentation error in local sytem because here 2 whitespace works for indentation but in local system, jupyter notebook takes 4 whitespaces or tab for indentation

### 1. Converting a string to shingles (of characters) [Marks: 5] 

In our setup, the items we would compare are movie titles (strings). You would have to convert names to sets of k-shingles (of characters), <i>after removing the whitespaces</i> and <i>converting to lowercase</i>. For example, the 3-shingling of the name <tt>'Die hard'</tt> would be <tt>['die', 'ieh', 'eha', 'har', 'ard']</tt>.

Implement the following function which takes a string and k as input and outputs the list of unique shingles generated from the string. 

In [6]:
def string_to_shingles(name, k):
  shingles=[]
  name=name.lower() # it converts given string to all lower characters strings 
  string="".join(name.split()) # it removes the whitepaces from the string after converting all characters to lower case characters
  #print(str)
  for s in range(len(string)-k+1):
    shingles.append(string[s:s+k]) # taking k-length shingles
  return list(set(shingles)) # to get unique shingles, first convert list to set to remove duplicates and at the end, 
  # returns list of unique shingles generated from the string

Also test your code. 

In [7]:
#print(string_to_shingles('Die hard',3))

### 2. Generating the shingle - item matrix from the whole data [Marks: 20]

For each title, first give it an ID. You can do it by the <tt>zipWithIndex()</tt> operation on Spark RDDs. Try testing it first. 

Then, for each title, get the list of shingles in map and for each unique shingle, get the list of IDs using reduce. 

Tips: when your input RDD is a list of tuples of the following form:

<tt>
    [('Die hard', 0),
 ('Die another day', 1),
 ('Tomorrow never dies', 2),
 ('Chup ke chup ke', 3), .... ]
</tt>

for each tuple <tt>t</tt>, you may use <tt>t[0], t[1]</tt> etc to access the first, second (and so on) elements of the tuples. 

The "titles-10k.txt" file has total 10k movie titles which are loaded in the titles RDD.   

In [8]:
from pyspark import SparkContext, SparkConf
sc = SparkContext.getOrCreate()

# Use appropriate file path here
titles = sc.textFile("titles-10k.txt")  

# Get the number of titles
N = titles.count()  # N=10000 = total number of movie titles
itemsByShingles = titles.zipWithIndex() # associate a unique id for each title.

Here, we convert each title to a 3-shingles and keyval is a list of key-value pair in which key has title and value is a list of 3-shingles for corresponding title. 

In [9]:
keyval=titles.map(lambda x: (x,string_to_shingles(x,k)))  
#keyval.takeSample(False,5) # taking a sample of size 5 without replacement   

Here, keyval1 returns the key-value pair where key is a list of 3-shingles for a particular title and corresponding movie id. 

In [10]:
keyval1=itemsByShingles.map(lambda x: (string_to_shingles(x[0],k),x[1])) 
#keyval1.takeSample(False,5)

From the list of " ( [list of 3-shingles] , movie-id ) ", I make a pair of a particular shingle and movie id. 

In [11]:
key_val=keyval1.flatMap(lambda n: [(x, n[1]) for x in n[0]]) 
#key_val.takeSample(False,5)

In [12]:
#print(key_val.count())

Here, result returns a key-value pair where key is a shingle and and value is the list of movie id associated with a particular shingle which is the objective of this part.

## Answer 1 : Each unique 3-shingle with list of movie IDs<br>
After comment out the "result.takeSample(False,5)", it will give the desired output for sample size of 5 without replacement 

In [13]:
result=key_val.combineByKey(lambda v:[v],lambda x,y:x+[y],lambda x,y:x+y) 
#result.takeSample(False,5)

In [14]:
 #result.count() # returns total number of unique shingles

At the end of this step, the <tt>itemsByShingles</tt> RDD should have the list of (shingle, [list of movie ids]).

If you display your output, it should look like:

[(shingle, [list of movie ids]), (shingle, [list of movie ids]), ... ]

<b>However, do not keep collect or large print statements in your code at the time of submitting.</b>

### 3. Computation of min-hash signature matrix [Marks: 10 + 10 = 20]

Instead of using random permutations, you will implement min-hash function using Murmurhash (v3) functions, as discussed in the class. The input to your function should be a <i>title</i> (which corresponds to a row) and the output should be a number (hash value). 

Recall the outline of the min-hash signature matrix algorithm:<br><br>

<tt>
1. For each row $r$ BEGIN<br>
2. &emsp;Compute $h_1(r), h_2(r),…, h_n(r)$<br>
3. &emsp;  For each column $j$ BEGIN <br>
4. &emsp; &emsp;  If the $j$-th column has 1 in row $r$ <br>
5. &emsp;&emsp;&emsp; For each $i = 1, 2, … , n$ BEGIN <br>
6. &emsp;&emsp;&emsp;&emsp; set $\mathrm{SIG}_{i,j} = \min(\mathrm{SIG}_{i,j}, h_i(r))$<br>
7. &emsp;&emsp;&emsp;            END <br>
8. &emsp;&emsp;        END IF<br>
9. &emsp;  END<br>
10. END
</tt>

For each shingle, we have the list of movie IDs which contain the shingle. So, for each shingle (each row), we can perform the actions in lines 5-7 only for the movie IDs that are present in the list. For any other $j$, the $j$-th column does not have a 1 in row $r$. 

Do not create a SIG matrix and try to update it. Instead, note that for every 1-entry in the shingle-id matrix (that is, every id in the list corresponding to a shingle), the corresponding (i, id)-th entry of the signature matrix may get potentially updated. Simply output the tuple ((i, id), h_i(text)) in another map. The final value of the (i,id)-th entry of the signature matrix is the minimum of all such h_i(text) values obtained in this process. That minimum can be computed by another reduce process. 

### (a) The map [Marks: 10 out of 20]
To make it easier, you may implement an <tt>update_signature</tt> function which takes the shingle (text), the list of ids associated with the shingle and the total number of shingles (for computing the hash values). It should simply return the tules <tt>((i, id), h_i(text))</tt> for all i = 1, ..., n. 

<b>Note:</b> There has been a correction here. Instead of N (the number of movie ids), we need to pass the number of Shingles here. 

In [16]:
!pip3 install mmh3 



Here, mmh3 returns the key-value pair where key is a tuple of hash-function index and movie id and value is hash-value. Actually, here, key is $(i,j)^{th}$ entry in signature matrix and value is value in that $(i,j)^{th}$ entry for the signature matrix. 

## Answer 3(a):

In [17]:
import mmh3
def update_signature(text, ids, numOfShingles):
    
    # h = [mmh3.hash(text,i) % (total number of shingles) for i in range(n)]
  
  min_sig = []
  for i in range(n):
    for j in range(len(ids)):
      temp=[]
      temp2=[]
      temp.append(i)
      temp.append(ids[j])
      temp2.append(tuple(temp))
      temp2.append(mmh3.hash(text,i) % (numOfShingles))
      min_sig.append(tuple(temp2))
     
    
  return min_sig 

### (b) The reduce [Marks: 10 out of 20]

Now that you have all <tt>((i, id), h_i(shingle))</tt> tuples output by the map, use reduce to compute the minimum of <tt>h_i(shingle)</tt> for every <tt>(i,id)</tt> key. This would produce the signature matrix in a sparse matrix representation. 

You may actually implement the map function above and call map and reduce together later as well. 

Here, new_key_val returns the list of key-value pair where where key is a pair of hash-function id $i= 0$ to $(n-1)$ and a movie-id. Value in the key-value pair is value of the hash-function where input is a unique 3-shingle. Here, key-value pairs are corresponding to the unique 3-shingle and associated list of movie-ids.

In [18]:
numOfShingles = result.count() # Calculate, you need to pass this to the update_signature function

# You should use map with the update_signature function and a reduce 
#new_key_val=result.flatMap(lambda n: [(x, n[1]) for x in n[1]])

new_key_val=result.map(lambda x: update_signature(x[0],x[1],numOfShingles) )
#new_key_val.takeSample(False,2)
new_key_val = new_key_val.flatMap(lambda list: list)

In [19]:
#print(result.count(),new_key_val.count())

Now, signature returns the signature matrix which is our objective for this part.It is in the form of $((i,j),k)$ where, $(i,j)$ represents the $(i,j)^{th}$ entry of the signature matrix and $k$ is the value of the hash function.<br>
<br>
$\textbf{Note:}$ Here,  size of the signature matrix is $199960$ , not $20 \times 10000 = 200000$. The reason is, in the original dataset "titles-10k.txt", there are $2$ movie titles "99" and "S1" for which I can't generate 3-shingle. So, size of the signature matrix will be $20*(10000-2)= 199960.$ 


## Answer 3 (b): Signature Matrix <br>
After commnet out the "signature.takeSample(False,5)", it will give the desired output.


In [20]:
signature = new_key_val.reduceByKey(min)
#signature.takeSample(False,5)

In [24]:
#signature.count() 


#Here, size of the signature matrix is 199960 , not 20×10000=200000. The reason is, in the original dataset "titles-10k.txt", 
#there are 2 movie titles "99" and "S1" for which I can't generate 3-shingle. So, movie IDs are not considered for these titles
#So, size of the signature matrix will be 20∗(10000−2)=199960.

The RDD signature should be of the following form:

<tt>
    [((0, 1), 5), ((0, 3), 56), ... 
    ]
</tt>

where each tuple <tt>((i,j),v)</tt> represents the $(i,j)$-th entry of the signature matrix with value $v$.

### 4. Implement the banding [Marks: 15]

Now configure your number of bands $b$ (a divisor of number of hash functions $n$) and implement the candidate pair computation. Any pair of movies which agree completely in their signature on at least one band should be output as candidate pairs. 

Here, signature matrix is divided into 4 parts by rows since there are 4 bands and number of rows in each band is 5.

In [25]:
#r=n/b = 20/4 = 5

band1=signature.filter(lambda x : x[0][0] >=0 and x[0][0]<=4)
band2=signature.filter(lambda x : x[0][0] >=5 and x[0][0]<=9)
band3=signature.filter(lambda x : x[0][0] >=10 and x[0][0]<=14)
band4=signature.filter(lambda x : x[0][0] >=15 and x[0][0]<=19)

candidate_pairs = []

Since, I get the candidate pairs in band1 where pair of movies completely agree in their signature, so I will not check the other bands. The logic to find the candidate pair which I have used is: first for a band1, I listed the repeating hash-values in the signature matrix along with their entry means to which row and column they belong to. After that, pair-wise columns in which hash-values are same, I compare the other 3 hash-values for those pair-wise coulumns. If all values are matches, then I return the those column pairs.<br>
$\textbf{Note:}$ Here, I return only the candidate pairs of movie ids. Later, I will return candidate pairs by movie titles.    

## Answer (4): List of candidate pairs by movie IDs<br>
After commenting out the "res1.collect()", we get the desired output.
 

In [26]:
#band1

temp01= band1.map(lambda x :(x[1],x[0]))
temp101=temp01.groupByKey().map(lambda x : (x[0], list(x[1])))
#print(temp101.takeSample(False,2))
def ls(x):
  f2=[]
  for i in range(len(x[1])):
    for j in range(i+1,len(x[1])):
      f1=[]
      if x[1][i][0]== x[1][j][0]:
        l1=[]
        l1.append(x[1][i][1])
        l1.append(x[1][j][1])
        f1.append(tuple(l1))
        f1.append(x[1][j][0])
        #f1.append(x[0])
        f2.append(tuple(f1))
  return f2
filt1=temp101.flatMap(lambda x : [ls(x)])
#filt1.take(2)
filt1=filt1.flatMap(lambda l: l)
#filt1.count()
fin1=filt1.groupByKey().map(lambda x : (x[0], list(x[1])))
#print(fin1.takeSample(False,2))
#fin1.count()
def check(x):
  if len(x[1])==4:
    return x[0]
res1=fin1.map(lambda x: check(x))
res1=res1.filter(lambda x: x is not None)
#res1.collect()

Now, instead of candidate pairs as movie ids, I will return the candidate pairs by movie titles. I used the above code (just copy-pasted) and instead of movie-id, I replaced the movie-titles. 

## Answer (4): List of candidate pairs by movie titles<br>
After commenting out the "res1_modified.collect()" we get the desired output

In [28]:
keyval_modified=keyval.map(lambda x: (string_to_shingles(x[0],k),x[0])) #it returns the key-value pair where key is 
#list of 3-shingles for a particular title and corresponding movie id 
#keyval_modified.takeSample(False,5)
key_val_modified=keyval_modified.flatMap(lambda n: [(x, n[1]) for x in n[0]]) # from list of 3-shingles:movie-id, it makes
# a pair of a particular shingle and movie id. 
#key_val_modified.takeSample(False,5)
result_modified=key_val_modified.combineByKey(lambda v:[v],lambda x,y:x+[y],lambda x,y:x+y) # it returns a key-value pair 
# where key is a shingle and and value is the list of movie id associated with a particular shingle.
#result_modified.takeSample(False,5)
#numOfShingles = result.count() # Calculate, you need to pass this to the update_signature function

#signature = # Implement the rest here

# You should use map with the update_signature function and a reduce 
#new_key_val=result.flatMap(lambda n: [(x, n[1]) for x in n[1]])

new_key_val_modified=result_modified.map(lambda x: update_signature(x[0],x[1],numOfShingles) )
#new_key_val_modified.takeSample(False,2)
#signature.take(5)
new_key_val_modified = new_key_val_modified.flatMap(lambda list: list)
signature_modified = new_key_val_modified.reduceByKey(min)
#r=n/b = 20/4 = 5
#x = sc.parallelize([1,2,4,5])
##max_val = x.reduce(lambda a, b: a if a > b else b)
#print(max_val)
band1_modified=signature_modified.filter(lambda x : x[0][0] >=0 and x[0][0]<=4)
band2_modified=signature_modified.filter(lambda x : x[0][0] >=5 and x[0][0]<=9)
band3_modified=signature_modified.filter(lambda x : x[0][0] >=10 and x[0][0]<=14)
band4_modified=signature_modified.filter(lambda x : x[0][0] >=15 and x[0][0]<=19)

candidate_pairs = []
#band1_modified

temp01_modified= band1_modified.map(lambda x :(x[1],x[0]))
temp101_modified=temp01_modified.groupByKey().map(lambda x : (x[0], list(x[1])))
#print(temp101.takeSample(False,2))
def ls(x):
  f2=[]
  for i in range(len(x[1])):
    for j in range(i+1,len(x[1])):
      f1=[]
      if x[1][i][0]== x[1][j][0]:
        l1=[]
        l1.append(x[1][i][1])
        l1.append(x[1][j][1])
        f1.append(tuple(l1))
        f1.append(x[1][j][0])
        #f1.append(x[0])
        f2.append(tuple(f1))
  return f2
filt1_modified=temp101_modified.flatMap(lambda x : [ls(x)])
#filt1.take(2)
filt1_modified=filt1_modified.flatMap(lambda l: l)
#filt1.count()
fin1_modified=filt1_modified.groupByKey().map(lambda x : (x[0], list(x[1])))
#print(fin1.takeSample(False,2))
#fin1.count()
def check(x):
  if len(x[1])==4:
    return x[0]
res1_modified=fin1_modified.map(lambda x: check(x))
res1_modified=res1_modified.filter(lambda x: x is not None)
#res1_modified.collect()

The candidate pairs should be of the form

<tt>
    [(movie1, movie2), (movie1, movie2), ... ]
</tt>

### 5. Optional: 

Optionally, you may also want to compute the pairwise actual Jaccard similarities for each candidate pair and test the number of false positives and negatives for a similarity threshold $s$. 