In [0]:
-- Bronze: raw customer CSV files from the landing folder, read with cloud_files.
-- All source columns pass through untouched; two file-metadata columns are appended.
CREATE OR REFRESH STREAMING TABLE customer_bronze_table
AS
SELECT
    *,
    _metadata.file_name              AS file_name,
    -- modification time of the source file, exposed as load_time
    _metadata.file_modification_time AS load_time
FROM cloud_files(
    'abfss://data@storagescdsa.dfs.core.windows.net/catalog/project/landing/customers',
    'csv',
    -- the first row is a header; column types are inferred rather than read as strings
    map('header', 'true', 'inferColumnTypes', 'true')
)

In [0]:
-- Bronze: raw customer-update CSV files from the landing folder, read with cloud_files.
-- All source columns pass through untouched; file metadata and a date stamp are appended.
CREATE OR REFRESH STREAMING TABLE customer_updates_bronze_table
AS
SELECT
    *,
    _metadata.file_name              AS file_name,
    -- modification time of the source file, exposed as load_time
    _metadata.file_modification_time AS load_time,
    -- value of current_date when the row is processed
    current_date                     AS update_date
FROM cloud_files(
    'abfss://data@storagescdsa.dfs.core.windows.net/catalog/project/landing/customer_updates',
    'csv',
    -- the first row is a header; column types are inferred rather than read as strings
    map('header', 'true', 'inferColumnTypes', 'true')
)

In [0]:
-- Bronze: raw order CSV files from the landing folder, read with cloud_files.
-- All source columns pass through untouched; file metadata and a date stamp are appended.
CREATE OR REFRESH STREAMING TABLE orders_bronze_table
AS
SELECT
    *,
    _metadata.file_name              AS file_name,
    -- modification time of the source file, exposed as load_time
    _metadata.file_modification_time AS load_time,
    -- value of current_date when the row is processed
    current_date                     AS update_date
FROM cloud_files(
    'abfss://data@storagescdsa.dfs.core.windows.net/catalog/project/landing/orders',
    'csv',
    -- the first row is a header; column types are inferred rather than read as strings
    map('header', 'true', 'inferColumnTypes', 'true')
)

In [0]:
-- Bronze: raw product CSV files from the landing folder, read with cloud_files.
-- All source columns pass through untouched; file metadata and a date stamp are appended.
CREATE OR REFRESH STREAMING TABLE product_bronze_table
AS
SELECT
    *,
    _metadata.file_name              AS file_name,
    -- modification time of the source file, exposed as load_time
    _metadata.file_modification_time AS load_time,
    -- value of current_date when the row is processed
    current_date                     AS update_date
FROM cloud_files(
    'abfss://data@storagescdsa.dfs.core.windows.net/catalog/project/landing/products',
    'csv',
    -- the first row is a header; column types are inferred rather than read as strings
    map('header', 'true', 'inferColumnTypes', 'true')
)

In [0]:
-- Silver: cleaned customer rows read as a stream from customer_bronze_table.
-- A row that fails any of the three expectations below is dropped.
CREATE OR REFRESH STREAMING TABLE customer_silver_main_table
(
    CONSTRAINT valid_customer EXPECT (customer_id IS NOT NULL) ON VIOLATION DROP ROW,
    CONSTRAINT valid_email EXPECT (email IS NOT NULL AND email LIKE '%@%.%') ON VIOLATION DROP ROW,
    CONSTRAINT valid_phone EXPECT (phone IS NOT NULL AND length(phone) = 10) ON VIOLATION DROP ROW
)
AS
SELECT
    customer_id
    , trim(first_name) AS first_name
    , trim(last_name) AS last_name
    , trim(lower(email)) AS email
    -- Strip every non-digit character; if more than 10 digits remain,
    -- keep only the last 10.
    , CASE
        WHEN length(regexp_replace(phone, '[^0-9]', '')) <= 10
            THEN regexp_replace(phone, '[^0-9]', '')
        ELSE substring(regexp_replace(phone, '[^0-9]', ''), -10, 10)
      END AS phone
    , initcap(city) AS city
    , upper(state) AS state
    , CAST(updated_date AS TIMESTAMP) AS updated_timestamp
    , load_time
