Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Normalization fix Snowflake Prefix Tables starting with number #9301

Merged
merged 24 commits into from
Jan 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
c6ae621
add normalization-clickhouse docker build step
jzcruiser Dec 22, 2021
b88225e
Merge branch 'patch-4' of github.com:jzcruiser/airbyte into marcos/te…
marcosmarxm Dec 23, 2021
cc499c2
bump normalization version
marcosmarxm Dec 23, 2021
a2517a3
small changes gradle
marcosmarxm Dec 23, 2021
4a51799
Merge branch 'master' into marcos/test-pr-9029
marcosmarxm Dec 23, 2021
a57495c
fix settings gradle
marcosmarxm Dec 27, 2021
c56ef54
fix eof file
marcosmarxm Dec 27, 2021
f8ccfd6
correct clickhouse normalization
marcosmarxm Dec 29, 2021
f24ea4b
Merge branch 'master' into marcos/test-pr-9029
edgao Jan 4, 2022
a6e4c31
Refactor jinja template for scd (#9278)
ChristopheDuong Jan 4, 2022
4f9f8ae
merge chris code and regenerate sql files
marcosmarxm Jan 4, 2022
62e7dab
correct scd post-hook generation for snowflake
marcosmarxm Jan 5, 2022
b069b03
Merge branch 'master' into marcos/normalization-snowflake-startwith-n…
marcosmarxm Jan 5, 2022
6a5f941
fix scd table for snowflake prefix table with number
marcosmarxm Jan 5, 2022
d8bb53f
scd fix for all destinations
marcosmarxm Jan 5, 2022
e6b0407
use quote
marcosmarxm Jan 5, 2022
745fbf7
use normalize column for post-hook
marcosmarxm Jan 5, 2022
96df256
change logic to apply quote
marcosmarxm Jan 6, 2022
8495633
add logic to handle prefix for mssql and oracle
marcosmarxm Jan 6, 2022
8f14292
run tests
marcosmarxm Jan 6, 2022
d06c537
merge master
marcosmarxm Jan 6, 2022
5fee2a6
correct unit test
marcosmarxm Jan 6, 2022
89cbcdb
Merge branch 'master' into marcos/normalization-snowflake-startwith-n…
marcosmarxm Jan 6, 2022
29e047d
bump normalization version
marcosmarxm Jan 6, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airbyte-integrations/bases/base-normalization/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ WORKDIR /airbyte
ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh"
ENTRYPOINT ["/airbyte/entrypoint.sh"]

LABEL io.airbyte.version=0.1.62
LABEL io.airbyte.version=0.1.63
LABEL io.airbyte.name=airbyte/normalization
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
partition_by = {"field": "_airbyte_active_row", "data_type": "int64", "range": {"start": 0, "end": 1, "interval": 1}},
unique_key = "_airbyte_unique_key_scd",
schema = "test_normalization",
post_hook = ['drop view _airbyte_test_normalization.nested_stream_with_complex_columns_resulting_into_long_names_stg'],
post_hook = ["drop view _airbyte_test_normalization.nested_stream_with_complex_columns_resulting_into_long_names_stg"],
tags = [ "top-level" ]
) }}
-- depends_on: ref('nested_stream_with_complex_columns_resulting_into_long_names_stg')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
partition_by = {"field": "_airbyte_active_row", "data_type": "int64", "range": {"start": 0, "end": 1, "interval": 1}},
unique_key = "_airbyte_unique_key_scd",
schema = "test_normalization",
post_hook = ['drop view _airbyte_test_normalization.dedup_exchange_rate_stg'],
post_hook = ["drop view _airbyte_test_normalization.dedup_exchange_rate_stg"],
tags = [ "top-level" ]
) }}
-- depends_on: ref('dedup_exchange_rate_stg')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ sources:
schema: false
identifier: false
tables:
- name: _airbyte_raw_1_prefix_startwith_number
- name: _airbyte_raw_dedup_cdc_excluded
- name: _airbyte_raw_dedup_exchange_rate
- name: _airbyte_raw_exchange_rate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
partition_by = {"field": "_airbyte_active_row", "data_type": "int64", "range": {"start": 0, "end": 1, "interval": 1}},
unique_key = "_airbyte_unique_key_scd",
schema = "test_normalization",
post_hook = ['drop view _airbyte_test_normalization.dedup_exchange_rate_stg'],
post_hook = ["drop view _airbyte_test_normalization.dedup_exchange_rate_stg"],
tags = [ "top-level" ]
) }}
-- depends_on: ref('dedup_exchange_rate_stg')
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{{ config(
unique_key = "_airbyte_unique_key_scd",
schema = "test_normalization",
post_hook = ['drop view _airbyte_test_normalization.dedup_cdc_excluded_stg'],
post_hook = ["drop view _airbyte_test_normalization.dedup_cdc_excluded_stg"],
tags = [ "top-level" ]
) }}
-- depends_on: ref('dedup_cdc_excluded_stg')
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{{ config(
unique_key = "_airbyte_unique_key_scd",
schema = "test_normalization",
post_hook = ['drop view _airbyte_test_normalization.dedup_exchange_rate_stg'],
post_hook = ["drop view _airbyte_test_normalization.dedup_exchange_rate_stg"],
tags = [ "top-level" ]
) }}
-- depends_on: ref('dedup_exchange_rate_stg')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ sources:
schema: false
identifier: false
tables:
- name: _airbyte_raw_1_prefix_startwith_number
- name: _airbyte_raw_dedup_cdc_excluded
- name: _airbyte_raw_dedup_exchange_rate
- name: _airbyte_raw_exchange_rate
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{{ config(
unique_key = "_airbyte_unique_key_scd",
schema = "test_normalization",
post_hook = ['drop view _airbyte_test_normalization.nested_stream_with_co__lting_into_long_names_stg'],
post_hook = ["drop view _airbyte_test_normalization.nested_stream_with_co__lting_into_long_names_stg"],
tags = [ "top-level" ]
) }}
-- depends_on: ref('nested_stream_with_co__lting_into_long_names_stg')
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{{ config(
unique_key = "_airbyte_unique_key_scd",
schema = "test_normalization",
post_hook = ['drop view _airbyte_test_normalization.dedup_exchange_rate_stg'],
post_hook = ["drop view _airbyte_test_normalization.dedup_exchange_rate_stg"],
tags = [ "top-level" ]
) }}
-- depends_on: ref('dedup_exchange_rate_stg')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ sources:
schema: false
identifier: false
tables:
- name: _airbyte_raw_1_prefix_startwith_number
- name: _airbyte_raw_dedup_cdc_excluded
- name: _airbyte_raw_dedup_exchange_rate
- name: _airbyte_raw_exchange_rate
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{{ config(
unique_key = "_airbyte_unique_key_scd",
schema = "test_normalization",
post_hook = ['drop view _airbyte_test_normalization.nested_stream_with_co_1g_into_long_names_stg'],
post_hook = ["drop view _airbyte_test_normalization.nested_stream_with_co_1g_into_long_names_stg"],
tags = [ "top-level" ]
) }}
-- depends_on: ref('nested_stream_with_co_1g_into_long_names_stg')
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{{ config(
unique_key = "_airbyte_unique_key_scd",
schema = "test_normalization",
post_hook = ['drop view _airbyte_test_normalization.dedup_exchange_rate_stg'],
post_hook = ["drop view _airbyte_test_normalization.dedup_exchange_rate_stg"],
tags = [ "top-level" ]
) }}
-- depends_on: ref('dedup_exchange_rate_stg')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ sources:
schema: false
identifier: false
tables:
- name: _airbyte_raw_1_prefix_startwith_number
- name: _airbyte_raw_dedup_cdc_excluded
- name: _airbyte_raw_dedup_exchange_rate
- name: _airbyte_raw_exchange_rate
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{{ config(
unique_key = "{{ quote('_AIRBYTE_UNIQUE_KEY_SCD') }}",
schema = "test_normalization",
post_hook = ['drop view test_normalization.dedup_exchange_rate_stg'],
post_hook = ["drop view test_normalization.dedup_exchange_rate_stg"],
tags = [ "top-level" ]
) }}
-- depends_on: ref('dedup_exchange_rate_stg')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ sources:
schema: false
identifier: false
tables:
- name: airbyte_raw_1_prefix_startwith_number
- name: airbyte_raw_dedup_cdc_excluded
- name: airbyte_raw_dedup_exchange_rate
- name: airbyte_raw_exchange_rate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
indexes = [{'columns':['_airbyte_active_row','_airbyte_unique_key_scd','_airbyte_emitted_at'],'type': 'btree'}],
unique_key = "_airbyte_unique_key_scd",
schema = "test_normalization",
post_hook = ['delete from _airbyte_test_normalization.nested_stream_with_c__lting_into_long_names_stg where _airbyte_emitted_at != (select max(_airbyte_emitted_at) from _airbyte_test_normalization.nested_stream_with_c__lting_into_long_names_stg)'],
post_hook = ["delete from _airbyte_test_normalization.nested_stream_with_c__lting_into_long_names_stg where _airbyte_emitted_at != (select max(_airbyte_emitted_at) from _airbyte_test_normalization.nested_stream_with_c__lting_into_long_names_stg)"],
tags = [ "top-level" ]
) }}
-- depends_on: ref('nested_stream_with_c__lting_into_long_names_stg')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
indexes = [{'columns':['_airbyte_active_row','_airbyte_unique_key_scd','_airbyte_emitted_at'],'type': 'btree'}],
unique_key = "_airbyte_unique_key_scd",
schema = "test_normalization",
post_hook = ['delete from _airbyte_test_normalization.some_stream_that_was_empty_stg where _airbyte_emitted_at != (select max(_airbyte_emitted_at) from _airbyte_test_normalization.some_stream_that_was_empty_stg)'],
post_hook = ["delete from _airbyte_test_normalization.some_stream_that_was_empty_stg where _airbyte_emitted_at != (select max(_airbyte_emitted_at) from _airbyte_test_normalization.some_stream_that_was_empty_stg)"],
tags = [ "top-level" ]
) }}
-- depends_on: ref('some_stream_that_was_empty_stg')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ profile: 'normalize'
# These configurations specify where dbt should look for different types of files.
# The `source-paths` config, for example, states that source models can be found
# in the "models/" directory. You probably won't need to change these!
source-paths: ["modified_models"]
source-paths: ["models"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you know why these are changing?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I didn't change it directly!

docs-paths: ["docs"]
analysis-paths: ["analysis"]
test-paths: ["tests"]
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@



create table "postgres".test_normalization."1_prefix_startwith_number_scd"
as (

-- depends_on: ref('1_prefix_startwith_number_stg')
with

input_data as (
select *
from "postgres"._airbyte_test_normalization."1_prefix_startwith_number_stg"
-- 1_prefix_startwith_number from "postgres".test_normalization._airbyte_raw_1_prefix_startwith_number
),

scd_data as (
-- SQL model to build a Type 2 Slowly Changing Dimension (SCD) table for each record identified by their primary key
select
md5(cast(coalesce(cast("id" as
varchar
), '') as
varchar
)) as _airbyte_unique_key,
"id",
"date",
"text",
"date" as _airbyte_start_at,
lag("date") over (
partition by "id"
order by
"date" is null asc,
"date" desc,
_airbyte_emitted_at desc
) as _airbyte_end_at,
case when row_number() over (
partition by "id"
order by
"date" is null asc,
"date" desc,
_airbyte_emitted_at desc
) = 1 then 1 else 0 end as _airbyte_active_row,
_airbyte_ab_id,
_airbyte_emitted_at,
_airbyte_1_prefix_startwith_number_hashid
from input_data
),
dedup_data as (
select
-- we need to ensure de-duplicated rows for merge/update queries
-- additionally, we generate a unique key for the scd table
row_number() over (
partition by
_airbyte_unique_key,
_airbyte_start_at,
_airbyte_emitted_at
order by _airbyte_active_row desc, _airbyte_ab_id
) as _airbyte_row_num,
md5(cast(coalesce(cast(_airbyte_unique_key as
varchar
), '') || '-' || coalesce(cast(_airbyte_start_at as
varchar
), '') || '-' || coalesce(cast(_airbyte_emitted_at as
varchar
), '') as
varchar
)) as _airbyte_unique_key_scd,
scd_data.*
from scd_data
)
select
_airbyte_unique_key,
_airbyte_unique_key_scd,
"id",
"date",
"text",
_airbyte_start_at,
_airbyte_end_at,
_airbyte_active_row,
_airbyte_ab_id,
_airbyte_emitted_at,
now() as _airbyte_normalized_at,
_airbyte_1_prefix_startwith_number_hashid
from dedup_data where _airbyte_row_num = 1
);

Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@



create table "postgres".test_normalization."1_prefix_startwith_number"
as (

-- Final base SQL model
-- depends_on: "postgres".test_normalization."1_prefix_startwith_number_scd"
select
_airbyte_unique_key,
"id",
"date",
"text",
_airbyte_ab_id,
_airbyte_emitted_at,
now() as _airbyte_normalized_at,
_airbyte_1_prefix_startwith_number_hashid
from "postgres".test_normalization."1_prefix_startwith_number_scd"
-- 1_prefix_startwith_number from "postgres".test_normalization._airbyte_raw_1_prefix_startwith_number
where 1 = 1
and _airbyte_active_row = 1

);

Loading