In [2]:
import os
from collections import OrderedDict
import numpy as np
import pandas as pd
from sklearn import preprocessing

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window

In [3]:
# Since the dataset is stored in json format and the size is pretty huge, using pandas (or dask) for data transformation will cause long waiting time for tasks to complete or even problems in memory allocation. 
# To avoid such inconvenience, we will use PySpark to deal with this dataset.

spark = SparkSession.builder.appName('yelp_dataset').getOrCreate()

reviews_sk = spark.read.json('data/yelp_academic_dataset_review.json')
business_sk = spark.read.json('data/yelp_academic_dataset_business.json')
users_sk = spark.read.json('data/yelp_academic_dataset_user.json')

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/20 21:45:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/04/20 21:45:55 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

## Data Analysis

In [5]:
print("Total number of reviews: {}".format(reviews_sk.count()))
print("Total number of business: {}".format(business_sk.count()))
print("Total number of users: {}".format(users_sk.count()))

                                                                                

Total number of reviews: 6990280
Total number of business: 150346




Total number of users: 1987897


                                                                                

In [10]:
business_sk.groupBy("city") \
    .count() \
    .orderBy(F.desc("count")) \
    .show(10, truncate=False)

+-------------+-----+
|city         |count|
+-------------+-----+
|Philadelphia |14569|
|Tucson       |9250 |
|Tampa        |9050 |
|Indianapolis |7540 |
|Nashville    |6971 |
|New Orleans  |6209 |
|Reno         |5935 |
|Edmonton     |5054 |
|Saint Louis  |4827 |
|Santa Barbara|3829 |
+-------------+-----+
only showing top 10 rows



In [11]:
business_sk.select(F.explode(F.split("categories", ", ")).alias("category")) \
    .groupBy("category") \
    .count() \
    .orderBy(F.desc("count")) \
    .show(10, truncate=False)

+----------------+-----+
|category        |count|
+----------------+-----+
|Restaurants     |52268|
|Food            |27781|
|Shopping        |24395|
|Home Services   |14356|
|Beauty & Spas   |14292|
|Nightlife       |12281|
|Health & Medical|11890|
|Local Services  |11198|
|Bars            |11065|
|Automotive      |10773|
+----------------+-----+
only showing top 10 rows



In [13]:
reviews_sk.groupBy("user_id") \
    .count() \
    .orderBy(F.desc("count")) \
    .show(10, truncate=False)



+----------------------+-----+
|user_id               |count|
+----------------------+-----+
|_BcWyKQL16ndpBdggh2kNA|3048 |
|Xw7ZjaGfr0WNVt6s_5KZfA|1840 |
|0Igx-a1wAstiBDerGxXk2A|1747 |
|-G7Zkl1wIWBBmD0KRy_sCw|1682 |
|ET8n-r7glWYqZhuR6GcdNw|1653 |
|bYENop4BuQepBjM1-BI3fA|1578 |
|1HM81n6n4iPIFU5d2Lokhw|1554 |
|fr1Hz2acAb3OaL3l6DyKNg|1447 |
|wXdbkFZsfDR7utJvbWElyA|1396 |
|Um5bfs5DH6eizgjH3xZsvg|1391 |
+----------------------+-----+
only showing top 10 rows



                                                                                

In [None]:
# Most Reviewed Businesses

top_businesses = reviews_sk.groupBy("business_id") \
    .count() \
    .orderBy(F.desc("count")) \
    .limit(10)

top_businesses.join(business_sk, on="business_id") \
    .select("name", "city", "count", "stars") \
    .show(truncate=False)

                                                                                

+----------------------------------+------------+-----+-----+
|name                              |city        |count|stars|
+----------------------------------+------------+-----+-----+
|Luke                              |New Orleans |4661 |4.0  |
|Royal House                       |New Orleans |5146 |4.0  |
|Commander's Palace                |New Orleans |4969 |4.5  |
|Hattie B’s Hot Chicken - Nashville|Nashville   |6160 |4.5  |
|Cochon                            |New Orleans |4480 |4.0  |
|Mother's Restaurant               |New Orleans |5254 |3.5  |
|Oceana Grill                      |New Orleans |7516 |4.0  |
|Acme Oyster House                 |New Orleans |7673 |4.0  |
|Reading Terminal Market           |Philadelphia|5778 |4.5  |
|Ruby Slipper - New Orleans        |New Orleans |5264 |4.5  |
+----------------------------------+------------+-----+-----+