FROM STREAM(live.customer_bronze_table);


In [0]:
-- Target table for the change feed below; its contents are maintained by APPLY CHANGES.
CREATE OR REFRESH STREAMING TABLE customer_silver_cleaned_table;

-- Apply rows from customer_silver_main_table keyed on customer_id; load_time
-- orders competing rows for the same key. No STORED AS clause is given, so the
-- default storage type applies.
APPLY CHANGES INTO live.customer_silver_cleaned_table
FROM STREAM(live.customer_silver_main_table)
KEYS (customer_id)
SEQUENCE BY load_time;

In [0]:
-- Silver: cleaned customer-update rows read as a stream from customer_updates_bronze_table.
-- A row that fails any of the three expectations below is dropped.
CREATE OR REFRESH STREAMING TABLE customer_update_silver_main_table
(
    CONSTRAINT valid_customer EXPECT (customer_id IS NOT NULL) ON VIOLATION DROP ROW,
    CONSTRAINT valid_email EXPECT ((email IS NOT NULL) AND (email LIKE '%@%.%')) ON VIOLATION DROP ROW,
    CONSTRAINT valid_phone EXPECT ((phone IS NOT NULL) AND (length(phone) = 10)) ON VIOLATION DROP ROW
)
AS
SELECT
    customer_id,
    trim(first_name)   AS first_name,
    trim(last_name)    AS last_name,
    trim(lower(email)) AS email,
    -- Strip every non-digit character; if more than 10 digits remain,
    -- keep only the last 10.
    CASE
        WHEN length(regexp_replace(phone, '[^0-9]', '')) <= 10
            THEN regexp_replace(phone, '[^0-9]', '')
        ELSE substring(regexp_replace(phone, '[^0-9]', ''), -10, 10)
    END                AS phone,
    initcap(city)      AS city,
    upper(state)       AS state,
    -- NOTE(review): this cast carries no alias, whereas customer_silver_main_table
    -- exposes the same expression as updated_timestamp. Confirm which column name
    -- is intended before aligning the two tables.
    CAST(updated_date AS TIMESTAMP),
    load_time
FROM STREAM(live.customer_updates_bronze_table);

In [0]:
-- Silver: validated order rows read as a stream from orders_bronze_table.
-- Declared with CREATE OR REFRESH, the documented form for streaming tables and
-- the one every other table in this pipeline uses (the previous OR REPLACE
-- spelling is not part of the streaming-table syntax).
-- A row that fails any of the four expectations below is dropped.
CREATE OR REFRESH STREAMING TABLE orders_silver_table
(
    CONSTRAINT valid_order EXPECT (order_id IS NOT NULL) ON VIOLATION DROP ROW,
    CONSTRAINT valid_customer EXPECT (customer_id IS NOT NULL) ON VIOLATION DROP ROW,
    CONSTRAINT valid_product EXPECT (product_id IS NOT NULL) ON VIOLATION DROP ROW,
    CONSTRAINT valid_amount EXPECT (order_amount > 0) ON VIOLATION DROP ROW
)
AS
SELECT
    order_id,
    customer_id,
    product_id,
    order_amount,
    CAST(order_timestamp AS TIMESTAMP) AS order_timestamp,
    channel,
    load_time,
    file_name
FROM STREAM(live.orders_bronze_table);

In [0]:
-- Target table for the change feed below; its contents are maintained by APPLY CHANGES.
CREATE OR REFRESH STREAMING TABLE order_silver_cleaned_table;

-- Apply rows from orders_silver_table keyed on order_id; order_timestamp orders
-- competing rows for the same key. No STORED AS clause is given, so the default
-- storage type applies.
APPLY CHANGES INTO live.order_silver_cleaned_table
FROM STREAM(live.orders_silver_table)
KEYS (order_id)
SEQUENCE BY order_timestamp;

