In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions

In [3]:
def loadMovieNames(path="ml-100k/u.item", encoding='ISO-8859-1'):
    """
    Build a dictionary mapping movie ID (int) -> movie title (str).

    The result is used later to convert movie IDs to movie names when
    printing the final results.

    Args:
        path: pipe-delimited item file; field 0 is the movie ID and
            field 1 is the title. Defaults to the local MovieLens copy.
        encoding: text encoding used to read the file. The default is the
            ISO-8859-1 encoding this notebook has always used for u.item.

    Returns:
        dict mapping int movie ID to its title string.
    """
    movieNames = {}
    with open(path, encoding=encoding) as f:
        for line in f:
            # Skip blank lines (e.g. a trailing newline) so int('') can't raise.
            if not line.strip():
                continue
            # Only the first two fields are needed; don't split the rest.
            fields = line.split('|', 2)
            movieNames[int(fields[0])] = fields[1]
    return movieNames
def parseInput(line):
    """
    Convert one line of u.data into a Row(movieID, rating).

    The line is split on whitespace. The second field is parsed as the
    movie ID (int) and the third as the rating (float); all other fields
    are ignored. No aggregation happens here: counting and averaging the
    ratings per movie is done later on the resulting DataFrame.

    Args:
        line: one raw text line from u.data.

    Returns:
        pyspark.sql.Row with fields movieID and rating.
    """
    fields = line.split()
    return Row(movieID=int(fields[1]), rating=float(fields[2]))

In [5]:
if __name__ == "__main__":
    # Create (or reuse) a SparkSession.
    spark = SparkSession.builder.appName("PopularMovies").getOrCreate()
    try:
        # Load up our movie ID -> name dictionary (kept on the driver).
        movieNames = loadMovieNames()

        # Get the raw data. On the cluster, use the HDFS copy instead:
        #     "hdfs:///user/maria_dev/ml-100k/u.data"
        lines = spark.sparkContext.textFile("ml-100k/u.data")

        # Convert it to an RDD of Row(movieID, rating), then to a DataFrame.
        movieDataset = spark.createDataFrame(lines.map(parseInput))

        # Compute the number of ratings and the average rating per movie in a
        # single aggregation, rather than two groupBys followed by a join.
        averagesAndCounts = movieDataset.groupBy("movieID").agg(
            functions.count("rating").alias("count"),
            functions.avg("rating").alias("avgRating"),
        )

        # Drop movies rated 10 or fewer times (keep count > 10).
        popularAveragesAndCounts = averagesAndCounts.filter(functions.col("count") > 10)

        # Sort ascending by average rating, so these are the 10 LOWEST-rated
        # movies. movieID breaks ties so the order is deterministic across runs.
        topTen = popularAveragesAndCounts.orderBy("avgRating", "movieID").take(10)

        # Print them out, converting movie IDs to names as we go. Fields are
        # read by name so the output does not depend on column order.
        for movie in topTen:
            print(movieNames[movie["movieID"]], movie["count"], movie["avgRating"])
    finally:
        # Stop the session even if one of the steps above fails.
        spark.stop()


Children of the Corn: The Gathering (1996) 19 1.3157894736842106
Body Parts (1991) 13 1.6153846153846154
Amityville II: The Possession (1982) 14 1.6428571428571428
Lawnmower Man 2: Beyond Cyberspace (1996) 21 1.7142857142857142
Robocop 3 (1993) 11 1.7272727272727273
Free Willy 3: The Rescue (1997) 27 1.7407407407407407
Gone Fishin' (1997) 11 1.8181818181818181
Solo (1996) 12 1.8333333333333333
Vampire in Brooklyn (1995) 12 1.8333333333333333
Ready to Wear (Pret-A-Porter) (1994) 18 1.8333333333333333
