In [0]:
%pip install mlflow

In [0]:
from pyspark.rdd import RDD
from pyspark.sql import Row
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.functions import desc
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
import mlflow

In [0]:
def _with_movie_id(ratings):
    """Return ``ratings`` with an ``itemId`` column renamed to ``movieId``.

    The ALS model below uses ``itemCol='movieId'`` while text-file input names
    the item column ``itemId``. DataFrames that already have ``movieId`` (or
    have no ``itemId``) are returned unchanged.
    """
    if 'itemId' in ratings.columns and 'movieId' not in ratings.columns:
        return ratings.withColumnRenamed('itemId', 'movieId')
    return ratings


def _join_means(ratings, user_means, item_means):
    """Attach ``user_mean`` and ``item_mean`` columns to ``ratings`` via inner joins on the ids."""
    return ratings.join(user_means, on='userId').join(item_means, on='movieId')


def als_with_bias_recommender(dataframe, seed):
    '''
    Return the test-set RMSE of an ALS recommender trained on bias-removed ratings.

    The ratings are split 80/20 into training and test sets using ``seed``.
    From the training split we compute each user's mean rating, each item's
    mean rating and the global mean rating ``m``. ALS is trained on the
    user-item interaction ``i = rating - (user_mean + item_mean - m)``; its
    test-set predictions are turned back into ratings with
    ``i + user_mean + item_mean - m`` and compared with the original
    ``rating`` column.

    Parameters
    ----------
    dataframe : str or pyspark.sql.DataFrame
        Either a path to a text file whose lines look like
        ``userId::itemId::rating[::...]``, or a DataFrame with ``userId``,
        ``rating`` and an item-id column named ``movieId`` or ``itemId``.
    seed : int
        Seed used for both the train/test split and ALS initialisation.

    Returns
    -------
    float
        RMSE over the test rows ALS could score. Rows whose user or item does
        not appear in training are dropped (``coldStartStrategy='drop'``).

    Test file: tests/test_als_with_bias_recommender.py
    '''
    spark = init_spark()

    if isinstance(dataframe, str):
        # Text input: one ``userId::itemId::rating[::...]`` record per line.
        lines = spark.read.text(dataframe).rdd
        ratings_rdd = lines.map(lambda line: line.value.split('::')).map(
            lambda f: Row(userId=int(f[0]), itemId=int(f[1]), rating=float(f[2])))
        ratings = spark.createDataFrame(ratings_rdd)
    else:
        ratings = dataframe

    training, test = ratings.randomSplit([0.8, 0.2], seed=seed)
    training, test = _with_movie_id(training), _with_movie_id(test)

    # Bias terms come from the training split only, so test ratings never leak into them.
    user_means = training.groupBy('userId').avg('rating').withColumnRenamed('avg(rating)', 'user_mean')
    item_means = training.groupBy('movieId').avg('rating').withColumnRenamed('avg(rating)', 'item_mean')
    global_mean = training.agg({'rating': 'avg'}).collect()[0][0]

    # Joins replace per-row linear scans over driver-side lists of means.
    train_biased = _join_means(training, user_means, item_means)
    train_interactions = train_biased.withColumn(
        'user_item_interaction',
        train_biased.rating - (train_biased.user_mean + train_biased.item_mean - lit(global_mean)))

    als = ALS(maxIter=5, rank=70, regParam=0.01, coldStartStrategy='drop', userCol='userId',
              itemCol='movieId', ratingCol='user_item_interaction', seed=seed)
    model = als.fit(train_interactions)

    # `prediction` estimates the interaction term; add the biases back to obtain a rating.
    # After coldStartStrategy='drop', every remaining row has a user and an item seen in
    # training, so these inner joins do not remove any further rows.
    predictions = _join_means(model.transform(test), user_means, item_means)
    scored = predictions.withColumn(
        'bias_prediction',
        predictions.prediction + predictions.user_mean + predictions.item_mean - lit(global_mean))

    evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='bias_prediction')
    return evaluator.evaluate(scored)


In [0]:
# Turn on MLflow autologging for pyspark.ml so later estimator fits
# (e.g. `als.fit(...)` further down) are recorded by MLflow.
mlflow.pyspark.ml.autolog()

In [0]:
def init_spark():
    """Return the active SparkSession, creating one only if none exists.

    ``getOrCreate`` returns an already-running session (such as the one a
    managed cluster provides) instead of starting a second one.
    """
    session_builder = SparkSession.builder.appName("Python Spark SQL basic example")
    # NOTE(review): this option appears to be the placeholder from the Spark SQL
    # getting-started example; nothing in this notebook reads it.
    session_builder = session_builder.config("spark.some.config.option", "some-value")
    return session_builder.getOrCreate()


