# Ingestión del archivo movie_cast.json

In [0]:
# Declare the "p_environment" text widget with an empty default value.
dbutils.widgets.text("p_environment", "")
# Read the widget's current value; it is written to the "env" column further down.
v_env = dbutils.widgets.get("p_environment")
# Bare expression so the notebook echoes the value as this cell's output.
v_env

In [0]:
%run "../includes/configuration"

In [0]:
%run "../includes/common_functions"

### Paso 1 - Leer el archivo JSON usando "DataFrameReader" de Spark

In [0]:
from pyspark.sql.types import StructType, StringType, StructField, IntegerType, DoubleType, StringType, DateType

In [0]:
# Explicit schema for movie_cast.json: every field is declared nullable.
# genderId is declared as a string here, unlike the other id columns.
movie_cast_schema = (
    StructType()
    .add("movieId", IntegerType(), True)
    .add("personId", IntegerType(), True)
    .add("characterName", StringType(), True)
    .add("genderId", StringType(), True)
    .add("castOrder", IntegerType(), True)
)

In [0]:
# Load the bronze JSON file with the explicit schema; multiLine lets a single
# JSON record span several physical lines.
movies_cast_df = (
    spark.read
    .schema(movie_cast_schema)
    .option("multiLine", True)
    .json(f'{bronze_folder_path}/movie_cast.json')
)

In [0]:
# Preview the freshly loaded DataFrame in the notebook output.
display(movies_cast_df)

In [0]:
# Print column names and types to check them against the declared schema.
movies_cast_df.printSchema()

### Paso 2 - Renombrar columnas y adicionar columnas "ingestion_date" y "env"

In [0]:
from pyspark.sql.functions import col, lit, current_timestamp

In [0]:
# Phase 1: switch the camelCase source columns that are kept to snake_case.
snake_case_df = (
    movies_cast_df
    .withColumnRenamed("personId", "person_id")
    .withColumnRenamed("movieId", "movie_id")
    .withColumnRenamed('characterName', 'character_name')
)

# Phase 2: append the audit columns. add_ingestion_date comes from the
# included common_functions notebook (presumably adds "ingestion_date" --
# confirm there); "env" carries the value of the p_environment widget.
movies_cast_renamed_df = (
    snake_case_df
    .transform(add_ingestion_date)
    .withColumn('env', lit(v_env))
)

### Paso 3 - Eliminar columnas no deseadas

In [0]:
# Drop the columns that are not needed downstream.
# Column names are passed as strings: DataFrame.drop only accepts several
# Column objects from PySpark 3.4 on (earlier versions raise TypeError when more
# than one non-string argument is given), while string names work everywhere.
movies_cast_final_df = movies_cast_renamed_df.drop('genderId', 'castOrder')

### Paso 4 - Particionar por movie_id y Escribir datos en el DataLake en formato parquet

In [0]:
# Earlier path-based variant, kept for reference:
# movies_cast_final_df.write.mode('overwrite').parquet(f'{silver_folder_path}/movie_cast')
#
# Persist the final DataFrame as the parquet table movie_silver.movies_casts,
# replacing whatever the table held before.
# NOTE(review): no partitionBy is applied here although the step heading
# mentions partitioning by movie_id -- confirm which one is intended.
silver_writer = movies_cast_final_df.write.format('parquet').mode('overwrite')
silver_writer.saveAsTable('movie_silver.movies_casts')

In [0]:
%python
display(dbutils.fs.ls("/mnt/moviehistory/silver/movies_casts"))

In [0]:
# Read the parquet files back from the silver location and show them as a
# sanity check of what was written.
df = spark.read.format('parquet').load('/mnt/moviehistory/silver/movies_casts')
display(df)

In [0]:
# End the notebook run and hand this string back as its exit value.
# NOTE(review): "Sucess" looks like a typo for "Success", but it is a runtime
# value -- presumably a calling notebook compares against it; check callers
# before changing the spelling.
dbutils.notebook.exit("Sucess")