In [1]:
import pandas as pd
import numpy as np
from copy import deepcopy 

from pyspark.sql import SQLContext
sqlCtx = SQLContext(sc)
from pyspark.ml.recommendation import ALS
from pyspark.sql import functions as F

In [3]:
category = ["diaper","apparel","feeding"]

data = {}
nItemByCat = {}
deltaByCat = {}
nItem = 0
for cat in category:
    data[cat] = pd.read_csv("/home/romain/Documents/PhD/logisticDPP/amazon/1_100_100_100_"+cat+"_regs.csv",
                   sep=";",header=None,names=['itemSet'])
    
    data[cat]['setSize'] = list(map(lambda x: len(x.split(',')),data[cat]['itemSet']))
    items = list(map(lambda x: list(map(int,x.split(','))),data[cat]['itemSet']))
    items = list(set([item for sublist in items for item in sublist]))
    nItemByCat[cat] = len(items)
    deltaByCat[cat] = nItem
    nItem += len(items)

sets = []
for cat in category:
    tmp = list(map(lambda x: x.split(','),data[cat]['itemSet']))
    tmp = list(map(lambda x: list(map(lambda y: int(y)+deltaByCat[cat],x)),tmp))
    sets.extend(tmp)
size = list(map(len,sets))
nUsers = len(sets)
users = np.repeat(range(nUsers),size)

flatsets = [int(item)-1 for sublist in sets for item in sublist]
df = pd.DataFrame({'user':users,'item':flatsets,'rating':1.0})
df.index = df['user']

In [4]:
def random_mask(x):
    result = np.zeros_like(x)
    if len(x)>1:
        result[np.random.choice(len(x))] = 1
    return result

threshold   = 0.7
numTraits   = 30 if len(category)==1 else 60
nRuns       = 3
MPR         = []
Ks          = [5,10,20]
P           = dict.fromkeys(Ks)
for K in Ks:
    P[K] = []

In [6]:
for run in range(nRuns):
    print("run number",run+1,"-",numTraits)
    np.random.seed(123*run)
    
    testUsers = list(np.random.choice(range(nUsers),size=int((1-threshold)*nUsers),replace=False))
    trainingUsers = list(set(range(nUsers))-set(testUsers))
    trainingData = df.loc[trainingUsers]
    trainingData.index = range(len(trainingData))
    
    testData = df.loc[testUsers]
    testData.index = range(len(testData))
    
    mask = testData.groupby(['user'])['user'].transform(random_mask).astype(bool)
    not_mask = list(map(lambda x: not(x),mask))
    
    trainingData = pd.concat([trainingData,testData.loc[not_mask]])
    testData = testData.loc[mask]
    
    sparkInput = sqlCtx.createDataFrame(trainingData)
    als = ALS(rank=numTraits,regParam=0.1,userCol='user',itemCol='item',ratingCol='rating')
    mod = als.fit(sparkInput) 
    
    testUsers = list(set(testData['user']))
    testUsers = pd.DataFrame({'user':testUsers})
    testUsers = sqlCtx.createDataFrame(testUsers)
    #recosForUser = mod.recommendForUserSubset(testUsers,nItem)
    recosForUser = mod.recommendForUserSubset(testUsers,nItem)
    
    recosForUser = recosForUser.select(recosForUser.user,F.posexplode(recosForUser.recommendations))
    recosForUser = recosForUser.withColumn("item",recosForUser["col"].getItem("item"))
    recosForUser = recosForUser.drop('col')
    testData = testData.rename(columns={'item':'true_item'})
    sparkOutput = sqlCtx.createDataFrame(testData[['user','true_item']])
    
    cond = [recosForUser.user == sparkOutput.user, recosForUser.item == sparkOutput.true_item]
    targetPosition = sparkOutput.join(recosForUser,cond,how='left')
    MPR_ = targetPosition.select(F.mean('pos')).toPandas().loc[0,'avg(pos)']
    MPR_ = 100*(1-MPR_/nItem)
    MPR.append(MPR_)
    
    for K in Ks:
        P[K].append(targetPosition.filter(targetPosition['pos'] < K).count()/targetPosition.count()*100)

print("MPR=",np.mean(MPR))
for K in Ks:
    print("Precision @"+str(K)+"=",np.mean(P[K]))

run number 1 - 60
run number 2 - 60
run number 3 - 60
MPR= 89.50933772219582
Precision @5= 15.777830650556796
Precision @10= 25.371560136504986
Precision @20= 42.154626395943225
