# Yelp Business Recommendation by PySpark and Cosine Similiarity 🤤😋

![Yelp Logo from Wikipedia](./images/Yelp_Logo.svg)

This project is to create a recommendation engine by leveraging PySpark and Yelp's Open Dataset.

Here is the link to the dataset

[Yelp data](https://www.yelp.com/dataset)

[Yelp documentation for the data](https://www.yelp.com/dataset/documentation/main)

## Step 1 Data Preprocessing

The data is in JSON format.
we should first load the data into python dictionary, and get only the fields of interests

In [1]:
# Import the libraries needed
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix
from pyspark.mllib.linalg import Vectors
from pyspark.sql.functions import col, when
import json

### 1-1.) Loading the Data.

In Apache Spark, an **RDD (Resilient Distributed Dataset)** is a fundamental data structure that represents an immutable, distributed collection of objects that can be processed in parallel. RDDs are designed to handle large-scale data processing tasks, offering fault tolerance and data parallelism. They are a key component of Spark and underlie its ability to perform fast, distributed computations.

According to the official [Spark Documentation](https://spark.apache.org/docs/latest/rdd-programming-guide.html), a rule of thumb of the number of partition is 2 to 4 per CPUs.

Here we are going to not specify the number of partitions since Spark automatically decide on the number of partitions.

In [2]:
# Create a SparkSession
# Here, "local[*]" means to utilize all the cpu available.
spark = SparkSession.builder.appName("YelpRecommendation").master("local[*]").getOrCreate()

sc = spark.sparkContext

sc.setLogLevel("Error")

review_rdd = sc.textFile('data/review.json')

24/04/23 18:09:39 WARN Utils: Your hostname, Haeins-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.0.0.2 instead (on interface en0)
24/04/23 18:09:39 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/23 18:09:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### 1-2.) Select the data we are only interested in!

First, Let's convert the json into the python dictionaries.

And then, we are going to used `user_id`, `business_id`, `stars`, and `date`.

Of course we can utilize more columns from the dataset, but let's use the ratings(stars) for this project.

In the future, I could expand this project by utilizing the natural language reviews for the user-business pairs!

In [3]:
# JSON String into Dictionaries!

def json_to_dict(iter):
    for x in iter:
        yield json.loads(x)

review_rdd = review_rdd.mapPartitions(json_to_dict)

In [4]:
# Select only the desired key values
def target_columns(iter):
    for x in iter:
        yield {
            'user_id':x['user_id'],
            'business_id':x['business_id'],
            'stars':x['stars'],
            'date':int(x['date'].replace('-','').replace(' ', '').replace(':',''))  # Convert the dates into numerical format for the future deduplication
        }

review_rdd = review_rdd.mapPartitions(target_columns)

### 1-3.) Deduplicate the the records by user, business pair by the date

We want to deduplicate the records of the users' review for businesses to the lateset records possible.

We do not want a customer's old review on a specific business to affect the recommendation!

In [5]:
def user_business_count(iter):
    for x in iter:
        yield ((x['user_id'], x['business_id']), 1)

review_rdd_count = review_rdd.mapPartitions(user_business_count)

count_by_key = review_rdd_count.reduceByKey(lambda count1, count2: count1 + count2)  # We use reduce because it is more efficient than groupby

top_5 = count_by_key.takeOrdered(5, key=lambda x: -x[1])

for key, count in top_5:
    print(f"{key}: {count}")



('JlQ-9fc61X9lbzZN6ZjORQ', 'J3H6VSIgUTlACkb_HPFA8w'): 62
('Dl05etq6qzMf5NOskjb7Ag', 'f8IMQgRwo-8GP372MElDGQ'): 38
('4d13xAX2jp2EbGF8I9eZZw', 'K_-l_AhoGQvYDeGNgm52qQ'): 36
('U6MkHGNWKMVWR8TO31QzeA', 'qNYLt8zizn--wEpT1KILfw'): 32
('xq1zRJz5VHgEKiCaI51_Gw', 'z8fuQ6XYwfptZy-Tm1l9aQ'): 31


                                                                                

From above code, we see that there are some extreme cases where the same user leaves multiple comments to the same business over a course of time.

Therefore, we are going to deduplicate them!

In [6]:
# First, Let's make the date to be the key of the values
def user_business_to_key(iter):
    for x in iter:
        yield ((x['user_id'], x['business_id']), x['date'])  # Here, note that the user, business pair is the key

def all_to_key(iter):
    for x in iter:
        yield ((x[0][0], x[0][1], x[1]), -1)  # ((user_id, business_id, date), -1)

review_rdd_date = review_rdd.mapPartitions(user_business_to_key)
review_rdd_date = review_rdd_date.reduceByKey(max)  # Deduplicate by the most recent date
review_rdd_date = review_rdd_date.mapPartitions(all_to_key)

In [7]:
def user_business_date_to_key(iter):
    for x in iter:
        yield ((x['user_id'], x['business_id'], x['date']), x['stars'])

review_rdd = review_rdd.mapPartitions(user_business_date_to_key)

In [8]:
recent_review_rdd = review_rdd_date.leftOuterJoin(review_rdd)

Deduplicated Data:

```
((user_id, bussiness_id, date), (-1, rating))
```

In [9]:
# Format back to the dictionary of user_if, business_id, and stars
def back_to_dict(iter):
    for x in iter:
        yield dict(user_id=x[0][0], business_id=x[0][1], stars=x[1][1])

recent_review_rdd = recent_review_rdd.mapPartitions(back_to_dict)

In [10]:
# Here, We filter out the business with less than 30 user reviews
def user_id_to_set(iter):
    for x in iter:
        yield (x['business_id'], set((x['user_id'],)))
        
# Here, when reduceByKey(), because the user_id is set, we will end up having unique user_ids
item_filter_rdd = recent_review_rdd.mapPartitions(user_id_to_set).reduceByKey(lambda x, y: x.union(y)).filter(lambda x: len(x[1]) >= 30)

In [11]:
def user_id_and_stars_to_list(iter):
    for x in iter:
        yield (x['business_id'], [x['user_id'], x['stars']])

recent_review_rdd = recent_review_rdd.mapPartitions(user_id_and_stars_to_list)

Now, a record from each RDDs look as the following:

```
item_filter_rdd = (business_id, (user_id1, user_id2, ..., user_idn))

recent_review_rdd = (business_id, [user_id, star])
```

In [12]:
# Filtering all the items with less than 30 user reviews
thirty_plus_review_rdd = item_filter_rdd.leftOuterJoin(recent_review_rdd)

After the `leftOuterJoin`, a record from the result RDD looks as below

```
(business_id, ((user_id1, user_id2, ...), [user_id, stars]))
```

In [13]:
def back_to_dict(iter):
    for x in iter:
        yield {'user_id': x[1][1][0], 'business_id':x[0], 'stars':x[1][1][1]}

thirty_plus_review_rdd = thirty_plus_review_rdd.mapPartitions(back_to_dict)

In [14]:
# Filter Users with at least 5 distinct items

def business_id_to_set(iter):
    for x in iter:
        yield (x['user_id'], set((x['business_id'],)))
        
def user_id_and_1_set(iter):
    for x in iter:
        yield (x[0], 1)

user_filter_rdd = thirty_plus_review_rdd.mapPartitions(business_id_to_set).reduceByKey(lambda x, y: x.union(y)).filter(lambda x: len(x[1]) >= 5).mapPartitions(user_id_and_1_set)

In [15]:
def business_id_and_stars_to_list(iter):
    for x in iter:
        yield (x['user_id'], [x['business_id'], x['stars']])

thirty_plus_review_rdd = thirty_plus_review_rdd.mapPartitions(business_id_and_stars_to_list)

In [16]:
user_with_5plus_reviews_rdd = user_filter_rdd.leftOuterJoin(thirty_plus_review_rdd)

In [17]:
def back_to_dict(iter):
    for x in iter:
        yield {'user_id':x[0], 'business_id':x[1][1][0], 'stars':x[1][1][1]}

user_with_5plus_reviews_rdd = user_with_5plus_reviews_rdd.mapPartitions(back_to_dict)

## Step 2) User-Item Matrix

This is a matrix of columns being all the users existing in the dataset, rows being businesses, and each entry being the ratings.

For example, let's say there business `A` and `B`, and the data looks like below:

|Business|U1|U2|U3|
|-----|--|--|--|
|A|3|0|1|
|B|0|0|5|

The above user-item matrix tells us that the business `A` is rated by `U1` and `U3`, but not by `U2`.

For business `B`, only `U3` left a review.

User `U2` did not leave any reviews on businesses

First, we are going to create the user-item matrix!


In [18]:
# tranform into a shape that we can work on the user-item matrix
def user_id_and_bussiness_id_to_key(iter):
    for x in iter:
        yield ((x['user_id'], x['business_id']), x['stars'])

user_business_rating_rdd = user_with_5plus_reviews_rdd.mapPartitions(user_id_and_bussiness_id_to_key)

In [19]:
user_business_rating_rdd.take(3)

                                                                                

[(('MI3wG3R12sQuTGdNvBBX3A', 'LwQB9H3jZ9wTk24Lr-AnZQ'), 5.0),
 (('MI3wG3R12sQuTGdNvBBX3A', 'FP-YvQwpYXNPDy-X4AZK9A'), 5.0),
 (('MI3wG3R12sQuTGdNvBBX3A', 'H16oQAa3G8o3mcqzmGT9Gg'), 4.0)]

In [20]:
def collect_business_ids(iter):
    for x in iter:
        yield (x[0][1], 1)

def only_business_id(iter):
    for x in iter:
        yield x[0]

def collect_user_ids(iter):
    for x in iter:
        yield (x[0][0], 1)

def only_user_id(iter):
    for x in iter:
        yield x[0]

# Distinct business ids
business_id_rdd = user_business_rating_rdd.mapPartitions(collect_business_ids).reduceByKey(lambda x, y: x).mapPartitions(only_business_id)

# Distinct user ids
user_id_rdd = user_business_rating_rdd.mapPartitions(collect_user_ids).reduceByKey(lambda x, y: x).mapPartitions(only_user_id)


business_ids = business_id_rdd.collect()
user_ids = user_id_rdd.collect()

business_index = sc.broadcast({v: k for k, v in enumerate(business_ids)})
user_index = sc.broadcast({v: k for k, v in enumerate(user_ids)})

# top_5 = business_id.takeOrdered(5, key=lambda x: -x[1])

# for k, v in top_5:
#     print(f"{k}: {v}")

                                                                                

In [21]:
# Here, we are creating records that has indices rather than the actual user and business names
# We are doing this to utilize PySpark's Indexed rows and Indexed row matrix
def create_user_business_star_rdd(iter):
    for x in iter:
        yield (user_index.value[x[0][0]], business_index.value[x[0][1]], x[1])

user_business_star_rdd = user_business_rating_rdd.mapPartitions(create_user_business_star_rdd)

In [22]:
all_users_business_rdd = user_business_star_rdd.map(lambda x: (x[0], [(x[1], x[2])])).reduceByKey(lambda x, y: x + y).map(lambda x: {x[0]: x[1]})

In [23]:
all_users_business = sc.broadcast(all_users_business_rdd.collect())

                                                                                

## Step 3) Calculate the Cosine Similarity!

Now, we are going to calculate cosine similarity for each pairs of user rows of the user-item matrix.

Here we are going to utilize `IndexedRow`, and `IndexedRowMatrix` from PySpark!

In [24]:
from collections import defaultdict

def vectorize_ratings(iter):
    
    # Prepare per-partition dictionaries to collect ratings organized by numeric user index
    business_ratings = defaultdict(lambda: defaultdict(float))
    for x in iter:
        business_ratings[x[1]][x[0]] = x[2]

    # Yield IndexedRows using numeric indices and sparse vectors for ratings
    for business_idx, ratings in business_ratings.items():

        # Sort the dictionary items by keys (business indices) before creating the sparse vector
        sorted_items = sorted(ratings.items())
        indices = [item[0] for item in sorted_items]
        values = [item[1] for item in sorted_items]
        
        yield IndexedRow(business_idx, Vectors.sparse(len(user_index.value), indices, values))

indexed_row_rdd = user_business_star_rdd.mapPartitions(vectorize_ratings)

indexed_row_matrix = IndexedRowMatrix(indexed_row_rdd)

In [25]:
# Calculate the similarity of users!
similarity_matrix = indexed_row_matrix.columnSimilarities()

                                                                                

## Step 4) Recommendation for the Target Users!

Finally! now we give recommendation to the target users.



In [26]:
# Randomly selected target users

target_users = ['PomQayG1WhMxeSl1zohAUA', 'uEvusDwoSymbJJ0auR3muQ', 'q6XnQNNOEgvZaeizUgHTSw', 'n00anwqzOaR52zgMRaZLVQ', 'qOdmye8UQdqloVNE059PkQ']

Let's convert the user_id into user_idx

In [27]:
target_users_id = list()

for user_id in target_users:
    target_users_id.append(user_index.value[user_id])

target_users_id

[195902, 37701, 33240, 15189, 2773]

In [28]:
# Let's convert the similarity matrix into a dataframe
similarity_entries_df = spark.createDataFrame(similarity_matrix.entries)

# Get the businesses of the similar users
def get_businesses_of_similar_users(iter):
    for x in iter:
        yield x[1]

# get the top five similar users!
def get_top_similar_users(target_user_id, num_similar_users=3):
    # Filter rows where the current user_id appears in either the 'i' or 'j' columns
    return similarity_entries_df.filter(
        (col("i") == target_user_id) | (col("j") == target_user_id)
    ).withColumn(
        "other_user_id",
        when(col("i") == target_user_id, col("j")).otherwise(col("i"))
    ).withColumn(
        "similarity_score",
        col("value")  # Directly use the 'value' column which contains similarity scores
    ).orderBy(
        col("value").desc()  # Order by the similarity score in descending order
    ).limit(
        num_similar_users  # Limit to the top 'num_similar_users' similar users
    ).select(
        "other_user_id", "similarity_score"  # Select both the user ID and similarity score
    )


# RECOMMENDATION!!
def recommend_business(target_user_id):

    # Get the target user's dictionary
    for target in all_users_business.value:
        if target_user_id in target:
            target_user_dict = target
            break

    # Fetch the user's existing ratings from the broadcast variable
    user_data = target_user_dict.get(target_user_id, [])
    
    # Extract already rated business IDs
    already_rated_business_ids = set([business for business, _ in user_data])

    # Fetch top similar users
    top_similar_users = get_top_similar_users(target_user_id).collect()
    similar_user_ids = [row['other_user_id'] for row in top_similar_users]

    # Fetch and aggregate businesses rated above 4 by these similar users
    similar_users_businesses = user_business_star_rdd.filter(
        lambda x: x[0] in similar_user_ids and x[2] >= 4 and x[1] not in already_rated_business_ids
    ).map(lambda x: x[1]).distinct().collect()

    # Filter and recommend businesses not yet rated by the target user
    recommendations = [bus_id for bus_id in similar_users_businesses if bus_id not in already_rated_business_ids]

    return recommendations, similar_users_businesses

                                                                                

In [29]:
target_user_id = 37701  # This user ID should be from your broadcast variable
recommendations, _ = recommend_business(target_user_id)
print(f"Recommendations for user {target_user_id}: {recommendations}")



Recommendations for user 37701: [46901, 22775, 48618, 6604, 13510]


                                                                                