###  Process the customers data

  - Ingest the data into the data lakehouse - bronze_customers
  - Perform data quality checks and transform the data as required - silver_customers_clean
  - Apply changes to the customers data - silver_customers



#### Create streaming table


In [0]:
-- Bronze layer: incrementally ingest the raw customer JSON files from the
-- landing volume. Every source column is kept as-is (SELECT *) because the
-- schema is inferred at load time; two audit columns are appended.
CREATE OR REFRESH STREAMING TABLE bronze_customers
  TBLPROPERTIES ('quality' = 'bronze')
AS
SELECT
  *,
  -- Path of the file each row was read from, for lineage and debugging.
  _metadata.file_path AS input_file_path,
  -- Time at which the row was processed by this pipeline.
  current_timestamp AS ingestion_timestamp
FROM
  cloud_files(
    '/Volumes/circuitbox/landing/operational_data/customers/',
    'json',
    map(
      -- Ask the loader to infer column types for the JSON fields.
      'cloudFiles.inferColumnTypes', 'true',
      -- Location where the inferred schema is persisted between runs.
      'cloudFiles.schemaLocation', '/Volumes/circuitbox/default/checkpoints_v/bronze_customers_schema'
    )
  );


### silver_customers_clean
- Use expectations to perform data quality checks
- We use the `STREAM` keyword so that only the incremental (newly arrived) data is loaded from the bronze table
- The `LIVE` keyword is DLT syntax: it refers to a table defined in the same DLT pipeline, so it cannot be used to reference tables defined outside of it.

In [0]:
-- Silver (clean) layer: typed, quality-checked projection of bronze_customers.
-- Rows are read incrementally from the bronze streaming table.
CREATE OR REFRESH STREAMING TABLE silver_customers_clean (
  -- Hard failure: a missing key stops the pipeline update.
  CONSTRAINT valid_customer_id
    EXPECT (customer_id IS NOT NULL) ON VIOLATION FAIL UPDATE,

  -- Offending rows are dropped from the target table.
  CONSTRAINT valid_customer_name
    EXPECT (customer_name IS NOT NULL) ON VIOLATION DROP ROW,

  -- No ON VIOLATION clause: the default action applies (rows are retained
  -- and the violation is only recorded in the data quality metrics).
  CONSTRAINT valid_email
    EXPECT (email IS NOT NULL),
  CONSTRAINT valid_telephone
    EXPECT (LENGTH(telephone) >= 10),
  CONSTRAINT valid_created_date
    EXPECT (created_date IS NOT NULL),
  CONSTRAINT valid_date_of_birth
    EXPECT (date_of_birth > '1920-01-01')
)
  TBLPROPERTIES ('quality' = 'silver')
AS
SELECT
  customer_id,
  customer_name,
  CAST(date_of_birth AS DATE) AS date_of_birth,
  telephone,
  email,
  CAST(created_date AS DATE) AS created_date
FROM
  STREAM(LIVE.bronze_customers);

### Apply changes from silver_customers_clean into silver_customers

In [0]:
-- Target table for the change feed: declared without a query because its
-- contents are maintained by the APPLY CHANGES INTO statement.
CREATE OR REFRESH STREAMING TABLE silver_customers
  TBLPROPERTIES ('quality' = 'silver');

In [0]:
-- Upsert the cleaned customer records into silver_customers, keeping one
-- current row per customer_id (SCD type 1: no history is retained).
-- NOTE(review): created_date is cast to DATE in silver_customers_clean, so
-- several changes to the same customer on one day would share a sequence
-- value - confirm the source guarantees at most one change per key per day.
APPLY CHANGES INTO LIVE.silver_customers
  FROM STREAM(LIVE.silver_customers_clean)
  KEYS (customer_id)
  SEQUENCE BY created_date
  STORED AS SCD TYPE 1;