### Leer los datos que son requeridos

In [0]:
%run "../includes/configuration"

In [0]:
%run "../includes/commom_functions"

In [0]:
# Notebook parameter "p_file_date": the file date to process.
# The widget is created with "2024-12-30" as its default value.
dbutils.widgets.text(
    "p_file_date",
    "2024-12-30",
)
v_file_date = dbutils.widgets.get("p_file_date")

In [0]:
# Imported in this cell so the cell runs on its own.
from pyspark.sql.functions import col

# Movies from the silver layer, restricted to the file_date being processed.
# The widget value is compared as a Column expression rather than being
# interpolated into a SQL string, so a quote (or any SQL fragment) inside
# p_file_date cannot change the predicate.
movies_df = spark.read.parquet(f"{silver_folder_path}/movies").filter(
    col("file_date") == v_file_date
)

In [0]:
# Countries from the silver layer; read in full (no file_date filter is applied).
country_df = spark.read.parquet(
    f"{silver_folder_path}/countries"
)

In [0]:
# Imported in this cell so the cell runs on its own.
from pyspark.sql.functions import col

# Movie/production-country rows from the silver layer for the processed file_date.
# The widget value is compared as a Column expression rather than being
# interpolated into a SQL string, so a quote (or any SQL fragment) inside
# p_file_date cannot change the predicate.
production_country_df = spark.read.parquet(f"{silver_folder_path}/productions_countries").filter(
    col("file_date") == v_file_date
)

### Join "countries" y "productions_countries"

In [0]:
# Attach the country name to every production-country row (inner join on
# country_id) and keep only the country name and the movie it refers to.
same_country = country_df.country_id == production_country_df.country_id
country_prod_coun_df = (
    country_df
    .join(production_country_df, same_country, "inner")
    .select(country_df.country_name, production_country_df.movie_id)
)


### Join "movies_filter_df" y "country_prod_coun_df"

- Filtrar las películas cuyo año de lanzamiento sea mayor o igual a 2015

In [0]:
# Keep only movies whose release year is 2015 or later
# (DataFrame.where is an alias of DataFrame.filter).
movies_filter_df = movies_df.where("year_release_date >= 2015")

In [0]:
# Inner join on movie_id: one output row per (filtered movie, production country)
# pair; movies without a matching production-country row are dropped.
same_movie = movies_filter_df.movie_id == country_prod_coun_df.movie_id
results_movies_prod_coun_df = movies_filter_df.join(
    country_prod_coun_df, on=same_movie, how="inner"
)

In [0]:
# Project only the columns used by the per-year / per-country aggregation.
results_df = results_movies_prod_coun_df.select(
    "year_release_date",
    "country_name",
    "budget",
    "revenue",
)

In [0]:
from pyspark.sql.functions import sum

In [0]:
# Total budget and total revenue per release year and country.
# NOTE: `sum` is pyspark.sql.functions.sum (imported in the previous cell),
# which shadows the Python builtin for the rest of the notebook.
movies_by_year_country = results_df.groupBy("year_release_date", "country_name")
results_groupby_df = movies_by_year_country.agg(
    sum("budget").alias("total_budget"),
    sum("revenue").alias("total_revenue"),
)

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import desc, dense_rank, asc, lit

In [0]:
# Window over each release year, ordered by total budget (descending) with
# total revenue (descending) as the tie-breaker.
rank_by_year_window = (
    Window
    .partitionBy("year_release_date")
    .orderBy(desc("total_budget"), desc("total_revenue"))
)

# dense_rank gives tied rows the same rank and leaves no gaps in the sequence.
ranked_df = results_groupby_df.withColumn(
    "dense_rank", dense_rank().over(rank_by_year_window)
)

# Stamp every row with the p_file_date widget value as a literal column.
final_df = ranked_df.withColumn("created_date", lit(v_file_date))


### Escribir datos en el DataLake en formato "Parquet"

In [0]:
# Hand final_df to the overwrite_partition helper.
# NOTE(review): the helper is not defined in this notebook (presumably it comes
# from the %run includes); the arguments look like DataFrame, database name,
# table name and partition column — confirm against its definition.
overwrite_partition(
    final_df,
    "movie_gold",
    "results_group_movie_country",
    "created_date",
)

In [0]:
# No additional write is performed in this cell.
# final_df is already handed to overwrite_partition(...) in the previous cell.
# Appending it again with
#   final_df.write.mode("append").partitionBy("created_date")...saveAsTable(...)
# stored the same rows a second time and added yet another copy on every re-run
# of the notebook for the same p_file_date, so that write was removed.
# NOTE(review): overwrite_partition is defined outside this notebook — confirm it
# persists final_df to movie_gold.results_group_movie_country.

In [0]:
#display(spark.read.parquet(f"{gold_folder_path}/results_group_movie_country"))

In [0]:
%sql
-- Number of rows stored for each created_date in the gold table.
SELECT created_date, count(1)
FROM movie_gold.results_group_movie_country
GROUP BY created_date

In [0]:
%sql
-- Show the rows currently stored in the gold table.
SELECT *
FROM movie_gold.results_group_movie_country