Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 Incremental Normalization #7162

Merged
merged 31 commits into from
Oct 29, 2021
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
872c1f7
implement incremental normalization
ChristopheDuong Oct 7, 2021
e025720
Add where clause
ChristopheDuong Oct 14, 2021
235688b
Add incremental clause
ChristopheDuong Oct 14, 2021
8244cc1
Implement incremental tests for normalization
ChristopheDuong Oct 15, 2021
f675a93
disable test with quotes in columns
ChristopheDuong Oct 15, 2021
18fc89b
normalize test outputs
ChristopheDuong Oct 18, 2021
86fa2bd
add ab_id
ChristopheDuong Oct 18, 2021
1e7bf64
Reenable cleaning
ChristopheDuong Oct 18, 2021
6e7a64e
Handle normalization migration to incremental
ChristopheDuong Oct 18, 2021
f2757e1
Re-Implement should_full_refresh macro to backfill _airbyte_ab_id
ChristopheDuong Oct 18, 2021
1be482e
Modify incremental test case for nested streams
ChristopheDuong Oct 19, 2021
9a5f0f0
move unique_key to dbt_project.yml
ChristopheDuong Oct 19, 2021
aa14022
Add comment on disabled test
ChristopheDuong Oct 19, 2021
e8169c8
Add some sample SQL output
ChristopheDuong Oct 19, 2021
930fd1c
Move unique_key from dbt_project to env variable...
ChristopheDuong Oct 19, 2021
335240c
Format code
ChristopheDuong Oct 19, 2021
a3ea02a
remove unecessary check for full refresh
ChristopheDuong Oct 21, 2021
de66cae
sample sql outputs
ChristopheDuong Oct 22, 2021
13d07d2
bumpversion
ChristopheDuong Oct 25, 2021
35490bc
Add docs
ChristopheDuong Oct 25, 2021
04b62d4
add schema change tsts
ChristopheDuong Oct 25, 2021
777f5d5
improve incremental macro
ChristopheDuong Oct 25, 2021
5632c7b
add cast
ChristopheDuong Oct 25, 2021
9f35604
Merge remote-tracking branch 'origin/master' into normalization-incre…
ChristopheDuong Oct 26, 2021
a8af628
fix massql tests
ChristopheDuong Oct 26, 2021
a778537
Fix scd tests
ChristopheDuong Oct 26, 2021
fc54ec8
Bump again
ChristopheDuong Oct 26, 2021
4fd8e4d
Re-enable tests
ChristopheDuong Oct 27, 2021
47f1518
Merge remote-tracking branch 'origin/master' into normalization-incre…
ChristopheDuong Oct 27, 2021
fdba19c
format code
ChristopheDuong Oct 27, 2021
fd395a3
Fix normalization incremental scd (#7428)
ChristopheDuong Oct 29, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airbyte-integrations/bases/base-normalization/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ integration_tests/normalization_test_output/*/*/*.json
integration_tests/normalization_test_output/*/*/*.md
integration_tests/normalization_test_output/*/*/macros/
integration_tests/normalization_test_output/*/*/tests/
integration_tests/normalization_test_output/*/*/models/dbt_data_tests_tmp/
integration_tests/normalization_test_output/*/*/models/dbt_schema_tests/

Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,20 @@ quoting:
# are materialized, and more!
models:
airbyte_utils:
+materialized: table
generated:
airbyte_ctes:
+tags: airbyte_internal_cte
+materialized: ephemeral
airbyte_views:
+tags: airbyte_internal_views
+materialized: view
airbyte_incremental:
+tags: incremental_tables
+materialized: incremental
airbyte_tables:
+tags: normalized_tables
+materialized: table
+materialized: table
airbyte_views:
+tags: airbyte_internal_views
+materialized: view

vars:
dbt_utils_dispatch_list: ['airbyte_utils']
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,22 @@ quoting:
# are materialized, and more!
models:
airbyte_utils:
+materialized: table
generated:
airbyte_ctes:
+tags: airbyte_internal_cte
+materialized: ephemeral
airbyte_views:
+tags: airbyte_internal_views
+materialized: view
airbyte_incremental:
+tags: incremental_tables
# incremental is not enabled for MySql yet
#+materialized: incremental
+materialized: table
airbyte_tables:
+tags: normalized_tables
+materialized: table
+materialized: table
airbyte_views:
+tags: airbyte_internal_views
+materialized: view

vars:
dbt_utils_dispatch_list: ['airbyte_utils']
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,22 @@ quoting:
# are materialized, and more!
models:
airbyte_utils:
+materialized: table
generated:
airbyte_ctes:
+tags: airbyte_internal_cte
+materialized: ephemeral
airbyte_views:
+tags: airbyte_internal_views
+materialized: view
airbyte_incremental:
+tags: incremental_tables
# incremental is not enabled for Oracle yet
#+materialized: incremental
+materialized: table
airbyte_tables:
+tags: normalized_tables
+materialized: table
+materialized: table
airbyte_views:
+tags: airbyte_internal_views
+materialized: view

vars:
dbt_utils_dispatch_list: ['airbyte_utils']
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,21 @@ quoting:
# are materialized, and more!
models:
airbyte_utils:
+materialized: table
generated:
airbyte_ctes:
+tags: airbyte_internal_cte
+materialized: ephemeral
airbyte_views:
+tags: airbyte_internal_views
+materialized: view
airbyte_incremental:
+tags: incremental_tables
+materialized: incremental
+on_schema_change: append_new_columns
airbyte_tables:
+tags: normalized_tables
+materialized: table
+materialized: table
airbyte_views:
+tags: airbyte_internal_views
+materialized: view

dispatch:
- macro_namespace: dbt_utils
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{#
This macro controls how incremental models are updated in Airbyte's normalization step
#}

{%- macro incremental_clause(col_emitted_at) -%}
{% if is_incremental() %}
and {{ col_emitted_at }} > (select max({{ col_emitted_at }}) from {{ this }})
Copy link
Contributor Author

@ChristopheDuong ChristopheDuong Oct 19, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI @andresbravog

The incremental clause is isolated in a dbt macro to make it easier for a user to override it without having to rebuild the normalization docker image. It would be doable by exporting the generated dbt project and editing the macro file to behave differently as mentioned here #4286 (comment)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it important for this the col_emitted_at to be indexed so that we avoid a full table scan on this query?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

having an and in here feels wrong, the calling context should have knowledge of how to chain these predicates together whereas this macros can't be expected to know that. So shouldn't the context have the and?

Copy link
Contributor Author

@ChristopheDuong ChristopheDuong Oct 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it important for this the col_emitted_at to be indexed so that we avoid a full table scan on this query?

Yes, it's important for READ performances and dependent on the destination

That's why on some warehouse destinations, we would need to introduce the option of partitioning/clustering on raw tables. Maybe on databases destinations, it'd make sense to do create index.

without those changes on destinations sides, this PR starts to introduce optimization on the WRITE side at least.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ChristopheDuong should this be >=? do we have a guarantee that in between normalization runs that another record with the same timestamp cannot be added? i don't think we have that guarantee. especially dodgy since the emitted_at timestamp is created by the worker. since we can't rely on the fact that timestamps are monotonically increasing, i think we always have to do >=. I think that's okay, because you handle deduping records with airbyte_ab_id, so the only cost is we may re-process a handful of records. That seems fine relative to the potential of missing a few records.

(this is another agument for keeping the raw data around like we were talking about the other day. it is definitely nice to be able to go back and re-process if we make a mistake in normalization without having to resend data).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes we can make it >= just in case

{% endif %}
{%- endmacro -%}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
{#
This overrides the behavior of the macro `should_full_refresh` so full refresh are triggered if:
- the dbt cli is run with --full-refresh flag or the model is configured explicitly to full_refresh
- the column _airbyte_ab_id does not exists in the normalized tables and make sure it is well populated.
cgardens marked this conversation as resolved.
Show resolved Hide resolved
#}

{%- macro need_full_refresh(col_ab_id, target_table=this) -%}
{%- if not execute -%}
{{ return(false) }}
{%- endif -%}
{%- set found_column = [] %}
{%- set cols = adapter.get_columns_in_relation(target_table) -%}
{%- for col in cols -%}
{%- if col.column == col_ab_id -%}
{% do found_column.append(col.column) %}
{%- endif -%}
{%- endfor -%}
{%- if found_column -%}
{{ return(false) }}
{%- else -%}
{{ dbt_utils.log_info(target_table ~ "." ~ col_ab_id ~ " does not exist. The table needs to be rebuilt in full_refresh") }}
{{ return(true) }}
{%- endif -%}
{%- endmacro -%}

{%- macro should_full_refresh() -%}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is this macro called? Can't find it's usage apart from the comment in get_ab_id()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

{% set config_full_refresh = config.get('full_refresh') %}
{%- if config_full_refresh is none -%}
{% set config_full_refresh = flags.FULL_REFRESH %}
{%- endif -%}
{%- if not config_full_refresh -%}
{% set config_full_refresh = need_full_refresh(get_col_ab_id(), this) %}
{%- endif -%}
{% do return(config_full_refresh) %}
{%- endmacro -%}

{%- macro get_col_ab_id() -%}
{{ adapter.dispatch('get_col_ab_id')() }}
{%- endmacro -%}

{%- macro default__get_col_ab_id() -%}
_airbyte_ab_id
{%- endmacro -%}

{%- macro oracle__get_col_ab_id() -%}
"_AIRBYTE_AB_ID"
{%- endmacro -%}

{%- macro snowflake__get_col_ab_id() -%}
_AIRBYTE_AB_ID
{%- endmacro -%}
Original file line number Diff line number Diff line change
Expand Up @@ -298,21 +298,29 @@ def get_normalization_image(destination_type: DestinationType) -> str:
else:
return "airbyte/normalization:dev"

def dbt_run(self, destination_type: DestinationType, test_root_dir: str):
def dbt_check(self, destination_type: DestinationType, test_root_dir: str):
"""
Run the dbt CLI to perform transformations on the test raw data in the destination
"""
normalization_image: str = self.get_normalization_image(destination_type)
# Perform sanity check on dbt project settings
assert self.run_check_dbt_command(normalization_image, "debug", test_root_dir)
assert self.run_check_dbt_command(normalization_image, "deps", test_root_dir)
final_sql_files = os.path.join(test_root_dir, "final")

def dbt_run(self, destination_type: DestinationType, test_root_dir: str, output_dir: str = "final", force_full_refresh: bool = False):
"""
Run the dbt CLI to perform transformations on the test raw data in the destination
"""
normalization_image: str = self.get_normalization_image(destination_type)
final_sql_files = os.path.join(test_root_dir, output_dir)
shutil.rmtree(final_sql_files, ignore_errors=True)
# Compile dbt models files into destination sql dialect, then run the transformation queries
assert self.run_check_dbt_command(normalization_image, "run", test_root_dir)
assert self.run_check_dbt_command(normalization_image, "run", test_root_dir, output_dir, force_full_refresh)

@staticmethod
def run_check_dbt_command(normalization_image: str, command: str, cwd: str) -> bool:
def run_check_dbt_command(
normalization_image: str, command: str, cwd: str, output_dir: str = "final", force_full_refresh: bool = False
) -> bool:
"""
Run dbt subprocess while checking and counting for "ERROR", "FAIL" or "WARNING" printed in its outputs
"""
Expand All @@ -327,7 +335,7 @@ def run_check_dbt_command(normalization_image: str, command: str, cwd: str) -> b
"-v",
f"{cwd}/build:/build",
"-v",
f"{cwd}/final:/build/run/airbyte_utils/models/generated",
f"{cwd}/{output_dir}:/build/run/airbyte_utils/models/generated",
"-v",
"/tmp:/tmp",
"--network",
Expand All @@ -340,6 +348,9 @@ def run_check_dbt_command(normalization_image: str, command: str, cwd: str) -> b
"--profiles-dir=/workspace",
"--project-dir=/workspace",
]
if force_full_refresh:
commands.append("--full-refresh")
command = f"{command} --full-refresh"
print("Executing: ", " ".join(commands))
print(f"Equivalent to: dbt {command} --profiles-dir={cwd} --project-dir={cwd}")
with open(os.path.join(cwd, "dbt_output.log"), "ab") as f:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@



create table "postgres".test_normalization."exchange_rate__dbt_tmp"
create table "postgres".test_normalization."exchange_rate"
as (

-- Final base SQL model
Expand All @@ -13,8 +14,12 @@ select
hkd_special___characters,
nzd,
usd,
_airbyte_ab_id,
_airbyte_emitted_at,
_airbyte_exchange_rate_hashid
from "postgres"._airbyte_test_normalization."exchange_rate_ab3"
-- exchange_rate from "postgres".test_normalization._airbyte_raw_exchange_rate
);
where 1 = 1

);

Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
{{ config(schema="_airbyte_test_normalization", tags=["top-level-intermediate"]) }}
{{ config(
schema = "_airbyte_test_normalization",
tags = [ "top-level-intermediate" ]
) }}
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
select
{{ json_extract_scalar('_airbyte_data', ['id'], ['id']) }} as {{ adapter.quote('id') }},
Expand All @@ -9,7 +12,9 @@ select
{{ json_extract_scalar('_airbyte_data', ['HKD_special___characters'], ['HKD_special___characters']) }} as hkd_special___characters,
{{ json_extract_scalar('_airbyte_data', ['NZD'], ['NZD']) }} as nzd,
{{ json_extract_scalar('_airbyte_data', ['USD'], ['USD']) }} as usd,
_airbyte_ab_id,
_airbyte_emitted_at
from {{ source('test_normalization', '_airbyte_raw_exchange_rate') }} as table_alias
-- exchange_rate
where 1 = 1

Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
{{ config(schema="_airbyte_test_normalization", tags=["top-level-intermediate"]) }}
{{ config(
schema = "_airbyte_test_normalization",
tags = [ "top-level-intermediate" ]
) }}
-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
select
cast({{ adapter.quote('id') }} as {{ dbt_utils.type_bigint() }}) as {{ adapter.quote('id') }},
Expand All @@ -9,7 +12,9 @@ select
cast(hkd_special___characters as {{ dbt_utils.type_string() }}) as hkd_special___characters,
cast(nzd as {{ dbt_utils.type_float() }}) as nzd,
cast(usd as {{ dbt_utils.type_float() }}) as usd,
_airbyte_ab_id,
_airbyte_emitted_at
from {{ ref('exchange_rate_ab1') }}
-- exchange_rate
where 1 = 1

Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
{{ config(schema="_airbyte_test_normalization", tags=["top-level-intermediate"]) }}
{{ config(
schema = "_airbyte_test_normalization",
tags = [ "top-level-intermediate" ]
) }}
-- SQL model to build a hash column based on the values of this record
select
{{ dbt_utils.surrogate_key([
Expand All @@ -14,4 +17,5 @@ select
tmp.*
from {{ ref('exchange_rate_ab2') }} tmp
-- exchange_rate
where 1 = 1

Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
{{ config(
schema = "test_normalization",
unique_key = env_var('AIRBYTE_DEFAULT_UNIQUE_KEY', '_airbyte_ab_id'),
tags = [ "top-level" ]
) }}
-- SQL model to build a Type 2 Slowly Changing Dimension (SCD) table for each record identified by their primary key
select
{{ dbt_utils.surrogate_key([
adapter.quote('id'),
'currency',
'nzd',
]) }} as _airbyte_unique_key,
{{ adapter.quote('id') }},
currency,
{{ adapter.quote('date') }},
timestamp_col,
{{ adapter.quote('HKD@spéçiäl & characters') }},
hkd_special___characters,
nzd,
usd,
{{ adapter.quote('date') }} as _airbyte_start_at,
lag({{ adapter.quote('date') }}) over (
partition by {{ adapter.quote('id') }}, currency, cast(nzd as {{ dbt_utils.type_string() }})
order by {{ adapter.quote('date') }} is null asc, {{ adapter.quote('date') }} desc, _airbyte_emitted_at desc
) as _airbyte_end_at,
case when lag({{ adapter.quote('date') }}) over (
partition by {{ adapter.quote('id') }}, currency, cast(nzd as {{ dbt_utils.type_string() }})
order by {{ adapter.quote('date') }} is null asc, {{ adapter.quote('date') }} desc, _airbyte_emitted_at desc
) is null then 1 else 0 end as _airbyte_active_row,
_airbyte_ab_id,
_airbyte_emitted_at,
_airbyte_dedup_exchange_rate_hashid
from {{ ref('dedup_exchange_rate_ab4') }}
-- dedup_exchange_rate from {{ source('test_normalization', '_airbyte_raw_dedup_exchange_rate') }}
where 1 = 1
and _airbyte_row_num = 1
{{ incremental_clause('_airbyte_emitted_at') }}

Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{{ config(
schema = "test_normalization",
unique_key = "_airbyte_unique_key",
tags = [ "top-level" ]
) }}
-- Final base SQL model
select
_airbyte_unique_key,
{{ adapter.quote('id') }},
currency,
{{ adapter.quote('date') }},
timestamp_col,
{{ adapter.quote('HKD@spéçiäl & characters') }},
hkd_special___characters,
nzd,
usd,
_airbyte_ab_id,
_airbyte_emitted_at,
_airbyte_dedup_exchange_rate_hashid
from {{ ref('dedup_exchange_rate_scd') }}
-- dedup_exchange_rate from {{ source('test_normalization', '_airbyte_raw_dedup_exchange_rate') }}
where 1 = 1
and _airbyte_active_row = 1
{{ incremental_clause('_airbyte_emitted_at') }}

Loading