In [0]:
/* =============================================================================
   21_silver_streaming_create.sql
   Purpose
     Create/Refresh Silver streaming tables (Bronze ➜ Silver) with all
     cleansing and business rules equivalent to the original T-SQL procedure.

   Notes
     • Requires Bronze streaming tables to exist and be populated.
     • We include dwh_create_date explicitly (Option A).
     • Date handling in crm_sales_details guards invalid yyyymmdd values
       before calling to_date() to avoid parsing failures.
============================================================================= */

USE CATALOG datawarehouse;
USE SCHEMA silver;

/* ------------------------------------------------------------------------- */
/* Safeguard: drop any existing non-streaming tables with the same names     */
/* ------------------------------------------------------------------------- */
DROP TABLE IF EXISTS crm_cust_info;
DROP TABLE IF EXISTS crm_prd_info;
DROP TABLE IF EXISTS crm_sales_details;
DROP TABLE IF EXISTS erp_loc_a101;
DROP TABLE IF EXISTS erp_cust_az12;
DROP TABLE IF EXISTS erp_px_cat_g1v2;

/* ---------------------------------------------------------------------------
   CRM: Customers  (trim, normalize values, latest row per cst_id)
--------------------------------------------------------------------------- */
CREATE OR REFRESH STREAMING TABLE crm_cust_info AS
WITH src AS (
  SELECT
    try_cast(cst_id AS INT)                                        AS cst_id,
    cst_key,
    trim(cst_firstname)                                            AS cst_firstname,
    trim(cst_lastname)                                             AS cst_lastname,
    CASE upper(trim(cst_marital_status))
      WHEN 'S' THEN 'Single'
      WHEN 'M' THEN 'Married'
      ELSE 'n/a'
    END                                                            AS std_marital_status,
    CASE upper(trim(cst_gndr))
      WHEN 'F' THEN 'Female'
      WHEN 'M' THEN 'Male'
      ELSE 'n/a'
    END                                                            AS std_gndr,
    to_date(cst_create_date)                                       AS cst_create_date,
    coalesce(to_timestamp(cst_create_date), timestamp '1900-01-01 00:00:00')
                                                                  AS sort_ts
  FROM STREAM datawarehouse.bronze.crm_cust_info
  WHERE cst_id IS NOT NULL
),
agg AS (
  SELECT
    cst_id,
    -- pick the entire structured row associated with the max create timestamp
    max_by(
      named_struct(
        'cst_key',           cst_key,
        'cst_firstname',     cst_firstname,
        'cst_lastname',      cst_lastname,
        'cst_marital_status',std_marital_status,
        'cst_gndr',          std_gndr,
        'cst_create_date',   cst_create_date
      ),
      sort_ts
    ) AS rec
  FROM STREAM src
  GROUP BY cst_id
)
SELECT
  cst_id,
  rec.cst_key            AS cst_key,
  rec.cst_firstname      AS cst_firstname,
  rec.cst_lastname       AS cst_lastname,
  rec.cst_marital_status AS cst_marital_status,
  rec.cst_gndr           AS cst_gndr,
  rec.cst_create_date    AS cst_create_date,
  current_timestamp()    AS dwh_create_date
FROM STREAM agg;

/* ---------------------------------------------------------------------------
   CRM: Products  (cat_id from prd_key, decode prd_line, start/end dating)
--------------------------------------------------------------------------- */
CREATE OR REPLACE STREAMING TABLE crm_prd_info AS
SELECT
  try_cast(prd_id AS INT)                                 AS prd_id,
  regexp_replace(substr(prd_key, 1, 5), '-', '_')         AS cat_id,        -- CATxx_
  substr(prd_key, 7)                                      AS prd_key,       -- part after 'CATxx-'
  prd_nm,
  coalesce(try_cast(prd_cost AS INT), 0)                  AS prd_cost,
  CASE upper(trim(prd_line))
    WHEN 'M' THEN 'Mountain'
    WHEN 'R' THEN 'Road'
    WHEN 'S' THEN 'Other Sales'
    WHEN 'T' THEN 'Touring'
    ELSE 'n/a'
  END                                                     AS prd_line,
  to_date(prd_start_dt)                                   AS prd_start_dt,
  CAST(NULL AS DATE)                                      AS prd_end_dt,    -- compute later in Gold
  current_timestamp()                                     AS dwh_create_date
FROM STREAM datawarehouse.bronze.crm_prd_info;

