In [9]:
sc

In [10]:
sc.master

'yarn'

In [11]:
sc.setCheckpointDir('hdfs://zh:9000/checkpoint')

In [12]:
from dateutil.parser import parse

In [13]:
import pyspark

In [14]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [15]:
import numpy as np

In [16]:
import pandas as pd

In [17]:
from pyspark.sql import Row

In [18]:
import numpy as np
from pyspark.mllib.linalg.distributed import RowMatrix
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix
from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry

# 探索ml_100k数据

### 探索用户数据

In [19]:
user_data = sc.textFile('hdfs://zh:9000/ml-100k/u.user')

In [20]:
user_data.first()

'1|24|M|technician|85711'

### 探索电影数据

In [21]:
movie_data = sc.textFile('hdfs://zh:9000/ml-100k/u.item')

In [22]:
movie_data.first()

'1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0'

In [23]:
movie_data.count()

1682

In [24]:
movie_fields = movie_data.map(lambda x:x.split('|'))

In [25]:
def parse_exception(x):
    try:
        return str(parse(x).year)
    except Exception as e:
        return '1900'

In [26]:
years = movie_fields.map(lambda x:x[2]).map(parse_exception)

In [27]:
years_filtered = years.filter(lambda x:x!='1900')

In [28]:
years_filtered.take(1)

['1995']

### 探索评级数据

In [29]:
rating_data = sc.textFile('hdfs://zh:9000/ml-100k/u.data')

In [30]:
rating_data.first()

'196\t242\t3\t881250949'

In [31]:
num_ratings = rating_data.count()

In [32]:
num_ratings

100000

In [33]:
rating_data = rating_data.map(lambda x:x.split('\t'))

### 提取有效特征

In [34]:
rating_data_raw = sc.textFile('hdfs://zh:9000/ml-100k/u.data')

In [35]:
rating_data = rating_data_raw.map(lambda x:x.split('\t')[0:3])

In [36]:
rating_data.take(2)

[['196', '242', '3'], ['186', '302', '3']]

In [37]:
#rating_data.map(lambda x:np.str(x[0])).take(10)
#验证必须worker和driver有着相同的python包才能跑

In [38]:
rating_data_row = rating_data.map(lambda x:Row(user = int(x[0]),movie = int(x[1]), rating = float(x[2])))

In [39]:
rating_df = spark.createDataFrame(rating_data_row)

### 训练和测试

In [40]:
training_set, test_set =  rating_df.randomSplit([0.8, 0.2])

In [41]:
als = ALS(rank=50, maxIter=15, regParam=0.01, userCol='user', itemCol='movie', ratingCol='rating', seed=None,)

In [42]:
model_fited = als.fit(training_set)

In [43]:
predictions = model_fited.transform(test_set)
predictions

DataFrame[movie: bigint, rating: double, user: bigint, prediction: float]

In [44]:
predictions.take(10)

[Row(movie=148, rating=5.0, user=642, prediction=2.629945993423462),
 Row(movie=148, rating=4.0, user=236, prediction=2.289193868637085),
 Row(movie=148, rating=2.0, user=896, prediction=3.4841086864471436),
 Row(movie=148, rating=2.0, user=274, prediction=2.9117355346679688),
 Row(movie=148, rating=5.0, user=20, prediction=4.003357410430908),
 Row(movie=148, rating=2.0, user=479, prediction=2.6461288928985596),
 Row(movie=148, rating=2.0, user=430, prediction=2.8336360454559326),
 Row(movie=148, rating=3.0, user=347, prediction=2.4326119422912598),
 Row(movie=148, rating=3.0, user=234, prediction=2.8896214962005615),
 Row(movie=148, rating=4.0, user=534, prediction=3.5388197898864746)]

In [45]:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

In [46]:
RMSE = evaluator.evaluate(predictions)

In [47]:
RMSE

nan

* 测试集中存在不存在于训练集的用户或者产品，所谓的冷启动问题

In [48]:
predictions.where("prediction == 'NaN' ").show()

