In [2]:
import os
import glob
import pandas as pd
import numpy as np

from sklearn.metrics.pairwise import cosine_similarity
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.model_selection import train_test_split

import sys
import pyspark
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
# Recall y precision
from reco_utils.evaluation.spark_evaluation import SparkRatingEvaluation, SparkRankingEvaluation


print("System version: {}".format(sys.version))
print("Spark version: {}".format(pyspark.__version__))

System version: 3.9.5 (default, May 18 2021, 12:31:01) 
[Clang 10.0.0 ]
Spark version: 3.1.2


In [2]:
path='./DATA'
file = os.path.join(path, "users_bgs.csv")

In [3]:
df=pd.read_csv(file, usecols=['u_id','bgg_id','Game','category','Your Rating'], low_memory=False)
df.head()

Unnamed: 0,u_id,Game,Your Rating,bgg_id,category
0,0,1830: Railways & Robber Barons,-1,421,102110341011
1,0,18AL,7,2612,1021112010341011
2,0,2 de Mayo,6,36522,10511019
3,0,23,7,103651,1002
4,0,6 nimmt!,7,432,10021098


In [4]:
#Transform explicit values to implicit values
df['Rating'] = df['Your Rating'].apply(lambda x: 1 if x>0 else 0)

#### Remove users who only have one board game

In [6]:
#Group by 'user_id'
p=df.groupby('u_id').count()

In [7]:
# Visualise users with a single board game
(p['bgg_id']==1).sum()

6

In [9]:
# Remove the user_id with a single board game
bgu=list(p[p['bgg_id']==1].index)

dfm = df[~df.u_id.isin(bgu)]

#### Modified the id of games and users so that they are correlative

In [10]:
# Create a dictionary for games titles and ids

bgg_id=list(dfm['bgg_id'])
game=list(dfm['Game'])

item_dict={}

for key in bgg_id:
    for value in game:
        item_dict[key] = value
        game.remove(value)
        break 

In [11]:
print(len(set(item_dict.keys())))
print(len(set(item_dict.values())))

36535
36535


In [12]:
# Resample
resample_id_item_dict={}
for index, key in enumerate(item_dict.keys()):
    resample_id_item_dict[key]= index
    

In [13]:
resample_item_dict = {resample_id_item_dict[k]:v for k,v in item_dict.items()}
assert(len(set(resample_item_dict.keys())) == len(set(resample_item_dict.values())))

In [14]:
# Create a dictionary for resample users id

u_id=list(dfm['u_id'].unique())
resample_user_dict={}

for index,key in enumerate(u_id):
    resample_user_dict[key] = index

In [15]:
print(len(set(resample_user_dict.keys())))
print(len(set(resample_user_dict.values())))

2844
2844


In [16]:
# Copy and apply the changes
dfm_r=dfm.copy()
dfm_r['item_id']=dfm['bgg_id'].apply(lambda x: resample_id_item_dict[x])
dfm_r['user_id']=dfm['u_id'].apply(lambda x: resample_user_dict[x])
dfm_r.describe()

Unnamed: 0,u_id,Your Rating,bgg_id,Rating,item_id,user_id
count,838777.0,838777.0,838777.0,838777.0,838777.0,838777.0
mean,1404.570457,3.42304,91310.6183,0.545138,4722.867555,1403.031329
std,813.788285,4.159144,88829.182464,0.497959,6316.864111,812.113149
min,0.0,-1.0,1.0,0.0,0.0,0.0
25%,700.0,-1.0,9209.0,0.0,680.0,700.0
50%,1375.0,5.0,55952.0,1.0,2185.0,1374.0
75%,2087.0,7.0,163976.0,1.0,6332.0,2086.0
max,2851.0,10.0,332853.0,1.0,36534.0,2843.0


In [21]:
#dfm_r.to_csv('DATA/r_users_bgs.csv')
path='./DATA'
file = os.path.join(path, "r_users_bgs.csv")
dfm_r=pd.read_csv(file, low_memory=False)
dfm_r.head()

Unnamed: 0.1,Unnamed: 0,u_id,Game,Your Rating,bgg_id,category,Rating,item_id,user_id
0,0,0,1830: Railways & Robber Barons,-1,421,102110341011,0,0,0
1,1,0,18AL,7,2612,1021112010341011,1,1,0
2,2,0,2 de Mayo,6,36522,10511019,1,2,0
3,3,0,23,7,103651,1002,1,3,0
4,4,0,6 nimmt!,7,432,10021098,1,4,0


