# Silver layer

The Silver layer is the curated, incrementally loaded staging area: it standardizes types, normalizes values (status, dates), deduplicates records, and prepares the customer SCD. It holds "technical" tables with the `_clean` and `_dedup` suffixes, plus the operational customer SCD Type 2 dimension (e.g., `silver.dim_customer_scd`).

In [0]:
%sql
-- Make the workshop catalog the session default so the two-part names used
-- in this notebook (silver.*, bronze.*) resolve inside it.
USE CATALOG `workshop_modelagem`;

-- Ensure the Silver schema exists (no-op when it is already there).
CREATE SCHEMA IF NOT EXISTS `workshop_modelagem`.`silver`;

## Incremental approach (Databricks / Delta)
Use `MERGE INTO` together with a watermark on an event/update-time column (`order_date`, `updated_at`, `last_update_date`, or `_ingestion_timestamp` if available). The entire Silver layer can (and should) be loaded incrementally:

### 1) Silver Orders – incremental cleaning
Objective: cast dates and numeric values to proper types, standardize the status, drop rows with null critical fields, and resolve duplicates by `order_id`.
On the first build, the table is created; on subsequent runs, rows are merged on the natural key (`order_id`).

In [0]:
%sql
-- Reset helper: drops silver.orders_clean so the build cell recreates it.
-- IF EXISTS keeps the very first run (table not created yet) from failing.
-- NOTE(review): after a drop, the rebuild only reloads rows inside the 60-day
-- window applied by the build cell; skip this cell for normal incremental runs.
DROP TABLE IF EXISTS silver.orders_clean;

In [0]:
%sql
-- =========================================================
-- SILVER: orders_clean (dedupe + idempotency via hash)
-- ---------------------------------------------------------
-- Flow: bronze.orders -> stage_orders -> stage_orders_win
--       -> stage_orders_dedup -> stage_orders_final -> MERGE
-- Target grain: one row per order_id.
-- =========================================================
CREATE TABLE IF NOT EXISTS silver.orders_clean (
  order_id      STRING,
  customer_id   STRING,
  order_date    DATE,
  order_status  STRING,
  total_amount  DECIMAL(18,2),
  row_hash      STRING          -- SHA-256 of the business attributes (see step 4)
) USING DELTA;

-- 1) Stage: normalize and parse order_date to timestamp (order_ts) + DATE
CREATE OR REPLACE TEMP VIEW stage_orders AS
SELECT
  order_id,
  customer_id,
  -- reference timestamp (supports multiple formats); the first pattern that
  -- parses wins, and the result is NULL when none of them match
  COALESCE(
    try_to_timestamp(order_date, 'yyyy-MM-dd HH:mm:ss'),
    try_to_timestamp(order_date, 'yyyy/MM/dd HH:mm:ss'),
    try_to_timestamp(order_date, 'dd/MM/yyyy HH:mm:ss'),
    try_to_timestamp(order_date, 'dd-MM-yyyy HH:mm:ss'),
    try_to_timestamp(order_date, 'yyyy-MM-dd'),
    try_to_timestamp(order_date, 'yyyy/MM/dd'),
    try_to_timestamp(order_date, 'dd/MM/yyyy'),
    try_to_timestamp(order_date, 'dd-MM-yyyy')
  ) AS order_ts,
  UPPER(TRIM(order_status)) AS order_status_norm,
  -- decimal comma -> decimal point before casting
  CAST(regexp_replace(total_amount, ',', '.') AS DECIMAL(18,2)) AS total_amount_norm
FROM bronze.orders
WHERE order_id IS NOT NULL;

-- 2) Incremental window (60-day watermark)
--    Rows whose order_ts could not be parsed (NULL) are dropped here too,
--    because the comparison evaluates to NULL for them.
CREATE OR REPLACE TEMP VIEW stage_orders_win AS
SELECT *
FROM stage_orders
WHERE order_ts >= date_sub(current_timestamp(), 60);

-- 3) Dedup: keep 1 row per order_id (most recent by order_ts)
CREATE OR REPLACE TEMP VIEW stage_orders_dedup AS
SELECT
  order_id,
  customer_id,
  CAST(order_ts AS DATE)          AS order_date,
  order_status_norm               AS order_status,
  total_amount_norm               AS total_amount,
  order_ts
