
πŸŽ‰ Incremental Normalization #7162

Merged · 31 commits · Oct 29, 2021

Conversation

@ChristopheDuong (Contributor) commented Oct 19, 2021

What

Closes #4286

How

This PR is pretty "standard" dbt usage and mostly follows the instructions from https://docs.getdbt.com/docs/building-a-dbt-project/building-models/configuring-incremental-models (you will find many parts copied between that doc page and this PR, both in the code and in this description).

What is an incremental model?

Incremental models are built as tables in your data warehouse – the first time a model is run, the table is built by transforming all rows of source data. On subsequent runs, dbt transforms only the rows in your source data that you tell dbt to filter for, inserting them into the table that has already been built (the target table).

Often, the rows you filter for on an incremental run will be the rows in your source data that have been created or updated since the last time dbt ran. As such, on each dbt run, your model gets built incrementally.

Using an incremental model limits the amount of data that needs to be transformed, vastly reducing the runtime of your transformations. This improves warehouse performance and reduces compute costs.
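
For reference, here is a minimal sketch of what an incremental dbt model looks like (the source, table, and column names here are hypothetical, not the exact SQL generated by normalization):

```sql
-- minimal incremental model sketch; source and column names are hypothetical
{{ config(materialized='incremental') }}

select
    _airbyte_ab_id,
    _airbyte_emitted_at,
    _airbyte_data
from {{ source('airbyte', '_airbyte_raw_exchange_rate') }}
{% if is_incremental() %}
  -- on incremental runs, only pick up rows that arrived since the last run
  where _airbyte_emitted_at > (select max(_airbyte_emitted_at) from {{ this }})
{% endif %}
```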

To use incremental models, you also need to tell dbt:

  1. how to filter the rows on an incremental run.
  2. the uniqueness constraint of the model (if any).

1. Filtering rows on an incremental run

To tell dbt which rows it should transform on an incremental run, we append the incremental_clause() macro (defined in airbyte-integrations/bases/base-normalization/dbt-project-template/macros/incremental.sql) to the where clauses at the end of every model generated by normalization.

This ensures that every model filters for "new" rows, as in, rows that have been created since the last time dbt ran this model.

The best way to find the timestamp of the most recent run of this model is by checking the most recent timestamp in the target table. dbt makes it easy to query your target table by using the "{{ this }}" variable.

and {{ col_emitted_at }} > (select max({{ col_emitted_at }}) from {{ this }})

This condition is wrapped in a Jinja {% if is_incremental() %} statement to toggle the incremental behavior depending on the mode dbt runs that particular model in: full_refresh vs incremental.
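
Concretely, the macro in macros/incremental.sql looks like this (closing tags completed here for readability; the diff excerpt quoted later in this thread is truncated):

```sql
{%- macro incremental_clause(col_emitted_at) -%}
{% if is_incremental() %}
and {{ col_emitted_at }} > (select max({{ col_emitted_at }}) from {{ this }})
{% endif %}
{%- endmacro -%}
```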

The is_incremental() macro will return True if:

- the destination table already exists in the database
- dbt is not running in full-refresh mode
- the running model is configured with materialized='incremental'

2. Defining a uniqueness constraint (optional)

This is entirely optional and we could have ignored it (except for deduplicated tables, which rely on primary keys).
However, we made the choice to always put a unique constraint on all models generated by normalization.

unique_key is an optional parameter for incremental models that specifies a field that should be unique within your model. If the unique key of an existing row in your target table matches one of your incrementally transformed rows, the existing row will be updated. This ensures that you don't have multiple rows in your target table for a single row in your source data.

You can define unique_key in a configuration block at the top of your model. The unique_key should be a single field name that is present in your model definition. While some databases support using expressions (eg. concat(user_id, session_number)), this syntax is not universally supported, so is not recommended. If you do not have a single field that is unique, consider first creating such a field in your model.
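
As a sketch, such a configuration block looks like this (using the _airbyte_ab_id column that normalization relies on, as explained below):

```sql
{{ config(
    materialized='incremental',
    unique_key='_airbyte_ab_id'
) }}
```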

Building this model incrementally without the unique_key parameter would result in multiple rows in the target table for a single day – one row for each time dbt runs on that day. The inclusion of the unique_key parameter ensures the existing row is updated instead.
Without unique_key, the generated SQL is a simple insert into; with unique_key defined, dbt generates (when possible) a merge on unique_key statement (or the equivalent delete where unique_key; insert into sequence).

For every stream, we do have a single field that is unique for each row: _airbyte_ab_id in the _airbyte_raw tables.
Airbyte destination connectors have always written a random UUID value in the _airbyte_ab_id column of each new row. It was always present in the _airbyte_raw tables but unused by normalization until now.

Thus we can use that for every normalized table too. In the special case of deduplicated tables, we need to use the (potentially composite) primary keys defined by the user, so we first generate a single column _airbyte_unique_key by hashing the primary keys together (in the generate_scd_type_2_model function in stream_processor.py).
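
As an illustration, the generated hashing step looks roughly like this (a Postgres-style sketch with hypothetical names, assuming id and currency form the composite primary key):

```sql
select
    -- hash the composite primary key into a single deduplication column
    md5(
        coalesce(cast(id as varchar), '') || '-' ||
        coalesce(cast(currency as varchar), '')
    ) as _airbyte_unique_key,
    tmp.*
from dedup_exchange_rate_tmp as tmp
```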

For these special tables that need to be deduplicated, if we don't specify the unique_key, we risk breaking the constraint of having only one row per primary key (no longer being fully deduplicated). So it is mandatory to have unique_key = primary keys there.

Because we rely on this new _airbyte_ab_id column being present on all normalized tables (it was not propagated before this PR), we are effectively introducing a schema change with a new "mandatory" (non-nullable) field.

So, how does dbt handle this schema change?

New on_schema_change config in dbt v0.21.0

Incremental models can now be configured to include an optional on_schema_change parameter to enable additional control when incremental model columns change. These options enable dbt to continue running incremental models in the presence of schema changes.
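
For example, a model could opt into this with a config block like the following sketch (sync_all_columns is one of the modes listed below):

```sql
{{ config(
    materialized='incremental',
    unique_key='_airbyte_ab_id',
    on_schema_change='sync_all_columns'
) }}
```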

The modes we could choose are (among other modes that raise exceptions or do nothing when detecting changes):

  • append_new_columns: Append new columns to the existing table. Note that this setting does not remove columns from the existing table that are not present in the new data.
  • sync_all_columns: Adds any new columns to the existing table, and removes any columns that are now missing. Note that this is inclusive of data type changes. On BigQuery, changing column types requires a full table scan; be mindful of the trade-offs when implementing.

Note: none of the on_schema_change behaviors backfill values in old records for newly added columns, which is exactly our case since we want _airbyte_ab_id to always be populated! Too bad, we can't leverage this... so FULL REFRESH!

To force dbt to rebuild the entire incremental model from scratch, we can use the --full-refresh flag on the command line. This flag causes dbt to drop the existing target table in the database before rebuilding it for all time, for all rows (basically reverting to the behavior of older versions of normalization).

But normalization is always run without that flag on the CLI (since we want incremental now!), and we can't tweak it from the UI or the Airbyte worker. Plus, we may run into scenarios where only certain models need a full refresh and others don't, so we need a more granular way to trigger full_refresh, i.e. per stream.

Full refresh of existing destination tables (when migrating from an older normalization version to this PR)

To implement this more granular full_refresh decision, we override dbt's special should_full_refresh macro in airbyte-integrations/bases/base-normalization/dbt-project-template/macros/should_full_refresh.sql

The macro tries to detect whether there are existing normalized tables in the destination that were produced by older versions of normalization (i.e. tables that don't have an _airbyte_ab_id column). In that case, it returns true to signal a need for full_refresh.
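
A simplified sketch of that detection logic (not the exact macro in should_full_refresh.sql, which also has to handle per-adapter quirks):

```sql
{% macro should_full_refresh() %}
    {#- look up the existing target table, if any -#}
    {%- set existing = adapter.get_relation(database=this.database,
                                            schema=this.schema,
                                            identifier=this.identifier) -%}
    {%- if existing is not none -%}
        {%- set column_names = adapter.get_columns_in_relation(existing)
                               | map(attribute='name') | map('lower') | list -%}
        {%- if '_airbyte_ab_id' not in column_names -%}
            {#- table was built by an older normalization version: rebuild it -#}
            {{ return(true) }}
        {%- endif -%}
    {%- endif -%}
    {#- otherwise, fall back to dbt's own --full-refresh flag -#}
    {{ return(flags.FULL_REFRESH) }}
{% endmacro %}
```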

If we hadn't chosen to have a unique_key constraint on _airbyte_ab_id, we wouldn't need this logic at all.

But introducing this column now allows users to make a clean join between the normalized tables and the raw tables, and aligns how the JSON blob is expanded into tabular columns. As a result, we can also ensure that one row in the raw tables produces exactly one row in the normalized tables, avoiding the risk that normalized rows are duplicated every time dbt is run. This is a new concern because we now run incrementally instead of rebuilding the whole table from scratch on every dbt run.

Recommended reading order

In terms of code changes:

  1. The main implementation airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py
  2. The macro for incremental airbyte-integrations/bases/base-normalization/dbt-project-template/macros/incremental.sql
  3. The macro to detect when to full-refresh (for migrating to new normalization version with incremental) airbyte-integrations/bases/base-normalization/dbt-project-template/macros/should_full_refresh.sql

Or in terms of changes to the generated SQL:

  1. Normal tables built on top of raw tables directly: airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_incremental/test_normalization/exchange_rate.sql
  2. Deduplicated tables (using different unique_key) built on top of the SCD tables: airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_incremental/test_normalization/dedup_exchange_rate.sql
  3. SCD table where the unique_key column is built.

Under the (dbt) hood

  1. For the full-refresh version, or if the table doesn't exist yet: airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/first_output/airbyte_incremental/test_normalization/exchange_rate.sql
  2. When incrementally updating it, dbt runs the incremental read query internally and (probably) builds a temp table that contains only the new records to be transformed. Then the SQL engine incrementally overwrites rows in the destination table using the following query: airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/second_output/airbyte_incremental/test_normalization/exchange_rate.sql
  • By using an insert into ... query, only the new data records are written (incrementally).
  • A create table ... as select ... from ... query would rewrite the entire data set (non-incremental).
  • The delete ... from ... query ensures uniqueness of records per _airbyte_ab_id or _airbyte_unique_key. A sketch of the resulting pattern follows below.
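
Putting these together, the incremental update boils down to a pattern like the following (an illustrative Postgres-style sketch; the temp-table name is hypothetical):

```sql
-- remove target rows that are about to be re-inserted from the temp table,
-- so each _airbyte_ab_id appears only once
delete from test_normalization.exchange_rate
where _airbyte_ab_id in (
    select _airbyte_ab_id from exchange_rate__dbt_tmp
);

-- append only the newly transformed records
insert into test_normalization.exchange_rate
select * from exchange_rate__dbt_tmp;
```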

Known Issues or limitations

While implementing this PR, I ran into the following issues that need to be addressed in the future:

  1. dbt incremental does not yet do well with special characters and quotes in column names. The corresponding failing test cases in normalization are therefore disabled.
  2. dbt incremental does not work out of the box for the community-contributed dbt adapters for MySQL and Oracle. Additional effort is required to make those work incrementally (they are not based on dbt 0.21.0, which introduces quite a few improvements to incremental models and schema changes).
  3. This PR introduces performance optimizations mostly for WRITE operations by normalization. READ performance is still not optimal and requires additional changes in specific destination connectors to "index" (partition) the raw tables efficiently, so normalization can take advantage of it when retrieving the new data to be normalized.
  4. An experimental PR to push the limits on schema changes while running incremental normalization is proposed here: Add test for schema change on incremental normalization #7277
  5. Incremental normalization produces partition keys on the final normalized tables by default; you can follow this guide to add/edit/tweak them (on any destination supported by dbt): https://airbyte.io/recipes/bigquery-partition-cluster. These may (or may not) already be used by destination connectors on the raw tables.


{%- macro incremental_clause(col_emitted_at) -%}
{% if is_incremental() %}
and {{ col_emitted_at }} > (select max({{ col_emitted_at }}) from {{ this }})
@ChristopheDuong (author) commented Oct 19, 2021:
FYI @andresbravog

The incremental clause is isolated in a dbt macro to make it easier for a user to override it without having to rebuild the normalization docker image. It would be doable by exporting the generated dbt project and editing the macro file to behave differently as mentioned here #4286 (comment)

A contributor commented:

Is it important for col_emitted_at to be indexed, so that we avoid a full table scan on this query?

Another contributor commented:

Having an and in here feels wrong; the calling context should have knowledge of how to chain these predicates together, whereas this macro can't be expected to know that. So shouldn't the context have the and?

@ChristopheDuong (author) replied Oct 21, 2021:

> Is it important for col_emitted_at to be indexed, so that we avoid a full table scan on this query?

Yes, it's important for READ performance, and it depends on the destination.

That's why, on some warehouse destinations, we would need to introduce the option of partitioning/clustering the raw tables. On database destinations, it might make sense to create an index instead.

Without those changes on the destination side, this PR at least starts to introduce optimizations on the WRITE side.

A contributor commented:

@ChristopheDuong should this be >=? Do we have a guarantee that, in between normalization runs, another record with the same timestamp cannot be added? I don't think we have that guarantee, which is especially dodgy since the emitted_at timestamp is created by the worker. Since we can't rely on timestamps being monotonically increasing, I think we always have to use >=. That's okay, because you handle deduping records with _airbyte_ab_id, so the only cost is that we may re-process a handful of records. That seems fine relative to the potential of missing a few records.

(This is another argument for keeping the raw data around, like we were talking about the other day. It is definitely nice to be able to go back and re-process if we make a mistake in normalization, without having to resend data.)

@ChristopheDuong (author) replied:

Yes, we can make it >= just in case.
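
That is, the clause in the macro would become:

```sql
and {{ col_emitted_at }} >= (select max({{ col_emitted_at }}) from {{ this }})
```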

@ChristopheDuong commented Oct 19, 2021:

/test connector=bases/base-normalization

πŸ•‘ bases/base-normalization https://github.com/airbytehq/airbyte/actions/runs/1359887912
βœ… bases/base-normalization https://github.com/airbytehq/airbyte/actions/runs/1359887912
Python tests coverage:

	 ---------- coverage: platform linux, python 3.8.10-final-0 -----------
	 Name                                       Stmts   Miss  Cover
	 --------------------------------------------------------------
	 base_python/__init__.py                       13      0   100%
	 base_python/catalog_helpers.py                10      6    40%
	 base_python/cdk/__init__.py                    0      0   100%
	 base_python/cdk/abstract_source.py            83     59    29%
	 base_python/cdk/streams/__init__.py            0      0   100%
	 base_python/cdk/streams/auth/__init__.py       0      0   100%
	 base_python/cdk/streams/auth/core.py           8      1    88%
	 base_python/cdk/streams/auth/jwt.py            5      5     0%
	 base_python/cdk/streams/auth/oauth.py         37     26    30%
	 base_python/cdk/streams/auth/token.py          9      4    56%
	 base_python/cdk/streams/core.py               63     32    49%
	 base_python/cdk/streams/exceptions.py         10      2    80%
	 base_python/cdk/streams/http.py               67     33    51%
	 base_python/cdk/streams/rate_limiting.py      30     14    53%
	 base_python/cdk/utils/__init__.py              0      0   100%
	 base_python/cdk/utils/casing.py                4      0   100%
	 base_python/client.py                         56     33    41%
	 base_python/entrypoint.py                     70     56    20%
	 base_python/integration.py                    52     25    52%
	 base_python/logger.py                         33     19    42%
	 base_python/schema_helpers.py                 56     41    27%
	 base_python/source.py                         51     34    33%
	 main_dev.py                                    3      3     0%
	 --------------------------------------------------------------
	 TOTAL                                        660    393    40%
	 ---------- coverage: platform linux, python 3.8.10-final-0 -----------
	 Name                                                              Stmts   Miss  Cover
	 -------------------------------------------------------------------------------------
	 main_dev_transform_catalog.py                                         3      3     0%
	 main_dev_transform_config.py                                          3      3     0%
	 normalization/__init__.py                                             4      0   100%
	 normalization/destination_type.py                                    12      0   100%
	 normalization/transform_catalog/__init__.py                           2      0   100%
	 normalization/transform_catalog/catalog_processor.py                143     77    46%
	 normalization/transform_catalog/destination_name_transformer.py     120      6    95%
	 normalization/transform_catalog/reserved_keywords.py                 11      0   100%
	 normalization/transform_catalog/stream_processor.py                 395    236    40%
	 normalization/transform_catalog/table_name_registry.py              174     34    80%
	 normalization/transform_catalog/transform.py                         45     26    42%
	 normalization/transform_catalog/utils.py                             33      7    79%
	 normalization/transform_config/__init__.py                            2      0   100%
	 normalization/transform_config/transform.py                         140     29    79%
	 -------------------------------------------------------------------------------------
	 TOTAL                                                              1087    421    61%
	 ---------- coverage: platform linux, python 3.8.10-final-0 -----------
	 Name                                                 Stmts   Miss  Cover
	 ------------------------------------------------------------------------
	 source_acceptance_test/__init__.py                       2      0   100%
	 source_acceptance_test/base.py                          10      4    60%
	 source_acceptance_test/config.py                        74      8    89%
	 source_acceptance_test/conftest.py                     108    108     0%
	 source_acceptance_test/plugin.py                        47     47     0%
	 source_acceptance_test/tests/__init__.py                 4      0   100%
	 source_acceptance_test/tests/test_core.py              200     94    53%
	 source_acceptance_test/tests/test_full_refresh.py       18     11    39%
	 source_acceptance_test/tests/test_incremental.py        69     38    45%
	 source_acceptance_test/utils/__init__.py                 6      0   100%
	 source_acceptance_test/utils/asserts.py                 37      2    95%
	 source_acceptance_test/utils/common.py                  41     24    41%
	 source_acceptance_test/utils/compare.py                 47     20    57%
	 source_acceptance_test/utils/connector_runner.py        82     49    40%
	 source_acceptance_test/utils/json_schema_helper.py     115     14    88%
	 ------------------------------------------------------------------------
	 TOTAL                                                  860    419    51%
	 ---------- coverage: platform linux, python 3.8.10-final-0 -----------
	 Name                                                              Stmts   Miss  Cover
	 -------------------------------------------------------------------------------------
	 main_dev_transform_catalog.py                                         3      3     0%
	 main_dev_transform_config.py                                          3      3     0%
	 normalization/__init__.py                                             4      0   100%
	 normalization/destination_type.py                                    12      0   100%
	 normalization/transform_catalog/__init__.py                           2      0   100%
	 normalization/transform_catalog/catalog_processor.py                143     12    92%
	 normalization/transform_catalog/destination_name_transformer.py     120      4    97%
	 normalization/transform_catalog/reserved_keywords.py                 11      0   100%
	 normalization/transform_catalog/stream_processor.py                 395     33    92%
	 normalization/transform_catalog/table_name_registry.py              174     51    71%
	 normalization/transform_catalog/transform.py                         45     30    33%
	 normalization/transform_catalog/utils.py                             33      0   100%
	 normalization/transform_config/__init__.py                            2      0   100%
	 normalization/transform_config/transform.py                         140     45    68%
	 -------------------------------------------------------------------------------------
	 TOTAL                                                              1087    181    83%

@ChristopheDuong commented Oct 26, 2021:

/publish connector=bases/base-normalization

πŸ•‘ bases/base-normalization https://github.com/airbytehq/airbyte/actions/runs/1384494623
βœ… bases/base-normalization https://github.com/airbytehq/airbyte/actions/runs/1384494623

* Fix incremental update of SCD tables

* add comments

* format code

* test

* Restore partitioning on final dedup model

* Apply changes from code review

* Add some sample SQL outputs

* format code

* Remove unused method

* update comments

* add comments

* add docs

* Add test

* Fix acceptance tests clean up

* Apply suggestions from code review

Co-authored-by: Sherif A. Nada <snadalive@gmail.com>

@ChristopheDuong commented Oct 29, 2021:

/publish connector=bases/base-normalization

πŸ•‘ bases/base-normalization https://github.com/airbytehq/airbyte/actions/runs/1398133365
❌ bases/base-normalization https://github.com/airbytehq/airbyte/actions/runs/1398133365
πŸ•‘ bases/base-normalization https://github.com/airbytehq/airbyte/actions/runs/1398133365
βœ… bases/base-normalization https://github.com/airbytehq/airbyte/actions/runs/1398133365


## Partitioning, clustering, sorting, indexing

Normalization produces tables that are partitioned, clustered, sorted, or indexed depending on the destination engine and on the type of table being built. The goal of these is to make reads more performant, especially when running incremental updates.
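
For instance, on BigQuery such a table can be configured with a block like this sketch (other adapters expose different knobs, e.g. sort keys on Redshift or indexes on Postgres):

```sql
{{ config(
    materialized='incremental',
    unique_key='_airbyte_ab_id',
    partition_by={'field': '_airbyte_emitted_at', 'data_type': 'timestamp'}
) }}
```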
@ChristopheDuong (author) commented:

@avaidyanatha Maybe it's worth pointing out this new behavior in normalization (incremental + partitions) when releasing the next version.

I am not sure whether exceptions are going to be raised or not, but I've sometimes noticed BigQuery throwing exceptions about the partitioning specs being changed, thus requiring a manual drop of the tables first.

Users (on BigQuery at least; others should be fine...) may have to manually delete their normalized tables in the destination (but keep the raw tables!).

Labels: area/connectors (Connector related issues) · area/documentation (Improvements or additions to documentation) · area/platform (issues related to the platform) · area/worker (Related to worker) · normalization
Development

Successfully merging this pull request may close the following issue: Improve normalization incremental runtime (incremental DBT)
5 participants