#### Format Data for models

In [22]:

dfratings = pd.DataFrame()

dfratings['user_id']=dfm_r['user_id']
dfratings['item_id']=dfm_r['item_id']
dfratings['rating']=dfm_r['Rating']

print(dfratings.shape)
dfratings.head()

(838777, 3)


Unnamed: 0,user_id,item_id,rating
0,0,0,0
1,0,1,1
2,0,2,1
3,0,3,1
4,0,4,1


In [23]:
dfbg = dfm_r.groupby(by=['bgg_id','category'], as_index=False).first()
dfbg.drop(['bgg_id','u_id','Your Rating','Rating', 'user_id'],axis=1, inplace=True)

In [24]:
dfgames = pd.DataFrame()

dfgames['item_id']=dfbg['item_id']
dfgames['title']=dfbg['Game']
dfgames['category']=dfbg['category']

print(dfgames.shape)
dfgames.head()

(36535, 3)


Unnamed: 0,item_id,title,category
0,747,Die Macher,102110261001
1,5896,Dragonmaster,10021010
2,292,Samurai,10091035
3,6308,Tal der Könige,1050
4,15364,Mare Mediterraneum,10151008


## Content Based

In [53]:
# ----- COMENTARIO

In [54]:
dfgames.head()

Unnamed: 0,item_id,title,category
0,747,Die Macher,102110261001
1,5896,Dragonmaster,10021010
2,292,Samurai,10091035
3,6308,Tal der Könige,1050
4,15364,Mare Mediterraneum,10151008


In [55]:
dfgames['category'] = dfgames['category'].map(lambda x: x.replace(',', ' '))
print(dfgames['category'])

0        1021 1026 1001
1             1002 1010
2             1009 1035
3                  1050
4             1015 1008
              ...      
36530              1002
36531         1002 1030
36532         1002 2481
36533    1009 1028 1113
36534         1021 1013
Name: category, Length: 36535, dtype: object


In [57]:
dfgames.set_index('title', inplace = True)
dfgames.head()

Unnamed: 0_level_0,item_id,category
title,Unnamed: 1_level_1,Unnamed: 2_level_1
Die Macher,747,1021 1026 1001
Dragonmaster,5896,1002 1010
Samurai,292,1009 1035
Tal der Könige,6308,1050
Mare Mediterraneum,15364,1015 1008


In [58]:
print(np.where(dbg.index=='Tal der Könige')[0])

[]


In [60]:
count = CountVectorizer()
count_matrix = count.fit_transform(dfgames['category'])

In [61]:
cosine = cosine_similarity(count_matrix, count_matrix)
cosine

array([[1.        , 0.        , 0.        , ..., 0.        , 0.        ,
        0.40824829],
       [0.        , 1.        , 0.        , ..., 0.5       , 0.        ,
        0.        ],
       [0.        , 0.        , 1.        , ..., 0.        , 0.40824829,
        0.        ],
       ...,
       [0.        , 0.5       , 0.        , ..., 1.        , 0.        ,
        0.        ],
       [0.        , 0.        , 0.40824829, ..., 0.        , 1.        ,
        0.        ],
       [0.40824829, 0.        , 0.        , ..., 0.        , 0.        ,
        1.        ]])

In [None]:
cosine.dump("DATA/cosine_cb.dat")
#cos = numpy.load("DATA/cosine_cb.dat")

In [1]:
def cb_recommendations(name, cosine = cosine):
    
    recommended = []
    indices = pd.Series(dfgames.index)
    idx = indices[indices == name].index[0]
    score = pd.Series(cosine[idx]).sort_values(ascending = False)
    top_10 = list(score.iloc[0:11].index)
    for i in top_10:
        if idx != i:
            recommended.append(list(dfgames.index)[i])
        
    return recommended

NameError: name 'cosine' is not defined

In [79]:
game_recommendations('7 Wonders')

['7 Wonders (Second Edition)',
 'Imperial Settlers',
 'Alba Longa',
 'Mini Cywilizacja',
 'Peloponnes Card Game',
 'Glory to Rome',
 '7 Wonders Duel',
 'City of Iron: Second Edition',
 'City of Iron',
 'Dragon Canyon']