FROM (
  SELECT
    s.*,
    ROW_NUMBER() OVER (
      PARTITION BY order_id
      ORDER BY order_ts DESC NULLS LAST,
               -- Tie-breakers cover every attribute that feeds row_hash, so
               -- rows tied on order_ts always resolve to the same winner and
               -- the hash cannot flip between runs.
               customer_id       DESC NULLS LAST,
               order_status_norm DESC NULLS LAST,
               total_amount_norm DESC NULLS LAST
    ) AS rn
  FROM stage_orders_win s
  WHERE order_ts IS NOT NULL
) z
WHERE rn = 1;

-- 4) Calculate hash for idempotency (avoid UPDATE without real change)
--    NULL attributes are hashed as '' (or 0 for the amount).
CREATE OR REPLACE TEMP VIEW stage_orders_final AS
SELECT
  order_id,
  customer_id,
  order_date,
  order_status,
  total_amount,
  sha2(concat_ws('||',
    coalesce(customer_id,''),
    coalesce(date_format(order_date,'yyyy-MM-dd'),''),
    coalesce(order_status,''),
    cast(coalesce(total_amount,0) as string)
  ), 256) AS row_hash
FROM stage_orders_dedup;

-- 5) Idempotent MERGE: only update when hash differs
--    (a NULL target hash is treated as "changed" so such rows get refreshed)
MERGE INTO silver.orders_clean AS t
USING stage_orders_final AS s
ON t.order_id = s.order_id
WHEN MATCHED AND (t.row_hash IS NULL OR t.row_hash <> s.row_hash) THEN UPDATE SET
  t.customer_id  = s.customer_id,
  t.order_date   = s.order_date,
  t.order_status = s.order_status,
  t.total_amount = s.total_amount,
  t.row_hash     = s.row_hash
WHEN NOT MATCHED THEN INSERT (order_id, customer_id, order_date, order_status, total_amount, row_hash)
VALUES (s.order_id, s.customer_id, s.order_date, s.order_status, s.total_amount, s.row_hash);


### 2) Silver Order Items – incremental deduplication

Grain: one row per (`order_id`, `product_id`) pair, keeping the most recent version by `updated_at`.
Create the table, deduplicate with `ROW_NUMBER()`, and MERGE on the (`order_id`, `product_id`) pair.

In [0]:
%sql
-- Reset helper: drops silver.order_items_clean so the build cell recreates it.
-- IF EXISTS keeps the very first run (table not created yet) from failing.
-- NOTE(review): after a drop, the rebuild only reloads rows inside the 60-day
-- window applied by the build cell; skip this cell for normal incremental runs.
DROP TABLE IF EXISTS silver.order_items_clean;

In [0]:
%sql
-- =========================================================
-- SILVER: order_items_clean (dedupe + idempotency via hash)
-- ---------------------------------------------------------
-- Flow: bronze.order_items -> stage_order_items -> stage_order_items_win
--       -> stage_order_items_dedup -> stage_order_items_final -> MERGE
-- Target grain: one row per (order_id, product_id).
-- =========================================================
CREATE TABLE IF NOT EXISTS silver.order_items_clean (
  order_id       STRING,
  product_id     STRING,
  quantity       INT,
  unit_price     DECIMAL(18,2),
  discount       DECIMAL(18,2),
  promo_code     STRING,
  updated_at     TIMESTAMP,
  row_hash       STRING         -- SHA-256 of the business attributes (see step 4)
) USING DELTA;

