# Yelp Recommender

## Intro

The purpose of this exercise is to use Spark on a real dataset rather than a toy example.

You will use the data from the [Yelp Dataset Challenge](https://www.yelp.de/dataset_challenge), which contains information about businesses, users, reviews and more.

For this exercise, you will need to focus only on the following files:
- yelp_academic_dataset_business.json
- yelp_academic_dataset_review.json

The goal is to build a recommender using [Spark's ALS (Alternating Least Squares)](https://spark.apache.org/docs/2.3.0/ml-collaborative-filtering.html) and then generate recommendations for a given user.

Since the dataset is quite big, you should pick a business category (e.g. Restaurants) and a city (e.g. Edinburgh) and work on the recommender using only this subset of the data.

Please take some time to:
- find out what information you will need to feed as input to Spark's ALS
- check how this information is available in the dataset
- plan how you will tackle this problem

In [None]:
# Download a small version of the Yelp dataset
#!wget https://s3.us-west-2.amazonaws.com/dsr-spark-appliedml/yelp_dataset_small.tar.gz
#!tar -xvzf yelp_dataset_small.tar.gz

In [1]:
!rm -rf metastore_db/*.lck # deletes locks

import pyspark
sc = pyspark.SparkContext('local[*]')

from pyspark.sql import SQLContext
sqlc = SQLContext(sc)

## Business Data

- Load the file ***yelp_academic_dataset_business.json*** and select the following columns:
    - business_id
    - name
    - city
    - stars
    - categories
    - address

In [2]:
df_business = sqlc.read.format("json").load("yelp_dataset_small/yelp_academic_dataset_business.json")
df_business.show()

+--------------------+--------------------+--------------------+--------------------+----------+--------------------+-------+-------------+--------------+--------------------+-------------------+-----------+------------+-----+-----+--------+
|             address|          attributes|         business_id|          categories|      city|               hours|is_open|     latitude|     longitude|                name|       neighborhood|postal_code|review_count|stars|state|    type|
+--------------------+--------------------+--------------------+--------------------+----------+--------------------+-------+-------------+--------------+--------------------+-------------------+-----------+------------+-----+-----+--------+
|227 E Baseline Rd...|[BikeParking: Tru...|0DI8Dt2PJp07XkVvI...|[Tobacco Shops, N...|     Tempe|[Monday 11:0-21:0...|      0|   33.3782141|   -111.936102|   Innovative Vapors|                   |      85283|          17|  4.5|   AZ|business|
|495 S Grand Centr...|[BusinessA

In [3]:
df_business = df_business.select([c for c in df_business.columns if c in ["business_id", "name", "city", "stars", "categories", "address"]])
df_business.show()

+--------------------+--------------------+--------------------+----------+--------------------+-----+
|             address|         business_id|          categories|      city|                name|stars|
+--------------------+--------------------+--------------------+----------+--------------------+-----+
|227 E Baseline Rd...|0DI8Dt2PJp07XkVvI...|[Tobacco Shops, N...|     Tempe|   Innovative Vapors|  4.5|
|495 S Grand Centr...|LTlCaCGZE14GuaUXU...|[Caterers, Grocer...| Las Vegas|       Cut and Taste|  5.0|
|  979 Bloor Street W|EDqCEAGXVGCH4FJXg...|[Restaurants, Piz...|   Toronto|         Pizza Pizza|  2.5|
|7014 Steubenville...|cnGIivYRLxpF7tBVR...|[Hair Removal, Be...|   Oakdale| Plush Salon and Spa|  4.0|
|   321 Jarvis Street|cdk-qqJ71q6P7TJTw...|[Hotels & Travel,...|   Toronto|         Comfort Inn|  3.0|
|30 Gibson Drive, ...|Q9rsaUiQ-A3NdEAlo...|[Nail Salons, Bea...|   Markham|         A Plus Nail|  2.5|
|10875 N Frankloyd...|Cu4_Fheh7IrzGiK-P...|[Baby Gear & Furn...|Scottsdal

In [4]:
df_business.take(1)

[Row(address='227 E Baseline Rd, Ste J2', business_id='0DI8Dt2PJp07XkVvIElIcQ', categories=['Tobacco Shops', 'Nightlife', 'Vape Shops', 'Shopping'], city='Tempe', name='Innovative Vapors', stars=4.5)]

### Choosing a business category

- Define a regular Python function that takes a list of categories and returns 1 if a category of your choice (for instance, 'Restaurants') is contained in the list of categories or 0 otherwise
- Using the Python function, define a Spark User Defined Function (UDF) with an IntegerType return
- Using the UDF, filter the businesses that belong to the category you chose

In [5]:
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import IntegerType

def is_restaurant(categories):
    return (0 if categories is None or "Restaurants" not in categories else 1)

cat_rest = UserDefinedFunction(lambda x: is_restaurant(x), IntegerType())
df_restaurants = df_business.withColumn("is_restaurant", cat_rest("categories"))
df_restaurants = df_restaurants.filter(df_restaurants.is_restaurant > 0).drop("is_restaurant")
df_restaurants.show()

+--------------------+--------------------+--------------------+------------+--------------------+-----+
|             address|         business_id|          categories|        city|                name|stars|
+--------------------+--------------------+--------------------+------------+--------------------+-----+
|  979 Bloor Street W|EDqCEAGXVGCH4FJXg...|[Restaurants, Piz...|     Toronto|         Pizza Pizza|  2.5|
|11072 No Frank Ll...|GDnbt3isfhd57T1Qq...|[Tex-Mex, Mexican...|  Scottsdale|           Taco Bell|  2.5|
|1500 N Green Vall...|42romV8altAeuZuP2...|[Hawaiian, Restau...|   Henderson|  Ohana Hawaiian BBQ|  4.0|
|1052 Lionel-Dauna...|DNyYOxVAfu0oUcPNL...|[Restaurants, Cafes]|Boucherville|         Chez Lionel|  3.5|
|2000 Mansfield St...|a1Ba6XeIOP48e64YF...|[Sandwiches, Brea...|    Montréal|             La Prep|  4.0|
|123 Front St, Uni...|826djy6K_9Fp0ptqJ...|[Fast Food, Mexic...|     Toronto|Chipotle Mexican ...|  3.5|
|      5646 W Bell Rd|Mi5uhdFB9OJteXPd0...|[Restaurants

In [6]:
df_restaurants.take(1)

[Row(address='979 Bloor Street W', business_id='EDqCEAGXVGCH4FJXgqtjqg', categories=['Restaurants', 'Pizza', 'Chicken Wings', 'Italian'], city='Toronto', name='Pizza Pizza', stars=2.5)]

- The UDF approach works just fine, but there is a more straightforward way to perform the same operation
    - hint: look at ***array_contains*** SQL function

In [7]:
import pyspark.sql.functions as F

# you can overwrite the former df_restaurants
df_restaurants = df_business.filter(F.array_contains(df_business.categories, "Restaurants"))
df_restaurants.show()

+--------------------+--------------------+--------------------+------------+--------------------+-----+
|             address|         business_id|          categories|        city|                name|stars|
+--------------------+--------------------+--------------------+------------+--------------------+-----+
|  979 Bloor Street W|EDqCEAGXVGCH4FJXg...|[Restaurants, Piz...|     Toronto|         Pizza Pizza|  2.5|
|11072 No Frank Ll...|GDnbt3isfhd57T1Qq...|[Tex-Mex, Mexican...|  Scottsdale|           Taco Bell|  2.5|
|1500 N Green Vall...|42romV8altAeuZuP2...|[Hawaiian, Restau...|   Henderson|  Ohana Hawaiian BBQ|  4.0|
|1052 Lionel-Dauna...|DNyYOxVAfu0oUcPNL...|[Restaurants, Cafes]|Boucherville|         Chez Lionel|  3.5|
|2000 Mansfield St...|a1Ba6XeIOP48e64YF...|[Sandwiches, Brea...|    Montréal|             La Prep|  4.0|
|123 Front St, Uni...|826djy6K_9Fp0ptqJ...|[Fast Food, Mexic...|     Toronto|Chipotle Mexican ...|  3.5|
|      5646 W Bell Rd|Mi5uhdFB9OJteXPd0...|[Restaurants

### Choosing a city
- Having filtered by the business category, now it is time to filter by the city (for instance, Edinburgh)

In [8]:
df_city_restaurants = df_restaurants.filter(df_restaurants.city == "Toronto")
df_city_restaurants.show()

+--------------------+--------------------+--------------------+-------+--------------------+-----+
|             address|         business_id|          categories|   city|                name|stars|
+--------------------+--------------------+--------------------+-------+--------------------+-----+
|  979 Bloor Street W|EDqCEAGXVGCH4FJXg...|[Restaurants, Piz...|Toronto|         Pizza Pizza|  2.5|
|123 Front St, Uni...|826djy6K_9Fp0ptqJ...|[Fast Food, Mexic...|Toronto|Chipotle Mexican ...|  3.5|
| 2014 Queen Street E|OLG7Gou8kmTLxogtJ...|[Tex-Mex, Restaur...|Toronto|              Z-Teca|  1.5|
|   2816 Markham Road|L_thK7r3K_h5M4tV7...|[Diners, Italian,...|Toronto|Honey B Hives Res...|  3.5|
|   250 Queens Quay W|6pSUvtk5-OOaJfX0h...|[Restaurants, Jap...|Toronto|       Ichiban Sushi|  3.5|
|       5515 Yonge St|mRv3Z25F56qduMKnv...|[Korean, Restaura...|Toronto|              Kokoya|  3.0|
|  561 Bloor Street W|ODwqVEORhb9YAgo3M...|[Restaurants, Kor...|Toronto|       Yummy Bar-B-Q|  3.5|


### Generating numeric IDs
- If you haven't done so yet, take one sample from your filtered DataFrame and notice that ***business_id*** contains an alphanumeric value. This does not work with Spark's ALS implementation, which requires numeric IDs for both items (in our case, businesses) and users
- Use a ***StringIndexer*** to create a new column ***business_idn*** from the conversion of business_id into a numeric value
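
To build some intuition for what the indexer produces, here is a minimal pure-Python sketch of the idea, not Spark's actual implementation. By default, StringIndexer orders labels by descending frequency, so the most frequent label gets index 0.0. The helper name `fit_string_index` and the toy IDs are hypothetical, and the alphabetical tie-breaking is an assumption made only to keep the sketch deterministic.

```python
from collections import Counter

def fit_string_index(values):
    """Map each distinct string to a float index, most frequent first."""
    counts = Counter(values)
    # Sort by descending frequency; ties are broken alphabetically here,
    # which is an assumption made only to keep this sketch deterministic.
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# Hypothetical toy IDs, not taken from the Yelp dataset
toy_business_ids = ["b_x", "b_y", "b_x", "b_z", "b_x", "b_y"]
toy_mapping = fit_string_index(toy_business_ids)
toy_indexed = [toy_mapping[b] for b in toy_business_ids]

print(toy_mapping)
print(toy_indexed)
```

In the filtered business table each business_id should appear only once, so the resulting numbers there act as arbitrary but unique labels, which is all ALS needs.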

In [9]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer().setInputCol("business_id").setOutputCol("business_idn")
df_city_restaurants = indexer.fit(df_city_restaurants).transform(df_city_restaurants)

In [10]:
df_city_restaurants.take(1)

[Row(address='979 Bloor Street W', business_id='EDqCEAGXVGCH4FJXgqtjqg', categories=['Restaurants', 'Pizza', 'Chicken Wings', 'Italian'], city='Toronto', name='Pizza Pizza', stars=2.5, business_idn=5733.0)]

In [11]:
df_city_restaurants.cache()

DataFrame[address: string, business_id: string, categories: array<string>, city: string, name: string, stars: double, business_idn: double]

In [12]:
df_city_restaurants.show()

+--------------------+--------------------+--------------------+-------+--------------------+-----+------------+
|             address|         business_id|          categories|   city|                name|stars|business_idn|
+--------------------+--------------------+--------------------+-------+--------------------+-----+------------+
|  979 Bloor Street W|EDqCEAGXVGCH4FJXg...|[Restaurants, Piz...|Toronto|         Pizza Pizza|  2.5|      5733.0|
|123 Front St, Uni...|826djy6K_9Fp0ptqJ...|[Fast Food, Mexic...|Toronto|Chipotle Mexican ...|  3.5|       745.0|
| 2014 Queen Street E|OLG7Gou8kmTLxogtJ...|[Tex-Mex, Restaur...|Toronto|              Z-Teca|  1.5|      4897.0|
|   2816 Markham Road|L_thK7r3K_h5M4tV7...|[Diners, Italian,...|Toronto|Honey B Hives Res...|  3.5|      1465.0|
|   250 Queens Quay W|6pSUvtk5-OOaJfX0h...|[Restaurants, Jap...|Toronto|       Ichiban Sushi|  3.5|       406.0|
|       5515 Yonge St|mRv3Z25F56qduMKnv...|[Korean, Restaura...|Toronto|              Kokoya|  3

## Review Data

- Load the file ***yelp_academic_dataset_review.json*** and select the following columns:
    - user_id
    - business_id
    - stars
    - date

In [13]:
df_reviews = sqlc.read.format("json").load("yelp_dataset_small/yelp_academic_dataset_review.json")
df_reviews = df_reviews.select([c for c in df_reviews.columns if c in ["user_id", "business_id", "stars", "date"]])
df_reviews.show()

+--------------------+----------+-----+--------------------+
|         business_id|      date|stars|             user_id|
+--------------------+----------+-----+--------------------+
|2aFiy99vNLklCx3T_...|2011-10-10|    5|KpkOkG6RIf4Ra25Lh...|
|2aFiy99vNLklCx3T_...|2010-12-29|    5|bQ7fQq1otn9hKX-gX...|
|2aFiy99vNLklCx3T_...|2011-04-29|    5|r1NUhdNmL6yU9Bn-Y...|
|2LfIuF3_sX6uwe-IR...|2014-07-14|    5|aW3ix1KNZAvoM8q-W...|
|2LfIuF3_sX6uwe-IR...|2014-01-15|    4|YOo-Cip8HqvKp_p9n...|
|2LfIuF3_sX6uwe-IR...|2013-04-28|    5|bgl3j8yJcRO-00NkU...|
|2LfIuF3_sX6uwe-IR...|2014-10-12|    4|CWKF9de-nskLYEqDD...|
|2LfIuF3_sX6uwe-IR...|2012-09-18|    5|GJ7PTY7huYORFKKg3...|
|2LfIuF3_sX6uwe-IR...|2015-10-11|    5|rxqp9eXZj1jYTn0UI...|
|2LfIuF3_sX6uwe-IR...|2015-04-05|    5|UU0nHQtHPMAfLidk8...|
|2LfIuF3_sX6uwe-IR...|2014-07-08|    1|A_Hyfk3FcwFVIk1CQ...|
|2LfIuF3_sX6uwe-IR...|2014-08-23|    5|OvD92wp0-uuFoGLBy...|
|2LfIuF3_sX6uwe-IR...|2015-01-13|    4|5NDk-q5mv8PIDvz83...|
|2LfIuF3_sX6uwe-IR...|20

### Keeping reviews for the chosen city only

- You are only interested in reviews of businesses you kept after filtering for category and city - how to filter out everything else? (hint: take a look at the ***join*** operation of DataFrames)
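
As a rough illustration of why a join works as a filter here, the following pure-Python sketch mimics an inner join on business_id: reviews whose business is not in the filtered table simply find no match and disappear. The function name `keep_reviews_for_businesses` and the toy records are hypothetical.

```python
def keep_reviews_for_businesses(reviews, businesses):
    """Inner-join reviews with businesses on business_id, keeping the numeric ID."""
    idn_by_business = {b["business_id"]: b["business_idn"] for b in businesses}
    return [
        {**review, "business_idn": idn_by_business[review["business_id"]]}
        for review in reviews
        if review["business_id"] in idn_by_business  # unmatched reviews are dropped
    ]

# Hypothetical toy data: only b1 and b2 survived the category/city filters
toy_businesses = [
    {"business_id": "b1", "business_idn": 0.0},
    {"business_id": "b2", "business_idn": 1.0},
]
toy_reviews = [
    {"user_id": "u1", "business_id": "b1", "stars": 4},
    {"user_id": "u2", "business_id": "b9", "stars": 2},
    {"user_id": "u3", "business_id": "b2", "stars": 5},
]
joined_reviews = keep_reviews_for_businesses(toy_reviews, toy_businesses)
print(joined_reviews)
```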

In [14]:
df1 = df_reviews.alias('df1')
df2 = df_city_restaurants.alias('df2')

df_city_reviews = df1.join(df2, df2.business_id == df1.business_id).select('df1.*', 'df2.business_idn')
df_city_reviews.show()

+--------------------+----------+-----+--------------------+------------+
|         business_id|      date|stars|             user_id|business_idn|
+--------------------+----------+-----+--------------------+------------+
|0BJK4_RQnNiiXJcYP...|2011-01-15|    4|2QhYG5gwEReX7qC_E...|      5273.0|
|0BJK4_RQnNiiXJcYP...|2009-04-21|    5|puKCWAzXbY4QfpMBO...|      5273.0|
|0BJK4_RQnNiiXJcYP...|2011-08-23|    5|MdjsjgFRXf1g5nIxS...|      5273.0|
|0BJK4_RQnNiiXJcYP...|2013-07-05|    2|goyBp2yzRfBRzfRVK...|      5273.0|
|0BJK4_RQnNiiXJcYP...|2010-04-03|    3|X5whw8WcB8dVZGZ5v...|      5273.0|
|0BJK4_RQnNiiXJcYP...|2009-10-16|    5|yt5fggA4287LXCN84...|      5273.0|
|0BJK4_RQnNiiXJcYP...|2013-12-20|    5|ltg5Pgd5x7K7zpYtX...|      5273.0|
|0BJK4_RQnNiiXJcYP...|2011-01-26|    4|d5n7WXQkyESc_mOgy...|      5273.0|
|0BJK4_RQnNiiXJcYP...|2011-08-28|    5|xgHuIzDbuuSxsn9KB...|      5273.0|
|0BJK4_RQnNiiXJcYP...|2008-10-10|    5|rWGA6f5boGNr9ooE-...|      5273.0|
|0BJK4_RQnNiiXJcYP...|2010-09-09|    4

### Generating numeric IDs

- As with ***business_id***, you also need to convert ***user_id*** into a numeric value - once again, use a ***StringIndexer*** to create a new column named ***user_idn*** containing the result of the conversion

In [15]:
indexer = StringIndexer().setInputCol("user_id").setOutputCol("user_idn")
df_city_reviews = indexer.fit(df_city_reviews).transform(df_city_reviews)
df_city_reviews.show()

+--------------------+----------+-----+--------------------+------------+--------+
|         business_id|      date|stars|             user_id|business_idn|user_idn|
+--------------------+----------+-----+--------------------+------------+--------+
|0BJK4_RQnNiiXJcYP...|2011-01-15|    4|2QhYG5gwEReX7qC_E...|      5273.0|  6006.0|
|0BJK4_RQnNiiXJcYP...|2009-04-21|    5|puKCWAzXbY4QfpMBO...|      5273.0|   343.0|
|0BJK4_RQnNiiXJcYP...|2011-08-23|    5|MdjsjgFRXf1g5nIxS...|      5273.0|   284.0|
|0BJK4_RQnNiiXJcYP...|2013-07-05|    2|goyBp2yzRfBRzfRVK...|      5273.0|  8085.0|
|0BJK4_RQnNiiXJcYP...|2010-04-03|    3|X5whw8WcB8dVZGZ5v...|      5273.0|   147.0|
|0BJK4_RQnNiiXJcYP...|2009-10-16|    5|yt5fggA4287LXCN84...|      5273.0| 31452.0|
|0BJK4_RQnNiiXJcYP...|2013-12-20|    5|ltg5Pgd5x7K7zpYtX...|      5273.0|   517.0|
|0BJK4_RQnNiiXJcYP...|2011-01-26|    4|d5n7WXQkyESc_mOgy...|      5273.0|  2434.0|
|0BJK4_RQnNiiXJcYP...|2011-08-28|    5|xgHuIzDbuuSxsn9KB...|      5273.0|  7598.0|
|0BJ

In [16]:
df_city_reviews.cache()

DataFrame[business_id: string, date: string, stars: bigint, user_id: string, business_idn: double, user_idn: double]

### Adding a sequential number to the user's reviews

- Now add a ***sequential number*** to the user's reviews, that is, for each user, order his/her reviews by date (multiple reviews on the same date can be randomly ordered) and number them (hint: check ***window functions***)
- This sequential number will be useful later to perform a time-wise split of the dataset
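
The numbering described above can be sketched in plain Python as "group by user, sort by date, enumerate". This only illustrates the logic a window function would apply per partition; the function name `number_reviews_by_user` and the toy reviews are hypothetical.

```python
from collections import defaultdict

def number_reviews_by_user(reviews):
    """Return copies of the reviews with a per-user sequential number 'num'."""
    by_user = defaultdict(list)
    for review in reviews:
        by_user[review["user_id"]].append(review)

    numbered = []
    for user_reviews in by_user.values():
        # ISO dates (YYYY-MM-DD) sort chronologically as plain strings.
        ordered = sorted(user_reviews, key=lambda r: r["date"])
        for position, review in enumerate(ordered, start=1):
            numbered.append({**review, "num": position})
    return numbered

# Hypothetical toy reviews; u1 has two reviews on the same date
sample_reviews = [
    {"user_id": "u1", "business_id": "b1", "date": "2015-03-01"},
    {"user_id": "u2", "business_id": "b1", "date": "2014-07-09"},
    {"user_id": "u1", "business_id": "b2", "date": "2014-01-20"},
    {"user_id": "u1", "business_id": "b3", "date": "2015-03-01"},
]
numbered_reviews = number_reviews_by_user(sample_reviews)
for row in numbered_reviews:
    print(row["user_id"], row["date"], row["business_id"], row["num"])
```

Reviews sharing a date still receive distinct numbers, which is exactly the property the later time-wise split relies on.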

In [17]:
df_city_reviews = df_city_reviews.orderBy('Date')
df_city_reviews.show()

+--------------------+----------+-----+--------------------+------------+--------+
|         business_id|      date|stars|             user_id|business_idn|user_idn|
+--------------------+----------+-----+--------------------+------------+--------+
|fJETfN5aByPwhhwEA...|2008-01-03|    4|jaE3xXZfU03fpLvRg...|      5543.0| 56376.0|
|2Wff1lKhXu56Rdsf0...|2008-04-13|    5|s7hAGtbGamNEKWzXV...|       611.0| 34183.0|
|JMiaNitMzMbJm6Kh0...|2008-06-09|    5|lhdSQruhLpcnuMzcV...|      5350.0| 55107.0|
|IxDYVpjEeCxZ8HsZn...|2008-06-24|    3|rtbozq32SMKlN2f_c...|      5925.0| 11976.0|
|fD33UMpiCJcWWXmNK...|2008-06-24|    2|rtbozq32SMKlN2f_c...|      3539.0| 11976.0|
|mrwpkN8k5Q-X7J5mG...|2008-07-04|    4|rtbozq32SMKlN2f_c...|      1242.0| 11976.0|
|b-wEkSnjS9VSSymJI...|2008-07-04|    3|rtbozq32SMKlN2f_c...|      3189.0| 11976.0|
|wMiGwn4qPvEN17CaQ...|2008-08-06|    5|MGqAAAAv1bCLybzsA...|      5327.0| 40095.0|
|SCklYa_jhihWAcfIn...|2008-08-06|    5|hGeXKYWkqJgFC1u_S...|      2373.0|  2650.0|
|SAE

In [18]:
from pyspark.sql import Window
from pyspark.sql.functions import row_number

w = Window().partitionBy('user_id').orderBy("date")
df_city_reviews = df_city_reviews.withColumn("num", row_number().over(w))
df_city_reviews.show()

+--------------------+----------+-----+--------------------+------------+--------+---+
|         business_id|      date|stars|             user_id|business_idn|user_idn|num|
+--------------------+----------+-----+--------------------+------------+--------+---+
|478TIlfHXfT3wvww5...|2016-08-17|    3|-4Anvj46CWf57KWI9...|      2844.0| 51622.0|  1|
|MlKNIbEM-JL9WesSd...|2016-03-18|    1|-BUamlG3H-7yqpAl1...|      4218.0| 48514.0|  1|
|exs56JDSWmPWQ3dQO...|2014-09-23|    5|-LR8Z9Cun0VG8Rmju...|      1219.0| 57530.0|  1|
|bo4loBeLwkQqZ7HGr...|2015-05-25|    4|-ZofWirnfYXaa3iCH...|      5740.0| 50584.0|  1|
|ryyptYnhb8n94NnEF...|2015-02-19|    5|-c12zkgP2UdpmO-cu...|      3841.0| 39380.0|  1|
|SNkkuchbVtUzCwyEN...|2016-02-04|    4|-eJct_tyBWSAt_hWE...|      4084.0| 48961.0|  1|
|SWpJG44iX0fw18zYZ...|2013-01-02|    1|-ud2GthMinJlYBn5E...|      4101.0| 47291.0|  1|
|171c2z28JaSYQtWim...|2015-02-21|    1|-xuYnB-HwZIQJVzRX...|      2814.0| 10724.0|  1|
|xfbjRpE3yhRQ9Zw-D...|2015-02-21|    1|-xuY

### Subsetting reviews to keep only users with more than 4 reviews

- Some users have rated only one or a few businesses, which makes it hard to generate recommendations for them - so keep only users who have written more than 4 reviews, for instance
- Find the ***total number of reviews*** for each user and then filter them using this information (hint: again, you can use a ***window function***)
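
A minimal sketch of this filtering step in plain Python, assuming "more than 4" means at least 5 reviews. The function name `keep_active_users` and the toy data are hypothetical; the count attached to every row plays the role of the windowed total.

```python
from collections import Counter

def keep_active_users(reviews, min_reviews=5):
    """Attach each user's total review count and keep users with enough reviews."""
    counts = Counter(r["user_id"] for r in reviews)
    return [
        {**r, "num_reviews": counts[r["user_id"]]}
        for r in reviews
        if counts[r["user_id"]] >= min_reviews
    ]

# Hypothetical toy data: u1 wrote 5 reviews, u2 wrote only 2
toy_user_reviews = (
    [{"user_id": "u1", "business_id": f"b{i}"} for i in range(5)]
    + [{"user_id": "u2", "business_id": f"b{i}"} for i in range(2)]
)
active_reviews = keep_active_users(toy_user_reviews)
print(len(active_reviews), {r["user_id"] for r in active_reviews})
```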

In [24]:
w = Window.partitionBy("user_id")
df_selected = df_city_reviews.select("business_idn", "business_id","date", "stars", "user_id", "user_idn", "num", F.count("user_id").over(w).alias("num_reviews")).dropDuplicates()
df_selected = df_selected.filter(df_selected.num_reviews > 4)
df_selected.show()

+------------+--------------------+----------+-----+--------------------+--------+---+-----------+
|business_idn|         business_id|      date|stars|             user_id|user_idn|num|num_reviews|
+------------+--------------------+----------+-----+--------------------+--------+---+-----------+
|      3241.0|snw9iNNLpFYZeHotW...|2015-10-24|    5|0LUt7xSvYgLpL0zBe...|  6881.0|  1|          7|
|      2943.0|KrwGJBrSE3kN721yx...|2015-10-24|    5|0LUt7xSvYgLpL0zBe...|  6881.0|  2|          7|
|      2290.0|6aOoRA6P_lmUIzFO6...|2015-10-24|    5|0LUt7xSvYgLpL0zBe...|  6881.0|  3|          7|
|        62.0|tIXpNWE6R1yadAThb...|2015-10-24|    5|0LUt7xSvYgLpL0zBe...|  6881.0|  4|          7|
|      3038.0|9N7VAcNFqjHptX3vR...|2015-10-24|    5|0LUt7xSvYgLpL0zBe...|  6881.0|  5|          7|
|      5461.0|O1TvPrgkK2bUo5O5a...|2015-10-24|    5|0LUt7xSvYgLpL0zBe...|  6881.0|  6|          7|
|      5193.0|SjgeuBlgKER9yegpo...|2015-10-24|    5|0LUt7xSvYgLpL0zBe...|  6881.0|  7|          7|
|      533

In [25]:
df_selected.cache()

DataFrame[business_idn: double, business_id: string, date: string, stars: bigint, user_id: string, user_idn: double, num: int, num_reviews: bigint]

### Calculating mean rating by user

- Now you can calculate the mean rating by user and make it into a dictionary where the key is the ***user_id*** (hint: look at ***rdd*** method of DataFrames and ***collectAsMap*** method of RDDs)

In [26]:
df_user_means = df_selected.groupBy("user_id").agg(F.mean('stars'))
rdd_user_means = df_user_means.rdd.map(lambda x : (x[0], x[1]))
dict_user_means = rdd_user_means.collectAsMap()
dict_user_means

{'0LUt7xSvYgLpL0zBezI5QA': 5.0,
 '2Hw061_FYIUKYIILdB_EcQ': 3.4,
 '31nz8urmYS2uogLPWZnx7Q': 3.8,
 '418W7Ufoz3eXSt3A1wsxOg': 4.666666666666667,
 '5YXcnjNQ1PsJK0GznDBH9Q': 3.9523809523809526,
 '6J0mGxAaPuHAPj8ShjOCag': 2.875,
 '86ERj-2NhTAJ2Q8L0DOpyg': 2.8,
 '9mBFC5yp_D3wc5sIeG8smg': 4.083333333333333,
 '9nMkmXkkJvB2oHPs6mXbIA': 3.0,
 'B856VSGCvHAB-mil7JVprA': 3.6666666666666665,
 'BGzavA_ddMr-jGmhArv7fg': 3.595505617977528,
 'B_4OUzr0q1jpOsgi1Wh9vA': 3.9166666666666665,
 'BzT2waPoocFE9Ga9Pwu-sA': 2.8,
 'CY5yCvzJpkgsQUnO-S1Mtw': 4.066666666666666,
 'E65KNtDwqZAhvSmxNWPbMQ': 4.5,
 'FwpdYIL6Y-DjSnkL3cgt2A': 4.6,
 'J-fY-LbrJPzE4pZeF80u5g': 3.25,
 'KC6RQlajTSBY-VrZeme2hA': 3.130434782608696,
 'LR597MAyQFRsSY3C4Lau7g': 4.833333333333333,
 'M7vDDzoPNQDN2FdTcwCq4A': 4.6,
 'NFH6lgwwub14W-sR7m40hA': 3.625,
 'P8WNbVV_iDGkKfBKl0zHmQ': 3.3636363636363638,
 'PxTG5v-gjolwQKviX4gS2g': 4.571428571428571,
 'T4tYddq80sMKSHyYDl1CYw': 3.7142857142857144,
 'T5EbJUbUJfuqHOzeTtK91Q': 3.5454545454545454,
 'VHw5k

### Centering rating by user

- The dictionary containing mean ratings by user can be seen as a ***lookup table*** - what is the appropriate way of dealing with those in Spark?
- Once you have figured this out, define a regular Python function that takes two arguments - ***user_id*** (String) and ***rating*** (String, which you will need to convert to float inside the function) - and returns the result of subtracting the mean rating of the user from the rating parameter
- Using the Python function, define a Spark User Defined Function (UDF) with a DoubleType return
- Using the UDF, create a column in your DataFrame with the centered ratings
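
The centering itself is simple arithmetic: for each review, subtract the mean of all ratings given by the same user. The sketch below shows this in plain Python on made-up ratings; `center_ratings` is a hypothetical helper, not part of the exercise code.

```python
from collections import defaultdict

def center_ratings(ratings):
    """ratings: list of (user_id, stars). Returns (user_id, stars, centered) tuples."""
    totals = defaultdict(lambda: [0.0, 0])  # user_id -> [sum of stars, count]
    for user_id, stars in ratings:
        totals[user_id][0] += stars
        totals[user_id][1] += 1
    means = {user: total / count for user, (total, count) in totals.items()}
    return [(user, stars, stars - means[user]) for user, stars in ratings]

# Hypothetical toy ratings: u1 averages 4.0 stars, u2 averages 2.0 stars
toy_ratings = [("u1", 5), ("u1", 3), ("u1", 4), ("u2", 2), ("u2", 2)]
centered_ratings = center_ratings(toy_ratings)
for row in centered_ratings:
    print(row)
```

A user who always gives the same rating ends up with centered ratings of zero, which matches the rows of 0.0 visible in the outputs of this notebook.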

In [27]:
from pyspark.sql.types import DoubleType

lookup_user_means = sc.broadcast(dict_user_means)

def zero_mean(user_id, rating):
    # cast to float so the function also works if the rating arrives as a String
    return float(rating) - lookup_user_means.value[user_id]

star_cent = UserDefinedFunction(lambda x, y: zero_mean(x, y), DoubleType())

df_centered = df_selected.withColumn("stars_centered", star_cent("user_id", "stars"))
df_centered.show()

+------------+--------------------+----------+-----+--------------------+--------+---+-----------+-------------------+
|business_idn|         business_id|      date|stars|             user_id|user_idn|num|num_reviews|     stars_centered|
+------------+--------------------+----------+-----+--------------------+--------+---+-----------+-------------------+
|      3241.0|snw9iNNLpFYZeHotW...|2015-10-24|    5|0LUt7xSvYgLpL0zBe...|  6881.0|  1|          7|                0.0|
|      2943.0|KrwGJBrSE3kN721yx...|2015-10-24|    5|0LUt7xSvYgLpL0zBe...|  6881.0|  2|          7|                0.0|
|      2290.0|6aOoRA6P_lmUIzFO6...|2015-10-24|    5|0LUt7xSvYgLpL0zBe...|  6881.0|  3|          7|                0.0|
|        62.0|tIXpNWE6R1yadAThb...|2015-10-24|    5|0LUt7xSvYgLpL0zBe...|  6881.0|  4|          7|                0.0|
|      3038.0|9N7VAcNFqjHptX3vR...|2015-10-24|    5|0LUt7xSvYgLpL0zBe...|  6881.0|  5|          7|                0.0|
|      5461.0|O1TvPrgkK2bUo5O5a...|2015-10-24|  

- Once again, the UDF approach is not the most "Sparkonic" way of handling this - can you perform the same operation using only functions from ***pyspark.sql.functions*** (which was imported earlier as F)?
    - hint: you'll need ***Window functions***

In [28]:
# you can overwrite df_centered

w = Window.partitionBy("user_id")
df_centered = df_selected.withColumn("avg_stars", F.mean("stars").over(w))
df_centered = df_centered.withColumn("stars_centered", F.col("stars") - F.col("avg_stars")).drop("avg_stars")
df_centered.show()

+------------+--------------------+----------+-----+--------------------+--------+---+-----------+-------------------+
|business_idn|         business_id|      date|stars|             user_id|user_idn|num|num_reviews|     stars_centered|
+------------+--------------------+----------+-----+--------------------+--------+---+-----------+-------------------+
|      3241.0|snw9iNNLpFYZeHotW...|2015-10-24|    5|0LUt7xSvYgLpL0zBe...|  6881.0|  1|          7|                0.0|
|      2943.0|KrwGJBrSE3kN721yx...|2015-10-24|    5|0LUt7xSvYgLpL0zBe...|  6881.0|  2|          7|                0.0|
|      2290.0|6aOoRA6P_lmUIzFO6...|2015-10-24|    5|0LUt7xSvYgLpL0zBe...|  6881.0|  3|          7|                0.0|
|        62.0|tIXpNWE6R1yadAThb...|2015-10-24|    5|0LUt7xSvYgLpL0zBe...|  6881.0|  4|          7|                0.0|
|      3038.0|9N7VAcNFqjHptX3vR...|2015-10-24|    5|0LUt7xSvYgLpL0zBe...|  6881.0|  5|          7|                0.0|
|      5461.0|O1TvPrgkK2bUo5O5a...|2015-10-24|  

## Dataset

### Splitting into training and test sets by time

- In recommender systems, it is common practice to do the training/test split timewise, that is, the test set is composed of the latest reviews
- First, filter only those reviews which have a sequential number smaller than the ***total number of reviews***, by user: this is your training set
- Then, filter only those reviews which have a sequential number identical to the ***total number of reviews***, by user: this is your test set
- Now you can see why you had to add a sequential number to each user's reviews: since some users wrote all of their reviews on the same day, the date alone cannot identify the latest one. The sequential number breaks such ties and guarantees that your test set has exactly 1 review for each user.
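
The split rule can be sketched in a few lines of plain Python, assuming every row already carries the sequential number and the per-user total (named `num` and `num_reviews` here, as in the outputs of this notebook). The function name `split_last_review` and the toy rows are hypothetical.

```python
def split_last_review(rows):
    """Hold out each user's last review (num == num_reviews) as the test set."""
    training = [r for r in rows if r["num"] < r["num_reviews"]]
    test = [r for r in rows if r["num"] == r["num_reviews"]]
    return training, test

# Hypothetical toy rows: u1 has 3 numbered reviews, u2 has 2
toy_rows = [
    {"user_id": "u1", "num": 1, "num_reviews": 3},
    {"user_id": "u1", "num": 2, "num_reviews": 3},
    {"user_id": "u1", "num": 3, "num_reviews": 3},
    {"user_id": "u2", "num": 1, "num_reviews": 2},
    {"user_id": "u2", "num": 2, "num_reviews": 2},
]
toy_training, toy_test = split_last_review(toy_rows)
print(len(toy_training), len(toy_test))
```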

In [29]:
df_training = df_centered.filter(df_centered.num < df_centered.num_reviews)
df_test = df_centered.filter(df_centered.num == df_centered.num_reviews)

df_training.show()
df_test.show()

+------------+--------------------+----------+-----+--------------------+--------+---+-----------+-------------------+
|business_idn|         business_id|      date|stars|             user_id|user_idn|num|num_reviews|     stars_centered|
+------------+--------------------+----------+-----+--------------------+--------+---+-----------+-------------------+
|      3241.0|snw9iNNLpFYZeHotW...|2015-10-24|    5|0LUt7xSvYgLpL0zBe...|  6881.0|  1|          7|                0.0|
|      2943.0|KrwGJBrSE3kN721yx...|2015-10-24|    5|0LUt7xSvYgLpL0zBe...|  6881.0|  2|          7|                0.0|
|      2290.0|6aOoRA6P_lmUIzFO6...|2015-10-24|    5|0LUt7xSvYgLpL0zBe...|  6881.0|  3|          7|                0.0|
|        62.0|tIXpNWE6R1yadAThb...|2015-10-24|    5|0LUt7xSvYgLpL0zBe...|  6881.0|  4|          7|                0.0|
|      3038.0|9N7VAcNFqjHptX3vR...|2015-10-24|    5|0LUt7xSvYgLpL0zBe...|  6881.0|  5|          7|                0.0|
|      5461.0|O1TvPrgkK2bUo5O5a...|2015-10-24|  

### If using Spark 2.1 (as in the Docker image), you need to filter out "new" businesses in the test set

In [30]:
businesses = df_training.select('business_id').distinct()
df_test = df_test.join(businesses, on='business_id')
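
The reason for this step is that the model learns no factors for a business that never appears in the training set, so predictions for it come out as NaN and would turn the RMSE into NaN as well. A plain-Python sketch of the same filter, with a hypothetical function name and toy rows:

```python
def drop_unseen_items(test_rows, training_rows):
    """Keep only test rows whose business also appears in the training rows."""
    known_businesses = {r["business_id"] for r in training_rows}
    return [r for r in test_rows if r["business_id"] in known_businesses]

# Hypothetical toy rows: b3 appears only in the test set
toy_train_rows = [
    {"user_id": "u1", "business_id": "b1"},
    {"user_id": "u2", "business_id": "b2"},
]
toy_test_rows = [
    {"user_id": "u1", "business_id": "b2"},
    {"user_id": "u2", "business_id": "b3"},
]
filtered_test_rows = drop_unseen_items(toy_test_rows, toy_train_rows)
print(filtered_test_rows)
```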

## Alternating Least Squares (ALS) Model

- This is the recommender itself: ALS uses an iterative approach to find the latent factors that approximate the user/item rating matrix
- It takes as input a DataFrame with three columns, whose names you pass through the following parameters:
    - userCol: user IDs (numeric - remember the conversion you did)
    - itemCol: item IDs (numeric - remember the conversion you did)
    - ratingCol: ratings (numeric)
- Its other parameters are:
    - rank: the number of factors to consider
    - maxIter: the maximum number of iterations to perform
    - regParam: the regularization parameter
    - coldStartStrategy: set it to "drop" to discard predictions for users or businesses that were not seen during training - ***only available from Spark 2.2 on***
- Use Spark's ALS to fit a model based on your DataFrame
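
To make the "alternating" part concrete, here is a deliberately tiny sketch of the idea with a single latent factor (rank 1) in plain Python. It is not Spark's implementation: the function `als_rank1` and the toy ratings are hypothetical, and it uses a plain ridge penalty, whereas Spark's documentation describes scaling the regularization by the number of ratings per user or item. With the item factors fixed, each user factor has a closed-form least-squares solution, and vice versa.

```python
def als_rank1(ratings, n_users, n_items, reg=0.01, iterations=10):
    """Fit rating(u, i) ~ user_f[u] * item_f[i] using only the observed entries."""
    user_f = [1.0] * n_users
    item_f = [1.0] * n_items
    for _ in range(iterations):
        # Fix the item factors and solve for each user factor in closed form.
        for u in range(n_users):
            num = sum(r * item_f[i] for (uu, i, r) in ratings if uu == u)
            den = sum(item_f[i] ** 2 for (uu, i, r) in ratings if uu == u) + reg
            user_f[u] = num / den
        # Fix the user factors and solve for each item factor in closed form.
        for i in range(n_items):
            num = sum(r * user_f[u] for (u, ii, r) in ratings if ii == i)
            den = sum(user_f[u] ** 2 for (u, ii, r) in ratings if ii == i) + reg
            item_f[i] = num / den
    return user_f, item_f

# Toy ratings generated from true factors users=[1, 2, -1], items=[1, 0.5, 1.5];
# the entry (user 2, item 2), whose true value is -1.5, is held out.
toy_observed = [
    (0, 0, 1.0), (0, 1, 0.5), (0, 2, 1.5),
    (1, 0, 2.0), (1, 1, 1.0), (1, 2, 3.0),
    (2, 0, -1.0), (2, 1, -0.5),
]
toy_user_f, toy_item_f = als_rank1(toy_observed, n_users=3, n_items=3)
held_out_prediction = toy_user_f[2] * toy_item_f[2]
print(held_out_prediction)
```

The held-out entry is predicted from factors learned on the other ratings, which is the same mechanism the real model uses to score businesses a user has not rated yet.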

In [42]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

df_training_ = df_training.select(F.col("user_idn").alias("userCol"), F.col("business_idn").alias("itemCol"), F.col("stars_centered").alias("ratingCol"))
df_training_.show()

df_test_ = df_test.select(F.col("user_idn").alias("userCol"), F.col("business_idn").alias("itemCol"), F.col("stars_centered").alias("ratingCol"))
df_test_.show()

rank = 25
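alpha = 1.0  # only used for implicit feedback (implicitPrefs=True); it has no effect on this explicit-ratings model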
numIterations = 10
als = ALS(rank=rank, maxIter=numIterations, alpha=alpha, userCol="userCol", itemCol="itemCol", ratingCol="ratingCol")
als.setRegParam(0.01)
model = als.fit(df_training_)

+-------+-------+-------------------+
|userCol|itemCol|          ratingCol|
+-------+-------+-------------------+
| 6881.0| 3241.0|                0.0|
| 6881.0| 2943.0|                0.0|
| 6881.0| 2290.0|                0.0|
| 6881.0|   62.0|                0.0|
| 6881.0| 3038.0|                0.0|
| 6881.0| 5461.0|                0.0|
| 9872.0| 5338.0|-0.3999999999999999|
| 9872.0| 3295.0|                1.6|
| 9872.0| 1853.0|               -2.4|
| 9872.0| 3826.0| 0.6000000000000001|
|10043.0| 2392.0|-0.7999999999999998|
|10043.0| 5386.0|0.20000000000000018|
|10043.0| 1941.0|0.20000000000000018|
|10043.0| 5944.0|0.20000000000000018|
| 7186.0| 1046.0|0.33333333333333304|
| 7186.0| 4221.0|0.33333333333333304|
| 7186.0|  553.0| -0.666666666666667|
| 7186.0| 2186.0|0.33333333333333304|
| 7186.0| 4191.0|0.33333333333333304|
| 1762.0|  575.0| 1.0476190476190474|
+-------+-------+-------------------+
only showing top 20 rows

+-------+-------+--------------------+
|userCol|itemCol|      

### Predictions for the training set

- Once the model is trained, make predictions for the training set and use a ***RegressionEvaluator*** to find out the RMSE of the predictions
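
For reference, RMSE is the square root of the mean squared difference between labels and predictions. A minimal plain-Python version of the metric, with a hypothetical function name and toy numbers, looks like this:

```python
import math

def rmse(labels, predictions):
    """Root mean squared error between two equally long, non-empty sequences."""
    if len(labels) != len(predictions) or not labels:
        raise ValueError("labels and predictions must be non-empty and the same length")
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# Hypothetical centered ratings and predictions
toy_labels = [1.4, -2.85, 0.5]
toy_predictions = [1.3, -2.8, 0.1]
print(rmse(toy_labels, toy_predictions))
```

Because the ratings were centered per user, the error is expressed in stars relative to each user's own average.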

In [47]:
predictions = model.transform(df_training_)

evaluator = RegressionEvaluator(metricName= "rmse", labelCol = "ratingCol", predictionCol="prediction")

train_rmse = evaluator.evaluate(predictions)

print(train_rmse)
predictions.show()

0.14540615041580532
+-------+-------+--------------------+------------+
|userCol|itemCol|           ratingCol|  prediction|
+-------+-------+--------------------+------------+
| 4487.0|  148.0|                 1.4|   1.3849335|
| 1971.0|  148.0|               -2.85|   -2.786324|
|   72.0|  148.0| -0.4962406015037595|   -0.477958|
| 2366.0|  463.0| -2.6470588235294117|  -2.5478444|
|   53.0|  463.0|  0.2810457516339868| -0.10843566|
|   78.0|  463.0| 0.32061068702290063|   0.6005223|
|  101.0|  463.0|  0.2583333333333333|   0.3644285|
|  115.0|  463.0|  1.5535714285714284|   1.5400193|
|   76.0|  463.0| 0.33587786259541996|    0.577274|
|  908.0|  463.0|  0.6944444444444446|  0.74302477|
|   22.0|  463.0|  -0.291079812206573| -0.31641603|
| 1207.0|  463.0| 0.13793103448275845| 0.024906963|
|  259.0|  463.0|  0.8974358974358974|  0.82274526|
|    1.0|  463.0|-0.44110275689223055|-7.522106E-5|
|   16.0|  463.0|  1.4708333333333332|   1.0596706|
|    3.0|  463.0| 0.44413407821229045|   0.5

### Predictions for the test set

- Now, make predictions for the test set and use a ***RegressionEvaluator*** to find out the RMSE of the predictions

In [48]:
predictions = model.transform(df_test_)

evaluator = RegressionEvaluator(metricName= "rmse", labelCol = "ratingCol", predictionCol="prediction")

test_rmse = evaluator.evaluate(predictions)

print(test_rmse)
predictions.show()

1.2716057810487713
+-------+-------+-------------------+------------+
|userCol|itemCol|          ratingCol|  prediction|
+-------+-------+-------------------+------------+
| 3235.0|  471.0|-1.6153846153846154|  -0.6224477|
|  729.0|  471.0|-0.9523809523809526|  0.50990003|
| 1490.0|  471.0|  0.666666666666667|   0.9149472|
| 7296.0|  471.0|                1.5|  -0.4103583|
| 5443.0|  471.0|               0.25| -0.23649636|
| 2540.0|  471.0|                0.5| -0.68654466|
| 2731.0|  471.0|  0.666666666666667|   -0.707319|
|  670.0|  471.0| 0.8888888888888888|   1.1561122|
| 3990.0| 1342.0| 1.1818181818181817|  0.22711542|
|10458.0| 1342.0|               -2.8|  -0.4874729|
| 3938.0| 1342.0| 1.8181818181818183| -0.27951562|
| 7358.0| 1342.0|0.33333333333333304|-0.096849255|
|10126.0| 1342.0| 0.3999999999999999| -0.19696356|
| 9244.0| 1342.0| 1.2000000000000002|   0.3193489|
| 7790.0| 1342.0| 0.3333333333333335|  0.25785917|
|10437.0| 1342.0| 0.5999999999999996| -0.75702894|
| 5796.0| 15

## Recommendations

Now, your model is trained, but how can you use it to make recommendations for a given user?

### Organizing business data

- It would not make sense to recommend a place the user has already rated, right? So, generate a dictionary where ***user_idn*** is the key and the value is the list of ***business_idn*** values that user has already rated (hint: when aggregating DataFrames, ***collect_list*** is a VERY useful function to turn multiple records into a list)
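
To make the target structure concrete, here is a small plain-Python sketch of the same grouping idea, without Spark. The sample pairs and the helper name `group_businesses_by_user` are hypothetical; in the exercise the pairs come from the ratings DataFrame.

```python
from collections import defaultdict

# Hypothetical (user_idn, business_idn) pairs standing in for DataFrame rows
sample_ratings = [(1.0, 10.0), (2.0, 11.0), (1.0, 12.0), (2.0, 10.0), (3.0, 13.0)]

def group_businesses_by_user(pairs):
    """Collect, for each user, the list of businesses that user has rated."""
    grouped = defaultdict(list)
    for user_idn, business_idn in pairs:
        grouped[user_idn].append(business_idn)
    return dict(grouped)

sample_visited_by_user = group_businesses_by_user(sample_ratings)
print(sample_visited_by_user)
```

Grouping by user and collecting the business ids into a list plays the role of `collect_list` here, and the resulting dictionary has the shape the next steps expect.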

In [55]:
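from pyspark.sql.functions import collect_list

# One row per user, holding the list of businesses that user has rated
df_visited_by_user = df_centered.groupby("user_idn").agg(collect_list("business_idn"))
# Turn each row into a (user_idn, [business_idn, ...]) pair and collect it as a dictionary
rdd_visited_by_user = df_visited_by_user.rdd.map(lambda x: (x[0], x[1]))
dict_visited_by_user = rdd_visited_by_user.collectAsMap()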
dict_visited_by_user

{7782.0: [5640.0, 4960.0, 115.0, 4525.0, 3800.0, 4326.0],
 1761.0: [4709.0,
  244.0,
  2949.0,
  3072.0,
  1839.0,
  5193.0,
  583.0,
  5412.0,
  4575.0,
  384.0,
  521.0,
  474.0,
  4022.0,
  1758.0,
  4670.0,
  851.0,
  4921.0,
  1262.0,
  5271.0,
  1595.0,
  2609.0],
 7115.0: [6103.0, 4602.0, 6228.0, 2482.0, 5664.0, 4739.0, 5660.0],
 1051.0: [2873.0,
  1975.0,
  3590.0,
  2949.0,
  3302.0,
  3784.0,
  4996.0,
  3365.0,
  6122.0,
  3313.0,
  1429.0,
  3166.0,
  5376.0,
  432.0,
  499.0,
  4261.0,
  1425.0,
  794.0,
  3585.0,
  5320.0,
  5005.0,
  4720.0,
  583.0,
  1462.0,
  2758.0,
  4791.0,
  1865.0,
  5252.0,
  5518.0,
  557.0,
  3557.0,
  4960.0],
 2734.0: [1460.0,
  17.0,
  4252.0,
  4390.0,
  783.0,
  321.0,
  1057.0,
  159.0,
  5039.0,
  5280.0,
  5844.0,
  249.0,
  122.0,
  2620.0,
  2785.0],
 8779.0: [168.0, 5054.0, 5198.0, 4095.0, 3955.0],
 3980.0: [6201.0,
  2025.0,
  4567.0,
  2507.0,
  1152.0,
  5965.0,
  5451.0,
  1398.0,
  6111.0,
  344.0,
  2600.0],
 305.0: [3061.0,
 

- Besides, recommending a bare business_id does not help much either, right? So you need to organize the business data in a way that can be shown to the user.
    - Define a regular Python function that takes one argument ***row*** (Row type) and returns a dictionary where ***business_idn*** is the key and the value is yet another dictionary with relevant fields (for instance: name, address, stars, categories)
    - Transform your business DataFrame into an RDD and apply the function you defined - upon collecting, you will end up with a list of dictionaries
    - Transform this list of dictionaries into a single dictionary

In [66]:
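def rest_to_json(row):
    # Map a business Row to {business_idn: {relevant fields}}
    return {row["business_idn"]: {"name": row["name"], "address": row["address"]}}

# One single-entry dictionary per business, collected to the driver
rest = df_city_restaurants.rdd.map(rest_to_json)
rest = rest.collect()

# Merge the list of single-entry dictionaries into one dictionary
dict_rest = {k: v for d in rest for k, v in d.items()}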

In [67]:
dict_rest

{5733.0: {'name': 'Pizza Pizza', 'address': '979 Bloor Street W'},
 745.0: {'name': 'Chipotle Mexican Grill',
  'address': '123 Front St, Unit 103 and 103-A'},
 4897.0: {'name': 'Z-Teca', 'address': '2014 Queen Street E'},
 1465.0: {'name': 'Honey B Hives Restaurant', 'address': '2816 Markham Road'},
 406.0: {'name': 'Ichiban Sushi', 'address': '250 Queens Quay W'},
 3724.0: {'name': 'Kokoya', 'address': '5515 Yonge St'},
 261.0: {'name': 'Yummy Bar-B-Q', 'address': '561 Bloor Street W'},
 3604.0: {'name': 'Kayagum', 'address': '5460 Yonge Street'},
 3485.0: {'name': 'New Spiceland Restaurant', 'address': '6065 Steeles Ave E'},
 5011.0: {'name': 'Brooklynn', 'address': '1186 Queen Street W'},
 1311.0: {'name': 'Pho 88 Viet Thai Cuisine',
  'address': 'Milliken Wellss Shopping Plaza, 250 Alton Tower Circle, Suite C6'},
 6018.0: {'name': 'Ka Chi',
  'address': 'Kensington Market, 8 Saint Andrews Street'},
 248.0: {'name': 'India 360', 'address': '2761 Markham Road'},
 1290.0: {'name': 'S

### Making recommendations for a user

- To actually make the recommendations, we need to build an input DataFrame to feed the model
    - A DataFrame can be created using the SQL context and a list of Rows, each containing two columns: user_idn and business_idn - the rating will be computed by the model
    - But you only need rows for the businesses the user has not rated yet - from all businesses, exclude the ones the user has already rated
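
The exclusion step can be sketched in plain Python as a set difference. This is only an illustration under assumed data: `sample_visited` and `sample_businesses` are hypothetical stand-ins for the two dictionaries built earlier, and `candidate_pairs` is a made-up helper name. In the exercise, each resulting pair would become one Row of the input DataFrame.

```python
# Hypothetical stand-ins for the dictionaries built in the previous steps
sample_visited = {317.0: [10.0, 12.0]}
sample_businesses = {
    10.0: {"name": "A"},
    11.0: {"name": "B"},
    12.0: {"name": "C"},
    13.0: {"name": "D"},
}

def candidate_pairs(user_idn, visited_by_user, businesses):
    """Return (user_idn, business_idn) pairs for businesses the user has not rated."""
    # A user with no ratings at all simply gets every business as a candidate
    visited = set(visited_by_user.get(user_idn, []))
    return [(user_idn, b) for b in sorted(businesses) if b not in visited]

sample_pairs = candidate_pairs(317.0, sample_visited, sample_businesses)
print(sample_pairs)
```

Using a set for the visited businesses keeps each membership check cheap, which matters when a city has thousands of businesses.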

In [None]:
from pyspark.sql import Row
from pyspark.sql.functions import desc

user_idn = 317
n_business = len(dict_rest)

visited = ...
not_visited = ...

df_test_user = ...

- Now, you can use the generated DataFrame to make predictions
    - If there are any NA predictions, make sure to turn them into a really bad value (for instance, -5.0) (hint: remember the ***na*** method of DataFrames)
- Order the predictions and take the ***business_idn*** of the top 5
- Finally, use this information to fetch the business data from the dictionary you assembled a couple of steps ago
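
The three steps above (replace missing predictions, sort, look up the top entries) can be sketched in plain Python as follows. The scores, the business dictionary and the helper name `top_n` are hypothetical; the exercise itself is meant to be solved with DataFrame operations.

```python
import math

def top_n(scored, n=5, bad_value=-5.0):
    """Return the business ids of the n highest predictions.

    scored is a list of (business_idn, prediction) pairs; missing
    predictions (None or NaN) are replaced by bad_value before sorting.
    """
    cleaned = [
        (b, bad_value if p is None or math.isnan(p) else p)
        for b, p in scored
    ]
    cleaned.sort(key=lambda pair: pair[1], reverse=True)
    return [b for b, _ in cleaned[:n]]

# Hypothetical predictions for one user and a matching business dictionary
sample_scored = [(11.0, 0.4), (13.0, float("nan")), (14.0, 1.2), (15.0, -0.3)]
sample_details = {
    11.0: {"name": "B"},
    13.0: {"name": "D"},
    14.0: {"name": "E"},
    15.0: {"name": "F"},
}

sample_top = top_n(sample_scored, n=2)
sample_response = [sample_details[b] for b in sample_top]
print(sample_response)
```

Replacing missing predictions with a very low value, rather than dropping them, keeps those businesses in the result but pushes them to the bottom of the ranking.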

In [None]:
predictions = ...

top_predictions = ...

response = [dict_rest[idn] for idn in top_predictions]

In [None]:
response

## Congratulations, you finished the exercise!

In [None]:
sc.stop()