In [1]:
from pyspark.sql import SparkSession
from datetime import datetime, timezone

# UTC timestamp of this run; embedded in the Spark application name so that
# separate executions can be told apart.
ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")

# Build the session used by every cell below (getOrCreate hands back an
# already-running session when one exists).
spark = (
    SparkSession.builder
    .appName(f"Silver-product-item-{ts}")
    .getOrCreate()
)

In [2]:
from pyspark.sql.functions import current_timestamp, to_utc_timestamp, col
from pyspark.sql.types import DecimalType

In [3]:
# First execution only: bootstrap the Silver dataset from the full Bronze data.
# NOTE(review): the write below overwrites the whole Silver path, so re-running
# this cell after incremental loads replaces their result with a full rebuild.

df_bronze = spark.read.parquet("data_lake/bronze/product_item")
df_bronze.createOrReplaceTempView("product_item")

# Deduplicate per event (product_id, transaction_datetime). Duplicates are ranked
# by ingestion_date so the most recently ingested row is the one kept; ordering
# by a partition key would leave the surviving row arbitrary.
# The helper column rn is only used for filtering and is not part of the output.
df_product_item_silver = spark.sql("""
WITH 
base AS (
    SELECT 
        a.transaction_datetime,
        a.transaction_date,
        a.purchase_id,
        a.product_id,
        a.item_quantity,
        a.purchase_value,
        ROW_NUMBER() OVER (
            PARTITION BY a.product_id, a.transaction_datetime
            ORDER BY a.ingestion_date DESC
        ) AS rn,
        to_utc_timestamp(current_timestamp(), 'UTC') AS line_created_at,
        a.ingestion_date AS bronze_ingestion_date
    FROM product_item a
)
SELECT
    transaction_datetime,
    transaction_date,
    purchase_id,
    product_id,
    item_quantity,
    purchase_value,
    line_created_at,
    bronze_ingestion_date
FROM base
WHERE rn = 1
""")

# Save as Parquet, partitioned by transaction_date.
# coalesce(1) reduces the DataFrame to a single partition before writing.
df_product_item_silver.coalesce(1).write \
    .format("parquet") \
    .mode("overwrite") \
    .partitionBy("transaction_date") \
    .save("data_lake/silver/product_item")

In [4]:
from pyspark.sql.functions import (
    col,
    max as max_,
    row_number,
    current_timestamp,
    to_utc_timestamp
)
from pyspark.sql.window import Window
from pyspark.sql.utils import AnalysisException

SILVER_PATH = "data_lake/silver/product_item"
BRONZE_PATH = "data_lake/bronze/product_item"

# =====================================================
# 1. Read the current Silver dataset (if it exists)
# =====================================================
try:
    df_silver_current = spark.read.parquet(SILVER_PATH)
except AnalysisException:
    # Treated as "no Silver yet": the run falls back to a full Bronze load.
    df_silver_current = None


# =====================================================
# 2. Latest Bronze ingestion already processed
# =====================================================
# max() over an empty Silver yields None, which also triggers a full load below.
last_ingestion = (
    df_silver_current
    .agg(max_("bronze_ingestion_date").alias("max_date"))
    .first()["max_date"]
) if df_silver_current is not None else None


# =====================================================
# 3. Read Bronze (incremental or full)
# =====================================================
df_bronze = spark.read.parquet(BRONZE_PATH)

# Only rows ingested strictly after the last processed ingestion are picked up.
df_bronze_incremental = (
    df_bronze.filter(col("ingestion_date") > last_ingestion)
    if last_ingestion is not None
    else df_bronze
)


# =====================================================
# 4. Bronze -> Silver transformations
# =====================================================
# Stamp each row with its Silver creation time and rename the ingestion column
# to the name used in Silver.
df_bronze_ready = (
    df_bronze_incremental
    .withColumn(
        "line_created_at",
        to_utc_timestamp(current_timestamp(), "UTC")
    )
    .withColumnRenamed(
        "ingestion_date",
        "bronze_ingestion_date"
    )
)


# =====================================================
# 5. Union Silver + incremental Bronze
# =====================================================
# Silver may carry derived columns from earlier runs ("rn" from the bootstrap
# cell, "is_latest" from step 7). They do not exist on the Bronze side, and
# unionByName requires both sides to have the same columns, so drop them here;
# "is_latest" is recomputed in step 7. drop() ignores names that are absent.
df_union = (
    df_silver_current.drop("rn", "is_latest").unionByName(df_bronze_ready)
    if df_silver_current is not None
    else df_bronze_ready
)


# =====================================================
# 6. Per-event deduplication (CDC)
# =====================================================
# One row per (product_id, transaction_datetime): the most recently ingested wins.
event_window = Window.partitionBy(
    "product_id",
    "transaction_datetime"
).orderBy(
    col("bronze_ingestion_date").desc()
)

df_dedup = (
    df_union
    .withColumn("rn", row_number().over(event_window))
    .filter(col("rn") == 1)
    .drop("rn")
)


# =====================================================
# 7. Latest-state flag per product
# =====================================================
# is_latest is true only for the row with the newest transaction_datetime of
# each product_id.
product_window = Window.partitionBy(
    "product_id"
).orderBy(
    col("transaction_datetime").desc()
)

df_product_item_silver_final = (
    df_dedup
    .withColumn("rk", row_number().over(product_window))
    .withColumn("is_latest", col("rk") == 1)
    .drop("rk")
)


In [5]:
# =====================================================
# 8. Final Silver write
# =====================================================
# df_product_item_silver_final may still lazily read from the Silver path, so
# overwriting that path directly would remove its own input files while the job
# is running (or be rejected by Spark). Materialize the result in a staging
# location first, then replace Silver from the staged copy.
# The staging directory is left in place and overwritten on the next run.
SILVER_STAGING_PATH = "data_lake/silver/product_item__staging"

df_product_item_silver_final.write \
    .format("parquet") \
    .mode("overwrite") \
    .partitionBy("transaction_date") \
    .save(SILVER_STAGING_PATH)

spark.read.parquet(SILVER_STAGING_PATH).write \
    .format("parquet") \
    .mode("overwrite") \
    .partitionBy("transaction_date") \
    .save("data_lake/silver/product_item")

In [6]:
# Load the written Silver dataset and expose it to Spark SQL as a temporary view.
df_silver_final = spark.read.format("parquet").load("data_lake/silver/product_item")
df_silver_final.createOrReplaceTempView("product_silver")

# Preview the view (show() prints the first rows only).
silver_preview = spark.sql("SELECT * FROM product_silver")
silver_preview.show()

+--------------------+-----------+----------+-------------+--------------+--------------------+---------------------+---------+----------------+
|transaction_datetime|purchase_id|product_id|item_quantity|purchase_value|     line_created_at|bronze_ingestion_date|is_latest|transaction_date|
+--------------------+-----------+----------+-------------+--------------+--------------------+---------------------+---------+----------------+
| 2023-02-26 03:00:00|         69|    373737|            2|       2000.00|2026-01-14 23:18:...| 2026-01-14 23:10:...|     true|      2023-02-26|
| 2023-01-25 23:59:59|         56|    808080|          120|       2400.00|2026-01-14 23:18:...| 2026-01-14 23:10:...|     true|      2023-01-25|
| 2023-07-12 09:00:00|         55|    696969|           10|         55.00|2026-01-14 23:18:...| 2026-01-14 23:10:...|     true|      2023-07-12|
| 2023-01-20 22:02:00|         55|    696969|           10|         50.00|2026-01-14 23:18:...| 2026-01-14 23:10:...|    false|   