# Customer Pipeline

## JSON -> BRONZE

In [0]:
-- Bronze (raw): incrementally ingests the JSON files found under
-- ${source}/customers and keeps every source column as-is, adding two
-- audit columns for traceability.
CREATE STREAMING TABLE 1_bronze_db.customers_bronze_raw
COMMENT "Raw data from customers CDC feed"
TBLPROPERTIES ("quality" = "bronze")
AS
SELECT
  *,
  current_timestamp() AS processing_time,  -- when the pipeline processed the row
  _metadata.file_name AS source_file       -- name of the file the row was read from
FROM STREAM read_files("${source}/customers", format => "json");

## Quality enforcement

In [0]:
-- Bronze (clean): applies data-quality expectations to the raw CDC feed and
-- adds a proper timestamp column derived from the epoch `timestamp` field.
--
-- Expectation behavior:
--   valid_id        -> a NULL customer_id fails the whole update
--   valid_operation -> rows with a NULL operation are dropped
--   valid_name      -> no ON VIOLATION action declared on this constraint
--   valid_address   -> no ON VIOLATION action declared on this constraint
--   valid_email     -> rows with a malformed email are dropped
-- DELETE operations are exempt from the name/address/email checks because
-- each of those constraints is OR-ed with operation = "DELETE".
--
-- NOTE(review): the table COMMENT below still says "Raw" although this is the
-- cleaned table; left unchanged here, confirm the intended wording.
CREATE STREAMING TABLE 1_bronze_db.customers_bronze_clean
(
  CONSTRAINT valid_id EXPECT (customer_id IS NOT NULL) ON VIOLATION FAIL UPDATE,
  CONSTRAINT valid_operation EXPECT (operation IS NOT NULL) ON VIOLATION DROP ROW,
  CONSTRAINT valid_name EXPECT (name IS NOT NULL OR operation = "DELETE"),
  CONSTRAINT valid_address EXPECT (
    ( address IS NOT NULL AND city IS NOT NULL AND
      state IS NOT NULL ) OR
    operation = "DELETE"),
  -- local-part @ domain . TLD (2-5 letters); the first capture group needs its
  -- opening parenthesis, otherwise the pattern has an unmatched ')'.
  CONSTRAINT valid_email EXPECT (
    rlike(email, '^([a-zA-Z0-9_\\-\\.]+)@([a-zA-Z0-9_\\-\\.]+)\\.([a-zA-Z]{2,5})$') OR
    operation = "DELETE") ON VIOLATION DROP ROW
)
COMMENT 'Raw customer data'
AS
SELECT
  *,
  -- `timestamp` is passed to from_unixtime, i.e. treated as epoch seconds
  CAST(from_unixtime(timestamp) AS timestamp) AS timestamp_datetime
FROM STREAM 1_bronze_db.customers_bronze_raw;



### Processing CDC Data. BRONZE => SILVER

In [0]:
-- Silver: customer history maintained from the cleaned CDC feed.
--
-- Step 1: declare the target streaming table. No schema or query is given
-- here; the table is populated by the APPLY CHANGES statement below.
CREATE OR REFRESH STREAMING TABLE 2_silver_db.customers_silver
  COMMENT "SCD Type 2 Historical (Customer Data)";

-- Step 2: apply the CDC events to the target.
--   KEYS             - customer_id identifies the record a change applies to
--   APPLY AS DELETE  - events whose operation is "DELETE" are treated as deletes
--   SEQUENCE BY      - timestamp_datetime orders the change events
--   COLUMNS * EXCEPT - timestamp, _rescued_data and operation are left out of
--                      the target table
--   SCD TYPE 2       - changes are stored as history rather than overwritten
APPLY CHANGES INTO 2_silver_db.customers_silver
FROM STREAM 1_bronze_db.customers_bronze_clean
KEYS (customer_id)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY timestamp_datetime
COLUMNS * EXCEPT (timestamp, _rescued_data, operation)
STORED AS SCD TYPE 2;


### SILVER => GOLD

In [0]:
-- Gold: the current version of each customer, taken from the SCD Type 2
-- silver table.
--
-- The silver table is read as a batch source (no STREAM keyword): it is the
-- target of APPLY CHANGES, so its rows are modified in place rather than only
-- appended, and a materialized view is defined over a batch query.
CREATE OR REFRESH MATERIALIZED VIEW 3_gold_db.current_customers_gold
  COMMENT "Current customers list of active customers"
AS
SELECT
  * EXCEPT (processing_time),
  current_timestamp() AS updated_at
FROM 2_silver_db.customers_silver
-- Keep only rows whose `__END_AT` is NULL, i.e. versions that have not been closed.
WHERE `__END_AT` IS NULL;