## PySpark recommendation engine

Have noted with Vidyut that currently we have to build method-pool instead of rules. (Notes are in separated email.)

* Here we illustrate the pySpark with ALS and BPR using our own data

In [1]:
# env
from pyspark import SparkConf, SparkContext 

# methods
from pyspark.mllib.recommendation import ALS, Rating
from bpr_spark.bpr import bprMF

# data processing
import numpy as np
import pandas as pd
# result processing
import heapq

## 1. Learning

### Load our data

This fuction translates the data into RDD rating format.

In [2]:
def get_rating(str):  
    arr = str.split('\t') 
    user_id = int(arr[0])  
    movie_id = int(arr[1])  
    user_rating = float(arr[2])  
    return Rating(user_id, movie_id, user_rating)
try:
    sc.stop()
except:
    pass

Set Spark Context (cannot be reset once set well).

In [3]:
conf = SparkConf().setMaster('local').setAppName('RecoEng').set("spark.executor.memory", "8g")  
sc = SparkContext(conf=conf)
data = sc.textFile('/Users/ito/venv/pyspark-rec/CG-Tops/Tops_user-item_data')  
data.top(3)

[u'999\t7071\t1\t736433', u'999\t7070\t1\t736433', u'999\t6951\t1\t736485']

In [4]:
# because for mllib.recommendation, there is already data structure running on it
ratings = data.map(get_rating) 
ratings.top(3)

[Rating(user=9204, product=43518, rating=1.0),
 Rating(user=9204, product=43392, rating=2.0),
 Rating(user=9204, product=43378, rating=1.0)]

In [5]:
# from sklearn.datasets import load_iris # iris数据集
# from sklearn.model_selection import train_test_split # 分割数据模块
# from sklearn.neighbors import KNeighborsClassifier # K最近邻(kNN，k-NearestNeighbor)分类算法
# from pyspark.ml.tuning import TrainValidationSplit

# # import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}

# # import pyspark..recommendation import ALS, Rating
# # val data = spark.read.format("libsvm").load("data/mllib/sample_linear_regression_data.txt")
# training__, test__ = ratings.randomSplit([0.9, 0.1], seed = 25)


### Collaborative Filtering 
* using Alternating Least Squares (ALS) optimization
* call java function "trainALSModel"

In [8]:
split = [0.9, 0.1]
ratings_cf_train, ratings_cf_test = ratings.randomSplit(split, seed = 225)

ratings2 = data.map(lambda line: line.split("\t")).map(lambda x: map(int, x[:2]))
ratings_bpr_train, ratings_bpr_test = ratings2.randomSplit([0.9, 0.1], seed = 25)

In [9]:
def CollaborativeFiltering(ratings, rank = 10, iterations = 5):
    model = ALS.train(ratings, rank, iterations)
    return model

In [10]:
%%time
CFmodel = CollaborativeFiltering(ratings_cf_train,10,5)

CPU times: user 12 ms, sys: 4.51 ms, total: 16.5 ms
Wall time: 22.1 s


In [11]:
%%time
rank = 10  
iterations = 5    
ALSmodel = ALS.train(ratings, rank, iterations)

CPU times: user 12 ms, sys: 4.49 ms, total: 16.5 ms
Wall time: 20.7 s


The scalability and efficiency of pySpark is going well, and even it is not CPU-wise multiprocessed. So far so good, means my next step is towards accuracy and realistic level.

In [12]:
%%time
CFmodel_ = CollaborativeFiltering(ratings,10,5)

CPU times: user 8.94 ms, sys: 3.67 ms, total: 12.6 ms
Wall time: 20.8 s


In [13]:
# sc.stop()

