In [29]:
import pandas as pd
from pyspark.sql.functions import col, explode
from pyspark import SparkContext
# Import the required functions
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator


In [30]:
from pyspark.sql import SparkSession
sc = SparkContext

spark = SparkSession.builder.appName('Recommendations').getOrCreate()

In [31]:
ratings = spark.read.csv("item_dedup.csv",header=False)

In [32]:
ratings.show()

+--------------+----------+---+----------+
|           _c0|       _c1|_c2|       _c3|
+--------------+----------+---+----------+
|A3AF8FFZAZYNE5|0000000078|5.0|1092182400|
| AH2L9G3DQHHAJ|0000000116|4.0|1019865600|
|A2IIIDRK3PRRZY|0000000116|1.0|1395619200|
|A1TADCM7YWPQ8M|0000000868|4.0|1031702400|
| AWGH7V0BDOJKB|0000013714|4.0|1383177600|
|A3UTQPQPM4TQO0|0000013714|5.0|1374883200|
| A8ZS0I5L5V31B|0000013714|5.0|1393632000|
| ACNGUPJ3A3TM9|0000013714|4.0|1386028800|
|A3BED5QFJWK88M|0000013714|4.0|1350345600|
|A2SUAM1J3GNN3B|0000013714|5.0|1252800000|
| APOZ15IEYQRRR|0000013714|5.0|1362787200|
| AYEDW3BFK53XK|0000013714|5.0|1325462400|
|A1KLCGLCXYP1U1|0000013714|3.0|1376092800|
|A37W6POFWIVG13|0000013714|5.0|1316131200|
|A2EIPZNHAEXZHJ|0000013714|4.0|1325030400|
|A1VAFVJFT58YI3|0000013714|5.0|1384300800|
| A9KTKY6BUR8U6|0000013714|1.0|1357516800|
|A2742OG8PK8KU6|0000013714|5.0|1358380800|
|A38AAPXSJN4C5G|0000015393|4.0|1239494400|
|A14A5Q8VJK5NLR|0000029831|4.0|1393286400|
+----------

In [33]:
ratings.printSchema()


root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)



In [34]:
print('数据处理开始：')
ratings = ratings.toDF('userId', 'productID', 'rating', 'mytime')

from operator import attrgetter

# data.map(attrgetter("label", "features")).toDF(["some_name", "some_other_name"])

print('添加属性列头的依据在:')
print('https://jmcauley.ucsd.edu/data/amazon/links.html')

数据处理开始：
添加属性列头的依据在:
https://jmcauley.ucsd.edu/data/amazon/links.html


In [35]:
ratings.show()

+--------------+----------+------+----------+
|        userId| productID|rating|    mytime|
+--------------+----------+------+----------+
|A3AF8FFZAZYNE5|0000000078|   5.0|1092182400|
| AH2L9G3DQHHAJ|0000000116|   4.0|1019865600|
|A2IIIDRK3PRRZY|0000000116|   1.0|1395619200|
|A1TADCM7YWPQ8M|0000000868|   4.0|1031702400|
| AWGH7V0BDOJKB|0000013714|   4.0|1383177600|
|A3UTQPQPM4TQO0|0000013714|   5.0|1374883200|
| A8ZS0I5L5V31B|0000013714|   5.0|1393632000|
| ACNGUPJ3A3TM9|0000013714|   4.0|1386028800|
|A3BED5QFJWK88M|0000013714|   4.0|1350345600|
|A2SUAM1J3GNN3B|0000013714|   5.0|1252800000|
| APOZ15IEYQRRR|0000013714|   5.0|1362787200|
| AYEDW3BFK53XK|0000013714|   5.0|1325462400|
|A1KLCGLCXYP1U1|0000013714|   3.0|1376092800|
|A37W6POFWIVG13|0000013714|   5.0|1316131200|
|A2EIPZNHAEXZHJ|0000013714|   4.0|1325030400|
|A1VAFVJFT58YI3|0000013714|   5.0|1384300800|
| A9KTKY6BUR8U6|0000013714|   1.0|1357516800|
|A2742OG8PK8KU6|0000013714|   5.0|1358380800|
|A38AAPXSJN4C5G|0000015393|   4.0|

