In [1]:
import findspark

In [2]:
findspark.init("/home/i-sip_iot/spark-3.0.1-bin-hadoop2.7")

### FIRST SIMPLE PRACTICE

In [9]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Recom').getOrCreate()

In [10]:
df = spark.read.csv("movielens_ratings.csv", inferSchema=True, header=True)

In [11]:
df.show()

+-------+------+------+
|movieId|rating|userId|
+-------+------+------+
|      2|   3.0|     0|
|      3|   1.0|     0|
|      5|   2.0|     0|
|      9|   4.0|     0|
|     11|   1.0|     0|
|     12|   2.0|     0|
|     15|   1.0|     0|
|     17|   1.0|     0|
|     19|   1.0|     0|
|     21|   1.0|     0|
|     23|   1.0|     0|
|     26|   3.0|     0|
|     27|   1.0|     0|
|     28|   1.0|     0|
|     29|   1.0|     0|
|     30|   1.0|     0|
|     31|   1.0|     0|
|     34|   1.0|     0|
|     37|   1.0|     0|
|     41|   2.0|     0|
+-------+------+------+
only showing top 20 rows



In [12]:
df.describe().show()

+-------+------------------+------------------+------------------+
|summary|           movieId|            rating|            userId|
+-------+------------------+------------------+------------------+
|  count|              1501|              1501|              1501|
|   mean| 49.40572951365756|1.7741505662891406|14.383744170552964|
| stddev|28.937034065088994| 1.187276166124803| 8.591040424293272|
|    min|                 0|               1.0|                 0|
|    max|                99|               5.0|                29|
+-------+------------------+------------------+------------------+



In [14]:
df.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- userId: integer (nullable = true)



In [17]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [21]:
als = ALS(maxIter=5, regParam=0.01, userCol="userId", ratingCol="rating", itemCol="movieId")

In [22]:
train, test = df.randomSplit([0.8, 0.2])

In [23]:
model = als.fit(train)

In [25]:
result = model.transform(test)

In [26]:
result.show()

+-------+------+------+-----------+
|movieId|rating|userId| prediction|
+-------+------+------+-----------+
|     31|   1.0|    27| -0.8701583|
|     31|   1.0|     0|  3.2491508|
|     85|   1.0|    26|  4.0484324|
|     85|   1.0|    15| 0.39796335|
|     85|   1.0|    23|  1.3905617|
|     85|   3.0|    21|   4.688449|
|     65|   2.0|    15|  0.3426787|
|     65|   1.0|     2| -1.1352309|
|     53|   1.0|    25| -0.5097553|
|     53|   5.0|    21|  3.8670835|
|     53|   3.0|    14|  3.4184494|
|     78|   1.0|     4| 0.45249736|
|     34|   1.0|    16|-0.41966838|
|     81|   1.0|    16|-0.13798101|
|     81|   3.0|    18| -0.3642043|
|     28|   1.0|     6|  0.2729597|
|     28|   1.0|     2|   2.184822|
|     76|   3.0|     7|  1.4831169|
|     76|   5.0|    14|  2.2320282|
|     26|   1.0|    13|  1.1973535|
+-------+------+------+-----------+
only showing top 20 rows



In [27]:
regEvaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')

In [29]:
regEvaluator.evaluate(result) 

1.6005933826824097

In [31]:
user_11 = df.filter(df['userId']==11).select(['movieId', 'userId'])

In [32]:
recommendation = model.transform(user_11)

In [34]:
recommendation.orderBy('prediction', ascending=False).show()

+-------+------+----------+
|movieId|userId|prediction|
+-------+------+----------+
|     23|    11| 5.2858744|
|     30|    11| 5.0727396|
|     32|    11| 5.0663557|
|     18|    11| 5.0059743|
|     69|    11|  4.878978|
|     27|    11| 4.8522844|
|     48|    11| 4.7792144|
|     79|    11|  4.608846|
|     81|    11|  4.546835|
|     19|    11|  4.115492|
|     38|    11|  4.097184|
|     66|    11| 3.8796604|
|     13|    11| 3.8688502|
|     90|    11| 3.8520064|
|     50|    11|  3.748921|
|     75|    11| 2.9841378|
|     80|    11| 2.9467983|
|     72|    11|  2.890746|
|     71|    11| 2.8282604|
|     97|    11| 2.7979472|
+-------+------+----------+
only showing top 20 rows



### New More Challenging Dataset

In [35]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Recom').getOrCreate()


In [37]:
from pyspark.sql import Row

In [36]:
data = spark.read.text("./ml-100k/u.data").rdd

In [39]:
df = spark.read.text("./ml-100k/u.item").rdd
df.collect()

