In [1]:
from pyspark import SparkContext
from scipy import sparse as sm
from sklearn.preprocessing import normalize
import numpy as np
import csv
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.metrics.pairwise import pairwise_distances
from scipy.stats import spearmanr
from scipy.stats import pearsonr as pears
from collections import defaultdict
from tqdm import tqdm_notebook as tqdm
import time
from sparsesvd import sparsesvd
import math as mt
# Reuse the already-running SparkContext if there is one, otherwise create it.
sc = SparkContext.getOrCreate()

In [2]:
# --- training interactions: one (user, item, rating) triple per CSV row ---
train_rdd = sc.textFile("data/train.csv")
train_header = train_rdd.first()
train_clean_data = (
    train_rdd
    .filter(lambda row: row != train_header)   # drop the header line
    .map(lambda row: row.split(','))
    .map(lambda cols: (int(cols[0]), int(cols[1]), float(cols[2])))
)

# --- item content: one (item, feature) pair per CSV row ---
icm_rdd = sc.textFile("data/icm_fede.csv")
icm_header = icm_rdd.first()
icm_clean_data = (
    icm_rdd
    .filter(lambda row: row != icm_header)
    .map(lambda row: row.split(','))
    .map(lambda cols: (int(cols[0]), int(cols[1])))
)

# --- users we must produce recommendations for ---
test_rdd = sc.textFile("data/target_users.csv")
test_header = test_rdd.first()
test_clean_data = (
    test_rdd
    .filter(lambda row: row != test_header)
    .map(lambda row: row.split(','))
)

# Ids of the target users, as a plain Python list on the driver.
test_users = test_clean_data.map(lambda cols: int(cols[0])).collect()


# Items already rated by each target user: {user_id: [item_id, ...]}.
# Membership is tested against a set: `x[0] in test_users` on a list costs
# O(len(test_users)) for every training row, a set lookup is O(1).
test_users_set = set(test_users)
grouped_rates = (
    train_clean_data
    .filter(lambda x: x[0] in test_users_set)
    .map(lambda x: (x[0], x[1]))
    .groupByKey()
    .mapValues(list)
    .collect()
)
# NOTE: a target user with no row in the training data has no key here.
grouped_rates_dic = dict(grouped_rates)


# Mean rating given by each user.  aggregateByKey folds the ratings of one
# key into a (sum, count) pair, first inside each partition, then across them.
# NOTE(review): despite its name, `item_ratings` is keyed by user (column 0).
item_ratings = train_clean_data.map(lambda row: (row[0], row[2])).aggregateByKey(
    (0, 0),
    lambda acc, rating: (acc[0] + rating, acc[1] + 1),
    lambda left, right: (left[0] + right[0], left[1] + right[1]),
)
user_ratings_mean = item_ratings.mapValues(lambda acc: acc[0] / (acc[1])).collect()
user_ratings_mean_dic = dict(user_ratings_mean)


# Constant added to the rating count in the denominator, so that items with
# only a handful of ratings do not reach the top of the ranking.
shrinkage_factor = 5

# (sum of ratings, number of ratings) for every item.
item_ratings_forTop = train_clean_data.map(lambda row: (row[1], row[2])).aggregateByKey(
    (0, 0),
    lambda acc, rating: (acc[0] + rating, acc[1] + 1),
    lambda left, right: (left[0] + right[0], left[1] + right[1]),
)

# Item ids ordered by decreasing shrunk mean rating.  Note that the list holds
# the ids only, not the means themselves.
item_ratings_mean = (
    item_ratings_forTop
    .mapValues(lambda acc: acc[0] / (acc[1] + shrinkage_factor))
    .sortBy(lambda pair: pair[1], ascending=False)
    .map(lambda pair: pair[0])
    .collect()
)


# Bring the training triples to the driver ONCE and split them into columns
# locally.  The RDD is not cached, so every separate collect() would re-read
# and re-parse the CSV file.
train_triples = train_clean_data.collect()
users = [triple[0] for triple in train_triples]
items = [triple[1] for triple in train_triples]
ratings = [triple[2] for triple in train_triples]
del train_triples
#ratings_unbiased = train_clean_data.map(lambda x: x[2]-user_ratings_mean_dic[x[0]]).collect()

# Same for the (item, feature) pairs of the item content matrix.
icm_pairs = icm_clean_data.collect()
items_for_features = [pair[0] for pair in icm_pairs]
features = [pair[1] for pair in icm_pairs]
del icm_pairs