In [36]:
import pyspark.sql.functions as F

# ratings.withColumn('userId', F.expr("conv(userId, 16, 10)")).show()
print('使用spark F 函数和直接cast函数转换复杂列和简单列\n')
ratings = ratings.\
    withColumn('userId', F.expr("conv(userId, 16, 10)").cast('integer')).\
    withColumn('productID', col('productID').cast('integer')).\
    withColumn('rating', col('rating').cast('float')).\
    drop('mytime')
ratings.show()

使用spark F 函数和直接cast函数转换复杂列和简单列

+---------+---------+------+
|   userId|productID|rating|
+---------+---------+------+
|171636991|       78|   5.0|
|       10|      116|   4.0|
|      162|      116|   1.0|
|      161|      868|   4.0|
|       10|    13714|   4.0|
|      163|    13714|   5.0|
|      168|    13714|   5.0|
|      172|    13714|   4.0|
| 10731221|    13714|   4.0|
|      162|    13714|   5.0|
|       10|    13714|   5.0|
|       10|    13714|   5.0|
|      161|    13714|   3.0|
|     2615|    13714|   5.0|
|     2606|    13714|   4.0|
|      161|    13714|   5.0|
|      169|    13714|   1.0|
|   665410|    13714|   5.0|
|   669866|    15393|   4.0|
|   660645|    29831|   4.0|
+---------+---------+------+
only showing top 20 rows



In [37]:
ratings.printSchema()


root
 |-- userId: integer (nullable = true)
 |-- productID: integer (nullable = true)
 |-- rating: float (nullable = true)



In [38]:
# Count the total number of ratings in the dataset
numerator = ratings.select("rating").count()
print('numerator=', numerator)

# Count the number of distinct userIds and distinct movieIds
num_users = ratings.select("userId").distinct().count()
num_products = ratings.select("productID").distinct().count()

# Set the denominator equal to the number of users multiplied by the number of movies
denominator = num_users * num_products

# Divide the numerator by the denominator
sparsity = (1.0 - (numerator *1.0)/denominator)*100
print("The ratings dataframe is ", "%.2f" % sparsity + "% empty.")

numerator= 82677131
The ratings dataframe is  99.99% empty.


In [39]:
# Group data by userId, count ratings
print('数据探索1')
userId_ratings = ratings.groupBy("userId").count().orderBy('count', ascending=False)
userId_ratings.show()

数据探索1
+------+--------+
|userId|   count|
+------+--------+
|   161|12117249|
|    10|12105671|
|   162|12089527|
|   163| 9871284|
|  null|  632787|
|  2580|  381534|
|   175|  368543|
|   173|  345221|
|  2602|  343196|
|  2595|  342956|
|   171|  342613|
|  2608|  341928|
|   165|  340807|
|  2622|  340461|
|  2584|  340082|
|   164|  339785|
|  2620|  339625|
|  2585|  339567|
|  2577|  339408|
|   170|  339219|
+------+--------+
only showing top 20 rows



In [40]:
print('数据探索2')
productID_ratings = ratings.groupBy("productID").count().orderBy('count', ascending=False)
productID_ratings.show()

数据探索2
+----------+--------+
| productID|   count|
+----------+--------+
|      null|64387837|
| 439023483|   21398|
| 439023513|   14114|
| 385537859|   12973|
|   7444117|   12629|
| 375831002|   12571|
| 345803485|   12290|
|1608838137|   11906|
| 316055433|   11746|
| 849922070|   10424|
|   7442920|   10172|
| 345803493|    9980|
| 399159347|    9906|
| 345803507|    9610|
|   7386648|    9062|
|1469984202|    8280|
| 606238409|    8106|
| 316044695|    7986|
| 141039280|    7905|
| 545265355|    7786|
+----------+--------+
only showing top 20 rows