[Row(value='1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0'),
 Row(value='2|GoldenEye (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?GoldenEye%20(1995)|0|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0|1|0|0'),
 Row(value='3|Four Rooms (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Four%20Rooms%20(1995)|0|0|0|0|0|0|0|0|0|0|0|0|0|0|0|0|1|0|0'),
 Row(value='4|Get Shorty (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Get%20Shorty%20(1995)|0|1|0|0|0|1|0|0|1|0|0|0|0|0|0|0|0|0|0'),
 Row(value='5|Copycat (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Copycat%20(1995)|0|0|0|0|0|0|1|0|1|0|0|0|0|0|0|0|1|0|0'),
 Row(value='6|Shanghai Triad (Yao a yao yao dao waipo qiao) (1995)|01-Jan-1995||http://us.imdb.com/Title?Yao+a+yao+yao+dao+waipo+qiao+(1995)|0|0|0|0|0|0|0|0|1|0|0|0|0|0|0|0|0|0|0'),
 Row(value='7|Twelve Monkeys (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Twelve%20Monkeys%20(1995)|0|0|0|0|0|0|0|0|1|0

In [40]:
def parse_date(line):
    data = line.value.split()
    return Row(userID=int(data[0]), movieID=int(data[1]), rating=float(data[2]))

In [41]:
ratingData = data.map(parse_date)

In [61]:
ratingData.collect()

[Row(userID=196, movieID=242, rating=3.0),
 Row(userID=186, movieID=302, rating=3.0),
 Row(userID=22, movieID=377, rating=1.0),
 Row(userID=244, movieID=51, rating=2.0),
 Row(userID=166, movieID=346, rating=1.0),
 Row(userID=298, movieID=474, rating=4.0),
 Row(userID=115, movieID=265, rating=2.0),
 Row(userID=253, movieID=465, rating=5.0),
 Row(userID=305, movieID=451, rating=3.0),
 Row(userID=6, movieID=86, rating=3.0),
 Row(userID=62, movieID=257, rating=2.0),
 Row(userID=286, movieID=1014, rating=5.0),
 Row(userID=200, movieID=222, rating=5.0),
 Row(userID=210, movieID=40, rating=3.0),
 Row(userID=224, movieID=29, rating=3.0),
 Row(userID=303, movieID=785, rating=3.0),
 Row(userID=122, movieID=387, rating=5.0),
 Row(userID=194, movieID=274, rating=2.0),
 Row(userID=291, movieID=1042, rating=4.0),
 Row(userID=234, movieID=1184, rating=2.0),
 Row(userID=119, movieID=392, rating=4.0),
 Row(userID=167, movieID=486, rating=4.0),
 Row(userID=299, movieID=144, rating=4.0),
 Row(userID=291,

In [82]:
newDF = ratingData.reduceByKey(lambda movie1, movie2: ( movie1[0] + movie2[0], movie1[1] + movie2[1] ) )

In [87]:
# Filter out movies rated 10 or fewer times
popularTotalsAndCount = newDF.filter(lambda x: x[1][1] > 10)


# Map to (rating, averageRating)
averageRatings = popularTotalsAndCount.mapValues(lambda totalAndCount : totalAndCount[0] / totalAndCount[1])

# Sort by average rating
sortedMovies = averageRatings.sortBy(lambda x: x[1])

for result in sortedMovies:
    print(result[0], result[1])

# # Take the top 10 results
# results = sortedMovies.take(10)

# Print them out:
# for result in results:
#     print(movieNames[result[0]], result[1])

TypeError: 'PipelinedRDD' object is not iterable

In [74]:
Data_df_none_cache = spark.createDataFrame(ratingData)


In [69]:
Data_df = spark.createDataFrame(ratingData).cache()
Data_df.collect()

[Row(userID=196, movieID=242, rating=3.0),
 Row(userID=186, movieID=302, rating=3.0),
 Row(userID=22, movieID=377, rating=1.0),
 Row(userID=244, movieID=51, rating=2.0),
 Row(userID=166, movieID=346, rating=1.0),
 Row(userID=298, movieID=474, rating=4.0),
 Row(userID=115, movieID=265, rating=2.0),
 Row(userID=253, movieID=465, rating=5.0),
 Row(userID=305, movieID=451, rating=3.0),
 Row(userID=6, movieID=86, rating=3.0),
 Row(userID=62, movieID=257, rating=2.0),
 Row(userID=286, movieID=1014, rating=5.0),
 Row(userID=200, movieID=222, rating=5.0),
 Row(userID=210, movieID=40, rating=3.0),
 Row(userID=224, movieID=29, rating=3.0),
 Row(userID=303, movieID=785, rating=3.0),
 Row(userID=122, movieID=387, rating=5.0),
 Row(userID=194, movieID=274, rating=2.0),
 Row(userID=291, movieID=1042, rating=4.0),
 Row(userID=234, movieID=1184, rating=2.0),
 Row(userID=119, movieID=392, rating=4.0),
 Row(userID=167, movieID=486, rating=4.0),
 Row(userID=299, movieID=144, rating=4.0),
 Row(userID=291,

In [70]:
Data_df_none_cache = spark.createDataFrame(ratingData)
Data_df_none_cache.show()

+------+-------+------+
|userID|movieID|rating|
+------+-------+------+
|   196|    242|   3.0|
|   186|    302|   3.0|
|    22|    377|   1.0|
|   244|     51|   2.0|
|   166|    346|   1.0|
|   298|    474|   4.0|
|   115|    265|   2.0|
|   253|    465|   5.0|
|   305|    451|   3.0|
|     6|     86|   3.0|
|    62|    257|   2.0|
|   286|   1014|   5.0|
|   200|    222|   5.0|
|   210|     40|   3.0|
|   224|     29|   3.0|
|   303|    785|   3.0|
|   122|    387|   5.0|
|   194|    274|   2.0|
|   291|   1042|   4.0|
|   234|   1184|   2.0|
+------+-------+------+
only showing top 20 rows



In [71]:
from pyspark.ml.recommendation import ALS

In [72]:
als = ALS(maxIter=5, regParam=0.01, userCol="userID", ratingCol="rating", itemCol="movieID")

In [73]:
fittedDF = als.fit(Data_df)