# Big Data LSH & MapReduce

## Improrting Libraries

In [None]:
import pandas as pd 
import random
import hashlib
from collections import defaultdict
from itertools import combinations
import csv

import warnings
warnings.filterwarnings('ignore')

## LSH

**Load Data**

In [2]:
train_df = pd.read_csv("Data\yelp_train.csv")
val_df = pd.read_csv('Data\yelp_val.csv')
test_df = pd.read_csv('Data\yelp_test.csv')
df = pd.concat([train_df, val_df, test_df], ignore_index=True)
df.columns = df.columns.str.strip()

**Preprocessing**

In [3]:
user_business_map = defaultdict(set)
business_users_map = defaultdict(set)

In [4]:
for _, row in df.iterrows():
    user = row['user_id'].strip()
    business = row['business_id'].strip()
    user_business_map[user].add(business)
    business_users_map[business].add(user)

In [5]:
user_id_index = {user_id: idx for idx, user_id in enumerate(user_business_map)}

In [6]:
user_id_index

{'vxR_YV0atFxIxfOnF9uHjQ': 0,
 'o0p-iTC5yTBV5Yab_7es4g': 1,
 '-qj9ouN0bzMXz1vfEslG-A': 2,
 'E43QxgV87Ij6KxMCHcijKw': 3,
 'T13IBpJITI32a1k41rc-tg': 4,
 'Q1IENmNc6bdDruACmhy4mg': 5,
 '4bQqil4770ey8GfhBgEGuw': 6,
 '0BBUmH7Krcax1RZgbH4fSA': 7,
 'QZ_Arlwoj0ghfBvg69rjOw': 8,
 'lEw2VL9JCDFk3R5NzahqnA': 9,
 'zsZBYWYEmLLs81_f-HHM8w': 10,
 'uNxXIIFzjHm5r62UTyVjrQ': 11,
 'TGrZk61ja7X1ijAGW5lNmg': 12,
 'Fv0e9RIV9jw5TX3ctA1WbA': 13,
 'z4RytucxI_XfcMFaEI2DRg': 14,
 'y0ZU1w6YY4W-KtMeRXSYLg': 15,
 'BeoFmNoFuz-h8uso-J2_lg': 16,
 'pMefTWo6gMdx8WhYSA2u3w': 17,
 'tYk7mMGGFl3gLfmhST5L-A': 18,
 'Ez8ifxrtOUT5TM4817scOQ': 19,
 'KhpA23RweKsO9TDaGpE2xg': 20,
 'Bj_MarPEKBe2xN12YimekQ': 21,
 'mrru6hbeQLvagU4vkfuqmw': 22,
 'UYcmGbelzRa0Q6JqzLoguw': 23,
 '0KeT9NKimYkHN0wlrij-dg': 24,
 '0nqshyLgABOSyTfJUTthjQ': 25,
 '3CMiTAhq_o_GP1tLs-ihbg': 26,
 'vSzquh2Ym99tpksfpWH2kg': 27,
 'PKEzKWv_FktMm2mGPjwd0Q': 28,
 'sMm9HOXmBgsBmCKnOmLwyA': 29,
 'E0v3KkPKsizW_RW3CKQj-A': 30,
 'en3OMUKWjeON-1-6wo8VrQ': 31,
 '8AwcaBJjiMpQ__FP

**Hash Functions**

In [7]:
def generate_hash_functions(num_funcs, max_val, prime=2147483647):
    funcs = []
    for _ in range(num_funcs):
        a = random.randint(1, prime - 1)
        b = random.randint(0, prime - 1)
        funcs.append(lambda x, a=a, b=b: ((a * x + b) % prime) % max_val)
    return funcs

In [8]:
num_hashes = 100
max_val = len(user_id_index)
hash_funcs = generate_hash_functions(num_hashes, max_val)

**Signature Matrix**

In [9]:
signature_matrix = {}
for business, users in business_users_map.items():
    sig = []
    for func in hash_funcs:
        min_hash = min(func(user_id_index[user]) for user in users)
        sig.append(min_hash)
    signature_matrix[business] = sig

**LSH**

In [10]:
bands = 20
rows_per_band = num_hashes // bands
buckets = defaultdict(list)

In [11]:
for band in range(bands):
    for business, sig in signature_matrix.items():
        band_signature = tuple(sig[band * rows_per_band: (band + 1) * rows_per_band])
        band_hash = hashlib.md5(str(band_signature).encode()).hexdigest()
        buckets[(band, band_hash)].append(business)

In [12]:
candidate_pairs = set()
for bucket in buckets.values():
    if len(bucket) > 1:
        for pair in combinations(sorted(bucket), 2):
            candidate_pairs.add(pair)

**Jaccard Similarity**

In [13]:
def jaccard(set1, set2):
    return len(set1 & set2) / len(set1 | set2)

In [14]:
similar_pairs = []
for b1, b2 in candidate_pairs:
    sim = jaccard(business_users_map[b1], business_users_map[b2])
    if sim >= 0.5:
        similar_pairs.append((b1, b2, sim))

**Sorting the results**

In [15]:
similar_pairs_sorted = sorted(similar_pairs, key=lambda x: (x[0], x[1]))

**Save to CSV**

In [16]:
with open("similar_businesses.csv", "w", newline='') as f:
    writer = csv.writer(f)
    writer.writerow(["business_id_1", "business_id_2", "similarity"])
    writer.writerows(similar_pairs_sorted)

## MapReduce

In [1]:
from pyspark import SparkConf, SparkContext
import os

In [None]:
os.environ["JAVA_HOME"] = "C:\\Program Files\\Java\\jre1.8.0_451"
os.environ["SPARK_HOME"] = "C:\\Spark\\spark-3.5.5-bin-hadoop3" 
os.environ["HADOOP_HOME"] = "C:\\hadoop"
os.environ["PYSPARK_PYTHON"] = "D:\\Master_Semester2\\BigData\\Assignments\\Assignment1\\.venv\\Scripts\\python.exe"
os.environ["PYSPARK_DRIVER_PYTHON"] = "D:\\Master_Semester2\\BigData\\Assignments\\Assignment1\\.venv\\Scripts\\python.exe"
os.environ["PATH"] += ";" + os.path.join(os.environ["SPARK_HOME"], "bin")

In [3]:
def loadMovieNames() :
    movieNames = {}
    with open("Data/movies.txt", encoding = "ISO-8859-1") as f:
        for line in f:
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

In [4]:
conf = SparkConf().setAppName("WorstMovies")
sc = SparkContext(conf = conf)
movieNames = loadMovieNames()

In [5]:
lines = sc.textFile("Data/users.txt")

In [6]:
movieRatings = lines.map(lambda x: (int(x.split()[1]), (float(x.split()[2]), 1)))

In [7]:
ratingTotalsAndCount = movieRatings.reduceByKey(lambda a, b: (a[0]+b[0], a[1]+b[1]))

In [8]:
averageRatings = ratingTotalsAndCount.mapValues(lambda x: x[0] / x[1])

In [9]:
bestMovie = averageRatings.max(key=lambda x: x[1])
worstMovie = averageRatings.min(key=lambda x: x[1])

In [10]:
print("Best Movie:")
print(f"{movieNames[bestMovie[0]]} with average rating of {bestMovie[1]:.2f}")

Best Movie:
Santa with Muscles (1996) with average rating of 5.00


In [None]:
print("\nWorst Movie:")
print(f"{movieNames[worstMovie[0]]} with average rating of {worstMovie[1]:.2f}")


Worst Movie:
Amityville: Dollhouse (1996) with average rating of 1.00
