# Import preprocessed data

In [24]:
from tqdm import tqdm
import numpy as np
import mmh3
from src.structures import User, Movie
from src.data_methods import read_movies,read_viewers
import kagglehub

In [28]:
# Download the latest version of the Netflix Prize dataset via kagglehub.
# The returned value is kept in `datapath` and handed to read_movies / read_viewers below.
datapath = kagglehub.dataset_download("netflix-inc/netflix-prize-data")

In [29]:
n_lines = 10000 # number of reviews to read
datafiles = ["combined_data_1.txt"] # other files that can be added: "combined_data_2.txt", "combined_data_3.txt", "combined_data_4.txt"
with_tqdm = False # set to True to see a progress bar (reduces speed)

movies = read_movies(datapath)
users = read_viewers(datapath, movies, datafiles = datafiles, with_tqdm= with_tqdm, n_lines=n_lines) # read is capped by n_lines (10000 here, see above)

# Compute Signature matrix

In [35]:

def compute_minhashes(object: Movie | User, n_hash = 100):
    """Compute the MinHash signature of a single movie or user.

    For every seed in ``range(n_hash)`` each element of ``object.bag_ratings()``
    is hashed with ``mmh3.hash(element, seed)`` and the smallest hash value for
    that seed is kept.

    Args:
        object (Movie | User): object whose ``bag_ratings()`` result is hashed.
            NOTE(review): the elements are presumably ids in a form accepted by
            ``mmh3.hash`` (str/bytes) -- confirm against ``bag_ratings``.
        n_hash (int, optional): number of seeds, i.e. signature length. Defaults to 100.

    Returns:
        np.ndarray: one minimum hash value per seed, in seed order.
    """
    rated_items = object.bag_ratings()

    # One row of hash values per seed; the bag is re-iterated for each seed.
    rows_per_seed = []
    for seed in range(n_hash):
        rows_per_seed.append([mmh3.hash(item, seed) for item in rated_items])

    # Reduce every row to its minimum -> one value per seed.
    return np.array(rows_per_seed).min(axis = 1)

def compute_signatures(objets: dict[str, Movie | User], n_hash = 100, with_tqdm = True):
    """Compute the MinHash signature of every object in a dictionary.

    Args:
        objets (dict[str, Movie | User]): objects to process, keyed by id.
        n_hash (int, optional): number of hashes per signature. Defaults to 100.
        with_tqdm (bool, optional): wrap the iteration in a tqdm progress bar.
            Defaults to True.

    Returns:
        dict: ``{id: compute_minhashes(obj, n_hash)}`` in the iteration order of ``objets``.
    """
    entries = objets.items()
    if with_tqdm:
        entries = tqdm(entries)

    signatures = {}
    for key, obj in entries:
        signatures[key] = compute_minhashes(obj, n_hash)
    return signatures

def bucket_hash(signatures, n_buckets = 100):
    """Map a sequence of hash values (one band of a signature) to a bucket number.

    The built-in ``hash`` of every element is XOR-folded together and the result
    is reduced modulo ``n_buckets``. XOR is commutative, so the bucket does not
    depend on the order of the elements; an empty sequence maps to bucket 0.

    Args:
        signatures: iterable of hashable values.
        n_buckets (int, optional): number of buckets. Defaults to 100.

    Returns:
        int: bucket number in ``range(n_buckets)``.
    """
    folded = 0
    for element_hash in map(hash, signatures):
        folded ^= element_hash
    return folded % n_buckets

def cartesian_product_exclude_same(A, B):
    """Return the set of pairs (a, b) with a in A, b in B, a != b and b > a.

    Each unordered pair therefore appears at most once, smaller element first.
    """
    ordered_pairs = set()
    for a in A:
        for b in B:
            if a != b and b > a:
                ordered_pairs.add((a, b))
    return ordered_pairs

# Small example: yields {(1, 2), (1, 3), (2, 3)}
res = cartesian_product_exclude_same([1,2,3], [1,2,3])

In [36]:
n_hash = 100  # number of hashes, i.e. length of the signature shown below
user = users["1488844"]  # hard-coded user id; presumably raises KeyError if that user is not among the loaded reviews
minhashes = compute_minhashes(user, n_hash)
minhashes

array([-1810453357, -1570063170,  -184002522,  -126235597,  -172315920,
        -346150140, -2131240906,  -793765815, -1571785578,  -977649233,
        -535678046, -1457417976, -1562019656,   101162235, -1560298715,
        -956861631,   168904549,   790285535, -2070529337,   625771924,
        -249096934,   166839407, -1447133418, -1508133614,  -721831573,
       -1056278211, -2114645946,  -381543989,     2653644,  -243094001,
       -1200734897, -1428275679,  1687665648,  -677418915, -1473849242,
        -248996816,  -893062816,  -576892624,  1653519337, -1799033939,
       -1549378093, -1625405734, -1868994069, -1367157416,  1017076005,
       -1384113491,  -310537956, -1527923987, -1384278664,   427621935,
         369715913, -1778312296,   436922359,  -868451072,  -383727986,
        -532758649, -2127377514,  -701845473,   559389694, -1995129985,
         517183813,  1120606309,  -552741755,  -943591880, -1871239471,
       -1987538483,  -818748851, -1542113122, -1668779179,   290

# Exercise 3.4.4 : 
Suppose we wish to implement LSH by MapReduce. Specifically, assume chunks of the signature matrix consist of columns, and elements
are key-value pairs where the key is the column number and the value is the
signature itself (i.e., a vector of values).

    a) Show how to produce the buckets for all the bands as output of a single
    MapReduce process. Hint: Remember that a Map function can produce
    several key-value pairs from a single element.

    b) Show how another MapReduce process can convert the output of (a) to
    a list of pairs that need to be compared. Specifically, for each column i,
    there should be a list of those columns j > i with which i needs to be
    compared.

