# Spark Movies Test Notebook

In [1]:
from pyspark.sql import SparkSession

from pyspark.sql.functions import udf, from_json, explode_outer
from pyspark.sql.functions import col, when, length, concat_ws
from pyspark.sql.functions import to_timestamp, month, year

from pyspark.sql.types import StructField, StructType
from pyspark.sql.types import ArrayType, StringType, IntegerType, BooleanType, FloatType, DateType

### Create Spark Session

In [2]:
spark = SparkSession.builder.appName("movie_analysis").getOrCreate()
sc = spark.sparkContext

### Read Data

In [3]:
df = spark.read.csv("../data/raw/movies.csv", sep=",", header=True, inferSchema=True)
df.count()

45572

In [4]:
df.printSchema()

root
 |-- adult: string (nullable = true)
 |-- belongs_to_collection: string (nullable = true)
 |-- budget: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- id: string (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- poster_path: string (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- production_countries: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: string (nullable = true)
 |-- runtime: string (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- title: string (nullable = true)
 |-- video: string (nullable = true)
 |-- vote_average: string (nullable = true)
 |-- vote_count: string (nu

In [5]:
df = df.dropDuplicates()
df.count()

45555

## Index Analysis

### id

#### Rename - id -> movie_id

In [6]:
df = df.withColumnRenamed("id", "movie_id")
df.select("movie_id").groupby("movie_id").count().orderBy(col("count").desc()).show(5)

+--------------------+-----+
|            movie_id|count|
+--------------------+-----+
|                null|   31|
|[{'id': 35, 'name...|   15|
|                  []|    8|
|[{'id': 27, 'name...|    5|
|[{'id': 18, 'name...|    4|
+--------------------+-----+
only showing top 5 rows



#### Cast - Integer & Check NULL Values

In [7]:
df = df.withColumn("movie_id", col("movie_id").cast(IntegerType()))
df.select("movie_id").groupby("movie_id").count().orderBy(col("count").desc()).show(5)

+--------+-----+
|movie_id|count|
+--------+-----+
|    null|  209|
|       0|    6|
|  132641|    2|
|  109962|    2|
|   15028|    2|
+--------+-----+
only showing top 5 rows



#### Drop NULL & Duplicated Values

In [8]:
df = df.na.drop(subset=["movie_id"])
df = df.dropDuplicates(subset=["movie_id"])
df.select("movie_id").groupby("movie_id").count().orderBy(col("count").desc()).show(5)

+--------+-----+
|movie_id|count|
+--------+-----+
|    4519|    1|
|  103902|    1|
|   76493|    1|
|  125052|    1|
|    9465|    1|
+--------+-----+
only showing top 5 rows



In [9]:
df.count()

45328

### imdb_id

In [10]:
df.select("imdb_id").groupby("imdb_id").count().orderBy(col("count").desc()).show()

+---------+-----+
|  imdb_id|count|
+---------+-----+
|     null|   17|
|tt1445202|    1|
|tt0127288|    1|
|tt0111180|    1|
|tt0031448|    1|
|tt0119123|    1|
|tt0274932|    1|
|tt2053378|    1|
|tt3230082|    1|
|tt0331218|    1|
|tt1394259|    1|
|tt1568343|    1|
|tt0110116|    1|
|tt0120815|    1|
|tt2290065|    1|
|tt0016627|    1|
|tt0081573|    1|
|tt0069593|    1|
|tt0875113|    1|
|tt0378215|    1|
+---------+-----+
only showing top 20 rows



In [11]:
df.select('imdb_id').where(col('imdb_id').like("tt%")).count()

45309

#### Filter **like 'tt%'**

In [12]:
df = df.select('*').where(col('imdb_id').like("tt%"))
df = df.withColumn("imdb_id", col("imdb_id").cast(StringType()))
df.count()

45309

## Feature Analysis

#### adult - Cast Boolean

In [13]:
df = df.withColumn("adult", col("adult").cast(BooleanType()))

#### budget - Cast Integer

In [14]:
df = df.withColumn("budget", col("budget").cast(IntegerType()))

#### genres

##### Extract JSON

In [15]:
schema = ArrayType(
            StructType([
                StructField("id", StringType()), 
                StructField("name", StringType())
            ]))

df = df.withColumn("genres_array", from_json(df.genres, schema))

##### Create **genres** dataframe & **genre_ids** column

In [16]:
genres = df.select(["movie_id", "genres_array"])
df = df.withColumn("genre_ids", df.genres_array.id)
df = df.withColumn("genre_ids", concat_ws(",", col("genre_ids")))

### genres df

In [17]:
genres = genres.withColumn("genres_array", explode_outer("genres_array"))
genres = genres.select(["movie_id", "genres_array.*"])
genres = genres.withColumnRenamed("id", "genre_id")
genres.printSchema()

root
 |-- movie_id: integer (nullable = true)
 |-- genre_id: string (nullable = true)
 |-- name: string (nullable = true)



#### homepage - Filter **startswith("http://")) | (df.homepage.startswith("https://")**

In [18]:
fltr = (df.homepage.startswith("http://")) | (df.homepage.startswith("https://"))
df = df.withColumn("homepage", when(fltr, df.homepage).otherwise(None))

#### original_language - Filter by **string length == 2**

In [19]:
df.select("original_language").withColumn("str_len", length('original_language')) \
  .groupby("str_len").count().orderBy(col("count").desc()).show(5)

+-------+-----+
|str_len|count|
+-------+-----+
|      2|45298|
|   null|   11|
+-------+-----+



In [20]:
fltr = (length(df.original_language) == 2)
df = df.withColumn("original_language", when(fltr, df.original_language).otherwise(None))

#### popularity - Cast Float

In [21]:
df = df.withColumn("popularity", col("popularity").cast(FloatType()))

#### production_companies

##### Extract JSON

In [22]:
schema = ArrayType(
            StructType([
                StructField("id", StringType()), 
                StructField("name", StringType())
            ]))

df = df.withColumn("production_companies_array", from_json(df.production_companies, schema))

##### Create **production_companies** dataframe & **production_companies_ids** column

In [23]:
production_companies = df.select(["movie_id", "production_companies_array"])
df = df.withColumn("production_companies_ids", df.production_companies_array.id)
df = df.withColumn("production_companies_ids", concat_ws(",", col("production_companies_ids")))

### production_companies df

In [24]:
production_companies = production_companies.withColumn("production_companies_array", \
                       explode_outer("production_companies_array"))

production_companies = production_companies.select(["movie_id", "production_companies_array.*"])
production_companies = production_companies.withColumnRenamed("id", "production_companies_id")
production_companies.printSchema()

root
 |-- movie_id: integer (nullable = true)
 |-- production_companies_id: string (nullable = true)
 |-- name: string (nullable = true)



#### production_countries

##### Extract JSON

In [25]:
schema = ArrayType(
            StructType([
                StructField("iso_3166_1", StringType()), 
                StructField("name", StringType())
            ]))

df = df.withColumn("production_countries_array", from_json(df.production_countries, schema))

##### Create **production_countries** dataframe & **production_countries_abbvs** column

In [26]:
production_countries = df.select(["movie_id", "production_countries_array"])
df = df.withColumn("production_countries_abbvs", df.production_countries_array.iso_3166_1)
df = df.withColumn("production_countries_abbvs", concat_ws(",", col("production_countries_abbvs")))

### production_countries df

In [27]:
production_countries = production_countries.withColumn("production_countries_array", \
                       explode_outer("production_countries_array"))

production_countries = production_countries.select(["movie_id", "production_countries_array.*"])
production_countries = production_countries.withColumnRenamed("iso_3166_1", "production_countries_abbv")
production_countries.printSchema()

root
 |-- movie_id: integer (nullable = true)
 |-- production_countries_abbv: string (nullable = true)
 |-- name: string (nullable = true)



#### release_date - Timestamp & Cast Date

In [28]:
df = df.withColumn("release_date", to_timestamp(df.release_date, 'yyyy-MM-dd'))
df = df.withColumn("release_date", col("release_date").cast(DateType()))

#### revenue - Cast Integer

In [29]:
df = df.withColumn("revenue", col("runtime").cast(IntegerType()))

#### runtime - Cast Integer

In [30]:
df = df.withColumn("runtime", col("runtime").cast(IntegerType()))

#### spoken_languages

##### Extract JSON

In [31]:
schema = ArrayType(
            StructType([
                StructField("iso_639_1", StringType()), 
                StructField("name", StringType())
            ]))

df = df.withColumn("spoken_languages_array", from_json(df.spoken_languages, schema))

##### Create **spoken_languages** dataframe & **spoken_languages_abbvs** column

In [32]:
spoken_languages = df.select(["movie_id", "spoken_languages_array"])
df = df.withColumn("spoken_languages_abbvs", df.spoken_languages_array.iso_639_1)
df = df.withColumn("spoken_languages_abbvs", concat_ws(",", col("spoken_languages_abbvs")))

### spoken_languages df

In [33]:
spoken_languages = spoken_languages.withColumn("spoken_languages_array", \
                   explode_outer("spoken_languages_array"))

spoken_languages = spoken_languages.select(["movie_id", "spoken_languages_array.*"])
spoken_languages = spoken_languages.withColumnRenamed("iso_639_1", "spoken_languages_abbv")
spoken_languages.printSchema()

root
 |-- movie_id: integer (nullable = true)
 |-- spoken_languages_abbv: string (nullable = true)
 |-- name: string (nullable = true)



#### status - Filter **== ("Released")) | == ("Rumored")**

In [34]:
df = df.withColumn("status", \
     when(((df.status == "Released") | (df.status == "Rumored")), df.status).otherwise(None))

#### video - Filter **== ("Released")) | == ("True")** & Cast Boolean

In [35]:
df = df.withColumn("video", \
              when(((df.video == "Released") | (df.video == "True")), True) \
              .otherwise(False))

df = df.withColumn("video", col("video").cast(BooleanType()))

#### vote_average - Cast Float

In [36]:
df = df.withColumn("vote_average", col("vote_average").cast(FloatType()))

#### vote_count - Cast Integer

In [37]:
df = df.withColumn("vote_count", col("vote_count").cast(IntegerType()))

#### release_month & release_year

In [38]:
df = df.withColumn("release_month", month("release_date"))
df = df.withColumn("release_year", year("release_date"))

## DataFrame Check

### Movies

In [40]:
df.limit(2).collect()

[Row(adult=False, belongs_to_collection=None, budget=5000000, genres="[{'id': 18, 'name': 'Drama'}, {'id': 10749, 'name': 'Romance'}]", homepage=None, movie_id=148, imdb_id='tt0430576', original_language='en', original_title='The Secret Life of Words', overview='A touching story of a deaf girl who is sent to an oil rig to take care of a man who has been blinded in a terrible accident. The girl has a special ability to communicate with the men on board and especially with her patient as they share intimate moments together that will change their lives forever.', popularity=12.775583267211914, poster_path='/rlJWRiW74PAIrozd2d6X7e61Rq9.jpg', production_companies="[{'name': 'Hotshot Films', 'id': 78}]", production_countries="[{'iso_3166_1': 'ES', 'name': 'Spain'}]", release_date=datetime.date(2005, 12, 15), revenue=112, runtime=112, spoken_languages="[{'iso_639_1': 'en', 'name': 'English'}, {'iso_639_1': 'fr', 'name': 'Français'}, {'iso_639_1': 'es', 'name': 'Español'}]", status='Released'

In [41]:
df.printSchema()

root
 |-- adult: boolean (nullable = true)
 |-- belongs_to_collection: string (nullable = true)
 |-- budget: integer (nullable = true)
 |-- genres: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- movie_id: integer (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: float (nullable = true)
 |-- poster_path: string (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- production_countries: string (nullable = true)
 |-- release_date: date (nullable = true)
 |-- revenue: integer (nullable = true)
 |-- runtime: integer (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- title: string (nullable = true)
 |-- video: boolean (nullable = false)
 |-- vote_average: float (nullable = true)
 |-- vote_count: i

In [42]:
drop_cols = ["genres_array", 
             "production_companies_array", 
             "production_countries_array", 
             "spoken_languages_array"]

df = df.drop(*drop_cols)

In [43]:
df

DataFrame[adult: boolean, belongs_to_collection: string, budget: int, genres: string, homepage: string, movie_id: int, imdb_id: string, original_language: string, original_title: string, overview: string, popularity: float, poster_path: string, production_companies: string, production_countries: string, release_date: date, revenue: int, runtime: int, spoken_languages: string, status: string, tagline: string, title: string, video: boolean, vote_average: float, vote_count: int, genre_ids: string, production_companies_ids: string, production_countries_abbvs: string, spoken_languages_abbvs: string, release_month: int, release_year: int]

In [45]:
df.write.partitionBy("release_year", "release_month").parquet('../data/processed/movies/')