#### Ingestión del archivo "movie_cast.json"

In [0]:
# Declare the "p_enviroment" text widget with an empty default and read its
# current value into v_enviroment.
# NOTE(review): "enviroment" is misspelled, but the spelling is part of the
# widget name (and of the output column written later); presumably callers pass
# the parameter under this exact name, so do not rename it in isolation.
dbutils.widgets.text("p_enviroment","")
v_enviroment = dbutils.widgets.get("p_enviroment")

In [0]:
# Declare the "p_file_date" text widget (default "2024-12-16") and read its
# current value into v_file_date. The value selects the bronze sub-folder that
# is read below and is also stamped on every row as the "file_date" column.
dbutils.widgets.text("p_file_date","2024-12-16")
v_file_date = dbutils.widgets.get("p_file_date")

In [0]:
%run "../includes/common_functions"

In [0]:
%run "../includes/configuration"

##### Paso 1 - Leer el archivo JSON usando "DataFrameReader" de Spark

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

In [0]:
# Explicit schema for movie_cast.json; every field is declared nullable.
# "genderId" and "castOrder" are read here but dropped again before the write.
movie_cast_schema = (
    StructType()
    .add("movieId", IntegerType(), True)
    .add("personId", IntegerType(), True)
    .add("characterName", StringType(), True)
    .add("genderId", IntegerType(), True)
    .add("castOrder", IntegerType(), True)
)

In [0]:
# Read this load's movie_cast.json from the bronze folder using the explicit
# schema. The "multiline" option is enabled so that JSON records spanning
# several lines are accepted.
movie_cast_df = (
    spark.read
    .schema(movie_cast_schema)
    .option("multiline", "True")
    .json(f"{bronze_folder_path}/{v_file_date}/movie_cast.json")
)

##### Paso 2 - Renombrar las columnas y añadir nuevas columnas
######    1. "movieId" renombrar a "movie_id"
######    2. "personId" renombrar a "person_id"
######    3. "characterName" renombrar a "character_name"
######    4. Agregar las columnas "ingestion_date", "enviroment" y "file_date"

In [0]:
from pyspark.sql.functions import col, lit, current_timestamp

In [0]:
# Enrich the raw frame and switch the key columns to snake_case.
# add_ingestion_date comes from ../includes/common_functions; presumably it
# appends the "ingestion_date" column — confirm against that notebook.
# "enviroment" and "file_date" are constant columns taken from the widgets.
movie_cast_df_with_col = (
    add_ingestion_date(movie_cast_df)
    .withColumn("enviroment", lit(v_enviroment))
    .withColumn("file_date", lit(v_file_date))
    .withColumnRenamed("movieId", "movie_id")
    .withColumnRenamed("personId", "person_id")
    .withColumnRenamed("characterName", "character_name")
)

##### Paso 3 - Eliminar columnas no necesarias

In [0]:
# Drop the source fields that are not carried into the silver table.
# The names are passed as plain strings: before PySpark 3.4, DataFrame.drop()
# raises TypeError when it receives more than one Column object, whereas the
# string form works on every version.
final_movie_cast_df = movie_cast_df_with_col.drop("castOrder", "genderId")

##### Paso 4 - Escribir el DF en la capa silver en formato Delta (merge)

In [0]:
# Write this load through the shared merge_delta_lake helper (defined in
# ../includes/common_functions; presumably a Delta MERGE into
# movie_silver.movie_cast — confirm there). Target and source rows are matched
# on movie_id + person_id within the same file_date.
# NOTE(review): `silver_foler_path` is spelled without the "d"; confirm that it
# matches the variable defined in ../includes/configuration before renaming.
merge_condition = (
    'tgt.movie_id = src.movie_id'
    ' AND tgt.person_id = src.person_id'
    ' AND tgt.file_date = src.file_date'
)
merge_delta_lake(
    final_movie_cast_df,
    "movie_silver",
    "movie_cast",
    silver_foler_path,
    merge_condition,
    "file_date",
)

In [0]:
%sql
-- Sanity check after the write: row count per file_date in the silver table.
SELECT
  COUNT(1),
  file_date
FROM movie_silver.movie_cast
GROUP BY file_date

In [0]:
# End the notebook run and hand the string "Success!" back to the caller
# (e.g. a parent notebook that started this one).
dbutils.notebook.exit("Success!")