---
# Algoritmos para Big Data

**Handout 7 - Recommendation system**

**2024/25**

This lab class is about creating and using a recommendation system for books, with particular
interest in the collaborative filtering strategy.

This notebook should contain only the implementation of tasks B and C presented in the handout, that is, the parts that focus on the recommendation model.

---
# Task B - Data supporting the model

**Datasets**

After task A, there should be parquet files for users, books and ratings, in both full-size and reduced-size versions.

---
# 1.

In [None]:
# Imports
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
# Build SparkSession
spark = SparkSession.builder.appName("RecommendationSystem").getOrCreate()

**Reading and checking data**

In [None]:
# Data to read
data_dir = '../.'
file_users = data_dir + 
file_books = data_dir + 
file_ratings = data_dir + 

In [None]:
# Reading users data
df_users = spark.read.parquet(file_users)

# Checking data
print(f'df_users - number of rows: {df_users.count()}')
df_users.printSchema()
df_users.show(10) 

In [None]:
# Reading books data
df_books = spark.read.parquet(file_books)

In [None]:
# Reading ratings data
df_ratings = spark.read.parquet(file_ratings)

---
# 2.

**Check the data quality. Is it really clean?**

Leave aside outliers!

In [None]:
# df_users
print(f'df_users - number of rows is {df_users.count()}; after dropDuplicates() applied would be {df_users.dropDuplicates().count()}.')
print(f'''df_users - number of rows after dropna(how='any') would be {df_users.dropna(how='any').count()}.''')
print('Checking nulls at each column of df_users')
dict_nulls = {col: df_users.filter(df_users[col].isNull()).count() for col in df_users.columns}
dict_nulls

In [None]:
# df_books


In [None]:
# df_ratings


---
# Task C - ML recommendation model

---
# 1.

**Feature engineering**
- Defining features to be used in the creation of the model
- Use of StringIndexer() to transform categorical features into numeric features
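
To make the indexing step concrete, the following is a minimal plain-Python sketch of the idea behind `StringIndexer` with its default frequency-descending ordering and `handleInvalid='keep'`. It is an illustration, not Spark's implementation: the helper names `fit_string_indexer` and `transform_keep` and the sample ISBN values are hypothetical, and Spark emits the indices as doubles rather than integers.

```python
from collections import Counter


def fit_string_indexer(values):
    """Map each label to an index: most frequent label -> 0, ties broken alphabetically."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: index for index, label in enumerate(ordered)}


def transform_keep(mapping, values):
    """Index values; labels unseen at fit time all share one extra index, len(mapping)."""
    unseen_index = len(mapping)
    return [mapping.get(value, unseen_index) for value in values]


# Hypothetical ISBN values, for illustration only
train_isbns = ['A', 'B', 'A', 'C', 'B', 'A']
mapping = fit_string_indexer(train_isbns)
indexed = transform_keep(mapping, ['A', 'C', 'Z'])  # 'Z' was not seen during fit

print(mapping)
print(indexed)
```

Note the consequence of the `keep` option: every ISBN that was not seen when the indexer was fitted collapses into the same extra index, so such books cannot be told apart downstream.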

In [None]:
# StringIndexer for ISBN, with output column ISBNi and handleInvalid set to 'keep'
# See https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.StringIndexer.html
indexer = StringIndexer(inputCol='ISBN', outputCol='ISBNi', handleInvalid='keep')


In [None]:
# Columns from df_ratings to be considered in the model
user_feature = 'UserID'
item_feature = 'ISBNi' 
rating_feature = 'BookRating'

---
# 2. to 6.

**Select and train the model**
- Train/validation split: creation of two dataframes for training and validation respectively, with a split size of 70/30 (%)
- Free the memory used by the initial dataframe, which is no longer needed
- Set the ALS algorithm as estimator
    - See details in http://spark.apache.org/docs/latest/ml-collaborative-filtering.html, such as the main assumptions the implemented algorithm relies upon. For example, notice that:
        - it implements a collaborative filtering strategy;
        - it aims to fill in the missing entries of a user-item association matrix. Users and items are described by a small set of latent factors, learned by the ALS algorithm, which are used to predict the missing entries.
- Set up a ML pipeline configuration, holding the sequence of the two stages previously set:
    1. String indexer
    2. ML estimator (ALS)
- Create the model by fitting the pipeline to the training data
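
The alternating idea behind ALS can be sketched in plain Python with a single latent factor per user and per item. This is a toy illustration under simplifying assumptions (rank 1, dense loops, an unscaled regularisation term), not the algorithm as implemented in Spark; the function name `als_rank1` and the tiny rating matrix are hypothetical.

```python
def als_rank1(ratings, n_users, n_items, n_iters=50, reg=0.0):
    """Toy ALS with one latent factor per user and per item.

    ratings: dict mapping (user, item) -> observed rating.
    """
    user_f = [1.0] * n_users
    item_f = [1.0] * n_items
    for _ in range(n_iters):
        # Keep item factors fixed and solve a least-squares problem per user
        for u in range(n_users):
            num = sum(r * item_f[i] for (uu, i), r in ratings.items() if uu == u)
            den = reg + sum(item_f[i] ** 2 for (uu, i) in ratings if uu == u)
            if den > 0:
                user_f[u] = num / den
        # Keep user factors fixed and solve a least-squares problem per item
        for i in range(n_items):
            num = sum(r * user_f[u] for (u, ii), r in ratings.items() if ii == i)
            den = reg + sum(user_f[u] ** 2 for (u, ii) in ratings if ii == i)
            if den > 0:
                item_f[i] = num / den
    return user_f, item_f


# Observed entries of the rank-1 matrix [[1, 2], [2, 4], [3, 6]];
# the entry for user 2 and item 1 is deliberately left out.
observed = {(0, 0): 1.0, (0, 1): 2.0, (1, 0): 2.0, (1, 1): 4.0, (2, 0): 3.0}
user_f, item_f = als_rank1(observed, n_users=3, n_items=2)

# The missing entry is predicted as the product of the two learned factors
predicted = user_f[2] * item_f[1]
print(f'Predicted rating for user 2, item 1: {predicted:.3f}')
```

Because the toy matrix is exactly rank 1, the prediction for the held-out entry should approach the true value 6. Real rating data is noisy and sparse, which is why the rank, the regularisation parameter and the number of iterations need tuning.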

In [None]:
# train/validation ratings split, with a random split of 70% / 30%
df_train, df_validation = df_ratings.randomSplit([0.7, 0.3])

# Optionally cache the training part only (check the memory implications first)
# df_train.cache()

# print the number of rows in each part
print(f'There are {df_train.count()} rows in the training set and {df_validation.count()} in the validation set.')


In [None]:
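# Save train/validation split as parquet files, named ratings-train and ratings-validation
# (overwrite mode is an assumption, so that the cell can be re-run)
df_train.write.mode('overwrite').parquet('ratings-train')
df_validation.write.mode('overwrite').parquet('ratings-validation')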

In [None]:
# Set the ALS estimator to learn from the training data and consequently to build the model
# note that we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, regParam=0.01, 
          userCol=user_feature, 
          itemCol=item_feature, 
          ratingCol=rating_feature,
          coldStartStrategy="drop",
          implicitPrefs=True
         )

# if the rating matrix is derived from another source of information
# (i.e. it is inferred from other signals), we may set implicitPrefs
# to True to get better results (see ALS documentation)

In [None]:
# The pipeline holds two stages set above
# As we will see below, we are going to use it just for evaluation purposes
pipeline = Pipeline(stages=[indexer, als])


In [None]:
# Save the pipeline with name pipeline-recommendation, for further use should it be required
pipeline.write().overwrite().save('pipeline-recommendation')

In [None]:
# Get the model (as transformer) by fitting the pipeline to training data. It may take time!
model = pipeline.fit(df_train)


In [None]:
# Save the model with name model-recommendation, for further use should it be required.
model.write().overwrite().save('model-recommendation')

---
# 7.

**Evaluate the model** 

- Make predictions by applying the validation data to the model transformer
- Print out the schema and content of the resulting dataframe
- Compute the evaluation metric *rmse* using *RegressionEvaluator*
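
As a reminder of what the metric measures, here is a minimal plain-Python sketch of RMSE over (rating, prediction) pairs. It also mimics, in a simplified way, the effect of dropping rows whose prediction is NaN, which is what the `drop` cold start strategy is meant to achieve before evaluation. The function name `rmse_drop_nan` and the sample values are hypothetical.

```python
import math


def rmse_drop_nan(pairs):
    """RMSE over (rating, prediction) pairs, skipping pairs whose prediction is NaN."""
    kept = [(r, p) for r, p in pairs if not math.isnan(p)]
    if not kept:
        return float('nan')
    return math.sqrt(sum((r - p) ** 2 for r, p in kept) / len(kept))


# Hypothetical (rating, prediction) pairs; the third has no usable prediction
pairs = [(5.0, 4.0), (3.0, 3.0), (8.0, float('nan')), (0.0, 2.0)]
rmse_value = rmse_drop_nan(pairs)
print(f'Root-mean-square error is {rmse_value}')
```

Without the NaN filter, a single missing prediction would turn the whole metric into NaN.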

In [None]:
# Make predictions on validation data and show values of columns of interest
df_prediction = model.transform(df_validation)


In [None]:
# Print out its schema and content
df_prediction.printSchema()
df_prediction.show(truncate=False)

In [None]:
# Compute the RMSE on the validation data, providing information regarding the label column and the prediction column
# See https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.evaluation.RegressionEvaluator.html
evaluator = RegressionEvaluator(metricName='rmse', labelCol=rating_feature, predictionCol='prediction')

rmse = evaluator.evaluate(df_prediction)
print(f'Root-mean-square error is {rmse}')

---
# Task D - Model deployment

**Recommendations**

The ALS model allows us to get recommendations directly, so we will follow this approach instead of relying on the pipeline.

See https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.recommendation.ALSModel.html for details, in particular the methods:
- recommendForUserSubset(dataset, numItems)
    - Returns top numItems items recommended for each user id in the input data set.
- recommendForItemSubset(dataset, numUsers)
    - Returns top numUsers users recommended for each item id in the input data set.
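
Conceptually, both methods score every candidate with the learned latent factors and keep the best ones. The sketch below illustrates this for the user-subset case in plain Python; it is not the Spark implementation, and the function name `top_items_for_users` and the factor values are hypothetical.

```python
import heapq


def top_items_for_users(user_factors, item_factors, user_ids, num_items):
    """Return {user_id: [(item_id, score), ...]} with the num_items best-scoring items."""
    recs = {}
    for user_id in user_ids:
        u_vec = user_factors[user_id]
        # Score of an item for a user: dot product of their latent factor vectors
        scores = [
            (item_id, sum(a * b for a, b in zip(u_vec, i_vec)))
            for item_id, i_vec in item_factors.items()
        ]
        recs[user_id] = heapq.nlargest(num_items, scores, key=lambda pair: pair[1])
    return recs


# Hypothetical latent factors with two dimensions
user_factors = {8: [1.0, 0.0], 9: [0.0, 1.0]}
item_factors = {0: [0.9, 0.1], 1: [0.2, 0.8], 2: [0.5, 0.5]}

recs = top_items_for_users(user_factors, item_factors, user_ids=[8], num_items=2)
print(recs)
```

The item-subset case is symmetric: swap the roles of users and items and keep the best-scoring users for each item.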



Recall that we could obtain the same results by working with the predictions produced by the fitted pipeline.


In [None]:
# Redo the model without a pipeline
df_train_indexed = indexer.fit(df_train).transform(df_train)
direct_model = als.fit(df_train_indexed)

In [None]:
# Get all distinct users and books in the training data, ordered by ID
users = df_train_indexed.select(als.getUserCol()).distinct().orderBy('UserID', ascending=True)
books = df_train_indexed.select(als.getItemCol()).distinct().orderBy('ISBNi', ascending=True)

In [None]:

users.show()

In [None]:
books.show()

In [None]:
df_train_indexed.printSchema()

In [None]:
df_train_indexed.orderBy('UserID', ascending=True).show()
df_train_indexed.orderBy('ISBNi', ascending=True).show()

In [None]:
# Recall counting
print(f'df_users - number of rows is {df_users.count()}.')
print(f'users (distinct in model training) - number of rows is {users.count()}.')
print(f'df_books - number of rows is {df_books.count()}.')
print(f'books (distinct in model training) - number of rows is {books.count()}.')
print(f'df_ratings - number of rows is {df_ratings.count()}.')
print(f'train indexed - number of rows is {df_train_indexed.count()}.')

In [None]:
df = df_train_indexed.select('ISBN', 'ISBNi').distinct()

In [None]:
# Map each indexed book ID (ISBNi) back to its original ISBN
books_ext = books.join(df, on='ISBNi', how='inner')
print(f'books_ext - number of rows is {books_ext.count()}.')
books_ext.show()

In [None]:
# Join the book details from df_books, using the original ISBN
books_ext = books_ext.join(df_books, on='ISBN', how='inner')
print(f'books_ext - number of rows is {books_ext.count()}.')
books_ext.show()

In [None]:
# Join additional information to users
users_ext = users.join(df_users, on='UserID', how='inner')
print(f'users_ext - number of rows is {users_ext.count()}.')
users_ext.show()

In [None]:
# The top 2 book recommendations for a specified subset of users, for example, the 3 users with the smallest IDs
top_n_books = 2
num_users = 3
subset_users = users.limit(num_users)
books_recs = direct_model.recommendForUserSubset(subset_users, top_n_books)
books_recs.show(truncate=False)

In [None]:
books_recommendations = books_recs.select('UserID', F.explode('recommendations').alias('BookRecommendation'))
books_recommendations.show()

In [None]:
#list_recs = [ [row.UserID, row.BookRecommendation] for row in books_recommendations.collect()]
list_recs = [[row.UserID, row.BookRecommendation['ISBNi'], row.BookRecommendation['rating']] for row in books_recommendations.collect()]
list_recs

In [None]:
# Show the recommendations with additional information
for rec in list_recs:
    info_user = users_ext.filter(users_ext.UserID == rec[0]).limit(1)
    print('')
    print('*** For user ***')
    info_user.show()
    print(f'Rating to be considered is {rec[2]}')
    if rec[2] > 0:
        print('Book recommendation is')
        info_book = books_ext.filter(books_ext.ISBNi == rec[1]).limit(1)
        info_book.show()
    else:
        print('No recommendation is advisable!')
    

In [None]:
# Curiosity: check the ratings given by user 8
df_ratings.filter(df_ratings.UserID == 8).show()

In [None]:
# The top 2 user recommendations for a specified subset of books, for example, the 3 books with the smallest indexed IDs
# Such users might be interested in the specified books
top_n_users = 2
num_books = 3
subset_books = books.limit(num_books)
users_recs = direct_model.recommendForItemSubset(subset_books, top_n_users)
users_recs.show(truncate=False)


In [None]:
users_recommendations = users_recs.select('ISBNi', F.explode('recommendations').alias('UserRecommendation'))
users_recommendations.show()

In [None]:
#list_recs = [ [row.ISBNi, row.UserRecommendation] for row in users_recommendations.collect()]
list_recs = [[row.ISBNi, row.UserRecommendation['UserID'], row.UserRecommendation['rating']] for row in users_recommendations.collect()]
list_recs

In [None]:
# Show the recommendations with additional information
for rec in list_recs:
    info_book = books_ext.filter(books_ext.ISBNi == rec[0]).limit(1)
    print('')
    print('*** For book ***')
    info_book.show()
    print(f'Rating to be considered is {rec[2]}')
    if rec[2] > 0:
        print('User recommendation is (user might be interested in the book)')
        info_user = users_ext.filter(users_ext.UserID == rec[1]).limit(1)
        info_user.show()
    else:
        print('No recommendation is advisable!')

In [None]:
# Curiosity: check the ratings given to book 0971880107
df_ratings.filter(df_ratings.ISBN == '0971880107').show()