# Extra (item 37142, feature 0) entry appended by hand.
# NOTE(review): presumably forces IxF to have as many item rows as UxI has
# item columns (37142 being the largest item id) - confirm.  It also makes
# item 37142 carry feature 0.
items_for_features.append(37142)
features.append(0)


# One "1" for every (item, feature) pair: the feature is present.
unos = [1] * len(items_for_features)

# User x Item matrix holding the ratings (row = user id, column = item id).
rating_coords = (users, items)
UxI = sm.csr_matrix((ratings, rating_coords))
#UxI_unbiased= sm.csr_matrix((ratings_unbiased, (users, items)))

# Item x Feature indicator matrix (row = item id, column = feature id).
feature_coords = (items_for_features, features)
IxF = sm.csr_matrix((unos, feature_coords))

In [3]:
# Matrix dimensions used by the cells below.
n_users = UxI.shape[0]
n_items = UxI.shape[1]
n_features = IxF.shape[1]

In [4]:
'''SVD'''
# Number of singular triplets requested from the truncated SVD.
K = 870
# Factorise the rating matrix (passed in CSC format).  U is transposed before
# use further down, i.e. it is returned with one row per latent factor.
%time U, S, Vt = sparsesvd(UxI.tocsc(), K)


CPU times: user 1min 38s, sys: 88 ms, total: 1min 38s
Wall time: 1min 38s


In [5]:
# Replace every singular value by its square root and lay the result out as a
# diagonal matrix, ready for the reconstruction product.
# NOTE: S is overwritten, so this cell is not idempotent - re-running it
# without recomputing the SVD transforms the already-transformed S again.
S = np.diag(np.sqrt(S.data))

In [6]:
#calc predictions
# Rebuild the (n_users x n_items) score matrix from the factors,
# U^T * diag(sqrt(s)) * Vt, and store the result in CSR format.
%time UxI_pred_SVD=sm.csr_matrix(U.T.dot(S).dot(Vt))

CPU times: user 48.2 s, sys: 5.62 s, total: 53.8 s
Wall time: 27.5 s


In [7]:
'''content based'''
# Inverse document frequency of every feature:
#     IDF[f] = log10(n_items / number of items that have feature f)
# The per-feature item counts are obtained in one vectorised pass over the
# non-zero column indices instead of slicing the matrix column by column.
feature_item_counts = np.bincount(IxF.nonzero()[1], minlength=n_features)
# A feature carried by no item gets weight 0 instead of a division by zero;
# its column is empty anyway, so the weight is never actually applied.
IDF = np.zeros(n_features)
features_in_use = feature_item_counts > 0
IDF[features_in_use] = np.log10(n_items / feature_item_counts[features_in_use])
# Normalise every item's feature vector (row-wise).  Overwrites IxF.
%time IxF=normalize(IxF,axis=1)
# Weight every feature column by its IDF.
%time IxF_idf=IxF.multiply(IDF)
# User profiles in feature space: rating-weighted sum of the IDF-weighted
# feature vectors of the items each user rated.
%time UxF=UxI.dot(IxF_idf)
# Content-based score of every item for every user.
# NOTE(review): the item side uses the normalised IxF, not IxF_idf - confirm
# this asymmetry is intended.
%time UxI_pred_CB=UxF.dot(IxF.T)


CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 4 ms
CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 4.79 ms
CPU times: user 56 ms, sys: 0 ns, total: 56 ms
Wall time: 58.7 ms




CPU times: user 7.84 s, sys: 1.16 s, total: 9.01 s
Wall time: 9.01 s


In [8]:
# Free the normalised item-feature matrix; the cells below only use IxF_idf.
del IxF

In [9]:
'''collaborative filtering item based via content'''
#calc similarities
# Item-item similarity = dot product of the IDF-weighted item feature vectors.
%time IxI_sim=IxF_idf.dot(IxF_idf.T)
# An item must not be selected as its own neighbour.
IxI_sim.setdiag(0)

CPU times: user 6.52 s, sys: 1.46 s, total: 7.98 s
Wall time: 7.98 s




In [10]:
#take knn items
# Number of neighbours kept for every item.
k = 200
IxI_sim_knn = sm.lil_matrix((n_items, n_items))
for i in tqdm(range(n_items)):
    similarity_row = IxI_sim.getrow(i).toarray()[0]
    # argpartition moves the k largest similarities into the last k slots
    # (in no particular order), which is cheaper than a full sort.
    top_k_idx = similarity_row.argpartition(-k)[-k:]
    IxI_sim_knn[i, top_k_idx] = IxI_sim[i, top_k_idx]




