In [1]:
# After we examine this file, we will use these functions in transformation.py file.

In [2]:
from pyspark.sql import SparkSession, functions as F
import findspark

In [3]:
findspark.init("/opt/manual/spark")

In [4]:
from pyspark.sql.types import *

In [5]:
# Create spark session.
spark = (SparkSession.builder
         .appName("prestudy")
         .config("spark.hadoop.fs.s3a.access.key", "root")
         .config("spark.hadoop.fs.s3a.secret.key", "root12345")
         .config("spark.hadoop.fs.s3a.path.style.access", "true")
         .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
         .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000")
         .getOrCreate())

2022-11-27 17:07:25,568 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


### CREDITS - RAW

In [6]:
# Read credits raw data.
df_credits = spark.read.format("parquet").load("s3a://tmdb-bronze/credits/credits_part_20221124-130321.parquet")

                                                                                

In [7]:
df_credits.show()

[Stage 1:>                                                          (0 + 1) / 1]

+--------+--------------------+--------------------+--------------------+--------------------+
|movie_id|               title|                cast|                crew|          event_time|
+--------+--------------------+--------------------+--------------------+--------------------+
|   19995|              Avatar|[{"cast_id": 242,...|[{"credit_id": "5...|2022-11-24 16:03:...|
|     285|Pirates of the Ca...|[{"cast_id": 4, "...|[{"credit_id": "5...|2022-11-24 16:03:...|
|  206647|             Spectre|[{"cast_id": 1, "...|[{"credit_id": "5...|2022-11-24 16:03:...|
|   49026|The Dark Knight R...|[{"cast_id": 2, "...|[{"credit_id": "5...|2022-11-24 16:03:...|
|   49529|         John Carter|[{"cast_id": 5, "...|[{"credit_id": "5...|2022-11-24 16:03:...|
|     559|        Spider-Man 3|[{"cast_id": 30, ...|[{"credit_id": "5...|2022-11-24 16:03:...|
|   38757|             Tangled|[{"cast_id": 34, ...|[{"credit_id": "5...|2022-11-24 16:03:...|
|   99861|Avengers: Age of ...|[{"cast_id": 76, ..

                                                                                

In [8]:
df_credits.printSchema()

root
 |-- movie_id: long (nullable = true)
 |-- title: string (nullable = true)
 |-- cast: string (nullable = true)
 |-- crew: string (nullable = true)
 |-- event_time: timestamp (nullable = true)



### MOVIES - RAW

In [9]:
# Read movies raw data.
df_movies = spark.read.format("parquet").load("s3a://tmdb-bronze/movies/movies_part_20221124-130454.parquet")

In [10]:
df_movies.show(5)

+---------+--------------------+--------------------+------+--------------------+-----------------+--------------------+--------------------+----------+--------------------+--------------------+------------+----------+-------+--------------------+--------+--------------------+--------------------+------------+----------+--------------------+
|   budget|              genres|            homepage|    id|            keywords|original_language|      original_title|            overview|popularity|production_companies|production_countries|release_date|   revenue|runtime|    spoken_languages|  status|             tagline|               title|vote_average|vote_count|          event_time|
+---------+--------------------+--------------------+------+--------------------+-----------------+--------------------+--------------------+----------+--------------------+--------------------+------------+----------+-------+--------------------+--------+--------------------+--------------------+------------+-

In [11]:
df_movies.printSchema()

root
 |-- budget: long (nullable = true)
 |-- genres: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- id: long (nullable = true)
 |-- keywords: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: double (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- production_countries: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: long (nullable = true)
 |-- runtime: double (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- title: string (nullable = true)
 |-- vote_average: double (nullable = true)
 |-- vote_count: long (nullable = true)
 |-- event_time: timestamp (nullable = true)



#### CAST & CREW

In [12]:
# Create appropriate schema for cast column to apply from_json function.
schema_cast = (
    ArrayType(
        StructType([
            StructField("cast_id", IntegerType()),
            StructField("character", StringType()),
            StructField("credit_id", StringType()),
            StructField("gender", IntegerType()),
            StructField("id", IntegerType()),
            StructField("name", StringType()),
            StructField("order", IntegerType())
                  ])
            )
         )

In [13]:
# Create appropriate schema for crew column to apply from_json function.
schema_crew = (ArrayType(
        StructType([
            StructField("credit_id", StringType()),
            StructField("department", StringType()),
            StructField("gender", IntegerType()),
            StructField("id", IntegerType()),
            StructField("job", StringType()),
            StructField("name", StringType())
                  ])
            )
         )

In [14]:
# Create new credits datafame using from_json function to organize nested data.
df_credits_nested= df_credits.withColumn("cast", F.from_json(df_credits.cast, schema_cast)).withColumn("crew", F.from_json(df_credits.crew, schema_crew))

In [15]:
df_credits_nested.printSchema()

root
 |-- movie_id: long (nullable = true)
 |-- title: string (nullable = true)
 |-- cast: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- cast_id: integer (nullable = true)
 |    |    |-- character: string (nullable = true)
 |    |    |-- credit_id: string (nullable = true)
 |    |    |-- gender: integer (nullable = true)
 |    |    |-- id: integer (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- order: integer (nullable = true)
 |-- crew: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- credit_id: string (nullable = true)
 |    |    |-- department: string (nullable = true)
 |    |    |-- gender: integer (nullable = true)
 |    |    |-- id: integer (nullable = true)
 |    |    |-- job: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- event_time: timestamp (nullable = true)



In [16]:
# Select necessary column and apply explode_outer function to create new raws for each element in the array. If the array (or map) is null or empty, explode_outer returns null. 
cast_df = df_credits_nested.select("movie_id", "title", F.explode_outer("cast")).select("movie_id","title", "col.*").drop("order")

In [17]:
cast_df.show()

+--------+------+-------+--------------------+--------------------+------+-------+-------------------+
|movie_id| title|cast_id|           character|           credit_id|gender|     id|               name|
+--------+------+-------+--------------------+--------------------+------+-------+-------------------+
|   19995|Avatar|    242|          Jake Sully|5602a8a7c3a368553...|     2|  65731|    Sam Worthington|
|   19995|Avatar|      3|             Neytiri|52fe48009251416c7...|     1|   8691|        Zoe Saldana|
|   19995|Avatar|     25| Dr. Grace Augustine|52fe48009251416c7...|     1|  10205|   Sigourney Weaver|
|   19995|Avatar|      4|       Col. Quaritch|52fe48009251416c7...|     2|  32747|       Stephen Lang|
|   19995|Avatar|      5|        Trudy Chacon|52fe48009251416c7...|     1|  17647| Michelle Rodriguez|
|   19995|Avatar|      8|           Selfridge|52fe48009251416c7...|     2|   1771|    Giovanni Ribisi|
|   19995|Avatar|      7|       Norm Spellman|52fe48009251416c7...|     2

In [18]:
# Select necessary column and apply explode_outer function to create new raws for each element in the array. If the array (or map) is null or empty, explode_outer returns null. 
crew_df = df_credits_nested.select("movie_id", "title", F.explode_outer("crew")).select("movie_id", "title", "col.*")

In [19]:
crew_df.show(5)

+--------+------+--------------------+----------+------+----+--------------------+-----------------+
|movie_id| title|           credit_id|department|gender|  id|                 job|             name|
+--------+------+--------------------+----------+------+----+--------------------+-----------------+
|   19995|Avatar|52fe48009251416c7...|   Editing|     0|1721|              Editor|Stephen E. Rivkin|
|   19995|Avatar|539c47ecc3a36810e...|       Art|     2| 496|   Production Design|      Rick Carter|
|   19995|Avatar|54491c89c3a3680fb...|     Sound|     0| 900|      Sound Designer|Christopher Boyes|
|   19995|Avatar|54491cb70e0a26748...|     Sound|     0| 900|Supervising Sound...|Christopher Boyes|
|   19995|Avatar|539c4a4cc3a36810c...|Production|     1|1262|             Casting|        Mali Finn|
+--------+------+--------------------+----------+------+----+--------------------+-----------------+
only showing top 5 rows



In [20]:
# Fill null values as mentioned in expected_output.ipynb file.

In [21]:
cast_df = cast_df.na.fill("0000000000",["credit_id"])

In [22]:
crew_df = crew_df.na.fill("0000000000",["credit_id"])

#### MOVIES

In [23]:
# Select necessary columns for expected output.
df_movies_selected = df_movies.select("id","title", "budget", "homepage", "original_language", "original_title", "overview", "popularity", "release_date", "revenue", "runtime", "status", \
                 "tagline", "vote_average", "vote_count").withColumnRenamed("id", "movie_id")

In [24]:
df_movies_selected.printSchema()

root
 |-- movie_id: long (nullable = true)
 |-- title: string (nullable = true)
 |-- budget: long (nullable = true)
 |-- homepage: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: double (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: long (nullable = true)
 |-- runtime: double (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- vote_average: double (nullable = true)
 |-- vote_count: long (nullable = true)



In [25]:
# Change the data types using cast() function.
movies_df = df_movies_selected.withColumn("movie_id", F.col("movie_id").cast(StringType())) \
.withColumn("budget", F.col("budget").cast(DoubleType())) \
.withColumn("popularity", F.col("budget").cast(FloatType())) \
.withColumn("release_date", F.to_date("release_date", "yyyy-MM-dd")) \
.withColumn("revenue", F.col("budget").cast(DoubleType())) \
.withColumn("runtime", F.col("budget").cast(IntegerType())) \
.withColumn("vote_average", F.col("vote_average").cast(FloatType())) \
.withColumn("vote_count", F.col("vote_count").cast(IntegerType()))

In [26]:
movies_df.show(5)

+--------+--------------------+------+--------------------+-----------------+--------------------+--------------------+----------+------------+-------+---------+--------+--------------------+------------+----------+
|movie_id|               title|budget|            homepage|original_language|      original_title|            overview|popularity|release_date|revenue|  runtime|  status|             tagline|vote_average|vote_count|
+--------+--------------------+------+--------------------+-----------------+--------------------+--------------------+----------+------------+-------+---------+--------+--------------------+------------+----------+
|   19995|              Avatar|2.37E8|http://www.avatar...|               en|              Avatar|In the 22nd centu...|    2.37E8|  2009-12-10| 2.37E8|237000000|Released|Enter the World o...|         7.2|     11800|
|     285|Pirates of the Ca...| 3.0E8|http://disney.go....|               en|Pirates of the Ca...|Captain Barbossa,...|     3.0E8|  2007

In [27]:
movies_df.printSchema()

root
 |-- movie_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- budget: double (nullable = true)
 |-- homepage: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: float (nullable = true)
 |-- release_date: date (nullable = true)
 |-- revenue: double (nullable = true)
 |-- runtime: integer (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- vote_average: float (nullable = true)
 |-- vote_count: integer (nullable = true)



#### GENRES, KEYWORDS, PRODUCTION COMPANY

In [28]:
# This part is similar to previous steps.
# Before starting you can check the "DATATYPE CONTROL" section.

In [29]:
schema = (
    ArrayType(
        StructType([
            StructField("id", IntegerType()),
            StructField("name", StringType()),
                  ])
            )
         )

In [30]:
for col in ["genres", "keywords", "production_companies"]:
    print(col)
    gkp_df_raw =df_movies.select("id",col).withColumnRenamed("id", "movie_id")
    gkp_df_raw = gkp_df_raw.withColumn(col, F.from_json(col, schema))
    gkp_df = gkp_df_raw.select("movie_id", F.explode_outer(col)).select("movie_id","col.id","col.name").withColumn("movie_id", F.col("movie_id").cast(StringType()))
    gkp_df = gkp_df.na.fill(value=-9999,subset=["id"])
    if col == "genres":
        genres = gkp_df
    elif col == "keywords":
        keywords = gkp_df
    else :
        prod_comp = gkp_df

genres
keywords
production_companies


In [31]:
genres.show(5)

+--------+---+---------------+
|movie_id| id|           name|
+--------+---+---------------+
|   19995| 28|         Action|
|   19995| 12|      Adventure|
|   19995| 14|        Fantasy|
|   19995|878|Science Fiction|
|     285| 12|      Adventure|
+--------+---+---------------+
only showing top 5 rows



In [32]:
keywords.show(5)

+--------+----+-------------+
|movie_id|  id|         name|
+--------+----+-------------+
|   19995|1463|culture clash|
|   19995|2964|       future|
|   19995|3386|    space war|
|   19995|3388| space colony|
|   19995|3679|      society|
+--------+----+-------------+
only showing top 5 rows



In [33]:
prod_comp.show(5)

+--------+---+--------------------+
|movie_id| id|                name|
+--------+---+--------------------+
|   19995|289|Ingenious Film Pa...|
|   19995|306|Twentieth Century...|
|   19995|444|  Dune Entertainment|
|   19995|574|Lightstorm Entert...|
|     285|  2|Walt Disney Pictures|
+--------+---+--------------------+
only showing top 5 rows



#### PRODUCTION COUNTRIES, SPOKEN LANGUAGES

In [34]:
for col in ["production_countries", "spoken_languages"]:
    ps_df_raw = df_movies.select("id",col).withColumnRenamed("id", "movie_id") 
    if col == "production_countries" :
        sub_col = "iso_3166_1"
    else :
        sub_col = "iso_639_1"
    schema = (
    ArrayType(
        StructType([
            StructField(sub_col, StringType()),
            StructField("name", StringType()),
                  ])
            )
         )
    ps_df_raw = ps_df_raw.withColumn(col, F.from_json(col, schema))
    ps_df = ps_df_raw.select("movie_id", F.explode_outer(col)).select("movie_id",f"col.{sub_col}","col.name").withColumn("movie_id", F.col("movie_id").cast(StringType()))
    ps_df = ps_df.na.fill("XX",[sub_col])
    if col == "production_countries":
        prod_coun = ps_df
    else :
        spo_lang = ps_df

In [35]:
prod_coun.show(5)

+--------+----------+--------------------+
|movie_id|iso_3166_1|                name|
+--------+----------+--------------------+
|   19995|        US|United States of ...|
|   19995|        GB|      United Kingdom|
|     285|        US|United States of ...|
|  206647|        GB|      United Kingdom|
|  206647|        US|United States of ...|
+--------+----------+--------------------+
only showing top 5 rows



In [36]:
spo_lang.show(5)

+--------+---------+--------+
|movie_id|iso_639_1|    name|
+--------+---------+--------+
|   19995|       en| English|
|   19995|       es| Español|
|     285|       en| English|
|  206647|       fr|Français|
|  206647|       en| English|
+--------+---------+--------+
only showing top 5 rows



#### DATATYPE CONTROL

In [37]:
genres_df_raw = df_movies.select("id","genres").withColumnRenamed("id", "movie_id")

In [38]:
genres_df_raw.collect()[0]

Row(movie_id=19995, genres='[{"id": 28, "name": "Action"}, {"id": 12, "name": "Adventure"}, {"id": 14, "name": "Fantasy"}, {"id": 878, "name": "Science Fiction"}]')

In [39]:
keywords_df_raw = df_movies.select("id","keywords").withColumnRenamed("id", "movie_id")

In [40]:
keywords_df_raw.collect()[0]

Row(movie_id=19995, keywords='[{"id": 1463, "name": "culture clash"}, {"id": 2964, "name": "future"}, {"id": 3386, "name": "space war"}, {"id": 3388, "name": "space colony"}, {"id": 3679, "name": "society"}, {"id": 3801, "name": "space travel"}, {"id": 9685, "name": "futuristic"}, {"id": 9840, "name": "romance"}, {"id": 9882, "name": "space"}, {"id": 9951, "name": "alien"}, {"id": 10148, "name": "tribe"}, {"id": 10158, "name": "alien planet"}, {"id": 10987, "name": "cgi"}, {"id": 11399, "name": "marine"}, {"id": 13065, "name": "soldier"}, {"id": 14643, "name": "battle"}, {"id": 14720, "name": "love affair"}, {"id": 165431, "name": "anti war"}, {"id": 193554, "name": "power relations"}, {"id": 206690, "name": "mind and soul"}, {"id": 209714, "name": "3d"}]')

In [41]:
pro_df_raw = df_movies.select("id","production_companies").withColumnRenamed("id", "movie_id")

In [42]:
pro_df_raw.collect()[0]

Row(movie_id=19995, production_companies='[{"name": "Ingenious Film Partners", "id": 289}, {"name": "Twentieth Century Fox Film Corporation", "id": 306}, {"name": "Dune Entertainment", "id": 444}, {"name": "Lightstorm Entertainment", "id": 574}]')

In [43]:
pro_count_df_raw = df_movies.select("id","production_countries").withColumnRenamed("id", "movie_id")

In [44]:
pro_count_df_raw .collect()[0]

Row(movie_id=19995, production_countries='[{"iso_3166_1": "US", "name": "United States of America"}, {"iso_3166_1": "GB", "name": "United Kingdom"}]')

In [45]:
spo_lan_df_raw = df_movies.select("id","spoken_languages").withColumnRenamed("id", "movie_id")

In [46]:
spo_lan_df_raw .collect()[0]

Row(movie_id=19995, spoken_languages='[{"iso_639_1": "en", "name": "English"}, {"iso_639_1": "es", "name": "Espa\\u00f1ol"}]')

#### STOP

In [47]:
spark.stop()