In [1]:
# Aim: attach the movie names from the file "u.item" to the MovieIDs used in the previous notebook.
# Demonstrates the use of "sc.broadcast".

In [2]:
from pyspark import SparkConf, SparkContext

In [3]:
# Spark configuration: master set to "local", application named "PopularMovies".
conf = SparkConf()
conf.setMaster("local")
conf.setAppName("PopularMovies")

# Reuse the active SparkContext if one exists; otherwise create one from `conf`.
sc = SparkContext.getOrCreate(conf=conf)

In [4]:
data1 = sc.textFile("/FileStore/tables/u.data")  # Movie ID and ratings data

# Preview the first 9 records. take(9) fetches only the rows that are shown,
# whereas collect() would first pull the entire RDD onto the driver.
for record in data1.take(9):
  print(record)

In [5]:
# The movie names are in the file "u.item".
# Load it and look at the first few records.

data2 = sc.textFile("/FileStore/tables/u.item")

# take(4) fetches only the 4 previewed records instead of collecting the whole file.
for record in data2.take(4):
  print(record)

In [6]:
# We can see that the 1st column is MovieID and the 2nd column is MovieName, separated by "|".
# Create a function that extracts the required columns and builds a dictionary mapping MovieID to MovieName.

def loadMovieNames(lines=None):
  """Build a dictionary mapping MovieID (int) to MovieName (str).

  Args:
    lines: Optional iterable of "|"-separated records whose first field is
      the MovieID and whose second field is the MovieName. When omitted,
      the records are collected from the notebook-level `data2` RDD.

  Returns:
    dict: {MovieID: MovieName} with one entry per well-formed record
    (empty if there are no records).

  Raises:
    ValueError: if the first field of a record cannot be parsed as an int.
  """
  if lines is None:
    lines = data2.collect()                 # Bring the u.item records to the driver

  movieNames = {}                           # MovieID -> MovieName
  for line in lines:
    fields = line.split("|")                # Splitting on "|"
    if len(fields) < 2:                     # Skip blank or malformed lines
      continue
    movieNames[int(fields[0])] = fields[1]

  # Return only after every record has been processed. Returning from inside
  # the loop would hand back a dictionary containing just the first movie.
  return movieNames

In [7]:
# Wrap the dictionary returned by loadMovieNames() in a broadcast object;
# its contents are read later through nameDict.value.
nameDict = sc.broadcast(loadMovieNames())

In [8]:
# Repeating what we did in the previous notebook, i.e. finding the most popular (watched) movie (METHOD 2)

# The 2nd whitespace-separated field of each record is used as the MovieID and paired with a count of 1.
pairs = data1.map(lambda x: (int(x.split()[1]), 1))
# Sum the 1s per MovieID to get the number of records for each movie.
count = pairs.reduceByKey(lambda x, y: x + y)
# Order by that count, largest first.
sortedMovies = count.sortBy(lambda x: x[1], ascending=False)

# Show the top 4 entries. take(4) avoids collecting the full sorted RDD onto the driver.
for movieCount in sortedMovies.take(4):
  print(movieCount)

In [9]:
# Now the 1st column is the unique "MovieID" and the 2nd column is the "count"; we can associate each MovieID with its MovieName from 'nameDict' as follows.
# MovieName can be obtained as: dictionary.value[key], i.e. nameDict.value[MovieID]

# Swap each MovieID for its name by looking it up in the broadcast dictionary,
# keeping the count as the second element of the pair.
sortedMoviesWithNames = sortedMovies.map(
    lambda idAndCount: (nameDict.value[idAndCount[0]], idAndCount[1])
)

# Print every (MovieName, count) pair in sorted order.
for nameAndCount in sortedMoviesWithNames.collect():
    print(nameAndCount)