In [None]:
%sql
-- ============================================================
-- SCD TYPE 2 - Equipment (Silver) com HASH + DEDUP incremental
-- ============================================================
CREATE TABLE IF NOT EXISTS silver.equipment_scd (
  equipment_sk      BIGINT GENERATED ALWAYS AS IDENTITY,
  equipment_id      STRING,
  equipment_name    STRING,
  equipment_type    STRING,
  location          STRING,
  manufacturer      STRING,
  model             STRING,
  status            STRING,
  effective_start   TIMESTAMP,
  effective_end     TIMESTAMP,
  is_current        BOOLEAN,
  row_hash          STRING
) USING DELTA;

-- Backfill do hash se necessário
UPDATE silver.equipment_scd
SET row_hash = COALESCE(row_hash,
  sha2(concat_ws('||',
    coalesce(equipment_name,''),
    coalesce(equipment_type,''),
    coalesce(location,''),
    coalesce(manufacturer,''),
    coalesce(model,''),
    coalesce(status,'')
  ), 256)
);

-- Stage bruto: parse robusto do last_update_date -> src_ts
CREATE OR REPLACE TEMP VIEW stage_equipment_scd_raw AS
SELECT
  equipment_id,
  equipment_name,
  equipment_type,
  location,
  manufacturer,
  model,
  UPPER(TRIM(status)) AS status_norm,
  COALESCE(
    try_to_timestamp(last_update_date, 'yyyy-MM-dd HH:mm:ss'),
    try_to_timestamp(last_update_date, 'yyyy/MM/dd HH:mm:ss'),
    try_to_timestamp(last_update_date, 'dd/MM/yyyy HH:mm:ss'),
    try_to_timestamp(last_update_date, 'dd-MM-yyyy HH:mm:ss'),
    try_to_timestamp(last_update_date, 'yyyy-MM-dd'),
    try_to_timestamp(last_update_date, 'yyyy/MM/dd'),
    try_to_timestamp(last_update_date, 'dd/MM/yyyy'),
    try_to_timestamp(last_update_date, 'dd-MM-yyyy')
  ) AS src_ts
FROM bronze.equipment_master
WHERE equipment_id IS NOT NULL;

-- Janela incremental (watermark de 90 dias)
CREATE OR REPLACE TEMP VIEW stage_equipment_scd_window AS
SELECT *
FROM stage_equipment_scd_raw
WHERE COALESCE(src_ts, current_timestamp()) >= date_sub(current_timestamp(), 90);

-- Dedup por equipment_id (última versão por src_ts)
CREATE OR REPLACE TEMP VIEW stage_equipment_scd_latest AS
SELECT
  equipment_id,
  equipment_name,
  equipment_type,
  location,
  manufacturer,
  model,
  status_norm AS status,
  src_ts
FROM (
  SELECT
    s.*,
    ROW_NUMBER() OVER (
      PARTITION BY equipment_id
      ORDER BY src_ts DESC NULLS LAST,
               equipment_name DESC,
               location DESC,
               status_norm DESC
    ) AS rn
  FROM stage_equipment_scd_window s
) z
WHERE rn = 1;

-- Calcula hash da "linha de negócio"
CREATE OR REPLACE TEMP VIEW stage_equipment_scd_hash AS
SELECT
  equipment_id,
  equipment_name,
  equipment_type,
  location,
  manufacturer,
  model,
  status,
  src_ts,
  sha2(concat_ws('||',
    coalesce(equipment_name,''),
    coalesce(equipment_type,''),
    coalesce(location,''),
    coalesce(manufacturer,''),
    coalesce(model,''),
    coalesce(status,'')
  ), 256) AS source_hash
FROM stage_equipment_scd_latest;

-- Expirar versões correntes QUE mudaram
MERGE INTO silver.equipment_scd AS tgt
USING stage_equipment_scd_hash AS src
ON  tgt.equipment_id = src.equipment_id
AND tgt.is_current  = TRUE
WHEN MATCHED AND tgt.row_hash <> src.source_hash THEN
  UPDATE SET
    tgt.effective_end = COALESCE(src.src_ts, current_timestamp()),
    tgt.is_current    = FALSE;

-- Inserir primeira versão OU nova versão apenas quando necessário
INSERT INTO silver.equipment_scd (
  equipment_id, equipment_name, equipment_type, location, manufacturer, model, status,
  effective_start, effective_end, is_current, row_hash
)
SELECT
  s.equipment_id,
  s.equipment_name,
  s.equipment_type,
  s.location,
  s.manufacturer,
  s.model,
  s.status,
  COALESCE(s.src_ts, current_timestamp()) AS effective_start,
  TIMESTAMP('9999-12-31')                 AS effective_end,
  TRUE                                    AS is_current,
  s.source_hash                           AS row_hash
FROM stage_equipment_scd_hash s
LEFT JOIN silver.equipment_scd c
  ON c.equipment_id = s.equipment_id AND c.is_current = TRUE
WHERE c.equipment_id IS NULL           -- novo equipamento
   OR c.row_hash <> s.source_hash;     -- mudança real
