In [1]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.recommendation import ALS
from math import sqrt
from operator import add
from sklearn.metrics import mean_squared_error
import pandas as pd
import numpy as np
from pyspark import SparkConf, SparkContext
RUTA_BD='BD'

In [2]:
import pandas as pd
import numpy as np
from sklearn.metrics import mean_squared_error
from pyspark.ml.evaluation import RegressionEvaluator,Evaluator
from math import sqrt
from sklearn.metrics.pairwise import pairwise_distances
from sklearn.metrics.pairwise import cosine_similarity

class EvaluadorRMSE(Evaluator):
    """
    Evalua RMSE de forma robusta.
    Es Igual que RegressionEvaluator con metric=rmse pero descartando valores no predecidos
    
    """
    def __init__(self,predictionCol, targetCol):        
        super(EvaluadorRMSE, self).__init__()
        self.predictionCol=predictionCol
        self.targetCol=targetCol
        
    def _evaluate(self, dataset):       
        error=rmse(dataset,self.predictionCol,self.targetCol)
        print ("Error: {}".format(error))
        return error
    
    def isLargerBetter(self):
        return False
    
    
class ModelBasedALS(object):
    """
    Envoltorio para la clase ALS de ml de Spark. 
    Da soporte a los metodos de ALS de mllib
    
    """
    def __init__(self,modelALS):
        super(ModelBasedALS, self).__init__()
        """
        Parametros
        ----------
        modelALS : objeto entrenado de pyspark.ml.recommendation.ALS
        """
        self.userIndex,self.userFactors = self.toArray(modelALS.userFactors)
        self.itemIndex,self.itemFactors = self.toArray(modelALS.itemFactors)
        self.prediccion=pd.DataFrame(data=self.userFactors.dot(self.itemFactors.T),columns=self.itemIndex,index=self.userIndex)
        
        self.relacion_index_user=dict(zip(self.userIndex,range(len(self.userIndex))))
        self.relacion_index_item=dict(zip(self.itemIndex,range(len(self.itemIndex))))        


    def predictAll(self,user_item:pd.DataFrame,tag_prediccion='prediccion'):
        """
        Devuelve todas las predicciones dado el par (user,item)
        """
        estimaciones=[]
        for tupla in user_item.values:
            try:
                estimacion=self.prediccion.iloc[self.relacion_index_user[tupla[0]],self.relacion_index_item[tupla[1]]]
                estimaciones.append(estimacion)
            except:
                estimaciones.append(np.nan)

        user_item[tag_prediccion]=estimaciones
        return user_item
    
    def recommendProducts(self,user:int,n:int=3):
        """
        Devuelve el top de productos recomendados para el usuario
        """
        usuario=self.prediccion.loc[user]
        usuario.sort_values(ascending=False)
        return usuario.iloc[:n]
    
    def recommendUsers(self,product:int,n:int=3):
        """
        Devuelve el top de los usuarios de un producto
        """
        productos=self.prediccion.loc[:,product]
        productos.sort(ascending=False)        
        return productos.iloc[:n]
        
    @staticmethod
    def toArray(datos):
        indices=[]
        lista=[]
        aaa=datos.rdd.map(lambda l:(l.id,l.features)).collect()
        for tupla in aaa:
            indices.append(tupla[0])
            lista.append(tupla[1])

        return indices,np.array(lista)    
    
   
   
    
    
def rmse(dataset,predictionCol,targetCol):
    valores=np.array(dataset.dropna().rdd.map(lambda r:[r[predictionCol],r[targetCol]]).collect())
    error = sqrt(mean_squared_error(valores[:,0],valores[:,1]))
    return error  

In [3]:
USERID='user_id'
PRODUCTID='book_id'
PREDICCION ='prediccion'
NOMBRE_LIBRO='nombre_libro'
TARGET     ='rating'
    
def error_train_test(modelo,train,test,prediccion=PREDICCION,target=TARGET):
    return ( rmse(modelo.transform(train),prediccion,target),
        rmse(modelo.transform(test),prediccion,target))

In [4]:
historico_valoraciones = pd.read_csv("data\\ratings.csv")
historico_valoraciones = historico_valoraciones.loc[historico_valoraciones['user_id']<=1000]
relacion_libros = df_books = pd.read_csv("data\\books.csv")
relacion_libros = relacion_libros[['book_id','original_title']]

In [5]:
historico_valoraciones.head()

Unnamed: 0,user_id,book_id,rating
0,1,258,5
1,2,4081,4
2,2,260,5
3,2,9296,5
4,2,2318,3


In [6]:
relacion_libros.head()

Unnamed: 0,book_id,original_title
0,1,The Hunger Games
1,2,Harry Potter and the Philosopher's Stone
2,3,Twilight
3,4,To Kill a Mockingbird
4,5,The Great Gatsby


In [7]:
id_libros_mas_leidos=historico_valoraciones.groupby(by=PRODUCTID).count().sort_values(USERID,ascending=False).head(10).index.values
id_libros_mas_leidos

array([ 8,  4,  5,  2,  1, 26, 25, 18, 23, 14], dtype=int64)

In [8]:
libros_mas_leidos = relacion_libros[relacion_libros[PRODUCTID].isin(id_libros_mas_leidos)]
libros_mas_leidos