/* ---------------------------------------------------------------------------
   CRM: Sales Details
   - Safe parse yyyymmdd ➜ DATE (guard invalid month/day)
   - Recompute sales when inconsistent; derive price if missing
--------------------------------------------------------------------------- */
CREATE OR REFRESH STREAMING TABLE crm_sales_details AS
WITH raw AS (
  SELECT
    sls_ord_num,
    sls_prd_key,
    try_cast(sls_cust_id AS INT)                              AS sls_cust_id,
    cast(sls_order_dt AS STRING)                              AS order_str,
    cast(sls_ship_dt  AS STRING)                              AS ship_str,
    cast(sls_due_dt   AS STRING)                              AS due_str,
    try_cast(sls_sales    AS INT)                             AS _sls_sales,
    try_cast(sls_quantity AS INT)                             AS _sls_quantity,
    try_cast(sls_price    AS INT)                             AS _sls_price
  FROM STREAM datawarehouse.bronze.crm_sales_details
),
v AS (
  SELECT
    *,
    try_cast(substr(order_str,5,2) AS INT) AS order_mm,
    try_cast(substr(order_str,7,2) AS INT) AS order_dd,
    try_cast(substr(ship_str ,5,2) AS INT) AS ship_mm,
    try_cast(substr(ship_str ,7,2) AS INT) AS ship_dd,
    try_cast(substr(due_str  ,5,2) AS INT) AS due_mm,
    try_cast(substr(due_str  ,7,2) AS INT) AS due_dd
  FROM STREAM raw
)
SELECT
  sls_ord_num,
  sls_prd_key,
  sls_cust_id,

  CASE
    WHEN order_str IS NOT NULL
     AND length(order_str) = 8
     AND order_mm BETWEEN 1 AND 12
     AND order_dd BETWEEN 1 AND 31
  THEN to_date(order_str,'yyyyMMdd') END                      AS sls_order_dt,

  CASE
    WHEN ship_str IS NOT NULL
     AND length(ship_str) = 8
     AND ship_mm BETWEEN 1 AND 12
     AND ship_dd BETWEEN 1 AND 31
  THEN to_date(ship_str,'yyyyMMdd') END                       AS sls_ship_dt,

  CASE
    WHEN due_str IS NOT NULL
     AND length(due_str) = 8
     AND due_mm BETWEEN 1 AND 12
     AND due_dd BETWEEN 1 AND 31
  THEN to_date(due_str,'yyyyMMdd') END                        AS sls_due_dt,

  CASE
    WHEN _sls_sales IS NULL OR _sls_sales <= 0
         OR _sls_sales <> (_sls_quantity * abs(_sls_price))
    THEN _sls_quantity * abs(_sls_price)
    ELSE _sls_sales
  END                                                         AS sls_sales,

  _sls_quantity                                               AS sls_quantity,

  CASE
    WHEN _sls_price IS NULL OR _sls_price <= 0
    THEN cast(_sls_sales AS DOUBLE) / nullif(_sls_quantity,0)
    ELSE _sls_price
  END                                                         AS sls_price,

  current_timestamp()                                         AS dwh_create_date
FROM STREAM v;

/* ---------------------------------------------------------------------------
   ERP: Customer Demographics  (strip NAS-, future DOB ➜ NULL, normalize gender)
--------------------------------------------------------------------------- */
CREATE OR REFRESH STREAMING TABLE erp_cust_az12 AS
SELECT
  CASE WHEN cid LIKE 'NAS%' THEN substr(cid,4) ELSE cid END   AS cid,
  CASE WHEN to_date(bdate) > current_date() THEN NULL ELSE to_date(bdate) END AS bdate,
  CASE
    WHEN upper(trim(gen)) IN ('F','FEMALE') THEN 'Female'
    WHEN upper(trim(gen)) IN ('M','MALE')   THEN 'Male'
    ELSE 'n/a'
  END                                                         AS gen,
  current_timestamp()                                         AS dwh_create_date
FROM STREAM datawarehouse.bronze.erp_cust_az12;

/* ---------------------------------------------------------------------------
   ERP: Locations  (normalize country, strip '-' in cid)
--------------------------------------------------------------------------- */
CREATE OR REFRESH STREAMING TABLE erp_loc_a101 AS
SELECT
  replace(cid,'-','')                                         AS cid,
  CASE
    WHEN trim(cntry) = 'DE'            THEN 'Germany'
    WHEN trim(cntry) IN ('US','USA')   THEN 'United States'
    WHEN trim(cntry) = '' OR cntry IS NULL THEN 'n/a'
    ELSE trim(cntry)
  END                                                         AS cntry,
  current_timestamp()                                         AS dwh_create_date
FROM STREAM datawarehouse.bronze.erp_loc_a101;

/* ---------------------------------------------------------------------------
   ERP: Product Categories  (pass-through)
--------------------------------------------------------------------------- */
CREATE OR REFRESH STREAMING TABLE erp_px_cat_g1v2 AS
SELECT
  id,
  cat,
  subcat,
  maintenance,
  current_timestamp()                                         AS dwh_create_date
FROM STREAM datawarehouse.bronze.erp_px_cat_g1v2;
