diff --git a/airbyte-integrations/bases/base-normalization/Dockerfile b/airbyte-integrations/bases/base-normalization/Dockerfile index be915f01ab0d0..4d2d25c249ad0 100644 --- a/airbyte-integrations/bases/base-normalization/Dockerfile +++ b/airbyte-integrations/bases/base-normalization/Dockerfile @@ -28,5 +28,5 @@ WORKDIR /airbyte ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh" ENTRYPOINT ["/airbyte/entrypoint.sh"] -LABEL io.airbyte.version=0.1.62 +LABEL io.airbyte.version=0.1.63 LABEL io.airbyte.name=airbyte/normalization diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_nested_streams/models/generated/airbyte_incremental/scd/test_normalization/nested_stream_with_complex_columns_resulting_into_long_names_scd.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_nested_streams/models/generated/airbyte_incremental/scd/test_normalization/nested_stream_with_complex_columns_resulting_into_long_names_scd.sql index a1d766dd11035..d814d04ecc61a 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_nested_streams/models/generated/airbyte_incremental/scd/test_normalization/nested_stream_with_complex_columns_resulting_into_long_names_scd.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_nested_streams/models/generated/airbyte_incremental/scd/test_normalization/nested_stream_with_complex_columns_resulting_into_long_names_scd.sql @@ -3,7 +3,7 @@ partition_by = {"field": "_airbyte_active_row", "data_type": "int64", "range": {"start": 0, "end": 1, "interval": 1}}, unique_key = "_airbyte_unique_key_scd", schema = "test_normalization", - post_hook = ['drop view _airbyte_test_normalization.nested_stream_with_complex_columns_resulting_into_long_names_stg'], + post_hook = ["drop view _airbyte_test_normalization.nested_stream_with_complex_columns_resulting_into_long_names_stg"], tags = [ "top-level" ] ) }} -- depends_on: ref('nested_stream_with_complex_columns_resulting_into_long_names_stg') diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql index c2edf0afe7961..cd673ea4b56cf 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql @@ -3,7 +3,7 @@ partition_by = {"field": "_airbyte_active_row", "data_type": "int64", "range": {"start": 0, "end": 1, "interval": 1}}, unique_key = "_airbyte_unique_key_scd", schema = "test_normalization", - post_hook = ['drop view _airbyte_test_normalization.dedup_exchange_rate_stg'], + post_hook = ["drop view _airbyte_test_normalization.dedup_exchange_rate_stg"], tags = [ "top-level" ] ) }} -- depends_on: ref('dedup_exchange_rate_stg') diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_simple_streams/models/generated/sources.yml b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_simple_streams/models/generated/sources.yml index 0e116b2bbec5d..97bf0d05cbd40 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_simple_streams/models/generated/sources.yml +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_simple_streams/models/generated/sources.yml @@ -6,6 +6,7 @@ sources: schema: false identifier: false tables: + - name: _airbyte_raw_1_prefix_startwith_number - name: _airbyte_raw_dedup_cdc_excluded - name: _airbyte_raw_dedup_exchange_rate - name: _airbyte_raw_exchange_rate diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_simple_streams/modified_models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_simple_streams/modified_models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql index cc6694836bb6a..ccec637092e39 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_simple_streams/modified_models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_simple_streams/modified_models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql @@ -3,7 +3,7 @@ partition_by = {"field": "_airbyte_active_row", "data_type": "int64", "range": {"start": 0, "end": 1, "interval": 1}}, unique_key = "_airbyte_unique_key_scd", schema = "test_normalization", - post_hook = ['drop view _airbyte_test_normalization.dedup_exchange_rate_stg'], + post_hook = ["drop view _airbyte_test_normalization.dedup_exchange_rate_stg"], tags = [ "top-level" ] ) }} -- depends_on: ref('dedup_exchange_rate_stg') diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_cdc_excluded_scd.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_cdc_excluded_scd.sql index a20276296c922..99f32737436db 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_cdc_excluded_scd.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_cdc_excluded_scd.sql @@ -1,7 +1,7 @@ {{ config( unique_key = "_airbyte_unique_key_scd", schema = "test_normalization", - post_hook = ['drop view _airbyte_test_normalization.dedup_cdc_excluded_stg'], + post_hook = ["drop view _airbyte_test_normalization.dedup_cdc_excluded_stg"], tags = [ "top-level" ] ) }} -- depends_on: ref('dedup_cdc_excluded_stg') diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql index 1d94573fc99e1..eff375bdc37d9 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql @@ -1,7 +1,7 @@ {{ config( unique_key = "_airbyte_unique_key_scd", schema = "test_normalization", - post_hook = ['drop view _airbyte_test_normalization.dedup_exchange_rate_stg'], + post_hook = ["drop view _airbyte_test_normalization.dedup_exchange_rate_stg"], tags = [ "top-level" ] ) }} -- depends_on: ref('dedup_exchange_rate_stg') diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/sources.yml b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/sources.yml index 0e116b2bbec5d..97bf0d05cbd40 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/sources.yml +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/sources.yml @@ -6,6 +6,7 @@ sources: schema: false identifier: false tables: + - name: _airbyte_raw_1_prefix_startwith_number - name: _airbyte_raw_dedup_cdc_excluded - name: _airbyte_raw_dedup_exchange_rate - name: _airbyte_raw_exchange_rate diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/mssql/test_nested_streams/models/generated/airbyte_incremental/scd/test_normalization/nested_stream_with_co__lting_into_long_names_scd.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/mssql/test_nested_streams/models/generated/airbyte_incremental/scd/test_normalization/nested_stream_with_co__lting_into_long_names_scd.sql index b04acfdabc525..3afb1b3224823 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/mssql/test_nested_streams/models/generated/airbyte_incremental/scd/test_normalization/nested_stream_with_co__lting_into_long_names_scd.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/mssql/test_nested_streams/models/generated/airbyte_incremental/scd/test_normalization/nested_stream_with_co__lting_into_long_names_scd.sql @@ -1,7 +1,7 @@ {{ config( unique_key = "_airbyte_unique_key_scd", schema = "test_normalization", - post_hook = ['drop view _airbyte_test_normalization.nested_stream_with_co__lting_into_long_names_stg'], + post_hook = ["drop view _airbyte_test_normalization.nested_stream_with_co__lting_into_long_names_stg"], tags = [ "top-level" ] ) }} -- depends_on: ref('nested_stream_with_co__lting_into_long_names_stg') diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/mssql/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/mssql/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql index fcc681aa95ba8..7cb32131d4ce2 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/mssql/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/mssql/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql @@ -1,7 +1,7 @@ {{ config( unique_key = "_airbyte_unique_key_scd", schema = "test_normalization", - post_hook = ['drop view _airbyte_test_normalization.dedup_exchange_rate_stg'], + post_hook = ["drop view _airbyte_test_normalization.dedup_exchange_rate_stg"], tags = [ "top-level" ] ) }} -- depends_on: ref('dedup_exchange_rate_stg') diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/mssql/test_simple_streams/models/generated/sources.yml b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/mssql/test_simple_streams/models/generated/sources.yml index 0e116b2bbec5d..97bf0d05cbd40 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/mssql/test_simple_streams/models/generated/sources.yml +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/mssql/test_simple_streams/models/generated/sources.yml @@ -6,6 +6,7 @@ sources: schema: false identifier: false tables: + - name: _airbyte_raw_1_prefix_startwith_number - name: _airbyte_raw_dedup_cdc_excluded - name: _airbyte_raw_dedup_exchange_rate - name: _airbyte_raw_exchange_rate diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/mysql/test_nested_streams/models/generated/airbyte_incremental/scd/test_normalization/nested_stream_with_co_1g_into_long_names_scd.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/mysql/test_nested_streams/models/generated/airbyte_incremental/scd/test_normalization/nested_stream_with_co_1g_into_long_names_scd.sql index d098146930d0c..d0e8e603259f3 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/mysql/test_nested_streams/models/generated/airbyte_incremental/scd/test_normalization/nested_stream_with_co_1g_into_long_names_scd.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/mysql/test_nested_streams/models/generated/airbyte_incremental/scd/test_normalization/nested_stream_with_co_1g_into_long_names_scd.sql @@ -1,7 +1,7 @@ {{ config( unique_key = "_airbyte_unique_key_scd", schema = "test_normalization", - post_hook = ['drop view _airbyte_test_normalization.nested_stream_with_co_1g_into_long_names_stg'], + post_hook = ["drop view _airbyte_test_normalization.nested_stream_with_co_1g_into_long_names_stg"], tags = [ "top-level" ] ) }} -- depends_on: ref('nested_stream_with_co_1g_into_long_names_stg') diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/mysql/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/mysql/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql index 309ac4c903fe5..9bf09bdcaa8ff 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/mysql/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/mysql/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql @@ -1,7 +1,7 @@ {{ config( unique_key = "_airbyte_unique_key_scd", schema = "test_normalization", - post_hook = ['drop view _airbyte_test_normalization.dedup_exchange_rate_stg'], + post_hook = ["drop view _airbyte_test_normalization.dedup_exchange_rate_stg"], tags = [ "top-level" ] ) }} -- depends_on: ref('dedup_exchange_rate_stg') diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/mysql/test_simple_streams/models/generated/sources.yml b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/mysql/test_simple_streams/models/generated/sources.yml index 0e116b2bbec5d..97bf0d05cbd40 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/mysql/test_simple_streams/models/generated/sources.yml +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/mysql/test_simple_streams/models/generated/sources.yml @@ -6,6 +6,7 @@ sources: schema: false identifier: false tables: + - name: _airbyte_raw_1_prefix_startwith_number - name: _airbyte_raw_dedup_cdc_excluded - name: _airbyte_raw_dedup_exchange_rate - name: _airbyte_raw_exchange_rate diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/oracle/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/oracle/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql index 8e8364a7b5072..712f6bd747522 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/oracle/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/oracle/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql @@ -1,7 +1,7 @@ {{ config( unique_key = "{{ quote('_AIRBYTE_UNIQUE_KEY_SCD') }}", schema = "test_normalization", - post_hook = ['drop view test_normalization.dedup_exchange_rate_stg'], + post_hook = ["drop view test_normalization.dedup_exchange_rate_stg"], tags = [ "top-level" ] ) }} -- depends_on: ref('dedup_exchange_rate_stg') diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/oracle/test_simple_streams/models/generated/sources.yml b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/oracle/test_simple_streams/models/generated/sources.yml index 3faad76c57b34..b5460fb4d43dd 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/oracle/test_simple_streams/models/generated/sources.yml +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/oracle/test_simple_streams/models/generated/sources.yml @@ -6,6 +6,7 @@ sources: schema: false identifier: false tables: + - name: airbyte_raw_1_prefix_startwith_number - name: airbyte_raw_dedup_cdc_excluded - name: airbyte_raw_dedup_exchange_rate - name: airbyte_raw_exchange_rate diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_incremental/scd/test_normalization/nested_stream_with_c__lting_into_long_names_scd.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_incremental/scd/test_normalization/nested_stream_with_c__lting_into_long_names_scd.sql index 8772de10b5e74..03e7d58bbeab2 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_incremental/scd/test_normalization/nested_stream_with_c__lting_into_long_names_scd.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_incremental/scd/test_normalization/nested_stream_with_c__lting_into_long_names_scd.sql @@ -2,7 +2,7 @@ indexes = [{'columns':['_airbyte_active_row','_airbyte_unique_key_scd','_airbyte_emitted_at'],'type': 'btree'}], unique_key = "_airbyte_unique_key_scd", schema = "test_normalization", - post_hook = ['delete from _airbyte_test_normalization.nested_stream_with_c__lting_into_long_names_stg where _airbyte_emitted_at != (select max(_airbyte_emitted_at) from _airbyte_test_normalization.nested_stream_with_c__lting_into_long_names_stg)'], + post_hook = ["delete from _airbyte_test_normalization.nested_stream_with_c__lting_into_long_names_stg where _airbyte_emitted_at != (select max(_airbyte_emitted_at) from _airbyte_test_normalization.nested_stream_with_c__lting_into_long_names_stg)"], tags = [ "top-level" ] ) }} -- depends_on: ref('nested_stream_with_c__lting_into_long_names_stg') diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_incremental/scd/test_normalization/some_stream_that_was_empty_scd.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_incremental/scd/test_normalization/some_stream_that_was_empty_scd.sql index 1eba7ba7bd0ba..0caa4d9bfc659 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_incremental/scd/test_normalization/some_stream_that_was_empty_scd.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_incremental/scd/test_normalization/some_stream_that_was_empty_scd.sql @@ -2,7 +2,7 @@ indexes = [{'columns':['_airbyte_active_row','_airbyte_unique_key_scd','_airbyte_emitted_at'],'type': 'btree'}], unique_key = "_airbyte_unique_key_scd", schema = "test_normalization", - post_hook = ['delete from _airbyte_test_normalization.some_stream_that_was_empty_stg where _airbyte_emitted_at != (select max(_airbyte_emitted_at) from _airbyte_test_normalization.some_stream_that_was_empty_stg)'], + post_hook = ["delete from _airbyte_test_normalization.some_stream_that_was_empty_stg where _airbyte_emitted_at != (select max(_airbyte_emitted_at) from _airbyte_test_normalization.some_stream_that_was_empty_stg)"], tags = [ "top-level" ] ) }} -- depends_on: ref('some_stream_that_was_empty_stg') diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/dbt_project.yml b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/dbt_project.yml index 522b1e595e8ac..9ad8158759001 100755 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/dbt_project.yml +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/dbt_project.yml @@ -15,7 +15,7 @@ profile: 'normalize' # These configurations specify where dbt should look for different types of files. # The `source-paths` config, for example, states that source models can be found # in the "models/" directory. You probably won't need to change these! -source-paths: ["modified_models"] +source-paths: ["models"] docs-paths: ["docs"] analysis-paths: ["analysis"] test-paths: ["tests"] diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/first_dbt_project.yml b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/first_dbt_project.yml deleted file mode 100644 index 9ad8158759001..0000000000000 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/first_dbt_project.yml +++ /dev/null @@ -1,63 +0,0 @@ -# This file is necessary to install dbt-utils with dbt deps -# the content will be overwritten by the transform function - -# Name your package! Package names should contain only lowercase characters -# and underscores. A good package name should reflect your organization's -# name or the intended use of these models -name: 'airbyte_utils' -version: '1.0' -config-version: 2 - -# This setting configures which "profile" dbt uses for this project. Profiles contain -# database connection information, and should be configured in the ~/.dbt/profiles.yml file -profile: 'normalize' - -# These configurations specify where dbt should look for different types of files. -# The `source-paths` config, for example, states that source models can be found -# in the "models/" directory. You probably won't need to change these! -source-paths: ["models"] -docs-paths: ["docs"] -analysis-paths: ["analysis"] -test-paths: ["tests"] -data-paths: ["data"] -macro-paths: ["macros"] - -target-path: "../build" # directory which will store compiled SQL files -log-path: "../logs" # directory which will store DBT logs -modules-path: "/tmp/dbt_modules" # directory which will store external DBT dependencies - -clean-targets: # directories to be removed by `dbt clean` - - "build" - - "dbt_modules" - -quoting: - database: true -# Temporarily disabling the behavior of the ExtendedNameTransformer on table/schema names, see (issue #1785) -# all schemas should be unquoted - schema: false - identifier: true - -# You can define configurations for models in the `source-paths` directory here. -# Using these configurations, you can enable or disable models, change how they -# are materialized, and more! -models: - airbyte_utils: - +materialized: table - generated: - airbyte_ctes: - +tags: airbyte_internal_cte - +materialized: ephemeral - airbyte_incremental: - +tags: incremental_tables - +materialized: incremental - +on_schema_change: sync_all_columns - airbyte_tables: - +tags: normalized_tables - +materialized: table - airbyte_views: - +tags: airbyte_internal_views - +materialized: view - -dispatch: - - macro_namespace: dbt_utils - search_order: ['airbyte_utils', 'dbt_utils'] diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/first_output/airbyte_incremental/scd/test_normalization/1_prefix_startwith_number_scd.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/first_output/airbyte_incremental/scd/test_normalization/1_prefix_startwith_number_scd.sql new file mode 100644 index 0000000000000..203534b3d53b5 --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/first_output/airbyte_incremental/scd/test_normalization/1_prefix_startwith_number_scd.sql @@ -0,0 +1,85 @@ + + + + create table "postgres".test_normalization."1_prefix_startwith_number_scd" + as ( + +-- depends_on: ref('1_prefix_startwith_number_stg') +with + +input_data as ( + select * + from "postgres"._airbyte_test_normalization."1_prefix_startwith_number_stg" + -- 1_prefix_startwith_number from "postgres".test_normalization._airbyte_raw_1_prefix_startwith_number +), + +scd_data as ( + -- SQL model to build a Type 2 Slowly Changing Dimension (SCD) table for each record identified by their primary key + select + md5(cast(coalesce(cast("id" as + varchar +), '') as + varchar +)) as _airbyte_unique_key, + "id", + "date", + "text", + "date" as _airbyte_start_at, + lag("date") over ( + partition by "id" + order by + "date" is null asc, + "date" desc, + _airbyte_emitted_at desc + ) as _airbyte_end_at, + case when row_number() over ( + partition by "id" + order by + "date" is null asc, + "date" desc, + _airbyte_emitted_at desc + ) = 1 then 1 else 0 end as _airbyte_active_row, + _airbyte_ab_id, + _airbyte_emitted_at, + _airbyte_1_prefix_startwith_number_hashid + from input_data +), +dedup_data as ( + select + -- we need to ensure de-duplicated rows for merge/update queries + -- additionally, we generate a unique key for the scd table + row_number() over ( + partition by + _airbyte_unique_key, + _airbyte_start_at, + _airbyte_emitted_at + order by _airbyte_active_row desc, _airbyte_ab_id + ) as _airbyte_row_num, + md5(cast(coalesce(cast(_airbyte_unique_key as + varchar +), '') || '-' || coalesce(cast(_airbyte_start_at as + varchar +), '') || '-' || coalesce(cast(_airbyte_emitted_at as + varchar +), '') as + varchar +)) as _airbyte_unique_key_scd, + scd_data.* + from scd_data +) +select + _airbyte_unique_key, + _airbyte_unique_key_scd, + "id", + "date", + "text", + _airbyte_start_at, + _airbyte_end_at, + _airbyte_active_row, + _airbyte_ab_id, + _airbyte_emitted_at, + now() as _airbyte_normalized_at, + _airbyte_1_prefix_startwith_number_hashid +from dedup_data where _airbyte_row_num = 1 + ); + \ No newline at end of file diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/first_output/airbyte_incremental/test_normalization/1_prefix_startwith_number.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/first_output/airbyte_incremental/test_normalization/1_prefix_startwith_number.sql new file mode 100644 index 0000000000000..aad38834ac949 --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/first_output/airbyte_incremental/test_normalization/1_prefix_startwith_number.sql @@ -0,0 +1,24 @@ + + + + create table "postgres".test_normalization."1_prefix_startwith_number" + as ( + +-- Final base SQL model +-- depends_on: "postgres".test_normalization."1_prefix_startwith_number_scd" +select + _airbyte_unique_key, + "id", + "date", + "text", + _airbyte_ab_id, + _airbyte_emitted_at, + now() as _airbyte_normalized_at, + _airbyte_1_prefix_startwith_number_hashid +from "postgres".test_normalization."1_prefix_startwith_number_scd" +-- 1_prefix_startwith_number from "postgres".test_normalization._airbyte_raw_1_prefix_startwith_number +where 1 = 1 +and _airbyte_active_row = 1 + + ); + \ No newline at end of file diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/first_output/airbyte_incremental/test_normalization/1_prefix_startwith_number_stg.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/first_output/airbyte_incremental/test_normalization/1_prefix_startwith_number_stg.sql new file mode 100644 index 0000000000000..1d6a4096615f8 --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/first_output/airbyte_incremental/test_normalization/1_prefix_startwith_number_stg.sql @@ -0,0 +1,61 @@ + + + + create table "postgres"._airbyte_test_normalization."1_prefix_startwith_number_stg" + as ( + +with __dbt__cte__1_prefix_startwith_number_ab1 as ( + +-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema +-- depends_on: "postgres".test_normalization._airbyte_raw_1_prefix_startwith_number +select + jsonb_extract_path_text(_airbyte_data, 'id') as "id", + jsonb_extract_path_text(_airbyte_data, 'date') as "date", + jsonb_extract_path_text(_airbyte_data, 'text') as "text", + _airbyte_ab_id, + _airbyte_emitted_at, + now() as _airbyte_normalized_at +from "postgres".test_normalization._airbyte_raw_1_prefix_startwith_number as table_alias +-- 1_prefix_startwith_number +where 1 = 1 + +), __dbt__cte__1_prefix_startwith_number_ab2 as ( + +-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type +-- depends_on: __dbt__cte__1_prefix_startwith_number_ab1 +select + cast("id" as + bigint +) as "id", + cast(nullif("date", '') as + date +) as "date", + cast("text" as + varchar +) as "text", + _airbyte_ab_id, + _airbyte_emitted_at, + now() as _airbyte_normalized_at +from __dbt__cte__1_prefix_startwith_number_ab1 +-- 1_prefix_startwith_number +where 1 = 1 + +)-- SQL model to build a hash column based on the values of this record +-- depends_on: __dbt__cte__1_prefix_startwith_number_ab2 +select + md5(cast(coalesce(cast("id" as + varchar +), '') || '-' || coalesce(cast("date" as + varchar +), '') || '-' || coalesce(cast("text" as + varchar +), '') as + varchar +)) as _airbyte_1_prefix_startwith_number_hashid, + tmp.* +from __dbt__cte__1_prefix_startwith_number_ab2 tmp +-- 1_prefix_startwith_number +where 1 = 1 + + ); + \ No newline at end of file diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_ctes/test_normalization/1_prefix_startwith_number_ab1.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_ctes/test_normalization/1_prefix_startwith_number_ab1.sql new file mode 100644 index 0000000000000..080ffcc0b14c6 --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_ctes/test_normalization/1_prefix_startwith_number_ab1.sql @@ -0,0 +1,20 @@ +{{ config( + indexes = [{'columns':['_airbyte_emitted_at'],'type':'btree'}], + unique_key = '_airbyte_ab_id', + schema = "_airbyte_test_normalization", + tags = [ "top-level-intermediate" ] +) }} +-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema +-- depends_on: {{ source('test_normalization', '_airbyte_raw_1_prefix_startwith_number') }} +select + {{ json_extract_scalar('_airbyte_data', ['id'], ['id']) }} as {{ adapter.quote('id') }}, + {{ json_extract_scalar('_airbyte_data', ['date'], ['date']) }} as {{ adapter.quote('date') }}, + {{ json_extract_scalar('_airbyte_data', ['text'], ['text']) }} as {{ adapter.quote('text') }}, + _airbyte_ab_id, + _airbyte_emitted_at, + {{ current_timestamp() }} as _airbyte_normalized_at +from {{ source('test_normalization', '_airbyte_raw_1_prefix_startwith_number') }} as table_alias +-- 1_prefix_startwith_number +where 1 = 1 +{{ incremental_clause('_airbyte_emitted_at') }} + diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/modified_models/generated/airbyte_ctes/test_normalization/renamed_dedup_cdc_excluded_ab2.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_ctes/test_normalization/1_prefix_startwith_number_ab2.sql similarity index 54% rename from airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/modified_models/generated/airbyte_ctes/test_normalization/renamed_dedup_cdc_excluded_ab2.sql rename to airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_ctes/test_normalization/1_prefix_startwith_number_ab2.sql index 0718ac05fcbf9..5402072233ba4 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/modified_models/generated/airbyte_ctes/test_normalization/renamed_dedup_cdc_excluded_ab2.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_ctes/test_normalization/1_prefix_startwith_number_ab2.sql @@ -5,18 +5,16 @@ tags = [ "top-level-intermediate" ] ) }} -- SQL model to cast each column to its adequate SQL type converted from the JSON schema type --- depends_on: {{ ref('renamed_dedup_cdc_excluded_ab1') }} +-- depends_on: {{ ref('1_prefix_startwith_number_ab1') }} select cast({{ adapter.quote('id') }} as {{ dbt_utils.type_bigint() }}) as {{ adapter.quote('id') }}, - cast({{ adapter.quote('name') }} as {{ dbt_utils.type_string() }}) as {{ adapter.quote('name') }}, - cast(_ab_cdc_lsn as {{ dbt_utils.type_float() }}) as _ab_cdc_lsn, - cast(_ab_cdc_updated_at as {{ dbt_utils.type_float() }}) as _ab_cdc_updated_at, - cast(_ab_cdc_deleted_at as {{ dbt_utils.type_float() }}) as _ab_cdc_deleted_at, + cast({{ empty_string_to_null(adapter.quote('date')) }} as {{ type_date() }}) as {{ adapter.quote('date') }}, + cast({{ adapter.quote('text') }} as {{ dbt_utils.type_string() }}) as {{ adapter.quote('text') }}, _airbyte_ab_id, _airbyte_emitted_at, {{ current_timestamp() }} as _airbyte_normalized_at -from {{ ref('renamed_dedup_cdc_excluded_ab1') }} --- renamed_dedup_cdc_excluded +from {{ ref('1_prefix_startwith_number_ab1') }} +-- 1_prefix_startwith_number where 1 = 1 {{ incremental_clause('_airbyte_emitted_at') }} diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/modified_models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/1_prefix_startwith_number_scd.sql similarity index 68% rename from airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/modified_models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql rename to airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/1_prefix_startwith_number_scd.sql index 5db2e106d7ddf..e2ade95cd401b 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/modified_models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/1_prefix_startwith_number_scd.sql @@ -2,18 +2,18 @@ indexes = [{'columns':['_airbyte_active_row','_airbyte_unique_key_scd','_airbyte_emitted_at'],'type': 'btree'}], unique_key = "_airbyte_unique_key_scd", schema = "test_normalization", - post_hook = ['delete from _airbyte_test_normalization.dedup_exchange_rate_stg where _airbyte_emitted_at != (select max(_airbyte_emitted_at) from _airbyte_test_normalization.dedup_exchange_rate_stg)'], + post_hook = ["delete from _airbyte_test_normalization.{{ adapter.quote('1_prefix_startwith_number_stg') }} where _airbyte_emitted_at != (select max(_airbyte_emitted_at) from _airbyte_test_normalization.{{ adapter.quote('1_prefix_startwith_number_stg') }})"], tags = [ "top-level" ] ) }} --- depends_on: ref('dedup_exchange_rate_stg') +-- depends_on: ref('1_prefix_startwith_number_stg') with {% if is_incremental() %} new_data as ( -- retrieve incremental "new" data select * - from {{ ref('dedup_exchange_rate_stg') }} - -- dedup_exchange_rate from {{ source('test_normalization', '_airbyte_raw_dedup_exchange_rate') }} + from {{ ref('1_prefix_startwith_number_stg') }} + -- 1_prefix_startwith_number from {{ source('test_normalization', '_airbyte_raw_1_prefix_startwith_number') }} where 1 = 1 {{ incremental_clause('_airbyte_emitted_at') }} ), @@ -22,8 +22,6 @@ new_data_ids as ( select distinct {{ dbt_utils.surrogate_key([ adapter.quote('id'), - 'currency', - 'nzd', ]) }} as _airbyte_unique_key from new_data ), @@ -34,7 +32,7 @@ empty_new_data as ( previous_active_scd_data as ( -- retrieve "incomplete old" data that needs to be updated with an end date because of new changes select - {{ star_intersect(ref('dedup_exchange_rate_stg'), this, from_alias='inc_data', intersect_alias='this_data') }} + {{ star_intersect(ref('1_prefix_startwith_number_stg'), this, from_alias='inc_data', intersect_alias='this_data') }} from {{ this }} as this_data -- make a join with new_data using primary key to filter active data that need to be updated only join new_data_ids on this_data._airbyte_unique_key = new_data_ids._airbyte_unique_key @@ -43,15 +41,15 @@ previous_active_scd_data as ( where _airbyte_active_row = 1 ), input_data as ( - select {{ dbt_utils.star(ref('dedup_exchange_rate_stg')) }} from new_data + select {{ dbt_utils.star(ref('1_prefix_startwith_number_stg')) }} from new_data union all - select {{ dbt_utils.star(ref('dedup_exchange_rate_stg')) }} from previous_active_scd_data + select {{ dbt_utils.star(ref('1_prefix_startwith_number_stg')) }} from previous_active_scd_data ), {% else %} input_data as ( select * - from {{ ref('dedup_exchange_rate_stg') }} - -- dedup_exchange_rate from {{ source('test_normalization', '_airbyte_raw_dedup_exchange_rate') }} + from {{ ref('1_prefix_startwith_number_stg') }} + -- 1_prefix_startwith_number from {{ source('test_normalization', '_airbyte_raw_1_prefix_startwith_number') }} ), {% endif %} scd_data as ( @@ -59,27 +57,20 @@ scd_data as ( select {{ dbt_utils.surrogate_key([ adapter.quote('id'), - 'currency', - 'nzd', ]) }} as _airbyte_unique_key, {{ adapter.quote('id') }}, - currency, - new_column, {{ adapter.quote('date') }}, - timestamp_col, - {{ adapter.quote('HKD@spéçiäl & characters') }}, - nzd, - usd, + {{ adapter.quote('text') }}, {{ adapter.quote('date') }} as _airbyte_start_at, lag({{ adapter.quote('date') }}) over ( - partition by cast({{ adapter.quote('id') }} as {{ dbt_utils.type_string() }}), currency, cast(nzd as {{ dbt_utils.type_string() }}) + partition by {{ adapter.quote('id') }} order by {{ adapter.quote('date') }} is null asc, {{ adapter.quote('date') }} desc, _airbyte_emitted_at desc ) as _airbyte_end_at, case when row_number() over ( - partition by cast({{ adapter.quote('id') }} as {{ dbt_utils.type_string() }}), currency, cast(nzd as {{ dbt_utils.type_string() }}) + partition by {{ adapter.quote('id') }} order by {{ adapter.quote('date') }} is null asc, {{ adapter.quote('date') }} desc, @@ -87,7 +78,7 @@ scd_data as ( ) = 1 then 1 else 0 end as _airbyte_active_row, _airbyte_ab_id, _airbyte_emitted_at, - _airbyte_dedup_exchange_rate_hashid + _airbyte_1_prefix_startwith_number_hashid from input_data ), dedup_data as ( @@ -113,19 +104,14 @@ select _airbyte_unique_key, _airbyte_unique_key_scd, {{ adapter.quote('id') }}, - currency, - new_column, {{ adapter.quote('date') }}, - timestamp_col, - {{ adapter.quote('HKD@spéçiäl & characters') }}, - nzd, - usd, + {{ adapter.quote('text') }}, _airbyte_start_at, _airbyte_end_at, _airbyte_active_row, _airbyte_ab_id, _airbyte_emitted_at, {{ current_timestamp() }} as _airbyte_normalized_at, - _airbyte_dedup_exchange_rate_hashid + _airbyte_1_prefix_startwith_number_hashid from dedup_data where _airbyte_row_num = 1 diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_cdc_excluded_scd.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_cdc_excluded_scd.sql index 98325193a5f00..2fb3816fb87fa 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_cdc_excluded_scd.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_cdc_excluded_scd.sql @@ -2,7 +2,7 @@ indexes = [{'columns':['_airbyte_active_row','_airbyte_unique_key_scd','_airbyte_emitted_at'],'type': 'btree'}], unique_key = "_airbyte_unique_key_scd", schema = "test_normalization", - post_hook = ['delete from _airbyte_test_normalization.dedup_cdc_excluded_stg where _airbyte_emitted_at != (select max(_airbyte_emitted_at) from _airbyte_test_normalization.dedup_cdc_excluded_stg)'], + post_hook = ["delete from _airbyte_test_normalization.dedup_cdc_excluded_stg where _airbyte_emitted_at != (select max(_airbyte_emitted_at) from _airbyte_test_normalization.dedup_cdc_excluded_stg)"], tags = [ "top-level" ] ) }} -- depends_on: ref('dedup_cdc_excluded_stg') diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql index 87453dc261145..7234b26c0f810 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql @@ -2,7 +2,7 @@ indexes = [{'columns':['_airbyte_active_row','_airbyte_unique_key_scd','_airbyte_emitted_at'],'type': 'btree'}], unique_key = "_airbyte_unique_key_scd", schema = "test_normalization", - post_hook = ['delete from _airbyte_test_normalization.dedup_exchange_rate_stg where _airbyte_emitted_at != (select max(_airbyte_emitted_at) from _airbyte_test_normalization.dedup_exchange_rate_stg)'], + post_hook = ["delete from _airbyte_test_normalization.dedup_exchange_rate_stg where _airbyte_emitted_at != (select max(_airbyte_emitted_at) from _airbyte_test_normalization.dedup_exchange_rate_stg)"], tags = [ "top-level" ] ) }} -- depends_on: ref('dedup_exchange_rate_stg') diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/multiple_column_names_conflicts_scd.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/multiple_column_names_conflicts_scd.sql index 3fcf2e971cbc6..736e25452ae3b 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/multiple_column_names_conflicts_scd.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/multiple_column_names_conflicts_scd.sql @@ -2,7 +2,7 @@ indexes = [{'columns':['_airbyte_active_row','_airbyte_unique_key_scd','_airbyte_emitted_at'],'type': 'btree'}], unique_key = "_airbyte_unique_key_scd", schema = "test_normalization", - post_hook = ['delete from _airbyte_test_normalization.multiple_column_names_conflicts_stg where _airbyte_emitted_at != (select max(_airbyte_emitted_at) from _airbyte_test_normalization.multiple_column_names_conflicts_stg)'], + post_hook = ["delete from _airbyte_test_normalization.multiple_column_names_conflicts_stg where _airbyte_emitted_at != (select max(_airbyte_emitted_at) from _airbyte_test_normalization.multiple_column_names_conflicts_stg)"], tags = [ "top-level" ] ) }} -- depends_on: ref('multiple_column_names_conflicts_stg') diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/pos_dedup_cdcx_scd.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/pos_dedup_cdcx_scd.sql index 36ce51399a3f3..1512b6fe8546a 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/pos_dedup_cdcx_scd.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/pos_dedup_cdcx_scd.sql @@ -2,7 +2,7 @@ indexes = [{'columns':['_airbyte_active_row','_airbyte_unique_key_scd','_airbyte_emitted_at'],'type': 'btree'}], unique_key = "_airbyte_unique_key_scd", schema = "test_normalization", - post_hook = ['delete from _airbyte_test_normalization.pos_dedup_cdcx_stg where _airbyte_emitted_at != (select max(_airbyte_emitted_at) from _airbyte_test_normalization.pos_dedup_cdcx_stg)'], + post_hook = ["delete from _airbyte_test_normalization.pos_dedup_cdcx_stg where _airbyte_emitted_at != (select max(_airbyte_emitted_at) from _airbyte_test_normalization.pos_dedup_cdcx_stg)"], tags = [ "top-level" ] ) }} -- depends_on: ref('pos_dedup_cdcx_stg') diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/renamed_dedup_cdc_excluded_scd.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/renamed_dedup_cdc_excluded_scd.sql index 0c50939426f79..4fbd681d8ee61 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/renamed_dedup_cdc_excluded_scd.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/renamed_dedup_cdc_excluded_scd.sql @@ -2,7 +2,7 @@ indexes = [{'columns':['_airbyte_active_row','_airbyte_unique_key_scd','_airbyte_emitted_at'],'type': 'btree'}], unique_key = "_airbyte_unique_key_scd", schema = "test_normalization", - post_hook = ['delete from _airbyte_test_normalization.renamed_dedup_cdc_excluded_stg where _airbyte_emitted_at != (select max(_airbyte_emitted_at) from _airbyte_test_normalization.renamed_dedup_cdc_excluded_stg)'], + post_hook = ["delete from _airbyte_test_normalization.renamed_dedup_cdc_excluded_stg where _airbyte_emitted_at != (select max(_airbyte_emitted_at) from _airbyte_test_normalization.renamed_dedup_cdc_excluded_stg)"], tags = [ "top-level" ] ) }} -- depends_on: ref('renamed_dedup_cdc_excluded_stg') diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/modified_models/generated/airbyte_incremental/test_normalization/dedup_exchange_rate.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_incremental/test_normalization/1_prefix_startwith_number.sql similarity index 58% rename from airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/modified_models/generated/airbyte_incremental/test_normalization/dedup_exchange_rate.sql rename to airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_incremental/test_normalization/1_prefix_startwith_number.sql index 8529ede3dcfac..77aba25edc2a7 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/modified_models/generated/airbyte_incremental/test_normalization/dedup_exchange_rate.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_incremental/test_normalization/1_prefix_startwith_number.sql @@ -5,23 +5,18 @@ tags = [ "top-level" ] ) }} -- Final base SQL model --- depends_on: {{ ref('dedup_exchange_rate_scd') }} +-- depends_on: {{ ref('1_prefix_startwith_number_scd') }} select _airbyte_unique_key, {{ adapter.quote('id') }}, - currency, - new_column, {{ adapter.quote('date') }}, - timestamp_col, - {{ adapter.quote('HKD@spéçiäl & characters') }}, - nzd, - usd, + {{ adapter.quote('text') }}, _airbyte_ab_id, _airbyte_emitted_at, {{ current_timestamp() }} as _airbyte_normalized_at, - _airbyte_dedup_exchange_rate_hashid -from {{ ref('dedup_exchange_rate_scd') }} --- dedup_exchange_rate from {{ source('test_normalization', '_airbyte_raw_dedup_exchange_rate') }} + _airbyte_1_prefix_startwith_number_hashid +from {{ ref('1_prefix_startwith_number_scd') }} +-- 1_prefix_startwith_number from {{ source('test_normalization', '_airbyte_raw_1_prefix_startwith_number') }} where 1 = 1 and _airbyte_active_row = 1 {{ incremental_clause('_airbyte_emitted_at') }} diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/modified_models/generated/airbyte_incremental/test_normalization/dedup_exchange_rate_stg.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_incremental/test_normalization/1_prefix_startwith_number_stg.sql similarity index 58% rename from airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/modified_models/generated/airbyte_incremental/test_normalization/dedup_exchange_rate_stg.sql rename to airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_incremental/test_normalization/1_prefix_startwith_number_stg.sql index a4c1e8816f8ae..69bff1d44aaa7 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/modified_models/generated/airbyte_incremental/test_normalization/dedup_exchange_rate_stg.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_incremental/test_normalization/1_prefix_startwith_number_stg.sql @@ -5,21 +5,16 @@ tags = [ "top-level-intermediate" ] ) }} -- SQL model to build a hash column based on the values of this record --- depends_on: {{ ref('dedup_exchange_rate_ab2') }} +-- depends_on: {{ ref('1_prefix_startwith_number_ab2') }} select {{ dbt_utils.surrogate_key([ adapter.quote('id'), - 'currency', - 'new_column', adapter.quote('date'), - 'timestamp_col', - adapter.quote('HKD@spéçiäl & characters'), - 'nzd', - 'usd', - ]) }} as _airbyte_dedup_exchange_rate_hashid, + adapter.quote('text'), + ]) }} as _airbyte_1_prefix_startwith_number_hashid, tmp.* -from {{ ref('dedup_exchange_rate_ab2') }} tmp --- dedup_exchange_rate +from {{ ref('1_prefix_startwith_number_ab2') }} tmp +-- 1_prefix_startwith_number where 1 = 1 {{ incremental_clause('_airbyte_emitted_at') }} diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/sources.yml b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/sources.yml index 0e116b2bbec5d..97bf0d05cbd40 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/sources.yml +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/sources.yml @@ -6,6 +6,7 @@ sources: schema: false identifier: false tables: + - name: _airbyte_raw_1_prefix_startwith_number - name: _airbyte_raw_dedup_cdc_excluded - name: _airbyte_raw_dedup_exchange_rate - name: _airbyte_raw_exchange_rate diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/modified_models/generated/airbyte_ctes/test_normalization/dedup_exchange_rate_ab1.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/modified_models/generated/airbyte_ctes/test_normalization/dedup_exchange_rate_ab1.sql deleted file mode 100644 index 23e1bb70c5879..0000000000000 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/modified_models/generated/airbyte_ctes/test_normalization/dedup_exchange_rate_ab1.sql +++ /dev/null @@ -1,25 +0,0 @@ -{{ config( - indexes = [{'columns':['_airbyte_emitted_at'],'type':'btree'}], - unique_key = '_airbyte_ab_id', - schema = "_airbyte_test_normalization", - tags = [ "top-level-intermediate" ] -) }} --- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema --- depends_on: {{ source('test_normalization', '_airbyte_raw_dedup_exchange_rate') }} -select - {{ json_extract_scalar('_airbyte_data', ['id'], ['id']) }} as {{ adapter.quote('id') }}, - {{ json_extract_scalar('_airbyte_data', ['currency'], ['currency']) }} as currency, - {{ json_extract_scalar('_airbyte_data', ['new_column'], ['new_column']) }} as new_column, - {{ json_extract_scalar('_airbyte_data', ['date'], ['date']) }} as {{ adapter.quote('date') }}, - {{ json_extract_scalar('_airbyte_data', ['timestamp_col'], ['timestamp_col']) }} as timestamp_col, - {{ json_extract_scalar('_airbyte_data', ['HKD@spéçiäl & characters'], ['HKD@spéçiäl & characters']) }} as {{ adapter.quote('HKD@spéçiäl & characters') }}, - {{ json_extract_scalar('_airbyte_data', ['NZD'], ['NZD']) }} as nzd, - {{ json_extract_scalar('_airbyte_data', ['USD'], ['USD']) }} as usd, - _airbyte_ab_id, - _airbyte_emitted_at, - {{ current_timestamp() }} as _airbyte_normalized_at -from {{ source('test_normalization', '_airbyte_raw_dedup_exchange_rate') }} as table_alias --- dedup_exchange_rate -where 1 = 1 -{{ incremental_clause('_airbyte_emitted_at') }} - diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/modified_models/generated/airbyte_ctes/test_normalization/dedup_exchange_rate_ab2.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/modified_models/generated/airbyte_ctes/test_normalization/dedup_exchange_rate_ab2.sql deleted file mode 100644 index b43312b67ebf1..0000000000000 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/modified_models/generated/airbyte_ctes/test_normalization/dedup_exchange_rate_ab2.sql +++ /dev/null @@ -1,25 +0,0 @@ -{{ config( - indexes = [{'columns':['_airbyte_emitted_at'],'type':'btree'}], - unique_key = '_airbyte_ab_id', - schema = "_airbyte_test_normalization", - tags = [ "top-level-intermediate" ] -) }} --- SQL model to cast each column to its adequate SQL type converted from the JSON schema type --- depends_on: {{ ref('dedup_exchange_rate_ab1') }} -select - cast({{ adapter.quote('id') }} as {{ dbt_utils.type_float() }}) as {{ adapter.quote('id') }}, - cast(currency as {{ dbt_utils.type_string() }}) as currency, - cast(new_column as {{ dbt_utils.type_float() }}) as new_column, - cast({{ empty_string_to_null(adapter.quote('date')) }} as {{ type_date() }}) as {{ adapter.quote('date') }}, - cast({{ empty_string_to_null('timestamp_col') }} as {{ type_timestamp_with_timezone() }}) as timestamp_col, - cast({{ adapter.quote('HKD@spéçiäl & characters') }} as {{ dbt_utils.type_float() }}) as {{ adapter.quote('HKD@spéçiäl & characters') }}, - cast(nzd as {{ dbt_utils.type_float() }}) as nzd, - cast(usd as {{ dbt_utils.type_bigint() }}) as usd, - _airbyte_ab_id, - _airbyte_emitted_at, - {{ current_timestamp() }} as _airbyte_normalized_at -from {{ ref('dedup_exchange_rate_ab1') }} --- dedup_exchange_rate -where 1 = 1 -{{ incremental_clause('_airbyte_emitted_at') }} - diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/modified_models/generated/airbyte_ctes/test_normalization/exchange_rate_ab1.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/modified_models/generated/airbyte_ctes/test_normalization/exchange_rate_ab1.sql deleted file mode 100644 index ba88ffa22b0d9..0000000000000 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/modified_models/generated/airbyte_ctes/test_normalization/exchange_rate_ab1.sql +++ /dev/null @@ -1,25 +0,0 @@ -{{ config( - indexes = [{'columns':['_airbyte_emitted_at'],'type':'btree'}], - unique_key = '_airbyte_ab_id', - schema = "_airbyte_test_normalization", - tags = [ "top-level-intermediate" ] -) }} --- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema --- depends_on: {{ source('test_normalization', '_airbyte_raw_exchange_rate') }} -select - {{ json_extract_scalar('_airbyte_data', ['id'], ['id']) }} as {{ adapter.quote('id') }}, - {{ json_extract_scalar('_airbyte_data', ['currency'], ['currency']) }} as currency, - {{ json_extract_scalar('_airbyte_data', ['new_column'], ['new_column']) }} as new_column, - {{ json_extract_scalar('_airbyte_data', ['date'], ['date']) }} as {{ adapter.quote('date') }}, - {{ json_extract_scalar('_airbyte_data', ['timestamp_col'], ['timestamp_col']) }} as timestamp_col, - {{ json_extract_scalar('_airbyte_data', ['HKD@spéçiäl & characters'], ['HKD@spéçiäl & characters']) }} as {{ adapter.quote('HKD@spéçiäl & characters') }}, - {{ json_extract_scalar('_airbyte_data', ['NZD'], ['NZD']) }} as nzd, - {{ json_extract_scalar('_airbyte_data', ['USD'], ['USD']) }} as usd, - {{ json_extract_scalar('_airbyte_data', ['column`_\'with"_quotes'], ['column___with__quotes']) }} as {{ adapter.quote('column`_\'with""_quotes') }}, - _airbyte_ab_id, - _airbyte_emitted_at, - {{ current_timestamp() }} as _airbyte_normalized_at -from {{ source('test_normalization', '_airbyte_raw_exchange_rate') }} as table_alias --- exchange_rate -where 1 = 1 - diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/modified_models/generated/airbyte_ctes/test_normalization/exchange_rate_ab2.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/modified_models/generated/airbyte_ctes/test_normalization/exchange_rate_ab2.sql deleted file mode 100644 index e6cf7ee1e5760..0000000000000 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/modified_models/generated/airbyte_ctes/test_normalization/exchange_rate_ab2.sql +++ /dev/null @@ -1,25 +0,0 @@ -{{ config( - indexes = [{'columns':['_airbyte_emitted_at'],'type':'btree'}], - unique_key = '_airbyte_ab_id', - schema = "_airbyte_test_normalization", - tags = [ "top-level-intermediate" ] -) }} --- SQL model to cast each column to its adequate SQL type converted from the JSON schema type --- depends_on: {{ ref('exchange_rate_ab1') }} -select - cast({{ adapter.quote('id') }} as {{ dbt_utils.type_float() }}) as {{ adapter.quote('id') }}, - cast(currency as {{ dbt_utils.type_string() }}) as currency, - cast(new_column as {{ dbt_utils.type_float() }}) as new_column, - cast({{ empty_string_to_null(adapter.quote('date')) }} as {{ type_date() }}) as {{ adapter.quote('date') }}, - cast({{ empty_string_to_null('timestamp_col') }} as {{ type_timestamp_with_timezone() }}) as timestamp_col, - cast({{ adapter.quote('HKD@spéçiäl & characters') }} as {{ dbt_utils.type_float() }}) as {{ adapter.quote('HKD@spéçiäl & characters') }}, - cast(nzd as {{ dbt_utils.type_float() }}) as nzd, - cast(usd as {{ dbt_utils.type_float() }}) as usd, - cast({{ adapter.quote('column`_\'with""_quotes') }} as {{ dbt_utils.type_string() }}) as {{ adapter.quote('column`_\'with""_quotes') }}, - _airbyte_ab_id, - _airbyte_emitted_at, - {{ current_timestamp() }} as _airbyte_normalized_at -from {{ ref('exchange_rate_ab1') }} --- exchange_rate -where 1 = 1 - diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/modified_models/generated/airbyte_ctes/test_normalization/exchange_rate_ab3.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/modified_models/generated/airbyte_ctes/test_normalization/exchange_rate_ab3.sql deleted file mode 100644 index 96c96a4d4799c..0000000000000 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/modified_models/generated/airbyte_ctes/test_normalization/exchange_rate_ab3.sql +++ /dev/null @@ -1,25 +0,0 @@ -{{ config( - indexes = [{'columns':['_airbyte_emitted_at'],'type':'btree'}], - unique_key = '_airbyte_ab_id', - schema = "_airbyte_test_normalization", - tags = [ "top-level-intermediate" ] -) }} --- SQL model to build a hash column based on the values of this record --- depends_on: {{ ref('exchange_rate_ab2') }} -select - {{ dbt_utils.surrogate_key([ - adapter.quote('id'), - 'currency', - 'new_column', - adapter.quote('date'), - 'timestamp_col', - adapter.quote('HKD@spéçiäl & characters'), - 'nzd', - 'usd', - adapter.quote('column`_\'with""_quotes'), - ]) }} as _airbyte_exchange_rate_hashid, - tmp.* -from {{ ref('exchange_rate_ab2') }} tmp --- exchange_rate -where 1 = 1 - diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/modified_models/generated/airbyte_ctes/test_normalization/renamed_dedup_cdc_excluded_ab1.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/modified_models/generated/airbyte_ctes/test_normalization/renamed_dedup_cdc_excluded_ab1.sql deleted file mode 100644 index 590e1e755b5c4..0000000000000 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/modified_models/generated/airbyte_ctes/test_normalization/renamed_dedup_cdc_excluded_ab1.sql +++ /dev/null @@ -1,22 +0,0 @@ -{{ config( - indexes = [{'columns':['_airbyte_emitted_at'],'type':'btree'}], - unique_key = '_airbyte_ab_id', - schema = "_airbyte_test_normalization", - tags = [ "top-level-intermediate" ] -) }} --- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema --- depends_on: {{ source('test_normalization', '_airbyte_raw_renamed_dedup_cdc_excluded') }} -select - {{ json_extract_scalar('_airbyte_data', ['id'], ['id']) }} as {{ adapter.quote('id') }}, - {{ json_extract_scalar('_airbyte_data', ['name'], ['name']) }} as {{ adapter.quote('name') }}, - {{ json_extract_scalar('_airbyte_data', ['_ab_cdc_lsn'], ['_ab_cdc_lsn']) }} as _ab_cdc_lsn, - {{ json_extract_scalar('_airbyte_data', ['_ab_cdc_updated_at'], ['_ab_cdc_updated_at']) }} as _ab_cdc_updated_at, - {{ json_extract_scalar('_airbyte_data', ['_ab_cdc_deleted_at'], ['_ab_cdc_deleted_at']) }} as _ab_cdc_deleted_at, - _airbyte_ab_id, - _airbyte_emitted_at, - {{ current_timestamp() }} as _airbyte_normalized_at -from {{ source('test_normalization', '_airbyte_raw_renamed_dedup_cdc_excluded') }} as table_alias --- renamed_dedup_cdc_excluded -where 1 = 1 -{{ incremental_clause('_airbyte_emitted_at') }} - diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/modified_models/generated/airbyte_incremental/scd/test_normalization/renamed_dedup_cdc_excluded_scd.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/modified_models/generated/airbyte_incremental/scd/test_normalization/renamed_dedup_cdc_excluded_scd.sql deleted file mode 100644 index 3803571720588..0000000000000 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/modified_models/generated/airbyte_incremental/scd/test_normalization/renamed_dedup_cdc_excluded_scd.sql +++ /dev/null @@ -1,123 +0,0 @@ -{{ config( - indexes = [{'columns':['_airbyte_active_row','_airbyte_unique_key_scd','_airbyte_emitted_at'],'type': 'btree'}], - unique_key = "_airbyte_unique_key_scd", - schema = "test_normalization", - post_hook = ['delete from _airbyte_test_normalization.renamed_dedup_cdc_excluded_stg where _airbyte_emitted_at != (select max(_airbyte_emitted_at) from _airbyte_test_normalization.renamed_dedup_cdc_excluded_stg)'], - tags = [ "top-level" ] -) }} --- depends_on: ref('renamed_dedup_cdc_excluded_stg') -with -{% if is_incremental() %} -new_data as ( - -- retrieve incremental "new" data - select - * - from {{ ref('renamed_dedup_cdc_excluded_stg') }} - -- renamed_dedup_cdc_excluded from {{ source('test_normalization', '_airbyte_raw_renamed_dedup_cdc_excluded') }} - where 1 = 1 - {{ incremental_clause('_airbyte_emitted_at') }} -), -new_data_ids as ( - -- build a subset of _airbyte_unique_key from rows that are new - select distinct - {{ dbt_utils.surrogate_key([ - adapter.quote('id'), - ]) }} as _airbyte_unique_key - from new_data -), -empty_new_data as ( - -- build an empty table to only keep the table's column types - select * from new_data where 1 = 0 -), -previous_active_scd_data as ( - -- retrieve "incomplete old" data that needs to be updated with an end date because of new changes - select - {{ star_intersect(ref('renamed_dedup_cdc_excluded_stg'), this, from_alias='inc_data', intersect_alias='this_data') }} - from {{ this }} as this_data - -- make a join with new_data using primary key to filter active data that need to be updated only - join new_data_ids on this_data._airbyte_unique_key = new_data_ids._airbyte_unique_key - -- force left join to NULL values (we just need to transfer column types only for the star_intersect macro on schema changes) - left join empty_new_data as inc_data on this_data._airbyte_ab_id = inc_data._airbyte_ab_id - where _airbyte_active_row = 1 -), -input_data as ( - select {{ dbt_utils.star(ref('renamed_dedup_cdc_excluded_stg')) }} from new_data - union all - select {{ dbt_utils.star(ref('renamed_dedup_cdc_excluded_stg')) }} from previous_active_scd_data -), -{% else %} -input_data as ( - select * - from {{ ref('renamed_dedup_cdc_excluded_stg') }} - -- renamed_dedup_cdc_excluded from {{ source('test_normalization', '_airbyte_raw_renamed_dedup_cdc_excluded') }} -), -{% endif %} -scd_data as ( - -- SQL model to build a Type 2 Slowly Changing Dimension (SCD) table for each record identified by their primary key - select - {{ dbt_utils.surrogate_key([ - adapter.quote('id'), - ]) }} as _airbyte_unique_key, - {{ adapter.quote('id') }}, - {{ adapter.quote('name') }}, - _ab_cdc_lsn, - _ab_cdc_updated_at, - _ab_cdc_deleted_at, - _ab_cdc_updated_at as _airbyte_start_at, - lag(_ab_cdc_updated_at) over ( - partition by {{ adapter.quote('id') }} - order by - _ab_cdc_updated_at is null asc, - _ab_cdc_updated_at desc, - _ab_cdc_updated_at desc, - _airbyte_emitted_at desc - ) as _airbyte_end_at, - case when row_number() over ( - partition by {{ adapter.quote('id') }} - order by - _ab_cdc_updated_at is null asc, - _ab_cdc_updated_at desc, - _ab_cdc_updated_at desc, - _airbyte_emitted_at desc - ) = 1 and _ab_cdc_deleted_at is null then 1 else 0 end as _airbyte_active_row, - _airbyte_ab_id, - _airbyte_emitted_at, - _airbyte_renamed_dedup_cdc_excluded_hashid - from input_data -), -dedup_data as ( - select - -- we need to ensure de-duplicated rows for merge/update queries - -- additionally, we generate a unique key for the scd table - row_number() over ( - partition by - _airbyte_unique_key, - _airbyte_start_at, - _airbyte_emitted_at, cast(_ab_cdc_deleted_at as {{ dbt_utils.type_string() }}), cast(_ab_cdc_updated_at as {{ dbt_utils.type_string() }}) - order by _airbyte_active_row desc, _airbyte_ab_id - ) as _airbyte_row_num, - {{ dbt_utils.surrogate_key([ - '_airbyte_unique_key', - '_airbyte_start_at', - '_airbyte_emitted_at', '_ab_cdc_deleted_at', '_ab_cdc_updated_at' - ]) }} as _airbyte_unique_key_scd, - scd_data.* - from scd_data -) -select - _airbyte_unique_key, - _airbyte_unique_key_scd, - {{ adapter.quote('id') }}, - {{ adapter.quote('name') }}, - _ab_cdc_lsn, - _ab_cdc_updated_at, - _ab_cdc_deleted_at, - _airbyte_start_at, - _airbyte_end_at, - _airbyte_active_row, - _airbyte_ab_id, - _airbyte_emitted_at, - {{ current_timestamp() }} as _airbyte_normalized_at, - _airbyte_renamed_dedup_cdc_excluded_hashid -from dedup_data where _airbyte_row_num = 1 - diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/modified_models/generated/airbyte_incremental/test_normalization/renamed_dedup_cdc_excluded.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/modified_models/generated/airbyte_incremental/test_normalization/renamed_dedup_cdc_excluded.sql deleted file mode 100644 index 80ff3fc2138ca..0000000000000 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/modified_models/generated/airbyte_incremental/test_normalization/renamed_dedup_cdc_excluded.sql +++ /dev/null @@ -1,25 +0,0 @@ -{{ config( - indexes = [{'columns':['_airbyte_unique_key'],'unique':True}], - unique_key = "_airbyte_unique_key", - schema = "test_normalization", - tags = [ "top-level" ] -) }} --- Final base SQL model --- depends_on: {{ ref('renamed_dedup_cdc_excluded_scd') }} -select - _airbyte_unique_key, - {{ adapter.quote('id') }}, - {{ adapter.quote('name') }}, - _ab_cdc_lsn, - _ab_cdc_updated_at, - _ab_cdc_deleted_at, - _airbyte_ab_id, - _airbyte_emitted_at, - {{ current_timestamp() }} as _airbyte_normalized_at, - _airbyte_renamed_dedup_cdc_excluded_hashid -from {{ ref('renamed_dedup_cdc_excluded_scd') }} --- renamed_dedup_cdc_excluded from {{ source('test_normalization', '_airbyte_raw_renamed_dedup_cdc_excluded') }} -where 1 = 1 -and _airbyte_active_row = 1 -{{ incremental_clause('_airbyte_emitted_at') }} - diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/modified_models/generated/airbyte_incremental/test_normalization/renamed_dedup_cdc_excluded_stg.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/modified_models/generated/airbyte_incremental/test_normalization/renamed_dedup_cdc_excluded_stg.sql deleted file mode 100644 index 86d0e6f4451d0..0000000000000 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/modified_models/generated/airbyte_incremental/test_normalization/renamed_dedup_cdc_excluded_stg.sql +++ /dev/null @@ -1,22 +0,0 @@ -{{ config( - indexes = [{'columns':['_airbyte_emitted_at'],'type':'btree'}], - unique_key = '_airbyte_ab_id', - schema = "_airbyte_test_normalization", - tags = [ "top-level-intermediate" ] -) }} --- SQL model to build a hash column based on the values of this record --- depends_on: {{ ref('renamed_dedup_cdc_excluded_ab2') }} -select - {{ dbt_utils.surrogate_key([ - adapter.quote('id'), - adapter.quote('name'), - '_ab_cdc_lsn', - '_ab_cdc_updated_at', - '_ab_cdc_deleted_at', - ]) }} as _airbyte_renamed_dedup_cdc_excluded_hashid, - tmp.* -from {{ ref('renamed_dedup_cdc_excluded_ab2') }} tmp --- renamed_dedup_cdc_excluded -where 1 = 1 -{{ incremental_clause('_airbyte_emitted_at') }} - diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/modified_models/generated/airbyte_tables/test_normalization/exchange_rate.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/modified_models/generated/airbyte_tables/test_normalization/exchange_rate.sql deleted file mode 100644 index 40b5ffb3f87d9..0000000000000 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/modified_models/generated/airbyte_tables/test_normalization/exchange_rate.sql +++ /dev/null @@ -1,26 +0,0 @@ -{{ config( - indexes = [{'columns':['_airbyte_emitted_at'],'type':'btree'}], - unique_key = '_airbyte_ab_id', - schema = "test_normalization", - tags = [ "top-level" ] -) }} --- Final base SQL model --- depends_on: {{ ref('exchange_rate_ab3') }} -select - {{ adapter.quote('id') }}, - currency, - new_column, - {{ adapter.quote('date') }}, - timestamp_col, - {{ adapter.quote('HKD@spéçiäl & characters') }}, - nzd, - usd, - {{ adapter.quote('column`_\'with""_quotes') }}, - _airbyte_ab_id, - _airbyte_emitted_at, - {{ current_timestamp() }} as _airbyte_normalized_at, - _airbyte_exchange_rate_hashid -from {{ ref('exchange_rate_ab3') }} --- exchange_rate from {{ source('test_normalization', '_airbyte_raw_exchange_rate') }} -where 1 = 1 - diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/modified_models/generated/sources.yml b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/modified_models/generated/sources.yml deleted file mode 100644 index dd538a80131ae..0000000000000 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/modified_models/generated/sources.yml +++ /dev/null @@ -1,11 +0,0 @@ -version: 2 -sources: -- name: test_normalization - quoting: - database: true - schema: false - identifier: false - tables: - - name: _airbyte_raw_dedup_exchange_rate - - name: _airbyte_raw_exchange_rate - - name: _airbyte_raw_renamed_dedup_cdc_excluded diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/second_output/airbyte_incremental/scd/test_normalization/1_prefix_startwith_number_scd.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/second_output/airbyte_incremental/scd/test_normalization/1_prefix_startwith_number_scd.sql new file mode 100644 index 0000000000000..3eb10166dd315 --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/second_output/airbyte_incremental/scd/test_normalization/1_prefix_startwith_number_scd.sql @@ -0,0 +1,14 @@ + + delete + from "postgres".test_normalization."1_prefix_startwith_number_scd" + where (_airbyte_unique_key_scd) in ( + select (_airbyte_unique_key_scd) + from "1_prefix_startwith_number_scd__dbt_tmp" + ); + + insert into "postgres".test_normalization."1_prefix_startwith_number_scd" ("_airbyte_unique_key", "_airbyte_unique_key_scd", "id", "date", "text", "_airbyte_start_at", "_airbyte_end_at", "_airbyte_active_row", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_1_prefix_startwith_number_hashid") + ( + select "_airbyte_unique_key", "_airbyte_unique_key_scd", "id", "date", "text", "_airbyte_start_at", "_airbyte_end_at", "_airbyte_active_row", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_1_prefix_startwith_number_hashid" + from "1_prefix_startwith_number_scd__dbt_tmp" + ); + \ No newline at end of file diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/second_output/airbyte_incremental/test_normalization/1_prefix_startwith_number.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/second_output/airbyte_incremental/test_normalization/1_prefix_startwith_number.sql new file mode 100644 index 0000000000000..a95bdd8ef269c --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/second_output/airbyte_incremental/test_normalization/1_prefix_startwith_number.sql @@ -0,0 +1,14 @@ + + delete + from "postgres".test_normalization."1_prefix_startwith_number" + where (_airbyte_unique_key) in ( + select (_airbyte_unique_key) + from "1_prefix_startwith_number__dbt_tmp" + ); + + insert into "postgres".test_normalization."1_prefix_startwith_number" ("_airbyte_unique_key", "id", "date", "text", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_1_prefix_startwith_number_hashid") + ( + select "_airbyte_unique_key", "id", "date", "text", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_1_prefix_startwith_number_hashid" + from "1_prefix_startwith_number__dbt_tmp" + ); + \ No newline at end of file diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/second_output/airbyte_incremental/test_normalization/1_prefix_startwith_number_stg.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/second_output/airbyte_incremental/test_normalization/1_prefix_startwith_number_stg.sql new file mode 100644 index 0000000000000..c69b8a6554952 --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/second_output/airbyte_incremental/test_normalization/1_prefix_startwith_number_stg.sql @@ -0,0 +1,14 @@ + + delete + from "postgres"._airbyte_test_normalization."1_prefix_startwith_number_stg" + where (_airbyte_ab_id) in ( + select (_airbyte_ab_id) + from "1_prefix_startwith_number_stg__dbt_tmp" + ); + + insert into "postgres"._airbyte_test_normalization."1_prefix_startwith_number_stg" ("_airbyte_1_prefix_startwith_number_hashid", "id", "date", "text", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at") + ( + select "_airbyte_1_prefix_startwith_number_hashid", "id", "date", "text", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at" + from "1_prefix_startwith_number_stg__dbt_tmp" + ); + \ No newline at end of file diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/third_output/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/third_output/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql deleted file mode 100644 index a5de1de2333db..0000000000000 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/third_output/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql +++ /dev/null @@ -1,14 +0,0 @@ - - delete - from "postgres".test_normalization."dedup_exchange_rate_scd" - where (_airbyte_unique_key_scd) in ( - select (_airbyte_unique_key_scd) - from "dedup_exchange_rate_scd__dbt_tmp" - ); - - insert into "postgres".test_normalization."dedup_exchange_rate_scd" ("_airbyte_unique_key", "_airbyte_unique_key_scd", "currency", "date", "timestamp_col", "HKD@spéçiäl & characters", "nzd", "usd", "_airbyte_start_at", "_airbyte_end_at", "_airbyte_active_row", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_dedup_exchange_rate_hashid", "new_column", "id") - ( - select "_airbyte_unique_key", "_airbyte_unique_key_scd", "currency", "date", "timestamp_col", "HKD@spéçiäl & characters", "nzd", "usd", "_airbyte_start_at", "_airbyte_end_at", "_airbyte_active_row", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_dedup_exchange_rate_hashid", "new_column", "id" - from "dedup_exchange_rate_scd__dbt_tmp" - ); - \ No newline at end of file diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/third_output/airbyte_incremental/scd/test_normalization/renamed_dedup_cdc_excluded_scd.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/third_output/airbyte_incremental/scd/test_normalization/renamed_dedup_cdc_excluded_scd.sql deleted file mode 100644 index dfe10c6da794d..0000000000000 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/third_output/airbyte_incremental/scd/test_normalization/renamed_dedup_cdc_excluded_scd.sql +++ /dev/null @@ -1,14 +0,0 @@ - - delete - from "postgres".test_normalization."renamed_dedup_cdc_excluded_scd" - where (_airbyte_unique_key_scd) in ( - select (_airbyte_unique_key_scd) - from "renamed_dedup_cdc_excluded_scd__dbt_tmp" - ); - - insert into "postgres".test_normalization."renamed_dedup_cdc_excluded_scd" ("_airbyte_unique_key", "_airbyte_unique_key_scd", "id", "_ab_cdc_updated_at", "_airbyte_start_at", "_airbyte_end_at", "_airbyte_active_row", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_renamed_dedup_cdc_excluded_hashid", "name", "_ab_cdc_lsn", "_ab_cdc_deleted_at") - ( - select "_airbyte_unique_key", "_airbyte_unique_key_scd", "id", "_ab_cdc_updated_at", "_airbyte_start_at", "_airbyte_end_at", "_airbyte_active_row", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_renamed_dedup_cdc_excluded_hashid", "name", "_ab_cdc_lsn", "_ab_cdc_deleted_at" - from "renamed_dedup_cdc_excluded_scd__dbt_tmp" - ); - \ No newline at end of file diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/third_output/airbyte_incremental/test_normalization/dedup_exchange_rate.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/third_output/airbyte_incremental/test_normalization/dedup_exchange_rate.sql deleted file mode 100644 index ecc81c8883b07..0000000000000 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/third_output/airbyte_incremental/test_normalization/dedup_exchange_rate.sql +++ /dev/null @@ -1,14 +0,0 @@ - - delete - from "postgres".test_normalization."dedup_exchange_rate" - where (_airbyte_unique_key) in ( - select (_airbyte_unique_key) - from "dedup_exchange_rate__dbt_tmp" - ); - - insert into "postgres".test_normalization."dedup_exchange_rate" ("_airbyte_unique_key", "currency", "date", "timestamp_col", "HKD@spéçiäl & characters", "nzd", "usd", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_dedup_exchange_rate_hashid", "new_column", "id") - ( - select "_airbyte_unique_key", "currency", "date", "timestamp_col", "HKD@spéçiäl & characters", "nzd", "usd", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_dedup_exchange_rate_hashid", "new_column", "id" - from "dedup_exchange_rate__dbt_tmp" - ); - \ No newline at end of file diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/third_output/airbyte_incremental/test_normalization/dedup_exchange_rate_stg.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/third_output/airbyte_incremental/test_normalization/dedup_exchange_rate_stg.sql deleted file mode 100644 index f35951198e0fb..0000000000000 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/third_output/airbyte_incremental/test_normalization/dedup_exchange_rate_stg.sql +++ /dev/null @@ -1,14 +0,0 @@ - - delete - from "postgres"._airbyte_test_normalization."dedup_exchange_rate_stg" - where (_airbyte_ab_id) in ( - select (_airbyte_ab_id) - from "dedup_exchange_rate_stg__dbt_tmp" - ); - - insert into "postgres"._airbyte_test_normalization."dedup_exchange_rate_stg" ("_airbyte_dedup_exchange_rate_hashid", "currency", "date", "timestamp_col", "HKD@spéçiäl & characters", "nzd", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "new_column", "id", "usd") - ( - select "_airbyte_dedup_exchange_rate_hashid", "currency", "date", "timestamp_col", "HKD@spéçiäl & characters", "nzd", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "new_column", "id", "usd" - from "dedup_exchange_rate_stg__dbt_tmp" - ); - \ No newline at end of file diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/third_output/airbyte_incremental/test_normalization/renamed_dedup_cdc_excluded.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/third_output/airbyte_incremental/test_normalization/renamed_dedup_cdc_excluded.sql deleted file mode 100644 index c1d1c310179d3..0000000000000 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/third_output/airbyte_incremental/test_normalization/renamed_dedup_cdc_excluded.sql +++ /dev/null @@ -1,14 +0,0 @@ - - delete - from "postgres".test_normalization."renamed_dedup_cdc_excluded" - where (_airbyte_unique_key) in ( - select (_airbyte_unique_key) - from "renamed_dedup_cdc_excluded__dbt_tmp" - ); - - insert into "postgres".test_normalization."renamed_dedup_cdc_excluded" ("_airbyte_unique_key", "id", "_ab_cdc_updated_at", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_renamed_dedup_cdc_excluded_hashid", "name", "_ab_cdc_lsn", "_ab_cdc_deleted_at") - ( - select "_airbyte_unique_key", "id", "_ab_cdc_updated_at", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_renamed_dedup_cdc_excluded_hashid", "name", "_ab_cdc_lsn", "_ab_cdc_deleted_at" - from "renamed_dedup_cdc_excluded__dbt_tmp" - ); - \ No newline at end of file diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/third_output/airbyte_incremental/test_normalization/renamed_dedup_cdc_excluded_stg.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/third_output/airbyte_incremental/test_normalization/renamed_dedup_cdc_excluded_stg.sql deleted file mode 100644 index 55db812277ae0..0000000000000 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/third_output/airbyte_incremental/test_normalization/renamed_dedup_cdc_excluded_stg.sql +++ /dev/null @@ -1,14 +0,0 @@ - - delete - from "postgres"._airbyte_test_normalization."renamed_dedup_cdc_excluded_stg" - where (_airbyte_ab_id) in ( - select (_airbyte_ab_id) - from "renamed_dedup_cdc_excluded_stg__dbt_tmp" - ); - - insert into "postgres"._airbyte_test_normalization."renamed_dedup_cdc_excluded_stg" ("_airbyte_renamed_dedup_cdc_excluded_hashid", "id", "_ab_cdc_updated_at", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "name", "_ab_cdc_lsn", "_ab_cdc_deleted_at") - ( - select "_airbyte_renamed_dedup_cdc_excluded_hashid", "id", "_ab_cdc_updated_at", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "name", "_ab_cdc_lsn", "_ab_cdc_deleted_at" - from "renamed_dedup_cdc_excluded_stg__dbt_tmp" - ); - \ No newline at end of file diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/third_output/airbyte_tables/test_normalization/exchange_rate.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/third_output/airbyte_tables/test_normalization/exchange_rate.sql deleted file mode 100644 index d7f0d50be215f..0000000000000 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/third_output/airbyte_tables/test_normalization/exchange_rate.sql +++ /dev/null @@ -1,113 +0,0 @@ - - - create table "postgres".test_normalization."exchange_rate__dbt_tmp" - as ( - -with __dbt__cte__exchange_rate_ab1 as ( - --- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema --- depends_on: "postgres".test_normalization._airbyte_raw_exchange_rate -select - jsonb_extract_path_text(_airbyte_data, 'id') as "id", - jsonb_extract_path_text(_airbyte_data, 'currency') as currency, - jsonb_extract_path_text(_airbyte_data, 'new_column') as new_column, - jsonb_extract_path_text(_airbyte_data, 'date') as "date", - jsonb_extract_path_text(_airbyte_data, 'timestamp_col') as timestamp_col, - jsonb_extract_path_text(_airbyte_data, 'HKD@spéçiäl & characters') as "HKD@spéçiäl & characters", - jsonb_extract_path_text(_airbyte_data, 'NZD') as nzd, - jsonb_extract_path_text(_airbyte_data, 'USD') as usd, - jsonb_extract_path_text(_airbyte_data, 'column`_''with"_quotes') as "column`_'with""_quotes", - _airbyte_ab_id, - _airbyte_emitted_at, - now() as _airbyte_normalized_at -from "postgres".test_normalization._airbyte_raw_exchange_rate as table_alias --- exchange_rate -where 1 = 1 -), __dbt__cte__exchange_rate_ab2 as ( - --- SQL model to cast each column to its adequate SQL type converted from the JSON schema type --- depends_on: __dbt__cte__exchange_rate_ab1 -select - cast("id" as - float -) as "id", - cast(currency as - varchar -) as currency, - cast(new_column as - float -) as new_column, - cast(nullif("date", '') as - date -) as "date", - cast(nullif(timestamp_col, '') as - timestamp with time zone -) as timestamp_col, - cast("HKD@spéçiäl & characters" as - float -) as "HKD@spéçiäl & characters", - cast(nzd as - float -) as nzd, - cast(usd as - float -) as usd, - cast("column`_'with""_quotes" as - varchar -) as "column`_'with""_quotes", - _airbyte_ab_id, - _airbyte_emitted_at, - now() as _airbyte_normalized_at -from __dbt__cte__exchange_rate_ab1 --- exchange_rate -where 1 = 1 -), __dbt__cte__exchange_rate_ab3 as ( - --- SQL model to build a hash column based on the values of this record --- depends_on: __dbt__cte__exchange_rate_ab2 -select - md5(cast(coalesce(cast("id" as - varchar -), '') || '-' || coalesce(cast(currency as - varchar -), '') || '-' || coalesce(cast(new_column as - varchar -), '') || '-' || coalesce(cast("date" as - varchar -), '') || '-' || coalesce(cast(timestamp_col as - varchar -), '') || '-' || coalesce(cast("HKD@spéçiäl & characters" as - varchar -), '') || '-' || coalesce(cast(nzd as - varchar -), '') || '-' || coalesce(cast(usd as - varchar -), '') || '-' || coalesce(cast("column`_'with""_quotes" as - varchar -), '') as - varchar -)) as _airbyte_exchange_rate_hashid, - tmp.* -from __dbt__cte__exchange_rate_ab2 tmp --- exchange_rate -where 1 = 1 -)-- Final base SQL model --- depends_on: __dbt__cte__exchange_rate_ab3 -select - "id", - currency, - new_column, - "date", - timestamp_col, - "HKD@spéçiäl & characters", - nzd, - usd, - "column`_'with""_quotes", - _airbyte_ab_id, - _airbyte_emitted_at, - now() as _airbyte_normalized_at, - _airbyte_exchange_rate_hashid -from __dbt__cte__exchange_rate_ab3 --- exchange_rate from "postgres".test_normalization._airbyte_raw_exchange_rate -where 1 = 1 - ); \ No newline at end of file diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/redshift/test_nested_streams/models/generated/airbyte_incremental/scd/test_normalization/nested_stream_with_complex_columns_resulting_into_long_names_scd.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/redshift/test_nested_streams/models/generated/airbyte_incremental/scd/test_normalization/nested_stream_with_complex_columns_resulting_into_long_names_scd.sql index 1e30bf57e4595..e89e97f58fea2 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/redshift/test_nested_streams/models/generated/airbyte_incremental/scd/test_normalization/nested_stream_with_complex_columns_resulting_into_long_names_scd.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/redshift/test_nested_streams/models/generated/airbyte_incremental/scd/test_normalization/nested_stream_with_complex_columns_resulting_into_long_names_scd.sql @@ -2,7 +2,7 @@ sort = ["_airbyte_active_row", "_airbyte_unique_key_scd", "_airbyte_emitted_at"], unique_key = "_airbyte_unique_key_scd", schema = "test_normalization", - post_hook = ['drop view _airbyte_test_normalization.nested_stream_with_complex_columns_resulting_into_long_names_stg'], + post_hook = ["drop view _airbyte_test_normalization.nested_stream_with_complex_columns_resulting_into_long_names_stg"], tags = [ "top-level" ] ) }} -- depends_on: ref('nested_stream_with_complex_columns_resulting_into_long_names_stg') diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/redshift/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/redshift/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql index 81b85e492cd51..3cb089de2de1a 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/redshift/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/redshift/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql @@ -2,7 +2,7 @@ sort = ["_airbyte_active_row", "_airbyte_unique_key_scd", "_airbyte_emitted_at"], unique_key = "_airbyte_unique_key_scd", schema = "test_normalization", - post_hook = ['drop view _airbyte_test_normalization.dedup_exchange_rate_stg'], + post_hook = ["drop view _airbyte_test_normalization.dedup_exchange_rate_stg"], tags = [ "top-level" ] ) }} -- depends_on: ref('dedup_exchange_rate_stg') diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/redshift/test_simple_streams/models/generated/sources.yml b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/redshift/test_simple_streams/models/generated/sources.yml index 0e116b2bbec5d..97bf0d05cbd40 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/redshift/test_simple_streams/models/generated/sources.yml +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/redshift/test_simple_streams/models/generated/sources.yml @@ -6,6 +6,7 @@ sources: schema: false identifier: false tables: + - name: _airbyte_raw_1_prefix_startwith_number - name: _airbyte_raw_dedup_cdc_excluded - name: _airbyte_raw_dedup_exchange_rate - name: _airbyte_raw_exchange_rate diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/redshift/test_simple_streams/modified_models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/redshift/test_simple_streams/modified_models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql index 363a39ec25fe7..0145a94818b0a 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/redshift/test_simple_streams/modified_models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/redshift/test_simple_streams/modified_models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql @@ -2,7 +2,7 @@ sort = ["_airbyte_active_row", "_airbyte_unique_key_scd", "_airbyte_emitted_at"], unique_key = "_airbyte_unique_key_scd", schema = "test_normalization", - post_hook = ['drop view _airbyte_test_normalization.dedup_exchange_rate_stg'], + post_hook = ["drop view _airbyte_test_normalization.dedup_exchange_rate_stg"], tags = [ "top-level" ] ) }} -- depends_on: ref('dedup_exchange_rate_stg') diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/snowflake/test_nested_streams/models/generated/airbyte_incremental/scd/TEST_NORMALIZATION/NESTED_STREAM_WITH_COMPLEX_COLUMNS_RESULTING_INTO_LONG_NAMES_SCD.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/snowflake/test_nested_streams/models/generated/airbyte_incremental/scd/TEST_NORMALIZATION/NESTED_STREAM_WITH_COMPLEX_COLUMNS_RESULTING_INTO_LONG_NAMES_SCD.sql index 167cdb066cb4c..9435ebaf2bc1d 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/snowflake/test_nested_streams/models/generated/airbyte_incremental/scd/TEST_NORMALIZATION/NESTED_STREAM_WITH_COMPLEX_COLUMNS_RESULTING_INTO_LONG_NAMES_SCD.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/snowflake/test_nested_streams/models/generated/airbyte_incremental/scd/TEST_NORMALIZATION/NESTED_STREAM_WITH_COMPLEX_COLUMNS_RESULTING_INTO_LONG_NAMES_SCD.sql @@ -2,7 +2,7 @@ cluster_by = ["_AIRBYTE_ACTIVE_ROW", "_AIRBYTE_UNIQUE_KEY_SCD", "_AIRBYTE_EMITTED_AT"], unique_key = "_AIRBYTE_UNIQUE_KEY_SCD", schema = "TEST_NORMALIZATION", - post_hook = ['drop view _AIRBYTE_TEST_NORMALIZATION.NESTED_STREAM_WITH_COMPLEX_COLUMNS_RESULTING_INTO_LONG_NAMES_STG'], + post_hook = ["drop view _AIRBYTE_TEST_NORMALIZATION.NESTED_STREAM_WITH_COMPLEX_COLUMNS_RESULTING_INTO_LONG_NAMES_STG"], tags = [ "top-level" ] ) }} -- depends_on: ref('NESTED_STREAM_WITH_COMPLEX_COLUMNS_RESULTING_INTO_LONG_NAMES_STG') diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/snowflake/test_simple_streams/models/generated/airbyte_incremental/scd/TEST_NORMALIZATION/DEDUP_EXCHANGE_RATE_SCD.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/snowflake/test_simple_streams/models/generated/airbyte_incremental/scd/TEST_NORMALIZATION/DEDUP_EXCHANGE_RATE_SCD.sql index 2b62f6776a223..688926bdcab0f 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/snowflake/test_simple_streams/models/generated/airbyte_incremental/scd/TEST_NORMALIZATION/DEDUP_EXCHANGE_RATE_SCD.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/snowflake/test_simple_streams/models/generated/airbyte_incremental/scd/TEST_NORMALIZATION/DEDUP_EXCHANGE_RATE_SCD.sql @@ -2,7 +2,7 @@ cluster_by = ["_AIRBYTE_ACTIVE_ROW", "_AIRBYTE_UNIQUE_KEY_SCD", "_AIRBYTE_EMITTED_AT"], unique_key = "_AIRBYTE_UNIQUE_KEY_SCD", schema = "TEST_NORMALIZATION", - post_hook = ['drop view _AIRBYTE_TEST_NORMALIZATION.DEDUP_EXCHANGE_RATE_STG'], + post_hook = ["drop view _AIRBYTE_TEST_NORMALIZATION.DEDUP_EXCHANGE_RATE_STG"], tags = [ "top-level" ] ) }} -- depends_on: ref('DEDUP_EXCHANGE_RATE_STG') diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/snowflake/test_simple_streams/models/generated/sources.yml b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/snowflake/test_simple_streams/models/generated/sources.yml index bec4269ba6bf8..1d9d7ae323bbb 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/snowflake/test_simple_streams/models/generated/sources.yml +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/snowflake/test_simple_streams/models/generated/sources.yml @@ -6,6 +6,7 @@ sources: schema: false identifier: false tables: + - name: _AIRBYTE_RAW_1_PREFIX_STARTWITH_NUMBER - name: _AIRBYTE_RAW_DEDUP_CDC_EXCLUDED - name: _AIRBYTE_RAW_DEDUP_EXCHANGE_RATE - name: _AIRBYTE_RAW_EXCHANGE_RATE diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/catalog.json b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/catalog.json index dc25bf3713604..a193f98bd8e86 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/catalog.json +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/catalog.json @@ -178,6 +178,33 @@ "destination_sync_mode": "append_dedup", "primary_key": [["id"]] }, + { + "stream": { + "name": "1_prefix_startwith_number", + "json_schema": { + "type": ["null", "object"], + "properties": { + "id": { + "type": "integer" + }, + "date": { + "type": "string", + "format": "date" + }, + "text": { + "type": "string" + } + } + }, + "supported_sync_modes": ["incremental"], + "source_defined_cursor": true, + "default_cursor_field": [] + }, + "sync_mode": "incremental", + "cursor_field": ["date"], + "destination_sync_mode": "append_dedup", + "primary_key": [["id"]] + }, { "stream": { "name": "multiple_column_names_conflicts", diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/messages.txt b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/messages.txt index e84ca1f63c79a..c4b5b4d8543a1 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/messages.txt +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/messages.txt @@ -42,4 +42,12 @@ {"type":"RECORD","record":{"stream":"pos_dedup_cdcx","data":{"id":5,"name":"lotus","_ab_cdc_updated_at":1623850868371,"_ab_cdc_lsn":27010048,"_ab_cdc_log_pos": 33280,"_ab_cdc_deleted_at":null},"emitted_at":1623861660}} {"type":"RECORD","record":{"stream":"pos_dedup_cdcx","data":{"id":5,"name":"lily","_ab_cdc_updated_at":1623850868371,"_ab_cdc_lsn":27010232,"_ab_cdc_log_pos": 33281,"_ab_cdc_deleted_at":null},"emitted_at":1623861660}} +{"type": "RECORD", "record": {"stream": "1_prefix_startwith_number", "emitted_at": 1602637589000, "data": { "id": 1, "date": "2020-08-29", "text": "hi 1"}}} +{"type": "RECORD", "record": {"stream": "1_prefix_startwith_number", "emitted_at": 1602637689100, "data": { "id": 1, "date": "2020-08-30", "text": "hi 2"}}} +{"type": "RECORD", "record": {"stream": "1_prefix_startwith_number", "emitted_at": 1602637789200, "data": { "id": 2, "date": "2020-08-31", "text": "hi 1"}}} +{"type": "RECORD", "record": {"stream": "1_prefix_startwith_number", "emitted_at": 1602637889300, "data": { "id": 2, "date": "2020-08-31", "text": "hi 2"}}} +{"type": "RECORD", "record": {"stream": "1_prefix_startwith_number", "emitted_at": 1602637989400, "data": { "id": 2, "date": "2020-09-01", "text": "hi 3"}}} +{"type": "RECORD", "record": {"stream": "1_prefix_startwith_number", "emitted_at": 1602637990700, "data": { "id": 1, "date": "2020-09-01", "text": "hi 3"}}} +{"type": "RECORD", "record": {"stream": "1_prefix_startwith_number", "emitted_at": 1602637990800, "data": { "id": 2, "date": "2020-09-01", "text": "hi 4"}}} + {"type":"RECORD","record":{"stream":"multiple_column_names_conflicts","data":{"id":1,"User Id":"chris","user_id":42,"User id":300,"user id": 102,"UserId":101},"emitted_at":1623959926}} diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/test_ephemeral.py b/airbyte-integrations/bases/base-normalization/integration_tests/test_ephemeral.py index 287789e378104..f8eb6a324f8e9 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/test_ephemeral.py +++ b/airbyte-integrations/bases/base-normalization/integration_tests/test_ephemeral.py @@ -54,7 +54,9 @@ def test_destination_supported_limits(destination_type: DestinationType, column_ pytest.skip(f"Destinations {destination_type} is not in NORMALIZATION_TEST_TARGET env variable (MYSQL is also skipped)") if destination_type.value == DestinationType.ORACLE.value: # Airbyte uses a few columns for metadata and Oracle limits are right at 1000 - column_count = 995 + column_count = 993 + if destination_type.value == DestinationType.MSSQL.value: + column_count = 999 run_test(destination_type, column_count) @@ -62,7 +64,7 @@ def test_destination_supported_limits(destination_type: DestinationType, column_ "integration_type, column_count, expected_exception_message", [ ("Postgres", 1665, "target lists can have at most 1664 entries"), - ("BigQuery", 2500, "The view is too large."), + ("BigQuery", 3000, "The view is too large."), ("Snowflake", 2000, "Operation failed because soft limit on objects of type 'Column' per table was exceeded."), ("Redshift", 1665, "target lists can have at most 1664 entries"), ("MySQL", 250, "Row size too large"), diff --git a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/destination_name_transformer.py b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/destination_name_transformer.py index ab7b3894d6134..66a1372bfcdff 100644 --- a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/destination_name_transformer.py +++ b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/destination_name_transformer.py @@ -150,6 +150,11 @@ def __normalize_non_column_identifier_name( if truncate: result = self.truncate_identifier_name(input_name=result, conflict=conflict, conflict_level=conflict_level) result = self.__normalize_identifier_case(result, is_quoted=False) + if result[0].isdigit(): + if self.destination_type == DestinationType.MSSQL: + result = "_" + result + elif self.destination_type == DestinationType.ORACLE: + result = "ab_" + result return result def __normalize_identifier_name( @@ -165,13 +170,7 @@ def __normalize_identifier_name( result = result.replace("`", "_") result = result.replace("'", "\\'") result = self.__normalize_identifier_case(result, is_quoted=True) - if self.destination_type == DestinationType.ORACLE: - # Oracle dbt lib doesn't implemented adapter quote yet. - result = f"quote('{result}')" - elif self.destination_type == DestinationType.CLICKHOUSE: - result = f"quote('{result}')" - else: - result = f"adapter.quote('{result}')" + result = self.apply_quote(result) if not in_jinja: result = jinja_call(result) return result @@ -182,6 +181,14 @@ def __normalize_identifier_name( return f"'{result}'" return result + def apply_quote(self, input: str) -> str: + if self.destination_type == DestinationType.ORACLE: + # Oracle dbt lib doesn't implemented adapter quote yet. + return f"quote('{input}')" + elif self.destination_type == DestinationType.CLICKHOUSE: + return f"quote('{input}')" + return f"adapter.quote('{input}')" + def __normalize_naming_conventions(self, input_name: str) -> str: result = input_name if self.destination_type.value == DestinationType.ORACLE.value: diff --git a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py index d5ba201b9746b..bb3a689ff0c8b 100644 --- a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py +++ b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py @@ -1064,15 +1064,17 @@ def add_to_outputs( if suffix == "scd": stg_schema = self.get_schema(True) stg_table = self.tables_registry.get_file_name(schema, self.json_path, self.stream_name, "stg", truncate_name) + if self.name_transformer.needs_quotes(stg_table): + stg_table = jinja_call(self.name_transformer.apply_quote(stg_table)) if self.destination_type.value == DestinationType.POSTGRES.value: # Keep only rows with the max emitted_at to keep incremental behavior config["post_hook"] = ( - f"['delete from {stg_schema}.{stg_table} " + f'["delete from {stg_schema}.{stg_table} ' + f"where {self.airbyte_emitted_at} != (select max({self.airbyte_emitted_at}) " - + f"from {stg_schema}.{stg_table})']" + + f'from {stg_schema}.{stg_table})"]' ) else: - config["post_hook"] = f"['drop view {stg_schema}.{stg_table}']" + config["post_hook"] = f'["drop view {stg_schema}.{stg_table}"]' else: # incremental is handled in the SCD SQL already sql = self.add_incremental_clause(sql) diff --git a/airbyte-integrations/bases/base-normalization/unit_tests/test_destination_name_transformer.py b/airbyte-integrations/bases/base-normalization/unit_tests/test_destination_name_transformer.py index e5c4a0d2eccd5..61754fc2fc599 100644 --- a/airbyte-integrations/bases/base-normalization/unit_tests/test_destination_name_transformer.py +++ b/airbyte-integrations/bases/base-normalization/unit_tests/test_destination_name_transformer.py @@ -121,7 +121,7 @@ def test_transform_standard_naming(input_str: str, expected: str): ("100x2003", "Snowflake", "100x2003", "{{ adapter.quote('100x2003') }}"), ("100x2004", "Redshift", "100x2004", "{{ adapter.quote('100x2004') }}"), ("100x2005", "MySQL", "100x2005", "{{ adapter.quote('100x2005') }}"), - ("100x2006", "MSSQL", "100x2006", "{{ adapter.quote('100x2006') }}"), + ("100x2006", "MSSQL", "_100x2006", "{{ adapter.quote('100x2006') }}"), # Reserved Keywords in BQ and MySQL ("Groups", "Postgres", "groups", "groups"), ("Groups", "BigQuery", "Groups", "{{ adapter.quote('Groups') }}"), diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java index 0e921d438a0df..a433e4c3a8f05 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java @@ -14,7 +14,7 @@ public class NormalizationRunnerFactory { public static final String BASE_NORMALIZATION_IMAGE_NAME = "airbyte/normalization"; - public static final String NORMALIZATION_VERSION = "0.1.62"; + public static final String NORMALIZATION_VERSION = "0.1.63"; static final Map> NORMALIZATION_MAPPING = ImmutableMap.>builder()