<a href="https://colab.research.google.com/github/KayvanShah1/usc-dsci553-data-mining-sp24/blob/main/assignment-3/notebooks/HW3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Setup & Installation

In [1]:
!pip install pyspark ipython-autotime



In [2]:
%%bash
java --version

openjdk 11.0.22 2024-01-16
OpenJDK Runtime Environment (build 11.0.22+7-post-Ubuntu-0ubuntu222.04.1)
OpenJDK 64-Bit Server VM (build 11.0.22+7-post-Ubuntu-0ubuntu222.04.1, mixed mode, sharing)


# Imports

In [3]:
import os
import sys
import json
import pandas as pd
from itertools import combinations
import math
import statistics
from pyspark import SparkContext
import numpy as np

from pprint import pprint

%load_ext autotime

time: 327 µs (started: 2024-03-18 07:33:35 +00:00)


In [4]:
os.chdir("/content/drive/MyDrive/Colab Notebooks/DSCI553/hw3")
os.getcwd()

'/content/drive/MyDrive/Colab Notebooks/DSCI553/hw3'

time: 3.93 ms (started: 2024-03-18 07:33:36 +00:00)


In [5]:
!ls

HW3.ipynb	__pycache__  t1-ref.txt  t2_2.csv	  t2-ref.txt	task2_1.py	task2_2.py
HW3StudentData	t1.csv	     t2_1.csv	 t2_2-dryrun.csv  task1_ref.py	task2_1_ref.py
time: 106 ms (started: 2024-03-18 07:33:37 +00:00)


# Tasks

## 1

In [None]:
import sys
import time
from pyspark import SparkContext, SparkConf
from itertools import combinations
import random
import csv


random.seed(37)

# Define constants
NUM_HASH_FUNCTIONS = 50
PRIME_NUMBER = 15485863
ROWS_PER_BAND = 2
BANDS = NUM_HASH_FUNCTIONS // ROWS_PER_BAND


def prepare_dataset(data):
    # Remove the header
    header = data.first()
    data = (
        data.filter(lambda row: row != header)
        .map(lambda row: row.split(","))
    )

    # Find unique users and map it to an index
    usr_to_idx = (
        data.map(lambda x: x[0])
        .distinct()
        .zipWithIndex()
        .collectAsMap()
    )

    # Group users that has reviewed a business
    business_user = (
        data.map(lambda row: (row[1], [row[0]]))
        .reduceByKey(lambda a, b: a + b)
    )
    return business_user, usr_to_idx


def generate_hash_function_params(max_range, count):
    """Generate random hash function parameters within a specified range."""
    hash_funcs = []
    for _ in range(count):
        a = random.randint(1, max_range)  # Random coefficient 'a'
        b = random.randint(0, max_range)  # Random intercept 'b'
        hash_funcs.append((a, b))
    return hash_funcs


def hash_item(item, params, num_bins):
    """Hash an item using given hash function parameters.
    Calculate hash value using the formula: ((a * item + b) % PRIME_NUMBER) % num_bins
    """
    hash_val = ((params[0] * item + params[1]) % PRIME_NUMBER) % num_bins
    return hash_val


def build_minhash_signature_matrix(hash_funcs, users, num_bins):
    """Build the minhash signature matrix for a set of users."""
    mhs = []
    for params in hash_funcs:
        minhash = float("inf")
        for user in users:
            # Hash each user and find the minimum hash value
            hash_val = hash_item(user, params, num_bins)
            minhash = min(minhash, hash_val)
        mhs.append(minhash)
    return mhs


def jaccard_similarity(pair, bus_user_dict):
    """
    Calculate Jaccard similarity for a candidate pair of businesses.

    Args:
        pair (tuple): A pair of business IDs.
        bus_user_dict (dict): Dictionary mapping business IDs to sets of user IDs.

    Returns:
        tuple: A tuple containing the business pair and their Jaccard similarity.
    """
    # Extract business IDs from the pair
    bus1, bus2 = pair

    # Get sets of users who reviewed each business
    user1 = set(bus_user_dict[bus1])
    user2 = set(bus_user_dict[bus2])

    # Calculate Jaccard similarity
    intersection = len(user1 & user2)
    union = len(user1 | user2)
    similarity = intersection / union if union != 0 else 0

    return (bus1, bus2), similarity


