In [None]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

df_movies = spark.read.csv("movies.csv",header=True,sep=",");

df_rating = spark.read.csv("ratings.csv",header=True);
print(df_movies.collect())

In [None]:
df_joint = df_movies.join(df_rating, df_movies.movieId == df_rating.movieId)\
.select(df_movies.movieId,df_movies.genres,df_movies.title,\
        df_rating.userId,df_rating.rating,df_rating.timestamp)     

In [8]:
df_joint.printSchema()

root
 |-- movieId: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- title: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [None]:
df_joint = df_joint.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                     rating=float(p[2]), timestamp=long(p[3])))

In [17]:
from pyspark.sql.types import IntegerType

df_joint = df_joint.withColumn("userId", df_joint["userId"].cast(IntegerType()))
df_joint = df_joint.withColumn("movieId", df_joint["movieId"].cast(IntegerType()))
df_joint = df_joint.withColumn("rating", df_joint["rating"].cast(IntegerType()))

In [18]:
df_joint.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- genres: string (nullable = true)
 |-- title: string (nullable = true)
 |-- userId: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- timestamp: string (nullable = true)



In [20]:
(training, test) = df_joint.randomSplit([0.8, 0.2])

In [54]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

#lines = spark.read.text("data/mllib/als/sample_movielens_ratings.txt").rdd
#parts = lines.map(lambda row: row.value.split("::"))
#ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
 #                                    rating=float(p[2]), timestamp=long(p[3])))
#ratings = spark.createDataFrame(ratingsRDD)
#(training, test) = ratings.randomSplit([0.8, 0.2])

# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(training)

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)

# Generate top 10 movie recommendations for a specified set of users
users = df_joint.select(als.getUserCol()).distinct()
userSubsetRecs = model.recommendForUserSubset(users, 10)
# Generate top 10 user recommendations for a specified set of movies
movies = df_joint.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = model.recommendForItemSubset(movies, 10)

Root-mean-square error = 1.1500808446632762


In [55]:
userSubsetRecs.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   148|[[89904, 6.751604...|
|   463|[[161582, 5.93614...|
|   471|[[72171, 12.02051...|
|   496|[[3266, 7.1788297...|
|   243|[[5303, 11.726888...|
|   392|[[2318, 8.787418]...|
|   540|[[2261, 6.7266893...|
|    31|[[65188, 8.068945...|
|   516|[[179819, 8.71031...|
|    85|[[2398, 6.934878]...|
|   137|[[52435, 5.702093...|
|   251|[[70946, 11.63048...|
|   451|[[52281, 7.258337...|
|   580|[[104879, 6.88864...|
|    65|[[86345, 6.501846...|
|   458|[[131013, 10.2180...|
|    53|[[2739, 6.814709]...|
|   255|[[5992, 11.773895...|
|   481|[[137857, 5.82718...|
|   588|[[4941, 8.71675],...|
+------+--------------------+
only showing top 20 rows



In [56]:
type(userSubsetRecs)

pyspark.sql.dataframe.DataFrame

In [57]:
userSubsetRecs.filter ("userID in( 127, 151,  300)").show(truncate=False)



+------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                                                                                                            |
+------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|300   |[[1295, 11.540114], [86345, 10.56369], [69406, 9.61298], [7669, 9.602613], [8810, 9.556323], [1301, 9.492831], [92535, 9.4679985], [4158, 9.454541], [1014, 9.376625], [3943, 9.238637]]   |
|127   |[[7028, 17.53144], [3503, 16.630356], [938, 15.093879], [611, 14.892273], [3846, 14.721819], [1904, 14.521244], [46965, 14.173786], [2942, 14.028612], [5055, 13.968603], [3272, 13.53122]]|
|151   |[[3272,