In [None]:
from itertools import combinations

import pandas as pd
from pyspark import SparkContext
from pyspark.conf import SparkConf
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql import SparkSession

In [None]:
# Spark setup: a SparkConf carrying a larger driver result-size limit
# (presumably so the toPandas() collects later in the notebook succeed),
# then a SparkSession built from it. "2000m" is tunable — adjust as needed.
conf = SparkConf().set("spark.driver.maxResultSize", "2000m")
spark = (
    SparkSession.builder
    .config(conf=conf)
    .getOrCreate()
)

In [None]:
# Load the cleaned New York reviews; the CSV's first column becomes the index.
NY_reviews = pd.read_csv(
    filepath_or_buffer='./data/New_York_reviews_cleaned.csv',
    index_col=0,
)
NY_reviews.head(n=5)

In [None]:
# Keep only the columns the recommender needs. `.copy()` makes this an
# independent frame, so the column assignments in later cells do not write
# into a slice of NY_reviews (which triggers pandas' SettingWithCopyWarning).
NY_reviews_als = NY_reviews[['author_id', 'restaurant_name', 'rating_review', 'date']].copy()
NY_reviews_als.shape

In [None]:
### Convert author_id to numeric
## Step 1. check that every author_id looks like "UID_<digits>"
pattern = r'^UID_\d+$'
# na=False: a missing author_id counts as a non-match instead of producing NaN,
# which `.sum()` would otherwise skip silently.
matches = NY_reviews_als['author_id'].str.match(pattern, na=False)
# Fail here with a clear message rather than later at the int conversion.
assert matches.all(), f"{(~matches).sum()} author_id values do not match {pattern}"
matches.sum() # number of conforming ids (should equal the row count)

In [None]:
## Step 2. remove prefix
# "UID_123" -> 123 (prefix removal anchored at the start of the string).
# `.assign` returns a new frame, so the column is never written into a
# slice of NY_reviews (no SettingWithCopyWarning).
NY_reviews_als = NY_reviews_als.assign(
    author_id_num=NY_reviews_als['author_id'].str.replace(r'^UID_', '', regex=True).astype(int)
)

In [None]:
### Convert restaurant_name to numeric
## Step 1. Create 1-1 indexes matching restaurant_name
# pd.factorize numbers distinct names 0, 1, 2, ... in order of first appearance
# (a missing name would receive the code -1). `.assign` returns a new frame,
# so no value is written into a slice (no SettingWithCopyWarning).
NY_reviews_als = NY_reviews_als.assign(
    restaurant_id=pd.factorize(NY_reviews_als['restaurant_name'])[0]
)

In [None]:
# Preview the frame including the numeric author/restaurant id columns.
NY_reviews_als.head(n=5)

In [None]:
### Check and aggregate multiple reviews by one author for a single restaurant
# Collapse repeated (author, restaurant) reviews into a single row whose rating
# is the mean of that pair's ratings. Built as a chain from NY_reviews_als
# without adding a helper column to it, so the frame previewed above is left
# unchanged (and no value is written into a possible slice).
# NOTE(review): the kept `date` comes from the first row of each pair in file
# order, not necessarily the latest review — confirm this is what the
# date-based train/test split below should see.
pair_mean_rating = (
    NY_reviews_als
    .groupby(['author_id', 'restaurant_id'])['rating_review']
    .transform('mean')
)
NY_reviews_als_unique = (
    NY_reviews_als
    .assign(average_rating=pair_mean_rating)
    .drop_duplicates(subset=['author_id', 'restaurant_id'], keep='first')
    .drop(columns=['rating_review'])
    .rename(columns={'average_rating': 'rating_review'})
)
NY_reviews_als_unique.head()

In [None]:
# After aggregation each (author_id, restaurant_id) pair must appear exactly once.
assert not NY_reviews_als_unique.duplicated(subset=['author_id', 'restaurant_id']).any()
NY_reviews_als_unique.shape

