support asset partitions for load_assets_from_dbt_project #7683

Closed
OwenKephart opened this issue May 2, 2022 · 16 comments

@OwenKephart
Contributor

OwenKephart commented May 2, 2022

We don't currently support partitioned dbt assets, but we should

Relevant mentions in Slack:

@GeorgePearse

Would love this

@aryan-tmg

I second the above. I see a ton of value here, since incremental models are partitioned by nature in some sense; most cases I've seen partition by date, for example.

@sryza
Contributor

sryza commented Jun 24, 2022

@GeorgePearse @aryan-tmg - just to deepen my understanding, are you using BigQuery? The main information I see when I Google dbt partitioning is about BigQuery.

@GeorgePearse

My data warehouse is currently very budget: just Postgres, though I might migrate to Snowflake or Firebolt at some point in the future. I just don't want to have to run SQL queries over huge amounts of data for a computation that unnecessarily recreates everything from scratch.

I haven't thoroughly explored what would already be available for this though.

@aryan-tmg

aryan-tmg commented Jun 24, 2022

My use case is that I have a lot of data in my dbt models. If I were to materialize them as views each day, it would greatly lengthen my workflows. Right now, I have my models configured as incremental, and I pass a variable "run_date" whenever I do a dbt run. I'd like to be able to configure that somehow in Dagster so that I can pass a partition definition to all assets in my workflow, including dbt.

@sryza sryza added this to the 0.15.3 milestone Jun 24, 2022
@sryza
Contributor

sryza commented Jun 27, 2022

We're currently investigating adding this. Some questions for those who are interested in this:

  • In your project, what partitions do you use (e.g. daily), and are all models partitioned or are some partitioned and some not? Do models within a project have different partitions?
  • How do you change behavior of dbt to respect the partition (is it just vars?)
  • Do you use incremental models to handle inserting just the records corresponding to the partition, or is there something else going on?
  • If you set a var named something like "run_date", does that typically correspond to the start of the time window or the end of the time window?

Tagging people who 👍 'ed this issue who I haven't yet reached out to in other contexts: @deugene @hungtd9 @xaniasd @nvinhphuc.

@nvinhphuc
Contributor

nvinhphuc commented Jun 28, 2022

Hi @sryza, I'll describe what we currently do.

I have an ingestion job running daily.
When the ingestion job finishes, we get the start_time and end_time from the partition and run a dbt op with start_time and end_time variables.
A simple dbt model looks like this:

{{ config(
    materialized='incremental',
    unique_key=['id'],
) }}
SELECT 
    id::string,
    date_modified::timestamp,
    date_modified_partition::date
FROM {{ source('raw', 'source_table') }}
{% if is_incremental() %}
WHERE date_modified_partition >= '{{ var('start_time') }}'
    AND date_modified_partition < '{{ var('end_time') }}'
{% endif %}

This processes only the data from start_time to end_time rather than fully refreshing everything. However, we can still reprocess all data with dbt run --full-refresh if we want to. The idea is to carry the Dagster partition into dbt for coherence.
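
For concreteness, a minimal sketch of this pattern (all names here are illustrative, and the op simply shells out to the dbt CLI; dagster-dbt's resources could be used instead of subprocess):

import json
import subprocess

from dagster import DailyPartitionsDefinition, job, op

daily = DailyPartitionsDefinition(start_date="2022-01-01")

@op
def run_dbt_for_partition(context):
    # Derive the time window for this run's partition key...
    window = daily.time_window_for_partition_key(context.partition_key)
    dbt_vars = json.dumps(
        {"start_time": str(window.start.date()), "end_time": str(window.end.date())}
    )
    # ...and pass it to dbt as vars, as described above.
    subprocess.run(["dbt", "run", "--vars", dbt_vars], check=True)

@job(partitions_def=daily)
def daily_dbt_job():
    run_dbt_for_partition()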

  • In your project, what partitions do you use (e.g. daily), and are all models partitioned or are some partitioned and some not? Do models within a project have different partitions?
    It can be daily, hourly, or monthly, but as long as start_time and end_time variables reach dbt, the problem is solved.

  • How do you change behavior of dbt to respect the partition (is it just vars?)
    I would like Dagster to allow custom dbt commands on each dbt asset, with not only vars but also other arguments/parameters.

@xaniasd

xaniasd commented Jun 28, 2022

Hi @sryza, here's some input from my side as well.

In your project, what partitions do you use (e.g. daily), and are all models partitioned or are some partitioned and some not? Do models within a project have different partitions?

It really depends on the model. Typically daily partitions, though they could be hourly as well. Event-like tables are typically partitioned, but there are also models for which partitioning is not needed, for instance small tables with master data (think zip codes per city).

How do you change behavior of dbt to respect the partition (is it just vars?). Do you use incremental models to handle inserting just the records corresponding to the partition, or is there something else going on?

For incremental models, you can also define static partitions. You could tell dbt to figure out all missing partitions itself and update the model accordingly, but this is not a great option when each partition contains gigabytes of data. By providing a partition as a variable, I can update exactly the partition that I want.

If you set a var named something like "run_date", does that typically correspond to the start of the time window or the end of the time window?

All I can say: please avoid the confusion Airflow created with their execution_date (I think they're trying to move past that as well) :) I'm trying to use the var as the partition to be processed wherever possible. Perhaps it would be good to leave the interpretation to the individual user, as it can differ wildly.

Hope this helps, happy to share more if necessary

@OwenKephart
Contributor Author

@xaniasd @nvinhphuc thanks for the feedback! You both mentioned sometimes partitioning by hour and sometimes by day, and I have some follow-up questions around that.

  • Do you have a way of distinguishing between models that should be run hourly vs. those that should be run daily (e.g. a tag)?
  • Do you have any special logic to prevent duplicating data when you run dbt over the same partition multiple times?

@nvinhphuc
Contributor

Hi @OwenKephart

  • The logic isn't necessarily different between the models:
{% if is_incremental() %}
WHERE date_modified_partition >= '{{ var('start_time') }}'
    AND date_modified_partition < '{{ var('end_time') }}'
{% endif %}

The start_time and end_time variables are strings, and I CAST them to DATE or DATETIME depending on whether the partition is hourly or daily.

  • We deduplicate in dbt using the ROW_NUMBER window function, and we also use an upsert/merge strategy in the incremental models to prevent duplicates; a sketch combining the two is below.
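
A minimal sketch of how those two techniques might combine in one incremental model, reusing the column and source names from the earlier example (the merge strategy is adapter-dependent):

{{ config(
    materialized='incremental',
    unique_key=['id'],
    incremental_strategy='merge',
) }}
WITH ranked AS (
    SELECT
        id::string AS id,
        date_modified::timestamp AS date_modified,
        date_modified_partition::date AS date_modified_partition,
        -- keep only the latest row per id within the processed window
        ROW_NUMBER() OVER (
            PARTITION BY id ORDER BY date_modified DESC
        ) AS rn
    FROM {{ source('raw', 'source_table') }}
    {% if is_incremental() %}
    WHERE date_modified_partition >= CAST('{{ var("start_time") }}' AS DATE)
        AND date_modified_partition < CAST('{{ var("end_time") }}' AS DATE)
    {% endif %}
)
SELECT id, date_modified, date_modified_partition
FROM ranked
WHERE rn = 1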

@OwenKephart
Contributor Author

Hi everyone! As an initial implementation of this behavior, we're planning to add partitions_def and partition_key_to_vars_fn parameters to both load_assets_from_dbt_project and load_assets_from_dbt_manifest.

For simple cases, where you want to parameterize all models in a dbt project by (let's say) a run date, that would look something like:

from dagster import DailyPartitionsDefinition
from dagster_dbt import load_assets_from_dbt_project

my_partitions_def = DailyPartitionsDefinition(start_date="2022-02-02")

def partition_key_to_dbt_vars(partition_key):
    return {"run_date": partition_key}

dbt_assets = load_assets_from_dbt_project(
    ...,
    partitions_def=my_partitions_def,
    partition_key_to_vars_fn=partition_key_to_dbt_vars,
)

In this example, when Dagster is invoked to update the "2022-06-06" partition (for example), it will invoke the dbt CLI with a vars argument of {"run_date": "2022-06-06"}, i.e. the equivalent of dbt run --vars '{"run_date": "2022-06-06"}'.

This does not currently provide any special support for dbt projects that have multiple partitioning schemes (although you can invoke the load_assets_... function multiple times, once for each set of dbt assets).
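
For example, here's a sketch of that workaround, assuming the project's models are tagged daily or hourly (the tags and project path are illustrative):

from dagster import DailyPartitionsDefinition, HourlyPartitionsDefinition
from dagster_dbt import load_assets_from_dbt_project

# Load each group of models separately, using dbt selection syntax to split them.
daily_assets = load_assets_from_dbt_project(
    "my_dbt_project",
    select="tag:daily",
    partitions_def=DailyPartitionsDefinition(start_date="2022-02-02"),
    partition_key_to_vars_fn=lambda key: {"run_date": key},
)

hourly_assets = load_assets_from_dbt_project(
    "my_dbt_project",
    select="tag:hourly",
    partitions_def=HourlyPartitionsDefinition(start_date="2022-02-02-00:00"),
    partition_key_to_vars_fn=lambda key: {"run_hour": key},
)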

We're definitely interested in how well this setup maps to your use cases, so feel free to leave feedback. All of these changes are currently experimental and subject to change, so if this misses the mark, definitely let us know :)

@xaniasd

xaniasd commented Jul 8, 2022

awesome, I'll give it a try. Thanks for this!

@sryza sryza removed this from the 0.15.4 milestone Jul 8, 2022
@sryza sryza changed the title [dagster-dbt] support asset partitions for load_assets_from_dbt_project support asset partitions for load_assets_from_dbt_project Aug 27, 2022
@its-a-gas

Hi @sryza

In your project, what partitions do you use (e.g. daily), and are all models partitioned or are some partitioned and some not? Do models within a project have different partitions?

When we supply partition information to a DB/query engine, a sensible default is daily. However, there are sometimes cases for finer granularity, e.g. hourly partitions for more recent data to make scans cheaper. Conversely, we might want larger partitions: BigQuery, for example, has a default quota of 4,000 partitions per table, so a table that would otherwise need more than 4,000 partitions has to be partitioned more coarsely.

How do you change behavior of dbt to respect the partition (is it just vars?)

I've done it previously by partitioning BigQuery tables per date, and we've used incremental dbt models with variables representing the start/end dates passed into the is_incremental WHERE clause to do partial backfills.

Today we are using Dagster/dbt/Snowflake, letting Snowflake decide how it wants to partition/shard its data files, and we don't have anything better than passing timestamp bookmarks to dbt incremental models.

Do you use incremental models to handle inserting just the records corresponding to the partition, or is there something else going on?

It depends on the data requirements, but if it's just to create a clean copy ready for analysts, we might make an incremental model that inserts only rows whose timestamps are greater than the max timestamp in the downstream table, or alternatively filter with a WHERE NOT EXISTS that checks for the primary key in the downstream table.
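
A minimal sketch of that insert-only pattern in a dbt model (the source and column names are illustrative):

{{ config(materialized='incremental') }}
SELECT
    event_id,
    event_ts,
    payload
FROM {{ source('raw', 'events') }}
{% if is_incremental() %}
-- insert-only: take rows newer than anything already in this model's table
WHERE event_ts > (SELECT MAX(event_ts) FROM {{ this }})
{% endif %}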

If you set a var named something like "run_date", does that typically correspond to the start of the time window or the end of the time window?

Ideally we'd want to be able to set start/min and stop/max bookends, for example if we need to backfill one specific week, e.g. 2022-02-01 to 2022-02-07, without having to backfill from 2022-02-01 to whatever today's timestamp is.


One alternative is for us to do full loads, which is expensive when you're at or above hundreds of TB in an insert statement. Another is to not invoke dbt via Dagster and have another runner/orchestrator invoke dbt, passing in the timestamp bookends, but we really like using Dagster.

It would be a massive time-saving feature for Dagster to be able to push partitions/date-ranges/ID-ranges down to dbt to do partial backfills.

@sryza
Contributor

sryza commented Feb 28, 2023

Thanks a ton for the info @its-a-gas. Would the approach that @OwenKephart added support for and mentioned here work for you?

@its-a-gas

Hi @sryza 👋

I think this approach would get us a step closer to having Dagster push the partitioned-asset concept down to the dbt layer. My interpretation is that start_date would be passed to dbt to mutate exactly one date, as opposed to a range. Further, this proposal assumes all models within a dbt project are partitioned daily, to the exclusion of hourly/weekly/etc.

I think we all agree the ideal state is to pass start and end bookmarks of arbitrary granularity, but @OwenKephart's solution would get us closer to that ideal state.
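
For what it's worth, the partition_key_to_vars_fn described above can already emit both bookends from a single partition key. A sketch, assuming a daily scheme (the var names and project path are illustrative):

from datetime import datetime, timedelta

from dagster import DailyPartitionsDefinition
from dagster_dbt import load_assets_from_dbt_project

def partition_key_to_dbt_vars(partition_key):
    # A daily partition key like "2022-02-01" becomes a [start, end) window.
    start = datetime.strptime(partition_key, "%Y-%m-%d")
    end = start + timedelta(days=1)
    return {
        "start_time": start.strftime("%Y-%m-%d"),
        "end_time": end.strftime("%Y-%m-%d"),
    }

dbt_assets = load_assets_from_dbt_project(
    "my_dbt_project",
    partitions_def=DailyPartitionsDefinition(start_date="2022-02-01"),
    partition_key_to_vars_fn=partition_key_to_dbt_vars,
)

A backfill over 2022-02-01 through 2022-02-07 would then launch one run per day, each invoking dbt with just that day's bookends.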

@sryza
Copy link
Contributor

sryza commented Mar 7, 2023

I'm going to close this because the partitions_def and partition_key_to_vars_fn arguments enable this.

@sryza sryza closed this as completed Mar 7, 2023