# Import Libraries

In [19]:

from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.types import StructType,StructField,IntegerType
from pyspark.sql.functions import stddev,mean,avg,lit

# Sample data file paths

In [20]:
# file paths
business_file="Sample_Datasets/montreal_business/part-00000-b5f251d0-79e6-47a8-a405-042eb7b7894e-c000.snappy.parquet"
reviews_file="Sample_Datasets/montreal_reviews/part-00000-f0e4463e-0ac9-402e-b995-734cbefc958e-c000.snappy.parquet"
users_file="Sample_Datasets/montreal_users/part-00000-a7d49d78-89a7-478f-a577-0efe02dca047-c000.snappy.parquet"


# Initialize spark session

In [21]:
app_name="Collaborative filtering for restaurant recommendation"

def init_spark():
    spark = SparkSession \
        .builder \
        .appName(app_name) \
        .getOrCreate()
    return spark


# Load Dataset in Apache Spark

In [22]:
spark=init_spark()
business_df = spark.read.parquet(business_file)
reviews_df=spark.read.parquet(reviews_file)
users_df=spark.read.parquet(users_file)

## Selecting required features

In our project, we are only concerned with a subset of columns from the dataset, specifically those that are relevant to our goal of recommending restaurants in Montreal. Therefore, we extract the necessary features from the business_df table, including the id, name, stars, category. 
Similarly, we filter the reviews_df table to include only reviews for the selected restaurants by performing an inner join with business_df.

In [23]:
business_df = business_df.select("business_id","name", "stars", 
                                 "review_count", "address", "city", "state", "postal_code", "longitude", 
                                 "categories", "latitude").withColumnRenamed("stars", "stars_restaurant")
reviews_df = reviews_df.join(business_df, on='business_id', how='inner')

## Preparing Data for ALS: Convert String to index
Prior to initiating the modeling process, it is essential to transform all the relevant columns to integer type for compatibility with the ALS model from pyspark. The columns requiring conversion are the business_id and user_id. We accomplish this by leveraging the StringIndexer function, which we imported from pyspark.ml.feature.

In [25]:
indexer = [StringIndexer(inputCol=column, outputCol=column+"_index") for column in ['business_id', 'user_id']]
pipeline = Pipeline(stages=indexer)
transformed = pipeline.fit(reviews_df).transform(reviews_df)
transformed.select(['business_id', 'user_id','business_id_index', 'user_id_index','stars','categories'])

Py4JJavaError: An error occurred while calling o736.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 111.0 failed 1 times, most recent failure: Lost task 0.0 in stage 111.0 (TID 454) (192.168.0.134 executor driver): TaskResultLost (result lost from block manager)
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2293)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:424)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:345)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:373)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:345)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3868)
	at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:3120)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3858)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3856)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3856)
	at org.apache.spark.sql.Dataset.collect(Dataset.scala:3120)
	at org.apache.spark.ml.feature.StringIndexer.countByValue(StringIndexer.scala:204)
	at org.apache.spark.ml.feature.StringIndexer.sortByFreq(StringIndexer.scala:212)
	at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:242)
	at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:145)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)


## Spliting the dataset into training and testing subsets 

Setting a seed value as 3 to make randomsplit output deterministic

In [7]:
(training, test) = transformed.randomSplit([0.8, 0.2],3)

In [8]:
# Compute the global mean
average = training.select('user_id','stars').withColumn("user_id",lit(1)).groupBy('user_id').mean()
global_average = average.select('avg(stars)').collect()[0][0]

print("Global Average:",str(global_average))
global_mean = training.agg(avg('stars')).collect()[0][0]
print("Global Average:",str(global_mean))


Global Average: 3.8922836512555232


# Prediction using global average for comparison

In [9]:
#Evaluate the root mean squared error if we use the global average as a prediction for all rating, Our model should perform better than this


test_avg = test.withColumn('prediction',lit(global_mean))
    
evaluator = RegressionEvaluator(metricName="rmse", labelCol="stars",predictionCol="prediction")
rmse = evaluator.evaluate(test_avg)

print("Root-mean-square error = " + str(rmse))


Root-mean-square error = 1.2260849010325352


## Basic ALS model
The Apache Spark library provides various parameters for the ALS (Alternating Least Squares) algorithm, including:

- rank: the number of latent factors used in the model (default value: 10).
- maxIter: the maximum number of iterations to run (default value: 10).
- regParam: the regularization parameter used in ALS (default value: 1.0).
- implicitPrefs: a boolean value that indicates whether to use the explicit feedback ALS variant or the one adapted for implicit feedback data (default value: false, which means using explicit feedback).
- alpha: a parameter that applies to the implicit feedback variant of ALS, determining the baseline confidence in preference observations (default value: 1.0).
- nonnegative: a boolean value that specifies whether to use nonnegative constraints for least squares (default value: false).