In [None]:
## Collaborative Filtering

In [7]:
#Split random into training and test datasets
train, test = train_test_split(dfratings, test_size = 0.20, random_state = 42) 

In [8]:
print('There are %s users, %s itmes and %s pairs in the train set' \
      %(train.user_id.unique().shape[0], train.item_id.unique().shape[0], train.shape[0]))
train.head()

There are 2844 users, 34067 itmes and 671021 pairs in the train set


Unnamed: 0,user_id,item_id,rating
32330,123,1060,0
399743,1313,1287,1
763096,2564,5918,0
79614,259,1340,1
719432,2418,727,1


In [9]:
print('There are %s users, %s itmes and %s pairs in the test set' \
      %(test.user_id.unique().shape[0], test.item_id.unique().shape[0], test.shape[0]))
test.head()

There are 2838 users, 20894 itmes and 167756 pairs in the test set


Unnamed: 0,user_id,item_id,rating
332615,1114,1151,1
489596,1619,2907,1
499690,1659,647,1
698076,2334,1283,1
288796,963,7547,1


In [10]:
### Popularity

In [11]:
# group the train dataset by item and count the number of users
popular = train.groupby('item_id')['user_id'].count()

In [12]:
popular.head()

item_id
0    294
1     74
2    157
3     65
4    798
Name: user_id, dtype: int64

In [13]:
# Sort in descending order
popularsort = popular.sort_values(ascending=False)

In [14]:
popularsort.shape[0]

34067

In [15]:
popularsort.head()

item_id
51     1322
195    1296
70     1262
242    1179
55     1171
Name: user_id, dtype: int64

In [16]:
popularsort.index #id of the Board Game

Int64Index([   51,   195,    70,   242,    55,     5,    44,    15,    93,
              256,
            ...
            27193, 14744, 27180, 27181, 27182, 27183, 27185, 27188, 27190,
            36533],
           dtype='int64', name='item_id', length=34067)

In [18]:
popularbg = np.zeros(shape=(popularsort.shape[0], 3), dtype=object) #MODIFICAR ESTE NOMBRE

for i, ind in enumerate(popularsort.index):
    idx = ind 
    freq = popularsort[idx]  
    title = resample_item_dict[idx]
    popularbg[i] = [idx, title, freq]
    

NameError: name 'resample_item_dict' is not defined

In [None]:
popularbg[:10,:]

In [None]:
# Recall
def recall_at_n(N, test, recommended, train=None):
    """
    :param N: number of recommendations
    :param test: list of movies seen by user in test
    :param train: list of movies seen by user in train. This has to be removed from the recommended list 
    :param recommended: list of movies recommended
    
    :return the recall
    """
    if train is not None: 
        
        rec_true = []
        for r in recommended:
            if r not in train:
                rec_true.append(r)
    else:
        rec_true = recommended    
    intersection = len(set(test) & set(rec_true[:N]))
    return intersection / float(np.minimum(N, len(test)))

In [None]:
# get movies in train per user. For this, group by user and get a list of item ids.
trainUsergby = (train.groupby('user_id')['item_id'].apply(list).reset_index())

In [None]:
trainUsergby.head()

In [None]:
testUsergby = (test.groupby('user_id')['item_id'].apply(list).reset_index())
testUsergby.head()

In [None]:
# Merge both df
joinedtt = pd.merge(trainUsergby, testUsergby, how='inner', on='user_id', suffixes=('_train', '_test'))
joinedtt.head()

In [None]:
# Evaluate the model for different number of recommended elements 

list_topN=[1,5,10,20]

for topN in list_topN:
    rec=popularbg[:, 0]
    r_u=joined.apply(lambda l: recall_at_n(N=topN, test=l[2], recommended=rec, train=l[1]), axis=1)
    print('TopN:',topN,',','recall: ',r_u.mean())

In [None]:
#MAP
def apk(N, test, recommended, train=None):
    """
    Computes the average precision at N given recommendations.
    
    :param N: number of recommendations
    :param test: list of movies seen by user in test
    :param train: list of movies seen by user in train. This has to be removed from the recommended list 
    :param recommended: list of movies recommended
    
    :return The average precision at N over the test set
    """
    if train is not None: 
        rec_true = []
        for r in recommended:
            if r not in train:
                rec_true.append(r)
    else:
        rec_true = recommended    
    predicted = rec_true[:N] # top-k predictions
    
    score = 0.0 # This will store the numerator
    num_hits = 0.0 # This will store the sum of rel(i)

    for i,p in enumerate(predicted):
        if p in test and p not in predicted[:i]:
            num_hits += 1.0
            score += num_hits/(i+1.0)

    return score / min(len(test), N)

