In [0]:
# Load the July 2024 raw extract from the Unity Catalog volume. The first CSV row
# supplies the column names, and Spark scans the file to infer column types.
SOURCE_CSV_PATH = '/Volumes/duebu_datamodelling/raw/duebu_volume/source_data_july2024.csv'
df = spark.read.csv(SOURCE_CSV_PATH, header=True, inferSchema=True)

In [0]:
# Keep exactly one row per customer_email: the row with the lowest customer_id
# (order_id breaks ties). Spark's dropDuplicates() keeps an arbitrary row even
# after a sort(), so the ranking is done explicitly with a window function to
# make the surviving row deterministic.
# NOTE(review): the key is customer_email, so only one order per customer
# survives; the later order_id uniqueness check suggests order_id may have been
# the intended key — confirm.
df = (
    df.selectExpr(
        "*",
        "row_number() OVER (PARTITION BY customer_email "
        "ORDER BY customer_id, order_id) AS _dedup_rank",
    )
    .filter("_dedup_rank = 1")
    .drop("_dedup_rank")
)
display(df)

In [0]:
# Order rows by order_id for display. Note that Spark temp views and tables do
# not preserve this ordering for later queries.
df = df.orderBy("order_id")

In [0]:
# Render the deduplicated, order_id-sorted DataFrame as an interactive table.
display(df)

In [0]:
# Expose the cleaned DataFrame to the %sql cells below as a session-scoped temp
# view; re-running this cell replaces any existing view of the same name.
TEMP_VIEW_NAME = 'tv_df'
df.createOrReplaceTempView(TEMP_VIEW_NAME)

In [0]:
%sql
-- Data-quality check on the temp view registered from the cleaned DataFrame.
-- order_id is unique when order_id_count = distinct_order_id_count; a gap
-- between row_count and order_id_count means some order_id values are NULL.
SELECT
  COUNT(*)                 AS row_count,
  COUNT(order_id)          AS order_id_count,
  COUNT(DISTINCT order_id) AS distinct_order_id_count
FROM tv_df


In [0]:
%sql
-- Target table for the cleaned source extract.
-- `id` is a surrogate key generated by the table itself (starting at 1, step 1;
-- values are not guaranteed to be gap-free) and cannot be supplied on insert.
-- IF NOT EXISTS makes the cell safe to re-run, but it never alters the schema
-- of a table that already exists.
CREATE TABLE IF NOT EXISTS duebu_datamodelling.default.v_df (
  id               BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1 INCREMENT BY 1),
  order_id         INT,
  order_date       DATE,
  customer_id      INT,
  customer_name    STRING,
  customer_email   STRING,
  product_id       INT,
  product_name     STRING,
  product_category STRING,
  quantity         INT,
  unit_price       DOUBLE,
  payment_type     STRING,
  country          STRING,
  last_updated     DATE
)


In [0]:
%sql
-- Copy the cleaned rows from the tv_df temp view into the target table.
-- The original selected FROM v_df, i.e. the target table itself: it loaded
-- nothing on the first run and duplicated existing rows on every re-run.
-- `id` is omitted from the column list so the table generates it.
-- NOTE: this is a plain append; re-running the cell inserts the same rows
-- again (with new id values).
INSERT INTO duebu_datamodelling.default.v_df (
  order_id,
  order_date,
  customer_id,
  customer_name,
  customer_email,
  product_id,
  product_name,
  product_category,
  quantity,
  unit_price,
  payment_type,
  country,
  last_updated
)
SELECT
  order_id,
  order_date,
  customer_id,
  customer_name,
  customer_email,
  product_id,
  product_name,
  product_category,
  quantity,
  unit_price,
  payment_type,
  country,
  last_updated
FROM tv_df

In [0]:
%sql
-- Inspect rows whose generated surrogate id differs from the order_id loaded
-- from the source. Among those rows, ordered by id, skip the first and return
-- only the second.
SELECT *
FROM duebu_datamodelling.default.v_df
WHERE id <> order_id
ORDER BY id
LIMIT 1 OFFSET 1

In [0]:
%sql
-- Upsert the most recent version of each order from tv_silver into the silver table.
-- NOTE(review): neither tv_silver nor duebu_datamodelling.silver.silver_data is
-- created in this notebook (the temp view registered here is tv_df). On a fresh
-- session this cell fails unless both are defined elsewhere — confirm.
WITH deduplicated_source AS (
    SELECT 
        order_id,
        order_date,
        customer_id,
        customer_name,
        customer_email,
        product_id,
        product_name,
        product_category,
        quantity,
        unit_price,
        payment_type,
        country,
        last_updated,
        -- Rank each order's rows newest-first; only row_num = 1 is used below,
        -- so at most one source row per order_id reaches the MERGE.
        -- NOTE(review): rows tied on last_updated are ranked arbitrarily.
        ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY last_updated DESC) AS row_num
    FROM tv_silver
)
MERGE INTO duebu_datamodelling.silver.silver_data st
USING (SELECT * FROM deduplicated_source WHERE row_num = 1) sv
ON st.order_id = sv.order_id
-- Existing order: overwrite every non-key column with the source values.
-- NOTE(review): there is no recency guard (e.g. sv.last_updated >= st.last_updated),
-- so replaying an older batch overwrites newer target rows — confirm intended.
WHEN MATCHED THEN 
    UPDATE SET 
        st.order_date = sv.order_date,
        st.customer_id = sv.customer_id,
        st.customer_name = sv.customer_name,
        st.customer_email = sv.customer_email,
        st.product_id = sv.product_id,
        st.product_name = sv.product_name,
        st.product_category = sv.product_category,
        st.quantity = sv.quantity,
        st.unit_price = sv.unit_price,
        st.payment_type = sv.payment_type,
        st.country = sv.country,
        st.last_updated = sv.last_updated
-- New order: insert it with the listed columns.
WHEN NOT MATCHED THEN 
    INSERT (
        order_id, order_date, customer_id, customer_name, customer_email, 
        product_id, product_name, product_category, quantity, unit_price, 
        payment_type, country, last_updated
    ) 
    VALUES (
        sv.order_id, sv.order_date, sv.customer_id, sv.customer_name, sv.customer_email, 
        sv.product_id, sv.product_name, sv.product_category, sv.quantity, sv.unit_price, 
        sv.payment_type, sv.country, sv.last_updated
    )