In [147]:
import os
import sys
from pyspark.rdd import RDD
from pyspark.sql import Row
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, col
from pyspark.sql.functions import desc
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
import numpy as np

In [38]:
#Initialize a spark session.
def init_spark():
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
    return spark

In [39]:
#Useful functions to print RDDs and Dataframes.
def toCSVLineRDD(rdd):
    '''
    This function convert an RDD or a DataFrame into a CSV string
    '''
    a = rdd.map(lambda row: ",".join([str(elt) for elt in row]))\
           .reduce(lambda x,y: os.linesep.join([x,y]))
    return a + os.linesep

def toCSVLine(data):
    '''
    Convert an RDD or a DataFrame into a CSV string
    '''
    if isinstance(data, RDD):
        return toCSVLineRDD(data)
    elif isinstance(data, DataFrame):
        return toCSVLineRDD(data.rdd)
    return None

In [40]:
def basic_als_recommender(filename, seed):
    '''
    This function must print the RMSE of recommendations obtained
    through ALS collaborative filtering, similarly to the example at
    http://spark.apache.org/docs/latest/ml-collaborative-filtering.html
    The training ratio must be 80% and the test ratio must be 20%. The
    random seed used to sample the training and test sets (passed to
    ''DataFrame.randomSplit') is an argument of the function. The seed
    must also be used to initialize the ALS optimizer (use
    *ALS.setSeed()*). The following parameters must be used in the ALS
    optimizer:
    - maxIter: 5
    - rank: 70
    - regParam: 0.01
    - coldStartStrategy: 'drop'
    Test file: tests/test_basic_als.py
    '''
    spark=init_spark()
    lines = spark.read.text(filename).rdd
    parts = lines.map(lambda row: row.value.split("::"))
    
    ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]), rating=float(p[2])))
    ratings = spark.createDataFrame(ratingsRDD)
    
    (training, test) = ratings.randomSplit([0.8, 0.2],seed=seed)
    als = ALS(maxIter=5, rank=70, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
              coldStartStrategy="drop", seed=seed)
    
    predictions = als.fit(training).transform(test)
    evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
    rmse = evaluator.evaluate(predictions)

    return rmse

In [41]:
a = basic_als_recommender("/Users/manikhossain/IdeaProjects/bigdata-la2-ManikHossain08/data/sample_movielens_ratings.txt", 123)
a

1.6229741581612676

In [83]:
filename = "/Users/manikhossain/IdeaProjects/bigdata-la2-ManikHossain08/data/sample_movielens_ratings.txt"
spark=init_spark()
movie_df = spark.read.csv("/Users/manikhossain/IdeaProjects/bigdata-la2-ManikHossain08/data/movies.csv", header = True)


In [42]:
def global_average(filename, seed):
    '''
    This function must print the global average rating for all users and
    all movies in the training set. Training and test
    sets should be determined as before (e.g: as in function basic_als_recommender).
    Test file: tests/test_global_average.py
    '''
    spark = init_spark()
    lines = spark.read.text(filename).rdd
    parts = lines.map(lambda row: row.value.split("::"))
    ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                         rating=float(p[2])))
    ratings = spark.createDataFrame(ratingsRDD)
    (training, test) = ratings.randomSplit([0.8, 0.2], seed=seed)

    result = training.agg({"rating": "avg"}).collect()[0]
    return result["avg(rating)"]


In [43]:
result = global_average(filename, 123)
result

1.7774979009235936