In [None]:
list_topN=[1,5,10,20]

for topN in list_topN:
    predictions = popularbg[:, 0]
    m = joined.apply(lambda l: apk(topN, l[2], predictions, l[1]), axis=1)
    print('TopN:',topN,',','map: ',m.mean()) #Mejorar impresion

In [None]:
## Matrix Co-Ocurrence

In [None]:
moviesPerUser = (train.groupby('user_id')['item_id']
                 .apply(np.array)
                 .to_dict()
                 )

In [None]:
# calculate the number of items in train
n_items = len(resample_item_dict.keys())
n_items

In [None]:
# co-ocurrance matrix will have shape=[n_items,n_items]
coMatrix = np.zeros((n_items, n_items)) # co-occurrence matrix
for user,movies in moviesPerUser.items():
    for m in movies:
        coMatrix[m, movies] += 1

In [None]:
coMatrix

In [None]:
# visualize the matrix
plt.matshow(coMatrix, fignum=1000, cmap=plt.cm.binary)
plt.gcf().set_size_inches(18.5, 10.5)
plt.show()

In [None]:
def co_occurrance_similarity(item_id, coocurrance, ntop=10):
    """
    Returns the top-N most similar items to a given one, based on the coocurrance matrix
    
    :param item_id: id of input item
    :param cooccurrance: 2-dim numpy array with the co-occurance matrix
    :param ntop: number of items to be retrieved
    
    :return top-N most similar items to the given item_id
    """
    similarItems = coocurrance[item_id, :]
    # return indeces of most similar items in descendign order
    mostSimilar = np.argsort(similarItems)[::-1]
    # remove the first element, as it is the item itslef
    mostSimilar = mostSimilar[1:ntop+1]
    
    # return a numpy array with the index (first column) and the value (second column) of the most similar items
    return np.stack((mostSimilar, similarItems[mostSimilar])).T

In [None]:
def co_occurrance_recommendation(items_id, cooccurrance, ntop=10):
    """
    Obtain the list of ntop recommendations based on a list of items (user history of views)
    
    :param items_id: list of items ids
    :param coocurrence: co-ocurrence matrix (numpy 2-dim array)
    :param ntop: top-K items to be retrieved
    
    :return list of ntop items recommended
    """
    # put together all the similar items and its value. For this, use np.vstack, wich stacks one array after 
    # another (row wise)
    list_sim_items = np.vstack([co_occurrance_similarity(id_, cooccurrance, ntop) for id_ in items_id])
    # Group by id and take the maximum frquency to remove duplicates
    largest_freq = pd.DataFrame(list_sim_items, columns=['id', 'freq']).groupby('id').agg(max).reset_index()
    
    # sort by value in descending order
    sorted_list = largest_freq.sort_values(by='freq', ascending=False)
    
    # get the top N
    out = sorted_list.values[:ntop, 0]
    return out

In [None]:
# get users in train with their movies
trainUsersGrouped = train.groupby('user_id')['item_id'].apply(list).reset_index()
trainUsersGrouped.head()

In [None]:
Ntop = 20
# Get the recommendations for all users using the apply method
predictions = trainUsersGrouped.item_id.apply(lambda x: co_occurrance_recommendation(x, coMatrix, Ntop))
predictions.head()

In [None]:
# get users in test with their movies
testUsersGrouped = test.groupby('user_id')['item_id'].apply(list).reset_index()
testUsersGrouped.head()

In [None]:
for (seen, recom) in zip(testUsersGrouped.values[:5, 1], predictions[:5]):
    print("*"*6)
    print("Seen items: ")
    print([resample_item_dict[i] for i in seen])
    print("Recommended items: ")
    print([resample_item_dict[i] for i in recom]) #QUITAR ELEMENTOS MOSTRADOS

In [None]:
topN=30
# add a prediction column to train
trainUsersGrouped['prediction'] = trainUsersGrouped.item_id.apply(
    lambda x: co_occurrance_recommendation(x, coMatrix, topN)
)

