### Setup e Variáveis

In [1]:
from core.bemol_lakehouse import BemolLakeHouse
from core.bemol_controller import BemolController
from core.bemol_logger import BemolLogger
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum

In [None]:
# Logger for this notebook's run
logger = BemolLogger("silver_products_sales")

# Spark session configured for Delta Lake: the Delta package, its SQL
# extension, and the Delta catalog, applied in this order.
_delta_settings = {
    "spark.jars.packages": "io.delta:delta-core_2.12:2.4.0",
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
}
_builder = SparkSession.builder.appName("TransformacaoSilverProdutos")
for _setting, _value in _delta_settings.items():
    _builder = _builder.config(_setting, _value)
spark = _builder.getOrCreate()

# Data read/write helper, wired to the session and logger created above
lakehouse = BemolLakeHouse(spark, logger)

In [3]:
# Bronze-layer inputs
origin_path_products = "../data/bronze/products/"
origin_path_carts = "../data/bronze/carts/"

# Silver-layer output for this notebook
destination_path = "../data/silver/products_sales/"

# Location where the monitoring data is saved
destination_path_monitor = "../data/monitoring/"

### Leitura

In [4]:
# Read the carts and products datasets from the bronze layer through the
# BemolLakeHouse helper (carts first, then products).
df_carts = lakehouse.read_bronze(origin_path_carts)
df_products = lakehouse.read_bronze(origin_path_products)

2025-10-16 21:13:07,378 - INFO - Iniciando operação: read_bronze
2025-10-16 21:13:07,383 - INFO - Lendo dados da camada Bronze: ../data/bronze/carts/
25/10/16 21:13:12 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
2025-10-16 21:13:16,357 - INFO - Dados lidos com sucesso da camada Bronze: 14 linhas, 6 colunas.
2025-10-16 21:13:16,358 - INFO - Operação read_bronze finalizada em 8.98 segundos.
2025-10-16 21:13:16,360 - INFO - Iniciando operação: read_bronze
2025-10-16 21:13:16,361 - INFO - Lendo dados da camada Bronze: ../data/bronze/products/
2025-10-16 21:13:17,896 - INFO - Dados lidos com sucesso da camada Bronze: 20 linhas, 9 colunas.
2025-10-16 21:13:17,897 - INFO - Operação read_bronze finalizada em 1.54 segundos.


### Transformações

In [5]:
# Keep a single row per product id
df_products = df_products.drop_duplicates(subset=["id"])

# Total quantity sold per product. `sum` here is the pyspark.sql.functions
# aggregate imported at the top of the notebook, not Python's builtin.
_quantity_sold = sum("product_quantity").alias("total_quantity_sold")
df_sales = df_carts.groupBy("product_id").agg(_quantity_sold)

In [6]:
# Preview up to five aggregated rows without truncating cell values
df_sales.show(n=5, truncate=False)

[Stage 21:>                                                         (0 + 7) / 7]

+----------+-------------------+
|product_id|total_quantity_sold|
+----------+-------------------+
|7         |1                  |
|8         |1                  |
|5         |2                  |
|1         |20                 |
|2         |5                  |
+----------+-------------------+
only showing top 5 rows



                                                                                

In [29]:
# Build the silver product table as a single chain:
#   1. left-join products to their sales totals and drop the duplicate join key,
#   2. keep only the relevant columns,
#   3. replace a null total_quantity_sold with 0,
#   4. compute total revenue per product as price * total_quantity_sold.
df_products_silver = (
    df_products
    .join(df_sales, df_products.id == df_sales.product_id, "left")
    .drop(df_sales.product_id)
    .select(
        "id",
        "product_title",
        "category",
        "rating_count",
        "rating",
        "price",
        "total_quantity_sold",
    )
    .fillna(0, subset=["total_quantity_sold"])
    .withColumn("total_revenue", col("price") * col("total_quantity_sold"))
)

In [30]:
# Preview up to five rows of the enriched product table
df_products_silver.show(n=5)

