In [32]:
from pyspark.sql import SparkSession, Row
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, StringType, DateType
import codecs


In [2]:
# The SparkSession is the entry point to the DataFrame API.
# getOrCreate() hands back an already-running session when there is one.
spark = (
    SparkSession.builder
    .appName("Movies")
    .getOrCreate()
)
spark


In [35]:
def data_loader(path="ml-100k/u.item"):
    """Build a ``{movie_id: title}`` lookup from a pipe-delimited item file.

    Each non-blank line is split on ``|``; the first field is parsed as the
    integer movie id and the second field is stored as its title.

    Args:
        path: Location of the item file. Defaults to ``"ml-100k/u.item"``.

    Returns:
        dict mapping ``int`` movie id to ``str`` title.

    Raises:
        OSError: if the file cannot be opened.
        ValueError: if a line's first field is not an integer.
        IndexError: if a non-blank line contains no ``|`` separator.
    """
    movieNames = {}

    # Built-in open() breaks lines only on \n, \r and \r\n. codecs.open()
    # splits with str.splitlines(), which also treats characters such as
    # U+0085 or U+001C as line ends and can cut a record in half.
    # ISO-8859-1 maps every byte to a character, so decoding cannot fail and
    # no error handler is needed.
    with open(path, "r", encoding="ISO-8859-1") as f:
        for line in f:
            if not line.strip():
                continue  # tolerate blank lines (e.g. a trailing empty line)
            fields = line.split("|")
            movieNames[int(fields[0])] = fields[1]
    return movieNames


In [50]:
# Explicit schema for the ratings file: four nullable columns.
# Types are given by name ("integer", "long") so this cell depends only on
# StructType; the import cell does not bring IntegerType / LongType into
# scope, and referencing them here fails with NameError on a fresh kernel.
schema_ratings = (
    StructType()
    .add("user_id", "integer", True)
    .add("movie_id", "integer", True)
    .add("rating", "integer", True)
    .add("timestamp", "long", True)
)


In [51]:
# Read the tab-separated ratings file with the explicit schema (no inference).
df_ratings = spark.read.csv(
    "ml-100k/u.data",
    schema=schema_ratings,
    sep="\t",
)
df_ratings.printSchema()


root
 |-- user_id: integer (nullable = true)
 |-- movie_id: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- timestamp: long (nullable = true)



In [59]:
# Broadcast the dictionary returned by data_loader() through the SparkContext.
# The mapping is read back later through movie_names_dict.value.
movie_names_dict = spark.sparkContext.broadcast(data_loader())


In [60]:
# Number of ratings per movie, most-rated first.
df_count = (
    df_ratings
    .select("movie_id", "rating")
    .groupBy("movie_id")
    .count()
    .orderBy(func.desc("count"))
)
df_count.show(5)


+--------+-----+
|movie_id|count|
+--------+-----+
|      50|  583|
|     258|  509|
|     100|  508|
|     181|  507|
|     294|  485|
+--------+-----+
only showing top 5 rows



In [61]:
# UDF body: looks up a movie title in the broadcast dictionary.
def look_up_movie(movie_id):
    """Return the title stored for ``movie_id`` in ``movie_names_dict``.

    Args:
        movie_id: Key to look up in the broadcast mapping; may be ``None``
            because the ``movie_id`` column is nullable.

    Returns:
        The title string, or ``None`` when the id is ``None`` or has no entry.
        ``None`` is used instead of raising ``KeyError`` so that one
        unmatched row does not abort the whole Spark job.
    """
    return movie_names_dict.value.get(movie_id)


In [56]:
# Wrap the Python function as a Spark UDF; the return type is spelled out
# (string is also what func.udf uses when no type is given).
look_up_movie_udf = func.udf(look_up_movie, returnType=StringType())


In [63]:
# Add a movie_title column by applying the UDF to the existing movie_id column.
movie_title = look_up_movie_udf(func.col("movie_id"))
df_result = df_count.withColumn("movie_title", movie_title)
df_result.show(5)

+--------+-----+--------------------+
|movie_id|count|         movie_title|
+--------+-----+--------------------+
|      50|  583|    Star Wars (1977)|
|     258|  509|      Contact (1997)|
|     100|  508|        Fargo (1996)|
|     181|  507|Return of the Jed...|
|     294|  485|    Liar Liar (1997)|
+--------+-----+--------------------+
only showing top 5 rows