-- 1) Stage: normalize and parse
CREATE OR REPLACE TEMP VIEW stage_order_items AS
SELECT
  order_id,
  product_id,
  CAST(quantity AS INT) AS quantity,
  -- decimal comma -> decimal point before casting
  CAST(regexp_replace(unit_price, ',', '.') AS DECIMAL(18,2)) AS unit_price,
  CAST(regexp_replace(discount,   ',', '.') AS DECIMAL(18,2)) AS discount,
  promo_code,
  -- first pattern that parses wins; NULL when none of them match
  COALESCE(
    try_to_timestamp(updated_at, 'yyyy-MM-dd HH:mm:ss'),
    try_to_timestamp(updated_at, 'yyyy/MM/dd HH:mm:ss'),
    try_to_timestamp(updated_at, 'dd/MM/yyyy HH:mm:ss'),
    try_to_timestamp(updated_at, 'dd-MM-yyyy HH:mm:ss'),
    try_to_timestamp(updated_at, 'yyyy-MM-dd'),
    try_to_timestamp(updated_at, 'yyyy/MM/dd'),
    try_to_timestamp(updated_at, 'dd/MM/yyyy'),
    try_to_timestamp(updated_at, 'dd-MM-yyyy')
  ) AS parsed_updated_at
FROM bronze.order_items
WHERE order_id IS NOT NULL
  AND product_id IS NOT NULL;

-- 2) Incremental window (60-day watermark)
--    Rows with an unparseable (NULL) updated_at are dropped here too,
--    because the comparison evaluates to NULL for them.
CREATE OR REPLACE TEMP VIEW stage_order_items_win AS
SELECT *
FROM stage_order_items
WHERE parsed_updated_at >= date_sub(current_timestamp(), 60);

-- 3) Dedup: keep 1 row per (order_id, product_id) pair, most recent updated_at
CREATE OR REPLACE TEMP VIEW stage_order_items_dedup AS
SELECT
  order_id,
  product_id,
  quantity,
  unit_price,
  discount,
  promo_code,
  parsed_updated_at AS updated_at
FROM (
  SELECT
    s.*,
    ROW_NUMBER() OVER (
      PARTITION BY order_id, product_id
      ORDER BY
        parsed_updated_at DESC NULLS LAST,
        -- Tie-breakers cover every attribute that feeds row_hash, so rows
        -- tied on updated_at always resolve to the same winner and the hash
        -- cannot flip between runs.
        quantity   DESC NULLS LAST,
        unit_price DESC NULLS LAST,
        discount   DESC NULLS LAST,
        promo_code DESC NULLS LAST
    ) AS rn
  FROM stage_order_items_win s
) z
WHERE rn = 1;

-- 4) Calculate row hash (cheap and stable comparison)
--    NULL attributes are hashed as 0 (numerics) or '' (strings/timestamp).
CREATE OR REPLACE TEMP VIEW stage_order_items_final AS
SELECT
  order_id,
  product_id,
  quantity,
  unit_price,
  discount,
  promo_code,
  updated_at,
  sha2(concat_ws('||',
    cast(coalesce(quantity,   0) as string),
    cast(coalesce(unit_price, 0) as string),
    cast(coalesce(discount,   0) as string),
    coalesce(promo_code,''),
    coalesce(date_format(updated_at,'yyyy-MM-dd HH:mm:ss'),'')
  ), 256) AS row_hash
FROM stage_order_items_dedup;

-- 5) Idempotent MERGE (only update when hash differs)
--    (a NULL target hash is treated as "changed" so such rows get refreshed)
MERGE INTO silver.order_items_clean AS t
USING stage_order_items_final AS s
ON t.order_id = s.order_id AND t.product_id = s.product_id
WHEN MATCHED AND (
  t.row_hash IS NULL OR t.row_hash <> s.row_hash
) THEN UPDATE SET
  t.quantity   = s.quantity,
  t.unit_price = s.unit_price,
  t.discount   = s.discount,
  t.promo_code = s.promo_code,
  t.updated_at = s.updated_at,
  t.row_hash   = s.row_hash
WHEN NOT MATCHED THEN INSERT (
  order_id, product_id, quantity, unit_price, discount,
  promo_code, updated_at, row_hash
) VALUES (
  s.order_id, s.product_id, s.quantity, s.unit_price, s.discount,
  s.promo_code, s.updated_at, s.row_hash
);


### 3) Silver Customers – incremental SCD Type 2

SCD-2 with hash-based idempotency (avoids inserting duplicate versions).
Expire current rows that changed and insert new versions only when necessary.