## Subset Data: Philadelphia, Restaurant

In [10]:
philly_restaurants = business_sk.filter(
    (F.col("city") == "Philadelphia") & 
    (F.col("categories").contains("Restaurants"))
)

philly_business_ids = philly_restaurants.select("business_id")

w = Window().partitionBy().orderBy("business_id")
philly_restaurants = philly_restaurants.withColumn('business_cid', F.row_number().over(w) - 1)

philly_restaurants.select('business_cid', 'business_id', 'name', 'stars').show(5)

25/04/20 21:56:27 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/20 21:56:27 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/20 21:56:27 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+------------+--------------------+--------------------+-----+
|business_cid|         business_id|                name|stars|
+------------+--------------------+--------------------+-----+
|           0|-0M0b-XhtFagyLmsB...|      Paris Wine Bar|  3.5|
|           1|-0PN_KFPtbnLQZEeb...|Mr Wong's Chinese...|  3.5|
|           2|-0TffRSXXIlBYVbb5...|IndeBlue Modern I...|  4.5|
|           3|-0eUa8TsXFFy0FCxH...|Waterfront Gourme...|  4.0|
|           4|-1B9pP_CrRBJYPICE...|            Spice 28|  4.0|
+------------+--------------------+--------------------+-----+
only showing top 5 rows



In [11]:
philly_reviews = reviews_sk.join(philly_restaurants.select('business_id', 'business_cid'), 'business_id', 'inner')

w = Window().partitionBy().orderBy("review_id")
philly_reviews = philly_reviews.withColumn('review_cid', F.row_number().over(w) - 1)

philly_reviews.select('review_cid', 'review_id', 'business_cid', 'business_id', 'user_id', 'stars').show(5)

25/04/20 21:56:29 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/20 21:56:29 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/20 21:56:29 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/20 21:56:29 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/20 21:56:29 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/20 21:56:29 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/20 2

+----------+--------------------+------------+--------------------+--------------------+-----+
|review_cid|           review_id|business_cid|         business_id|             user_id|stars|
+----------+--------------------+------------+--------------------+--------------------+-----+
|         0|---aZVSWnQtOP_12Q...|        2655|SCjUeZfjafJ88fWWQ...|Zr2x9lE0thdxVXvqB...|  5.0|
|         1|---xLOb2_Yx8jwiRx...|        5454|vXb4OWsjPoiBtmarf...|QI2E64QGfIDStq-15...|  5.0|
|         2|--0hiTWOfs9K0FWHT...|         715|6ajnOk0GcY9xbb5Oc...|SpESh996ufyMVjsMd...|  5.0|
|         3|--12ixFsvpQPZl3FR...|        1798|IkY2ticzHEn4QFn8h...|bUPqFvBLkO7wMYfxW...|  1.0|
|         4|--14yHFQP5zSaQM6W...|        3301|ZNWox30LIv7hV5k5Y...|PZwKuOn8oCwQZjmbG...|  5.0|
+----------+--------------------+------------+--------------------+--------------------+-----+
only showing top 5 rows



                                                                                

In [6]:
# Include any user who has at least one review of a Philadelphia restaurant
philly_user_ids = philly_reviews.select("user_id").distinct()
philly_users = users_sk.join(philly_user_ids, on="user_id", how="inner")

w = Window().partitionBy().orderBy("user_id")
philly_users = philly_users.withColumn('user_cid', F.row_number().over(w) - 1)

philly_users.select(['user_cid', 'user_id']).show(5)

25/04/20 21:52:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/20 21:52:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/20 21:52:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/20 21:52:40 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/20 21:52:40 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/20 21:52:41 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/20 2