In [10]:

#ALS model with default values
als=ALS(userCol="user_id_index",itemCol="business_id_index",ratingCol="stars",coldStartStrategy="drop",nonnegative=True)
als.setSeed(2)

ALS_262e2c7ef4d1

In [11]:
#train the default model
model=als.fit(training)

In [12]:
# predict the stars for test set 
predictions=model.transform(test)

In [13]:
#evaludate default model
evaluator=RegressionEvaluator(metricName="rmse",labelCol="stars",predictionCol="prediction")
rmse=evaluator.evaluate(predictions)
print("RMSE="+str(rmse))

RMSE=1.3534108396501865


## Tuning Hyper Parameters

In [14]:
ranks=[5,30,50,100]
for rank in ranks:
    als = ALS(rank=rank,userCol="user_id_index",itemCol="business_id_index",ratingCol="stars",coldStartStrategy="drop",nonnegative=True)
    als.setSeed(2)
    model = als.fit(training)
    evaluator=RegressionEvaluator(metricName="rmse",labelCol="stars",predictionCol="prediction")
    predictions=model.transform(test)
    rmse=evaluator.evaluate(predictions)
    print("RMSE with latent factor "+str(rank) +" is="+str(rmse))
    

RMSE with latent factor 5 is=1.3524617852548415
RMSE with latent factor 30 is=1.3238220621391332
RMSE with latent factor 50 is=1.3182668564134439
RMSE with latent factor 100 is=1.3092697928682182


In [15]:
epcohs=[10,15,20]
for epcoh in epcohs:
    als = ALS(rank=100,maxIter=epcoh,userCol="user_id_index",itemCol="business_id_index",ratingCol="stars",coldStartStrategy="drop",nonnegative=True)
    als.setSeed(2)
    model = als.fit(training)
    evaluator=RegressionEvaluator(metricName="rmse",labelCol="stars",predictionCol="prediction")
    predictions=model.transform(test)
    rmse=evaluator.evaluate(predictions)
    print("RMSE with latent factor 100 and maxIter "+str(epcoh) +" is="+str(rmse))
    

RMSE with latent factor 100 and maxIter 10 is=1.309269792868218
RMSE with latent factor 100 and maxIter 15 is=1.2994357989444387
RMSE with latent factor 100 and maxIter 20 is=1.2990339815403404


ALS Model With Bias

In [16]:
# Compute user mean and item mean on the training set
user_mean = training.groupBy('user_id_index').agg(avg('stars').alias('user_mean'))
item_mean = training.groupBy('business_id_index').agg(avg('stars').alias('item_mean'))
#remove bias from training set
interactions = training.join(user_mean, 'user_id_index').join(item_mean, 'business_id_index')
interactions = interactions.withColumn('user_item_interaction', col('stars') - col('user_mean') - col('item_mean') + global_mean)
interactions.select('stars','user_mean','item_mean','user_item_interaction').show(10)

+-----+------------------+------------------+---------------------+
|stars|         user_mean|         item_mean|user_item_interaction|
+-----+------------------+------------------+---------------------+
|  4.0| 3.652173913043478|3.6923076923076925|   0.5478020459043527|
|  5.0|               4.0| 4.415384615384616|  0.47689903587090754|
|  4.0|3.6666666666666665| 4.068493150684931|  0.15712383390392537|
|  2.0|3.3684210526315788|               3.5|  -0.9761374013760555|
|  3.0|               3.0| 3.933333333333333|  -0.0410496820778099|
|  1.0|               1.0|               4.0| -0.10771634874447678|
|  5.0|               4.0|               4.7|  0.19228365125552305|
|  5.0| 3.652173913043478|3.5555555555555554|   1.6845541826564898|
|  1.0|               1.0| 3.604651162790698|  0.28763248846482536|
|  4.0|               4.0|   4.0817843866171| -0.18950073536157674|
+-----+------------------+------------------+---------------------+
only showing top 10 rows



In [17]:
evaluator = RegressionEvaluator(metricName='rmse', labelCol='stars', predictionCol='user_item_interaction_recomputed')

ranks=[5,10,30,50,100]
for rank in ranks:
    als = ALS(maxIter=20,rank=rank,userCol="user_id_index",itemCol="business_id_index",ratingCol="user_item_interaction",coldStartStrategy="drop",nonnegative=True)
    als.setSeed(2)
    model = als.fit(interactions)
    predictions=model.transform(test).withColumnRenamed('prediction','predicted_rating')
    predictions=  predictions.join(user_mean, 'user_id_index').join(item_mean, 'business_id_index')
    predictions = predictions.withColumn('user_item_interaction_recomputed', col('predicted_rating') + col('user_mean') + col('item_mean') - global_mean)
    rmse=evaluator.evaluate(predictions)
    print("RMSE with latent factor "+str(rank) +" is="+str(rmse))

