In [32]:
import codecs

from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import FloatType, IntegerType, LongType, StringType, StructField, StructType

# Start (or reuse) the Spark session used by the rest of the notebook
spark = (
    SparkSession.builder
    .appName("Top_movies")
    .getOrCreate()
)

# Load movie data

In [33]:
# Column layout of the tab-separated u.data ratings file.
# The timestamp values are presumably Unix epoch seconds (see the MovieLens
# README); LongType stores such integers exactly, whereas a 32-bit FloatType
# would round values of that magnitude.
schema = StructType([
    StructField("user_id", IntegerType(), False),
    StructField("movie_id", IntegerType(), False),
    StructField("rating", IntegerType(), False),
    StructField("timestamp", LongType(), True),
])

# Only movie_id is kept: it is the single column the popularity count needs.
df = (
    spark.read
    .option("sep", "\t")
    .schema(schema)
    .csv("../../data/ml-100k/u.data")
    .select("movie_id")
)
df.show()

+--------+
|movie_id|
+--------+
|     242|
|     302|
|     377|
|      51|
|     346|
|     474|
|     265|
|     465|
|     451|
|      86|
|     257|
|    1014|
|     222|
|      40|
|      29|
|     785|
|     387|
|     274|
|    1042|
|    1184|
+--------+
only showing top 20 rows



# Load movie names as a broadcast variable

In [36]:
def loadMovieNames(path="../../data/ml-100k/u.item"):
    """Build a dictionary mapping movie ID to movie title.

    Each non-blank line of the file is split on "|"; the first field is
    parsed as the integer movie ID and the second field is used as the title.

    Args:
        path: Location of the pipe-delimited item file. Defaults to the
            ``u.item`` path this notebook has always read.

    Returns:
        dict: integer movie ID -> title string.

    Raises:
        OSError: if the file cannot be opened.
        ValueError: if a non-blank line does not start with an integer ID.
        IndexError: if a non-blank line has no "|"-separated second field.
    """
    movieNames = {}
    # Built-in open() breaks lines only on \n, \r and \r\n. codecs.open() also
    # splits on characters such as U+0085, which ISO-8859-1 decodes from byte
    # 0x85 and which could therefore cut a record in two. ISO-8859-1 maps
    # every byte to a character, so no decoding error handler is needed.
    with open(path, "r", encoding="ISO-8859-1") as f:
        for line in f:
            if not line.strip():
                # A blank (e.g. trailing) line would otherwise crash int().
                continue
            fields = line.split("|")
            movieNames[int(fields[0])] = fields[1]
    return movieNames

# Broadcast the movie_id -> title dictionary; lookUpName reads it through nameDict.value
nameDict = spark.sparkContext.broadcast(loadMovieNames())

def lookUpName(movieID):
    """Return the title stored in the broadcast dictionary for ``movieID``.

    Raises KeyError when the ID is not present in the dictionary.
    """
    titlesById = nameDict.value
    return titlesById[movieID]


# Wrap the lookup as a Spark UDF; StringType is the default return type, spelled out here
lookUpNameUDF = func.udf(lookUpName, returnType=StringType())

# Find the 10 most-rated movies

In [43]:
# Count the ratings per movie and sort so the most-rated movies come first
top10Movies = (
    df.groupBy("movie_id")
    .count()
    .sort(func.col("count").desc())
)
# Add the title for each movie_id via the lookup UDF
top10MoviesWithNames = top10Movies.withColumn(
    "movie_title",
    lookUpNameUDF(func.col("movie_id")),
)
top10MoviesWithNames.show(n=10)

+--------+-----+--------------------+
|movie_id|count|         movie_title|
+--------+-----+--------------------+
|      50|  583|    Star Wars (1977)|
|     258|  509|      Contact (1997)|
|     100|  508|        Fargo (1996)|
|     181|  507|Return of the Jed...|
|     294|  485|    Liar Liar (1997)|
|     286|  481|English Patient, ...|
|     288|  478|       Scream (1996)|
|       1|  452|    Toy Story (1995)|
|     300|  431|Air Force One (1997)|
|     121|  429|Independence Day ...|
+--------+-----+--------------------+
only showing top 10 rows



In [44]:
# Stop the SparkSession now that the analysis is finished
spark.stop()