In [0]:
create streaming table iot_bronze_table
as
select *,_metadata.file_name as file_name,_metadata.file_modification_time as load_time
from cloud_files('abfss://catlog@storageiotprojectsa.dfs.core.windows.net/data/landing/iot_data','json');

In [0]:
create or refresh streaming table iot_silver_table
(constraint valid_consumption expect(CAST(consumption_kw AS DOUBLE) is not null and CAST(consumption_kw AS DOUBLE) >= 0) ON VIOLATION DROP ROW,
constraint valid_voltage expect(CAST(voltage AS DOUBLE) IS NOT NULL) ON VIOLATION DROP ROW,
constraint valid_dates expect(cast(timestamp as timestamp) is not null) on violation drop row,
constraint valid_status expect(status in ('OK','LOWBATTERY','ERROR')) on violation drop row)
as
select * from stream(live.iot_bronze_table);

In [0]:
create or refresh  streaming table iot_silver_clean_table;
apply changes into iot_silver_clean_table
from stream(live.iot_silver_table)
keys(meter_id)
sequence by timestamp;


In [0]:
create or refresh streaming table silver_anomaly_quarantine
(
  constraint Tampering expect(cast(voltage as double) <180 and cast(consumption_kw as double)>8) on violation drop row,
  constraint meter_stuck expect(cast(consumption_kw as double)=0 and status='OK') ON VIOLATION DROP ROW,
  constraint line_issue expect(cast(voltage as double)>260) on violation drop row,
  constraint issue_error expect(status='ERROR') on violation drop row
)as
select * from stream(live.iot_silver_table);

In [0]:
create or refresh materialized view city_hourly_usage
as
select city,date_format(timestamp, 'yyyy-MM-dd') AS usage_date,
  date_format(timestamp, 'HH') AS usage_hour,
  SUM(CAST(consumption_kw AS DOUBLE)) AS total_consumption_kw
from live.iot_silver_clean_table
group by city,date_format(timestamp, 'yyyy-MM-dd'),
  date_format(timestamp, 'HH');

In [0]:
create or refresh materialized view daily_meter_usage
as
select meter_id,date_format(timestamp, 'yyyy-MM-dd') AS usage_date,SUM(CAST(consumption_kw as double)) as total_consumption_kw
from live.iot_silver_clean_table
group by meter_id,date_format(timestamp, 'yyyy-MM-dd');

In [0]:
create or replace materialized view voltage_anomaly_alerts
as
select * from live.iot_silver_clean_table
where (cast(voltage as double)<180) or (cast(voltage as double)>260);


In [0]:
create materialized view top_consuming_meters
as select * from(
select *,dense_rank() over(partition by usage_date order by total_consumption_kw desc) as usage_rank
from live.daily_meter_usage)
where usage_rank<=10;

In [0]:
select * from iot_catalog.iot_schema.city_hourly_usage order by usage_date,usage_hour;

city,usage_date,usage_hour,total_consumption_kw
Delhi,2025-11-05,15,1.732
Mumbai,2025-11-06,0,0.0
Bangalore,2025-11-06,1,2.115000000000001
Delhi,2025-11-06,1,0.4259999999999999
Mumbai,2025-11-06,1,4.692
Bangalore,2025-11-06,2,2.7529999999999992
Delhi,2025-11-06,2,2.226
Delhi,2025-11-06,3,1.0780000000000003
Bangalore,2025-11-06,3,3.701000000000001
Mumbai,2025-11-06,4,22.119636909322494
