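## 1. Dataset Overview
User listening events table: tracks.csv

Dataset size: 1,000,000 events;
Number of users: 5,000;
Number of distinct tracks: 1,700;
Number of user-track records: 851,475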
<table>
  <tr>
    <th><strong>Field Name</strong></th>
    <th>Event ID</th>
    <th>Customer ID</th>
    <th>Track ID</th>
    <th>Datetime</th>
    <th>Mobile</th>
    <th>Listening Zip</th>
  </tr>
  <tr>
    <td><strong>Type</strong></td>
    <td>Integer</td>
    <td>Integer</td>
    <td>Integer</td>
    <td>String</td>
    <td>Integer</td>
    <td>Integer</td>
  </tr>
  <tr>
    <td><strong>Example Value</strong></td>
    <td>9999767</td>
    <td>2597</td>
    <td>788</td>
    <td>2014-12-01 09:54:09</td>
    <td>0</td>
    <td>11003</td>
  </tr>
</table>
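
As a quick sanity check on the types listed in the table, the sketch below parses the example row with the Python standard library. The helper name `parse_event` and the comma-separated layout of a raw line are assumptions made for illustration; the notebook itself loads the file with Spark.

```python
import csv
from datetime import datetime
from io import StringIO

def parse_event(line):
    """Parse one comma-separated event line into typed fields (hypothetical helper)."""
    event_id, customer_id, track_id, dt, mobile, zip_code = next(csv.reader(StringIO(line)))
    return {
        "event_id": int(event_id),
        "customer_id": int(customer_id),
        "track_id": int(track_id),
        # The Datetime column is a string in "YYYY-MM-DD HH:MM:SS" form
        "datetime": datetime.strptime(dt, "%Y-%m-%d %H:%M:%S"),
        "is_mobile": int(mobile),
        "zip": int(zip_code),
    }

# Example values taken from the table above
event = parse_event("9999767,2597,788,2014-12-01 09:54:09,0,11003")
print(event["customer_id"], event["track_id"], event["datetime"].hour)
```

Keeping `datetime` as a string in the Spark schema, as the notebook does, is enough here because the recommender only uses the customer and track columns.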

## 2. Importing the Required Python Packages

In [1]:
# Import the required modules
import os
import pandas as pd
import numpy as np

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import col

from pyspark.mllib.recommendation import ALS, Rating, MatrixFactorizationModel
from pyspark.mllib.evaluation import RegressionMetrics

## 3. Creating the Spark Session and Loading the Data

In [2]:
# Build the SparkSession instance
spark = SparkSession.builder.master("local")\
        .appName("music-customer-analysis-with-pyspark").getOrCreate()

# Get the SparkContext instance
sc = spark.sparkContext

In [3]:
# Define the schema and load the data
music_schema = StructType([
    StructField('event_id', IntegerType(), nullable=True),
    StructField('customer_id', IntegerType(), nullable=True),
    StructField('track_id', IntegerType(), nullable=True),
    StructField('datetime', StringType(), nullable=True),
    StructField('is_mobile', IntegerType(), nullable=True),
    StructField('zip', IntegerType(), nullable=True)
])
music_df = spark.read.csv('data/tracks.csv', schema=music_schema, header=False)
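# Register the DataFrame as a temporary view so it can be queried with spark.sql
# (the deprecated registerTempTable call did the same thing and is not needed)
music_df.createOrReplaceTempView("music")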

## 4. Inspecting the Data

In [4]:
print(music_df.count())
music_df.show(5)

1000000
+--------+-----------+--------+-------------------+---------+-----+
|event_id|customer_id|track_id|           datetime|is_mobile|  zip|
+--------+-----------+--------+-------------------+---------+-----+
|       0|         48|     453|2014-10-23 03:26:20|        0|72132|
|       1|       1081|      19|2014-10-15 18:32:14|        1|17307|
|       2|        532|      36|2014-12-10 15:33:16|        1|66216|
|       3|       2641|     822|2014-10-20 02:24:55|        1|36690|
|       4|       2251|     338|2014-11-18 07:16:05|        1|61377|
+--------+-----------+--------+-------------------+---------+-----+
only showing top 5 rows



## 5. Preprocessing: Building the (User, Track, Play Count) Triples

In [5]:
spark.sql("select customer_id, track_id, count(*) as rating from music \
           group by customer_id, track_id \
           order by customer_id, count(*) desc").show(8)

+-----------+--------+------+
|customer_id|track_id|rating|
+-----------+--------+------+
|          0|       0|    93|
|          0|       1|    55|
|          0|       2|    53|
|          0|       4|    45|
|          0|       3|    41|
|          0|       5|    39|
|          0|       6|    31|
|          0|       8|    31|
+-----------+--------+------+
only showing top 8 rows
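
The query above turns raw listening events into one row per (user, track) pair, with the play count standing in for a rating. A plain-Python sketch of the same aggregation may make this concrete; the `events` list is made-up data for illustration, not taken from the dataset.

```python
from collections import Counter

# Made-up (customer_id, track_id) listening events, one tuple per play
events = [(0, 0), (0, 0), (0, 1), (1, 0), (0, 0), (1, 2), (1, 2)]

# Equivalent of: GROUP BY customer_id, track_id with COUNT(*) AS rating
play_counts = Counter(events)

# Equivalent of: ORDER BY customer_id, COUNT(*) DESC
triples = sorted(
    ((user, track, n) for (user, track), n in play_counts.items()),
    key=lambda t: (t[0], -t[2]),
)
for triple in triples:
    print(triple)
```

Each printed tuple has the same shape as a row of the Spark result: customer, track, and how many times that customer played that track.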



## 6. Function Definitions: Model Training and Evaluation

In [6]:
# Train a model and evaluate it
def train_model_evaluate(training_rdd, testing_rdd, rank, iterations, lambda_):
    # Train an ALS model on the training data with the given hyperparameters
    model = ALS.train(training_rdd, rank, iterations, lambda_)

    # Evaluate the model on the test data
    print('rank=%d, iterations=%d, lambda_=%f' % (rank, iterations, lambda_))
    rmse_value = alsModelEvaluate(model, testing_rdd)

    # Return the model together with its RMSE and hyperparameters
    return (model, rmse_value, rank, iterations, lambda_)

In [7]:
# Model evaluation function
def alsModelEvaluate(model, testing_rdd):
    # Predict ratings for the (user, track) pairs in the test set
    predict_rdd = model.predictAll(testing_rdd.map(lambda r: (r[0], r[1])))
    # Join predictions with actual ratings: ((user, track), (predicted, actual))
    predict_actual_rdd = predict_rdd.map(lambda r: ((r[0], r[1]), r[2])) \
        .join(testing_rdd.map(lambda r: ((r[0], r[1]), r[2])))

    # Build the regression metrics from the (predicted, actual) pairs
    metrics = RegressionMetrics(predict_actual_rdd.map(lambda pr: pr[1]))
    print("MSE = %s" % metrics.meanSquaredError)
    print("RMSE = %s" % metrics.rootMeanSquaredError)

    # Return the root mean squared error
    return metrics.rootMeanSquaredError
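
For reference, MSE is the mean of the squared differences between predicted and actual values, and RMSE is its square root. The sketch below computes both in plain Python on made-up pairs; `mse_rmse` is a hypothetical helper for illustration, not part of Spark.

```python
import math

def mse_rmse(pairs):
    """Return (MSE, RMSE) for an iterable of (prediction, actual) pairs."""
    squared_errors = [(pred - actual) ** 2 for pred, actual in pairs]
    mse = sum(squared_errors) / len(squared_errors)
    return mse, math.sqrt(mse)

# Made-up predictions and play counts, for illustration only
mse, rmse = mse_rmse([(3.0, 2.0), (1.0, 1.0), (4.0, 6.0), (2.0, 2.0)])
print(mse, rmse)
```

Because RMSE is expressed in the same unit as the rating, here a play count, it is usually the easier of the two numbers to interpret.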

## 7. Model Training: Hyperparameter Selection and the Best Model

In [8]:
# Train several models and keep the one with the best evaluation result
rawRatings = spark.sql("select customer_id, track_id, count(*) as rating from music \
           group by customer_id, track_id \
           order by customer_id, count(*) desc").rdd
ratings = rawRatings.map(lambda x: Rating(int(x[0]), int(x[1]), float(x[2])))
(training, test) = ratings.randomSplit([0.8, 0.2])

# Tune the ALS hyperparameters: train and evaluate one model per combination
# of rank, iterations and lambda (three nested loops in the comprehension)
metrics_list = [train_model_evaluate(training, test, param_rank, param_iterations, param_lambda)
                for param_rank in [10, 20]
                for param_iterations in [10, 20]
                for param_lambda in [0.01]]
# sorted() returns a new list, so keep the result; the lowest RMSE comes first
metrics_list = sorted(metrics_list, key=lambda k: k[1])
model, rmse_value, rank, iterations, lambda_ = metrics_list[0]
print("The best parameters, rank=%s, iterations=%s, lambda_=%s" % (rank, iterations, lambda_))

rank=10, iterations=10, lambda_=0.010000
MSE = 0.28716683662293263
RMSE = 0.5358794982297164
rank=10, iterations=20, lambda_=0.010000
MSE = 0.28757172413183685
RMSE = 0.5362571436650861
rank=20, iterations=10, lambda_=0.010000
MSE = 0.29285678025140727
RMSE = 0.5411624342574115
rank=20, iterations=20, lambda_=0.010000
MSE = 0.29403526408964
RMSE = 0.5422501858825315
The best parameters, rank=10, iterations=10, lambda_=0.01
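
Selecting the best configuration amounts to taking the run with the lowest RMSE. As a minimal sketch, the four results printed above can be ranked in plain Python; the tuple layout `(rank, iterations, lambda_, rmse)` is chosen here for illustration only.

```python
# (rank, iterations, lambda_, rmse) copied from the output above
results = [
    (10, 10, 0.01, 0.5358794982297164),
    (10, 20, 0.01, 0.5362571436650861),
    (20, 10, 0.01, 0.5411624342574115),
    (20, 20, 0.01, 0.5422501858825315),
]

# sorted() returns a new list and leaves its input unchanged, so keep the result
ranked = sorted(results, key=lambda r: r[3])
best_rank, best_iterations, best_lambda, best_rmse = ranked[0]
print(best_rank, best_iterations, best_lambda, best_rmse)
```

In this run the gap between the best and worst configuration is under 0.01 RMSE, and the split is random, so the ranking may change from one execution to the next.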


## 8. User and Item Latent Factor Matrices

In [9]:
# User factor matrix
user_feature_matrix = model.userFeatures()
print(user_feature_matrix.first())

# Item factor matrix
item_feature_matrix = model.productFeatures()
print(item_feature_matrix.first())

(0, array('d', [-5.566326141357422, -1.6652482748031616, -3.4502928256988525, -2.164012908935547, 7.6700239181518555, -7.7029805183410645, -2.8192532062530518, -7.601554870605469, -0.21442097425460815, 1.1964198350906372]))
(0, array('d', [0.053542573004961014, -1.1422486305236816, -0.12883080542087555, -2.3653976917266846, 1.4874904155731201, -3.776639699935913, 0.15738777816295624, -5.200829029083252, 1.451997995376587, -1.614256501197815]))
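
In a matrix factorization model, the predicted rating for a (user, item) pair is the dot product of the two factor vectors. As a sketch, the vectors printed above for user 0 and track 0 can be multiplied by hand; the variable names are chosen for illustration.

```python
# Factor vectors copied from the output above
user_0 = [-5.566326141357422, -1.6652482748031616, -3.4502928256988525,
          -2.164012908935547, 7.6700239181518555, -7.7029805183410645,
          -2.8192532062530518, -7.601554870605469, -0.21442097425460815,
          1.1964198350906372]
track_0 = [0.053542573004961014, -1.1422486305236816, -0.12883080542087555,
           -2.3653976917266846, 1.4874904155731201, -3.776639699935913,
           0.15738777816295624, -5.200829029083252, 1.451997995376587,
           -1.614256501197815]

# The predicted rating is the dot product of the two factor vectors
predicted = sum(u * t for u, t in zip(user_0, track_0))
print(round(predicted, 4))  # 84.5158
```

This agrees with the value that `model.predict(0, 0)` prints in section 9.1.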


## 9. Making Recommendations with the Model
### 9.1 User-Based Recommendations

In [10]:
# User-based recommendation
userId, itemId, recNum = 0, 0, 5

# Recommend tracks for a user
topKRecs = model.recommendProducts(userId, recNum)
print('Top %d recommended tracks for user %d:' % (recNum, userId))
for rec in topKRecs:
    print(rec)

# Recommend users for a track
topKRecs = model.recommendUsers(itemId, recNum)
print('Top %d recommended users for track %d:' % (recNum, itemId))
for rec in topKRecs:
    print(rec)

# Predict the rating a given user would give a given track
print('Predicted rating for user %d on track %d:' % (userId, itemId), model.predict(userId, itemId))

Top 5 recommended tracks for user 0:
Rating(user=0, product=0, rating=84.51582029704726)
Rating(user=0, product=1, rating=52.18686044644898)
Rating(user=0, product=2, rating=48.86769372137471)
Rating(user=0, product=4, rating=41.34987181609816)
Rating(user=0, product=3, rating=39.46888008305788)
Top 5 recommended users for track 0:
Rating(user=0, product=0, rating=84.51582029704726)
Rating(user=4, product=0, rating=46.68360221697599)
Rating(user=1, product=0, rating=45.45440158290519)
Rating(user=3, product=0, rating=42.969510613587055)
Rating(user=14, product=0, rating=32.78906027734245)
Predicted rating for user 0 on track 0: 84.51582029704726
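
The five tracks recommended to user 0 are all tracks this user has already played (compare with the play counts in section 5), because the model ranks every track regardless of listening history. If only unheard tracks are wanted, one option is to filter the candidates afterwards. The sketch below is a plain-Python illustration; `unseen_only` is a hypothetical helper, and `already_played` contains only the eight tracks visible in the section 5 output, not the user's full history.

```python
# (track_id, predicted_rating) pairs copied from the recommendation output above
recommendations = [
    (0, 84.51582029704726),
    (1, 52.18686044644898),
    (2, 48.86769372137471),
    (4, 41.34987181609816),
    (3, 39.46888008305788),
]

# Tracks user 0 is known to have played, taken from the section 5 output
already_played = {0, 1, 2, 3, 4, 5, 6, 8}

def unseen_only(recs, seen):
    """Keep only recommendations whose track is not in the seen set."""
    return [(track, score) for track, score in recs if track not in seen]

print(unseen_only(recommendations, already_played))  # []
```

The empty result shows that requesting exactly `recNum` candidates is not enough in this case; a larger candidate list would have to be requested before filtering.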


### 9.2 Item-Based Recommendations

In [11]:
# Item-based recommendation
def cosine_similarity(x, y):
    return np.dot(x, y) / (np.linalg.norm(x) * np.linalg.norm(y))

itemId, recNum = 0, 5
itemFactor = model.productFeatures().lookup(itemId)[0]

# Compute the cosine similarity between this track and every track (itself included)
sims = model.productFeatures().map(lambda line:\
       (line[0], cosine_similarity(np.array(line[1]), np.array(itemFactor))))
# Sort by similarity and bring only the top recNum results back to the driver
simItem = sims.sortBy(lambda line: line[1], ascending=False).take(recNum)
print('Top %d tracks most similar to track %d:' % (recNum, itemId))
for i in simItem:
    print(i)

Top 5 tracks most similar to track 0:
(0, 1.0)
(48, 0.676069769085381)
(41, 0.6350276830270337)
(29, 0.6289726465048315)
(26, 0.6081736969039829)
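
Cosine similarity compares the direction of two factor vectors and ignores their length, which is why track 0 scores exactly 1.0 against itself in the list above. A dependency-free sketch on made-up three-dimensional vectors (not taken from the model):

```python
import math

def cosine_similarity(x, y):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(a * b for a, b in zip(x, y))
    norm_x = math.sqrt(sum(a * a for a in x))
    norm_y = math.sqrt(sum(b * b for b in y))
    return dot / (norm_x * norm_y)

# Made-up factor vectors, for illustration only
track_a = [1.0, 2.0, 2.0]
same = cosine_similarity(track_a, track_a)                  # identical vector
scaled = cosine_similarity(track_a, [2.0, 4.0, 4.0])        # same direction, twice the length
orthogonal = cosine_similarity(track_a, [2.0, -1.0, 0.0])   # perpendicular direction
print(same, scaled, orthogonal)
```

To list only other tracks, the queried track itself would need to be dropped from the results.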


## 10. Saving and Loading the Model

In [13]:
# Save the model
model.save(sc, "model/als_model")

# Load the model
model = MatrixFactorizationModel.load(sc, "model/als_model")
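
Spark generally refuses to write into an output directory that already exists, so re-running the save cell may fail. One possible workaround, assuming `model/als_model` is a directory on the local filesystem, is to remove it before saving. `clear_local_path` is a hypothetical helper written for this sketch, not a Spark API.

```python
import os
import shutil

def clear_local_path(path):
    """Delete a local directory tree if it exists; do nothing otherwise."""
    if os.path.isdir(path):
        shutil.rmtree(path)

# Assumption: the model is saved to a local directory, as in the cell above
clear_local_path("model/als_model")
# model.save(sc, "model/als_model") would follow here in the notebook
```

For a model stored on HDFS or another remote filesystem, this local cleanup would not apply.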