In [1]:
# Locate the local Spark installation and put pyspark on sys.path;
# this has to run before any pyspark import.
import findspark
findspark.init()
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession

# Configure a local-mode application named 'keshe'.
conf = SparkConf().setMaster('local').setAppName('keshe')
# getOrCreate() reuses an already-running session/context instead of building a
# second SparkContext, so this cell can be re-executed in a live kernel without
# failing with "Cannot run multiple SparkContexts at once".
spark = SparkSession.builder.config(conf=conf).getOrCreate()

In [2]:
# Load the user/meal review table. The path carries no scheme, so it is resolved
# against the cluster's default filesystem (HDFS, according to the original note).
# header=True takes the column names from the first line of the file.
csv_reader = spark.read.format('csv').options(header=True, encoding='utf-8')
data = csv_reader.load('/keshe/user_meal.csv')
# Alternative for a local copy of the same file:
#   spark.read.format('csv').load("file:///home/hadoop/jupyternotebook/keshe/user_meal.csv", header=True)
# To preview only the key columns: data.select("UserID", "MealID", "Rating").show()
data.show()

+--------------+------+----------+--------------------+----------+
|        UserID|Rating|ReviewTime|              Review|    MealID|
+--------------+------+----------+--------------------+----------+
|A2A6NH6DPE0VXR|     4|1493576000|        非常非常好吃|B000W4WD40|
|A1MNDBR7DF0EU9|     5|1493576000|太美味了，强烈推荐！|B002BLCNHY|
| AT1BYQVGK7U71|     5|1493576000|          简直太赞了|B001SE07JG|
|A328S9RN3U5M68|     3|1493576000|            味道很正|B001PN63PC|
|A16H208JVRTMU4|     5|1493576000|太美味了，强烈推荐！|B000WT7R6O|
| ATDNMB4EB7ZY4|     2|1493576000|        有特色，好吃|B000NHRTAO|
|A3VNYHAEKTHVPY|     3|1493577600|        非常非常好吃|B005GT575S|
|A13MM7UES60AAU|     5|1493592000|太美味了，强烈推荐！|B008X0SGDC|
|A3TNYNA2360NPA|     5|1493592000|太美味了，强烈推荐！|B008QTTGGG|
|A206S2JFUZ5WT1|     5|1493592000|太美味了，强烈推荐！|B008O2QERY|
|A2UO0MN92AQ4L3|     4|1493592000|    很美味，推荐品尝|B008JSO786|
|A3FYKYY3BR4NN2|     3|1493592000|        有特色，卫生|B0099JKR6U|
|A3UPYGJKZ0XTU4|     3|1493592000|        有特色，卫生|B008DGRDZ8|
| AZJOKE3Y0UCBC|     5|14

In [3]:
# Feature transformation: map the string MealID / UserID values to numeric
# index columns using StringIndexer.
from pyspark.ml.feature import StringIndexer

# Meal ids -> MealID_Index
meal_indexer = StringIndexer(inputCol='MealID', outputCol='MealID_Index')
indexer1 = meal_indexer.fit(data)
data_index1 = indexer1.transform(data)

# User ids -> UserID_Index (fitted on the frame that already carries MealID_Index)
user_indexer = StringIndexer(inputCol='UserID', outputCol='UserID_Index')
indexer2 = user_indexer.fit(data_index1)
data_index = indexer2.transform(data_index1)

data_index.show(10)

+--------------+------+----------+--------------------+----------+------------+------------+
|        UserID|Rating|ReviewTime|              Review|    MealID|MealID_Index|UserID_Index|
+--------------+------+----------+--------------------+----------+------------+------------+
|A2A6NH6DPE0VXR|     4|1493576000|        非常非常好吃|B000W4WD40|       303.0|       295.0|
|A1MNDBR7DF0EU9|     5|1493576000|太美味了，强烈推荐！|B002BLCNHY|       488.0|      1122.0|
| AT1BYQVGK7U71|     5|1493576000|          简直太赞了|B001SE07JG|       162.0|      3251.0|
|A328S9RN3U5M68|     3|1493576000|            味道很正|B001PN63PC|       408.0|         6.0|
|A16H208JVRTMU4|     5|1493576000|太美味了，强烈推荐！|B000WT7R6O|       273.0|      3401.0|
| ATDNMB4EB7ZY4|     2|1493576000|        有特色，好吃|B000NHRTAO|       940.0|       556.0|
|A3VNYHAEKTHVPY|     3|1493577600|        非常非常好吃|B005GT575S|       107.0|      3018.0|
|A13MM7UES60AAU|     5|1493592000|太美味了，强烈推荐！|B008X0SGDC|        48.0|      3372.0|
|A3TNYNA2360NPA|     5|1493592000|

