## 9. Alternating Least Squares (ALS) with Spark ML for a recommendation system

#### Load the data

In [3]:
# create spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate() 
sc = spark.sparkContext

# read json files with Spark
#reference: https://spark.apache.org/docs/latest/sql-data-sources-json.html
businesspath = "yelp_academic_dataset_business.json"
businessdf = spark.read.json(businesspath)
userpath="yelp_academic_dataset_user.json"
userdf = spark.read.json(userpath)
reviewpath="yelp_academic_dataset_review.json"
reviewdf=spark.read.json(reviewpath)

### Step 1: Combine the 3 data files & reduce the dimensions of the business-user-stars matrix
##### ALS performs better on a less sparse matrix, so the goal of this step is to reduce sparsity
      Downsampling steps:
      (1) Keep businesses with at least 30 reviews (review_count >= 30)
      (2) Keep users who have written at least 10 reviews (review_count >= 10)
      (3) Keep the city with the most reviews (Las Vegas)




In [4]:
from pyspark.sql import functions as fn

businessdf = businessdf.select('business_id','review_count','state','city').\
                    where(fn.col('review_count')>=30)
userdf = userdf.select('user_id','review_count').where(fn.col('review_count')>=10)

Join_df = businessdf.select('business_id','city','state').join(reviewdf, on='business_id')

Join_df = userdf.select('user_id').join(Join_df, on='user_id')\
                        .select('business_id','user_id','stars','city','state')

In [5]:
Join_df.groupBy("city").agg(fn.count("city").alias("city_count"))\
       .orderBy("city_count", ascending=False).show(1)

+---------+----------+
|     city|city_count|
+---------+----------+
|Las Vegas|   1651915|
+---------+----------+
only showing top 1 row



In [6]:
Join_df = Join_df.select('business_id','user_id','stars','city').\
                  where(fn.col('city')=='Las Vegas')
Join_df.show(5)

+--------------------+--------------------+-----+---------+
|         business_id|             user_id|stars|     city|
+--------------------+--------------------+-----+---------+
|RhEvP5flF6KoPriMH...|-3bsS2i9xqjNnIA1f...|  4.0|Las Vegas|
|q3dJQtwZQrrurNT-1...|-3bsS2i9xqjNnIA1f...|  1.0|Las Vegas|
|DYuOxkW4DtlJsTHdx...|-3i9bhfvrM3F1wsC9...|  3.0|Las Vegas|
|939j88ceB05Te3D7k...|-3i9bhfvrM3F1wsC9...|  4.0|Las Vegas|
|BLIJ-p5wYuAhw6Pp6...|-3i9bhfvrM3F1wsC9...|  3.0|Las Vegas|
+--------------------+--------------------+-----+---------+
only showing top 5 rows



##### Check the matrix's sparsity after applying the three conditions: 99.96%

In [7]:
%%time
def get_mat_sparsity(df):
    # count the total number of ratings in the dataset
    count_nonzero = df.select("stars").count()
    
    # count the number of distinct user_id and distinct business_id
    total_elements = df.select("user_id").distinct().count()*df.select("business_id").distinct().count()
    
    # Divide the numerator by the denominator
    sparsity = (1.0-(count_nonzero*1.0)/total_elements)*100
    print("The ratings matrix is ", "%.2f" % sparsity + "% sparse.")
    
get_mat_sparsity(Join_df)

The ratings matrix is  99.96% sparse.
CPU times: user 16.1 ms, sys: 10.8 ms, total: 27 ms
Wall time: 1min 38s
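
The sparsity figure is the share of empty cells in the user x business matrix. As a minimal plain-Python sketch of the same arithmetic on made-up toy data (the ids and ratings below are hypothetical, and unlike the Spark function above this version counts distinct user-business pairs rather than rows):

```python
# Toy ratings as (user_id, business_id, stars) triples; all values are made up.
ratings = [
    ("u1", "b1", 5.0),
    ("u1", "b2", 3.0),
    ("u2", "b1", 4.0),
    ("u3", "b3", 2.0),
]

def matrix_sparsity(triples):
    """Return the percentage of empty cells in the user x business matrix."""
    n_users = len({u for u, _, _ in triples})
    n_items = len({b for _, b, _ in triples})
    n_ratings = len({(u, b) for u, b, _ in triples})  # distinct filled cells
    return (1.0 - n_ratings / (n_users * n_items)) * 100

sparsity = matrix_sparsity(ratings)
print("The ratings matrix is %.2f%% sparse." % sparsity)
```

With 3 users, 3 businesses and 4 ratings, 4 of the 9 cells are filled, so the rest count as empty.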


### Step 2: Build the ALS model
#### 1. Split train and test
     The ratio of training to test data is 8:2.
#### 2. The ALS model requires integer ids.
     Use a window function to give each business id and user id a unique integer.
#### 3. Model parameter explanation:
     (1) implicitPrefs=False: the star ratings are explicit feedback, so the model fits the ratings directly instead of treating them as implicit preference signals.
     (2) maxIter: the maximum number of iterations, i.e. how many times Spark alternates between solving for U and P while minimizing the error.
     (3) regParam: the regularization parameter, which keeps ALS from overfitting the data.
     (4) coldStartStrategy='drop' drops rows with NaN predictions (users or businesses unseen in training); nonnegative=True constrains the factors to be nonnegative, which avoids negative predictions.
#### 4. Add param_grid and a train-validation split
     Tune the hyperparameters for the best performance.
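
To make the roles of maxIter and regParam more concrete, here is a rough rank-1 sketch of the alternating idea in plain Python. It is an illustration only, not Spark's implementation: the toy ratings are made up, the names `reg`, `max_iter`, `u` and `p` are hypothetical stand-ins, and Spark's ALS works with higher ranks, distributed blocks and its own regularization scaling.

```python
# Rank-1 ALS on a toy explicit-ratings dictionary: (user, business) -> stars.
ratings = {
    (0, 0): 5.0, (0, 1): 4.0,
    (1, 0): 4.0, (1, 2): 2.0,
    (2, 1): 5.0, (2, 2): 3.0,
}
n_users, n_items = 3, 3
reg = 0.1        # plays the role of regParam
max_iter = 10    # plays the role of maxIter

u = [1.0] * n_users   # user factors (U)
p = [1.0] * n_items   # business factors (P)

def squared_error():
    """Sum of squared errors over the observed ratings only."""
    return sum((r - u[i] * p[j]) ** 2 for (i, j), r in ratings.items())

history = [squared_error()]
for _ in range(max_iter):
    # Fix P and solve the regularized least-squares problem for each user.
    for i in range(n_users):
        num = sum(r * p[j] for (a, j), r in ratings.items() if a == i)
        den = reg + sum(p[j] ** 2 for (a, j), _ in ratings.items() if a == i)
        u[i] = num / den
    # Fix U and solve the same kind of problem for each business.
    for j in range(n_items):
        num = sum(r * u[i] for (i, b), r in ratings.items() if b == j)
        den = reg + sum(u[i] ** 2 for (i, b), _ in ratings.items() if b == j)
        p[j] = num / den
    history.append(squared_error())

print("squared error before/after:", history[0], history[-1])
```

Each half-step has a closed-form solution because one side is held fixed, which is why the method alternates instead of optimizing U and P jointly.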

In [8]:
%%time
ALS_df=Join_df

CPU times: user 4 µs, sys: 3 µs, total: 7 µs
Wall time: 13.4 µs


#### 1. Split train and test (the integer indices are built first; the split itself is done in the Build Model cell)

In [9]:
%%time
from pyspark.sql import functions as fn
from pyspark.sql import window

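# A window with a constant ordering key numbers the rows 1..N; Spark moves all rows
# to a single partition for this, which is acceptable for the distinct-id lists.
w = window.Window().orderBy(fn.lit('A'))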
user_id_idx = ALS_df.select('user_id').distinct().withColumn('user_idx', fn.row_number().over(w))
business_id_idx = ALS_df.select('business_id').distinct().withColumn('business_idx', fn.row_number().over(w))

ALS_df = ALS_df.join(user_id_idx, on='user_id') \
    .join(business_id_idx, on='business_id') 

CPU times: user 7.2 ms, sys: 0 ns, total: 7.2 ms
Wall time: 207 ms


#### 2. The ALS model requires integer ids.
     Give each user_id and business_id a unique integer to replace the string ids.

In [10]:
user_id_idx.show(5)

+--------------------+--------+
|             user_id|user_idx|
+--------------------+--------+
|-3bsS2i9xqjNnIA1f...|       1|
|-3i9bhfvrM3F1wsC9...|       2|
|-47g7LR58tpHlm7Bm...|       3|
|-4xyc3OgPwrLshmqH...|       4|
|-8_yETBp70WiqqN-A...|       5|
+--------------------+--------+
only showing top 5 rows



In [11]:
business_id_idx.show(5)

+--------------------+------------+
|         business_id|business_idx|
+--------------------+------------+
|--9e1ONYQuAa-CB_R...|           1|
|35X1ZV9tSEqB__yJE...|           2|
|_ixV2SWDy7w8jzEAH...|           3|
|oeRLD870Z76FD1OYW...|           4|
|OGQ_6nIn4QQL2U6t0...|           5|
+--------------------+------------+
only showing top 5 rows



In [12]:
ALS_df.show()

+--------------------+--------------------+-----+---------+--------+------------+
|         business_id|             user_id|stars|     city|user_idx|business_idx|
+--------------------+--------------------+-----+---------+--------+------------+
|--9e1ONYQuAa-CB_R...|0y8ORuC2X1i1UF6SG...|  5.0|Las Vegas|      56|           1|
|--9e1ONYQuAa-CB_R...|3qz_dfwbFwTQeDRzy...|  5.0|Las Vegas|     156|           1|
|--9e1ONYQuAa-CB_R...|9spixZHaqC1JeN1ld...|  2.0|Las Vegas|     320|           1|
|--9e1ONYQuAa-CB_R...|A4GnBOU7ZCTcoQK4e...|  5.0|Las Vegas|     330|           1|
|--9e1ONYQuAa-CB_R...|FtUDjNLhVjlIoeFKm...|  4.0|Las Vegas|     487|           1|
|--9e1ONYQuAa-CB_R...|H0tfWQsGjEBuhXD4W...|  5.0|Las Vegas|     519|           1|
|--9e1ONYQuAa-CB_R...|R0KVWeN9xR-F6j4z5...|  4.0|Las Vegas|     819|           1|
|--9e1ONYQuAa-CB_R...|XZaCs-Gs0SXdZgfG3...|  4.0|Las Vegas|     968|           1|
|--9e1ONYQuAa-CB_R...|n9DJHwgYflQ_ms8gB...|  3.0|Las Vegas|    1415|           1|
|--9e1ONYQuAa-CB

##### The ALS model needs only three inputs (user id, business id, rating), so we select those columns as ALS_df1

In [13]:
%%time
ALS_df1 = ALS_df.select(fn.col('user_idx'),fn.col('business_idx'),'stars')
ALS_df1.show(5)

+--------+------------+-----+
|user_idx|business_idx|stars|
+--------+------------+-----+
|      56|           1|  5.0|
|     156|           1|  5.0|
|     320|           1|  2.0|
|     330|           1|  5.0|
|     487|           1|  4.0|
+--------+------------+-----+
only showing top 5 rows

CPU times: user 17.5 ms, sys: 5.07 ms, total: 22.6 ms
Wall time: 1min 25s


In [14]:
ALS_df1.printSchema()

root
 |-- user_idx: integer (nullable = true)
 |-- business_idx: integer (nullable = true)
 |-- stars: double (nullable = true)



#### 3.  Build Model

In [15]:
%%time
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

# Split 80/20 with a fixed seed for reproducibility
(training, test) = ALS_df1.randomSplit([0.8,0.2],seed=2021)

# Build the recommendation model using ALS on the training data
als = ALS(maxIter = 10, regParam=0.01,implicitPrefs=False,
          userCol='user_idx',itemCol='business_idx',ratingCol='stars',
          coldStartStrategy='drop',nonnegative = True)
#(1) maxIter is the maximum number of iterations to run (defaults to 10):
#    ALS alternately fixes U (user matrix) and P (business matrix) to minimize the least-squares error
#(2) regParam specifies the regularization parameter in ALS (defaults to 0.1)
#(3) coldStartStrategy='drop' drops NaN predictions; nonnegative=True avoids negative predictions.

# Fit ALS model to training data
predicted_model = als.fit(training)
# Evaluate the model by computing the RMSE on the test data
predictions = predicted_model.transform(test)
evaluator = RegressionEvaluator(metricName='rmse',labelCol='stars',
                               predictionCol='prediction')
rmse = evaluator.evaluate(predictions)
print("Root_Mean_Square error = " + str(rmse))

Root_Mean_Square error = 1.732490325709468
CPU times: user 55.6 ms, sys: 41.1 ms, total: 96.6 ms
Wall time: 3min 35s
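
For reference, RMSE is the square root of the mean squared difference between actual and predicted stars. The following plain-Python sketch uses made-up pairs and a hypothetical helper `rmse_dropping_nan`; it mimics the effect of coldStartStrategy='drop' by skipping NaN predictions before averaging, and is not the code RegressionEvaluator runs.

```python
import math

# (actual stars, predicted stars); NaN marks a user or business unseen in training.
pairs = [(4.0, 3.5), (2.0, 3.0), (5.0, float("nan")), (3.0, 3.0)]

def rmse_dropping_nan(rows):
    """RMSE over the rows whose prediction is not NaN."""
    kept = [(y, y_hat) for y, y_hat in rows if not math.isnan(y_hat)]
    mse = sum((y - y_hat) ** 2 for y, y_hat in kept) / len(kept)
    return math.sqrt(mse)

rmse_value = rmse_dropping_nan(pairs)
print("RMSE = %.4f" % rmse_value)
```

Without the NaN filter a single cold-start row would turn the whole metric into NaN, which is the practical reason for dropping such rows before evaluation.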


#### 4. Add param_grid and a train-validation split

In [12]:
%%time

param_grid = ParamGridBuilder().addGrid(als.rank,[30,35,40])\
             .addGrid(als.maxIter,[10,15,20])\
             .addGrid(als.regParam,[0.01,0.05,0.1])\
             .build()
# Tuning parameters:
#(1) rank: the rank (number of latent factors) of the U and P matrices
#(2) maxIter: how many times Spark alternates between U and P when minimizing the error
#(3) regParam: the regularization parameter, to prevent ALS from overfitting the data
# Define the evaluator (RMSE)
evaluator = RegressionEvaluator(metricName='rmse',labelCol='stars',
                               predictionCol='prediction')

# Hold out part of the training data for validation with TrainValidationSplit
# (a single train/validation split, not k-fold cross validation)
tvs = TrainValidationSplit(
    estimator = als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator       
)

# Fit ALS model to training data
model = tvs.fit(training)

# Extract the best model found by the grid search
best_model = model.bestModel

# Generate predictions and evaluate using RMSE
predictions2 = best_model.transform(test)
rmse = evaluator.evaluate(predictions2)

# Print evaluation metrics 
print("RMSE = " + str(rmse))
print("***Best Model***")
print("   Rank:",best_model.rank)
print("   maxIter:",best_model._java_obj.parent().getMaxIter())
print("   regParam:",best_model._java_obj.parent().getRegParam())

RMSE = 1.3926480352111519
***Best Model***
   Rank: 40
   maxIter: 20
   regParam: 0.1
CPU times: user 1.98 s, sys: 1.83 s, total: 3.81 s
Wall time: 52min 8s
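
The long wall time is easier to understand once the grid is counted: three values for each of three parameters give 27 candidate settings, and each one requires fitting an ALS model. A small plain-Python sketch of that enumeration (the dictionary layout is only for illustration and is not how ParamGridBuilder stores its param maps):

```python
from itertools import product

# The same candidate values as in the grid above.
ranks = [30, 35, 40]
max_iters = [10, 15, 20]
reg_params = [0.01, 0.05, 0.1]

# One dictionary per combination of (rank, maxIter, regParam).
grid = [
    {"rank": r, "maxIter": m, "regParam": g}
    for r, m, g in product(ranks, max_iters, reg_params)
]
print(len(grid), "candidate settings")
print(grid[0], grid[-1])
```

Since no trainRatio is passed, TrainValidationSplit should fall back to its default of 0.75, so each candidate is fitted on roughly three quarters of `training` and scored on the remainder.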


### Step 3: Results
#### 1. Predictions:
     We can compare the real stars with the predicted stars directly. Although the RMSE is high (about 1.39 on a 1-5 star scale), the predictions still reflect users' preferences to some degree.
#### 2. Recommendations
     Give each user 10 recommendations, ranked by predicted rating.
#### 3. Take user_id = 0y8ORuC2X1i1UF6SG1hlkQ as an example
     Because the original business_id and user_id values are strings rather than integers, we have to convert between the string ids and the integer indices.
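
The conversion between string ids and integer indices can be sketched in plain Python with a forward and a reverse dictionary. The shortened ids below are hypothetical, and in the notebook the same round trip is done with joins against `user_id_idx` and `business_id_idx` rather than dictionaries.

```python
# Hypothetical string ids, shortened for the example; "0y8O" appears twice.
user_ids = ["0y8O", "3qz_", "9spi", "0y8O", "A4Gn"]

# Forward map: string id -> integer index, starting at 1 like row_number().
id_to_idx = {}
for uid in user_ids:
    if uid not in id_to_idx:
        id_to_idx[uid] = len(id_to_idx) + 1

# Reverse map: integer index -> string id, used to read the recommendations.
idx_to_id = {idx: uid for uid, idx in id_to_idx.items()}

encoded = [id_to_idx[uid] for uid in user_ids]
decoded = [idx_to_id[i] for i in encoded]
print(encoded)
```

The model only ever sees the integers; the reverse map is what turns a recommended index back into something a person can look up.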

#### 1. Predictions

In [13]:
predictions_all = best_model.transform(ALS_df1)

In [14]:
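# Note: as defined in this notebook, `predictions` comes from the baseline model; predictions_all holds the tuned model's output
predictions.sort('user_idx','prediction').show(5)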

+--------+------------+-----+----------+
|user_idx|business_idx|stars|prediction|
+--------+------------+-----+----------+
|       2|        4599|  4.0| 2.2479887|
|       2|        2454|  4.0| 2.8458648|
|       2|        4340|  4.0| 3.6926274|
|       2|        8490|  4.0|  4.271734|
|       6|        6320|  3.0| 2.6394508|
+--------+------------+-----+----------+
only showing top 5 rows



#### 2. Recommendations

In [15]:
%%time
# Generate top 10 item recommendations for each user
recommendations = best_model.recommendForAllUsers(10)
recommendations.show(5)

+--------+--------------------+
|user_idx|     recommendations|
+--------+--------------------+
|     463|[[8947, 5.754446]...|
|     471|[[1858, 4.326974]...|
|     496|[[901, 3.9129672]...|
|     833|[[8763, 5.55471],...|
|    1088|[[6931, 5.4667664...|
+--------+--------------------+
only showing top 5 rows

CPU times: user 22 ms, sys: 8.76 ms, total: 30.8 ms
Wall time: 1min 58s


In [16]:
# 'rating' is the field name assigned by the ALS model (not by us); it holds the predicted rating.
recommendations.select("recommendations.business_idx","recommendations.rating")

DataFrame[business_idx: array<int>, rating: array<float>]

In [17]:
%%time
from pyspark.sql.functions import explode,col

recs = recommendations.withColumn("rec_exp",explode("recommendations"))\
           .select("user_idx",col("rec_exp.business_idx"),col("rec_exp.rating"))

CPU times: user 3.51 ms, sys: 3.1 ms, total: 6.62 ms
Wall time: 37.4 ms


In [18]:
recs.limit(11).show()

+--------+------------+---------+
|user_idx|business_idx|   rating|
+--------+------------+---------+
|     463|        8947| 5.754446|
|     463|        8763|5.6977024|
|     463|        7109| 5.681104|
|     463|        8441| 5.653909|
|     463|        8517|5.6490293|
|     463|        1153|5.6353283|
|     463|        2465| 5.630769|
|     463|        9104| 5.629904|
|     463|        2311|5.6221247|
|     463|        1815|5.6167316|
|     471|        1858| 4.326974|
+--------+------------+---------+
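
What `explode` does to the nested recommendations column can be pictured with a plain-Python list comprehension. The rows below are toy values rounded from the output above, and the variable names are hypothetical.

```python
# One row per user; each row holds a list of (business_idx, rating) pairs.
recommendations_toy = [
    (463, [(8947, 5.75), (8763, 5.70)]),
    (471, [(1858, 4.33)]),
]

# "Explode": emit one output row per element of the nested list.
recs_toy = [
    (user_idx, business_idx, rating)
    for user_idx, rec_list in recommendations_toy
    for business_idx, rating in rec_list
]
for row in recs_toy:
    print(row)
```

The flat layout is what makes the later filter on `user_idx` and the join back to `business_id` straightforward.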



#### 3. Take user_id = 0y8ORuC2X1i1UF6SG1hlkQ as an example

In [27]:
# Look up the integer index assigned to this user without collecting every row to pandas
idx = ALS_df.where(fn.col('user_id') == "0y8ORuC2X1i1UF6SG1hlkQ")\
            .select('user_idx').first()['user_idx']
idx

56

In [29]:
# get the 10 recommended businesses and their predicted ratings
business_idx = recs.where(fn.col('user_idx')==idx).select("business_idx","rating")    
business_idx.show()

+------------+---------+
|business_idx|   rating|
+------------+---------+
|        8763|4.9332237|
|        1748| 4.877681|
|        2885| 4.877142|
|        7494|4.8453774|
|        3890|4.8261304|
|           1| 4.785572|
|        6492| 4.616911|
|        4658|4.6011395|
|        2567| 4.600248|
|        7687|4.5992465|
+------------+---------+



In [36]:
business_resc = business_idx.join(business_id_idx,on="business_idx")
business_resc.show()

+------------+---------+--------------------+
|business_idx|   rating|         business_id|
+------------+---------+--------------------+
|           1| 4.785572|--9e1ONYQuAa-CB_R...|
|        3890|4.8261304|a9ncRGtg3iWBpQmhp...|
|        2885| 4.877142|80J9QfTbDYBk88rhd...|
|        1748| 4.877681|-CQokjildrY7UZezX...|
|        4658|4.6011395|oQvw4iamSyIm_Oi0o...|
|        2567| 4.600248|cV95g6PN0RcTy0qDv...|
|        7494|4.8453774|R6CvLpMoDEGQ7QApf...|
|        7687|4.5992465|PmTirgprG7Lp9SH79...|
|        6492| 4.616911|BBbYxS_CC0R8xyGi6...|
|        8763|4.9332237|TBSuWtC0EyH2iDbq5...|
+------------+---------+--------------------+



#### This result shows one of the businesses recommended to user_id = 0y8ORuC2X1i1UF6SG1hlkQ

In [38]:
%%time
# join businessdf to add extra information beyond business_id
## Note: name and categories were not selected earlier, so businessdf only has review_count/state/city.
## Re-running everything from the beginning to add them would take a long time, so they are omitted here.


resc_list = business_resc.join(businessdf,on = "business_id")
resc_list.show(1)

# This shows one of the businesses recommended to user_id = 0y8ORuC2X1i1UF6SG1hlkQ

+--------------------+------------+--------+------------+-----+---------+
|         business_id|business_idx|  rating|review_count|state|     city|
+--------------------+------------+--------+------------+-----+---------+
|--9e1ONYQuAa-CB_R...|           1|4.785572|        1759|   NV|Las Vegas|
+--------------------+------------+--------+------------+-----+---------+
only showing top 1 row

CPU times: user 18.6 ms, sys: 24.9 ms, total: 43.5 ms
Wall time: 2min 39s


### Conclusion for ALS:
#### 1. This is an explicit-feedback problem:
      Explicit feedback: uses explicit ratings (such as rating a business from one to five stars); easier to model, but puts the burden of data collection on the user.
      Implicit feedback: can be collected in large quantities without any extra effort from the user, but is more difficult to work with.
#### 2. Sparse matrix
     Although ALS handles sparse matrices well, the more ratings there are per user and business, the better the performance. In our model, we keep a single city and only the businesses and users whose review counts reach a threshold.
#### 3. Limitations
    (1) It cannot make recommendations for new users who have not given any ratings yet (the cold-start problem).
    (2) Because the categories in the official Yelp data are not clear, this model cannot recommend similar businesses.



