# ETL Sample - Using RDBMS as Data Warehouse

This sample runs an ETL from a sample procurement data source into an RDBMS data warehouse. The OLTP side (data source) and the OLAP side (data warehouse) use separate PostgreSQL instances.

Before starting, complete these steps in order:
  1. Create the tables and insert the sample data into the source database: download `data-warehouse-procurement-sample.sql` from the last lecture of the course and run it on the **source database instance**.
  2. Create the schemas, the staging tables, and the data warehouse tables: download `data-warehouse-staging-dw-ddl.sql` from the last lecture of the course and run it on the **data warehouse database instance**.
  3. Create the schema and the materialized view (logical table) for the finance data mart: download `data-warehouse-finance-datamart-ddl.sql` from the last lecture of the course and run it on the **data warehouse database instance**.

So there are **two PostgreSQL instances**:
  1. Data source, with the `public` schema (the default when a PostgreSQL instance is created)
  2. Data warehouse, with three schemas:
     - `staging` for staging tables
     - `dw` for fact & dimension tables
     - `finance_datamart` for the datamart

If you are using a single PostgreSQL instance (e.g. localhost), make sure it has four schemas: `public`, `staging`, `dw`, and `finance_datamart`. Then adjust the connection strings below.

## Procurement Data Source

<img src="img/data_source_procurement.png" align="left" width="800"/>

## Connection

In [None]:
import psycopg2
import psycopg2.extras
import sqlalchemy
import pyodbc
from datetime import date

Two connections are required, one to the data source and one to the data warehouse, because this sample uses two different PostgreSQL instances.

In [None]:
try:
    # data source connection
    conn_source = psycopg2.connect("postgresql://postgres:CourseDE888@34.101.229.192:5432/postgres")
    conn_source.set_session(autocommit=True)
    cur_source = conn_source.cursor()
    
    # data warehouse connection
    conn_dw = psycopg2.connect("postgresql://postgres:CourseDW888@34.101.88.242:5432/postgres")
    conn_dw.set_session(autocommit=True)
    cur_dw = conn_dw.cursor()
except Exception as e:
    print("Error: cannot open cursor for SQL interaction")
    print(e)
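
As an alternative to hardcoding credentials in the notebook, the connection strings could be assembled from environment variables. The following is a minimal sketch, not part of the course material: the helper name `build_dsn` and the variable names (`<PREFIX>_USER`, `<PREFIX>_PASSWORD`, and so on) are assumptions made for illustration.

```python
import os
from urllib.parse import quote


def build_dsn(prefix, default_host="localhost", default_port="5432"):
    """Build a PostgreSQL connection URI from environment variables.

    The variable names (<PREFIX>_USER, <PREFIX>_PASSWORD, ...) are
    hypothetical; pick whatever naming fits your environment.
    """
    user = os.environ.get(f"{prefix}_USER", "postgres")
    password = os.environ.get(f"{prefix}_PASSWORD", "")
    host = os.environ.get(f"{prefix}_HOST", default_host)
    port = os.environ.get(f"{prefix}_PORT", default_port)
    dbname = os.environ.get(f"{prefix}_DB", "postgres")
    # Percent-encode the credentials so characters like '@' or ':' do not break the URI
    return (
        f"postgresql://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{dbname}"
    )


# Intended usage (requires psycopg2 and reachable databases):
# conn_source = psycopg2.connect(build_dsn("SOURCE"))
# conn_dw = psycopg2.connect(build_dsn("DW"))
```

With a single local instance, both prefixes can simply point to the same host and database.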

## Staging

<img src="img/dw_staging.png" align="left"/>

SQL to fetch from data source (vendor data), then insert into staging vendors.

In [None]:
sql_source_fetch_vendors = """
    SELECT
            vendor_name,
            vendor_site_name
        FROM
            po_vendors pv,
            po_vendor_sites pvs
        WHERE
            pv.vendor_id = pvs.vendor_id
            AND (pv.last_updated_date >= %s
                OR pvs.last_updated_date >= %s)
"""

sql_dw_clean_staging_vendors = """
    DELETE FROM staging.stg_vendors
        WHERE date_trunc('day', created_date) = date_trunc('day', %s);
"""

sql_dw_insert_staging_vendors = """
    INSERT INTO staging.stg_vendors(
        vendor_name,
        vendor_site)
    VALUES (
        %s,
        %s)
"""

Clear the staging rows that were created today.

In [None]:
cur_dw.execute(sql_dw_clean_staging_vendors, (date.today(),))

Fetch vendors from data source, and insert into staging table. 

In [None]:
# A hardcoded cutoff date is used because this example loads all the data.
# In practice, this could be date.today().
cur_source.execute(sql_source_fetch_vendors, ('2021-01-01', '2021-01-01'))
res_source = cur_source.fetchall()

psycopg2.extras.execute_batch(cur_dw, sql_dw_insert_staging_vendors, res_source)

print("Inserted {} vendors data into staging\n".format(len(res_source)))
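
The clean, fetch, and insert steps above follow one pattern that can be wrapped in a helper. The sketch below is an illustration under stated assumptions: `load_staging` is a hypothetical name, it uses the generic DB-API `executemany` instead of `psycopg2.extras.execute_batch`, and the demo runs against in-memory SQLite tables (with `?` placeholders and simplified table names) so that it works without a PostgreSQL server.

```python
import sqlite3


def load_staging(cur_source, cur_dw, fetch_sql, fetch_params,
                 clean_sql, insert_sql, run_date):
    """Clean the staging rows of run_date, fetch from the source, insert the batch."""
    cur_dw.execute(clean_sql, (run_date,))
    cur_source.execute(fetch_sql, fetch_params)
    rows = cur_source.fetchall()
    if rows:
        cur_dw.executemany(insert_sql, rows)
    return len(rows)


# --- Demo with in-memory SQLite stand-ins for the two databases ---
src = sqlite3.connect(":memory:")
dw = sqlite3.connect(":memory:")
src.execute("CREATE TABLE vendors (vendor_name TEXT, vendor_site_name TEXT, "
            "last_updated_date TEXT)")
src.executemany("INSERT INTO vendors VALUES (?, ?, ?)", [
    ("Acme", "Jakarta", "2021-03-01"),
    ("Globex", "Bandung", "2020-12-31"),  # older than the cutoff, not fetched
])
# A fixed created_date default keeps the demo deterministic
dw.execute("CREATE TABLE stg_vendors (vendor_name TEXT, vendor_site TEXT, "
           "created_date TEXT DEFAULT '2022-05-01')")

args = dict(
    fetch_sql="SELECT vendor_name, vendor_site_name FROM vendors "
              "WHERE last_updated_date >= ?",
    fetch_params=("2021-01-01",),
    clean_sql="DELETE FROM stg_vendors WHERE created_date = ?",
    insert_sql="INSERT INTO stg_vendors (vendor_name, vendor_site) VALUES (?, ?)",
    run_date="2022-05-01",
)
first_run = load_staging(src.cursor(), dw.cursor(), **args)
second_run = load_staging(src.cursor(), dw.cursor(), **args)  # rerun on the same day
staged_count = dw.execute("SELECT COUNT(*) FROM stg_vendors").fetchone()[0]
print(first_run, second_run, staged_count)
```

Because the cleanup runs first, repeating the load on the same day replaces that day's staging rows instead of duplicating them.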

SQL to fetch from data source (purchase-invoices), then insert into staging.

In [None]:
sql_source_fetch_purchase_invoices = """
    SELECT
        poh.po_number,
        pol.line_num,
        mi.item_name,
        initcap(pa.first_name || ' ' || pa.last_name) AS po_agent_full_name,
        COALESCE((
            SELECT
                CASE WHEN ld.lookup_code IN ('ORDERED', 'RECEIVED') THEN
                    'Not Invoiced'
                WHEN ld.lookup_code = 'INVOICED' THEN
                    'Invoiced not paid'
                WHEN ld.lookup_code = 'PAID' THEN
                    'Paid'
                END
            FROM lookup_dtl ld
            WHERE
                ld.lookup_dtl_id = pol.line_status), 'Not Invoiced') AS item_payment_status,
        aih.invoice_number,
        ld_invoice.lookup_description invoice_status,
        aih.invoice_created_date,
        aih.invoice_received_date,
        aih.invoice_cancelled_date,
        aih.invoice_paid_date,
        aih.invoice_currency_code,
        aih.payment_currency_code,
        ail.amount_invoiced,
        ail.amount_paid,
        ail.description AS invoice_line_description,
        initcap(aa.first_name || ' ' || aa.last_name) AS ap_agent_full_name,
        COALESCE(
            CASE WHEN ld_invoice.lookup_code IN ('NEW', 'ON_APPROVAL') THEN
                TRUE
            WHEN ld_invoice.lookup_code IN ('CANCELLED', 'PAID') THEN
                FALSE
            END, TRUE) AS liability_flag,
        pv.vendor_name,
        pvs.vendor_site_name
    FROM
        po_headers poh
        JOIN po_lines pol ON poh.po_header_id = pol.po_header_id
        JOIN po_agents pa ON poh.agent_id = pa.po_agent_id
        JOIN mtl_items mi ON pol.item_id = mi.item_id
        JOIN po_vendor_sites pvs ON poh.vendor_site_id = pvs.vendor_site_id
        JOIN po_vendors pv ON pvs.vendor_id = pv.vendor_id
        LEFT JOIN ap_invoice_lines ail ON pol.po_line_id = ail.po_line_id
        LEFT JOIN ap_invoice_headers aih ON ail.invoice_id = aih.invoice_id
        LEFT JOIN ap_agents aa ON aih.agent_id = aa.ap_agent_id
        LEFT JOIN lookup_dtl ld_invoice ON ld_invoice.lookup_dtl_id = aih.status
    WHERE
        COALESCE(aih.last_updated_date, now()) > '2021-01-01'
        OR COALESCE(ail.last_updated_date, now()) > '2021-01-01'
        OR COALESCE(poh.last_updated_date, now()) > '2021-01-01'
        OR COALESCE(pol.last_updated_date, now()) > '2021-01-01'
"""

sql_dw_clean_staging_purchase_invoices = """
    DELETE FROM staging.stg_purchase_invoices
        WHERE date_trunc('day', created_date) = date_trunc('day', %s);
"""

sql_dw_insert_staging_purchase_invoices = """
    INSERT INTO staging.stg_purchase_invoices (
        po_number,
        po_line_number,
        item_name,
        po_agent_full_name,
        item_payment_status,
        invoice_number,
        invoice_status,
        invoice_created_date,
        invoice_received_date,
        invoice_cancelled_date,
        invoice_paid_date,
        invoice_currency_code,
        payment_currency_code,
        amount_invoiced,
        amount_paid,
        invoice_line_description,
        ap_agent_full_name,
        liability_flag,
        vendor_name,
        vendor_site_name)
    VALUES (
        %s,
        %s,
        %s,
        %s,
        %s,
        %s,
        %s,
        %s,
        %s,
        %s,
        %s,
        %s,
        %s,
        %s,
        %s,
        %s,
        %s,
        %s,
        %s,
        %s)
"""

Clear the staging rows that were created today.

In [None]:
cur_dw.execute(sql_dw_clean_staging_purchase_invoices, (date.today(),))

Fetch purchase invoices from data source, and insert into staging table. 

In [None]:
# The cutoff date ('2021-01-01') is hardcoded inside the query because this example loads all the data.
# The query has no %s placeholders, so no parameters are passed here.
cur_source.execute(sql_source_fetch_purchase_invoices)
res_source = cur_source.fetchall()

psycopg2.extras.execute_batch(cur_dw, sql_dw_insert_staging_purchase_invoices, res_source)

print("Inserted {} purchase-invoices data into staging\n".format(len(res_source)))

## Fact-Dimension tables

<img src="img/dw_factdim.png" align="left"/>

### Populate `dim_dates` & update staging fact

Populate `dim_dates`. Each date appears only once in this table (`full_date` is the conflict key), so the script can safely generate dates well ahead, for example about 10 years (3650 days) starting from 2022-01-01.

In [None]:
sql_dim_dates_init_data = """
    DO $$
    DECLARE
        counter integer := 0;
        generated_date date;
    BEGIN
        LOOP
            EXIT
            WHEN counter = 3650;
            SELECT
                date '2022-01-01' + counter INTO generated_date;
            INSERT INTO dw.dim_dates (
                full_date,
                "date",
                "month",
                "month_name",
                "year",
                quarter,
                day_of_week)
            VALUES (
                generated_date,
                EXTRACT(
                    DAY FROM generated_date),
                EXTRACT(
                    MONTH FROM generated_date),
                to_char(
                    generated_date, 'Month'),
                EXTRACT(
                    YEAR FROM generated_date),
                EXTRACT(
                    quarter FROM generated_date),
                to_char(
                    generated_date, 'Day'))
            ON CONFLICT(full_date) DO NOTHING;

            counter := counter + 1;
        END LOOP;
    END;
    $$;
"""

In [None]:
cur_dw.execute(sql_dim_dates_init_data)
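
To see what the loop produces, the same date attributes can be computed in plain Python. This is only an illustrative sketch (the function name `dim_date_rows` is an assumption), and it differs from the SQL in one detail: PostgreSQL's `to_char` patterns `'Month'` and `'Day'` blank-pad the names to 9 characters, while the names here are unpadded.

```python
from datetime import date, timedelta

MONTH_NAMES = ["January", "February", "March", "April", "May", "June", "July",
               "August", "September", "October", "November", "December"]
# date.weekday() returns 0 for Monday
DAY_NAMES = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday",
             "Saturday", "Sunday"]


def dim_date_rows(start, days):
    """Yield (full_date, date, month, month_name, year, quarter, day_of_week)."""
    for offset in range(days):
        d = start + timedelta(days=offset)
        yield (
            d,
            d.day,
            d.month,
            MONTH_NAMES[d.month - 1],
            d.year,
            (d.month - 1) // 3 + 1,  # quarter 1..4
            DAY_NAMES[d.weekday()],
        )


rows = list(dim_date_rows(date(2022, 1, 1), 3650))
print(rows[0])
print(rows[-1][0])
```

Note that 3650 days is slightly less than 10 calendar years because of the leap days in 2024 and 2028, so the last generated date falls in late December 2031.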

After `dim_dates` is populated, update the date-related foreign keys in `stg_purchase_invoices` so that they refer to the surrogate PK (`dim_dates.date_id`).  

*The order does not matter.*  

This sample starts with `stg_purchase_invoices.dim_invoice_created_date_id`.

In [None]:
sql_update_stg_invoice_created_date = """
    UPDATE
        staging.stg_purchase_invoices stg
    SET
        dim_invoice_created_date_id = dd.date_id
    FROM
        dw.dim_dates dd
    WHERE
        stg.invoice_created_date IS NOT NULL
        AND date_trunc('day', stg.invoice_created_date) = dd.full_date;
"""

In [None]:
cur_dw.execute(sql_update_stg_invoice_created_date)
print("Updated {} rows on staging".format(cur_dw.rowcount))

Then, update `stg_purchase_invoices.dim_invoice_received_date_id` with the matching `dim_dates.date_id`.

In [None]:
sql_update_stg_invoice_received_date = """
    UPDATE
        staging.stg_purchase_invoices stg
    SET
        dim_invoice_received_date_id = dd.date_id
    FROM
        dw.dim_dates dd
    WHERE
        stg.invoice_received_date IS NOT NULL
        AND date_trunc('day', stg.invoice_received_date) = dd.full_date;
"""

In [None]:
cur_dw.execute(sql_update_stg_invoice_received_date)
print("Updated {} rows on staging".format(cur_dw.rowcount))

Then, update `stg_purchase_invoices.dim_invoice_paid_date_id` with related `dim_dates.date_id`.

In [None]:
sql_update_stg_invoice_paid_date = """
    UPDATE
        staging.stg_purchase_invoices stg
    SET
        dim_invoice_paid_date_id = dd.date_id
    FROM
        dw.dim_dates dd
    WHERE
        stg.invoice_paid_date IS NOT NULL
        AND date_trunc('day', stg.invoice_paid_date) = dd.full_date;
"""

In [None]:
cur_dw.execute(sql_update_stg_invoice_paid_date)
print("Updated {} rows on staging".format(cur_dw.rowcount))

Then, update `stg_purchase_invoices.dim_invoice_cancelled_date_id` with related `dim_dates.date_id`.

In [None]:
sql_update_stg_invoice_cancelled_date = """
    UPDATE
        staging.stg_purchase_invoices stg
    SET
        dim_invoice_cancelled_date_id = dd.date_id
    FROM
        dw.dim_dates dd
    WHERE
        stg.invoice_cancelled_date IS NOT NULL
        AND date_trunc('day', stg.invoice_cancelled_date) = dd.full_date;
"""

In [None]:
cur_dw.execute(sql_update_stg_invoice_cancelled_date)
print("Updated {} rows on staging".format(cur_dw.rowcount))
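
The four date updates differ only in the column name, so they could also be generated from one template. The sketch below is an optional alternative, not the notebook's approach; `DATE_COLUMNS`, `UPDATE_TEMPLATE`, and `build_date_updates` are hypothetical names. String formatting is acceptable here only because the column names come from a fixed list in the code, never from user input.

```python
DATE_COLUMNS = [
    "invoice_created_date",
    "invoice_received_date",
    "invoice_paid_date",
    "invoice_cancelled_date",
]

UPDATE_TEMPLATE = """
    UPDATE
        staging.stg_purchase_invoices stg
    SET
        dim_{col}_id = dd.date_id
    FROM
        dw.dim_dates dd
    WHERE
        stg.{col} IS NOT NULL
        AND date_trunc('day', stg.{col}) = dd.full_date;
"""


def build_date_updates(columns=DATE_COLUMNS):
    """Return one UPDATE statement per staging date column."""
    return {col: UPDATE_TEMPLATE.format(col=col) for col in columns}


date_updates = build_date_updates()

# Intended usage with the data warehouse cursor from the Connection section:
# for col, sql in date_updates.items():
#     cur_dw.execute(sql)
#     print("Updated {} rows on staging for {}".format(cur_dw.rowcount, col))
print(date_updates["invoice_paid_date"])
```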

### Populate `dim_vendors` & update staging fact

Upsert the data as-is from staging into the dimension. The sample data is already clean, so no further transformation is needed.

In [None]:
sql_dim_vendors = """
    INSERT INTO dw.dim_vendors (
        vendor_name,
        vendor_site) ( SELECT DISTINCT
            vendor_name,
            vendor_site
        FROM
            staging.stg_vendors)
    ON CONFLICT
        DO NOTHING;
"""

In [None]:
cur_dw.execute(sql_dim_vendors)
print("Updated {} rows on dimension table".format(cur_dw.rowcount))

Now that we have up-to-date data on `dim_vendors`, update the `stg_purchase_invoices.dim_vendor_id` by referring to `dim_vendors.vendor_id`  

In [None]:
sql_update_stg_vendors = """
    UPDATE
        staging.stg_purchase_invoices stg
    SET
        dim_vendor_id = dv.vendor_id
    FROM
        dw.dim_vendors dv
    WHERE
        stg.vendor_name = dv.vendor_name
        AND stg.vendor_site_name = dv.vendor_site;
"""

In [None]:
cur_dw.execute(sql_update_stg_vendors)
print("Updated {} rows on staging".format(cur_dw.rowcount))
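
The two steps above (insert unseen vendors, then copy the surrogate key back to staging) can be pictured with plain Python dictionaries. This is a conceptual sketch only: the function names and sample values are made up, and it assumes that `dim_vendors` has a unique constraint on `(vendor_name, vendor_site)`, which is what `ON CONFLICT DO NOTHING` needs in order to skip existing rows.

```python
from itertools import count


def upsert_dimension(dimension, natural_keys, id_seq):
    """Add unseen natural keys; leave existing ones untouched (ON CONFLICT DO NOTHING)."""
    for key in natural_keys:
        if key not in dimension:
            dimension[key] = next(id_seq)  # assign a new surrogate key
    return dimension


def resolve_vendor_keys(staging_rows, dimension):
    """Attach the surrogate key to each staging row (the UPDATE ... FROM step)."""
    for row in staging_rows:
        natural_key = (row["vendor_name"], row["vendor_site_name"])
        row["dim_vendor_id"] = dimension.get(natural_key)
    return staging_rows


dim_vendors = {}
vendor_id_seq = count(1)
stg_vendors = [("Acme", "Jakarta"), ("Acme", "Jakarta"), ("Globex", "Bandung")]
upsert_dimension(dim_vendors, stg_vendors, vendor_id_seq)
upsert_dimension(dim_vendors, stg_vendors, vendor_id_seq)  # rerun changes nothing

stg_purchase_invoices = [
    {"po_number": "PO-1", "vendor_name": "Acme", "vendor_site_name": "Jakarta"},
    {"po_number": "PO-2", "vendor_name": "Globex", "vendor_site_name": "Bandung"},
]
resolve_vendor_keys(stg_purchase_invoices, dim_vendors)
print(dim_vendors)
print([row["dim_vendor_id"] for row in stg_purchase_invoices])
```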

### Populate `dim_purchase_lines` & update staging fact

In [None]:
sql_dim_purchase_lines = """
    INSERT INTO dw.dim_purchase_lines (
        po_number,
        po_line_number,
        item_name,
        item_payment_status,
        agent_full_name) ( SELECT DISTINCT
            po_number,
            po_line_number,
            item_name,
            item_payment_status,
            po_agent_full_name
        FROM
            staging.stg_purchase_invoices spi)
    ON CONFLICT (po_number,
        po_line_number)
        DO UPDATE SET
            item_name = excluded.item_name,
            item_payment_status = excluded.item_payment_status;
"""

In [None]:
cur_dw.execute(sql_dim_purchase_lines)
print("Updated {} rows on dimension table".format(cur_dw.rowcount))
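
The `ON CONFLICT ... DO UPDATE` clause above refreshes only `item_name` and `item_payment_status`; `agent_full_name` keeps the value from the first insert. The dictionary-based sketch below mimics that behaviour so the effect is easy to see. The function name and the sample values are invented for illustration.

```python
def upsert_purchase_lines(dimension, staged_rows):
    """Insert new (po_number, po_line_number) keys; refresh two columns on conflict."""
    for row in staged_rows:
        key = (row["po_number"], row["po_line_number"])
        if key not in dimension:
            # New key: behaves like a plain INSERT
            dimension[key] = {
                "item_name": row["item_name"],
                "item_payment_status": row["item_payment_status"],
                "agent_full_name": row["po_agent_full_name"],
            }
        else:
            # Existing key: only the columns listed in DO UPDATE SET change
            dimension[key]["item_name"] = row["item_name"]
            dimension[key]["item_payment_status"] = row["item_payment_status"]
    return dimension


dim_purchase_lines = {}
upsert_purchase_lines(dim_purchase_lines, [
    {"po_number": "PO-1", "po_line_number": 1, "item_name": "Laptop",
     "item_payment_status": "Not Invoiced", "po_agent_full_name": "Ana Lee"},
])
# A later load sees the same PO line with a new status and a different agent
upsert_purchase_lines(dim_purchase_lines, [
    {"po_number": "PO-1", "po_line_number": 1, "item_name": "Laptop",
     "item_payment_status": "Paid", "po_agent_full_name": "Budi Tan"},
])
print(dim_purchase_lines[("PO-1", 1)])
```

If the agent should also follow later changes, the column would have to be added to the `DO UPDATE SET` list; whether that is desired depends on the reporting requirements.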

Now that we have up-to-date data on `dim_purchase_lines`, update the `stg_purchase_invoices.dim_purchase_line_id` by referring to `dim_purchase_lines.purchase_line_id`  

In [None]:
sql_update_stg_purchase_lines = """
    UPDATE
        staging.stg_purchase_invoices spi
    SET
        dim_purchase_line_id = dpl.purchase_line_id
    FROM
        dw.dim_purchase_lines dpl
    WHERE
        spi.po_number = dpl.po_number
        AND spi.po_line_number = dpl.po_line_number;
"""

In [None]:
cur_dw.execute(sql_update_stg_purchase_lines)
print("Updated {} rows on staging".format(cur_dw.rowcount))

### Populate `dim_invoice_headers` & update staging fact

In [None]:
sql_dim_invoice_headers = """
    INSERT INTO dw.dim_invoice_headers (
        invoice_number,
        status,
        invoice_currency_code,
        payment_currency_code,
        agent_full_name,
        liability_flag) ( SELECT DISTINCT
            invoice_number,
            invoice_status,
            invoice_currency_code,
            payment_currency_code,
            ap_agent_full_name,
            liability_flag
        FROM
            staging.stg_purchase_invoices spi
        WHERE
            invoice_number IS NOT NULL)
    ON CONFLICT (invoice_number)
        DO UPDATE SET
            status = excluded.status,
            invoice_currency_code = excluded.invoice_currency_code,
            payment_currency_code = excluded.payment_currency_code,
            agent_full_name = excluded.agent_full_name,
            liability_flag = excluded.liability_flag;
"""

In [None]:
cur_dw.execute(sql_dim_invoice_headers)
print("Updated {} rows on dimension table".format(cur_dw.rowcount))

Now that we have up-to-date data on `dim_invoice_headers`, update the `stg_purchase_invoices.dim_invoice_header_id` by referring to `dim_invoice_headers.invoice_header_id`  

In [None]:
sql_update_stg_invoice_headers = """
    UPDATE
        staging.stg_purchase_invoices spi
    SET
        dim_invoice_header_id = dih.invoice_header_id
    FROM
        dw.dim_invoice_headers dih
    WHERE
        dih.invoice_number = spi.invoice_number;
"""

In [None]:
cur_dw.execute(sql_update_stg_invoice_headers)
print("Updated {} rows on staging".format(cur_dw.rowcount))

### Populate `dim_invoice_lines` & update staging fact

In [None]:
sql_dim_invoice_lines = """
    INSERT INTO dw.dim_invoice_lines (
        invoice_line_description) ( SELECT DISTINCT
            invoice_line_description
        FROM
            staging.stg_purchase_invoices spi
        WHERE
            invoice_line_description IS NOT NULL);
"""

In [None]:
cur_dw.execute(sql_dim_invoice_lines)
print("Updated {} rows on dimension table".format(cur_dw.rowcount))

Now that we have up-to-date data on `dim_invoice_lines`, update the `stg_purchase_invoices.dim_invoice_line_id` by referring to `dim_invoice_lines.invoice_line_id`  

In [None]:
sql_update_stg_invoice_lines = """
    UPDATE
        staging.stg_purchase_invoices spi
    SET
        dim_invoice_line_id = dil.invoice_line_id
    FROM
        dw.dim_invoice_lines dil
    WHERE
        spi.invoice_line_description = dil.invoice_line_description;
"""

In [None]:
cur_dw.execute(sql_update_stg_invoice_lines)
print("Updated {} rows on staging".format(cur_dw.rowcount))

### Populate fact tables from staging

At this point, the staging table already holds FK references to all dimensions, so we can populate `fact_purchase_invoices`.

In [None]:
sql_fact_purchase_invoices = """
    INSERT INTO dw.fact_purchase_invoices (
        amount_invoiced,
        amount_paid,
        received_date_id,
        created_date_id,
        cancelled_date_id,
        paid_date_id,
        purchase_line_id,
        vendor_id,
        invoice_header_id,
        invoice_line_id) (
        SELECT
            amount_invoiced,
            amount_paid,
            dim_invoice_received_date_id,
            dim_invoice_created_date_id,
            dim_invoice_cancelled_date_id,
            dim_invoice_paid_date_id,
            dim_purchase_line_id,
            dim_vendor_id,
            dim_invoice_header_id,
            dim_invoice_line_id
        FROM
            staging.stg_purchase_invoices)
    ON CONFLICT (purchase_line_id)
        DO UPDATE SET
            amount_invoiced = excluded.amount_invoiced,
            amount_paid = excluded.amount_paid;
"""

In [None]:
cur_dw.execute(sql_fact_purchase_invoices)
print("Updated {} rows on fact table".format(cur_dw.rowcount))
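
Before or after loading the fact table, it can be useful to check that the staging rows actually resolved their dimension keys. The sketch below is a suggestion rather than part of the original flow: `count_missing_keys` and `REQUIRED_KEYS` are hypothetical names. It checks only `dim_purchase_line_id` and `dim_vendor_id`, because the staging query inner-joins PO lines and vendors, while the invoice-related keys may legitimately be NULL (the invoices are LEFT JOINed). The demo uses an in-memory SQLite database attached as `staging` so that it runs without PostgreSQL.

```python
import sqlite3

REQUIRED_KEYS = ("dim_purchase_line_id", "dim_vendor_id")


def count_missing_keys(cur, columns=REQUIRED_KEYS):
    """Count staging rows whose required dimension key is still NULL."""
    missing = {}
    for col in columns:
        # Column names come from the fixed tuple above, not from user input
        cur.execute(
            "SELECT COUNT(*) FROM staging.stg_purchase_invoices "
            "WHERE {} IS NULL".format(col)
        )
        missing[col] = cur.fetchone()[0]
    return missing


# --- Demo with an in-memory SQLite stand-in for the staging schema ---
conn = sqlite3.connect(":memory:")
conn.execute("ATTACH DATABASE ':memory:' AS staging")
conn.execute(
    "CREATE TABLE staging.stg_purchase_invoices ("
    "po_number TEXT, dim_purchase_line_id INTEGER, dim_vendor_id INTEGER)"
)
conn.executemany(
    "INSERT INTO staging.stg_purchase_invoices VALUES (?, ?, ?)",
    [("PO-1", 1, 1), ("PO-2", 2, None)],  # PO-2 has an unresolved vendor
)
missing = count_missing_keys(conn.cursor())
print(missing)
```

With the notebook's own connection, the call would be `count_missing_keys(cur_dw)`; any non-zero count points to a dimension lookup that did not match.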

## Data Mart for Finance

<img src="img/dw_datamart.png" align="left"/>

Make sure you have already downloaded `data-warehouse-finance-datamart-ddl.sql` from the last lecture of the course and run it on the **data warehouse database instance**.  

## Data Visualization Sample

One free visualization tool is [Power BI Desktop](https://powerbi.microsoft.com/en-us/downloads/). However, if your data warehouse is a cloud-hosted SQL database, connecting to it from [Power BI Desktop](https://powerbi.microsoft.com/en-us/downloads/) might not be straightforward.  
The following steps can be used to connect to a cloud PostgreSQL instance:
  1. Download the [PostgreSQL ODBC driver](https://www.postgresql.org/ftp/odbc/versions/msi/) that matches the PostgreSQL version you use.
  2. Install the ODBC driver.
  3. Open *Control Panel > Administrative Tools > Set up ODBC data sources (64 bit)*.
  4. Add a new ODBC data source for *PostgreSQL Unicode (x64)*.
  5. Enter the connection details and credentials. For example, name it `my-postgresql-odbc`.
  6. Test and save it.
  
----
  
Download and open [Power BI Desktop](https://powerbi.microsoft.com/en-us/downloads/).
  1. *Get data* > More > ODBC
  2. Select the ODBC data source you have just created (e.g. `my-postgresql-odbc`)