🎉 MySQL destination: normalization (#4163)
* Add mysql dbt package

* Add mysql normalization support in java

* Add mysql normalization support in python

* Fix unit tests

* Update readme

* Setup mysql container in integration test

* Add macros

* Depend on dbt-mysql from git repo

* Remove mysql limitation test

* Test normalization

* Revert protocol format change

* Fix mysql json macros

* Fix two more macros

* Fix table name length

* Fix array macro

* Fix equality test macro

* Update replace-identifiers

* Add more identifiers to replace

* Fix unnest macro

* Fix equality macro

* Check in mysql test output

* Update column limit test for mysql

* Escape parentheses

* Remove unnecessary mysql test

* Remove mysql output for easier code review

* Remove unnecessary mysql test

* Remove parentheses

* Update dependencies

* Skip mysql instead of manually writing out types

* Bump version

* Check in unit test for mysql name transformer

* Fix type conversion

* Use json_value to extract scalar json fields

* Move dbt-mysql to Dockerfile (#4459)

* Format code

* Check in mysql dbt output

* Remove unnecessary quote

* Update mysql equality test to match 0.19.0

* Check in schema_test update

* Update readme

* Bump base normalization version

* Update documentation

Co-authored-by: Christophe Duong <christophe.duong@gmail.com>
tuliren and ChristopheDuong committed Jul 4, 2021
1 parent 9531c4d commit 2caf390
Showing 124 changed files with 2,880 additions and 76 deletions.
5 changes: 3 additions & 2 deletions airbyte-integrations/bases/base-normalization/Dockerfile
@@ -1,4 +1,4 @@
-FROM fishtownanalytics/dbt:0.19.1
+FROM fishtownanalytics/dbt:0.19.0
COPY --from=airbyte/base-airbyte-protocol-python:0.1.1 /airbyte /airbyte

WORKDIR /airbyte
@@ -14,6 +14,7 @@ RUN pip install .

WORKDIR /airbyte/normalization_code
RUN pip install .
RUN pip install git+https://github.com/dbeatty10/dbt-mysql@96655ea9f7fca7be90c9112ce8ffbb5aac1d3716#egg=dbt-mysql

WORKDIR /airbyte/normalization_code/dbt-template/
# Download external dbt dependencies
@@ -23,5 +24,5 @@ WORKDIR /airbyte
ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh"
ENTRYPOINT ["/airbyte/entrypoint.sh"]

-LABEL io.airbyte.version=0.1.34
+LABEL io.airbyte.version=0.1.35
LABEL io.airbyte.name=airbyte/normalization
10 changes: 10 additions & 0 deletions airbyte-integrations/bases/base-normalization/README.md
@@ -54,6 +54,7 @@ allowed characters, if quotes are needed or not, and the length limitations:
- [postgres](../../../docs/integrations/destinations/postgres.md)
- [redshift](../../../docs/integrations/destinations/redshift.md)
- [snowflake](../../../docs/integrations/destinations/snowflake.md)
- [mysql](../../../docs/integrations/destinations/mysql.md)

Rules about truncations, for example for both of these strings, which are too long for the postgres 64-character limit:
- `Aaaa_Bbbb_Cccc_Dddd_Eeee_Ffff_Gggg_Hhhh_Iiii`
@@ -216,6 +217,15 @@ A nice improvement would be to add csv/json seed files as expected output data f
The integration tests would verify that the content of such tables in the destination would match
these seed files or fail.

### Debug dbt operations with a local database
This only works for testing databases launched in local containers (e.g. postgres and mysql).

- In `dbt_integration_test.py`, comment out the `tear_down_db` method so that the relevant database container is not deleted.
- Find the name of the database container in the logs (e.g. by searching for `Executing`).
- Connect to the container by running `docker exec -it <container-name> bash` on the command line.
- Connect to the database inside the container (e.g. `mysql -u root` for mysql).
- Test the generated dbt operations directly in the database, as in the sketch below.
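For example, once inside the mysql client, a statement of the kind dbt generates can be run and tweaked directly. A minimal sketch, with a hypothetical stream named `exchange_rate` following Airbyte's `_airbyte_raw_<stream>` / `_airbyte_data` conventions:

```sql
-- Verify that scalar fields are extracted from the raw JSON blob as expected
select
    json_value(_airbyte_data, '$."currency"') as currency,
    json_value(_airbyte_data, '$."rate"') as rate
from test_normalization._airbyte_raw_exchange_rate
limit 10;
```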

## Standard Destination Tests

Generally, to invoke standard destination tests, you run with gradle using:
1 change: 1 addition & 0 deletions airbyte-integrations/bases/base-normalization/build.gradle
@@ -25,6 +25,7 @@ task("customIntegrationTestPython", type: PythonTask, dependsOn: installTestReqs
dependsOn ':airbyte-integrations:connectors:destination-postgres:airbyteDocker'
dependsOn ':airbyte-integrations:connectors:destination-redshift:airbyteDocker'
dependsOn ':airbyte-integrations:connectors:destination-snowflake:airbyteDocker'
dependsOn ':airbyte-integrations:connectors:destination-mysql:airbyteDocker'
}

integrationTest.dependsOn("customIntegrationTestPython")
@@ -28,6 +28,10 @@
) as _airbyte_nested_data
{%- endmacro %}

{% macro mysql__cross_join_unnest(stream_name, array_col) -%}
left join joined on _airbyte_{{ stream_name }}_hashid = joined._airbyte_hashid
{%- endmacro %}

{% macro redshift__cross_join_unnest(stream_name, array_col) -%}
left join joined on _airbyte_{{ stream_name }}_hashid = joined._airbyte_hashid
{%- endmacro %}
@@ -36,7 +40,7 @@
cross join table(flatten({{ array_col }})) as {{ array_col }}
{%- endmacro %}

-{# unnested_column_value ------------------------------------------------- #}
+{# unnested_column_value -- this macro is related to unnest_cte #}

{% macro unnested_column_value(column_col) -%}
{{ adapter.dispatch('unnested_column_value')(column_col) }}
@@ -58,6 +62,10 @@
_airbyte_nested_data
{%- endmacro %}

{% macro mysql__unnested_column_value(column_col) -%}
_airbyte_nested_data
{%- endmacro %}

{# unnest_cte ------------------------------------------------- #}

{% macro unnest_cte(table_name, stream_name, column_col) -%}
@@ -97,3 +105,37 @@ joined as (
where numbers.generated_number <= json_array_length({{ column_col }}, true)
)
{%- endmacro %}

{% macro mysql__unnest_cte(table_name, stream_name, column_col) -%}
{%- if not execute -%}
{{ return('') }}
{% endif %}

{%- call statement('max_json_array_length', fetch_result=True) -%}
with max_value as (
select max(json_length({{ column_col }})) as max_number_of_items
from {{ ref(table_name) }}
)
select
case when max_number_of_items is not null and max_number_of_items > 1
then max_number_of_items
else 1 end as max_number_of_items
from max_value
{%- endcall -%}

{%- set max_length = load_result('max_json_array_length') -%}
with numbers as (
{{ dbt_utils.generate_series(max_length["data"][0][0]) }}
),
joined as (
select
_airbyte_{{ stream_name }}_hashid as _airbyte_hashid,
{# -- json_extract(column_col, '$[i][0]') as _airbyte_nested_data #}
json_extract({{ column_col }}, concat("$[", numbers.generated_number - 1, "][0]")) as _airbyte_nested_data
from {{ ref(table_name) }}
cross join numbers
-- only generate the number of records in the cross join that corresponds
-- to the number of items in {{ table_name }}.{{ column_col }}
where numbers.generated_number <= json_length({{ column_col }})
)
{%- endmacro %}
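For a concrete picture, here is roughly what the `numbers`/`joined` pair above renders to for a hypothetical stream `children` with JSON array column `phone_numbers`, assuming the probed maximum array length came back as 2 (a trailing select is added to make the sketch self-contained; in practice `mysql__cross_join_unnest` left-joins `joined` back to the parent rows on the hashid):

```sql
with numbers as (
    -- dbt_utils.generate_series(2) expands to a small union of integer literals
    select 1 as generated_number union all select 2
),
joined as (
    select
        _airbyte_children_hashid as _airbyte_hashid,
        -- the 1-based generated_number is shifted to MySQL's 0-based array index
        json_extract(phone_numbers, concat("$[", numbers.generated_number - 1, "][0]")) as _airbyte_nested_data
    from test_normalization.children
    cross join numbers
    -- only keep as many rows per record as that record's array actually has
    where numbers.generated_number <= json_length(phone_numbers)
)
select _airbyte_hashid, _airbyte_nested_data from joined;
```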
@@ -19,3 +19,44 @@
{% macro snowflake__type_json() %}
variant
{% endmacro %}

{%- macro mysql__type_json() -%}
json
{%- endmacro -%}


{# string ------------------------------------------------- #}

{%- macro mysql__type_string() -%}
char
{%- endmacro -%}


{# float ------------------------------------------------- #}
{% macro mysql__type_float() %}
float
{% endmacro %}


{# int ------------------------------------------------- #}
{% macro mysql__type_int() %}
signed
{% endmacro %}


{# bigint ------------------------------------------------- #}
{% macro mysql__type_bigint() %}
signed
{% endmacro %}


{# numeric ------------------------------------------------- #}
{% macro mysql__type_numeric() %}
float
{% endmacro %}


{# timestamp ------------------------------------------------- #}
{% macro mysql__type_timestamp() %}
time
{% endmacro %}
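These macros are emitted as the target types of `cast()` calls when normalization types the columns pulled out of the raw JSON. A hedged sketch of the resulting SQL (table and column names hypothetical):

```sql
select
    cast(json_value(_airbyte_data, '$."id"') as signed) as id,            -- mysql__type_bigint
    cast(json_value(_airbyte_data, '$."rate"') as float) as rate,         -- mysql__type_float / mysql__type_numeric
    cast(json_value(_airbyte_data, '$."currency"') as char) as currency   -- mysql__type_string
from test_normalization._airbyte_raw_exchange_rate;
```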
@@ -0,0 +1,3 @@
{% macro mysql__except() %}
{% do exceptions.warn("MySQL does not support EXCEPT operator") %}
{% endmacro %}
@@ -2,8 +2,9 @@
Adapter Macros for the following functions:
- Bigquery: JSON_EXTRACT(json_string_expr, json_path_format) -> https://cloud.google.com/bigquery/docs/reference/standard-sql/json_functions
- Snowflake: JSON_EXTRACT_PATH_TEXT( <column_identifier> , '<path_name>' ) -> https://docs.snowflake.com/en/sql-reference/functions/json_extract_path_text.html
-  - Redshift: json_extract_path_text('json_string', 'path_elem' [,'path_elem'[, ] ] [, null_if_invalid ] ) -> https://docs.aws.amazon.com/redshift/latest/dg/JSON_EXTRACT_PATH_TEXT.html
+  - Redshift: json_extract_path_text('json_string', 'path_elem' [,'path_elem'[, ...] ] [, null_if_invalid ] ) -> https://docs.aws.amazon.com/redshift/latest/dg/JSON_EXTRACT_PATH_TEXT.html
- Postgres: json_extract_path_text(<from_json>, 'path' [, 'path' [, ...]]) -> https://www.postgresql.org/docs/12/functions-json.html
- MySQL: JSON_EXTRACT(json_doc, 'path' [, 'path'] ...) -> https://dev.mysql.com/doc/refman/8.0/en/json-search-functions.html
#}

{# format_json_path -------------------------------------------------- #}
@@ -23,6 +24,11 @@
{{ "'" ~ json_path_list|join("','") ~ "'" }}
{%- endmacro %}

{% macro mysql__format_json_path(json_path_list) -%}
{# -- '$."x"."y"."z"' #}
{{ "'$.\"" ~ json_path_list|join(".") ~ "\"'" }}
{%- endmacro %}

{% macro redshift__format_json_path(json_path_list) -%}
{{ "'" ~ json_path_list|join("','") ~ "'" }}
{%- endmacro %}
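Note that `mysql__format_json_path` above joins all path segments inside a single pair of quotes: for `["profile", "age"]` it renders `'$."profile.age"'` (one quoted key whose name contains a dot), not the `'$."x"."y"."z"'` form its inline comment suggests. Single-element paths, the common case in normalization, are unaffected; for nested paths the two forms address different keys:

```sql
-- $."profile"."age" is a nested lookup: member "age" inside member "profile"
select json_value('{"profile": {"age": 30}}', '$."profile"."age"');  -- 30
-- $."profile.age" is a single top-level key literally named "profile.age"
select json_value('{"profile.age": 31}', '$."profile.age"');         -- 31
```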
@@ -49,6 +55,10 @@
jsonb_extract_path({{ json_column }}, {{ format_json_path(json_path_list) }})
{%- endmacro %}

{% macro mysql__json_extract(json_column, json_path_list) -%}
json_extract({{ json_column }}, {{ format_json_path(json_path_list) }})
{%- endmacro %}

{% macro redshift__json_extract(json_column, json_path_list) -%}
case when json_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }}, true) != '' then json_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }}, true) end
{%- endmacro %}
@@ -75,6 +85,10 @@
jsonb_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }})
{%- endmacro %}

{% macro mysql__json_extract_scalar(json_column, json_path_list) -%}
json_value({{ json_column }}, {{ format_json_path(json_path_list) }})
{%- endmacro %}

{% macro redshift__json_extract_scalar(json_column, json_path_list) -%}
case when json_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }}, true) != '' then json_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }}, true) end
{%- endmacro %}
@@ -101,6 +115,10 @@
jsonb_extract_path({{ json_column }}, {{ format_json_path(json_path_list) }})
{%- endmacro %}

{% macro mysql__json_extract_array(json_column, json_path_list) -%}
json_extract({{ json_column }}, {{ format_json_path(json_path_list) }})
{%- endmacro %}

{% macro redshift__json_extract_array(json_column, json_path_list) -%}
json_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }}, true)
{%- endmacro %}
@@ -0,0 +1,60 @@
{#
-- Adapted from https://github.com/dbt-labs/dbt-utils/blob/0-19-0-updates/macros/schema_tests/equality.sql
-- dbt-utils version: 0.6.4
-- This macro needs to be updated accordingly when dbt-utils is upgraded.
-- This is needed because MySQL does not support the EXCEPT operator!
#}

{% macro mysql__test_equality(model, compare_model, compare_columns=None) %}

{%- if not execute -%}
{{ return('') }}
{% endif %}

{%- do dbt_utils._is_relation(model, 'test_equality') -%}

{%- if not compare_columns -%}
{%- do dbt_utils._is_ephemeral(model, 'test_equality') -%}
{%- set compare_columns = adapter.get_columns_in_relation(model) | map(attribute='quoted') -%}
{%- endif -%}

{% set compare_cols_csv = compare_columns | join(', ') %}

with a as (
select * from {{ model }}
),

b as (
select * from {{ compare_model }}
),

a_minus_b as (
select {{ compare_cols_csv }} from a
where ({{ compare_cols_csv }}) not in
(select {{ compare_cols_csv }} from b)
),

b_minus_a as (
select {{ compare_cols_csv }} from b
where ({{ compare_cols_csv }}) not in
(select {{ compare_cols_csv }} from a)
),

unioned as (
select * from a_minus_b
union all
select * from b_minus_a
),

final as (
select (select count(*) from unioned) +
(select abs(
(select count(*) from a_minus_b) -
(select count(*) from b_minus_a)
))
as count
)

select count from final

{% endmacro %}
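Because MySQL has no `EXCEPT`, this override emulates dbt-utils' set difference with tuple `not in` subqueries. On adapters that do support it, the equality test boils down to something like the sketch below (an approximation, not the literal dbt-utils source). One caveat of the emulation: `not in` is not null-safe, so rows containing NULLs may compare differently here than under a true `EXCEPT`:

```sql
-- Roughly the set difference dbt_utils.equality counts where EXCEPT is available
select count(*) from (
    (select * from a except select * from b)
    union all
    (select * from b except select * from a)
) as diffs;
```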
@@ -42,12 +42,17 @@
class DbtIntegrationTest(object):
    def __init__(self):
        self.target_schema = "test_normalization"
-        self.container_name = "test_normalization_db_" + self.random_string(3)
+        self.container_prefix = f"test_normalization_db_{self.random_string(3)}"
        self.db_names = ["postgres", "mysql"]

    @staticmethod
    def random_string(length: int) -> str:
        return "".join(random.choice(string.ascii_lowercase) for i in range(length))

    def setup_db(self):
        self.setup_postgres_db()
        self.setup_mysql_db()

    def setup_postgres_db(self):
        print("Starting localhost postgres container for tests")
        port = self.find_free_port()
@@ -64,7 +69,7 @@ def setup_postgres_db(self):
"run",
"--rm",
"--name",
f"{self.container_name}",
f"{self.container_prefix}_postgres",
"-e",
f"POSTGRES_USER={config['username']}",
"-e",
@@ -81,6 +86,42 @@ def setup_postgres_db(self):
with open("../secrets/postgres.json", "w") as fh:
fh.write(json.dumps(config))

def setup_mysql_db(self):
print("Starting localhost mysql container for tests")
port = self.find_free_port()
config = {
"type": "mysql",
"host": "localhost",
"port": port,
"database": self.target_schema,
"username": "root",
"password": "",
}
commands = [
"docker",
"run",
"--rm",
"--name",
f"{self.container_prefix}_mysql",
"-e",
"MYSQL_ALLOW_EMPTY_PASSWORD=yes",
"-e",
"MYSQL_INITDB_SKIP_TZINFO=yes",
"-e",
f"MYSQL_DATABASE={config['database']}",
"-p",
f"{config['port']}:3306",
"-d",
"mysql",
]
print("Executing: ", " ".join(commands))
subprocess.call(commands)

if not os.path.exists("../secrets"):
os.makedirs("../secrets")
with open("../secrets/mysql.json", "w") as fh:
fh.write(json.dumps(config))

    @staticmethod
    def find_free_port():
        """
@@ -92,12 +133,13 @@ def find_free_port():
        s.close()
        return addr[1]

-    def tear_down_postgres_db(self):
-        print("Stopping localhost postgres container for tests")
-        try:
-            subprocess.call(["docker", "kill", f"{self.container_name}"])
-        except Exception as e:
-            print(f"WARN: Exception while shutting down postgres db: {e}")
+    def tear_down_db(self):
+        for db_name in self.db_names:
+            print(f"Stopping localhost {db_name} container for tests")
+            try:
+                subprocess.call(["docker", "kill", f"{self.container_prefix}_{db_name}"])
+            except Exception as e:
+                print(f"WARN: Exception while shutting down {db_name}: {e}")

    @staticmethod
    def change_current_test_dir(request):
@@ -4,11 +4,11 @@ models:
  - name: exchange_rate
    tests:
      - dbt_utils.equality:
-          description: check_streams_are_equal
-            In this integration test, we are sending the same records to both streams
-            exchange_rate and dedup_exchange_rate.
-            The SCD table of dedup_exchange_rate in append_dedup mode should therefore mirror
-            the final table with append or overwrite mode from exchange_rate.
+          # description: check_streams_are_equal
+          #   In this integration test, we are sending the same records to both streams
+          #   exchange_rate and dedup_exchange_rate.
+          #   The SCD table of dedup_exchange_rate in append_dedup mode should therefore mirror
+          #   the final table with append or overwrite mode from exchange_rate.
          compare_model: ref('dedup_exchange_rate_scd')
          compare_columns:
            - id