In [0]:
from pyspark.sql.functions import col, lit, struct, current_timestamp, current_date
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, BooleanType

####Leyendo todos los archivos .json nuevos que llegaron a las carpetas del DBFS

In [0]:
app_df = spark.read.json("/temp/trx_app/*.json")
web_df = spark.read.json("/temp/trx_web/*.json")
atm_df = spark.read.json("/temp/trx_atm/*.json")

####Completando columnas faltantes en cada df para luego poder ser unidos

In [0]:
app_df_agg = app_df.withColumn("transaccion",col("transaccion").withField(
                                                                "detalles_web",struct(
                                                                    lit("0.0.0.0").cast("string").alias("ip_origen"),
                                                                    lit("").cast("string").alias("navegador"),
                                                                    lit("").cast("string").alias("dispositivo")         
                                                                    )
                                                                )
                    ) \
                    .withColumn("transaccion",col("transaccion").withField(
                                                                "ubicacion_atm",struct(
                                                                    lit("").cast("string").alias("ciudad"), 
                                                                    lit("").cast("string").alias("dirección"),
                                                                    lit("").cast("string").alias("codigo_atm")
                                                                    )
                                                                )
                    ) \
                    .withColumn("transaccion",col("transaccion").withField(
                                                                "tarjeta",struct(
                                                                    lit("").cast("string").alias("numero_tarjeta"),
                                                                    lit("").cast("string").alias("titular")
                                                                    )
                                                                )
                    ) \
                    .withColumn("transaccion",col("transaccion").withField(
                                                                "autenticacion",struct(
                                                                    col("transaccion.autenticacion.metodo"),
                                                                    col("transaccion.autenticacion.exitoso"),
                                                                    lit("true").cast("string").alias("codigo_enviado")
                                                                    )
                                                                )

                    ) 

web_df_agg = web_df.withColumn("transaccion", col("transaccion").withField(
                                                                "detalles_app",struct(
                                                                    lit("").cast("string").alias("dispositivo"),
                                                                    lit("").cast("string").alias("sistema_operativo"),
                                                                    lit("").cast("string").alias("version_app")
                                                                    )
                                                                )
                    ) \
                    .withColumn("transaccion", col("transaccion").withField(
                                                                "ubicacion",struct(
                                                                    lit("").cast("double").alias("latitud"),
                                                                    lit("").cast("double").alias("longitud")
                                                                    )
                                                                )
                    ) \
                    .withColumn("transaccion",col("transaccion").withField(
                                                                "ubicacion_atm",struct(
                                                                    lit("").cast("string").alias("ciudad"), 
                                                                    lit("").cast("string").alias("dirección"),
                                                                    lit("").cast("string").alias("codigo_atm")
                                                                    )
                                                                )
                    ) \
                    .withColumn("transaccion",col("transaccion").withField(
                                                                "tarjeta",struct(
                                                                    lit("").cast("string").alias("numero_tarjeta"),
                                                                    lit("").cast("string").alias("titular")
                                                                    )
                                                                )
                    ) \
                    .withColumn("transaccion",col("transaccion").withField(
                                                                "autenticacion",struct(
                                                                    col("transaccion.autenticacion.metodo"),
                                                                    col("transaccion.autenticacion.codigo_enviado"),
                                                                    lit("true").cast("string").alias("exitoso")
                                                                    )
                                                                )
                    )

atm_df_agg = atm_df.withColumn("transaccion", col("transaccion").withField(
                                                                "detalles_app",struct(
                                                                    lit("").cast("string").alias("dispositivo"),
                                                                    lit("").cast("string").alias("sistema_operativo"),
                                                                    lit("").cast("string").alias("version_app")
                                                                    )
                                                                )
                    ) \
                    .withColumn("transaccion",col("transaccion").withField(
                                                                "detalles_web",struct(
                                                                    lit("0.0.0.0").cast("string").alias("ip_origen"),
                                                                    lit("").cast("string").alias("navegador"),
                                                                    lit("").cast("string").alias("dispositivo")         
                                                                    )
                                                                )
                    ) \
                    .withColumn("transaccion", col("transaccion").withField(
                                                                "ubicacion",struct(
                                                                    lit("").cast("double").alias("latitud"),
                                                                    lit("").cast("double").alias("longitud")
                                                                    )
                                                                )
                    ) \
                    .withColumn("transaccion", col("transaccion").withField(
                                                                "autenticacion",struct(
                                                                    lit("").cast("string").alias("metodo"),
                                                                    lit("true").cast("string").alias("exitoso"),
                                                                    lit("true").cast("string").alias("codigo_enviado")
                                                                    )
                                                                )
                    ) \
                    .withColumn("transaccion", col("transaccion").withField(
                                                                "cuenta_destino",struct(
                                                                    lit("").cast("string").alias("numero_cuenta"),
                                                                    lit("").cast("string").alias("banco"),
                                                                    lit("").cast("string").alias("titular")
                                                                    )
                                                                )
                    )

####Uniendo cada df en uno solo llamado trx_bronze_df

In [0]:
trx_bronze_df = app_df_agg.unionByName(web_df_agg, allowMissingColumns=True) \
                          .unionByName(atm_df_agg, allowMissingColumns=True)

####Se le agrega las columnas de fecha

In [0]:
trx_bronze_df = trx_bronze_df.withColumn("fecha_auditoria", current_date()) \
                            .withColumn("timestamp_auditoria", current_timestamp())

####Se lee la tabla delta anterior (o la generada en la primera ingesta)

In [0]:
previous_delta_table_df = spark.read.format("delta").load("catalgbanco.capabronze.trx_bronze")

####Se une la tabla previa con la tabla nueva

In [0]:
unioned_table_df = trx_bronze_df.union(previous_delta_table_df)

####Se guarda la tabla resultante en el catalogo y esquema de la capa bronze con el nombre trx_bronze

In [0]:
unioned_table_df.write.format("delta").mode("overwrite").saveAsTable("catalgbanco.capabronze.trx_bronze")

####Se optimiza la tabla delta

In [0]:
spark.sql("OPTIMIZE catalgbanco.capabronze.trx_bronze")