In [None]:
### Train / Test split:
# Hold out each author's k most recent reviews as test data; everything else trains.
k = 2 # number of latest reviews / user in Test
# NOTE(review): `date` is sorted as stored in the CSV. If it is not an ISO-style
# string (YYYY-MM-DD...), lexicographic order is not chronological — consider
# converting with pd.to_datetime first. TODO confirm the format.

# Number of reviews per author, aligned row-by-row with NY_reviews_als_unique.
reviews_per_author = NY_reviews_als_unique.groupby("author_id")["author_id"].transform("size")

## Train: part 1 - users with at most k reviews (too few to hold any out)
train_1 = NY_reviews_als_unique[(reviews_per_author <= k).to_numpy()]

## Train: part 2 - users with more than k reviews: all reviews except the latest k reviews
users_with_more_than_k_reviews = NY_reviews_als_unique[(reviews_per_author > k).to_numpy()]
# Sort by 'author_id' ascending and 'date' descending, so each author's newest reviews come first
users_with_more_than_k_reviews_sorted = users_with_more_than_k_reviews.sort_values(by=['author_id', 'date'], ascending=[True, False])
# Position of each review in its author's newest-first list (0 = newest).
# Positional boolean masks (rather than index-label lookups) remain correct
# even if the index carried over from the CSV contains duplicate labels.
recency_rank = users_with_more_than_k_reviews_sorted.groupby('author_id').cumcount()
is_latest_k = (recency_rank < k).to_numpy()

train_2 = users_with_more_than_k_reviews_sorted[~is_latest_k]
train = pd.concat([train_1, train_2])
test = users_with_more_than_k_reviews_sorted[is_latest_k]

In [None]:
len(train.index), len(train.columns)  # (rows, columns) of the training split

In [None]:
# Sanity checks on the split: every aggregated review lands on exactly one side,
# and each held-out author contributes exactly k test reviews.
assert len(train) + len(test) == len(NY_reviews_als_unique)
assert test.groupby('author_id').size().eq(k).all()
test.shape

In [None]:
# Hand both pandas splits to Spark (train first, then test) for ALS fitting / scoring.
TrainData, TestData = map(spark.createDataFrame, (train, test))

In [None]:
### Model Training
SEED = 42  # fixed seed so ALS initialisation and the CV fold split are reproducible

# Create ALS model.
# coldStartStrategy="drop" discards predictions for users/items unseen during
# fitting, so RMSE on validation folds / the test set is not NaN.
# NOTE(review): ALS ids must fit in a 32-bit int — confirm the numeric part of
# the UID_ author ids is small enough.
als = ALS(userCol="author_id_num", itemCol="restaurant_id", ratingCol="rating_review",
          coldStartStrategy="drop", seed=SEED)

# Define hyperparameter grid (3 x 3 x 3 = 27 combinations)
param_grid = ParamGridBuilder() \
    .addGrid(als.rank, [10, 20, 30]) \
    .addGrid(als.maxIter, [10, 20, 30]) \
    .addGrid(als.regParam, [0.5, 0.7, 0.9]) \
    .build()

# Define evaluation metric
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating_review', predictionCol='prediction')

# Create CrossValidator. Sub-models are not collected: keeping every one of the
# 27 x 3 fitted models on the driver is costly and nothing below uses them.
cross_val = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator,
                           numFolds=3, seed=SEED)

# Fit ALS model and tune hyperparameters
cv_model = cross_val.fit(TrainData)

In [None]:
# Get the best ALS model from the CrossValidator
best_model = cv_model.bestModel

# Recover the winning hyper-parameters through the public CrossValidatorModel
# API (avgMetrics is aligned with the estimator param maps) instead of the
# private `_java_obj.parent()` handle.
avg_metrics = cv_model.avgMetrics
pick_best = max if evaluator.isLargerBetter() else min  # RMSE: smaller is better
best_index = pick_best(range(len(avg_metrics)), key=avg_metrics.__getitem__)
best_params = {param.name: value for param, value in cv_model.getEstimatorParamMaps()[best_index].items()}
print("The best model has rank of ", best_params['rank'], ", maxIter of ", best_params['maxIter'], " and regParam of ", best_params['regParam'])

