### Leer todos los datos que son requeridos

In [0]:
# Register the text widget "p_file_date" (default "2024-12-30") and read its
# current value into v_file_date, which the cells below use to select and tag
# the data being processed.
file_date_widget = "p_file_date"
dbutils.widgets.text(file_date_widget, "2024-12-30")
v_file_date = dbutils.widgets.get(file_date_widget)

In [0]:
%run "../includes/configuration"

In [0]:
%run "../includes/common_functions"

In [0]:
# Load the silver "movies" Delta table and keep only the rows whose file_date
# equals the requested file date.
# The predicate is built as a Column comparison instead of an interpolated SQL
# string, so the widget value is always handled as a plain literal: a stray
# quote in p_file_date can no longer break or alter the filter expression.
movies_silver_df = spark.read.format("delta").load(f"{silver_folder_path}/movies")
movies_df = movies_silver_df.filter(movies_silver_df["file_date"] == v_file_date)

In [0]:
# Load the full silver "languages" Delta table (no file_date filter is applied).
languages_df = (
    spark.read
    .format("delta")
    .load(f"{silver_folder_path}/languages")
)

In [0]:
# Load the silver "movies_languages" Delta table and keep only the rows whose
# file_date equals the requested file date.
# The predicate is built as a Column comparison instead of an interpolated SQL
# string, so the widget value is always handled as a plain literal: a stray
# quote in p_file_date can no longer break or alter the filter expression.
movies_languages_silver_df = spark.read.format("delta").load(f"{silver_folder_path}/movies_languages")
movies_languages_df = movies_languages_silver_df.filter(
    movies_languages_silver_df["file_date"] == v_file_date
)

In [0]:
# Load the full silver "genres" Delta table (no file_date filter is applied).
genres_df = (
    spark.read
    .format("delta")
    .load(f"{silver_folder_path}/genres")
)

In [0]:
# Load the silver "movies_genres" Delta table and keep only the rows whose
# file_date equals the requested file date.
# The predicate is built as a Column comparison instead of an interpolated SQL
# string, so the widget value is always handled as a plain literal: a stray
# quote in p_file_date can no longer break or alter the filter expression.
movies_genres_silver_df = spark.read.format("delta").load(f"{silver_folder_path}/movies_genres")
movies_genres_df = movies_genres_silver_df.filter(
    movies_genres_silver_df["file_date"] == v_file_date
)

### Join "languages" y "movies_languages"

In [0]:
# Inner-join languages_df with movies_languages_df on language_id and keep
# the language name, the language id and the movie id each row refers to.
language_match = languages_df["language_id"] == movies_languages_df["language_id"]
languages_mov_lan_df = (
    languages_df
    .join(movies_languages_df, on=language_match, how="inner")
    .select(
        languages_df["language_name"],
        languages_df["language_id"],
        movies_languages_df["movie_id"],
    )
)

### Join "genres" y "movies_genres"

In [0]:
# Inner-join genres_df with movies_genres_df on genre_id and keep the genre
# name, the genre id and the movie id each row refers to.
genre_match = genres_df["genre_id"] == movies_genres_df["genre_id"]
genres_mov_gen_df = (
    genres_df
    .join(movies_genres_df, on=genre_match, how="inner")
    .select(
        genres_df["genre_name"],
        genres_df["genre_id"],
        movies_genres_df["movie_id"],
    )
)

### Join "movies_df", "languages_mov_lan_df" y "genres_mov_gen_df"

- Filtrar las películas cuyo año de lanzamiento sea mayor o igual a 2000

In [0]:
# Keep only the movies whose year_release_date is 2000 or later.
movie_filter_df = movies_df.where(movies_df["year_release_date"] >= 2000)

In [0]:
# Combine the filtered movies with their languages and their genres through
# two inner joins on movie_id. Because the joins use equality expressions
# (not a shared column name), the result keeps a movie_id column from each
# input, so later references to movie_id must name the source DataFrame.
by_language = movie_filter_df["movie_id"] == languages_mov_lan_df["movie_id"]
by_genre = movie_filter_df["movie_id"] == genres_mov_gen_df["movie_id"]
results_movies_genres_languages_df = (
    movie_filter_df
    .join(languages_mov_lan_df, on=by_language, how="inner")
    .join(genres_mov_gen_df, on=by_genre, how="inner")
)

- Agregar la columna "created_date"

In [0]:
from pyspark.sql.functions import current_timestamp, lit

In [0]:
# Project the output columns and tag every row with the processed file date.
# movie_id is taken explicitly from movie_filter_df because the joined frame
# carries a movie_id from each joined input.
results_df = (
    results_movies_genres_languages_df
    .select(
        movie_filter_df["movie_id"],
        "language_id",
        "genre_id",
        "title",
        "duration_time",
        "release_date",
        "vote_average",
        "language_name",
        "genre_name",
    )
    # created_date holds the widget value as a literal, not the current time.
    .withColumn("created_date", lit(v_file_date))
)

- Ordenar por la columna "release_date" de manera descendente

In [0]:
# Sort the result set by release_date, newest first.
results_order_by_df = results_df.orderBy("release_date", ascending=False)

### Escribir los datos en el DataLake en formato "Delta"

In [0]:
# NOTE(review): disabled call to overwrite_partition for the same database/table
# and partition column; the next cell writes with merge_delta_lake instead.
# Presumably kept only for reference -- confirm and remove if no longer needed.
#overwrite_partition(results_order_by_df, "movie_gold", "results_movie_genre_language", "created_date")

In [0]:
# Match condition between target (tgt) and source (src) rows: a row is
# identified by movie, language, genre and created_date.
merge_condition = (
    'tgt.movie_id = src.movie_id AND '
    'tgt.language_id = src.language_id AND '
    'tgt.genre_id = src.genre_id AND '
    'tgt.created_date = src.created_date'
)

# Write the ordered results to movie_gold.results_movie_genre_language through
# the shared merge_delta_lake helper, passing "created_date" as its last argument.
merge_delta_lake(
    results_order_by_df,
    'movie_gold',
    'results_movie_genre_language',
    gold_folder_path,
    merge_condition,
    "created_date",
)

In [0]:
%sql
-- Inspect the gold table written by the previous cell.
SELECT * FROM movie_gold.results_movie_genre_language