# Recomendación con Spark

Importamos Spark e iniciamos un contexto Spark:

In [1]:
#import findspark
#findspark.init()
import pyspark
sc = pyspark.SparkContext()

Estos son otros módulos que usaremos:

In [2]:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
import os

Comenzamos cargando los datos en nuestro contexto Spark:

In [3]:
data = sc.textFile("./data/rating.csv")

Debemos excluir el cabecero con los nombres de las columnas:

In [4]:
header = data.first() #extraemos la primera línea
data = data.filter(lambda fila: fila != header) #filtramos conservando todas las líneas que son distintas al cabecero

Spark requiere parsear y estructurar los datos de manera explícita. A continuación para cada línea separamos por coma y parseamos el id de usuario y película a entero y el rating a decimal. Esto nos genera una secuencia de objetos Rating que tienen en la primera componente el id de usuario, en la segunda el id de la película y en la tercera la evaluación:

In [5]:
ratings = data.map(
  lambda l: l.split(',')
).map(
  lambda l: Rating(int(l[0]), int(l[1]), float(l[2])) #objeto Rating prediseñado porque Spark está habituado a trabajar con recomendaciones
)

Dividimos de nuevo en entrenamiento y test:

In [6]:
train, test = ratings.randomSplit([0.8, 0.2])

Fijamos los parámetros como es habitual y entrenamos el modelo:

In [7]:
K = 10
iteraciones = 10
model = ALS.train(train, K, iteraciones)

Extraemos los usuarios y películas (input del modelo):

In [8]:
x = train.map(lambda p: (p[0], p[1]))

Extraemos las predicciones aplicando los datos de entrada extraídos previamente:

In [9]:
p = model.predictAll(x).map(lambda r: ((r[0], r[1]), r[2])) #almacenamos el resultado en una tupla de dos elementos

Unimos presentando el user_id, el movie_id el rating real y la predicción:

In [10]:
ratesAndPreds = train.map(lambda r: ((r[0], r[1]), r[2])).join(p) #mapeamos al mismo formato para poder hacer el join

Finalmente calculamos el error medio cuadrático:

In [None]:
mse = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Error medio de entrenamiento: %s" % mse)

Análogamente para las observaciones de validación:

In [12]:
x = test.map(lambda p: (p[0], p[1]))
p = model.predictAll(x).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = test.map(lambda r: ((r[0], r[1]), r[2])).join(p)
mse = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Error medio del test: %s" % mse)

Error medio del test: 0.6466768682765576


In [13]:
sc.stop()

## Conclusión

Trabajando en local no disponemos de clusters en los que derivar y repartir el trabajo. Si empleáramos el cluster de una empresa o tecnología en la nube (Amazon, Google...) este script se ejecutaría en menos de cinco minutos con los recursos adecuados.