From 89e78a6be5a1384904b960239c802b33f2309f66 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Wed, 13 Jul 2022 18:21:17 -0700 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20Destination=20BIgQuery=20can=20h?= =?UTF-8?q?andle=20nulls=20inside=20arrays=20(#14522)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../bases/base-normalization/Dockerfile | 2 +- .../macros/cross_db_utils/json_operations.sql | 32 +++++++++- .../test_nested_streams/dbt_project.yml | 34 ++++++----- .../models/generated/sources.yml | 1 + .../test_simple_streams/dbt_project.yml | 26 ++++----- .../test_simple_streams/first_dbt_project.yml | 26 ++++----- .../modified_models/generated/sources.yml | 20 +++---- .../test_nested_streams/dbt_project.yml | 34 ++++++----- .../test_normalization/arrays.sql | 58 +++++++++++++++++++ .../arrays_nested_array_parent.sql | 55 ++++++++++++++++++ .../test_normalization/arrays_ab1.sql | 18 ++++++ .../test_normalization/arrays_ab2.sql | 18 ++++++ .../test_normalization/arrays_ab3.sql | 18 ++++++ .../arrays_nested_array_parent_ab1.sql | 18 ++++++ .../arrays_nested_array_parent_ab2.sql | 17 ++++++ .../arrays_nested_array_parent_ab3.sql | 17 ++++++ .../test_normalization/arrays.sql | 19 ++++++ .../arrays_nested_array_parent.sql | 18 ++++++ .../models/generated/sources.yml | 1 + .../test_normalization/arrays.sql | 58 +++++++++++++++++++ .../arrays_nested_array_parent.sql | 55 ++++++++++++++++++ .../data_input/catalog.json | 11 ++++ .../data_input/messages.txt | 2 +- .../nested_streams_first_run_row_counts.sql | 6 ++ .../NormalizationRunnerFactory.java | 2 +- .../basic-normalization.md | 1 + 26 files changed, 500 insertions(+), 67 deletions(-) create mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/first_output/airbyte_tables/test_normalization/arrays.sql create mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/first_output/airbyte_tables/test_normalization/arrays_nested_array_parent.sql create mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_ctes/test_normalization/arrays_ab1.sql create mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_ctes/test_normalization/arrays_ab2.sql create mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_ctes/test_normalization/arrays_ab3.sql create mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_ctes/test_normalization/arrays_nested_array_parent_ab1.sql create mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_ctes/test_normalization/arrays_nested_array_parent_ab2.sql create mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_ctes/test_normalization/arrays_nested_array_parent_ab3.sql create mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_tables/test_normalization/arrays.sql create mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_tables/test_normalization/arrays_nested_array_parent.sql create mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/second_output/airbyte_tables/test_normalization/arrays.sql create mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/second_output/airbyte_tables/test_normalization/arrays_nested_array_parent.sql diff --git a/airbyte-integrations/bases/base-normalization/Dockerfile b/airbyte-integrations/bases/base-normalization/Dockerfile index 2cbda1fe6f599..1de510acc0eec 100644 --- a/airbyte-integrations/bases/base-normalization/Dockerfile +++ b/airbyte-integrations/bases/base-normalization/Dockerfile @@ -28,5 +28,5 @@ WORKDIR /airbyte ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh" ENTRYPOINT ["/airbyte/entrypoint.sh"] -LABEL io.airbyte.version=0.2.7 +LABEL io.airbyte.version=0.2.8 LABEL io.airbyte.name=airbyte/normalization diff --git a/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/json_operations.sql b/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/json_operations.sql index f6bfa26d22852..82ca7f517eaf1 100644 --- a/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/json_operations.sql +++ b/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/json_operations.sql @@ -21,6 +21,13 @@ {{ '\'$."' ~ json_path_list|join('."') ~ '"\'' }} {%- endmacro %} +{# + BigQuery has different JSONPath syntax depending on which function you call. + Most of our macros use the "legacy" JSON functions, so this function uses + the legacy syntax. + + These paths look like: "$['foo']['bar']" +#} {% macro bigquery__format_json_path(json_path_list) -%} {%- set str_list = [] -%} {%- for json_path in json_path_list -%} @@ -29,6 +36,20 @@ {{ '"$[\'' ~ str_list|join('\'][\'') ~ '\']"' }} {%- endmacro %} +{# + For macros which use the newer JSON functions, define a new_format_json_path + macro which generates the correct path syntax. + + These paths look like: '$."foo"."bar"' +#} +{% macro bigquery_new_format_json_path(json_path_list) -%} + {%- set str_list = [] -%} + {%- for json_path in json_path_list -%} + {%- if str_list.append(json_path.replace('\'', '\\\'')) -%} {%- endif -%} + {%- endfor -%} + {{ '\'$."' ~ str_list|join('"."') ~ '"\'' }} +{%- endmacro %} + {% macro postgres__format_json_path(json_path_list) -%} {%- set str_list = [] -%} {%- for json_path in json_path_list -%} @@ -247,7 +268,14 @@ {{ json_extract_array(json_column, json_path_list, normalized_json_path) }} {%- endmacro %} -# https://cloud.google.com/bigquery/docs/reference/standard-sql/json_functions#json_extract_string_array +{# +See https://cloud.google.com/bigquery/docs/reference/standard-sql/json_functions#json_extract_string_array + +BigQuery does not allow NULL entries in REPEATED fields, so we replace those with literal "NULL" strings. +#} {% macro bigquery__json_extract_string_array(json_column, json_path_list, normalized_json_path) -%} - json_extract_string_array({{ json_column }}, {{ format_json_path(normalized_json_path) }}) + array( + select ifnull(x, "NULL") + from unnest(json_value_array({{ json_column }}, {{ bigquery_new_format_json_path(normalized_json_path) }})) as x + ) {%- endmacro %} diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_nested_streams/dbt_project.yml b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_nested_streams/dbt_project.yml index 68ca41b91d53f..474ab801dbf43 100755 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_nested_streams/dbt_project.yml +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_nested_streams/dbt_project.yml @@ -1,25 +1,25 @@ name: airbyte_utils -version: "1.0" +version: '1.0' config-version: 2 profile: normalize model-paths: - - models +- models docs-paths: - - docs +- docs analysis-paths: - - analysis +- analysis test-paths: - - tests +- tests seed-paths: - - data +- data macro-paths: - - macros +- macros target-path: ../build log-path: ../logs packages-install-path: /dbt clean-targets: - - build - - dbt_modules +- build +- dbt_modules quoting: database: true schema: false @@ -42,10 +42,10 @@ models: +tags: airbyte_internal_views +materialized: view dispatch: - - macro_namespace: dbt_utils - search_order: - - airbyte_utils - - dbt_utils +- macro_namespace: dbt_utils + search_order: + - airbyte_utils + - dbt_utils vars: json_column: _airbyte_data models_to_source: @@ -83,6 +83,10 @@ vars: unnest_alias_ab2: test_normalization._airbyte_raw_unnest_alias unnest_alias_ab3: test_normalization._airbyte_raw_unnest_alias unnest_alias: test_normalization._airbyte_raw_unnest_alias + arrays_ab1: test_normalization._airbyte_raw_arrays + arrays_ab2: test_normalization._airbyte_raw_arrays + arrays_ab3: test_normalization._airbyte_raw_arrays + arrays: test_normalization._airbyte_raw_arrays nested_stream_with_complex_columns_resulting_into_long_names_partition_ab1: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names nested_stream_with_complex_columns_resulting_into_long_names_partition_ab2: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names nested_stream_with_complex_columns_resulting_into_long_names_partition_ab3: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names @@ -95,6 +99,10 @@ vars: unnest_alias_children_ab2: test_normalization._airbyte_raw_unnest_alias unnest_alias_children_ab3: test_normalization._airbyte_raw_unnest_alias unnest_alias_children: test_normalization._airbyte_raw_unnest_alias + arrays_nested_array_parent_ab1: test_normalization._airbyte_raw_arrays + arrays_nested_array_parent_ab2: test_normalization._airbyte_raw_arrays + arrays_nested_array_parent_ab3: test_normalization._airbyte_raw_arrays + arrays_nested_array_parent: test_normalization._airbyte_raw_arrays nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data_ab1: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data_ab2: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data_ab3: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_nested_streams/models/generated/sources.yml b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_nested_streams/models/generated/sources.yml index 92fa4c9a2580e..29bae1b4b5105 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_nested_streams/models/generated/sources.yml +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_nested_streams/models/generated/sources.yml @@ -6,6 +6,7 @@ sources: schema: false identifier: false tables: + - name: _airbyte_raw_arrays - name: _airbyte_raw_conflict_stream_array - name: _airbyte_raw_conflict_stream_name - name: _airbyte_raw_conflict_stream_scalar diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_simple_streams/dbt_project.yml b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_simple_streams/dbt_project.yml index 77cd510537471..013a446b320a5 100755 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_simple_streams/dbt_project.yml +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_simple_streams/dbt_project.yml @@ -1,25 +1,25 @@ name: airbyte_utils -version: "1.0" +version: '1.0' config-version: 2 profile: normalize model-paths: - - modified_models +- modified_models docs-paths: - - docs +- docs analysis-paths: - - analysis +- analysis test-paths: - - tests +- tests seed-paths: - - data +- data macro-paths: - - macros +- macros target-path: ../build log-path: ../logs packages-install-path: /dbt clean-targets: - - build - - dbt_modules +- build +- dbt_modules quoting: database: true schema: false @@ -42,10 +42,10 @@ models: +tags: airbyte_internal_views +materialized: view dispatch: - - macro_namespace: dbt_utils - search_order: - - airbyte_utils - - dbt_utils +- macro_namespace: dbt_utils + search_order: + - airbyte_utils + - dbt_utils vars: json_column: _airbyte_data models_to_source: diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_simple_streams/first_dbt_project.yml b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_simple_streams/first_dbt_project.yml index 200e87ca5ea7c..bd8ede6a646f2 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_simple_streams/first_dbt_project.yml +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_simple_streams/first_dbt_project.yml @@ -1,25 +1,25 @@ name: airbyte_utils -version: "1.0" +version: '1.0' config-version: 2 profile: normalize model-paths: - - models +- models docs-paths: - - docs +- docs analysis-paths: - - analysis +- analysis test-paths: - - tests +- tests seed-paths: - - data +- data macro-paths: - - macros +- macros target-path: ../build log-path: ../logs packages-install-path: /dbt clean-targets: - - build - - dbt_modules +- build +- dbt_modules quoting: database: true schema: false @@ -42,10 +42,10 @@ models: +tags: airbyte_internal_views +materialized: view dispatch: - - macro_namespace: dbt_utils - search_order: - - airbyte_utils - - dbt_utils +- macro_namespace: dbt_utils + search_order: + - airbyte_utils + - dbt_utils vars: json_column: _airbyte_data models_to_source: diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_simple_streams/modified_models/generated/sources.yml b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_simple_streams/modified_models/generated/sources.yml index 79ad1a1bb5c5a..6a5d7bdc09a16 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_simple_streams/modified_models/generated/sources.yml +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_simple_streams/modified_models/generated/sources.yml @@ -1,12 +1,12 @@ version: 2 sources: - - name: test_normalization - quoting: - database: true - schema: false - identifier: false - tables: - - name: _airbyte_raw_dedup_cdc_excluded - - name: _airbyte_raw_dedup_exchange_rate - - name: _airbyte_raw_exchange_rate - - name: _airbyte_raw_renamed_dedup_cdc_excluded +- name: test_normalization + quoting: + database: true + schema: false + identifier: false + tables: + - name: _airbyte_raw_dedup_cdc_excluded + - name: _airbyte_raw_dedup_exchange_rate + - name: _airbyte_raw_exchange_rate + - name: _airbyte_raw_renamed_dedup_cdc_excluded diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/dbt_project.yml b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/dbt_project.yml index fa54af3b1a086..6199d0a669d13 100755 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/dbt_project.yml +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/dbt_project.yml @@ -1,25 +1,25 @@ name: airbyte_utils -version: "1.0" +version: '1.0' config-version: 2 profile: normalize model-paths: - - models +- models docs-paths: - - docs +- docs analysis-paths: - - analysis +- analysis test-paths: - - tests +- tests seed-paths: - - data +- data macro-paths: - - macros +- macros target-path: ../build log-path: ../logs packages-install-path: /dbt clean-targets: - - build - - dbt_modules +- build +- dbt_modules quoting: database: true schema: false @@ -42,10 +42,10 @@ models: +tags: airbyte_internal_views +materialized: view dispatch: - - macro_namespace: dbt_utils - search_order: - - airbyte_utils - - dbt_utils +- macro_namespace: dbt_utils + search_order: + - airbyte_utils + - dbt_utils vars: json_column: _airbyte_data models_to_source: @@ -83,6 +83,10 @@ vars: unnest_alias_ab2: test_normalization._airbyte_raw_unnest_alias unnest_alias_ab3: test_normalization._airbyte_raw_unnest_alias unnest_alias: test_normalization._airbyte_raw_unnest_alias + arrays_ab1: test_normalization._airbyte_raw_arrays + arrays_ab2: test_normalization._airbyte_raw_arrays + arrays_ab3: test_normalization._airbyte_raw_arrays + arrays: test_normalization._airbyte_raw_arrays nested_stream_with_c___long_names_partition_ab1: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names nested_stream_with_c___long_names_partition_ab2: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names nested_stream_with_c___long_names_partition_ab3: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names @@ -95,6 +99,10 @@ vars: unnest_alias_children_ab2: test_normalization._airbyte_raw_unnest_alias unnest_alias_children_ab3: test_normalization._airbyte_raw_unnest_alias unnest_alias_children: test_normalization._airbyte_raw_unnest_alias + arrays_nested_array_parent_ab1: test_normalization._airbyte_raw_arrays + arrays_nested_array_parent_ab2: test_normalization._airbyte_raw_arrays + arrays_nested_array_parent_ab3: test_normalization._airbyte_raw_arrays + arrays_nested_array_parent: test_normalization._airbyte_raw_arrays nested_stream_with_c__ion_double_array_data_ab1: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names nested_stream_with_c__ion_double_array_data_ab2: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names nested_stream_with_c__ion_double_array_data_ab3: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/first_output/airbyte_tables/test_normalization/arrays.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/first_output/airbyte_tables/test_normalization/arrays.sql new file mode 100644 index 0000000000000..e10c4619e53a4 --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/first_output/airbyte_tables/test_normalization/arrays.sql @@ -0,0 +1,58 @@ + + + create table "postgres".test_normalization."arrays__dbt_tmp" + as ( + +with __dbt__cte__arrays_ab1 as ( + +-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema +-- depends_on: "postgres".test_normalization._airbyte_raw_arrays +select + jsonb_extract_path(_airbyte_data, 'array_of_strings') as array_of_strings, + + jsonb_extract_path(table_alias._airbyte_data, 'nested_array_parent') + as nested_array_parent, + _airbyte_ab_id, + _airbyte_emitted_at, + now() as _airbyte_normalized_at +from "postgres".test_normalization._airbyte_raw_arrays as table_alias +-- arrays +where 1 = 1 +), __dbt__cte__arrays_ab2 as ( + +-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type +-- depends_on: __dbt__cte__arrays_ab1 +select + array_of_strings, + cast(nested_array_parent as + jsonb +) as nested_array_parent, + _airbyte_ab_id, + _airbyte_emitted_at, + now() as _airbyte_normalized_at +from __dbt__cte__arrays_ab1 +-- arrays +where 1 = 1 +), __dbt__cte__arrays_ab3 as ( + +-- SQL model to build a hash column based on the values of this record +-- depends_on: __dbt__cte__arrays_ab2 +select + md5(cast(coalesce(cast(array_of_strings as text), '') || '-' || coalesce(cast(nested_array_parent as text), '') as text)) as _airbyte_arrays_hashid, + tmp.* +from __dbt__cte__arrays_ab2 tmp +-- arrays +where 1 = 1 +)-- Final base SQL model +-- depends_on: __dbt__cte__arrays_ab3 +select + array_of_strings, + nested_array_parent, + _airbyte_ab_id, + _airbyte_emitted_at, + now() as _airbyte_normalized_at, + _airbyte_arrays_hashid +from __dbt__cte__arrays_ab3 +-- arrays from "postgres".test_normalization._airbyte_raw_arrays +where 1 = 1 + ); \ No newline at end of file diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/first_output/airbyte_tables/test_normalization/arrays_nested_array_parent.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/first_output/airbyte_tables/test_normalization/arrays_nested_array_parent.sql new file mode 100644 index 0000000000000..09ad8fe3cd3f9 --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/first_output/airbyte_tables/test_normalization/arrays_nested_array_parent.sql @@ -0,0 +1,55 @@ + + + create table "postgres".test_normalization."arrays_nested_array_parent__dbt_tmp" + as ( + +with __dbt__cte__arrays_nested_array_parent_ab1 as ( + +-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema +-- depends_on: "postgres".test_normalization."arrays" +select + _airbyte_arrays_hashid, + jsonb_extract_path(nested_array_parent, 'nested_array') as nested_array, + _airbyte_ab_id, + _airbyte_emitted_at, + now() as _airbyte_normalized_at +from "postgres".test_normalization."arrays" as table_alias +-- nested_array_parent at arrays/nested_array_parent +where 1 = 1 +and nested_array_parent is not null +), __dbt__cte__arrays_nested_array_parent_ab2 as ( + +-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type +-- depends_on: __dbt__cte__arrays_nested_array_parent_ab1 +select + _airbyte_arrays_hashid, + nested_array, + _airbyte_ab_id, + _airbyte_emitted_at, + now() as _airbyte_normalized_at +from __dbt__cte__arrays_nested_array_parent_ab1 +-- nested_array_parent at arrays/nested_array_parent +where 1 = 1 +), __dbt__cte__arrays_nested_array_parent_ab3 as ( + +-- SQL model to build a hash column based on the values of this record +-- depends_on: __dbt__cte__arrays_nested_array_parent_ab2 +select + md5(cast(coalesce(cast(_airbyte_arrays_hashid as text), '') || '-' || coalesce(cast(nested_array as text), '') as text)) as _airbyte_nested_array_parent_hashid, + tmp.* +from __dbt__cte__arrays_nested_array_parent_ab2 tmp +-- nested_array_parent at arrays/nested_array_parent +where 1 = 1 +)-- Final base SQL model +-- depends_on: __dbt__cte__arrays_nested_array_parent_ab3 +select + _airbyte_arrays_hashid, + nested_array, + _airbyte_ab_id, + _airbyte_emitted_at, + now() as _airbyte_normalized_at, + _airbyte_nested_array_parent_hashid +from __dbt__cte__arrays_nested_array_parent_ab3 +-- nested_array_parent at arrays/nested_array_parent from "postgres".test_normalization."arrays" +where 1 = 1 + ); \ No newline at end of file diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_ctes/test_normalization/arrays_ab1.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_ctes/test_normalization/arrays_ab1.sql new file mode 100644 index 0000000000000..6fbf79914b825 --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_ctes/test_normalization/arrays_ab1.sql @@ -0,0 +1,18 @@ +{{ config( + indexes = [{'columns':['_airbyte_emitted_at'],'type':'btree'}], + unique_key = '_airbyte_ab_id', + schema = "_airbyte_test_normalization", + tags = [ "top-level-intermediate" ] +) }} +-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema +-- depends_on: {{ source('test_normalization', '_airbyte_raw_arrays') }} +select + {{ json_extract_string_array('_airbyte_data', ['array_of_strings'], ['array_of_strings']) }} as array_of_strings, + {{ json_extract('table_alias', '_airbyte_data', ['nested_array_parent'], ['nested_array_parent']) }} as nested_array_parent, + _airbyte_ab_id, + _airbyte_emitted_at, + {{ current_timestamp() }} as _airbyte_normalized_at +from {{ source('test_normalization', '_airbyte_raw_arrays') }} as table_alias +-- arrays +where 1 = 1 + diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_ctes/test_normalization/arrays_ab2.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_ctes/test_normalization/arrays_ab2.sql new file mode 100644 index 0000000000000..97010a6648aa3 --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_ctes/test_normalization/arrays_ab2.sql @@ -0,0 +1,18 @@ +{{ config( + indexes = [{'columns':['_airbyte_emitted_at'],'type':'btree'}], + unique_key = '_airbyte_ab_id', + schema = "_airbyte_test_normalization", + tags = [ "top-level-intermediate" ] +) }} +-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type +-- depends_on: {{ ref('arrays_ab1') }} +select + array_of_strings, + cast(nested_array_parent as {{ type_json() }}) as nested_array_parent, + _airbyte_ab_id, + _airbyte_emitted_at, + {{ current_timestamp() }} as _airbyte_normalized_at +from {{ ref('arrays_ab1') }} +-- arrays +where 1 = 1 + diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_ctes/test_normalization/arrays_ab3.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_ctes/test_normalization/arrays_ab3.sql new file mode 100644 index 0000000000000..c3c0afc5de7b3 --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_ctes/test_normalization/arrays_ab3.sql @@ -0,0 +1,18 @@ +{{ config( + indexes = [{'columns':['_airbyte_emitted_at'],'type':'btree'}], + unique_key = '_airbyte_ab_id', + schema = "_airbyte_test_normalization", + tags = [ "top-level-intermediate" ] +) }} +-- SQL model to build a hash column based on the values of this record +-- depends_on: {{ ref('arrays_ab2') }} +select + {{ dbt_utils.surrogate_key([ + array_to_string('array_of_strings'), + object_to_string('nested_array_parent'), + ]) }} as _airbyte_arrays_hashid, + tmp.* +from {{ ref('arrays_ab2') }} tmp +-- arrays +where 1 = 1 + diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_ctes/test_normalization/arrays_nested_array_parent_ab1.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_ctes/test_normalization/arrays_nested_array_parent_ab1.sql new file mode 100644 index 0000000000000..2cbe78b134dc7 --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_ctes/test_normalization/arrays_nested_array_parent_ab1.sql @@ -0,0 +1,18 @@ +{{ config( + indexes = [{'columns':['_airbyte_emitted_at'],'type':'btree'}], + schema = "_airbyte_test_normalization", + tags = [ "nested-intermediate" ] +) }} +-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema +-- depends_on: {{ ref('arrays') }} +select + _airbyte_arrays_hashid, + {{ json_extract_string_array('nested_array_parent', ['nested_array'], ['nested_array']) }} as nested_array, + _airbyte_ab_id, + _airbyte_emitted_at, + {{ current_timestamp() }} as _airbyte_normalized_at +from {{ ref('arrays') }} as table_alias +-- nested_array_parent at arrays/nested_array_parent +where 1 = 1 +and nested_array_parent is not null + diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_ctes/test_normalization/arrays_nested_array_parent_ab2.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_ctes/test_normalization/arrays_nested_array_parent_ab2.sql new file mode 100644 index 0000000000000..0a2dde68d0b85 --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_ctes/test_normalization/arrays_nested_array_parent_ab2.sql @@ -0,0 +1,17 @@ +{{ config( + indexes = [{'columns':['_airbyte_emitted_at'],'type':'btree'}], + schema = "_airbyte_test_normalization", + tags = [ "nested-intermediate" ] +) }} +-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type +-- depends_on: {{ ref('arrays_nested_array_parent_ab1') }} +select + _airbyte_arrays_hashid, + nested_array, + _airbyte_ab_id, + _airbyte_emitted_at, + {{ current_timestamp() }} as _airbyte_normalized_at +from {{ ref('arrays_nested_array_parent_ab1') }} +-- nested_array_parent at arrays/nested_array_parent +where 1 = 1 + diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_ctes/test_normalization/arrays_nested_array_parent_ab3.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_ctes/test_normalization/arrays_nested_array_parent_ab3.sql new file mode 100644 index 0000000000000..c59efa0e9ad20 --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_ctes/test_normalization/arrays_nested_array_parent_ab3.sql @@ -0,0 +1,17 @@ +{{ config( + indexes = [{'columns':['_airbyte_emitted_at'],'type':'btree'}], + schema = "_airbyte_test_normalization", + tags = [ "nested-intermediate" ] +) }} +-- SQL model to build a hash column based on the values of this record +-- depends_on: {{ ref('arrays_nested_array_parent_ab2') }} +select + {{ dbt_utils.surrogate_key([ + '_airbyte_arrays_hashid', + array_to_string('nested_array'), + ]) }} as _airbyte_nested_array_parent_hashid, + tmp.* +from {{ ref('arrays_nested_array_parent_ab2') }} tmp +-- nested_array_parent at arrays/nested_array_parent +where 1 = 1 + diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_tables/test_normalization/arrays.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_tables/test_normalization/arrays.sql new file mode 100644 index 0000000000000..875d028168620 --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_tables/test_normalization/arrays.sql @@ -0,0 +1,19 @@ +{{ config( + indexes = [{'columns':['_airbyte_emitted_at'],'type':'btree'}], + unique_key = '_airbyte_ab_id', + schema = "test_normalization", + tags = [ "top-level" ] +) }} +-- Final base SQL model +-- depends_on: {{ ref('arrays_ab3') }} +select + array_of_strings, + nested_array_parent, + _airbyte_ab_id, + _airbyte_emitted_at, + {{ current_timestamp() }} as _airbyte_normalized_at, + _airbyte_arrays_hashid +from {{ ref('arrays_ab3') }} +-- arrays from {{ source('test_normalization', '_airbyte_raw_arrays') }} +where 1 = 1 + diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_tables/test_normalization/arrays_nested_array_parent.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_tables/test_normalization/arrays_nested_array_parent.sql new file mode 100644 index 0000000000000..73f13e380ac25 --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_tables/test_normalization/arrays_nested_array_parent.sql @@ -0,0 +1,18 @@ +{{ config( + indexes = [{'columns':['_airbyte_emitted_at'],'type':'btree'}], + schema = "test_normalization", + tags = [ "nested" ] +) }} +-- Final base SQL model +-- depends_on: {{ ref('arrays_nested_array_parent_ab3') }} +select + _airbyte_arrays_hashid, + nested_array, + _airbyte_ab_id, + _airbyte_emitted_at, + {{ current_timestamp() }} as _airbyte_normalized_at, + _airbyte_nested_array_parent_hashid +from {{ ref('arrays_nested_array_parent_ab3') }} +-- nested_array_parent at arrays/nested_array_parent from {{ ref('arrays') }} +where 1 = 1 + diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/sources.yml b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/sources.yml index 92fa4c9a2580e..29bae1b4b5105 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/sources.yml +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/sources.yml @@ -6,6 +6,7 @@ sources: schema: false identifier: false tables: + - name: _airbyte_raw_arrays - name: _airbyte_raw_conflict_stream_array - name: _airbyte_raw_conflict_stream_name - name: _airbyte_raw_conflict_stream_scalar diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/second_output/airbyte_tables/test_normalization/arrays.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/second_output/airbyte_tables/test_normalization/arrays.sql new file mode 100644 index 0000000000000..e10c4619e53a4 --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/second_output/airbyte_tables/test_normalization/arrays.sql @@ -0,0 +1,58 @@ + + + create table "postgres".test_normalization."arrays__dbt_tmp" + as ( + +with __dbt__cte__arrays_ab1 as ( + +-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema +-- depends_on: "postgres".test_normalization._airbyte_raw_arrays +select + jsonb_extract_path(_airbyte_data, 'array_of_strings') as array_of_strings, + + jsonb_extract_path(table_alias._airbyte_data, 'nested_array_parent') + as nested_array_parent, + _airbyte_ab_id, + _airbyte_emitted_at, + now() as _airbyte_normalized_at +from "postgres".test_normalization._airbyte_raw_arrays as table_alias +-- arrays +where 1 = 1 +), __dbt__cte__arrays_ab2 as ( + +-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type +-- depends_on: __dbt__cte__arrays_ab1 +select + array_of_strings, + cast(nested_array_parent as + jsonb +) as nested_array_parent, + _airbyte_ab_id, + _airbyte_emitted_at, + now() as _airbyte_normalized_at +from __dbt__cte__arrays_ab1 +-- arrays +where 1 = 1 +), __dbt__cte__arrays_ab3 as ( + +-- SQL model to build a hash column based on the values of this record +-- depends_on: __dbt__cte__arrays_ab2 +select + md5(cast(coalesce(cast(array_of_strings as text), '') || '-' || coalesce(cast(nested_array_parent as text), '') as text)) as _airbyte_arrays_hashid, + tmp.* +from __dbt__cte__arrays_ab2 tmp +-- arrays +where 1 = 1 +)-- Final base SQL model +-- depends_on: __dbt__cte__arrays_ab3 +select + array_of_strings, + nested_array_parent, + _airbyte_ab_id, + _airbyte_emitted_at, + now() as _airbyte_normalized_at, + _airbyte_arrays_hashid +from __dbt__cte__arrays_ab3 +-- arrays from "postgres".test_normalization._airbyte_raw_arrays +where 1 = 1 + ); \ No newline at end of file diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/second_output/airbyte_tables/test_normalization/arrays_nested_array_parent.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/second_output/airbyte_tables/test_normalization/arrays_nested_array_parent.sql new file mode 100644 index 0000000000000..09ad8fe3cd3f9 --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/second_output/airbyte_tables/test_normalization/arrays_nested_array_parent.sql @@ -0,0 +1,55 @@ + + + create table "postgres".test_normalization."arrays_nested_array_parent__dbt_tmp" + as ( + +with __dbt__cte__arrays_nested_array_parent_ab1 as ( + +-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema +-- depends_on: "postgres".test_normalization."arrays" +select + _airbyte_arrays_hashid, + jsonb_extract_path(nested_array_parent, 'nested_array') as nested_array, + _airbyte_ab_id, + _airbyte_emitted_at, + now() as _airbyte_normalized_at +from "postgres".test_normalization."arrays" as table_alias +-- nested_array_parent at arrays/nested_array_parent +where 1 = 1 +and nested_array_parent is not null +), __dbt__cte__arrays_nested_array_parent_ab2 as ( + +-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type +-- depends_on: __dbt__cte__arrays_nested_array_parent_ab1 +select + _airbyte_arrays_hashid, + nested_array, + _airbyte_ab_id, + _airbyte_emitted_at, + now() as _airbyte_normalized_at +from __dbt__cte__arrays_nested_array_parent_ab1 +-- nested_array_parent at arrays/nested_array_parent +where 1 = 1 +), __dbt__cte__arrays_nested_array_parent_ab3 as ( + +-- SQL model to build a hash column based on the values of this record +-- depends_on: __dbt__cte__arrays_nested_array_parent_ab2 +select + md5(cast(coalesce(cast(_airbyte_arrays_hashid as text), '') || '-' || coalesce(cast(nested_array as text), '') as text)) as _airbyte_nested_array_parent_hashid, + tmp.* +from __dbt__cte__arrays_nested_array_parent_ab2 tmp +-- nested_array_parent at arrays/nested_array_parent +where 1 = 1 +)-- Final base SQL model +-- depends_on: __dbt__cte__arrays_nested_array_parent_ab3 +select + _airbyte_arrays_hashid, + nested_array, + _airbyte_ab_id, + _airbyte_emitted_at, + now() as _airbyte_normalized_at, + _airbyte_nested_array_parent_hashid +from __dbt__cte__arrays_nested_array_parent_ab3 +-- nested_array_parent at arrays/nested_array_parent from "postgres".test_normalization."arrays" +where 1 = 1 + ); \ No newline at end of file diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/data_input/catalog.json b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/data_input/catalog.json index cbab9cf3aa20c..4e5105f136e09 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/data_input/catalog.json +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/data_input/catalog.json @@ -284,6 +284,17 @@ "items": { "type": ["null", "string"] } + }, + "nested_array_parent": { + "type": ["null", "object"], + "properties": { + "nested_array": { + "type": ["null", "array"], + "items": { + "type": ["null", "string"] + } + } + } } } }, diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/data_input/messages.txt b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/data_input/messages.txt index ef61a0fa12f1e..e349c09afc31b 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/data_input/messages.txt +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/data_input/messages.txt @@ -15,4 +15,4 @@ {"type":"RECORD","record":{"stream":"unnest_alias","data":{"id":1, "children": [{"ab_id": 1, "owner": {"owner_id": 1, "column`_'with\"_quotes": [ {"currency": "EUR" } ]}},{"ab_id": 2, "owner": {"owner_id": 2, "column`_'with\"_quotes": [ {"currency": "EUR" } ]}}]},"emitted_at":1623861660}} {"type":"RECORD","record":{"stream":"unnest_alias","data":{"id":2, "children": [{"ab_id": 3, "owner": {"owner_id": 3, "column`_'with\"_quotes": [ {"currency": "EUR" } ]}},{"ab_id": 4, "owner": {"owner_id": 4, "column`_'with\"_quotes": [ {"currency": "EUR" } ]}}]},"emitted_at":1623861660}} -{"type":"RECORD","record":{"stream":"arrays","emitted_at":1602638599000,"data":{"array_of_strings":["string1","string2","string3"]}}} +{"type":"RECORD","record":{"stream":"arrays","emitted_at":1602638599000,"data":{"array_of_strings":["string1",null,"string2","string3"],"nested_array_parent":{"nested_array":["string1",null,"string2"]}}}} diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/dbt_test_config/dbt_data_tests_tmp/nested_streams_first_run_row_counts.sql b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/dbt_test_config/dbt_data_tests_tmp/nested_streams_first_run_row_counts.sql index 9dbdee8474503..42c4d3c229846 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/dbt_test_config/dbt_data_tests_tmp/nested_streams_first_run_row_counts.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/dbt_test_config/dbt_data_tests_tmp/nested_streams_first_run_row_counts.sql @@ -19,6 +19,12 @@ union all union all select 'some_stream_that_was_empty' as label, count(*) as row_count, 0 as expected_count from {{ ref('some_stream_that_was_empty') }} +union all + select 'arrays' as label, count(*) as row_count, 1 as expected_count + from {{ ref('arrays') }} +union all + select 'arrays_nested_array_parent' as label, count(*) as row_count, 1 as expected_count + from {{ ref('arrays_nested_array_parent') }} ) select * from table_row_counts diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java index 33d1f0f4c4b7c..a7af7eff48711 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java @@ -14,7 +14,7 @@ public class NormalizationRunnerFactory { public static final String BASE_NORMALIZATION_IMAGE_NAME = "airbyte/normalization"; - public static final String NORMALIZATION_VERSION = "0.2.7"; + public static final String NORMALIZATION_VERSION = "0.2.8"; static final Map> NORMALIZATION_MAPPING = ImmutableMap.>builder() diff --git a/docs/understanding-airbyte/basic-normalization.md b/docs/understanding-airbyte/basic-normalization.md index 42bdc336b54eb..7ead51689783e 100644 --- a/docs/understanding-airbyte/basic-normalization.md +++ b/docs/understanding-airbyte/basic-normalization.md @@ -353,6 +353,7 @@ Therefore, in order to "upgrade" to the desired normalization version, you need | Airbyte Version | Normalization Version | Date | Pull Request | Subject | |:----------------| :--- | :--- | :--- | :--- | +| | 0.2.8 | 2022-07-13 | [\#14522](https://github.com/airbytehq/airbyte/pull/14522) | BigQuery replaces `NULL` array entries with the string value `"NULL"` | | | 0.2.7 | 2022-07-05 | [\#11694](https://github.com/airbytehq/airbyte/pull/11694) | Do not return NULL for MySQL column values > 512 chars | | | 0.2.6 | 2022-06-16 | [\#13894](https://github.com/airbytehq/airbyte/pull/13894) | Fix incorrect jinja2 macro `json_extract_array` call | | | 0.2.5 | 2022-06-15 | [\#11470](https://github.com/airbytehq/airbyte/pull/11470) | Upgrade MySQL to dbt 1.0.0 |