### Ingestion del archivo "movie.csv"

In [0]:
dbutils.widgets.help()

In [0]:
dbutils.widgets.text("p_environment","")
v_environment = dbutils.widgets.get("p_environment")

In [0]:
dbutils.widgets.text("p_file_date","2024-12-30")
v_file_date = dbutils.widgets.get("p_file_date")

In [0]:
%run "../includes/configuration"

In [0]:
%run "../includes/commom_functions"

### Paso 1 - Leer el archivo CSV usando "DataFrameReader" de Spark

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType, DateType

In [0]:
movie_schema = StructType( fields= [
    StructField("movieId", IntegerType(), False),
    StructField("title", StringType(), True),
    StructField("budget", DoubleType(), True),
    StructField("homePage", StringType(), True),
    StructField("overview", StringType(), True),
    StructField("popularity", DoubleType(), True),
    StructField("yearReleaseDate", IntegerType(), True),
    StructField("releaseDate", DateType(), True),
    StructField("revenue", DoubleType(), True),
    StructField("durationTime", IntegerType(), True),
    StructField("movieStatus", StringType(), True),
    StructField("tagline", StringType(), True),
    StructField("voteAverage", DoubleType(), True),
    StructField("voteCount", IntegerType(), True)
])

In [0]:
#option("header", True) => es para que el Data Frame agregue la primera fila como el header
movie_df = spark.read \
    .option("header", True) \
    .schema(movie_schema) \
    .csv(f"{bronze_folder_path}/{v_file_date}/movie.csv")

In [0]:
movie_df.printSchema()

### Paso 2 - Seleccionar sólo columnas "requeridas"

In [0]:
from pyspark.sql.functions import col

In [0]:
movies_selected_df = movie_df.select(col("movieId"), col("title"), col("budget"), col("popularity"), col("yearReleaseDate"), col("releaseDate"), col("revenue"), col("durationTime"), col("voteAverage"), col("voteCount").alias("vote_count"))

### Paso 3 - Cambiar el nombre de las columnas según lo "requerido"

In [0]:
movies_renamed_df = movies_selected_df \
                    .withColumnRenamed("movieId", "movie_id") \
                    .withColumnRenamed("yearReleaseDate", "year_release_date") \
                    .withColumnRenamed("releaseDate", "release_date") \
                    .withColumnRenamed("durationTime", "duration_time") \
                    .withColumnRenamed("voteAverage", "vote_average") \
                    .withColumnRenamed("voteCount", "vote_count")

In [0]:
movies_renamed_df = movies_selected_df \
                    .withColumnsRenamed({"movieId": "movie_id", "yearReleaseDate": "year_release_date", "releaseDate": "release_date", "durationTime": "duration_time", "voteAverage": "vote_average", "voteCount": "vote_count"})

### Paso - 4 Agregar la columnas ingestion_date al Data Frame

In [0]:
from pyspark.sql.functions import current_timestamp, lit

In [0]:
movies_final_df = add_ingestion_date(movies_renamed_df) \
                  .withColumn("environment", lit(v_environment)) \
                  .withColumn("file_date", lit(v_file_date))

### Paso 5 - Escribir datos en el Data Lake en formato Parquet

In [0]:
# Esto no es optimo, ya que en una particion podriamos tener miles de datos, por lo que no es optimo estar borrarando y creando la particion cada vez que se ejecute el Notebook
#overwrite_partition(movies_final_df,"movie_silver","movies","file_date")

In [0]:
# movies_final_df.write.mode("overwrite").partitionBy("year_release_date").parquet(f"{silver_folder_path}/movies")
# movies_final_df.write.mode("overwrite").parquet(f"{silver_folder_path}/movies")

# Crear una tabla en base al data frame movies_final_df
# Creara la tabla movies en a DB que se llame movie_silver
# Modulo: Spark SQL - Database / Table / View: seccion: Crear Tabla - Capa Silver

# Con overwrite, reemplazamos todos los archivos cada vez que se ejecute este Notebook (Carga completa de datos)
#movies_final_df.write.mode("overwrite").format("parquet").saveAsTable("movie_silver.movies")

# Para este caso vamos hacer una carga incremental de datos, para esto usamos append

#Nota: Esto es una tabla administrada por Databricks, esto quiere decir que si la tabla se elimina, los registros tambien se van a eliminar
# Append, agrega los registros a la tabla existente, carga incremental de datos
#movies_final_df.write.mode("append").partitionBy("file_date").format("parquet").saveAsTable("movie_silver.movies")



# Modulo: Delta Lake

# Cuando se tengan coincidencias en "tgt.movie_id = src.movie_id"" (whenMatchedUpdateAll()). Pues actuliza todos los valores de las columnas. Es decir en la tabla tgt se va a proceder a actualizar todos los registros en base a la tabla de origen (movies_final_df).

# Si no hay concidencias (whenNotMatchedInsertAll()), pues se insertan todos los registros en la tabla tgt.
# Es decir, se ingresar los registros de movies_final_df a la tabla tgt.
merge_condition = "tgt.movie_id = src.movie_id AND tgt.file_date = src.file_date" # Buscar por particion
merge_delta_lake(movies_final_df,"movie_silver","movies",silver_folder_path,merge_condition,"file_date")



In [0]:

%sql
-- Ahora podemos probar la tabla creada en a carpeta silver (Tabla Administrada por Databricks)
SELECT file_date, COUNT(1)
FROM movie_silver.movies
GROUP BY 1;

In [0]:
%fs
ls /mnt/sacjccmoviehistory/silver/movies

In [0]:
#df = spark.read.parquet(f"{silver_folder_path}/movies")
df = spark.read.format("delta").load(f"{silver_folder_path}/movies")

In [0]:
display(df)

In [0]:
dbutils.notebook.exit("Success")