def jaccard_based_lsh(prepared_data):
    """Perform Jaccard-based Locality Sensitive Hashing (LSH) on prepared data.

    This function applies LSH to find candidate pairs of businesses with similar users,
    based on the Jaccard similarity metric.

    Algorithm Steps:
    1. Unpack the prepared data containing the business-to-user mapping and user index mapping.
    2. Generate a set of hash functions.
    3. Compute the Minhash Signature for each business.
    4. Divide the signature matrix into bands.
    5. Group businesses into bands based on their Minhash Signature.
    6. Find candidate pairs of businesses within each band.
    7. Calculate the Jaccard similarity for candidate pairs.
    8. Filter pairs with similarity above a threshold (e.g., 0.5).
    9. Sort the results by business ID pairs.
    10. Return the RDD containing the Jaccard similarity results for candidate business pairs.

    Args:
        prepared_data (tuple): A tuple containing the business-to-user mapping RDD and user index mapping dictionary.

    Returns:
        RDD: An RDD containing the Jaccard similarity results for candidate business pairs.
    """
    # Unpack prepared data
    business_to_user, usr_to_idx = prepared_data

    # Generate Hash functions
    NUM_BINS = len(usr_to_idx)
    hash_func_params = generate_hash_function_params(NUM_BINS, NUM_HASH_FUNCTIONS)

    # Compute Minhash Signature
    minhash_sign = (
        business_to_user.mapValues(lambda users: [usr_to_idx[user] for user in users])
        .mapValues(lambda users: build_minhash_signature_matrix(hash_func_params, users, NUM_BINS))
    )

    # Divide signature matrix into bands
    bands = (
        minhash_sign.flatMap(
            lambda x: [
                (
                    (i, tuple(x[1][i*ROWS_PER_BAND: (i+1)*ROWS_PER_BAND])), x[0]
                )
                for i in range(BANDS)
            ]
        )
        .groupByKey()
        .mapValues(list)
        .filter(lambda x: len(x[1]) > 1)
    )

    # Find the business candidate pairs
    candidates = (
        bands.map(lambda x: sorted(x[1]))
        .flatMap(lambda x: list(combinations(x, 2)))
        .distinct()
    )

    # Calculate Jaccard Similirality for pairs
    bus_to_user_dict = business_to_user.collectAsMap()

    jaccard_sim_results = (
        candidates.map(lambda x: jaccard_similarity(x, bus_to_user_dict))
        .filter(lambda x: x[1] >= 0.5)
        .sortByKey()
        .map(lambda x: [x[0][0], x[0][1] ,x[1]])
    )
    return jaccard_sim_results