+-----+------+----+----------+
|movie|rating|user|prediction|
+-----+------+----+----------+
| 1653|   5.0| 675|       NaN|
| 1350|   2.0| 207|       NaN|
| 1350|   1.0| 181|       NaN|
| 1564|   1.0| 405|       NaN|
| 1659|   1.0| 747|       NaN|
| 1582|   1.0| 405|       NaN|
| 1633|   3.0| 655|       NaN|
| 1505|   4.0| 291|       NaN|
| 1590|   3.0| 782|       NaN|
| 1590|   1.0| 405|       NaN|
| 1626|   1.0| 648|       NaN|
| 1595|   2.0| 425|       NaN|
| 1636|   4.0| 655|       NaN|
| 1654|   1.0| 676|       NaN|
| 1310|   3.0| 167|       NaN|
| 1520|   3.0| 314|       NaN|
| 1494|   1.0| 279|       NaN|
| 1638|   3.0| 655|       NaN|
| 1325|   1.0| 181|       NaN|
| 1492|   4.0| 279|       NaN|
+-----+------+----+----------+
only showing top 20 rows



In [49]:
predictions_excep_nan = predictions.where("prediction != 'NaN' ")

In [50]:
RMSE = evaluator.evaluate(predictions_excep_nan)

In [51]:
RMSE

1.21748365856646

### MAPK

In [52]:
k = 10

In [53]:
rating_data.count()

100000

#### 计算预测结果

In [55]:
pred_top_k = model_fited.recommendForAllUsers(k).rdd

MapPartitionsRDD[460] at javaToPython at NativeMethodAccessorImpl.java:0

In [57]:
pred_top_k = pred_top_k.map(lambda x:(x.user, x.recommendations))

In [58]:
pred_top_k.cache()
pred_top_k.checkpoint()

In [60]:
pred_top_k = pred_top_k.\
mapValues(lambda x_list:[(x.movie, x.rating) for x in x_list])

* 这里开始报错StackOverflowError
* 原因是执行中有着很多的for循环导致栈溢出，解决办法：1、checkpoint，缓存rdd到hdfs 2、使用sparksql(未使用)

In [61]:
pred_top_k.take(2)  

[(463,
  [(316, 5.521382808685303),
   (283, 5.386106967926025),
   (311, 5.37991189956665),
   (302, 5.100201606750488),
   (19, 5.036096572875977),
   (258, 5.0186991691589355),
   (887, 4.961373805999756),
   (275, 4.922547340393066),
   (221, 4.919186115264893),
   (253, 4.897623538970947)]),
 (833,
  [(187, 5.317366600036621),
   (160, 5.105474948883057),
   (654, 5.097400665283203),
   (856, 5.050495147705078),
   (86, 5.037764072418213),
   (302, 5.031163215637207),
   (616, 5.012308597564697),
   (1070, 4.9963178634643555),
   (1019, 4.98592472076416),
   (185, 4.973251819610596)])]

In [62]:
pred_top_k.cache()
pred_top_k.checkpoint()

In [63]:
pred_top_k.count()

943

#### 实际的排序结果

In [64]:
rating_data.take(2)

[['196', '242', '3'], ['186', '302', '3']]

In [65]:
actual_rating_data = rating_data.map(lambda x:(x[0], (x[1], x[2])))

In [66]:
actual_rating_data.take(2)

[('196', ('242', '3')), ('186', ('302', '3'))]

In [67]:
def list_append(x,y):
    x.append(y)
    return x

In [68]:
actual_rating_data_by_user = actual_rating_data.sortBy(lambda x:x[1][1], ascending=False\
).sortByKey().aggregateByKey([], lambda x, y:list_append(x, y), lambda x, y:x+y)

In [69]:
actual_rating_data_by_user.cache()
actual_rating_data_by_user.checkpoint()

In [70]:
actual_rating_data_by_user.take(1)

