# Inspect DBT Models and Snapshots

In [None]:
import os
from pathlib import Path

from dotenv import load_dotenv
from sqlalchemy import create_engine
from snowflake.sqlalchemy import URL

In [None]:
PROJ_ROOT = Path().resolve().parents[3]
env_file_dir = PROJ_ROOT / '.env'
_ = load_dotenv(env_file_dir, verbose=True)

## About

Perform the following

1. View contents of Greenery DuckDB database that were created in my personal Snowflake schema using `dbt run` for project during weeks 1, 2 and 3
2. Identify products with weekly changes using DBT sbapshots

### Notes

1. This notebook supports <kbd>Run</kbd> > <kbd>Run All Cells</kbd>.

## User Inputs

In [None]:
schema_name = os.getenv("UPLIMIT_SNOWFLAKE_SCHEMA")

In [None]:
engine = create_engine(
    URL(
        drivername="driver",
        account=os.getenv("UPLIMIT_SNOWFLAKE_ACCOUNT"),
        user=os.getenv("UPLIMIT_SNOWFLAKE_USER"),
        password=os.getenv("UPLIMIT_SNOWFLAKE_PASS"),
        warehouse=os.getenv("UPLIMIT_SNOWFLAKE_WAREHOUSE"),
        role=os.getenv("UPLIMIT_SNOWFLAKE_ROLE"),
        database=os.getenv("UPLIMIT_SNOWFLAKE_DB_NAME"),
        schema=schema_name,
    )
)

## Connect

Load Jupyter SQL extension

In [None]:
%load_ext sql

Set the maximum number of rows to be displayed to `None` (shows all rows)

In [None]:
%config SqlMagic.displaylimit = None

Connect to DuckDB database

In [None]:
%sql engine --alias connection

## Explore Data

### Show All Data Model Tables in Database

In [None]:
%%sql
WITH tables_annotated AS (
    SELECT table_catalog,
           table_schema,
           table_name,
           table_type,
           is_insertable_into,
           is_typed,
           (
               CASE
                   WHEN (
                        table_name LIKE 'FCT_PRODUCTS%'
                        OR table_name = 'FCT_ORDERS'
                        OR table_name LIKE 'FCT_USER_%'
                        OR table_name LIKE 'FCT_PROMO_%'
                        OR table_name LIKE 'FCT_CONVERSION_RATE'
                    )
                    THEN True
                    ELSE False
               END
           ) AS is_model
    FROM information_schema.tables
    WHERE table_type LIKE '%TABLE'
    AND table_schema = '{{schema_name.upper()}}'
    ORDER BY table_name DESC, table_catalog, table_name
)
SELECT *
FROM tables_annotated
WHERE is_model = True

### Show All Data Model Views in Database

In [None]:
%%sql
WITH views_annotated AS (
    SELECT table_catalog,
           table_schema,
           table_name,
           table_type,
           is_insertable_into,
           is_typed,
           (
               CASE
                   WHEN (
                        table_name LIKE 'STG_POSTGRES_%'
                        OR table_name LIKE 'INT_PRODUCT_%'
                        OR table_name LIKE 'INT_ORDERS_%'
                        OR table_name LIKE 'INT_EVENTS_%'
                        OR table_name = 'FCT_PRODUCTS'
                        OR table_name = 'FCT_PRODUCTS_DAILY'
                    )
                    THEN True
                    ELSE False
               END
           ) AS is_model
    FROM information_schema.tables
    WHERE table_type LIKE '%VIEW'
    AND table_schema = '{{schema_name.upper()}}'
    ORDER BY table_name DESC, is_model, table_catalog, table_name, table_schema
)
SELECT *
FROM views_annotated
WHERE is_model = True

## Queries

### Snapshots

Snapshot table for monitoring changes to product inventory over time

In [None]:
%%sql
DESCRIBE inventory_snapshot

Show the snapshot table

In [None]:
%%sql
SELECT *
FROM inventory_snapshot
LIMIT 4

#### Changes from Week 1 to 2

Show changes in this snapshot table from week 1 to week 2

In [None]:
%%sql
WITH snapshot_updated_at AS (
    SELECT DISTINCT(dbt_updated_at) AS dbt_updated_at
    FROM inventory_snapshot
),
snapshot_runtimes AS (
    SELECT *,
           ROW_NUMBER() OVER(ORDER BY dbt_updated_at) AS week_number
    FROM snapshot_updated_at
),
inventory_snapshot_metadata AS (
    SELECT * EXCLUDE(price, dbt_scd_id)
    FROM inventory_snapshot
    LEFT JOIN snapshot_runtimes USING (dbt_updated_at)
),
inventory_changes AS (
    SELECT *,
           LAG(inventory) OVER(
               PARTITION BY product_id ORDER BY week_number
           ) AS inventory_previous,
           (inventory-inventory_previous) AS inventory_changed
    FROM inventory_snapshot_metadata
    WHERE dbt_valid_to IS NOT NULL
),
inventory_changes_week_two AS (
    SELECT dbt_updated_at,
           product_id,
           name,
           inventory,
           dbt_valid_from,
           dbt_valid_to,
           week_number,
           inventory_changed
    FROM inventory_changes
    WHERE week_number = 2
)
SELECT *
FROM inventory_changes_week_two

#### Changes from Week 2 to 3

Show changes in this snapshot table from week 2 to week 3

In [None]:
%%sql
WITH snapshot_updated_at AS (
    SELECT DISTINCT(dbt_updated_at) AS dbt_updated_at
    FROM inventory_snapshot
),
snapshot_runtimes AS (
    SELECT *,
           ROW_NUMBER() OVER(ORDER BY dbt_updated_at) AS week_number
    FROM snapshot_updated_at
),
inventory_snapshot_metadata AS (
    SELECT * EXCLUDE(price, dbt_scd_id)
    FROM inventory_snapshot
    LEFT JOIN snapshot_runtimes USING (dbt_updated_at)
),
inventory_changes AS (
    SELECT *,
           LAG(inventory) OVER(
               PARTITION BY product_id ORDER BY week_number
           ) AS inventory_previous,
           (inventory-inventory_previous) AS inventory_changed
    FROM inventory_snapshot_metadata
    WHERE week_number IN (2, 3)
),
inventory_changes_week_three AS (
    SELECT dbt_updated_at,
           product_id,
           name,
           inventory,
           dbt_valid_from,
           dbt_valid_to,
           week_number,
           inventory_changed
    FROM inventory_changes
    WHERE week_number = 3
)
SELECT *
FROM inventory_changes_week_three

## Disconnect

Close connection

In [None]:
%sql --close connection