### Ingestión del archivo "movie.csv"

In [0]:
# Environment label supplied by the caller through the `p_environment` widget
# (empty string by default). It is stamped verbatim into the `env` column later on.
ENVIRONMENT_WIDGET = "p_environment"
dbutils.widgets.text(ENVIRONMENT_WIDGET, "")
v_environment = dbutils.widgets.get(ENVIRONMENT_WIDGET)

In [0]:
# Date of the bronze drop to ingest (widget `p_file_date`, default "2024-12-30").
# It selects the folder `{bronze_folder_path}/{v_file_date}/` that is read below,
# and it is also written to the `file_date` column used in the merge condition.
dbutils.widgets.text("p_file_date", "2024-12-30")
v_file_date = dbutils.widgets.get("p_file_date")

# Fail fast on a malformed date. Otherwise the bad value silently reaches the read
# path and the merge key, and the notebook only fails much later with a confusing
# "path does not exist" error (or writes rows keyed by a bogus file_date).
# Imported here because the notebook's import cell runs after this one.
from datetime import datetime

try:
    datetime.strptime(v_file_date, "%Y-%m-%d")
except ValueError as exc:
    raise ValueError(
        f"p_file_date must be a date in YYYY-MM-DD format, got {v_file_date!r}"
    ) from exc

In [0]:
%run "../includes/configuration"

In [0]:
%run "../includes/common_functions"

Paso 1 - Leer el archivo CSV usando "DataFrameReader" de Spark 

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, DateType
from pyspark.sql.functions import col, current_timestamp, lit

In [0]:
# Explicit schema for movie.csv, listed in file column order as
# (column name, Spark type, nullable). With an explicit schema, Spark does not
# need to infer types from the CSV.
# NOTE(review): Spark file sources generally relax user schemas to nullable on read,
# so `nullable=False` on movieID is likely not enforced — confirm if it matters.
_movie_column_specs = [
    ("movieID", IntegerType(), False),
    ("title", StringType(), True),
    ("budget", DoubleType(), True),
    ("homePage", StringType(), True),
    ("overview", StringType(), True),
    ("popularity", DoubleType(), True),
    ("yearReleaseDate", IntegerType(), True),
    ("releaseDate", DateType(), True),
    ("revenue", DoubleType(), True),
    ("durationTime", IntegerType(), True),
    ("movieStatus", StringType(), True),
    ("tagline", StringType(), True),
    ("voteAverage", DoubleType(), True),
    ("voteCount", IntegerType(), True),
]

movie_schema = StructType(
    fields=[
        StructField(column_name, column_type, is_nullable)
        for column_name, column_type, is_nullable in _movie_column_specs
    ]
)

In [0]:
# Read the day's movie.csv from the bronze layer. The first line is a header, and
# the columns are typed by `movie_schema` rather than inferred.
movie_csv_path = f"{bronze_folder_path}/{v_file_date}/movie.csv"

movie_df = (
    spark.read
    .schema(movie_schema)
    .option("header", True)
    .csv(movie_csv_path)
)

Paso 2 - Seleccionar las columnas requeridas

In [0]:
# Keep only the columns needed downstream. homePage, overview, movieStatus and
# tagline are dropped here.
movie_required_columns = [
    "movieID",
    "title",
    "budget",
    "popularity",
    "yearReleaseDate",
    "releaseDate",
    "revenue",
    "durationTime",
    "voteAverage",
    "voteCount",
]
movies_selected_df = movie_df.select(*(col(name) for name in movie_required_columns))


Paso 3 - Cambiar el nombre de las columnas de camelCase a snake_case (p. ej. "movieID" → "movie_id")

In [0]:
# Source camelCase name -> silver snake_case name. Columns not listed here
# (title, budget, popularity, revenue) already have their final names.
movie_column_renames = {
    "movieID": "movie_id",
    "yearReleaseDate": "year_release_date",
    "releaseDate": "release_date",
    "durationTime": "duration_time",
    "voteAverage": "vote_average",
    "voteCount": "vote_count",
}

movies_renamed_df = movies_selected_df
for source_name, target_name in movie_column_renames.items():
    movies_renamed_df = movies_renamed_df.withColumnRenamed(source_name, target_name)


Paso 4 - Agregar una columna "ingestion_date" al DataFrame

In [0]:
# Attach audit columns. `add_ingestion_date` comes from ../includes/common_functions
# and presumably adds `ingestion_date` (per the step header). `env` and `file_date`
# are constant literals taken from the notebook widgets.
movies_final_df = (
    add_ingestion_date(movies_renamed_df)
    .withColumn("env", lit(v_environment))
    .withColumn("file_date", lit(v_file_date))
)


Paso 5 - Escribir los datos en el datalake en formato "Delta" (merge sobre la tabla movie_silver.movies)

In [0]:
# Upsert the batch into movie_silver.movies through the shared merge helper
# (defined in ../includes/common_functions). Source and target rows match on
# movie_id *within the same file_date*, so each file_date keeps its own copy of a movie.
# The trailing "file_date" argument is presumably the partition column — confirm
# against the helper.
# Earlier approaches, now superseded by this merge: overwrite_partition(...) on
# "file_date", and a full overwrite via saveAsTable("movie_silver.movies").
merge_condition = "tgt.movie_id = src.movie_id AND tgt.file_date = src.file_date"

merge_delta_lake(
    movies_final_df,
    "movie_silver",
    "movies",
    silver_folder_path,
    merge_condition,
    "file_date",
)

In [0]:

# movies_final_df.write.mode("overwrite").parquet(f"{silver_folder_path}/movies")
# movies_final_df.write.mode("overwrite").partitionBy("year_release_date").parquet(f"{silver_folder_path}/movies")

In [0]:
%sql
-- Sanity check after the merge: number of rows stored per file_date in movie_silver.movies.
select file_date, count(1) 
from movie_silver.movies
group by file_date;

In [0]:
# End the notebook and return "Success" as its exit value. Presumably a parent
# notebook or job that invoked this ingestion reads this value — confirm with the orchestrator.
dbutils.notebook.exit("Success")