In [4]:
from pyspark.ml.recommendation import ALS

# Fixed seed shared by the train/test split and the ALS factor initialisation,
# so repeated runs over the same input give comparable models and metrics.
SEED = 42

# ALS only accepts numeric user/item/rating columns, so cast the indexed ids to
# int and the rating to float.
data_als = data_index.selectExpr(
    "cast(MealID_Index as int) MealID_Index",
    "cast(UserID_Index as int) UserID_Index",
    "cast(Rating as float) Rating"
)

# 80/20 train/test split.
training, test = data_als.randomSplit([0.8, 0.2], seed=SEED)

# coldStartStrategy="drop" removes prediction rows for users/items that were not
# seen during training, so the evaluation metric does not become NaN.
# NOTE(review): regParam=0.01 is kept from the original; the RMSE recorded in this
# notebook (~5.47) looks high for the rating values shown, so this may be worth
# tuning - confirm with a parameter search.
als = ALS(
    maxIter=5,
    regParam=0.01,
    userCol='UserID_Index',
    itemCol='MealID_Index',
    ratingCol='Rating',
    coldStartStrategy="drop",
    seed=SEED,
)

# Fit the model on the training split.
model = als.fit(training)

In [5]:
from pyspark.ml.evaluation import RegressionEvaluator

# Score the held-out split and report the root-mean-square error between the
# true 'Rating' column and the model's 'prediction' column.
predictions = model.transform(test)
evaluator = RegressionEvaluator(
    labelCol='Rating',
    predictionCol='prediction',
    metricName='rmse',
)
rmse = evaluator.evaluate(predictions)
print(f'均方根误差={rmse}')

均方根误差=5.474643567145228


In [6]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Top-10 meal recommendations for every user known to the model.
userRecs = model.recommendForAllUsers(10)
# Flatten each (user, [rec, rec, ...]) row into one (user, rec) pair per recommendation.
flatUserRecs = userRecs.rdd.flatMapValues(lambda p: p)
# Back to a DataFrame; the tuple fields arrive as _1 / _2, so rename them and
# render the recommendation struct as a string. The second column is named
# 'recommend' (previously misspelled 'recommed') to match the per-meal table.
userRecs_res = spark.createDataFrame(flatUserRecs).selectExpr(
    "cast(_1 as int) UserID_Index",
    "cast(_2 as String) recommend",
)
# Save the flattened recommendations to a local, header-less CSV file.
userRecs_res.toPandas().to_csv('user10_recommend.csv', index=False, header=False)
userRecs_res.show()



