Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Fix incremental normalization with empty tables #8394

Merged
merged 6 commits into from
Dec 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions airbyte-integrations/bases/base-normalization/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
FROM fishtownanalytics/dbt:0.21.0
FROM fishtownanalytics/dbt:0.21.1
COPY --from=airbyte/base-airbyte-protocol-python:0.1.1 /airbyte /airbyte

# Install SSH Tunneling dependencies
RUN apt-get update && apt-get install -y jq sshpass

WORKDIR /airbyte
COPY entrypoint.sh .
COPY build/sshtunneling.sh .
Expand All @@ -27,5 +28,5 @@ WORKDIR /airbyte
ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh"
ENTRYPOINT ["/airbyte/entrypoint.sh"]

LABEL io.airbyte.version=0.1.60
LABEL io.airbyte.version=0.1.61
LABEL io.airbyte.name=airbyte/normalization
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

packages:
- git: "https://github.com/fishtown-analytics/dbt-utils.git"
revision: 0.6.4
revision: 0.7.4
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,37 @@

{%- macro default__incremental_clause(col_emitted_at) -%}
{% if is_incremental() %}
and cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }}) >= (select max(cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }})) from {{ this }})
and coalesce(
cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }}) >= (select max(cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }})) from {{ this }}),
{# -- if {{ col_emitted_at }} is NULL in either table, the previous comparison would evaluate to NULL, #}
{# -- so we coalesce and make sure the row is always returned for incremental processing instead #}
true)
{% endif %}
{%- endmacro -%}

{# -- see https://on-systems.tech/113-beware-dbt-incremental-updates-against-snowflake-external-tables/ #}
{%- macro snowflake__incremental_clause(col_emitted_at) -%}
{% if is_incremental() %}
and cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }}) >= cast('{{ get_max_normalized_cursor(col_emitted_at) }}' as {{ type_timestamp_with_timezone() }})
{% if get_max_normalized_cursor(col_emitted_at) %}
and cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }}) >=
cast('{{ get_max_normalized_cursor(col_emitted_at) }}' as {{ type_timestamp_with_timezone() }})
{% endif %}
{% endif %}
{%- endmacro -%}

{%- macro sqlserver__incremental_clause(col_emitted_at) -%}
{% if is_incremental() %}
and ((select max(cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }})) from {{ this }}) is null
or cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }}) >=
(select max(cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }})) from {{ this }}))
{% endif %}
{%- endmacro -%}

