# Yelp Recommender

## Intro

The purpose of this exercise is to use Spark on a real dataset rather than just a toy example.

You will use the data from the [Yelp Dataset Challenge](https://www.yelp.de/dataset_challenge), which contains information about businesses, users, reviews and more.

For this exercise, you only need the following files:
- yelp_academic_dataset_business.json
- yelp_academic_dataset_review.json

The goal is to build a recommender using [Spark's ALS (Alternating Least Squares)](https://spark.apache.org/docs/2.3.0/ml-collaborative-filtering.html) and then generate recommendations for a given user.

Since the dataset is quite big, you should pick a business category (e.g. Restaurants) and a city (e.g. Edinburgh) and work on the recommender using only this subset of the data.

Please take some time to:
- find out what information you will need to feed as input to Spark's ALS
- check how this information is available in the dataset
- plan how you will tackle this problem

In [4]:
# Download a small version of the Yelp dataset
#!wget https://s3.us-west-2.amazonaws.com/dsr-spark-appliedml/yelp_dataset_small.tar.gz
#!tar -xvzf yelp_dataset_small.tar.gz

In [5]:
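from pyspark import SparkConf, SparkContext, SQLContext
# Reuse an existing SparkContext (e.g. from the pyspark shell) or start a local one
sc = SparkContext.getOrCreate(SparkConf().setMaster('local[*]'))
sqlc = SQLContext(sc)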

## Business Data

- Load the file ***yelp_academic_dataset_business.json*** and select the following columns:
    - business_id
    - name
    - city
    - stars
    - categories
    - address

In [6]:
df_business = sqlc.read.json('yelp_academic_dataset_business.json')
df_business = df_business.select('business_id',
                                 'name',
                                 'city',
                                 'stars',
                                 'categories',
                                 'address')

In [7]:
df_business.take(1)

[Row(business_id='0DI8Dt2PJp07XkVvIElIcQ', name='Innovative Vapors', city='Tempe', stars=4.5, categories=['Tobacco Shops', 'Nightlife', 'Vape Shops', 'Shopping'], address='227 E Baseline Rd, Ste J2')]

### Choosing a business category

- Define a regular Python function that takes a list of categories and returns 1 if a category of your choice (for instance, 'Restaurants') is contained in the list of categories or 0 otherwise
- Using the Python function, define a Spark User Defined Function (UDF) with an IntegerType return type
- Using the UDF, filter the businesses that belong to the category you chose

In [8]:
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import IntegerType

def is_category_listed(name, categories):
    listed = 0
    if categories is not None:
        if name in categories:
            listed = 1
    return listed

def is_restaurant(categories):
    return is_category_listed('Restaurants', categories)

In [9]:
udf_is_restaurant = UserDefinedFunction(is_restaurant, 
                                        IntegerType())

df_restaurants = df_business.withColumn('is_restaurant', udf_is_restaurant('categories')) \
                            .filter('is_restaurant == 1') \
                            .drop('is_restaurant')

In [10]:
df_restaurants.take(1)

[Row(business_id='EDqCEAGXVGCH4FJXgqtjqg', name='Pizza Pizza', city='Toronto', stars=2.5, categories=['Restaurants', 'Pizza', 'Chicken Wings', 'Italian'], address='979 Bloor Street W')]

- The UDF approach works just fine, but there is a more straightforward way to perform the same operation
    - hint: look at ***array_contains*** SQL function
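Why do both filters keep the same rows? The plain-Python sketch below (an illustration of the semantics on made-up data, not Spark code) models the two conditions. Per the Spark SQL documentation, array_contains returns null when the array itself is null, and filter keeps only rows whose condition is true, so businesses without categories are dropped either way. The helper names are hypothetical.

```python
# Plain-Python model of the two filter conditions (not Spark code).

def udf_is_restaurant(categories):
    # Mirrors the notebook's UDF: a missing category list counts as 0
    return 1 if categories is not None and 'Restaurants' in categories else 0

def array_contains_like(categories, value):
    # Simplified model of array_contains: NULL array -> NULL (None).
    # It ignores the extra case of NULL elements inside the array.
    if categories is None:
        return None
    return value in categories

businesses = [
    ('a', ['Restaurants', 'Pizza']),
    ('b', ['Shopping']),
    ('c', None),  # business with no categories
]

kept_udf = [b for b, cats in businesses if udf_is_restaurant(cats) == 1]
# filter() drops rows whose condition is NULL or FALSE, keeping only TRUE
kept_array_contains = [b for b, cats in businesses
                       if array_contains_like(cats, 'Restaurants') is True]

print(kept_udf, kept_array_contains)
```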

In [11]:
import pyspark.sql.functions as F

# you can overwrite the former df_restaurants
df_restaurants = (df_business
                  .filter(F.array_contains(F.col('categories'),
                                           'Restaurants')))

### Choosing a city
- Having filtered by the business category, now it is time to filter by the city (for instance, Edinburgh)

In [12]:
df_city_restaurants = df_restaurants.filter('city = "Edinburgh"')

### Generating numeric IDs
- If you haven't done so yet, take a sample from your filtered DataFrame and notice that ***business_id*** contains an alphanumeric string. This is a problem for Spark's ALS implementation, which requires the IDs of users and items (in our case, businesses) to be numeric
- Use a ***StringIndexer*** to create a new column ***business_idn*** from the conversion of business_id into a numeric value
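As a rough mental model, StringIndexer's default ordering (stringOrderType="frequencyDesc") gives index 0.0 to the most frequent value, 1.0 to the next one, and so on. The plain-Python sketch below reproduces that idea on made-up IDs; breaking ties alphabetically is an assumption that matches recent Spark documentation, and older versions may order ties differently. Since each business appears only once in df_city_restaurants, the business indices come entirely from tie-breaking, while for user_id (which repeats across reviews) frequency matters. Because the indexer is fit on the filtered restaurants only, business_idn covers exactly the range 0 to n-1, which the recommendation step later relies on.

```python
from collections import Counter

def string_index(values):
    """Map each distinct string to a float index, most frequent first.

    Sketch of StringIndexer's default 'frequencyDesc' ordering; ties are
    broken alphabetically (an assumption, see the lead-in).
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda s: (-counts[s], s))
    return {label: float(i) for i, label in enumerate(ordered)}

# Made-up, shortened IDs for illustration
ids = ['NsarUM', 'EDqCEA', 'NsarUM', '0DI8Dt', 'NsarUM', 'EDqCEA']
mapping = string_index(ids)
indexed = [mapping[s] for s in ids]
print(mapping)
print(indexed)
```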

In [13]:
from pyspark.ml.feature import StringIndexer
business_indexer = StringIndexer().setInputCol('business_id') \
                                  .setOutputCol('business_idn')
    
business_idx_model = business_indexer.fit(df_city_restaurants)
df_city_restaurants = business_idx_model.transform(df_city_restaurants)

In [14]:
df_city_restaurants.take(1)

[Row(business_id='NsarUMMMPOlMBb6K04x6hw', name='Juice Almighty', city='Edinburgh', stars=4.5, categories=['Food', 'Fast Food', 'Restaurants', 'Juice Bars & Smoothies'], address='7A Castle Street, Corstorphine', business_idn=24.0)]

In [15]:
df_city_restaurants.cache()

DataFrame[business_id: string, name: string, city: string, stars: double, categories: array<string>, address: string, business_idn: double]

## Review Data

- Load the file ***yelp_academic_dataset_review.json*** and select the following columns:
    - user_id
    - business_id
    - stars
    - date

In [16]:
df_reviews = sqlc.read.json('yelp_academic_dataset_review.json')
df_reviews = df_reviews.select('user_id',
                               'business_id',
                               'stars',
                               'date')

### Keeping reviews for the chosen city only

- You are only interested in reviews of the businesses you kept after filtering by category and city. How can you filter out everything else? (hint: take a look at the ***join*** operation of DataFrames)
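In plain Python, the inner join amounts to keeping only the reviews whose business_id appears among the kept businesses and attaching the matching business_idn; reviews of any other business have no match and disappear. A sketch on made-up data:

```python
# Kept businesses: business_id -> business_idn (made-up values)
city_restaurants = {'biz_a': 0.0, 'biz_b': 1.0}

reviews = [
    {'user_id': 'u1', 'business_id': 'biz_a', 'stars': 5},
    {'user_id': 'u1', 'business_id': 'biz_z', 'stars': 2},  # not in the subset
    {'user_id': 'u2', 'business_id': 'biz_b', 'stars': 4},
]

# Inner join on business_id: unmatched reviews are dropped,
# matched ones gain the business_idn column
city_reviews = [
    {**r, 'business_idn': city_restaurants[r['business_id']]}
    for r in reviews
    if r['business_id'] in city_restaurants
]
print(city_reviews)
```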

In [17]:
df_city_reviews = (df_reviews
                   .join(df_city_restaurants
                         .select('business_id', 'business_idn'),
                         on='business_id'))

### Generating numeric IDs

- As it happened with the ***business_id***, you also need to convert ***user_id*** into a numeric value - once again, use a ***StringIndexer*** to create a new column named ***user_idn*** containing the result of the conversion

In [18]:
user_indexer = StringIndexer().setInputCol('user_id') \
                              .setOutputCol('user_idn').setHandleInvalid('keep')

user_idx_model = user_indexer.fit(df_city_reviews)
df_city_reviews = user_idx_model.transform(df_city_reviews)

In [19]:
df_city_reviews.cache()

DataFrame[business_id: string, user_id: string, stars: bigint, date: string, business_idn: double, user_idn: double]

### Adding a sequential number to the user's reviews

- Now add a ***sequential number*** to the user's reviews, that is, for each user, order his/her reviews by date (multiple reviews on the same date can be randomly ordered) and number them (hint: check ***window functions***)
- This sequential number will be useful later to perform a time-wise split of the dataset
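To see what the window function computes, it can be emulated in plain Python: group the reviews by user, sort each group by date and number them from 1. This sketch uses made-up records and assumes ISO-formatted date strings, so that string order is chronological. Unlike Spark, Python's stable sort keeps same-day reviews in their input order.

```python
from collections import defaultdict

def add_review_seq(reviews):
    """Number each user's reviews 1, 2, ... in date order.

    Emulates F.row_number().over(Window.partitionBy(user).orderBy('date')).
    Same-day reviews still get distinct numbers.
    """
    by_user = defaultdict(list)
    for r in reviews:
        by_user[r['user']].append(r)
    out = []
    for user_reviews in by_user.values():
        ordered = sorted(user_reviews, key=lambda r: r['date'])
        for seq, r in enumerate(ordered, start=1):
            out.append({**r, 'review_seq': seq})
    return out

reviews = [
    {'user': 'u1', 'date': '2016-05-01'},
    {'user': 'u1', 'date': '2015-01-10'},
    {'user': 'u1', 'date': '2016-05-01'},
    {'user': 'u2', 'date': '2014-07-20'},
]
numbered = add_review_seq(reviews)
for row in numbered:
    print(row)
```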

In [20]:
from pyspark.sql import functions as F
from pyspark.sql import Window

w = Window().partitionBy('user_idn').orderBy('date')
df_city_reviews = (df_city_reviews
                   .withColumn('review_seq', F.row_number().over(w)))

### Subsetting reviews to keep only users with more than 4 reviews

- Some users have rated only one or a few businesses, which gives the model too little information to make recommendations for them, so you may want to keep only users who have written more than 4 reviews, for instance
- Find the ***total number of reviews*** for each user and then filter them using this information (hint: again, you can use a ***window function***)
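A plain-Python sketch of the same filter on made-up data. Taking max(review_seq) per user gives the review count because row_number numbers each user's reviews 1..n without gaps; a window count such as F.count(F.lit(1)).over(w) should give the same number.

```python
from collections import Counter

MIN_REVIEWS = 4  # keep users with strictly more than this many reviews

# Made-up (user_id, business_id) pairs
reviews = [('u1', 'a'), ('u1', 'b'), ('u1', 'c'), ('u1', 'd'), ('u1', 'e'),
           ('u2', 'a'), ('u2', 'b')]

# Total number of reviews per user (the n_reviews column)
n_reviews = Counter(user for user, _ in reviews)
selected = [(user, biz) for user, biz in reviews if n_reviews[user] > MIN_REVIEWS]
print(n_reviews, selected)
```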

In [21]:
w = Window().partitionBy('user_idn')
df_city_reviews = df_city_reviews.withColumn('n_reviews', F.max('review_seq').over(w))
df_selected = df_city_reviews.filter('n_reviews > 4')

In [22]:
df_selected.cache()

DataFrame[business_id: string, user_id: string, stars: bigint, date: string, business_idn: double, user_idn: double, review_seq: int, n_reviews: int]

### Calculating mean rating by user

- Now you can calculate the mean rating by user and make it into a dictionary where the key is the ***user_id*** (hint: look at ***rdd*** method of DataFrames and ***collectAsMap*** method of RDDs)

In [23]:
dict_user_means = df_selected.select('user_id', 'stars') \
                             .groupby('user_id') \
                             .mean() \
                             .rdd \
                             .collectAsMap()        

### Centering rating by user

- The dictionary containing mean ratings by user can be seen as a ***lookup table*** - what is the appropriate way of dealing with those in Spark?
- Once you have figured this out, define a regular Python function that takes two arguments, ***user_id*** (String) and ***rating*** (convert it to float inside the function), and returns the rating minus the mean rating of that user
- Using the Python function, define a Spark User Defined Function (UDF) with a DoubleType return type
- Using the UDF, create a column in your DataFrame with the centered ratings
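The centering step replaces each rating with the rating minus that user's mean rating. A plain-Python sketch on made-up ratings, where the dictionary of means plays the role of the lookup table that Spark would broadcast:

```python
from collections import defaultdict

# Made-up (user_id, stars) pairs
ratings = [('u1', 5.0), ('u1', 3.0), ('u1', 4.0), ('u2', 2.0), ('u2', 4.0)]

# 1) Mean rating per user: the lookup table
sums, counts = defaultdict(float), defaultdict(int)
for user, stars in ratings:
    sums[user] += stars
    counts[user] += 1
user_means = {user: sums[user] / counts[user] for user in sums}

# 2) Subtract each user's mean from their ratings
centered = [(user, stars - user_means[user]) for user, stars in ratings]
print(user_means, centered)
```

After centering, each user's ratings sum to zero, so the model learns deviations from a user's typical rating rather than the rating level itself.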

In [24]:
from pyspark.sql.types import DoubleType
lookup_user_means = sc.broadcast(dict_user_means)

def zero_mean(user_id, rating):
    return (float(rating) - lookup_user_means.value.get(user_id))

udf_zero_mean = UserDefinedFunction(zero_mean , DoubleType())

df_centered = df_selected.withColumn('centered', udf_zero_mean('user_id','stars')) \
                .drop('stars') \
                .withColumnRenamed('centered', 'stars')

- Once again, the UDF approach is not the most "Sparkonic" way of handling this - can you perform the same operation using only functions from ***pyspark.sql.functions*** (which was imported earlier as F)?
    - hint: you'll need ***Window functions***

In [25]:
# you can overwrite df_centered
w = Window.partitionBy('user_id')
df_centered = (df_selected
               .withColumn('avg_stars', F.avg('stars').over(w))
               .withColumn('stars', F.expr('stars - avg_stars'))
               .drop('avg_stars'))

In [26]:
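from pyspark.sql.functions import pandas_udf, PandasUDFType

# Alternative: a grouped-map pandas UDF (Spark 2.3+, requires PyArrow).
# In Spark 3.x, PandasUDFType.GROUPED_MAP is deprecated in favor of
# df.groupby(...).applyInPandas(func, schema).
df_selected = df_selected.withColumn('stars', F.col('stars').astype('float'))

# Input and output are both a pandas.DataFrame holding one user's reviews
@pandas_udf(df_selected.schema, PandasUDFType.GROUPED_MAP)
def pandas_subtract_mean(pdf):
    return pdf.assign(stars=pdf.stars - pdf.stars.mean())

df_centered = df_selected.groupby('user_id').apply(pandas_subtract_mean)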

## Dataset

### Splitting into training and test sets by time

- In recommender systems, it is common practice to do the training/test split timewise, that is, the test set is composed of the latest reviews
- First, filter only those reviews which have a sequential number smaller than the ***total number of reviews***, by user: this is your training set
- Then, filter only those reviews which have a sequential number identical to the ***total number of reviews***, by user: this is your test set
- Now you can see why you had to add a sequential number to each user's reviews: since some users wrote all of their reviews on the same day, the date alone cannot tell which review is the latest, so the sequential number is needed to disambiguate them. This guarantees that the test set contains exactly 1 review per user.
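A plain-Python sketch of the split on made-up (user, review_seq, n_reviews) tuples, showing that every user contributes exactly one review, their latest, to the test set:

```python
# Made-up (user, review_seq, n_reviews) tuples
reviews = [
    ('u1', 1, 3), ('u1', 2, 3), ('u1', 3, 3),
    ('u2', 1, 2), ('u2', 2, 2),
]

training = [r for r in reviews if r[1] < r[2]]   # all but the latest review
test = [r for r in reviews if r[1] == r[2]]      # the latest review only
print(training, test)
```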

In [27]:
df_training = df_centered.filter('review_seq < n_reviews')
df_test = df_centered.filter('review_seq = n_reviews')

### If using Spark 2.1 (as in the Docker image), you need to filter out "new" businesses in the test set

In [28]:
#businesses = df_training.select('business_id').distinct()
#df_test = df_test.join(businesses, on='business_id')

## Alternating Least Squares (ALS) Model

- This is the recommender itself: ALS uses an iterative approach to factorize the user/item rating matrix into latent user factors and item factors whose dot products approximate the known ratings
- It takes as input a DataFrame with three columns, representing:
    - userCol: user IDs (numeric - remember the conversion you did)
    - itemCol: item IDs (numeric - remember the conversion you did)
    - ratingCol: ratings (numeric, obviously)
- Its parameters are:
    - rank: the number of latent factors to use
    - maxIter: the maximum number of iterations to perform
    - regParam: the regularization parameter
    - coldStartStrategy: "drop" (if the test set contains unseen users/businesses, drop the rows whose predictions would be NaN) - ***only available from Spark 2.2 on***
- Use Spark's ALS to fit a model based on your DataFrame
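To make the "alternating" part concrete, here is a plain-Python sketch of ALS with rank 1 on made-up ratings. It minimizes the squared error on the observed ratings plus a regularization weight times the squared norms of the factors, by alternately solving for all user factors (item factors fixed) and all item factors (user factors fixed); with rank 1 each least-squares solve is a single division. This is a simplification, not Spark's implementation: Spark handles any rank, solves a small linear system per user/item, and, per its documentation, scales regParam by the number of ratings of each user or item.

```python
import random

def als_rank1(ratings, reg=0.1, n_iter=20, seed=42):
    """Rank-1 ALS sketch for explicit ratings given as (user, item, rating).

    Minimises sum((r - x[u] * y[i]) ** 2) + reg * (sum(x[u] ** 2) + sum(y[i] ** 2))
    by alternating closed-form updates of x (y fixed) and y (x fixed).
    """
    rng = random.Random(seed)
    users = sorted({u for u, _, _ in ratings})
    items = sorted({i for _, i, _ in ratings})
    x = {u: rng.random() for u in users}  # user factors
    y = {i: rng.random() for i in items}  # item factors
    for _ in range(n_iter):
        # Fix y and solve each user's 1-D regularized least-squares problem
        for u in users:
            num = sum(r * y[i] for uu, i, r in ratings if uu == u)
            den = sum(y[i] ** 2 for uu, i, _ in ratings if uu == u) + reg
            x[u] = num / den
        # Fix x and solve each item's 1-D regularized least-squares problem
        for i in items:
            num = sum(r * x[u] for u, ii, r in ratings if ii == i)
            den = sum(x[u] ** 2 for u, ii, _ in ratings if ii == i) + reg
            y[i] = num / den
    return x, y

ratings = [('u1', 'a', 4.0), ('u1', 'b', 2.0), ('u2', 'a', 2.0), ('u2', 'b', 1.0)]
x, y = als_rank1(ratings)
print(round(x['u1'] * y['a'], 2))  # predicted rating for ('u1', 'a')
```

With reg=0.1 the prediction for ('u1', 'a') should land close to, and slightly below, the observed 4.0, because regularization shrinks the factors.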

In [29]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

params = {'rank': 10, 
          'maxIter': 10, 
          'regParam': 0.3}

als = ALS(userCol="user_idn",
          itemCol="business_idn", 
          ratingCol="stars", 
          coldStartStrategy="drop",
          seed=42).setParams(**params)

model = als.fit(df_training)

### Predictions for the training set

- Once the model is trained, make predictions for the training set and use a ***RegressionEvaluator*** to find out the RMSE of the predictions
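RegressionEvaluator's "rmse" metric is the square root of the mean squared difference between labels and predictions. A plain-Python equivalent on made-up values; since the labels here are centered ratings, the RMSE is still measured in stars.

```python
import math

def rmse(labels, predictions):
    """Root mean squared error between two equal-length sequences."""
    if len(labels) != len(predictions) or not labels:
        raise ValueError("need two non-empty sequences of equal length")
    squared = sum((l - p) ** 2 for l, p in zip(labels, predictions))
    return math.sqrt(squared / len(labels))

print(rmse([1.0, -0.5, 0.0], [0.5, -0.5, 1.0]))
```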

In [30]:
predictions = model.transform(df_training)

evaluator = RegressionEvaluator(metricName="rmse",
                                labelCol="stars",
                                predictionCol="prediction")

train_rmse = evaluator.evaluate(predictions)

print(train_rmse)

0.7114699825165288


### Predictions for the test set

- Now, make predictions for the test set and use a ***RegressionEvaluator*** to find out the RMSE of the predictions

In [31]:
predictions = model.transform(df_test)

evaluator = RegressionEvaluator(metricName="rmse",
                                labelCol="stars",
                                predictionCol="prediction")

test_rmse = evaluator.evaluate(predictions)

print(test_rmse)

0.9260549386555614


## Recommendations

Now, your model is trained, but how can you use it to make recommendations for a given user?

### Organizing business data

- It would not make sense to recommend a place the user has already rated, right? So, generate a dictionary where ***user_idn*** is the key and a list of the already rated ***business_idn*** is the value (hint: when aggregating DataFrames, ***collect_list*** is a VERY useful function to turn multiple records into a list)
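In plain Python, collect_list corresponds to appending each business_idn to a per-user list. The sketch below uses made-up IDs and also shows why `dict_visited_by_user[317]` works even though user_idn is stored as a double: Python treats 317 and 317.0 as the same dictionary key.

```python
from collections import defaultdict

# Made-up (user_idn, business_idn) pairs, stored as floats like the indexed columns
pairs = [(317.0, 12.0), (317.0, 40.0), (5.0, 12.0)]

visited_by_user = defaultdict(list)
for user_idn, business_idn in pairs:
    visited_by_user[user_idn].append(business_idn)
visited_by_user = dict(visited_by_user)

# An int key finds the float key because 317 == 317.0 and their hashes match
print(visited_by_user[317])
```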

In [32]:
from pyspark.sql.functions import collect_list

dict_visited_by_user = df_city_reviews.select('user_idn', 'business_idn') \
                                      .groupby('user_idn') \
                                      .agg(collect_list('business_idn')) \
                                      .rdd \
                                      .collectAsMap()

- Besides, a bare business_id does not tell the user much, right? So you need to organize the business data in a form that can be shown to the user.
    - Define a regular Python function that takes one argument ***row*** (Row type) and returns a dictionary where ***business_idn*** is the key and the value is yet another dictionary with relevant fields (for instance: name, address, stars, categories)
    - Transform your business DataFrame into an RDD and apply the function you defined - upon collecting, you will end up with a list of dictionaries
    - Transform this list of dictionaries into a single dictionary

In [33]:
def rest_to_json(row):
    return {row.business_idn: {'name': row.name, 
                               'address': row.address,
                               'stars': row.stars, 
                               'categories': row.categories}}

rest = df_city_restaurants.select('business_idn',
                                  'name',
                                  'address',
                                  'stars',
                                  'categories') \
                          .rdd \
                          .map(rest_to_json) \
                          .collect()

dict_rest = {k: v for d in rest for k, v in d.items()}

### Making recommendations for a user

- To actually make the recommendations, we need to build an input DataFrame to feed the model
    - A DataFrame can be created using the SQL Context and a list of Rows, each containing two columns: user_idn and business_idn - the rating will be computed by the model
    - But you only need to have rows for the businesses which were not yet rated by the user - from all businesses, exclude the ones already rated by him/her

In [34]:
from pyspark.sql import Row
from pyspark.sql.functions import desc

user_idn = 317
n_business = len(dict_rest)

visited = dict_visited_by_user[user_idn]
not_visited = list(set(range(n_business)).difference(set(visited)))

rows_user = [Row(user_idn=user_idn, 
                 business_idn=float(i)) for i in not_visited]

df_test_user = sqlc.createDataFrame(rows_user)

- Now, you can use the generated DataFrame to make predictions
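    - If there are any NA predictions, make sure to turn them into a really bad value (for instance, -5.0) (hint: remember the ***na*** method of DataFrames). With coldStartStrategy="drop", such rows are already dropped by the model, so this mostly matters on Spark versions without that option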
- Order the predictions and take the ***business_idn*** of the top 5
- Finally, use this information to fetch the business data from the dictionary you assembled a couple of steps ago
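The same steps in plain Python on made-up predictions: replace NaN with -5.0, then take the 5 highest-scoring business_idn values. heapq.nlargest returns them best first without sorting the whole list, a local analogue of `orderBy(desc('prediction'))` followed by `take(5)`.

```python
import heapq
import math

FILL_VALUE = -5.0  # "really bad" score for missing predictions

# Made-up (business_idn, prediction) pairs, one of them NaN
predictions = [(3.0, 0.42), (7.0, float('nan')), (9.0, 1.10), (1.0, -0.20),
               (4.0, 0.95), (2.0, 0.31), (8.0, 0.05)]

cleaned = [(idn, FILL_VALUE if math.isnan(p) else p) for idn, p in predictions]
top5 = [idn for idn, _ in heapq.nlargest(5, cleaned, key=lambda t: t[1])]
print(top5)
```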

In [35]:
predictions = model.transform(df_test_user).na.fill(-5.0)

top_predictions = predictions.orderBy(desc('prediction')) \
                             .select('business_idn') \
                             .rdd \
                             .map(lambda row: row.business_idn) \
                             .take(5)

response = list(map(lambda idn: dict_rest[idn], top_predictions))

In [36]:
response

[{'address': '112 St Stephen St',
  'categories': ['Restaurants', 'Chinese'],
  'name': "Karen's Unicorn",
  'stars': 5.0},
 {'address': 'Ogilivie Terrace, Harrison Park',
  'categories': ['Food', 'Cafes', 'Restaurants'],
  'name': 'Zazou Cruises',
  'stars': 4.5},
 {'address': '36 Broughton Street',
  'categories': ['Nightlife', 'Restaurants', 'British', 'Bars'],
  'name': 'Seasons Restaurant & Bar',
  'stars': 5.0},
 {'address': '54 Shore, Leith',
  'categories': ['French', 'Restaurants', 'British'],
  'name': 'Martin Wishart',
  'stars': 5.0},
 {'address': '125 Comiston Road, Morningside',
  'categories': ['Coffee & Tea',
   'Restaurants',
   'Patisserie/Cake Shop',
   'Cafes',
   'Food'],
  'name': 'Marie Délices',
  'stars': 4.5}]

In [37]:
user_idn = 317
rows_user = [Row(user_idn=user_idn)]
df_test_user2 = sqlc.createDataFrame(rows_user)
recommendations = model.recommendForUserSubset(df_test_user2, 5).toPandas()

In [38]:
recommendations.recommendations[0]

[Row(business_idn=301, rating=1.018609881401062),
 Row(business_idn=1239, rating=0.9607487320899963),
 Row(business_idn=1395, rating=0.9601149559020996),
 Row(business_idn=573, rating=0.9247788786888123),
 Row(business_idn=4, rating=0.91269850730896)]
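`recommendForUserSubset` (available from Spark 2.3) ranks every business the model knows, so unlike the manual approach above it can return businesses the user has already rated. One way to handle this, sketched in plain Python with the IDs from the output above (ratings rounded) and hypothetical already-rated IDs, is to request more items than needed and drop the rated ones:

```python
def drop_already_rated(recommendations, rated, k):
    """Keep the first k recommended items the user has not rated yet.

    recommendations: list of (business_idn, rating), best first
    rated: iterable of business_idn values the user already reviewed
    """
    rated = set(rated)
    return [(idn, r) for idn, r in recommendations if idn not in rated][:k]

recs = [(301, 1.02), (1239, 0.96), (1395, 0.96), (573, 0.92), (4, 0.91), (88, 0.90)]
# Hypothetical already-rated businesses (floats, as in dict_visited_by_user)
filtered = drop_already_rated(recs, rated=[1239.0, 4.0], k=3)
print(filtered)
```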

In [39]:
factors = model.userFactors.filter('id == 317').toPandas()
factors.features[0]

[0.06317136436700821,
 0.0064953905530273914,
 0.32015344500541687,
 0.2631630599498749,
 0.48920193314552307,
 0.3880455195903778,
 0.07208795845508575,
 0.5377174019813538,
 0.48443400859832764,
 0.41213491559028625]
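These ten numbers (one per factor, since rank=10) are the user's latent factors; `model.itemFactors` holds the corresponding vectors for the businesses, and ALS predicts a rating as the dot product of the two. A plain-Python sketch with made-up rank-3 factors:

```python
def predict(user_factors, item_factors):
    """ALS-style prediction: dot product of user and item latent factors."""
    if len(user_factors) != len(item_factors):
        raise ValueError("factor vectors must have the same rank")
    return sum(u * v for u, v in zip(user_factors, item_factors))

user = [0.5, -1.0, 2.0]   # made-up rank-3 user factors
item = [2.0, 0.5, 0.25]   # made-up rank-3 item factors
print(predict(user, item))
```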

## Congratulations, you finished the exercise!

In [40]:
#sc.stop()