### Bayesian Personalized Ranking (for PySpark)
* optimized using Stochastic Gradient Descent (SGD, cannot be as simply paralled as ALS)  
* have remained only user-item information yet (the basic BPR or say BPR-1)
* return 2 matrices (user matrix with (#user,k), item matrix with (k,#item))

In [14]:
# conf = SparkConf().setMaster("local").setAppName("BPR").set("spark.executor.memory", "8g")
# sc = SparkContext(conf=conf)

In [15]:
# data = sc.textFile("/Users/ito/venv/pyspark-rec/CG-Tops/Tops_user-item_data")
# ratings = data.map(lambda line: line.split("\t")).map(lambda x: map(int, x[:2]))

In [16]:
%%time
# bprMF(ratings, rank, num_iter=10, num_neg_samples=30):
userMat, prodMat = bprMF(ratings_bpr_train, 10, 20, 10) 

CPU times: user 920 ms, sys: 873 ms, total: 1.79 s
Wall time: 12min 43s


***** Building another version that can run faster (MR for map reduce) (to be continued..)

In [17]:
# from bpr_spark.distbpr import bpr_MF_MR
# userMat2, prodMat2 = bpr_MF_MR(ratings, 10, 10)
# userMat2, prodMat2 = bpr_MF_MR(ratings, 10, 10, nb_partitions = 8)

In [18]:
userid = 10
rec_items_bpr = np.inner(userMat[userid].T, prodMat)

Manually do the similar showcasing like "ALSmodel.recommendProducts(userid, top_howmany)" using heap sorting

In [26]:
top_howmany = 5
res = []
top_list = heapq.nlargest(top_howmany,rec_items_bpr)
res.append([i for i in range(len(rec_items_bpr)) if rec_items_bpr[i] in top_list])

userid = 10
rec_items = CFmodel.recommendProducts(userid, top_howmany)  

pd.DataFrame(rec_items)

Unnamed: 0,user,product,rating
0,10,5565,43.066266
1,10,2560,35.460403
2,10,18536,34.197045
3,10,18506,31.236967
4,10,18508,29.732398


* Simple result from Collaborative Filtering 

In [27]:
print ('recommend items for userid %d:' % userid)
[i for i in rec_items]

recommend items for userid 10:


[Rating(user=10, product=5565, rating=43.066266262561186),
 Rating(user=10, product=2560, rating=35.46040302641053),
 Rating(user=10, product=18536, rating=34.19704472498653),
 Rating(user=10, product=18506, rating=31.236967455736426),
 Rating(user=10, product=18508, rating=29.732397625435627)]

* Simple result from Bayesian Personalized Ranking

In [28]:
print ('recommend items for userid %d:' % userid)
for idx, i in enumerate(res[0]):
    print 'user=%d, ' % userid + 'product=%d, ' % i  + 'rating=%f' % top_list[idx]

recommend items for userid 10:
user=10, product=108, rating=251.157254
user=10, product=1226, rating=32.101318
user=10, product=6945, rating=17.967807
user=10, product=11419, rating=14.482411
user=10, product=37100, rating=13.816158


In [29]:
# userMat.shape
# num_users = ratings2.map(lambda x: x[0]).max()

In [30]:
%%time

rec_items_cf_ = []
rec_items_bpr_ = []
for i in range(1, 4):
    rec_items_cf_.extend(CFmodel_.recommendProducts(i, top_howmany)) 
    
    rec_items_bpr = np.inner(userMat[i].T, prodMat)
    top_list = heapq.nlargest(top_howmany,rec_items_bpr)
    
    idx = 0
    for j in range(len(rec_items_bpr)):
        if rec_items_bpr[j] in top_list:
            rec_items_bpr_.append((i, j, top_list[idx])) 
            idx += 1

CPU times: user 62.9 ms, sys: 9.53 ms, total: 72.5 ms
Wall time: 234 ms


## 2. Predicting

### predicting on test_set for each method considered

In [31]:
top_howmany = 10

* Gathering recommending result

In [32]:
# ## testing
# %%time
# # CFmodel_.recommendProducts(1, top_howmany)
# userid = 10
# rec_items_bpr = np.inner(userMat[userid].T, prodMat)
# res = []
# top_list = heapq.nlargest(top_howmany,rec_items_bpr)
# res.append([i for i in range(len(rec_items_bpr)) if rec_items_bpr[i] in top_list])

* Blackboxing the recommendProducts from BPR side

In [33]:
def bpr_recommendProducts(userID, top_howmany = 5):
    rec_items_bpr = np.inner(userMat[userID].T, prodMat)
    res_4_userID = []
    top_list = heapq.nlargest(top_howmany,rec_items_bpr)

    idx = 0
    for j in range(len(rec_items_bpr)):
        if rec_items_bpr[j] in top_list:
            res_4_userID.append((userID, j, top_list[idx])) 
            idx += 1
    return res_4_userID

### voting

In [35]:
rec_items_cf = []
rec_items_bpr = []
for i in range(1, userMat.shape[0]):
    rec_items_cf.extend(CFmodel_.recommendProducts(i, top_howmany)) 
    rec_items_bpr.extend(bpr_recommendProducts(i, top_howmany)) 

* Result regularizing

In [36]:
cf_res_df = pd.DataFrame(rec_items_cf)
cf_devider = np.mean(cf_res_df.groupby('user').rating.max())
cf_res_df['rating'] = cf_res_df.rating/cf_devider

In [37]:
bpr_res_df = pd.DataFrame(rec_items_bpr)
bpr_res_df.columns = cf_res_df.columns
bpr_devider = np.mean(bpr_res_df.groupby('user').rating.max())
bpr_res_df['rating'] = bpr_res_df.rating/bpr_devider

* Generating overall results

In [38]:
overall_df = pd.concat([cf_res_df,bpr_res_df])
overall_df_show = overall_df.groupby(['user','product']).rating.sum()

* Selecting the most possible recommendations (from mid of the top values -- avoid overfitting)

In [40]:
def taking_out_reco_pairs(input_pairs, num = 5, init_pos = 8):
    _pairs_items = input_pairs.sort_values(ascending=False).index
    if len(_pairs_items)<init_pos+num:
        return _pairs_items[-num:]
    return _pairs_items[init_pos:init_pos+num]

In [41]:
overall_result = []
for i in range(1,userMat.shape[0]):
    overall_result.append([i, list(taking_out_reco_pairs(overall_df_show[i]))])

In [42]:
overall_result

[[1, [11890, 5565, 18508, 11387, 1226]],
 [2, [1180, 24504, 5565, 108, 1226]],
 [3, [18506, 5546, 10885, 108, 462]],
 [4, [462, 20586, 11387, 10885, 1226]],
 [5, [28081, 11387, 15280, 108, 1226]],
 [6, [36429, 11052, 11102, 108, 1226]],
 [7, [108, 16247, 15794, 11052, 462]],
 [8, [108, 11387, 16247, 39780, 462]],
 [9, [29844, 5546, 27675, 108, 1226]],
 [10, [5154, 18508, 18506, 108, 1226]],
 [11, [5547, 20053, 3909, 108, 462]],
 [12, [28081, 25062, 18536, 108, 1226]],
 [13, [5154, 11890, 4753, 15280, 1226]],
 [14, [24504, 10885, 11890, 108, 1226]],
 [15, [5154, 4753, 39780, 108, 1226]],
 [16, [43427, 41707, 5562, 108, 1226]],
 [17, [24504, 11890, 43427, 108, 462]],
 [18, [92, 28074, 5565, 108, 1226]],
 [19, [41238, 43427, 34457, 108, 1226]],
 [20, [11387, 10884, 18508, 108, 462]],
 [21, [1226, 10885, 8965, 15280, 4753]],
 [22, [29844, 108, 11387, 11890, 462]],
 [23, [11387, 15280, 5562, 43427, 6945]],
 [24, [18508, 10885, 15794, 462, 1226]],
 [25, [10885, 24504, 8965, 4753, 1226]],
 [2

## 3. Evaluation

NDCG

Hit Ratio

In [None]:
sc.stop()