In [44]:
def global_average_recommender(filename, seed):
    '''
    This function must print the RMSE of recommendations obtained
    through global average, that is, the predicted rating for each
    user-movie pair must be the global average computed in the previous
    task. Training and test
    sets should be determined as before. You can add a column to an existing DataFrame with function *.withColumn(...)*.
    Test file: tests/test_global_average_recommender.py
    '''
    spark = init_spark()
    lines = spark.read.text(filename).rdd
    parts = lines.map(lambda row: row.value.split("::"))
    ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                         rating=float(p[2])))
    ratings = spark.createDataFrame(ratingsRDD)
    (training, test) = ratings.randomSplit([0.8, 0.2], seed=seed)
    average = training.agg({"rating": "avg"}).collect()[0]['avg(rating)']
    training = training.withColumn('avg(rating)', lit(average))

    als = ALS(maxIter=5, rank=70, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="avg(rating)",
              coldStartStrategy="drop", seed=seed)
    
    predictions = als.fit(training).transform(test)

    evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                    predictionCol="prediction")
    rmse = evaluator.evaluate(predictions)

    return rmse

In [67]:
result = global_average_recommender(filename, 123)
result

1.1684684492180035

In [114]:
def top_movie_recommendation_touser(filename, seed):
    '''
    This function must print the RMSE of recommendations obtained
    through global average, that is, the predicted rating for each
    user-movie pair must be the global average computed in the previous
    task. Training and test
    sets should be determined as before. You can add a column to an existing DataFrame with function *.withColumn(...)*.
    Test file: tests/test_global_average_recommender.py
    '''
    spark = init_spark()
    lines = spark.read.text(filename).rdd
    parts = lines.map(lambda row: row.value.split("::"))
    ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                         rating=float(p[2])))
    ratings = spark.createDataFrame(ratingsRDD)
    (training, test) = ratings.randomSplit([0.8, 0.2], seed=seed)
    average = training.agg({"rating": "avg"}).collect()[0]['avg(rating)']
    training = training.withColumn('avg(rating)', lit(average))

    als = ALS(maxIter=5, rank=70, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
              coldStartStrategy="drop", seed=seed)
    model = als.fit(training)
    
    # Generate top 10 movie recommendations for each user
    userRecs = model.recommendForAllUsers(10)
    # Generate top 10 user recommendations for each movie
    movieRecs = model.recommendForAllItems(10)

    return (userRecs, movieRecs)

