In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import  StructType,StructField,StringType,IntegerType,FloatType , LongType
from pyspark.sql import functions as func

In [2]:
# Create the notebook's Spark session, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName("Movies-Rating")
    .getOrCreate()
)
# Turn on eager evaluation for the REPL/notebook display of DataFrames.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

In [3]:
# Explicit schema for the tab-separated u.data file; every column is nullable.
schema = StructType(
    [
        StructField("user_id", IntegerType(), True),
        StructField("Movie_id", IntegerType(), True),
        StructField("rating", IntegerType(), True),
        StructField("time", LongType(), True),
    ]
)

# Read the file with the schema above instead of letting Spark infer one.
txt = (
    spark.read
    .schema(schema)
    .option("sep", "\t")
    .csv(r"C:\Spark Course\Rating Movies\ml-100k\ml-100k\u.data")
)

In [4]:
# Count the rows for each Movie_id and order the result by that count, largest first.
ratings = (
    txt.groupBy("Movie_id")
    .count()
    .orderBy(func.col("count").desc())
)
ratings.show()


+--------+-----+
|Movie_id|count|
+--------+-----+
|      50|  583|
|     258|  509|
|     100|  508|
|     181|  507|
|     294|  485|
|     286|  481|
|     288|  478|
|       1|  452|
|     300|  431|
|     121|  429|
|     174|  420|
|     127|  413|
|      56|  394|
|       7|  392|
|      98|  390|
|     237|  384|
|     117|  378|
|     172|  367|
|     222|  365|
|     204|  350|
+--------+-----+
only showing top 20 rows



In [5]:
import codecs
def dict_func(path=r"C:\Spark Course\Rating Movies\ml-100k\ml-100k\u.item"):
    """Build an id -> name dictionary from a ``|``-delimited text file.

    Args:
        path: File to read. Defaults to the ``u.item`` location used by this
            notebook, so calling ``dict_func()`` with no arguments behaves as
            before.

    Returns:
        dict: Maps the integer parsed from the first ``|``-separated field of
        each line to the string in the second field.

    Raises:
        OSError: If the file cannot be opened.
        ValueError: If a non-empty first field is not an integer.
    """
    dicty = {}
    # ISO-8859-1 assigns a character to every byte value, so decoding cannot
    # fail and no ``errors=`` handler is needed.
    with open(path, "r", encoding="ISO-8859-1") as f:
        for line in f:
            # Drop the line terminator so it never ends up inside a field.
            fields = line.rstrip("\r\n").split("|")
            # Skip blank or truncated lines (e.g. a trailing empty line)
            # instead of crashing on int("") or a missing second field.
            if len(fields) < 2 or not fields[0].strip():
                continue
            dicty[int(fields[0])] = fields[1]
    return dicty

In [6]:
# Build the id -> name dictionary once and register it as a Spark broadcast
# variable so the UDF can read it through ``broodcast.value``.
broodcast = spark.sparkContext.broadcast(
    dict_func()
)


In [7]:
def lookup(movie_id):
    """Return the name stored for ``movie_id`` in the broadcast dictionary.

    Args:
        movie_id: Key to look up in ``broodcast.value``.

    Returns:
        The mapped name, or ``None`` when ``movie_id`` is not a key (including
        when it is ``None``; the ``Movie_id`` column is declared nullable).
        Returning ``None`` keeps one unmatched id from raising ``KeyError``
        inside the UDF and failing the whole job.
    """
    return broodcast.value.get(movie_id)



In [8]:
# Expose the Python lookup as a Spark UDF; StringType is udf()'s default
# return type, spelled out here for clarity.
Lookup_udf = func.udf(lookup, StringType())

# Append the looked-up name for each Movie_id as a new "film_name" column.
film_name_col = Lookup_udf(func.col("Movie_id"))
final = ratings.withColumn("film_name", film_name_col)
final.show(n=10, truncate=False)


+--------+-----+-----------------------------+
|Movie_id|count|film_name                    |
+--------+-----+-----------------------------+
|50      |583  |Star Wars (1977)             |
|258     |509  |Contact (1997)               |
|100     |508  |Fargo (1996)                 |
|181     |507  |Return of the Jedi (1983)    |
|294     |485  |Liar Liar (1997)             |
|286     |481  |English Patient, The (1996)  |
|288     |478  |Scream (1996)                |
|1       |452  |Toy Story (1995)             |
|300     |431  |Air Force One (1997)         |
|121     |429  |Independence Day (ID4) (1996)|
+--------+-----+-----------------------------+
only showing top 10 rows



In [9]:
# Stop the Spark session created at the top of the notebook now that the work is done.
spark.stop()