In [None]:
trainUsersGrouped.head()

In [None]:
# Join the df with train and predictions with the test df
joined = pd.merge(trainUsersGrouped, testUsersGrouped, how='inner', on='user_id', suffixes=('_train', '_test'))

In [None]:
joined.head()

In [None]:
def evaluate_recall(topN, trainGrouped, testGrouped, coMatrix, popularity_baseline):
    # add a prediction column to train
    trainUsersGrouped['prediction'] = trainUsersGrouped.item_id.apply(
        lambda x: co_occurrance_recommendation(x, coMatrix, topN)
    )
    # join with test data
    joined = pd.merge(trainUsersGrouped, testUsersGrouped, how='inner', on='user_id', suffixes=('_train', '_test'))
    # calculate average recall
    recall = joined.apply(lambda l: 
                 recall_at_n(N=topN, test=l[3], recommended=l[2], train=l[1]), axis=1).mean()
    print("Co-occurance model: recall@%s=%.3f"%(topN, recall))
    # calculate average recall for the baseline
    recall_baseline = joined.apply(lambda l: 
                 recall_at_n(N=topN, test=l[3], recommended=popularity_baseline, train=l[1]), axis=1).mean()
    print("Popularity model: recall@%s=%.3f"%(topN, recall_baseline))    
    return recall, recall_baseline

In [None]:
for k in [5,10,20,30]:
    evaluate_recall(k, trainUsersGrouped, testUsersGrouped, coMatrix, popularbg[:, 0]);

In [20]:
## CF ALS

In [25]:
#Creamos una sesion de spark
spark = SparkSession.builder.getOrCreate()

In [27]:
'''El análogo a Pandas dataframe es Spark dataframe, al cual es necesario darle un schema de construcción'''

schema = StructType(
    (
        StructField("user_id", IntegerType()),
        StructField("item_id", IntegerType()),
        StructField("rating", IntegerType()),
    )
)

df_ratings = spark.createDataFrame(dfratings, schema=schema)
df_ratings.show(5)

schema = StructType(
    (
        StructField("item_id", IntegerType()),
        StructField("title", StringType()),
        StructField("category", StringType()),
    )
)


df_games = spark.createDataFrame(dfgames, schema=schema)
df_games.show(5, False)


+-------+-------+------+
|user_id|item_id|rating|
+-------+-------+------+
|      0|      0|     0|
|      0|      1|     1|
|      0|      2|     1|
|      0|      3|     1|
|      0|      4|     1|
+-------+-------+------+
only showing top 5 rows

+-------+------------------+--------------+
|item_id|title             |category      |
+-------+------------------+--------------+
|747    |Die Macher        |1021,1026,1001|
|5896   |Dragonmaster      |1002,1010     |
|292    |Samurai           |1009,1035     |
|6308   |Tal der Könige    |1050          |
|15364  |Mare Mediterraneum|1015,1008     |
+-------+------------------+--------------+
only showing top 5 rows



In [28]:
#Split the data using the Spark random 
train, test = df_ratings.randomSplit([0.8, 0.2])
print ("N train", train.cache().count())
print ("N test", test.cache().count())

N train 671495
N test 167282


In [29]:
#Construimos el modelo
'''Configuración del modelo ALS en Spark'''

rec = ALS(maxIter=10, regParam=0.01, implicitPrefs=True, userCol='user_id', itemCol='item_id', ratingCol='rating', nonnegative=True, coldStartStrategy="drop")

'''
Parámetros:

* rank:        Number of latent factors in the model (defaults to 10).
* maxIter:     Maximum number of iterations to run (defaults to 10).
* regParam:    Regularization parameter in ALS (defaults to 1.0).
* nonnegative: Use nonnegative constraints for least squares (defaults to false).

'''

'\nParámetros:\n\n* rank:        Number of latent factors in the model (defaults to 10).\n* maxIter:     Maximum number of iterations to run (defaults to 10).\n* regParam:    Regularization parameter in ALS (defaults to 1.0).\n* nonnegative: Use nonnegative constraints for least squares (defaults to false).\n\n'

In [30]:
#Entrenamos el modelo
model = rec.fit(train)

In [31]:
#Vemos las predicciones
predictions = model.transform(test)
predictions.show(10)