In [0]:
%sql
-- Reset helper: drops silver.dim_customer_scd so the build cell recreates it.
-- IF EXISTS keeps the very first run (table not created yet) from failing.
-- NOTE(review): dropping the dimension discards every stored SCD version;
-- skip this cell for normal incremental runs.
DROP TABLE IF EXISTS silver.dim_customer_scd;

In [0]:
%sql
-- =========================================================
-- SILVER: dim_customer_scd (SCD Type 2 with hash idempotency)
-- ---------------------------------------------------------
-- Flow: bronze.customers -> stage_customers_norm -> stage_customers_win
--       -> stage_customers_dedup -> stage_customers_hash
--       -> MERGE (expire changed current rows) -> INSERT (new versions)
-- A version is "current" when is_current = TRUE; open-ended versions carry
-- effective_end = 9999-12-31.
-- =========================================================
CREATE TABLE IF NOT EXISTS silver.dim_customer_scd (
  customer_id     STRING,
  customer_name   STRING,
  email           STRING,
  city            STRING,
  state           STRING,
  effective_start TIMESTAMP,
  effective_end   TIMESTAMP,
  is_current      BOOLEAN,
  row_hash        STRING        -- SHA-256 of the tracked attributes (see step 4)
) USING DELTA;

-- 1) Normalize source data
CREATE OR REPLACE TEMP VIEW stage_customers_norm AS
SELECT
  customer_id,
  TRIM(customer_name) AS customer_name,
  LOWER(TRIM(email))  AS email,
  TRIM(city)          AS city,
  -- TRIM before matching so padded values such as ' São Paulo ' are still
  -- mapped to their two-letter code; unknown states pass through uppercased.
  CASE
    WHEN UPPER(TRIM(state)) IN ('SP','SÃO PAULO','SAO PAULO') THEN 'SP'
    WHEN UPPER(TRIM(state)) IN ('RJ','RIO DE JANEIRO')         THEN 'RJ'
    WHEN UPPER(TRIM(state)) IN ('MG','MINAS GERAIS')           THEN 'MG'
    ELSE UPPER(TRIM(state))
  END AS state,
  -- first pattern that parses wins; NULL when none of them match
  COALESCE(
    try_to_timestamp(last_update_date, 'yyyy-MM-dd HH:mm:ss'),
    try_to_timestamp(last_update_date, 'yyyy/MM/dd HH:mm:ss'),
    try_to_timestamp(last_update_date, 'dd/MM/yyyy HH:mm:ss'),
    try_to_timestamp(last_update_date, 'dd-MM-yyyy HH:mm:ss'),
    try_to_timestamp(last_update_date, 'yyyy-MM-dd'),
    try_to_timestamp(last_update_date, 'yyyy/MM/dd'),
    try_to_timestamp(last_update_date, 'dd/MM/yyyy'),
    try_to_timestamp(last_update_date, 'dd-MM-yyyy')
  ) AS src_ts
FROM bronze.customers
WHERE customer_id IS NOT NULL
  AND TRIM(customer_name) <> '';   -- also filters out NULL names

-- 2) Incremental window (60-day watermark on source timestamp)
--    Rows with an unparseable (NULL) src_ts are dropped here too.
CREATE OR REPLACE TEMP VIEW stage_customers_win AS
SELECT *
FROM stage_customers_norm
WHERE src_ts >= date_sub(current_timestamp(), 60);

-- 3) Dedup: keep most recent row per customer_id
CREATE OR REPLACE TEMP VIEW stage_customers_dedup AS
SELECT
  customer_id,
  customer_name,
  email,
  city,
  state,
  src_ts
FROM (
  SELECT
    s.*,
    ROW_NUMBER() OVER (
      PARTITION BY customer_id
      ORDER BY src_ts DESC NULLS LAST,
               -- Tie-breakers cover every hashed attribute so rows tied on
               -- src_ts always resolve to the same winner; otherwise the hash
               -- could flip between runs and create spurious SCD versions.
               customer_name DESC NULLS LAST,
               email         DESC NULLS LAST,
               city          DESC NULLS LAST,
               state         DESC NULLS LAST
    ) AS rn
  FROM stage_customers_win s
) z
WHERE rn = 1;

