In [0]:
# Target table of the silver layer. The statement is guarded with
# "if not exists", so running this cell again leaves an existing table alone.
# NOTE(review): stock_quantity is declared as string here, while the cleaning
# query later in this notebook compares it numerically - confirm the intended type.
silver_products_ddl = """ create table if not exists silver_products (
    brand string,
    category string,
    is_active boolean,
    name string,
    price double,
    product_id long,
    rating double,
    stock_quantity string,
    last_updated_at timestamp) """

# Unqualified table names in the following statements resolve against this database.
spark.sql("use database Retail_DB_Second_Layer")
spark.sql(silver_products_ddl)

DataFrame[]

In [0]:
# Incremental watermark: the newest last_updated_at already stored in silver.
# Only bronze rows received after this point are picked up by the next step.
#
# The timestamp is cast to a string inside Spark so that it is rendered in the
# Spark session time zone. Collecting the raw timestamp instead yields a naive
# Python datetime in the driver's local time zone; once that value is embedded
# in SQL as a literal it is re-parsed in the session time zone, which shifts the
# watermark whenever the two zones differ. Casting also makes last_updated_time
# a str on every path (it was datetime on normal runs and str on the first run).
last_updated_df = spark.sql(
    'select cast(max(last_updated_at) as string) as last_completed from silver_products'
)
# A global aggregate always returns exactly one row; its value is NULL/None
# when silver_products is still empty.
last_updated_time = last_updated_df.first()['last_completed']
if last_updated_time is None:
    # First run: fall back to a floor date so the downstream filter is expected
    # to let every bronze row through.
    last_updated_time = '1900-01-01T00:00:00.000+00:00'

In [0]:
# Session-scoped temporary view exposing only the bronze rows whose recived_at
# is later than the watermark held in last_updated_time. The view is replaced
# on every run, so it always reflects the current watermark.
incremental_view_sql = f"""
          create or replace temporary view product_incremental as
          select * from Retail_DB_Initial.Bronze_products as c where c.recived_at  > '{last_updated_time}' """

spark.sql(incremental_view_sql)

DataFrame[]

In [0]:
# Eyeball a small sample of the rows selected for this incremental run.
incremental_preview = spark.sql("select *  from product_incremental limit 10")
incremental_preview.show()

+------------+-----------+---------+----------+-------+----------+------+--------------+--------------------+
|       brand|   category|is_active|      name|  price|product_id|rating|stock_quantity|          recived_at|
+------------+-----------+---------+----------+-------+----------+------+--------------+--------------------+
|  BeautyGlow|       Toys|     true| Product 1| 995.73|         1|   3.5|           989|2024-12-24 02:53:...|
|GardenMaster|     Garden|     true| Product 2| 497.76|         2|   3.8|           495|2024-12-24 02:53:...|
|  BeautyGlow|Electronics|     true| Product 3| 331.63|         3|   4.6|            10|2024-12-24 02:53:...|
|     TechPro|     Beauty|    false| Product 4| 798.83|         4|   4.7|           683|2024-12-24 02:53:...|
|   HomeSmart| Automotive|    false| Product 5|-454.98|         5|   4.4|           719|2024-12-24 02:53:...|
|    BookWorm|Electronics|    false| Product 6|  645.3|         6|   2.2|           823|2024-12-24 02:53:...|
|    Fashi

In [0]:
# Cleaned, merge-ready view of the incremental bronze rows.
#
# Cleaning rules (one output row per product_id):
#   * brand    -> trimmed and lower-cased, 'Unknown' when missing
#   * category -> trimmed and init-capped (rows without a category are dropped
#                 by the filter, so the 'Unknown' fallback is only a safety net)
#   * name     -> trimmed and init-capped, 'Unknown' when missing
#   * price    -> negative values are floored to 0
#   * rating   -> clamped to the range [0, 5]
#   * stock_quantity -> missing or non-positive values become 0
#   * last_updated_at -> processing time, used as the watermark for the next run
#
# Rows without a product_id or category are discarded.
#
# Why the row_number() step: the merge into silver_products is keyed on
# product_id, and a Delta MERGE fails when several source rows match the same
# target row. If bronze holds more than one version of a product inside the
# incremental window, only the most recently received valid row is kept.
# Rows sharing the same recived_at are tie-broken arbitrarily.
spark.sql("""
          create or replace temporary view silver_product_incremental as
          with latest_per_product as (
            select
              *,
              row_number() over (
                partition by product_id
                order by recived_at desc
              ) as version_rank
            from product_incremental
            where product_id is not null and category is not null
          )
          select
          case
            when brand is not null then lower(trim(brand))
            else 'Unknown'
          end as brand,
          case
            when category is not null then initcap(trim(category))
            else 'Unknown'
          end as category,
          is_active,
          case
            when name is not null then initcap(trim(name))
            else 'Unknown'
          end as name,
          case
            when price < 0 then 0
            else price
          end as price,
          product_id,
          case
            when rating < 0 then 0
            when rating > 5 then 5
            else rating
          end as rating,
          case
            when stock_quantity > 0 or stock_quantity is null  then coalesce(stock_quantity, 0)
            else 0
          end as stock_quantity,
          current_timestamp() as last_updated_at
          from latest_per_product
          where version_rank = 1
          """)

DataFrame[]

In [0]:
# Upsert the cleaned incremental rows into the silver table, keyed on product_id:
# a product that already exists has all of its columns overwritten, a new
# product is inserted as-is. The result reports affected/updated/deleted/inserted
# row counts.
# NOTE(review): MERGE expects at most one source row per product_id - confirm
# the source view cannot contain duplicates.
upsert_products_sql = """
          merge into silver_products target
          using silver_product_incremental source
          on target.product_id = source.product_id
          when matched then 
            update set *
          when not matched then
            insert *
            """

spark.sql(upsert_products_sql)

DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

In [0]:
# Sanity check: total number of rows now held in silver_products.
# An ungrouped count(*) always returns exactly one row, so the former
# "limit 10" had no effect and was dropped to avoid suggesting otherwise.
spark.sql("select count(*) from silver_products").show()

+--------+
|count(*)|
+--------+
|    1000|
+--------+

