### Import libraries

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, IntegerType, StringType, FloatType, StructField, LongType
import codecs

### Define a function to load movie names

In [None]:
def LoadingMovies():
    NamesOfMovies = {}
    with codecs.open("folders_path", 'r', encoding='ISO-8859-1', errors='ignore') as f:
        for line in f:
            fields = line.split('|')
            NamesOfMovies[int(fields[0])] = fields[1]
    return NamesOfMovies

* **Purpose**: This function reads a pipe-delimited file of movie information and builds a dictionary whose keys are movie IDs and whose values are movie names.
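To make the parsing step concrete, here is a minimal sketch that runs the same logic against a small temporary file. The helper name `load_movie_names`, the two sample rows, and the blank-line guard are assumptions added for illustration; they are not part of the notebook.

```python
import codecs
import os
import tempfile


def load_movie_names(path):
    """Map movie ID -> title from a pipe-delimited file (ID|title|...)."""
    names = {}
    with codecs.open(path, "r", encoding="ISO-8859-1", errors="ignore") as f:
        for line in f:
            fields = line.rstrip("\r\n").split("|")
            if len(fields) < 2:
                # Skip blank or malformed lines instead of failing on int("")
                continue
            names[int(fields[0])] = fields[1]
    return names


# Illustrative sample rows only; the real file path is not shown in the notebook
sample = "1|Toy Story (1995)|01-Jan-1995\n2|GoldenEye (1995)|01-Jan-1995\n"
with tempfile.NamedTemporaryFile(
    "w", suffix=".item", delete=False, encoding="ISO-8859-1"
) as tmp:
    tmp.write(sample)
    sample_path = tmp.name

movie_names = load_movie_names(sample_path)
os.remove(sample_path)
print(movie_names)
```

Only the first two fields of each row are used; any remaining columns are ignored.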

### Create Spark Session

In [5]:
spark = SparkSession.builder.appName("PopularMoviesName").getOrCreate()

In [None]:
nameDict = spark.sparkContext.broadcast(LoadingMovies())

In Apache Spark, the `broadcast` method efficiently distributes a large read-only variable to all worker nodes in a cluster. This is particularly useful when many tasks need frequent access to the same data, because it avoids repeatedly sending that data over the network.


In this case, broadcasting helps in two ways:
* **Avoids redundant data transfer**: Every task needs the movie-names dictionary, and broadcasting sends it to each node only once instead of once per task.
* **Reduces network overhead**: Each node keeps a local copy of the broadcast variable, so less data crosses the network.
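The saving can be estimated with some rough arithmetic. The sketch below is plain Python, not Spark: the dictionary contents, the task count, and the executor count are all hypothetical, and the pickled size only approximates what Spark would actually ship.

```python
import pickle

# Hypothetical stand-in for the movie-names dictionary
names = {i: f"Movie {i}" for i in range(1, 1683)}
payload_bytes = len(pickle.dumps(names))

# Assumed cluster shape, for illustration only
num_tasks = 200
num_executors = 4

# Without broadcast: the dictionary travels with every task
per_task_total = payload_bytes * num_tasks

# With broadcast: the dictionary is sent roughly once per executor
broadcast_total = payload_bytes * num_executors

print(f"payload size:      {payload_bytes} bytes")
print(f"shipped per task:  {per_task_total} bytes")
print(f"shipped broadcast: {broadcast_total} bytes")
```

Under these assumptions the ratio between the two totals is simply `num_tasks / num_executors`, so the benefit grows with the number of tasks per executor.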

### Define the schema for reading the u.data file

In [6]:
schema = StructType([
                     StructField("userID", IntegerType(), True),
                     StructField("movieID", IntegerType(), True),
                     StructField("rating", IntegerType(), True),
                     StructField("timestamp", LongType(), True)])

In [None]:
movie_df = spark.read.option("sep", "\t").schema(schema).csv("csv_folder_path")

# Count how many ratings each movie received
movieCounts = movie_df.groupBy("movieID").count()

### User-defined function (UDF) for lookup

In [None]:
def LookUpName(movieID):
    return nameDict.value[movieID]

lookupNameUDF = func.udf(LookUpName)

* **Purpose**: Defines a UDF that looks up movie names in the broadcast dictionary.
* **Details**:
  * `LookUpName(movieID)`: Retrieves the movie name for the given `movieID` from `nameDict`.
  * `func.udf(LookUpName)`: Wraps `LookUpName` as a UDF so it can be used in DataFrame operations.
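One caveat: indexing with `nameDict.value[movieID]` raises a `KeyError` if a rating refers to an ID that is missing from the dictionary. A possible variant with a fallback title is sketched below in plain Python. The helper `make_lookup`, the `"Unknown"` default, and the `SimpleNamespace` stand-in for the broadcast variable are assumptions for illustration, not part of the notebook.

```python
from types import SimpleNamespace


def make_lookup(broadcast_names, default="Unknown"):
    """Return a function mapping a movie ID to its title, with a fallback."""
    def lookup(movie_id):
        # .value mirrors how a Spark broadcast variable exposes its payload
        return broadcast_names.value.get(movie_id, default)
    return lookup


# Stand-in object with a .value attribute, so the sketch runs without Spark
fake_broadcast = SimpleNamespace(
    value={1: "Toy Story (1995)", 2: "GoldenEye (1995)"}
)

lookup = make_lookup(fake_broadcast)
known_title = lookup(1)
missing_title = lookup(999)
print(known_title, "|", missing_title)
```

In the notebook, the same idea would presumably be applied by passing `nameDict` to such a helper and wrapping the result with `func.udf`.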

In [None]:
# Add a movieTitle column using our new udf
moviesWithNames = movieCounts.withColumn("movieTitle", lookupNameUDF(func.col("movieID")))

# Sort the results
sortedMoviesWithNames = moviesWithNames.orderBy(func.desc("count"))

# Grab the top 10
sortedMoviesWithNames.show(10, False)

# Stop the session
spark.stop()