# Yelp Recommender

## Intro

The purpose of this exercise is to use Spark in a real dataset, instead of just a toy example.

You will use the data from the [Yelp Dataset Challenge](https://www.yelp.de/dataset_challenge), which contains information about businesses, users, reviews and more.

For this exercise, you will need to focus only on the following files:
- yelp_academic_dataset_business.json
- yelp_academic_dataset_review.json

The goal is to build a recommender using [Spark's ALS (Alternating Least Squares)](https://spark.apache.org/docs/2.3.0/ml-collaborative-filtering.html) and then generate recommendations for a given user.

Since the dataset is quite big, you should pick a business category (e.g. Restaurants) and a city (e.g. Edinburgh) and work on the recommender using only this subset of the data.

Please take some time to:
- find out what information you will need to feed as input to Spark's ALS
- check how this information is available in the dataset
- plan how you will tackle this problem

In [None]:
# Download a small version of the Yelp dataset
#!wget https://s3.us-west-2.amazonaws.com/dsr-spark-appliedml/yelp_dataset_small.tar.gz
#!gunzip yelp_dataset_small.tar.gz

In [None]:
from pyspark import SparkContext, SQLContext
sc = pyspark.SparkContext('local[*]')
sqlc = SQLContext(sc)

## Business Data

- Load the file ***yelp_academic_dataset_business.json*** and select the following columns:
    - business_id
    - name
    - city
    - stars
    - categories
    - address

In [None]:
df_business = ...

In [None]:
df_business.take(1)

### Choosing a business category

- Define a regular Python function that takes a list of categories and returns 1 if a category of your choice (for instance, 'Restaurants') is contained in the list of categories or 0 otherwise
- Using the Python function, define a Spark's User Defined Function (UDF) with an IntegerType return
- Using the UDF, filter the businesses that belong to the category you chose

In [None]:
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import IntegerType

def is_restaurant(categories):
    pass

df_restaurants = ...

In [None]:
df_restaurants.take(1)

- The UDF approach works just fine, but there is a more straightforward way to perform the same operation
    - hint: look at ***array_contains*** SQL function

In [None]:
import pyspark.sql.functions as F

# you can overwrite the former df_restaurants
df_restaurants = ...

### Choosing a city
- Having filtered by the business category, now it is time to filter by the city (for instance, Edinburgh)

In [None]:
df_city_restaurants = ...

### Generating numeric IDs
- If you haven't done it yet, take one sample from your already filtered DataFrame and notice that the ***business_id*** contains an alphanumeric value - this is not good for Spark's ALS implementation, which requires IDs for items (in our case, businesses) and users to be numeric
- Use a ***StringIndexer*** to create a new column ***business_idn*** from the conversion of business_id into a numeric value

In [None]:
from pyspark.ml.feature import StringIndexer

df_city_restaurants = ...

In [None]:
df_city_restaurants.take(1)

In [None]:
df_city_restaurants.cache()

## Review Data

- Load the file ***yelp_academic_dataset_review.json*** and select the following columns:
    - user_id
    - business-id
    - stars
    - date

In [None]:
df_reviews = ...

### Keeping reviews for the chosen city only

- You are only interested in reviews of businesses you kept after filtering for category and city - how to filter out everything else? (hint: take a look at the ***join*** operation of DataFrames)

In [None]:
df_city_reviews = ...

### Generating numeric IDs

- As it happened with the ***business_id***, you also need to convert ***user_id*** into a numeric value - once again, use a ***StringIndexer*** to create a new column named ***user_idn*** containing the result of the conversion

In [None]:
df_city_reviews = ...

In [None]:
df_city_reviews.cache()

### Adding a sequential number to the user's reviews

- Now add a ***sequential number*** to the user's reviews, that is, for each user, order his/her reviews by date (multiple reviews on the same date can be randomly ordered) and number them (hint: check ***window functions***)
- This sequential number will be useful later to perform a time-wise split of the dataset

In [None]:
from pyspark.sql import Window

df_city_reviews = ...

### Subsetting reviews to keep only users with more than 4 reviews

- Some users had rated only 1 or a few businesses - this would pose as a problem to make recommendations - so you would want to keep only users who had rated more than 4 reviews, for instance
- Find the ***total number of reviews*** for each user and then filter them using this information (hint: again, you can use a ***window function***)

In [None]:
df_selected = ...

In [None]:
df_selected.cache()

### Calculating mean rating by user

- Now you can calculate the mean rating by user and make it into a dictionary where the key is the ***user_id*** (hint: look at ***rdd*** method of DataFrames and ***collectAsMap*** method of RDDs)

In [None]:
dict_user_means = ...

### Centering rating by user

- The dictionary containing mean ratings by user can be seen as a ***lookup table*** - what is the appropriate way of dealing with those in Spark?
- Once you have figured this out, define a regular Python function that takes two arguments - ***user_id*** (String) and ***rating*** (String, which you will need to convert to float inside the function) - and returns the result of subtracting the mean rating of the user from the rating parameter
- Using the Python function, define a Spark's User Defined Function (UDF) with a DoubleType return
- Using the UDF, create a column in your DataFrame with the centered ratings

In [None]:
from pyspark.sql.types import DoubleType

lookup_user_means = ...

def zero_mean(user_id, rating):
    pass

df_centered = ...

- Once again, the UDF approach is not the most "Sparkonic" way of handling this - can you perform the same operation using only functions from ***pyspark.sql.functions*** (which was imported earlier as F)?
    - hint: you'll need ***Window functions***

In [None]:
# you can overwrite df_centered
df_centered = ...

## Dataset

### Splitting into training and test sets by time

- In recommender systems, it is common practice to do the training/test split timewise, that is, the test set is composed of the latest reviews
- First, filter only those reviews which have a sequential number smaller than the ***total number of reviews***, by user: this is your training set
- Then, filter only those reviews which have a sequential number identical to the ***total number of reviews***, by user: this is your test set
- Now you can see why you had to add a sequential number to the user's reiews - since some users had done all his/her reviews on the same day, you need to disambiguate them to split the dataset. By doing this, you guarantee your test set will have only 1 review for each user.

In [None]:
df_training = ...
df_test = ...

### If using Spark 2.1 (as in the Docker image), you need to filter out "new" businesses in the test set

In [None]:
businesses = df_training.select('business_id').distinct()
df_test = df_test.join(businesses, on='business_id')

## Alternate Least Squares (ALS) Model

- This is the recommender itself - the ALS uses a iterative approach to find the underlying factors that yield the user/item rating matrix
- It takes as input a DataFrame with three columns, representing:
    - userCol: user IDs (numeric - remember the conversion you did)
    - itemCol: item IDs (numeric - remember the conversion you did)
    - ratingCol: rating (numeric, obviously)
    - coldStartStrategy: "drop" (if there is unseen data on the test set, meaning a new user/business, drop it) - ***only available from Spark 2.2 on***
- Its parameters are:
    - rank: the number of factors to consider
    - maxIter: the maximum number of iterations to perform
    - regParam: the regularization parameter
- Use Spark's ALS to fit a model based on your DataFrame

In [None]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

model = ...

### Predictions for the training set

- Once the model is trained, make predictions for the training set and use a ***RegressionEvaluator*** to find out the RMSE of the predictions

In [None]:
predictions = ...

evaluator = ...

train_rmse = ...

print(train_rmse)

### Predictions for the test set

- Now, make predictions for the test set and use a ***RegressionEvaluator*** to find out the RMSE of the predictions

In [None]:
...

print(test_rmse)

## Recommendations

Now, your model is trained, but how can you use it to make recommendations for a given user?

### Organizing business data

- It would not make sense to recommend a place the user has already rated, right? So, generate a dictionary where ***user_idn*** is the key and a list of the already rated ***business_idn*** is the value (hint: when aggregating DataFrames, ***collect_list*** is a VERY useful function to turn multiple records into a list)

In [None]:
from pyspark.sql.functions import collect_list

dict_visited_by_user = ...

- Besides, recommending a given business_id also does not help much, right? So you need to organize the business data in a way it can be shown to the user.
    - Define a regular Python function that takes one argument ***row*** (Row type) and returns a dictionary where ***business_idn*** is the key and the value is yet another dictionary with relevant fields (for instance: name, address, stars, categories)
    - Transform your business DataFrame into an RDD and apply the function you defined - upon collecting, you will end up with a list of dictionaries
    - Transform this list of dictionaries into a single dictionary

In [None]:
def rest_to_json(row):
    pass

rest = ...

dict_rest = {k: v for d in rest for k, v in d.items()}

### Making recommendations for a user

- To actually make the recommendations, we need to build an input DataFrame to feed the model
    - A DataFrame can be created using the SQL Context and a list of Rows, each containg two columns: user_idn and business_idn - the rating will be computed by the model
    - But you only need to have rows for the businesses which were not yet rated by the user - from all businesses, exclude the ones already rated by him/her

In [None]:
from pyspark.sql import Row
from pyspark.sql.functions import desc

user_idn = 317
n_business = len(dict_rest)

visited = ...
not_visited = ...

df_test_user = ...

- Now, you can use the generated DataFrame to make predictions
    - If there are any NA predictions, make sure to turn them into a really bad value (for instance, -5.0) (hint: remember ***na*** method of DataFrames)
- Order the predictions and take the ***business_idn*** of the top 5
- Finally, use this information to fetch the business data from the dictionary you assembled a couple of steps ago

In [None]:
predictions = ...

top_predictions = ...

response = list(map(lambda idn: dict_rest[idn], top_predictions))

In [None]:
response

## Congratulations, you finished the exercise!

In [None]:
sc.stop()