+-------+-------+------+------------+
|user_id|item_id|rating|  prediction|
+-------+-------+------+------------+
|   1139|    148|     1|  0.41549194|
|   2238|    148|     0|  0.26529557|
|    513|    148|     1|  0.47048476|
|   1005|    148|     0|  0.20431815|
|   2564|    148|     0|  0.35755113|
|    876|    148|     0|0.0033049937|
|    368|    148|     1|   0.5412031|
|    772|    148|     1|   0.7745851|
|    830|    148|     1|   0.7253903|
|    210|    148|     0|  0.51075816|
+-------+-------+------+------------+
only showing top 10 rows



In [32]:
'''Escogemos un usuario y rankeamos las predicciones'''

user = 19

predictions.filter(F.col('user_id') == user)\
                 .orderBy(F.col('item_id')).show(5)  

predictions.filter(F.col('user_id') == user)\
                 .orderBy(F.col('prediction'), ascending=False).show(5)

+-------+-------+------+-----------+
|user_id|item_id|rating| prediction|
+-------+-------+------+-----------+
|     19|     26|     1|  0.2105639|
|     19|     34|     1| 0.36611417|
|     19|     40|     1|  0.3984456|
|     19|     70|     1| 0.71551883|
|     19|     82|     1|0.099094376|
+-------+-------+------+-----------+
only showing top 5 rows

+-------+-------+------+----------+
|user_id|item_id|rating|prediction|
+-------+-------+------+----------+
|     19|     70|     1|0.71551883|
|     19|   1500|     1|   0.65379|
|     19|    521|     1|0.58734435|
|     19|    621|     1|0.58279544|
|     19|    149|     1| 0.5730952|
+-------+-------+------+----------+
only showing top 5 rows



In [33]:
# Evaluacion
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

In [34]:
# Contabilizamos el error
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 0.6030811975510398


In [35]:
# Con este comando creamos una nueva columna que es la diferencia y-y_hat
# Y lo pasamos a pandas para verlo en un histograma
predicted_ratings = predictions.withColumn('dif', F.col('rating')-F.col('prediction')).toPandas()
predicted_ratings.head()

Unnamed: 0,user_id,item_id,rating,prediction,dif
0,1139,148,1,0.415492,0.584508
1,2238,148,0,0.265296,-0.265296
2,513,148,1,0.470485,0.529515
3,1005,148,0,0.204318,-0.204318
4,2564,148,0,0.357551,-0.357551


In [36]:
#Realizamos una recomendación
'''Spark-ALS nos ofrece también comandos para la generación de recomendaciones'''

# Las 5 mejores peliculas para todos los usuarios
userRecs = model.recommendForAllUsers(5)

# Filtramos para nuestro usuario concreto
userRecs.filter(F.col('user_id')==user).show(1,False)

+-------+-----------------------------------------------------------------------------------------+
|user_id|recommendations                                                                          |
+-------+-----------------------------------------------------------------------------------------+
|19     |[{814, 0.8773691}, {4, 0.83757365}, {44, 0.81563264}, {360, 0.80327845}, {46, 0.7951992}]|
+-------+-----------------------------------------------------------------------------------------+



In [37]:
predictions_mod=predictions.select(['user_id','item_id','prediction'])

In [38]:
TOP_K = 10
rank_eval = SparkRankingEvaluation(test, predictions_mod, k = TOP_K, col_user="user_id", col_item="item_id", 
                                    col_rating="rating", col_prediction="prediction", 
                                    relevancy_method="top_k")

In [39]:
print("Model:\tALS",
      "Top K:\t%d" % rank_eval.k,
      "MAP:\t%f" % rank_eval.map_at_k(),
      "Recall@K:\t%f" % rank_eval.recall_at_k(), sep='\n')

Model:	ALS
Top K:	10
MAP:	0.359849
Recall@K:	0.359849


In [40]:
'''Este proceso nos permite encontrar los hiperparámetros que mejor se ajustan a una predicción.
   Este proceso, requiere ejecutar varias repeticiones del mismo modelo, por lo que consume tiempo 
   de computación.'''


from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

(trainingRatings, validationRatings) = df_ratings.randomSplit([80.0, 20.0])

evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')

paramGrid = ParamGridBuilder()\
                .addGrid(rec.rank, [5, 7, 10,20])\
                .addGrid(rec.maxIter, [20])\
                .addGrid(rec.regParam, [0.05, 0.1, 0.5]).build()

