# **Exercise 2 \- Data Storage**

## **Data Storage, Sekolah Engineering, Pacmann**

**Outline**

**[Objective](#objective)**

[**Task Description**](#task-description)

[Step \#1 \- Requirements Gathering (10 points)](#step-#1---requirements-gathering-\(10-points\))

[Step \#2 \- Slowly Changing Dimension (SCD) (15 points)](#step-#2---slowly-changing-dimension-\(scd\)-\(15-points\))

[Step \#3 \- ELT with Python & SQL (50 points)](#step-#3---elt-with-python-&-sql-\(50-points\))

[Step \#4 \- Orchestrate ELT with Luigi (20 points)](#step-#4---orchestrate-elt-with-luigi-\(20-points\))

[Step \#5 \- Create Report (5 points)](#step-#5---create-report\(5-points\))

# **Objective**

The objective of this exercise is to:

- Implement Slowly Changing Dimensions (SCD)
- Build an ELT process using Python and SQL
- Orchestrate the ELT process using Luigi

# **Task Description**

In the first exercise, you designed a Data Warehouse model for Olist. The next step is to determine the historical data and versioning requirements the company wishes to maintain, and then to build a data pipeline.

## **Step \#1 \- Requirements Gathering (10 points)**

In this step, pretend to have a meeting with stakeholders to find out their needs for the SCD strategy you will implement. Make **a list of questions** to ask them. After each question, provide a **possible answer** that a stakeholder might give based on the dataset and business needs. This information will help you create your SCD strategy.

- **Create Questions:** Write questions to ask stakeholders about the Data Warehouse.
- **Provide Stakeholder Answers:** Give a possible answer for each question based on Olist's business and dataset.

### **Answers**

#### 1. Which dimension tables are likely to change over time, and which of those changes should we keep track of?
  
  **Response**
  
  Primarily `customers` and `sellers`. <br>
  Customers may change their address or city (captured via `geolocation`), and sellers sometimes update their location or business name. <br>
  `products` and `category_name` are mostly static, but we may occasionally reclassify a product's category.
  
#### 2. Do we need to maintain historical versions of data?
  
  **Response**
  
  We want to preserve historical changes for both `customers` and `sellers`, especially location changes, as they can affect delivery time and logistics. <br>
  For `products` and `category_name`, keeping only the latest version is enough, since changes are rare and typically administrative.
  
#### 3. What kind of historical tracking is preferred: full versioning (Type 2), overwrite (Type 1), or current vs. previous field (Type 3)?
  
  **Response**
  
  SCD Type 2 (full-versioning)
  
  - `dim_customers`
  - `dim_sellers`
  
  SCD Type 1 (overwrite)
  
  - `dim_category_name`
  - `dim_products`

#### 4. How should we handle geolocation changes? Should customers and sellers be linked to geolocation with historical traceability?
  
  **Response**
  
  Yes, because geolocation information, current or previous, is valuable for analysis, e.g. comparing delivery times or satisfaction scores before and after a move. <br>
  So we'll need foreign key-based Type 2 handling for geolocation information.
  
#### 5. Who will consume the historical data, and how will it be used (e.g. in reports, machine learning, fraud detection)?
  
  **Response**
  
  - Data analysts and the operations team
    - for delivery performance reports, churn analysis, and predictive models

---

## **Step \#2 \- Slowly Changing Dimension (SCD) (15 Points)**

From the requirements you gathered, decide **which SCD strategies** you want to use. Explain why you chose these strategies.

- You can implement more than one **SCD strategy**
- If there are any changes to the existing Entity-Relationship Diagram (ERD), show the new ERD.

(Note: You will earn extra points for using SCD types other than Type 1.)

### **Answers**

**SCD Type 1 / Overwrite**

- `dim_product_category`
    - tracks changes in category names
    - uses Type 1 (overwrite) to keep the overall process simple
    - historical tracking of category names is not significant, because the main analyses concern products, customers, sellers, locations, and time periods

**SCD Type 2 / Add New Row**

- `dim_customers`
    - customers may change their location
    - because of this change, logistics fees and delivery performance may change, which can affect future purchases
- `dim_sellers`
    - sellers may change their location
    - because of this change, delivery time and logistics may be affected, as noted in the Step \#1 answers
- `dim_products`
    - changes to a product may be reflected in its purchases
    - for example, larger product dimensions may incur a higher freight cost, which may affect the purchasing decision
    - price changes also tend to affect the purchasing decision, especially for price-sensitive customers
    - hence, it is important to track price and dimension changes in relation to the total price and cost that customers need to pay
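
The difference between the two strategies can be illustrated with a minimal in-memory sketch. It is not the pipeline code itself: the helper names `apply_scd1` and `apply_scd2`, the generic `id` key, and the sample rows are assumptions made for illustration. Only the `record_start_date`, `record_end_date`, and `is_current` fields mirror the tracking columns used in the warehouse tables.

```python
from datetime import date, timedelta

def apply_scd1(dim, key, incoming):
    """Type 1: overwrite the existing row in place; no history is kept."""
    dim[key] = dict(incoming)

def apply_scd2(rows, key, incoming, today):
    """Type 2: expire the current row if an attribute changed, then append a new version."""
    current = next((r for r in rows if r["id"] == key and r["is_current"]), None)
    if current is not None:
        if all(current[k] == v for k, v in incoming.items()):
            return  # nothing changed, keep the current version
        current["is_current"] = False
        current["record_end_date"] = today - timedelta(days=1)
    rows.append({"id": key, **incoming,
                 "record_start_date": today,
                 "record_end_date": None,
                 "is_current": True})

# Hypothetical sample data
categories = {"beleza_saude": {"english": "health_beauty"}}
apply_scd1(categories, "beleza_saude", {"english": "health_and_beauty"})

sellers = []
apply_scd2(sellers, "s1", {"city": "sao paulo", "state": "SP"}, date(2018, 1, 1))
apply_scd2(sellers, "s1", {"city": "sao paulo", "state": "SP"}, date(2018, 3, 1))  # no change
apply_scd2(sellers, "s1", {"city": "curitiba", "state": "PR"}, date(2018, 6, 1))   # seller moved

for row in sellers:
    print(row)
```

After these calls the category keeps only its latest name, while the seller has two rows: an expired version and a current one.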

---
## **Step \#3 \- ELT with Python & SQL (50 points)**

Create a data pipeline to process ELT using Python and SQL.

- Workflow Description: Draw your workflow and explain how it handles your SCD strategy.
- Write clean code for your scripts
- Include alerts for any errors.

### **Answers**

Workflow
  - Establish the tables in `stg` schema in `olist-dwh` database
  - Establish the `dim` and `fact` tables in `dwh` schema in `olist-dwh` database
  - Create a script to populate the `dim_date` and `dim_time` tables in `dwh` schema in `olist-dwh` database
  - Establish ELT script to pull data from `olist-src` database to `stg` schema in `olist-dwh` database 
  - Establish ELT script to pull data from `stg` schema in `olist-dwh` database to `dwh` schema in `olist-dwh` database
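
One possible way to run these workflow steps in order and raise an alert on the first error is sketched below. The `run_pipeline` helper, the logger name, and the placeholder step functions are assumptions for illustration; a real step would execute the corresponding SQL script instead.

```python
import logging

logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
log = logging.getLogger("olist_elt")

def run_pipeline(steps):
    """Run (name, callable) steps in order; stop and alert on the first failure."""
    completed = []
    for name, step in steps:
        try:
            step()
        except Exception:
            # The alert: log the failing step together with its traceback, then stop
            log.exception("Step failed: %s", name)
            return completed, name
        log.info("Step finished: %s", name)
        completed.append(name)
    return completed, None

# Placeholder steps used only to demonstrate the control flow
def ok():
    pass

def broken():
    raise RuntimeError("simulated load error")

steps = [
    ("create stg tables", ok),
    ("create dwh tables", ok),
    ("populate dim_date and dim_time", ok),
    ("load olist-src -> stg", broken),
    ("load stg -> dwh", ok),
]
completed, failed = run_pipeline(steps)
print(completed, failed)
```

Stopping at the first failure keeps later steps from loading the warehouse on top of an incomplete staging area.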

In [6]:
import sqlalchemy
import psycopg2
%load_ext sql

The sql extension is already loaded. To reload it, use:
  %reload_ext sql


In [7]:
%sql postgresql://postgres:postgres@localhost:5434/olist-dwh

**Schema in source database (`olist-src`)**

In [None]:
CREATE TABLE public.customers (
    customer_id text NOT NULL,
    customer_unique_id text,
    customer_zip_code_prefix integer,
    customer_city text,
    customer_state text
);

CREATE TABLE public.geolocation (
    geolocation_zip_code_prefix integer NOT NULL,
    geolocation_lat real,
    geolocation_lng real,
    geolocation_city text,
    geolocation_state text
);

CREATE TABLE public.order_items (
    order_id text NOT NULL,
    order_item_id integer NOT NULL,
    product_id text,
    seller_id text,
    shipping_limit_date text,
    price real,
    freight_value real
);

CREATE TABLE public.order_payments (
    order_id text NOT NULL,
    payment_sequential integer NOT NULL,
    payment_type text,
    payment_installments integer,
    payment_value real
);

CREATE TABLE public.order_reviews (
    review_id text NOT NULL,
    order_id text NOT NULL,
    review_score integer,
    review_comment_title text,
    review_comment_message text,
    review_creation_date text
);

CREATE TABLE public.orders (
    order_id text NOT NULL,
    customer_id text,
    order_status text,
    order_purchase_timestamp text,
    order_approved_at text,
    order_delivered_carrier_date text,
    order_delivered_customer_date text,
    order_estimated_delivery_date text
);

CREATE TABLE public.product_category_name_translation (
    product_category_name text NOT NULL,
    product_category_name_english text
);

CREATE TABLE public.products (
    product_id text NOT NULL,
    product_category_name text,
    product_name_lenght real,
    product_description_lenght real,
    product_photos_qty real,
    product_weight_g real,
    product_length_cm real,
    product_height_cm real,
    product_width_cm real
);

CREATE TABLE public.sellers (
    seller_id text NOT NULL,
    seller_zip_code_prefix integer,
    seller_city text,
    seller_state text
);

**Establish tables in the `public` schema in the `olist-dwh` database**

In [113]:
%%sql

CREATE TABLE IF NOT EXISTS public.geolocation (
    geolocation_zip_code_prefix INTEGER UNIQUE NOT NULL,
    geolocation_lat REAL,
    geolocation_lng REAL,
    geolocation_city TEXT,
    geolocation_state TEXT
);

CREATE TABLE IF NOT EXISTS public.product_category_name_translation (
    product_category_name TEXT UNIQUE NOT NULL,
    product_category_name_english TEXT
);

CREATE TABLE IF NOT EXISTS public.sellers (
    seller_id TEXT UNIQUE NOT NULL,
    seller_zip_code_prefix INTEGER,
    seller_city TEXT,
    seller_state TEXT
);

CREATE TABLE IF NOT EXISTS public.customers (
    customer_id TEXT UNIQUE NOT NULL,
    customer_unique_id TEXT,
    customer_zip_code_prefix INTEGER,
    customer_city TEXT,
    customer_state TEXT
);

CREATE TABLE IF NOT EXISTS public.products (
    product_id TEXT UNIQUE NOT NULL,
    product_category_name TEXT REFERENCES public.product_category_name_translation(product_category_name),
    product_name_lenght REAL,
    product_description_lenght REAL,
    product_photos_qty REAL,
    product_weight_g REAL,
    product_length_cm REAL,
    product_height_cm REAL,
    product_width_cm REAL
);

CREATE TABLE IF NOT EXISTS public.orders (
    order_id TEXT UNIQUE NOT NULL,
    customer_id TEXT REFERENCES public.customers(customer_id),
    order_status TEXT,
    order_purchase_timestamp TEXT,
    order_approved_at TEXT,
    order_delivered_carrier_date TEXT,
    order_delivered_customer_date TEXT,
    order_estimated_delivery_date TEXT
);

CREATE TABLE IF NOT EXISTS public.order_items (
    order_id TEXT NOT NULL REFERENCES public.orders(order_id),
    order_item_id INTEGER NOT NULL,
    product_id TEXT REFERENCES public.products(product_id),
    seller_id TEXT REFERENCES public.sellers(seller_id),
    shipping_limit_date TEXT,
    price NUMERIC,
    freight_value NUMERIC
);

CREATE TABLE IF NOT EXISTS public.order_payments (
    order_id TEXT NOT NULL,
    payment_sequential INTEGER NOT NULL,
    payment_type TEXT,
    payment_installments INTEGER,
    payment_value NUMERIC
);

CREATE TABLE IF NOT EXISTS public.order_reviews (
    review_id text NOT NULL,
    order_id text NOT NULL REFERENCES public.orders(order_id),
    review_score integer,
    review_comment_title text,
    review_comment_message text,
    review_creation_date text
);

 * postgresql://postgres:***@localhost:5434/olist-dwh
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.


[]

**Establish tables in the `stg` schema in `olist-dwh` database**

In [119]:
%%sql

CREATE TABLE IF NOT EXISTS stg.geolocation (
    geolocation_zip_code_prefix INTEGER UNIQUE NOT NULL,
    geolocation_lat REAL,
    geolocation_lng REAL,
    geolocation_city TEXT,
    geolocation_state TEXT
);

CREATE TABLE IF NOT EXISTS stg.product_category_name_translation (
    product_category_name TEXT UNIQUE NOT NULL,
    product_category_name_english TEXT
);

CREATE TABLE IF NOT EXISTS stg.sellers (
    seller_id TEXT UNIQUE NOT NULL,
    seller_zip_code_prefix INTEGER,
    seller_city TEXT,
    seller_state TEXT
);

CREATE TABLE IF NOT EXISTS stg.customers (
    customer_id TEXT UNIQUE NOT NULL,
    customer_unique_id TEXT,
    customer_zip_code_prefix INTEGER,
    customer_city TEXT,
    customer_state TEXT
);

CREATE TABLE IF NOT EXISTS stg.products (
    product_id TEXT UNIQUE NOT NULL,
    product_category_name TEXT REFERENCES stg.product_category_name_translation(product_category_name),
    product_name_length REAL,
    product_description_length REAL,
    product_photos_qty REAL,
    product_weight_g REAL,
    product_length_cm REAL,
    product_height_cm REAL,
    product_width_cm REAL
);

CREATE TABLE IF NOT EXISTS stg.orders (
    order_id TEXT UNIQUE NOT NULL,
    customer_id TEXT REFERENCES stg.customers(customer_id),
    order_status TEXT,
    order_purchase_timestamp TEXT,
    order_approved_at TEXT,
    order_delivered_carrier_date TEXT,
    order_delivered_customer_date TEXT,
    order_estimated_delivery_date TEXT
);

CREATE TABLE IF NOT EXISTS stg.order_items (
    order_id TEXT NOT NULL REFERENCES stg.orders(order_id),
    order_item_id INTEGER NOT NULL,
    product_id TEXT REFERENCES stg.products(product_id),
    seller_id TEXT REFERENCES stg.sellers(seller_id),
    shipping_limit_date TEXT,
    price NUMERIC,
    freight_value NUMERIC
);

CREATE TABLE IF NOT EXISTS stg.order_payments (
    order_id TEXT NOT NULL,
    payment_sequential INTEGER NOT NULL,
    payment_type TEXT,
    payment_installments INTEGER,
    payment_value NUMERIC
);

CREATE TABLE IF NOT EXISTS stg.order_reviews (
    review_id text UNIQUE NOT NULL,
    order_id text NOT NULL REFERENCES stg.orders(order_id),
    review_score integer,
    review_comment_title text,
    review_comment_message text,
    review_creation_date text
);

 * postgresql://postgres:***@localhost:5434/olist-dwh
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.


[]

**Establish `dim` and `facts` tables in the `dwh` schema in `olist-dwh` database**

In [263]:
%%sql

CREATE EXTENSION IF NOT EXISTS "uuid-ossp";

CREATE SCHEMA IF NOT EXISTS dwh AUTHORIZATION postgres;

CREATE TABLE dwh.dim_date (
    date_id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    full_date TIMESTAMP UNIQUE NOT NULL,
    date_only DATE UNIQUE NOT NULL,
    year NUMERIC NOT NULL,
    month NUMERIC NOT NULL,
    day NUMERIC NOT NULL,
    week NUMERIC NOT NULL,
    quarter NUMERIC NOT NULL,
    semester NUMERIC NOT NULL,
    day_of_week VARCHAR(15) NOT NULL,
    month_name VARCHAR(15) NOT NULL,
    year_day VARCHAR(15) UNIQUE NOT NULL, -- e.g. '2025-032' (year and day of year)
    year_month VARCHAR(8) NOT NULL, -- e.g. '2025-06'
    year_week VARCHAR(8) NOT NULL, -- e.g. '2025-W23'
    year_quarter VARCHAR(8) NOT NULL, -- e.g. '2025-Q2'
    year_semester VARCHAR(8) NOT NULL -- e.g. '2025-S1'
);

CREATE TABLE dwh.dim_time (
    time_id TIME PRIMARY KEY,
    hour SMALLINT NOT NULL,
    minute SMALLINT NOT NULL,
    second SMALLINT NOT NULL
);

CREATE TABLE dwh.dim_geolocation (
    geolocation_sk SERIAL PRIMARY KEY,
    geolocation_zip_code_prefix INTEGER UNIQUE NOT NULL,
    geolocation_lat REAL,
    geolocation_lng REAL,
    geolocation_city TEXT,
    geolocation_state TEXT
);

CREATE TABLE dwh.dim_product_category (
    product_category_sk SERIAL PRIMARY KEY,
    product_category_name TEXT UNIQUE NOT NULL,
    product_category_name_english TEXT
);

CREATE TABLE dwh.dim_sellers (
    seller_sk SERIAL PRIMARY KEY,
    seller_id TEXT NOT NULL, -- natural key; not UNIQUE, because SCD Type 2 keeps several versions per seller
    seller_zip_code_prefix INTEGER,
    seller_city TEXT,
    seller_state TEXT,
    record_start_date DATE NOT NULL DEFAULT CURRENT_DATE,
    record_end_date DATE,
    is_current BOOLEAN NOT NULL DEFAULT TRUE
);

CREATE TABLE dwh.dim_customers (
    customer_sk SERIAL PRIMARY KEY,
    customer_id TEXT NOT NULL, -- natural key; not UNIQUE, because SCD Type 2 keeps several versions per customer
    customer_unique_id TEXT,
    customer_zip_code_prefix INTEGER,
    customer_city TEXT,
    customer_state TEXT,
    record_start_date DATE NOT NULL DEFAULT CURRENT_DATE,
    record_end_date DATE,
    is_current BOOLEAN NOT NULL DEFAULT TRUE
);

CREATE TABLE dwh.dim_products (
    product_sk SERIAL PRIMARY KEY,
    product_id TEXT NOT NULL,
    product_category_name TEXT REFERENCES dwh.dim_product_category(product_category_name),
    product_name_length REAL,
    product_description_length REAL,
    product_photos_qty REAL,
    product_weight_g REAL,
    product_length_cm REAL,
    product_height_cm REAL,
    product_width_cm REAL,
    record_start_date DATE NOT NULL DEFAULT CURRENT_DATE,
    record_end_date DATE,
    is_current BOOLEAN NOT NULL DEFAULT TRUE
);


CREATE TABLE dwh.fact_orders_comprehensive (
    order_compr_id SERIAL PRIMARY KEY, -- surrogate key

    order_id TEXT NOT NULL, -- natural key             
    order_date_id DATE NOT NULL REFERENCES dwh.dim_date(full_date),
    order_time_id TIME NOT NULL REFERENCES dwh.dim_time(time_id),
    customer_sk INTEGER NOT NULL REFERENCES dwh.dim_customers(customer_sk),
    seller_sk INTEGER NOT NULL REFERENCES dwh.dim_sellers(seller_sk),
    product_sk INTEGER NOT NULL REFERENCES dwh.dim_products(product_sk),
    product_category_sk INTEGER NOT NULL REFERENCES dwh.dim_product_category(product_category_sk),

    price NUMERIC NOT NULL,
    freight_value NUMERIC NOT NULL,
    total_value NUMERIC GENERATED ALWAYS AS (price + freight_value) STORED,

    inserted_at TIMESTAMP DEFAULT now()
);

CREATE TABLE dwh.fact_reviews_sentiment(
  review_id TEXT NOT NULL,

  order_compr_id INTEGER NOT NULL REFERENCES dwh.fact_orders_comprehensive(order_compr_id),
  customer_sk INTEGER NOT NULL REFERENCES dwh.dim_customers(customer_sk),
  product_sk INTEGER NOT NULL REFERENCES dwh.dim_products(product_sk),

  review_title TEXT,
  review_comment_message TEXT,
  review_score INTEGER
);

CREATE TABLE dwh.daily_periodical_snapshot_order_trends (
    daily_snapshot_id SERIAL PRIMARY KEY,
    order_date DATE UNIQUE REFERENCES dwh.dim_date(date_only),
    year_day VARCHAR NOT NULL,
    year_week VARCHAR NOT NULL,
    year_month VARCHAR NOT NULL,
    year_quarter VARCHAR NOT NULL,
    year_semester VARCHAR NOT NULL,
    distinct_customer_count INTEGER NOT NULL,
    distinct_seller_count INTEGER NOT NULL,
    order_id_count INTEGER NOT NULL,
    product_id_count INTEGER NOT NULL,
    sum_price NUMERIC NOT NULL,
    sum_freight_value NUMERIC NOT NULL,
    inserted_at TIMESTAMP NOT NULL DEFAULT NOW()
);

CREATE TABLE dwh.accumulating_snapshot_logistic_performance (
    accm_order_id SERIAL PRIMARY KEY,
    order_id TEXT,
    product_sk INTEGER NOT NULL REFERENCES dwh.dim_products(product_sk),
    seller_sk INTEGER NOT NULL REFERENCES dwh.dim_sellers(seller_sk),
    customer_sk INTEGER NOT NULL REFERENCES dwh.dim_customers(customer_sk),

    order_purchase_timestamp TIMESTAMP NOT NULL,
    order_approved_at TIMESTAMP,
    purchase_approve_difference INTERVAL,

    order_estimated_delivery_date TIMESTAMP,
    order_shipping_limit_date TIMESTAMP,
    order_delivered_carrier_date TIMESTAMP,
    order_delivered_customer_date TIMESTAMP,

    approved_carrier_difference INTERVAL,
    approved_delivered_difference INTERVAL,

    order_purchase_date_id UUID REFERENCES dwh.dim_date(date_id),
    order_delivered_date_id UUID REFERENCES dwh.dim_date(date_id),

    last_updated TIMESTAMP NOT NULL DEFAULT NOW(),

    order_status_code VARCHAR
);

 * postgresql://postgres:***@localhost:5434/olist-dwh
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.


[]

**Script to populate `dim_date` and `dim_time` in `dwh` schema in `olist-dwh` database**

In [None]:
%%sql 

WITH date_series AS (
    SELECT generate_series(DATE '2015-01-01', DATE '2020-12-31', INTERVAL '1 day')::DATE AS full_date
),

date_enriched AS (
    SELECT
        uuid_generate_v4() AS date_id, -- same generator as the table default (uuid-ossp)
        full_date,
        full_date AS date_only,
        EXTRACT(YEAR FROM full_date)::INT AS year,
        EXTRACT(MONTH FROM full_date)::INT AS month,
        EXTRACT(DAY FROM full_date)::INT AS day,
        EXTRACT(WEEK FROM full_date)::INT AS week,
        EXTRACT(QUARTER FROM full_date)::INT AS quarter,
        CASE WHEN EXTRACT(MONTH FROM full_date) <= 6 THEN 1 ELSE 2 END AS semester,
        TO_CHAR(full_date, 'Day') AS day_of_week,
        TO_CHAR(full_date, 'Month') AS month_name,
        TO_CHAR(full_date, 'YYYY-DDD') AS year_day,
        TO_CHAR(full_date, 'YYYY-MM') AS year_month,
        TO_CHAR(full_date, 'IYYY-"W"IW') AS year_week, -- e.g. '2025-W23'
        TO_CHAR(full_date, 'YYYY-"Q"Q') AS year_quarter,
        TO_CHAR(full_date, 'YYYY-"S"') || CASE WHEN EXTRACT(MONTH FROM full_date) <= 6 THEN '1' ELSE '2' END AS year_semester
    FROM date_series
)

INSERT INTO dwh.dim_date (
    date_id, full_date, date_only,
    year, month, day, week, quarter, semester,
    day_of_week, month_name, year_day, year_month, year_week, year_quarter, year_semester
)
SELECT
    date_id, full_date, date_only,
    year, month, day, week, quarter, semester,
    TRIM(day_of_week), TRIM(month_name),
    year_day, year_month, year_week, year_quarter, year_semester
FROM date_enriched
ORDER BY full_date;


DO $$
DECLARE
    h INT;
    m INT;
    s INT;
    time_val TIME;
BEGIN
    FOR h IN 0..23 LOOP
        FOR m IN 0..59 LOOP
            FOR s IN 0..59 LOOP
                time_val := MAKE_TIME(h, m, s);
                INSERT INTO dwh.dim_time (time_id, hour, minute, second)
                VALUES (time_val, h, m, s);
            END LOOP;
        END LOOP;
    END LOOP;
END $$;
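
As a cross-check on the derived columns, the same attributes can be computed in plain Python. This is only a sketch: the function names `date_attributes` and `time_rows` are made up for illustration, and the `year_week` format follows the example given in the table definition's comment (`'2025-W23'`).

```python
from datetime import date, time

def date_attributes(d):
    """Derive a subset of the dim_date label columns for a single date."""
    iso_year, iso_week, _ = d.isocalendar()
    quarter = (d.month - 1) // 3 + 1
    semester = 1 if d.month <= 6 else 2
    return {
        "year_day": f"{d.year}-{d.timetuple().tm_yday:03d}",
        "year_month": f"{d.year}-{d.month:02d}",
        "year_week": f"{iso_year}-W{iso_week:02d}",
        "year_quarter": f"{d.year}-Q{quarter}",
        "year_semester": f"{d.year}-S{semester}",
    }

def time_rows():
    """Yield one (time, hour, minute, second) row per second of the day."""
    for h in range(24):
        for m in range(60):
            for s in range(60):
                yield time(h, m, s), h, m, s

attrs = date_attributes(date(2018, 2, 1))
n_times = sum(1 for _ in time_rows())
print(attrs)
print(n_times)  # 24 * 60 * 60 rows, one per second
```

The ISO year is paired with the ISO week on purpose, since the first days of January can belong to the last ISO week of the previous year.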

**ELT to populate `stg` schema in `olist-dwh` database from `public` schema in `olist-src` database**

In [None]:
# simple code
import psycopg2

# Connection details
src_db_params = {
    "dbname": "olist-src",
    "user": "postgres",
    "password": "postgres",
    "host": "localhost",
    "port": "5433"
}

dst_db_params = {
    "dbname": "olist-dwh",
    "user": "postgres",
    "password": "postgres",
    "host": "localhost",
    "port": "5434"
}

# Tables to transfer
tables = [
    "geolocation", "product_category_name_translation", 
    "sellers", "customers", "products", 
    "orders", "order_items", "order_payments", "order_reviews"
]

src_conn = dst_conn = None
try:
    # Connect to the source and destination databases
    src_conn = psycopg2.connect(**src_db_params)
    dst_conn = psycopg2.connect(**dst_db_params)
    src_cursor = src_conn.cursor()
    dst_cursor = dst_conn.cursor()

    for table in tables:
        print(f"Transferring data for {table}...")

        # Fetch data from the source table
        src_cursor.execute(f"SELECT * FROM public.{table}")
        rows = src_cursor.fetchall()

        # Truncate destination table to prevent duplicates
        dst_cursor.execute(f"TRUNCATE TABLE stg.{table} RESTART IDENTITY CASCADE")

        # Insert extracted data into destination table
        if rows:
            placeholders = ", ".join(["%s"] * len(rows[0]))
            query = f"INSERT INTO stg.{table} VALUES ({placeholders})"
            dst_cursor.executemany(query, rows)

    # Commit once, so a failure leaves the staging tables unchanged
    dst_conn.commit()
    print("Data transfer completed successfully!")

except Exception as e:
    # Undo the partial load before reporting the error
    if dst_conn is not None:
        dst_conn.rollback()
    print("Error:", e)

finally:
    # Close whichever connections were opened (this also invalidates their cursors)
    for conn in (src_conn, dst_conn):
        if conn is not None:
            conn.close()

Transferring data for geolocation...
Transferring data for product_category_name_translation...
Transferring data for sellers...
Transferring data for customers...
Transferring data for products...
Transferring data for orders...
Transferring data for order_items...
Transferring data for order_payments...
Transferring data for order_reviews...
Data transfer completed successfully!
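
For larger tables, loading every row into memory at once may become a bottleneck. A possible refinement, sketched here under assumptions, is to move rows in fixed-size batches. The helpers `batched` and `insert_statement` and the sample rows are hypothetical; in a real run each chunk would be passed to the destination cursor's `executemany`.

```python
from itertools import islice

def batched(rows, size):
    """Yield lists of at most `size` rows from any iterable."""
    it = iter(rows)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk

def insert_statement(table, n_columns):
    """Build a positional INSERT for a staging table (table names come from a fixed list)."""
    placeholders = ", ".join(["%s"] * n_columns)
    return f"INSERT INTO stg.{table} VALUES ({placeholders})"

# Stand-in rows instead of a database cursor
rows = [(i, f"city_{i}") for i in range(5)]
chunks = list(batched(rows, 2))
sql = insert_statement("sellers", 4)
print(len(chunks), sql)
```

Because `batched` accepts any iterable, it can also wrap a cursor that is iterated row by row rather than a fully fetched list.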


**ELT to populate `stg` schema from `public` schema in `olist-dwh` database**

In [None]:
INSERT INTO stg.geolocation
    (geolocation_zip_code_prefix,
    geolocation_lat,
    geolocation_lng,
    geolocation_city,
    geolocation_state) 
SELECT
    geolocation_zip_code_prefix, 
    geolocation_lat,
    geolocation_lng,
    geolocation_city,
    geolocation_state
FROM public.geolocation

ON CONFLICT (geolocation_zip_code_prefix) DO NOTHING;

INSERT INTO stg.product_category_name_translation 
    (product_category_name, 
    product_category_name_english) 
SELECT
    product_category_name, 
    product_category_name_english
FROM public.product_category_name_translation

ON CONFLICT(product_category_name) DO NOTHING;

INSERT INTO stg.sellers (
  seller_id, seller_zip_code_prefix, seller_city, seller_state
)
SELECT
  seller_id, seller_zip_code_prefix, seller_city, seller_state
FROM public.sellers

ON CONFLICT(seller_id) DO NOTHING;

INSERT INTO stg.customers (
  customer_id, customer_unique_id, customer_zip_code_prefix, customer_city, customer_state
)
SELECT customer_id, customer_unique_id, customer_zip_code_prefix, customer_city, customer_state
FROM public.customers

ON CONFLICT(customer_id) DO NOTHING;

INSERT INTO stg.products (
  product_id, product_category_name,
  product_name_length, product_description_length,
  product_photos_qty, product_weight_g,
  product_length_cm, product_height_cm, product_width_cm
)
SELECT product_id, product_category_name,
  product_name_lenght, product_description_lenght, -- public.products keeps the source spelling
  product_photos_qty, product_weight_g,
  product_length_cm, product_height_cm, product_width_cm
FROM public.products
ON CONFLICT(product_id) DO NOTHING;

INSERT INTO stg.orders (
  order_id, customer_id,
  order_status, order_purchase_timestamp, order_approved_at,
  order_delivered_carrier_date, order_delivered_customer_date, order_estimated_delivery_date
)
SELECT order_id, customer_id,
  order_status, order_purchase_timestamp, order_approved_at,
  order_delivered_carrier_date, order_delivered_customer_date, order_estimated_delivery_date
FROM public.orders
ON CONFLICT(order_id) DO NOTHING;

INSERT INTO stg.order_items (
  order_id, order_item_id,
  product_id, seller_id, 
  shipping_limit_date, price, freight_value
 )
SELECT order_id, order_item_id,
  product_id, seller_id, 
  shipping_limit_date, price, freight_value
FROM public.order_items;

INSERT INTO stg.order_payments (
  order_id, payment_sequential, payment_type, payment_installments, payment_value
)
SELECT order_id, payment_sequential, payment_type, payment_installments, payment_value
FROM public.order_payments;

INSERT INTO stg.order_reviews (
  review_id, order_id, 
  review_score, review_comment_title, review_comment_message, 
  review_creation_date
)
SELECT review_id, order_id, 
  review_score, review_comment_title, review_comment_message, 
  review_creation_date
FROM public.order_reviews
ON CONFLICT(review_id) DO NOTHING;

**ELT Script for `dim` tables**

In [8]:
%%sql

-- dim geolocation
INSERT INTO dwh.dim_geolocation (
    geolocation_zip_code_prefix,
    geolocation_lat, geolocation_lng,
    geolocation_city, geolocation_state
)
SELECT geolocation_zip_code_prefix, 
  geolocation_lat, geolocation_lng, 
  geolocation_city, geolocation_state
FROM stg.geolocation
WHERE geolocation_zip_code_prefix IS NOT NULL
ON CONFLICT (geolocation_zip_code_prefix) DO NOTHING;

-- dim product category
INSERT INTO dwh.dim_product_category (
    product_category_name,
    product_category_name_english
    )
 SELECT DISTINCT
    product_category_name,
    product_category_name_english
 FROM stg.product_category_name_translation
 WHERE product_category_name IS NOT NULL
 ON CONFLICT(product_category_name) DO NOTHING;

-- dim sellers
-- Step 1: Expire old records if there's a change
UPDATE dwh.dim_sellers d
SET 
    record_end_date = CURRENT_DATE - INTERVAL '1 day',
    is_current = FALSE
FROM stg.sellers s
WHERE d.seller_id = s.seller_id
  AND d.is_current = TRUE
  AND (
        d.seller_zip_code_prefix IS DISTINCT FROM s.seller_zip_code_prefix OR
        d.seller_city IS DISTINCT FROM s.seller_city OR
        d.seller_state IS DISTINCT FROM s.seller_state
      );

-- Step 2: Insert new records (only new or changed)
INSERT INTO dwh.dim_sellers (
    seller_id,
    seller_zip_code_prefix,
    seller_city,
    seller_state,
    record_start_date,
    is_current
)
SELECT
    s.seller_id,
    s.seller_zip_code_prefix,
    s.seller_city,
    s.seller_state,
    CURRENT_DATE,
    TRUE
FROM stg.sellers s
LEFT JOIN dwh.dim_sellers d
  ON s.seller_id = d.seller_id AND d.is_current = TRUE
WHERE 
    d.seller_id IS NULL -- new
    OR (
        d.seller_zip_code_prefix IS DISTINCT FROM s.seller_zip_code_prefix OR
        d.seller_city IS DISTINCT FROM s.seller_city OR
        d.seller_state IS DISTINCT FROM s.seller_state
    );

-- dim customers
-- Step 1: Expire current records whose attributes changed
UPDATE dwh.dim_customers d
SET 
    is_current = FALSE,
    record_end_date = CURRENT_DATE - INTERVAL '1 day'
FROM stg.customers c
WHERE 
    d.customer_id = c.customer_id
    AND d.is_current = TRUE
    AND (
        d.customer_unique_id IS DISTINCT FROM c.customer_unique_id OR
        d.customer_zip_code_prefix IS DISTINCT FROM c.customer_zip_code_prefix OR
        d.customer_city IS DISTINCT FROM c.customer_city OR
        d.customer_state IS DISTINCT FROM c.customer_state
    );

-- Step 2: Insert new records (new or changed)
INSERT INTO dwh.dim_customers (
    customer_id,
    customer_unique_id,
    customer_zip_code_prefix,
    customer_city,
    customer_state,
    record_start_date,
    is_current
)
SELECT
    c.customer_id,
    c.customer_unique_id,
    c.customer_zip_code_prefix,
    c.customer_city,
    c.customer_state,
    CURRENT_DATE,
    TRUE
FROM stg.customers c
LEFT JOIN dwh.dim_customers d
    ON c.customer_id = d.customer_id AND d.is_current = TRUE
WHERE 
    d.customer_id IS NULL OR (
        d.customer_unique_id IS DISTINCT FROM c.customer_unique_id OR
        d.customer_zip_code_prefix IS DISTINCT FROM c.customer_zip_code_prefix OR
        d.customer_city IS DISTINCT FROM c.customer_city OR
        d.customer_state IS DISTINCT FROM c.customer_state
    );

-- dim products
-- Step 1: Expire the current records with changes
UPDATE dwh.dim_products d
SET 
    record_end_date = CURRENT_DATE - INTERVAL '1 day',
    is_current = FALSE
FROM stg.products p
WHERE d.product_id = p.product_id
  AND d.is_current = TRUE
  AND (
      d.product_category_name IS DISTINCT FROM p.product_category_name OR
      d.product_name_length IS DISTINCT FROM p.product_name_length OR
      d.product_description_length IS DISTINCT FROM p.product_description_length OR
      d.product_photos_qty IS DISTINCT FROM p.product_photos_qty OR
      d.product_weight_g IS DISTINCT FROM p.product_weight_g OR
      d.product_length_cm IS DISTINCT FROM p.product_length_cm OR
      d.product_height_cm IS DISTINCT FROM p.product_height_cm OR
      d.product_width_cm IS DISTINCT FROM p.product_width_cm
  );

-- Step 2: Insert new records (new or changed)
INSERT INTO dwh.dim_products (
    product_id,
    product_category_name,
    product_name_length,
    product_description_length,
    product_photos_qty,
    product_weight_g,
    product_length_cm,
    product_height_cm,
    product_width_cm,
    record_start_date,
    is_current
)
SELECT DISTINCT
    p.product_id,
    p.product_category_name,
    p.product_name_length,
    p.product_description_length,
    p.product_photos_qty,
    p.product_weight_g,
    p.product_length_cm,
    p.product_height_cm,
    p.product_width_cm,
    CURRENT_DATE,
    TRUE
FROM stg.products p
LEFT JOIN dwh.dim_products d
  ON p.product_id = d.product_id AND d.is_current = TRUE
WHERE 
    d.product_id IS NULL -- New product
    OR (
      d.product_category_name IS DISTINCT FROM p.product_category_name OR
      d.product_name_length IS DISTINCT FROM p.product_name_length OR
      d.product_description_length IS DISTINCT FROM p.product_description_length OR
      d.product_photos_qty IS DISTINCT FROM p.product_photos_qty OR
      d.product_weight_g IS DISTINCT FROM p.product_weight_g OR
      d.product_length_cm IS DISTINCT FROM p.product_length_cm OR
      d.product_height_cm IS DISTINCT FROM p.product_height_cm OR
      d.product_width_cm IS DISTINCT FROM p.product_width_cm
    );

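
The change checks above rely on `IS DISTINCT FROM` rather than `<>`, because in SQL a comparison with `NULL` using `<>` evaluates to `NULL` and the row would not be flagged as changed. The sketch below mirrors that null-safe comparison in Python, with `None` standing in for `NULL`. The helper names and sample rows are assumptions for illustration only.

```python
def is_distinct_from(a, b):
    """Mirror SQL's IS DISTINCT FROM, with None standing in for NULL."""
    if a is None and b is None:
        return False
    if a is None or b is None:
        return True
    return a != b

def has_changed(current, incoming, columns):
    """True if any tracked column differs between the two row versions."""
    return any(is_distinct_from(current.get(c), incoming.get(c)) for c in columns)

tracked = ["seller_zip_code_prefix", "seller_city", "seller_state"]
current = {"seller_zip_code_prefix": 13023, "seller_city": "campinas", "seller_state": None}
same = dict(current)                             # identical, including the missing state
moved = {**current, "seller_city": "mogi guacu"}
filled = {**current, "seller_state": "SP"}       # a missing value gets filled in

print(has_changed(current, same, tracked),
      has_changed(current, moved, tracked),
      has_changed(current, filled, tracked))
```

Only the second and third rows would trigger a new Type 2 version; the first is treated as unchanged even though one of its values is missing on both sides.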

**ELT code for comprehensive `fact` tables**

In [None]:
-- fact order comprehensive
-- stg.orders stores timestamps as TEXT, so cast them before matching the date and time dimensions
INSERT INTO dwh.fact_orders_comprehensive (
    order_id, order_date_id, order_time_id,
    customer_sk, seller_sk, product_sk, product_category_sk,
    price, freight_value
)
SELECT
    oi.order_id, dd.date_only, dt.time_id,
    dc.customer_sk, ds.seller_sk, dp.product_sk, dpc.product_category_sk,
    oi.price, oi.freight_value

FROM stg.order_items oi
JOIN stg.orders o ON oi.order_id = o.order_id

-- NULLIF guards against empty strings; orders without an approval time are skipped by the inner joins
JOIN dwh.dim_date dd ON dd.date_only = NULLIF(o.order_approved_at, '')::TIMESTAMP::DATE
JOIN dwh.dim_time dt ON dt.time_id = NULLIF(o.order_approved_at, '')::TIMESTAMP::TIME

JOIN dwh.dim_customers dc ON dc.customer_id = o.customer_id AND dc.is_current = TRUE
JOIN dwh.dim_sellers ds ON ds.seller_id = oi.seller_id AND ds.is_current = TRUE
JOIN dwh.dim_products dp ON dp.product_id = oi.product_id AND dp.is_current = TRUE
JOIN dwh.dim_product_category dpc ON dpc.product_category_name = dp.product_category_name;
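
The joins above attach each fact row to the current version of a dimension (`is_current = TRUE`). If facts should instead point at the version that was valid on the order date, one option is a date-range lookup against `record_start_date` and `record_end_date`. The following is a sketch of that idea, not the approach used in this pipeline; the `version_at` helper and the sample versions are hypothetical.

```python
from datetime import date

def version_at(versions, natural_key, on_date):
    """Return the surrogate key of the version valid on `on_date`, or None."""
    for v in versions:
        if v["customer_id"] != natural_key:
            continue
        starts_ok = v["record_start_date"] <= on_date
        ends_ok = v["record_end_date"] is None or on_date <= v["record_end_date"]
        if starts_ok and ends_ok:
            return v["customer_sk"]
    return None

# Two hypothetical versions of the same customer
dim_customers = [
    {"customer_sk": 1, "customer_id": "c1", "record_start_date": date(2017, 1, 1),
     "record_end_date": date(2018, 5, 31), "is_current": False},
    {"customer_sk": 2, "customer_id": "c1", "record_start_date": date(2018, 6, 1),
     "record_end_date": None, "is_current": True},
]

before_move = version_at(dim_customers, "c1", date(2018, 2, 10))
after_move = version_at(dim_customers, "c1", date(2018, 7, 1))
too_early = version_at(dim_customers, "c1", date(2016, 1, 1))
print(before_move, after_move, too_early)
```

This lookup only gives unambiguous results when the validity ranges of consecutive versions do not overlap, which is why the expiry date is set to the day before the new version starts.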


-- Example ELT script for populating periodical_snapshot_order_trends
INSERT INTO dwh.daily_periodical_snapshot_order_trends (
    order_date,
    year_day,
    year_week,
    year_month,
    year_quarter,
    year_semester,
    distinct_customer_count,
    distinct_seller_count,
    order_id_count,
    product_id_count,
    sum_price,
    sum_freight_value
)
SELECT
    dd.full_date AS order_date,
    dd.year_day,
    dd.year_week,
    dd.year_month,
    dd.year_quarter,
    dd.year_semester,

    COUNT(DISTINCT o.customer_id) AS distinct_customer_count,
    COUNT(DISTINCT oi.seller_id) AS distinct_seller_count,
    COUNT(DISTINCT o.order_id) AS order_id_count,
    COUNT(DISTINCT oi.product_id) AS product_id_count,
    SUM(oi.price) AS sum_price,
    SUM(oi.freight_value) AS sum_freight_value

FROM stg.orders o
JOIN stg.order_items oi ON oi.order_id = o.order_id
JOIN dwh.dim_date dd ON dd.full_date = TO_DATE(o.order_approved_at, 'YYYY-MM-DD')

-- Optional: avoid duplicate snapshot inserts
LEFT JOIN dwh.daily_periodical_snapshot_order_trends existing
    ON existing.order_date = dd.full_date
WHERE o.order_approved_at IS NOT NULL
  AND existing.order_date IS NULL

GROUP BY
    dd.full_date,
    dd.year_day,
    dd.year_week,
    dd.year_month,
    dd.year_quarter,
    dd.year_semester;


 
-- fact accumulating snapshot
INSERT INTO dwh.accumulating_snapshot_logistic_performance (
    order_id,
    product_sk,
    seller_sk,
    customer_sk,
    order_purchase_timestamp,
    order_approved_at,
    purchase_approve_difference,
    order_estimated_delivery_date,
    order_shipping_limit_date,
    order_delivered_carrier_date,
    order_delivered_customer_date,
    approved_carrier_difference,
    approved_delivered_difference,
    order_purchase_date_id,
    order_delivered_date_id,
    last_updated,
    order_status_code
)
SELECT
    o.order_id::UUID,

    dp.product_sk,
    ds.seller_sk,
    dc.customer_sk,

    TO_TIMESTAMP(o.order_purchase_timestamp, 'YYYY-MM-DD HH24:MI:SS') AS order_purchase_ts,
    TO_TIMESTAMP(o.order_approved_at, 'YYYY-MM-DD HH24:MI:SS') AS approved_ts,

    CASE 
        WHEN o.order_approved_at IS NOT NULL 
        THEN TO_TIMESTAMP(o.order_approved_at, 'YYYY-MM-DD HH24:MI:SS') - TO_TIMESTAMP(o.order_purchase_timestamp, 'YYYY-MM-DD HH24:MI:SS')
        ELSE NULL
    END AS purchase_approve_diff,

    TO_TIMESTAMP(o.order_estimated_delivery_date, 'YYYY-MM-DD') AS estimated_delivery,
    TO_TIMESTAMP(oi.shipping_limit_date, 'YYYY-MM-DD HH24:MI:SS') AS shipping_limit,
    TO_TIMESTAMP(o.order_delivered_carrier_date, 'YYYY-MM-DD HH24:MI:SS') AS delivered_carrier,
    TO_TIMESTAMP(o.order_delivered_customer_date, 'YYYY-MM-DD HH24:MI:SS') AS delivered_customer,

    CASE 
        WHEN o.order_delivered_carrier_date IS NOT NULL AND o.order_approved_at IS NOT NULL
        THEN TO_TIMESTAMP(o.order_delivered_carrier_date, 'YYYY-MM-DD HH24:MI:SS') - TO_TIMESTAMP(o.order_approved_at, 'YYYY-MM-DD HH24:MI:SS')
        ELSE NULL
    END AS approved_carrier_diff,

    CASE 
        WHEN o.order_delivered_customer_date IS NOT NULL AND o.order_approved_at IS NOT NULL
        THEN TO_TIMESTAMP(o.order_delivered_customer_date, 'YYYY-MM-DD HH24:MI:SS') - TO_TIMESTAMP(o.order_approved_at, 'YYYY-MM-DD HH24:MI:SS')
        ELSE NULL
    END AS approved_delivered_diff,

    dd_order.date_id AS order_purchase_date_id,
    dd_delivered.date_id AS order_delivered_date_id,

    NOW() AS last_updated,

    o.order_status AS order_status_code

FROM stg.orders o
JOIN stg.order_items oi ON o.order_id = oi.order_id

-- Dimension lookups with SCD type 2 handling
JOIN dwh.dim_customers dc ON o.customer_id = dc.customer_id AND dc.is_current = TRUE
JOIN dwh.dim_sellers ds ON oi.seller_id = ds.seller_id AND ds.is_current = TRUE
JOIN dwh.dim_products dp ON oi.product_id = dp.product_id AND dp.is_current = TRUE

-- Dates
LEFT JOIN dwh.dim_date dd_order ON dd_order.full_date = TO_DATE(o.order_purchase_timestamp, 'YYYY-MM-DD')
LEFT JOIN dwh.dim_date dd_delivered ON dd_delivered.full_date = TO_DATE(o.order_delivered_customer_date, 'YYYY-MM-DD')

-- Only if order_purchase_timestamp is present
WHERE o.order_purchase_timestamp IS NOT NULL;
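
The SCD Type 2 lookups in these fact loads assume that each business key matches exactly one dimension row, which is what the `is_current = TRUE` filter is for. As a minimal sketch of the effect, the snippet below uses Python's built-in `sqlite3` with a hypothetical two-version product dimension. The table contents and the `count_fact_rows` helper are assumptions for illustration only and are not part of the Olist pipeline.

```python
import sqlite3

def count_fact_rows(current_only: bool) -> int:
    """Count rows produced by joining order items to an SCD Type 2 dimension."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE order_items (order_id TEXT, product_id TEXT);
        CREATE TABLE dim_products (
            product_sk INTEGER PRIMARY KEY,
            product_id TEXT,
            is_current INTEGER
        );
        -- One order item, but two versions of the same product (hypothetical data)
        INSERT INTO order_items VALUES ('o1', 'p1');
        INSERT INTO dim_products VALUES (1, 'p1', 0);  -- expired version
        INSERT INTO dim_products VALUES (2, 'p1', 1);  -- current version
        """
    )
    sql = """
        SELECT COUNT(*)
        FROM order_items oi
        JOIN dim_products dp ON dp.product_id = oi.product_id
    """
    if current_only:
        # Restrict the lookup to the current version of each product
        sql += " AND dp.is_current = 1"
    (n,) = conn.execute(sql).fetchone()
    conn.close()
    return n

print(count_fact_rows(current_only=False))  # 2: the order item is duplicated
print(count_fact_rows(current_only=True))   # 1: one row per order item
```

Without the filter, every historical version of a product multiplies the fact rows for that product, which would inflate sums of `price` and `freight_value`.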

## **Step \#4 \- Orchestrate ELT with Luigi (20 points)**

- Use Luigi to orchestrate the pipeline you created.
- Set up scheduling (using cron if applicable).

#### **Extract Script**

In [22]:
import luigi
from datetime import datetime
import logging
import time
import pandas as pd
import os
import sys

sys.path.append(r"C:\Users\LENOVO THINKPAD\Documents\Github\Pacmann---Data-Storage_-Warehouse--Mart---Lake\mentoring 2\olist-dwh-project")

from pipeline.utility.db_conn import db_connection
from pipeline.utility.read_sql import read_sql_file
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Define DIR
DIR_ROOT_PROJECT = os.getenv("DIR_ROOT_PROJECT")
DIR_TEMP_LOG = os.getenv("DIR_TEMP_LOG")
DIR_TEMP_DATA = os.getenv("DIR_TEMP_DATA")
DIR_EXTRACT_QUERY = os.getenv("DIR_EXTRACT_QUERY")
DIR_LOG = os.getenv("DIR_LOG")

class Extract(luigi.Task):
    # Define tables to be extracted from db sources
    tables_to_extract = ['public.geolocation', 
                        'public.product_category_name_translation', 
                        'public.sellers', 
                        'public.customers', 
                        'public.products', 
                        'public.orders',
                        'public.order_items',
                        'public.order_payments',
                        'public.order_reviews']
    
    def requires(self):
        pass


    def run(self):
        try:
            # Configure logging
            logging.basicConfig(filename = f'{DIR_TEMP_LOG}/logs.log', 
                                level = logging.INFO, 
                                format = '%(asctime)s - %(levelname)s - %(message)s')
            
            # Define db connection engine
            src_engine, _ = db_connection()
            
            # Define the query using the SQL content
            extract_query = read_sql_file(
                file_path = f'{DIR_EXTRACT_QUERY}/extract-all-tables.sql'
            )
            
            start_time = time.time()  # Record start time
            logging.info("==================================STARTING EXTRACT DATA=======================================")
            
            for index, table_name in enumerate(self.tables_to_extract):
                try:
                    # Read data into DataFrame
                    df = pd.read_sql_query(extract_query.format(table_name = table_name), src_engine)

                    # Write DataFrame to CSV
                    df.to_csv(f"{DIR_TEMP_DATA}/{table_name}.csv", index=False)
                    
                    logging.info(f"EXTRACT '{table_name}' - SUCCESS.")
                    
                except Exception as e:
                    # Log the underlying error and keep it chained for debugging
                    logging.error(f"EXTRACT '{table_name}' - FAILED: {e}")
                    raise Exception(f"Failed to extract table '{table_name}'") from e
            
            logging.info(f"Extract All Tables From Sources - SUCCESS")
            
            end_time = time.time()  # Record end time
            execution_time = end_time - start_time  # Calculate execution time
            
            # Get summary
            summary_data = {
                'timestamp': [datetime.now()],
                'task': ['Extract'],
                'status' : ['Success'],
                'execution_time': [execution_time]
            }
            
            # Get summary dataframes
            summary = pd.DataFrame(summary_data)
            
            # Write DataFrame to CSV
            summary.to_csv(f"{DIR_TEMP_DATA}/extract-summary.csv", index = False)
                    
        except Exception:   
            logging.error("Extract All Tables From Sources - FAILED")
             
            # Get summary
            summary_data = {
                'timestamp': [datetime.now()],
                'task': ['Extract'],
                'status' : ['Failed'],
                'execution_time': [0]
            }
            
            # Get summary dataframes
            summary = pd.DataFrame(summary_data)
            
            # Write DataFrame to CSV
            summary.to_csv(f"{DIR_TEMP_DATA}/extract-summary.csv", index = False)
            
            # Write exception
            raise Exception(f"FAILED to execute EXTRACT TASK !!!")
        
        logging.info("==================================ENDING EXTRACT DATA=======================================")
                
    def output(self):
        outputs = []
        for table_name in self.tables_to_extract:
            outputs.append(luigi.LocalTarget(f'{DIR_TEMP_DATA}/{table_name}.csv'))
            
        outputs.append(luigi.LocalTarget(f'{DIR_TEMP_DATA}/extract-summary.csv'))
            
        outputs.append(luigi.LocalTarget(f'{DIR_TEMP_LOG}/logs.log'))
        return outputs
    
#if __name__ == '__main__':
 #   luigi.build([Extract()])

In [23]:
luigi.build([Extract()], local_scheduler=True)

DEBUG: Checking if Extract() is complete
INFO: Informed scheduler that task   Extract__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 21524] Worker Worker(salt=6265168950, workers=1, host=LAPTOP-H2OPKO7T, username=LENOVO THINKPAD, pid=21524) running   Extract()
INFO: [pid 21524] Worker Worker(salt=6265168950, workers=1, host=LAPTOP-H2OPKO7T, username=LENOVO THINKPAD, pid=21524) done      Extract()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Extract__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=6265168950, workers=1, host=LAPTOP-H2OPKO7T, username=LENOVO THINKPAD, pid=21524) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfull

True
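
Luigi's default completeness check treats a task as done once every target returned by `output()` exists. The snippet below is a simplified, standard-library model of that rule, not Luigi's actual code. The `is_complete` helper and the file names are hypothetical stand-ins that loosely mirror the Extract task's outputs.

```python
import tempfile
from pathlib import Path

def is_complete(output_paths):
    """Simplified model: a task is complete only if all of its outputs exist."""
    return all(Path(p).exists() for p in output_paths)

with tempfile.TemporaryDirectory() as tmp:
    outputs = [Path(tmp) / "public.orders.csv", Path(tmp) / "extract-summary.csv"]

    before = is_complete(outputs)      # nothing written yet
    outputs[0].write_text("order_id\n")
    partial = is_complete(outputs)     # one output is still missing
    outputs[1].write_text("task,status\n")
    after = is_complete(outputs)       # every output exists

print(before, partial, after)  # False False True
```

Under this rule, a task whose output files are still on disk from a previous run is skipped on the next run, so the temporary files need to be cleared before the pipeline can re-extract fresh data.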

#### **Load script**

In [24]:
import luigi
import logging
import pandas as pd
import time
import sqlalchemy

import os
import sys

sys.path.append(r"C:\Users\LENOVO THINKPAD\Documents\Github\Pacmann---Data-Storage_-Warehouse--Mart---Lake\mentoring 2\olist-dwh-project")

from datetime import datetime

from pipeline.extract import Extract
from pipeline.utility.db_conn import db_connection
from pipeline.utility.read_sql import read_sql_file
from sqlalchemy.orm import sessionmaker
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Define DIR
DIR_ROOT_PROJECT = os.getenv("DIR_ROOT_PROJECT")
DIR_TEMP_LOG = os.getenv("DIR_TEMP_LOG")
DIR_TEMP_DATA = os.getenv("DIR_TEMP_DATA")
DIR_LOAD_QUERY = os.getenv("DIR_LOAD_QUERY")
DIR_LOG = os.getenv("DIR_LOG")

# Load task outline:
# 1. Read the SQL files that load from `public` to `stg` (read only, not executed yet)
# 2. Read the extracted CSV files into DataFrames
# 3. Establish a connection to the DWH engine
# 4. Truncate the tables in the `public` schema
# 5. Write the DataFrames to the `public` schema with `to_sql()`
# 6. Execute the queries that load from `public` to `stg`

class Load(luigi.Task):   
    
    def requires(self):
        return Extract()
    
    def run(self):
         
        # Configure logging
        logging.basicConfig(filename = f'{DIR_TEMP_LOG}/logs.log', 
                            level = logging.INFO, 
                            format = '%(asctime)s - %(levelname)s - %(message)s')
        
        #----------------------------------------------------------------------------------------------------------------------------------------
        # Read query to be executed
        try:
            # Read query to truncate public schema in dwh
            truncate_query = read_sql_file(
                file_path = f'{DIR_LOAD_QUERY}/load-public-truncate_tables.sql'
            )
            
            # Read load query to staging schema
            geolocation_query = read_sql_file(
                file_path = f'{DIR_LOAD_QUERY}/load-stg-geolocation.sql'
            )
            
            product_category_query = read_sql_file(
                file_path = f'{DIR_LOAD_QUERY}/load-stg-product_category_name_translation.sql'
            )
            
            sellers_query = read_sql_file(
                file_path = f'{DIR_LOAD_QUERY}/load-stg-sellers.sql'
            )
            
            customers_query = read_sql_file(
                file_path = f'{DIR_LOAD_QUERY}/load-stg-customers.sql'
            )
            
            products_query = read_sql_file(
                file_path = f'{DIR_LOAD_QUERY}/load-stg-products.sql'
            )
            
            orders_query = read_sql_file(
                file_path = f'{DIR_LOAD_QUERY}/load-stg-orders.sql'
            )  
            
            order_items_query = read_sql_file(
                file_path = f'{DIR_LOAD_QUERY}/load-stg-order_items.sql'
            )  
            
            order_payments_query = read_sql_file(
                file_path = f'{DIR_LOAD_QUERY}/load-stg-order_payments.sql'
            )
            
            order_reviews_query = read_sql_file(
                file_path = f'{DIR_LOAD_QUERY}/load-stg-order_reviews.sql'
            )                                        
            
            logging.info("Read Load Query - SUCCESS")
            
        except Exception:
            logging.error("Read Load Query - FAILED")
            raise Exception("Failed to read Load Query")

        #----------------------------------------------------------------------------------------------------------------------------------------
        # Read the data to be loaded
        try:
            # Read csv
            geolocation = pd.read_csv(self.input()[0].path)
            product_category = pd.read_csv(self.input()[1].path)
            sellers = pd.read_csv(self.input()[2].path)
            customers = pd.read_csv(self.input()[3].path)
            products = pd.read_csv(self.input()[4].path)
            orders = pd.read_csv(self.input()[5].path)
            order_items = pd.read_csv(self.input()[6].path)
            order_payments = pd.read_csv(self.input()[7].path)
            order_reviews = pd.read_csv(self.input()[8].path)
            
            logging.info(f"Read Extracted Data - SUCCESS")
            
        except Exception:
            logging.error(f"Read Extracted Data  - FAILED")
            raise Exception("Failed to Read Extracted Data")
        
        
        #----------------------------------------------------------------------------------------------------------------------------------------
        # Establish connections to DWH
        try:
            _, dwh_engine = db_connection()
            logging.info(f"Connect to olist-dwh - SUCCESS")
            
        except Exception:
            logging.error("Connect to olist-dwh - FAILED")
            raise Exception("Failed to connect to Data Warehouse")
        
        
        #----------------------------------------------------------------------------------------------------------------------------------------
        # Truncate all tables in the public schema before loading.
        # This avoids "duplicate key value violates unique constraint" errors on reload.
        try:            
            # Split the SQL queries if multiple queries are present
            truncate_query = truncate_query.split(';')

            # Remove newline characters and leading/trailing whitespaces
            truncate_query = [query.strip() for query in truncate_query if query.strip()]
            
            # Create session
            Session = sessionmaker(bind = dwh_engine)
            session = Session()

            # Execute each query
            for query in truncate_query:
                query = sqlalchemy.text(query)
                session.execute(query)
                
            session.commit()
            
            # Close session
            session.close()

            logging.info(f"Truncate `public` Schema in olist-dwh - SUCCESS")
        
        except Exception:
            logging.error(f"Truncate `public` Schema in olist-dwh - FAILED")
            
            raise Exception("Failed to Truncate `public` schema in olist-dwh")
        
        
        
        #----------------------------------------------------------------------------------------------------------------------------------------
        # Record start time for loading tables
        start_time = time.time()  
        logging.info("==================================STARTING LOAD DATA=======================================")
        # Load to tables
        try:
            
            try:
                # Load to public schema   
                geolocation.to_sql('geolocation', 
                                    con = dwh_engine, 
                                    if_exists = 'append', 
                                    index = False, 
                                    schema = 'public')
                
                product_category.to_sql('product_category_name_translation', 
                                    con = dwh_engine, 
                                    if_exists = 'append', 
                                    index = False, 
                                    schema = 'public')
                
                sellers.to_sql('sellers', 
                                con = dwh_engine, 
                                if_exists = 'append', 
                                index = False, 
                                schema = 'public')
                
                customers.to_sql('customers', 
                            con = dwh_engine, 
                            if_exists = 'append', 
                            index = False, 
                            schema = 'public')
                
                products.to_sql('products', 
                            con = dwh_engine, 
                            if_exists = 'append', 
                            index = False, 
                            schema = 'public')
                
                orders.to_sql('orders', 
                            con = dwh_engine, 
                            if_exists = 'append', 
                            index = False, 
                            schema = 'public')
                
                order_items.to_sql('order_items', 
                                   con = dwh_engine, 
                                   if_exists = 'append', 
                                   index = False, 
                                   schema= 'public')
                
                order_payments.to_sql('order_payments',
                                      con = dwh_engine,
                                      if_exists= 'append',
                                      index=False,
                                      schema='public')
                
                order_reviews.to_sql('order_reviews',
                                     con = dwh_engine,
                                     if_exists= 'append',
                                     index=False,
                                     schema='public')
                
                logging.info(f"LOAD All Tables To olist-dwh-public - SUCCESS")
                
            except Exception:
                logging.error(f"LOAD All Tables To olist-dwh-public - FAILED")
                raise Exception('Failed Load Tables To olist-dwh-public')
            
            
            #----------------------------------------------------------------------------------------------------------------------------------------
            # Load from public schema to staging schema
            try:
                # List query
                load_stg_queries = [geolocation_query, product_category_query,
                                    sellers_query, customers_query, products_query,
                                    orders_query, order_items_query, order_payments_query, order_reviews_query]
                
                # Create session
                Session = sessionmaker(bind = dwh_engine)
                session = Session()

                # Execute each query
                for query in load_stg_queries:
                    query = sqlalchemy.text(query)
                    session.execute(query)
                    
                session.commit()
                
                # Close session
                session.close()
                
                logging.info("LOAD All Tables To DWH-Staging - SUCCESS")
                
            except Exception:
                logging.error("LOAD All Tables To DWH-Staging - FAILED")
                raise Exception('Failed Load Tables To DWH-Staging')
        
        
            # Record end time for loading tables
            end_time = time.time()  
            execution_time = end_time - start_time  # Calculate execution time
            
            # Get summary
            summary_data = {
                'timestamp': [datetime.now()],
                'task': ['Load'],
                'status' : ['Success'],
                'execution_time': [execution_time]
            }

            # Get summary dataframes
            summary = pd.DataFrame(summary_data)
            
            # Write Summary to CSV
            summary.to_csv(f"{DIR_TEMP_DATA}/load-summary.csv", index = False)
            
                        
        #----------------------------------------------------------------------------------------------------------------------------------------
        except Exception:
            # Get summary
            summary_data = {
                'timestamp': [datetime.now()],
                'task': ['Load'],
                'status' : ['Failed'],
                'execution_time': [0]
            }

            # Get summary dataframes
            summary = pd.DataFrame(summary_data)
            
            # Write Summary to CSV
            summary.to_csv(f"{DIR_TEMP_DATA}/load-summary.csv", index = False)
            
            logging.error("LOAD All Tables To DWH - FAILED")
            raise Exception('Failed Load Tables To DWH')   
        
        logging.info("==================================ENDING LOAD DATA=======================================")
        
    #----------------------------------------------------------------------------------------------------------------------------------------
    def output(self):
        return [luigi.LocalTarget(f'{DIR_TEMP_LOG}/logs.log'),
                luigi.LocalTarget(f'{DIR_TEMP_DATA}/load-summary.csv')]

In [25]:
luigi.build([Extract(),Load()], local_scheduler=True)

DEBUG: Checking if Extract() is complete
INFO: Informed scheduler that task   Extract__99914b932b   has status   DONE
DEBUG: Checking if Load() is complete
DEBUG: Checking if Extract() is complete
INFO: Informed scheduler that task   Load__99914b932b   has status   PENDING
INFO: Informed scheduler that task   Extract__99914b932b   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 21524] Worker Worker(salt=4202338360, workers=1, host=LAPTOP-H2OPKO7T, username=LENOVO THINKPAD, pid=21524) running   Load()
INFO: [pid 21524] Worker Worker(salt=4202338360, workers=1, host=LAPTOP-H2OPKO7T, username=LENOVO THINKPAD, pid=21524) done      Load()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Load__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(sa

True
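
The Load task splits the truncate script on `;` and drops empty fragments before executing each statement. A minimal sketch of that step in isolation is shown below. The `split_statements` helper and the two `TRUNCATE` statements are hypothetical, since the contents of `load-public-truncate_tables.sql` are not shown here.

```python
def split_statements(sql_script: str) -> list[str]:
    """Split a SQL script on ';' and drop empty fragments.

    Naive by design: a ';' inside a string literal or comment would also split.
    """
    return [part.strip() for part in sql_script.split(";") if part.strip()]

# Hypothetical script content, for illustration only
script = """
TRUNCATE TABLE public.orders CASCADE;
TRUNCATE TABLE public.order_items CASCADE;
"""
statements = split_statements(script)
print(statements)
```

This approach is adequate for simple DDL/DML files. A script that contains semicolons inside quoted strings would need a real SQL parser instead.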

#### **Transform Script**

In [26]:
import luigi
import logging
import pandas as pd
import time
import sqlalchemy
from datetime import datetime

import os
import sys

sys.path.append(r"C:\Users\LENOVO THINKPAD\Documents\Github\Pacmann---Data-Storage_-Warehouse--Mart---Lake\mentoring 2\olist-dwh-project")

from pipeline.extract import Extract
from pipeline.load import Load
from pipeline.utility.db_conn import db_connection
from pipeline.utility.read_sql import read_sql_file
from sqlalchemy.orm import sessionmaker

from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Define DIR
DIR_ROOT_PROJECT = os.getenv("DIR_ROOT_PROJECT")
DIR_TEMP_LOG = os.getenv("DIR_TEMP_LOG")
DIR_TEMP_DATA = os.getenv("DIR_TEMP_DATA")
DIR_TRANSFORM_QUERY = os.getenv("DIR_TRANSFORM_QUERY")
DIR_LOG = os.getenv("DIR_LOG")

class Transform(luigi.Task):
    
    def requires(self):
        return Load()
    
    def run(self):
         
        # Configure logging
        logging.basicConfig(filename = f'{DIR_TEMP_LOG}/logs.log', 
                            level = logging.INFO, 
                            format = '%(asctime)s - %(levelname)s - %(message)s')
        
        #----------------------------------------------------------------------------------------------------------------------------------------
        # Establish connections to DWH
        try:
            _, dwh_engine = db_connection()
            logging.info(f"Connect to DWH - SUCCESS")
            
        except Exception:
            logging.error("Connect to DWH - FAILED")
            raise Exception("Failed to connect to Data Warehouse")
        
        #----------------------------------------------------------------------------------------------------------------------------------------
        # Read query to be executed
        try:
            
            # Read transform query to final schema
            dim_geolocation_query = read_sql_file(
                file_path = f'{DIR_TRANSFORM_QUERY}/transform-dwh-dim_geolocation.sql'
            )
            
            dim_product_category_query = read_sql_file(
                file_path = f'{DIR_TRANSFORM_QUERY}/transform-dwh-dim_product_category.sql'
            )
            
            dim_sellers_query = read_sql_file(
                file_path = f'{DIR_TRANSFORM_QUERY}/transform-dwh-dim_sellers.sql'
            )
            
            dim_customers_query = read_sql_file(
                file_path = f'{DIR_TRANSFORM_QUERY}/transform-dwh-dim_customers.sql'
            )
            
            dim_products_query = read_sql_file(
                file_path = f'{DIR_TRANSFORM_QUERY}/transform-dwh-dim_products.sql'
            )
            
            fact_orders_comprehensive_query = read_sql_file(
                file_path = f'{DIR_TRANSFORM_QUERY}/transform-dwh-fact_orders_comprehensive.sql'
            )
            
            fact_reviews_sentiment_query = read_sql_file(
                file_path = f'{DIR_TRANSFORM_QUERY}/transform-dwh-fact_reviews_sentiment.sql'
            )
            
            fact_daily_periodicals_query = read_sql_file(
                file_path = f'{DIR_TRANSFORM_QUERY}/transform-dwh-fact_daily_periodical_snapshot_order_trends.sql'
            )
            
            fact_accumulating_snapshot_query = read_sql_file(
                file_path = f'{DIR_TRANSFORM_QUERY}/transform-dwh-fact_accumulating_snapshot_logistic_performance.sql'
            )
            
            logging.info("Read Transform Query - SUCCESS")
            
        except Exception:
            logging.error("Read Transform Query - FAILED")
            raise Exception("Failed to read Transform Query")        
        
        #----------------------------------------------------------------------------------------------------------------------------------------
        # Record start time for transform tables
        start_time = time.time()
        logging.info("==================================STARTING TRANSFORM DATA=======================================")
               
        # Transform to dimension and fact tables
        try:
            # Create session
            Session = sessionmaker(bind = dwh_engine)
            session = Session()
            
            # Transform to `dwh` tables
            
            query = sqlalchemy.text(dim_geolocation_query)
            session.execute(query)
            logging.info("Transform to 'dwh.dim_geolocation' - SUCCESS")
            
            query = sqlalchemy.text(dim_product_category_query)
            session.execute(query)
            logging.info("Transform to 'dwh.dim_product_category' - SUCCESS")
            
            query = sqlalchemy.text(dim_sellers_query)
            session.execute(query)
            logging.info("Transform to 'dwh.dim_sellers' - SUCCESS")
            
            query = sqlalchemy.text(dim_customers_query)
            session.execute(query)
            logging.info("Transform to 'dwh.dim_customers' - SUCCESS")
        
            query = sqlalchemy.text(dim_products_query)
            session.execute(query)
            logging.info("Transform to 'dwh.dim_products' - SUCCESS")
            
            query = sqlalchemy.text(fact_orders_comprehensive_query)
            session.execute(query)
            logging.info("Transform to 'dwh.fact_orders_comprehensive' - SUCCESS")
            
            query = sqlalchemy.text(fact_reviews_sentiment_query)
            session.execute(query)
            logging.info("Transform to 'dwh.fact_reviews_sentiment' - SUCCESS")
            
            query = sqlalchemy.text(fact_daily_periodicals_query)
            session.execute(query)
            logging.info("Transform to 'dwh.daily_periodical_snapshot_order_trends' - SUCCESS")
            
            query = sqlalchemy.text(fact_accumulating_snapshot_query)
            session.execute(query)
            logging.info("Transform to 'dwh.accumulating_snapshot_logistic_performance' - SUCCESS")
            
            # Commit transaction
            session.commit()
            
            # Close session
            session.close()

            logging.info(f"Transform to All Dimensions and Fact Tables - SUCCESS")
            
            # Record end time for transforming tables
            end_time = time.time()  
            execution_time = end_time - start_time  # Calculate execution time
            
            # Get summary
            summary_data = {
                'timestamp': [datetime.now()],
                'task': ['Transform'],
                'status' : ['Success'],
                'execution_time': [execution_time]
            }

            # Get summary dataframes
            summary = pd.DataFrame(summary_data)
            
            # Write Summary to CSV
            summary.to_csv(f"{DIR_TEMP_DATA}/transform-summary.csv", index = False)
            
        except Exception:
            logging.error(f"Transform to All Dimensions and Fact Tables - FAILED")
        
            # Get summary
            summary_data = {
                'timestamp': [datetime.now()],
                'task': ['Transform'],
                'status' : ['Failed'],
                'execution_time': [0]
            }

            # Get summary dataframes
            summary = pd.DataFrame(summary_data)
            
            # Write Summary to CSV
            summary.to_csv(f"{DIR_TEMP_DATA}/transform-summary.csv", index = False)
            
            logging.error("Transform Tables - FAILED")
            raise Exception('Failed Transforming Tables')   
        
        logging.info("==================================ENDING TRANSFORM DATA=======================================")

    #----------------------------------------------------------------------------------------------------------------------------------------
    def output(self):
        return [luigi.LocalTarget(f'{DIR_TEMP_LOG}/logs.log'),
                luigi.LocalTarget(f'{DIR_TEMP_DATA}/transform-summary.csv')]

In [27]:
luigi.build([Extract(),Load(),Transform()], local_scheduler=True)

DEBUG: Checking if Extract() is complete
INFO: Informed scheduler that task   Extract__99914b932b   has status   DONE
DEBUG: Checking if Load() is complete
INFO: Informed scheduler that task   Load__99914b932b   has status   DONE
DEBUG: Checking if Transform() is complete
DEBUG: Checking if Load() is complete
INFO: Informed scheduler that task   Transform__99914b932b   has status   PENDING
INFO: Informed scheduler that task   Load__99914b932b   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 21524] Worker Worker(salt=8244291368, workers=1, host=LAPTOP-H2OPKO7T, username=LENOVO THINKPAD, pid=21524) running   Transform()
INFO: [pid 21524] Worker Worker(salt=8244291368, workers=1, host=LAPTOP-H2OPKO7T, username=LENOVO THINKPAD, pid=21524) done      Transform()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Transform__99914b932b   has stat

True

### **`elt_main` script**

In [28]:
import luigi
#import sentry_sdk
import pandas as pd
import os

from pipeline.extract import Extract
from pipeline.load import Load
from pipeline.transform import Transform
from pipeline.utility.concat_dataframe import concat_dataframes
from pipeline.utility.copy_log import copy_log
from pipeline.utility.delete_temp_data import delete_temp

from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Read env variables
DIR_ROOT_PROJECT = os.getenv("DIR_ROOT_PROJECT")
DIR_TEMP_LOG = os.getenv("DIR_TEMP_LOG")
DIR_TEMP_DATA = os.getenv("DIR_TEMP_DATA")
DIR_LOG = os.getenv("DIR_LOG")
SENTRY_DSN = os.getenv("SENTRY_DSN")

# Track the error using sentry
#sentry_sdk.init(
    #dsn = f"{SENTRY_DSN}"
#)

# Function to ensure a file exists with a specified header
def ensure_file_exists_with_header(file_path, header=None):
    if not os.path.exists(file_path):
        with open(file_path, 'w') as f:
            if header:
                f.write(header)

# Ensure pipeline_summary.csv exists with header
pipeline_summary_path = f'{DIR_ROOT_PROJECT}/pipeline_summary.csv'
ensure_file_exists_with_header(pipeline_summary_path, 'timestamp,task,status,execution_time\n')

# Build and run the tasks
# (in a standalone script, put the statements below under `if __name__ == "__main__":`)
luigi.build([Extract(),
             Load(),
             Transform()], local_scheduler=True)

# Concat temp extract summary to final summary
concat_dataframes(
    df1 = pd.read_csv(f'{DIR_ROOT_PROJECT}/pipeline_summary.csv'),
    df2 = pd.read_csv(f'{DIR_TEMP_DATA}/extract-summary.csv')
)

# Concat temp load summary to final summary
concat_dataframes(
    df1 = pd.read_csv(f'{DIR_ROOT_PROJECT}/pipeline_summary.csv'),
    df2 = pd.read_csv(f'{DIR_TEMP_DATA}/load-summary.csv')
)

# Concat temp transform summary to final summary
concat_dataframes(
    df1 = pd.read_csv(f'{DIR_ROOT_PROJECT}/pipeline_summary.csv'),
    df2 = pd.read_csv(f'{DIR_TEMP_DATA}/transform-summary.csv')
)

# Append log from temp to final log
copy_log(
    source_file = f'{DIR_TEMP_LOG}/logs.log',
    destination_file = f'{DIR_LOG}/logs.log'
)

# Delete temp data
delete_temp(
    directory = f'{DIR_TEMP_DATA}'
)

# Delete temp log
delete_temp(
    directory = f'{DIR_TEMP_LOG}'
)

DEBUG: Checking if Extract() is complete
INFO: Informed scheduler that task   Extract__99914b932b   has status   DONE
DEBUG: Checking if Load() is complete
INFO: Informed scheduler that task   Load__99914b932b   has status   DONE
DEBUG: Checking if Transform() is complete
INFO: Informed scheduler that task   Transform__99914b932b   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=559604840, workers=1, host=LAPTOP-H2OPKO7T, username=LENOVO THINKPAD, pid=21524) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 3 tasks of which:
* 3 complete ones were encountered:
    - 1 Extract()
    - 1 Load()
    - 1 Transform()

Did not run any tasks
This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====



An error occurred: [Errno 2] No such file or directory: 'C:\\Users\\LENOVO THINKPAD\\Documents\\Github\\Pacmann---Data-Storage_-Warehouse--Mart---Lake\\mentoring 2\\olist-dwh-project\\logs\\/logs.log'
An error occurred: [WinError 32] The process cannot access the file because it is being used by another process: 'C:\\Users\\LENOVO THINKPAD\\Documents\\Github\\Pacmann---Data-Storage_-Warehouse--Mart---Lake\\mentoring 2\\olist-dwh-project\\pipeline\\temp\\log\\logs.log'
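
The two errors above suggest that the destination `logs` directory did not exist when `copy_log` ran, and that the temp log file was still held open by the `logging` module when `delete_temp` ran. That reading is an assumption, since neither helper's source appears here. Under that assumption, a minimal sketch of a replacement helper (the name `archive_log` and its parameters are hypothetical) creates the directory, builds the path with `os.path.join`, and closes the logging handlers before removing the temp file:

```python
import logging
import os


def archive_log(source_file, destination_dir, filename="logs.log"):
    """Append source_file to destination_dir/filename, then delete source_file.

    Hypothetical helper; not the project's copy_log or delete_temp.
    """
    # Create the destination directory if it is missing (avoids Errno 2).
    os.makedirs(destination_dir, exist_ok=True)

    # os.path.join avoids mixed separators such as 'logs\\/logs.log'.
    destination_file = os.path.join(destination_dir, filename)

    # Flush and close every logging handler so the temp log is no longer
    # held open (the likely cause of WinError 32 on Windows).
    logging.shutdown()

    # Append the temp log to the final log.
    with open(source_file, "r", encoding="utf-8") as src, \
            open(destination_file, "a", encoding="utf-8") as dst:
        dst.write(src.read())

    # Only now is it safe to remove the temp log.
    os.remove(source_file)
    return destination_file
```

Because `logging.shutdown()` closes all handlers, a helper like this would be called once, as the last step of `elt_main`, after all tasks have finished logging.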


## **Step \#5 \- Create Report(5 points)**

The report can take the form of an article (which should include a link to your GitHub project) or a README file on GitHub. Ensure you thoroughly explain the points mentioned above.

### **Data Pipeline Report**

This project designs a data warehouse and a data pipeline for Olist, a Brazilian e-commerce platform that connects sellers and customers across the country. Because the platform serves many sellers and customers nationwide, it needs a robust data warehouse and data pipeline. This report presents a solution for both.

#### **Pretend Interview with Stakeholders**

##### 1. Which dimension tables are likely to change over time and should be tracked?
  
  **Response**
  
  Primarily `customers` and `sellers`. </br>
  Customers may change their address or city (captured via `geolocation`), and sellers sometimes update their location or business name. </br>
  `products` and `category_name` are mostly static, but we may occasionally reclassify a product's category.
  
##### 2. Do we need to maintain historical versions of data?
  
  **Response**
  
  We want to preserve historical changes for both `customers` and `sellers`, especially location changes, as they can affect delivery time and logistics. </br>
  For `products` and `category_name`, keeping only the latest version is enough, since changes are rare and typically administrative. </br>
  
##### 3. What kind of historical tracking is preferred: full versioning (Type 2), overwrite (Type 1), or current vs. previous field (Type 3)?
  
  **Response**
  
  SCD Type 2 (full-versioning)
  
  - `dim_customers`
  - `dim_sellers`
  
  SCD Type 1 (overwrite)
  
  - `dim_category_name`
  - `dim_products`

##### 4. How should we handle geolocation changes? Should customers and sellers be linked to geolocation with historical traceability?
  
  **Response**
  
  Yes, because both current and previous geolocation information is valuable for analysis, e.g., comparing delivery times or satisfaction scores before and after a move. </br>
  So we'll need foreign key-based Type 2 handling for geolocation information.
  
##### 5. Who will consume the historical data, and how will it be used (e.g., reports, machine learning, fraud detection)?
  
  **Response**
  
  - Data analysts and the operations team
    - delivery performance reports, churn analysis, and predictive models

#### **Determining SCD / Slowly Changing Dimension types for the tables**

**SCD Type 1 / Overwrite**

- `dim_product_category_name`
    - tracks changes in category names
    - uses Type 1 (overwrite) to keep the overall process simple
    - historical tracking of category names is not significant, because the main analyses focus on products, customers, sellers, locations, and time periods

**SCD Type 2 / Add New Row**

- `dim_customers`
    - customers may change their location
    - a new location can change shipping fees and delivery performance, which may affect future purchases
- `dim_sellers`
    - sellers may change their location
    - because of this change, shipping costs and delivery times for their orders may change
- `dim_products`
    - changes to a product may be reflected in its sales
    - for example, larger product dimensions may incur higher freight costs, which may affect purchasing decisions
    - price changes also tend to affect purchasing decisions, especially for price-sensitive customers
    - hence, it is important to track price and dimension changes in relation to the total price and cost that customers pay
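
To make the difference between the two strategies concrete, here is a minimal in-memory sketch in plain Python. It is an illustration under assumptions, not the warehouse's actual implementation: the function names, the `valid_from` / `valid_to` / `is_current` columns, and `category_id` are hypothetical, and the real dimension tables may use different names.

```python
from datetime import date

# Hypothetical tracking columns (valid_from, valid_to, is_current);
# the actual dimension tables may name these differently.


def apply_scd_type1(dim_rows, key, incoming):
    """Type 1: overwrite the matching row in place, keeping no history."""
    for row in dim_rows:
        if row[key] == incoming[key]:
            row.update(incoming)
            return dim_rows
    dim_rows.append(dict(incoming))  # unseen key: plain insert
    return dim_rows


def apply_scd_type2(dim_rows, key, incoming, tracked, today):
    """Type 2: expire the current row and append a new version."""
    current = next(
        (r for r in dim_rows if r[key] == incoming[key] and r["is_current"]),
        None,
    )
    if current is not None:
        if all(current[c] == incoming[c] for c in tracked):
            return dim_rows  # nothing changed, keep the current version
        current["valid_to"] = today
        current["is_current"] = False
    dim_rows.append(
        {**incoming, "valid_from": today, "valid_to": None, "is_current": True}
    )
    return dim_rows


customers = [
    {"customer_id": "c1", "customer_city": "sao paulo",
     "valid_from": date(2018, 1, 1), "valid_to": None, "is_current": True},
]
apply_scd_type2(
    customers, "customer_id",
    {"customer_id": "c1", "customer_city": "campinas"},
    tracked=["customer_city"], today=date(2018, 6, 1),
)
# customers now holds two rows for c1: the expired 'sao paulo' version
# and the current 'campinas' version.

categories = [{"category_id": 1, "category_name": "toys"}]
apply_scd_type1(categories, "category_id",
                {"category_id": 1, "category_name": "toys_and_games"})
# categories still holds one row, now named 'toys_and_games'.
```

With Type 2, an order placed before the move can still be joined to the customer's old city through the validity dates, which is what the delivery-performance analysis relies on.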

#### **ELT / Extract, Load, Transform with Python and SQL, and Luigi tasks**

The source data lives in the `olist-src` database, and the target is the `olist-dwh` database. In its current state, `olist-dwh` is empty, with no schemas or tables, so the schemas and tables must be designed first.

1. Design the `public` schema in the data warehouse, using the same DDL as the tables in the `olist-src` database.

2. Design the `stg` (staging) schema in `olist-dwh`, mirroring the `public` schema but correcting its mistyped column names.

3. Design the `dwh` (data warehouse) schema in `olist-dwh`, with dimension and fact tables. The design applies the chosen SCD type to each dimension table and provides fact tables for the required analytics (sales performance and logistics performance).

4. Create a SQL script to fill the `dim.date` and `dim.time` tables in the `dwh` schema of the `olist-dwh` database.

5. Create a Python script named `extract.py` to extract the data from `olist-src` to CSV files.

6. Create SQL scripts to:
    - transfer data from the `public` schema to the `stg` schema in `olist-dwh`
    - transfer data from the `stg` schema to the `dwh` schema in `olist-dwh`

7. Create a Python script named `load.py` to:
    - read the CSV files produced by `extract.py` into dataframes
    - read the SQL scripts that transfer data from the `public` schema to the `stg` schema
    - clear the tables in the `public` schema before inserting new data
    - insert the data from those dataframes into the `public` schema
    - transfer the data from the `public` schema to the `stg` schema

8. Create a Python script named `transform.py` to transfer the data from the `stg` schema to the `dwh` schema.

9. Combine the scripts in `elt_main.py` so the whole pipeline runs from a single entry point.
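
The load steps above (clear `public`, insert the extracted rows, then copy into `stg` while correcting column names) can be sketched with the standard-library `sqlite3` module. This is a simplified stand-in under stated assumptions, not the project's `load.py`: SQLite has no schemas, so the `public_` and `stg_` table prefixes play the role of the two schemas; the table, the function names, and the `product_name_lenght` column are illustrative; and `INSERT OR REPLACE` is SQLite syntax, where PostgreSQL would typically use `INSERT ... ON CONFLICT` instead.

```python
import sqlite3

# SQLite has no schemas, so the prefixes public_ and stg_ stand in for the
# `public` and `stg` schemas. Table and column names are illustrative.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE public_products (
        product_id TEXT,
        product_name_lenght INTEGER      -- mistyped name kept as in the source
    );
    CREATE TABLE stg_products (
        product_id TEXT PRIMARY KEY,
        product_name_length INTEGER      -- corrected name in staging
    );
    """
)


def load_public(conn, rows):
    """Clear the landing table, then insert the freshly extracted rows."""
    with conn:  # commits on success, rolls back on error
        conn.execute("DELETE FROM public_products")
        conn.executemany("INSERT INTO public_products VALUES (?, ?)", rows)


def public_to_stg(conn):
    """Copy rows into staging, renaming the mistyped column on the way."""
    with conn:
        conn.execute(
            """
            INSERT OR REPLACE INTO stg_products (product_id, product_name_length)
            SELECT product_id, product_name_lenght FROM public_products
            """
        )


# First run: two products arrive.
load_public(conn, [("p1", 40), ("p2", 55)])
public_to_stg(conn)

# Later run: only p1 arrives, with an updated value.
load_public(conn, [("p1", 42)])
public_to_stg(conn)

rows = conn.execute(
    "SELECT product_id, product_name_length FROM stg_products ORDER BY product_id"
).fetchall()
# rows == [("p1", 42), ("p2", 55)]
```

Clearing `public` on every run keeps it a pure landing area, while the upsert into `stg` keeps rows from earlier runs available for the transform into `dwh`.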