In [1]:
import findspark
findspark.init('/Users/lmh/Programs/spark-2.2.0-bin-hadoop2.7')
from pyspark.sql import SparkSession 
from pyspark.sql import Row
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from matplotlib import pyplot as plt
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)

In [2]:
rdd = sc.textFile('./csv/Pre_100k.csv')
rdd = rdd.map(lambda x: x.split(','))
rdd.take(10)

[[u'3603', u'5.0', u'2838'],
 [u'3603', u'4.0', u'7527'],
 [u'523', u'1.0', u'6801'],
 [u'2055', u'5.0', u'254'],
 [u'1350', u'5.0', u'3979'],
 [u'1417', u'5.0', u'6297'],
 [u'1417', u'5.0', u'3264'],
 [u'2906', u'5.0', u'4006'],
 [u'2906', u'5.0', u'2403'],
 [u'2906', u'5.0', u'1888']]

In [3]:
dfrdd = rdd.map(lambda x: Row(userid = int(x[2]),
                              rating = float(x[1]),
                              itemid = int(x[0])))

In [4]:
df = spark.createDataFrame(dfrdd)
df.show()

+------+------+------+
|itemid|rating|userid|
+------+------+------+
|  3603|   5.0|  2838|
|  3603|   4.0|  7527|
|   523|   1.0|  6801|
|  2055|   5.0|   254|
|  1350|   5.0|  3979|
|  1417|   5.0|  6297|
|  1417|   5.0|  3264|
|  2906|   5.0|  4006|
|  2906|   5.0|  2403|
|  2906|   5.0|  1888|
|  2906|   5.0|  1485|
|  2906|   5.0|  4489|
|  2906|   4.0|  2984|
|  2906|   5.0|   137|
|  2906|   5.0|  2963|
|  2906|   5.0|   912|
|   589|   5.0|   644|
|   589|   5.0|   583|
|   589|   5.0|  3233|
|   589|   5.0|    14|
+------+------+------+
only showing top 20 rows



In [5]:
splits = df.randomSplit([0.9, 0.1], 24)

In [6]:
traindf = splits[0]
testdf = splits[1]

In [10]:
als = ALS(maxIter = 5, regParam = 0.01, implicitPrefs = False, 
          userCol = "userid", itemCol="itemid", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(df)

In [11]:
pred = model.transform(testdf)
pred.show()

+------+------+------+----------+
|itemid|rating|userid|prediction|
+------+------+------+----------+
|   463|   5.0|  3093|  4.999531|
|  1959|   5.0|  7290|  4.997807|
|  1143|   5.0|  4927|  4.998475|
|  3000|   4.0|  2374| 3.9972644|
|    65|   5.0|  3383|  4.997807|
|  1977|   3.0|  4773| 2.9996648|
|  2249|   5.0|  4934| 4.9987793|
|  2833|   5.0|   353|  4.997807|
|   799|   5.0|  3981| 4.9994607|
|  2156|   5.0|  1284| 4.9987197|
|   296|   5.0|  1394|  4.997807|
|  1466|   5.0|  6205|  4.997807|
|  1766|   5.0|  6152|  4.999999|
|  1766|   4.0|  7420| 3.9996452|
|  2874|   4.0|  4515|  3.997264|
|   322|   5.0|   605|  4.997807|
|   513|   5.0|  6599|  4.997807|
|   918|   5.0|   323|  4.999449|
|   918|   5.0|   681|  4.999449|
|  3657|   5.0|  7656|  4.999352|
+------+------+------+----------+
only showing top 20 rows



In [9]:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(pred)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 4.61857167733