Unnamed: 0,book_id,original_title
0,1,The Hunger Games
1,2,Harry Potter and the Philosopher's Stone
3,4,To Kill a Mockingbird
4,5,The Great Gatsby
7,8,The Catcher in the Rye
13,14,Animal Farm: A Fairy Story
17,18,Harry Potter and the Prisoner of Azkaban
22,23,Harry Potter and the Chamber of Secrets
24,25,Harry Potter and the Deathly Hallows
25,26,The Da Vinci Code


In [9]:
#Se ingresan los valores de los libros más leidos
ranking_mio = []
for fila in libros_mas_leidos.itertuples():
    puntuacion = input("Puntuacion de 1 al 5 de {}: ".format(fila[2]))
    ranking_mio.append([0,fila[1],float(puntuacion)])
mis_valoraciones = pd.DataFrame(data=ranking_mio,columns=[USERID,PRODUCTID,TARGET])
mis_valoraciones[TARGET] = mis_valoraciones[TARGET].astype(np.float64,copy=False)

Puntuacion de 1 al 5 de The Hunger Games: 3
Puntuacion de 1 al 5 de Harry Potter and the Philosopher's Stone: 4
Puntuacion de 1 al 5 de To Kill a Mockingbird: 2
Puntuacion de 1 al 5 de The Great Gatsby: 5
Puntuacion de 1 al 5 de The Catcher in the Rye: 2
Puntuacion de 1 al 5 de Animal Farm: A Fairy Story: 3
Puntuacion de 1 al 5 de Harry Potter and the Prisoner of Azkaban: 4
Puntuacion de 1 al 5 de Harry Potter and the Chamber of Secrets: 2
Puntuacion de 1 al 5 de Harry Potter and the Deathly Hallows: 3
Puntuacion de 1 al 5 de The Da Vinci Code: 1


In [10]:
ranking_mio

[[0, 1, 3.0],
 [0, 2, 4.0],
 [0, 4, 2.0],
 [0, 5, 5.0],
 [0, 8, 2.0],
 [0, 14, 3.0],
 [0, 18, 4.0],
 [0, 23, 2.0],
 [0, 25, 3.0],
 [0, 26, 1.0]]

In [11]:
from pyspark import SparkConf, SparkContext
conf = (SparkConf()
         .setMaster("local[*]")
         .setAppName("Myapp")
         .set("spark.executor.memory", "1g"))
sc = SparkContext(conf = conf)
sc.setLogLevel("ERROR")

In [12]:
from sklearn.model_selection import train_test_split
X_historico, X_test = train_test_split(
    historico_valoraciones, test_size=0.3, random_state=0)

X_historico = pd.DataFrame(X_historico,columns=[USERID,PRODUCTID,TARGET])
X_test = pd.DataFrame(X_test,columns=[USERID,PRODUCTID,TARGET])

# Añado mis recomendaciones
X_train = pd.concat([mis_valoraciones,X_historico])

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

train_data = sqlContext.createDataFrame(data=X_train).cache()
test_data = sqlContext.createDataFrame(data=X_test).cache()

In [13]:
algoritmo = ALS(rank=5,userCol=USERID, itemCol=PRODUCTID, ratingCol=TARGET)
grid_parametros = ParamGridBuilder().addGrid(algoritmo.regParam, [0.1,0.25,0.5]).build()
evaluador = EvaluadorRMSE(predictionCol=algoritmo.getPredictionCol(), targetCol=algoritmo.getRatingCol())
cv = CrossValidator(estimator=algoritmo, estimatorParamMaps=grid_parametros, evaluator=evaluador, numFolds=2)
cv_model = cv.fit(train_data)

Error: 1.0556462016218362
Error: 0.9780324593207975
Error: 1.0453856553935064
Error: 1.0656756842578794
Error: 0.9779672080906572
Error: 1.0415927241266025


In [14]:
(error_training_als,error_test_als)=error_train_test(cv_model,train_data,test_data,algoritmo.getPredictionCol(),algoritmo.getRatingCol())
print ("ErrorValidacion: {}, ErrorTest: {}".format(error_training_als,error_test_als))

ErrorValidacion: 0.7951683797611278, ErrorTest: 0.9269212484081187


In [15]:
mb_als=ModelBasedALS(cv_model.bestModel)

In [16]:
pd.merge(mis_valoraciones,mb_als.predictAll(mis_valoraciones.iloc[:,[0,1]]))

Unnamed: 0,user_id,book_id,rating,prediccion
0,0,1,3.0,2.869859
1,0,2,4.0,2.760792
2,0,4,2.0,3.065893
3,0,5,5.0,2.708
4,0,8,2.0,2.733631
5,0,14,3.0,2.648598
6,0,18,4.0,2.889463
7,0,23,2.0,2.58591
8,0,25,3.0,3.113218
9,0,26,1.0,2.024028


In [18]:
relacion_libros.join(mb_als.recommendProducts(0,20),how='right')

Unnamed: 0,book_id,original_title,0
10,11,The Kite Runner,2.826431
20,21,Harry Potter and the Order of the Phoenix,2.417602
30,31,The Help,2.542763
40,41,The Lightning Thief,2.136401
50,51,City of Bones,3.062342
60,61,The Girl on the Train,2.49007
70,71,"Frankenstein; or, The Modern Prometheus",2.891159
80,81,The Glass Castle,3.021716
90,91,The Maze Runner,2.649585
100,101,Me Talk Pretty One Day,2.703109
