In [0]:
%sql
-- Make bronze_layer the current database; unqualified table names in later cells resolve against it.
-- NOTE(review): silver_products is created and merged without a database qualifier further down,
-- so it ends up inside bronze_layer as well - confirm that this is intended.
USE DATABASE bronze_layer;

In [0]:
%sql
-- Target table of the silver load: one cleaned row per product plus derived columns.
-- The statement is a no-op when the table already exists.
-- NOTE(review): stock_quantity is declared as string although the transform cell compares it
-- against numbers - confirm the intended type before relying on numeric ordering downstream.
CREATE TABLE IF NOT EXISTS silver_products (
  product_id      BIGINT,
  name            STRING,
  category        STRING,
  brand           STRING,
  price           DOUBLE,
  stock_quantity  STRING,
  rating          DOUBLE,
  is_active       BOOLEAN,
  price_category  STRING,     -- derived: 'Premium' / 'Standard' / 'Budget'
  stock_status    STRING,     -- derived: 'Out of Stock' / 'Low Stock' / 'Moderate Stock' / 'Sufficient Stock'
  last_updated    TIMESTAMP   -- set by the transform to the time the row was processed
);


In [0]:
# Look up the high-water mark of the silver table: the newest last_updated value
# already written to silver_products. The aggregate always yields exactly one row;
# its value is NULL (None in Python) while the table is still empty.
Last_processed_df = spark.sql("""select max(last_updated) as last_processed from silver_products""")
newest_silver_ts = Last_processed_df.first()['last_processed']

# On the very first run there is no watermark yet, so fall back to a date that
# lies before any realistic ingestion date and therefore selects all bronze rows.
last_processed = '1900-01-01' if newest_silver_ts is None else newest_silver_ts

In [0]:
# Expose only the bronze rows ingested after the silver watermark as the temp view
# bronze_incremental. last_processed (set in the previous cell) is interpolated
# into the SQL text as a quoted literal.
incremental_view_sql = f"""
          create or replace temp view bronze_incremental as
          select * from bronze_layer.product where ingestion_date > '{last_processed}'
          """
spark.sql(incremental_view_sql)

Out[5]: DataFrame[]

In [0]:
Data Transformations:
    Price normalization (setting negative prices to 0)
    Stock quantity normalization (setting negative stock quantities to 0)
    Rating normalization (clamping ratings to the range 0 to 5: ratings below 0 become 0, ratings above 5 become 5)
    Price categorization (price greater than 1000 is 'Premium', price greater than 100 and up to 1000 is 'Standard', anything else is 'Budget')
    Stock status calculation (0 is 'Out of Stock', up to 10 is 'Low Stock', up to 50 is 'Moderate Stock', above 50 is 'Sufficient Stock')
    

In [0]:
%sql
-- Builds the temp view bronze_products: the cleaned, enriched form of the incremental
-- bronze rows that the following cell merges into silver_products.
--
-- Row selection
--   * rows without a name or a category are dropped;
--   * rows without a product_id are dropped: product_id is the merge key, a NULL key can
--     never match an existing silver row and would only be inserted as an orphan;
--   * if the incremental slice holds several rows for one product, only the most recently
--     ingested one (highest ingestion_date) is kept, because MERGE fails when more than one
--     source row matches the same target row.
--
-- Column rules
--   * price           negative values become 0
--   * stock_quantity  negative values become 0
--   * rating          clamped to the range 0..5
--   * price_category  > 1000 'Premium', > 100 'Standard', otherwise 'Budget'
--   * stock_status    <= 0 'Out of Stock', <= 10 'Low Stock', <= 50 'Moderate Stock',
--                     otherwise 'Sufficient Stock'
--   * last_updated    time at which this query is evaluated
create or replace temporary view bronze_products as
with latest_valid as (
  select *,
         row_number() over (
           partition by product_id
           order by ingestion_date desc
         ) as recency_rank
  from bronze_incremental
  where product_id is not null
    and name is not null
    and category is not null
)
select product_id,
       name,
       category,
       brand,
       case when price < 0 then 0
            else price
       end as price,
       case when stock_quantity < 0 then 0
            else stock_quantity
       end as stock_quantity,
       case when rating < 0 then 0
            when rating > 5 then 5
            else rating
       end as rating,
       is_active,
       -- Negative prices fall into the else branch, which matches their clamped value of 0.
       case when price > 1000 then 'Premium'
            when price > 100 then 'Standard'
            else 'Budget'
       end as price_category,
       -- Inside this select, stock_quantity still refers to the raw source column, not the
       -- clamped alias above. Using <= 0 keeps negative quantities (reported as 0) from
       -- falling through to 'Sufficient Stock'.
       -- NOTE(review): a NULL stock_quantity still ends up as 'Sufficient Stock' - confirm
       -- whether such rows should get a dedicated status.
       case when stock_quantity <= 0 then 'Out of Stock'
            when stock_quantity <= 10 then 'Low Stock'
            when stock_quantity <= 50 then 'Moderate Stock'
            else 'Sufficient Stock'
       end as stock_status,
       current_timestamp() as last_updated
from latest_valid
where recency_rank = 1;

In [0]:
# Upsert the transformed rows into silver_products, keyed on product_id:
# products already present are overwritten with the new values, unseen products are added.
merge_statement = """ merge into silver_products as s
              using bronze_products  as b
              on s.product_id=b.product_id
              when matched then update set *
              when not matched then insert *"""
spark.sql(merge_statement)

Out[13]: DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]