# Fahmi Abdulaziz - Qoala ETL Pipeline Assingment
This is development notebook used for ETL development and prototyping. For better reading experience please use Google Colab.

## Preparing Dependencies

In [1]:
!pip install -q pyspark kaggle

[K     |████████████████████████████████| 281.4 MB 29 kB/s 
[K     |████████████████████████████████| 198 kB 36.6 MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


preparing kaggle credential

In [3]:
!mkdir -p ~/.kaggle
!cp /content/drive/MyDrive/kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

## Preparing Source File

### Download & Unzip tmdb Dataset
This section I download kaggle datasets by using kaggle library we already installed begore

In [4]:
!rm -rf ./movies ./series

In [5]:
%%bash --err null
kaggle datasets download edgartanaka1/tmdb-movies-and-series -p /content
unzip /content/tmdb-movies-and-series.zip

IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.

Current values:
NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
NotebookApp.rate_limit_window=3.0 (secs)



### Combining all the files into one JSON
To make it easier to work with, I combine all the files into single json file.

WARNING: these code takes long time to finish and there is big chance of breaking

In [6]:
from os import listdir
from os.path import isfile, join
import json

In [7]:
def get_files_in_dir(path: str):
    return [f for f in listdir(path) if isfile(join(path, f))]

In [8]:
def combine_all_file(dir: str, out: str):
    joined_file = open(out, "a+")
    file_paths = get_files_in_dir(dir)

    for file_path in file_paths:
        with open(join(dir, file_path)) as f:
            payload = f.read()
            joined_file.write(payload+'\n')

    joined_file.close()

In [9]:
for folder in ['series', 'movies']:
    dir = join("/content", folder, folder)
    out_file = join("/content", folder+"_joined.json")
    combine_all_file(dir, out_file)

## Environment Preparation
To handle high amount of data and compute-intensive wrangling, I decided to use Apache Spark for ETL process.

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame
import pyspark.sql.functions as F

In [4]:
spark = SparkSession.builder.appName("tmdb") \
    .config("spark.driver.maxResultSize", "10g") \
    .getOrCreate()

## Extract
Load the combined file into Spark dataframe

In [5]:
df_movies = spark.read.json("/content/drive/MyDrive/datasets/tmdb/movies_joined.json")
df_series = spark.read.json("/content/drive/MyDrive/datasets/tmdb/series_joined.json")

In [13]:
df_movies.printSchema()

root
 |-- adult: boolean (nullable = true)
 |-- backdrop_path: string (nullable = true)
 |-- belongs_to_collection: struct (nullable = true)
 |    |-- backdrop_path: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- poster_path: string (nullable = true)
 |-- budget: long (nullable = true)
 |-- genres: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- id: long (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: double (nullable = true)
 |-- poster_path: string (nullable = true)
 |-- production_companies: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |

In [14]:
df_series.printSchema()

root
 |-- backdrop_path: string (nullable = true)
 |-- created_by: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- credit_id: string (nullable = true)
 |    |    |-- gender: long (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- profile_path: string (nullable = true)
 |-- episode_run_time: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- first_air_date: string (nullable = true)
 |-- genres: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- id: long (nullable = true)
 |-- in_production: boolean (nullable = true)
 |-- languages: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- last_air_date: string (nullable = true)
 |-- last_episode_to_air: struct (nullable = true)
 |    |-

## Exploration

### Array struct column type consistency check
The repetitive value in `Array - Struct` column type could create inconsistency between data. In this part we will try to check if the data in `Array - Struct` field is consistent

In [6]:
def explode_array_column(dataframe: DataFrame, column: str, new_column: str):
    """
    description: flatten rows of array to rows of element
    """
    return dataframe.select(F.col(column)) \
        .where(F.size(F.col(column)) > 0 ) \
        .withColumn(new_column, F.explode(F.col(column))) \
        .select(F.col(new_column)) \
        


def check_array_struct_uniqueness(dataframe: DataFrame, column: str, id_key: str):
    """
    description: check if struct from different row is consistent.
        if there is inconsistencies, it will print the id(s) of inconsistent row
    """
    res_table = explode_array_column(dataframe, column, "temp") \
        .select(F.col(f"temp.*")) \
        .distinct() \
        .groupBy(id_key).count() \
        .where(F.col("count") > 1)
    
    if res_table.count() > 0:
        print(f"Data in column {column} is inconsistent")
        res_table.show(10)

In [7]:
check_array_struct_uniqueness(df_movies, "production_companies", "id")
check_array_struct_uniqueness(df_movies, "genres", "id")
check_array_struct_uniqueness(df_movies, "spoken_languages", "iso_639_1")
check_array_struct_uniqueness(df_movies, "production_countries", "iso_3166_1")
check_array_struct_uniqueness(df_movies, "genres", "id")
check_array_struct_uniqueness(df_series, "genres", "id")
check_array_struct_uniqueness(df_series, "networks", "id")
check_array_struct_uniqueness(df_series, "production_companies", "id")

Data in column production_companies is inconsistent
+----+-----+
|  id|count|
+----+-----+
| 215|    2|
|6689|    2|
+----+-----+



The result above shows that column of `production_companies` has inconsistencies in two ids. The details are shown in the result below.

In [8]:
df_movies.select(F.explode("production_companies")).select("col.*").where((F.col("id") == 215)).show()
df_movies.select(F.explode("production_companies")).select("col.*").where((F.col("id") == 6689)).show()

# if we take a look at the below result, it shows that both companies has incosistencies in it's data
# for this project, I will just leave warning in pipeline

+---+--------------------+--------------------+--------------+
| id|           logo_path|                name|origin_country|
+---+--------------------+--------------------+--------------+
|215|/tQyeqkCj24krhY2W...|Double Feature Films|            US|
|215|/tQyeqkCj24krhY2W...|Double Feature Films|            US|
|215|/tQyeqkCj24krhY2W...|Double Feature Films|            US|
|215|/tQyeqkCj24krhY2W...|Double Feature Films|            US|
|215|/tQyeqkCj24krhY2W...|Double Feature Films|            US|
|215|/tQyeqkCj24krhY2W...|Double Feature Films|            US|
|215|/tQyeqkCj24krhY2W...|Double Feature Films|            US|
|215|/tQyeqkCj24krhY2W...|Double Feature Films|            US|
|215|/tQyeqkCj24krhY2W...|Double Feature Films|            US|
|215|/tQyeqkCj24krhY2W...|Double Feature Films|            US|
|215|/tQyeqkCj24krhY2W...|Double Feature Films|              |
|215|/tQyeqkCj24krhY2W...|Double Feature Films|            US|
|215|/tQyeqkCj24krhY2W...|Double Feature Films|        

### Common field
both series and movies dataset has one same column, `genres` and `production_companies` column. This column has `ArrayType` type consists `Struct`. Yet we are not sure wether those two field has the same `id` and `name`. This part will try to check if `genres` & `production_companies` on both datasets are the same.

#### Genre Field
Comparing `genre` field between `series` and `movies`

In [9]:
def explode_and_spread_column(df, column: str):
    return df.select(F.explode(F.col(column))).select(F.col("col.*"))


def rename_columns(df, current_cols, new_cols):
    if len(current_cols) != len(new_cols):
        raise Exception()

    for i in range(len(current_cols)):
        df = df.withColumnRenamed(current_cols[i], new_cols[i])

    return df

In [15]:
# Comparing genres between two datasets

df_series_gen = rename_columns(
        explode_and_spread_column(df_series, "genres"),
        ["id", "name"], ["series_id", "series_genre"]
    ).distinct()

df_movies_gen = rename_columns(
        explode_and_spread_column(df_movies, "genres"),
        ["id", "name"], ["movies_id", "movies_genre"]
    ).distinct()

genre_comparison = df_series_gen \
    .join(df_movies_gen, df_series_gen.series_id == df_movies_gen.movies_id, "full")
genre_comparison.show(5)

# from the result below it seems that the id and genres between two table are the same

+---------+------------+---------+------------+
|series_id|series_genre|movies_id|movies_genre|
+---------+------------+---------+------------+
|       12|   Adventure|       12|   Adventure|
|       14|     Fantasy|       14|     Fantasy|
|       16|   Animation|       16|   Animation|
|       18|       Drama|       18|       Drama|
|       22|     Musical|     null|        null|
+---------+------------+---------+------------+
only showing top 5 rows



In [16]:
# decide to check futher by looking for inequality between series_genre and movies_genre
genre_comparison.where(F.col("movies_id").isNotNull() & F.col("series_id").isNotNull() & (F.col("movies_genre") != F.col("series_genre"))).show()

+---------+------------+---------+------------+
|series_id|series_genre|movies_id|movies_genre|
+---------+------------+---------+------------+
+---------+------------+---------+------------+



The result above shows that there is no differencese between `genre` in `series` and `movies` dataset. So we decide to make dimension table of `genre` to be used both in `series` and `movies`

In [17]:
dwh_genres = genre_comparison \
    .withColumn("id", F.when(F.col("series_id").isNull(), F.col("movies_id")).otherwise(F.col("series_id"))) \
    .withColumn("genre", F.when(F.col("series_id").isNull(), F.col("movies_genre")).otherwise(F.col("series_genre"))) \
    .select(["id", "genre"])

dwh_genres.show(5)

# the below table will be used for dimension table

+---+---------+
| id|    genre|
+---+---------+
| 12|Adventure|
| 14|  Fantasy|
| 16|Animation|
| 18|    Drama|
| 22|  Musical|
+---+---------+
only showing top 5 rows



#### production_companies Field
comparing `production_companies` between `series` and `movies`

In [18]:
# Comparing production_companies between two datasets

df_series_com = rename_columns(
        explode_and_spread_column(df_series, "production_companies"),
        ["id", "name", "origin_country"], ["series_id", "series_company", "series_origin_country"]
    ).distinct()

df_movies_com = rename_columns(
        explode_and_spread_column(df_movies, "production_companies"),
        ["id", "name", "origin_country"], ["movies_id", "movies_company", "movies_origin_country"]
    ).distinct()

companies_comparison = df_series_com \
    .join(df_movies_com, df_series_com.series_id == df_movies_com.movies_id, "full")
companies_comparison.show(5)

# from the result below it seems that the id and genres between two table are the same

+---------+--------------------+-------------------+---------------------+---------+--------------------+-------------------+---------------------+
|series_id|           logo_path|     series_company|series_origin_country|movies_id|           logo_path|     movies_company|movies_origin_country|
+---------+--------------------+-------------------+---------------------+---------+--------------------+-------------------+---------------------+
|        1|/o86DbpburjxrqAzE...|     Lucasfilm Ltd.|                   US|        1|/o86DbpburjxrqAzE...|     Lucasfilm Ltd.|                   US|
|        5|/71BqEFAF4V3qjjMP...|  Columbia Pictures|                   US|        5|/71BqEFAF4V3qjjMP...|  Columbia Pictures|                   US|
|     null|                null|               null|                 null|        6|/n53F7K9scQWFXYbr...| RKO Radio Pictures|                   US|
|        7|/vru2SssLX3FPhnKZ...|DreamWorks Pictures|                   US|        7|/vru2SssLX3FPhnKZ...|DreamWo

In [22]:
# decide to check futher by looking for inequality between series_genre and movies_genre
companies_comparison \
.where(F.col("movies_id").isNotNull() \
       & F.col("series_id").isNotNull() \
       & (F.col("movies_company") != F.col("series_company")) \
       & (F.col("movies_origin_country") != F.col("series_origin_country"))) \
.show()

# the below table will be used for dimension table

+---------+--------------+---------------------+---------+--------------+---------------------+
|series_id|series_company|series_origin_country|movies_id|movies_company|movies_origin_country|
+---------+--------------+---------------------+---------+--------------+---------------------+
+---------+--------------+---------------------+---------+--------------+---------------------+



The result above shows that there is no differencese between `production_companies` in `series` and `movies` dataset. So we decide to make dimension table of `companies` to be used both in `series` and `movies`

In [19]:
dwh_companies = companies_comparison \
    .withColumn("id", F.when(F.col("series_id").isNull(), F.col("movies_id")).otherwise(F.col("series_id"))) \
    .withColumn("company", F.when(F.col("series_id").isNull(), F.col("movies_company")).otherwise(F.col("series_company"))) \
    .withColumn("origin_country", F.when(F.col("series_id").isNull(), F.col("movies_origin_country")).otherwise(F.col("series_origin_country"))) \
    .select(["id", "company", "origin_country"]) \
    .where(F.col("origin_country").isNotNull())

dwh_companies.show(5)

+---+-------------------+--------------+
| id|            company|origin_country|
+---+-------------------+--------------+
|  1|     Lucasfilm Ltd.|            US|
|  5|  Columbia Pictures|            US|
|  6| RKO Radio Pictures|            US|
|  7|DreamWorks Pictures|            US|
|  9|            Gaumont|            FR|
+---+-------------------+--------------+
only showing top 5 rows



## Transform
Transform `series` and `movies` before loaded to data warehouse.
I drop `genre` and `production_companies` because we decided to normalize it.

In [24]:
initial_columns = df_movies.schema.names

dwh_movies = df_movies \
    .withColumn("genre", F.explode("genres")) \
    .select(*initial_columns, F.col("genre.id").alias("genre_id")) \
    .groupBy(*initial_columns).agg(F.collect_list("genre_id").alias("genre_ids")) \
    .withColumn("company", F.explode("production_companies")) \
    .select(*initial_columns, "genre_ids", F.col("company.id").alias("company_id")) \
    .groupBy(*initial_columns, "genre_ids").agg(F.collect_list("company_id").alias("company_ids")) \
    .drop("genres", "production_companies")
    
dwh_movies.show(5)

+-----+-------------+---------------------+------+--------+------+---------+-----------------+--------------------+--------------------+----------+--------------------+--------------------+------------+-------+-------+----------------+--------+-------+--------------------+-----+------------+----------+---------+-------------+
|adult|backdrop_path|belongs_to_collection|budget|homepage|    id|  imdb_id|original_language|      original_title|            overview|popularity|         poster_path|production_countries|release_date|revenue|runtime|spoken_languages|  status|tagline|               title|video|vote_average|vote_count|genre_ids|  company_ids|
+-----+-------------+---------------------+------+--------+------+---------+-----------------+--------------------+--------------------+----------+--------------------+--------------------+------------+-------+-------+----------------+--------+-------+--------------------+-----+------------+----------+---------+-------------+
|false|         

In [25]:
# drop genre and companies
initial_columns = df_series.schema.names

dwh_series = df_series \
    .withColumn("genre", F.explode("genres")) \
    .select(*initial_columns, F.col("genre.id").alias("genre_id")) \
    .groupBy(*initial_columns).agg(F.collect_list("genre_id").alias("genre_ids")) \
    .withColumn("company", F.explode("production_companies")) \
    .select(*initial_columns, "genre_ids", F.col("company.id").alias("company_id")) \
    .groupBy(*initial_columns, "genre_ids").agg(F.collect_list("company_id").alias("company_ids")) \
    .drop("genres", "production_companies")

dwh_series.show(5)

# the table below will be saved to dwh as factual table

+-------------+----------+----------------+--------------+--------------------+------+-------------+---------+-------------+-------------------+-----------------------------------+--------------------+-------------------+------------------+-----------------+--------------+-----------------+-----------------------------------+--------+----------+--------------------+--------------------+-------------+--------+------------+----------+--------------------+--------------+
|backdrop_path|created_by|episode_run_time|first_air_date|            homepage|    id|in_production|languages|last_air_date|last_episode_to_air|                               name|            networks|next_episode_to_air|number_of_episodes|number_of_seasons|origin_country|original_language|                      original_name|overview|popularity|         poster_path|             seasons|       status|    type|vote_average|vote_count|           genre_ids|   company_ids|
+-------------+----------+----------------+-----------

## Load
Load the data to data warehouse

Save all the tables into parquet format

In [26]:
dwh_series.write.parquet("series.parquet")
dwh_movies.write.parquet("movies.parquet")
dwh_genres.write.parquet("genres.parquet")
dwh_companies.write.parquet("companies.parquet")

Copy file from local machine to GCS

In [27]:
!gcloud auth login

Go to the following link in your browser:

    https://accounts.google.com/o/oauth2/auth?response_type=code&client_id=32555940559.apps.googleusercontent.com&redirect_uri=urn%3Aietf%3Awg%3Aoauth%3A2.0%3Aoob&scope=openid+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fuserinfo.email+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcloud-platform+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fappengine.admin+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcompute+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Faccounts.reauth&state=I9wUmk4UBaXzuMwWgUo4LeDLALmp1e&prompt=consent&access_type=offline&code_challenge=1iCecBWVgqwW2d1KI111-GygpEHysI2pZKF3CFzxY_o&code_challenge_method=S256

Enter verification code: 4/1AX4XfWgQMV-8TDUh8IV4dw8zgtK_sW2wcnIlju1G36fF7L6dXIAkjnLrjyw

You are now logged in as [afahmi13@gmail.com].
Your current project is [None].  You can change this setting by running:
  $ gcloud config set project PROJECT_ID


In [28]:
!gsutil cp -r ./series.parquet gs://de-porto/qoala/series.parquet

Copying file://./series.parquet/._SUCCESS.crc [Content-Type=application/octet-stream]...
Copying file://./series.parquet/_SUCCESS [Content-Type=application/octet-stream]...
Copying file://./series.parquet/.part-00001-6ed1c87f-0825-4639-815d-b35b92850225-c000.snappy.parquet.crc [Content-Type=application/octet-stream]...
Copying file://./series.parquet/part-00001-6ed1c87f-0825-4639-815d-b35b92850225-c000.snappy.parquet [Content-Type=application/octet-stream]...
\
==> NOTE: You are performing a sequence of gsutil operations that may
run significantly faster if you instead use gsutil -m cp ... Please
see the -m section under "gsutil help options" for further information
about when gsutil -m can be advantageous.

Copying file://./series.parquet/.part-00000-6ed1c87f-0825-4639-815d-b35b92850225-c000.snappy.parquet.crc [Content-Type=application/octet-stream]...
Copying file://./series.parquet/part-00000-6ed1c87f-0825-4639-815d-b35b92850225-c000.snappy.parquet [Content-Type=application/octet-st

Then we load the data from Google Cloud Storage to BigQuery data warehouse.

In [29]:
!pip install -q google-cloud-bigquery google-cloud-storage

In [30]:
def get_files_in_gcs(bucket: str, prefix: str, extension: str):
    from google.cloud import storage
    gcs_client = storage.Client.from_service_account_json("./de-porto-key.json")

    res = []
    for blob in gcs_client.list_blobs(bucket, prefix=prefix, ):
        suffix = blob.name.split(".")[-1]
        if suffix == extension:
            res.append(f"gs://{bucket}/{blob.name}")
        
    return res

In [32]:
files = get_files_in_gcs("de-porto", "qoala/movies.parquet", "parquet")
print(files)

['gs://de-porto/qoala/movies.parquet/part-00000-903e7f64-15aa-4cab-8145-1e8f470adf52-c000.snappy.parquet', 'gs://de-porto/qoala/movies.parquet/part-00001-903e7f64-15aa-4cab-8145-1e8f470adf52-c000.snappy.parquet', 'gs://de-porto/qoala/movies.parquet/part-00002-903e7f64-15aa-4cab-8145-1e8f470adf52-c000.snappy.parquet', 'gs://de-porto/qoala/movies.parquet/part-00003-903e7f64-15aa-4cab-8145-1e8f470adf52-c000.snappy.parquet', 'gs://de-porto/qoala/movies.parquet/part-00004-903e7f64-15aa-4cab-8145-1e8f470adf52-c000.snappy.parquet']


In [33]:
# load to bigquery
def load_parquet_to_bq(project:str, dataset:str, table:str, *uris):
    from google.cloud import bigquery
    client = bigquery.Client.from_service_account_json("./de-porto-key.json")

    config = bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.PARQUET)
    table_id = f"{project}.{dataset}.{table}"

    for uri in uris:
        job = client.load_table_from_uri(uri, table_id, job_config=config)

        job.result()

In [34]:
load_parquet_to_bq("de-porto", "de_porto", "movie_test", *files)