+---+--------------------+--------------+------------+------+------+-------------------+-------------+
| id|       product_title|      category|rating_count|rating| price|total_quantity_sold|total_revenue|
+---+--------------------+--------------+------------+------+------+-------------------+-------------+
|  1|Fjallraven - Fold...|men's clothing|         120|   3.9|109.95|                 20|       2199.0|
|  2|Mens Casual Premi...|men's clothing|         259|   4.1|  22.3|                  5|        111.5|
|  3|  Mens Cotton Jacket|men's clothing|         500|   4.7| 55.99|                  6|       335.94|
|  4|Mens Casual Slim Fit|men's clothing|         430|   2.1| 15.99|                  0|          0.0|
|  5|John Hardy Women'...|      jewelery|         400|   4.6| 695.0|                  2|       1390.0|
+---+--------------------+--------------+------------+------+------+-------------------+-------------+
only showing top 5 rows



### Escrita

In [31]:
# Add the ingestion-timestamp column for the silver layer. The preview that
# follows this call shows a new `insertion_silver` column.
df_products_silver = BemolController.control_field(df_products_silver, "silver")

In [32]:
# Preview up to five rows, now including the control column
df_products_silver.show(n=5)

+---+--------------------+--------------+------------+------+------+-------------------+-------------+--------------------+
| id|       product_title|      category|rating_count|rating| price|total_quantity_sold|total_revenue|    insertion_silver|
+---+--------------------+--------------+------------+------+------+-------------------+-------------+--------------------+
|  1|Fjallraven - Fold...|men's clothing|         120|   3.9|109.95|                 20|       2199.0|2025-10-16 21:25:...|
|  2|Mens Casual Premi...|men's clothing|         259|   4.1|  22.3|                  5|        111.5|2025-10-16 21:25:...|
|  3|  Mens Cotton Jacket|men's clothing|         500|   4.7| 55.99|                  6|       335.94|2025-10-16 21:25:...|
|  4|Mens Casual Slim Fit|men's clothing|         430|   2.1| 15.99|                  0|          0.0|2025-10-16 21:25:...|
|  5|John Hardy Women'...|      jewelery|         400|   4.6| 695.0|                  2|       1390.0|2025-10-16 21:25:...|
+---+---

In [33]:
# Write the silver DataFrame to destination_path through BemolLakeHouse,
# registering it under the name "silver_products_sales".
# Original note: written in Delta Lake format with overwrite as the default mode.
# NOTE(review): format and mode are decided inside write_silver, which is not
# visible in this notebook — confirm.
lakehouse.write_silver(df_products_silver, destination_path, table_name="silver_products_sales")

2025-10-16 21:25:21,246 - INFO - Iniciando operação: write_silver
2025-10-16 21:25:21,273 - INFO - Escrevendo dados na camada silver em ../data/silver/products_sales/
2025-10-16 21:25:24,067 - INFO - Dados escritos com sucesso na camada silver.
2025-10-16 21:25:24,382 - INFO - Métricas silver_products_sales: 20 linhas, 9 colunas.
2025-10-16 21:25:24,384 - INFO - Operação write_silver finalizada em 3.14 segundos.


In [34]:
# Export the monitoring metrics collected by the lakehouse helper to
# destination_path_monitor.
# Original note: written in Delta Lake format with overwrite as the default mode.
# NOTE(review): format and mode are decided inside export_delta — confirm.
# The call returns a DataFrame; the trailing semicolon keeps its repr out of
# the cell output, since the result is only needed for its side effect here.
lakehouse.monitor.export_delta(spark, destination_path_monitor);

2025-10-16 21:25:52,254 - INFO - Métricas exportadas com sucesso para ../data/monitoring/


DataFrame[table_name: string, row_count: bigint, col_count: bigint, timestamp: string]

In [35]:
# Read the monitoring table back from its Delta location and preview up to
# five rows without truncating cell values
df_monitor = spark.read.load(destination_path_monitor, format="delta")
df_monitor.show(n=5, truncate=False)

+---------------------+---------+---------+-------------------+
|table_name           |row_count|col_count|timestamp          |
+---------------------+---------+---------+-------------------+
|silver_products_sales|20       |9        |2025-10-16 21:25:24|
|bronze_products      |20       |9        |2025-10-16 20:27:04|
|bronze_carts         |14       |6        |2025-10-16 20:27:10|
|silver_users         |10       |8        |2025-10-16 21:10:37|
|bronze_users         |10       |13       |2025-10-16 19:46:39|
+---------------------+---------+---------+-------------------+

