## Widgets

In [0]:
dbutils.widgets.text("input_path", "abfss://datalake@dlsmde01user.dfs.core.windows.net/bronze/", "Input Path")
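
The widget default follows the ABFS URI shape `abfss://<container>@<account-host>/<path>`. As a minimal sketch, assuming you want to sanity-check the widget value before handing it to Auto Loader, the standard-library `urlsplit` can break it into its parts. The helper name `parse_abfss_path` is hypothetical and not part of the notebook; it only checks the shape of the string, not whether the storage location exists or is accessible.

```python
from urllib.parse import urlsplit

def parse_abfss_path(path):
    """Split an abfss:// URI into (container, account_host, directory).

    Hypothetical helper: validates only the string shape, not storage access.
    """
    parts = urlsplit(path)
    if parts.scheme != "abfss":
        raise ValueError(f"expected an abfss:// URI, got scheme {parts.scheme!r}")
    if not parts.username or not parts.hostname:
        raise ValueError("expected the form abfss://<container>@<account-host>/<path>")
    # urlsplit exposes the part before '@' as username and the rest as hostname
    return parts.username, parts.hostname, parts.path

container, host, directory = parse_abfss_path(
    "abfss://datalake@dlsmde01user.dfs.core.windows.net/bronze/"
)
print(container, host, directory)
# prints: datalake dlsmde01user.dfs.core.windows.net /bronze/
```

In the notebook this could be called as `parse_abfss_path(dbutils.widgets.get("input_path"))` so that a malformed value fails early with a readable message.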

## 🧱 DLT Table: `transactions_bronze`
This table is the **bronze** layer of the data processing flow. It ingests raw transaction files using **Delta Live Tables (DLT)** with PySpark.

In [0]:
import dlt
from pyspark.sql.functions import col, to_timestamp, upper

@dlt.table(
  name="transactions_bronze",
  comment="Raw transaction data from ADLS Gen2 ingested with Auto Loader",
)
def transactions_bronze():
    # Incrementally read CSV files from cloud storage as a stream
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")  # Source files are CSV
        .option("cloudFiles.inferColumnTypes", "true")  # Infer column types automatically
        .load(dbutils.widgets.get("input_path"))  # Load from the path set in the widget
    )

ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.

py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 642, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/databricks/spark/python/dlt/helpers.py", line 31, in call
    res = self.func()
          ^^^^^^^^^^^
  File "/root/.ipykernel/2092/command-6917130923330172-923502315", line 14, in transactions_bronze
    .load(dbutils.widgets.get("input_path"))  # Carga los datos desde la ruta especificada en el widget
     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/databricks/spark/python/pyspark/sql/streaming/readwriter.py", line 306, in load
    return self._df(self._jreader.load(path))
                    ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1355, in __call__
    ret

## 🧪 DLT Table: `transactions_silver`

This table is the **silver layer** of the data pipeline, where **validations and transformations** are applied to the raw transaction data previously ingested into the bronze layer.

### 📌 Description
`transactions_silver` cleans and transforms the data from `transactions_bronze` to ensure its quality and prepare it for more advanced analysis or for building the gold layer. It drops rows with a null `transactionId` or `amount`, casts `amount` to double, parses `timestamp` into a timestamp type, and uppercases `transactionType`.

In [0]:
@dlt.table(
  name="transactions_silver",
  comment="Validated and transformed transaction data"
)
def transactions_silver():
    df = dlt.read_stream("transactions_bronze")
    return (
        # Drop rows missing the transaction ID or the amount
        df.filter("transactionId IS NOT NULL AND amount IS NOT NULL")
        .withColumn("amount", col("amount").cast("double"))  # Normalize amount to double
        .withColumn("timestamp", to_timestamp("timestamp"))  # Parse timestamp values
        .withColumn("transactionType", upper(col("transactionType")))  # Uppercase the type
    )

| Name | Type |
|---|---|
| transactionId | string |
| userId | string |
| timestamp | timestamp |
| amount | double |
| currency | string |
| transactionType | string |
| merchantName | string |
| merchantCategory | string |
| paymentMethod | string |
| transactionStatus | string |
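
To make the silver rules concrete, here is a minimal plain-Python sketch of the same four steps applied to a single row represented as a dict. It is an illustration, not the pipeline code: the function names and the sample row are hypothetical, the timestamp is assumed to be in ISO 8601 form, and values that fail to parse become `None` to mirror how Spark's cast and `to_timestamp` yield null in their default (non-ANSI) mode.

```python
from datetime import datetime

def to_double(value):
    """Return value as a float, or None if it cannot be parsed."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return None

def to_ts(value):
    """Parse an ISO 8601 string (assumed format), or return None on failure."""
    try:
        return datetime.fromisoformat(value)
    except (TypeError, ValueError):
        return None

def clean_row(row):
    """Apply the silver rules to one row; return None if the row is dropped."""
    # Rule 1: drop rows missing the transaction ID or the amount
    if row.get("transactionId") is None or row.get("amount") is None:
        return None
    cleaned = dict(row)
    cleaned["amount"] = to_double(row["amount"])           # Rule 2: amount as double
    cleaned["timestamp"] = to_ts(row.get("timestamp"))     # Rule 3: parse timestamp
    tx_type = row.get("transactionType")
    cleaned["transactionType"] = tx_type.upper() if tx_type is not None else None  # Rule 4
    return cleaned

# Hypothetical sample row, not taken from the real dataset
sample = {
    "transactionId": "t-001",
    "amount": "12.50",
    "timestamp": "2024-01-15 10:30:00",
    "transactionType": "purchase",
}
print(clean_row(sample))
```

Rows such as `{"transactionId": None, "amount": "3.00"}` return `None`, which corresponds to being filtered out of `transactions_silver`.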


## 🏆 DLT Table: `transactions_gold`

This table is the **gold layer** of the data pipeline, where the **final aggregations, ready for business analysis or dashboard consumption**, are produced.

### 📊 Generated metrics
`transactions_gold` computes the **total transaction amount** grouped by merchant category (`merchantCategory`), rounded to two decimals, which shows how much money was transacted in each commercial sector.

In [0]:
from pyspark.sql import functions as F  # Namespaced to avoid shadowing built-in sum/round

@dlt.table(
  name="transactions_gold",
  comment="Total transaction amount per merchant category")
def transactions_gold():
    df = dlt.read_stream("transactions_silver")

    # Sum the amount per merchant category and round to 2 decimals
    total_amount_by_category = (
        df.groupBy("merchantCategory")
        .agg(F.sum("amount").alias("total_amount"))
        .withColumn("total_amount", F.round("total_amount", 2))
    )

    return total_amount_by_category

| Name | Type |
|---|---|
| merchantCategory | string |
| total_amount | double |
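
The gold aggregation can be sketched in plain Python to show what `total_amount` contains for each category. This is only an illustration under stated assumptions: the function name and sample rows are hypothetical, rows are dicts already cleaned by the silver step, and Python's `round` may differ from Spark's `round` on exact ties, since Spark rounds half up.

```python
from collections import defaultdict

def total_amount_by_category(rows):
    """Sum amount per merchantCategory and round each total to 2 decimals."""
    totals = defaultdict(float)
    for row in rows:
        # Rows with a null amount are skipped, as Spark's sum ignores nulls
        if row["amount"] is not None:
            totals[row["merchantCategory"]] += row["amount"]
    return {category: round(total, 2) for category, total in totals.items()}

# Hypothetical sample rows, not taken from the real dataset
rows = [
    {"merchantCategory": "groceries", "amount": 12.5},
    {"merchantCategory": "groceries", "amount": 7.25},
    {"merchantCategory": "travel", "amount": 100.0},
    {"merchantCategory": "travel", "amount": None},
]
print(total_amount_by_category(rows))
# prints: {'groceries': 19.75, 'travel': 100.0}
```

One difference from the Spark version: a category whose amounts are all null is omitted here, whereas Spark would return that category with a null total.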
