In [1]:
from pyspark.sql import SparkSession 
# Start (or reuse) a Spark session for this notebook under the application name 'rs'.
session_builder = SparkSession.builder.appName('rs')
spark = session_builder.getOrCreate()

In [2]:
# Load the ratings CSV; the first row is the header and column types are inferred.
df_ratings = spark.read.csv(
    'ml-latest-small/ratings.csv',
    header=True,
    inferSchema=True,
)

In [3]:
# Print the inferred column names and types of the ratings DataFrame.
df_ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [4]:
# Load the movies CSV; the first row is the header and column types are inferred.
df_movie = spark.read.csv(
    'ml-latest-small/movies.csv',
    header=True,
    inferSchema=True,
)

In [5]:
# Print the inferred column names and types of the movies DataFrame.
df_movie.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



In [6]:
# Register both DataFrames as temporary views so they can be queried by name via spark.sql.
df_ratings.createOrReplaceTempView("ratings")   # build the ratings view
df_movie.createOrReplaceTempView("movies")      # build the movies view

In [7]:
# Join the two views on movieId so every rating row carries its movie's title and genres.
# Adjacent string literals are concatenated by Python, yielding the same query text.
df_details = spark.sql(
    "SELECT ratings.userId , ratings.movieId , movies.title , movies.genres , ratings.rating  FROM ratings   "
    "                        INNER JOIN movies ON ratings.movieId = movies.movieId "
)

In [8]:
# Print the schema of the joined ratings + movie-details DataFrame.
df_details.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- rating: double (nullable = true)



In [9]:
# Report the joined DataFrame's dimensions as a (row count, column count) tuple.
n_rows = df_details.count()
n_cols = len(df_details.columns)
print((n_rows, n_cols))

(100836, 5)


In [11]:
# Show the ten users with the most ratings, most active first, without truncating values.
ratings_per_user = df_details.groupBy('userId').count()
ratings_per_user.orderBy('count', ascending=False).show(n=10, truncate=False)

+------+-----+
|userId|count|
+------+-----+
|414   |2698 |
|599   |2478 |
|474   |2108 |
|448   |1864 |
|274   |1346 |
|610   |1302 |
|68    |1260 |
|380   |1218 |
|606   |1115 |
|288   |1055 |
+------+-----+
only showing top 10 rows



In [17]:
# Count the rows that remain after dropping any row containing a null;
# comparing this with the total row count reveals whether nulls are present.
df_details.dropna().count()

100836

In [28]:
# Descriptive statistics (count, mean, stddev, min, quartiles, max) for the rating column only.
rating_stats = df_details.summary().select('summary', 'rating')
rating_stats.show()

+-------+------------------+
|summary|            rating|
+-------+------------------+
|  count|            100836|
|   mean| 3.501556983616962|
| stddev|1.0425292390606342|
|    min|               0.5|
|    25%|               3.0|
|    50%|               3.5|
|    75%|               4.0|
|    max|               5.0|
+-------+------------------+



In [23]:
# Register the joined DataFrame as a temporary view named "df_details" for SQL queries.
df_details.createOrReplaceTempView("df_details") 

In [27]:
# Sanity check: list any joined rows whose movie title is missing.
null_title_rows = spark.sql(" select * from df_details where title is null ")
null_title_rows.show()

+------+-------+-----+------+------+
|userId|movieId|title|genres|rating|
+------+-------+-----+------+------+
+------+-------+-----+------+------+



In [31]:
from pyspark.ml.feature import StringIndexer,IndexToString

In [32]:
# Configure an indexer that encodes the string column "title" as the numeric
# column "title_num", so titles can serve as item ids for the recommender.
stringIndexer = StringIndexer(
    inputCol="title",
    outputCol="title_num",
)

In [33]:
# Fit the indexer on the joined data to learn the title -> numeric index mapping.
model = stringIndexer.fit(df_details) 

In [34]:
# Apply the fitted indexer; the result is df_details plus the "title_num" column.
indexed = model.transform(df_details) 