crossval = CrossValidator(estimator=rec, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=10)
cvModel = crossval.fit(trainingRatings)
predictions = cvModel.transform(validationRatings)

print('The root mean squared error for our model is: {}'.format(evaluator.evaluate(predictions.na.drop())))

The root mean squared error for our model is: 0.5956734511935461


In [62]:
best_model=cvModel.bestModel

In [63]:
path='/DATA'
best_model.write().overwrite().save("DATA/cvModel_bm")

In [44]:
predictions.show()

+-------+-------+------+----------+
|user_id|item_id|rating|prediction|
+-------+-------+------+----------+
|   2659|    148|     1| 0.7264628|
|    897|    148|     1|0.15539992|
|   1990|    148|     1|0.82738805|
|   2580|    148|     1|0.41153458|
|    516|    148|     1|0.09480336|
|   2096|    148|     0|0.18181685|
|   2622|    148|     0|       0.0|
|    613|    148|     1| 0.2595713|
|    375|    148|     1| 0.5614809|
|    593|    148|     0| 0.1573008|
|    530|    148|     1|0.23072395|
|    772|    148|     1| 0.8326964|
|   1165|    148|     1|0.45428574|
|   2261|    148|     1|0.47950786|
|   1259|    148|     1|0.30406782|
|   1031|    148|     1|0.69918656|
|   2683|    148|     1|0.37176973|
|   1051|    148|     0|       0.0|
|   1489|    148|     0|0.24846314|
|   1752|    148|     1| 0.5928776|
+-------+-------+------+----------+
only showing top 20 rows



In [68]:
predictions = best_model.transform(validationRatings)
predictions_cv_mod=predictions.select(['user_id','item_id','prediction'])

In [46]:
# Generate top 10 digital music recommendations for each user
userRecs = cvModel.bestModel.recommendForAllUsers(10)
# Generate top 10 user recommendations for each digital music
boardGameRecs = cvModel.bestModel.recommendForAllItems(10)

userRecs.show(5)
boardGameRecs.show(5)

+-------+--------------------+
|user_id|     recommendations|
+-------+--------------------+
|   1580|[{695, 0.7982432}...|
|    471|[{826, 0.8448902}...|
|   1591|[{167, 0.73535216...|
|   1342|[{46, 0.78094655}...|
|   2122|[{268, 0.44451216...|
+-------+--------------------+
only showing top 5 rows

+-------+--------------------+
|item_id|     recommendations|
+-------+--------------------+
|    148|[{1603, 1.0292795...|
|    463|[{1310, 0.4224890...|
|    471|[{2168, 0.0833495...|
|    496|[{2055, 0.0415961...|
|    833|[{1253, 0.8953640...|
+-------+--------------------+
only showing top 5 rows



In [69]:
TOP_K = 10
rank_eval2 = SparkRankingEvaluation(validationRatings, predictionscv_mod, k = TOP_K, col_user="user_id", col_item="item_id", 
                                    col_rating="rating", col_prediction="prediction", 
                                    relevancy_method="top_k")

In [70]:
print("Model:\tALS",
      "Top K:\t%d" % rank_eval2.k,
      "MAP:\t%f" % rank_eval2.map_at_k(),
      "Recall@K:\t%f" % rank_eval2.recall_at_k(), sep='\n')

Model:	ALS
Top K:	10
MAP:	0.360087
Recall@K:	0.360087


In [49]:
boardGameRecs_pandas = cvModel.bestModel.recommendForAllItems(10).toPandas()
userRecs_pandas = cvModel.bestModel.recommendForAllUsers(10).toPandas()
boardGameRecs_pandas['recommendations'][0]
userRecs_pandas['recommendations'][3]

In [78]:
recs=userRecs[0]

In [75]:
def get_recs_for_user(recs):
    #Recs should be for a specific user
    recs=recs.select("recommendations.item_id","recommendations.ratings")
    items=recs.select('item_id').toPandas().iloc[0,0]
    ratings=recs.select("ratings").toPandas().iloc[0,0]
    rating_matrix=pd.DataFrame(items, columns=['item_id'])
    rating_matrix['ratings']=ratings
    rating_matrix_ps=sqlContext.createDataFrame(rating_matix)
    return rating_matrix_ps

In [83]:
sameModel = cvModel.load("DATA/cvModel")

In [85]:

spark.stop()