# dbt Build & Database Inspection

This notebook provides:
1. **dbt commands** - Run and test your dbt models
2. **SQL linting** - Check SQL style and formatting with sqlfluff
3. **Database inspection** - Explore schemas, tables, views, and SQL definitions

## Part 1: dbt Commands

Run dbt commands to build and test your models before inspecting the database.

In [68]:
import subprocess
import os
from pathlib import Path

# Set project paths
project_root = Path('..').resolve()
dbt_project_dir = project_root / 'dbt' / 'ecommerce_analytics'
dbt_profiles_dir = project_root / 'dbt'

print(f"Project root: {project_root}")
print(f"dbt project: {dbt_project_dir}")
print(f"dbt profiles: {dbt_profiles_dir}")

Project root: D:\repos\dbt-ml-analytics-pipeline
dbt project: D:\repos\dbt-ml-analytics-pipeline\dbt\ecommerce_analytics
dbt profiles: D:\repos\dbt-ml-analytics-pipeline\dbt


In [69]:
def run_dbt_command(command, *args):
    """
    Run a dbt command and display the output.
    
    Args:
        command: dbt command (e.g., 'run', 'test', 'build')
        *args: Additional arguments (e.g., '--select', 'stg_orders')
    
    Examples:
        run_dbt_command('run')
        run_dbt_command('run', '--select', 'staging')
        run_dbt_command('test', '--select', 'fct_orders')
    """
    # Build the command
    cmd = [
        'dbt', command,
        '--profiles-dir', str(dbt_profiles_dir),
        '--project-dir', str(dbt_project_dir)
    ]
    cmd.extend(args)
    
    print(f"Running: {' '.join(cmd)}")
    print("=" * 80)
    
    # Run the command
    result = subprocess.run(
        cmd,
        cwd=project_root,
        capture_output=True,
        text=True
    )
    
    # Display output
    print(result.stdout)
    if result.stderr:
        print("STDERR:", result.stderr)
    
    print("=" * 80)
    print(f"Return code: {result.returncode}")
    
    return result

print("dbt command helper loaded successfully!")

dbt command helper loaded successfully!
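
`run_dbt_command` returns the `subprocess.CompletedProcess` without inspecting it, so a failed dbt invocation does not stop the notebook. A minimal sketch of a guard that raises on a non-zero return code; `ensure_success` is a hypothetical helper name, not part of dbt:

```python
import subprocess


def ensure_success(result: subprocess.CompletedProcess) -> subprocess.CompletedProcess:
    """Raise if the wrapped command exited with a non-zero return code.

    Hypothetical helper for illustration; pass it the object returned by
    run_dbt_command.
    """
    if result.returncode != 0:
        # Keep only the tail of the output so the error message stays readable
        tail = (result.stdout or "")[-500:]
        raise RuntimeError(
            f"Command failed with return code {result.returncode}:\n{tail}"
        )
    return result
```

With this in place, a call such as `ensure_success(run_dbt_command('build'))` would stop the cell on failure instead of only printing the return code.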


### Common dbt Commands

Run these cells to execute dbt commands. Uncomment the ones you want to run.

In [70]:
# Run all models
# run_dbt_command('run')

In [71]:
# Run only staging models
# run_dbt_command('run', '--select', 'staging')

In [72]:
# Run a specific model and everything downstream of it (its descendants)
# run_dbt_command('run', '--select', 'stg_orders+')
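
The trailing `+` in `stg_orders+` is one of dbt's graph operators: a `+` after a model name selects its descendants, and a `+` before it selects its ancestors. A small sketch of composing such selectors; `build_selector` is a hypothetical convenience function, not a dbt API:

```python
def build_selector(model: str, upstream: bool = False, downstream: bool = False) -> str:
    """Compose a dbt node selector using the `+` graph operators."""
    prefix = "+" if upstream else ""    # +model: the model and its ancestors
    suffix = "+" if downstream else ""  # model+: the model and its descendants
    return f"{prefix}{model}{suffix}"


# Selector strings that could be passed after '--select'
print(build_selector("stg_orders", downstream=True))
print(build_selector("fct_orders", upstream=True))
```

The resulting string can be handed to the helper, for example `run_dbt_command('run', '--select', build_selector('stg_orders', downstream=True))`.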

In [73]:
# Run all tests
# run_dbt_command('test')

In [74]:
# Test a specific model
# run_dbt_command('test', '--select', 'fct_orders')

In [75]:
# Build everything: dbt build runs models, tests, seeds, and snapshots in DAG order
run_dbt_command('build')

Running: dbt build --profiles-dir D:\repos\dbt-ml-analytics-pipeline\dbt --project-dir D:\repos\dbt-ml-analytics-pipeline\dbt\ecommerce_analytics
17:59:58  Running with dbt=1.8.9
17:59:58  Registered adapter: duckdb=1.10.0
There are 2 unused configuration paths:
- seeds.ecommerce_analytics
- snapshots.ecommerce_analytics
17:59:59  Found 17 models, 70 data tests, 9 sources, 444 macros
17:59:59
17:59:59  Concurrency: 4 threads (target='dev')
17:59:59
17:59:59  1 of 87 START sql view model staging.stg_geolocation ........................... [RUN]
17:59:59  2 of 87 START sql view model staging.stg_order_payments ........................ [RUN]
17:59:59  3 of 87 START sql view model staging.stg_order_reviews ......................... [RUN]
17:59:59  4 of 87 START test source_accepted_values_raw_olist_orders_dataset_order_status__delivered__shipped__canceled__unavailable__invoiced__processing__created__approved  [RUN]
17:59:59  4 of 87 PASS sour
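
Reading 87 log lines by eye does not scale. dbt also writes a `run_results.json` artifact to the project's `target/` directory after each invocation. A sketch of tallying node statuses from it, assuming the default `target/` location; `load_run_results` and `count_statuses` are hypothetical helper names:

```python
import json
from collections import Counter
from pathlib import Path


def load_run_results(path) -> dict:
    """Read a dbt run_results.json file into a dictionary."""
    return json.loads(Path(path).read_text(encoding="utf-8"))


def count_statuses(run_results: dict) -> Counter:
    """Count executed nodes per status (e.g. 'success', 'pass', 'fail')."""
    # Each entry in "results" describes one executed node (model, test, ...)
    return Counter(entry["status"] for entry in run_results["results"])
```

In this notebook the call would look like `count_statuses(load_run_results(dbt_project_dir / 'target' / 'run_results.json'))`, provided the project has not overridden the target path.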



---

## Part 2: SQL Linting with sqlfluff

Check SQL code quality and formatting using sqlfluff.

In [76]:
def run_sqlfluff(command, *args):
    """
    Run sqlfluff commands on dbt models.
    
    Args:
        command: sqlfluff command ('lint', 'fix')
        *args: Additional arguments (e.g., 'models/staging')
    
    Examples:
        run_sqlfluff('lint')  # Lint all models
        run_sqlfluff('lint', 'models/staging')  # Lint staging models
        run_sqlfluff('fix', 'models/marts/fct_orders.sql')  # Fix a specific file
    """
    # Build the command
    cmd = ['sqlfluff', command]
    
    if args:
        # If specific paths provided, use them
        cmd.extend(args)
    else:
        # Default to all models
        cmd.append('models')
    
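    # Add common flags (applied to every linted path, not only staging)
    cmd.extend([
        '--dialect', 'duckdb',
        # L034 (ST06 in sqlfluff 2.x+) is the SELECT column-ordering rule;
        # the rule that flags "SELECT *" is L044 (AM04)
        '--exclude-rules', 'L034'
    ])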
    
    print(f"Running: {' '.join(cmd)}")
    print(f"Working directory: {dbt_project_dir}")
    print("=" * 80)
    
    # Run the command
    result = subprocess.run(
        cmd,
        cwd=dbt_project_dir,
        capture_output=True,
        text=True
    )
    
    # Display output
    print(result.stdout)
    if result.stderr:
        print("STDERR:", result.stderr)
    
    print("=" * 80)
    print(f"Return code: {result.returncode}")
    
    return result

print("sqlfluff helper loaded successfully!")

sqlfluff helper loaded successfully!
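
The helper prints sqlfluff's human-readable report. For a summary by rule, `sqlfluff lint` can emit JSON via `--format json`. A sketch of tallying rule codes from that output, assuming the record layout of recent sqlfluff releases (a list of per-file records, each with a `violations` list whose items carry a `code`); `count_violations` is a hypothetical helper:

```python
import json
from collections import Counter


def count_violations(lint_json: str) -> Counter:
    """Tally rule codes from the output of `sqlfluff lint --format json`.

    Assumes a JSON list of per-file records, each with a "violations" list
    whose items carry a "code" key.
    """
    counts = Counter()
    for file_record in json.loads(lint_json):
        for violation in file_record.get("violations", []):
            counts[violation["code"]] += 1
    return counts
```

One possible call is `count_violations(run_sqlfluff('lint', 'models/staging', '--format', 'json').stdout)`. The path has to be passed explicitly, because the helper only defaults to `models` when no extra arguments are given.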


### Common sqlfluff Commands

In [77]:
# Lint all models
# run_sqlfluff('lint')

In [78]:
# Lint only staging models
# run_sqlfluff('lint', 'models/staging')

In [79]:
# Lint a specific file
# run_sqlfluff('lint', 'models/marts/fct_orders.sql')

In [80]:
# Lint entire dbt project (models, macros, analyses, etc.)
# run_sqlfluff('lint', '.')

In [81]:
# Auto-fix linting issues (rewrites files in place; commit or back up first)
# run_sqlfluff('fix', 'models/staging')

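**Note:** The helper passes `--dialect duckdb` and `--exclude-rules L034` on every call, so the exclusion applies to all linted paths, not only staging models. L034 (named ST06 from sqlfluff 2.0) is the SELECT column-ordering rule; the rule that flags `SELECT *` is L044 (AM04). To customize rules persistently, create a `.sqlfluff` config file in the dbt project directory.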
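
A `.sqlfluff` file uses INI syntax with a `[sqlfluff]` section. The sketch below renders a minimal file body that mirrors the two flags the helper passes on the command line; `render_sqlfluff_config` is a hypothetical function, and the values are just this notebook's defaults:

```python
import configparser
import io


def render_sqlfluff_config(dialect: str = "duckdb", exclude_rules: str = "L034") -> str:
    """Render a minimal .sqlfluff file body mirroring the helper's CLI flags."""
    config = configparser.ConfigParser()
    config["sqlfluff"] = {"dialect": dialect, "exclude_rules": exclude_rules}
    buffer = io.StringIO()
    config.write(buffer)
    return buffer.getvalue()


print(render_sqlfluff_config())
```

Writing the returned text to `dbt_project_dir / '.sqlfluff'` would let sqlfluff pick up the same settings when it is run from that directory, including outside the notebook.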

---

## Part 3: Database Inspection

After running dbt, explore the database structure and query the data.

In [82]:
import duckdb
import pandas as pd
import sqlparse
from pathlib import Path

# Connect to database
db_path = Path('../ecommerce.duckdb')
con = duckdb.connect(str(db_path), read_only=True)

print(f"Connected to: {db_path}")
print(f"Database size: {db_path.stat().st_size / 1024 / 1024:.2f} MB")

Connected to: ..\ecommerce.duckdb
Database size: 45.01 MB
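
`Path('../ecommerce.duckdb')` is resolved relative to the notebook's working directory, so the connection cell fails with a fairly generic error if the notebook is started from somewhere else or the database has not been created yet. A sketch of a guard that could run before `duckdb.connect`; `require_database` is a hypothetical helper:

```python
from pathlib import Path


def require_database(path) -> Path:
    """Return the resolved database path, or raise with a hint if it is missing."""
    db_file = Path(path).resolve()
    if not db_file.is_file():
        raise FileNotFoundError(
            f"{db_file} not found; check that the database has been created "
            "and that the notebook runs from the expected directory."
        )
    return db_file
```

Because the helper returns the resolved path, the error message and the "Connected to" line would both show an absolute location rather than `..\ecommerce.duckdb`.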


### 1. Database Schemas

In [83]:
# List all schemas
schemas_df = con.execute("""
    SELECT DISTINCT schema_name 
    FROM information_schema.schemata 
    WHERE schema_name NOT IN ('information_schema', 'pg_catalog', 'temp', 'system')
    ORDER BY schema_name
""").df()

print("Available Schemas:")
display(schemas_df)

Available Schemas:


Unnamed: 0,schema_name
0,intermediate
1,main
2,marts
3,raw
4,staging
5,test_results


### 2. Tables and Views by Schema

In [84]:
# Get all tables and views
objects_df = con.execute("""
    SELECT 
        table_schema as schema,
        table_name as name,
        table_type as type
    FROM information_schema.tables
    WHERE table_schema NOT IN ('information_schema', 'pg_catalog', 'temp', 'system')
    ORDER BY table_schema, table_type, table_name
""").df()

print(f"Total objects: {len(objects_df)}")
display(objects_df)

Total objects: 96


Unnamed: 0,schema,name,type
0,intermediate,int_customer_orders,VIEW
1,intermediate,int_product_performance,VIEW
2,intermediate,int_rfm_scores,VIEW
3,intermediate,int_seller_performance,VIEW
4,marts,dim_customers,VIEW
...,...,...,...
91,test_results,unique_fct_orders_order_key,BASE TABLE
92,test_results,unique_int_customer_orders_customer_unique_id,BASE TABLE
93,test_results,unique_int_product_performance_product_id,BASE TABLE
94,test_results,unique_int_rfm_scores_customer_unique_id,BASE TABLE


In [85]:
# Summary by schema and type
summary = objects_df.groupby(['schema', 'type']).size().reset_index(name='count')
print("\nSummary by Schema and Type:")
display(summary)


Summary by Schema and Type:


Unnamed: 0,schema,type,count
0,intermediate,VIEW,4
1,marts,VIEW,5
2,raw,BASE TABLE,9
3,staging,VIEW,8
4,test_results,BASE TABLE,70


### 3. View SQL Definitions

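Use this cell to view the SQL definition that DuckDB stores for any view. This is DuckDB's normalized `CREATE VIEW` statement rather than dbt's compiled SQL file, so details can differ (for example, an aggregate such as `COUNT(*)` is rendered as `count_star()`).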

In [86]:
def show_view_sql(schema_name, view_name):
    """
    Display the SQL definition for a specific view.
    
    Args:
        schema_name: Schema containing the view (e.g., 'staging', 'intermediate')
        view_name: Name of the view (e.g., 'stg_orders', 'int_customer_orders')
    """
    result = con.execute("""
        SELECT sql 
        FROM duckdb_views() 
        WHERE schema_name = ? AND view_name = ?
    """, [schema_name, view_name]).fetchone()
    
    if result:
        sql = result[0]
        
        # Format the SQL for better readability
        formatted_sql = sqlparse.format(
            sql,
            reindent=True,
            keyword_case='upper',
            indent_width=4
        )
        
        print(f"\n{'='*80}")
        print(f"VIEW: {schema_name}.{view_name}")
        print(f"{'='*80}\n")
        print(formatted_sql)
        print(f"\n{'='*80}\n")
    else:
        print(f"View '{schema_name}.{view_name}' not found.")
        print("\nAvailable views:")
        views = con.execute("""
            SELECT schema_name, view_name 
            FROM duckdb_views() 
            ORDER BY schema_name, view_name
        """).df()
        display(views)

# Example usage:
show_view_sql('staging', 'stg_orders')


VIEW: staging.stg_orders

CREATE VIEW staging.stg_orders AS WITH SOURCE AS
    (SELECT *
     FROM ecommerce.raw.olist_orders_dataset), cleaned AS
    (SELECT order_id,
            customer_id,
            order_status,
            order_purchase_timestamp AS purchased_at,
            order_approved_at AS approved_at,
            order_delivered_carrier_date AS carrier_delivered_at,
            order_delivered_customer_date AS customer_delivered_at,
            order_estimated_delivery_date AS estimated_delivery_date,
            CASE
                WHEN (((order_delivered_customer_date IS NOT NULL)
                       AND (order_estimated_delivery_date IS NOT NULL))) THEN (datediff('day', order_estimated_delivery_date, CAST(order_delivered_customer_date AS DATE)))
                ELSE NULL
            END AS delivery_delay_days,
            COALESCE(((order_delivered_customer_date IS NOT NULL)
                      AND (CAST(order_delivered_customer_date AS DATE) > order_estimate

In [87]:
# Show another view - change these values as needed
show_view_sql('intermediate', 'int_customer_orders')


VIEW: intermediate.int_customer_orders

CREATE VIEW intermediate.int_customer_orders AS WITH orders AS
    (SELECT *
     FROM ecommerce.staging.stg_orders),
                                                     order_items AS
    (SELECT *
     FROM ecommerce.staging.stg_order_items),
                                                     payments AS
    (SELECT *
     FROM ecommerce.staging.stg_order_payments),
                                                     reviews AS
    (SELECT *
     FROM ecommerce.staging.stg_order_reviews),
                                                     customers AS
    (SELECT *
     FROM ecommerce.staging.stg_customers),
                                                     order_totals AS
    (SELECT order_id,
            sum(total_price) AS order_value,
            sum(item_price) AS items_value,
            sum(freight_price) AS freight_value,
            count_star() AS items_count
     FROM order_items
     GROUP BY order_id),
                   

### 4. List All Available Views

In [88]:
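# Get all views with their schemas. duckdb_views() also returns DuckDB's
# built-in catalog views (e.g. main.duckdb_columns), which inflates the count;
# add `AND NOT internal` to the WHERE clause to list only user-defined views.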
views_df = con.execute("""
    SELECT 
        schema_name,
        view_name,
        length(sql) as sql_length
    FROM duckdb_views()
    WHERE schema_name NOT IN ('information_schema', 'pg_catalog')
    ORDER BY schema_name, view_name
""").df()

print(f"Total views: {len(views_df)}")
display(views_df)

Total views: 56


Unnamed: 0,schema_name,view_name,sql_length
0,intermediate,int_customer_orders,3131
1,intermediate,int_product_performance,2754
2,intermediate,int_rfm_scores,1853
3,intermediate,int_seller_performance,3159
4,main,duckdb_columns,92
5,main,duckdb_columns,92
6,main,duckdb_columns,92
7,main,duckdb_constraints,79
8,main,duckdb_constraints,79
9,main,duckdb_constraints,79


### 5. Table Row Counts

In [89]:
# Get row counts for all tables
def get_row_counts():
    tables = con.execute("""
        SELECT table_schema, table_name 
        FROM information_schema.tables
        WHERE table_type = 'BASE TABLE'
        AND table_schema NOT IN ('information_schema', 'pg_catalog', 'temp', 'system')
        ORDER BY table_schema, table_name
    """).fetchall()
    
    results = []
    for schema, table in tables:
        count = con.execute(f'SELECT COUNT(*) FROM "{schema}"."{table}"').fetchone()[0]
        results.append({'schema': schema, 'table': table, 'row_count': count})
    
    return pd.DataFrame(results)

row_counts_df = get_row_counts()
display(row_counts_df)

Unnamed: 0,schema,table,row_count
0,raw,olist_customers_dataset,99441
1,raw,olist_geolocation_dataset,1000163
2,raw,olist_order_items_dataset,112650
3,raw,olist_order_payments_dataset,103886
4,raw,olist_order_reviews_dataset,99224
...,...,...,...
74,test_results,unique_fct_orders_order_key,0
75,test_results,unique_int_customer_orders_customer_unique_id,0
76,test_results,unique_int_product_performance_product_id,0
77,test_results,unique_int_rfm_scores_customer_unique_id,0
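
`get_row_counts` interpolates schema and table names into the SQL text because identifiers cannot be bound as `?` parameters. Wrapping them in double quotes handles most names, but a name that itself contains a double quote would break the statement. A sketch of a quoting helper that doubles embedded quotes, which is the standard SQL escaping rule; `quote_ident` and `count_query` are hypothetical names:

```python
def quote_ident(name: str) -> str:
    """Quote a SQL identifier, doubling any embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def count_query(schema: str, table: str) -> str:
    """Build the row-count statement for one table."""
    return f"SELECT COUNT(*) FROM {quote_ident(schema)}.{quote_ident(table)}"


print(count_query("raw", "olist_orders_dataset"))
```

The names here come from `information_schema.tables`, so the risk is low in this notebook; the helper matters more if schema or table names are ever taken from user input.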


### 6. Table Schema Details

In [90]:
def describe_table(schema_name, table_name):
    """
    Show detailed schema for a specific table or view.
    
    Args:
        schema_name: Schema name (e.g., 'marts', 'staging')
        table_name: Table/view name (e.g., 'fct_orders', 'stg_customers')
    """
    print(f"\nSchema for {schema_name}.{table_name}:")
    print("="*80)
    
    result = con.execute(f'DESCRIBE "{schema_name}"."{table_name}"').df()
    display(result)
    
    # Show sample rows
    print("\nSample data (first 5 rows):")
    sample = con.execute(f'SELECT * FROM "{schema_name}"."{table_name}" LIMIT 5').df()
    display(sample)

# Example usage:
describe_table('marts', 'fct_orders')


Schema for marts.fct_orders:


Unnamed: 0,column_name,column_type,null,key,default,extra
0,order_key,VARCHAR,YES,,,
1,customer_key,VARCHAR,YES,,,
2,order_date_key,DATE,YES,,,
3,order_status,VARCHAR,YES,,,
4,purchased_at,TIMESTAMP,YES,,,
5,approved_at,TIMESTAMP,YES,,,
6,carrier_delivered_at,TIMESTAMP,YES,,,
7,customer_delivered_at,TIMESTAMP,YES,,,
8,estimated_delivery_date,TIMESTAMP,YES,,,
9,delivery_delay_days,BIGINT,YES,,,



Sample data (first 5 rows):


Unnamed: 0,order_key,customer_key,order_date_key,order_status,purchased_at,approved_at,carrier_delivered_at,customer_delivered_at,estimated_delivery_date,delivery_delay_days,...,total_paid,payment_count,max_installments,items_count,unique_products,unique_sellers,is_delivered,is_canceled,is_multi_seller,created_at
0,e481f51cbdc54678b7cc49136f2d6af7,7c396fd4830fd04220f754e42b4e5bff,2017-10-02,delivered,2017-10-02 10:56:33,2017-10-02 11:07:15,2017-10-04 19:55:00,2017-10-10 21:25:13,2017-10-18,-8,...,38.71,3,1,1,1,1,True,False,False,2026-01-03 12:00:05.308000-06:00
1,53cdb2fc8bc7dce0b6741e2150273451,af07308b275d755c9edb36a90c618231,2018-07-24,delivered,2018-07-24 20:41:37,2018-07-26 03:24:27,2018-07-26 14:31:00,2018-08-07 15:27:45,2018-08-13,-6,...,141.46,1,1,1,1,1,True,False,False,2026-01-03 12:00:05.308000-06:00
2,47770eb9100c2d0c44946d9cf07ec65d,3a653a41f6f9fc3d2a113cf8398680e8,2018-08-08,delivered,2018-08-08 08:38:49,2018-08-08 08:55:23,2018-08-08 13:50:00,2018-08-17 18:06:29,2018-09-04,-18,...,179.12,1,3,1,1,1,True,False,False,2026-01-03 12:00:05.308000-06:00
3,949d5b44dbf5de918fe9c16f97b45f8a,7c142cf63193a1473d2e66489a9ae977,2017-11-18,delivered,2017-11-18 19:28:06,2017-11-18 19:45:59,2017-11-22 13:39:59,2017-12-02 00:28:42,2017-12-15,-13,...,72.2,1,1,1,1,1,True,False,False,2026-01-03 12:00:05.308000-06:00
4,ad21c59c0840e6cb83a9ceb5573f8159,72632f0f9dd73dfee390c9b22eb56dd6,2018-02-13,delivered,2018-02-13 21:18:39,2018-02-13 22:20:29,2018-02-14 19:46:34,2018-02-16 18:17:02,2018-02-26,-10,...,28.62,1,1,1,1,1,True,False,False,2026-01-03 12:00:05.308000-06:00


### 7. Quick Data Exploration

In [91]:
# Example: Order status distribution
query = """
    SELECT 
        order_status,
        COUNT(*) as order_count,
        ROUND(AVG(order_value), 2) as avg_value,
        ROUND(SUM(order_value), 2) as total_value
    FROM marts.fct_orders
    GROUP BY order_status
    ORDER BY order_count DESC
"""

result = con.execute(query).df()
print("Order Status Distribution:")
display(result)

Order Status Distribution:


Unnamed: 0,order_status,order_count,avg_value,total_value
0,delivered,96478,159.83,15419773.75
1,shipped,1107,160.01,177129.34
2,canceled,625,169.42,105885.72
3,unavailable,609,3.51,2140.49
4,invoiced,314,219.71,68988.75
5,processing,301,230.55,69394.11
6,created,5,0.0,0.0
7,approved,2,120.54,241.08
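
To put the counts in proportion, each status can be expressed as a share of all orders. A sketch using the counts printed above, hard-coded here so the block runs without the database:

```python
# Order counts copied from the "Order Status Distribution" output
order_counts = {
    "delivered": 96478,
    "shipped": 1107,
    "canceled": 625,
    "unavailable": 609,
    "invoiced": 314,
    "processing": 301,
    "created": 5,
    "approved": 2,
}

total_orders = sum(order_counts.values())

# Percentage of all orders per status, rounded to two decimals
shares = {status: round(100 * n / total_orders, 2) for status, n in order_counts.items()}

print(f"Total orders: {total_orders}")
for status, pct in shares.items():
    print(f"{status:<12}{pct:>7.2f}%")
```

The same figures could be computed in SQL with a window function over the grouped result; the Python version is shown only because it needs no connection.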


In [92]:
# Example: Customer state distribution
query = """
    SELECT 
        state,
        COUNT(*) as customer_count
    FROM marts.dim_customers
    GROUP BY state
    ORDER BY customer_count DESC
    LIMIT 10
"""

result = con.execute(query).df()
print("Top 10 Customer States:")
display(result)

Top 10 Customer States:


Unnamed: 0,state,customer_count
0,SP,40294
1,RJ,12383
2,MG,11255
3,RS,5275
4,PR,4880
5,SC,3529
6,BA,3276
7,DF,2073
8,ES,1964
9,GO,1950


### 8. Close Connection

In [94]:
# Close the connection when done
con.close()
print("Database connection closed.")

Database connection closed.
