基于pyspark的user-based和item-based协同过滤。

当数据量大到单机没有办法全部载入的时候，我们就要考虑用spark这种分布式的系统来处理了。让我们看看如何基于pyspark实现user-based和item-based系统过滤算法。

要使用spark这种分布式集群，必须要掌握map-reduce的思路。

### user-based协同过滤

In [3]:
import sys
import pdb
import random
import numpy as np
from collections import defaultdict
from itertools import combinations
from pyspark import SparkContext

首先，两个函数完成格式的解析。

In [7]:
# user item rating timestamp
def parse_vector_on_user(line):
    """ 解析数据，key是user，后面是item和打分 """
    line = line.split('|')
    return line[0], (line[1], float(line[2]))

def parse_vector_on_item(line):
    """ 解析数据，key是item，后面是user和打分 """
    line = line.split('|')
    return line[1], (line[0], float(line[2]))

def sample_interactions(term_id, users_with_rating, n):
    """ 如果某个商品用户行为特别多，可以适当做点下采样 """
    if len(users_with_rating) > n:
        return item_id, random.sample(users_with_rating, n)
    else:
        return item_id, users_with_rating
    
def find_user_pairs(item_id, users_with_rating, n):
    """ 对每个item，找到共同打分的user对 """
    for user1, user2 in combinations(users_with_rating, 2):
        return (user1[0], user2[0]), (user1[1], user2[1])

def cosine(dot_product, rating_norm_squared, rating2_nrom_squared):
    """ 
    2个向量A和B的余弦相似度
    dotProduct(A, B) / (norm(A) * norm(B))
    """
    numerator = dot_product # 分子
    donominator = rating_norm_squared * rating2_nrom_squared # 分母
    return (numerator / float(donominator)) if donominator else 0.0

def cal_similarity(user_pair, rating_pairs):
    """ 对每个user对，根据打分计算余弦相似度，并返回共同打分的item个数 """
    sum_xx, sum_xy, sum_yy, sum_x, sum_y, n = (0.0, 0.0, 0.0, 0.0, 0.0, 0)
    
    for rating_pair in rating_pairs:
        sum_xx += np.float(rating_pair[0]) * np.float(rating_pair[0])
        sum_yy += np.float(rating_pair[1]) * np.float(rating_pair[1])
        sum_xy += np.float(rating_pair[0]) * np.float(rating_pair[1])
        n += 1
    
    cosine_similarity = cosine(sum_xy, np.sqrt(sum_xx), np.sqrt(sum_yy))
    return user_pair, (cosine_similarity, n)

def key_on_first_user(user_pair, item_similarity_data):
    """ 对每个user-user对，用第一个user做key（map reduce的做法） """
    (user1_id, user2_id) = user_pair
    return user1_id, (user2_id, item_similarity_data)

def nearest_neighbors(user, users_and_sims, n):
    """ 选出相似度最高的N个邻居 """
    users_and_sims.sort(key=lambda x: x[1][0], reverse=True)
    return user, users_and_sims[:n]

def top_N_recommendations(user_id, user_sims, users_with_rating, n):
    """ 根据最近的N个邻居进行推荐 """
    pass

基于spark的ALS做推荐系统，针对movielens电影打分数据做推荐。

In [10]:
import sys
import itertools
from math import sqrt
from operator import add
from pyspark import SparkConf, SparkContext
from pyspark.mllib.recommendation import ALS

In [None]:
def parse_rating(line):
    """ movielens的打分格式userId::movieId::rating::timestamp，我们需要先对格式进行解析 """
    fields = line.strip().split('::')
    return long(fields[3]) % 10, (int(fields[0]), int(fields[1]), float(fields[2]))

def parse_movie(line): # id => 字符串的映射
    """ 对应的电影文件格式为movieId::movieTitle，解析成id, 文本"""
    fields = line.strip().split('::')
    return int(field[0]), fields[1]

def load_ratings(ratings_file):
    """ 载入得分 """
    if not isfile(ratings_file):
        print('file %s does not exist' % ratings_file)
        sys.exit(1)
    f = open(ratings_file, 'r')
    # 如果打分比零分小的话，会被过滤掉。
    ratings = filter(lambda r: r[2] > 0, [parse_rating(line)[1] for line in f])
    f.close()
    if not ratings:
        print("no ratings provided")
        sys.exit(1)
    else:
        return ratings

def compute_rmse(model, data, n):
    """ 评估的时候要用的，计算均方根误差 """
    predictions = model.predictAll(data.map(lambda x: (x[0], x[1])))
    