+--------+--------------------+
|user_cid|             user_id|
+--------+--------------------+
|       0|---r61b7EpVPkb4UV...|
|       1|--0kuuLmuYBe3Rmu0...|
|       2|--13zE3NaRvLSrmfT...|
|       3|--2tyArRmSoyKx5r-...|
|       4|--2vR0DIsmQ6WfcSz...|
+--------+--------------------+
only showing top 5 rows



                                                                                

In [7]:
# Count of Philadelphia restaurants
philly_restaurants_count = philly_restaurants.count()
print(f"Philadelphia Restaurants: {philly_restaurants_count}")

# Count of reviews for those restaurants
philly_reviews_count = philly_reviews.count()
print(f"Reviews for Philadelphia Restaurants: {philly_reviews_count}")

# Count of distinct users who reviewed those restaurants
philly_users_count = philly_users.count()
print(f"Users who reviewed Philadelphia Restaurants: {philly_users_count}")

Philadelphia Restaurants: 5852


                                                                                

Reviews for Philadelphia Restaurants: 687289


[Stage 29:>                                                         (0 + 7) / 8]

Users who reviewed Philadelphia Restaurants: 209513


                                                                                

In [12]:
# New column in the review dataset for user_cid
philly_reviews = philly_reviews.join(philly_users.select('user_id', 'user_cid'), 'user_id', 'inner')
philly_reviews.count()
philly_reviews.select('review_cid', 'review_id', 'business_cid', 'business_id', 'user_cid', 'user_id', 'stars').show(5)

25/04/20 21:56:48 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/20 21:56:48 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/20 21:56:48 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/20 21:56:48 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/20 21:56:48 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/20 21:56:48 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/20 2

+----------+--------------------+------------+--------------------+--------+--------------------+-----+
|review_cid|           review_id|business_cid|         business_id|user_cid|             user_id|stars|
+----------+--------------------+------------+--------------------+--------+--------------------+-----+
|         0|---aZVSWnQtOP_12Q...|        2655|SCjUeZfjafJ88fWWQ...|  120642|Zr2x9lE0thdxVXvqB...|  5.0|
|         1|---xLOb2_Yx8jwiRx...|        5454|vXb4OWsjPoiBtmarf...|   89486|QI2E64QGfIDStq-15...|  5.0|
|         2|--0hiTWOfs9K0FWHT...|         715|6ajnOk0GcY9xbb5Oc...|   97834|SpESh996ufyMVjsMd...|  5.0|
|         3|--12ixFsvpQPZl3FR...|        1798|IkY2ticzHEn4QFn8h...|  129341|bUPqFvBLkO7wMYfxW...|  1.0|
|         4|--14yHFQP5zSaQM6W...|        3301|ZNWox30LIv7hV5k5Y...|   87124|PZwKuOn8oCwQZjmbG...|  5.0|
+----------+--------------------+------------+--------------------+--------+--------------------+-----+
only showing top 5 rows



                                                                                

## Preparing Data for Training and Testing

Normally, for this task, we would use the function traint_test_split from scikit. However, due to the distinct need of recommender system problem, in which positive items are ranked in higher priority and returned accordingly from the predictor, it is desirable to have a testing set containing only highly/positively rated items.

* Recommenders are usually evaluated on how well they retrieve or rank items the user actually liked.
* It’s common to define a “positive” interaction (e.g., rating ≥ 4) and treat all others (or missing items) as implicit negatives.
* During evaluation, you want to check whether the model can rank those positive items higher than random/unseen items.

So in testing:
* You keep only positive items (e.g., 4- or 5-star reviews),
* Then check if your model ranks them higher than sampled negatives (e.g., items the user didn’t rate or rated poorly).

In [14]:
review_counts = philly_reviews.groupby('user_cid').count().toPandas()
review_counts = review_counts.rename(columns={'count': 'review_count'})

bins = [0, 1, 2, 3, 4, 5, 10, 20, 50, 100, 200, 500, 1000]
groups = pd.cut(review_counts['review_count'], bins)
review_counts = review_counts.groupby(groups)['review_count'].agg(['count'])
review_counts = review_counts.rename(columns={'count': 'user_count'})