In [0]:
-- Silver: validated product rows read as a stream from product_bronze_table.
-- A row that fails any of the four expectations below is dropped.
CREATE OR REFRESH STREAMING TABLE products_silver_table
(
    CONSTRAINT valid_product EXPECT (product_id IS NOT NULL) ON VIOLATION DROP ROW,
    CONSTRAINT valid_product_name EXPECT (product_name IS NOT NULL) ON VIOLATION DROP ROW,
    CONSTRAINT valid_category EXPECT (category IS NOT NULL) ON VIOLATION DROP ROW,
    CONSTRAINT valid_amount EXPECT (price > 0) ON VIOLATION DROP ROW
)
AS
SELECT
    product_id,
    initcap(product_name) AS product_name,
    upper(category)       AS category,
    price,
    file_name,
    load_time
FROM STREAM(live.product_bronze_table)

In [0]:
-- Silver: orders enriched with the matching product's category, name and price.
--
-- Defined as a materialized view rather than a streaming table because its order
-- source, order_silver_cleaned_table, is the target of APPLY CHANGES INTO.
-- Databricks documents that such a target cannot be the source of a streaming
-- table and that tables reading from it must be materialized views. A batch
-- join also avoids the unbounded state of a stream-stream join that has no
-- watermark.
--
-- Output columns are unchanged. Orders whose product_id has no match in
-- products_silver_table are excluded (inner join), as before.
CREATE OR REFRESH MATERIALIZED VIEW order_product_silver
AS
SELECT
    s.order_id,
    s.customer_id,
    s.product_id,
    s.order_amount,
    p.category,
    p.product_name,
    p.price,
    s.file_name,
    s.load_time,
    s.order_timestamp,
    s.channel
FROM live.order_silver_cleaned_table AS s
INNER JOIN live.products_silver_table AS p
    ON s.product_id = p.product_id;

In [0]:
-- Combines the cleaned customer rows with the customer-update rows into one
-- stream that shares a single column list.
--
-- NOTE(review): the first branch streams from customer_silver_cleaned_table,
-- which is an APPLY CHANGES INTO target. Databricks documents that such a target
-- cannot be the source of a streaming table, so this read may fail once that
-- target receives an update rather than an insert. Confirm whether this branch
-- should read from customer_silver_main_table instead.
CREATE OR REFRESH STREAMING TABLE customer_combined_table
AS
SELECT
    customer_id,
    first_name,
    last_name,
    email,
    phone,
    city,
    state,
    load_time
FROM STREAM(live.customer_silver_cleaned_table)

UNION ALL

SELECT
    customer_id,
    first_name,
    last_name,
    email,
    phone,
    city,
    state,
    load_time
FROM STREAM(live.customer_update_silver_main_table);



In [0]:
-- Target table for the change feed below; its contents are maintained by APPLY CHANGES.
CREATE OR REFRESH STREAMING TABLE customer_combined_scd2_table;

-- Track customer history as SCD type 2: rows from customer_combined_table are
-- keyed on customer_id and ordered by load_time.
APPLY CHANGES INTO live.customer_combined_scd2_table
FROM STREAM(live.customer_combined_table)
KEYS (customer_id)
SEQUENCE BY load_time
STORED AS SCD TYPE 2;


In [0]:
-- Gold: per-customer order count and total order amount.
-- Only customers with at least one row in order_product_silver appear (inner join).
CREATE OR REFRESH MATERIALIZED VIEW customer_360_table
AS
WITH current_customers AS (
    -- Rows of the SCD type 2 table whose __END_AT is not set.
    SELECT customer_id
    FROM live.customer_combined_scd2_table
    WHERE __END_AT IS NULL
)
SELECT
    c.customer_id,
    COUNT(o.order_id)   AS total_orders,
    SUM(o.order_amount) AS total_sales
FROM current_customers AS c
INNER JOIN live.order_product_silver AS o
    ON c.customer_id = o.customer_id
GROUP BY c.customer_id
ORDER BY c.customer_id;

