🐛 Add a drop table hook to drop scd tables in case of overwrite sync #18015

Merged: 14 commits, Nov 1, 2022
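In substance, the PR teaches normalization to drop a stream's `_scd` history table when that stream is synced in overwrite mode (for example during a reset), so stale SCD rows cannot survive the reset. As a rough, illustrative sketch only — the helper name and the `<stream>_scd` naming convention below are assumptions for this example, not the PR's actual code — the statement such a hook has to emit looks like:

def drop_scd_table_sql(schema: str, stream_name: str) -> str:
    # Build the DROP statement for the SCD history table of a stream that is
    # being synced in overwrite mode. Table naming is assumed for illustration.
    return f'DROP TABLE IF EXISTS "{schema}"."{stream_name}_scd";'

# Example: the stream used by the new integration test below.
print(drop_scd_table_sql("test_reset_scd_overwrite", "stream_test_scd_drop"))
# DROP TABLE IF EXISTS "test_reset_scd_overwrite"."stream_test_scd_drop_scd";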
airbyte-integrations/bases/base-normalization/Dockerfile (2 changes: 1 addition & 1 deletion)
@@ -28,5 +28,5 @@ WORKDIR /airbyte
ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh"
ENTRYPOINT ["/airbyte/entrypoint.sh"]

LABEL io.airbyte.version=0.2.23
LABEL io.airbyte.version=0.2.24
LABEL io.airbyte.name=airbyte/normalization
@@ -690,7 +690,7 @@ def clean_tmp_tables(
schemas_to_remove[destination.value] = []

# based on test_type select path to source files
if test_type == "ephemeral":
if test_type == "ephemeral" or test_type == "test_reset_scd_overwrite":
if not tmp_folders:
raise TypeError("`tmp_folders` arg is not provided.")
for folder in tmp_folders:
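For context, this new branch lets the SCD-overwrite test reuse the temp-folder-based cleanup path that the ephemeral tests already use. The fixture added later in this PR invokes it roughly as follows (condensed from before_all_tests below):

dbt_test_utils.clean_tmp_tables(
    destination_type=[d for d in DestinationType if d.value in destinations_to_test],
    test_type="test_reset_scd_overwrite",
    tmp_folders=temporary_folders,
)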
@@ -0,0 +1,46 @@
{
"streams": [
{
"stream": {
"name": "stream_test_scd_drop",
"json_schema": {
"type": ["null", "object"],
"properties": {
"id": {
"type": "integer"
},
"date": {
"type": "string",
"format": "date"
},
"timestamp_col": {
"type": "string",
"format": "date-time"
},
"datetime_to_string": {
"type": "string",
"format": "date-time",
"airbyte_type": "timestamp_with_timezone"
},
"string_to_dt": {
"type": "string"
},
"number_to_int": {
"type": "number"
},
"int_to_number": {
"type": "integer"
}
}
},
"supported_sync_modes": ["incremental"],
"source_defined_cursor": true,
"default_cursor_field": []
},
"sync_mode": "incremental",
"cursor_field": ["date"],
"destination_sync_mode": "append_dedup",
"primary_key": [["id"]]
}
]
}
@@ -0,0 +1,46 @@
{
"streams": [
{
"stream": {
"name": "stream_test_scd_drop",
"json_schema": {
"type": ["null", "object"],
"properties": {
"id": {
"type": "integer"
},
"date": {
"type": "string",
"format": "date"
},
"timestamp_col": {
"type": "string",
"format": "date-time"
},
"datetime_to_string": {
"type": "string"
},
"string_to_dt": {
"type": "string",
"format": "date-time",
"airbyte_type": "timestamp_with_timezone"
},
"number_to_int": {
"type": "integer"
},
"int_to_number": {
"type": "number"
}
}
},
"supported_sync_modes": ["incremental"],
"source_defined_cursor": true,
"default_cursor_field": []
},
"sync_mode": "incremental",
"cursor_field": ["date"],
"destination_sync_mode": "append_dedup",
"primary_key": [["id"]]
}
]
}
@@ -0,0 +1,46 @@
{
"streams": [
{
"stream": {
"name": "stream_test_scd_drop",
"json_schema": {
"type": ["null", "object"],
"properties": {
"id": {
"type": "integer"
},
"date": {
"type": "string",
"format": "date"
},
"timestamp_col": {
"type": "string",
"format": "date-time"
},
"datetime_to_string": {
"type": "string"
},
"string_to_dt": {
"type": "string",
"format": "date-time",
"airbyte_type": "timestamp_with_timezone"
},
"number_to_int": {
"type": "integer"
},
"int_to_number": {
"type": "number"
}
}
},
"supported_sync_modes": ["incremental"],
"source_defined_cursor": true,
"default_cursor_field": []
},
"sync_mode": "incremental",
"cursor_field": ["date"],
"destination_sync_mode": "overwrite",
"primary_key": [["id"]]
}
]
}
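Taken together, the three catalogs above describe the same stream but simulate a schema change: datetime_to_string loses its date-time format, string_to_dt gains one, and number_to_int/int_to_number swap numeric types, while the last catalog also switches the destination sync mode to overwrite. A quick way to see the column-level differences (the paths are the ones the test code below references; adjust if the resources live elsewhere):

import json

def property_types(path: str) -> dict:
    # Map each column to its declared (type, format) pair in the catalog.
    with open(path) as f:
        props = json.load(f)["streams"][0]["stream"]["json_schema"]["properties"]
    return {name: (spec.get("type"), spec.get("format")) for name, spec in props.items()}

before = property_types("resources/test_reset_scd_overwrite/data_input/test_drop_scd_catalog.json")
after = property_types("resources/test_reset_scd_overwrite/data_input/test_drop_scd_catalog_incremental.json")
for column, declared in before.items():
    if declared != after[column]:
        print(f"{column}: {declared} -> {after[column]}")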
@@ -0,0 +1,5 @@
{"type": "RECORD", "record": {"stream": "stream_test_scd_drop", "emitted_at": 1602637589000, "data": { "id": 1, "date": "2022-08-29", "timestamp_col": "2020-08-29T00:00:00.000000-0000", "datetime_to_string":"2022-10-01T01:04:04-04:00", "string_to_dt":"2022-11-01T02:03:04-07:00", "number_to_int": 1, "int_to_number": 10}}}
{"type": "RECORD", "record": {"stream": "stream_test_scd_drop", "emitted_at": 1602637689100, "data": { "id": 2, "date": "2022-08-30", "timestamp_col": "2020-08-30T00:00:00.000-00", "datetime_to_string":"2022-10-02T01:04:04-04:00", "string_to_dt":"2022-11-02T03:04:05-07:00", "number_to_int": 10, "int_to_number": 11}}}
{"type": "RECORD", "record": {"stream": "stream_test_scd_drop", "emitted_at": 1602637789200, "data": { "id": 3, "date": "2022-08-31", "timestamp_col": "2020-08-31T00:00:00+00", "datetime_to_string":"2022-10-03T01:04:04-04:00", "string_to_dt":"2022-11-03T03:04:06-07:00", "number_to_int": 11, "int_to_number": 12}}}
{"type": "RECORD", "record": {"stream": "stream_test_scd_drop", "emitted_at": 1602637889300, "data": { "id": 4, "date": "2022-09-01", "timestamp_col": "2020-08-31T00:00:00+0000", "datetime_to_string":"2022-10-04T01:04:04-04:00", "string_to_dt":"2022-11-04T03:04:07-07:00", "number_to_int": 111, "int_to_number": 133}}}
{"type": "RECORD", "record": {"stream": "stream_test_scd_drop", "emitted_at": 1602637989400, "data": { "id": 5, "date": "2022-09-02", "timestamp_col": "2020-09-01T00:00:00Z", "datetime_to_string":"2022-10-05T01:04:04-04:00", "string_to_dt":"2022-11-05T03:04:08-12:00", "number_to_int": 1010, "int_to_number": 1300}}}
@@ -0,0 +1,6 @@
{"type": "RECORD", "record": {"stream": "stream_test_scd_drop", "emitted_at": 1602637589000, "data": { "id": 1, "date": "2022-08-29", "timestamp_col": "2020-08-29T00:00:00.000000-0000", "datetime_to_string":"2022-10-01T01:04:04-04:00", "string_to_dt":"2022-11-01T02:03:04-07:00", "number_to_int": 1, "int_to_number": 10}}}
{"type": "RECORD", "record": {"stream": "stream_test_scd_drop", "emitted_at": 1602637689100, "data": { "id": 2, "date": "2022-08-30", "timestamp_col": "2020-08-30T00:00:00.000-00", "datetime_to_string":"2022-10-02T01:04:04-04:00", "string_to_dt":"2022-11-02T03:04:05-07:00", "number_to_int": 10, "int_to_number": 11}}}
{"type": "RECORD", "record": {"stream": "stream_test_scd_drop", "emitted_at": 1602637789200, "data": { "id": 3, "date": "2022-08-31", "timestamp_col": "2020-08-31T00:00:00+00", "datetime_to_string":"2022-10-03T01:04:04-04:00", "string_to_dt":"2022-11-03T03:04:06-07:00", "number_to_int": 11, "int_to_number": 12}}}
{"type": "RECORD", "record": {"stream": "stream_test_scd_drop", "emitted_at": 1602637889300, "data": { "id": 4, "date": "2022-09-01", "timestamp_col": "2020-08-31T00:00:00+0000", "datetime_to_string":"2022-10-04T01:04:04-04:00", "string_to_dt":"2022-11-04T03:04:07-07:00", "number_to_int": 111, "int_to_number": 133}}}
{"type": "RECORD", "record": {"stream": "stream_test_scd_drop", "emitted_at": 1602637989400, "data": { "id": 5, "date": "2022-09-02", "timestamp_col": "2020-09-01T00:00:00Z", "datetime_to_string":"2022-10-05T01:04:04-04:00", "string_to_dt":"2022-11-05T03:04:08-12:00", "number_to_int": 1010, "int_to_number": 1300}}}
{"type": "RECORD", "record": {"stream": "stream_test_scd_drop", "emitted_at": 1602637989400, "data": { "id": 6, "date": "2022-09-03", "timestamp_col": "2020-09-01T00:00:00Z", "datetime_to_string":"this is a string, not a datetime value", "string_to_dt":"2022-11-05T03:04:08-12:00", "number_to_int": 1010, "int_to_number": 1300.25}}}
@@ -0,0 +1,2 @@
select * from {{ ref('test_scd_drop_row_counts') }}
where row_count != expected_count
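This is a dbt data test: it fails the build if the query returns any rows, i.e. if any table tracked by the test_scd_drop_row_counts model has an actual row count that differs from its expected count. In plain Python terms the assertion is roughly (sample data invented for illustration):

# Sample rows standing in for the test_scd_drop_row_counts model output.
row_counts = [
    {"table_name": "stream_test_scd_drop", "row_count": 6, "expected_count": 6},
    {"table_name": "stream_test_scd_drop_scd", "row_count": 0, "expected_count": 0},
]
failures = [r for r in row_counts if r["row_count"] != r["expected_count"]]
assert not failures, f"row counts differ from expectations: {failures}"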
@@ -0,0 +1,161 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import json
import os
import pathlib
import shutil

import pytest
from integration_tests.dbt_integration_test import DbtIntegrationTest
from integration_tests.utils import generate_dbt_models, run_destination_process, setup_test_dir
from normalization import DestinationType

temporary_folders = set()
dbt_test_utils = DbtIntegrationTest()


@pytest.fixture(scope="module", autouse=True)
def before_all_tests(request):
destinations_to_test = dbt_test_utils.get_test_targets()
# set clean-up args to clean target destination after the test
clean_up_args = {
"destination_type": [d for d in DestinationType if d.value in destinations_to_test],
"test_type": "test_reset_scd_overwrite",
"tmp_folders": temporary_folders,
}
dbt_test_utils.set_target_schema("test_reset_scd_overwrite")
dbt_test_utils.change_current_test_dir(request)
dbt_test_utils.setup_db(destinations_to_test)
os.environ["PATH"] = os.path.abspath("../.venv/bin/") + ":" + os.environ["PATH"]
yield
dbt_test_utils.clean_tmp_tables(**clean_up_args)
dbt_test_utils.tear_down_db()
for folder in temporary_folders:
print(f"Deleting temporary test folder {folder}")
shutil.rmtree(folder, ignore_errors=True)


@pytest.fixture
def setup_test_path(request):
dbt_test_utils.change_current_test_dir(request)
print(f"Running from: {pathlib.Path().absolute()}")
print(f"Current PATH is: {os.environ['PATH']}")
yield
os.chdir(request.config.invocation_dir)


@pytest.mark.parametrize("destination_type", list(DestinationType))
def test_reset_scd_on_overwrite(destination_type: DestinationType, setup_test_path):
if destination_type.value not in dbt_test_utils.get_test_targets():
pytest.skip(f"Destinations {destination_type} is not in NORMALIZATION_TEST_TARGET env variable")

if destination_type.value in [DestinationType.ORACLE.value, DestinationType.TIDB.value]:
# Oracle and TiDB do not support incremental syncs with schema changes yet
pytest.skip(f"{destination_type} does not support incremental sync with schema change yet")
elif destination_type.value == DestinationType.REDSHIFT.value:
# set unique schema for Redshift test
dbt_test_utils.set_target_schema(dbt_test_utils.generate_random_string("test_reset_scd_"))

test_resource_name = "test_reset_scd_overwrite"
# Select target schema
target_schema = dbt_test_utils.target_schema

try:
print(f"Testing resetting SCD tables on overwrite with {destination_type} in schema {target_schema}")
run_reset_scd_on_overwrite_test(destination_type, test_resource_name)
finally:
dbt_test_utils.set_target_schema(target_schema)


def run_reset_scd_on_overwrite_test(destination_type: DestinationType, test_resource_name: str):
# Generate DBT profile yaml
integration_type = destination_type.value
test_root_dir = setup_test_dir(integration_type, temporary_folders)
destination_config = dbt_test_utils.generate_profile_yaml_file(destination_type, test_root_dir)
test_directory = os.path.join(test_root_dir, "models/generated")
shutil.rmtree(test_directory, ignore_errors=True)

# Generate config file for the destination
config_file = os.path.join(test_root_dir, "destination_config.json")
with open(config_file, "w") as f:
f.write(json.dumps(destination_config))

# make sure DBT dependencies are installed
dbt_test_utils.dbt_check(destination_type, test_root_dir)

# Generate catalog for an initial reset/cleanup (pre-test)
original_catalog_file = os.path.join("resources", test_resource_name, "data_input", "test_drop_scd_catalog.json")
dbt_test_utils.copy_replace(
original_catalog_file,
os.path.join(test_root_dir, "initial_reset_catalog.json"),
pattern='"destination_sync_mode": ".*"',
replace_value='"destination_sync_mode": "overwrite"',
)
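# Note: assuming copy_replace applies the regex substitution above to the copied
# file, every '"destination_sync_mode": "append_dedup"' entry in the catalog
# becomes '"destination_sync_mode": "overwrite"', so the initial reset always
# runs against an overwrite catalog regardless of the source catalog's modes.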

# Force a reset in destination raw tables to remove any data left over from previous test runs
assert run_destination_process(destination_type, test_root_dir, "", "initial_reset_catalog.json", dbt_test_utils)
# generate models from catalog
generate_dbt_models(destination_type, test_resource_name, test_root_dir, "models", "test_drop_scd_catalog_reset.json", dbt_test_utils)

# Run dbt process to normalize data after the initial reset
dbt_test_utils.dbt_run(destination_type, test_root_dir, force_full_refresh=True)

# Remove models generated in previous step to avoid DBT compilation errors
test_directory = os.path.join(test_root_dir, "models/generated/airbyte_incremental")
shutil.rmtree(test_directory, ignore_errors=True)
test_directory = os.path.join(test_root_dir, "models/generated/airbyte_views")
shutil.rmtree(test_directory, ignore_errors=True)
test_directory = os.path.join(test_root_dir, "models/generated/airbyte_ctes")
shutil.rmtree(test_directory, ignore_errors=True)
test_directory = os.path.join(test_root_dir, "models/generated/airbyte_tables")
shutil.rmtree(test_directory, ignore_errors=True)

# Run the first sync to create raw tables in destinations
dbt_test_utils.copy_replace(original_catalog_file, os.path.join(test_root_dir, "destination_catalog.json"))
message_file = os.path.join("resources", test_resource_name, "data_input", "test_drop_scd_messages.txt")
assert run_destination_process(destination_type, test_root_dir, message_file, "destination_catalog.json", dbt_test_utils)

# generate models from catalog
generate_dbt_models(destination_type, test_resource_name, test_root_dir, "models", "test_drop_scd_catalog.json", dbt_test_utils)

# Run dbt process to normalize data from the first sync
dbt_test_utils.dbt_run(destination_type, test_root_dir, force_full_refresh=True)

# Remove models generated in previous step to avoid DBT compilation errors
test_directory = os.path.join(test_root_dir, "models/generated/airbyte_incremental")
shutil.rmtree(test_directory, ignore_errors=True)
test_directory = os.path.join(test_root_dir, "models/generated/airbyte_views")
shutil.rmtree(test_directory, ignore_errors=True)
test_directory = os.path.join(test_root_dir, "models/generated/airbyte_ctes")
shutil.rmtree(test_directory, ignore_errors=True)

# Generate a catalog with modified schema for a reset
reset_catalog_file = os.path.join("resources", test_resource_name, "data_input", "test_drop_scd_catalog_reset.json")
dbt_test_utils.copy_replace(reset_catalog_file, os.path.join(test_root_dir, "reset_catalog.json"))

# Run a reset
assert run_destination_process(destination_type, test_root_dir, "", "reset_catalog.json", dbt_test_utils)

# Run dbt process after reset to drop SCD table
generate_dbt_models(destination_type, test_resource_name, test_root_dir, "models", "test_drop_scd_catalog_reset.json", dbt_test_utils)
dbt_test_utils.dbt_run(destination_type, test_root_dir, force_full_refresh=True)

# Remove models generated in previous step to avoid DBT compilation errors
test_directory = os.path.join(test_root_dir, "models/generated/airbyte_incremental")
shutil.rmtree(test_directory, ignore_errors=True)
test_directory = os.path.join(test_root_dir, "models/generated/airbyte_views")
shutil.rmtree(test_directory, ignore_errors=True)
test_directory = os.path.join(test_root_dir, "models/generated/airbyte_ctes")
shutil.rmtree(test_directory, ignore_errors=True)

# Run another sync with modified catalog
modified_catalog_file = os.path.join("resources", test_resource_name, "data_input", "test_drop_scd_catalog_incremental.json")
dbt_test_utils.copy_replace(modified_catalog_file, os.path.join(test_root_dir, "destination_catalog.json"))
message_file = os.path.join("resources", test_resource_name, "data_input", "test_scd_reset_messages_incremental.txt")
assert run_destination_process(destination_type, test_root_dir, message_file, "destination_catalog.json", dbt_test_utils)

# Run dbt process
generate_dbt_models(destination_type, test_resource_name, test_root_dir, "models", "test_drop_scd_catalog_reset.json", dbt_test_utils)
dbt_test_utils.dbt_run(destination_type, test_root_dir)
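Between dbt runs, the test wipes every generated model directory so that models produced from the previous catalog cannot break the next compilation. Condensed into a helper purely for illustration (no such helper exists in the PR), the cleanup between runs amounts to:

import os
import shutil

def remove_generated_models(test_root_dir: str, subdirs=("airbyte_incremental", "airbyte_views", "airbyte_ctes")) -> None:
    # Drop the model subdirectories a previous dbt run generated under models/generated.
    # The first cleanup pass in the test also removes "airbyte_tables".
    for subdir in subdirs:
        shutil.rmtree(os.path.join(test_root_dir, "models/generated", subdir), ignore_errors=True)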
@@ -7,12 +7,11 @@
import pathlib
import re
import shutil
import tempfile
from distutils.dir_util import copy_tree
from typing import Any, Dict

import pytest
from integration_tests.dbt_integration_test import DbtIntegrationTest
from integration_tests.utils import setup_test_dir
from normalization.destination_type import DestinationType
from normalization.transform_catalog import TransformCatalog

@@ -108,7 +107,7 @@ def run_test(destination_type: DestinationType, column_count: int, expected_exce
print("Testing ephemeral")
integration_type = destination_type.value
# Create the test folder with dbt project and appropriate destination settings to run integration tests from
test_root_dir = setup_test_dir(integration_type)
test_root_dir = setup_test_dir(integration_type, temporary_folders)
destination_config = dbt_test_utils.generate_profile_yaml_file(destination_type, test_root_dir)
# generate a catalog and associated dbt models files
generate_dbt_models(destination_type, test_root_dir, column_count)
@@ -131,30 +130,6 @@ def search_logs_for_pattern(log_file: str, pattern: str):
return False


def setup_test_dir(integration_type: str) -> str:
"""
We prepare a clean folder to run the tests from.
"""
test_root_dir = f"{pathlib.Path().joinpath('..', 'build', 'normalization_test_output', integration_type.lower()).resolve()}"
os.makedirs(test_root_dir, exist_ok=True)
test_root_dir = tempfile.mkdtemp(dir=test_root_dir)
temporary_folders.add(test_root_dir)
shutil.rmtree(test_root_dir, ignore_errors=True)
print(f"Setting up test folder {test_root_dir}")
copy_tree("../dbt-project-template", test_root_dir)
if integration_type == DestinationType.MSSQL.value:
copy_tree("../dbt-project-template-mssql", test_root_dir)
elif integration_type == DestinationType.MYSQL.value:
copy_tree("../dbt-project-template-mysql", test_root_dir)
elif integration_type == DestinationType.ORACLE.value:
copy_tree("../dbt-project-template-oracle", test_root_dir)
elif integration_type == DestinationType.SNOWFLAKE.value:
copy_tree("../dbt-project-template-snowflake", test_root_dir)
elif integration_type == DestinationType.TIDB.value:
copy_tree("../dbt-project-template-tidb", test_root_dir)
return test_root_dir


def setup_input_raw_data(integration_type: str, test_root_dir: str, destination_config: Dict[str, Any]) -> bool:
"""
This should populate the associated "raw" tables from which normalization reads when running the dbt CLI.
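The setup_test_dir helper removed above was not deleted outright: the new test file imports it from integration_tests.utils, and the updated call site passes the set of temporary folders explicitly instead of relying on this module's global. Inferred from those call sites and the removed body (the actual code in integration_tests/utils.py may differ), the relocated helper looks roughly like:

import os
import pathlib
import shutil
import tempfile
from distutils.dir_util import copy_tree

def setup_test_dir(integration_type: str, temporary_folders: set) -> str:
    """Prepare a clean folder to run the tests from, recording it for later cleanup."""
    test_root_dir = f"{pathlib.Path().joinpath('..', 'build', 'normalization_test_output', integration_type.lower()).resolve()}"
    os.makedirs(test_root_dir, exist_ok=True)
    test_root_dir = tempfile.mkdtemp(dir=test_root_dir)
    temporary_folders.add(test_root_dir)
    shutil.rmtree(test_root_dir, ignore_errors=True)
    print(f"Setting up test folder {test_root_dir}")
    copy_tree("../dbt-project-template", test_root_dir)
    # Destination-specific template overlays (MSSQL, MySQL, Oracle, Snowflake, TiDB)
    # omitted here for brevity; see the removed function above for the full list.
    return test_root_dir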