In [0]:
CREATE STREAMING LIVE TABLE sales
AS SELECT *, current_timestamp() as ingestion_date FROM cloud_files(
  's3://dlt-pipeline-main/raw_files/sales/',
  'csv'
);

In [0]:
CREATE STREAMING TABLE sales_silver
(
  CONSTRAINT valid_order_id EXPECT (order_id IS NOT NULL) ON VIOLATION DROP ROW
)
AS
SELECT DISTINCT * FROM STREAM(LIVE.sales);

In [0]:
CREATE STREAMING LIVE TABLE customers
AS SELECT *, current_timestamp() as ingestion_date FROM cloud_files(
  's3://dlt-pipeline-main/raw_files/customers/'
);

In [0]:
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE customers_silver;

APPLY CHANGES INTO
  live.customers_silver
FROM
  stream(LIve.customers)
KEYS
  (customer_id)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation,sequenceNum ,_rescued_data,ingestion_date
)
STORED AS
  SCD TYPE 2;

In [0]:
create streaming table customers_silver_active as 
select customer_id,customer_name,customer_email,customer_city,customer_state from STREAM(live.customers_silver) where `__END_AT` is null

In [0]:
CREATE STREAMING LIVE TABLE products
AS SELECT *, current_timestamp() as ingestion_date FROM cloud_files(
  's3://dlt-pipeline-main/raw_files/products/'
);

In [0]:
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE product_silver;

APPLY CHANGES INTO
  live.product_silver
FROM
  stream(LIve.products)
KEYS
  (product_id)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  seqNum
COLUMNS * EXCEPT
  (operation,seqNum ,_rescued_data,ingestion_date
)
STORED AS
  SCD TYPE 1;


In [0]:
---What are the total sales and total discount accounts for each customer?
CREATE LIVE TABLE total_sales_customer AS
SELECT 
    c.customer_id,
    c.customer_name,
    ROUND(SUM(s.total_amount)) AS total_sales,
    SUM(s.discount_amount) AS total_discount
FROM LIVE.sales_silver s
JOIN LIVE.customers_silver c
    ON s.customer_id = c.customer_id
GROUP BY 
    c.customer_id, 
    c.customer_name
ORDER BY 
    total_sales DESC;

In [0]:
---What is the total sales based on category?
CREATE LIVE TABLE total_sales_category AS
SELECT 
    p.product_category,
    SUM(s.total_amount) AS total_sales
FROM LIVE.sales_silver s
JOIN LIVE.product_silver p
    ON s.product_id = p.product_id
GROUP BY 
    p.product_category 
ORDER BY 
    total_sales DESC;