Normalization: fix Snowflake prefix tables starting with a number #9301

Merged
merged 24 commits into from
Jan 7, 2022
Changes from 15 commits
Commits
24 commits
c6ae621
add normalization-clickhouse docker build step
jzcruiser Dec 22, 2021
b88225e
Merge branch 'patch-4' of github.com:jzcruiser/airbyte into marcos/te…
marcosmarxm Dec 23, 2021
cc499c2
bump normalization version
marcosmarxm Dec 23, 2021
a2517a3
small changes gradle
marcosmarxm Dec 23, 2021
4a51799
Merge branch 'master' into marcos/test-pr-9029
marcosmarxm Dec 23, 2021
a57495c
fix settings gradle
marcosmarxm Dec 27, 2021
c56ef54
fix eof file
marcosmarxm Dec 27, 2021
f8ccfd6
correct clickhouse normalization
marcosmarxm Dec 29, 2021
f24ea4b
Merge branch 'master' into marcos/test-pr-9029
edgao Jan 4, 2022
a6e4c31
Refactor jinja template for scd (#9278)
ChristopheDuong Jan 4, 2022
4f9f8ae
merge chris code and regenerate sql files
marcosmarxm Jan 4, 2022
62e7dab
correct scd post-hook generation for snowflake
marcosmarxm Jan 5, 2022
b069b03
Merge branch 'master' into marcos/normalization-snowflake-startwith-n…
marcosmarxm Jan 5, 2022
6a5f941
fix scd table for snowflake prefix table with number
marcosmarxm Jan 5, 2022
d8bb53f
scd fix for all destinations
marcosmarxm Jan 5, 2022
e6b0407
use quote
marcosmarxm Jan 5, 2022
745fbf7
use normalize column for post-hook
marcosmarxm Jan 5, 2022
96df256
change logic to apply quote
marcosmarxm Jan 6, 2022
8495633
add logic to handle prefix for mssql and oracle
marcosmarxm Jan 6, 2022
8f14292
run tests
marcosmarxm Jan 6, 2022
d06c537
merge master
marcosmarxm Jan 6, 2022
5fee2a6
correct unit test
marcosmarxm Jan 6, 2022
89cbcdb
Merge branch 'master' into marcos/normalization-snowflake-startwith-n…
marcosmarxm Jan 6, 2022
29e047d
bump normalization version
marcosmarxm Jan 6, 2022
@@ -0,0 +1,82 @@



create table "postgres".test_normalization."1_prefix_startwith_number_scd"
as (

-- depends_on: ref('1_prefix_startwith_number_stg')
with

input_data as (
select *
from "postgres"._airbyte_test_normalization."1_prefix_startwith_number_stg"
-- 1_prefix_startwith_number from "postgres".test_normalization._airbyte_raw_1_prefix_startwith_number
),

scd_data as (
-- SQL model to build a Type 2 Slowly Changing Dimension (SCD) table for each record identified by their primary key
select
md5(cast(coalesce(cast("id" as
varchar
), '') as
varchar
)) as _airbyte_unique_key,
"id",
"date",
"text",
"date" as _airbyte_start_at,
lag("date") over (
partition by "id"
order by
"date" is null asc,
"date" desc,
_airbyte_emitted_at desc
) as _airbyte_end_at,
case when row_number() over (
partition by "id"
order by
"date" is null asc,
"date" desc,
_airbyte_emitted_at desc
) = 1 then 1 else 0 end as _airbyte_active_row,
_airbyte_ab_id,
_airbyte_emitted_at,
_airbyte_1_prefix_startwith_number_hashid
from input_data
),
dedup_data as (
select
-- we need to ensure de-duplicated rows for merge/update queries
-- additionally, we generate a unique key for the scd table
row_number() over (
partition by _airbyte_unique_key, _airbyte_start_at, _airbyte_emitted_at
order by _airbyte_active_row desc, _airbyte_ab_id
) as _airbyte_row_num,
md5(cast(coalesce(cast(_airbyte_unique_key as
varchar
), '') || '-' || coalesce(cast(_airbyte_start_at as
varchar
), '') || '-' || coalesce(cast(_airbyte_emitted_at as
varchar
), '') as
varchar
)) as _airbyte_unique_key_scd,
scd_data.*
from scd_data
)
select
_airbyte_unique_key,
_airbyte_unique_key_scd,
"id",
"date",
"text",
_airbyte_start_at,
_airbyte_end_at,
_airbyte_active_row,
_airbyte_ab_id,
_airbyte_emitted_at,
now() as _airbyte_normalized_at,
_airbyte_1_prefix_startwith_number_hashid
from dedup_data where _airbyte_row_num = 1
);

@@ -0,0 +1,24 @@



create table "postgres".test_normalization."1_prefix_startwith_number"
as (

-- Final base SQL model
-- depends_on: "postgres".test_normalization."1_prefix_startwith_number_scd"
select
_airbyte_unique_key,
"id",
"date",
"text",
_airbyte_ab_id,
_airbyte_emitted_at,
now() as _airbyte_normalized_at,
_airbyte_1_prefix_startwith_number_hashid
from "postgres".test_normalization."1_prefix_startwith_number_scd"
-- 1_prefix_startwith_number from "postgres".test_normalization._airbyte_raw_1_prefix_startwith_number
where 1 = 1
and _airbyte_active_row = 1

);

@@ -0,0 +1,61 @@



create table "postgres"._airbyte_test_normalization."1_prefix_startwith_number_stg"
as (

with __dbt__cte__1_prefix_startwith_number_ab1 as (

-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: "postgres".test_normalization._airbyte_raw_1_prefix_startwith_number
select
jsonb_extract_path_text(_airbyte_data, 'id') as "id",
jsonb_extract_path_text(_airbyte_data, 'date') as "date",
jsonb_extract_path_text(_airbyte_data, 'text') as "text",
_airbyte_ab_id,
_airbyte_emitted_at,
now() as _airbyte_normalized_at
from "postgres".test_normalization._airbyte_raw_1_prefix_startwith_number as table_alias
-- 1_prefix_startwith_number
where 1 = 1

), __dbt__cte__1_prefix_startwith_number_ab2 as (

-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
-- depends_on: __dbt__cte__1_prefix_startwith_number_ab1
select
cast("id" as
bigint
) as "id",
cast(nullif("date", '') as
date
) as "date",
cast("text" as
varchar
) as "text",
_airbyte_ab_id,
_airbyte_emitted_at,
now() as _airbyte_normalized_at
from __dbt__cte__1_prefix_startwith_number_ab1
-- 1_prefix_startwith_number
where 1 = 1

)-- SQL model to build a hash column based on the values of this record
-- depends_on: __dbt__cte__1_prefix_startwith_number_ab2
select
md5(cast(coalesce(cast("id" as
varchar
), '') || '-' || coalesce(cast("date" as
varchar
), '') || '-' || coalesce(cast("text" as
varchar
), '') as
varchar
)) as _airbyte_1_prefix_startwith_number_hashid,
tmp.*
from __dbt__cte__1_prefix_startwith_number_ab2 tmp
-- 1_prefix_startwith_number
where 1 = 1

);

@@ -0,0 +1,20 @@
{{ config(
indexes = [{'columns':['_airbyte_emitted_at'],'type':'btree'}],
unique_key = '_airbyte_ab_id',
schema = "_airbyte_test_normalization",
tags = [ "top-level-intermediate" ]
) }}
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: {{ source('test_normalization', '_airbyte_raw_1_prefix_startwith_number') }}
select
{{ json_extract_scalar('_airbyte_data', ['id'], ['id']) }} as {{ adapter.quote('id') }},
{{ json_extract_scalar('_airbyte_data', ['date'], ['date']) }} as {{ adapter.quote('date') }},
{{ json_extract_scalar('_airbyte_data', ['text'], ['text']) }} as {{ adapter.quote('text') }},
_airbyte_ab_id,
_airbyte_emitted_at,
{{ current_timestamp() }} as _airbyte_normalized_at
from {{ source('test_normalization', '_airbyte_raw_1_prefix_startwith_number') }} as table_alias
-- 1_prefix_startwith_number
where 1 = 1
{{ incremental_clause('_airbyte_emitted_at') }}

@@ -0,0 +1,20 @@
{{ config(
indexes = [{'columns':['_airbyte_emitted_at'],'type':'btree'}],
unique_key = '_airbyte_ab_id',
schema = "_airbyte_test_normalization",
tags = [ "top-level-intermediate" ]
) }}
-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
-- depends_on: {{ ref('1_prefix_startwith_number_ab1') }}
select
cast({{ adapter.quote('id') }} as {{ dbt_utils.type_bigint() }}) as {{ adapter.quote('id') }},
cast({{ empty_string_to_null(adapter.quote('date')) }} as {{ type_date() }}) as {{ adapter.quote('date') }},
cast({{ adapter.quote('text') }} as {{ dbt_utils.type_string() }}) as {{ adapter.quote('text') }},
_airbyte_ab_id,
_airbyte_emitted_at,
{{ current_timestamp() }} as _airbyte_normalized_at
from {{ ref('1_prefix_startwith_number_ab1') }}
-- 1_prefix_startwith_number
where 1 = 1
{{ incremental_clause('_airbyte_emitted_at') }}

@@ -0,0 +1,114 @@
{{ config(
indexes = [{'columns':['_airbyte_active_row','_airbyte_unique_key_scd','_airbyte_emitted_at'],'type': 'btree'}],
unique_key = "_airbyte_unique_key_scd",
schema = "test_normalization",
post_hook = ['delete from _airbyte_test_normalization."1_prefix_startwith_number_stg" where _airbyte_emitted_at != (select max(_airbyte_emitted_at) from _airbyte_test_normalization."1_prefix_startwith_number_stg")'],
tags = [ "top-level" ]
) }}
-- depends_on: ref('1_prefix_startwith_number_stg')
with
{% if is_incremental() %}
new_data as (
-- retrieve incremental "new" data
select
*
from {{ ref('1_prefix_startwith_number_stg') }}
-- 1_prefix_startwith_number from {{ source('test_normalization', '_airbyte_raw_1_prefix_startwith_number') }}
where 1 = 1
{{ incremental_clause('_airbyte_emitted_at') }}
),
new_data_ids as (
-- build a subset of _airbyte_unique_key from rows that are new
select distinct
{{ dbt_utils.surrogate_key([
adapter.quote('id'),
]) }} as _airbyte_unique_key
from new_data
),
empty_new_data as (
-- build an empty table to only keep the table's column types
select * from new_data where 1 = 0
),
previous_active_scd_data as (
-- retrieve "incomplete old" data that needs to be updated with an end date because of new changes
select
{{ star_intersect(ref('1_prefix_startwith_number_stg'), this, from_alias='inc_data', intersect_alias='this_data') }}
from {{ this }} as this_data
-- make a join with new_data using primary key to filter active data that need to be updated only
join new_data_ids on this_data._airbyte_unique_key = new_data_ids._airbyte_unique_key
-- force left join to NULL values (we just need to transfer column types only for the star_intersect macro on schema changes)
left join empty_new_data as inc_data on this_data._airbyte_ab_id = inc_data._airbyte_ab_id
where _airbyte_active_row = 1
),
input_data as (
select {{ dbt_utils.star(ref('1_prefix_startwith_number_stg')) }} from new_data
union all
select {{ dbt_utils.star(ref('1_prefix_startwith_number_stg')) }} from previous_active_scd_data
),
{% else %}
input_data as (
select *
from {{ ref('1_prefix_startwith_number_stg') }}
-- 1_prefix_startwith_number from {{ source('test_normalization', '_airbyte_raw_1_prefix_startwith_number') }}
),
{% endif %}
scd_data as (
-- SQL model to build a Type 2 Slowly Changing Dimension (SCD) table for each record identified by their primary key
select
{{ dbt_utils.surrogate_key([
adapter.quote('id'),
]) }} as _airbyte_unique_key,
{{ adapter.quote('id') }},
{{ adapter.quote('date') }},
{{ adapter.quote('text') }},
{{ adapter.quote('date') }} as _airbyte_start_at,
lag({{ adapter.quote('date') }}) over (
partition by {{ adapter.quote('id') }}
order by
{{ adapter.quote('date') }} is null asc,
{{ adapter.quote('date') }} desc,
_airbyte_emitted_at desc
) as _airbyte_end_at,
case when row_number() over (
partition by {{ adapter.quote('id') }}
order by
{{ adapter.quote('date') }} is null asc,
{{ adapter.quote('date') }} desc,
_airbyte_emitted_at desc
) = 1 then 1 else 0 end as _airbyte_active_row,
_airbyte_ab_id,
_airbyte_emitted_at,
_airbyte_1_prefix_startwith_number_hashid
from input_data
),
dedup_data as (
select
-- we need to ensure de-duplicated rows for merge/update queries
-- additionally, we generate a unique key for the scd table
row_number() over (
partition by _airbyte_unique_key, _airbyte_start_at, _airbyte_emitted_at
order by _airbyte_active_row desc, _airbyte_ab_id
) as _airbyte_row_num,
{{ dbt_utils.surrogate_key([
'_airbyte_unique_key',
'_airbyte_start_at',
'_airbyte_emitted_at'
]) }} as _airbyte_unique_key_scd,
scd_data.*
from scd_data
)
select
_airbyte_unique_key,
_airbyte_unique_key_scd,
{{ adapter.quote('id') }},
{{ adapter.quote('date') }},
{{ adapter.quote('text') }},
_airbyte_start_at,
_airbyte_end_at,
_airbyte_active_row,
_airbyte_ab_id,
_airbyte_emitted_at,
{{ current_timestamp() }} as _airbyte_normalized_at,
_airbyte_1_prefix_startwith_number_hashid
from dedup_data where _airbyte_row_num = 1

@@ -0,0 +1,23 @@
{{ config(
indexes = [{'columns':['_airbyte_unique_key'],'unique':True}],
unique_key = "_airbyte_unique_key",
schema = "test_normalization",
tags = [ "top-level" ]
) }}
-- Final base SQL model
-- depends_on: {{ ref('1_prefix_startwith_number_scd') }}
select
_airbyte_unique_key,
{{ adapter.quote('id') }},
{{ adapter.quote('date') }},
{{ adapter.quote('text') }},
_airbyte_ab_id,
_airbyte_emitted_at,
{{ current_timestamp() }} as _airbyte_normalized_at,
_airbyte_1_prefix_startwith_number_hashid
from {{ ref('1_prefix_startwith_number_scd') }}
-- 1_prefix_startwith_number from {{ source('test_normalization', '_airbyte_raw_1_prefix_startwith_number') }}
where 1 = 1
and _airbyte_active_row = 1
{{ incremental_clause('_airbyte_emitted_at') }}

@@ -0,0 +1,20 @@
{{ config(
indexes = [{'columns':['_airbyte_emitted_at'],'type':'btree'}],
unique_key = '_airbyte_ab_id',
schema = "_airbyte_test_normalization",
tags = [ "top-level-intermediate" ]
) }}
-- SQL model to build a hash column based on the values of this record
-- depends_on: {{ ref('1_prefix_startwith_number_ab2') }}
select
{{ dbt_utils.surrogate_key([
adapter.quote('id'),
adapter.quote('date'),
adapter.quote('text'),
]) }} as _airbyte_1_prefix_startwith_number_hashid,
tmp.*
from {{ ref('1_prefix_startwith_number_ab2') }} tmp
-- 1_prefix_startwith_number
where 1 = 1
{{ incremental_clause('_airbyte_emitted_at') }}

@@ -6,6 +6,7 @@ sources:
schema: false
identifier: false
tables:
- name: _airbyte_raw_1_prefix_startwith_number
- name: _airbyte_raw_dedup_cdc_excluded
- name: _airbyte_raw_dedup_exchange_rate
- name: _airbyte_raw_exchange_rate
@@ -0,0 +1,14 @@

delete
from "postgres".test_normalization."1_prefix_startwith_number_scd"
where (_airbyte_unique_key_scd) in (
select (_airbyte_unique_key_scd)
from "1_prefix_startwith_number_scd__dbt_tmp"
);

insert into "postgres".test_normalization."1_prefix_startwith_number_scd" ("_airbyte_unique_key", "_airbyte_unique_key_scd", "id", "date", "text", "_airbyte_start_at", "_airbyte_end_at", "_airbyte_active_row", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_1_prefix_startwith_number_hashid")
(
select "_airbyte_unique_key", "_airbyte_unique_key_scd", "id", "date", "text", "_airbyte_start_at", "_airbyte_end_at", "_airbyte_active_row", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_1_prefix_startwith_number_hashid"
from "1_prefix_startwith_number_scd__dbt_tmp"
);
