# dbt Databricks demo


Slides and an example project are available at https://github.com/ConstantinoSchillebeeckx/dbt_databricks_demo

# data build tool (dbt)

* https://www.getdbt.com/
* SQL + Jinja (and now Python!)
* lineage
* data testing
* docs

# docs

```bash
dbt docs generate
dbt docs serve
```

http://127.0.0.1:8080/#!/overview

# config

* project file (e.g. dbt_project.yml)
* property file (e.g. schema.yml)
* config block

```yaml
name: 'e2'
version: '1.0.0'
config-version: 2

# This setting configures which "profile" dbt uses for this project.
profile: 'e2'

# Configuring models
models:
  +persist_docs:
    relation: true
    columns: true

  e2:
    +materialized: table
```
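
All three places can set the same option, and the most specific one wins: a config block in the model overrides the property file, which overrides the project file. A minimal Python sketch of that lookup order follows; the dictionaries and their values are made up for illustration, and it ignores that dbt merges some settings (such as tags) rather than overriding them.

```python
from collections import ChainMap

# Hypothetical settings for a single model, one dict per place they can live.
project_file = {
    "materialized": "table",
    "persist_docs": {"relation": True, "columns": True},
}
property_file = {"materialized": "view"}
config_block = {"materialized": "incremental"}

# ChainMap searches its maps left to right, so the most specific source goes first.
resolved = ChainMap(config_block, property_file, project_file)

print(resolved["materialized"])  # taken from the config block
print(resolved["persist_docs"])  # only set in the project file, so it falls through
```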

# SQL model

By default, work is submitted as an "execution context" to either a SQL warehouse or an all-purpose cluster.

**e2/models/zip_ny.sql**

```sql
SELECT *
FROM {{ ref('uszips') }}
WHERE state_id = 'NY'
```
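
At compile time dbt replaces each `{{ ref(...) }}` with the name of the relation that model was built as, and uses the same call to work out the DAG. The toy Python sketch below imitates only the substitution step with a regular expression; it is not how dbt renders Jinja, and the `RELATIONS` mapping and the `analytics` schema name are assumptions.

```python
import re

# Hypothetical mapping from model name to the relation it was built as.
RELATIONS = {"uszips": "analytics.uszips"}

# Matches {{ ref('name') }} or {{ ref("name") }} with optional whitespace.
REF_PATTERN = re.compile(r"\{\{\s*ref\(\s*['\"](\w+)['\"]\s*\)\s*\}\}")


def render_refs(sql: str) -> str:
    """Swap each ref() call for the matching relation name."""
    return REF_PATTERN.sub(lambda match: RELATIONS[match.group(1)], sql)


model_sql = """SELECT *
FROM {{ ref('uszips') }}
WHERE state_id = 'NY'"""

print(render_refs(model_sql))
```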

# SQL model config

**e2/models/zip_ny.yaml**

```yaml
version: 2

models:
  - name: zip_ny
    description: Zipcodes limited to NY
    columns:
      - name: zip
        description: The zipcode
        tests:
          - not_null
```
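
A generic test such as `not_null` boils down to a query that looks for offending rows; the test passes when there are none. The sketch below shows the idea in Python, using an in-memory SQLite database as a stand-in for the warehouse. The table contents and the `not_null_failures` helper are made up, and the query only approximates what dbt generates.

```python
import sqlite3

# In-memory SQLite stands in for the warehouse; the rows are made up.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE zip_ny (zip TEXT, state_id TEXT)")
conn.executemany(
    "INSERT INTO zip_ny VALUES (?, ?)",
    [("10001", "NY"), ("10002", "NY"), (None, "NY")],
)


def not_null_failures(table: str, column: str) -> int:
    """Count the rows where `column` is NULL; zero means the test passes."""
    # Identifiers are interpolated directly, which is only OK for trusted demo names.
    query = f"SELECT COUNT(*) FROM {table} WHERE {column} IS NULL"
    return conn.execute(query).fetchone()[0]


failures = not_null_failures("zip_ny", "zip")
print("pass" if failures == 0 else f"fail: {failures} null value(s)")
```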

# Python model

Here we configure dbt to execute the model as a Notebook.

**e2/models/zip_co.py**

```python
def model(dbt, session):
    dbt.config(create_notebook=True)

    df = dbt.ref("uszips")
    return df.filter(df.state_id == "CO")
```



* `dbt`: A class compiled by dbt Core, unique to each model, enables you to run your Python code in the context of your dbt project and DAG.
* `session`: A class representing your data platform’s connection to the Python backend. The session is needed to read in tables as DataFrames, and to write DataFrames back to tables. In PySpark, by convention, the SparkSession is named `spark` and is available globally. For consistency across platforms, it is always passed into the model function as an explicit argument called `session`.

# Python model config

**e2/models/zip_co.yaml**

```yaml
version: 2

models:
  - name: zip_co
    description: Zipcodes limited to CO
    columns:
      - name: zip
        description: The zipcode
        tests:
          - not_null
```

# Python model

Models can be customized to execute on their own cluster. Like all configurations, this can be set at the model level, in a property file, or at the project level. Specifying a custom cluster always submits the work as a Notebook.

**e2/models/zip_pr.py**

```python
def model(dbt, session):
    dbt.config(
        submission_method="job_cluster",
        job_cluster_config={
            "spark_version": "11.3.x-scala2.12",
            "node_type_id": "i3.xlarge",
            "num_workers": 0,
            "spark_conf": {
                "spark.databricks.cluster.profile": "singleNode",
                "spark.master": "local[*, 4]",
            },
            "custom_tags": {"ResourceClass": "SingleNode"},
        },
    )

    df = dbt.ref("uszips")

    return df.filter(df.state_id == "PR")
```

# Selecting nodes

* `dbt ls --model zip_co+` selects `zip_co` and all of its children (downstream models)
* `dbt ls --model +join` selects `join` and all of its parents (upstream models)
* `dbt ls --model @zip_co` selects `zip_co`, all of its children, and all parents of those children

![dag](dag.png)
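
The three selectors above can be sketched in Python as plain graph traversals. The `CHILDREN` graph here is a hypothetical DAG loosely based on the model names in this demo, not necessarily the one in the picture, and the functions only mimic the selection semantics rather than dbt's own implementation.

```python
# Hypothetical DAG: each node maps to its direct children (downstream nodes).
CHILDREN = {
    "uszips": ["zip_ny", "zip_co", "zip_pr"],
    "zip_ny": ["join"],
    "zip_co": ["join"],
    "zip_pr": [],
    "join": [],
}


def descendants(node):
    """Every node downstream of `node`, excluding `node` itself."""
    found, stack = set(), list(CHILDREN[node])
    while stack:
        current = stack.pop()
        if current not in found:
            found.add(current)
            stack.extend(CHILDREN[current])
    return found


def ancestors(node):
    """Every node upstream of `node`, excluding `node` itself."""
    found = set()
    stack = [parent for parent, kids in CHILDREN.items() if node in kids]
    while stack:
        current = stack.pop()
        if current not in found:
            found.add(current)
            stack.extend(p for p, kids in CHILDREN.items() if current in kids)
    return found


def select_children(node):  # mimics `node+`
    return {node} | descendants(node)


def select_parents(node):  # mimics `+node`
    return {node} | ancestors(node)


def select_at(node):  # mimics `@node`
    selected = select_children(node)
    for member in list(selected):
        selected |= ancestors(member)
    return selected


print(sorted(select_children("zip_co")))
print(sorted(select_parents("join")))
print(sorted(select_at("zip_co")))
```

In this toy graph `@zip_co` pulls in `zip_ny` because it is a parent of `join`, while `zip_pr` is left out because nothing downstream of `zip_co` depends on it.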


# Let's run!

```bash
dbt run
dbt run --model model_name+3
dbt run --select tag:prod
```

# How it *might* work

* a single flow responsible for executing all dbt models
* by default this flow does `dbt run` (execute all models)
* the flow accepts parameters so that we can easily rerun failed models, e.g. `dbt run --select failed_model+`
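
A minimal sketch of the command-building part of such a flow, in Python. It is orchestrator-agnostic and assumes only that `dbt` is on the `PATH`; the function names `build_dbt_command` and `run_dbt` are made up for illustration.

```python
import subprocess
from typing import List, Optional


def build_dbt_command(select: Optional[str] = None) -> List[str]:
    """Return the dbt invocation: all models by default, or only a selection."""
    command = ["dbt", "run"]
    if select:
        command += ["--select", select]
    return command


def run_dbt(select: Optional[str] = None) -> int:
    """Run dbt as a subprocess and return its exit code (non-zero on failure)."""
    return subprocess.run(build_dbt_command(select)).returncode


print(build_dbt_command())                 # default: run everything
print(build_dbt_command("failed_model+"))  # rerun one model and its children
```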


# Unit tests

* [active discussion in the dbt community](https://github.com/dbt-labs/dbt-core/discussions/4455) & on [Slack](https://getdbt.slack.com/archives/C03QUA7DWCW/p1659431685815919)
* dbt-core contains a [testing framework](https://docs.getdbt.com/guides/dbt-ecosystem/adapter-development/4-testing-a-new-adapter) built on pytest

If we roll our own unit testing, it might look something like this:

```python
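from unittest.mock import MagicMock

import pandas as pd
import pytest
from pandas.testing import assert_frame_equal
from pyspark.sql import SparkSession

from e2.models.zip_pr import model


@pytest.fixture(scope="session")
def local_spark_session():
    # local Spark session so the test does not need a Databricks cluster
    spark = SparkSession.builder.master("local[1]").appName("dbt-unit-tests").getOrCreate()
    yield spark
    spark.stop()


@pytest.fixture
def dbt_input(local_spark_session):
    # one possible stand-in for the `dbt` object: a mock whose ref() returns
    # a small, made-up input DataFrame (could also come from a CSV or a pydantic model)
    dbt = MagicMock()
    dbt.ref.return_value = local_spark_session.createDataFrame(
        [("00601", "PR"), ("10001", "NY")], ["zip", "state_id"]
    )
    return dbt


def test_pr_model(dbt_input, local_spark_session):
    df_out = model(dbt_input, session=local_spark_session)

    # assert_frame_equal compares pandas DataFrames, so convert the Spark output first
    df_expected = pd.DataFrame({"zip": ["00601"], "state_id": ["PR"]})
    assert_frame_equal(df_out.toPandas(), df_expected)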
```

# Limitations

* EMR: this should work but is untested; there is a `dbt-spark` adapter, and it is easy to set dynamic backend targets, e.g. one for Databricks and one for EMR
* data tests can only be written in SQL out of the box
* SQL and Python models don't have parity
  * cannot declare a custom S3 location for Python models (SQL supports `location_root`); there's a [GH issue](https://github.com/dbt-labs/dbt-spark/issues/559) for this
  * table comments do not materialize for Python models, though column-level comments do
* does not support retry out of the box; one suggestion is found [here](https://github.com/PrefectHQ/prefect-recipes/tree/main/prefect-v1-legacy/use-cases/rerun_dbt_models_from_failure), though that requires access to persistent state
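
One way to approach the retry limitation is to read the `target/run_results.json` artifact that dbt writes after a run and build a selector from the models that errored. The Python sketch below assumes the artifact has a `results` list whose entries carry `unique_id` and `status` fields; check this against the artifact produced by your dbt version. The sample data is made up, and the file has to survive between runs, which is the persistent state mentioned above.

```python
import json


def failed_models(run_results: dict) -> list:
    """Return the names of models whose status is 'error' in a run results artifact."""
    names = []
    for result in run_results["results"]:
        unique_id = result["unique_id"]  # assumed shape: "model.<project>.<name>"
        if unique_id.startswith("model.") and result["status"] == "error":
            names.append(unique_id.split(".")[-1])
    return names


# Made-up excerpt in the assumed shape of target/run_results.json.
example = json.loads("""
{"results": [
  {"unique_id": "model.e2.zip_ny", "status": "success"},
  {"unique_id": "model.e2.zip_pr", "status": "error"}
]}
""")

# Append "+" so that children of each failed model are rerun as well.
selection = " ".join(f"{name}+" for name in failed_models(example))
print(f"dbt run --select {selection}")
```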