In [35]:
# Preview the first ten rows, including the new title_num column.
indexed.show(10)

+------+-------+--------------------+--------------------+------+---------+
|userId|movieId|               title|              genres|rating|title_num|
+------+-------+--------------------+--------------------+------+---------+
|     1|      1|    Toy Story (1995)|Adventure|Animati...|   4.0|     11.0|
|     1|      3|Grumpier Old Men ...|      Comedy|Romance|   4.0|    422.0|
|     1|      6|         Heat (1995)|Action|Crime|Thri...|   4.0|    129.0|
|     1|     47|Seven (a.k.a. Se7...|    Mystery|Thriller|   5.0|     15.0|
|     1|     50|Usual Suspects, T...|Crime|Mystery|Thr...|   5.0|     13.0|
|     1|     70|From Dusk Till Da...|Action|Comedy|Hor...|   3.0|    385.0|
|     1|    101|Bottle Rocket (1996)|Adventure|Comedy|...|   5.0|   1130.0|
|     1|    110|   Braveheart (1995)|    Action|Drama|War|   4.0|      7.0|
|     1|    151|      Rob Roy (1995)|Action|Drama|Roma...|   5.0|    533.0|
|     1|    157|Canadian Bacon (1...|          Comedy|War|   5.0|   2053.0|
+------+----

In [36]:
# Show the three title indices with the most ratings, without truncating values.
title_counts = indexed.groupBy('title_num').count()
title_counts.orderBy('count', ascending=False).show(n=3, truncate=False)

+---------+-----+
|title_num|count|
+---------+-----+
|0.0      |329  |
|1.0      |317  |
|2.0      |307  |
+---------+-----+
only showing top 3 rows



In [37]:
# Split the indexed ratings into training and test sets with a 70/30 weighting.
# A fixed seed keeps the split identical across runs, so the evaluation metric
# and the recommendations derived from it can be reproduced.
train, test = indexed.randomSplit([0.7, 0.3], seed=42)

In [38]:
from pyspark.ml.recommendation import ALS  #导入推荐系统中的ALS算法

In [39]:
# Configure the ALS collaborative-filtering recommender.
#   userCol / itemCol / ratingCol : columns holding the user id, the indexed title and the rating
#   nonnegative=True              : constrain the learned factors to be non-negative
#   coldStartStrategy="drop"      : drop rows whose prediction would be NaN (users or items
#                                   unseen during training) so evaluation metrics stay defined
#   seed=42                       : fix the random factor initialisation so that training
#                                   is reproducible from run to run
# NOTE(review): maxIter and regParam are untuned hand-picked values; consider a parameter search.
rec = ALS(
    maxIter=10,
    regParam=0.01,
    userCol='userId',
    itemCol='title_num',
    ratingCol='rating',
    nonnegative=True,
    coldStartStrategy="drop",
    seed=42,
)

In [40]:
# Train the ALS recommender on the training split.
rec_model=rec.fit(train)

In [41]:
# Score the held-out test split; the result carries an added "prediction" column.
predicted_ratings=rec_model.transform(test)

In [42]:
# Print the schema of the scored test set to confirm the prediction column is present.
predicted_ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- title_num: double (nullable = false)
 |-- prediction: float (nullable = false)



In [47]:
# Preview ten scored rows in full width to compare actual ratings with predictions.
predicted_ratings.show(10,truncate=False)

+------+-------+---------------------------+-------------------------------------+------+---------+----------+
|userId|movieId|title                      |genres                               |rating|title_num|prediction|
+------+-------+---------------------------+-------------------------------------+------+---------+----------+
|305   |6365   |Matrix Reloaded, The (2003)|Action|Adventure|Sci-Fi|Thriller|IMAX|3.0   |148.0    |2.8269906 |
|20    |6365   |Matrix Reloaded, The (2003)|Action|Adventure|Sci-Fi|Thriller|IMAX|4.0   |148.0    |3.205861  |
|169   |6365   |Matrix Reloaded, The (2003)|Action|Adventure|Sci-Fi|Thriller|IMAX|4.0   |148.0    |3.795758  |
|430   |6365   |Matrix Reloaded, The (2003)|Action|Adventure|Sci-Fi|Thriller|IMAX|4.5   |148.0    |4.0312037 |
|64    |6365   |Matrix Reloaded, The (2003)|Action|Adventure|Sci-Fi|Thriller|IMAX|3.0   |148.0    |2.7575402 |
|590   |6365   |Matrix Reloaded, The (2003)|Action|Adventure|Sci-Fi|Thriller|IMAX|3.0   |148.0    |2.8200083 |
|

In [48]:
from pyspark.ml.evaluation import RegressionEvaluator        # RegressionEvaluator 回归评估器，它期望两个输入列:预测和标签。

In [49]:
# Evaluator that compares the "prediction" column against the true "rating" column using RMSE.
evaluator = RegressionEvaluator(
    metricName='rmse',
    predictionCol='prediction',
    labelCol='rating',
)

In [50]:
# Compute the evaluator's metric (configured as 'rmse') on the scored test set.
rmse=evaluator.evaluate(predicted_ratings)

In [51]:
# Print the test-set error. The value comes from an evaluator configured with
# metricName='rmse', so it is labelled as root-mean-square error (RMSE) rather
# than "standard error", which is a different statistic.
print('{}{}'.format("均方根误差(RMSE)：",rmse))

标准误差：1.04349573121


In [52]:
# Collect every distinct title index present in the data set.
unique_movies=indexed.select('title_num').distinct()

In [53]:
# Number of distinct title indices.
unique_movies.count()

9719

In [54]:
# Alias the full title list as 'all' so its columns can be qualified by that name after a join.
all_movie = unique_movies.alias('all')

In [55]:
# Distinct title indices that user 46 has already rated.
watched_movies = (
    indexed
    .where(indexed.userId == 46)
    .select('title_num')
    .distinct()
)

In [56]:
# Number of distinct titles user 46 has rated.
watched_movies.count()

42

In [57]:
# Alias user 46's rated titles as 'no_46' so its columns can be qualified by that name after a join.
no_46=watched_movies.alias('no_46')

In [60]:
# Left-join every title against the titles user 46 has rated; rows with no
# match keep a null on the 'no_46' side.
join_condition = all_movie['title_num'] == no_46['title_num']
total_movies = all_movie.join(no_46, on=join_condition, how='left')

In [61]:
# Preview the join result: the second column is null where user 46 has not rated the title.
total_movies.show(10,False)

+---------+---------+
|title_num|title_num|
+---------+---------+
|299.0    |null     |
|305.0    |305.0    |
|496.0    |null     |
|558.0    |null     |
|596.0    |null     |
|692.0    |null     |
|769.0    |null     |
|934.0    |null     |
|1051.0   |null     |
|1761.0   |null     |
+---------+---------+
only showing top 10 rows



In [66]:
# Import only the helpers that are needed. A wildcard import from
# pyspark.sql.functions would shadow Python builtins such as sum, min, max,
# abs and round. `lit` is imported here because a later cell uses it.
from pyspark.sql.functions import col, lit

# Keep the titles for which the left join found no match on user 46's side,
# i.e. the titles user 46 has not rated yet.
remaining_movies = (
    total_movies
    .where(col("no_46.title_num").isNull())
    .select(all_movie.title_num)
    .distinct()
)

In [67]:
# Attach a constant userId column (46) so each unrated title forms a (user, item) pair for the model.
remaining_movies=remaining_movies.withColumn("userId",lit(46))

In [68]:
# Predict user 46's rating for every unrated title and rank the results, highest prediction first.
user_predictions = rec_model.transform(remaining_movies)
recommendations = user_predictions.orderBy('prediction', ascending=False)

In [None]:
# Display the top 100 recommendations without truncating column values.
recommendations.show(100,False)