
Selection method based on source freshness: new max_loaded_at, new data #4050

Closed
jtcohen6 opened this issue Oct 13, 2021 · 12 comments · Fixed by #4256
Assignees
Labels
enhancement New feature or request state Stateful selection (state:modified, defer)

Comments

@jtcohen6 (Contributor) commented Oct 13, 2021

See also: #2465 (comment), #2743 (comment), #3804 (comment), #3862 (comment), Benn's tweet

Use the sources.json artifact to determine which sources have new data since the last dbt build.

Imagine:

$ dbt source freshness
$ dbt build -s sources:fresh+ --state my/previous/run/artifacts

This would require handling multiple sources.json:

  • the version from the last run in the --state directory
  • the one from just now in the target/ directory

Then, dbt would compare the high watermarks recorded in each, and select all source tables with a greater/changed max_loaded_at.
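A rough sketch of what that comparison could look like. This is hedged: it assumes the sources.json artifact's top-level `results` list with `unique_id` and `max_loaded_at` per source table, and the `fresh_sources` helper is a made-up name for illustration, not a real dbt API:

```python
import json
from pathlib import Path

def fresh_sources(previous_path, current_path):
    """Return unique_ids whose max_loaded_at watermark advanced since last run."""
    def watermarks(path):
        artifact = json.loads(Path(path).read_text())
        return {
            r["unique_id"]: r.get("max_loaded_at")
            for r in artifact.get("results", [])
        }

    prev = watermarks(previous_path)  # from the --state directory
    curr = watermarks(current_path)   # from target/, written just now
    return sorted(
        uid for uid, loaded_at in curr.items()
        # newly appeared source tables, or a greater/changed watermark
        if uid not in prev or loaded_at != prev[uid]
    )
```

The resulting unique_ids could then be mapped onto `-s source:...+` selection arguments.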

Questions

  • Should we reconsider the inclusion of the freshness task within dbt build? I think it would be tricky to check source freshness and perform dynamic selection and build those now-selected resources, all in one invocation. It would require blurring lines that are pretty well delineated today (selection → execution → artifacts).
  • Should we reconsider the consolidation of sources.json into run_results.json? The separation actually seems like a plus for the sake of this feature, but I'm sure we could manage either way.
  • Would it be better to perform this comparison outside dbt-core (e.g. dbt Cloud metadata API), and then simply pass the set of sources that have new data? If we know that source:stripe and source:salesforce.account have new data, it's easy enough to then define a job with dbt build -s source:stripe+ source:salesforce.account+.

Big idea

Coupled with state:modified and result:<status> (#4017), you could dynamically select the part of your DAG that's relevant by comparing the current state of your sources/project and the artifacts from your last run.

selectors:
  - name: one_selector_to_rule_them_all
    default: true
    description: Select sub-DAG based on new source data, changed logic, &/or failure in last run
    definition:
      union:
      
        # check freshness + test all sources
        - 'source:*'
      
        # new source data + table/incremental
        - intersection:
            - union:
                - 'config.materialized:table'
                - 'config.materialized:incremental'
            - method: source
              value: fresh  # imagine
              children: true
      
        # changed logic (need this for views + MVs too)
        - method: state
          value: modified
          children: true

        # failed last time
        - union:
            - 'result:error'
            - 'result:skipped'
$ export DBT_ARTIFACT_STATE_PATH=my/previous/run/artifacts
$ dbt source freshness
$ dbt build

Schedule that to run every 5 minutes, set up solid slack notifications on failures, and put on a good album.

While this is very cool, it does take its cue from the source data loaders—whenever they've ingested new rows, we're going to build downstream tables. This is the inverse of defining freshness / latency expectations for specific "final" models, and using that as the primary input to deployment/scheduling. That still seems like a cool and important thing to do... perhaps via exposures??

@sungchun12 (Contributor)

Wow, smart freshness runs. Now this answers a question I get all the time: "How do I get dbt jobs to run based on data freshness and not generically process everything?"

@sungchun12 (Contributor)

@anaisvaillant and I want to work on this together 🧃

@jtcohen6, let us know if this is dependent on other functionality before we start! I noticed you explicitly mentioned having the one selector to rule them all. Is that something you'd want baked in for testing or that can wait?

@jtcohen6 (Contributor, Author)

Is this a good idea? A bad one? A huge step in the wrong direction? I'm open to discussion, and also experimentation. I included the big harebrained idea as motivation for this line of thinking; it can definitely wait.

@sungchun12 @anaisvaillant In that spirit, I'd be thrilled if you two want to play around with this. I imagine you could reuse some similar logic from #4016, though you'll need to find ways to handle multiple sources.json.

@barryaron @drewbanin I'd be especially interested in your thoughts here :)

@fabrice-etanchaud

Hi Jérémy!
Thank you for your kind words concerning #4016!
IMHO you are right, this is all about watermarks.

Here is a really good paper on the matter:

Data-Vault-Implementation-and-Automation-Consistency-and-Referential-Integrity

by Roelant Vos, a very very skilled data vault consultant. He wrote a paper about dbt

Have you heard of him before?

His notion of "eventual consistency" is really interesting, and I wonder how it could apply to dbt, with its current threading feature, not to mention my dream of multi-processed partitioned DAG.

In the data vault world, the timestamp the data was first loaded into the DWH is a first-class citizen.
In any case, this seems to have something to do with the max_loaded_at feature!

(@jtcohen6)

@sungchun12 (Contributor)

This is a great idea, and I'll expand on your original description with how I see it live and breathe in practice.

Problem 1

  • Current State: How do I create an events-based transformation pipeline, so that I'm not generically running all 100 dbt models when I only want to run 10-15 of them every 10 minutes from my streamed-in EL ingestion processes? Today, I have to go all or nothing, or else look at history and manually scope out dbt commands to run the in-scope models at specific time intervals.
  • Future State: I have a simple conditional workflow below.
    If dbt source freshness shows fresh data, then run dbt build -s sources:fresh+ --state my/previous/run/artifacts. If the data is only stale, run nothing. I have full confidence that all downstream data is based on the freshest data possible. No need to fiddle around with manually invoking certain models or over-engineering CLI commands to get things right.
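That conditional workflow could be wired up with a thin wrapper script. A hedged sketch: has_fresh_sources assumes the sources.json results[].status field (with "pass" meaning fresh), and the selector syntax mirrors the imagined sources:fresh+ from this issue, not anything dbt supports today:

```python
import json
import subprocess
from pathlib import Path

def has_fresh_sources(sources_json):
    """True if any source freshness check in the artifact dict passed."""
    return any(r.get("status") == "pass" for r in sources_json.get("results", []))

def run_if_fresh(artifact_path="target/sources.json",
                 state_path="my/previous/run/artifacts"):
    """After `dbt source freshness`, build downstream models only if data is fresh."""
    artifact = json.loads(Path(artifact_path).read_text())
    if has_fresh_sources(artifact):
        # imagined selector from this issue: fresh sources plus their children
        subprocess.run(
            ["dbt", "build", "-s", "sources:fresh+", "--state", state_path],
            check=True,
        )
    # stale data only: run nothing
```

Scheduled every 10 minutes, this gives the "if fresh, build; if stale, skip" behavior described above without any manual model scoping.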

Regarding Benn's tweet, latency guarantees based on models to maintain (think: exposures) is a great working philosophy. Building this feature seems to combat that vision at a surface level. However, this feature goes a step further: latency is defined by ALL models in scope (with freshness requirements), not only those we manually scope downstream. If we work backwards from exposures, it will require at least one more step in the above workflow. I don't see value in an extra step to make some data fresh when ALL data can be fresh just as easily, with less mental overhead.

"I want all my data to be as fresh as possible. I shouldn't have all these mental models and over-engineered project setups to feel confident of that." - Future data person

I'm excited for a future where events-based data experiences are the norm and not the exception.

For your open technical implementation questions, those are for future Sung and Anais to figure out in the planning process :)

@barryaron

Completely agree with @sungchun12 here. This would be awesome, @jtcohen6.
Still working through some of the other open questions (e.g. should this live within run_results.json).

@Jeffrey-Amst

Currently we are not able to do these things with dbt, and have therefore created all kinds of complex and expensive macros to check source freshness state. A solution like the one @sungchun12 described would be great!

@GJMcClintock

I love this idea, it will be so great.

@AdeelK93

From reading dbt documentation, I was under the impression that this functionality was already implemented. But maybe it's not really implemented as I'm unable to get it working (I modify a source table, but it's not picked up in state:modified). Here's the line of code that should allegedly be comparing freshness.

[screenshot: the linked line of code]

It's totally possible I'm misunderstanding something here. Thanks for any clarity anyone can provide!

@GJMcClintock

> From reading dbt documentation, I was under the impression that this functionality was already implemented. But maybe it's not really implemented as I'm unable to get it working (I modify a source table, but it's not picked up in state:modified). Here's the line of code that should allegedly be comparing freshness.
>
> It's totally possible I'm misunderstanding something here. Thanks for any clarity anyone can provide!

That line is saying that if you change a source's freshness config, it counts as a modification, so everything using that table as a source, and all dependent models downstream, will run.

This issue suggestion is to only run dependencies that have new records after calculating source freshness.

@AdeelK93

Thank you Garrett, that makes sense. Seems it's an ambiguous use of "freshness property" in the documentation.

The issue suggestion is really the functionality I'm interested in.

@joellabes (Contributor)

@AdeelK93 thanks for calling that out - I've just made a change in the docs repo based on your feedback. Do you think this is clearer?
