# Introduction
This is a demo of a dbt workflow, using data ingestion as an example. In financial data ETL (Extract, Transform, Load),
it is important to run sanity checks on the input data to avoid ingesting invalid records.

In [1]:
# ```{mermaid}
# flowchart LR
#    empty["."] --> DBT["Container: dbt-core 
#        plugin: dbt-
#   DBT-pyspark[Hive]"] --> |Generate SQL with 'dbt seed', 'dbt compile'| T(Container: Spark-Thrift)
#   T -->|Query using the generated SQL| Storage[(Container: Postgres Hive metadata store)]

# ```

In [2]:
import pandas as pd
pd.read_csv("/usr/app/demo/dbt_spark_demo_prj/seeds/transactions/raw__transactions.csv")


Unnamed: 0,id,amount,currency,status
0,1,100,USD,ACTIVE
1,2,200,SGD,ACTIVE
2,3,150,EUR,ACTIVE
3,4,120,USD,INACTIVE
4,5,180,SGD,INACTIVE
5,6,300,JPY,ACTIVE
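
A sanity check on the seed file before ingestion could look like the sketch below. It is illustrative only: the allowed currencies and statuses are assumptions read off the six rows above, `sanity_check` is a hypothetical helper, and the real business rules may differ. It uses the standard `csv` module so it runs without the container.

```python
import csv
import io

# Same rows as the seed file shown above, inlined so the sketch is self-contained.
CSV_TEXT = """id,amount,currency,status
1,100,USD,ACTIVE
2,200,SGD,ACTIVE
3,150,EUR,ACTIVE
4,120,USD,INACTIVE
5,180,SGD,INACTIVE
6,300,JPY,ACTIVE
"""

# Assumed rules, inferred from the sample data only.
ALLOWED_CURRENCIES = {"USD", "SGD", "EUR", "JPY"}
ALLOWED_STATUSES = {"ACTIVE", "INACTIVE"}


def sanity_check(rows):
    """Return a list of problems found; an empty list means every row passed."""
    problems = []
    seen_ids = set()
    for line_no, row in enumerate(rows, start=2):  # line 1 is the CSV header
        row_id = row.get("id")
        if not row_id or row_id in seen_ids:
            problems.append(f"line {line_no}: id missing or duplicated")
        seen_ids.add(row_id)
        try:
            amount_ok = float(row.get("amount")) > 0
        except (TypeError, ValueError):
            amount_ok = False
        if not amount_ok:
            problems.append(f"line {line_no}: amount must be a positive number")
        if row.get("currency") not in ALLOWED_CURRENCIES:
            problems.append(f"line {line_no}: unexpected currency {row.get('currency')!r}")
        if row.get("status") not in ALLOWED_STATUSES:
            problems.append(f"line {line_no}: unexpected status {row.get('status')!r}")
    return problems


rows = list(csv.DictReader(io.StringIO(CSV_TEXT)))
problems = sanity_check(rows)
print(problems or "all rows passed")
```

Returning a list of problems rather than raising on the first one lets a reviewer see every bad row in a single pass.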


## Project file structure

In [3]:
!tree -L 2 --sort="name" /usr/app | grep -v ".*\.toml$" | grep -v ".*\.md$" | grep -v ".*files$"

/usr/app
├── dbt
│   ├── dbt.Dockerfile
│   ├── dbt_spark_demo_prj
│   ├── docker-compose.yml
│   └── requirements.txt
├── dbt-spark
│   ├── dagger
│   ├── dbt
│   ├── docker
│   ├── docker-compose.yml
│   ├── scripts
│   ├── test.env.example
│   └── tests
├── demo
│   ├── dbt-spark-demo.html
│   ├── dbt-spark-demo.ipynb
│   ├── dbt-spark-demo_files
│   ├── dbt_spark_demo_prj
│   ├── generate-documentation.sh
│   ├── profiles.yml
│   └── results
├── index.html
└── run-demo.sh



# Environment setup

## Docker
### Shared network for docker containers
```bash
source ./docker-common.sh
```

## Install Jupyter Notebook to run this demo
This notebook runs on the Jupyter Notebook kernel (http://127.0.0.1:8888) of the custom-dbt container. Jupyter is started inside the container with the command below.
```bash
jupyter notebook --NotebookApp.token='' --NotebookApp.password='' --NotebookApp.disable_check_xsrf=True --allow-root --ip=0.0.0.0 --port=8888 --no-browser
```
Use `--allow-root`, the empty token and password, and the disabled XSRF check for demo purposes only; they are not safe for production.


In [4]:
%%bash
export TZ='Asia/Singapore'
date
which python
pwd

Mon Jun 30 15:47:07 +08 2025


/usr/local/bin/python


/usr/app/dbt


### Connection to read table in Hive meta-data store

## Variables

In [5]:
from dotenv import load_dotenv
import os
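!pwd

# Note: `!cd` runs in a throwaway subshell, so the kernel's working directory
# does not change and both `pwd` calls print the same path.
# Use `%cd ../` if the directory really needs to change.
!cd ../
!pwd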

load_dotenv()
spark_thrift_service_name=os.getenv('SPARK_CONTAINER_SERVICE_NAME')
dbt_project_dir=os.getenv('DBT_PROJECT_DIR')
dbt_project_name=os.getenv('DBT_PROJECT_NAME')
print(spark_thrift_service_name)
print(dbt_project_dir)
print(dbt_project_name)

/usr/app/dbt


/usr/app/dbt


dbt-spark3-thrift
/usr/app/dbt/dbt_spark_demo_prj
dbt_spark_demo_prj
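
If one of these variables is missing from `.env`, `os.getenv` silently returns `None` and the failure only surfaces later, for example as a connection error. A minimal sketch of failing early instead; `read_required_env` is a hypothetical helper, and the sample values are the ones printed above.

```python
import os

REQUIRED_VARS = (
    "SPARK_CONTAINER_SERVICE_NAME",
    "DBT_PROJECT_DIR",
    "DBT_PROJECT_NAME",
)


def read_required_env(names=REQUIRED_VARS, environ=os.environ):
    """Return {name: value} for all names, or raise if any is unset or empty."""
    missing = [name for name in names if not environ.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {name: environ[name] for name in names}


# Demonstration with an explicit mapping, so the sketch runs outside the container.
sample_env = {
    "SPARK_CONTAINER_SERVICE_NAME": "dbt-spark3-thrift",
    "DBT_PROJECT_DIR": "/usr/app/dbt/dbt_spark_demo_prj",
    "DBT_PROJECT_NAME": "dbt_spark_demo_prj",
}
settings = read_required_env(environ=sample_env)
print(settings["DBT_PROJECT_DIR"])
```

In the notebook, calling `read_required_env()` right after `load_dotenv()` would check the real environment.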


# Start demo

In [6]:
filepath="/usr/app/demo/results/test_start.txt"
!echo start test at > $filepath 
!export TZ='Asia/Singapore'; date >> $filepath
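
The same start marker can be written from Python, which avoids depending on the shell's `TZ` handling. This is a sketch under assumptions: `write_marker` is a hypothetical helper, Singapore time is modelled as a fixed UTC+8 offset, and the timestamp uses ISO 8601 rather than the `date` format.

```python
from datetime import datetime, timedelta, timezone
from pathlib import Path

# Singapore has no daylight saving time, so a fixed +08:00 offset is sufficient here.
SGT = timezone(timedelta(hours=8))


def write_marker(filepath, label, now=None):
    """Write `label` and an ISO 8601 timestamp to `filepath`; return the text written."""
    moment = now if now is not None else datetime.now(SGT)
    text = f"{label}\n{moment.isoformat(timespec='seconds')}\n"
    Path(filepath).write_text(text, encoding="utf-8")
    return text
```

For example, `write_marker("/usr/app/demo/results/test_start.txt", "start test at")` would record the current time with an explicit `+08:00` offset.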

## Helper code

In [7]:
from pyhive import hive
import pandas as pd
from sqlalchemy.engine import create_engine
from IPython.display import display


def check_table(host=spark_thrift_service_name, message_to_be_printed=""):
    """Print a message, then list the tables in the `default` database."""
    print(message_to_be_printed)
    # Use the `host` argument (not the global) so the parameter takes effect.
    engine = create_engine(f'hive://{host}:10000/default')
    df = pd.read_sql("SHOW TABLES;", engine)
    display(df)


def show_table_details(table_name, database_name='default',
                       host=spark_thrift_service_name,
                       port=10000):
    """Display all rows of `database_name.table_name`."""
    # The SQLAlchemy engine fails for this query, apparently during
    # SQLAlchemy reflection. TODO: investigate the root cause.
    # Until then, query through a raw PyHive connection.
    with hive.Connection(host=host, port=port, database=database_name) as conn:
        df = pd.read_sql(f"select * from {database_name}.{table_name}", conn)
        display(df)


check_table()
print("------------")
# show_table_details("transactions")





Unnamed: 0,namespace,tableName,isTemporary
0,default,raw__transactions,False
1,default,transactions,False


------------
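
`show_table_details` builds its query by interpolating `database_name` and `table_name` into an f-string, so an unexpected value would go straight to the Thrift server. A minimal sketch of validating the names first; `qualified_name` is a hypothetical helper, and the identifier pattern (letters, digits, underscores) is an assumption that fits the table names used in this demo.

```python
import re

# Assumed identifier rule: a letter or underscore, then letters, digits, or underscores.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def qualified_name(database_name, table_name):
    """Return 'database.table' after checking that both parts are plain identifiers."""
    for part in (database_name, table_name):
        if not _IDENTIFIER.fullmatch(part):
            raise ValueError(f"unsafe SQL identifier: {part!r}")
    return f"{database_name}.{table_name}"


print(qualified_name("default", "raw__transactions"))
```

The returned string can then be placed in the `select * from ...` query in place of the raw arguments.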


## DBT 

In [8]:
!dbt --version

Core:
  - installed: 1.10.2
  - latest:    1.10.2 - Up to date!

Plugins:
  - spark: 1.9.2 - Up to date!




## Verify DBT setup

### profiles.yml
```yml
dbt_spark_demo_prj:
  outputs:
    spark:
      type: spark
      method: thrift
      host: dbt-spark3-thrift
      schema: default
      connect_timeout: 30
      authentication: NONE
  target: spark

```

In [9]:
!cat ~/.dbt/profiles.yml

send_anonymous_usage_stats: false

dbt_spark_demo_prj:
  outputs:
    spark:
      type: spark
      method: thrift
      host: dbt-spark3-thrift
      port: 10000
      schema: default
      connect_timeout: 30
      authentication: NONE
  target: spark


### dbt_project.yml
```yml
name: 'dbt_spark_demo_prj'
version: '1.0.0'

# This setting configures which "profile" dbt uses for this project.
# It must match the top-level key in profiles.yml.
profile: 'dbt_spark_demo_prj'

# These configurations specify where dbt should look for different types of files.
# The `model-paths` config, for example, states that models in this project can be
# found in the "models/" directory. You probably won't need to change these!
model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

clean-targets:         # directories to be removed by `dbt clean`
  - "target"
  - "dbt_packages"

models:
  dbt_spark_demo_prj:
    # +write_json: false
    +materialized: incremental
    +file_format: parquet

```

In [10]:
!cat /usr/app/dbt/dbt_spark_demo_prj/dbt_project.yml


# Name your project! Project names should contain only lowercase characters
# and underscores. A good package name should reflect your organization's
# name or the intended use of these models
name: 'dbt_spark_demo_prj'
version: '1.0.0'

# This setting configures which "profile" dbt uses for this project.
profile: 'dbt_spark_demo_prj'

# These configurations specify where dbt should look for different types of files.
# The `model-paths` config, for example, states that models in this project can be
# found in the "models/" directory. You probably won't need to change these!
model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

clean-targets:         # directories to be removed by `dbt clean`
  - "target"
  - "dbt_packages"


# Configuring models
# Full documentation: https://docs.getdbt.com/docs/configuring-models

# In this example config, we tell dbt to build

### dbt debug
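
When dbt output is captured by a notebook or a script instead of a terminal, ANSI colour codes can remain in the text and get in the way of string matching. The sketch below strips them and looks for the success line. It assumes only SGR colour sequences need removing; `strip_ansi` and `all_checks_passed` are hypothetical helper names, and the success string is the one `dbt debug` prints in this demo. The process exit code is a sturdier signal than log text, so treat this as a convenience.

```python
import re

# Matches SGR sequences such as ESC[0m or ESC[32m (colour and style codes).
ANSI_SGR = re.compile(r"\x1b\[[0-9;]*m")


def strip_ansi(text):
    """Remove ANSI colour/style escape sequences from `text`."""
    return ANSI_SGR.sub("", text)


def all_checks_passed(output):
    """Return True if any line of the captured output ends with the success message."""
    lines = strip_ansi(output).splitlines()
    return any(line.rstrip().endswith("All checks passed!") for line in lines)


sample = "\x1b[0m07:47:21  \x1b[32mAll checks passed!\x1b[0m"
print(strip_ansi(sample))
```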

In [11]:
!dbt debug --project-dir /usr/app/dbt/dbt_spark_demo_prj

# To print detailed messages for debugging:
#!dbt debug --debug --project-dir /usr/app/dbt/dbt_spark_demo_prj

07:47:20  Running with dbt=1.10.2
07:47:20  dbt version: 1.10.2
07:47:20  python version: 3.11.2
07:47:20  python path: /usr/local/bin/python3
07:47:20  os info: Linux-5.15.167.4-microsoft-standard-WSL2-x86_64-with-glibc2.31
07:47:20  Using profiles dir at /root/.dbt
07:47:20  Using profiles.yml file at /root/.dbt/profiles.yml
07:47:20  Using dbt_project.yml file at /usr/app/dbt/dbt_spark_demo_prj/dbt_project.yml
07:47:20  adapter type: spark
07:47:20  adapter version: 1.9.2
07:47:21  Configuration:
07:47:21    profiles.yml file [OK found and valid]
07:47:21    dbt_project.yml file [OK found and valid]
07:47:21  Required dependencies:
07:47:21   - git [OK found]
07:47:21  Connection:
07:47:21    host: dbt-spark3-thrift
07:47:21    port: 10000
07:47:21    cluster: None
07:47:21    endpoint: None
07:47:21    schema: default
07:47:21    organization: 0
07:47:21  Registered adapter: spark=1.9.2
07:47:21    Connection test: [OK connection ok]
07:47:21  All checks passed!


## dbt command comparisons
| Command           | Purpose                                  | What it does                                                             | Output                                    |
| ----------------- | ---------------------------------------- | ------------------------------------------------------------------------ | ----------------------------------------- |
| **`dbt compile`** | Prepares SQL code for execution          | Converts Jinja + macros into raw SQL; writes compiled files to `target/` | Compiled SQL files (no warehouse changes) |
| **`dbt run`**     | Builds models (tables/views)             | Executes compiled SQL to materialize models in the data warehouse        | Tables/views in the warehouse             |
| **`dbt test`**    | Validates data quality assumptions       | Runs data tests (e.g., `not_null`, `unique`, custom) on warehouse data   | Test pass/fail results                    |
| **`dbt seed`**    | Loads static CSV data into the warehouse | Uploads CSV files from the `seeds/` directory into tables                 | Tables containing seed data               |
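
The commands in the table are typically chained as seed, run, test. The sketch below builds those command lines and runs them in order, stopping at the first failure. It is an illustration rather than part of the demo project: `build_commands`, `run_pipeline`, and `subprocess_runner` are hypothetical helpers, and only the `--project-dir` flag already used in this notebook is passed.

```python
import subprocess

DBT_STEPS = ("seed", "run", "test")


def build_commands(project_dir, steps=DBT_STEPS):
    """Return one argument list per dbt step, all pointing at the same project."""
    return [["dbt", step, "--project-dir", project_dir] for step in steps]


def run_pipeline(commands, runner):
    """Run commands in order; stop at the first non-zero exit code.

    `runner` takes an argument list and returns an exit code.
    Returns the names of the steps that completed successfully.
    """
    completed = []
    for command in commands:
        if runner(command) != 0:
            break
        completed.append(command[1])
    return completed


def subprocess_runner(command):
    """Execute the command for real and return its exit code."""
    return subprocess.run(command, check=False).returncode


commands = build_commands("/usr/app/dbt/dbt_spark_demo_prj")
print(commands[0])
```

Inside the container, `run_pipeline(commands, subprocess_runner)` would execute the three steps. Passing the runner as an argument keeps the ordering logic testable without a dbt installation.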


# Prepare SQL

## Read dbt input csv 

* dbt reads `seeds/transactions/raw__transactions.csv`
* `dbt seed` generates the SQL that loads the CSV rows
* dbt creates the table `default.raw__transactions` in the database

### Reset the database before the operation

In [12]:
!dbt run-operation drop_table --args '{"table_name": "default.raw__transactions"}'
!dbt run-operation drop_table --args '{"table_name": "default.transactions"}'

07:47:26  Running with dbt=1.10.2
07:47:26  Registered adapter: spark=1.9.2
07:47:27  Found 1 model, 1 test, 1 seed, 476 macros, 3 unit tests
07:47:32  Running with dbt=1.10.2
07:47:33  Registered adapter: spark=1.9.2
07:47:33  Found 1 model, 1 test, 1 seed, 476 macros, 3 unit tests


In [13]:
check_table(message_to_be_printed="Database table before the operation")
!dbt seed --log-level info --project-dir $dbt_project_dir
check_table(message_to_be_printed="\nDatabase table after the operation")

Database table before the operation

Unnamed: 0,namespace,tableName,isTemporary

07:47:39  Running with dbt=1.10.2
07:47:39  Registered adapter: spark=1.9.2
07:47:40  Found 1 model, 1 test, 1 seed, 476 macros, 3 unit tests
07:47:40
07:47:40  Concurrency: 1 threads (target='spark')
07:47:40
07:47:41  1 of 1 START seed file default.raw__transactions ............................... [RUN]
07:47:42  1 of 1 OK loaded seed file default.raw__transactions ........................... [INSERT 6 in 1.11s]
07:47:42
07:47:42  Finished running 1 seed in 0 hours 0 minutes and 1.56 seconds (1.56s).
07:47:42
07:47:42  Completed successfully
07:47:42
07:47:42  Done. PASS=1 WARN=0 ERROR=0 SKIP=0 NO-OP=0 TOTAL=1

Database table after the operation

Unnamed: 0,namespace,tableName,isTemporary
0,default,raw__transactions,False


## dbt compile

#### Remove target/compiled/

In [14]:
!rm -rf dbt_spark_demo_prj/target/compiled/
!ls dbt_spark_demo_prj/target/compiled

ls: cannot access 'dbt_spark_demo_prj/target/compiled': No such file or directory


In [15]:
!ls $dbt_project_dir

dbt_project.yml  logs  macros  models  seeds  target  tests


In [16]:
!dbt compile --log-level info --project-dir $dbt_project_dir

07:47:49  Running with dbt=1.10.2
07:47:50  Registered adapter: spark=1.9.2
07:47:50  Found 1 model, 1 test, 1 seed, 476 macros, 3 unit tests
07:47:50
07:47:50  Concurrency: 1 threads (target='spark')
07:47:50


In [17]:
!ls /usr/app/dbt/dbt_spark_demo_prj/target/compiled/dbt_spark_demo_prj

models	tests
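
To see which SQL files `dbt compile` actually produced, rather than only the top-level folders, the compiled directory can be walked from Python. A small sketch, assuming the `target/compiled/<project name>` layout listed above; `list_compiled_sql` is a hypothetical helper.

```python
from pathlib import Path


def list_compiled_sql(project_dir, project_name):
    """Return compiled .sql files as sorted POSIX paths relative to the compiled root."""
    compiled_root = Path(project_dir) / "target" / "compiled" / project_name
    if not compiled_root.is_dir():
        return []  # nothing compiled yet, or the directory was removed
    return sorted(
        path.relative_to(compiled_root).as_posix()
        for path in compiled_root.rglob("*.sql")
    )
```

In this notebook the call would be `list_compiled_sql(dbt_project_dir, dbt_project_name)`; an empty list means `dbt compile` has not written anything yet.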


## Build your models (dbt run)

## Delete the database table before `dbt run` recreates it

In [18]:
check_table(message_to_be_printed="before")
!dbt run-operation drop_table --args '{"table_name": "default.transactions"}'
check_table(message_to_be_printed="\nafter")

before

Unnamed: 0,namespace,tableName,isTemporary
0,default,raw__transactions,False

07:47:57  Running with dbt=1.10.2
07:47:57  Registered adapter: spark=1.9.2
07:47:58  Found 1 model, 1 test, 1 seed, 476 macros, 3 unit tests

after

Unnamed: 0,namespace,tableName,isTemporary
0,default,raw__transactions,False


In [19]:
!rm -r $dbt_project_dir/target/run
!dbt run-operation drop_table --args '{"table_name": "default.transactions"}' 
check_table(message_to_be_printed="before")

!dbt run --log-level info --project-dir $dbt_project_dir

print("\nAfter")
!tree $dbt_project_dir/target/run
check_table(message_to_be_printed="after")

07:48:04  Running with dbt=1.10.2
07:48:04  Registered adapter: spark=1.9.2
07:48:05  Found 1 model, 1 test, 1 seed, 476 macros, 3 unit tests

before

Unnamed: 0,namespace,tableName,isTemporary
0,default,raw__transactions,False

07:48:10  Running with dbt=1.10.2
07:48:11  Registered adapter: spark=1.9.2
07:48:12  Found 1 model, 1 test, 1 seed, 476 macros, 3 unit tests
07:48:12
07:48:12  Concurrency: 1 threads (target='spark')
07:48:12
07:48:12  1 of 1 START sql table model default.transactions .............................. [RUN]
07:48:13  1 of 1 OK created sql table model default.transactions ......................... [OK in 1.03s]
07:48:13
07:48:13  Finished running 1 table model in 0 hours 0 minutes and 1.46 seconds (1.46s).
07:48:13
07:48:13  Completed successfully
07:48:13
07:48:13  Done. PASS=1 WARN=0 ERROR=0 SKIP=0 NO-OP=0 TOTAL=1

After

/usr/app/dbt/dbt_spark_demo_prj/target/run
└── dbt_spark_demo_prj
    └── models
        └── transactions
            └── transactions.sql

3 directories, 1 file

after

Unnamed: 0,namespace,tableName,isTemporary
0,default,raw__transactions,False
1,default,transactions,False


## Test the model using dbt

#### Test names and their test vectors

In [20]:
!cat dbt_spark_demo_prj/tests/unit/test_transactions.yml

# tests/unit/test_transactions.yml

version: 2
unit_tests:
  - name: test_currency_usd
    model: transactions
    given:
      - input: ref('raw__transactions')
        rows:
          - {id: 1, amount: 100, currency: 'USD', status: 'ACTIVE'}
    expect:
      rows:
        - {id: 1, amount_usd: 100, currency: 'USD', branch_marker: "USD_BRANCH"}

  - name: test_currency_sgd
    model: transactions
    given:
      - input: ref('raw__transactions')
        rows:
          - {id: 2, amount: 100, currency: 'SGD', status: 'ACTIVE'}
    expect:
      rows:
        - {id: 2, amount_usd: 74, branch_marker: "SGD_BRANCH"}

  - name: test_inactive
    model: transactions
    given:
      - input: ref('raw__transactions')
        rows:
          - {id: 2, amount: 100, currency: 'SGD', status: 'INACTIVE'}
    expect: 
      rows: []
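
The three unit tests pin down the behaviour expected of the `transactions` model: USD amounts pass through unchanged, 100 SGD becomes 74 USD, and inactive rows are dropped. The sketch below restates that behaviour in Python so the test vectors can be checked by hand. It is a hypothetical re-implementation, not the model's SQL (which is not shown here): the 0.74 rate is inferred from the SGD test vector, and currencies without a test vector are rejected because their handling is unknown.

```python
from decimal import Decimal

# Assumed rates: USD is the identity; 0.74 is inferred from the test vector 100 SGD -> 74.
RATES_TO_USD = {"USD": Decimal("1"), "SGD": Decimal("0.74")}


def transform(rows):
    """Keep ACTIVE rows and convert `amount` to USD, tagging the branch taken."""
    result = []
    for row in rows:
        if row["status"] != "ACTIVE":
            continue  # mirrors test_inactive: inactive rows produce no output
        currency = row["currency"]
        if currency not in RATES_TO_USD:
            raise ValueError(f"no rate known for {currency!r} in this sketch")
        result.append({
            "id": row["id"],
            "amount_usd": Decimal(row["amount"]) * RATES_TO_USD[currency],
            "currency": currency,
            "branch_marker": f"{currency}_BRANCH",
        })
    return result


usd = transform([{"id": 1, "amount": 100, "currency": "USD", "status": "ACTIVE"}])
sgd = transform([{"id": 2, "amount": 100, "currency": "SGD", "status": "ACTIVE"}])
inactive = transform([{"id": 2, "amount": 100, "currency": "SGD", "status": "INACTIVE"}])
print(usd[0]["amount_usd"], sgd[0]["amount_usd"], inactive)
```

`Decimal` is used instead of `float` so that monetary arithmetic stays exact.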

#### Run the tests

In [21]:
%%bash
export DBT_PROJECT_DIR=/usr/app/dbt/dbt_spark_demo_prj
dbt test --log-level info

07:48:20  Running with dbt=1.10.2
07:48:20  Registered adapter: spark=1.9.2
07:48:21  Found 1 model, 1 test, 1 seed, 476 macros, 3 unit tests
07:48:21
07:48:21  Concurrency: 1 threads (target='spark')
07:48:21
07:48:22  1 of 4 START test test_currency_coverage ....................................... [RUN]
07:48:22  1 of 4 PASS test_currency_coverage ............................................. [PASS in 0.46s]
07:48:22  2 of 4 START unit_test transactions::test_currency_sgd ......................... [RUN]
07:48:23  2 of 4 PASS transactions::test_currency_sgd .................................... [PASS in 0.97s]
07:48:23  3 of 4 START unit_test transactions::test_currency_usd ......................... [RUN]
07:48:24  3 of 4 PASS transactions::test_currency_usd .................................... [PASS in 0.57s]
07:48:24  4 of 4 START unit_test transactions::test_inactive ............................. [RUN]
07:48:24  4 of 4 PASS transactions::test_inactive ........................................ [PASS in 0.60s]
07:48:24
07:48:24  Finished running 1 test, 3 unit tests in 0 hours 0 minutes and 2.95 seconds (2.95s).
07:48:24
07:48:24  Completed successfully
07:48:24
07:48:24  Done. PASS=4 WARN=0 ERROR=0 SKIP=0 NO-OP=0 TOTAL=4


In [22]:
!chmod -R a+rw /usr/app

In [23]:
filepath="/usr/app/demo/results/test_end.txt"
!date; echo end test at > $filepath 
!export TZ='Asia/Singapore'; date >> $filepath;

Mon Jun 30 07:48:27 UTC 2025


In [24]:
# assert False, "Stop execution here!"

# Appendix

## Check the built model

### Check the built model using PySpark (in progress)

In [25]:
# !pip install pyspark==3.3.2

In [26]:
# %%bash
# # For Debian/Ubuntu-based containers:
# apt-get update && apt-get install -y openjdk-11-jdk

# # For Alpine-based containers:
# apk add openjdk11

In [27]:
# !ls /usr/lib/jvm/java-11-openjdk-amd64

In [28]:
# # !. /usr/spark/bin/load-spark-env.sh

# # Typically in Debian/Ubuntu:
# !export JAVA_HOME=$(dirname $(dirname $(readlink -f $(which java))))

# # Or set it explicitly:
# !export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
# from pyspark.sql import SparkSession

# spark = SparkSession.builder \
#     .appName("dbt_clean") \
#     .config("spark.hadoop.hive.metastore.schema.verification", "true") \
#     .config("spark.sql.catalogImplementation", "hive") \
#     .enableHiveSupport() \
#     .getOrCreate()

# # spark = SparkSession.builder \
# #     .config("hive.metastore.uris", "thrift://dbt-hive-metastore:10000") \
# #     .config("spark.sql.catalogImplementation", "hive") \
# #     .enableHiveSupport() \
# #     .getOrCreate()

# spark.sql("USE default")  # Or your specific database
# spark.sql("SHOW DATABASES").show()
# # spark.sql("select * from transactions").show()
# # # df.toPandas()

In [29]:
# spark.sql("SHOW TABLES IN default").show(truncate=False)

In [30]:
# # Query metastore system tables
# spark.sql("""
#   SELECT TBL_NAME, TBL_TYPE 
#   FROM default.TBLS 
#   JOIN default.DBS ON TBLS.DB_ID = DBS.DB_ID 
#   WHERE DBS.NAME = 'default'
# """).show(truncate=False)

In [31]:
# spark.conf.get("hive.metastore.uris")  # Should return your URI

In [32]:
# spark.conf.get("spark.sql.catalogImplementation")  # Should return "hive"

In [33]:
# from pyspark.sql import SparkSession

# spark.stop()
# spark = SparkSession.builder \
#     .appName("DBTIntegration") \
#     .config("hive.metastore.uris", "thrift://dbt-hive-metastore:9083") \
#     .config("spark.sql.catalogImplementation", "hive") \
#     .config("javax.jdo.option.ConnectionUserName", "dbt") \
#     .config("javax.jdo.option.ConnectionPassword", "dbt") \
#     .enableHiveSupport() \
#     .getOrCreate()
# spark.sql("SHOW TABLES IN default")

In [34]:
# from pyhive import hive
# conn = hive.connect(host=spark_thrift_service_name, port=10000)
# cursor = conn.cursor()
# cursor.execute("SELECT * FROM default.transactions LIMIT 1")
# print(cursor.fetchall())

### Test the built model (using the test model) 

### dbt test

In [35]:
!dbt test --log-level info

07:48:35  Running with dbt=1.10.2
07:48:35  Registered adapter: spark=1.9.2
07:48:36  Found 1 model, 1 test, 1 seed, 476 macros, 3 unit tests
07:48:36
07:48:36  Concurrency: 1 threads (target='spark')
07:48:36
07:48:37  1 of 4 START test test_currency_coverage ....................................... [RUN]
07:48:37  1 of 4 PASS test_currency_coverage ............................................. [PASS in 0.49s]
07:48:37  2 of 4 START unit_test transactions::test_currency_sgd ......................... [RUN]
07:48:38  2 of 4 PASS transactions::test_currency_sgd .................................... [PASS in 0.92s]
07:48:38  3 of 4 START unit_test transactions::test_currency_usd ......................... [RUN]
07:48:39  3 of 4 PASS transactions::test_currency_usd .................................... [PASS in 0.61s]
07:48:39  4 of 4 START unit_test transactions::test_inactive ............................. [RUN]
07:48:39  4 of 4 PASS transactions::test_inactive ........................................ [PASS in 0.55s]
07:48:39
07:48:39  Finished running 1 test, 3 unit tests in 0 hours 0 minutes and 2.96 seconds (2.96s).
07:48:40
07:48:40  Completed successfully
07:48:40
07:48:40  Done. PASS=4 WARN=0 ERROR=0 SKIP=0 NO-OP=0 TOTAL=4


In [36]:
# !cat /home/vin/01-prj/stripe/sql-formal-verification/formal_verification_prj_name/tests/unit/test_transactions.yml

In [37]:
# %%bash
# cd /usr/app/dbt
# dbt compile --profiles-dir /usr/app/dbt  --project-dir /usr/app/dbt/dbt_spark_demo_prj
# # dbt run-operation drop_table --args '{"table_name": "default.transactions"}'
# dbt run-operation drop_view --args '{"view_name": "default.my_first_dbt_model"}' --profiles-dir /usr/app/dbt  --project-dir /usr/app/dbt/dbt_spark_demo_prj
# dbt run-operation drop_view --args '{"view_name": "default.my_second_dbt_model"}' --profiles-dir /usr/app/dbt  --project-dir /usr/app/dbt/dbt_spark_demo_prj

<!-- # Todo
## Audit compliance 
### time tracing with time-stamping
1. Add timestamp for loaded data using dbt model
```jinja
SELECT
    *,
    '{{ run_started_at }}'::timestamp AS added_at
FROM {{ ref('my_seed_table') }}
```
### File tracing with source and targets file path, tablename -->


# End of notebook