In [5]:
from pyspark import SparkContext
from pyspark.mllib.evaluation import RegressionMetrics, RankingMetrics
from pyspark.mllib.recommendation import ALS, Rating

In [6]:
def parseLine(line):
    fields = line.split("|")
    return Rating(int(fields[0]), int(fields[1]), float(fields[2]) - 2.5)

In [7]:
peliculas = sc.textFile("peliculas.csv").filter(lambda l: not l.startswith(u'#') and not l.startswith(u'Entry|')).map(lambda l: l.split("|"))
print peliculas.take(2)

[[u'01', u'Brief Encounter', u'David Lean ', u'Celia Johnson, Cyril Raymond, Stanley Holloway, Trevor Howard', u'1945', u'', u'http://www.imdb.com/title/tt0037558/', u'http://www.guardian.co.uk/film/movie/35664/brief.encounter', u'UK'], [u'02', u'Casablanca', u'Michael Curtiz', u'Claude Rains, Humphrey Bogart, Ingrid Bergman, Paul Henreid', u'1942', u'3', u'http://www.imdb.com/title/tt0034583/', u'http://www.guardian.co.uk/film/movie/36156/casablanca', u'USA']]


In [9]:
ratings = sc.textFile("ratings.csv").filter(lambda l: not l.startswith('pelicula_id')).map(lambda l: l.split(",")).map(lambda l: Rating(int(l[1]), int(l[0]), float(l[2])))
print ratings.take(2)

[Rating(user=7770, product=719, rating=0.0), Rating(user=35851, product=225, rating=3.0)]


In [11]:
media_ratings = ratings.map(lambda r: (r.product, (r.rating, 1))).reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1])).map(lambda p: (p[0], p[1][0] / float(p[1][1])))
print media_ratings.collect()

[(328, 2.1044776119402986), (320, 1.9655172413793103), (2, 1.911764705882353), (4, 2.0161290322580645), (342, 2.1447368421052633), (6, 2.25), (8, 1.828125), (10, 2.1951219512195124), (12, 2.0), (322, 2.0277777777777777), (14, 2.088607594936709), (18, 1.7704918032786885), (20, 2.189655172413793), (22, 2.3333333333333335), (24, 1.8709677419354838), (324, 1.7333333333333334), (26, 2.028985507246377), (28, 2.1315789473684212), (410, 2.1549295774647885), (414, 1.7534246575342465), (780, 2.026666666666667), (416, 1.9558823529411764), (240, 1.8421052631578947), (34, 2.1029411764705883), (36, 1.9830508474576272), (38, 1.8571428571428572), (412, 1.9844961240310077), (42, 2.262295081967213), (44, 2.130434782608696), (46, 1.6455696202531647), (48, 2.1805555555555554), (712, 1.6282051282051282), (270, 1.7575757575757576), (310, 2.15625), (312, 2.242857142857143), (316, 1.8840579710144927), (714, 1.9078947368421053), (62, 1.8142857142857143), (64, 1.8064516129032258), (66, 1.6343283582089552), (68,

In [12]:
# Entrenar modelo
model = ALS.train(ratings, 1)
model

<pyspark.mllib.recommendation.MatrixFactorizationModel at 0x7fd2f2cefb10>

In [13]:
# generar posibles pares de usuario / pelicula
ids_pelicula = sc.textFile('pelicula_ids.csv')
ids_usuario = sc.textFile('pelicula_usuarios.csv')
publico_objetivo = ids_usuario.cartesian(ids_pelicula)  #ids_pelicula.cartesian(ids_usuario)
# print posibles_pares.take(10)

In [14]:
# Crear predicciones
predicciones = model.predictAll(publico_objetivo)
print predicciones.take(4)

[Rating(user=66801, product=312, rating=-2.764148915822261), Rating(user=66801, product=39, rating=-3.182212845647882), Rating(user=66801, product=225, rating=-6.251215069607952), Rating(user=66801, product=324, rating=-3.230602171941655)]


In [17]:
from pyspark.sql import SQLContext, Row
spark = SparkSession.builder.master("local").appName("SQL").getOrCreate()
df = spark.createDataFrame(predicciones)
df.registerTempTable('predicciones')
df.collect()

[Row(user=66801, product=312, rating=-2.764148915822261),
 Row(user=66801, product=39, rating=-3.182212845647882),
 Row(user=66801, product=225, rating=-6.251215069607952),
 Row(user=66801, product=324, rating=-3.230602171941655),
 Row(user=66801, product=723, rating=2.748704391741285),
 Row(user=66801, product=417, rating=-4.412561805770338),
 Row(user=66801, product=711, rating=-3.3168052180560608),
 Row(user=66801, product=216, rating=2.5672523287495324),
 Row(user=66801, product=66, rating=2.9263784478993102),
 Row(user=66801, product=720, rating=4.47314602782626),
 Row(user=66801, product=765, rating=3.4323425635749345),
 Row(user=66801, product=36, rating=-2.7218384537545717),
 Row(user=66801, product=24, rating=2.7409406931727744),
 Row(user=66801, product=75, rating=2.781920917125717),
 Row(user=66801, product=270, rating=-2.7695228568110224),
 Row(user=66801, product=219, rating=-2.699865793031563),
 Row(user=66801, product=624, rating=3.5696594504831864),
 Row(user=66801, pro

In [18]:
# ¿Tenemos un modelo correcto?
# R-Squared 0, indicates that the model explains none of the variability of the response data around its mean.
# R-Squared 1, indicates that the model explains all the variability of the response data around its mean.
ratingsTuple = ratings.map(lambda r: ((r.user, r.product), r.rating))
scoreAndLabels = predicciones.map(lambda r: ((r.user, r.product), r.rating)).join(ratingsTuple).map(lambda tup: tup[1])

metrics = RegressionMetrics(scoreAndLabels)
print("RMSE = %s" % metrics.rootMeanSquaredError)
print("R-squared = %s" % metrics.r2)

RMSE = 0.557591172575
R-squared = 0.84337132393
