Skip to content

Commit

Permalink
Normalization: solve conflict when stream and field have same name (#4557)
Browse files Browse the repository at this point in the history

* solve conflict when stream and field have same name

* add logic to handle conflict

* change files

* change json_extract functions

* json_operations

* add normalization files

* test integration mysql

* remove table_alias

* mysql run

* json ops

* solve conflict with master

* solve mysql circle dependency dbt

* add tests for scalar and arrays

* add sql files

* bump normalization version

* format
  • Loading branch information
marcosmarxm committed Aug 11, 2021
1 parent 87b8696 commit e4fe62f
Show file tree
Hide file tree
Showing 424 changed files with 3,859 additions and 434 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/bases/base-normalization/Dockerfile
Expand Up @@ -24,5 +24,5 @@ WORKDIR /airbyte
ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh"
ENTRYPOINT ["/airbyte/entrypoint.sh"]

LABEL io.airbyte.version=0.1.38
LABEL io.airbyte.version=0.1.39
LABEL io.airbyte.name=airbyte/normalization
Expand Up @@ -55,32 +55,32 @@

{# json_extract ------------------------------------------------- #}

{% macro json_extract(json_column, json_path_list, normalized_json_path) -%}
{{ adapter.dispatch('json_extract')(json_column, json_path_list, normalized_json_path) }}
{# Dispatch entry point: routes to the adapter-specific json_extract implementation below.
   `from_table` qualifies `json_column` at the call site so a stream and a field that share
   a name (the conflict this change addresses) resolve unambiguously. #}
{% macro json_extract(from_table, json_column, json_path_list, normalized_json_path) -%}
{{ adapter.dispatch('json_extract')(from_table, json_column, json_path_list, normalized_json_path) }}
{%- endmacro %}

{% macro default__json_extract(json_column, json_path_list, normalized_json_path) -%}
json_extract({{ json_column }}, {{ format_json_path(json_path_list) }})
{# Default implementation: qualify the JSON column with its source table so a stream
   and a field sharing a name do not collide. Spacing inside the expression delimiters
   normalized to `{{ from_table }}` for consistency with the other adapter macros. #}
{% macro default__json_extract(from_table, json_column, json_path_list, normalized_json_path) -%}
json_extract({{ from_table }}.{{ json_column }}, {{ format_json_path(json_path_list) }})
{%- endmacro %}

{% macro bigquery__json_extract(json_column, json_path_list, normalized_json_path) -%}
json_extract({{ json_column }}, {{ format_json_path(normalized_json_path) }})
{# BigQuery implementation: uses the normalized JSON path variant.
   Spacing normalized to `{{ from_table }}` for consistency with the other adapter macros. #}
{% macro bigquery__json_extract(from_table, json_column, json_path_list, normalized_json_path) -%}
json_extract({{ from_table }}.{{ json_column }}, {{ format_json_path(normalized_json_path) }})
{%- endmacro %}

{% macro postgres__json_extract(json_column, json_path_list, normalized_json_path) -%}
jsonb_extract_path({{ json_column }}, {{ format_json_path(json_path_list) }})
{# Postgres implementation: jsonb_extract_path walks the path elements passed via
   json_path_list; the column is qualified with from_table to avoid name collisions. #}
{% macro postgres__json_extract(from_table, json_column, json_path_list, normalized_json_path) -%}
jsonb_extract_path({{ from_table }}.{{ json_column }}, {{ format_json_path(json_path_list) }})
{%- endmacro %}

{% macro mysql__json_extract(json_column, json_path_list, normalized_json_path) -%}
json_extract({{ json_column }}, {{ format_json_path(normalized_json_path) }})
{# MySQL implementation: like BigQuery it takes the normalized JSON path variant;
   the column is qualified with from_table to avoid name collisions. #}
{% macro mysql__json_extract(from_table, json_column, json_path_list, normalized_json_path) -%}
json_extract({{ from_table }}.{{ json_column }}, {{ format_json_path(normalized_json_path) }})
{%- endmacro %}

{% macro redshift__json_extract(json_column, json_path_list, normalized_json_path) -%}
case when json_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }}, true) != '' then json_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }}, true) end
{# Redshift implementation. json_extract_path_text returns '' for a missing path;
   nullif(X, '') folds that to NULL exactly like the previous
   `case when X != '' then X end` (NULL input also yields NULL in both forms),
   while evaluating the extraction once instead of twice. #}
{% macro redshift__json_extract(from_table, json_column, json_path_list, normalized_json_path) -%}
nullif(json_extract_path_text({{ from_table }}.{{ json_column }}, {{ format_json_path(json_path_list) }}, true), '')
{%- endmacro %}

{% macro snowflake__json_extract(json_column, json_path_list, normalized_json_path) -%}
get_path(parse_json({{ json_column }}), {{ format_json_path(json_path_list) }})
{# Snowflake implementation: parse_json turns the raw column into a VARIANT,
   then get_path navigates it with the (non-normalized) json_path_list. #}
{% macro snowflake__json_extract(from_table, json_column, json_path_list, normalized_json_path) -%}
get_path(parse_json({{ from_table }}.{{ json_column }}), {{ format_json_path(json_path_list) }})
{%- endmacro %}

{# json_extract_scalar ------------------------------------------------- #}
Expand Down
Expand Up @@ -33,6 +33,7 @@
import subprocess
import sys
import threading
import time
from typing import Any, Dict, List

from normalization.destination_type import DestinationType
Expand Down Expand Up @@ -81,6 +82,8 @@ def setup_postgres_db(self):
]
print("Executing: ", " ".join(commands))
subprocess.call(commands)
time.sleep(120)

if not os.path.exists("../secrets"):
os.makedirs("../secrets")
with open("../secrets/postgres.json", "w") as fh:
Expand Down Expand Up @@ -115,6 +118,7 @@ def setup_mysql_db(self):
]
print("Executing: ", " ".join(commands))
subprocess.call(commands)
time.sleep(120)

if not os.path.exists("../secrets"):
os.makedirs("../secrets")
Expand Down
@@ -0,0 +1,13 @@


create or replace view `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_array_ab1`
OPTIONS()
as
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- All references are qualified with the source alias: the extracted field
-- conflict_stream_array shares its name with the stream, and the original
-- mixed bare/qualified references were inconsistent.
select
    json_extract_scalar(table_alias._airbyte_data, "$['id']") as id,
    json_extract(table_alias._airbyte_data, "$['conflict_stream_array']") as conflict_stream_array,
    table_alias._airbyte_emitted_at
from `dataline-integration-testing`.test_normalization._airbyte_raw_conflict_stream_array as table_alias
-- conflict_stream_array;

@@ -0,0 +1,17 @@


create or replace view `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_array_ab2`
OPTIONS()
as
-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
select
    cast(id as string) as id,
    cast(conflict_stream_array as string) as conflict_stream_array,
    _airbyte_emitted_at
from `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_array_ab1`
-- conflict_stream_array;

@@ -0,0 +1,18 @@


create or replace view `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_array_ab3`
OPTIONS()
as
-- SQL model to build a hash column based on the values of this record
select
    *,
    to_hex(md5(cast(
        concat(
            coalesce(cast(id as string), ''),
            '-',
            coalesce(cast(conflict_stream_array as string), '')
        ) as string
    ))) as _airbyte_conflict_stream_array_hashid
from `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_array_ab2`
-- conflict_stream_array;

@@ -0,0 +1,14 @@


create or replace view `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_array_conflict_stream_array_ab1`
OPTIONS()
as
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- NOTE(review): the parent table and the extracted column are both named
-- conflict_stream_array; the bare reference below resolves to the column of the
-- aliased table — presumably the reason table_alias was introduced; confirm
-- against the json_extract macro change in this commit.
select
_airbyte_conflict_stream_array_hashid,
json_extract_array(conflict_stream_array, "$['conflict_stream_name']") as conflict_stream_name,
_airbyte_emitted_at
from `dataline-integration-testing`.test_normalization.`conflict_stream_array` as table_alias
-- drop parent rows where the nested object is absent
where conflict_stream_array is not null
-- conflict_stream_array at conflict_stream_array/conflict_stream_array;

@@ -0,0 +1,13 @@


create or replace view `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_array_conflict_stream_array_ab2`
OPTIONS()
as
-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
-- (no cast is emitted here: conflict_stream_name is an extracted JSON array, passed through as-is)
select
    src._airbyte_conflict_stream_array_hashid,
    src.conflict_stream_name,
    src._airbyte_emitted_at
from `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_array_conflict_stream_array_ab1` as src
-- conflict_stream_array at conflict_stream_array/conflict_stream_array;

@@ -0,0 +1,18 @@


create or replace view `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_array_conflict_stream_array_ab3`
OPTIONS()
as
-- SQL model to build a hash column based on the values of this record
select
    *,
    to_hex(md5(cast(
        concat(
            coalesce(cast(_airbyte_conflict_stream_array_hashid as string), ''),
            '-',
            coalesce(cast(array_to_string(conflict_stream_name, "|", "") as string), '')
        ) as string
    ))) as _airbyte_conflict_stream_array_2_hashid
from `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_array_conflict_stream_array_ab2`
-- conflict_stream_array at conflict_stream_array/conflict_stream_array;

@@ -0,0 +1,16 @@


create or replace view `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_array_conflict_stream_array_conflict_stream_name_ab1`
OPTIONS()
as
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema

select
_airbyte_conflict_stream_array_2_hashid,
-- conflict_stream_name below names the unnested array element, not the array column
json_extract_scalar(conflict_stream_name, "$['id']") as id,
_airbyte_emitted_at
from `dataline-integration-testing`.test_normalization.`conflict_stream_array_conflict_stream_array` as table_alias
-- fan out one row per element of the conflict_stream_name array
cross join unnest(conflict_stream_name) as conflict_stream_name
-- NOTE(review): the unnest alias shadows/collides with the array column of the
-- same name; verify which one this filter binds to on BigQuery.
where conflict_stream_name is not null
-- conflict_stream_name at conflict_stream_array/conflict_stream_array/conflict_stream_name;

@@ -0,0 +1,15 @@


create or replace view `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_array_conflict_stream_array_conflict_stream_name_ab2`
OPTIONS()
as
-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
select
    _airbyte_conflict_stream_array_2_hashid,
    cast(id as int64) as id,
    _airbyte_emitted_at
from `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_array_conflict_stream_array_conflict_stream_name_ab1`
-- conflict_stream_name at conflict_stream_array/conflict_stream_array/conflict_stream_name;

@@ -0,0 +1,18 @@


create or replace view `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_array_conflict_stream_array_conflict_stream_name_ab3`
OPTIONS()
as
-- SQL model to build a hash column based on the values of this record
select
    *,
    to_hex(md5(cast(
        concat(
            coalesce(cast(_airbyte_conflict_stream_array_2_hashid as string), ''),
            '-',
            coalesce(cast(id as string), '')
        ) as string
    ))) as _airbyte_conflict_stream_name_hashid
from `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_array_conflict_stream_array_conflict_stream_name_ab2`
-- conflict_stream_name at conflict_stream_array/conflict_stream_array/conflict_stream_name;

@@ -0,0 +1,13 @@


create or replace view `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_name_ab1`
OPTIONS()
as
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- All references are qualified with the source alias: the extracted field
-- conflict_stream_name shares its name with the stream, and the original
-- mixed bare/qualified references were inconsistent.
select
    json_extract_scalar(table_alias._airbyte_data, "$['id']") as id,
    json_extract(table_alias._airbyte_data, "$['conflict_stream_name']") as conflict_stream_name,
    table_alias._airbyte_emitted_at
from `dataline-integration-testing`.test_normalization._airbyte_raw_conflict_stream_name as table_alias
-- conflict_stream_name;

@@ -0,0 +1,17 @@


create or replace view `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_name_ab2`
OPTIONS()
as
-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
select
    cast(id as string) as id,
    cast(conflict_stream_name as string) as conflict_stream_name,
    _airbyte_emitted_at
from `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_name_ab1`
-- conflict_stream_name;

@@ -0,0 +1,18 @@


create or replace view `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_name_ab3`
OPTIONS()
as
-- SQL model to build a hash column based on the values of this record
select
    *,
    to_hex(md5(cast(
        concat(
            coalesce(cast(id as string), ''),
            '-',
            coalesce(cast(conflict_stream_name as string), '')
        ) as string
    ))) as _airbyte_conflict_stream_name_hashid
from `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_name_ab2`
-- conflict_stream_name;

@@ -0,0 +1,14 @@


create or replace view `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_name_conflict_stream_name_ab1`
OPTIONS()
as
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- The table, the source column, and the output alias all share the name
-- conflict_stream_name; the extraction qualifies the column with table_alias
-- (the fix introduced by this commit) to keep the reference unambiguous.
select
_airbyte_conflict_stream_name_hashid,
json_extract(table_alias.conflict_stream_name, "$['conflict_stream_name']") as conflict_stream_name,
_airbyte_emitted_at
from `dataline-integration-testing`.test_normalization.`conflict_stream_name` as table_alias
-- NOTE(review): bare conflict_stream_name here — presumably the table column; confirm it is not intended to filter on the extracted alias.
where conflict_stream_name is not null
-- conflict_stream_name at conflict_stream_name/conflict_stream_name;

@@ -0,0 +1,15 @@


create or replace view `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_name_conflict_stream_name_ab2`
OPTIONS()
as
-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
select
    _airbyte_conflict_stream_name_hashid,
    cast(conflict_stream_name as string) as conflict_stream_name,
    _airbyte_emitted_at
from `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_name_conflict_stream_name_ab1`
-- conflict_stream_name at conflict_stream_name/conflict_stream_name;

@@ -0,0 +1,18 @@


create or replace view `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_name_conflict_stream_name_ab3`
OPTIONS()
as
-- SQL model to build a hash column based on the values of this record
select
    *,
    to_hex(md5(cast(
        concat(
            coalesce(cast(_airbyte_conflict_stream_name_hashid as string), ''),
            '-',
            coalesce(cast(conflict_stream_name as string), '')
        ) as string
    ))) as _airbyte_conflict_stream_name_2_hashid
from `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_name_conflict_stream_name_ab2`
-- conflict_stream_name at conflict_stream_name/conflict_stream_name;

@@ -0,0 +1,14 @@


create or replace view `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_name_conflict_stream_name_conflict_stream_name_ab1`
OPTIONS()
as
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
select
_airbyte_conflict_stream_name_2_hashid,
-- `groups` is backtick-quoted because it collides with a BigQuery reserved keyword
json_extract_scalar(conflict_stream_name, "$['groups']") as `groups`,
_airbyte_emitted_at
from `dataline-integration-testing`.test_normalization.`conflict_stream_name_conflict_stream_name` as table_alias
-- drop parent rows where the nested object is absent
where conflict_stream_name is not null
-- conflict_stream_name at conflict_stream_name/conflict_stream_name/conflict_stream_name;

@@ -0,0 +1,15 @@


create or replace view `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_name_conflict_stream_name_conflict_stream_name_ab2`
OPTIONS()
as
-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
select
    _airbyte_conflict_stream_name_2_hashid,
    cast(`groups` as string) as `groups`,
    _airbyte_emitted_at
from `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_name_conflict_stream_name_conflict_stream_name_ab1`
-- conflict_stream_name at conflict_stream_name/conflict_stream_name/conflict_stream_name;

@@ -0,0 +1,18 @@


create or replace view `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_name_conflict_stream_name_conflict_stream_name_ab3`
OPTIONS()
as
-- SQL model to build a hash column based on the values of this record
select
    *,
    to_hex(md5(cast(
        concat(
            coalesce(cast(_airbyte_conflict_stream_name_2_hashid as string), ''),
            '-',
            coalesce(cast(`groups` as string), '')
        ) as string
    ))) as _airbyte_conflict_stream_name_3_hashid
from `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_name_conflict_stream_name_conflict_stream_name_ab2`
-- conflict_stream_name at conflict_stream_name/conflict_stream_name/conflict_stream_name;

@@ -0,0 +1,13 @@


create or replace view `dataline-integration-testing`._airbyte_test_normalization.`conflict_stream_scalar_ab1`
OPTIONS()
as
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- Qualify with the declared alias (previously declared but unused) for
-- consistency with the sibling conflict_stream_* models.
select
    json_extract_scalar(table_alias._airbyte_data, "$['id']") as id,
    json_extract_scalar(table_alias._airbyte_data, "$['conflict_stream_scalar']") as conflict_stream_scalar,
    table_alias._airbyte_emitted_at
from `dataline-integration-testing`.test_normalization._airbyte_raw_conflict_stream_scalar as table_alias
-- conflict_stream_scalar;

0 comments on commit e4fe62f

Please sign in to comment.