In [0]:
# Remove every existing widget so the parameter widgets defined further down
# are created from scratch on each run.
dbutils.widgets.removeAll()

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Notebook parameters as (widget name, default value) pairs. Each pair is
# registered as a free-text widget, in the order listed here.
_widget_defaults = (
    ("container", "raw-jptq"),
    ("catalogo", "catalog_jptq"),
    ("esquema", "bronze_jptq"),
    ("datalake", "adlsjptq0126"),
    ("file", "Peliculas"),
    ("name_file", "Movies.csv"),
    ("storageLocation", "abfss://unity-catalog-jptq@adlsjptq0126.dfs.core.windows.net"),
)

for _widget_name, _widget_default in _widget_defaults:
    dbutils.widgets.text(_widget_name, _widget_default)

In [0]:
# Read the current value of every parameter widget into a module-level
# variable of the same name (values are read in the order listed).
(
    container,
    catalogo,
    esquema,
    datalake,
    file,
    name_file,
    storageLocation,
) = (
    dbutils.widgets.get(widget_name)
    for widget_name in (
        "container",
        "catalogo",
        "esquema",
        "datalake",
        "file",
        "name_file",
        "storageLocation",
    )
)

# ABFSS path of the source CSV: <container>@<datalake>/<file>/<name_file>.
ruta = f"abfss://{container}@{datalake}.dfs.core.windows.net/{file}/{name_file}"

In [0]:
# Read of the source CSV with column types inferred by Spark.
# NOTE(review): none of the cells visible after this one reference `df_movies`,
# and `inferSchema` makes Spark scan the file an extra time; confirm whether
# this cell is still needed.
df_movies = spark.read.csv(ruta, header=True, inferSchema=True)

In [0]:
# Explicit schema for the movies CSV, one (column name, Spark type, nullable)
# entry per column, in file order.
# NOTE(review): user_score is declared with scale 0 (the DecimalType default,
# written out here); if the source scores carry decimals they would not be
# preserved -- confirm against the data.
_movie_fields = [
    ("id", IntegerType(), False),
    ("title", StringType(), True),
    ("genere", StringType(), True),
    ("language", StringType(), True),
    ("user_score", DecimalType(10, 0), True),
    ("runtime_hour", IntegerType(), True),
    ("runtime_min", IntegerType(), True),
    ("release_date", DateType(), True),
    ("vote_count", IntegerType(), True),
]

movies_schema = StructType(
    [
        StructField(field_name, field_type, is_nullable)
        for field_name, field_type, is_nullable in _movie_fields
    ]
)

In [0]:
# Read the source CSV again, this time enforcing `movies_schema` instead of
# letting Spark infer the column types.
df_movies_final = (
    spark.read
    .schema(movies_schema)
    .option("header", True)
    .csv(ruta)
)

In [0]:
# Keep the nine source columns, in this order, for the bronze load.
movies_selected_df = df_movies_final.select(
    "id",
    "title",
    "genere",
    "language",
    "user_score",
    "runtime_hour",
    "runtime_min",
    "release_date",
    "vote_count",
)

In [0]:
# Source column name -> target table column name. Renames are applied one at a
# time, in the order listed.
_column_renames = {
    "id": "ID_PELICULA",
    "title": "DES_PELICULA",
    "genere": "DES_GENERO_PELICULA",
    "language": "DES_IDIOMA_PELICULA",
    "user_score": "VAL_PUNTUACION_USUARIO",
    "runtime_hour": "VAL_DURACION_PELICULA_HORA",
    "runtime_min": "VAL_DURACION_PELICULA_MINUTOS",
    "release_date": "FEC_LANZAMIENTO",
    "vote_count": "VAL_NUMERO_VOTOS",
}

# Always start from `movies_selected_df` so re-running the cell is idempotent.
movies_renamed_df = movies_selected_df
for _source_name, _target_name in _column_renames.items():
    movies_renamed_df = movies_renamed_df.withColumnRenamed(_source_name, _target_name)

In [0]:
# Append the load timestamp column (FEC_CARGA) to the renamed frame.
movies_final_df = movies_renamed_df.withColumn(
    "FEC_CARGA",
    current_timestamp(),
)

In [0]:
# Expose the final frame to Spark SQL under the name `tmp_movies_final_df`,
# which the INSERT statement further down selects from. (A direct
# `write.mode("overwrite").saveAsTable(...)` into TBL_PELICULAS was previously
# used here and has been disabled in favour of that SQL-based load.)
movies_final_df.createOrReplaceTempView("tmp_movies_final_df")

