##### Paso 1 - Leer el archivo CSV usando "DataFrameReader" de Spark

In [0]:
%run "../includes/configuration"

In [0]:
%run "../includes/common_functions"

In [0]:
# Register the 2024-12-23 movie extract as temp view v_movie_2 for ad-hoc SQL checks.
# The option key must be spelled "header": an unrecognised key is silently ignored by the
# CSV reader, which then treats the header row as a data row and inflates the row count.
spark.read.option("header", True).csv(f"{bronze_folder_path}/2024-12-23/movie.csv").createOrReplaceTempView("v_movie_2")

In [0]:
%sql
-- Row count of the extract registered as temp view v_movie_2.
SELECT count(1) FROM v_movie_2

In [0]:
# Register the 2024-12-30 movie extract as temp view v_movie_3 for ad-hoc SQL checks.
# The option key must be spelled "header": an unrecognised key is silently ignored by the
# CSV reader, which then treats the header row as a data row and inflates the row count.
spark.read.option("header", True).csv(f"{bronze_folder_path}/2024-12-30/movie.csv").createOrReplaceTempView("v_movie_3")

In [0]:
%sql
-- Row count of the extract registered as temp view v_movie_3.
SELECT count(1) FROM v_movie_3

In [0]:
# Exploratory only: shows the help text of the dbutils.widgets utility.
# Its result is not assigned or used by any later cell.
dbutils.widgets.help()

In [0]:
# Notebook parameters, exposed as text widgets so a caller can override them.
#   p_environment: free-text environment tag, empty by default.
#   p_file_date:   folder name (date) of the bronze extract to ingest; defaults to 2024-12-30.
dbutils.widgets.text("p_environment", "")
dbutils.widgets.text("p_file_date", "2024-12-30")

# Read the current widget values into the variables used by the rest of the notebook.
v_environment = dbutils.widgets.get("p_environment")
v_file_date = dbutils.widgets.get("p_file_date")

In [0]:
%run "../includes/common_functions"

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType

In [0]:
# Explicit schema applied when reading movie.csv, so Spark does not have to infer types.
# Every column is declared nullable (third argument True).
movie_schema = (
    StructType()
    .add("movieId", IntegerType(), True)
    .add("title", StringType(), True)
    .add("budget", DoubleType(), True)
    .add("homePage", StringType(), True)
    .add("overview", StringType(), True)
    .add("popularity", DoubleType(), True)
    .add("yearReleaseDate", IntegerType(), True)
    .add("releaseDate", DateType(), True)
    .add("revenue", DoubleType(), True)
    .add("durationTime", IntegerType(), True)
    .add("movieStatus", StringType(), True)
    .add("tagline", StringType(), True)
    .add("voteAverage", DoubleType(), True)
    .add("voteCount", IntegerType(), True)
)

In [0]:
# Read the movie.csv of the requested file date from the bronze layer,
# using the first line as column header and the explicit movie_schema.
movie_df = (
    spark.read
    .schema(movie_schema)
    .option("header", "true")
    .csv(f"{bronze_folder_path}/{v_file_date}/movie.csv")
)

In [0]:
# Exploratory check: shows the Python type of movie_df as the cell output.
type(movie_df)

In [0]:
# Exploratory check: lists the storage mount points visible to this workspace.
dbutils.fs.mounts()

In [0]:
%fs
ls /mnt/moviehistoryrodrigopenna/bronze

In [0]:
# Preview the raw DataFrame as read from the CSV file.
display(movie_df)

In [0]:
# Summary statistics of the raw DataFrame (output of DataFrame.describe()).
display(movie_df.describe())

### Paso 2 - Seleccionar solo las columnas "requeridas"

In [0]:
# Keep only the required columns, referencing them through the source DataFrame.
# Note: the cell that follows the `col` import assigns movies_selected_df again with
# the same column list, so this variant is effectively a demonstration.
movies_selected_df = movie_df.select(
    movie_df["movieId"],
    movie_df["title"],
    movie_df["budget"],
    movie_df["popularity"],
    movie_df["yearReleaseDate"],
    movie_df["releaseDate"],
    movie_df["revenue"],
    movie_df["durationTime"],
    movie_df["voteAverage"],
    movie_df["voteCount"],
)

In [0]:
from pyspark.sql.functions import col


In [0]:
# Keep only the required columns, this time built with the col() function.
movies_selected_df = movie_df.select(
    [
        col(column_name)
        for column_name in (
            "movieId",
            "title",
            "budget",
            "popularity",
            "yearReleaseDate",
            "releaseDate",
            "revenue",
            "durationTime",
            "voteAverage",
            "voteCount",
        )
    ]
)

### Paso 3 - Cambiar el nombre de las columnas según lo "requerido"

In [0]:
# Rename the camelCase source columns to snake_case; columns that are not
# listed here (title, budget, popularity, revenue) keep their names.
movies_rename_df = (
    movies_selected_df
    .withColumnRenamed("movieId", "movie_id")
    .withColumnRenamed("yearReleaseDate", "year_release_date")
    .withColumnRenamed("releaseDate", "release_date")
    .withColumnRenamed("durationTime", "duration_time")
    .withColumnRenamed("voteAverage", "vote_average")
    .withColumnRenamed("voteCount", "vote_count")
)

In [0]:
# Preview the DataFrame after the column renames.
display(movies_rename_df)

### Paso 4 - Agregar la columna "ingestion_date" al DataFrame

In [0]:
from pyspark.sql.functions import current_timestamp, lit

In [0]:
# Enrich the renamed DataFrame with audit columns:
#   - add_ingestion_date is defined in the shared includes; presumably it appends
#     the ingestion timestamp column -- confirm in common_functions.
#   - env and file_date are constant columns taken from the notebook parameters.
movies_final_df = (
    add_ingestion_date(movies_rename_df)
    .withColumn("env", lit(v_environment))
    .withColumn("file_date", lit(v_file_date))
)

In [0]:
# Drop rows without a release date before writing.
movies_final_df = movies_final_df.where(col("release_date").isNotNull())

In [0]:
# Preview the final DataFrame that will be written to the silver layer.
display(movies_final_df)

#### Paso 5 - Escribir datos en el datalake en formato "Delta"

In [0]:
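# Disabled alternative write path (not executed); the next cell writes via merge_delta_lake instead.
#overwrite_partition(movies_final_df, "movie_silver", "movies")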

In [0]:
# Match condition between existing rows (alias tgt) and incoming rows (alias src):
# same movie and same file date.
merge_condicion = "tgt.movie_id = src.movie_id AND tgt.file_date = src.file_date"

# merge_delta_lake is defined in the shared includes; presumably it upserts the
# DataFrame into movie_silver.movies partitioned by file_date -- confirm in common_functions.
merge_delta_lake(
    movies_final_df,
    "movie_silver",
    "movies",
    silver_folder_path,
    merge_condicion,
    "file_date",
)

In [0]:
%sql
-- Rows loaded into the silver table per file date.
SELECT
  file_date,
  COUNT(1)
FROM movie_silver.movies
GROUP BY file_date;

In [0]:
# Read the silver movies data back as a Delta table and preview it.
# NOTE(review): the mount path is hard-coded here while the write cell uses
# silver_folder_path -- confirm both point to the same location.
display(spark.read.format("delta").load("/mnt/moviehistoryrodrigopenna/silver/movies"))

In [0]:
# End the notebook run and hand the string "success" back as its exit value.
dbutils.notebook.exit("success")