#Importing libraries

In [33]:
# Package Imports
from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession
import gradio as gr
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType
import matplotlib.pyplot as plt
import pandas as pd
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

#Creating SparkSession

In [34]:
# Create SparkSession - used when working with DataFrames in Spark
spark = SparkSession.builder \
    .appName("ALS Example") \
    .master("local[*]") \
    .getOrCreate()

# Get SparkContext from SparkSession
sc = spark.sparkContext  # Create SparkContext, which manages resources and executes operations with RDDs

# Now you can load the file with sc.textFile
ratingsRDD = sc.textFile("user-item.txt")  # Load the file as an RDD, each line is an element
ratingsRDD.collect()  # Return all elements of the RDD as a list (useful for inspection on small datasets)

['1001,9001,10',
 '1001,9002,1',
 '1001,9003,9',
 '1002,9001,3',
 '1002,9002,5',
 '1002,9003,1',
 '1002,9004,10',
 '1003,9001,2',
 '1003,9002,6',
 '1003,9003,2',
 '1003,9004,9',
 '1003,9005,10',
 '1003,9006,8',
 '1003,9007,9',
 '1004,9001,9',
 '1004,9002,2',
 '1004,9003,8',
 '1004,9004,3',
 '1004,9010,10',
 '1004,9011,9',
 '1004,9012,8',
 '1005,9001,8',
 '1005,9002,3',
 '1005,9003,7',
 '1005,9004,1',
 '1005,9010,9',
 '1005,9011,10',
 '1005,9012,9',
 '1005,9013,8',
 '1005,9014,1',
 '1005,9015,1',
 '1006,9001,7',
 '1006,9002,4',
 '1006,9003,8',
 '1006,9004,1',
 '1006,9010,7',
 '1006,9011,6',
 '1006,9012,9',
 '1007,11853,5',
 '1007,10097,6',
 '1007,9194,5',
 '1007,13021,7',
 '1007,12392,3',
 '1007,10560,4',
 '1007,10124,9',
 '1007,12717,4',
 '1007,11861,7',
 '1008,9768,9',
 '1008,11908,5',
 '1008,10949,8',
 '1008,11572,3',
 '1008,11913,10',
 '1008,10124,3',
 '1008,11105,1',
 '1008,12800,2',
 '1009,10075,4',
 '1009,9108,5',
 '1009,11168,9',
 '1009,9643,2',
 '1009,11800,5',
 '1009,13774,9',

#Data preprocessing

In [35]:
# Converting strings
ratingsRDD2 = ratingsRDD.map(lambda l: l.split(',')).map(lambda l:(int(l[0]), int(l[1]), float(l[2])))
# Splits the line by comma and transforms it into (user, item, rating) as integers and float

In [36]:
# # Creates a DataFrame from the RDD, naming the columns for easier analysis and modeling
ratingsDF = spark.createDataFrame(ratingsRDD2, ["user", "item", "rating"])

In [37]:
ratingsDF # DataFrame Visualization

DataFrame[user: bigint, item: bigint, rating: double]

In [38]:
# Train/Test Split

trainDF, testDF = ratingsDF.randomSplit([0.8, 0.2], seed=42)

print("Training Data Count:", trainDF.count())
print("Testing Data Count:", testDF.count())

Training Data Count: 80137
Testing Data Count: 19957


#ALS Algorithm - Recommender System

In [39]:
# ALS with hyperparameter optimization.

# Initializing the ALS model
als = ALS(userCol="user", itemCol="item", ratingCol="rating",
          coldStartStrategy="drop")

# Defining hyperparameters
paramGrid = ParamGridBuilder() \
    .addGrid(als.rank, [40,60]) \
    .addGrid(als.maxIter, [10, 15]) \
    .addGrid(als.regParam, [0.05, 0.1]) \
    .build()

# Defining the evaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")

# Creating a CrossValidator
crossval = CrossValidator(estimator=als,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)  # Using 3+ folds

# Running cross-validation, and choosing the best set of parameters.
cvModel = crossval.fit(trainDF)

# Getting the best model generated by CrossValidator
best_model = cvModel.bestModel

# Printing the best parameters found
print("Best Rank:", best_model.rank)
print("Best MaxIter:", best_model._java_obj.parent().getMaxIter())
print("Best RegParam:", best_model._java_obj.parent().getRegParam())

# Making predictions on the test data using the best model
predictions = best_model.transform(testDF)

# Evaluating the best model on the test data
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Best Rank: 60
Best MaxIter: 15
Best RegParam: 0.1
Root-mean-square error = 3.448610985916547


In [40]:
# Visualizing Affinity Score by ID
best_model.userFactors.orderBy("id").collect()

[Row(id=1001, features=[-0.1357826441526413, -0.542222797870636, 0.8148403167724609, 0.13714030385017395, -0.3515481948852539, -0.46131381392478943, 0.46605393290519714, -0.37687480449676514, -0.3216167092323303, -0.0813029333949089, 0.43728411197662354, 0.009981782175600529, 0.28246086835861206, -0.06841247528791428, 0.4970722794532776, -0.34093430638313293, -0.35162413120269775, 0.08409609645605087, 0.1269378513097763, -0.02156974747776985, -0.1103585883975029, -0.20599035918712616, -0.31184619665145874, -0.31422218680381775, -0.760270893573761, 0.43717169761657715, -0.15647350251674652, 0.12732547521591187, 0.5374181270599365, -0.14465785026550293, -0.03878997638821602, -0.3644081652164459, 0.1545611321926117, 0.3592487871646881, -0.17198365926742554, -0.33107128739356995, -0.25109776854515076, 0.7289532423019409, 0.45873093605041504, -0.4620024263858795, 0.7602145671844482, -0.8280617594718933, 0.14392967522144318, 0.030242405831813812, 0.47876760363578796, 0.233847975730896, 0.032

In [41]:
# Creating a test dataset with users and items for rating
testeDF = spark.createDataFrame([(1001,9004),(1001,9005),(1001,9010)], ["user", "item"])

In [42]:
# Predictions
# The higher the Affinity Score, the higher the probability of the user accepting a recommendation
previsoes = (best_model.transform(testeDF).collect())
previsoes

[Row(user=1001, item=9004, prediction=0.920345664024353),
 Row(user=1001, item=9005, prediction=3.0596089363098145),
 Row(user=1001, item=9010, prediction=6.730755805969238)]

#Web Application with Gradio

In [44]:
# Function to make recommendation
def fazer_recomendacao(user_id):
    """
    Makes item recommendations for a given user_id.
    """
    # Create a test DataFrame for the specified user
    user_test_df = spark.createDataFrame([(int(user_id), item) for item in ratingsDF.select('item').distinct().rdd.flatMap(lambda x: x).collect()], ["user", "item"])

    # Make predictions for the user
    previsoes = best_model.transform(user_test_df)

    # Order predictions by predicted rating and select top N
    top_n_recomendacoes = previsoes.orderBy("prediction", ascending=False)

    # Return recommendations as a list of tuples (item, predicted rating)
    return [(row.item, row.prediction) for row in top_n_recomendacoes.collect()]

# Create the Gradio interface
iface = gr.Interface(
    fn=fazer_recomendacao,
    inputs=gr.Textbox(label="Enter User ID:"),
    outputs=gr.Dataframe(headers=["Item ID", "Predicted Rating"], label="Recommendations:"),
    title="Recommendation System with PySpark",
    description="Enter a user ID to get item recommendations."
)

# Launch the interface
iface.launch(share=True)

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://19c9e15fca10c09cb1.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


