#### Leer todos los datos que son requeridos

In [0]:
# Declare the "p_file_date" notebook widget (default "2024-12-30") and read its
# current value; v_file_date is used below to select the rows to process and to
# stamp the output with "created_date".
dbutils.widgets.text("p_file_date", "2024-12-30")
v_file_date = dbutils.widgets.get("p_file_date")

In [0]:
%run "../includes/configuration"

In [0]:
%run "../includes/common_functions"

In [0]:
# Load the silver "movies" Delta table and keep only the rows for the requested file date.
# The predicate is built as a Column comparison instead of interpolating the widget value
# into a SQL string, so a quote character in p_file_date cannot break or alter the filter.
movies_silver_df = spark.read.format("delta").load(f"{silver_folder_path}/movies")
movie_df = movies_silver_df.filter(movies_silver_df["file_date"] == v_file_date)

In [0]:
# Load the silver "productions_countries" Delta table and keep only the rows for the
# requested file date. The predicate is a Column comparison rather than an f-string SQL
# expression, so a quote character in p_file_date cannot break or alter the filter.
productions_countries_silver_df = spark.read.format("delta").load(f"{silver_folder_path}/productions_countries")
production_country_df = productions_countries_silver_df.filter(
    productions_countries_silver_df["file_date"] == v_file_date
)

In [0]:
# Load the silver "countries" Delta table; no file_date filter is applied to this one.
country_df = (
    spark.read
    .format("delta")
    .load(f"{silver_folder_path}/countries")
)

In [0]:
# Load the silver "movies_companies" Delta table and keep only the rows for the requested
# file date. The predicate is a Column comparison rather than an f-string SQL expression,
# so a quote character in p_file_date cannot break or alter the filter.
movies_companies_silver_df = spark.read.format("delta").load(f"{silver_folder_path}/movies_companies")
movie_company_df = movies_companies_silver_df.filter(
    movies_companies_silver_df["file_date"] == v_file_date
)

In [0]:
# Load the silver "productions_companies" Delta table and keep only the rows for the
# requested file date. The predicate is a Column comparison rather than an f-string SQL
# expression, so a quote character in p_file_date cannot break or alter the filter.
productions_companies_silver_df = spark.read.format("delta").load(f"{silver_folder_path}/productions_companies")
production_company_df = productions_companies_silver_df.filter(
    productions_companies_silver_df["file_date"] == v_file_date
)

#### Join "country" y "production_country"

In [0]:
# Inner-join countries with the movie/country link rows on country_id, keeping the
# movie_id from the link table plus the country's id and name.
country_prod_con_df = (
    country_df
    .join(
        production_country_df,
        on=country_df.country_id == production_country_df.country_id,
        how="inner",
    )
    .select(
        production_country_df.movie_id,
        country_df.country_id,
        country_df.country_name,
    )
)

#### Join "production_company" y "movie_company"

In [0]:
# Inner-join production companies with the movie/company link rows on company_id,
# keeping the movie_id from the link table plus the company's id and name.
production_company_mov_com_df = (
    production_company_df
    .join(
        movie_company_df,
        on=movie_company_df.company_id == production_company_df.company_id,
        how="inner",
    )
    .select(
        movie_company_df.movie_id,
        production_company_df.company_id,
        production_company_df.company_name,
    )
)

#### Join "movie_filter_df", "country_prod_con_df" y "production_company_mov_com_df"

- Filtrar las películas cuyo año de lanzamiento sea mayor o igual a 2010

In [0]:
# Keep only movies whose year_release_date is 2010 or later.
movie_filter_df = movie_df.where("year_release_date >= 2010")

In [0]:
# Attach country rows and then production-company rows to the filtered movies,
# both via inner joins on movie_id.
results_country_prod_company_df = (
    movie_filter_df
    .join(
        country_prod_con_df,
        on=movie_filter_df.movie_id == country_prod_con_df.movie_id,
        how="inner",
    )
    .join(
        production_company_mov_com_df,
        on=movie_filter_df.movie_id == production_company_mov_com_df.movie_id,
        how="inner",
    )
)

- Agregar la columna "created_date"

In [0]:
from pyspark.sql.functions import lit

In [0]:
# Project the output columns and stamp every row with the processed file date.
resuls_df = (
    results_country_prod_company_df
    .select(
        movie_filter_df.movie_id,  # qualified: each joined input carries its own movie_id
        "title",
        "budget",
        "revenue",
        "duration_time",
        "release_date",
        "country_name",
        "company_name",
        "country_id",
        "company_id",
    )
    .withColumn("created_date", lit(v_file_date))
)

- Ordenar por la columna "title" de manera ascendente

In [0]:
from pyspark.sql.functions import desc

In [0]:
# Sort the result by title in ascending order.
results_order_by_df = resuls_df.sort(resuls_df["title"].asc())

#### Escribir datos en el DataLake en formato "Delta"

In [0]:
# The earlier overwrite_partition_data(...) call is no longer used; the result is
# written through merge_delta_lake (defined outside this notebook) instead.
# Target (tgt) and source (src) rows are matched on movie, country, company and created_date.
merge_condition = (
    'tgt.movie_id = src.movie_id'
    ' AND tgt.country_id = src.country_id'
    ' AND tgt.company_id = src.company_id'
    ' AND tgt.created_date = src.created_date'
)

merge_delta_lake(
    results_order_by_df,
    'movie_gold',
    'results_country_prod_company',
    gold_folder_path,
    merge_condition,
    'created_date',
)

In [0]:
%sql
-- Sanity check: number of rows in the gold table per created_date.
SELECT
  created_date,
  COUNT(0)
FROM movie_gold.results_country_prod_company
GROUP BY created_date;