In [3]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as func 
from pyspark.sql.types import StructType,StructField,IntegerType,LongType

In [4]:
# One SparkSession for the notebook; getOrCreate() returns an existing session if there is one.
spark = (
    SparkSession.builder
    .appName("PopularMovies")
    .getOrCreate()
)

In [5]:
# Explicit column layout of the tab-separated ml-100k/u.data file (one rating per row).
# No backslash continuations are needed inside the brackets.
schema = StructType([
    StructField("userID", IntegerType(), True),
    StructField("movieID", IntegerType(), True),
    StructField("rating", IntegerType(), True),
    StructField("timestamp", LongType(), True),
])

In [6]:
# Tab-separated ratings; apply the explicit schema instead of inferring column types.
moviesDF = spark.read.csv("ml-100k/u.data", schema=schema, sep="\t")

In [7]:
moviesDF.show(n=20)  # first 20 rows (the default for show())

+------+-------+------+---------+
|userID|movieID|rating|timestamp|
+------+-------+------+---------+
|   196|    242|     3|881250949|
|   186|    302|     3|891717742|
|    22|    377|     1|878887116|
|   244|     51|     2|880606923|
|   166|    346|     1|886397596|
|   298|    474|     4|884182806|
|   115|    265|     2|881171488|
|   253|    465|     5|891628467|
|   305|    451|     3|886324817|
|     6|     86|     3|883603013|
|    62|    257|     2|879372434|
|   286|   1014|     5|879781125|
|   200|    222|     5|876042340|
|   210|     40|     3|891035994|
|   224|     29|     3|888104457|
|   303|    785|     3|879485318|
|   122|    387|     5|879270459|
|   194|    274|     2|879539794|
|   291|   1042|     4|874834944|
|   234|   1184|     2|892079237|
+------+-------+------+---------+
only showing top 20 rows



In [8]:
# Number of ratings per movie, most-rated first.
topmovieIDs = (
    moviesDF
    .groupBy("movieID")
    .count()
    .orderBy(func.col("count").desc())
)

In [9]:
topmovieIDs.show(n=10)  # the 10 most-rated movie IDs

+-------+-----+
|movieID|count|
+-------+-----+
|     50|  583|
|    258|  509|
|    100|  508|
|    181|  507|
|    294|  485|
|    286|  481|
|    288|  478|
|      1|  452|
|    300|  431|
|    121|  429|
+-------+-----+
only showing top 10 rows



In [20]:
### Showing movie names instead of IDs

The output above shows only movie IDs, not movie names. To display names instead of IDs, we need the `u.item` file.

We could load a DataFrame that maps IDs to names, then join it with our ratings DataFrame.

- However, that approach adds unnecessary overhead for what is just a simple ID-to-name lookup.



Instead, we could just keep a dictionary loaded in the driver program:
   - We could let Spark automatically forward it to each executor whenever it is needed.
   - But if the table is massive, we would want to send it to each executor only once and keep it there.

#### This is where broadcast variables come in:

A broadcast variable ships an object to the executors once, so that it is always there whenever it is needed.

Just use `sc.broadcast()` (here: `spark.sparkContext.broadcast()`) to ship off whatever you want.

Then use `.value` (a property, not a method call) to get the object back.

Use the broadcast object however you want: in map functions, UDFs, etc.


In [10]:
# Read ml-100k/u.item with an explicit text encoding, locally on the driver, before broadcasting it out

import codecs 

In [11]:
def loadMovieNames(path="ml-100k/u.item", encoding="ISO-8859-1"):
    """Build a ``{movieID: movieTitle}`` dict from a ``|``-separated movie file.

    The first field of each line is parsed as the integer movie ID and the
    second field is taken as the title. The dict is built locally on the
    driver so it can be broadcast to the executors afterwards.

    Args:
        path: Location of the movie file. The default uses the lowercase
            ``u.item`` name of the MovieLens 100k file, so it also resolves on
            case-sensitive file systems.
        encoding: Text encoding used to decode the file (ISO-8859-1 by default).

    Returns:
        dict mapping int movie ID -> str title.

    Raises:
        OSError: if ``path`` cannot be opened.
        ValueError: if a non-blank line's first field is not an integer.
        IndexError: if a non-blank line has no ``|`` separator.
    """
    movieNames = {}
    # Built-in open() accepts encoding/errors directly; codecs.open() is deprecated
    # in recent Python releases.
    with open(path, "r", encoding=encoding, errors="ignore") as f:
        for line in f:
            if not line.strip():
                # Skip blank lines (e.g. a trailing empty line) instead of failing on int("").
                continue
            fields = line.split("|", 2)  # only the ID and the title are needed
            movieNames[int(fields[0])] = fields[1]
    return movieNames

In [12]:
# `spark` was already created at the top of the notebook; calling
# SparkSession.builder...getOrCreate() again would only return that same session,
# so it is reused directly here.
#
# Build the movieID -> title dict on the driver, then broadcast it so each executor
# keeps one read-only copy rather than receiving it with every task.
nameDict = spark.sparkContext.broadcast(loadMovieNames())

In [13]:
# Re-reads u.data exactly as above (presumably to keep this section self-contained).
moviesDF = spark.read.csv("ml-100k/u.data", schema=schema, sep="\t")

In [14]:
# Ratings per movie (unsorted); titles are attached afterwards via the broadcast dict.
movieCounts = moviesDF.groupBy(func.col("movieID")).count()

In [15]:
def lookUp(movieID):
    """Return the title for ``movieID`` from the broadcast ``nameDict``.

    Called on the executors through the ``lookupudf`` UDF; ``nameDict.value`` is
    the dict produced by ``loadMovieNames()`` on the driver.

    Returns None (a null in the resulting column) when ``movieID`` is null or has
    no entry in the dict, instead of raising KeyError and failing the Spark task.
    """
    return nameDict.value.get(movieID)

In [16]:
# Wrap lookUp as a Spark UDF; "string" is the default return type, spelled out for clarity.
lookupudf = func.udf(lookUp, returnType="string")

In [17]:
# Attach a title column by looking each movieID up in the broadcast dict.
moviesWithNames = movieCounts.withColumn("movieTitle", lookupudf("movieID"))


In [18]:
# Most-rated movies first (sort is an alias of orderBy).
sortedmoviesWithNames = moviesWithNames.sort(func.col("count").desc())

In [20]:
sortedmoviesWithNames.show(n=10, truncate=False)  # top 10, full titles without truncation

+-------+-----+-----------------------------+
|movieID|count|movieTitle                   |
+-------+-----+-----------------------------+
|50     |583  |Star Wars (1977)             |
|258    |509  |Contact (1997)               |
|100    |508  |Fargo (1996)                 |
|181    |507  |Return of the Jedi (1983)    |
|294    |485  |Liar Liar (1997)             |
|286    |481  |English Patient, The (1996)  |
|288    |478  |Scream (1996)                |
|1      |452  |Toy Story (1995)             |
|300    |431  |Air Force One (1997)         |
|121    |429  |Independence Day (ID4) (1996)|
+-------+-----+-----------------------------+
only showing top 10 rows



In [21]:
# Shut down the SparkSession now that the analysis is finished.
spark.stop()