In [44]:
# 下面开始过滤无效数值
ratings = ratings.filter("productID is not NULL")
ratings = ratings.filter("userId is not NULL")
ratings = ratings.filter("rating is not NULL")

ratings=ratings.dropna()

numerator = ratings.select("rating").count()
print('numerator=', numerator)

numerator= 18154053


In [45]:


print('数据探索3 检查userid过滤')
userId_ratings = ratings.groupBy("userId").count().orderBy('count', ascending=False)
userId_ratings.show()

print('数据探索4 检查productid过滤')
productID_ratings = ratings.groupBy("productID").count().orderBy('count', ascending=False)
productID_ratings.show()

数据探索3 检查userid过滤
+------+-------+
|userId|  count|
+------+-------+
|   161|2683569|
|   162|2670672|
|    10|2664444|
|   163|2190621|
|  2580| 110418|
|   175|  99783|
|  2589|  77534|
|  2584|  77181|
|  2579|  77179|
|  2577|  76908|
|  2623|  76753|
|  2595|  76560|
|   170|  76376|
|  2588|  76215|
|   171|  76171|
|  2598|  76012|
|  2593|  76004|
|   173|  75813|
|  2599|  75803|
|  2585|  75439|
+------+-------+
only showing top 20 rows

数据探索4 检查productid过滤
+----------+-----+
| productID|count|
+----------+-----+
| 439023483|21177|
| 439023513|13967|
| 385537859|12883|
|   7444117|12526|
| 375831002|12477|
| 345803485|12199|
|1608838137|11842|
| 316055433|11664|
| 849922070|10338|
|   7442920|10083|
| 345803493| 9880|
| 399159347| 9837|
| 345803507| 9508|
|   7386648| 8991|
|1469984202| 8222|
| 606238409| 8039|
| 316044695| 7920|
| 141039280| 7834|
| 545265355| 7705|
|1442359315| 7639|
+----------+-----+
only showing top 20 rows



# 构建模型 #

In [16]:
(train, test) = ratings.randomSplit([0.8, 0.2], seed = 1234)

# Create ALS model
als = ALS(userCol="userId", itemCol="productID", ratingCol="rating", nonnegative = True, implicitPrefs = False, coldStartStrategy="drop")

print('输出模型类型，确认使用了ALS =》')
type(als)

输出模型类型，确认使用了ALS =》


pyspark.ml.recommendation.ALS

In [21]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Add hyperparameters and their respective values to param_grid
param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [10, 50, 100, 150]) \
            .addGrid(als.regParam, [.01, .05, .1, .15]) \
            .build()
            #             .addGrid(als.maxIter, [5, 50, 100, 200]) \

           
# Define evaluator as RMSE and print length of evaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction") 
print ("Num models to be tested: ", len(param_grid))

Num models to be tested:  16


# 进行模型交叉验证 #

In [23]:
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)

# Confirm cv was built
print('交叉验证', cv)

#Fit cross validator to the 'train' dataset
model = cv.fit(train)

#Extract best model from the cv model above
best_model = model.bestModel

交叉验证 CrossValidator_cae4615c979e


In [24]:
# Print best_model
print(type(best_model))

# Complete the code below to extract the ALS model parameters
print("**Best Model**")

# # Print "Rank"
print("  Rank:", best_model._java_obj.parent().getRank())

# Print "MaxIter"
print("  MaxIter:", best_model._java_obj.parent().getMaxIter())

# Print "RegParam"
print("  RegParam:", best_model._java_obj.parent().getRegParam())

<class 'pyspark.ml.recommendation.ALSModel'>
**Best Model**
  Rank: 150
  MaxIter: 10
  RegParam: 0.15


In [41]:
# View the predictions
test_predictions = best_model.transform(test)
RMSE = evaluator.evaluate(test_predictions)


In [43]:
print(RMSE)

0.8319647284122662