review_counts

25/04/20 22:05:11 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/20 22:05:11 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/20 22:05:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/20 22:05:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/20 22:05:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/20 22:05:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/20 2

Unnamed: 0_level_0,user_count
review_count,Unnamed: 1_level_1
"(0, 1]",125446
"(1, 2]",33421
"(2, 3]",15147
"(3, 4]",8612
"(4, 5]",5509
"(5, 10]",11513
"(10, 20]",5562
"(20, 50]",3009
"(50, 100]",879
"(100, 200]",335


As we can see, the majority of users only contribute one or two reviews. Surprisingly, we have 4 hyper-active user who has 500-1000 reviews under his/her arm.

Based on this result, we will split the original dataset into training and testing sets as follows:

* All reviews from those users, who have lesser than ten reviews, will be added to the training set, but not to the testing set.
* For each active user, who have written more than ten reviews, 20% of positive reviews will be moved to the testing set, while the rest will be comprehended to the training set.

In [15]:
df = philly_reviews.select(['user_cid', 'business_cid', 'review_cid', 'stars']).toPandas()
df.shape

25/04/20 22:07:54 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/20 22:07:54 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/20 22:07:54 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/20 22:07:54 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/20 22:07:54 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/20 22:07:54 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/20 2

(687289, 4)

In [None]:
high_reviews = df[df['stars'] >= 4.].groupby('user_cid')['business_cid'] \
                                    .count() \
                                    .reset_index() \
                                    .rename(columns={'business_cid': 'review_count'})

conditions = (df['user_cid'].isin(high_reviews[high_reviews['review_count'] > 10]['user_cid'])) & (df['stars'] >= 4)

def pick_top_reviews(x):
    factor = 0.20
    nitems = int(np.floor(factor * x.shape[0]))
    return x.head(nitems)

df_test = df[conditions].groupby('user_cid').apply(lambda x: pick_top_reviews(x)).reset_index(drop=True)
df_test.shape

  df_test = df[conditions].groupby('user_cid').apply(lambda x: pick_top_reviews(x)).reset_index(drop=True)


(33915, 4)

In [17]:
df_train = df[~df['review_cid'].isin(df_test['review_cid'])]
df_train.shape

(653374, 4)

In [20]:
unique_stars = df_train['stars'].unique()
print(f"Unique stars in training set: {unique_stars}")

# Rescale the columns stars in both sets, making values between 0 and 1
df_train['stars_rescale'] = (df_train['stars'] - 1.) / 4.
df_test['stars_rescale'] = (df_test['stars'] - 1.) / 4.

df_train.head()

Unique stars in training set: [5. 1. 2. 3. 4.]


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_train['stars_rescale'] = (df_train['stars'] - 1.) / 4.


Unnamed: 0,user_cid,business_cid,review_cid,stars,stars_rescale
0,120642,2655,0,5.0,1.0
1,89486,5454,1,5.0,1.0
2,97834,715,2,5.0,1.0
3,129341,1798,3,1.0,0.0
4,87124,3301,4,5.0,1.0


In [None]:
# Create a list of unique users and restaurants for embedding

n_users = philly_users.count()
n_rests = philly_restaurants.count()
list_unique_users = philly_users.select(['user_cid']).rdd.flatMap(lambda x: x).collect()
list_unique_rests = philly_restaurants.select(['business_cid']).rdd.flatMap(lambda x: x).collect()

print(f"Number of unique users: {n_users}")
print(f"Number of unique restaurants: {n_rests}")
print(list_unique_users[:5])
print(list_unique_rests[:5])


25/04/20 22:29:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/20 22:29:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/20 22:29:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/20 22:29:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/20 22:29:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/20 22:29:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/20 2

Number of unique users: 209513
Number of unique restaurants: 5852
[0, 1, 2, 3, 4]
[0, 1, 2, 3, 4]


25/04/20 22:29:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/20 22:29:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/20 22:29:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/20 22:29:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/20 22:29:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
