In [0]:
%sql
-- Preview of the dedup rule: among bronze rows with positive price and quantity,
-- keep a single row per (invoice, stockcode) -- the one with the latest invoicedate,
-- with the most recent _ingest_timestamp used to break ties.
SELECT
  dedup.invoice
  , dedup.stockcode
  , dedup.customer_id
  , dedup.quantity
  , dedup.price
  , dedup.invoicedate
  , dedup.country
  , dedup.description
FROM (
  SELECT
    b.invoice, b.stockcode, b.customer_id
    , b.quantity, b.price
    , b.invoicedate, b.country, b.description
    , ROW_NUMBER() OVER (
        PARTITION BY b.invoice, b.stockcode
        ORDER BY b.invoicedate DESC, b._ingest_timestamp DESC
      ) AS rn
  FROM lab_2026.bronze_online_retail AS b
  WHERE b.price > 0.0
    AND b.quantity > 0.0
) AS dedup
WHERE dedup.rn = 1;

In [0]:
from pyspark.sql import functions as F, Window as W

# One row per (invoice, stockcode): the latest invoicedate wins, and on ties the
# most recently ingested bronze row (_ingest_timestamp) wins.
_duplicate_window = (
    W
    .partitionBy('invoice', 'stockcode')
    .orderBy(
        F.col('invoicedate').desc()
        , F.col('_ingest_timestamp').desc()
    )
)
online_retail_df = (
    spark.read.table('lab_2026.bronze_online_retail')
    # Same predicate as the SQL preview cell, evaluated on the raw bronze columns
    # before casting and before dedup. Without it, rows with non-positive quantity
    # or price reach silver and can even win the dedup for their key.
    .where('quantity > 0.0 AND price > 0.0')
    .selectExpr(
        "invoice", "stockcode", "CAST(CAST(customer_id AS NUMERIC) AS BIGINT) customer_id"
        , "CAST(quantity AS BIGINT) quantity", "CAST(price AS DECIMAL(10, 2)) price"
        , "CAST(invoicedate AS TIMESTAMP) invoicedate", "country", "description"
        # Kept only to order the dedup window; dropped below.
        , '_ingest_timestamp'
    )
    .withColumn('rw', F.row_number().over(_duplicate_window))
    .filter(F.col('rw') == 1)
    .drop('rw', '_ingest_timestamp')
)

online_retail_df.limit(10).display()

In [0]:
%sql

-- (Re)create an empty silver table on every run.
-- CREATE OR REPLACE swaps the table in a single statement and keeps the Delta
-- table history, unlike DROP + CREATE, which leaves a gap with no table and
-- discards history. USING DELTA is explicit because the merge cell opens this
-- table through DeltaTable.forName.
CREATE OR REPLACE TABLE lab_2026.silver_online_retail(
  invoice VARCHAR(256), stockcode VARCHAR(256), customer_id BIGINT
  , quantity BIGINT, price DECIMAL(10, 2)
  , invoicedate TIMESTAMP, country VARCHAR(256), description STRING
  , _insert_timestamp TIMESTAMP, _update_timestamp TIMESTAMP
) USING DELTA;

In [0]:
%sql

-- Show the silver table's columns plus detailed table metadata.
DESCRIBE TABLE EXTENDED lab_2026.silver_online_retail;

In [0]:
from delta.tables import DeltaTable

# Upsert the deduplicated batch (online_retail_df) into silver, keyed on
# (invoice, stockcode). The dedup cell keeps one source row per key; Delta MERGE
# fails if several source rows match the same target row.
_merge_key_cols = ['invoice', 'stockcode']
_payload_cols = ['customer_id', 'quantity', 'price', 'invoicedate', 'country', 'description']

# Null-safe equality (<=>): with plain `=`, a NULL key never matches, so re-running
# this cell would insert that row again as a duplicate.
_merge_condition = ' AND '.join(f'src.{c} <=> silver.{c}' for c in _merge_key_cols)

# Update a matched row only when at least one payload column actually differs
# (null-safe). This way _update_timestamp records a real change, and unchanged
# rows are not rewritten on every run.
_changed_condition = ' OR '.join(f'NOT (src.{c} <=> silver.{c})' for c in _payload_cols)

(
    DeltaTable
    .forName(spark, 'lab_2026.silver_online_retail')
    .alias('silver')
    .merge(
        online_retail_df.alias('src')
        , _merge_condition
    )
    .whenMatchedUpdate(
        condition = _changed_condition
        , set = {
            **{c: f'src.{c}' for c in _payload_cols}
            , '_update_timestamp': F.current_timestamp()
        }
    )
    .whenNotMatchedInsert(
        values = {
            **{c: f'src.{c}' for c in _merge_key_cols + _payload_cols}
            , '_insert_timestamp': F.current_timestamp()
            , '_update_timestamp': F.current_timestamp()
        }
    )
    .execute()
)

In [0]:
%sql

-- Spot-check the upserted silver rows. The query is bounded (like the
-- `.limit(10)` preview of the source DataFrame) so the output stays small, and
-- ordered so that re-runs show the same rows.
SELECT *
FROM lab_2026.silver_online_retail
ORDER BY invoice, stockcode
LIMIT 10;