In [117]:

def create_bucket_matrix(signatures: dict[str, np.ndarray], n_buckets = 100, bands = 10):
    """Creates a matrix of size (n_objects, bands) where each entry is the bucket number of one band of an object's minhash signature.

    Each signature is split into `bands` consecutive bands of
    r = signature_length // bands rows, and every band is hashed with
    `bucket_hash`. When the signature length is not a multiple of `bands`,
    the trailing `signature_length % bands` rows are ignored.

    Args:
        signatures (dict[str, np.ndarray]): dictionary of the form {object_id: [hash1, hash2, ...]} where the object is a user or movie.
            All signatures are assumed to have the same length; the length of the first one is used.
        n_buckets (int, optional): number of buckets. Defaults to 100.
        bands (int, optional): number of bands. Defaults to 10.

    Returns:
        buckets: matrix of size (n_objects, bands) where each entry corresponds to a bucket number for a band.
            Rows follow the iteration order of `signatures`.

    Raises:
        ValueError: if `bands` is smaller than 1 or larger than the signature length.
    """
    if bands < 1:
        raise ValueError("bands must be a positive integer")

    buckets = np.zeros((len(signatures), bands), dtype = int)
    if len(signatures) == 0:
        return buckets

    # Rows per band come from the signature length, NOT from the number of objects:
    # dividing the object count by `bands` makes the slices run past the end of the
    # signature, so the later bands are empty and every object lands in bucket 0.
    signature_length = len(next(iter(signatures.values())))
    r = signature_length // bands
    if r == 0:
        raise ValueError(
            f"bands ({bands}) cannot exceed the signature length ({signature_length})"
        )

    for i, signature in enumerate(signatures.values()): #for each object
        for j in range(bands): #for each band
            buckets[i,j] = bucket_hash(signature[j*r:(j+1)*r], n_buckets=n_buckets)
    return buckets

In [118]:
n_hashes = 1000 # length of each minhash signature
b = 20 # number of bands
r = n_hashes // b # rows per band; not passed on below, create_bucket_matrix computes its own band width
n_buckets = 10000 # number of buckets each band is hashed into

SIG = compute_signatures(users, n_hashes, with_tqdm = True) # {user id: signature}
buckets = create_bucket_matrix(SIG, n_buckets, bands = b) # one row per user, one column per band

100%|██████████| 9619/9619 [00:17<00:00, 558.69it/s]


In [121]:
def find_candidates(buckets, users):
    """Collect every pair of ids that share a bucket in at least one band.

    Args:
        buckets (np.ndarray): matrix of size (n_users, bands); entry (i, j) is the
            bucket number of object i for band j.
        users (dict): mapping whose keys are the object ids. Its iteration order
            must match the row order of `buckets`.

    Returns:
        set: pairs (id_a, id_b) with id_a < id_b; each candidate pair appears once.

    Raises:
        ValueError: if `users` and `buckets` do not describe the same number of objects.
    """
    ids = np.array(list(users.keys()))
    if len(ids) != len(buckets):
        raise ValueError(
            f"users has {len(ids)} entries but buckets has {len(buckets)} rows"
        )

    candidates = set()
    for band in buckets.T:
        # objects with the same bucket value in this band collide
        for bucket_value in tqdm(np.unique(band)):
            idx = np.where(bucket_value == band)[0]
            if len(idx) < 2:
                continue  # a bucket with a single member yields no pair
            # update in place: set.union would copy the whole accumulated set for every bucket
            candidates.update(cartesian_product_exclude_same(ids[idx], ids[idx]))
    return candidates


In [122]:
# Candidate pairs of user ids: users sharing a bucket in at least one band.
candidates = find_candidates(buckets, users)

 85%|████████▌ | 47/55 [00:45<00:07,  1.02it/s]
  0%|          | 0/20 [00:45<?, ?it/s]


KeyboardInterrupt: 

{('1664010', '2118461'),
 ('1664010', '305344'),
 ('1664010', '387418'),
 ('2118461', '305344'),
 ('2118461', '387418'),
 ('305344', '387418')}

In [116]:
# NOTE(review): leftover debugging cell -- in the visible cells `ids` and `idx` are only
# defined as locals of find_candidates, so this presumably fails on a fresh kernel.
ids[idx]

array(['1664010', '305344', '2118461', '387418'], dtype='<U7')

{(1, 2), (1, 3), (2, 3)}

In [77]:
"A" < "B"

True