In [115]:
(userRecs, movieRecs)  = top_movie_recommendation_touser(filename, 123)
userRecs.show(n = 10)

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|    28|[[92, 4.9826636],...|
|    26|[[88, 4.999562], ...|
|    27|[[18, 3.9991288],...|
|    12|[[17, 4.9900746],...|
|    22|[[74, 5.04252], [...|
|     1|[[68, 3.9480424],...|
|    13|[[93, 3.931377], ...|
|     6|[[25, 4.982175], ...|
|    16|[[90, 4.9941545],...|
|     3|[[18, 3.9945285],...|
+------+--------------------+
only showing top 10 rows



In [112]:
def movie_recommendation_toOneuser(filename, seed, userId):
    '''
    This function must print the RMSE of recommendations obtained
    through global average, that is, the predicted rating for each
    user-movie pair must be the global average computed in the previous
    task. Training and test
    sets should be determined as before. You can add a column to an existing DataFrame with function *.withColumn(...)*.
    Test file: tests/test_global_average_recommender.py
    '''
    spark = init_spark()
    lines = spark.read.text(filename).rdd
    parts = lines.map(lambda row: row.value.split("::"))
    ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                         rating=float(p[2])))
    ratings = spark.createDataFrame(ratingsRDD)
    (training, test) = ratings.randomSplit([0.8, 0.2], seed=seed)
    average = training.agg({"rating": "avg"}).collect()[0]['avg(rating)']
    training = training.withColumn('avg(rating)', lit(average))

    als = ALS(maxIter=5, rank=70, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
              coldStartStrategy="drop", seed=seed)
    model = als.fit(training)
    predictions = model.transform(test)

    oneUser = predictions.filter(col("userId") == userId).join(movie_df, "movieId") \
              .select("userId","movieId", "title", "genres", "prediction")
    
    return oneUser

In [113]:
singleUserPredictions = movie_recommendation_toOneuser(filename, 123, 15)
singleUserPredictions.show() #userId = 15

+------+-------+--------------------+--------------------+----------+
|userId|movieId|               title|              genres|prediction|
+------+-------+--------------------+--------------------+----------+
|    15|     85|Angels and Insect...|       Drama|Romance| 1.7286502|
|    15|     65|     Bio-Dome (1996)|              Comedy| 0.5836302|
|    15|     22|      Copycat (1995)|Crime|Drama|Horro...|0.46404094|
|    15|      1|    Toy Story (1995)|Adventure|Animati...|0.40661034|
|    15|     52|Mighty Aphrodite ...|Comedy|Drama|Romance|0.40930358|
|    15|     64|Two if by Sea (1996)|      Comedy|Romance|  1.077209|
|    15|     35|   Carrington (1995)|       Drama|Romance| 0.5394808|
|    15|     39|     Clueless (1995)|      Comedy|Romance| 1.0550858|
|    15|     87|Dunston Checks In...|     Children|Comedy| 1.0461016|
|    15|     51|Guardian Angel (1...|Action|Drama|Thri...|0.30425352|
|    15|     97|Hate (Haine, La) ...|         Crime|Drama| 0.7158899|
|    15|     73|Misé

In [126]:
 # Generate top 10 movie recommendations for a specified set of users
def top_movie_recommendation_toSetOfuser(filename, seed):
    '''
    This function must print the RMSE of recommendations obtained
    through global average, that is, the predicted rating for each
    user-movie pair must be the global average computed in the previous
    task. Training and test
    sets should be determined as before. You can add a column to an existing DataFrame with function *.withColumn(...)*.
    Test file: tests/test_global_average_recommender.py
    '''
    spark = init_spark()
    lines = spark.read.text(filename).rdd
    parts = lines.map(lambda row: row.value.split("::"))
    ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                         rating=float(p[2])))
    ratings = spark.createDataFrame(ratingsRDD)
    (training, test) = ratings.randomSplit([0.8, 0.2], seed=seed)
    average = training.agg({"rating": "avg"}).collect()[0]['avg(rating)']
    training = training.withColumn('avg(rating)', lit(average))

    als = ALS(maxIter=5, rank=70, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
              coldStartStrategy="drop", seed=seed)
    model = als.fit(training)
    predictions = model.transform(test)

    # Generate top 10 movie recommendations for a specified set of users (here 5 users)
    users = ratings.select(als.getUserCol()).distinct().limit(5)
    userSubsetRecs = model.recommendForUserSubset(users, 10) # top 10 movie recommend from model
    
    return userSubsetRecs

In [137]:
topMovie = top_movie_recommendation_toSetOfuser(filename, 123)
topMovie.printSchema()
topMovie.select("userId", "recommendations.movieId").show(10,False)

root
 |-- userId: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- movieId: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)

+------+----------------------------------------+
|userId|movieId                                 |
+------+----------------------------------------+
|26    |[88, 94, 22, 23, 68, 54, 36, 81, 57, 6] |
|22    |[74, 88, 22, 30, 51, 32, 69, 68, 62, 98]|
|19    |[90, 98, 74, 54, 22, 88, 94, 85, 68, 87]|
|29    |[46, 90, 32, 94, 22, 68, 17, 49, 10, 38]|
|0     |[92, 9, 91, 2, 26, 25, 41, 95, 89, 77]  |
+------+----------------------------------------+



In [134]:
# Generate top 10 user recommendations for a specified set of movies
def top_user_recommendation_setOfItems(filename, seed):
    '''
    This function must print the RMSE of recommendations obtained
    through global average, that is, the predicted rating for each
    user-movie pair must be the global average computed in the previous
    task. Training and test
    sets should be determined as before. You can add a column to an existing DataFrame with function *.withColumn(...)*.
    Test file: tests/test_global_average_recommender.py
    '''
    spark = init_spark()
    lines = spark.read.text(filename).rdd
    parts = lines.map(lambda row: row.value.split("::"))
    ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                         rating=float(p[2])))
    ratings = spark.createDataFrame(ratingsRDD)
    (training, test) = ratings.randomSplit([0.8, 0.2], seed=seed)
    average = training.agg({"rating": "avg"}).collect()[0]['avg(rating)']
    training = training.withColumn('avg(rating)', lit(average))

    als = ALS(maxIter=5, rank=70, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
              coldStartStrategy="drop", seed=seed)
    model = als.fit(training)
    predictions = model.transform(test)

    # Generate top 10 user recommendations for a specified set of movies(here 5 items)
    items = ratings.select(als.getItemCol()).distinct().limit(5)
    itemSubsetRecs = model.recommendForItemSubset(items, 10)
    return itemSubsetRecs

In [138]:
topUser = top_user_recommendation_setOfItems(filename, 123)
topUser.printSchema()
topUser.select("movieId", "recommendations.userId").show(10, False)

root
 |-- movieId: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- userId: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)

+-------+--------------------------------------+
|movieId|userId                                |
+-------+--------------------------------------+
|65     |[23, 5, 3, 1, 29, 24, 28, 22, 10, 12] |
|26     |[15, 0, 25, 28, 10, 29, 19, 7, 13, 20]|
|54     |[16, 26, 19, 22, 15, 3, 29, 1, 7, 12] |
|19     |[11, 2, 29, 15, 18, 23, 13, 25, 5, 3] |
|29     |[14, 21, 4, 7, 22, 13, 10, 20, 8, 1]  |
+-------+--------------------------------------+



In [178]:
# predictions some movie for specific users
def specific_user_movie_recommendations(filename, seed):
    '''
    This function must print the RMSE of recommendations obtained
    through global average, that is, the predicted rating for each
    user-movie pair must be the global average computed in the previous
    task. Training and test
    sets should be determined as before. You can add a column to an existing DataFrame with function *.withColumn(...)*.
    Test file: tests/test_global_average_recommender.py
    '''
    spark = init_spark()
    lines = spark.read.text(filename).rdd
    parts = lines.map(lambda row: row.value.split("::"))
    ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                         rating=float(p[2])))
    ratings = spark.createDataFrame(ratingsRDD)
    (training, test) = ratings.randomSplit([0.8, 0.2], seed=seed)
    average = training.agg({"rating": "avg"}).collect()[0]['avg(rating)']
    training = training.withColumn('avg(rating)', lit(average))

    als = ALS(maxIter=5, rank=70, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
              coldStartStrategy="drop", seed=seed)
    model = als.fit(training)
    predictions = model.transform(test) # predict for test data
    
    # predictions some movie for specific users
    movies = ratings.select(als.getItemCol()).distinct().limit(5)
    users = ratings.select(als.getUserCol()).distinct().limit(5)
    movie_ids = []
    user_ids = []
    
    for i in range(0, movies.count()):
            movie_ids.append(movies.collect()[i][0])
            user_ids.append(users.collect()[i][0])

    newUserPred = spark.createDataFrame(zip(movie_ids, user_ids), schema = ['movieId', 'userId'])
    
    newPredictions = predictions = model.transform(newUserPred)
    
    return newPredictions

In [179]:
topUserItem = specific_user_movie_recommendations(filename, 123)
topUserItem.printSchema()
topUserItem.show()

root
 |-- movieId: long (nullable = true)
 |-- userId: long (nullable = true)
 |-- prediction: float (nullable = false)

+-------+------+----------+
|movieId|userId|prediction|
+-------+------+----------+
|     65|    19|0.97528976|
|     26|    26|0.25505516|
|     54|    22| 1.9031708|
|     19|     0| 1.0041881|
|     29|    29| 1.0008368|
+-------+------+----------+

