Skip to content

Commit

Permalink
馃悰 Minor fixes to incremental normalization and nesting (#7669)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristopheDuong committed Nov 8, 2021
1 parent 1e4bde8 commit affea7f
Show file tree
Hide file tree
Showing 28 changed files with 252 additions and 193 deletions.
40 changes: 31 additions & 9 deletions airbyte-integrations/bases/base-normalization/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,34 @@ dbt_modules/
secrets/
dist/

integration_tests/normalization_test_output/*/*/*.log
integration_tests/normalization_test_output/*/*/*.yml
integration_tests/normalization_test_output/*/*/*.json
integration_tests/normalization_test_output/*/*/*.md
integration_tests/normalization_test_output/*/*/macros/
integration_tests/normalization_test_output/*/*/tests/
integration_tests/normalization_test_output/*/*/models/dbt_data_tests/
integration_tests/normalization_test_output/*/*/models/dbt_schema_tests/
integration_tests/normalization_test_output/*/*/modified_models/
integration_tests/normalization_test_output/*/*/macros
integration_tests/normalization_test_output/*/*/tests
integration_tests/normalization_test_output/**/*.json
integration_tests/normalization_test_output/**/*.log
integration_tests/normalization_test_output/**/*.md
integration_tests/normalization_test_output/**/*.sql
integration_tests/normalization_test_output/**/*.yml
!integration_tests/normalization_test_output/**/*dbt_project.yml
!integration_tests/normalization_test_output/**/generated/sources.yml

# We keep a minimal/restricted subset of sql files for all destinations to avoid noise in diff
# Simple Streams
!integration_tests/normalization_test_output/**/dedup_exchange_rate*.sql
!integration_tests/normalization_test_output/**/exchange_rate.sql
# Nested Streams
# Parent table
!integration_tests/normalization_test_output/**/nested_stream_with*_names_ab*.sql
!integration_tests/normalization_test_output/**/nested_stream_with*_names_scd.sql
!integration_tests/normalization_test_output/**/nested_stream_with*_names.sql
# Nested table
!integration_tests/normalization_test_output/**/nested_stream_with_*_partition_ab1.sql
!integration_tests/normalization_test_output/**/nested_stream_with_*_data_ab1.sql
!integration_tests/normalization_test_output/**/nested_stream_with*_partition_scd.sql
!integration_tests/normalization_test_output/**/nested_stream_with*_data_scd.sql
!integration_tests/normalization_test_output/**/nested_stream_with*_partition.sql
!integration_tests/normalization_test_output/**/nested_stream_with*_data.sql

# but we keep all sql files for Postgres
!integration_tests/normalization_test_output/postgres/**/*.sql
integration_tests/normalization_test_output/postgres/**/dbt_data_tests
integration_tests/normalization_test_output/postgres/**/dbt_schema_tests
2 changes: 1 addition & 1 deletion airbyte-integrations/bases/base-normalization/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,5 @@ WORKDIR /airbyte
ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh"
ENTRYPOINT ["/airbyte/entrypoint.sh"]

LABEL io.airbyte.version=0.1.58
LABEL io.airbyte.version=0.1.59
LABEL io.airbyte.name=airbyte/normalization
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@
{%- macro default__incremental_clause(col_emitted_at) -%}
{% if is_incremental() %}
and {{ col_emitted_at }} >= (select max({{ col_emitted_at }}) from {{ this }})
and cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }}) >= (select max(cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }})) from {{ this }})
{% endif %}
{%- endmacro -%}
{# -- see https://on-systems.tech/113-beware-dbt-incremental-updates-against-snowflake-external-tables/ #}
{%- macro snowflake__incremental_clause(col_emitted_at) -%}
{% if is_incremental() %}
and {{ col_emitted_at }} >= cast('{{ get_max_normalized_cursor(col_emitted_at) }}' as {{ type_timestamp_with_timezone() }})
and cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }}) >= cast('{{ get_max_normalized_cursor(col_emitted_at) }}' as {{ type_timestamp_with_timezone() }})
{% endif %}
{%- endmacro -%}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{{ config(
indexes = [{'columns':['_airbyte_emitted_at'],'type':'hash'}],
unique_key = '_airbyte_ab_id',
schema = "test_normalization",
tags = [ "nested" ]
) }}
-- Final base SQL model
select
_airbyte_nested_stre__nto_long_names_hashid,
double_array_data,
{{ adapter.quote('DATA') }},
_airbyte_ab_id,
_airbyte_emitted_at,
{{ current_timestamp() }} as _airbyte_normalized_at,
_airbyte_partition_hashid
from {{ ref('nested_stream_with_c___long_names_partition_ab3') }}
-- partition at nested_stream_with_complex_columns_resulting_into_long_names/partition from {{ ref('nested_stream_with_c__lting_into_long_names_scd') }}
where 1 = 1
{{ incremental_clause('_airbyte_emitted_at') }}

Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{{ config(
indexes = [{'columns':['_airbyte_emitted_at'],'type':'hash'}],
schema = "test_normalization",
tags = [ "nested" ]
) }}
-- Final base SQL model
select
_airbyte_partition_hashid,
currency,
_airbyte_ab_id,
_airbyte_emitted_at,
{{ current_timestamp() }} as _airbyte_normalized_at,
_airbyte_data_hashid
from {{ ref('nested_stream_with_c___names_partition_data_ab3') }}
-- DATA at nested_stream_with_complex_columns_resulting_into_long_names/partition/DATA from {{ ref('nested_stream_with_c___long_names_partition') }}
where 1 = 1
{{ incremental_clause('_airbyte_emitted_at') }}

Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@