def task1(input_file_name, output_file_name):
    # Initialize Spark
    conf = SparkConf().setAppName("Task 1")
    spark = SparkContext(conf=conf).getOrCreate()
    spark.setLogLevel("ERROR")

    try:
        start_time = time.time()

        # Read the input data
        data = spark.textFile(input_file_name)
        prepared_data = prepare_dataset(data)

        # Compute Jaccard similarity using LSH
        jaccard_sim_results = jaccard_based_lsh(prepared_data)

        # Write header and results to a CSV file
        header = ["business_id_1", "business_id_2", "similarity"]
        with open(output_file_name, "w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(header)
            writer.writerows(jaccard_sim_results.collect())

        execution_time = time.time() - start_time
        print(f"Duration: {execution_time}\n")

    finally:
        # Stop Spark
        spark.stop()


# if __name__ == "__main__":
#     if len(sys.argv) != 3:
#         print(
#             "Usage: spark-submit task1.py <input_file_name> <output_file_name>"
#         )
#         sys.exit(1)

#     # Read input parameters
#     input_file_path = sys.argv[1]
#     output_file_path = sys.argv[2]

#     task1(input_file_path, output_file_path)


task1("HW3StudentData/yelp_train.csv", "t1.csv")

Duration: 53.42814826965332

time: 54.7 s (started: 2024-03-13 09:12:29 +00:00)


### Ref

In [None]:
# @title Reference Code Task 1 { vertical-output: true, form-width: "30%" }
%%writefile task1_ref.py
from pyspark import SparkContext
import sys
import time
import random
from itertools import combinations
import operator

if __name__ == '__main__':
    input_path = sys.argv[1]
    output_path = sys.argv[2]

    s_t = time.time()

    spark = SparkContext(appName= "task1")
    lines = spark.textFile(input_path)
    first = lines.first()
    lines = lines.filter(lambda row: row != first).map(lambda row: row.split(","))
    #print(raw_rdd.take(10))

    bus_user = lines.map(lambda row: (row[1], row[0])).groupByKey().mapValues(set)
    #print(bus_user.take(10))
    bus_user_dict = {}
    for bus, users in bus_user.collect():
        bus_user_dict[bus] = users
    users = lines.map(lambda row: row[0]).distinct()
    users_dict = {}
    i = 0
    for user in users.collect():
        users_dict[user] = i
        i += 1

    n = 60
    m = i
    p = 1e9 + 7
    hash_funcs = [] #[a, b]
    a = random.sample(range(1, m), n)
    hash_funcs.append(a)
    b = random.sample(range(1, m), n)
    hash_funcs.append(b)
    #print(hash_funcs)

    sign_dict = {}
    for bus, user_list in bus_user.collect():
        minhash_sign_list = []
        for i in range(n):
            minhash = float("inf")
            for user in user_list:
                minhash = min(minhash, (((hash_funcs[0][i] * users_dict[user] + hash_funcs[1][i]) % p) % m))
            minhash_sign_list.append(int(minhash))
        sign_dict[bus] = minhash_sign_list
    #print(sign_dict)

    r = 2
    b = n // r
    bands_dict = {}
    for bus, minhash_sign in sign_dict.items():
        for i in range(0, b):
            #print(s[1][i*r: i*r+r])
            idx = (i, tuple(minhash_sign[i*r: i*r+r]))
            if idx not in bands_dict.keys():
                   bands_dict[idx] = []
                   bands_dict[idx].append(bus)
            else:
                   bands_dict[idx].append(bus)
    #print(bands_dict)
    bands_dict_fi = {}
    for key, values in bands_dict.items():
        if len(values) > 1:
            bands_dict_fi[key] = values
    #print(bands_dict_fi)
    #418426
    candidates = set()
    for values in bands_dict_fi.values():
        comb_list = combinations(sorted(values), 2)
        for item in comb_list:
            candidates.add(item)
    #print(candidates)

    result = {}
    for bus1, bus2 in candidates:
        user1 = bus_user_dict[bus1]
        user2 = bus_user_dict[bus2]
        js = len(user1 & user2) / len(user1 | user2)
        if js >= 0.5:
            result[str(bus1) + "," + str(bus2)] = js
    result = dict(sorted(result.items(), key=operator.itemgetter(0)))
    result_str = "business_id_1, business_id_2, similarity\n"
    for key, values in result.items():
        result_str += key + "," + str(values) + "\n"
    with open(output_path, "w") as f:
        f.writelines(result_str)

    e_t = time.time()
    print('Duration: ', e_t - s_t)

Overwriting task1_ref.py
time: 13.6 ms (started: 2024-03-13 03:07:25 +00:00)


In [None]:
!spark-submit task1_ref.py HW3StudentData/yelp_train.csv t1-ref.txt --executor-memory 4G --driver-memory 4G

24/03/13 03:07:40 INFO SparkContext: Running Spark version 3.5.1
24/03/13 03:07:40 INFO SparkContext: OS info Linux, 6.1.58+, amd64
24/03/13 03:07:40 INFO SparkContext: Java version 11.0.22
24/03/13 03:07:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/03/13 03:07:40 INFO ResourceUtils: No custom resources configured for spark.driver.
24/03/13 03:07:40 INFO SparkContext: Submitted application: task1
24/03/13 03:07:41 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
24/03/13 03:07:41 INFO ResourceProfile: Limiting resource is cpu
24/03/13 03:07:41 INFO ResourceProfileManager: Added ResourceProfile id: 0
24/03/13 03:07:41 INFO SecurityManager: Changing view a

### Test

In [None]:
((7 * 9098 + 147) % 15485863) % 50

33

time: 5.36 ms (started: 2024-03-13 00:29:40 +00:00)


In [None]:
list(
    combinations(
        sorted(
            (
                ("b1",[2,3,4]),
                ("b2",[3,4,77]),
                ("b3", [-1, 67, 0]),
                ("b4",[-2,3,4]),
                ("b5",[3,-42,77]),
                ("b6", [0, 7, -103])
            )
        ),
    2)
)

[(('b1', [2, 3, 4]), ('b2', [3, 4, 77])),
 (('b1', [2, 3, 4]), ('b3', [-1, 67, 0])),
 (('b1', [2, 3, 4]), ('b4', [-2, 3, 4])),
 (('b1', [2, 3, 4]), ('b5', [3, -42, 77])),
 (('b1', [2, 3, 4]), ('b6', [0, 7, -103])),
 (('b2', [3, 4, 77]), ('b3', [-1, 67, 0])),
 (('b2', [3, 4, 77]), ('b4', [-2, 3, 4])),
 (('b2', [3, 4, 77]), ('b5', [3, -42, 77])),
 (('b2', [3, 4, 77]), ('b6', [0, 7, -103])),
 (('b3', [-1, 67, 0]), ('b4', [-2, 3, 4])),
 (('b3', [-1, 67, 0]), ('b5', [3, -42, 77])),
 (('b3', [-1, 67, 0]), ('b6', [0, 7, -103])),
 (('b4', [-2, 3, 4]), ('b5', [3, -42, 77])),
 (('b4', [-2, 3, 4]), ('b6', [0, 7, -103])),
 (('b5', [3, -42, 77]), ('b6', [0, 7, -103]))]

time: 16.5 ms (started: 2024-03-13 02:38:18 +00:00)


## 2.1

In [22]:
# %%writefile task2_1.py
import csv
import sys
import time

from pyspark import SparkConf, SparkContext


def prepare_dataset(data, split="train"):
    # Remove the header
    header = data.first()
    data = (
        data.filter(lambda row: row != header)
        .map(lambda row: row.split(","))
        .map(lambda row: (row[0], row[1], row[2]) if split == "train" else (row[0], row[1]))
    )
    return data


def save_data(data, output_file_name):
    header = ["user_id", "business_id", "prediction"]
    with open(output_file_name, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(header)
        writer.writerows(data)


def get_bus_to_usr_map(train_data):
    # Group by business_id and collect the corresponding set of users
    bus2user = (
        train_data.map(lambda x: (x[1], (x[0], float(x[2]))))
        .groupByKey()
        .mapValues(lambda vals: {"users": dict(vals), "avg_rating": sum(val[1] for val in vals) / len(vals)})
    )
    return bus2user.collectAsMap()


def get_usr_to_bus_map(train_data):
    # Group by user_id and collect the corresponding set of businesses
    user2bus = (
        train_data.map(lambda x: (x[0], (x[1], float(x[2]))))
        .groupByKey()
        .mapValues(lambda vals: {"business": dict(vals)})
    )
    return user2bus.collectAsMap()


def compute_pearson_similarity(data, item2user_dict):
    """
    Formala: r = Σᵢ((xᵢ − mean(x))(yᵢ − mean(y))) (√Σᵢ(xᵢ − mean(x))² √Σᵢ(yᵢ − mean(y))²)⁻¹
    """
    # Unpack the data
    item1, item2 = data

    # Find common user to calculate co-rated averages
    users_item1 = set(item2user_dict[item1]["users"].keys())
    users_item2 = set(item2user_dict[item2]["users"].keys())
    common_users = users_item1.intersection(users_item2)

    if len(common_users) <= 1:
        similarity = (5 - abs(item2user_dict[item1]["avg_rating"] - item2user_dict[item2]["avg_rating"])) / 5
    else:
        r1 = []
        r2 = []
        # Get ratings of common users for both business
        for usr in common_users:
            r1.append(item2user_dict[item1]["users"][usr])
            r2.append(item2user_dict[item2]["users"][usr])

        # Center the ratings by subtracting the co-rated average rating
        r1_bar = sum(r1) / len(r1)
        r2_bar = sum(r2) / len(r2)
        r1 = [r - r1_bar for r in r1]
        r2 = [r - r2_bar for r in r2]

        # Compute weight for the item pair
        numer = sum([a * b for a, b in zip(r1, r2)])
        denom = ((sum([a**2 for a in r1])) ** 0.5) * (sum([b**2 for b in r2]) ** 0.5)

        similarity = 0 if denom == 0 else numer / denom

    return similarity


def predict_rating(data, bus2user_dict, user2bus_dict, neighbours=15):
    """Perform Item-based Collaborative filtering on prepared data."""
    # Unpack the data
    user, business = data

    # Return avg rating if user or business is not present in the dataset
    if user not in user2bus_dict or business not in bus2user_dict:
        return 3.0

    # Pearson similarities for rating prediction
    pc = []

    for item in user2bus_dict[user]["business"].keys():
        # Compute pearson similarity for each business pair
        similarity = compute_pearson_similarity((business, item), bus2user_dict)
        pc.append((similarity, bus2user_dict[item]["users"][user]))

    # Calculate the predicted rating
    top_pc = sorted(pc, key=lambda x: -x[0])[:neighbours]
    x, y = 0, 0
    for p, r in top_pc:
        x += p * r
        y += abs(p)
    predicted_rating = 3.5 if y == 0 else x / y

    return predicted_rating


def task2_1(train_file_name, test_file_name, output_file_name):
    # Initialize Spark
    conf = SparkConf().setAppName("Task 2.1: Item-Based Collaborative Filtering")
    spark = SparkContext(conf=conf).getOrCreate()
    spark.setLogLevel("ERROR")

    try:
        start_time = time.time()

        # Read and process the train data
        train_data = spark.textFile(train_file_name)
        train_data = prepare_dataset(train_data, split="train")

        # Preprocess train data to get mapping dictionaries
        bus2user_dict = get_bus_to_usr_map(train_data)
        user2bus_dict = get_usr_to_bus_map(train_data)

        # Read and prepare validation data
        val_data = spark.textFile(test_file_name)
        val_data = prepare_dataset(val_data, split="valid").cache()

        val_data = val_data.map(lambda x: [x[0], x[1], predict_rating(x, bus2user_dict, user2bus_dict)]).cache()

        save_data(val_data.collect(), output_file_name)

        execution_time = time.time() - start_time
        print(f"Duration: {execution_time}\n")

    finally:
        # Stop Spark
        spark.stop()


if __name__ == "__main__":
    if len(sys.argv) != 4:
        print("Usage: spark-submit task2_1.py <train_file_name> <test_file_name> <output_file_name>")
        sys.exit(1)

    # Read input parameters
    train_file_name = sys.argv[1]
    test_file_name = sys.argv[2]
    output_file_name = sys.argv[3]

    task2_1(train_file_name, test_file_name, output_file_name)

# task2_1("HW3StudentData/yelp_train.csv", "HW3StudentData/yelp_val.csv", "t2_1.csv")


Overwriting task2_1.py
time: 357 ms (started: 2024-03-18 04:52:03 +00:00)


### Test

In [7]:
df = pd.read_csv("HW3StudentData/yelp_val.csv")
# df.loc[df["user_id"]=="wf1GqnKQuvH-V3QN80UOOQ"]
df

Unnamed: 0,user_id,business_id,stars
0,wf1GqnKQuvH-V3QN80UOOQ,fThrN4tfupIGetkrz18JOg,5.0
1,39FT2Ui8KUXwmUt6hnwy-g,uW6UHfONAmm8QttPkbMewQ,5.0
2,7weuSPSSqYLUFga6IYP4pg,IhNASEZ3XnBHmuuVnWdIwA,4.0
3,CqaIzLiWaa-lMFYBAsYQxw,G859H6xfAmVLxbzQgipuoA,5.0
4,yy7shAsNWRbGg-8Y67Dzag,rS39YnrhoXmPqHLzCBjeqw,3.0
...,...,...,...
142039,pA9NXgASl86RImkdBtydrA,q6-SF8zHFU1AWO70k92o1Q,2.0
142040,_eUb7UGsUoSfi9n2ieF5ow,hgWMxKhrnOUd3m5nOUBIkA,4.0
142041,cEJGXB63KhROA-XmE_jgXw,0ldxjei8v4q95fApIei3Lg,5.0
142042,Z4-V0hc51oxUdULWJOufeg,j29tuUdrfaxmGjwxHdHZPA,3.0


time: 255 ms (started: 2024-03-17 22:42:38 +00:00)


In [26]:
df = pd.read_csv("t2_1.csv")
# df.loc[df["user_id"]=="wf1GqnKQuvH-V3QN80UOOQ"]
df

Unnamed: 0,user_id,business_id,prediction
0,wf1GqnKQuvH-V3QN80UOOQ,fThrN4tfupIGetkrz18JOg,4.467539
1,39FT2Ui8KUXwmUt6hnwy-g,uW6UHfONAmm8QttPkbMewQ,4.731460
2,7weuSPSSqYLUFga6IYP4pg,IhNASEZ3XnBHmuuVnWdIwA,4.344990
3,CqaIzLiWaa-lMFYBAsYQxw,G859H6xfAmVLxbzQgipuoA,4.746280
4,yy7shAsNWRbGg-8Y67Dzag,rS39YnrhoXmPqHLzCBjeqw,2.996730
...,...,...,...
142039,pA9NXgASl86RImkdBtydrA,q6-SF8zHFU1AWO70k92o1Q,3.336156
142040,_eUb7UGsUoSfi9n2ieF5ow,hgWMxKhrnOUd3m5nOUBIkA,2.887312
142041,cEJGXB63KhROA-XmE_jgXw,0ldxjei8v4q95fApIei3Lg,3.662905
142042,Z4-V0hc51oxUdULWJOufeg,j29tuUdrfaxmGjwxHdHZPA,3.800393


time: 208 ms (started: 2024-03-18 05:13:41 +00:00)


## 2.2

In [None]:
# %%writefile task2_2.py
import csv
import json
import sys
import time

import numpy as np
from pyspark import SparkConf, SparkContext
from xgboost import XGBRegressor


def save_data(data, output_file_name):
    header = ["user_id", "business_id", "prediction"]
    with open(output_file_name, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(header)
        writer.writerows(data)


def read_csv_spark(path, sc):
    rdd = sc.textFile(path)
    header = rdd.first()
    rdd = rdd.filter(lambda row: row != header).map(lambda row: row.split(","))
    return rdd


def read_json_spark(path, sc):
    return sc.textFile(path).map(lambda row: json.loads(row))


def process_reviews(review_rdd):
    review_rdd = (
        review_rdd.map(
            lambda row: (row["business_id"], (float(row["useful"]), float(row["funny"]), float(row["cool"])))
        )
        .groupByKey()
        .mapValues(lambda x: tuple(sum(col) / len(col) for col in zip(*x)))
        .cache()
    )
    return review_rdd.collectAsMap()


def process_user(usr_rdd):
    usr_rdd = usr_rdd.map(
        lambda row: (row["user_id"], (float(row["average_stars"]), float(row["review_count"]), float(row["fans"])))
    ).cache()
    return usr_rdd.collectAsMap()


def process_bus(bus_rdd):
    bus_rdd = bus_rdd.map(lambda row: (row["business_id"], (float(row["stars"]), float(row["review_count"])))).cache()
    return bus_rdd.collectAsMap()


def process_train_data(row, review_dict, usr_dict, bus_dict):
    if len(row)==3:
        usr, bus, rating = row
    else:
        usr, bus = row
        rating = 0

    useful, funny, cool = review_dict.get(bus, (None, None, None))
    usr_avg_star, usr_review_cnt, usr_fans = usr_dict.get(usr, (None, None, None))
    bus_avg_star, bus_review_cnt = bus_dict.get(bus, (None, None))

    return ([useful, funny, cool, usr_avg_star, usr_review_cnt, usr_fans, bus_avg_star, bus_review_cnt], rating)


def task2_2(folder_path, test_file_name, output_file_name):
    # Initialize Spark
    conf = SparkConf().setAppName("Task 2.2: : Model-based recommendation system")
    spark = SparkContext(conf=conf).getOrCreate()
    spark.setLogLevel("ERROR")

    try:
        start_time = time.time()

        # Read and process the train data
        train_rdd = read_csv_spark(folder_path + "/yelp_train.csv", spark)

        review_rdd = read_json_spark(folder_path + "/review_train.json", spark)
        review_rdd = process_reviews(review_rdd)

        usr_rdd = read_json_spark(folder_path + "/user.json", spark)
        usr_rdd = process_user(usr_rdd)

        bus_rdd = read_json_spark(folder_path + "/business.json", spark)
        bus_rdd = process_bus(bus_rdd)

        # Read and process validation dataset
        val_rdd = read_csv_spark(test_file_name, spark).cache()

        # Train X and Y
        train_rdd = train_rdd.map(lambda x: process_train_data(x, review_rdd, usr_rdd, bus_rdd))

        # Valid x and Y
        val_processed = val_rdd.map(lambda x: process_train_data(x, review_rdd, usr_rdd, bus_rdd))

        # Extract X_train and Y_train
        X_train = train_rdd.map(lambda x: x[0]).cache()
        X_train = np.array(X_train.collect(), dtype="float32")
        Y_train = train_rdd.map(lambda x: x[1]).cache()
        Y_train = np.array(Y_train.collect(), dtype="float32")

        # Extract X_train and Y_train
        X_val = val_processed.map(lambda x: x[0]).cache()
        X_val = np.array(X_val.collect(), dtype="float32")
        # Y_val = val_processed.map(lambda x: x[1]).cache()
        # Y_val = np.array(Y_val.collect(), dtype='float32')

        xgb = XGBRegressor(
            colsample_bytree=0.5,
            subsample=0.8,
            learning_rate=0.02,
            max_depth=17,
            random_state=47,
            min_child_weight=101,
            n_estimators=40,
        )
        xgb.fit(X_train, Y_train)
        Y_pred = xgb.predict(X_val)

        pred_data = []
        for i, row in enumerate(val_rdd.collect()):
            pred_data.append([row[0], row[1], Y_pred[i]])

        save_data(pred_data, output_file_name)

        execution_time = time.time() - start_time
        print(f"Duration: {execution_time}\n")

    finally:
        # Stop Spark
        spark.stop()


if __name__ == "__main__":
    if len(sys.argv) != 4:
        print("Usage: spark-submit task2_1.py <folder_path> <test_file_name> <output_file_name>")
        sys.exit(1)

    # Read input parameters
    folder_path = sys.argv[1]
    test_file_name = sys.argv[2]
    output_file_name = sys.argv[3]

    task2_2(folder_path, test_file_name, output_file_name)

# task2_2("HW3StudentData", "HW3StudentData/yelp_val.csv", "t2_2.csv")


Duration: 89.79863214492798

time: 1min 31s (started: 2024-03-17 02:54:04 +00:00)


In [None]:
!spark-submit task2_2.py HW3StudentData HW3StudentData/yelp_val.csv t2_2-dryrun.csv

## 2.3

In [11]:
import csv
import json
import sys
import time

import numpy as np
from pyspark import SparkConf, SparkContext
from xgboost import XGBRegressor


def save_data(data, output_file_name):
    header = ["user_id", "business_id", "prediction"]
    with open(output_file_name, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(header)
        writer.writerows(data)


class ItemBasedCF:
    def __init__(self):
        pass

    @staticmethod
    def prepare_dataset(data, split="train"):
        # Remove the header
        header = data.first()
        data = (
            data.filter(lambda row: row != header)
            .map(lambda row: row.split(","))
            .map(lambda row: (row[0], row[1], row[2]) if split == "train" else (row[0], row[1]))
        )
        return data

    @staticmethod
    def get_bus_to_usr_map(train_data):
        # Group by business_id and collect the corresponding set of users
        bus2user = (
            train_data.map(lambda x: (x[1], (x[0], float(x[2]))))
            .groupByKey()
            .mapValues(lambda vals: {"users": dict(vals), "avg_rating": sum(val[1] for val in vals) / len(vals)})
        )
        return bus2user.collectAsMap()

    @staticmethod
    def get_usr_to_bus_map(train_data):
        # Group by user_id and collect the corresponding set of businesses
        user2bus = (
            train_data.map(lambda x: (x[0], (x[1], float(x[2]))))
            .groupByKey()
            .mapValues(lambda vals: {"business": dict(vals)})
        )
        return user2bus.collectAsMap()

    @staticmethod
    def compute_pearson_similarity(data, item2user_dict):
        """
        Formala: r = Σᵢ((xᵢ − mean(x))(yᵢ − mean(y))) (√Σᵢ(xᵢ − mean(x))² √Σᵢ(yᵢ − mean(y))²)⁻¹
        """
        # Unpack the data
        item1, item2 = data

        # Find common user to calculate co-rated averages
        users_item1 = set(item2user_dict[item1]["users"].keys())
        users_item2 = set(item2user_dict[item2]["users"].keys())
        common_users = users_item1.intersection(users_item2)

        if len(common_users) <= 1:
            similarity = (5 - abs(item2user_dict[item1]["avg_rating"] - item2user_dict[item2]["avg_rating"])) / 5
        else:
            r1 = []
            r2 = []
            # Get ratings of common users for both business
            for usr in common_users:
                r1.append(item2user_dict[item1]["users"][usr])
                r2.append(item2user_dict[item2]["users"][usr])

            # Center the ratings by subtracting the co-rated average rating
            r1_bar = sum(r1) / len(r1)
            r2_bar = sum(r2) / len(r2)
            r1 = [r - r1_bar for r in r1]
            r2 = [r - r2_bar for r in r2]

            # Compute weight for the item pair
            numer = sum([a * b for a, b in zip(r1, r2)])
            denom = ((sum([a**2 for a in r1])) ** 0.5) * (sum([b**2 for b in r2]) ** 0.5)

            similarity = 0 if denom == 0 else numer / denom

        return similarity

    @staticmethod
    def predict_rating(data, bus2user_dict, user2bus_dict, neighbours=15):
        """Perform Item-based Collaborative filtering on prepared data."""
        # Unpack the data
        user, business = data

        # Return avg rating if user or business is not present in the dataset
        if user not in user2bus_dict or business not in bus2user_dict:
            return 3.0

        # Pearson similarities for rating prediction
        pc = []

        for item in user2bus_dict[user]["business"].keys():
            # Compute pearson similarity for each business pair
            similarity = ItemBasedCF.compute_pearson_similarity((business, item), bus2user_dict)
            pc.append((similarity, bus2user_dict[item]["users"][user]))

        # Calculate the predicted rating
        top_pc = sorted(pc, key=lambda x: -x[0])[:neighbours]
        x, y = 0, 0
        for p, r in top_pc:
            x += p * r
            y += abs(p)
        predicted_rating = 3.5 if y == 0 else x / y

        return predicted_rating

    def run(self, spark, train_file_name, test_file_name):
        # Read and process the train data
        train_data = spark.textFile(train_file_name)
        train_data = ItemBasedCF.prepare_dataset(train_data, split="train")

        # Preprocess train data to get mapping dictionaries
        bus2user_dict = ItemBasedCF.get_bus_to_usr_map(train_data)
        user2bus_dict = ItemBasedCF.get_usr_to_bus_map(train_data)

        # Read and prepare validation data
        val_data = spark.textFile(test_file_name)
        val_data = ItemBasedCF.prepare_dataset(val_data, split="valid").cache()

        val_data = val_data.map(
            lambda x: [x[0], x[1], ItemBasedCF.predict_rating(x, bus2user_dict, user2bus_dict)]
        ).cache()

        return val_data


class ModelBased:
    def __init__(self):
        pass

    @staticmethod
    def read_csv_spark(path, sc):
        rdd = sc.textFile(path)
        header = rdd.first()
        rdd = rdd.filter(lambda row: row != header).map(lambda row: row.split(","))
        return rdd

    @staticmethod
    def read_json_spark(path, sc):
        return sc.textFile(path).map(lambda row: json.loads(row))

    @staticmethod
    def process_reviews(review_rdd):
        review_rdd = (
            review_rdd.map(
                lambda row: (row["business_id"], (float(row["useful"]), float(row["funny"]), float(row["cool"])))
            )
            .groupByKey()
            .mapValues(lambda x: tuple(sum(col) / len(col) for col in zip(*x)))
            .cache()
        )
        return review_rdd.collectAsMap()

    @staticmethod
    def process_user(usr_rdd):
        usr_rdd = usr_rdd.map(
            lambda row: (row["user_id"], (float(row["average_stars"]), float(row["review_count"]), float(row["fans"])))
        ).cache()
        return usr_rdd.collectAsMap()

    @staticmethod
    def process_bus(bus_rdd):
        bus_rdd = bus_rdd.map(
            lambda row: (row["business_id"], (float(row["stars"]), float(row["review_count"])))
        ).cache()
        return bus_rdd.collectAsMap()

    @staticmethod
    def process_train_data(row, review_dict, usr_dict, bus_dict):
        if len(row) == 3:
            usr, bus, rating = row
        else:
            usr, bus = row
            rating = None

        useful, funny, cool = review_dict.get(bus, (None, None, None))
        usr_avg_star, usr_review_cnt, usr_fans = usr_dict.get(usr, (None, None, None))
        bus_avg_star, bus_review_cnt = bus_dict.get(bus, (None, None))

        return ([useful, funny, cool, usr_avg_star, usr_review_cnt, usr_fans, bus_avg_star, bus_review_cnt], rating)

    def run(self, spark, folder_path, test_file_name):
        # Read and process the train data
        train_rdd = ModelBased.read_csv_spark(folder_path + "/yelp_train.csv", spark)

        review_rdd = ModelBased.read_json_spark(folder_path + "/review_train.json", spark)
        review_rdd = ModelBased.process_reviews(review_rdd)

        usr_rdd = ModelBased.read_json_spark(folder_path + "/user.json", spark)
        usr_rdd = ModelBased.process_user(usr_rdd)

        bus_rdd = ModelBased.read_json_spark(folder_path + "/business.json", spark)
        bus_rdd = ModelBased.process_bus(bus_rdd)

        # Read and process validation dataset
        val_rdd = ModelBased.read_csv_spark(test_file_name, spark).cache()

        # Train X and Y
        train_rdd = train_rdd.map(lambda x: ModelBased.process_train_data(x, review_rdd, usr_rdd, bus_rdd))

        # Valid x and Y
        val_processed = val_rdd.map(lambda x: ModelBased.process_train_data(x, review_rdd, usr_rdd, bus_rdd))

        # Extract X_train and Y_train
        X_train = train_rdd.map(lambda x: x[0]).cache()
        X_train = np.array(X_train.collect(), dtype="float32")
        Y_train = train_rdd.map(lambda x: x[1]).cache()
        Y_train = np.array(Y_train.collect(), dtype="float32")

        # Extract X_train and Y_train
        X_val = val_processed.map(lambda x: x[0]).cache()
        X_val = np.array(X_val.collect(), dtype="float32")

        xgb = XGBRegressor()
        xgb.fit(X_train, Y_train)
        Y_pred = xgb.predict(X_val)

        pred_data = []
        for i, row in enumerate(val_rdd.collect()):
            pred_data.append([row[0], row[1], Y_pred[i]])

        return spark.parallelize(pred_data)


def hybrid_pred(preds, factor=0.5):
    wieghted_pred = factor * preds[0] + (1 - factor) * preds[1]
    return wieghted_pred


def task2_3(folder_path, test_file_name, output_file_name):
    # Initialize Spark
    conf = SparkConf().setAppName("Task 2.3: Hybrid recommendation system")
    spark = SparkContext(conf=conf).getOrCreate()
    spark.setLogLevel("ERROR")

    try:
        start_time = time.time()

        # Train the item-based collaborative recommendation system
        item_based = ItemBasedCF()
        item_based_pred = item_based.run(
            spark=spark, train_file_name=f"{folder_path}/yelp_train.csv", test_file_name=test_file_name
        )
        item_based_pred = item_based_pred.map(lambda x: ((x[0], x[1]), x[2])).persist()

        # Train the item-based collaborative recommendation system
        model_based = ModelBased()
        model_based_pred = model_based.run(spark=spark, folder_path=folder_path, test_file_name=test_file_name)
        model_based_pred = model_based_pred.map(lambda x: ((x[0], x[1]), x[2])).persist()

        FACTOR = 0.05222

        joined_preds = (
            item_based_pred.join(model_based_pred)
            .map(lambda x: [x[0][0], x[0][1], hybrid_pred(x[1], factor=FACTOR)])
            .cache()
        )

        save_data(joined_preds.collect(), output_file_name)

        execution_time = time.time() - start_time
        print(f"Duration: {execution_time}\n")

    finally:
        spark.stop()


# if __name__ == "__main__":
#     if len(sys.argv) != 4:
#         print("Usage: spark-submit task2_1.py <folder_path> <test_file_name> <output_file_name>")
#         sys.exit(1)

#     # Read input parameters
#     folder_path = sys.argv[1]
#     test_file_name = sys.argv[2]
#     output_file_name = sys.argv[3]

#     task2_3(folder_path, test_file_name, output_file_name)

task2_3("HW3StudentData", "HW3StudentData/yelp_val.csv", "t2_3.csv")


['x-8ZMKKNycT3782Kqf9loA', 'jgtWfJCJZty_Nctqpdtp3g', 4.339137756524084]
['0FVcoJko1kfZCrJRfssfIA', 'JVK8szNDoy9MNiYSz_MiAA', 2.6283899840360982]
['C__1BHWTGBNA5s2ZPH289g', 'h_UvnQfe1cuVICly_kIqHg', 4.217598720355673]
['zDBOdWtl2PsNY38IeoE5cQ', 'gy-HBIeJGlQHs4RRYDLuHw', 4.29041363721337]
['CMu9FmdK8xpiawJowJuGQg', '364hhL5st0LV16UcBHRJ3A', 4.311573210538574]
['aOseJnydZYD8Og00vWylqg', 'fNc1WuGwiT7RhqXUIe4S8A', 4.221355593590737]
['I8_iXLcpYHAb_xi2vShgOg', 'LR0qF0FEVsCOhYWUOiH26A', 2.781415633708503]
['renPzRDqMZpMaHiCD_e1_A', 'xTlmLL2xZZ0xhZ2J16zXQQ', 3.92118768756853]
['_VTEyUzzH92X3w-IpGaXVA', 'X-b4-QvZLENnf3yFwhpSXQ', 4.522463311630964]
['Nhk0jTP2gkU12G4LHJVR3A', 'FaHADZARwnY4yvlvpnsfGA', 4.109731298778561]
Duration: 317.4028687477112

time: 5min 19s (started: 2024-03-18 08:30:54 +00:00)


# Test

In [103]:
import pandas as pd
import numpy as np
from sklearn.metrics import mean_squared_error

# Read the data
df = pd.read_csv("HW3StudentData/yelp_val.csv")
df["item_based"] = pd.read_csv("t2_1.csv")["prediction"]
df["model_based"] = pd.read_csv("t2_2.csv")["prediction"]
df = df.loc[:, ["item_based", "model_based", "stars"]]

# Initialize lists to store w and corresponding RMSE values
w_values = []
rmse_values = []

# Iterate over different values of w
for w in np.arange(0.049, 0.06, 0.00001):
    # Calculate hybrid predictions
    df["hybrid"] = (w) * df["item_based"] + (1 - w) * df["model_based"]

    # Calculate RMSE
    rmse = np.sqrt(mean_squared_error(df["stars"], df["hybrid"]))

    # Append w and RMSE values to the lists
    w_values.append(w)
    rmse_values.append(rmse)

# Create a DataFrame to store w and RMSE values
rmse_df = pd.DataFrame({"w": w_values, "RMSE": rmse_values}).sort_values(by="RMSE")

# Find the row with the minimum RMSE
best_w_row = rmse_df.loc[rmse_df['RMSE'].idxmin()]

# Extract the best w value
best_w = best_w_row['w']
best_rmse = best_w_row['RMSE']

print("Best w:", best_w)
print("Corresponding RMSE:", best_rmse)

# Print the DataFrame
rmse_df.head(10)

# w=0.05222

Best w: 0.05222000000000099
Corresponding RMSE: 0.9823398391952476


Unnamed: 0,w,RMSE
322,0.05222,0.98234
323,0.05223,0.98234
321,0.05221,0.98234
324,0.05224,0.98234
320,0.0522,0.98234
325,0.05225,0.98234
319,0.05219,0.98234
326,0.05226,0.98234
318,0.05218,0.98234
327,0.05227,0.98234


time: 4.65 s (started: 2024-03-18 07:30:47 +00:00)


In [102]:
import pandas as pd
import numpy as np
from sklearn.metrics import mean_squared_error

# Read the data
df = pd.read_csv("HW3StudentData/yelp_val.csv")
df["item_based"] = pd.read_csv("t2_1.csv")["prediction"]
df["model_based"] = pd.read_csv("t2_2.csv")["prediction"]
df = df.loc[:, ["item_based", "model_based", "stars"]]

w = 0.05222
df["hybrid"] = (w) * df["item_based"] + (1 - w) * df["model_based"]

# Calculate RMSE
rmse = np.sqrt(mean_squared_error(df["stars"], df["hybrid"]))
print(rmse)

0.9823398391952476
time: 931 ms (started: 2024-03-18 07:30:17 +00:00)


In [10]:
from pyspark import SparkConf, SparkContext

# Initialize Spark
conf = SparkConf().setAppName("RDD Join Example")
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")

try:
    # Sample data
    item_based_data = [("A", "X", 1), ("B", "Y", 2), ("C", "Z", 3)]
    model_based_data = [("A", "X", 10), ("B", "Y", 20), ("C", "Z", 30)]

    # Create RDDs
    item_based_rdd = sc.parallelize(item_based_data)
    model_based_rdd = sc.parallelize(model_based_data)

    # Transform RDDs into key-value pairs with composite key
    item_based_keyed = item_based_rdd.map(lambda x: ((x[0], x[1]), x[2]))
    model_based_keyed = model_based_rdd.map(lambda x: ((x[0], x[1]), x[2]))

    # Join RDDs based on composite keys
    joined_rdd = item_based_keyed.join(model_based_keyed)

    # Output the joined data
    for i in joined_rdd.collect():
        print(i)

finally:
    # Stop Spark
    sc.stop()


(('C', 'Z'), (3, 30))
(('A', 'X'), (1, 10))
(('B', 'Y'), (2, 20))
time: 5.42 s (started: 2024-03-18 08:20:43 +00:00)


# THE END