spark = init_spark()

In [0]:
# Read the ratings CSV(s); the header row supplies the column names.
# NOTE(review): "/FileStore/tables" is a directory, so every file in it is read
# into this DataFrame - confirm it only contains the ratings data.
ratings_df = (
    spark.read.format("csv")
    .option("header", "true")
    .load("/FileStore/tables")
    .drop('timestamp')  # the timestamp is not used by any later calculation
)
# Without inferSchema every CSV column is a string; cast the ones used downstream.
for column_name, spark_type in (("rating", 'float'), ("userId", 'int'), ("movieId", 'int')):
    ratings_df = ratings_df.withColumn(column_name, ratings_df[column_name].cast(spark_type))
ratings_df.count()

In [0]:
# Reproducible 80/20 train/test split of the cleaned ratings.
training, test = ratings_df.randomSplit(weights=[0.8, 0.2], seed=42)


In [0]:
# Mean rating per user and per movie over the training split, collected to the
# driver as lists of Rows with columns (<id>, 'avg(rating)').
user_means, movie_means = (
    training.groupBy(key_column).avg('rating').collect()
    for key_column in ("userId", "movieId")
)

In [0]:
# Mean of every training rating; the aggregate always yields exactly one row.
global_mean = training.agg({"rating": "avg"}).first()[0]

In [0]:
# Per-row bias decomposition used as the ALS training target:
#   user_item_interaction = rating - (user_mean + movie_mean - global_mean)
# Build id -> mean dicts once from the collected aggregates, so each rating needs
# two O(1) lookups instead of repeated scans over the whole list of means.
user_mean_by_id = {row.userId: row['avg(rating)'] for row in user_means}
movie_mean_by_id = {row.movieId: row['avg(rating)'] for row in movie_means}


def _with_interaction(x):
    """Return rating row ``x`` extended with its user mean, movie mean and residual interaction.

    Raises KeyError if the user or movie has no training mean; this cannot
    happen for rows of ``training`` because the means are computed from it.
    """
    user_mean = user_mean_by_id[x.userId]
    movie_mean = movie_mean_by_id[x.movieId]
    return Row(userId=x.userId, movieId=x.movieId, rating=x.rating,
               user_mean=user_mean,
               movie_mean=movie_mean,
               user_item_interaction=x.rating - (user_mean + movie_mean - global_mean))


# NOTE: despite the name, comprehensive_df is a driver-side list of Rows (collect()),
# which is later passed to spark.createDataFrame for ALS fitting.
comprehensive_df = training.rdd.map(_with_interaction).collect()
display(comprehensive_df)

userId,movieId,rating,user_mean,movie_mean,user_item_interaction
1,31,2.5,2.3666666666666667,3.128571428571429,0.546880670796571
1,1029,3.0,2.3666666666666667,3.683333333333333,0.4921187660346664
1,1129,2.0,2.3666666666666667,3.276315789473684,-0.100863690105684
1,1172,4.0,2.3666666666666667,4.220588235294118,0.9548638640738814
1,1263,2.0,2.3666666666666667,3.7906976744186047,-0.6152455750506052
1,1293,2.0,2.3666666666666667,3.8684210526315783,-0.6929689532635797
1,1343,2.0,2.3666666666666667,3.8620689655172415,-0.686616866149242
1,1371,2.5,2.3666666666666667,3.027027027027027,0.6484250723409728
1,1405,1.0,2.3666666666666667,2.986842105263158,-0.8113900058951584
1,1953,4.0,2.3666666666666667,4.0,1.175452099367999


In [0]:
# Explicit-feedback ALS fitted on the de-biased residual rather than the raw rating.
als = ALS(
    userCol='userId',
    itemCol='movieId',
    ratingCol='user_item_interaction',
    rank=70,                   # number of latent factors
    maxIter=5,
    regParam=0.01,
    coldStartStrategy='drop',  # drop rows with unseen users/movies at transform time instead of NaN
)

In [0]:
# comprehensive_df is a list of Rows, so turn it back into a Spark DataFrame before fitting.
als_training_df = spark.createDataFrame(comprehensive_df)
als_model = als.fit(als_training_df)

In [0]:
# Score the held-out split. ALS was trained with ratingCol='user_item_interaction',
# so `prediction` estimates that residual, not the rating itself; add
# user_mean + movie_mean - global_mean back before comparing with `rating`.
# coldStartStrategy='drop' removes test rows whose user or movie was not in training.
predictions = als_model.transform(test)