create table "postgres".test_normalization."dedup_exchange_rate_scd"
create table "postgres"."test_normalization"."dedup_exchange_rate_scd__dbt_tmp"
as (

with
Expand Down Expand Up @@ -42,15 +42,15 @@ scd_data as (
"date" desc,
_airbyte_emitted_at desc
) as _airbyte_end_at,
case when lag("date") over (
case when row_number() over (
partition by "id", currency, cast(nzd as
varchar
)
order by
"date" is null asc,
"date" desc,
_airbyte_emitted_at desc
) is null then 1 else 0 end as _airbyte_active_row,
) = 1 then 1 else 0 end as _airbyte_active_row,
_airbyte_ab_id,
_airbyte_emitted_at,
_airbyte_dedup_exchange_rate_hashid
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,6 @@
}
}
}
},
"column`_'with\"_quotes": {
"type": ["null", "array"],
"items": {
"properties": {
"currency": {
"type": ["null", "string"]
}
}
}
}
}
}
Expand Down Expand Up @@ -234,6 +224,16 @@
"properties": {
"owner_id": {
"type": ["null", "integer"]
},
"column`_'with\"_quotes": {
"type": ["null", "array"],
"items": {
"properties": {
"currency": {
"type": ["null", "string"]
}
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@
{"type":"RECORD","record":{"stream":"conflict_stream_scalar","data":{"id":1,"conflict_stream_scalar": 2},"emitted_at":1623861660}}
{"type":"RECORD","record":{"stream":"conflict_stream_scalar","data":{"id":2,"conflict_stream_scalar": 2},"emitted_at":1623861660}}

{"type":"RECORD","record":{"stream":"unnest_alias","data":{"id":1, "children": [{"ab_id": 1, "owner": {"owner_id": 1}},{"ab_id": 2, "owner": {"owner_id": 2}}]},"emitted_at":1623861660}}
{"type":"RECORD","record":{"stream":"unnest_alias","data":{"id":2, "children": [{"ab_id": 3, "owner": {"owner_id": 3}},{"ab_id": 4, "owner": {"owner_id": 4}}]},"emitted_at":1623861660}}
{"type":"RECORD","record":{"stream":"unnest_alias","data":{"id":1, "children": [{"ab_id": 1, "owner": {"owner_id": 1, "column`_'with\"_quotes": [ {"currency": "EUR" } ]}},{"ab_id": 2, "owner": {"owner_id": 2, "column`_'with\"_quotes": [ {"currency": "EUR" } ]}}]},"emitted_at":1623861660}}
{"type":"RECORD","record":{"stream":"unnest_alias","data":{"id":2, "children": [{"ab_id": 3, "owner": {"owner_id": 3, "column`_'with\"_quotes": [ {"currency": "EUR" } ]}},{"ab_id": 4, "owner": {"owner_id": 4, "column`_'with\"_quotes": [ {"currency": "EUR" } ]}}]},"emitted_at":1623861660}}

Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,5 @@
{"type":"RECORD","record":{"stream":"conflict_stream_scalar","data":{"id":1,"conflict_stream_scalar": 2},"emitted_at":1623861660}}
{"type":"RECORD","record":{"stream":"conflict_stream_scalar","data":{"id":2,"conflict_stream_scalar": 2},"emitted_at":1623861660}}

{"type":"RECORD","record":{"stream":"unnest_alias","data":{"id":1, "children": [{"ab_id": 1, "owner": {"owner_id": 1}},{"ab_id": 2, "owner": {"owner_id": 2}}]},"emitted_at":1623861660}}
{"type":"RECORD","record":{"stream":"unnest_alias","data":{"id":2, "children": [{"ab_id": 3, "owner": {"owner_id": 3}},{"ab_id": 4, "owner": {"owner_id": 4}}]},"emitted_at":1623861660}}

{"type":"RECORD","record":{"stream":"unnest_alias","data":{"id":1, "children": [{"ab_id": 1, "owner": {"owner_id": 1, "column`_'with\"_quotes": [ {"currency": "EUR" } ]}},{"ab_id": 2, "owner": {"owner_id": 2, "column`_'with\"_quotes": [ {"currency": "EUR" } ]}}]},"emitted_at":1623861660}}
{"type":"RECORD","record":{"stream":"unnest_alias","data":{"id":2, "children": [{"ab_id": 3, "owner": {"owner_id": 3, "column`_'with\"_quotes": [ {"currency": "EUR" } ]}},{"ab_id": 4, "owner": {"owner_id": 4, "column`_'with\"_quotes": [ {"currency": "EUR" } ]}}]},"emitted_at":1623861660}}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ with table_row_counts as (
union all
select distinct 'nested_stream_with_complex_columns_resulting_into_long_names' as label, count(*) as row_count, 3 as expected_count
from {{ ref('nested_stream_with_complex_columns_resulting_into_long_names') }}
union all
select distinct 'nested_stream_with_complex_columns_resulting_into_long_names_partition' as label, count(*) as row_count, 3 as expected_count
from {{ ref('nested_stream_with_complex_columns_resulting_into_long_names_partition') }}
union all
select 'nested_stream_with_complex_columns_resulting_into_long_names_partition_DATA' as label, count(distinct currency) as row_count, 1 as expected_count
from {{ ref('nested_stream_with_complex_columns_resulting_into_long_names_partition_DATA') }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,17 @@ models:
expression: "double_array_data is not null"
- dbt_utils.expression_is_true:
expression: "DATA is not null"
- dbt_utils.expression_is_true:
expression: "\"column`_'with\"\"_quotes\" is not null"
- name: nested_stream_with_complex_columns_resulting_into_long_names_partition_DATA
columns:
- name: currency
tests:
- not_null
- name: nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data
columns:
- name: id
tests:
# - name: nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data
# columns:
# - name: id
# tests:
# - not_null # TODO Fix bug here
- name: unnest_alias_children_owner
tests:
- dbt_utils.expression_is_true:
expression: "\"column`_'with\"\"_quotes\" is not null"
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,17 @@ models:
expression: "double_array_data is not null"
- dbt_utils.expression_is_true:
expression: "DATA is not null"
- dbt_utils.expression_is_true:
expression: "\"column`_'with\"\"_quotes\" is not null"
- name: nested_stream_with_complex_columns_resulting_into_long_names_partition_DATA
columns:
- name: currency
tests:
- not_null
- name: nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data
columns:
- name: id
tests:
# - name: nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data
# columns:
# - name: id
# tests:
# - not_null # TODO Fix bug here
- name: unnest_alias_children_owner
tests:
- dbt_utils.expression_is_true:
expression: "\"column`_'with\"\"_quotes\" is not null"
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
},
"USD": {
"type": "number"
},
"column`_'with\"_quotes": {
"type": "string"
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
},
"USD": {
"type": "number"
},
"column`_'with\"_quotes": {
"type": "string"
}
}
},
Expand Down

0 comments on commit affea7f

Please sign in to comment.