# **3.1 DATA MODEL DESCRIPTION**
**Brazilian E-Commerce Public Dataset by Olist**

This is a Brazilian e-commerce public dataset of orders made at Olist Store. It covers about 100k orders placed from 2016 to 2018 at multiple marketplaces in Brazil. Its features allow viewing an order from multiple dimensions: order status, price, payment and freight performance, customer location, product attributes and, finally, reviews written by customers.


**Context**

This dataset was provided by Olist, the largest department store in Brazilian marketplaces. Olist connects small businesses from all over Brazil to channels without hassle and with a single contract. Those merchants are able to sell their products through the Olist Store and ship them directly to the customers using Olist logistics partners. 

After a customer purchases a product from the Olist Store, a seller is notified to fulfill that order. Once the customer receives the product, or the estimated delivery date is due, the customer receives a satisfaction survey by email, where they can rate the purchase experience and leave comments.

**Attention**
- An order might have multiple items.
- Each item might be fulfilled by a distinct seller.
- All text identifying stores and partners was replaced with names from Game of Thrones for data privacy.

Source: [Kaggle link](https://www.kaggle.com/datasets/olistbr/brazilian-ecommerce/data) 


# **Data exploration**

The Olist dataset contains transactional data from a Brazilian marketplace covering roughly three years (2016–2018). Below is a summary of the full dataset with the number of rows and key attributes for each source table:



| Source | No. Rows | Key fields | Purpose |
|---|---:|---|---|
| `olist_customers_dataset` | 99,441 | `customer_id` | One row per customer. Contains a unique row-level **customer ID**, a persistent customer unique ID (to track returning customers), ZIP code prefix, city and state. |
| `olist_orders_dataset` | 99,441 | `order_id` | One row per **order**. Contains the customer identifier (`customer_id`), order status, purchase timestamp, order approval timestamp, carrier pick-up date, delivery date and estimated delivery date. These timestamps can be used to derive date dimensions and analyze fulfillment performance. |
| `olist_order_items_dataset` | 112,650 | `order_id`, `order_item_id` | One row per **order–item**. Contains order ID, line number (`order_item_id`), product ID, seller ID, shipping deadline (`shipping_limit_date`), product price and freight value. Because an order can have multiple items and items can be fulfilled by different sellers, this is the most granular transactional table and a natural candidate for the fact table. |
| `olist_products_dataset` | 32,951 | `product_id` | One row per **product**. Contains product ID, Portuguese category name, product name length, description length, number of photos, and physical dimensions (weight and package size). Can be joined to `product_category_name_translation` for English category names. |
| `product_category_name_translation` | 71 | `product_category_name` | Maps Portuguese **product category names to English**. Used to enrich the product dimension. |
| `olist_order_payments_dataset` | 103,886 | `order_id`, `payment_sequential` | Records **payments** per order. Each record has order ID, sequence number (1..n), payment type (credit card, voucher, boleto, etc.), number of installments, and payment value. Most orders have a single payment; some have multiple payments. |
| `olist_geolocation_dataset` | 1,000,163 | `geolocation_zip_code_prefix` | **Maps** ZIP code prefixes to latitude/longitude, city, and state. A prefix can appear on many rows, so it is a lookup key rather than a unique key. Useful for building a geography dimension and enabling spatial analysis. |
| `olist_sellers_dataset` | 3,095 | `seller_id` | One row per seller. Contains seller ID, ZIP code prefix, city and state. |
| `olist_order_reviews_dataset` | 104,162 | `review_id`, `order_id` | Contains review ID, order ID, review score, optional title/message, review creation date, and survey answer timestamp (`review_answer_timestamp`). **Note:** Multiple `review_id`s per `order_id` suggest a potential data-quality issue; for modeling, decide whether to keep only the latest review per order or to treat reviews as a separate fact-like table. |


_Note: Candidate primary keys were checked with queries of the following form:_

In [0]:
USE workspace.default;


In [0]:
-- Example: list customer_id values that occur more than once (no rows returned means the key is unique)
SELECT 
    COUNT(*),
    customer_id
FROM workspace.default.olist_customers_dataset
GROUP BY customer_id
HAVING COUNT(*) > 1;
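
The same duplicate check extends to composite keys such as (`order_id`, `order_item_id`). The sketch below is only an illustration on toy data: it uses Python's built-in `sqlite3` instead of Databricks, and the helper `find_duplicate_keys` and the table `order_items` are hypothetical names, not part of the notebook.

```python
import sqlite3


def find_duplicate_keys(conn, table, key_columns):
    """Return (key values..., row count) for every key that occurs more than once."""
    cols = ", ".join(key_columns)
    query = (
        f"SELECT {cols}, COUNT(*) AS n FROM {table} "
        f"GROUP BY {cols} HAVING COUNT(*) > 1 ORDER BY {cols}"
    )
    return conn.execute(query).fetchall()


# Toy stand-in for olist_order_items_dataset (values are made up).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE order_items (order_id TEXT, order_item_id INTEGER)")
conn.executemany(
    "INSERT INTO order_items VALUES (?, ?)",
    [("o1", 1), ("o1", 2), ("o2", 1), ("o2", 1)],  # ("o2", 1) is duplicated
)

# order_id alone is not unique, so it cannot be the key of an item-level table.
single_key_dupes = find_duplicate_keys(conn, "order_items", ["order_id"])
# The composite key exposes only the genuinely duplicated row.
composite_dupes = find_duplicate_keys(conn, "order_items", ["order_id", "order_item_id"])

print(single_key_dupes)
print(composite_dupes)
```

An empty result for a candidate key means that no key value repeats in the table.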



# 4.1 DATA UNDERSTANDING & MODELING

The core entities used for this assignment are as follows:
- Customer 
- Product
- Seller
- Geolocation
- Payment 
- Order 
- Order_items
- product_category_name_translation (mapping table)
- Review (not used in this assignment)

##### Customer entity schema

| Field name | Data type |
|---|---|
| `customer_id` | STRING |
| `customer_unique_id` | STRING |
| `customer_zip_code_prefix` | INT |
| `customer_city` | STRING |
| `customer_state` | STRING |


##### Product entity schema

| Field name | Data type |
|---|---|
| `product_id` | STRING |
| `product_category_name` | STRING |
| `product_name_lenght` | DECIMAL |
| `product_description_lenght` | DECIMAL |
| `product_photos_qty` | DECIMAL |
| `product_weight_g` | INT |
| `product_length_cm` | INT |
| `product_height_cm` | INT |
| `product_width_cm` | INT |

##### Seller entity schema

| Field name | Data type |
|---|---|
| `seller_id` | STRING |
| `seller_zip_code_prefix` | INT |
| `seller_city` | STRING |
| `seller_state` | STRING |

##### Geolocation entity schema

| Field name | Data type |
|---|---|
| `geolocation_zip_code_prefix` | INT |
| `geolocation_lat` | DECIMAL |
| `geolocation_lng` | DECIMAL |
| `geolocation_city` | STRING |
| `geolocation_state` | STRING |

##### Payment entity schema

| Field name | Data type |
|---|---|
| `order_id` | STRING |
| `payment_sequential` | INT |
| `payment_type` | STRING |
| `payment_installments` | INT |
| `payment_value` | DECIMAL |

##### Order entity schema

| Field name | Data type |
|---|---|
| `order_id` | STRING |
| `customer_id` | STRING |
| `order_status` | STRING |
| `order_purchase_timestamp` | DATE |
| `order_approved_at` | DATE |
| `order_delivered_carrier_date` | DATE |
| `order_delivered_customer_date` | DATE |
| `order_estimated_delivery_date` | DATE |

##### Order Items entity schema

| Field name | Data type |
|---|---|
| `order_id` | STRING |
| `order_item_id` | STRING |
| `product_id` | STRING |
| `seller_id` | STRING |
| `shipping_limit_date` | DATE |
| `price` | DECIMAL |
| `freight_value` | DECIMAL |







# Dimensional Model
For this assignment, the proposed dimensional model is a star schema. It serves the purpose of the assignment and suits the data volume of each dataset, mainly for these reasons:

- **Query performance**: a star schema joins the fact table directly to each dimension, so queries are simpler and usually faster.
- **Data volume**: the Olist dataset is small (on the order of 100k orders), so the storage saved by normalizing dimensions would have little impact. With much larger volumes, a snowflake schema might be worth considering.
- **Flexibility**: the model is easy to extend and enrich with new entities, for example by adding reviews.

### Fact Table grain

The data model needs to be order-centric. As noted in the Kaggle description, each order may contain multiple items, so the fact table grain is one row per order item, identified by `order_id` and `order_item_id`. This grain gives us the following:
- **Maintains detail:** this grain preserves item-level detail for analysis. For example, we can identify the price per item, which enables product-level profitability analysis.
- **More flexible aggregation:** the fact table can be rolled up to order, product, seller, or customer level, whereas an order-level fact could not be drilled down to individual items.
- **Consistent with the business process:** the core transaction is “selling one product to a customer from a seller,” not just an order total. The order-item grain reflects this process directly.
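
As a small illustration of the roll-up argument, the sketch below aggregates toy order-item rows to order level and to seller level. The rows and the helper `roll_up` are hypothetical and stand in for the fact table only.

```python
from collections import defaultdict

# Toy order-item rows: (order_id, order_item_id, seller_id, price, freight_value)
items = [
    ("o1", 1, "s1", 100.0, 10.0),
    ("o1", 2, "s2", 50.0, 5.0),
    ("o2", 1, "s1", 20.0, 2.0),
]


def roll_up(rows, key_index):
    """Sum price + freight per value of the column at key_index."""
    totals = defaultdict(float)
    for row in rows:
        totals[row[key_index]] += row[3] + row[4]
    return dict(totals)


order_totals = roll_up(items, 0)   # item grain rolled up to order level
seller_totals = roll_up(items, 2)  # the same rows rolled up to seller level

print(order_totals)
print(seller_totals)
```

Order `o1` contributes to two different sellers here, which an order-level fact table could not represent without losing the split.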

### Star Schema vs Snowflake
The star schema was chosen for:
- Simpler joins
- Faster BI queries
- Easier extensibility

A snowflake schema would only matter if:
- Dimension sizes increased dramatically
- High normalization were needed for governance

**The table below summarizes the dimension tables created for the assignment.**

The dimensions are designed to support robust analytics given three properties of the data: **each order can have multiple items, and items can be fulfilled by different sellers**; payments are recorded per order and may include multiple payment lines; and geography information is supplied for both customers and sellers via ZIP code prefixes and a geolocation mapping.


##### Dimensions overview

| Dimension | Description | Essential attributes | Optional attributes / notes |
|---|---|---|---|
| `dim_customer` | One row per customer version (Type 2 SCD to track changes). `customer_unique_id` is the persistent identifier for returning customers; the load and SCD logic key on `customer_id`. | Surrogate key (`customer_sk`), `customer_id`, `customer_unique_id`, geography key (`geography_sk`), city, state. | Customer segmentation or demographic attributes (if derived). Effective/expiry dates and a current-row flag support the SCD. |
| `dim_product` | One row per product. Joins to `product_category_name_translation` for English names. | Surrogate key (`product_sk`), `product_id`, category name (English and Portuguese), product name length, description length, number of photos. | Physical attributes such as weight and dimensions, which may be useful for freight analysis. |
| `dim_seller` | One row per seller. | Surrogate key (`seller_sk`), `seller_id`, ZIP code prefix, city, state. | Seller rating or marketplace metrics if available in an extended dataset. |
| `dim_geography` | Derived from the geolocation table; one row per ZIP code prefix. | Surrogate key (`geography_sk`), ZIP code prefix, city, state, latitude, longitude. | Region classification (e.g., North/South), population or socio-economic indicators if appended from external sources. |
| `dim_date` | Calendar dimension used for all dates in the model (purchase, approval, delivery, etc.). | Surrogate key (`date_sk`), date, year, month, day, quarter, day of week. | Holiday flags and seasonality indicators. Multiple date roles (order date, shipping date, delivery date) can be separate FK columns in the fact table. |
| `dim_payment` | Captures payment method characteristics. One row per unique combination of payment type and installments. | Surrogate key (`payment_sk`), payment type (credit card, boleto, voucher, etc.), number of installments. | Payment sequence number (if modeling multiple payments per order) or extra financial attributes (interest/fees). Since most orders have one payment in the sample, this dimension is small. |


![schema_dig_update.png](./schema_dig_update.png "schema_dig_update.png")

### Creating Dim Tables

In [0]:
-- Fix: rebuild dim_customer with a historical effective_date.
-- Note: the INSERT below reads dim_geography, so run the dim_geography cell first.
CREATE OR REPLACE TABLE workspace.default.dim_customer (
  customer_sk        BIGINT GENERATED ALWAYS AS IDENTITY,
  customer_id        STRING,
  customer_unique_id STRING,
  geography_sk       BIGINT,
  customer_city      STRING,
  customer_state     STRING,
  effective_date     DATE,
  expiry_date        DATE,
  current_flag       BOOLEAN
);

INSERT INTO workspace.default.dim_customer (
  customer_id,
  customer_unique_id,
  geography_sk,
  customer_city,
  customer_state,
  effective_date,
  expiry_date,
  current_flag
)
SELECT
    c.customer_id,
    c.customer_unique_id,
    g.geography_sk,
    c.customer_city,
    c.customer_state,
    DATE '1900-01-01'  AS effective_date,   --  covers all historical orders
    DATE '9999-12-31'  AS expiry_date,
    TRUE               AS current_flag
FROM workspace.default.olist_customers_dataset c
LEFT JOIN workspace.default.dim_geography g
  ON CAST(c.customer_zip_code_prefix AS INT) = g.zip_code_prefix;


In [0]:
CREATE OR REPLACE TABLE dim_product AS
SELECT
    -- monotonically_increasing_id() is a built-in Spark/Databricks SQL function that
    -- generates unique 64-bit integers (unique, but not guaranteed to be consecutive).
    monotonically_increasing_id() AS product_sk,
    p.product_id,
    p.product_category_name,
    t.product_category_name_english AS product_name_english,
    p.product_name_lenght           AS product_name_length,
    p.product_description_lenght    AS product_description_length,
    p.product_photos_qty,
    p.product_weight_g,
    p.product_length_cm,
    p.product_height_cm,
    p.product_width_cm
FROM olist_products_dataset p
LEFT JOIN product_category_name_translation t
  ON p.product_category_name = t.product_category_name
WHERE p.product_id IS NOT NULL;


In [0]:
CREATE OR REPLACE TABLE dim_payment (
  payment_sk BIGINT GENERATED ALWAYS AS IDENTITY,
  payment_type STRING,
  payment_installments INT
);

TRUNCATE TABLE dim_payment; -- guard against duplicates on re-run (redundant right after CREATE OR REPLACE)

INSERT INTO dim_payment (payment_type, payment_installments)
SELECT DISTINCT
    payment_type,
    payment_installments
FROM olist_order_payments_dataset;


In [0]:
-- Note: the INSERT below reads dim_geography, so run the dim_geography cell first.
CREATE OR REPLACE TABLE dim_seller (
  seller_sk BIGINT GENERATED ALWAYS AS IDENTITY,
  seller_id STRING,
  geography_sk BIGINT,
  seller_city STRING,
  seller_state STRING
);

TRUNCATE TABLE dim_seller; -- guard against duplicates on re-run (redundant right after CREATE OR REPLACE)

INSERT INTO dim_seller (seller_id, geography_sk, seller_city, seller_state)
SELECT
    s.seller_id,
    g.geography_sk,
    s.seller_city,
    s.seller_state
FROM olist_sellers_dataset s
LEFT JOIN dim_geography g
  ON CAST(s.seller_zip_code_prefix AS INT) = g.zip_code_prefix;


In [0]:
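-- Covers the range of purchase dates only; other date roles (approval, delivery)
-- may fall outside this range.
CREATE OR REPLACE TABLE dim_date AS
SELECT
    ROW_NUMBER() OVER (ORDER BY d) AS date_sk,
    d AS calendar_date,
    YEAR(d)  AS year,
    QUARTER(d) AS quarter,
    MONTH(d) AS month,
    DAY(d) AS day,
    DAYOFWEEK(d) AS day_of_week,
    WEEKOFYEAR(d) AS week_of_year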
FROM (
    SELECT explode(
        sequence(
            MIN(CAST(order_purchase_timestamp AS DATE)),
            MAX(CAST(order_purchase_timestamp AS DATE)),
            interval 1 day
        )
    ) AS d
    FROM olist_orders_dataset
);


In [0]:
CREATE OR REPLACE TABLE dim_geography (
  geography_sk BIGINT GENERATED ALWAYS AS IDENTITY,
  zip_code_prefix INT,
  city          STRING,
  state         STRING,
  latitude      DOUBLE,
  longitude     DOUBLE
);

TRUNCATE TABLE dim_geography;

INSERT INTO dim_geography (zip_code_prefix, city, state, latitude, longitude)
SELECT
    CAST(geolocation_zip_code_prefix AS INT)  AS zip_code_prefix,
    -- FIRST() returns an arbitrary value within each group (non-deterministic)
    FIRST(geolocation_city)                  AS city,
    FIRST(geolocation_state)                 AS state,
    AVG(geolocation_lat)                     AS latitude,
    AVG(geolocation_lng)                     AS longitude
FROM olist_geolocation_dataset
GROUP BY geolocation_zip_code_prefix;
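
To make the "one row per ZIP code prefix" collapse concrete, here is a minimal sketch on toy rows using Python's built-in `sqlite3`. It is not the notebook's implementation: `MIN()` is swapped in for `FIRST()` so the chosen city is deterministic, and the table name, rows and coordinates are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE geolocation ("
    "zip_code_prefix INTEGER, lat REAL, lng REAL, city TEXT, state TEXT)"
)
conn.executemany(
    "INSERT INTO geolocation VALUES (?, ?, ?, ?, ?)",
    [
        # Two coordinate rows for the same prefix, with different city spellings.
        (1037, -23.5, -46.5, "sao paulo", "SP"),
        (1037, -23.75, -46.75, "são paulo", "SP"),
        (20000, -22.0, -43.0, "rio de janeiro", "RJ"),
    ],
)

# Collapse to one row per prefix: average the coordinates, pick one label.
dim_geography = conn.execute(
    """
    SELECT zip_code_prefix,
           MIN(city)  AS city,        -- deterministic stand-in for FIRST()
           MIN(state) AS state,
           AVG(lat)   AS latitude,
           AVG(lng)   AS longitude,
           COUNT(*)   AS source_rows
    FROM geolocation
    GROUP BY zip_code_prefix
    ORDER BY zip_code_prefix
    """
).fetchall()

for row in dim_geography:
    print(row)
```

Keeping a `source_rows` count is optional, but it makes it easy to spot prefixes where the averaged coordinate summarizes many points.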


In [0]:
-- One-off fix: add the customer_id column that was missing from the fact table.
-- ALTER TABLE workspace.default.fact_order_item
-- ADD COLUMNS (customer_id STRING);
-- This cell is commented out to avoid re-running it.

### Creating Fact Table 

The fact table is order-centric, with one row per order item.

In [0]:
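-- Assumes fact_order_item already exists (its CREATE TABLE statement is not shown in this notebook).
TRUNCATE TABLE fact_order_item;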

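-- Despite its name, first_payment picks the highest-value payment line per order
-- (ties broken by payment_sequential) so that each order item gets a single payment_sk.
WITH first_payment AS (
  SELECT
    order_id,
    payment_type,
    payment_installments,
    ROW_NUMBER() OVER (
      PARTITION BY order_id ORDER BY payment_value DESC, payment_sequential
    ) AS rn
  FROM olist_order_payments_dataset
),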
dpay_dedup AS (
  SELECT payment_type, payment_installments, MIN(payment_sk) AS payment_sk
  FROM dim_payment
  GROUP BY payment_type, payment_installments
)

INSERT INTO fact_order_item (
    order_id,
    order_item_id,
    customer_sk,
    product_sk,
    seller_sk,
    geography_sk,
    date_sk,
    payment_sk,
    price,
    freight_value,
    customer_id          -- new column listed last, matching its position after the ALTER TABLE
)
SELECT
    oi.order_id,
    oi.order_item_id,
    dc.customer_sk,
    dp.product_sk,
    ds.seller_sk,
    dg.geography_sk,
    dd.date_sk,
    dpay.payment_sk,
    oi.price,
    oi.freight_value,
    o.customer_id AS customer_id   -- new column, listed last as in the column list above
FROM olist_order_items_dataset oi
JOIN olist_orders_dataset o
  ON oi.order_id = o.order_id
JOIN dim_customer dc
  ON o.customer_id = dc.customer_id AND dc.current_flag = TRUE
JOIN dim_product dp
  ON oi.product_id = dp.product_id
JOIN dim_seller ds
  ON oi.seller_id = ds.seller_id
LEFT JOIN dim_geography dg
  ON dc.geography_sk = dg.geography_sk
JOIN dim_date dd
  ON CAST(o.order_purchase_timestamp AS DATE) = dd.calendar_date
LEFT JOIN first_payment fp
  ON oi.order_id = fp.order_id AND fp.rn = 1
LEFT JOIN dpay_dedup dpay
  ON fp.payment_type = dpay.payment_type
 AND fp.payment_installments = dpay.payment_installments
WHERE oi.order_id IS NOT NULL;
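
Because an order can carry several payment lines, the fact load keeps a single payment per order (the highest-value line) before looking up `payment_sk`. The sketch below shows that selection rule in plain Python on toy rows. The helper `primary_payment_per_order` is hypothetical, and the tie-break on `payment_sequential` is an assumption added to make the choice deterministic.

```python
# Toy payment rows:
# (order_id, payment_sequential, payment_type, payment_installments, payment_value)
payments = [
    ("o1", 1, "credit_card", 3, 80.0),
    ("o1", 2, "voucher", 1, 20.0),
    ("o2", 1, "voucher", 1, 15.0),
    ("o2", 2, "voucher", 1, 15.0),  # same value as sequence 1: a tie
    ("o3", 1, "boleto", 1, 42.0),
]


def primary_payment_per_order(rows):
    """Keep one row per order: highest payment_value, then lowest payment_sequential."""
    best = {}
    for row in rows:
        order_id, sequential, _ptype, _installments, value = row
        rank = (-value, sequential)  # smaller rank wins
        if order_id not in best or rank < best[order_id][0]:
            best[order_id] = (rank, row)
    return {order_id: row for order_id, (_rank, row) in best.items()}


primary = primary_payment_per_order(payments)
for order_id in sorted(primary):
    print(order_id, primary[order_id][2], primary[order_id][3])
```

A consequence of this design is that secondary payment lines (for example, a voucher used alongside a credit card) are not visible in the fact table, so payment-mix analysis reflects only the dominant method per order.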


# 4.2 SCD IMPLEMENTATION

Customer location changes infrequently over time, so the customer is modeled as a slowly changing dimension by implementing **SCD Type 2** on the `dim_customer` table. Type 2 preserves history by closing the current row and inserting a new one when a change is detected. It also supports as-of (point-in-time) analysis, for example a customer's spending behaviour by the location they had at the time of each purchase.

This logic ensures that each customer has exactly one “current” row (`current_flag = TRUE`) and an unlimited history of prior versions. Because `effective_date` and `expiry_date` are stored, the fact table can be joined on dates to reconstruct the customer attributes at the time of purchase.

**SCD2 Logic:**
1. Snapshot current customer records daily  
2. Compare against existing rows  
3. If changes in city/state/geography_sk → close current row (expiry_date = yesterday)  
4. Insert new current row (effective_date = today)  
5. Maintain only one `current_flag = TRUE` row per customer
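
The steps above can be sketched on toy data as follows. This is a simplified illustration rather than the notebook's method: it uses Python's built-in `sqlite3`, replaces the single `MERGE` with a separate `UPDATE` and `INSERT`, and omits `geography_sk`. The function `apply_scd2`, the `snapshot` table and all rows are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dim_customer (
        customer_sk    INTEGER PRIMARY KEY AUTOINCREMENT,
        customer_id    TEXT,
        customer_city  TEXT,
        customer_state TEXT,
        effective_date TEXT,
        expiry_date    TEXT,
        current_flag   INTEGER
    );
    CREATE TABLE snapshot (customer_id TEXT, customer_city TEXT, customer_state TEXT);

    INSERT INTO dim_customer
        (customer_id, customer_city, customer_state, effective_date, expiry_date, current_flag)
    VALUES ('c1', 'sao paulo', 'SP', '1900-01-01', '9999-12-31', 1),
           ('c2', 'curitiba',  'PR', '1900-01-01', '9999-12-31', 1);

    INSERT INTO snapshot VALUES
        ('c1', 'rio de janeiro', 'RJ'),  -- changed
        ('c2', 'curitiba', 'PR'),        -- unchanged
        ('c3', 'recife', 'PE');          -- new customer
    """
)


def apply_scd2(conn, snapshot_date):
    """Close changed current rows, then insert new current rows (steps 2 to 5)."""
    # Steps 2-3: expire the current row when city or state differs from the snapshot.
    conn.execute(
        """
        UPDATE dim_customer
        SET expiry_date = date(:d, '-1 day'), current_flag = 0
        WHERE current_flag = 1
          AND EXISTS (
              SELECT 1 FROM snapshot s
              WHERE s.customer_id = dim_customer.customer_id
                AND (s.customer_city <> dim_customer.customer_city
                     OR s.customer_state <> dim_customer.customer_state))
        """,
        {"d": snapshot_date},
    )
    # Step 4: any snapshot customer without a current row (changed or new) gets one.
    conn.execute(
        """
        INSERT INTO dim_customer
            (customer_id, customer_city, customer_state, effective_date, expiry_date, current_flag)
        SELECT s.customer_id, s.customer_city, s.customer_state, :d, '9999-12-31', 1
        FROM snapshot s
        WHERE NOT EXISTS (
            SELECT 1 FROM dim_customer t
            WHERE t.customer_id = s.customer_id AND t.current_flag = 1)
        """,
        {"d": snapshot_date},
    )
    conn.commit()


apply_scd2(conn, "2024-01-02")
history = conn.execute(
    "SELECT customer_id, customer_city, effective_date, expiry_date, current_flag "
    "FROM dim_customer ORDER BY customer_id, effective_date"
).fetchall()
for row in history:
    print(row)
```

Running `apply_scd2` a second time with the same snapshot should leave the table unchanged, which is a useful property to check before scheduling a daily load.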


In [0]:
-- SCD + MERGE 
-- 1) Daily snapshot view (today’s version of customers)
CREATE OR REPLACE TEMP VIEW stg_customer_snapshot AS
SELECT
    c.customer_id,
    c.customer_unique_id,
    g.geography_sk,
    c.customer_city,
    c.customer_state,
    current_date() AS snapshot_date
FROM workspace.default.olist_customers_dataset c
LEFT JOIN workspace.default.dim_geography g
  ON CAST(c.customer_zip_code_prefix AS INT) = g.zip_code_prefix;

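-- 2) Build MERGE source with forced insert rows for changes
-- Note: <> evaluates to NULL when either side is NULL, so a change to or from a
-- NULL geography_sk is not detected by the comparisons below.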
CREATE OR REPLACE TEMP VIEW scd_source AS
SELECT
    s.customer_id,
    s.customer_unique_id,
    s.geography_sk,
    s.customer_city,
    s.customer_state,
    s.snapshot_date,
    FALSE AS force_insert
FROM stg_customer_snapshot s

UNION ALL

SELECT
    s.customer_id,
    s.customer_unique_id,
    s.geography_sk,
    s.customer_city,
    s.customer_state,
    s.snapshot_date,
    TRUE AS force_insert
FROM stg_customer_snapshot s
JOIN workspace.default.dim_customer t
  ON t.customer_id = s.customer_id
 AND t.current_flag = TRUE
WHERE
     t.customer_city  <> s.customer_city
  OR t.customer_state <> s.customer_state
  OR t.geography_sk   <> s.geography_sk;

-- 3) Single MERGE: expire old + insert new
MERGE INTO workspace.default.dim_customer AS target
USING scd_source AS source
ON target.customer_id = source.customer_id
AND target.current_flag = TRUE
AND source.force_insert = FALSE

WHEN MATCHED
  AND (
       target.customer_city  <> source.customer_city
    OR target.customer_state <> source.customer_state
    OR target.geography_sk   <> source.geography_sk
  )
THEN UPDATE SET
    target.expiry_date  = date_sub(source.snapshot_date, 1),
    target.current_flag = FALSE

WHEN NOT MATCHED THEN
  INSERT (
      customer_id,
      customer_unique_id,
      geography_sk,
      customer_city,
      customer_state,
      effective_date,
      expiry_date,
      current_flag
  )
  VALUES (
      source.customer_id,
      source.customer_unique_id,
      source.geography_sk,
      source.customer_city,
      source.customer_state,
      source.snapshot_date,
      DATE '9999-12-31',
      TRUE
  );


In [0]:
-- Validation of SCD
SELECT *
FROM workspace.default.dim_customer
WHERE customer_id = '<some_customer_id>'
ORDER BY effective_date;
-- On the first run this returns 0 rows, which is correct.

### Validation SCD 
Simulate customer attribute changes to validate the SCD Type 2 implementation.

In [0]:
-- Step 1: create a copy of dim_customer for testing, so the real dimension is not affected
CREATE OR REPLACE TABLE workspace.default.dim_customer_copy AS
SELECT * FROM workspace.default.dim_customer;


In [0]:
-- Step 2: pick 5 customers for the simulation (LIMIT returns an arbitrary sample, not a random one)
SELECT customer_id
FROM workspace.default.olist_customers_dataset
LIMIT 5;
-- In this simulation the following customer_id values were picked:
--  06b8999e2fba1a1fbc88172c00ba8bc7
--  18955e83d337fd6b2def6b18a428ac77
--  4e7b3e00288586ebd08712fdd0374a03
--  b2b6027bc5c5109e529d4dc6358b12c3
--  4f2d8ab171c80ec8364f7c12e35b23ad





In [0]:
-- Step 3: build a fake "tomorrow" snapshot with simulated changes for the customer_ids picked in the previous step
CREATE OR REPLACE TEMP VIEW stg_customer_snapshot AS
SELECT
    c.customer_id,
    c.customer_unique_id,
    g.geography_sk,

    -- Simulated city change
    CASE 
      WHEN c.customer_id IN ('06b8999e2fba1a1fbc88172c00ba8bc7','18955e83d337fd6b2def6b18a428ac77','4e7b3e00288586ebd08712fdd0374a03','b2b6027bc5c5109e529d4dc6358b12c3','4f2d8ab171c80ec8364f7c12e35b23ad')
      THEN 'simulated_city'
      ELSE c.customer_city
    END AS customer_city,

    -- Simulated state change
    CASE 
      WHEN c.customer_id IN ('06b8999e2fba1a1fbc88172c00ba8bc7','18955e83d337fd6b2def6b18a428ac77','4e7b3e00288586ebd08712fdd0374a03','b2b6027bc5c5109e529d4dc6358b12c3','4f2d8ab171c80ec8364f7c12e35b23ad')
      THEN 'SC'
      ELSE c.customer_state
    END AS customer_state,

    date_add(current_date(), 1) AS snapshot_date   -- pretend snapshot is tomorrow
FROM workspace.default.olist_customers_dataset c
LEFT JOIN workspace.default.dim_geography g
  ON CAST(c.customer_zip_code_prefix AS INT) = g.zip_code_prefix;


In [0]:
-- step 4 :Building scd_source (same logic, but will apply to copy)
CREATE OR REPLACE TEMP VIEW scd_source AS
SELECT
    s.customer_id,
    s.customer_unique_id,
    s.geography_sk,
    s.customer_city,
    s.customer_state,
    s.snapshot_date,
    FALSE AS force_insert
FROM stg_customer_snapshot s

UNION ALL

SELECT
    s.customer_id,
    s.customer_unique_id,
    s.geography_sk,
    s.customer_city,
    s.customer_state,
    s.snapshot_date,
    TRUE AS force_insert
FROM stg_customer_snapshot s
JOIN workspace.default.dim_customer_copy t
  ON t.customer_id = s.customer_id
 AND t.current_flag = TRUE
WHERE
     t.customer_city  <> s.customer_city
  OR t.customer_state <> s.customer_state
  OR t.geography_sk   <> s.geography_sk;


In [0]:
-- step 5 : Run the Type-2 SCD MERGE into the copy
MERGE INTO workspace.default.dim_customer_copy AS target
USING scd_source AS source
ON target.customer_id = source.customer_id
AND target.current_flag = TRUE
AND source.force_insert = FALSE

WHEN MATCHED
  AND (
       target.customer_city  <> source.customer_city
    OR target.customer_state <> source.customer_state
    OR target.geography_sk   <> source.geography_sk
  )
THEN UPDATE SET
    target.expiry_date  = date_sub(source.snapshot_date, 1),
    target.current_flag = FALSE

WHEN NOT MATCHED THEN
  INSERT (
      customer_id,
      customer_unique_id,
      geography_sk,
      customer_city,
      customer_state,
      effective_date,
      expiry_date,
      current_flag
  )
  VALUES (
      source.customer_id,
      source.customer_unique_id,
      source.geography_sk,
      source.customer_city,
      source.customer_state,
      source.snapshot_date,
      DATE '9999-12-31',
      TRUE
  );


In [0]:
-- step 6 : validation showing customer change history 
SELECT
  customer_id,
  customer_city,
  customer_state,
  effective_date,
  expiry_date,
  current_flag
FROM workspace.default.dim_customer_copy
WHERE customer_id IN ('06b8999e2fba1a1fbc88172c00ba8bc7','18955e83d337fd6b2def6b18a428ac77','4e7b3e00288586ebd08712fdd0374a03','b2b6027bc5c5109e529d4dc6358b12c3','4f2d8ab171c80ec8364f7c12e35b23ad')
ORDER BY customer_id, effective_date;

-- The result contains some data-quality issues because the dim_customer copy was re-run twice.


# 4.3 CREATE A DENORMALISED REPORTING VIEW
Below is a single denormalised **baseline sales view** that will answer business needs.
**Key design choices:**

- Grain = order item (same as the fact table).

- Historically correct customer attributes via SCD “as-of” join using **customer_id + order_date BETWEEN effective/expiry.**

- Adds a new_customer_flag and first_order_date (for acquisition/lifecycle questions).
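
The as-of join and the new-customer flag can be illustrated on toy data. The sketch below uses Python's built-in `sqlite3` and made-up rows; for brevity it tracks the first order per `customer_id`, whereas the view uses the persistent `customer_unique_id`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dim_customer (
        customer_id TEXT, customer_state TEXT, effective_date TEXT, expiry_date TEXT);
    CREATE TABLE fact (
        order_id TEXT, customer_id TEXT, order_date TEXT, item_total REAL);

    -- Two SCD versions of the same customer: SP until end of 2017, RJ afterwards.
    INSERT INTO dim_customer VALUES
        ('c1', 'SP', '1900-01-01', '2017-12-31'),
        ('c1', 'RJ', '2018-01-01', '9999-12-31');
    INSERT INTO fact VALUES
        ('o1', 'c1', '2017-06-20', 100.0),
        ('o2', 'c1', '2018-02-14', 80.0);
    """
)

# ISO-8601 date strings sort chronologically, so BETWEEN works on TEXT here.
rows = conn.execute(
    """
    SELECT f.order_id,
           f.order_date,
           c.customer_state AS customer_state_asof,
           (SELECT MIN(f2.order_date) FROM fact f2
             WHERE f2.customer_id = f.customer_id) AS first_order_date
    FROM fact f
    JOIN dim_customer c
      ON f.customer_id = c.customer_id
     AND f.order_date BETWEEN c.effective_date AND c.expiry_date
    ORDER BY f.order_id
    """
).fetchall()

# new_customer_flag: true only for orders placed on the customer's first order date.
new_customer_flags = {order_id: order_date == first for order_id, order_date, _s, first in rows}

for row in rows:
    print(row, new_customer_flags[row[0]])
```

Each order matches exactly one dimension version only if the effective/expiry ranges of a customer do not overlap, which is what the SCD load is meant to guarantee.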

In [0]:
CREATE OR REPLACE VIEW workspace.default.vw_sales_order_item_baseline AS
WITH base AS (
  SELECT
      f.order_id,
      f.order_item_id,
      f.customer_id,
      f.customer_sk,
      f.product_sk,
      f.seller_sk,
      f.geography_sk,
      f.date_sk,
      f.payment_sk,
      f.price,
      f.freight_value,
      (f.price + f.freight_value) AS item_total,

      d.calendar_date AS order_date,
      d.year          AS order_year,
      d.month         AS order_month

  FROM workspace.default.fact_order_item f
  JOIN workspace.default.dim_date d
    ON f.date_sk = d.date_sk
),

-- first purchase per persistent customer (customer_unique_id)
cust_first_order AS (
  SELECT
      c.customer_unique_id,
      MIN(b.order_date) AS first_order_date
  FROM base b
  JOIN workspace.default.dim_customer c
    ON b.customer_id = c.customer_id
   AND b.order_date BETWEEN c.effective_date AND c.expiry_date
  GROUP BY c.customer_unique_id
)

SELECT
    b.order_id,
    b.order_item_id,

    -- Date 
    b.order_date,
    b.order_year,
    b.order_month,

    -- Measures
    b.price,
    b.freight_value,
    b.item_total,

    -- Product attributes 
    p.product_id,
    p.product_category_name AS product_category_pt,
    p.product_name_english  AS product_category_en,
    p.product_weight_g,
    p.product_length_cm,
    p.product_height_cm,
    p.product_width_cm,

    -- Seller attributes
    s.seller_id,
    s.seller_city,
    s.seller_state,

    -- Customer attributes AS-OF purchase time (historical correctness)
    c.customer_unique_id,
    c.customer_city  AS customer_city_asof,
    c.customer_state AS customer_state_asof,
    cg.zip_code_prefix AS customer_zip_asof,
    cg.latitude        AS customer_lat_asof,
    cg.longitude       AS customer_lng_asof,

    -- Payment attributes (mix by type / installments)
    pay.payment_type,
    pay.payment_installments,

    -- Lifecycle flags
    fco.first_order_date,
    CASE WHEN b.order_date = fco.first_order_date THEN TRUE ELSE FALSE END AS new_customer_flag

FROM base b

-- AS-OF join to SCD customer
JOIN workspace.default.dim_customer c
  ON b.customer_id = c.customer_id
 AND b.order_date BETWEEN c.effective_date AND c.expiry_date

LEFT JOIN workspace.default.dim_geography cg
  ON c.geography_sk = cg.geography_sk

JOIN workspace.default.dim_product p
  ON b.product_sk = p.product_sk

JOIN workspace.default.dim_seller s
  ON b.seller_sk = s.seller_sk

LEFT JOIN workspace.default.dim_payment pay
  ON b.payment_sk = pay.payment_sk

LEFT JOIN cust_first_order fco
  ON c.customer_unique_id = fco.customer_unique_id;


In [0]:
--Daily sales trend
SELECT
  order_date,
  SUM(item_total) AS total_sales,
  COUNT(DISTINCT order_id) AS num_orders,
  COUNT(*) AS num_items
FROM workspace.default.vw_sales_order_item_baseline
GROUP BY order_date
ORDER BY order_date;

-- monthly sales trend 
SELECT
  order_year,
  order_month,
  SUM(item_total) AS total_sales,
  COUNT(DISTINCT order_id) AS num_orders,
  COUNT(*) AS num_items
FROM workspace.default.vw_sales_order_item_baseline
GROUP BY order_year, order_month
ORDER BY order_year, order_month;



In [0]:
--- Revenue breakdown 

-- revenue by product
SELECT
  product_category_en,
  SUM(item_total) AS category_sales,
  COUNT(*) AS items_sold
FROM workspace.default.vw_sales_order_item_baseline
GROUP BY product_category_en
ORDER BY category_sales DESC;

-- revenue by seller 
SELECT
  seller_id,
  seller_city,
  seller_state,
  SUM(item_total) AS seller_sales,
  COUNT(DISTINCT order_id) AS num_orders
FROM workspace.default.vw_sales_order_item_baseline
GROUP BY seller_id, seller_city, seller_state
ORDER BY seller_sales DESC;

-- Which product category and customer state combinations drive the most revenue
SELECT
  product_category_en,
  customer_state_asof,
  SUM(item_total) AS combo_sales,
  COUNT(*) AS items_sold
FROM workspace.default.vw_sales_order_item_baseline
GROUP BY product_category_en, customer_state_asof
ORDER BY combo_sales DESC
LIMIT 20;




In [0]:
-- Top seller and product category combinations by revenue
SELECT
  seller_id,
  product_category_en,
  SUM(item_total) AS combo_sales,
  COUNT(*) AS items_sold
FROM workspace.default.vw_sales_order_item_baseline
GROUP BY seller_id, product_category_en
ORDER BY combo_sales DESC
LIMIT 20;


In [0]:
--New customers per month 
SELECT
  order_year,
  order_month,
  COUNT(DISTINCT customer_unique_id) AS new_customers
FROM workspace.default.vw_sales_order_item_baseline
WHERE new_customer_flag = TRUE
GROUP BY order_year, order_month
ORDER BY order_year, order_month;


--Average basket size by customer state
WITH order_totals AS (
  SELECT
    order_id,
    customer_state_asof,
    SUM(item_total) AS order_total,
    COUNT(*) AS num_items
  FROM workspace.default.vw_sales_order_item_baseline
  GROUP BY order_id, customer_state_asof
)
SELECT
  customer_state_asof,
  AVG(order_total) AS avg_order_value,
  AVG(num_items) AS avg_items_per_order,
  COUNT(DISTINCT order_id) AS orders
FROM order_totals
GROUP BY customer_state_asof
ORDER BY avg_order_value DESC;

----
-- Because the view is as-of SCD, we can track behaviour before vs after a customer’s geography change.

-- below example: spend by customer across their SCD versions
SELECT
  customer_unique_id,
  customer_city_asof,
  customer_state_asof,
  MIN(order_date) AS first_order_in_version,
  MAX(order_date) AS last_order_in_version,
  SUM(item_total) AS sales_in_version,
  COUNT(DISTINCT order_id) AS orders_in_version
FROM workspace.default.vw_sales_order_item_baseline
GROUP BY customer_unique_id, customer_city_asof, customer_state_asof
ORDER BY customer_unique_id, first_order_in_version;




If a customer had changed location, the query above would support a behaviour-change analysis like the following illustrative example:

| customer_unique_id | city_asof | state_asof | first_order_in_version | last_order_in_version | sales_in_version | orders_in_version |
| --- | --- | --- | --- | --- | ---: | ---: |
| 0000366f3b9a7992bf8c76cfdf3221e2 | sao paulo | SP | 2017-01-10 | 2017-06-20 | 200 | 3 |
| 0000366f3b9a7992bf8c76cfdf3221e2 | rio de janeiro | RJ | 2017-09-02 | 2018-02-14 | 180 | 2 |


# Optional analysis
I created another view for **operational and fulfilment insights**.

Why: thanks to the `fact_order_item` table created earlier, I can slice by:

- seller geography
- customer geography as of the purchase date
- product category

So I can compute delivery/fulfillment durations and analyze them by any of these dimensions.
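
As a rough illustration of how such durations behave, the sketch below mimics a calendar-day difference in plain Python, assuming `datediff(end, start)` compares the date parts of its arguments. The helper `datediff` here is a hypothetical stand-in, and the timestamps are made up.

```python
from datetime import datetime


def datediff(end_ts, start_ts):
    """Calendar-day difference between two timestamps; None if either is missing."""
    if end_ts is None or start_ts is None:
        return None
    return (end_ts.date() - start_ts.date()).days


purchase = datetime(2018, 1, 1, 23, 30)
delivered = datetime(2018, 1, 5, 8, 0)
estimated = datetime(2018, 1, 4)

days_to_deliver = datediff(delivered, purchase)     # calendar days from purchase to delivery
days_vs_estimated = datediff(delivered, estimated)  # positive means later than estimated
elapsed_full_days = (delivered - purchase).days     # full 24-hour periods, for contrast
not_delivered = datediff(None, purchase)            # undelivered orders yield no duration

print(days_to_deliver, days_vs_estimated, elapsed_full_days, not_delivered)
```

The calendar-day count and the elapsed 24-hour count can differ by one for the same order, so it helps to state which definition a delivery KPI uses.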

In [0]:
CREATE OR REPLACE VIEW workspace.default.vw_fulfillment_order_item AS
SELECT
    f.order_id,
    f.order_item_id,
    d.calendar_date AS order_date,

    -- Order timestamps
    o.order_purchase_timestamp,
    o.order_approved_at,
    o.order_delivered_carrier_date,
    o.order_delivered_customer_date,
    o.order_estimated_delivery_date,
    o.order_status,

    -- Duration 
    datediff(o.order_approved_at, o.order_purchase_timestamp)      AS days_to_approve,
    datediff(o.order_delivered_carrier_date, o.order_approved_at) AS days_to_ship,
    datediff(o.order_delivered_customer_date, o.order_purchase_timestamp) AS days_to_deliver,
    datediff(o.order_delivered_customer_date, o.order_estimated_delivery_date) AS days_vs_estimated,


    f.price,
    f.freight_value,
    (f.price + f.freight_value) AS item_total,

    -- Product
    p.product_id,
    p.product_category_name AS product_category_pt,
    p.product_name_english  AS product_category_en,

    -- Seller
    s.seller_id,
    s.seller_city,
    s.seller_state,

    -- Customer geography 
    c.customer_unique_id,
    c.customer_city  AS customer_city_asof,
    c.customer_state AS customer_state_asof

FROM workspace.default.fact_order_item f

JOIN workspace.default.olist_orders_dataset o
  ON f.order_id = o.order_id

JOIN workspace.default.dim_date d
  ON f.date_sk = d.date_sk

JOIN workspace.default.dim_product p
  ON f.product_sk = p.product_sk

JOIN workspace.default.dim_seller s
  ON f.seller_sk = s.seller_sk


JOIN workspace.default.dim_customer c
  ON f.customer_id = c.customer_id
 AND d.calendar_date BETWEEN c.effective_date AND c.expiry_date;
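
To make the sign convention of the duration columns concrete, here is a minimal Python sketch that mirrors them outside the warehouse. It assumes that `datediff(end, start)` counts calendar days from `start` to `end` and ignores the time of day; the helper name `days_between` and the sample timestamps are hypothetical and not taken from the dataset.

```python
from datetime import datetime


def days_between(end: datetime, start: datetime) -> int:
    """Calendar days from start to end, ignoring time of day (assumed datediff behaviour)."""
    return (end.date() - start.date()).days


# Hypothetical order timestamps, for illustration only
purchase = datetime(2018, 1, 10, 21, 30)
delivered = datetime(2018, 1, 18, 9, 0)
estimated = datetime(2018, 1, 16, 0, 0)

days_to_deliver = days_between(delivered, purchase)      # purchase -> delivery
days_vs_estimated = days_between(delivered, estimated)   # positive means late
is_late = days_vs_estimated > 0
```

With these sample values the order took 8 days to deliver and arrived 2 days after the estimate, so it counts as late.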


##### Example analysis 

In [0]:
-- Average delivery time by customer state
-- Note: the view is at order-item grain, so multi-item orders weigh more in the average
SELECT
  customer_state_asof,
  AVG(days_to_deliver) AS avg_days_to_deliver,
  COUNT(DISTINCT order_id) AS orders
FROM workspace.default.vw_fulfillment_order_item
WHERE order_status = 'delivered'
GROUP BY customer_state_asof
ORDER BY avg_days_to_deliver DESC;


In [0]:
-- Delivery time by product category
SELECT
  product_category_en,
  AVG(days_to_deliver) AS avg_days_to_deliver,
  PERCENTILE_APPROX(days_to_deliver, 0.9) AS p90_days_to_deliver,
  COUNT(DISTINCT order_id) AS orders
FROM workspace.default.vw_fulfillment_order_item
WHERE order_status = 'delivered'
GROUP BY product_category_en
ORDER BY avg_days_to_deliver DESC;


In [0]:
-- Seller performance: fastest vs slowest sellers (at least 20 delivered orders)
SELECT
  seller_id,
  seller_state,
  AVG(days_to_deliver) AS avg_days_to_deliver,
  COUNT(DISTINCT order_id) AS orders
FROM workspace.default.vw_fulfillment_order_item
WHERE order_status = 'delivered'
GROUP BY seller_id, seller_state
HAVING orders >= 20
ORDER BY avg_days_to_deliver ASC;


In [0]:
-- Late-delivery rate by customer state and product category (late = delivered after the estimated date)
SELECT
  customer_state_asof,
  product_category_en,
  COUNT(*) AS items,
  AVG(CASE WHEN days_vs_estimated > 0 THEN 1 ELSE 0 END) AS late_delivery_rate
FROM workspace.default.vw_fulfillment_order_item
WHERE order_status = 'delivered'
GROUP BY customer_state_asof, product_category_en
ORDER BY late_delivery_rate DESC;


# 4.4 CDC (CHANGE DATA CAPTURE) DESIGN
As an analyst, my scope in this task is limited to defining the high-level rules together with the data engineer.

**A) Event structure and ingestion:** normally handled by ingestion engineers. It can include a JSON event schema with fields such as entity name, event time (timestamp of the change), operation (insert, update, delete), primary keys and payload.

**As an analyst, I care that `event_time` is preserved because it defines the business-effective time, which I need for late-arriving events and for SCD (AS-IS).**
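
As an illustration of such an event, the Python sketch below parses one JSON change event into a typed record. The field names (`entity`, `op`, `event_time`, `keys`, `payload`) and the sample values are assumptions made for this example; the real schema is defined by the ingestion engineers.

```python
import json
from dataclasses import dataclass
from datetime import datetime

ALLOWED_OPS = {"insert", "update", "delete"}


@dataclass(frozen=True)
class ChangeEvent:
    entity: str            # e.g. "customer" (hypothetical entity name)
    op: str                # one of ALLOWED_OPS
    event_time: datetime   # business-effective time of the change
    keys: dict             # primary-key columns of the changed record
    payload: dict          # changed attributes


def parse_event(raw: str) -> ChangeEvent:
    """Parse one JSON change event and reject unknown operations."""
    doc = json.loads(raw)
    if doc["op"] not in ALLOWED_OPS:
        raise ValueError(f"unknown operation: {doc['op']!r}")
    return ChangeEvent(
        entity=doc["entity"],
        op=doc["op"],
        event_time=datetime.fromisoformat(doc["event_time"]),
        keys=doc["keys"],
        payload=doc.get("payload", {}),
    )


# Hypothetical event, for illustration only
raw = json.dumps({
    "entity": "customer",
    "op": "update",
    "event_time": "2017-09-02T10:15:00",
    "keys": {"customer_unique_id": "0000366f3b9a7992bf8c76cfdf3221e2"},
    "payload": {"customer_city": "rio de janeiro", "customer_state": "RJ"},
})
event = parse_event(raw)
```

Keeping `event_time` as a parsed timestamp (rather than relying on the load time) is what later allows versioning by business time.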


**B) Routing and application by entity:** as an analyst, I can define high-level rules that serve the business's reporting needs, for example:
- **Customer events:** update `dim_customer` (SCD Type-2)
- **Product / Seller:** usually Type-1 overwrite unless history is needed
- **Order / OrderItem / Payment:** MERGE into the facts using stable business keys
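
The routing rules above can be sketched as a simple lookup table. This is only an illustration in Python: the entity names and the strategy labels (`scd2`, `scd1`, `fact_merge`) are hypothetical and would need to match whatever the CDC events actually carry.

```python
# Hypothetical entity names mapped to the strategy described above
ROUTING = {
    "customer": "scd2",          # keep history (SCD Type-2)
    "product": "scd1",           # overwrite in place (SCD Type-1)
    "seller": "scd1",
    "order": "fact_merge",       # MERGE on stable business keys
    "order_item": "fact_merge",
    "payment": "fact_merge",
}


def route(entity: str) -> str:
    """Return the apply strategy for an entity, failing loudly on unknown ones."""
    try:
        return ROUTING[entity]
    except KeyError:
        raise ValueError(f"no routing rule for entity {entity!r}") from None
```

Failing on an unknown entity, instead of silently skipping it, makes a missing rule visible as soon as a new event type appears.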

**C) How CDC updates SCD dimensions:** in this case the rules are as follows:
- Triggering attributes: `geography_sk`, `customer_city`, `customer_state`
- Behaviour: if any triggering attribute changes, expire the current row and insert a new current row.
- Never delete history: keep all past versions for as-is analysis.
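
A minimal Python sketch of this expire-and-insert behaviour is shown below, using plain dictionaries instead of a Delta table. Several details are assumptions for illustration: versions are keyed by `customer_unique_id`, an open-ended row uses `9999-12-31` as `expiry_date`, the old row expires the day before the change (so inclusive date windows never overlap), and the `is_current` flag and sample values are hypothetical.

```python
from datetime import date, timedelta

HIGH_DATE = date(9999, 12, 31)  # assumed marker for an open-ended expiry_date
TRIGGERS = ("geography_sk", "customer_city", "customer_state")


def apply_scd2(rows, customer_key, new_attrs, change_date):
    """Return a new list of dimension rows after applying one customer change."""
    current = next(
        (r for r in rows
         if r["customer_unique_id"] == customer_key and r["is_current"]),
        None,
    )
    if current is not None and all(
        new_attrs.get(k, current[k]) == current[k] for k in TRIGGERS
    ):
        return list(rows)  # no triggering attribute changed: no new version

    new_row = {
        **(current or {"customer_unique_id": customer_key}),
        **new_attrs,
        "effective_date": change_date,
        "expiry_date": HIGH_DATE,
        "is_current": True,
    }
    result = []
    for r in rows:
        if r is current:
            # Expire the old version the day before the change; it is never deleted
            r = {**r, "expiry_date": change_date - timedelta(days=1),
                 "is_current": False}
        result.append(r)
    result.append(new_row)
    return result


# Hypothetical starting state: one current row
dim = [{
    "customer_unique_id": "c1", "geography_sk": 10,
    "customer_city": "sao paulo", "customer_state": "SP",
    "effective_date": date(2017, 1, 1), "expiry_date": HIGH_DATE,
    "is_current": True,
}]
moved = apply_scd2(
    dim, "c1",
    {"geography_sk": 20, "customer_city": "rio de janeiro", "customer_state": "RJ"},
    date(2017, 9, 2),
)
unchanged = apply_scd2(moved, "c1", {"customer_state": "RJ"}, date(2018, 1, 1))
```

After the move, the customer has two versions and exactly one of them is current; re-sending the same state creates no extra version.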

**D) Keeping fact + denormalised structure:** `fact_order_item` keeps its grain of one row per (`order_id`, `order_item_id`).
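
To show why a stable grain key matters, here is a small Python sketch of an idempotent upsert keyed by (`order_id`, `order_item_id`). It stands in for the MERGE into the fact table; the function name `merge_fact` and the sample events are hypothetical.

```python
def merge_fact(fact, events):
    """Upsert events into fact, a dict keyed by (order_id, order_item_id)."""
    merged = dict(fact)
    for e in events:
        key = (e["order_id"], e["order_item_id"])
        # Insert a new grain, or overwrite the changed columns of an existing one
        merged[key] = {**merged.get(key, {}), **e}
    return merged


# Hypothetical events: two items, then a price correction for the first item
events = [
    {"order_id": "o1", "order_item_id": 1, "price": 100.0, "freight_value": 10.0},
    {"order_id": "o1", "order_item_id": 2, "price": 50.0, "freight_value": 5.0},
    {"order_id": "o1", "order_item_id": 1, "price": 90.0, "freight_value": 10.0},
]
fact = merge_fact({}, events)
replayed = merge_fact(fact, events)  # replaying the same batch changes nothing
```

Because every event lands on its grain key, a correction updates the existing row and a replayed batch cannot create duplicates.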

**E) Late-arriving handling:** the analyst usually provides high-level rules that reflect the business need and keep reporting consistent; the implementation is in the processing engineer's scope.
- Use `event_time` (business change time), not arrival time, for versioning.
- For SCD2: if an event belongs in the past, insert a historical row with the correct effective/expiry window.
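
The SCD2 rule for late events can be sketched in Python as splitting the version that covers the event date. This is a simplified illustration under stated assumptions: the rows belong to one customer, windows are inclusive and non-overlapping, the late change is assumed to hold until the next known version, and the column names and sample history are hypothetical.

```python
from datetime import date, timedelta


def insert_late_version(versions, attrs, event_date):
    """Insert a version starting at event_date into one customer's history.

    versions: rows for ONE customer, sorted by effective_date, with inclusive
    [effective_date, expiry_date] windows that do not overlap.
    Events dated before the first version are left untouched in this sketch.
    """
    result = []
    for v in versions:
        if v["effective_date"] == event_date:
            # Same start date: correct the attributes of that version in place
            result.append({**v, **attrs})
        elif v["effective_date"] < event_date <= v["expiry_date"]:
            # Split the covering version: close it the day before the event...
            result.append({**v, "expiry_date": event_date - timedelta(days=1),
                           "is_current": False})
            # ...and let the new row inherit the rest of the original window
            result.append({**v, **attrs, "effective_date": event_date})
        else:
            result.append(v)
    return result


# Hypothetical history: SP until 2017-09-01, RJ afterwards (current)
history = [
    {"customer_city": "sao paulo", "customer_state": "SP",
     "effective_date": date(2017, 1, 1), "expiry_date": date(2017, 9, 1),
     "is_current": False},
    {"customer_city": "rio de janeiro", "customer_state": "RJ",
     "effective_date": date(2017, 9, 2), "expiry_date": date(9999, 12, 31),
     "is_current": True},
]
# A late event says the customer had already moved to Campinas on 2017-05-10
fixed = insert_late_version(history, {"customer_city": "campinas"}, date(2017, 5, 10))
```

The late event produces a historical row for 2017-05-10 to 2017-09-01 and leaves the current RJ version untouched, because the window is derived from the event date rather than the arrival time.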

**F) Short design note (architecture / flow)**

Hourly CDC flow:
1. Source systems emit JSON change events hourly.
2. Auto Loader ingests them into `cdc_bronze_events`, partitioned by entity/date.
3. An hourly job creates silver staging tables per entity.
4. Gold applies the changes: SCD Type-2 MERGE for customers.
5. Data quality checks run: (a) exactly one current row per customer; (b) no duplicate fact grains.
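
The two data quality checks in the last step can be sketched in Python as follows. The sketch works on lists of dictionaries rather than on the gold tables, and the `is_current` flag and the sample rows are assumptions for illustration.

```python
from collections import Counter


def customers_without_single_current_row(dim_rows):
    """Customers whose number of current rows is not exactly one."""
    counts = Counter()
    for r in dim_rows:
        # Adding 0 still registers the customer, so "no current row" is caught too
        counts[r["customer_unique_id"]] += 1 if r["is_current"] else 0
    return sorted(k for k, n in counts.items() if n != 1)


def duplicate_fact_grains(fact_rows):
    """(order_id, order_item_id) pairs that appear more than once."""
    counts = Counter((r["order_id"], r["order_item_id"]) for r in fact_rows)
    return sorted(g for g, n in counts.items() if n > 1)


# Hypothetical rows: c2 has no current version, and grain (o1, 1) is duplicated
dim_rows = [
    {"customer_unique_id": "c1", "is_current": False},
    {"customer_unique_id": "c1", "is_current": True},
    {"customer_unique_id": "c2", "is_current": False},
]
fact_rows = [
    {"order_id": "o1", "order_item_id": 1},
    {"order_id": "o1", "order_item_id": 2},
    {"order_id": "o1", "order_item_id": 1},
]
bad_customers = customers_without_single_current_row(dim_rows)
bad_grains = duplicate_fact_grains(fact_rows)
```

Both functions return the offending keys, so an empty list means the check passed and a non-empty list can be reported directly.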
