### 1. Take a look at the two data files


#### u.data
    User ID, Movie ID, Rating, Timestamp
        196	242	3	881250949
        186	302	3	891717742
        166	346	1	886397596
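
To make the layout concrete, here is a minimal plain-Python sketch (no Spark needed) that splits one of the sample lines above into its four fields. The helper name `parse_rating` and the `Rating` tuple are made up for illustration, and the timestamp is assumed to be Unix seconds.

```python
from collections import namedtuple
from datetime import datetime, timezone

# Hypothetical container for one u.data record (not part of the notebook).
Rating = namedtuple("Rating", ["user_id", "movie_id", "rating", "rated_at"])

def parse_rating(line):
    """Split one whitespace-separated u.data line into typed fields."""
    user_id, movie_id, rating, timestamp = line.split()
    return Rating(
        int(user_id),
        int(movie_id),
        int(rating),
        # Assumption: the timestamp is seconds since the Unix epoch (UTC).
        datetime.fromtimestamp(int(timestamp), tz=timezone.utc),
    )

sample = parse_rating("196\t242\t3\t881250949")
print(sample.user_id, sample.movie_id, sample.rating, sample.rated_at.year)
```

Only the second field (the movie ID) is needed for the popularity count later on.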


#### u.item
    Movie ID, Movie Name, Release Date, Video Release Date (empty in these rows), IMDb URL
    1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)
    2|GoldenEye (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?GoldenEye%20(1995)
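
As a quick check of the pipe-separated layout, the sketch below splits the first sample line in plain Python. The helper name `parse_movie` is hypothetical. It shows that the empty field between the two adjacent pipes keeps its slot, so the movie ID and name stay at index 0 and 1.

```python
def parse_movie(line):
    """Return (movie_id, title) from one pipe-separated u.item line."""
    fields = line.rstrip("\n").split("|")
    return int(fields[0]), fields[1]

line = "1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)"
fields = line.split("|")
movie_id, title = parse_movie(line)

print(movie_id, title)
print(len(fields), repr(fields[3]))  # the empty field is kept as ''
```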


### 2. Task: Find the top-10 most popular movies in 1998

    Import SparkConf and SparkContext

In [None]:
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("PopularMovies")
sc = SparkContext(conf = conf)

    Load the movie names from the u.item file. In the end we want to print movie names instead of movie IDs.
    Extract only the movie ID and name, and store them in a dictionary.

In [None]:
def loadMovieNames():
    movieNames = {}
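    # u.item is Latin-1 encoded; the default UTF-8 decoding can fail on accented titles
    with open("SparkCourse/ml-100k/u.item", encoding="ISO-8859-1") as f: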
        for line in f:
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

    Broadcast it to all executors so that every one of them can share the information.

In [None]:
nameDict = sc.broadcast(loadMovieNames())

    Read the u.data file and extract the movie ID from each line.
    Map each movie ID to the value 1 so we can count occurrences later.
    Now we have a new RDD called "movies" that contains movie IDs as keys and 1 as values.

In [None]:
lines = sc.textFile("file:///SparkCourse/ml-100k/u.data")
movies = lines.map(lambda x: (int(x.split()[1]), 1))

    Now call reduceByKey to add those 1's together for each unique movie id to get the count of how many times each movie appears.

In [None]:
movieCounts = movies.reduceByKey(lambda x, y: x + y)
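
For intuition, the same map-then-reduce logic can be sketched in plain Python without Spark. This is only an illustration of what the two steps compute, not how Spark executes them. The fourth input line is made up so that one movie ID appears twice.

```python
# The three sample lines from u.data, plus one made-up line (marked below)
# so that one movie ID appears more than once.
lines = [
    "196\t242\t3\t881250949",
    "186\t302\t3\t891717742",
    "166\t346\t1\t886397596",
    "999\t242\t5\t881250950",  # hypothetical extra rating, not from the dataset
]

# Same shape as the "movies" RDD: (movie_id, 1) pairs.
pairs = [(int(line.split()[1]), 1) for line in lines]

# What reduceByKey(lambda x, y: x + y) computes: combine the values of each key.
counts = {}
for movie_id, one in pairs:
    if movie_id in counts:
        counts[movie_id] = counts[movie_id] + one
    else:
        counts[movie_id] = one

print(counts)
```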

    We want the results sorted by popularity.
    Let's flip each key-value pair so that the count is the key and the movie ID is the value.
    Then we can call sortByKey to sort the results. By default sortByKey sorts in ascending order,
    so the most popular movies appear last; pass ascending=False to put them first.

In [None]:
flipped = movieCounts.map(lambda xy: (xy[1],xy[0]))
sortedMovies = flipped.sortByKey()
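
The flip-and-sort idea, and one way to keep only the top N, can be sketched in plain Python. The movie IDs and counts below are made up for illustration. Note that Python's `sorted` on tuples also compares the second element on ties, whereas sortByKey looks at the key only; with distinct counts the order is the same.

```python
import heapq

# Hypothetical (movie_id, count) pairs standing in for movieCounts.
movie_counts = [(10, 3), (20, 7), (30, 5), (40, 1)]

# Flip to (count, movie_id) so the count becomes the sort key.
flipped = [(count, movie_id) for movie_id, count in movie_counts]

ascending = sorted(flipped)                 # least popular first
descending = sorted(flipped, reverse=True)  # most popular first

# Keep only the N most popular without sorting everything (N = 2 here).
top2 = heapq.nlargest(2, movie_counts, key=lambda kv: kv[1])

print(ascending)
print(descending)
print(top2)
```

For a top-10 list, the same pattern applies with N = 10 once the results have been collected.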

    Let's replace the movie ID with the movie name so that the result is more readable.
    Remember that we used broadcast() to ship nameDict to the executors. Call ".value" to get the dictionary back.
    And don't forget to flip the pair back so that the movie name is the key.

In [None]:
sortedMoviesWithNames = sortedMovies.map(lambda countMovie : (nameDict.value[countMovie[1]], countMovie[0]))
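
The lookup itself is an ordinary dictionary access. In the plain-Python sketch below, `name_dict` stands in for `nameDict.value` and holds the two entries from the u.item sample; the counts are made up. The `.get` fallback for an unknown ID is an optional safeguard that the notebook does not use.

```python
# Stand-in for nameDict.value (the two entries from the u.item sample).
name_dict = {1: "Toy Story (1995)", 2: "GoldenEye (1995)"}

# (count, movie_id) pairs as produced by the sort step; counts are made up.
sorted_movies = [(3, 2), (7, 1)]

# Swap the ID for the name and flip back to (name, count).
with_names = [(name_dict[movie_id], count) for count, movie_id in sorted_movies]

# Optional: avoid a KeyError if an ID is missing from the dictionary.
fallback = name_dict.get(999, "unknown movie 999")

print(with_names)
print(fallback)
```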

    Use collect() to collect the results and then print them out

In [None]:
results = sortedMoviesWithNames.collect()
for result in results:
    print(result)

#### Output (last lines; the most popular movies are at the bottom):
    

    ('English Patient, The (1996)', 481)
    ('Liar Liar (1997)', 485)
    ('Return of the Jedi (1983)', 507)
    ('Fargo (1996)', 508)
    ('Contact (1997)', 509)
    ('Star Wars (1977)', 583)

#### All in one

In [None]:
from pyspark import SparkConf, SparkContext

def loadMovieNames():
    movieNames = {}
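    # u.item is Latin-1 encoded; the default UTF-8 decoding can fail on accented titles
    with open("SparkCourse/ml-100k/u.item", encoding="ISO-8859-1") as f: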
        for line in f:
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

conf = SparkConf().setMaster("local").setAppName("PopularMovies")
sc = SparkContext(conf = conf)

nameDict = sc.broadcast(loadMovieNames())

lines = sc.textFile("file:///SparkCourse/ml-100k/u.data")
movieCounts = lines.map(lambda x: (int(x.split()[1]), 1)).reduceByKey(lambda x, y: x + y)
sortedMovies = movieCounts.map(lambda xy: (xy[1],xy[0])).sortByKey()
sortedMoviesWithNames = sortedMovies.map(lambda countMovie : (nameDict.value[countMovie[1]], countMovie[0]))

results = sortedMoviesWithNames.collect()
for result in results:
    print(result)