# Yelp Recommender

## Intro

The purpose of this exercise is to use Spark on a real dataset, instead of just a toy example.

You will use the data from the [Yelp Dataset Challenge](https://www.yelp.de/dataset_challenge), which contains information about businesses, users, reviews and more.

For this exercise, you will need to focus only on the following files:
- yelp_academic_dataset_business.json
- yelp_academic_dataset_review.json

The goal is to build a recommender using Spark's ALS (Alternating Least Squares) and then generate recommendations for a given user.

Since the dataset is quite big, you should pick a business category (e.g. Restaurants) and a city (e.g. Edinburgh) and work on the recommender using only this subset of the data.

Please take some time to:
- find out what information you will need to feed as input to Spark's ALS
- check how this information is available in the dataset
- plan how you will tackle this problem

In [3]:
from pyspark import SparkContext, SQLContext
sc = SparkContext('local[*]')
sqlc = SQLContext(sc)

## Business Data

- Load the file ***yelp_academic_dataset_business.json*** and select the following columns:
    - business_id
    - name
    - city
    - stars
    - categories
    - address

In [63]:
df_business = sqlc.read.json('./yelp_academic_dataset_business.json')
df_business = df_business.select(['business_id', 
                   'name',
                   'city',
                   'stars',
                   'categories',
                   'address'])

In [64]:
df_business.take(1)

[Row(business_id='0DI8Dt2PJp07XkVvIElIcQ', name='Innovative Vapors', city='Tempe', stars=4.5, categories=['Tobacco Shops', 'Nightlife', 'Vape Shops', 'Shopping'], address='227 E Baseline Rd, Ste J2')]

In [65]:
# categories = df_business.select('categories')
# categories = []
# for lst in df_business.select('categories'):
#     for cat in lst:
#         if cat not in categories:
#             categories.append(cat)
            
categories_distinct = df_business.select('categories').rdd.flatMap(lambda x: x.categories if x.categories else []).distinct().collect()
categories_distinct.sort()
categories_distinct

['& Probates',
 '3D Printing',
 'ATV Rentals/Tours',
 'Acai Bowls',
 'Accessories',
 'Accountants',
 'Acne Treatment',
 'Active Life',
 'Acupuncture',
 'Addiction Medicine',
 'Adoption Services',
 'Adult',
 'Adult Education',
 'Adult Entertainment',
 'Advertising',
 'Aerial Fitness',
 'Aerial Tours',
 'Afghan',
 'African',
 'Agriturismi',
 'Air Duct Cleaning',
 'Aircraft Dealers',
 'Aircraft Repairs',
 'Airlines',
 'Airport Lounges',
 'Airport Shuttles',
 'Airport Terminals',
 'Airports',
 'Airsoft',
 'Allergists',
 'Alsatian',
 'Amateur Sports Teams',
 'American (New)',
 'American (Traditional)',
 'Amusement Parks',
 'Anesthesiologists',
 'Animal Physical Therapy',
 'Animal Shelters',
 'Antiques',
 'Apartments',
 'Appliances',
 'Appliances & Repair',
 'Appraisal Services',
 'Aquarium Services',
 'Aquariums',
 'Arabian',
 'Arcades',
 'Archery',
 'Architects',
 'Architectural Tours',
 'Argentine',
 'Armenian',
 'Art Classes',
 'Art Galleries',
 'Art Museums',
 'Art Restoration',
 'Art S

In [66]:
cities_distinct = df_business.select('city').rdd.distinct().collect()
cities_distinct.sort()
cities_distinct

[Row(city=''),
 Row(city='AGINCOURT'),
 Row(city='Aberdour'),
 Row(city='Aberlady'),
 Row(city='Ahwahtukee'),
 Row(city='Ahwatukee'),
 Row(city='Ahwatukee Foothills Village'),
 Row(city='Aichwald'),
 Row(city='Ajax'),
 Row(city='Alburg'),
 Row(city='Allegheny'),
 Row(city='Allison Park'),
 Row(city='Ambridge'),
 Row(city='Amherst'),
 Row(city='Anjou'),
 Row(city='Ansnorveldt'),
 Row(city='Anthem'),
 Row(city='Arlington'),
 Row(city='Arnold'),
 Row(city='Aspinwall'),
 Row(city='Auburn'),
 Row(city='Auburn Township'),
 Row(city='Aurora'),
 Row(city='Avalon'),
 Row(city='Avon'),
 Row(city='Avon Lake'),
 Row(city='Avondale'),
 Row(city="Baie-D'urfe"),
 Row(city="Baie-d'Urfé"),
 Row(city='Bainbridge'),
 Row(city='Bainbridge Township'),
 Row(city='Baldwin'),
 Row(city='Balerno'),
 Row(city='Ballantyne'),
 Row(city='Banksville'),
 Row(city='Bath'),
 Row(city='Bathurst Quay'),
 Row(city='Bay Village'),
 Row(city='Beachwood'),
 Row(city='Beaconsfield'),
 Row(city='Bedford'),
 Row(city='Bedford 

### Choosing a business category

- Define a regular Python function that takes a list of categories and returns 1 if a category of your choice (for instance, 'Restaurants') is contained in the list of categories or 0 otherwise
- Using the Python function, define a Spark's User Defined Function (UDF) with an IntegerType return
- Using the UDF, filter the businesses that belong to the category you chose

In [67]:
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import BooleanType

def is_restaurant(categories):
    # Return a boolean (rather than 1/0) so the UDF can be passed straight to filter();
    # businesses without categories (None) are treated as non-restaurants
    return categories is not None and 'Restaurants' in categories

is_restaurant_UDF = UserDefinedFunction(is_restaurant, BooleanType())

df_restaurants = df_business.filter(is_restaurant_UDF(df_business.categories))

In [68]:
df_restaurants.take(1)

[Row(business_id='EDqCEAGXVGCH4FJXgqtjqg', name='Pizza Pizza', city='Toronto', stars=2.5, categories=['Restaurants', 'Pizza', 'Chicken Wings', 'Italian'], address='979 Bloor Street W')]

- The UDF approach works just fine, but there is a more straightforward way to perform the same operation
    - hint: look at ***array_contains*** SQL function

In [69]:
import pyspark.sql.functions as F

# you can overwrite the former df_restaurants
df_restaurants = df_business.where(F.array_contains('categories','Restaurants'))

In [70]:
df_restaurants.take(1)

[Row(business_id='EDqCEAGXVGCH4FJXgqtjqg', name='Pizza Pizza', city='Toronto', stars=2.5, categories=['Restaurants', 'Pizza', 'Chicken Wings', 'Italian'], address='979 Bloor Street W')]

### Choosing a city
- Having filtered by the business category, now it is time to filter by the city (for instance, Edinburgh)

In [76]:
# df_city_restaurants = df_restaurants.where(df_restaurants.city == 'Edinburgh')
df_city_restaurants = df_restaurants.filter('city = "Edinburgh"')

In [77]:
df_city_restaurants.take(1)

[Row(business_id='NsarUMMMPOlMBb6K04x6hw', name='Juice Almighty', city='Edinburgh', stars=4.5, categories=['Food', 'Fast Food', 'Restaurants', 'Juice Bars & Smoothies'], address='7A Castle Street, Corstorphine')]

### Generating numeric IDs
- If you haven't done it yet, take one sample from your already filtered DataFrame and notice that the ***business_id*** contains an alphanumeric value - this is not good for Spark's ALS implementation, which requires IDs for items (in our case, businesses) and users to be numeric
- Use a ***StringIndexer*** to create a new column ***business_idn*** from the conversion of business_id into a numeric value

In [78]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol='business_id', outputCol='business_idn')
model_index = indexer.fit(df_city_restaurants)

df_city_restaurants = model_index.transform(df_city_restaurants)

In [79]:
df_city_restaurants.take(1)

[Row(business_id='NsarUMMMPOlMBb6K04x6hw', name='Juice Almighty', city='Edinburgh', stars=4.5, categories=['Food', 'Fast Food', 'Restaurants', 'Juice Bars & Smoothies'], address='7A Castle Street, Corstorphine', business_idn=24.0)]

In [80]:
df_city_restaurants.cache()

DataFrame[business_id: string, name: string, city: string, stars: double, categories: array<string>, address: string, business_idn: double]
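
To make the conversion more concrete, here is a plain-Python sketch of roughly what ***StringIndexer*** does with its default ordering: labels are ranked by descending frequency and the most frequent one gets index 0.0. The helper `fit_string_index` and the sample IDs are made up for illustration, and the alphabetical tie-break is an assumption (Spark's handling of ties may differ between versions).

```python
from collections import Counter

def fit_string_index(labels):
    """Map each distinct label to a float index, most frequent label first."""
    counts = Counter(labels)
    # Sort by descending count; ties are broken alphabetically (an assumption).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# Hypothetical string IDs standing in for business_id values.
sample_ids = ['b7', 'a3', 'b7', 'c1', 'a3', 'b7']
index = fit_string_index(sample_ids)
indexed = [index[label] for label in sample_ids]
```

The fitted mapping plays the role of `model_index`: it is learned once and can then be applied to any other DataFrame that carries the same IDs.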

## Review Data

- Load the file ***yelp_academic_dataset_review.json*** and select the following columns:
    - user_id
    - business_id
    - stars
    - date

In [82]:
df_reviews = sqlc.read.json('./yelp_academic_dataset_review.json')
df_reviews = df_reviews.select(['user_id',
                                 'business_id',
                                 'stars',
                                 'date'])

In [83]:
df_reviews.take(1)

[Row(user_id='KpkOkG6RIf4Ra25Lhhxf1A', business_id='2aFiy99vNLklCx3T_tGS9A', stars=5, date='2011-10-10')]

### Keeping reviews for the chosen city only

- You are only interested in reviews of the businesses you kept after filtering by category and city - how can you filter out everything else? (hint: take a look at the ***join*** operation of DataFrames)

In [98]:
df_city_reviews = df_city_restaurants.select('business_id').join(df_reviews, on='business_id')
df_city_reviews = model_index.transform(df_city_reviews)

In [99]:
df_city_reviews.show(1)

+--------------------+--------------------+-----+----------+------------+
|         business_id|             user_id|stars|      date|business_idn|
+--------------------+--------------------+-----+----------+------------+
|-3pfhzz9CB7F2DpbF...|VRVCKQhYDCkzaEDce...|    5|2008-07-06|      1208.0|
+--------------------+--------------------+-----+----------+------------+
only showing top 1 row
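
As a rough illustration of why the join works as a filter, the plain-Python sketch below keeps only the reviews whose `business_id` appears in the set of kept businesses. The sample IDs and the variable names are hypothetical.

```python
# Hypothetical sample data: business IDs kept after filtering, and raw reviews.
kept_business_ids = {'b1', 'b3'}
reviews = [
    {'user_id': 'u1', 'business_id': 'b1', 'stars': 5},
    {'user_id': 'u1', 'business_id': 'b2', 'stars': 3},
    {'user_id': 'u2', 'business_id': 'b3', 'stars': 4},
]

# An inner join on business_id keeps only reviews with a matching business.
city_reviews = [r for r in reviews if r['business_id'] in kept_business_ids]
```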



### Generating numeric IDs

- As with the ***business_id***, you also need to convert ***user_id*** into a numeric value - once again, use a ***StringIndexer*** to create a new column named ***user_idn*** containing the result of the conversion

In [100]:
indexer_users = StringIndexer(inputCol='user_id', outputCol='user_idn')
model_index_users = indexer_users.fit(df_city_reviews)

df_city_reviews = model_index_users.transform(df_city_reviews)

In [101]:
df_city_reviews.show(1)

+--------------------+--------------------+-----+----------+------------+--------+
|         business_id|             user_id|stars|      date|business_idn|user_idn|
+--------------------+--------------------+-----+----------+------------+--------+
|-3pfhzz9CB7F2DpbF...|VRVCKQhYDCkzaEDce...|    5|2008-07-06|      1208.0|    63.0|
+--------------------+--------------------+-----+----------+------------+--------+
only showing top 1 row



In [102]:
df_city_reviews.cache()

DataFrame[business_id: string, user_id: string, stars: bigint, date: string, business_idn: double, user_idn: double]

### Adding a sequential number to the user's reviews

- Now add a ***sequential number*** to each user's reviews, that is, for each user, order their reviews by date (multiple reviews on the same date can be ordered arbitrarily) and number them (hint: check ***window functions***)
- This sequential number will be useful later to perform a time-wise split of the dataset

In [None]:
# from pyspark.sql import Window
# from pyspark.sql import functions as F

# w = (Window()
#      .partitionBy('Pclass')
#      .orderBy('Fare')
#      .rowsBetween(-9, 0))

# (train
#  .withColumn('sumFare', F.sum('Fare').over(w))
#  .select('PClass','Fare','sumFare')
#  .show())

In [104]:
from pyspark.sql import Window

# df_city_reviews = ...

w = (Window()
    .partitionBy('user_idn')
    .orderBy('date')
    .rowsBetween(Window.unboundedPreceding, Window.currentRow))

df_city_reviews = df_city_reviews.withColumn('review_number', F.count('stars').over(w))

# OR

# w = (Window()
#     .partitionBy('user_idn')
#     .orderBy('date'))
# df_city_reviews = df_city_reviews.withColumn('review_number', F.row_number().over(w))
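
For intuition, the numbering that the window produces can be sketched in plain Python: sort by user and date, then count within each user. The function `number_reviews` and the sample rows are hypothetical, and the sketch relies on the dates being ISO-formatted strings so that string order matches chronological order.

```python
from itertools import groupby
from operator import itemgetter

def number_reviews(reviews):
    """Return copies of the reviews with a per-user, 1-based review_number."""
    numbered = []
    # Sort by user, then date, so groupby sees each user's reviews in order.
    ordered = sorted(reviews, key=itemgetter('user_idn', 'date'))
    for _, user_reviews in groupby(ordered, key=itemgetter('user_idn')):
        for n, review in enumerate(user_reviews, start=1):
            numbered.append({**review, 'review_number': n})
    return numbered

sample = [
    {'user_idn': 1.0, 'date': '2011-05-02', 'stars': 2},
    {'user_idn': 2.0, 'date': '2010-01-15', 'stars': 4},
    {'user_idn': 1.0, 'date': '2011-04-20', 'stars': 5},
]
numbered = number_reviews(sample)
```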

In [116]:
df_city_reviews.show()

+--------------------+--------------------+-----+----------+------------+--------+-------------+
|         business_id|             user_id|stars|      date|business_idn|user_idn|review_number|
+--------------------+--------------------+-----+----------+------------+--------+-------------+
|sTCI-NaZBkpujXnc-...|MXdFKeqnoMDj180hl...|    5|2011-04-20|       327.0|   299.0|            1|
|2vOHS-_p5ylEf0BGR...|MXdFKeqnoMDj180hl...|    5|2011-04-20|       144.0|   299.0|            2|
|5Iin0tl6QJT_TyL1B...|MXdFKeqnoMDj180hl...|    2|2011-05-02|      1047.0|   299.0|            3|
|xNQj7CamjY9vKp8AC...|MXdFKeqnoMDj180hl...|    4|2011-05-02|       228.0|   299.0|            4|
|E8PkjCmDnDE7_2rYf...|MXdFKeqnoMDj180hl...|    5|2011-05-19|       942.0|   299.0|            5|
|483pQIBo1rxjZqms1...|MXdFKeqnoMDj180hl...|    5|2011-05-25|        51.0|   299.0|            6|
|ckzOUjOGnPKD8NJCc...|MXdFKeqnoMDj180hl...|    3|2011-05-30|       850.0|   299.0|            7|
|B9gUwTS1_Yzj2vajj...|MXdFKeqn

### Subsetting reviews to keep only users with more than 4 reviews

- Some users have rated only one or a few businesses, which makes it hard to generate recommendations for them - so you would want to keep only users who have written more than 4 reviews, for instance
- Find the ***total number of reviews*** for each user and then filter them using this information (hint: again, you can use a ***window function***)

In [158]:
selected_users = df_city_reviews.select('user_idn', 'review_number').groupBy('user_idn') \
                    .max().filter('max(review_number) > 4').select('user_idn')

In [118]:
df_selected = df_city_reviews.join(selected_users, on='user_idn')

In [119]:
df_selected.cache()

DataFrame[user_idn: double, business_id: string, user_id: string, stars: bigint, date: string, business_idn: double, review_number: bigint]
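
The same filter can be sketched in plain Python: count the reviews per user, then keep only the rows of users above the threshold. The helper `keep_active_users`, its `min_reviews` parameter and the sample data are assumptions made for illustration.

```python
from collections import Counter

def keep_active_users(reviews, min_reviews=5):
    """Keep reviews from users who wrote at least min_reviews reviews."""
    totals = Counter(r['user_idn'] for r in reviews)
    return [r for r in reviews if totals[r['user_idn']] >= min_reviews]

# Hypothetical data: user 1.0 has five reviews, user 2.0 has two.
sample = [{'user_idn': 1.0, 'stars': s} for s in (5, 4, 3, 5, 2)]
sample += [{'user_idn': 2.0, 'stars': s} for s in (1, 5)]
selected = keep_active_users(sample)
```

"More than 4 reviews" corresponds to `min_reviews=5` here.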

### Calculating mean rating by user

- Now you can calculate the mean rating by user and make it into a dictionary where the key is the ***user_id*** (hint: look at ***rdd*** method of DataFrames and ***collectAsMap*** method of RDDs)

In [124]:
dict_user_means = df_selected.select('user_id', 'stars').groupBy('user_id').mean().rdd.collectAsMap()

In [125]:
dict_user_means

{'-0MQ4webH2uc1ZAsGsNENg': 4.2,
 '-0wDYXGaz2mrHd6fQUvPHQ': 3.8,
 '-2EcIDIDnA8H7N81jwYpcQ': 3.0,
 '-4dmE_9lhLi7MOWGI72YhA': 3.6,
 '-AMiTsraRXFdNX9yBuzWSQ': 4.222222222222222,
 '-BJTaybCScNgY73ph1TE8Q': 3.8,
 '-EVgopMWXfIOPHktwc0UBA': 3.0,
 '-GwqsPQ2WC-dNkjOyJ_FcA': 3.2,
 '-LP5O6JelPyVS6n2F9JQ7A': 4.0,
 '-TJhZseJMYZjOab0wMOnAQ': 4.0,
 '-YVcxGk8zvn3wlCYVKAgsg': 3.8,
 '-ZBReTMh4DEGjc9XtteCtg': 4.166666666666667,
 '-ZM78Lp6guNOiE6yJNAJHw': 4.5,
 '-ZqPTpx9TyyvdAWbajdilA': 3.6666666666666665,
 '-bT32tHNq7ngqZNA4Tr8jA': 4.0,
 '-dw8f7FLaUmWR7bfJ_Yf0w': 2.8,
 '-kvgG65SqvDO5hxCarrz8Q': 3.2,
 '-y_rXIfDXY4Egh157AJy7g': 4.4,
 '-z9w3extrIRiyyW2zHDpGQ': 4.285714285714286,
 '-zhW9UyMKOMuuydi17x7Ag': 3.5,
 '0-wo7wKN_Rp0Xs6cBMEDJQ': 3.7083333333333335,
 '02YOZ7OLgXj18Jp9Fu-Fkg': 3.090909090909091,
 '03p8zXkWRammSu76kGzb6g': 3.769230769230769,
 '09_FFRjVsCc7pbqG7MxXyA': 2.888888888888889,
 '0CptBiovyRGXJDQxKL-aww': 3.9375,
 '0MeivhX0kZCfV3zMtHtk9Q': 4.0,
 '0Puy6PcLNSBct5uZhCsjBQ': 3.5,
 '0S2KCUzFvnWvlRWaD

### Centering rating by user

- The dictionary containing mean ratings by user can be seen as a ***lookup table*** - what is the appropriate way of dealing with those in Spark?
- Once you have figured this out, define a regular Python function that takes two arguments - ***user_id*** (String) and ***rating*** (stored as an integer in this dataset, so convert it to float inside the function) - and returns the result of subtracting the mean rating of the user from the rating parameter
- Using the Python function, define a Spark's User Defined Function (UDF) with a DoubleType return
- Using the UDF, create a column in your DataFrame with the centered ratings

In [196]:
from pyspark.sql.types import DoubleType
# import numpy as np

lookup_user_means = sc.broadcast(dict_user_means)
# lookup_user_means.value['-z9w3extrIRiyyW2zHDpGQ']

def zero_mean(user_id, rating):
    # Convert explicitly so the result is always a float, matching DoubleType
    return float(rating) - lookup_user_means.value[user_id]

# zero_mean_UDF = UserDefinedFunction(lambda x,y: zero_mean(x,y), DoubleType())
zero_mean_UDF = UserDefinedFunction(zero_mean, DoubleType())

# df_centered = df_selected.withColumn('centered_rating', zero_mean_UDF(df_selected.user_id,df_selected.stars))
df_centered = df_selected.withColumn('centered_rating', zero_mean_UDF('user_id','stars'))
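
Stripped of the Spark machinery, centering amounts to a lookup followed by a subtraction. The plain-Python sketch below uses a hypothetical `center_ratings` helper and made-up ratings. Its `user_means` dictionary plays the same role as `dict_user_means`.

```python
from collections import defaultdict

def center_ratings(reviews):
    """Subtract each user's mean rating from that user's ratings."""
    by_user = defaultdict(list)
    for r in reviews:
        by_user[r['user_id']].append(r['stars'])
    # Lookup table of per-user means, analogous to dict_user_means.
    user_means = {u: sum(s) / len(s) for u, s in by_user.items()}
    return [{**r, 'centered_rating': r['stars'] - user_means[r['user_id']]}
            for r in reviews]

sample = [
    {'user_id': 'u1', 'stars': 5},
    {'user_id': 'u1', 'stars': 3},
    {'user_id': 'u2', 'stars': 4},
]
centered = center_ratings(sample)
```

After centering, each user's ratings average to zero, so a positive value means "better than this user's usual rating".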

In [197]:
df_centered.show(5)

+--------+--------------------+--------------------+-----+----------+------------+-------------+--------------------+
|user_idn|         business_id|             user_id|stars|      date|business_idn|review_number|     centered_rating|
+--------+--------------------+--------------------+-----+----------+------------+-------------+--------------------+
|   299.0|sTCI-NaZBkpujXnc-...|MXdFKeqnoMDj180hl...|    5|2011-04-20|       327.0|            1|   0.666666666666667|
|   299.0|2vOHS-_p5ylEf0BGR...|MXdFKeqnoMDj180hl...|    5|2011-04-20|       144.0|            2|   0.666666666666667|
|   299.0|5Iin0tl6QJT_TyL1B...|MXdFKeqnoMDj180hl...|    2|2011-05-02|      1047.0|            3|  -2.333333333333333|
|   299.0|xNQj7CamjY9vKp8AC...|MXdFKeqnoMDj180hl...|    4|2011-05-02|       228.0|            4|-0.33333333333333304|
|   299.0|E8PkjCmDnDE7_2rYf...|MXdFKeqnoMDj180hl...|    5|2011-05-19|       942.0|            5|   0.666666666666667|
+--------+--------------------+--------------------+----

- Once again, the UDF approach is not the most "Sparkonic" way of handling this - can you perform the same operation using only functions from ***pyspark.sql.functions*** (which was imported earlier as F)?
    - hint: you'll need ***Window functions***

In [207]:
w = Window().partitionBy('user_idn')

# No ordering is specified, so the frame spans each user's whole partition
# and F.mean('stars') is the user's overall mean rating
df_centered = df_selected.withColumn('centered_rating', df_selected.stars - F.mean('stars').over(w))

In [208]:
df_centered.show()

+--------+--------------------+--------------------+-----+----------+------------+-------------+--------------------+
|user_idn|         business_id|             user_id|stars|      date|business_idn|review_number|     centered_rating|
+--------+--------------------+--------------------+-----+----------+------------+-------------+--------------------+
|   299.0|sTCI-NaZBkpujXnc-...|MXdFKeqnoMDj180hl...|    5|2011-04-20|       327.0|            1|   0.666666666666667|
|   299.0|2vOHS-_p5ylEf0BGR...|MXdFKeqnoMDj180hl...|    5|2011-04-20|       144.0|            2|   0.666666666666667|
|   299.0|5Iin0tl6QJT_TyL1B...|MXdFKeqnoMDj180hl...|    2|2011-05-02|      1047.0|            3|  -2.333333333333333|
|   299.0|xNQj7CamjY9vKp8AC...|MXdFKeqnoMDj180hl...|    4|2011-05-02|       228.0|            4|-0.33333333333333304|
|   299.0|E8PkjCmDnDE7_2rYf...|MXdFKeqnoMDj180hl...|    5|2011-05-19|       942.0|            5|   0.666666666666667|
|   299.0|483pQIBo1rxjZqms1...|MXdFKeqnoMDj180hl...|    

## Dataset

### Splitting into training and test sets by time

- In recommender systems, it is common practice to do the training/test split timewise, that is, the test set is composed of the latest reviews
- First, filter only those reviews which have a sequential number smaller than the ***total number of reviews***, by user: this is your training set
- Then, filter only those reviews which have a sequential number identical to the ***total number of reviews***, by user: this is your test set
- Now you can see why you had to add a sequential number to each user's reviews - since some users wrote all of their reviews on the same day, you need to disambiguate them to split the dataset. By doing this, you guarantee your test set will have only 1 review for each user.

In [190]:
training_users = df_centered.select('user_idn', 'review_number').groupBy('user_idn') \
                    .max().select('user_idn', F.col('max(review_number)').alias('max_reviews'))

df_training = df_centered.join(training_users, on='user_idn').filter('review_number < max_reviews')
df_training.show(5)

df_test = df_centered.join(training_users, on='user_idn').filter('review_number == max_reviews')
df_test.show(5)

+--------+--------------------+--------------------+-----+----------+------------+-------------+--------------------+-----------+
|user_idn|         business_id|             user_id|stars|      date|business_idn|review_number|     centered_rating|max_reviews|
+--------+--------------------+--------------------+-----+----------+------------+-------------+--------------------+-----------+
|   299.0|sTCI-NaZBkpujXnc-...|MXdFKeqnoMDj180hl...|    5|2011-04-20|       327.0|            1|   0.666666666666667|         12|
|   299.0|2vOHS-_p5ylEf0BGR...|MXdFKeqnoMDj180hl...|    5|2011-04-20|       144.0|            2|   0.666666666666667|         12|
|   299.0|5Iin0tl6QJT_TyL1B...|MXdFKeqnoMDj180hl...|    2|2011-05-02|      1047.0|            3|  -2.333333333333333|         12|
|   299.0|xNQj7CamjY9vKp8AC...|MXdFKeqnoMDj180hl...|    4|2011-05-02|       228.0|            4|-0.33333333333333304|         12|
|   299.0|E8PkjCmDnDE7_2rYf...|MXdFKeqnoMDj180hl...|    5|2011-05-19|       942.0|        
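
The time-wise split described above can be sketched in plain Python as a "hold out the last review per user" rule. The function `split_last_review` and the sample rows are hypothetical. The logic mirrors the two filters on `review_number` and `max_reviews`.

```python
def split_last_review(reviews):
    """Hold out each user's highest-numbered review as the test set."""
    last = {}
    for r in reviews:
        u = r['user_idn']
        last[u] = max(last.get(u, 0), r['review_number'])
    train = [r for r in reviews if r['review_number'] < last[r['user_idn']]]
    test = [r for r in reviews if r['review_number'] == last[r['user_idn']]]
    return train, test

sample = [
    {'user_idn': 1.0, 'review_number': 1},
    {'user_idn': 1.0, 'review_number': 2},
    {'user_idn': 1.0, 'review_number': 3},
    {'user_idn': 2.0, 'review_number': 1},
    {'user_idn': 2.0, 'review_number': 2},
]
train, test = split_last_review(sample)
```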

### If using Spark 2.1 (as in the Docker image), you need to filter out "new" businesses in the test set

In [191]:
businesses = df_training.select('business_id').distinct()
df_test = df_test.join(businesses, on='business_id')

## Alternating Least Squares (ALS) Model

- This is the recommender itself - ALS uses an iterative approach to find the underlying factors that yield the user/item rating matrix
- It takes as input a DataFrame with three columns, identified by the following arguments:
    - userCol: user IDs (numeric - remember the conversion you did)
    - itemCol: item IDs (numeric - remember the conversion you did)
    - ratingCol: rating (numeric)
- Its parameters are:
    - rank: the number of factors to consider
    - maxIter: the maximum number of iterations to perform
    - regParam: the regularization parameter
    - coldStartStrategy: "drop" (if there is unseen data in the test set, meaning a new user/business, drop it) - ***only available from Spark 2.2 on***
- Use Spark's ALS to fit a model based on your DataFrame
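
For reference, the textbook form of the objective that ALS minimizes is sketched below. The symbols are notation introduced here, not names from the Spark API: $r_{ui}$ is the (centered) rating of user $u$ for item $i$, $\mathcal{K}$ is the set of observed user/item pairs, $x_u$ and $y_i$ are factor vectors of length `rank`, and $\lambda$ corresponds to `regParam`.

$$
\min_{X, Y} \sum_{(u,i) \in \mathcal{K}} \left( r_{ui} - x_u^\top y_i \right)^2 + \lambda \left( \sum_u \lVert x_u \rVert^2 + \sum_i \lVert y_i \rVert^2 \right)
$$

With the item factors $Y$ held fixed, this is an ordinary regularized least squares problem in each $x_u$, and vice versa. The algorithm alternates between the two, up to `maxIter` times. Spark's documentation describes an additional scaling of the regularization term by the number of ratings per user or item, which this simplified form leaves out.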

In [211]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

model = ALS(userCol='user_idn', itemCol='business_idn', ratingCol='centered_rating') \
        .fit(df_training)

### Predictions for the training set

- Once the model is trained, make predictions for the training set and use a ***RegressionEvaluator*** to find out the RMSE of the predictions

In [219]:
predictions = model.transform(df_training)

evaluator = RegressionEvaluator(labelCol='centered_rating')

train_rmse = evaluator.evaluate(predictions)

print(train_rmse)

0.3830311103577157
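
As a reminder of what the number above measures, here is a minimal plain-Python version of the RMSE formula. The function `rmse` and the sample values are illustrative only.

```python
import math

def rmse(labels, predictions):
    """Root mean squared error between two equally long sequences."""
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# Hypothetical centered ratings and model predictions.
score = rmse([0.5, -1.0, 0.0], [0.5, 1.0, 0.0])
```

Because the model was trained on centered ratings, the error is expressed in stars relative to each user's mean.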


### Predictions for the test set

- Now, make predictions for the test set and use a ***RegressionEvaluator*** to find out the RMSE of the predictions

In [220]:
predictions = model.transform(df_test)

evaluator = RegressionEvaluator(labelCol='centered_rating')

test_rmse = evaluator.evaluate(predictions)

print(test_rmse)

1.0112816082377283


## Recommendations

Now, your model is trained, but how can you use it to make recommendations for a given user?

### Organizing business data

- It would not make sense to recommend a place the user has already rated, right? So, generate a dictionary where ***user_idn*** is the key and a list of the already rated ***business_idn*** is the value (hint: when aggregating DataFrames, ***collect_list*** is a VERY useful function to turn multiple records into a list)

In [None]:
from pyspark.sql.functions import collect_list

dict_visited_by_user = ...
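
The target shape of this dictionary can be sketched in plain Python, as below. The helper `visited_by_user` and the sample rows are hypothetical. In the notebook, one possible route (an assumption, not the only option) is `groupBy('user_idn')` with `collect_list('business_idn')`, followed by `.rdd.collectAsMap()` as in the mean-rating step.

```python
from collections import defaultdict

def visited_by_user(reviews):
    """Map each user_idn to the list of business_idn values it has rated."""
    visited = defaultdict(list)
    for r in reviews:
        visited[r['user_idn']].append(r['business_idn'])
    return dict(visited)

sample = [
    {'user_idn': 1.0, 'business_idn': 10.0},
    {'user_idn': 2.0, 'business_idn': 11.0},
    {'user_idn': 1.0, 'business_idn': 12.0},
]
dict_visited_by_user = visited_by_user(sample)
```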

- Besides, recommending a given business_id also does not help much, right? So you need to organize the business data in a way it can be shown to the user.
    - Define a regular Python function that takes one argument ***row*** (Row type) and returns a dictionary where ***business_idn*** is the key and the value is yet another dictionary with relevant fields (for instance: name, address, stars, categories)
    - Transform your business DataFrame into an RDD and apply the function you defined - upon collecting, you will end up with a list of dictionaries
    - Transform this list of dictionaries into a single dictionary

In [None]:
def rest_to_json(row):
    pass

rest = ...

dict_rest = {k: v for d in rest for k, v in d.items()}
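
One possible way to fill in `rest_to_json` is sketched below. It only uses `row['field']` access, which works for Spark Rows as well as for the plain dictionaries used here as stand-ins. The second sample business is made up, and the choice of fields follows the suggestion in the text.

```python
def rest_to_json(row):
    """Turn one business row into {business_idn: {display fields}}."""
    return {
        row['business_idn']: {
            'name': row['name'],
            'address': row['address'],
            'stars': row['stars'],
            'categories': row['categories'],
        }
    }

# Plain dicts stand in for Spark Rows here; both support row['field'] access.
rows = [
    {'business_idn': 24.0, 'name': 'Juice Almighty', 'stars': 4.5,
     'address': '7A Castle Street, Corstorphine',
     'categories': ['Food', 'Juice Bars & Smoothies']},
    {'business_idn': 3.0, 'name': 'Example Cafe', 'stars': 4.0,
     'address': '1 Example Street', 'categories': ['Restaurants']},
]
rest = [rest_to_json(row) for row in rows]
dict_rest = {k: v for d in rest for k, v in d.items()}
```

In the notebook, `rest` would instead come from mapping the function over the RDD of the business DataFrame and collecting the result.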

### Making recommendations for a user

- To actually make the recommendations, we need to build an input DataFrame to feed the model
    - A DataFrame can be created using the SQL Context and a list of Rows, each containing two fields: user_idn and business_idn - the rating will be computed by the model
    - But you only need to have rows for the businesses which were not yet rated by the user - from all businesses, exclude the ones the user has already rated

In [None]:
from pyspark.sql import Row
from pyspark.sql.functions import desc

user_idn = 317
n_business = len(dict_rest)

visited = ...
not_visited = ...

df_test_user = ...
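
The candidate rows can be sketched in plain Python as follows. The helper `candidate_rows` is hypothetical, and it relies on the fact that ***StringIndexer*** assigns indices from 0 up to the number of distinct businesses minus one. In the notebook, each dictionary would become a `Row(user_idn=..., business_idn=...)` passed to `sqlc.createDataFrame`.

```python
def candidate_rows(user_idn, n_business, visited):
    """Build (user_idn, business_idn) rows for businesses not yet rated."""
    seen = set(visited)
    # StringIndexer assigns indices 0 .. n_business - 1.
    not_visited = [idn for idn in range(n_business) if idn not in seen]
    return [{'user_idn': user_idn, 'business_idn': idn} for idn in not_visited]

# Hypothetical user who already rated businesses 1 and 3 out of 5.
rows = candidate_rows(user_idn=317, n_business=5, visited=[1.0, 3.0])
```

The indices stored in the DataFrames are doubles (for example 24.0), but Python treats `1` and `1.0` as equal for set membership, so the exclusion works either way.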

- Now, you can use the generated DataFrame to make predictions
    - If there are any NA predictions, make sure to turn them into a really bad value (for instance, -5.0) (hint: remember ***na*** method of DataFrames)
- Order the predictions and take the ***business_idn*** of the top 5
- Finally, use this information to fetch the business data from the dictionary you assembled a couple of steps ago

In [None]:
predictions = ...

top_predictions = ...

response = list(map(lambda idn: dict_rest[idn], top_predictions))
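
The ranking step can be sketched in plain Python as below: missing predictions are replaced by a very low score, the rest are sorted in descending order, and the first few IDs are kept. The function `top_n`, its parameters and the sample predictions are assumptions for illustration.

```python
import math

def top_n(predictions, n=5, bad_value=-5.0):
    """Return the business_idn values of the n highest predictions."""
    cleaned = [
        (idn, bad_value if math.isnan(score) else score)
        for idn, score in predictions
    ]
    ranked = sorted(cleaned, key=lambda pair: pair[1], reverse=True)
    return [idn for idn, _ in ranked[:n]]

# Hypothetical (business_idn, prediction) pairs; NaN marks a missing prediction.
sample = [(0, 0.2), (2, float('nan')), (4, 1.1), (5, -0.3)]
top_predictions = top_n(sample, n=3)
```

Replacing NaN before sorting matters because comparisons with NaN are always false, which would otherwise leave the order undefined.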

In [None]:
response

## Congratulations, you finished the exercise!

In [None]:
sc.stop()