-- 4) Calculate hash (ignoring customer_id and src_ts)
--    NULL attributes are hashed as ''.
CREATE OR REPLACE TEMP VIEW stage_customers_hash AS
SELECT
  customer_id,
  customer_name,
  email,
  city,
  state,
  src_ts,
  sha2(concat_ws('||',
    coalesce(customer_name,''),
    coalesce(email,''),
    coalesce(city,''),
    coalesce(state,'')
  ), 256) AS source_hash
FROM stage_customers_dedup;

-- 5) Only process customers with changed data or new customers
--    (not referenced by steps 6-7 below, which apply the same predicate inline)
CREATE OR REPLACE TEMP VIEW stage_customers_to_process AS
SELECT s.*
FROM stage_customers_hash s
LEFT JOIN silver.dim_customer_scd c
  ON c.customer_id = s.customer_id
 AND c.is_current  = TRUE
WHERE c.customer_id IS NULL           -- new customer
   OR c.row_hash IS NULL              -- current row without hash: treat as changed
   OR c.row_hash <> s.source_hash;    -- changed attributes

-- 6) Expire current versions THAT changed (uses hash to avoid unnecessary updates)
--    A NULL target hash counts as changed, matching the other Silver MERGEs.
MERGE INTO silver.dim_customer_scd AS tgt
USING stage_customers_hash AS src
ON  tgt.customer_id = src.customer_id
AND tgt.is_current  = TRUE
WHEN MATCHED AND (tgt.row_hash IS NULL OR tgt.row_hash <> src.source_hash) THEN
  UPDATE SET
    tgt.effective_end = COALESCE(src.src_ts, current_timestamp()),
    tgt.is_current    = FALSE;

-- 7) Insert first version OR new version only when necessary
--    Must run after step 6: customers whose current row was just expired no
--    longer match the join below and therefore receive a new current version.
INSERT INTO silver.dim_customer_scd (
  customer_id, customer_name, email, city, state,
  effective_start, effective_end, is_current, row_hash
)
SELECT
  s.customer_id,
  s.customer_name,
  s.email,
  s.city,
  s.state,
  COALESCE(s.src_ts, current_timestamp()) AS effective_start,
  TIMESTAMP('9999-12-31')                 AS effective_end,
  TRUE                                    AS is_current,
  s.source_hash                           AS row_hash
FROM stage_customers_hash s
LEFT JOIN silver.dim_customer_scd c
  ON c.customer_id = s.customer_id AND c.is_current = TRUE
WHERE c.customer_id IS NULL           -- new customer (or current row expired in step 6)
   OR c.row_hash IS NULL              -- current row without hash: treat as changed
   OR c.row_hash <> s.source_hash;    -- real change


### 4) Silver Products – incremental cleaning

Standardize types and fields so the data is ready to publish to `gold.dim_produto`.

In [0]:
%sql
-- Reset helper: drops silver.products_clean so the build cell recreates it.
-- IF EXISTS keeps the very first run (table not created yet) from failing.
DROP TABLE IF EXISTS silver.products_clean;

In [0]:
%sql
-- =========================================================
-- SILVER: products_clean (dedupe + idempotency via hash)
-- ---------------------------------------------------------
-- Flow: bronze.products -> stage_products -> stage_products_dedup
--       -> stage_products_final -> MERGE
-- Target grain: one row per product_id. No watermark is applied here:
-- every bronze row with a non-NULL product_id takes part in each run.
-- =========================================================
CREATE TABLE IF NOT EXISTS silver.products_clean (
  product_id   STRING,
  product_name STRING,
  category     STRING,
  subcategory  STRING,
  brand        STRING,
  cost_price   DECIMAL(18,2),
  list_price   DECIMAL(18,2),
  is_active    STRING,          -- uppercased string form of the source flag
  last_update  TIMESTAMP,
  row_hash     STRING           -- SHA-256 of the business attributes (see step 3)
) USING DELTA;