In [11]:
# The full item-item similarity is no longer needed once the kNN matrix exists.
del IxI_sim 

In [12]:
#calc predictions
# Score of item i for user u = sum over the items j rated by u of
# rating(u, j) * sim(i, j), where sim is non-zero only for i's kept neighbours.
%time UxI_pred_CI=UxI.dot(IxI_sim_knn.T)

CPU times: user 3.17 s, sys: 352 ms, total: 3.52 s
Wall time: 3.52 s


In [13]:
# Item kNN matrix is only needed to build UxI_pred_CI.
del IxI_sim_knn

In [14]:
'''collaborative filtering user based via content'''
#calc similarities
# Dot products between the users' feature-space profiles.
%time UxU_sim=UxF.dot(UxF.T) #numerators of cosine

CPU times: user 5.78 s, sys: 496 ms, total: 6.28 s
Wall time: 6.28 s


In [15]:
# Cosine similarity between user profiles, wrapped into a CSR matrix.
%time cos=sm.csr_matrix(cosine_similarity(UxF)) #cosine
# Element-wise reciprocal of the stored cosine values.
cos.data=1/cos.data
# numerator * (1 / cosine) recovers the cosine denominator (product of norms).
denominators=UxU_sim.multiply(cos) #get denominators of cosine
del cos

CPU times: user 14.6 s, sys: 2.97 s, total: 17.6 s
Wall time: 17.6 s


In [16]:
# Shrinkage term: 1 is added to every stored denominator.
denominators.data+=1 #add shrinkage to cosine denominator
# Invert so the division below can be written as an element-wise product.
denominators.data=1/denominators.data
# numerator / (denominator + 1)
%time UxU_sim=UxU_sim.multiply(denominators) #calc shrinked cosine
del denominators

CPU times: user 2.36 s, sys: 552 ms, total: 2.91 s
Wall time: 2.91 s


In [17]:
# A user must not be selected as their own neighbour.
UxU_sim.setdiag(0)



In [18]:
#take knn users
# Number of neighbours kept for every user.
k = 50
UxU_sim_knn = sm.lil_matrix((n_users, n_users))
for i in tqdm(range(n_users)):
    similarity_row = UxU_sim.getrow(i).toarray()[0]
    # argpartition moves the k largest similarities into the last k slots
    # (in no particular order), which is cheaper than a full sort.
    top_k_idx = similarity_row.argpartition(-k)[-k:]
    UxU_sim_knn[i, top_k_idx] = UxU_sim[i, top_k_idx]




In [19]:
# The full user-user similarity is no longer needed once the kNN matrix exists.
del UxU_sim

In [20]:
#calc_predictions
# Score of item i for user u = sum over u's kept neighbours v of
# sim(u, v) * rating(v, i).
%time UxI_pred_CU=UxU_sim_knn.dot(UxI) #k=50->382

CPU times: user 1.14 s, sys: 124 ms, total: 1.27 s
Wall time: 1.27 s


In [21]:
# Release the inputs of the predictors; the cells below only read the
# UxI_pred_* matrices.
del UxU_sim_knn
del UxI
del UxF
del IxF_idf

In [22]:
# English translation of the note kept in the string below:
# "STOP: remove the already-rated items using LIL, otherwise it takes
#  10 minutes... but converting to LIL kills the kernel."
'''aaaaaaaaaalllllttttt rimuovere già votati con lil altrimenti ci mette 10 minuti... ma convertendo a lil il kernel muore'''
''''''

''

In [23]:
#create matrix for test users predictions
# One empty (n_users x n_items) LIL matrix per algorithm.  LIL is used because
# rows are assigned and individual entries zeroed in the next cells.
(UxI_pred_CB_test,
 UxI_pred_CI_test,
 UxI_pred_CU_test,
 UxI_pred_SVD_test) = tuple(sm.lil_matrix((n_users, n_items)) for _algo in range(4))

In [24]:
#take only test users predictions
# Copy the rows of the target users from every full prediction matrix; the
# rows of all other users stay empty in the *_test matrices.
%time UxI_pred_CB_test[test_users,:]=UxI_pred_CB[test_users,:]
%time UxI_pred_CI_test[test_users,:]=UxI_pred_CI[test_users,:]
%time UxI_pred_CU_test[test_users,:]=UxI_pred_CU[test_users,:]
%time UxI_pred_SVD_test[test_users,:]=UxI_pred_SVD[test_users,:]

