In [None]:
from pyspark import SparkConf, SparkContext

# Reuse the already-running SparkContext when this cell is executed again:
# constructing a second SparkContext in the same kernel raises
# "Cannot run multiple SparkContexts at once". getOrCreate() only builds a new
# context (master "local", application name "test") when none is active yet.
sc = SparkContext.getOrCreate(SparkConf().setMaster("local").setAppName("test"))

In [None]:
# Each RDD below holds the raw text lines of one input file; nothing is parsed
# here, later cells split the lines into fields.

# u.user columns: user_id, age, sex, occupation, zip_code
userRDD = sc.textFile("./u.user")

# u.data columns: user_id, movie_id, rating, unix_timestamp
ratingRDD = sc.textFile("./u.data")

# u.item columns: movie_id, title, release_date, video_release_date, imdb_url, plus 18 more
movieRDD = sc.textFile("./u.item")

# Show the first two lines of every file as a quick sanity check.
for label, rdd in (
    ("userRDD : ", userRDD),
    ("ratingRDD : ", ratingRDD),
    ("movieRDD : ", movieRDD),
):
    print(label, rdd.take(2))

In [None]:
### What are the 25 most rated movies? ###

# (movie_id, rating) pairs taken from the tab-separated rating lines.
# Every line is split once and then indexed, instead of being split again for
# each extracted field.
RDD_movid_rating = (
    ratingRDD
    .map(lambda line: line.split("\t"))
    .map(lambda fields: (fields[1], fields[2]))
)
print("RDD_movid_rating : ", RDD_movid_rating.take(3))

# (movie_id, title) pairs taken from the pipe-separated movie lines.
RDD_movid_title = (
    movieRDD
    .map(lambda line: line.split("|"))
    .map(lambda fields: (fields[0], fields[1]))
)
print("RDD_movid_title : ", RDD_movid_title.take(3))

# Attach the title to every rating by joining the two pair RDDs on movie_id.
# Element shape: (movie_id, (rating, title)). leftOuterJoin() keeps ratings whose
# movie_id has no entry in RDD_movid_title; their title is None.
rdd_movid_title_rating = RDD_movid_rating.leftOuterJoin(RDD_movid_title)
print("rdd_movid_title_rating : ", rdd_movid_title_rating.take(10))

# Emit one (title, 1) pair per rating so that summing the ones counts the ratings.
rdd_title_rating = rdd_movid_title_rating.map(lambda kv: (kv[1][1], 1))
print("rdd_title_rating : ", rdd_title_rating.take(20))

# Sum the ones per title with reduceByKey(): (title, number of ratings).
rdd_title_ratingcnt = rdd_title_rating.reduceByKey(lambda x, y: x + y)
print("rdd_title_ratingcnt : ", rdd_title_ratingcnt.take(5))

# takeOrdered() is an action (not a transformation): it returns a Python list of
# the 25 elements with the smallest key. Negating the count as the key therefore
# yields the 25 titles with the most ratings, highest count first.
print("#####################################")
print("25 most rated movies : ", rdd_title_ratingcnt.takeOrdered(25, lambda x: -x[1]))
print("#####################################")

In [None]:
# The same "25 most rated movies" pipeline written as one chained expression.
# The chain is wrapped in parentheses rather than relying on backslash line
# continuations, and each input line is split once instead of once per field.
print(
    ratingRDD
    .map(lambda line: line.split("\t"))
    .map(lambda fields: (fields[1], fields[2]))      # (movie_id, rating)
    .leftOuterJoin(
        movieRDD
        .map(lambda line: line.split("|"))
        .map(lambda fields: (fields[0], fields[1]))  # (movie_id, title)
    )                                                # (movie_id, (rating, title))
    .map(lambda kv: (kv[1][1], 1))                   # (title, 1) per rating
    .reduceByKey(lambda x, y: x + y)                 # (title, number of ratings)
    .takeOrdered(25, lambda x: -x[1])                # action: 25 highest counts
)

In [None]:
### Which movies are most highly rated? ###

# Input: rdd_movid_title_rating, whose elements are (movie_id, (rating, title)), e.g.
# [(u'429', (u'5', u'Day the Earth Stood Still, The (1951)'))]

# Accumulate the rating sum and the rating count per title in ONE reduceByKey()
# pass. Carrying (sum, count) together means no second aggregation and no join
# are needed to compute the mean, and the count can never be missing.
rdd_title_ratingsum_ratingcnt = (
    rdd_movid_title_rating
    .map(lambda kv: (kv[1][1], (int(kv[1][0]), 1)))
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
)

# (title, sum of all ratings) -- kept under its original name for inspection.
rdd_title_ratingsum = rdd_title_ratingsum_ratingcnt.mapValues(lambda sum_cnt: sum_cnt[0])

print("rdd_title_ratingsum : ", rdd_title_ratingsum.take(4))

# (title, (mean rating, number of ratings)).
# float() keeps the division a true division even where "/" on ints truncates.
rdd_title_ratingmean_rating_count = rdd_title_ratingsum_ratingcnt.mapValues(
    lambda sum_cnt: (float(sum_cnt[0]) / sum_cnt[1], sum_cnt[1])
)

print("rdd_title_ratingmean_rating_count : ", rdd_title_ratingmean_rating_count.take(3))

# takeOrdered() alone would rank titles with very few ratings at the top, so first
# keep only titles that have at least 100 ratings.
# NOTE: despite the "gt_100" suffix in the name, the filter is ">= 100".
rdd_title_rating_rating_count_gt_100 = rdd_title_ratingmean_rating_count.filter(
    lambda x: x[1][1] >= 100
)

print("rdd_title_rating_rating_count_gt_100 : ", rdd_title_rating_rating_count_gt_100.take(3))

# takeOrdered() is an action (not a transformation): it returns a Python list of
# the 25 elements with the smallest key, so the negated mean rating is used as
# the key to get the highest means first.
print("#####################################")
print("25 highly rated movies : ")
print(rdd_title_rating_rating_count_gt_100.takeOrdered(25, lambda x: -x[1][0]))
print("#####################################")