# Modelo de recomendacion

A partir de la libreria Surprice, usamos el algoritmo de KNnWhitMeans, que en su caja negra lo que hace es construir una matriz a partir de los ratings puestos por los usuario
sobre los productos. Calcula la distancia por coseno entre vectores(rating) para ponerle valor numerico a la relacion para poder recomendar en base a los productos.

In [None]:
! pip install scikit-surprise

In [None]:
import pandas as pd #Importamos lo necesario
from pyspark.sql import SparkSession 
import numpy as np
from pyspark.sql import functions as F
from pyspark.sql.functions import col, udf, desc
from surprise import Reader
from surprise import Dataset
from surprise import accuracy
from surprise.model_selection import cross_validate
from surprise.model_selection import train_test_split
from surprise.model_selection import GridSearchCV
from pyspark.sql.types import DoubleType, IntegerType, StringType, FloatType
from surprise import SVD, SVDpp, NMF, SlopeOne, CoClustering, KNNBaseline, KNNWithZScore, KNNWithMeans, KNNBasic, BaselineOnly, NormalPredictor

In [None]:
#Iniciamos sesion en Spark
spark = SparkSession.builder.appName('sent').getOrCreate()
#leemos el dataset
dfspark = spark.read.json('../common/final')

In [None]:
#Definimos una funcion que va a servir para crear un columna nueva
def prom_rating(val, val1):
    ''' Calcula el valor pormedio entre dos valores '''
    return (val + val1)/2  
col_new = udf(prom_rating)
#Creamos una columna nueva que promedia el ranking dado por el usuario y el establecido por el analizis de testo
dfspark = dfspark.withColumn("average_ranq", col_new(col("sentiment"), col('rating')))

In [None]:
user = 'usuarioParaAlCualRecomendar' #Seleccionamos un usuario
producto = 'productoSegúnCualRecomendar' #Seleccionamos un producto que haya comprado el usuario
usuarios_rel = dfspark.filter(dfspark.productId == producto).select('reviewerId').collect()
df_reducido = dfspark.filter((dfspark.reviewerId).isin(usuarios_rel)) #Trabajamos con los usuarios que comparten el producto

In [None]:
#Lista de los productos con mas reviews
mejores_productos_más_calificados = df_reducido.groupBy('categories','productId').count().sort(F.col("count").desc()).limit(7000)
productos = mejores_productos_más_calificados.select('productId').rdd.flatMap(lambda x: x).collect()
#Seleccionamos los campos a utilizar
dfspark1 = df_reducido.select('productId', 'reviewerId', 'average_ranq')
#Filtramos el dataframe con los productos con mas reviews
dfsparkFiltro = dfspark1.filter((dfspark.productId).isin(productos))

In [None]:
#Cambiamos los nombres de los campos para que el modelo los pueda procesar
modelo = dfsparkFiltro.withColumnRenamed('reviewerId', 'user').withColumnRenamed('productId','item').withColumnRenamed('average_ranq', 'rating')
#Cambiamos el tipo de dato de la columna 'rating'
modelo = modelo.withColumn('rating', F.col('rating').cast(FloatType()))

In [None]:
#Hacemos dos listas de usuarios e items
users = modelo.select('user').distinct()
items = modelo.select('item').distinct()
users = users.select('user').rdd.flatMap(lambda x: x).collect()
items = items.select('item').rdd.flatMap(lambda x: x).collect()


In [None]:
#Fortalecemos la conexion de PySpark con Pandas
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
#Pasamos el dataframe a Pandas
df_pd = modelo.toPandas()
#Instanciamos el modelo
sim_options = {"name": "cosine", "user_based": False,}

algoritmo = KNNWithMeans(sim_options=sim_options)
reader = Reader(rating_scale=(1, 5))
data_n = Dataset.load_from_df(df_pd[["user", "item", "rating"]], reader)
#Definimos la matriz de entrenamiento
trainingSet = data_n.build_full_trainset()
algoritmo.fit(trainingSet)

In [None]:
#El modelo devuelvo los 5 productos mas recomendados para un usuario x
def recomsnd_user(user):
    recomend = []    
    for item in items:
        p1 = algoritmo.predict(user, item[:50])
        if p1.est > 1:
            p2 = [p1.est , p1.iid]
            recomend.append(p2)
    recomend.sort()
    return recomend[5:]

def item_id_to_name(prod_id):
    name = dfspark.filter((dfspark.productId)==prod_id).collect()[0][10]    
    return name

def display(recomend): 
    con = 0
    while con <= 4:
        calification = recomend[-5:][con][0]
        prod_id = recomend[-5:][con][1]
        print(f'Para el usuario {user} la recomendación número {con+1} : {item_id_to_name(prod_id)}')
        con += 1

In [None]:
recomend = recomsnd_user(user)
display(recomend) #Obtenemos las recomendaciones para ese usuario

In [None]:
prod_id = '0006476155' #Obtenemos el nombre de un producto según su ID
item_id_to_name(prod_id)

In [None]:
dfspark.filter(dfspark.reviewerId == user).select('title').show() #Devuelve los productos que compró el usuario

In [None]:
df_reducido.filter(dfspark.reviewerId == user).select('title').show() #Devuelve los productos que compró el usuario, que se usan en el modelo