<a href="https://colab.research.google.com/github/KayvanShah1/Big-Data-Specialization-Coursera/blob/main/assignment-3/notebooks/HW3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Setup & Installation

In [1]:
!pip install pyspark ipython-autotime

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting ipython-autotime
  Downloading ipython_autotime-0.3.2-py2.py3-none-any.whl (7.0 kB)
Collecting jedi>=0.16 (from ipython->ipython-autotime)
  Downloading jedi-0.19.1-py2.py3-none-any.whl (1.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/1.6 MB[0m [31m9.9 MB/s[0m eta [36m0:00:00[0m
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488493 sha256=a13accc1454e2a18b65a86ea6294565f63848fd4a700213b595eae0a87bc129a
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packag

In [2]:
%%bash
# Print the installed Java runtime version.
java --version

openjdk 11.0.22 2024-01-16
OpenJDK Runtime Environment (build 11.0.22+7-post-Ubuntu-0ubuntu222.04.1)
OpenJDK 64-Bit Server VM (build 11.0.22+7-post-Ubuntu-0ubuntu222.04.1, mixed mode, sharing)


# Imports

In [3]:
import os
import sys
import json
from pyspark import SparkContext

from pprint import pprint

%load_ext autotime

time: 538 µs (started: 2024-03-13 07:28:21 +00:00)


In [4]:
# Work from the assignment folder so the relative data/output paths used below resolve.
# NOTE(review): this absolute path assumes Google Drive is already mounted at
# /content/drive; no mount call is visible in this notebook — confirm.
os.chdir("/content/drive/MyDrive/Colab Notebooks/DSCI553/hw3")
os.getcwd()

'/content/drive/MyDrive/Colab Notebooks/DSCI553/hw3'

time: 257 ms (started: 2024-03-13 07:28:21 +00:00)


In [5]:
# List the files in the working directory.
!ls

HW3.ipynb  HW3StudentData  t1-ref.txt  task1_ref.py
time: 516 ms (started: 2024-03-13 07:28:22 +00:00)


# Tasks

## Task 1: Jaccard-based LSH

In [35]:
import sys
import time
from pyspark import SparkContext, SparkConf
from itertools import combinations
import random
import csv


# Seed Python's RNG before any hash parameters are drawn with random.randint.
random.seed(37)

# Define constants
NUM_HASH_FUNCTIONS = 50  # number of (a, b) hash functions, i.e. the signature length
PRIME_NUMBER = 15485863  # modulus applied in hash_item before reducing to num_bins
ROWS_PER_BAND = 2  # consecutive signature entries grouped into one LSH band
BANDS = NUM_HASH_FUNCTIONS // ROWS_PER_BAND  # number of bands the signature is split into


def prepare_dataset(data):
    """Parse the raw review lines into the structures the LSH step consumes.

    Args:
        data: RDD of CSV text lines whose first line is the header. Every other
            line is split on "," and field 0 is used as the user ID and field 1
            as the business ID.

    Returns:
        tuple: ``(business_user, usr_to_idx)`` where ``business_user`` is an RDD
        of ``(business_id, [user_id, ...])`` pairs and ``usr_to_idx`` is a dict
        assigning each distinct user ID an integer index.
    """
    # Drop the header line, then break every remaining line into its fields
    header_line = data.first()
    records = data.filter(lambda line: line != header_line).map(lambda line: line.split(","))

    # Assign an integer index to every distinct user ID
    distinct_users = records.map(lambda fields: fields[0]).distinct()
    usr_to_idx = distinct_users.zipWithIndex().collectAsMap()

    # Collect, per business, the list of users that have reviewed it
    single_user_lists = records.map(lambda fields: (fields[1], [fields[0]]))
    business_user = single_user_lists.reduceByKey(lambda left, right: left + right)

    return business_user, usr_to_idx


def generate_hash_function_params(max_range, count):
    """Draw ``count`` random ``(a, b)`` parameter pairs for linear hash functions.

    Args:
        max_range (int): Inclusive upper bound for both parameters.
        count (int): Number of pairs to generate.

    Returns:
        list: ``count`` tuples ``(a, b)`` with the coefficient ``a`` in
        ``[1, max_range]`` and the intercept ``b`` in ``[0, max_range]``.
    """
    # Tuple elements are evaluated left to right, so 'a' is drawn before 'b'
    # for every pair.
    return [
        (random.randint(1, max_range), random.randint(0, max_range))
        for _ in range(count)
    ]


def hash_item(item, params, num_bins):
    """Map an integer item to a bin with a linear hash function.

    The value computed is ((a * item + b) % PRIME_NUMBER) % num_bins, where
    a and b are the first two entries of ``params``.
    """
    coefficient = params[0]
    intercept = params[1]
    linear_value = coefficient * item + intercept
    return (linear_value % PRIME_NUMBER) % num_bins


def build_minhash_signature_matrix(hash_funcs, users, num_bins):
    """Compute the minhash signature for one collection of user indices.

    Args:
        hash_funcs: Iterable of hash parameter pairs, one per signature entry.
        users: User indices to hash; iterated once per hash function.
        num_bins (int): Number of bins passed through to ``hash_item``.

    Returns:
        list: One entry per hash function holding the smallest hash value over
        ``users``, or ``float("inf")`` when ``users`` is empty.
    """
    signature = []
    for params in hash_funcs:
        # Smallest hash value over all users for this hash function
        smallest = min(
            (hash_item(user, params, num_bins) for user in users),
            default=float("inf"),
        )
        signature.append(smallest)
    return signature


def jaccard_similarity(pair, bus_user_dict):
    """
    Compute the Jaccard similarity of the reviewer sets of two businesses.

    Args:
        pair (tuple): Two business IDs.
        bus_user_dict (dict): Maps each business ID to an iterable of user IDs.

    Returns:
        tuple: ``((bus1, bus2), similarity)`` where similarity is
        |intersection| / |union| of the two reviewer sets, or 0 when both
        sets are empty.
    """
    first_bus, second_bus = pair

    # De-duplicate the reviewers of each business
    reviewers_a = set(bus_user_dict[first_bus])
    reviewers_b = set(bus_user_dict[second_bus])

    union_size = len(reviewers_a.union(reviewers_b))
    if union_size == 0:
        # Neither business has any reviewers, so there is nothing to divide by
        return (first_bus, second_bus), 0

    shared_count = len(reviewers_a.intersection(reviewers_b))
    return (first_bus, second_bus), shared_count / union_size


def jaccard_based_lsh(prepared_data, similarity_threshold=0.5):
    """Perform Jaccard-based Locality Sensitive Hashing (LSH) on prepared data.

    This function applies LSH to find candidate pairs of businesses with similar users,
    then keeps the pairs whose exact Jaccard similarity reaches the threshold.

    Algorithm Steps:
    1. Unpack the prepared data containing the business-to-user mapping and user index mapping.
    2. Generate a set of hash functions.
    3. Compute the Minhash Signature for each business.
    4. Divide the signature into bands of ROWS_PER_BAND entries.
    5. Group businesses that share the same (band index, band values) key.
    6. Build candidate pairs from every group holding more than one business.
    7. Calculate the Jaccard similarity for candidate pairs.
    8. Keep pairs whose similarity is greater than or equal to the threshold.
    9. Sort the results by business ID pairs.
    10. Return the RDD containing the Jaccard similarity results for candidate business pairs.

    Args:
        prepared_data (tuple): A tuple containing the business-to-user mapping RDD and
            user index mapping dictionary.
        similarity_threshold (float): Minimum Jaccard similarity (inclusive) a candidate
            pair needs in order to be returned. Defaults to 0.5.

    Returns:
        RDD: An RDD of ``[business_id_1, business_id_2, similarity]`` rows sorted by the
        business ID pair.
    """
    # Unpack prepared data
    business_to_user, usr_to_idx = prepared_data

    # Generate Hash functions
    NUM_BINS = len(usr_to_idx)
    hash_func_params = generate_hash_function_params(NUM_BINS, NUM_HASH_FUNCTIONS)

    # Share the read-only lookup tables with the executors as broadcast variables
    # instead of capturing the dictionaries inside the task closures.
    spark_context = business_to_user.context
    usr_to_idx_bc = spark_context.broadcast(usr_to_idx)

    # Compute Minhash Signature
    minhash_sign = (
        business_to_user.mapValues(lambda users: [usr_to_idx_bc.value[user] for user in users])
        .mapValues(lambda users: build_minhash_signature_matrix(hash_func_params, users, NUM_BINS))
    )

    # Divide signature matrix into bands; the band index is part of the key so that
    # equal values in different bands do not collide.
    bands = (
        minhash_sign.flatMap(
            lambda x: [
                (
                    (i, tuple(x[1][i*ROWS_PER_BAND: (i+1)*ROWS_PER_BAND])), x[0]
                )
                for i in range(BANDS)
            ]
        )
        .groupByKey()
        .mapValues(list)
        .filter(lambda x: len(x[1]) > 1)
    )

    # Find the business candidate pairs; sorting first gives every pair a canonical
    # order so distinct() can remove pairs that were found in several bands.
    candidates = (
        bands.map(lambda x: sorted(x[1]))
        .flatMap(lambda x: list(combinations(x, 2)))
        .distinct()
    )

    # Calculate Jaccard similarity for pairs
    bus_to_user_bc = spark_context.broadcast(business_to_user.collectAsMap())

    jaccard_sim_results = (
        candidates.map(lambda x: jaccard_similarity(x, bus_to_user_bc.value))
        .filter(lambda x: x[1] >= similarity_threshold)
        .sortByKey()
        .map(lambda x: [x[0][0], x[0][1], x[1]])
    )
    return jaccard_sim_results


def task1(input_file_name, output_file_name):
    """Run Task 1 end to end and write the similar business pairs to a CSV file.

    Args:
        input_file_name (str): Path of the input CSV of reviews, read with ``textFile``.
        output_file_name (str): Path of the CSV file to write. It receives a header row
            followed by one ``business_id_1, business_id_2, similarity`` row per pair.

    The elapsed time is printed, and the Spark context is stopped when the function
    exits, whether or not the computation succeeded.
    """
    # Initialize Spark. getOrCreate is a classmethod, so it is called on the class:
    # building SparkContext(conf=conf) first would construct a new context before
    # getOrCreate had any chance to reuse an active one.
    conf = SparkConf().setAppName("Task 1")
    spark = SparkContext.getOrCreate(conf=conf)
    spark.setLogLevel("ERROR")

    try:
        start_time = time.time()

        # Read the input data
        data = spark.textFile(input_file_name)
        prepared_data = prepare_dataset(data)

        # Compute Jaccard similarity using LSH
        jaccard_sim_results = jaccard_based_lsh(prepared_data)

        # Write header and results to a CSV file
        header = ["business_id_1", "business_id_2", "similarity"]
        with open(output_file_name, "w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(header)
            writer.writerows(jaccard_sim_results.collect())

        execution_time = time.time() - start_time
        print(f"Duration: {execution_time}\n")

    finally:
        # Stop Spark
        spark.stop()


# Command-line entry point for spark-submit, kept commented out here because the
# notebook calls task1 directly at the end of this cell.
# if __name__ == "__main__":
#     if len(sys.argv) != 3:
#         print(
#             "Usage: spark-submit task1.py <input_file_name> <output_file_name>"
#         )
#         sys.exit(1)

#     # Read input parameters
#     input_file_path = sys.argv[1]
#     output_file_path = sys.argv[2]

#     task1(input_file_path, output_file_path)


# Run Task 1 on the training file; results are written to t1.csv in the working directory.
task1("HW3StudentData/yelp_train.csv", "t1.csv")

Duration: 53.42814826965332

time: 54.7 s (started: 2024-03-13 09:12:29 +00:00)


### Reference implementation

In [67]:
# @title Reference Code Task 1 { vertical-output: true, form-width: "30%" }
%%writefile task1_ref.py
from pyspark import SparkContext
import sys
import time
import random
from itertools import combinations
import operator

# Reference implementation of Task 1. Spark is only used to load and group the
# reviews; signatures, banding and similarity are computed with plain Python on the
# driver from collected data.
if __name__ == '__main__':
    # argv[1] is the input CSV path, argv[2] the output file path
    input_path = sys.argv[1]
    output_path = sys.argv[2]

    s_t = time.time()

    spark = SparkContext(appName= "task1")
    lines = spark.textFile(input_path)
    # Drop the header line and split every remaining line into its fields
    first = lines.first()
    lines = lines.filter(lambda row: row != first).map(lambda row: row.split(","))
    #print(raw_rdd.take(10))

    # business_id (field 1) -> set of user_ids (field 0)
    bus_user = lines.map(lambda row: (row[1], row[0])).groupByKey().mapValues(set)
    #print(bus_user.take(10))
    bus_user_dict = {}
    for bus, users in bus_user.collect():
        bus_user_dict[bus] = users
    # Number the distinct users 0, 1, 2, ... in the order collect() returns them
    users = lines.map(lambda row: row[0]).distinct()
    users_dict = {}
    i = 0
    for user in users.collect():
        users_dict[user] = i
        i += 1

    # n hash functions over m users (i now holds the user count)
    n = 60
    m = i
    # p is a float, so the hash expression below evaluates in floating point;
    # int() converts each minimum back to an integer before it is stored.
    p = 1e9 + 7
    # NOTE(review): no random seed is set in this script, so the sampled
    # coefficients differ from run to run.
    hash_funcs = [] #[a, b]
    a = random.sample(range(1, m), n)
    hash_funcs.append(a)
    b = random.sample(range(1, m), n)
    hash_funcs.append(b)
    #print(hash_funcs)

    # Minhash signature per business: for each hash function keep the smallest
    # hash value over the business's users
    sign_dict = {}
    for bus, user_list in bus_user.collect():
        minhash_sign_list = []
        for i in range(n):
            minhash = float("inf")
            for user in user_list:
                minhash = min(minhash, (((hash_funcs[0][i] * users_dict[user] + hash_funcs[1][i]) % p) % m))
            minhash_sign_list.append(int(minhash))
        sign_dict[bus] = minhash_sign_list
    #print(sign_dict)

    # Split each signature into bands of r entries; b is rebound here from the
    # coefficient list above to the number of bands
    r = 2
    b = n // r
    bands_dict = {}
    for bus, minhash_sign in sign_dict.items():
        for i in range(0, b):
            #print(s[1][i*r: i*r+r])
            # Key on (band index, band values) so equal values in different bands stay apart
            idx = (i, tuple(minhash_sign[i*r: i*r+r]))
            if idx not in bands_dict.keys():
                   bands_dict[idx] = []
                   bands_dict[idx].append(bus)
            else:
                   bands_dict[idx].append(bus)
    #print(bands_dict)
    # Keep only the buckets that hold more than one business
    bands_dict_fi = {}
    for key, values in bands_dict.items():
        if len(values) > 1:
            bands_dict_fi[key] = values
    #print(bands_dict_fi)
    #418426
    # Every pair of businesses sharing a bucket is a candidate; sorting gives each
    # pair a canonical order so the set removes duplicates across bands
    candidates = set()
    for values in bands_dict_fi.values():
        comb_list = combinations(sorted(values), 2)
        for item in comb_list:
            candidates.add(item)
    #print(candidates)

    # Exact Jaccard similarity for every candidate pair; keep those at or above 0.5
    result = {}
    for bus1, bus2 in candidates:
        user1 = bus_user_dict[bus1]
        user2 = bus_user_dict[bus2]
        js = len(user1 & user2) / len(user1 | user2)
        if js >= 0.5:
            result[str(bus1) + "," + str(bus2)] = js
    # Sort by the "bus1,bus2" key string
    result = dict(sorted(result.items(), key=operator.itemgetter(0)))
    # NOTE(review): this header has a space after each comma, unlike the data rows
    result_str = "business_id_1, business_id_2, similarity\n"
    for key, values in result.items():
        result_str += key + "," + str(values) + "\n"
    with open(output_path, "w") as f:
        f.writelines(result_str)

    e_t = time.time()
    print('Duration: ', e_t - s_t)

Overwriting task1_ref.py
time: 13.6 ms (started: 2024-03-13 03:07:25 +00:00)


In [68]:
!spark-submit task1_ref.py HW3StudentData/yelp_train.csv t1-ref.txt --executor-memory 4G --driver-memory 4G

24/03/13 03:07:40 INFO SparkContext: Running Spark version 3.5.1
24/03/13 03:07:40 INFO SparkContext: OS info Linux, 6.1.58+, amd64
24/03/13 03:07:40 INFO SparkContext: Java version 11.0.22
24/03/13 03:07:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/03/13 03:07:40 INFO ResourceUtils: No custom resources configured for spark.driver.
24/03/13 03:07:40 INFO SparkContext: Submitted application: task1
24/03/13 03:07:41 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
24/03/13 03:07:41 INFO ResourceProfile: Limiting resource is cpu
24/03/13 03:07:41 INFO ResourceProfileManager: Added ResourceProfile id: 0
24/03/13 03:07:41 INFO SecurityManager: Changing view a

### Scratch checks

In [7]:
# Hand-check of the hash formula ((a * item + b) % PRIME_NUMBER) % num_bins
# with a=7, item=9098, b=147 and num_bins=50.
((7 * 9098 + 147) % 15485863) % 50

33

time: 5.36 ms (started: 2024-03-13 00:29:40 +00:00)


In [54]:
# Scratch check: combinations() over a sorted sequence of (business, values) tuples
# yields every unordered pair exactly once, following the sorted order.
list(
    combinations(
        sorted(
            (
                ("b1",[2,3,4]),
                ("b2",[3,4,77]),
                ("b3", [-1, 67, 0]),
                ("b4",[-2,3,4]),
                ("b5",[3,-42,77]),
                ("b6", [0, 7, -103])
            )
        ),
    2)
)

[(('b1', [2, 3, 4]), ('b2', [3, 4, 77])),
 (('b1', [2, 3, 4]), ('b3', [-1, 67, 0])),
 (('b1', [2, 3, 4]), ('b4', [-2, 3, 4])),
 (('b1', [2, 3, 4]), ('b5', [3, -42, 77])),
 (('b1', [2, 3, 4]), ('b6', [0, 7, -103])),
 (('b2', [3, 4, 77]), ('b3', [-1, 67, 0])),
 (('b2', [3, 4, 77]), ('b4', [-2, 3, 4])),
 (('b2', [3, 4, 77]), ('b5', [3, -42, 77])),
 (('b2', [3, 4, 77]), ('b6', [0, 7, -103])),
 (('b3', [-1, 67, 0]), ('b4', [-2, 3, 4])),
 (('b3', [-1, 67, 0]), ('b5', [3, -42, 77])),
 (('b3', [-1, 67, 0]), ('b6', [0, 7, -103])),
 (('b4', [-2, 3, 4]), ('b5', [3, -42, 77])),
 (('b4', [-2, 3, 4]), ('b6', [0, 7, -103])),
 (('b5', [3, -42, 77]), ('b6', [0, 7, -103]))]

time: 16.5 ms (started: 2024-03-13 02:38:18 +00:00)


In [31]:
# Sample result rows used to try out the output file format. Only some of the rows
# carry a trailing newline, so line endings are normalized when the file is written.
sample_data = [
    '-8O4kt8AIRhM3OUxt-pWLg,_p64KqqRmPwGKhZ-xZwhtg,0.5\n',
    '-A5jntJgFglQ6zwAmOiOMw,cTqIuG-fvlQQL0OWzsFdig,0.5\n',
    '-Jhlh8Scjy669NdtCfKSSg,o5Mofj5KJkYAMs_fhxftpg,0.5\n',
    '-LfTBo0oa_uD454ScEW2XA,J5U-nbhKSnnX7DJGT6QELg,1.0',
    '-LfTBo0oa_uD454ScEW2XA,Lwb6bG1Qu3BbW7FJj5suLw,0.5',
    '-LfTBo0oa_uD454ScEW2XA,aokqWHt8vMf5iwj8KZZ5eg,0.5',
    '-Mm02AeY1PMGg-l-ShMxUg,RWwvuTbnf3DleUSiFbUscg,0.5',
    '-Mm02AeY1PMGg-l-ShMxUg,cKgkSMcPXwWTzPrJRpa2qw,0.5',
    '-N_YCDH4HijYnJ-RisQfHA,W2Vis19kUa7kP6GkSvJ9zQ,0.5',
    '-N_YCDH4HijYnJ-RisQfHA,zm8u9z2KCwEkNiCD59UQ3Q,0.5',
    '-UV2tt3gy57-5a29paA_4w,HUMdEqtlPF70AqyZmiyAhA,0.5',
    '-ePLgQ_afOTW1STxD-2RIA,fBU5QssrXMXPbJWD08o9zg,0.5',
    '-fn65iT4z-nh1Ybfyajxng,9BQGpsIRhs9cRHuGh9xuJw,0.5',
    '-i3dOjumvOw-52aGXU1xDg,6nMYROXu0VX4Ytpdsfi3XA,0.5',
    '-ml57G52t4mQpYRHe54HWQ,Nm9xRoLxn4a_q4XlVAJfSg,0.5',
    '06X34OSshQJPDiyUDHtS0A,or4yipilbCkKIFnF1RP5Ow,0.5',
    '06X34OSshQJPDiyUDHtS0A,rsoxvO7ku3fZdhxAw739bQ,0.5',
    '0BOdyChBbhGjQXRWZtDscQ,uocjMsMLvB9o_0nIRKIR6w,1.0',
    '0FJ22K--OjiiQJZ8VJIpJw,ObJJ9S04UuIIjMHZTbK8Kw,1.0',
    '0JoteSDMdGJSgYzk60vSdQ,NjypbHnZG6HjNQVDVQ_iYA,0.5',
    '0Lq-MK2LwhntBCF7G2t71w,WvcUKsNMERSP_dPkgsq9ZA,0.5',
    '0OOmvIS1Tb5ccYOCvq4Yeg,NKpwKdjEG8nL1rI60aYHmg,0.5',
    '0OOmvIS1Tb5ccYOCvq4Yeg,lnU-G15oaYWwqqbUXj9JEA,0.5',
    '0PenhUhRbw2xd_suA68IjA,7jKDOB9b_nijq0-djwyUTw,0.5',
    '0PenhUhRbw2xd_suA68IjA,_11zD3Vo-oyd21A_t5B3BA,0.5'
]

# Write header and results to a CSV file
# NOTE(review): "t1.csv" is also the path the Task 1 cell writes its results to, so
# running this cell replaces that output with the sample rows.
header = "business_id_1,business_id_2,similarity\n"
with open("t1.csv", "w") as f:
    f.write(header)
    # writelines() adds no line separators itself, so give every row exactly one
    f.writelines(row.rstrip("\n") + "\n" for row in sample_data)

time: 13.2 ms (started: 2024-03-13 08:44:53 +00:00)


In [33]:
# Print the contents of t1.csv.
!cat t1.csv

b,u,s,i,n,e,s,s,_,i,d,_,1,",",b,u,s,i,n,e,s,s,_,i,d,_,2,",",s,i,m,i,l,a,r,i,t,y,"
"
-8O4kt8AIRhM3OUxt-pWLg,_p64KqqRmPwGKhZ-xZwhtg,0.5
-A5jntJgFglQ6zwAmOiOMw,cTqIuG-fvlQQL0OWzsFdig,0.5
-Jhlh8Scjy669NdtCfKSSg,o5Mofj5KJkYAMs_fhxftpg,0.5
-LfTBo0oa_uD454ScEW2XA,J5U-nbhKSnnX7DJGT6QELg,1.0
-LfTBo0oa_uD454ScEW2XA,Lwb6bG1Qu3BbW7FJj5suLw,0.5
-LfTBo0oa_uD454ScEW2XA,aokqWHt8vMf5iwj8KZZ5eg,0.5
-Mm02AeY1PMGg-l-ShMxUg,RWwvuTbnf3DleUSiFbUscg,0.5
-Mm02AeY1PMGg-l-ShMxUg,cKgkSMcPXwWTzPrJRpa2qw,0.5
-N_YCDH4HijYnJ-RisQfHA,W2Vis19kUa7kP6GkSvJ9zQ,0.5
-N_YCDH4HijYnJ-RisQfHA,zm8u9z2KCwEkNiCD59UQ3Q,0.5
-UV2tt3gy57-5a29paA_4w,HUMdEqtlPF70AqyZmiyAhA,0.5
-ePLgQ_afOTW1STxD-2RIA,fBU5QssrXMXPbJWD08o9zg,0.5
-fn65iT4z-nh1Ybfyajxng,9BQGpsIRhs9cRHuGh9xuJw,0.5
-i3dOjumvOw-52aGXU1xDg,6nMYROXu0VX4Ytpdsfi3XA,0.5
-ml57G52t4mQpYRHe54HWQ,Nm9xRoLxn4a_q4XlVAJfSg,0.5
06X34OSshQJPDiyUDHtS0A,or4yipilbCkKIFnF1RP5Ow,0.5
06X34OSshQJPDiyUDHtS0A,rsoxvO7ku3fZdhxAw739bQ,0.5
0BOdyChBbhGjQXRWZtDscQ,uocjMsMLvB9o_0nIRKIR6w,1.

## 2.1

## 2.2

## 2.3

# THE END