{% macro get_max_normalized_cursor(col_emitted_at) %}
{% if execute and is_incremental() %}
{% if env_var('INCREMENTAL_CURSOR', 'UNSET') == 'UNSET' %}
{% set query %}
select coalesce(max({{ col_emitted_at }}), cast('1970-01-01 00:00:00' as {{ type_timestamp_with_timezone() }})) from {{ this }}
select max(cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }})) from {{ this }}
{% endset %}
{% set max_cursor = run_query(query).columns[0][0] %}
{% do return(max_cursor) %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

packages:
- git: "https://github.com/fishtown-analytics/dbt-utils.git"
revision: 0.7.3
revision: 0.7.4
Original file line number Diff line number Diff line change
Expand Up @@ -186,24 +186,24 @@ def setup_mssql_db(self):
# wait for service is available
print(f"....Waiting for MS SQL Server to start...{wait_sec} sec")
time.sleep(wait_sec)
# Run additional commands to prepare the table
command_create_db = [
"docker",
"exec",
f"{self.container_prefix}_mssql",
"/opt/mssql-tools/bin/sqlcmd",
"-S",
config["host"],
"-U",
config["username"],
"-P",
config["password"],
"-Q",
f"CREATE DATABASE [{config['database']}]",
]
# create test db
print("Executing: ", " ".join(command_create_db))
subprocess.call(command_create_db)
# Run additional commands to prepare the table
command_create_db = [
"docker",
"exec",
f"{self.container_prefix}_mssql",
"/opt/mssql-tools/bin/sqlcmd",
"-S",
config["host"],
"-U",
config["username"],
"-P",
config["password"],
"-Q",
f"CREATE DATABASE [{config['database']}]",
]
# create test db
print("Executing: ", " ".join(command_create_db))
subprocess.call(command_create_db)
if not os.path.exists("../secrets"):
os.makedirs("../secrets")
with open("../secrets/mssql.json", "w") as fh:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ dedup_data as (
-- additionally, we generate a unique key for the scd table
row_number() over (
partition by _airbyte_unique_key, _airbyte_start_at, _airbyte_emitted_at
order by _airbyte_ab_id
order by _airbyte_active_row desc, _airbyte_ab_id
) as _airbyte_row_num,
to_hex(md5(cast(concat(coalesce(cast(_airbyte_unique_key as
string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ dedup_data as (
-- additionally, we generate a unique key for the scd table
row_number() over (
partition by _airbyte_unique_key, _airbyte_start_at, _airbyte_emitted_at
order by _airbyte_ab_id
order by _airbyte_active_row desc, _airbyte_ab_id
) as _airbyte_row_num,
{{ dbt_utils.surrogate_key([
'_airbyte_unique_key',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ sources:
- name: _airbyte_raw_conflict_stream_scalar
- name: _airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
- name: _airbyte_raw_non_nested_stream_without_namespace_resulting_into_long_names
- name: _airbyte_raw_some_stream_that_was_empty
- name: _airbyte_raw_unnest_alias
- name: test_normalization_namespace
quoting:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ dedup_data as (
-- additionally, we generate a unique key for the scd table
row_number() over (
partition by _airbyte_unique_key, _airbyte_start_at, _airbyte_emitted_at
order by _airbyte_ab_id
order by _airbyte_active_row desc, _airbyte_ab_id
) as _airbyte_row_num,
to_hex(md5(cast(concat(coalesce(cast(_airbyte_unique_key as
string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ dedup_data as (
-- additionally, we generate a unique key for the scd table
row_number() over (
partition by _airbyte_unique_key, _airbyte_start_at, _airbyte_emitted_at
order by _airbyte_ab_id
order by _airbyte_active_row desc, _airbyte_ab_id
) as _airbyte_row_num,
{{ dbt_utils.surrogate_key([
'_airbyte_unique_key',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ dedup_data as (
-- additionally, we generate a unique key for the scd table
row_number() over (
partition by _airbyte_unique_key, _airbyte_start_at, _airbyte_emitted_at
order by _airbyte_ab_id
order by _airbyte_active_row desc, _airbyte_ab_id
) as _airbyte_row_num,
{{ dbt_utils.surrogate_key([
'_airbyte_unique_key',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ dedup_data as (
-- additionally, we generate a unique key for the scd table
row_number() over (
partition by _airbyte_unique_key, _airbyte_start_at, _airbyte_emitted_at
order by _airbyte_ab_id
order by _airbyte_active_row desc, _airbyte_ab_id
) as _airbyte_row_num,
convert(varchar(32), HashBytes(''md5'', coalesce(cast(

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
USE [test_normalization];
EXEC('create view test_normalization."nested_stream_with_co___long_names_partition_temp_view" as

with __dbt__CTE__nested_stream_with_co___long_names_partition_ab1 as (
with __dbt__cte__nested_stream_with_co___long_names_partition_ab1 as (

-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: "test_normalization".test_normalization."nested_stream_with_co__lting_into_long_names_scd"
Expand All @@ -36,25 +36,25 @@ from "test_normalization".test_normalization."nested_stream_with_co__lting_into_
where 1 = 1
and "partition" is not null

), __dbt__CTE__nested_stream_with_co___long_names_partition_ab2 as (
), __dbt__cte__nested_stream_with_co___long_names_partition_ab2 as (

-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
-- depends_on: __dbt__CTE__nested_stream_with_co___long_names_partition_ab1
-- depends_on: __dbt__cte__nested_stream_with_co___long_names_partition_ab1
select
_airbyte_nested_strea__nto_long_names_hashid,
double_array_data,
"DATA",
_airbyte_ab_id,
_airbyte_emitted_at,
SYSDATETIME() as _airbyte_normalized_at
from __dbt__CTE__nested_stream_with_co___long_names_partition_ab1
from __dbt__cte__nested_stream_with_co___long_names_partition_ab1
-- partition at nested_stream_with_complex_columns_resulting_into_long_names/partition
where 1 = 1

), __dbt__CTE__nested_stream_with_co___long_names_partition_ab3 as (
), __dbt__cte__nested_stream_with_co___long_names_partition_ab3 as (

-- SQL model to build a hash column based on the values of this record
-- depends_on: __dbt__CTE__nested_stream_with_co___long_names_partition_ab2
-- depends_on: __dbt__cte__nested_stream_with_co___long_names_partition_ab2
select
convert(varchar(32), HashBytes(''md5'', coalesce(cast(

Expand All @@ -68,12 +68,12 @@ select
VARCHAR(max)), ''''),''''), '''') as
VARCHAR(max)), '''')), 2) as _airbyte_partition_hashid,
tmp.*
from __dbt__CTE__nested_stream_with_co___long_names_partition_ab2 tmp
from __dbt__cte__nested_stream_with_co___long_names_partition_ab2 tmp
-- partition at nested_stream_with_complex_columns_resulting_into_long_names/partition
where 1 = 1

)-- Final base SQL model
-- depends_on: __dbt__CTE__nested_stream_with_co___long_names_partition_ab3
-- depends_on: __dbt__cte__nested_stream_with_co___long_names_partition_ab3
select
_airbyte_nested_strea__nto_long_names_hashid,
double_array_data,
Expand All @@ -82,7 +82,7 @@ select
_airbyte_emitted_at,
SYSDATETIME() as _airbyte_normalized_at,
_airbyte_partition_hashid
from __dbt__CTE__nested_stream_with_co___long_names_partition_ab3
from __dbt__cte__nested_stream_with_co___long_names_partition_ab3
-- partition at nested_stream_with_complex_columns_resulting_into_long_names/partition from "test_normalization".test_normalization."nested_stream_with_co__lting_into_long_names_scd"
where 1 = 1

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
USE [test_normalization];
EXEC('create view test_normalization."nested_stream_with_co___names_partition_data_temp_view" as

with __dbt__CTE__nested_stream_with_co___names_partition_data_ab1 as (
with __dbt__cte__nested_stream_with_co___names_partition_data_ab1 as (

-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: "test_normalization".test_normalization."nested_stream_with_co___long_names_partition"
Expand All @@ -45,25 +45,25 @@ from "test_normalization".test_normalization."nested_stream_with_co___long_names
where 1 = 1
and "DATA" is not null

), __dbt__CTE__nested_stream_with_co___names_partition_data_ab2 as (
), __dbt__cte__nested_stream_with_co___names_partition_data_ab2 as (

-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
-- depends_on: __dbt__CTE__nested_stream_with_co___names_partition_data_ab1
-- depends_on: __dbt__cte__nested_stream_with_co___names_partition_data_ab1
select
_airbyte_partition_hashid,
cast(currency as
VARCHAR(max)) as currency,
_airbyte_ab_id,
_airbyte_emitted_at,
SYSDATETIME() as _airbyte_normalized_at
from __dbt__CTE__nested_stream_with_co___names_partition_data_ab1
from __dbt__cte__nested_stream_with_co___names_partition_data_ab1
-- DATA at nested_stream_with_complex_columns_resulting_into_long_names/partition/DATA
where 1 = 1

), __dbt__CTE__nested_stream_with_co___names_partition_data_ab3 as (
), __dbt__cte__nested_stream_with_co___names_partition_data_ab3 as (

-- SQL model to build a hash column based on the values of this record
-- depends_on: __dbt__CTE__nested_stream_with_co___names_partition_data_ab2
-- depends_on: __dbt__cte__nested_stream_with_co___names_partition_data_ab2
select
convert(varchar(32), HashBytes(''md5'', coalesce(cast(

Expand All @@ -74,20 +74,20 @@ select
VARCHAR(max)), ''''),''''), '''') as
VARCHAR(max)), '''')), 2) as _airbyte_data_hashid,
tmp.*
from __dbt__CTE__nested_stream_with_co___names_partition_data_ab2 tmp
from __dbt__cte__nested_stream_with_co___names_partition_data_ab2 tmp
-- DATA at nested_stream_with_complex_columns_resulting_into_long_names/partition/DATA
where 1 = 1

)-- Final base SQL model
-- depends_on: __dbt__CTE__nested_stream_with_co___names_partition_data_ab3
-- depends_on: __dbt__cte__nested_stream_with_co___names_partition_data_ab3
select
_airbyte_partition_hashid,
currency,
_airbyte_ab_id,
_airbyte_emitted_at,
SYSDATETIME() as _airbyte_normalized_at,
_airbyte_data_hashid
from __dbt__CTE__nested_stream_with_co___names_partition_data_ab3
from __dbt__cte__nested_stream_with_co___names_partition_data_ab3
-- DATA at nested_stream_with_complex_columns_resulting_into_long_names/partition/DATA from "test_normalization".test_normalization."nested_stream_with_co___long_names_partition"
where 1 = 1

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
USE [test_normalization];
EXEC('create view test_normalization."nested_stream_with_co__ion_double_array_data_temp_view" as

with __dbt__CTE__nested_stream_with_co__ion_double_array_data_ab1 as (
with __dbt__cte__nested_stream_with_co__ion_double_array_data_ab1 as (

-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: "test_normalization".test_normalization."nested_stream_with_co___long_names_partition"
Expand All @@ -45,25 +45,25 @@ from "test_normalization".test_normalization."nested_stream_with_co___long_names
where 1 = 1
and double_array_data is not null

), __dbt__CTE__nested_stream_with_co__ion_double_array_data_ab2 as (
), __dbt__cte__nested_stream_with_co__ion_double_array_data_ab2 as (

-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
-- depends_on: __dbt__CTE__nested_stream_with_co__ion_double_array_data_ab1
-- depends_on: __dbt__cte__nested_stream_with_co__ion_double_array_data_ab1
select
_airbyte_partition_hashid,
cast(id as
VARCHAR(max)) as id,
_airbyte_ab_id,
_airbyte_emitted_at,
SYSDATETIME() as _airbyte_normalized_at
from __dbt__CTE__nested_stream_with_co__ion_double_array_data_ab1
from __dbt__cte__nested_stream_with_co__ion_double_array_data_ab1
-- double_array_data at nested_stream_with_complex_columns_resulting_into_long_names/partition/double_array_data
where 1 = 1

), __dbt__CTE__nested_stream_with_co__ion_double_array_data_ab3 as (
), __dbt__cte__nested_stream_with_co__ion_double_array_data_ab3 as (

-- SQL model to build a hash column based on the values of this record
-- depends_on: __dbt__CTE__nested_stream_with_co__ion_double_array_data_ab2
-- depends_on: __dbt__cte__nested_stream_with_co__ion_double_array_data_ab2
select
convert(varchar(32), HashBytes(''md5'', coalesce(cast(

Expand All @@ -74,20 +74,20 @@ select
VARCHAR(max)), ''''),''''), '''') as
VARCHAR(max)), '''')), 2) as _airbyte_double_array_data_hashid,
tmp.*
from __dbt__CTE__nested_stream_with_co__ion_double_array_data_ab2 tmp
from __dbt__cte__nested_stream_with_co__ion_double_array_data_ab2 tmp
-- double_array_data at nested_stream_with_complex_columns_resulting_into_long_names/partition/double_array_data
where 1 = 1

)-- Final base SQL model
-- depends_on: __dbt__CTE__nested_stream_with_co__ion_double_array_data_ab3
-- depends_on: __dbt__cte__nested_stream_with_co__ion_double_array_data_ab3
select
_airbyte_partition_hashid,
id,
_airbyte_ab_id,
_airbyte_emitted_at,
SYSDATETIME() as _airbyte_normalized_at,
_airbyte_double_array_data_hashid
from __dbt__CTE__nested_stream_with_co__ion_double_array_data_ab3
from __dbt__cte__nested_stream_with_co__ion_double_array_data_ab3
-- double_array_data at nested_stream_with_complex_columns_resulting_into_long_names/partition/double_array_data from "test_normalization".test_normalization."nested_stream_with_co___long_names_partition"
where 1 = 1

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ dedup_data as (
-- additionally, we generate a unique key for the scd table
row_number() over (
partition by _airbyte_unique_key, _airbyte_start_at, _airbyte_emitted_at
order by _airbyte_ab_id
order by _airbyte_active_row desc, _airbyte_ab_id
) as _airbyte_row_num,
{{ dbt_utils.surrogate_key([
'_airbyte_unique_key',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ sources:
- name: _airbyte_raw_conflict_stream_scalar
- name: _airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
- name: _airbyte_raw_non_nested_stream_without_namespace_resulting_into_long_names
- name: _airbyte_raw_some_stream_that_was_empty
- name: _airbyte_raw_unnest_alias
- name: test_normalization_namespace
quoting:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ dedup_data as (
-- additionally, we generate a unique key for the scd table
row_number() over (
partition by _airbyte_unique_key, _airbyte_start_at, _airbyte_emitted_at
order by _airbyte_ab_id
order by _airbyte_active_row desc, _airbyte_ab_id
) as _airbyte_row_num,
convert(varchar(32), HashBytes(''md5'', coalesce(cast(

Expand Down
Loading