In [0]:
-- Gold: daily sales per category and channel, with a demand badge.
--
-- order_timestamp is truncated to the start of its day before grouping, so
-- daily_sales really is a per-day total. Grouping on the raw timestamp produced
-- one group per distinct instant whenever the timestamps carried a time of day.
-- The output column keeps the name order_timestamp and its TIMESTAMP type.
CREATE OR REFRESH MATERIALIZED VIEW category_sales_table
AS
WITH daily_orders AS (
    -- One row per order line, with the timestamp reduced to its calendar day.
    SELECT
        category,
        date_trunc('DAY', order_timestamp) AS order_timestamp,
        channel,
        order_amount
    FROM live.order_product_silver
)
SELECT
    category,
    order_timestamp,
    channel,
    SUM(order_amount) AS daily_sales,
    -- a day/category/channel total of 200 or more is badged as high demand
    CASE
        WHEN SUM(order_amount) >= 200 THEN 'High demand'
        ELSE 'Regular'
    END AS badge
FROM daily_orders
GROUP BY category, order_timestamp, channel
ORDER BY category, order_timestamp, channel;

In [0]:
-- Gold: customers ranked by total order amount (rank 1 = highest total).
-- DENSE_RANK gives tied totals the same rank without leaving gaps.
CREATE OR REFRESH MATERIALIZED VIEW top_customers
AS
WITH customer_sales AS (
    -- Total order amount per customer, using only SCD type 2 rows whose
    -- __END_AT is not set.
    SELECT
        o.customer_id,
        c.first_name,
        c.last_name,
        SUM(o.order_amount) AS total_sales
    FROM live.order_product_silver AS o
    INNER JOIN live.customer_combined_scd2_table AS c
        ON o.customer_id = c.customer_id
    WHERE c.__END_AT IS NULL
    GROUP BY
        o.customer_id,
        c.first_name,
        c.last_name
)
SELECT
    customer_id,
    first_name,
    last_name,
    total_sales,
    DENSE_RANK() OVER (ORDER BY total_sales DESC) AS customer_rank
FROM customer_sales;


In [0]:
-- Ad-hoc inspection: dump every column of the published SCD type 2 customer table.
SELECT *
FROM pratice_catalog1.schema1.customer_combined_scd2_table;

customer_id,first_name,last_name,email,phone,city,state,load_time,__START_AT,__END_AT
1,John,Doe,john@example.com,9876543210,Delhi,DL,2025-11-08T12:18:43.000Z,2025-11-08T12:18:43.000Z,
2,Emma,Smith,emma@sample.com,8765432109,Mumbai,MH,2025-11-08T12:18:43.000Z,2025-11-08T12:18:43.000Z,
3,Raj,Kumar,raj.k@example.in,7654321098,Pune,MH,2025-11-08T12:18:43.000Z,2025-11-08T12:18:43.000Z,2025-11-08T13:51:23.000Z
3,Rajiv,Kumar,rajiv.k@example.in,7654321000,Bangalore,KA,2025-11-08T13:51:23.000Z,2025-11-08T13:51:23.000Z,
4,Simran,Kaur,sim.kaur@example.in,6543210987,Chandigarh,PB,2025-11-08T12:18:43.000Z,2025-11-08T12:18:43.000Z,2025-11-08T13:51:23.000Z
4,Simran,Kaur Gill,sim.gill@example.in,9999912345,Chandigarh,PB,2025-11-08T13:51:23.000Z,2025-11-08T13:51:23.000Z,
5,Alex,Brown,alexbrown@test.com,5432109876,Delhi,DL,2025-11-08T12:18:43.000Z,2025-11-08T12:18:43.000Z,
101,Amit,Sharma,amit.sharma@newmail.com,9876543210,Delhi,DL,2025-11-09T09:16:19.000Z,2025-11-09T09:16:19.000Z,
103,Priya,Singh,priya.singh@mail.com,9998887777,Mumbai,MH,2025-11-09T09:16:19.000Z,2025-11-09T09:16:19.000Z,