# Evaluate the best model on the training and test sets; a large gap between
# the two RMSEs suggests overfitting.
train_predictions = best_model.transform(TrainData)
test_predictions = best_model.transform(TestData)
train_rmse = evaluator.evaluate(train_predictions)
test_rmse = evaluator.evaluate(test_predictions)
print("Root Mean Squared Error (RMSE) on train data: {:.2f}".format(train_rmse))
print("Root Mean Squared Error (RMSE) on test data: {:.2f}".format(test_rmse))

In [None]:
### Evaluation
# Concordant Rate: for every pair of held-out reviews by the same author, check
# whether the predicted ratings order the two restaurants the same way the
# actual ratings do.
test_predictions_pd = test_predictions.toPandas()

concordant = 0  # predicted order agrees with actual order
discordant = 0  # predicted order is reversed
tie = 0         # actual ratings equal -> the pair cannot be ranked
pred_tie = 0    # actual ratings differ but the predictions are equal
one = []        # authors with < 2 test predictions (e.g. rows dropped as cold-start)

# groupby visits each author once instead of re-filtering the full frame per author.
for author, test_author in test_predictions_pd.groupby("author_id"):
    if test_author.shape[0] < 2:
        one.append(author)
        continue
    # Compare all pairs, so this still works if more than 2 reviews per author are held out.
    actual_vs_pred = zip(test_author['rating_review'], test_author['prediction'])
    for (act_rating_1, pred_rating_1), (act_rating_2, pred_rating_2) in combinations(actual_vs_pred, 2):
        if act_rating_1 == act_rating_2:
            tie += 1
            continue
        # Positive when actual and predicted differences share a sign, negative
        # when they disagree, zero when the two predictions are tied.
        agreement = (act_rating_1 - act_rating_2) * (pred_rating_1 - pred_rating_2)
        if agreement > 0:
            concordant += 1
        elif agreement < 0:
            discordant += 1
        else:
            pred_tie += 1

print("Concordant: ", concordant)
print("Discordant: ", discordant)
print("Tie: ", tie)
print("Prediction tie: ", pred_tie)
print("Authors with fewer than 2 test predictions: ", len(one))
ranked_pairs = concordant + discordant
if ranked_pairs:
    print("Concordant rate: {:.2f}".format(concordant / ranked_pairs))

In [None]:
### Evaluation
# Hit Rate
N_RECS = 6  # number of top recommendations per user checked against the test set
user_recs = best_model.recommendForAllUsers(N_RECS)
user_recs_pd = user_recs.toPandas()

# Define a function to extract the first element from each tuple
def extract_first_element(lst_of_tuples):
    """Return the first field of every tuple/Row in ``lst_of_tuples``.

    Used to keep only the item id of each (restaurant_id, rating) recommendation.
    """
    return [t[0] for t in lst_of_tuples]
# Apply the function to the 'recommendations' column and create the 'rec_res' column
user_recs_pd['rec_res'] = user_recs_pd['recommendations'].apply(extract_first_element)

# author_id_num -> set of recommended restaurant ids. An explicit lookup is
# needed because `value in some_series` tests the Series *index*, not its values.
recs_by_author = {
    author_num: set(rec_ids)
    for author_num, rec_ids in zip(user_recs_pd['author_id_num'], user_recs_pd['rec_res'])
}

# Hit score: +1 when a recommended restaurant was rated well (>= 4) in the test
# set, -1 when it was rated poorly (<= 2). Test ratings are per-pair averages
# and can be fractional, so ranges are used instead of exact equality.
hit = 0
for author_num, restaurant, rating in zip(test['author_id_num'], test['restaurant_id'], test['rating_review']):
    if restaurant not in recs_by_author.get(author_num, ()):
        continue
    if rating <= 2:
        hit -= 1
    elif rating >= 4:
        hit += 1

print("Hit: ", hit)

In [None]:
# Stop SparkSession (and its underlying SparkContext). Run this last: the Spark
# DataFrames and models created above rely on this session.
spark.stop()