-- 1) Stage: normalize and parse
CREATE OR REPLACE TEMP VIEW stage_products AS
SELECT
  product_id,
  product_name,
  category,
  subcategory,
  brand,
  -- decimal comma -> decimal point before casting
  CAST(regexp_replace(cost_price, ',', '.') AS DECIMAL(18,2)) AS cost_price,
  CAST(regexp_replace(list_price, ',', '.') AS DECIMAL(18,2)) AS list_price,
  UPPER(CAST(is_active AS STRING)) AS is_active,
  -- first pattern that parses wins; NULL when none of them match
  COALESCE(
    try_to_timestamp(last_update, 'yyyy-MM-dd HH:mm:ss'),
    try_to_timestamp(last_update, 'yyyy/MM/dd HH:mm:ss'),
    try_to_timestamp(last_update, 'dd/MM/yyyy HH:mm:ss'),
    try_to_timestamp(last_update, 'dd-MM-yyyy HH:mm:ss'),
    try_to_timestamp(last_update, 'yyyy-MM-dd'),
    try_to_timestamp(last_update, 'yyyy/MM/dd'),
    try_to_timestamp(last_update, 'dd/MM/yyyy'),
    try_to_timestamp(last_update, 'dd-MM-yyyy')
  ) AS parsed_last_update
FROM bronze.products
WHERE product_id IS NOT NULL;

-- 2) Deterministic dedup (latest by last_update; ties with stable ordering)
--    Rows with an unparseable (NULL) last_update sort last but are kept.
CREATE OR REPLACE TEMP VIEW stage_products_dedup AS
SELECT
  product_id,
  product_name,
  category,
  subcategory,
  brand,
  cost_price,
  list_price,
  is_active,
  parsed_last_update AS last_update
FROM (
  SELECT
    p.*,
    ROW_NUMBER() OVER (
      PARTITION BY product_id
      ORDER BY
        parsed_last_update DESC NULLS LAST,
        -- Tie-breakers cover every attribute that feeds row_hash, so rows
        -- tied on last_update always resolve to the same winner and the hash
        -- cannot flip between runs.
        product_name DESC NULLS LAST,
        category     DESC NULLS LAST,
        subcategory  DESC NULLS LAST,
        brand        DESC NULLS LAST,
        cost_price   DESC NULLS LAST,
        list_price   DESC NULLS LAST,
        is_active    DESC NULLS LAST
    ) AS rn
  FROM stage_products p
) z
WHERE rn = 1;

-- 3) Calculate row hash (cheap and stable comparison)
--    NULL attributes are hashed as '' (strings/timestamp) or 0 (prices).
CREATE OR REPLACE TEMP VIEW stage_products_final AS
SELECT
  product_id,
  product_name,
  category,
  subcategory,
  brand,
  cost_price,
  list_price,
  is_active,
  last_update,
  sha2(concat_ws('||',
    coalesce(product_name,''),
    coalesce(category,''),
    coalesce(subcategory,''),
    coalesce(brand,''),
    cast(coalesce(cost_price,   0) as string),
    cast(coalesce(list_price,   0) as string),
    coalesce(is_active,''),
    coalesce(date_format(last_update,'yyyy-MM-dd HH:mm:ss'), '')
  ), 256) AS row_hash
FROM stage_products_dedup;

-- 4) Idempotent MERGE (only update when hash differs)
--    (a NULL target hash is treated as "changed" so such rows get refreshed)
MERGE INTO silver.products_clean AS t
USING stage_products_final AS s
ON t.product_id = s.product_id
WHEN MATCHED AND (
  t.row_hash IS NULL OR t.row_hash <> s.row_hash
) THEN UPDATE SET
  t.product_name = s.product_name,
  t.category     = s.category,
  t.subcategory  = s.subcategory,
  t.brand        = s.brand,
  t.cost_price   = s.cost_price,
  t.list_price   = s.list_price,
  t.is_active    = s.is_active,
  t.last_update  = s.last_update,
  t.row_hash     = s.row_hash
WHEN NOT MATCHED THEN INSERT (
  product_id, product_name, category, subcategory, brand,
  cost_price, list_price, is_active, last_update, row_hash
) VALUES (
  s.product_id, s.product_name, s.category, s.subcategory, s.brand,
  s.cost_price, s.list_price, s.is_active, s.last_update, s.row_hash
);
