In [1]:
from IPython.core.display import HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col,json_tuple,expr,split,regexp_replace,array_join,concat_ws,explode,count,max,first
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, IntegerType,DateType,FloatType,DecimalType

In [None]:
# Start the Spark session for this notebook, or reuse one that is already
# running under the same application.
spark = (
    SparkSession.builder
    .appName("MovieAnalysis")
    .getOrCreate()
)

In [2]:
# All inputs and outputs live in the same HDFS directory; keep the prefix in one place.
_HDFS_DATA_DIR = 'hdfs://localhost:8020/spark_hadoop_project/data'

# Input: the movies CSV that is loaded into `df`.
file_path = f'{_HDFS_DATA_DIR}/tmdb_5000_movies.csv'
# Output: most popular film per original language.
popular_film = f'{_HDFS_DATA_DIR}/popular_film_per_lan.csv'
# Output: number of movies per genre.
Genres_Agggregations = f'{_HDFS_DATA_DIR}/Genres_Agggregations.csv'

In [3]:
# Explicit schema for the movies CSV. Every field is nullable. The columns that
# are typed as strings here include the JSON-encoded ones (genres, keywords,
# production_*, spoken_languages), which are parsed in later cells.
df_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ('budget', DecimalType(18, 2)),
        ('genres', StringType()),
        ('homepage', StringType()),
        ('id', IntegerType()),
        ('keywords', StringType()),
        ('original_language', StringType()),
        ('original_title', StringType()),
        ('overview', StringType()),
        ('popularity', FloatType()),
        ('production_companies', StringType()),
        ('production_countries', StringType()),
        ('release_date', DateType()),
        ('revenue', DecimalType(18, 2)),
        ('runtime', FloatType()),
        ('spoken_languages', StringType()),
        ('status', StringType()),
        ('tagline', StringType()),
        ('title', StringType()),
        ('vote_average', FloatType()),
        ('vote_count', FloatType()),
    ]
])


In [4]:
# Load the movies CSV from HDFS with the explicit schema defined above.
# multiLine lets a quoted field span several physical lines; quote and escape
# are both set to the double-quote character (presumably because the
# JSON-encoded columns contain embedded quotes - confirm against the raw file).
df = (
    spark.read
    .format("csv")
    .options(header="true", multiLine="true", quote='"', escape='"')
    .schema(df_schema)
    .load(file_path)
)

In [5]:
# Turn on eager evaluation for the REPL/notebook, then print the first five
# rows without truncating long cell values.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
df.show(n=5, truncate=False)

+------------+--------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------+------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------+-----------------

In [6]:
# Print the column names and types of the loaded frame to confirm that the
# explicit schema was applied.
df.printSchema()

root
 |-- budget: decimal(18,2) (nullable = true)
 |-- genres: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- keywords: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: float (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- production_countries: string (nullable = true)
 |-- release_date: date (nullable = true)
 |-- revenue: decimal(18,2) (nullable = true)
 |-- runtime: float (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- title: string (nullable = true)
 |-- vote_average: float (nullable = true)
 |-- vote_count: float (nullable = true)



In [7]:
# Schema used with from_json on the JSON-encoded columns: an array of objects,
# each carrying an integer "id" and a string "name".
j_schema = ArrayType(
    StructType()
    .add("id", IntegerType())
    .add("name", StringType())
)

In [8]:
# JSON-encoded columns that are reduced to a plain array of names.
# "genres" is not listed here; it is parsed separately for the genre aggregation.
j_columns = [
    "keywords",
    "production_companies",
    "production_countries",
    "spoken_languages",
]

In [9]:
def parse_and_extract_name(df, columns):
    """Replace JSON-encoded columns with arrays of their "name" values.

    Each column listed in ``columns`` is parsed with ``from_json`` using the
    module-level ``j_schema`` (an array of ``{id, name}`` structs) and is then
    overwritten, in place, by the array of ``name`` fields.

    Args:
        df: DataFrame containing the columns to convert.
        columns: Iterable of column names holding JSON text.

    Returns:
        A new DataFrame with the same columns in the same order, where each
        listed column is now an array of strings. A value that ``from_json``
        cannot parse becomes null.
    """
    for json_col in columns:
        # Selecting a struct field on an array-of-structs column yields an
        # array of that field, so no scratch column is needed. The earlier
        # version used a temporary "parsed_data" column, which would overwrite
        # and then drop an input column that happened to have that name.
        parsed = from_json(col(json_col), j_schema)
        df = df.withColumn(json_col, parsed.getField("name"))
    return df

In [10]:
def write_to_hdfs(df,path):
    """Write ``df`` as CSV with a header row to ``path``.

    Anything already present at ``path`` is replaced (write mode "overwrite").

    Args:
        df: DataFrame to write.
        path: Destination path passed to the CSV writer.
    """
    writer = df.write.mode("overwrite")
    writer = writer.option("header", "true")
    writer.csv(path)

In [14]:
if __name__ == "__main__":
    # Turn the JSON-encoded list columns into plain arrays of names.
    parsed_df = parse_and_extract_name(df, j_columns)
    # parsed_df feeds both aggregations below, so cache it across the two writes.
    parsed_df.cache()
    try:
        # Movies per genre: parse the "genres" JSON, emit one row per
        # (movie, genre) pair, then count rows per genre id/name.
        # explode() drops movies whose genre list is null or empty.
        Genres_Agggregations_df = (
            parsed_df
            .withColumn("genre", explode(from_json(col("genres"), j_schema)))
            .withColumn("genre_id", col("genre.id"))
            .withColumn("genre_name", col("genre.name"))
            .groupby("genre_id", "genre_name")
            .agg(count("*").alias("num_movies"))
        )
        write_to_hdfs(Genres_Agggregations_df, Genres_Agggregations)

        # Most popular movie per original language.
        # max("popularity") and first("title") are independent aggregates, so
        # pairing them returns the title of an arbitrary row in the group.
        # Taking the max of a (popularity, title) struct keeps the title tied
        # to the row that has the highest popularity. Structs compare field by
        # field, so ties on popularity resolve to the greatest title.
        most_popular_per_language_df = (
            parsed_df
            .groupby("original_language")
            .agg(expr("max(struct(popularity, title))").alias("top_movie"))
            .select(
                "original_language",
                col("top_movie.popularity").alias("popularity"),
                col("top_movie.title").alias("most_popular_movie"),
            )
        )
        write_to_hdfs(most_popular_per_language_df, popular_film)
    finally:
        # Release the cached data even if one of the writes fails.
        parsed_df.unpersist()

         
    

24/05/02 06:07:27 WARN CacheManager: Asked to cache already cached data.