CPU times: user 21.1 s, sys: 3.26 s, total: 24.4 s
Wall time: 24.4 s
CPU times: user 11 s, sys: 328 ms, total: 11.3 s
Wall time: 11.3 s
CPU times: user 12 s, sys: 524 ms, total: 12.6 s
Wall time: 12.6 s
CPU times: user 21.4 s, sys: 4.25 s, total: 25.6 s
Wall time: 25.6 s


In [25]:
#remove items already voted
# Zero the score of every item a target user has already rated, so that it
# can never be recommended again.
for user in tqdm(test_users):
    # One lookup per user.  A target user without any training rating has no
    # entry in grouped_rates_dic: nothing to remove, so skip instead of
    # raising KeyError.
    rated_items = grouped_rates_dic.get(user)
    if not rated_items:
        continue
    UxI_pred_CB_test[user, rated_items] = 0
    UxI_pred_CI_test[user, rated_items] = 0
    UxI_pred_CU_test[user, rated_items] = 0
    UxI_pred_SVD_test[user, rated_items] = 0




In [26]:
#convert to csr to improve future operations performances
# The row assignments are done; switch from LIL to CSR for the arithmetic
# (scaling and weighted sums) performed in the following cells.
%time UxI_pred_CB_test=UxI_pred_CB_test.tocsr()
%time UxI_pred_CI_test=UxI_pred_CI_test.tocsr()
%time UxI_pred_CU_test=UxI_pred_CU_test.tocsr()
%time UxI_pred_SVD_test=UxI_pred_SVD_test.tocsr()

CPU times: user 16.2 s, sys: 948 ms, total: 17.1 s
Wall time: 17.1 s
CPU times: user 580 ms, sys: 20 ms, total: 600 ms
Wall time: 603 ms
CPU times: user 1.45 s, sys: 20 ms, total: 1.47 s
Wall time: 1.47 s
CPU times: user 21.4 s, sys: 1.08 s, total: 22.5 s
Wall time: 22.5 s


#rescale algorithm 1
UxI_pred_CB_temp=sm.lil_matrix((n_users,n_items))
for user in tqdm(test_users):       
    OldMax=max(UxI_pred_CB.getrow(user).toarray()[0])
    if(OldMax!=0):
        UxI_pred_CB_temp[user,:]=(10*(UxI_pred_CB[user,:]/OldMax)) 
%time UxI_pred_CB=UxI_pred_CB_temp.tocsr()
del UxI_pred_CB_temp

#rescale algorithm 2
UxI_pred_CI_temp=sm.lil_matrix((n_users,n_items))
for user in tqdm(test_users):       
    OldMax=max(UxI_pred_CI.getrow(user).toarray()[0])
    if(OldMax!=0):
        UxI_pred_CI_temp[user,:]=(10*(UxI_pred_CI[user,:]/OldMax)) 
%time UxI_pred_CI=UxI_pred_CI_temp.tocsr()
del UxI_pred_CI_temp

#rescale algorithm 3
UxI_pred_CU_temp=sm.lil_matrix((n_users,n_items))
for user in tqdm(test_users):       
    OldMax=max(UxI_pred_CU.getrow(user).toarray()[0])
    if(OldMax!=0):
        UxI_pred_CU_temp[user,:]=(10*(UxI_pred_CU[user,:]/OldMax)) 
%time UxI_pred_CU=UxI_pred_CU_temp.tocsr()
del UxI_pred_CU_temp

#rescale algorithm 4
UxI_pred_SVD_temp=sm.lil_matrix((n_users,n_items))
for user in tqdm(test_users):       
    OldMax=max(UxI_pred_SVD.getrow(user).toarray()[0])
    if(OldMax!=0):
        UxI_pred_SVD_temp[user,:]=(10*(UxI_pred_SVD[user,:]/OldMax)) 
%time UxI_pred_SVD=UxI_pred_SVD_temp.tocsr()
del UxI_pred_SVD_temp

In [27]:
#rescale algorithms
# Divide every test-user score matrix, in place, by the largest value of the
# corresponding FULL prediction matrix, to bring the four algorithms onto a
# comparable scale before blending.
# NOTE(review): the divisor is the maximum of UxI_pred_*, not of the *_test
# matrix (which had rated items zeroed).  The division is in place, so
# re-running this cell scales the matrices again.
%time UxI_pred_CB_test/=UxI_pred_CB.max()
%time UxI_pred_CI_test/=UxI_pred_CI.max()
%time UxI_pred_CU_test/=UxI_pred_CU.max()
%time UxI_pred_SVD_test/=UxI_pred_SVD.max()