RMSE with latent factor 5 is=1.18701500128988
RMSE with latent factor 10 is=1.1856944430863605
RMSE with latent factor 30 is=1.1831676942255596
RMSE with latent factor 50 is=1.182823374677896
RMSE with latent factor 100 is=1.1813151492298182


# Final model 

In [18]:
als = ALS(maxIter=20,rank=100,userCol="user_id_index",itemCol="business_id_index",ratingCol="user_item_interaction",coldStartStrategy="drop",nonnegative=True)
als.setSeed(2)
model = als.fit(interactions)
predictions=model.transform(test).withColumnRenamed('prediction','predicted_rating')
predictions=  predictions.join(user_mean, 'user_id_index').join(item_mean, 'business_id_index')
predictions = predictions.withColumn('user_item_interaction_recomputed', col('predicted_rating') + col('user_mean') + col('item_mean') - global_mean)
  

Evaluation

In [19]:
evaluator = RegressionEvaluator(metricName='rmse', labelCol='stars', predictionCol='user_item_interaction_recomputed')
rmse=evaluator.evaluate(predictions)
print("RMSE is="+str(rmse))

RMSE is=1.1813151492298182


In [20]:
std_dev = predictions.select(stddev('user_item_interaction_recomputed')).collect()[0][0]
print('Standard deviation of predictions:', std_dev)

std_dev_stars = predictions.select(stddev('stars')).collect()[0][0]
print('Standard deviation of stars:', std_dev_stars)

mean_prediction = predictions.select(mean('user_item_interaction_recomputed')).collect()[0][0]
print('Mean of predictions:', mean_prediction)

mean_stars = predictions.select(mean('stars')).collect()[0][0]
print('Mean of stars:', mean_stars)

print('Standard Deviation ratio ', std_dev/std_dev_stars)

Standard deviation of predictions: 0.9202296973985253
Standard deviation of stars: 1.140661507264479
Mean of predictions: 3.9739221582961832
Mean of stars: 3.8865761405816848
Standard Deviation ratio  0.8067508998400492


# Find Best Recommendations

In [21]:
def get_recommendations():
    """
    Returns top recommendations for a user.

    Returns
    -------
    :py:class:`pyspark.sql.DataFrame`
    a DataFrame of (itemCol, recommendations), where recommendations are
    stored as an array of ('name','business_id', 'stars', 'categories') Rows.
    """
    test = model.recommendForAllUsers(10).filter(col('user_id_index')==30).select("recommendations").take(10)
    topRestaurants = []
    for item in test[0][0]:        
        topRestaurants.append(item.business_id_index)
    
    schema = StructType([StructField("business_id_index",IntegerType(),True)])
    restaurants = spark.createDataFrame(topRestaurants,IntegerType()).toDF("business_id_index")
    return restaurants

In [22]:
def display_transformed_list():
    transformed\
    .select(['name', 'user_id', 'stars', 'categories'])\
    .filter(col('user_id_index')==30)\
    .show()

In [23]:
def display_top10_recommendations(restaurants):
    """
    Displays the top 10 restaurant recommendations.
    """
    restaurants\
    .join(transformed, on = 'business_id_index', how = 'inner')\
    .select([ 'name','business_id', 'stars', 'categories'])\
    .drop_duplicates(subset=['name'])\
    .show(10)

## Display the Top Recommendations

In [24]:


top10_recommendations = get_recommendations()
display_top10_recommendations(top10_recommendations)


+--------------------+--------------------+-----+--------------------+
|                name|         business_id|stars|          categories|
+--------------------+--------------------+-----+--------------------+
|       Casa de Mateo|09Hl38pDGEe1uSBNi...|  4.0|Restaurants, Mexican|
|        Chez Di Vito|qwmlZkohv0MhBi9g9...|  4.0|Restaurants, Italian|
|Da Franco Ristorante|F190EgYriOkncHd0v...|  5.0|Italian, Restaurants|
|     L'Gros Luxe NDG|1aL-ic0cpqS3FkvSj...|  4.0|Comfort Food, Res...|
|          L'Oeufrier|BMa7bY_ZdgDlVaM2Z...|  4.0|Breakfast & Brunc...|
|             Mandy's|LtyoPfxpvcF_9e9wM...|  5.0|Restaurants, Vega...|
|Pains Farcis Tianjin|len7Tn8eoi1KhUXbv...|  1.0|Specialty Food, R...|
|Restaurant Xi'An ...|HVd718KlG6VdY5oh5...|  1.0|Restaurants, Chinese|
|       Resto Chillax|ubLn_FrFygzcbhXTD...|  5.0|Restaurants, Cafe...|
|               Tommy|JFnJyu4pdIGfXovYt...|  5.0|Breakfast & Brunc...|
+--------------------+--------------------+-----+--------------------+

