# ETL flow from the Silver layer to the Gold layer

## Introduction

In the Silver layer (lakehouse), the data is consolidated and may be updated continuously. Because this layer consists of a single table, the pipeline splits the data into fact and dimension tables for ingestion into the Data Warehouse. The tables in the `warehouse` schema are documented in [GOLD_MER_DER_DLD.pdf](https://github.com/Eric-chagas/film-data-analytics/blob/main/DataLayer/gold/Gold_MER_DER_DLD.pdf). This pipeline performs the following operations:

1. **Ingestion of the lakehouse data**: the data is read directly from the database, from the table `lakehouse.film_lakehouse`, into a PySpark DataFrame.
2. **Creation of the entities**: the Silver data is split into fact and dimension entities, so that it can be inserted into the warehouse and queried for business analytics.
3. **Insertion of the data into the data warehouse schema**.

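The split described above can be illustrated without Spark. The sketch below uses a few hypothetical Silver rows (plain dicts with column names taken from `lakehouse.film_lakehouse`) and derives one fact row per movie plus deduplicated `(movie, value)` pairs for each dimension, discarding null or empty values. It is a simplified model of the idea, not the pipeline's implementation; note that it deduplicates the fact by `id`, which is stricter than the `distinct()` over all movie columns used later in the notebook.

```python
from typing import Dict, List, Optional, Set, Tuple

# Hypothetical Silver rows: a movie with several genres appears in several rows
silver_rows: List[Dict[str, Optional[object]]] = [
    {"id": 1, "title": "Movie A", "genre": "Drama", "production_country": "Brazil"},
    {"id": 1, "title": "Movie A", "genre": "Crime", "production_country": "Brazil"},
    {"id": 2, "title": "Movie B", "genre": "Comedy", "production_country": ""},
]

FACT_COLUMNS = ["id", "title"]                  # movie-level attributes
DIM_COLUMNS = ["genre", "production_country"]   # multi-valued attributes


def split_silver(rows):
    """Return (fact_rows, dims).

    fact_rows keeps the first row seen for each movie id; dims maps each
    dimension column to a set of (movie_id, value) pairs without nulls or
    empty strings.
    """
    fact: Dict[object, Dict[str, object]] = {}
    dims: Dict[str, Set[Tuple[object, object]]] = {c: set() for c in DIM_COLUMNS}
    for row in rows:
        # Fact: one entry per natural key (first occurrence wins)
        fact.setdefault(row["id"], {c: row[c] for c in FACT_COLUMNS})
        # Dimensions: the set removes duplicate (movie, value) pairs
        for c in DIM_COLUMNS:
            value = row.get(c)
            if value is not None and value != "":
                dims[c].add((row["id"], value))
    return list(fact.values()), dims


fact_rows, dims = split_silver(silver_rows)
print(len(fact_rows))                      # 2
print(sorted(dims["genre"]))               # [(1, 'Crime'), (1, 'Drama'), (2, 'Comedy')]
print(sorted(dims["production_country"]))  # [(1, 'Brazil')]
```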

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import (
    StructType, StructField, IntegerType, StringType, FloatType,
    BooleanType, DateType, LongType, TimestampType,
)
import os

# Start the Spark session with the PostgreSQL JDBC driver on the classpath
jar_path = os.path.abspath("../postgresql-42.7.8.jar")
spark = (
    SparkSession.builder.appName("tmdbEtlSilverToGold")
    .config("spark.jars", jar_path)
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("ERROR")
```


### 1. Ingesting the lakehouse data

Definition of the table schemas and ingestion of the data from the database.

```python
# Schemas of the source and target tables. They document the expected layout;
# the JDBC read below infers column types from the database itself.

# Silver table (lakehouse.film_lakehouse).
# revenue and budget use LongType: box-office values can exceed the 32-bit integer range.
lakehouse_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("title", StringType(), True),
    StructField("vote_average", FloatType(), True),
    StructField("vote_count", IntegerType(), True),
    StructField("status", StringType(), True),
    StructField("release_date", DateType(), True),
    StructField("revenue", LongType(), True),
    StructField("runtime", IntegerType(), True),
    StructField("adult", BooleanType(), True),
    StructField("backdrop_path", StringType(), True),
    StructField("budget", LongType(), True),
    StructField("homepage", StringType(), True),
    StructField("imdb_id", StringType(), True),
    StructField("original_language", StringType(), True),
    StructField("original_title", StringType(), True),
    StructField("overview", StringType(), True),
    StructField("popularity", FloatType(), True),
    StructField("poster_path", StringType(), True),
    StructField("tagline", StringType(), True),
    StructField("genre", StringType(), True),
    StructField("production_company", StringType(), True),
    StructField("production_country", StringType(), True),
    StructField("spoken_language", StringType(), True)
])

# Movie fact table (warehouse.FCT_MOVIE)
fct_movie_schema = StructType([
    StructField("ID_MOVIE_SRK", IntegerType(), True),
    StructField("CD_MOVIE_NK", LongType(), True),
    StructField("NM_TITLE", StringType(), True),
    StructField("DT_RELEASE", TimestampType(), True),
    StructField("NM_ORIGINAL_TITLE", StringType(), True),
    StructField("QT_RUNTIME", IntegerType(), True),
    StructField("VL_BUDGET", LongType(), True),
    StructField("VL_REVENUE", LongType(), True),
    StructField("VL_POPULARITY", FloatType(), True),
    StructField("VL_VOTE_AVG", FloatType(), True),
    StructField("QT_VOTE_COUNT", IntegerType(), True),
    StructField("TX_OVERVIEW", StringType(), True),
    StructField("TX_TAGLINE", StringType(), True),
    StructField("IN_ADULT", BooleanType(), True),
    StructField("CD_ORIGINAL_LANGUAGE", StringType(), True),
])

# Dimension tables: one row per (movie, value) pair
dim_genre_schema = StructType([
    StructField("FK_MOVIE", IntegerType(), True),
    StructField("NM_GENRE", StringType(), True)
])

dim_prod_comp_schema = StructType([
    StructField("FK_MOVIE", IntegerType(), True),
    StructField("NM_PRODUCTION_COMPANY", StringType(), True)
])

dim_country_schema = StructType([
    StructField("FK_MOVIE", IntegerType(), True),
    StructField("NM_PRODUCTION_COUNTRY", StringType(), True)
])

dim_spoken_lan_schema = StructType([
    StructField("FK_MOVIE", IntegerType(), True),
    StructField("NM_SPOKEN_LANGUAGE", StringType(), True)
])

# Connection settings; environment variables take precedence over the defaults
DB_CONFIG = {
    "user": os.environ.get("DB_USER") or "postgres",
    "password": os.environ.get("DB_PASSWORD") or "secret",
    "driver": "org.postgresql.Driver",
    "db_name": os.environ.get("DB_NAME") or "postgres",
}

# Single quotes inside the f-string: reusing double quotes is a SyntaxError before Python 3.12
jdbc_string = f"jdbc:postgresql://localhost:5432/{DB_CONFIG['db_name']}"
table_name = "lakehouse.film_lakehouse"

# Read the whole Silver table through JDBC
lakehouse_df = spark.read.jdbc(
    url=jdbc_string,
    table=table_name,
    properties=DB_CONFIG
)
```

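`DB_CONFIG` mixes JDBC connection properties (`user`, `password`, `driver`) with `db_name`, which is only needed to build the URL. A minimal sketch that keeps the two apart, assuming the same environment variables, defaults, host and port as the cell above; `build_jdbc_settings` is a hypothetical helper, not part of Spark.

```python
import os
from typing import Dict, Mapping, Tuple


def build_jdbc_settings(env: Mapping[str, str] = os.environ,
                        host: str = "localhost",
                        port: int = 5432) -> Tuple[str, Dict[str, str]]:
    """Return (jdbc_url, connection_properties).

    Hypothetical helper: reads DB_USER, DB_PASSWORD and DB_NAME with the same
    defaults as the notebook; the database name only goes into the URL.
    """
    db_name = env.get("DB_NAME") or "postgres"
    url = f"jdbc:postgresql://{host}:{port}/{db_name}"
    properties = {
        "user": env.get("DB_USER") or "postgres",
        "password": env.get("DB_PASSWORD") or "secret",
        "driver": "org.postgresql.Driver",
    }
    return url, properties


url, props = build_jdbc_settings({"DB_NAME": "films", "DB_USER": "etl"})
print(url)            # jdbc:postgresql://localhost:5432/films
print(sorted(props))  # ['driver', 'password', 'user']
```

The pair could then be passed as `spark.read.jdbc(url=url, table=table_name, properties=props)`.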
### 2. Creating the entities

Split of the lakehouse data into dedicated fact and dimension DataFrames. The following operations are performed:

1. Splitting the columns `genre`, `production_company`, `production_country` and `spoken_language` into separate DataFrames
2. Creating the final fact DataFrame
3. Inserting the data into the DW tables

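The dimension cells below discard `NULL` and empty strings, but a whitespace-only value, or one with stray leading or trailing spaces, would pass that filter and produce an extra dimension row. A minimal sketch of a normalization step, assuming such values may appear in the Silver table (not verified here); `normalize_label` and `dimension_pairs` are hypothetical helpers. In PySpark the same idea maps to applying `trim` from `pyspark.sql.functions` to the column before the filter.

```python
from typing import Iterable, List, Optional, Set, Tuple


def normalize_label(value: Optional[str]) -> Optional[str]:
    """Strip surrounding whitespace; map empty or whitespace-only strings to None."""
    if value is None:
        return None
    stripped = value.strip()
    return stripped or None


def dimension_pairs(rows: Iterable[Tuple[int, Optional[str]]]) -> List[Tuple[int, str]]:
    """Build deduplicated (movie_id, label) pairs, dropping missing labels."""
    pairs: Set[Tuple[int, str]] = set()
    for movie_id, raw in rows:
        label = normalize_label(raw)
        if label is not None:
            pairs.add((movie_id, label))
    return sorted(pairs)


print(dimension_pairs([(7, "Drama"), (7, " Drama "), (7, "   "), (8, None)]))
# [(7, 'Drama')]
```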
#### Splitting the columns

```python
# Cache the Silver DataFrame: it feeds every entity below, and caching avoids
# re-reading the table over JDBC for each action (count, show, write)
lakehouse_df = lakehouse_df.cache()
print("Count lakehouse raw: ", lakehouse_df.count())

# Genre dimension: one row per (movie, genre), ignoring null or empty values
dim_genre_df = (
    lakehouse_df
    .select(
        col("id").alias("FK_MOVIE"),
        col("genre").alias("NM_GENRE")
    )
    .filter(col("NM_GENRE").isNotNull() & (col("NM_GENRE") != ""))
    .distinct()
)

print("Count dim genre: ", dim_genre_df.count())

# Production company dimension
dim_prod_comp_df = (
    lakehouse_df
    .select(
        col("id").alias("FK_MOVIE"),
        col("production_company").alias("NM_PRODUCTION_COMPANY")
    )
    .filter(col("NM_PRODUCTION_COMPANY").isNotNull() & (col("NM_PRODUCTION_COMPANY") != ""))
    .distinct()
)

print("Count dim production company: ", dim_prod_comp_df.count())

# Production country dimension
dim_country_df = (
    lakehouse_df
    .select(
        col("id").alias("FK_MOVIE"),
        col("production_country").alias("NM_PRODUCTION_COUNTRY")
    )
    .filter(col("NM_PRODUCTION_COUNTRY").isNotNull() & (col("NM_PRODUCTION_COUNTRY") != ""))
    .distinct()
)

print("Count dim production country: ", dim_country_df.count())

# Spoken language dimension
dim_spoken_lan_df = (
    lakehouse_df
    .select(
        col("id").alias("FK_MOVIE"),
        col("spoken_language").alias("NM_SPOKEN_LANGUAGE")
    )
    .filter(col("NM_SPOKEN_LANGUAGE").isNotNull() & (col("NM_SPOKEN_LANGUAGE") != ""))
    .distinct()
)

print("Count dim spoken language: ", dim_spoken_lan_df.count())

dim_genre_df.show()
dim_prod_comp_df.show()
dim_country_df.show()
dim_spoken_lan_df.show()
```


```text
+--------+---------------+
|FK_MOVIE|       NM_GENRE|
+--------+---------------+
|   51497|          Crime|
|   50456|         Action|
|    2330|         Action|
|   93856|         Horror|
|    7451|      Adventure|
|  575452|       Thriller|
|   10628|          Crime|
|  726684|         Action|
|  572154|         Comedy|
|  589761|        Fantasy|
|    9839|        Romance|
|  509598|       Thriller|
|    1246|          Drama|
|   49046|          Drama|
|   10452|        Romance|
|  943930|         Horror|
|   13491|        Romance|
|    9389|Science Fiction|
|   68818|      Adventure|
|   29572|          Drama|
+--------+---------------+
only showing top 20 rows
```


```text
+--------+---------------------+
|FK_MOVIE|NM_PRODUCTION_COMPANY|
+--------+---------------------+
|     752|            DC Comics|
|    6477|  Regency Enterprises|
|    2179|      New Line Cinema|
|    9691| Dino De Laurentii...|
|    9703| Dino De Laurentii...|
|   11236|    American Zoetrope|
|   12783|         Qwerty Films|
|  254375|           ARD Degeto|
|  485942|     DC Entertainment|
|  339964|                  UFA|
|   26715|            IM Global|
|   10491|         Jagged Films|
|  112205|             Malavita|
|   60243| Memento Films Pro...|
|   18897|           Samsa Film|
|    4824|     Alphaville Films|
|  399174| American Empirica...|
|   63025| American Internat...|
|  516329|    TSG Entertainment|
|   89185| Two 4 The Money M...|
+--------+---------------------+
only showing top 20 rows
```


```text
+--------+---------------------+
|FK_MOVIE|NM_PRODUCTION_COUNTRY|
+--------+---------------------+
|     652|                Malta|
|  258216|              Denmark|
|  899112| United States of ...|
|    9480| United States of ...|
|  417466|             Colombia|
|  626735| United States of ...|
|   70670|               Sweden|
|    2275| United States of ...|
|    9531| United States of ...|
|  668461| United States of ...|
|  491472|               France|
|  718444|         South Africa|
|  597433|               Brazil|
|  301337|               Norway|
|  549053| United States of ...|
|  153738| United States of ...|
|   13350| United States of ...|
|  188761|               Norway|
|   50845|               France|
|  518880|                Spain|
+--------+---------------------+
only showing top 20 rows
```


```text
+--------+------------------+
|FK_MOVIE|NM_SPOKEN_LANGUAGE|
+--------+------------------+
|   59436|            German|
|    1620|           Serbian|
|   72113|           English|
|  127585|           Russian|
|     311|           English|
|   49517|           Turkish|
|  581726|           Italian|
|   11286|          Mandarin|
|     218|           English|
|   19562|            French|
|   36970|           English|
|   68737|           English|
|   19597|            French|
|   37563|            French|
|  785976|           Spanish|
|    9645|           English|
|   11779|           Spanish|
|   11358|           English|
|  889699|           Persian|
|   49850|           Italian|
+--------+------------------+
only showing top 20 rows
```

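The `distinct()` calls above matter because of how the Silver table is laid out. Assuming it stores one row per combination of genre, production company, production country and spoken language (an assumption about the Silver layer, which is not shown in this notebook), a single movie is repeated many times. The worked example below, with hypothetical values, counts the Silver rows generated for one movie and the genre rows left after projecting and deduplicating.

```python
from itertools import product

# Hypothetical multi-valued attributes of a single movie
genres = ["Drama", "Crime"]
companies = ["Company X", "Company Y", "Company Z"]
countries = ["Brazil", "France"]
languages = ["Portuguese"]

# One Silver row per combination (cartesian product of the four lists)
silver_rows = list(product(genres, companies, countries, languages))
print(len(silver_rows))  # 2 * 3 * 2 * 1 = 12

# Projecting the genre column and deduplicating keeps only the distinct values
genre_rows = {g for g, _, _, _ in silver_rows}
print(len(genre_rows))   # 2
```

Without deduplication, the genre dimension for this movie would hold 12 rows instead of 2, and the fact table would repeat the movie 12 times.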
#### Building the movie fact DataFrame

```python
# Movie fact: one row per distinct combination of movie-level attributes
fct_movie_df = (
    lakehouse_df
    .select(
        col("id").alias("CD_MOVIE_NK"),
        col("title").alias("NM_TITLE"),
        col("release_date").alias("DT_RELEASE"),
        col("original_title").alias("NM_ORIGINAL_TITLE"),
        col("runtime").alias("QT_RUNTIME"),
        col("budget").alias("VL_BUDGET"),
        col("revenue").alias("VL_REVENUE"),
        col("popularity").alias("VL_POPULARITY"),
        col("vote_average").alias("VL_VOTE_AVG"),
        col("vote_count").alias("QT_VOTE_COUNT"),
        col("overview").alias("TX_OVERVIEW"),
        col("tagline").alias("TX_TAGLINE"),
        col("adult").alias("IN_ADULT"),
        # VL_STATUS is not declared in fct_movie_schema above; the target
        # table must have this column for the JDBC append to succeed
        col("status").alias("VL_STATUS"),
        col("original_language").alias("CD_ORIGINAL_LANGUAGE")
    )
    .distinct()
)

# Number of rows in the fact DataFrame
fct_movie_df.count()
```

```text
182736
```

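`distinct()` removes exact duplicates only. If two Silver rows for the same `id` differ in any movie-level column (for instance, a `popularity` value that changed between updates, which the introduction says can happen), the fact table ends up with two rows for the same `CD_MOVIE_NK`. A minimal sketch of a check for that case, on hypothetical rows; `duplicated_keys` is a hypothetical helper. In PySpark a comparable check would be `fct_movie_df.groupBy("CD_MOVIE_NK").count().filter("count > 1")`.

```python
from collections import Counter
from typing import Dict, Iterable, List


def duplicated_keys(rows: Iterable[Dict[str, object]], key: str = "CD_MOVIE_NK") -> List[object]:
    """Return the natural-key values that appear in more than one row."""
    counts = Counter(row[key] for row in rows)
    return sorted(k for k, n in counts.items() if n > 1)


fact_rows = [
    {"CD_MOVIE_NK": 10, "VL_POPULARITY": 5.0},
    {"CD_MOVIE_NK": 10, "VL_POPULARITY": 5.3},  # same movie, different popularity
    {"CD_MOVIE_NK": 11, "VL_POPULARITY": 1.2},
]
print(duplicated_keys(fact_rows))  # [10]
```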
### 3. Inserting the data into the data warehouse schema

```python
def write_to_warehouse(df, table_name, mode="append"):
    """Write a DataFrame to a table in the warehouse schema through JDBC."""
    (
        df.write
        .format("jdbc")
        .option("url", jdbc_string)
        .option("dbtable", table_name)
        .option("user", DB_CONFIG["user"])
        .option("password", DB_CONFIG["password"])
        .option("driver", DB_CONFIG["driver"])
        .mode(mode)
        .save()
    )


# The fact table is loaded first, then the dimensions whose FK_MOVIE points to the movie.
# Every table uses "append": in Spark's JDBC writer, "overwrite" drops and recreates the
# table by default (unless the truncate option is set), discarding any keys or
# constraints created by the warehouse DDL.
write_to_warehouse(fct_movie_df, "warehouse.FCT_MOVIE")
write_to_warehouse(dim_genre_df, "warehouse.DIM_GENRE")
write_to_warehouse(dim_prod_comp_df, "warehouse.DIM_PROD_COMP")
write_to_warehouse(dim_country_df, "warehouse.DIM_COUNTRY")
write_to_warehouse(dim_spoken_lan_df, "warehouse.DIM_SPOKEN_LAN")
```
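With `mode("append")`, running the load twice inserts the same rows again. One way to make reloads idempotent, sketched here in plain Python under the assumption that `CD_MOVIE_NK` identifies a movie in `warehouse.FCT_MOVIE`, is to keep only the incoming rows whose natural key is not yet in the table (an anti-join). In PySpark this would correspond to reading the existing keys back through JDBC and using `join(..., how="left_anti")` before the write. `new_rows_only` is a hypothetical helper, and this approach skips changes to movies that are already loaded; handling those would need an upsert.

```python
from typing import Dict, Iterable, List, Set


def new_rows_only(incoming: Iterable[Dict[str, object]],
                  existing_keys: Set[object],
                  key: str = "CD_MOVIE_NK") -> List[Dict[str, object]]:
    """Anti-join: keep incoming rows whose key is not already loaded."""
    return [row for row in incoming if row[key] not in existing_keys]


already_loaded = {10, 11}
batch = [{"CD_MOVIE_NK": 10}, {"CD_MOVIE_NK": 12}]
print(new_rows_only(batch, already_loaded))  # [{'CD_MOVIE_NK': 12}]
```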