CPU times: user 29.6 s, sys: 0 ns, total: 29.6 s
Wall time: 29.6 s
CPU times: user 952 ms, sys: 0 ns, total: 952 ms
Wall time: 950 ms
CPU times: user 2.62 s, sys: 0 ns, total: 2.62 s
Wall time: 2.62 s
CPU times: user 864 ms, sys: 0 ns, total: 864 ms
Wall time: 863 ms


Predictions of all algorithms computed, now have fun with hybrids!

In [28]:
# First blend: weighted sum of content-based (0.80) and SVD (0.20) scores.
%time UxI_pred_1=UxI_pred_CB_test*0.80+UxI_pred_SVD_test*0.20

CPU times: user 1.57 s, sys: 552 ms, total: 2.12 s
Wall time: 2.12 s


In [42]:
# Divide the blend by its own maximum, in place (not idempotent on re-run).
%time UxI_pred_1/=UxI_pred_1.max()

CPU times: user 288 ms, sys: 0 ns, total: 288 ms
Wall time: 284 ms


In [54]:
# Second blend: previous blend (0.98) plus item-based CF scores (0.02).
%time UxI_pred_2=UxI_pred_1*0.98+UxI_pred_CI_test*0.02

CPU times: user 832 ms, sys: 724 ms, total: 1.56 s
Wall time: 1.56 s


In [55]:
# Divide the blend by its own maximum, in place (not idempotent on re-run).
%time UxI_pred_2/=UxI_pred_2.max()

CPU times: user 372 ms, sys: 0 ns, total: 372 ms
Wall time: 369 ms


In [76]:
# Final blend: previous blend (0.99) plus user-based CF scores (0.01).
%time UxI_pred=UxI_pred_2*0.99+UxI_pred_CU_test*0.01

CPU times: user 952 ms, sys: 336 ms, total: 1.29 s
Wall time: 1.29 s


In [77]:
# Divide the final scores by their own maximum, in place (not idempotent on re-run).
%time UxI_pred/=UxI_pred.max()

CPU times: user 364 ms, sys: 0 ns, total: 364 ms
Wall time: 364 ms


In [78]:
# Write the submission file: one row per target user with that user's best
# unseen items, separated by spaces.
N_RECOMMENDATIONS = 5

# `with` guarantees the file is flushed and closed even if a row fails.
with open('submission_wSUM_CB-SVD-80-20_CI-98-2_CU-99-1.csv', 'wt') as f:
    writer = csv.writer(f)
    writer.writerow(('userId','RecommendedItemIds'))

    for user in tqdm(test_users):
        # Set for O(1) membership tests.  A target user without training
        # ratings has no entry in grouped_rates_dic: treat as "nothing rated".
        already_rated = set(grouped_rates_dic.get(user, ()))

        # Densify the user's row once, then rank the items by decreasing score.
        user_scores = UxI_pred.getrow(user).toarray()[0]
        user_predictions = (-user_scores).argsort()

        top = []
        # 1) Items ranked by the hybrid score, until the list is full or the
        #    scores drop to exactly zero (no signal left for this user).
        for prediction in user_predictions:
            if len(top) == N_RECOMMENDATIONS or user_scores[prediction] == 0.0:
                break
            if prediction not in already_rated:
                top.append(prediction)

        # 2) Fallback: fill the remaining slots with the globally best-rated
        #    items the user has not rated and that are not already listed.
        for prediction in item_ratings_mean:
            if len(top) == N_RECOMMENDATIONS:
                break
            if prediction not in already_rated and prediction not in top:
                top.append(prediction)

        writer.writerow((user, ' '.join(str(item) for item in top)))




In [63]:
# Sanity check: mean of the non-zero scores of the CB+SVD blend.
UxI_pred_1[UxI_pred_1.nonzero()].mean()


0.0035030769571082372

In [66]:
# Sanity check: mean of the non-zero item-based CF scores of the target users.
UxI_pred_CI_test[UxI_pred_CI_test.nonzero()].mean()

0.015697336846175875

In [62]:
# Sanity check: mean of the non-zero scores of the second blend.
UxI_pred_2[UxI_pred_2.nonzero()].mean()


0.0035117705324024819

In [67]:
# Sanity check: mean of the non-zero user-based CF scores of the target users.
UxI_pred_CU_test[UxI_pred_CU_test.nonzero()].mean()

0.0081333590405483248

In [73]:
# Sanity check: mean of the non-zero scores of the final blend.
UxI_pred[UxI_pred.nonzero()].mean()

0.0035225282094090734

In [74]:
# Sanity check: largest score of the final blend (1.0 after the max-normalisation).
UxI_pred[UxI_pred.nonzero()].max()

1.0