Skip to content

Commit

Permalink
馃悰 Destination BIgQuery can handle nulls inside arrays (#14522)
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Jul 14, 2022
1 parent 6a5923b commit 89e78a6
Show file tree
Hide file tree
Showing 26 changed files with 500 additions and 67 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/bases/base-normalization/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ WORKDIR /airbyte
ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh"
ENTRYPOINT ["/airbyte/entrypoint.sh"]

LABEL io.airbyte.version=0.2.7
LABEL io.airbyte.version=0.2.8
LABEL io.airbyte.name=airbyte/normalization
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@
{{ '\'$."' ~ json_path_list|join('."') ~ '"\'' }}
{%- endmacro %}

{#
BigQuery has different JSONPath syntax depending on which function you call.
Most of our macros use the "legacy" JSON functions, so this function uses
the legacy syntax.

These paths look like: "$['foo']['bar']"
#}
{% macro bigquery__format_json_path(json_path_list) -%}
{%- set str_list = [] -%}
{%- for json_path in json_path_list -%}
Expand All @@ -29,6 +36,20 @@
{{ '"$[\'' ~ str_list|join('\'][\'') ~ '\']"' }}
{%- endmacro %}

{#
For macros which use the newer JSON functions, define a new_format_json_path
macro which generates the correct path syntax.

These paths look like: '$."foo"."bar"'
#}
{% macro bigquery_new_format_json_path(json_path_list) -%}
{%- set str_list = [] -%}
{%- for json_path in json_path_list -%}
{%- if str_list.append(json_path.replace('\'', '\\\'')) -%} {%- endif -%}
{%- endfor -%}
{{ '\'$."' ~ str_list|join('"."') ~ '"\'' }}
{%- endmacro %}

{% macro postgres__format_json_path(json_path_list) -%}
{%- set str_list = [] -%}
{%- for json_path in json_path_list -%}
Expand Down Expand Up @@ -247,7 +268,14 @@
{{ json_extract_array(json_column, json_path_list, normalized_json_path) }}
{%- endmacro %}

# https://cloud.google.com/bigquery/docs/reference/standard-sql/json_functions#json_extract_string_array
{#
See https://cloud.google.com/bigquery/docs/reference/standard-sql/json_functions#json_extract_string_array

BigQuery does not allow NULL entries in REPEATED fields, so we replace those with literal "NULL" strings.
#}
{% macro bigquery__json_extract_string_array(json_column, json_path_list, normalized_json_path) -%}
json_extract_string_array({{ json_column }}, {{ format_json_path(normalized_json_path) }})
array(
select ifnull(x, "NULL")
from unnest(json_value_array({{ json_column }}, {{ bigquery_new_format_json_path(normalized_json_path) }})) as x
)
{%- endmacro %}
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
name: airbyte_utils
version: "1.0"
version: '1.0'
config-version: 2
profile: normalize
model-paths:
- models
- models
docs-paths:
- docs
- docs
analysis-paths:
- analysis
- analysis
test-paths:
- tests
- tests
seed-paths:
- data
- data
macro-paths:
- macros
- macros
target-path: ../build
log-path: ../logs
packages-install-path: /dbt
clean-targets:
- build
- dbt_modules
- build
- dbt_modules
quoting:
database: true
schema: false
Expand All @@ -42,10 +42,10 @@ models:
+tags: airbyte_internal_views
+materialized: view
dispatch:
- macro_namespace: dbt_utils
search_order:
- airbyte_utils
- dbt_utils
- macro_namespace: dbt_utils
search_order:
- airbyte_utils
- dbt_utils
vars:
json_column: _airbyte_data
models_to_source:
Expand Down Expand Up @@ -83,6 +83,10 @@ vars:
unnest_alias_ab2: test_normalization._airbyte_raw_unnest_alias
unnest_alias_ab3: test_normalization._airbyte_raw_unnest_alias
unnest_alias: test_normalization._airbyte_raw_unnest_alias
arrays_ab1: test_normalization._airbyte_raw_arrays
arrays_ab2: test_normalization._airbyte_raw_arrays
arrays_ab3: test_normalization._airbyte_raw_arrays
arrays: test_normalization._airbyte_raw_arrays
nested_stream_with_complex_columns_resulting_into_long_names_partition_ab1: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_partition_ab2: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_partition_ab3: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
Expand All @@ -95,6 +99,10 @@ vars:
unnest_alias_children_ab2: test_normalization._airbyte_raw_unnest_alias
unnest_alias_children_ab3: test_normalization._airbyte_raw_unnest_alias
unnest_alias_children: test_normalization._airbyte_raw_unnest_alias
arrays_nested_array_parent_ab1: test_normalization._airbyte_raw_arrays
arrays_nested_array_parent_ab2: test_normalization._airbyte_raw_arrays
arrays_nested_array_parent_ab3: test_normalization._airbyte_raw_arrays
arrays_nested_array_parent: test_normalization._airbyte_raw_arrays
nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data_ab1: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data_ab2: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data_ab3: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ sources:
schema: false
identifier: false
tables:
- name: _airbyte_raw_arrays
- name: _airbyte_raw_conflict_stream_array
- name: _airbyte_raw_conflict_stream_name
- name: _airbyte_raw_conflict_stream_scalar
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
name: airbyte_utils
version: "1.0"
version: '1.0'
config-version: 2
profile: normalize
model-paths:
- modified_models
- modified_models
docs-paths:
- docs
- docs
analysis-paths:
- analysis
- analysis
test-paths:
- tests
- tests
seed-paths:
- data
- data
macro-paths:
- macros
- macros
target-path: ../build
log-path: ../logs
packages-install-path: /dbt
clean-targets:
- build
- dbt_modules
- build
- dbt_modules
quoting:
database: true
schema: false
Expand All @@ -42,10 +42,10 @@ models:
+tags: airbyte_internal_views
+materialized: view
dispatch:
- macro_namespace: dbt_utils
search_order:
- airbyte_utils
- dbt_utils
- macro_namespace: dbt_utils
search_order:
- airbyte_utils
- dbt_utils
vars:
json_column: _airbyte_data
models_to_source:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
name: airbyte_utils
version: "1.0"
version: '1.0'
config-version: 2
profile: normalize
model-paths:
- models
- models
docs-paths:
- docs
- docs
analysis-paths:
- analysis
- analysis
test-paths:
- tests
- tests
seed-paths:
- data
- data
macro-paths:
- macros
- macros
target-path: ../build
log-path: ../logs
packages-install-path: /dbt
clean-targets:
- build
- dbt_modules
- build
- dbt_modules
quoting:
database: true
schema: false
Expand All @@ -42,10 +42,10 @@ models:
+tags: airbyte_internal_views
+materialized: view
dispatch:
- macro_namespace: dbt_utils
search_order:
- airbyte_utils
- dbt_utils
- macro_namespace: dbt_utils
search_order:
- airbyte_utils
- dbt_utils
vars:
json_column: _airbyte_data
models_to_source:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
version: 2
sources:
- name: test_normalization
quoting:
database: true
schema: false
identifier: false
tables:
- name: _airbyte_raw_dedup_cdc_excluded
- name: _airbyte_raw_dedup_exchange_rate
- name: _airbyte_raw_exchange_rate
- name: _airbyte_raw_renamed_dedup_cdc_excluded
- name: test_normalization
quoting:
database: true
schema: false
identifier: false
tables:
- name: _airbyte_raw_dedup_cdc_excluded
- name: _airbyte_raw_dedup_exchange_rate
- name: _airbyte_raw_exchange_rate
- name: _airbyte_raw_renamed_dedup_cdc_excluded
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
name: airbyte_utils
version: "1.0"
version: '1.0'
config-version: 2
profile: normalize
model-paths:
- models
- models
docs-paths:
- docs
- docs
analysis-paths:
- analysis
- analysis
test-paths:
- tests
- tests
seed-paths:
- data
- data
macro-paths:
- macros
- macros
target-path: ../build
log-path: ../logs
packages-install-path: /dbt
clean-targets:
- build
- dbt_modules
- build
- dbt_modules
quoting:
database: true
schema: false
Expand All @@ -42,10 +42,10 @@ models:
+tags: airbyte_internal_views
+materialized: view
dispatch:
- macro_namespace: dbt_utils
search_order:
- airbyte_utils
- dbt_utils
- macro_namespace: dbt_utils
search_order:
- airbyte_utils
- dbt_utils
vars:
json_column: _airbyte_data
models_to_source:
Expand Down Expand Up @@ -83,6 +83,10 @@ vars:
unnest_alias_ab2: test_normalization._airbyte_raw_unnest_alias
unnest_alias_ab3: test_normalization._airbyte_raw_unnest_alias
unnest_alias: test_normalization._airbyte_raw_unnest_alias
arrays_ab1: test_normalization._airbyte_raw_arrays
arrays_ab2: test_normalization._airbyte_raw_arrays
arrays_ab3: test_normalization._airbyte_raw_arrays
arrays: test_normalization._airbyte_raw_arrays
nested_stream_with_c___long_names_partition_ab1: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_c___long_names_partition_ab2: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_c___long_names_partition_ab3: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
Expand All @@ -95,6 +99,10 @@ vars:
unnest_alias_children_ab2: test_normalization._airbyte_raw_unnest_alias
unnest_alias_children_ab3: test_normalization._airbyte_raw_unnest_alias
unnest_alias_children: test_normalization._airbyte_raw_unnest_alias
arrays_nested_array_parent_ab1: test_normalization._airbyte_raw_arrays
arrays_nested_array_parent_ab2: test_normalization._airbyte_raw_arrays
arrays_nested_array_parent_ab3: test_normalization._airbyte_raw_arrays
arrays_nested_array_parent: test_normalization._airbyte_raw_arrays
nested_stream_with_c__ion_double_array_data_ab1: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_c__ion_double_array_data_ab2: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_c__ion_double_array_data_ab3: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@


create table "postgres".test_normalization."arrays__dbt_tmp"
as (

with __dbt__cte__arrays_ab1 as (

-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: "postgres".test_normalization._airbyte_raw_arrays
select
jsonb_extract_path(_airbyte_data, 'array_of_strings') as array_of_strings,

jsonb_extract_path(table_alias._airbyte_data, 'nested_array_parent')
as nested_array_parent,
_airbyte_ab_id,
_airbyte_emitted_at,
now() as _airbyte_normalized_at
from "postgres".test_normalization._airbyte_raw_arrays as table_alias
-- arrays
where 1 = 1
), __dbt__cte__arrays_ab2 as (

-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
-- depends_on: __dbt__cte__arrays_ab1
select
array_of_strings,
cast(nested_array_parent as
jsonb
) as nested_array_parent,
_airbyte_ab_id,
_airbyte_emitted_at,
now() as _airbyte_normalized_at
from __dbt__cte__arrays_ab1
-- arrays
where 1 = 1
), __dbt__cte__arrays_ab3 as (

-- SQL model to build a hash column based on the values of this record
-- depends_on: __dbt__cte__arrays_ab2
select
md5(cast(coalesce(cast(array_of_strings as text), '') || '-' || coalesce(cast(nested_array_parent as text), '') as text)) as _airbyte_arrays_hashid,
tmp.*
from __dbt__cte__arrays_ab2 tmp
-- arrays
where 1 = 1
)-- Final base SQL model
-- depends_on: __dbt__cte__arrays_ab3
select
array_of_strings,
nested_array_parent,
_airbyte_ab_id,
_airbyte_emitted_at,
now() as _airbyte_normalized_at,
_airbyte_arrays_hashid
from __dbt__cte__arrays_ab3
-- arrays from "postgres".test_normalization._airbyte_raw_arrays
where 1 = 1
);
Loading

0 comments on commit 89e78a6

Please sign in to comment.