## 1. Data Quality Validation

In [0]:
%sql
-- 1. Checking invalid customers
CREATE OR REPLACE TABLE ecommerce.silver.invalid_customers AS
SELECT *
FROM ecommerce.silver.slv_customers
WHERE customer_id IS NULL;

In [0]:
%sql
SELECT COUNT(*) AS invalid_record_count
FROM ecommerce.silver.invalid_customers;


In [0]:
%sql
-- 2. Checking duplicate customers
CREATE OR REPLACE TABLE ecommerce.silver.duplicate_customers AS
SELECT customer_id, COUNT(*) AS cnt
FROM ecommerce.silver.slv_customers
GROUP BY customer_id
HAVING COUNT(*) > 1;


In [0]:
%sql
SELECT COUNT(*) AS duplicate_record_count
FROM ecommerce.silver.duplicate_customers;

In [0]:
%sql
-- 3. Checking invalid products
CREATE OR REPLACE TABLE ecommerce.silver.invalid_products AS
SELECT *
FROM ecommerce.silver.slv_order_items
WHERE product_id IS NULL OR unit_price <= 0;

In [0]:
%sql
SELECT COUNT(*) AS invalid_record_count
FROM ecommerce.silver.invalid_products

In [0]:
%sql
DESCRIBE ecommerce.silver.slv_products;


## 2. Quarantine / Rejected Data

In [0]:
%sql
-- Quarantine customers with invalid IDs
CREATE OR REPLACE TABLE ecommerce.silver.quarantine_customers AS
SELECT *,
'NULL customer_id' AS rejection_reason
FROM ecommerce.silver.slv_customers
WHERE customer_id IS NULL;


In [0]:
%sql
SELECT rejection_reason, COUNT(*) AS record_count
FROM ecommerce.silver.quarantine_customers
GROUP BY rejection_reason;


In [0]:
%sql
-- Quarantine invalid order items
CREATE OR REPLACE TABLE ecommerce.silver.quarantine_order_items AS
SELECT *,
       CASE
         WHEN order_id IS NULL THEN 'NULL order_id'
         WHEN product_id IS NULL THEN 'NULL product_id'
         WHEN quantity <= 0 THEN 'Invalid quantity'
         WHEN unit_price < 0 THEN 'Invalid unit_price'
         ELSE 'Unknown'
       END AS rejection_reason
FROM ecommerce.silver.slv_order_items
WHERE order_id IS NULL
   OR product_id IS NULL
   OR quantity <= 0
   OR unit_price < 0;

In [0]:
%sql
-- validation of our quarantineed data
SELECT rejection_reason, COUNT(*) AS record_count
FROM ecommerce.silver.quarantine_order_items
GROUP BY rejection_reason;


## Incremental Load Simulation

In [0]:
%sql
CREATE TABLE IF NOT EXISTS ecommerce.silver.pipeline_metadata (
    pipeline_name STRING,
    last_processed_date DATE
);


In [0]:
%sql
INSERT INTO ecommerce.silver.pipeline_metadata
VALUES ('order_items_pipeline', DATE('2000-01-01'));

In [0]:
%sql
-- Incremental logic
SELECT *
FROM ecommerce.silver.slv_order_items
WHERE dt >
(
  SELECT last_processed_date
  FROM ecommerce.silver.pipeline_metadata
  WHERE pipeline_name = 'order_items_pipeline'
);


In [0]:
%sql
-- Updating the watermark
UPDATE ecommerce.silver.pipeline_metadata
SET last_processed_date =
(
  SELECT MAX(dt)
  FROM ecommerce.silver.slv_order_items
)
WHERE pipeline_name = 'order_items_pipeline';