In [0]:
def fn_create_table_peliculas(catalogo: str, esquema: str, storage_location: "str | None" = None):
    """Create the Delta table ``<catalogo>.<esquema>.TBL_PELICULAS`` if it is missing.

    The table is created at the explicit location
    ``<storage_location>/bronze-jptq/TBL_PELICULAS``. When the table already
    exists nothing is changed and a message is printed instead.

    Args:
        catalogo: Catalog that holds the target schema.
        esquema: Schema (database) that holds the target table.
        storage_location: Base storage URI for the table files. When omitted,
            the notebook-level ``storageLocation`` value is used, which keeps
            existing two-argument calls working unchanged.

    Returns:
        None. The outcome is reported through ``print``.
    """
    if storage_location is None:
        # Backward-compatible fallback: earlier versions always read the
        # notebook-level variable implicitly.
        storage_location = storageLocation

    table_name = f"{catalogo}.{esquema}.TBL_PELICULAS"

    if spark.catalog.tableExists(table_name):
        print(f"La tabla ya existe: {table_name}")
        return

    # IF NOT EXISTS is kept so that a table created between the check above and
    # this statement does not make the cell fail.
    spark.sql(f"""CREATE TABLE IF NOT EXISTS {table_name} (
        ID_PELICULA INT,
        DES_PELICULA STRING,
        DES_GENERO_PELICULA STRING,
        DES_IDIOMA_PELICULA STRING,
        VAL_PUNTUACION_USUARIO DECIMAL(10,0),
        VAL_DURACION_PELICULA_HORA INT,
        VAL_DURACION_PELICULA_MINUTOS INT,
        FEC_LANZAMIENTO DATE,
        VAL_NUMERO_VOTOS INT,
        FEC_CARGA TIMESTAMP
        )
      USING DELTA
      LOCATION '{storage_location}/bronze-jptq/TBL_PELICULAS'""")
    print(f"Tabla creada correctamente: {table_name}")

In [0]:
# Make sure the target table exists in the catalog/schema chosen via the widgets.
fn_create_table_peliculas(catalogo, esquema)

In [0]:
def fn_truncate_table_peliculas(catalogo: str, esquema: str):
    """Run ``TRUNCATE TABLE`` on ``<catalogo>.<esquema>.TBL_PELICULAS``.

    Args:
        catalogo: Catalog that holds the target schema.
        esquema: Schema (database) that holds the target table.
    """
    truncate_statement = f"""
    TRUNCATE TABLE {catalogo}.{esquema}.TBL_PELICULAS
    """
    spark.sql(truncate_statement)

In [0]:
# Empty the target table before the insert that follows, so each run reloads it in full.
fn_truncate_table_peliculas(catalogo, esquema)

In [0]:
def fn_insert_table_peliculas(catalogo: str, esquema: str):
    """Append the rows of the ``tmp_movies_final_df`` view to ``TBL_PELICULAS``.

    Columns are listed explicitly on both sides of the INSERT so that each
    value lands in the column of the same name. A plain ``SELECT *`` maps
    columns by position and would silently misload (or fail) if the view's
    column order ever differed from the table's.

    Args:
        catalogo: Catalog that holds the target schema.
        esquema: Schema (database) that holds the target table.
    """
    # Column names shared by the target table and the staged temp view.
    columns = (
        "ID_PELICULA",
        "DES_PELICULA",
        "DES_GENERO_PELICULA",
        "DES_IDIOMA_PELICULA",
        "VAL_PUNTUACION_USUARIO",
        "VAL_DURACION_PELICULA_HORA",
        "VAL_DURACION_PELICULA_MINUTOS",
        "FEC_LANZAMIENTO",
        "VAL_NUMERO_VOTOS",
        "FEC_CARGA",
    )
    column_list = ", ".join(columns)

    spark.sql(f"""
    INSERT INTO {catalogo}.{esquema}.TBL_PELICULAS ({column_list})
    SELECT {column_list} FROM tmp_movies_final_df
    """)

In [0]:
# Load the rows staged in the `tmp_movies_final_df` view into the target table.
fn_insert_table_peliculas(catalogo, esquema) 

In [0]:
%sql
-- Show the contents of the loaded table.
-- NOTE(review): the catalog and schema are hard-coded here (they match the
-- widget defaults) rather than taken from the `catalogo` / `esquema` widgets.
SELECT *
FROM catalog_jptq.bronze_jptq.tbl_peliculas