[('190',
  [('302', '5'),
   ('471', '5'),
   ('313', '5'),
   ('272', '5'),
   ('237', '5'),
   ('288', '5'),
   ('100', '4'),
   ('7', '4'),
   ('117', '4'),
   ('147', '4'),
   ('751', '4'),
   ('300', '4'),
   ('628', '4'),
   ('269', '4'),
   ('222', '4'),
   ('405', '4'),
   ('333', '4'),
   ('354', '4'),
   ('15', '4'),
   ('245', '4'),
   ('273', '4'),
   ('591', '4'),
   ('276', '4'),
   ('544', '4'),
   ('310', '4'),
   ('326', '4'),
   ('148', '4'),
   ('546', '3'),
   ('328', '3'),
   ('258', '3'),
   ('826', '3'),
   ('24', '3'),
   ('717', '3'),
   ('742', '3'),
   ('282', '3'),
   ('118', '3'),
   ('294', '3'),
   ('291', '3'),
   ('748', '3'),
   ('895', '3'),
   ('125', '3'),
   ('281', '3'),
   ('989', '3'),
   ('685', '3'),
   ('121', '3'),
   ('508', '3'),
   ('696', '3'),
   ('327', '2'),
   ('930', '2'),
   ('977', '2'),
   ('974', '2'),
   ('363', '2'),
   ('1313', '2'),
   ('539', '2'),
   ('898', '2'),
   ('823', '2'),
   ('597', '2'),
   ('9', '1'),
   ('340',

* sortByKey之后用aggregateByKey导致rating5的在不同machine的结果没有按照顺序来。
* 排序应该是最后，否则中间的操作可能会破坏顺序，可能跟存储位置有关。

In [71]:
actual_rating_data_by_user = actual_rating_data.aggregateByKey([], lambda x, y:list_append(x,y), lambda x, y:x+y)

In [72]:
def list_sorted(a_list, reverse=True):
    a_list.sort(key=lambda x:x[1],  reverse=True)
    return a_list

In [73]:
actual_rating_data_by_user_sorted = actual_rating_data_by_user.mapValues(lambda x:list_sorted(x))

In [74]:
actual_rating_data_by_user.cache()
actual_rating_data_by_user.checkpoint()

In [75]:
actual_rating_data_by_user_sorted.take(1)

[('190',
  [('237', '5'),
   ('288', '5'),
   ('302', '5'),
   ('471', '5'),
   ('313', '5'),
   ('272', '5'),
   ('100', '4'),
   ('7', '4'),
   ('117', '4'),
   ('147', '4'),
   ('751', '4'),
   ('300', '4'),
   ('628', '4'),
   ('269', '4'),
   ('222', '4'),
   ('405', '4'),
   ('333', '4'),
   ('354', '4'),
   ('15', '4'),
   ('245', '4'),
   ('273', '4'),
   ('591', '4'),
   ('276', '4'),
   ('544', '4'),
   ('310', '4'),
   ('326', '4'),
   ('148', '4'),
   ('546', '3'),
   ('328', '3'),
   ('258', '3'),
   ('826', '3'),
   ('24', '3'),
   ('717', '3'),
   ('742', '3'),
   ('282', '3'),
   ('118', '3'),
   ('294', '3'),
   ('291', '3'),
   ('748', '3'),
   ('895', '3'),
   ('125', '3'),
   ('281', '3'),
   ('989', '3'),
   ('685', '3'),
   ('121', '3'),
   ('508', '3'),
   ('696', '3'),
   ('327', '2'),
   ('930', '2'),
   ('977', '2'),
   ('974', '2'),
   ('363', '2'),
   ('1313', '2'),
   ('539', '2'),
   ('898', '2'),
   ('823', '2'),
   ('597', '2'),
   ('9', '1'),
   ('340',

In [76]:
actual_top_k = actual_rating_data_by_user_sorted.mapValues(lambda a_list:a_list[:k])

* a[:k] 如果a的length<k, return a

In [78]:
actual_top_k.take(100)

[('190',
  [('237', '5'),
   ('288', '5'),
   ('302', '5'),
   ('471', '5'),
   ('313', '5'),
   ('272', '5'),
   ('100', '4'),
   ('7', '4'),
   ('117', '4'),
   ('147', '4')]),
 ('263',
  [('133', '5'),
   ('357', '5'),
   ('199', '5'),
   ('135', '5'),
   ('378', '5'),
   ('141', '5'),
   ('176', '5'),
   ('419', '5'),
   ('416', '5'),
   ('523', '5')]),
 ('110',
  [('313', '5'),
   ('161', '5'),
   ('333', '4'),
   ('12', '4'),
   ('54', '4'),
   ('939', '4'),
   ('651', '4'),
   ('739', '4'),
   ('569', '4'),
   ('307', '4')]),
 ('695',
  [('1024', '5'),
   ('346', '5'),
   ('242', '5'),
   ('991', '5'),
   ('268', '5'),
   ('358', '5'),
   ('319', '5'),
   ('260', '4'),
   ('995', '4'),
   ('270', '4')]),
 ('473',
  [('246', '5'),
   ('475', '5'),
   ('127', '5'),
   ('150', '5'),
   ('273', '5'),
   ('1142', '5'),
   ('268', '5'),
   ('116', '5'),
   ('275', '5'),
   ('9', '5')]),
 ('198',
  [('186', '5'),
   ('127', '5'),
   ('531', '5'),
   ('50', '5'),
   ('474', '5'),
   ('8

#### 准备计算MAPK

In [80]:
actual_top_k.cache()
actual_top_k.checkpoint()
pred_top_k.cache()
pred_top_k.checkpoint()

In [90]:
actual_top_k = actual_top_k.map(lambda x:(int(x[0]), [(int(i), float(j)) for i, j in x[1]])) 

In [81]:
actual_top_k.count()

943

In [82]:
pred_top_k.count()

943

#### 在join前先提取成（user, [movie1, movie2 ]）的形式

In [98]:
actual_top_k = actual_top_k.mapValues(lambda a_list:[i for i, j in a_list])
pred_top_k = pred_top_k.mapValues(lambda a_list:[i for i, j in a_list])

In [99]:
actual_top_k.take(1)

[(190, [237, 288, 302, 471, 313, 272, 100, 7, 117, 147])]

In [100]:
actual_top_k.cache()
actual_top_k.checkpoint()
pred_top_k.cache()
pred_top_k.checkpoint()

In [101]:
mix_top_k_actual_pred = actual_top_k.join(pred_top_k)

* 总是报错：Container exited with a non-zero exit code 50 实际是前面的栈溢出导致的

In [102]:
mix_top_k_actual_pred.cache()
mix_top_k_actual_pred.checkpoint()

In [124]:
mix_top_k_actual_pred.take(10)

[(808,
  ([294, 302, 750, 262, 313, 340, 264, 872, 327, 346],
   [313, 357, 318, 136, 133, 515, 316, 50, 315, 114])),
 (202,
  ([604, 195, 96, 484, 258, 516, 269, 283, 172, 423],
   [604, 8, 367, 485, 430, 663, 150, 751, 709, 87])),
 (404,
  ([313, 259, 22, 690, 269, 748, 294, 328, 258, 739],
   [257, 327, 129, 949, 201, 559, 763, 42, 204, 184])),
 (606,
  ([418, 15, 186, 147, 96, 763, 11, 228, 966, 64],
   [318, 196, 315, 12, 217, 64, 246, 68, 496, 173])),
 (809,
  ([272, 328, 307, 302, 315, 286, 313, 340, 300, 299],
   [89, 190, 50, 169, 12, 52, 285, 86, 175, 498])),
 (203,
  ([181, 332, 50, 248, 288, 150, 283, 471, 93, 117],
   [98, 219, 186, 313, 172, 178, 183, 69, 531, 207])),
 (1,
  ([137, 127, 16, 45, 48, 195, 168, 191, 55, 42],
   [489, 792, 317, 176, 430, 709, 461, 434, 50, 1007])),
 (405,
  ([210, 692, 186, 172, 135, 127, 575, 451, 527, 673],
   [137, 238, 531, 357, 255, 575, 174, 1053, 376, 1218])),
 (607,
  ([511, 211, 494, 482, 56, 487, 529, 107, 86, 528],
   [59, 56, 211,

In [97]:
def avgPrecisionK(actual, predict, k):
    if len(actual) == 0:
        return 1
    if len(actual)<k:
        k = len(actual)
    score = 0
    numHits = 0
    predict_k = predict[:k]
    for s,i in enumerate(predict_k):
        if i in actual:
            numHits+=1
        score = score + numHits/(s+1)
    return score/k

In [127]:
PrecisionK_by_user = mix_top_k_actual_pred.mapValues(lambda x:avgPrecisionK(x[0], x[1], 10))

In [132]:
avgPrecisionK_top_k = PrecisionK_by_user.map(lambda x:x[1]).mean()

In [133]:
avgPrecisionK_top_k

0.08405893888131431

* 此结果包含这已经有过评分的电影，大概等于把训练集结果加到了模型评价中，值偏高。
* 推荐类算法通常结果都比较低。

### 总结

* funkSVD for recommendation.
* avgPrecisionK as evaluation of top k fields.
* checkpoint() resolves the problems that stackoverflowerror. (too many for_loop)