Normalization: solve conflict when stream and field have same name #4557

Merged (21 commits) on Aug 11, 2021
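For context, the conflict this PR addresses appears when a stream and one of its fields (possibly nested several levels deep) share the same name, so the unnested child models and the parent columns would otherwise collide. The record below is purely illustrative, mirroring the conflict_stream_name test stream visible in the generated models further down:

# Illustrative record, not taken from the PR's test fixtures.
# The stream is named "conflict_stream_name" and also contains a field
# "conflict_stream_name", which itself nests another "conflict_stream_name"
# holding a "groups" value.
record = {
    "id": "1",
    "conflict_stream_name": {
        "conflict_stream_name": {
            "groups": "group-a",
        },
    },
}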
2 changes: 1 addition & 1 deletion airbyte-integrations/bases/base-normalization/Dockerfile
@@ -24,5 +24,5 @@ WORKDIR /airbyte
ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh"
ENTRYPOINT ["/airbyte/entrypoint.sh"]

- LABEL io.airbyte.version=0.1.38
+ LABEL io.airbyte.version=0.1.39
LABEL io.airbyte.name=airbyte/normalization
@@ -55,32 +55,32 @@

{# json_extract ------------------------------------------------- #}

- {% macro json_extract(json_column, json_path_list, normalized_json_path) -%}
- {{ adapter.dispatch('json_extract')(json_column, json_path_list, normalized_json_path) }}
+ {% macro json_extract(from_table, json_column, json_path_list, normalized_json_path) -%}
+ {{ adapter.dispatch('json_extract')(from_table, json_column, json_path_list, normalized_json_path) }}
{%- endmacro %}

- {% macro default__json_extract(json_column, json_path_list, normalized_json_path) -%}
- json_extract({{ json_column }}, {{ format_json_path(json_path_list) }})
+ {% macro default__json_extract(from_table, json_column, json_path_list, normalized_json_path) -%}
+ json_extract({{ from_table}}.{{ json_column }}, {{ format_json_path(json_path_list) }})
{%- endmacro %}

- {% macro bigquery__json_extract(json_column, json_path_list, normalized_json_path) -%}
- json_extract({{ json_column }}, {{ format_json_path(normalized_json_path) }})
+ {% macro bigquery__json_extract(from_table, json_column, json_path_list, normalized_json_path) -%}
+ json_extract({{ from_table}}.{{ json_column }}, {{ format_json_path(normalized_json_path) }})
{%- endmacro %}

- {% macro postgres__json_extract(json_column, json_path_list, normalized_json_path) -%}
- jsonb_extract_path({{ json_column }}, {{ format_json_path(json_path_list) }})
+ {% macro postgres__json_extract(from_table, json_column, json_path_list, normalized_json_path) -%}
+ jsonb_extract_path({{ from_table }}.{{ json_column }}, {{ format_json_path(json_path_list) }})
{%- endmacro %}

- {% macro mysql__json_extract(json_column, json_path_list, normalized_json_path) -%}
- json_extract({{ json_column }}, {{ format_json_path(normalized_json_path) }})
+ {% macro mysql__json_extract(from_table, json_column, json_path_list, normalized_json_path) -%}
+ json_extract({{ from_table }}.{{ json_column }}, {{ format_json_path(normalized_json_path) }})
{%- endmacro %}

- {% macro redshift__json_extract(json_column, json_path_list, normalized_json_path) -%}
- case when json_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }}, true) != '' then json_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }}, true) end
+ {% macro redshift__json_extract(from_table, json_column, json_path_list, normalized_json_path) -%}
+ case when json_extract_path_text({{ from_table }}.{{ json_column }}, {{ format_json_path(json_path_list) }}, true) != '' then json_extract_path_text({{ from_table }}.{{ json_column }}, {{ format_json_path(json_path_list) }}, true) end
{%- endmacro %}

- {% macro snowflake__json_extract(json_column, json_path_list, normalized_json_path) -%}
- get_path(parse_json({{ json_column }}), {{ format_json_path(json_path_list) }})
+ {% macro snowflake__json_extract(from_table, json_column, json_path_list, normalized_json_path) -%}
+ get_path(parse_json({{ from_table }}.{{ json_column }}), {{ format_json_path(json_path_list) }})
{%- endmacro %}

{# json_extract_scalar ------------------------------------------------- #}
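To see what threading the new from_table argument through json_extract changes in the emitted SQL, here is a small standalone sketch (not part of the PR) that renders a Jinja template shaped like default__json_extract. The table alias, column name, and JSON path are illustrative, and the format_json_path helper is skipped for brevity:

# Standalone sketch, not part of the PR: render a template shaped like
# default__json_extract to show the table-qualified column reference.
from jinja2 import Template

template = Template(
    "json_extract({{ from_table }}.{{ json_column }}, {{ json_path }})"
)

print(template.render(
    from_table="table_alias",                   # illustrative alias
    json_column="_airbyte_data",
    json_path="\"$['conflict_stream_name']\"",  # format_json_path omitted here
))
# -> json_extract(table_alias._airbyte_data, "$['conflict_stream_name']")

The generated BigQuery models below show the same qualified pattern in the real output, e.g. json_extract(table_alias._airbyte_data, "$['conflict_stream_name']").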
@@ -33,6 +33,7 @@
import subprocess
import sys
import threading
+ import time
from typing import Any, Dict, List

from normalization.destination_type import DestinationType
@@ -81,6 +82,8 @@ def setup_postgres_db(self):
]
print("Executing: ", " ".join(commands))
subprocess.call(commands)
+ time.sleep(120)

if not os.path.exists("../secrets"):
os.makedirs("../secrets")
with open("../secrets/postgres.json", "w") as fh:
@@ -115,6 +118,7 @@ def setup_mysql_db(self):
]
print("Executing: ", " ".join(commands))
subprocess.call(commands)
+ time.sleep(120)
Contributor: This should not be here; it makes the test wait for two minutes before doing anything.

Member Author: Sorry Chris! I added this because in my Ubuntu setup, MySQL and Postgres took longer to start. I created this: #6091


if not os.path.exists("../secrets"):
os.makedirs("../secrets")
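Regarding the review thread above about the fixed 120-second sleep (and the follow-up issue #6091): a minimal, standard-library-only sketch of a readiness poll that could replace it. wait_for_port is a hypothetical helper, not part of the Airbyte test harness, and a successful TCP connect is only a rough readiness signal:

# Sketch only: poll until the container's port accepts TCP connections
# instead of sleeping a fixed 120 seconds. Hypothetical helper, not part
# of this PR or the Airbyte codebase.
import socket
import time

def wait_for_port(host: str, port: int, timeout: float = 120.0) -> None:
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            with socket.create_connection((host, port), timeout=2):
                return  # the server is at least accepting connections
        except OSError:
            time.sleep(2)  # not up yet, retry shortly
    raise TimeoutError(f"{host}:{port} not reachable within {timeout}s")

# e.g. wait_for_port("localhost", 5432) after starting the postgres container,
# or wait_for_port("localhost", 3306) for mysql.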
@@ -0,0 +1,13 @@


create or replace view `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_array_ab1`
OPTIONS()
as
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
select
json_extract_scalar(_airbyte_data, "$['id']") as id,
json_extract(table_alias._airbyte_data, "$['conflict_stream_array']") as conflict_stream_array,
_airbyte_emitted_at
from `dataline-integration-testing`.test_normalization._airbyte_raw_conflict_stream_array as table_alias
-- conflict_stream_array;

@@ -0,0 +1,17 @@


create or replace view `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_array_ab2`
OPTIONS()
as
-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
select
cast(id as
string
) as id,
cast(conflict_stream_array as
string
) as conflict_stream_array,
_airbyte_emitted_at
from `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_array_ab1`
-- conflict_stream_array;

@@ -0,0 +1,18 @@


create or replace view `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_array_ab3`
OPTIONS()
as
-- SQL model to build a hash column based on the values of this record
select
*,
to_hex(md5(cast(concat(coalesce(cast(id as
string
), ''), '-', coalesce(cast(conflict_stream_array as
string
), '')) as
string
))) as _airbyte_conflict_stream_array_hashid
from `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_array_ab2`
-- conflict_stream_array;

@@ -0,0 +1,14 @@


create or replace view `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_array_conflict_stream_array_ab1`
OPTIONS()
as
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
select
_airbyte_conflict_stream_array_hashid,
json_extract_array(conflict_stream_array, "$['conflict_stream_name']") as conflict_stream_name,
_airbyte_emitted_at
from `dataline-integration-testing`.test_normalization.`conflict_stream_array` as table_alias
where conflict_stream_array is not null
-- conflict_stream_array at conflict_stream_array/conflict_stream_array;

@@ -0,0 +1,13 @@


create or replace view `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_array_conflict_stream_array_ab2`
OPTIONS()
as
-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
select
_airbyte_conflict_stream_array_hashid,
conflict_stream_name,
_airbyte_emitted_at
from `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_array_conflict_stream_array_ab1`
-- conflict_stream_array at conflict_stream_array/conflict_stream_array;

@@ -0,0 +1,18 @@


create or replace view `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_array_conflict_stream_array_ab3`
OPTIONS()
as
-- SQL model to build a hash column based on the values of this record
select
*,
to_hex(md5(cast(concat(coalesce(cast(_airbyte_conflict_stream_array_hashid as
string
), ''), '-', coalesce(cast(array_to_string(conflict_stream_name, "|", "") as
string
), '')) as
string
))) as _airbyte_conflict_stream_array_2_hashid
from `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_array_conflict_stream_array_ab2`
-- conflict_stream_array at conflict_stream_array/conflict_stream_array;

@@ -0,0 +1,16 @@


create or replace view `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_array_conflict_stream_array_conflict_stream_name_ab1`
OPTIONS()
as
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema

select
_airbyte_conflict_stream_array_2_hashid,
json_extract_scalar(conflict_stream_name, "$['id']") as id,
_airbyte_emitted_at
from `dataline-integration-testing`.test_normalization.`conflict_stream_array_conflict_stream_array` as table_alias
cross join unnest(conflict_stream_name) as conflict_stream_name
where conflict_stream_name is not null
-- conflict_stream_name at conflict_stream_array/conflict_stream_array/conflict_stream_name;

@@ -0,0 +1,15 @@


create or replace view `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_array_conflict_stream_array_conflict_stream_name_ab2`
OPTIONS()
as
-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
select
_airbyte_conflict_stream_array_2_hashid,
cast(id as
int64
) as id,
_airbyte_emitted_at
from `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_array_conflict_stream_array_conflict_stream_name_ab1`
-- conflict_stream_name at conflict_stream_array/conflict_stream_array/conflict_stream_name;

@@ -0,0 +1,18 @@


create or replace view `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_array_conflict_stream_array_conflict_stream_name_ab3`
OPTIONS()
as
-- SQL model to build a hash column based on the values of this record
select
*,
to_hex(md5(cast(concat(coalesce(cast(_airbyte_conflict_stream_array_2_hashid as
string
), ''), '-', coalesce(cast(id as
string
), '')) as
string
))) as _airbyte_conflict_stream_name_hashid
from `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_array_conflict_stream_array_conflict_stream_name_ab2`
-- conflict_stream_name at conflict_stream_array/conflict_stream_array/conflict_stream_name;

@@ -0,0 +1,13 @@


create or replace view `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_name_ab1`
OPTIONS()
as
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
select
json_extract_scalar(_airbyte_data, "$['id']") as id,
json_extract(table_alias._airbyte_data, "$['conflict_stream_name']") as conflict_stream_name,
_airbyte_emitted_at
from `dataline-integration-testing`.test_normalization._airbyte_raw_conflict_stream_name as table_alias
-- conflict_stream_name;

@@ -0,0 +1,17 @@


create or replace view `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_name_ab2`
OPTIONS()
as
-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
select
cast(id as
string
) as id,
cast(conflict_stream_name as
string
) as conflict_stream_name,
_airbyte_emitted_at
from `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_name_ab1`
-- conflict_stream_name;

@@ -0,0 +1,18 @@


create or replace view `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_name_ab3`
OPTIONS()
as
-- SQL model to build a hash column based on the values of this record
select
*,
to_hex(md5(cast(concat(coalesce(cast(id as
string
), ''), '-', coalesce(cast(conflict_stream_name as
string
), '')) as
string
))) as _airbyte_conflict_stream_name_hashid
from `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_name_ab2`
-- conflict_stream_name;

@@ -0,0 +1,14 @@


create or replace view `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_name_conflict_stream_name_ab1`
OPTIONS()
as
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
select
_airbyte_conflict_stream_name_hashid,
json_extract(table_alias.conflict_stream_name, "$['conflict_stream_name']") as conflict_stream_name,
_airbyte_emitted_at
from `dataline-integration-testing`.test_normalization.`conflict_stream_name` as table_alias
where conflict_stream_name is not null
-- conflict_stream_name at conflict_stream_name/conflict_stream_name;

@@ -0,0 +1,15 @@


create or replace view `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_name_conflict_stream_name_ab2`
OPTIONS()
as
-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
select
_airbyte_conflict_stream_name_hashid,
cast(conflict_stream_name as
string
) as conflict_stream_name,
_airbyte_emitted_at
from `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_name_conflict_stream_name_ab1`
-- conflict_stream_name at conflict_stream_name/conflict_stream_name;

@@ -0,0 +1,18 @@


create or replace view `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_name_conflict_stream_name_ab3`
OPTIONS()
as
-- SQL model to build a hash column based on the values of this record
select
*,
to_hex(md5(cast(concat(coalesce(cast(_airbyte_conflict_stream_name_hashid as
string
), ''), '-', coalesce(cast(conflict_stream_name as
string
), '')) as
string
))) as _airbyte_conflict_stream_name_2_hashid
from `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_name_conflict_stream_name_ab2`
-- conflict_stream_name at conflict_stream_name/conflict_stream_name;

@@ -0,0 +1,14 @@


create or replace view `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_name_conflict_stream_name_conflict_stream_name_ab1`
OPTIONS()
as
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
select
_airbyte_conflict_stream_name_2_hashid,
json_extract_scalar(conflict_stream_name, "$['groups']") as `groups`,
_airbyte_emitted_at
from `dataline-integration-testing`.test_normalization.`conflict_stream_name_conflict_stream_name` as table_alias
where conflict_stream_name is not null
-- conflict_stream_name at conflict_stream_name/conflict_stream_name/conflict_stream_name;

@@ -0,0 +1,15 @@


create or replace view `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_name_conflict_stream_name_conflict_stream_name_ab2`
OPTIONS()
as
-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
select
_airbyte_conflict_stream_name_2_hashid,
cast(`groups` as
string
) as `groups`,
_airbyte_emitted_at
from `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_name_conflict_stream_name_conflict_stream_name_ab1`
-- conflict_stream_name at conflict_stream_name/conflict_stream_name/conflict_stream_name;

@@ -0,0 +1,18 @@


create or replace view `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_name_conflict_stream_name_conflict_stream_name_ab3`
OPTIONS()
as
-- SQL model to build a hash column based on the values of this record
select
*,
to_hex(md5(cast(concat(coalesce(cast(_airbyte_conflict_stream_name_2_hashid as
string
), ''), '-', coalesce(cast(`groups` as
string
), '')) as
string
))) as _airbyte_conflict_stream_name_3_hashid
from `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_name_conflict_stream_name_conflict_stream_name_ab2`
-- conflict_stream_name at conflict_stream_name/conflict_stream_name/conflict_stream_name;

@@ -0,0 +1,13 @@


create or replace view `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_scalar_ab1`
OPTIONS()
as
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
select
json_extract_scalar(_airbyte_data, "$['id']") as id,
json_extract_scalar(_airbyte_data, "$['conflict_stream_scalar']") as conflict_stream_scalar,
_airbyte_emitted_at
from `dataline-integration-testing`.test_normalization._airbyte_raw_conflict_stream_scalar as table_alias
-- conflict_stream_scalar;
