[daggy-u] [dbt] - Polishing (#20254)
## Summary & Motivation

This PR does a bunch of things:

- Adds screenshots
- Cleans up some copy and errors
- Removes an unneeded section in Lesson 6

## How I Tested These Changes
erinkcochran87 committed Mar 6, 2024
1 parent 8cf93fd commit cfa9d87
Showing 20 changed files with 112 additions and 67 deletions.
@@ -36,6 +36,8 @@ setup(
"s3fs",
"smart_open",
"boto3",
"pyarrow",
"fastparquet",
],
extras_require={"dev": ["dagster-webserver", "pytest"]},
)
@@ -53,7 +55,7 @@ cp .env.example .env
pip install -e ".[dev]"
```

The `e` flag installs the project in editable mode, you can modify existing Dagster assets without having to reload the code location. This allows you to shorten the time it takes to test a change. However, you’ll need to reload the code location in the Dagster UI when adding new assets or installing additional dependencies.
The `e` flag installs the project in editable mode so you can modify existing Dagster assets without having to reload the code location. This allows you to shorten the time it takes to test a change. However, you’ll need to reload the code location in the Dagster UI when adding new assets or installing additional dependencies.

To confirm everything works:

@@ -22,7 +22,7 @@ Refer to [the dbt documentation](https://docs.getdbt.com/reference/dbt_project.y

## profiles.yml

The next file we’ll cover is the `profiles.yml` file. This file contains connection details for your data platform, such as those for the DuckDB database we’ll use in this course. In this step, we’ll set up a `dev` environment for the project to use, which is where the DuckDB is located.
The next file we’ll cover is the `profiles.yml` file. This file contains connection details for your data platform, such as those for the DuckDB database we’ll use in this course. In this step, we’ll set up a `dev` environment for the project to use, which is where the DuckDB database is located.

Before we start working, you should know:

@@ -12,21 +12,20 @@ In the finished Dagster Essentials project, there should be a file called `asset

```python
from pathlib import Path
# import os
```

The `Path` class from the `pathlib` standard library will help us create an accurate pointer to where our dbt project is. At the bottom of this same file, add the following line:
The `Path` class from the `pathlib` standard library will help us create an accurate pointer to where our dbt project is. At the bottom of `constants.py`, add the following line:

```python
DBT_DIRECTORY = Path(__file__).joinpath("..", "..", "..", "analytics").resolve()
```

TODO: Tim to update as these instructions as the old instructions use `os.path.join`

This line creates a new constant called `DBT_DIRECTORY`. It might look a little complicated, so let’s break it down:

- It uses `constants.py`'s file location (via `__file__`) as a point of reference for finding the dbt project
- The arguments in `joinpath` point us towards our dbt project in `analytics`
- It uses the location of the `constants.py` file (via `__file__`) as a point of reference for finding the dbt project
- The arguments in `joinpath` point us towards our dbt project by appending the following to the current path:
- Three directory levels up (`"..", "..", ".."`)
- A directory named `analytics`, which is the directory containing our dbt project
- The `resolve` method turns that path into an absolute file path that points to the dbt project correctly from any file we’re working in
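
As a rough illustration of how that resolution plays out (the `/project` layout below is hypothetical, but it mirrors the course structure where `constants.py` sits three directories below the repository root and `analytics` sits at the root):

```python
from pathlib import Path

# Hypothetical layout, for illustration only:
#   /project/dagster_university/assets/constants.py   <- __file__
#   /project/analytics                                <- dbt project
example_file = Path("/project/dagster_university/assets/constants.py")

# Walk up three levels from constants.py, then into `analytics`
dbt_directory = example_file.joinpath("..", "..", "..", "analytics").resolve()
print(dbt_directory)  # /project/analytics
```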

Now that you can access your dbt project from any other file with the `DBT_DIRECTORY` constant, let’s move on to the first place where you’ll use it: creating the Dagster resource that will run dbt.
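
As a preview, that resource will look roughly like this (a sketch only; the exact module path and variable name are assumed to follow the course’s `resources/__init__.py` and may differ slightly):

```python
from dagster_dbt import DbtCliResource

from dagster_university.assets.constants import DBT_DIRECTORY

# A dbt CLI resource pointed at the dbt project directory we just resolved
dbt_resource = DbtCliResource(project_dir=str(DBT_DIRECTORY))
```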
@@ -10,7 +10,7 @@ Now is the moment that we’ve been building up to since the beginning of this m

---

## Turn dbt models into assets with @dbt_assets
## Turning dbt models into assets with @dbt_assets

The star of the show here is the `@dbt_assets` decorator. This is a specialized asset decorator that wraps around a dbt project to tell Dagster what dbt models exist. In the body of the `@dbt_assets` definition, you write exactly how you want Dagster to run your dbt models.
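
As a minimal sketch of the shape this takes (the manifest path here is a placeholder rather than the course’s constant):

```python
from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets

@dbt_assets(manifest="analytics/target/manifest.json")  # placeholder path
def dbt_analytics(context: AssetExecutionContext, dbt: DbtCliResource):
    # Run `dbt build` and stream each model's results back to Dagster as it finishes
    yield from dbt.cli(["build"], context=context).stream()
```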

@@ -46,7 +46,7 @@ After clicking around a bit and seeing the dbt models within Dagster, the next s

The run’s page should look similar to this:

![TODO](/images/dagster-dbt/lesson-3/dbt-run-details-page.png)
![Run details page in the Dagster UI](/images/dagster-dbt/lesson-3/dbt-run-details-page.png)

Notice that there is only one “block,” or step, in this chart. That’s because Dagster runs dbt as it’s intended to be run: in a single execution of a `dbt` CLI command. This step will be named after the `@dbt_assets`-decorated asset, which we called `dbt_analytics` in the `assets/dbt.py` file.

@@ -26,26 +26,19 @@ To demonstrate, we’re going to intentionally make a bug in our dbt model code,
from raw_zones
```

2. Open a separate terminal instance and run:
2. Navigate to the Dagster UI and reload the code location by either clicking the **Reload Definitions** button or using **Option+Shift+R**.

```bash
cd analytics
dbt parse
```

3. Navigate to the Dagster UI and reload the code location by either clicking the **Reload Definitions** button or using **Option+Shift+R**. **Note:** Your first terminal window with `dagster dev` should still be running.

4. On the asset graph, locate the `stg_zones` asset. You’ll see a yellow **Code version** tag indicating that Dagster recognized the SQL code changed:
3. On the asset graph, locate the `stg_zones` asset. You’ll see a yellow **Code version** tag indicating that Dagster recognized the SQL code changed:

![dbt std_zones asset with a code version badge in the Dagster UI](/images/dagster-dbt/lesson-4/stg-zones-code-version.png)

5. Select the `stg_zones` asset and click the **Materialize** button.
4. Select the `stg_zones` asset and click the **Materialize** button.

6. Navigate to the run’s details page.
5. Navigate to the run’s details page.

7. On the run’s details page, click the `dbt_analytics` step.
6. On the run’s details page, click the `dbt_analytics` step.

8. To view the logs, click the `stdout` button on the top-left of the pane. You’ll see the logs that typically come from executing `dbt`:
7. To view the logs, click the `stdout` button on the top-left of the pane. You’ll see the logs that typically come from executing `dbt`:

![stdout logs showing failure for std_zones materialization in the Dagster UI](/images/dagster-dbt/lesson-4/stg-zones-stdout-failure.png)

@@ -6,9 +6,11 @@ lesson: '5'

# Connecting dbt models to Dagster assets

With where we left off, you may have noticed that the sources for your dbt projects are not just tables that exist in DuckDB, but also *assets* that Dagster created. However, the staging models that use those sources aren’t linked to the Dagster assets that produced them.
With where we left off, you may have noticed that the sources for your dbt projects are not just tables that exist in DuckDB, but also *assets* that Dagster created. However, the staging models (`stg_trips`, `stg_zones`) that use those sources aren’t linked to the Dagster assets (`taxi_trips`, `taxi_zones`) that produced them:

Let’s fix that by telling Dagster that the dbt sources are the tables that the `taxi_trips` and `taxi_zones` asset definitions produce. To match up these assets, we’ll override dbt’s asset key with the name `taxi_trips`. By having the asset keys line up, Dagster will know that these assets are the same and should merge them.
![dbt models unconnected to Dagster assets](/images/dagster-dbt/lesson-5/unconnected-sources-assets.png)

Let’s fix that by telling Dagster that the dbt sources are the tables that the `taxi_trips` and `taxi_zones` asset definitions produce. To match up these assets, we'll override the dbt assets' keys. By having the asset keys line up, Dagster will know that these assets are the same and should merge them.

This is accomplished by changing the dbt source’s asset keys to be the same as the matching assets that Dagster makes. In this case, the dbt source’s default asset key is `raw_taxis/trips`, and the table that we’re making with Dagster has an asset key of `taxi_trips`.
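
For reference, the override the following steps build up has roughly this shape (a sketch; the full version of this translator appears later in the document):

```python
from dagster import AssetKey
from dagster_dbt import DagsterDbtTranslator

class CustomizedDagsterDbtTranslator(DagsterDbtTranslator):
    @classmethod
    def get_asset_key(cls, dbt_resource_props):
        # Map dbt sources such as `raw_taxis/trips` to the Dagster asset key `taxi_trips`
        if dbt_resource_props["resource_type"] == "source":
            return AssetKey(f"taxi_{dbt_resource_props['name']}")
        # Fall back to the default key for models and everything else
        return super().get_asset_key(dbt_resource_props)
```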

@@ -49,7 +51,7 @@ Open the `assets/dbt.py` file and do the following:

4. Now, let’s fill in the `get_asset_key` method with our own logic for defining asset keys.

1. There are two properties that we’ll want from `dbt_resource_props`: the `type` (ex., model or source) and the `name`, such as `trips` or `stg_trips`. Access both of those properties from the `dbt_resource_props` argument and store them in their own respective variables (`type` and `name`):
1. There are two properties that we’ll want from `dbt_resource_props`: the `type` (e.g., model, source, seed, snapshot) and the `name`, such as `trips` or `stg_trips`. Access both of those properties from the `dbt_resource_props` argument and store them in their own respective variables (`type` and `name`):

```python
@classmethod
@@ -14,7 +14,7 @@ At this point, you’ve loaded your dbt models as Dagster assets and linked the

In this section, you’ll learn how to do this by defining a new Dagster asset that depends on a dbt model. We’ll make some metrics in a dbt model and then use Python to generate a chart with that data.

If you’re familiar with New York City, you might know that there are three major airports - JFK, LGA, and EWR - in different parts of the metropolitan area. Hypothetically, you’re curious how their final destination impacts the airport they fly into. For example, how many people staying in Queens flew into LGA?
If you’re familiar with New York City, you might know that there are three major airports - JFK, LGA, and EWR - in different parts of the metropolitan area. Let's say you’re curious if a traveler's final destination impacts the airport they fly into. For example, how many people staying in Queens flew into LGA?

---

@@ -102,7 +102,9 @@ Now we’re ready to create the asset!

**Note:** Because Dagster doesn’t discriminate and treats all dbt models as assets, you’ll add this dependency just like you would with any other asset.

4. Fill in the body of the function with the following code to follow a similar pattern to your project’s existing pipelines: query for the data, use a library to generate a chart, save the chart as a file, and embed the chart:
4. Fill in the body of the function with the following code to follow a similar pattern to your project’s existing pipelines: query for the data, use a library to generate a chart, save the chart as a file, and embed the chart.

At this point, the `airport_trips` asset should look like this:

```python
@asset(
@@ -154,4 +156,6 @@ Now we’re ready to create the asset!
)
```

5. Reload your code location to see the new `airport_trips` asset within the `metrics` group. Notice how the asset graph links the dependency between the `location_metrics` dbt asset and the new `airport_trips` chart asset.
5. Reload your code location to see the new `airport_trips` asset within the `metrics` group. Notice how the asset graph links the dependency between the `location_metrics` dbt asset and the new `airport_trips` chart asset:

![The airport_trips asset in the Asset Graph of the Dagster UI](/images/dagster-dbt/lesson-5/airport-trips-asset.png)
@@ -10,6 +10,8 @@ Did you realize that your dbt models have already been scheduled to run on a reg

Check it out in the Dagster UI by clicking **Overview** in the top navigation bar, then the **Jobs** tab. Click `trip_update_job` to check out the job’s details. It looks like the dbt models are already attached to this job!

![dbt assets in the trip_update_job in the Dagster UI](/images/dagster-dbt/lesson-5/trip-update-job-dbt-assets.png)

Pretty cool, right? Let’s check out the code that made this happen. Open the `dagster_university/jobs/__init__.py` and look at the definition for `trip_update_job`:

```python
@@ -61,15 +63,19 @@ The function will return an `AssetSelection` of the dbt models that match your d

4. Reload the code location and confirm that the dbt models are not in the `trip_update_job` anymore!

![trip_update_job without dbt models](/images/dagster-dbt/lesson-5/job-with-dbt-models.png)

You might notice that the `airport_trips` asset is still scheduled to run with this job! That’s because the `build_dbt_asset_selection` function only selects *dbt models* and **not** Dagster assets.

If you want to also exclude the new `airport_trips` asset from this job, modify the `dbt_trips_selection` to include all *downstream assets*, too. Because we’re using Dagster’s native functionality to select all downstream assets, we can now drop the `+` from the dbt selector:

```python
dbt_trips_selection = build_dbt_asset_selection([dbt_analytics], "stg_trips").downstream()
```
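
For context, here is a simplified sketch of how a selection like this can be excluded from the job (names assumed from the course; the real `trip_update_job` likely subtracts other assets and sets a partitions definition as well):

```python
from dagster import AssetSelection, define_asset_job

trip_update_job = define_asset_job(
    name="trip_update_job",
    # Run everything except the dbt trips selection (and, with .downstream(), its chart asset)
    selection=AssetSelection.all() - dbt_trips_selection,
)
```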

Reload the code location and look at the `trip_update_job` once more to verify that everything looks right.
Reload the code location and look at the `trip_update_job` once more to verify that everything looks right:

![trip_update_job with the airport_trips asset](/images/dagster-dbt/lesson-5/job-without-dbt-models.png)

{% callout %}
> 💡 **Want an even more convenient utility to do this work for you?** Consider using the similar [`build_schedule_from_dbt_selection`](https://docs.dagster.io/_apidocs/libraries/dagster-dbt#dagster_dbt.build_schedule_from_dbt_selection) function to quickly create a job and schedule for a given dbt selection.
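
A rough sketch of how that utility might be called (job name, cron string, and argument names here are assumptions based on the linked API docs):

```python
from dagster_dbt import build_schedule_from_dbt_selection

# Hypothetical: a daily schedule that runs only the dbt models selected by `stg_trips+`
daily_dbt_schedule = build_schedule_from_dbt_selection(
    [dbt_analytics],
    job_name="daily_dbt_models",
    cron_schedule="0 0 * * *",
    dbt_select="stg_trips+",
)
```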
@@ -14,28 +14,6 @@ To partition an incremental dbt model, you’ll need first to partition your `@d

---

## Defining a new daily partition

Let’s start by defining a new daily partition for the model.

In `dagster_university/partitions/init.py`, make the following changes:

1. import `DailyPartitionsDefinition` from `dagster`, and
2. Define a new `daily_partition` like the following:

```python
from dagster import MonthlyPartitionsDefinition, WeeklyPartitionsDefinition, DailyPartitionsDefinition

# ...existing partitions here

daily_partition = DailyPartitionsDefinition(
start_date=start_date,
end_date=end_date
)
```

---

## Defining an incremental selector

We have a few changes to make to our dbt setup to get things working. In `dagster_university/assets/dbt.py`:
@@ -47,7 +25,7 @@ We have a few changes to make to our dbt setup to get things working. In `dagste
import json
```

This imports the new `daily_partition` and the `json` standard module. We’ll use the `json` module to format how we tell dbt what partition to materialize.
This imports the `daily_partition` from `dagster_university/partitions/__init__.py` and the `json` standard module. We’ll use the `json` module to format how we tell dbt what partition to materialize.

2. We now need a way to indicate that we’re selecting or excluding incremental models, so we’ll make a new constant in the `dbt.py` file called `INCREMENTAL_SELECTOR`:

@@ -143,7 +121,68 @@ def dbt_analytics(context: AssetExecutionContext, dbt: DbtCliResource):
At this point, the `dagster_university/assets/dbt.py` file should look like this:

```python
TODO
import os
import json
from dagster import AssetExecutionContext, AssetKey
from dagster_dbt import dbt_assets, DbtCliResource, DagsterDbtTranslator

from .constants import DBT_DIRECTORY
from ..partitions import daily_partition
from ..resources import dbt_resource


INCREMENTAL_SELECTOR = "config.materialized:incremental"


class CustomizedDagsterDbtTranslator(DagsterDbtTranslator):
@classmethod
def get_asset_key(cls, dbt_resource_props):
resource_type = dbt_resource_props["resource_type"]
name = dbt_resource_props["name"]
if resource_type == "source":
return AssetKey(f"taxi_{name}")
else:
return super().get_asset_key(dbt_resource_props)


if os.getenv("DAGSTER_DBT_PARSE_PROJECT_ON_LOAD"):
dbt_manifest_path = (
dbt_resource.cli(["--quiet", "parse"])
.wait()
.target_path.joinpath("manifest.json")
)
else:
dbt_manifest_path = DBT_DIRECTORY.joinpath("target", "manifest.json")


@dbt_assets(
manifest=dbt_manifest_path,
dagster_dbt_translator=CustomizedDagsterDbtTranslator(),
exclude=INCREMENTAL_SELECTOR,
)
def dbt_analytics(context: AssetExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()


@dbt_assets(
manifest=dbt_manifest_path,
dagster_dbt_translator=CustomizedDagsterDbtTranslator(),
select=INCREMENTAL_SELECTOR,
partitions_def=daily_partition,
)
def incremental_dbt_models(context: AssetExecutionContext, dbt: DbtCliResource):
time_window = context.partition_time_window
dbt_vars = {
"min_date": time_window.start.isoformat(),
"max_date": time_window.end.isoformat(),
}

yield from dbt.cli(
["build", "--vars", json.dumps(dbt_vars)], context=context
).stream()

```
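
As a quick aside on the `--vars` formatting used above: `json.dumps` turns the partition window into the JSON string that dbt receives. The values below are placeholders for illustration:

```python
import json

# Hypothetical partition window, for illustration only
dbt_vars = {
    "min_date": "2023-03-01T00:00:00+00:00",
    "max_date": "2023-03-02T00:00:00+00:00",
}

# This is the string passed to `dbt build --vars ...`
print(json.dumps(dbt_vars))
# {"min_date": "2023-03-01T00:00:00+00:00", "max_date": "2023-03-02T00:00:00+00:00"}
```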

---
@@ -152,16 +191,12

Finally, we’ll modify the `daily_metrics.sql` file to reflect that dbt knows what partition range is being materialized. Since the partition range is passed in as variables at runtime, the dbt model can access them using the `var` dbt macro.

In `analytics/models/marts/daily_metrics.sql`, update the model's incremental logic to the following:
In `analytics/models/marts/daily_metrics.sql`, update the contents of the model's incremental logic (the `{% if is_incremental() %}` block) to the following:

```sql
`{% if is_incremental() %}`
where date_of_business >= strptime('{{ var('min_date') }}', '%c') and date_of_business < strptime('{{ var('max_date') }}', '%c')
`{% endif %}`
where date_of_business >= strptime('{{ var('min_date') }}', '%c') and date_of_business < strptime('{{ var('max_date') }}', '%c')
```

**TODO: AWARE THIS IS BROKEN - NEEDS A MARKDOC FIX**

Here, we’ve changed the logic to say that we only want to select rows between the `min_date` and the `max_date`. Note that we are turning the variables into timestamps using `strptime` because they’re loaded as strings.

---
@@ -170,7 +205,10 @@

That’s it! Now you can check out the new `daily_metrics` asset in Dagster.

1. In the Dagster UI, reload the code location. Once loaded, you should see the new partitioned `daily_metrics` asset.
1. In the Dagster UI, reload the code location. Once loaded, you should see the new partitioned `daily_metrics` asset:

![daily_metrics asset in the Asset Graph of the Dagster UI](/images/dagster-dbt/lesson-6/daily-metrics-asset.png)

2. Click the `daily_metrics` asset and then the **Materialize selected** button. You’ll be prompted to select some partitions first.
3. Once the run starts, navigate to the run’s details page to check out the event logs. The executed dbt command should look something like this:

1 comment on commit cfa9d87

@github-actions

Deploy preview for dagster-university ready!

✅ Preview
https://dagster-university-ntemwi04u-elementl.vercel.app

Built with commit cfa9d87.
This pull request is being automatically deployed with vercel-action