
Feature/incremental non full refresh #3387

Merged
merged 50 commits into develop from feature/incremental-non-full-refresh on Jul 21, 2021

Conversation

matt-winkler
Contributor

@matt-winkler matt-winkler commented May 24, 2021

addresses #1132

Description

Goal is to identify whether there are different schemas in the source and target relations during an incremental run. The allowed behaviors, specified via a new config option called on_schema_change on the incremental.sql materialization, are ['ignore', 'fail', 'append', 'sync'].
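For illustration only (not part of this PR's diff), a model opts in to one of those behaviors roughly like this; stg_events, id, and event_time are placeholder names, and the value shown is one of the options listed above:

{# hypothetical model; 'stg_events', 'id', and 'event_time' are placeholders #}
{{
    config(
        materialized='incremental',
        unique_key='id',
        on_schema_change='sync'
    )
}}

select * from {{ ref('stg_events') }}

{% if is_incremental() %}
  where event_time > (select max(event_time) from {{ this }})
{% endif %}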

New macros added to /core/dbt/include/global_project/macros/materializations/incremental/on_schema_change.sql:

incremental_validate_on_schema_change: defaults to 'ignore' if None or invalid values are specified
sync_schemas: runs ALTER TABLE statements depending on the value of on_schema_change and whether the source / target column sets differ.
process_schema_changes: orchestrator macro that determines which others in this module should be called based on configuration.
get_column_names: Extracts col.column values from a list of columns
diff_arrays: detects the difference between two arrays
diff_columns: detects the difference between two arrays of Columns, and calls diff_arrays
check_for_schema_changes: Determines whether a source and target schema differ due to the presence / absence of columns and / or data type differences (a rough sketch of this comparison follows)
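As a rough sketch of the comparison these macros perform (simplified; the merged macros in on_schema_change.sql are the source of truth, and the macro name is prefixed to mark it as illustrative):

{% macro example_check_for_schema_changes(source_relation, target_relation) %}
  {# simplified sketch: compares column names only; the real macros also consider data type differences #}
  {% set source_columns = adapter.get_columns_in_relation(source_relation) | map(attribute='name') | list %}
  {% set target_columns = adapter.get_columns_in_relation(target_relation) | map(attribute='name') | list %}
  {% set source_not_in_target = source_columns | reject('in', target_columns) | list %}
  {% set target_not_in_source = target_columns | reject('in', source_columns) | list %}
  {{ return(source_not_in_target != [] or target_not_in_source != []) }}
{% endmacro %}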

Checklist

  • I have signed the CLA
  • I have run this code in development and it appears to resolve the stated issue
  • This PR includes tests, or tests are not required/relevant for this PR
  • I have updated the CHANGELOG.md and added information about my change to the "dbt next" section.

@cla-bot cla-bot bot added the cla:yes label May 24, 2021
@matt-winkler
Contributor Author

@jtcohen6 I've run through this for Postgres / Snowflake / BigQuery, excluding the full_refresh behavior per our discussion offline.

Contributor

@jtcohen6 jtcohen6 left a comment

Coming along nicely! Thanks for the work shoring up the progress so far, outside the trickier implications of on_schema_change: full_refresh.

I left some comments around consolidating logic. I'd like to keep the "footprint" on materializations as light as possible, since that's where we have the least modularity, greatest complexity, and most duplication across adapters.

Let me know how I can help as you get into unit + integration tests!

Comment on lines 17 to 27
{% macro get_column_names(columns) %}

{% set result = [] %}

{% for col in columns %}
{{ result.append(col.column) }}
{% endfor %}

{{ return(result) }}

{% endmacro %}
Contributor

I think this macro is effectively the same as columns | map(attribute = 'name'), right?

Contributor Author

Boom, map for the win.

Contributor Author

On further testing, I found that an additional | list was needed
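For the record, the resulting one-liner (illustrative; columns is the list of Column objects the original macro looped over):

{# map() returns a lazy generator in Jinja, so | list is needed to materialize it #}
{% set column_names = columns | map(attribute='name') | list %}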

Comment on lines 131 to 133
{% macro alter_relation_add_remove_columns(relation, add_columns = none, remove_columns = none) -%}
{{ return(adapter.dispatch('alter_relation_add_remove_columns')(relation, add_columns, remove_columns)) }}
{% endmacro %}
Contributor

You can remove this dispatch call and just keep the bigquery__ implementation below

{% set build_sql = incremental_upsert(tmp_relation, target_relation, unique_key=unique_key) %}

{% else %}
{% do adapter.expand_target_column_types(
Contributor

Let's always expand_target_column_types before check_for_schema_changes, even if it means running a few extra metadata queries, in order to avoid over-aggressively dropping and recreating columns that just need to be varchar/numeric-type expanded.
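In other words, roughly this ordering (a sketch, not the exact merged code):

{# expand varchar/numeric column sizes in the target first... #}
{% do adapter.expand_target_column_types(from_relation=tmp_relation, to_relation=target_relation) %}
{# ...and only then compare schemas, so widened columns don't register as schema changes #}
{% set schema_changed = check_for_schema_changes(tmp_relation, target_relation) %}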

Contributor Author

Great call - I tested this on Snowflake and it works beautifully.

{% elif target_not_in_source != [] %}
{% set schema_changed = True %}
{% endif %}

Contributor

Could we centralize all schema-change-related logging in this macro, to avoid repeating it in the materialization code?

Contributor Author

done
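For readers following along, "centralizing" here means roughly this shape inside the schema-change macro itself, rather than repeated in each materialization (a sketch only; the merged macro is the reference, and the variable names are taken from the surrounding discussion):

{% if schema_changed %}
  {% set log_message %}
    Schema change detected on {{ target_relation }}:
      columns in source but not target: {{ source_not_in_target }}
      columns in target but not source: {{ target_not_in_source }}
  {% endset %}
  {% do log(log_message, info=true) %}
{% endif %}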

{%- endset -%}

{% set build_sql = get_merge_sql(target_relation, source_sql, unique_key, dest_columns) %}
{% set tmp_relation = make_temp_relation(target_relation) %}
Contributor

Stylistically, I think we can reorder the logic below to avoid repetition:

{% set tmp_relation = make_temp_relation(target_relation) %}
{% if on_schema_change != 'ignore' %}
  {# only create the temp table if it's needed. can we make this a free query? #}
  {% do run_query(create_table_as(True, tmp_relation, sql)) %}
  {% set schema_changed = check_for_schema_changes(tmp_relation, target_relation) %}
  {% if schema_changed %}
    {% do process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
  {% endif %}
{% endif %}
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
{% set build_sql = dbt_bq_get_incremental_sql(strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns) %}

Substantively, there is something subtle going on here: We're creating a temp table first thing, in order to power schema comparison. I think it's fair to document that the "price" of dynamic schema evolution is the need to produce that temp table, but today, dbt does not produce a temp table for the merge strategy, and produces that temp table as part of a script in the insert_overwrite strategy.

I'm wondering if we could adjust the temp table creation to be a "free" ($0) query, i.e. by adding a where false limit 0 condition. So like:

{% set tmp_sql %}
  select * from ({{ sql }}) where false limit 0
{% endset %}
{% do run_query(create_table_as(True, tmp_relation, tmp_sql)) %}

Contributor Author

Implemented this change on Snowflake and am rolling it out to the other providers.

{% else %}
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{% do adapter.expand_target_column_types(

Contributor

As with the default materialization, let's consolidate this logic to:

  1. Always expand target column types first thing
  2. Check for schema changes if and only if on_schema_change != 'ignore'. If there are schema changes, process them, endif.
  3. Grab dest_columns and set build_sql (see the sketch below)
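A rough sketch of those three steps together (illustrative only; variable names follow the earlier snippets, and the final build-sql macro differs per adapter):

{% set tmp_relation = make_temp_relation(target_relation) %}
{% do run_query(create_table_as(True, tmp_relation, sql)) %}

{# 1. always expand target column types first #}
{% do adapter.expand_target_column_types(from_relation=tmp_relation, to_relation=target_relation) %}

{# 2. check for and process schema changes only when requested #}
{% if on_schema_change != 'ignore' %}
  {% if check_for_schema_changes(tmp_relation, target_relation) %}
    {% do process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
  {% endif %}
{% endif %}

{# 3. grab dest_columns and set build_sql (builder macro varies by adapter) #}
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
{% set build_sql = get_merge_sql(target_relation, tmp_relation, unique_key, dest_columns) %}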

Contributor Author

This is done for Snowflake and is being ported to the other adapters.

@matt-winkler matt-winkler temporarily deployed to Snowflake July 21, 2021 04:12 Inactive
@matt-winkler matt-winkler temporarily deployed to Bigquery July 21, 2021 04:12 Inactive
…ental/incremental.sql


Accept Jeremy's suggested change

Co-authored-by: Jeremy Cohen <jeremy@fishtownanalytics.com>
@matt-winkler matt-winkler temporarily deployed to Bigquery July 21, 2021 15:44 Inactive
@matt-winkler matt-winkler temporarily deployed to Redshift July 21, 2021 15:44 Inactive
@matt-winkler matt-winkler temporarily deployed to Snowflake July 21, 2021 15:44 Inactive
@jtcohen6 jtcohen6 temporarily deployed to Postgres July 21, 2021 19:47 Inactive
@jtcohen6 jtcohen6 temporarily deployed to Redshift July 21, 2021 19:47 Inactive
@jtcohen6 jtcohen6 temporarily deployed to Bigquery July 21, 2021 19:47 Inactive
@jtcohen6 jtcohen6 temporarily deployed to Snowflake July 21, 2021 19:47 Inactive
Contributor

@jtcohen6 jtcohen6 left a comment

Let's ship it :)

@jtcohen6 jtcohen6 merged commit bd70106 into develop Jul 21, 2021
@jtcohen6 jtcohen6 deleted the feature/incremental-non-full-refresh branch July 21, 2021 19:49
Contributor

@b-luu b-luu left a comment

This is brilliant!!

Thanks a lot for your work here @matt-winkler & @jtcohen6!

(I needed this!)

I've mainly added a note in sync_column_schemas as I think the current implementation would break if configured in append mode.

Also a question (for context) in incremental_upsert, but I can open a new issue for that if preferred?

@@ -1,5 +1,6 @@

{% macro incremental_upsert(tmp_relation, target_relation, unique_key=none, statement_name="main") %}

Contributor

@matt-winkler @jtcohen6 do you have context on why the existing "target" columns are used as dest_columns below?
Why not use the "tmp" increment's own columns?

Sorry, I stumbled on this PR because 1. I was trying to implement schema evolution on incremental runs as well and 2. I had issues with some of my increments not necessarily having all the columns of the existing "target".

So I've used tmp_relation instead in the line below in a custom materialization on my side.

If the reason is unclear, I'll open an issue to have a broader discussion on this topic.

Contributor Author

Hi @b-luu, here are my thoughts on the use of target columns for each scenario:

  • If we ignore or fail in the case of a schema change (or leave off the on_schema_change entirely), we assume that the UPSERT is targeted towards the existing columns in the target table. Basically the thinking here is that the target schema should not be changing under these configuration options.
  • If we append_new_columns or sync_all_columns, the column addition / syncing actions have already been performed by the time we reach this point in the code.
  • I'd be interested to see more examples of the issue you describe @b-luu to see if we're missing something here; let us know!

@jtcohen6 Please comment if the above seems off
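To make that assumption concrete, here's a simplified sketch of how dest_columns ends up in the generated DML (not the exact incremental_upsert code):

{% set dest_columns = adapter.get_columns_in_relation(target_relation) %}
{% set dest_cols_csv = dest_columns | map(attribute='name') | join(', ') %}

insert into {{ target_relation }} ({{ dest_cols_csv }})
select {{ dest_cols_csv }}
from {{ tmp_relation }}
{# valid only if tmp_relation contains every target column: guaranteed after append/sync, assumed under ignore/fail #}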

Schema change approach: {{ on_schema_change }}
Columns added: {{ add_to_target_arr }}
Columns removed: {{ remove_from_target_arr }}
Data types changed: {{ new_target_types }}
Contributor

Warning: remove_from_target_arr and new_target_types are only set if on_schema_change == 'sync_all_columns'.

This should raise an exception if on_schema_change == 'append_new_columns'

Contributor Author

@b-luu The assumption here is that when someone only wants to append new columns, we don't need to be concerned with existing column data type changes, or columns that have been removed from the upstream model vs. the target incremental model. Those scenarios are handled with the sync_all_columns configuration option.

@jtcohen6 Please comment if the above seems off
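For what it's worth, if the shared log message ever needs to stay safe under append_new_columns too, one option is to default the sync-only values before logging (a hedged sketch, not what this PR merged):

{# sync-only values may be undefined in append mode; default them before logging #}
{% set remove_from_target_arr = remove_from_target_arr | default([]) %}
{% set new_target_types = new_target_types | default([]) %}
{% do log('Schema change approach: ' ~ on_schema_change, info=true) %}
{% do log('Columns added: ' ~ add_to_target_arr, info=true) %}
{% do log('Columns removed: ' ~ remove_from_target_arr, info=true) %}
{% do log('Data types changed: ' ~ new_target_types, info=true) %}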

iknox-fa pushed a commit that referenced this pull request Feb 8, 2022
* detect and act on schema changes

* update incremental helpers code

* update changelog

* fix error in diff_columns from testing

* abstract code a bit further

* address matching names vs. data types

* Update CHANGELOG.md

Co-authored-by: Jeremy Cohen <jeremy@fishtownanalytics.com>

* updates from Jeremy's feedback

* multi-column add / remove with full_refresh

* simple changes from JC's feedback

* updated for snowflake

* reorganize postgres code

* reorganize approach

* updated full refresh trigger logic

* fixed unintentional wipe behavior

* catch final else condition

* remove WHERE string replace

* touch ups

* port core to snowflake

* added bigquery code

* updated impacted unit tests

* updates from linting tests

* updates from linting again

* snowflake updates from further testing

* fix logging

* clean up incremental logic

* updated for bigquery

* update postgres with new strategy

* update nodeconfig

* starting integration tests

* integration test for ignore case

* add test for append_new_columns

* add integration test for sync

* remove extra tests

* add unique key and snowflake test

* move incremental integration test dir

* update integration tests

* update integration tests

* Suggestions for #3387 (#3558)

* PR feedback: rationalize macros + logging, fix + expand tests

* Rm alter_column_types, always true for sync_all_columns

* update logging and integration test on sync

* update integration tests

* test fix SF integration tests

Co-authored-by: Matt Winkler <matt.winkler@fishtownanalytics.com>

* rename integration test folder

* Update core/dbt/include/global_project/macros/materializations/incremental/incremental.sql

Accept Jeremy's suggested change

Co-authored-by: Jeremy Cohen <jeremy@fishtownanalytics.com>

* Update changelog [skip ci]

Co-authored-by: Jeremy Cohen <jeremy@fishtownanalytics.com>

automatic commit by git-black, original commits:
  2799a8c
  bd70106
iknox-fa pushed a commit that referenced this pull request Feb 8, 2022

automatic commit by git-black, original commits:
  bd70106