In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions

In [5]:
def loadMovieNames(path="ml-100k/u.item", encoding='ISO-8859-1'):
    """Build a movie-ID -> title lookup from a pipe-delimited item file.

    Each non-blank line is expected to look like ``<id>|<title>|...``;
    only the first two fields are used.

    Args:
        path: File to read. Defaults to the MovieLens 100k ``u.item`` file
            relative to the working directory.
        encoding: Text encoding used to open the file (ISO-8859-1 by
            default, as the notebook originally hard-coded).

    Returns:
        dict mapping the integer movie ID to its title string.

    Raises:
        OSError: if the file cannot be opened.
        ValueError: if a non-blank line's first field is not an integer.
    """
    movieNames = {}
    with open(path, encoding=encoding) as f:
        for line in f:
            # Skip blank lines (e.g. a trailing empty line) instead of
            # crashing on int('').
            if not line.strip():
                continue
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

In [6]:
# Create a SparkSession named "PopularMovies", or reuse the one already
# running in this kernel. No extra .config() options are set here.
spark = (
    SparkSession.builder
    .appName("PopularMovies")
    .getOrCreate()
)

In [7]:
# Load the movie ID -> title dictionary, used later to print titles
# instead of raw IDs.
nameDict = loadMovieNames()

In [8]:
# Read the raw u.data file as an RDD of text lines.
lines = spark.sparkContext.textFile(
    "ml-100k/u.data"
)

In [9]:
# Map each whitespace-separated line to a Row holding only its second field
# (the movie ID, as an int), producing an RDD of Row objects.
movies = lines.map(
    lambda raw: Row(movieID=int(raw.split()[1]))
)

In [10]:
# Turn the RDD of Rows into a DataFrame; its single movieID column comes
# from the Row field name.
movieDataset = spark.createDataFrame(
    movies
)

In [12]:
# Count rows per movie ID and sort by that count, highest first. The result
# is cached because it is reused below (show() and the top-10 take).
topMovieIDs = (
    movieDataset
    .groupBy("movieID")
    .count()
    .orderBy("count", ascending=False)
    .cache()
)

In [13]:
# Show the results at this point. The first rows should look like this:
#
#+-------+-----+
#|movieID|count|
#+-------+-----+
#|     50|  583|
#|    258|  509|
#|    100|  508|

In [14]:
# Print the first 20 rows of the sorted per-movie counts.
topMovieIDs.show(n=20)

+-------+-----+
|movieID|count|
+-------+-----+
|     50|  583|
|    258|  509|
|    100|  508|
|    181|  507|
|    294|  485|
|    286|  481|
|    288|  478|
|      1|  452|
|    300|  431|
|    121|  429|
|    174|  420|
|    127|  413|
|     56|  394|
|      7|  392|
|     98|  390|
|    237|  384|
|    117|  378|
|    172|  367|
|    222|  365|
|    313|  350|
+-------+-----+
only showing top 20 rows



In [15]:
# Bring the ten highest-count rows back to the driver as a list of Rows.
top10 = topMovieIDs.limit(10).collect()

In [17]:
# Print the results as "<title>: <count>", one movie per line.
print("\n")
for movie_id, num_ratings in top10:
    # Each Row is (movieID, count), as shown above. Fall back to the raw ID
    # rather than raising KeyError if the title lookup has no entry for it.
    title = nameDict.get(movie_id, "Unknown movie %s" % movie_id)
    print("%s: %d" % (title, num_ratings))



Star Wars (1977): 583
Contact (1997): 509
Fargo (1996): 508
Return of the Jedi (1983): 507
Liar Liar (1997): 485
English Patient, The (1996): 481
Scream (1996): 478
Toy Story (1995): 452
Air Force One (1997): 431
Independence Day (ID4) (1996): 429


In [18]:
# Stop the session (and its underlying SparkContext). Re-run the SparkSession
# cell before executing any Spark cell again.
spark.stop()