## Helpers --> Bronze Tables: 

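Airbyte raw tables contain the record as a JSON string in `_airbyte_data`, plus Airbyte metadata columns. Each helper view extracts the natural key from the JSON and keeps only the latest record per key to remove duplicates.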

In [0]:
-- ORDERS:
-- Staging view with one raw Airbyte record per order: for every order_id only
-- the row with the most recent _airbyte_extracted_at is kept.
-- Output columns: the Airbyte metadata columns, _airbyte_data, order_id and rn
-- (rn is always 1; it is kept so the view's column list stays unchanged).
CREATE OR REPLACE TEMP VIEW _stg_orders_latest AS
WITH j AS (
  SELECT
    _airbyte_raw_id,
    _airbyte_extracted_at,
    _airbyte_loaded_at,
    _airbyte_generation_id,
    _airbyte_data,
    -- Extract the natural key from JSON for de-dupe.
    -- TRY_CAST (as used by the products/users/reviews staging views) yields
    -- NULL for an id that is not a valid BIGINT instead of raising an error.
    TRY_CAST(get_json_object(_airbyte_data, '$.id') AS BIGINT) AS order_id
  FROM workspace.airbyte_internal.default_raw__stream_orders
)
SELECT
  *,
  -- Rank versions of the same order from newest to oldest extraction.
  -- NOTE(review): rows tied on _airbyte_extracted_at are ranked arbitrarily,
  -- and all rows with a NULL order_id share one partition (only one survives).
  ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY _airbyte_extracted_at DESC) AS rn
FROM j
QUALIFY rn = 1;


In [0]:
-- PRODUCTS:
-- Staging view: the newest raw Airbyte record (by _airbyte_extracted_at)
-- for each product_id.
CREATE OR REPLACE TEMP VIEW _stg_products_latest AS
WITH keyed AS (
  -- Raw Airbyte columns plus the product key pulled out of the JSON payload.
  SELECT
    _airbyte_raw_id,
    _airbyte_extracted_at,
    _airbyte_loaded_at,
    _airbyte_generation_id,
    _airbyte_data,
    TRY_CAST(get_json_object(_airbyte_data, '$.id') AS BIGINT) AS product_id
  FROM workspace.airbyte_internal.default_raw__stream_products
)
SELECT
  keyed._airbyte_raw_id,
  keyed._airbyte_extracted_at,
  keyed._airbyte_loaded_at,
  keyed._airbyte_generation_id,
  keyed._airbyte_data,
  keyed.product_id
FROM keyed
-- Keep only the first row per product when ordered newest-first.
QUALIFY ROW_NUMBER() OVER (
  PARTITION BY keyed.product_id
  ORDER BY keyed._airbyte_extracted_at DESC
) = 1;


In [0]:
-- USERS:
-- Staging view: the newest raw Airbyte record (by _airbyte_extracted_at)
-- for each user_id.
CREATE OR REPLACE TEMP VIEW _stg_users_latest AS
WITH keyed AS (
  -- Raw Airbyte columns plus the user key pulled out of the JSON payload.
  SELECT
    _airbyte_raw_id,
    _airbyte_extracted_at,
    _airbyte_loaded_at,
    _airbyte_generation_id,
    _airbyte_data,
    TRY_CAST(get_json_object(_airbyte_data, '$.id') AS BIGINT) AS user_id
  FROM workspace.airbyte_internal.default_raw__stream_users
)
SELECT
  keyed._airbyte_raw_id,
  keyed._airbyte_extracted_at,
  keyed._airbyte_loaded_at,
  keyed._airbyte_generation_id,
  keyed._airbyte_data,
  keyed.user_id
FROM keyed
-- Keep only the first row per user when ordered newest-first.
QUALIFY ROW_NUMBER() OVER (
  PARTITION BY keyed.user_id
  ORDER BY keyed._airbyte_extracted_at DESC
) = 1;


In [0]:
-- REVIEWS:
-- Staging view: the newest raw Airbyte record (by _airbyte_extracted_at)
-- for each review_id.
CREATE OR REPLACE TEMP VIEW _stg_reviews_latest AS
WITH keyed AS (
  -- Raw Airbyte columns plus the review key pulled out of the JSON payload.
  SELECT
    _airbyte_raw_id,
    _airbyte_extracted_at,
    _airbyte_loaded_at,
    _airbyte_generation_id,
    _airbyte_data,
    TRY_CAST(get_json_object(_airbyte_data, '$.id') AS BIGINT) AS review_id
  FROM workspace.airbyte_internal.default_raw__stream_reviews
)
SELECT
  keyed._airbyte_raw_id,
  keyed._airbyte_extracted_at,
  keyed._airbyte_loaded_at,
  keyed._airbyte_generation_id,
  keyed._airbyte_data,
  keyed.review_id
FROM keyed
-- Keep only the first row per review when ordered newest-first.
QUALIFY ROW_NUMBER() OVER (
  PARTITION BY keyed.review_id
  ORDER BY keyed._airbyte_extracted_at DESC
) = 1;


## Silver Tables

The JSON payload is flattened into typed columns and duplicate records are removed.

In [0]:
-- Table ORDERS
-- Silver orders table: typed columns parsed out of the Airbyte JSON payload.
CREATE TABLE IF NOT EXISTS workspace.default.orders_silver (
  order_id              BIGINT,
  user_id               BIGINT,
  total_amount          DECIMAL(18,2),
  status                STRING,
  order_ts              TIMESTAMP,
  delivery_ts           TIMESTAMP,
  products_json         STRING,      -- raw array string for later explode
  _airbyte_extracted_at TIMESTAMP,
  _airbyte_loaded_at    TIMESTAMP
)
USING DELTA;

-- Incremental upsert from the de-duplicated staging view.
-- Only records extracted after the newest record already present in the
-- silver table are merged; an empty silver table falls back to 1900-01-01.
MERGE INTO workspace.default.orders_silver AS tgt
USING (
  WITH fresh AS (
    -- Latest-per-order rows that are newer than the current watermark.
    SELECT
      _airbyte_data,
      _airbyte_extracted_at,
      _airbyte_loaded_at
    FROM _stg_orders_latest
    WHERE _airbyte_extracted_at > COALESCE(
            (SELECT MAX(_airbyte_extracted_at) FROM workspace.default.orders_silver),
            TIMESTAMP('1900-01-01')
          )
  )
  SELECT
    CAST(get_json_object(_airbyte_data, '$.id') AS BIGINT)                  AS order_id,
    CAST(get_json_object(_airbyte_data, '$.userId') AS BIGINT)              AS user_id,
    CAST(get_json_object(_airbyte_data, '$.totalAmount') AS DECIMAL(18,2))  AS total_amount,
    get_json_object(_airbyte_data, '$.status')                              AS status,
    -- try_cast: an unparseable date becomes NULL rather than an error
    try_cast(get_json_object(_airbyte_data, '$.orderDate') AS TIMESTAMP)    AS order_ts,
    try_cast(get_json_object(_airbyte_data, '$.deliveryDate') AS TIMESTAMP) AS delivery_ts,
    get_json_object(_airbyte_data, '$.products')                            AS products_json,
    _airbyte_extracted_at,
    _airbyte_loaded_at
  FROM fresh
) AS src
ON tgt.order_id = src.order_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
20,0,0,20


In [0]:
-- Table PRODUCTS
-- Silver products table: typed columns parsed out of the Airbyte JSON payload.
CREATE TABLE IF NOT EXISTS workspace.default.products_silver (
  product_id     BIGINT,
  title          STRING,
  price          DECIMAL(18,2),
  description    STRING,
  category       STRING,
  brand          STRING,
  stock          INT,
  image_url      STRING,
  specs_color    STRING,
  specs_weight   STRING,
  specs_storage  STRING,
  rating_rate    DOUBLE,
  rating_count   INT,
  _airbyte_extracted_at TIMESTAMP
) USING DELTA;

-- Incremental upsert keyed on product_id.
-- The source is the de-duplicated staging view (latest record per product),
-- not the raw Airbyte table: with several raw versions of one product newer
-- than the watermark, the MERGE would either fail because multiple source rows
-- match one target row, or insert duplicate rows for a product_id not yet in
-- the target.
MERGE INTO workspace.default.products_silver t
USING (
  WITH j AS (
    SELECT
      CAST(get_json_object(_airbyte_data, '$.id') AS BIGINT)                    AS product_id,
      get_json_object(_airbyte_data, '$.title')                                 AS title,
      CAST(get_json_object(_airbyte_data, '$.price') AS DECIMAL(18,2))          AS price,
      get_json_object(_airbyte_data, '$.description')                           AS description,
      get_json_object(_airbyte_data, '$.category')                              AS category,
      get_json_object(_airbyte_data, '$.brand')                                 AS brand,
      CAST(get_json_object(_airbyte_data, '$.stock') AS INT)                    AS stock,
      get_json_object(_airbyte_data, '$.image')                                 AS image_url,
      -- NOTE(review): '$.["key"]' is not the usual bracket form ($['key']) for
      -- keys containing dots; confirm these five columns are not loaded as NULL.
      get_json_object(_airbyte_data, '$.["specs.color"]')                       AS specs_color,
      get_json_object(_airbyte_data, '$.["specs.weight"]')                      AS specs_weight,
      get_json_object(_airbyte_data, '$.["specs.storage"]')                     AS specs_storage,
      CAST(get_json_object(_airbyte_data, '$.["rating.rate"]')  AS DOUBLE)      AS rating_rate,
      CAST(get_json_object(_airbyte_data, '$.["rating.count"]') AS INT)         AS rating_count,
      _airbyte_extracted_at
    FROM _stg_products_latest
  )
  SELECT *
  FROM j
  -- Watermark: only records extracted after the newest one already in silver;
  -- an empty silver table falls back to 1900-01-01.
  WHERE _airbyte_extracted_at >
        COALESCE((SELECT MAX(_airbyte_extracted_at) FROM workspace.default.products_silver),
                 TIMESTAMP('1900-01-01'))
) s
ON t.product_id = s.product_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;


num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
40,0,0,40


In [0]:
-- Table USERS
-- Silver users table: typed, trimmed columns parsed out of the Airbyte JSON payload.
CREATE TABLE IF NOT EXISTS workspace.default.users_silver (
  user_id      BIGINT,
  email        STRING,
  username     STRING,
  first_name   STRING,
  last_name    STRING,
  street       STRING,
  city         STRING,
  zipcode      STRING,
  country      STRING,
  phone        STRING,
  orders_json  STRING,       -- keep raw list if you want it
  _airbyte_extracted_at TIMESTAMP
) USING DELTA;

-- Incremental upsert keyed on user_id.
-- The source is the de-duplicated staging view (latest record per user), not
-- the raw Airbyte table: with several raw versions of one user newer than the
-- watermark, the MERGE would either fail because multiple source rows match
-- one target row, or insert duplicate rows for a user_id not yet in the target.
MERGE INTO workspace.default.users_silver t
USING (
  WITH j AS (
    SELECT
      CAST(get_json_object(_airbyte_data, '$.id') AS BIGINT)                 AS user_id,
      -- email is normalized to lower case so it compares case-insensitively
      LOWER(TRIM(get_json_object(_airbyte_data, '$.email')))                AS email,
      TRIM(get_json_object(_airbyte_data, '$.username'))                    AS username,
      -- NOTE(review): '$.["key"]' is not the usual bracket form ($['key']) for
      -- keys containing dots; confirm these six columns are not loaded as NULL.
      TRIM(get_json_object(_airbyte_data, '$.["name.firstname"]'))          AS first_name,
      TRIM(get_json_object(_airbyte_data, '$.["name.lastname"]'))           AS last_name,
      TRIM(get_json_object(_airbyte_data, '$.["address.street"]'))          AS street,
      TRIM(get_json_object(_airbyte_data, '$.["address.city"]'))            AS city,
      TRIM(get_json_object(_airbyte_data, '$.["address.zipcode"]'))         AS zipcode,
      TRIM(get_json_object(_airbyte_data, '$.["address.country"]'))         AS country,
      TRIM(get_json_object(_airbyte_data, '$.phone'))                        AS phone,
      get_json_object(_airbyte_data, '$.orders')                             AS orders_json,
      _airbyte_extracted_at
    FROM _stg_users_latest
  )
  SELECT *
  FROM j
  -- Watermark: only records extracted after the newest one already in silver;
  -- an empty silver table falls back to 1900-01-01.
  WHERE _airbyte_extracted_at >
        COALESCE((SELECT MAX(_airbyte_extracted_at) FROM workspace.default.users_silver),
                 TIMESTAMP('1900-01-01'))
) s
ON t.user_id = s.user_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;


num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
40,0,0,40


In [0]:
-- Table Reviews
-- Silver reviews table: typed columns parsed out of the Airbyte JSON payload.
CREATE TABLE IF NOT EXISTS workspace.default.reviews_silver (
  review_id   BIGINT,
  product_id  BIGINT,
  user_id     BIGINT,
  rating      INT,
  title       STRING,
  content     STRING,
  review_ts   TIMESTAMP,
  _airbyte_extracted_at TIMESTAMP
) USING DELTA;

-- Incremental upsert keyed on review_id.
-- The source is the de-duplicated staging view (latest record per review), not
-- the raw Airbyte table: with several raw versions of one review newer than
-- the watermark, the MERGE would either fail because multiple source rows
-- match one target row, or insert duplicate rows for a review_id not yet in
-- the target.
MERGE INTO workspace.default.reviews_silver t
USING (
  SELECT
    CAST(get_json_object(_airbyte_data, '$.id')         AS BIGINT)  AS review_id,
    CAST(get_json_object(_airbyte_data, '$.productId')  AS BIGINT)  AS product_id,
    CAST(get_json_object(_airbyte_data, '$.userId')     AS BIGINT)  AS user_id,
    CAST(get_json_object(_airbyte_data, '$.rating')     AS INT)     AS rating,
    get_json_object(_airbyte_data, '$.title')                        AS title,
    get_json_object(_airbyte_data, '$.content')                      AS content,
    -- TRY_CAST matches the timestamp handling of the orders MERGE: a date
    -- string that cannot be parsed becomes NULL instead of failing the load.
    TRY_CAST(get_json_object(_airbyte_data, '$.date') AS TIMESTAMP)  AS review_ts,
    _airbyte_extracted_at
  FROM _stg_reviews_latest
  -- Watermark: only records extracted after the newest one already in silver;
  -- an empty silver table falls back to 1900-01-01.
  WHERE _airbyte_extracted_at >
        COALESCE((SELECT MAX(_airbyte_extracted_at) FROM workspace.default.reviews_silver),
                 TIMESTAMP('1900-01-01'))
) s
ON t.review_id = s.review_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;


num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
40,0,0,40