+------------+--------------------+
|UserID_Index|            recommed|
+------------+--------------------+
|           0|{503, 9.370707511...|
|           0|{727, 7.570865631...|
|           0|{1408, 7.34706544...|
|           0|{387, 7.088261127...|
|           0|{1011, 7.04486417...|
|           0|{1012, 6.75635528...|
|           0|{308, 6.551982879...|
|           0|{469, 6.426281929...|
|           0|{634, 6.402853488...|
|           0|{193, 6.339433193...|
|           1|{1615, 10.5455341...|
|           1|{919, 9.954395294...|
|           1|{866, 9.489836692...|
|           1|{726, 8.684346199...|
|           1|{463, 8.409170150...|
|           1|{982, 8.407964706...|
|           1|{719, 8.217915534...|
|           1|{106, 8.214841842...|
|           1|{561, 8.184371948...|
|           1|{276, 7.930413246...|
+------------+--------------------+
only showing top 20 rows



In [7]:
# Top-10 user recommendations for every meal known to the model.
mealRecs = model.recommendForAllItems(10)
# Flatten each (meal, [rec, rec, ...]) row into one (meal, rec) pair per recommendation.
flatMealRecs = mealRecs.rdd.flatMapValues(lambda p : p)
mealRecs_res = spark.createDataFrame(flatMealRecs)
# Rename the positional _1 / _2 fields. The id column is spelled 'MealID_Index',
# exactly like the ALS item column, so selecting it by that name does not depend
# on Spark's case-insensitive column resolution.
mealRecs_res1 = mealRecs_res.selectExpr(
    'cast(_1 as int) MealID_Index',
    'cast(_2 as String) recommend',
)
# Save the flattened recommendations to a local, header-less CSV file.
mealRecs_res1.toPandas().to_csv('meal10_recommend.csv', index=False, header=False)
mealRecs_res1.show()

+------------+--------------------+
|MealID_index|           recommend|
+------------+--------------------+
|           0|{471, 14.01844596...|
|           0|{867, 13.32805156...|
|           0|{1823, 13.2197532...|
|           0|{1410, 12.4298095...|
|           0|{1096, 11.9871644...|
|           0|{631, 11.24498271...|
|           0|{5007, 11.0192613...|
|           0|{532, 10.85748004...|
|           0|{88, 10.817880630...|
|           0|{127, 10.73826217...|
|           1|{194, 10.38314151...|
|           1|{323, 9.957196235...|
|           1|{867, 9.350378990...|
|           1|{685, 8.064225196...|
|           1|{794, 7.782574653...|
|           1|{1499, 7.52082633...|
|           1|{738, 7.111577033...|
|           1|{196, 6.976717948...|
|           1|{1302, 6.79769372...|
|           1|{864, 6.677715301...|
+------------+--------------------+
only showing top 20 rows



In [8]:
# Recommendations for a subset of users: take up to 10 distinct user ids
# and ask the model for 10 meals each.
user_col = als.getUserCol()
users = userRecs_res.select(user_col).distinct().limit(10)
userRecs1 = model.recommendForUserSubset(users, 10)
userRecs1.show()

# Recommendations for a subset of meals: take up to 10 distinct meal ids
# and ask the model for 10 users each.
item_col = als.getItemCol()
items = mealRecs_res1.select(item_col).distinct().limit(10)
itemRecs1 = model.recommendForItemSubset(items, 10)
itemRecs1.show()

+------------+--------------------+
|UserID_Index|     recommendations|
+------------+--------------------+
|        1580|[{850, 6.4149814}...|
|         471|[{952, 49.011143}...|
|        1591|[{743, 40.471184}...|
|        1342|[{472, 22.56163},...|
|         463|[{505, 5.290133},...|
|         833|[{661, 18.396915}...|
|         496|[{338, 31.072645}...|
|         148|[{662, 47.93017},...|
|        1088|[{304, 11.246425}...|
|        1238|[{271, 20.090534}...|
+------------+--------------------+

+------------+--------------------+
|MealID_Index|     recommendations|
+------------+--------------------+
|        1580|[{553, 15.483669}...|
|         471|[{656, 41.004883}...|
|        1591|[{526, 17.230974}...|
|        1342|[{526, 42.396187}...|
|         463|[{1517, 44.82696}...|
|         833|[{1526, 44.78265}...|
|         496|[{1104, 40.856556...|
|         148|[{194, 34.87207},...|
|        1088|[{194, 13.739573}...|
|        1238|[{379, 28.787289}...|
+------------+-------------

In [9]:
# Top-10 recommendations for one specific user (index 2040).
user_rows = [(2040,)]
user_df = spark.createDataFrame(user_rows, ['UserID_Index'])
user_recs = model.recommendForUserSubset(user_df, 10)
# truncate=False prints the full recommendation list instead of cutting it off.
user_recs.show(truncate=False)


+------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|UserID_Index|recommendations                                                                                                                                                                     |
+------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|2040        |[{625, 14.219399}, {704, 13.005264}, {271, 12.67974}, {619, 12.295608}, {384, 12.218763}, {134, 12.204218}, {297, 11.899029}, {331, 11.4081745}, {657, 11.347091}, {307, 11.127257}]|
+------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+



In [10]:
# Top-10 user recommendations for one specific meal (index 270).
item_rows = [(270,)]
item_df = spark.createDataFrame(item_rows, ['MealID_Index'])
item_recs = model.recommendForItemSubset(item_df, 10)
# truncate=False prints the full recommendation list instead of cutting it off.
item_recs.show(truncate=False)

+------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|MealID_Index|recommendations                                                                                                                                                                        |
+------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|270         |[{289, 41.717537}, {1295, 39.427933}, {1875, 37.360756}, {153, 33.697693}, {685, 31.277645}, {206, 29.526785}, {414, 29.069048}, {1833, 25.734497}, {233, 25.080442}, {740, 24.944597}]|
+------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+



In [11]:
# Per-user top-10 matches as a flat table: one row per (user, meal, score).
# Step 1: explode the recommendations array into one struct ('mr') per row.
res_users = userRecs.selectExpr(
    'UserID_Index',
    'explode(recommendations) as mr',
)
# Step 2: pull the struct fields out into ordinary columns.
res_users1 = res_users.selectExpr(
    'UserID_Index',
    'mr.MealID_Index',
    'mr.Rating',
)
res_users1.printSchema()
res_users1.show()

root
 |-- UserID_Index: integer (nullable = false)
 |-- MealID_Index: integer (nullable = true)
 |-- Rating: float (nullable = true)

+------------+------------+---------+
|UserID_Index|MealID_Index|   Rating|
+------------+------------+---------+
|           0|         503|9.3707075|
|           0|         727|7.5708656|
|           0|        1408|7.3470654|
|           0|         387| 7.088261|
|           0|        1011| 7.044864|
|           0|        1012|6.7563553|
|           0|         308| 6.551983|
|           0|         469| 6.426282|
|           0|         634|6.4028535|
|           0|         193| 6.339433|
|           1|        1615|10.545534|
|           1|         919| 9.954395|
|           1|         866| 9.489837|
|           1|         726| 8.684346|
|           1|         463|  8.40917|
|           1|         982| 8.407965|
|           1|         719| 8.217916|
|           1|         106| 8.214842|
|           1|         561| 8.184372|
|           1|         276|7.9

In [12]:
# Per-meal top-10 matches as a flat table: one row per (meal, user, score).
# Step 1: explode the recommendations array into one struct ('ur') per row.
res_items = mealRecs.selectExpr(
    'MealID_Index',
    'explode(recommendations) as ur',
)
# Step 2: pull the struct fields out into ordinary columns.
res_items1 = res_items.selectExpr(
    'MealID_Index',
    'ur.UserID_Index',
    'ur.Rating',
)
res_items1.printSchema()
res_items1.show()

root
 |-- MealID_Index: integer (nullable = false)
 |-- UserID_Index: integer (nullable = true)
 |-- Rating: float (nullable = true)

+------------+------------+----------+
|MealID_Index|UserID_Index|    Rating|
+------------+------------+----------+
|           0|         471| 14.018446|
|           0|         867| 13.328052|
|           0|        1823| 13.219753|
|           0|        1410|  12.42981|
|           0|        1096|11.9871645|
|           0|         631| 11.244983|
|           0|        5007| 11.019261|
|           0|         532|  10.85748|
|           0|          88| 10.817881|
|           0|         127| 10.738262|
|           1|         194|10.3831415|
|           1|         323|  9.957196|
|           1|         867|  9.350379|
|           1|         685|  8.064225|
|           1|         794| 7.7825747|
|           1|        1499| 7.5208263|
|           1|         738|  7.111577|
|           1|         196|  6.976718|
|           1|        1302| 6.7976937|
|       

In [13]:
# Persist the flattened recommendation tables to the MySQL warehouse.
import os
from getpass import getpass
from urllib.parse import quote_plus

from sqlalchemy import create_engine

# Connection settings come from the environment so that no password is stored in
# the notebook. Host, port, user and database default to the values this
# notebook used before; the password has no default and is prompted for when
# MYSQL_PASSWORD is not set.
mysql_user = os.environ.get('MYSQL_USER', 'root')
mysql_password = os.environ.get('MYSQL_PASSWORD') or getpass('MySQL password: ')
mysql_host = os.environ.get('MYSQL_HOST', 'localhost')
mysql_port = os.environ.get('MYSQL_PORT', '3306')
mysql_database = os.environ.get('MYSQL_DATABASE', 'spark')

# quote_plus escapes characters such as '@' or '/' that would otherwise break the URL.
engine = create_engine(
    f'mysql+pymysql://{quote_plus(mysql_user)}:{quote_plus(mysql_password)}'
    f'@{mysql_host}:{mysql_port}/{mysql_database}'
)

# if_exists='replace' drops and recreates each table on every run.
res_users1.toPandas().to_sql(name='userRecs', con=engine, if_exists='replace', index=False)
res_items1.toPandas().to_sql(name='mealRecs', con=engine, if_exists='replace', index=False)

In [14]:
# Stop the Spark session that was created at the top of the notebook.
spark.stop()