diff --git a/airbyte-integrations/bases/source-acceptance-test/CHANGELOG.md b/airbyte-integrations/bases/source-acceptance-test/CHANGELOG.md index e19a423358cd..076792c08dbf 100644 --- a/airbyte-integrations/bases/source-acceptance-test/CHANGELOG.md +++ b/airbyte-integrations/bases/source-acceptance-test/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 0.2.0 +Finish backward compatibility syntactic tests implementation: check that cursor fields were not changed. [#15520](https://github.com/airbytehq/airbyte/pull/15520/) + ## 0.1.62 Backward compatibility tests: add syntactic validation of catalogs [#15486](https://github.com/airbytehq/airbyte/pull/15486/) diff --git a/airbyte-integrations/bases/source-acceptance-test/Dockerfile b/airbyte-integrations/bases/source-acceptance-test/Dockerfile index 2fdc11fa3f01..864012bc21c5 100644 --- a/airbyte-integrations/bases/source-acceptance-test/Dockerfile +++ b/airbyte-integrations/bases/source-acceptance-test/Dockerfile @@ -33,7 +33,7 @@ COPY pytest.ini setup.py ./ COPY source_acceptance_test ./source_acceptance_test RUN pip install . -LABEL io.airbyte.version=0.1.62 +LABEL io.airbyte.version=0.2.0 LABEL io.airbyte.name=airbyte/source-acceptance-test ENTRYPOINT ["python", "-m", "pytest", "-p", "source_acceptance_test.plugin", "-r", "fEsx"] diff --git a/airbyte-integrations/bases/source-acceptance-test/README.md b/airbyte-integrations/bases/source-acceptance-test/README.md index e5783d9fd47e..2a1b2d0f206c 100644 --- a/airbyte-integrations/bases/source-acceptance-test/README.md +++ b/airbyte-integrations/bases/source-acceptance-test/README.md @@ -48,7 +48,7 @@ These iterations are more conveniently achieved by remaining in the current dire * Existing test modules are defined in `./source_acceptance_test/tests` * `acceptance-test-config.yaml` structure is defined in `./source_acceptance_test/config.py` 6. Unit test your changes by adding tests to `./unit_tests` -7. Run the unit tests on SAT again: `python -m pytest unit_tests`, make sure the coverage did not decrease. +7. Run the unit tests on SAT again: `python -m pytest unit_tests`, make sure the coverage did not decrease. You can bypass slow tests by using the `slow` marker: `python -m pytest unit_tests -m "not slow"`. 8. Manually test the changes you made by running SAT on a specific connector. e.g. `python -m pytest -p source_acceptance_test.plugin --acceptance-test-config=../../connectors/source-pokeapi` 9. Make sure you updated `docs/connector-development/testing-connectors/source-acceptance-tests-reference.md` according to your changes 10. Bump the SAT version in `airbyte-integrations/bases/source-acceptance-test/Dockerfile` diff --git a/airbyte-integrations/bases/source-acceptance-test/pytest.ini b/airbyte-integrations/bases/source-acceptance-test/pytest.ini index c5ee5ea27018..2531c1f41463 100644 --- a/airbyte-integrations/bases/source-acceptance-test/pytest.ini +++ b/airbyte-integrations/bases/source-acceptance-test/pytest.ini @@ -6,4 +6,4 @@ testpaths = markers = default_timeout - backward_compatibility + slow: marks tests as slow (deselect with '-m "not slow"') diff --git a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_core.py b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_core.py index 38ca1c657341..d53594855323 100644 --- a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_core.py +++ b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_core.py @@ -24,7 +24,6 @@ TraceType, Type, ) -from deepdiff import DeepDiff from docker.errors import ContainerError from jsonschema._utils import flatten from source_acceptance_test.base import BaseTest @@ -46,15 +45,6 @@ class TestSpec(BaseTest): spec_cache: ConnectorSpecification = None previous_spec_cache: ConnectorSpecification = None - @staticmethod - def compute_spec_diff(actual_connector_spec: ConnectorSpecification, previous_connector_spec: ConnectorSpecification): - return DeepDiff( - previous_connector_spec.dict()["connectionSpecification"], - actual_connector_spec.dict()["connectionSpecification"], - view="tree", - ignore_order=True, - ) - @pytest.fixture(name="skip_backward_compatibility_tests") def skip_backward_compatibility_tests_fixture(self, inputs: SpecTestConfig, previous_connector_docker_runner: ConnectorRunner) -> bool: if previous_connector_docker_runner is None: @@ -185,13 +175,9 @@ def test_backward_compatibility( previous_connector_spec: ConnectorSpecification, number_of_configs_to_generate: int = 100, ): - """Check if the current spec is backward_compatible: - 1. Perform multiple hardcoded syntactic checks with SpecDiffChecker. - 2. Validate fake generated previous configs against the actual connector specification with validate_previous_configs. - """ + """Check if the current spec is backward_compatible with the previous one""" assert isinstance(actual_connector_spec, ConnectorSpecification) and isinstance(previous_connector_spec, ConnectorSpecification) - spec_diff = self.compute_spec_diff(actual_connector_spec, previous_connector_spec) - checker = SpecDiffChecker(spec_diff) + checker = SpecDiffChecker(previous=previous_connector_spec.dict(), current=actual_connector_spec.dict()) checker.assert_is_backward_compatible() validate_previous_configs(previous_connector_spec, actual_connector_spec, number_of_configs_to_generate) @@ -235,17 +221,6 @@ def test_check(self, connector_config, inputs: ConnectionTestConfig, docker_runn @pytest.mark.default_timeout(30) class TestDiscovery(BaseTest): - @staticmethod - def compute_discovered_catalog_diff( - discovered_catalog: MutableMapping[str, AirbyteStream], previous_discovered_catalog: MutableMapping[str, AirbyteStream] - ): - return DeepDiff( - {stream_name: airbyte_stream.dict().pop("json_schema") for stream_name, airbyte_stream in previous_discovered_catalog.items()}, - {stream_name: airbyte_stream.dict().pop("json_schema") for stream_name, airbyte_stream in discovered_catalog.items()}, - view="tree", - ignore_order=True, - ) - @pytest.fixture(name="skip_backward_compatibility_tests") def skip_backward_compatibility_tests_fixture( self, inputs: DiscoveryTestConfig, previous_connector_docker_runner: ConnectorRunner @@ -340,13 +315,9 @@ def test_backward_compatibility( discovered_catalog: MutableMapping[str, AirbyteStream], previous_discovered_catalog: MutableMapping[str, AirbyteStream], ): - """Check if the current spec is backward_compatible: - 1. Perform multiple hardcoded syntactic checks with SpecDiffChecker. - 2. Validate fake generated previous configs against the actual connector specification with validate_previous_configs. - """ + """Check if the current catalog is backward_compatible with the previous one.""" assert isinstance(discovered_catalog, MutableMapping) and isinstance(previous_discovered_catalog, MutableMapping) - catalog_diff = self.compute_discovered_catalog_diff(discovered_catalog, previous_discovered_catalog) - checker = CatalogDiffChecker(catalog_diff) + checker = CatalogDiffChecker(previous_discovered_catalog, discovered_catalog) checker.assert_is_backward_compatible() diff --git a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/backward_compatibility.py b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/backward_compatibility.py index 26f5152c88a2..b9638fcbcbef 100644 --- a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/backward_compatibility.py +++ b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/backward_compatibility.py @@ -3,7 +3,7 @@ # from abc import ABC, abstractmethod -from multiprocessing import context +from enum import Enum import jsonschema from airbyte_cdk.models import ConnectorSpecification @@ -13,50 +13,67 @@ from source_acceptance_test.utils import SecretDict +class BackwardIncompatibilityContext(Enum): + SPEC = 1 + DISCOVER = 2 + + class NonBackwardCompatibleError(Exception): - pass + def __init__(self, error_message: str, context: BackwardIncompatibilityContext) -> None: + self.error_message = error_message + self.context = context + super().__init__(error_message) + + def __str__(self): + return f"{self.context} - {self.error_message}" class BaseDiffChecker(ABC): - def __init__(self, diff: DeepDiff) -> None: - self._diff = diff + def __init__(self, previous: dict, current: dict) -> None: + self._previous = previous + self._current = current + self.compute_diffs() - def _raise_error(self, message: str): - raise NonBackwardCompatibleError(f"{context} - {message}. Diff: {self._diff.pretty()}") + def _raise_error(self, message: str, diff: DeepDiff): + raise NonBackwardCompatibleError(f"{message}. Diff: {diff.pretty()}", self.context) @property @abstractmethod def context(self): # pragma: no cover pass + @abstractmethod + def compute_diffs(self): # pragma: no cover + pass + @abstractmethod def assert_is_backward_compatible(self): # pragma: no cover pass - def check_if_value_of_type_field_changed(self): + def check_if_value_of_type_field_changed(self, diff: DeepDiff): """Check if a type was changed""" # Detect type value change in case type field is declared as a string (e.g "str" -> "int"): - type_values_changed = [change for change in self._diff.get("values_changed", []) if change.path(output_format="list")[-1] == "type"] + type_values_changed = [change for change in diff.get("values_changed", []) if change.path(output_format="list")[-1] == "type"] # Detect type value change in case type field is declared as a single item list (e.g ["str"] -> ["int"]): type_values_changed_in_list = [ - change for change in self._diff.get("values_changed", []) if change.path(output_format="list")[-2] == "type" + change for change in diff.get("values_changed", []) if change.path(output_format="list")[-2] == "type" ] if type_values_changed or type_values_changed_in_list: - self._raise_error("The'type' field value was changed.") + self._raise_error("The'type' field value was changed.", diff) - def check_if_new_type_was_added(self): # pragma: no cover + def check_if_new_type_was_added(self, diff: DeepDiff): # pragma: no cover """Detect type value added to type list if new type value is not None (e.g ["str"] -> ["str", "int"])""" new_values_in_type_list = [ change - for change in self._diff.get("iterable_item_added", []) + for change in diff.get("iterable_item_added", []) if change.path(output_format="list")[-2] == "type" if change.t2 != "null" ] if new_values_in_type_list: self._raise_error("A new value was added to a 'type' field") - def check_if_type_of_type_field_changed(self): + def check_if_type_of_type_field_changed(self, diff: DeepDiff): """ Detect the change of type of a type field e.g: @@ -68,83 +85,89 @@ def check_if_type_of_type_field_changed(self): - ["str"] -> "int" INVALID - ["str"] -> 1 INVALID """ - type_changes = [change for change in self._diff.get("type_changes", []) if change.path(output_format="list")[-1] == "type"] + type_changes = [change for change in diff.get("type_changes", []) if change.path(output_format="list")[-1] == "type"] for change in type_changes: # We only accept change on the type field if the new type for this field is list or string # This might be something already guaranteed by JSON schema validation. if isinstance(change.t1, str): if not isinstance(change.t2, list): - self._raise_error("A 'type' field was changed from string to an invalid value.") + self._raise_error("A 'type' field was changed from string to an invalid value.", diff) # If the new type field is a list we want to make sure it only has the original type (t1) and null: e.g. "str" -> ["str", "null"] # We want to raise an error otherwise. t2_not_null_types = [_type for _type in change.t2 if _type != "null"] if not (len(t2_not_null_types) == 1 and t2_not_null_types[0] == change.t1): - self._raise_error("The 'type' field was changed to a list with multiple invalid values") + self._raise_error("The 'type' field was changed to a list with multiple invalid values", diff) if isinstance(change.t1, list): if not isinstance(change.t2, str): - self._raise_error("The 'type' field was changed from a list to an invalid value") + self._raise_error("The 'type' field was changed from a list to an invalid value", diff) if not (len(change.t1) == 1 and change.t2 == change.t1[0]): - self._raise_error("An element was removed from the list of 'type'") + self._raise_error("An element was removed from the list of 'type'", diff) class SpecDiffChecker(BaseDiffChecker): """A class to perform backward compatibility checks on a connector specification diff""" - context = "Specification" + context = BackwardIncompatibilityContext.SPEC + + def compute_diffs(self): + self.connection_specification_diff = DeepDiff( + self._previous["connectionSpecification"], + self._current["connectionSpecification"], + view="tree", + ignore_order=True, + ) def assert_is_backward_compatible(self): - self.check_if_declared_new_required_field() - self.check_if_added_a_new_required_property() - self.check_if_value_of_type_field_changed() - # self.check_if_new_type_was_added() We want to allow type expansion atm - self.check_if_type_of_type_field_changed() - self.check_if_field_was_made_not_nullable() - self.check_if_enum_was_narrowed() - self.check_if_declared_new_enum_field() - - def check_if_declared_new_required_field(self): + self.check_if_declared_new_required_field(self.connection_specification_diff) + self.check_if_added_a_new_required_property(self.connection_specification_diff) + self.check_if_value_of_type_field_changed(self.connection_specification_diff) + # self.check_if_new_type_was_added(self.connection_specification_diff) We want to allow type expansion atm + self.check_if_type_of_type_field_changed(self.connection_specification_diff) + self.check_if_field_was_made_not_nullable(self.connection_specification_diff) + self.check_if_enum_was_narrowed(self.connection_specification_diff) + self.check_if_declared_new_enum_field(self.connection_specification_diff) + + def check_if_declared_new_required_field(self, diff: DeepDiff): """Check if the new spec declared a 'required' field.""" added_required_fields = [ - addition for addition in self._diff.get("dictionary_item_added", []) if addition.path(output_format="list")[-1] == "required" + addition for addition in diff.get("dictionary_item_added", []) if addition.path(output_format="list")[-1] == "required" ] if added_required_fields: - self._raise_error("A new 'required' field was declared.") + self._raise_error("A new 'required' field was declared.", diff) - def check_if_added_a_new_required_property(self): + def check_if_added_a_new_required_property(self, diff: DeepDiff): """Check if the new spec added a property to the 'required' list""" added_required_properties = [ - addition for addition in self._diff.get("iterable_item_added", []) if addition.up.path(output_format="list")[-1] == "required" + addition for addition in diff.get("iterable_item_added", []) if addition.up.path(output_format="list")[-1] == "required" ] if added_required_properties: - self._raise_error("A new property was added to 'required'") + self._raise_error("A new property was added to 'required'", diff) - def check_if_field_was_made_not_nullable(self): + def check_if_field_was_made_not_nullable(self, diff: DeepDiff): """Detect when field was made not nullable but is still a list: e.g ["string", "null"] -> ["string"]""" - removed_nullable = [ - change for change in self._diff.get("iterable_item_removed", []) if change.path(output_format="list")[-2] == "type" - ] + removed_nullable = [change for change in diff.get("iterable_item_removed", []) if change.path(output_format="list")[-2] == "type"] if removed_nullable: - self._raise_error("A field type was narrowed or made a field not nullable") + self._raise_error("A field type was narrowed or made a field not nullable", diff) - def check_if_enum_was_narrowed(self): + def check_if_enum_was_narrowed(self, diff: DeepDiff): """Check if the list of values in a enum was shortened in a spec.""" enum_removals = [ enum_removal - for enum_removal in self._diff.get("iterable_item_removed", []) + for enum_removal in diff.get("iterable_item_removed", []) if enum_removal.up.path(output_format="list")[-1] == "enum" ] if enum_removals: - self._raise_error("An enum field was narrowed.") + self._raise_error("An enum field was narrowed.", diff) - def check_if_declared_new_enum_field(self): + def check_if_declared_new_enum_field(self, diff: DeepDiff): """Check if an 'enum' field was added to the spec.""" enum_additions = [ enum_addition - for enum_addition in self._diff.get("dictionary_item_added", []) + for enum_addition in diff.get("dictionary_item_added", []) if enum_addition.path(output_format="list")[-1] == "enum" ] if enum_additions: - self._raise_error("An 'enum' field was declared on an existing property") + self._raise_error("An 'enum' field was declared on an existing property", diff) def validate_previous_configs( @@ -163,7 +186,7 @@ def check_fake_previous_config_against_actual_spec(fake_previous_config): try: jsonschema.validate(instance=filtered_fake_previous_config, schema=actual_connector_spec.connectionSpecification) except jsonschema.exceptions.ValidationError as err: - raise NonBackwardCompatibleError(err) + raise NonBackwardCompatibleError(err, BackwardIncompatibilityContext.SPEC) check_fake_previous_config_against_actual_spec() @@ -171,18 +194,37 @@ def check_fake_previous_config_against_actual_spec(fake_previous_config): class CatalogDiffChecker(BaseDiffChecker): """A class to perform backward compatibility checks on a discoverd catalog diff""" - context = "Catalog" + context = BackwardIncompatibilityContext.DISCOVER + + def compute_diffs(self): + self.streams_json_schemas_diff = DeepDiff( + {stream_name: airbyte_stream.dict().pop("json_schema") for stream_name, airbyte_stream in self._previous.items()}, + {stream_name: airbyte_stream.dict().pop("json_schema") for stream_name, airbyte_stream in self._current.items()}, + view="tree", + ignore_order=True, + ) + self.streams_cursor_fields_diff = DeepDiff( + {stream_name: airbyte_stream.dict().pop("default_cursor_field") for stream_name, airbyte_stream in self._previous.items()}, + {stream_name: airbyte_stream.dict().pop("default_cursor_field") for stream_name, airbyte_stream in self._current.items()}, + view="tree", + ) def assert_is_backward_compatible(self): - self.check_if_stream_was_removed() - self.check_if_value_of_type_field_changed() - self.check_if_type_of_type_field_changed() + self.check_if_stream_was_removed(self.streams_json_schemas_diff) + self.check_if_value_of_type_field_changed(self.streams_json_schemas_diff) + self.check_if_type_of_type_field_changed(self.streams_json_schemas_diff) + self.check_if_cursor_field_was_changed(self.streams_cursor_fields_diff) - def check_if_stream_was_removed(self): + def check_if_stream_was_removed(self, diff: DeepDiff): """Check if a stream was removed from the catalog.""" removed_streams = [] - for removal in self._diff.get("dictionary_item_removed", []): + for removal in diff.get("dictionary_item_removed", []): if removal.path() != "root" and removal.up.path() == "root": removed_streams.append(removal.path(output_format="list")[0]) if removed_streams: - self._raise_error(f"The following streams were removed: {','.join(removed_streams)}") + self._raise_error(f"The following streams were removed: {','.join(removed_streams)}", diff) + + def check_if_cursor_field_was_changed(self, diff: DeepDiff): + """Check if a default cursor field value was changed.""" + if diff: + self._raise_error("The value of 'default_cursor_field' was changed", diff) diff --git a/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_backward_compatibility.py b/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_backward_compatibility.py index 95e7c174a966..a4f8250a5f63 100644 --- a/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_backward_compatibility.py +++ b/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_backward_compatibility.py @@ -915,6 +915,7 @@ def test_spec_backward_compatibility(previous_connector_spec, actual_connector_s ] +@pytest.mark.slow @pytest.mark.parametrize("previous_connector_spec, actual_connector_spec, should_fail", VALID_JSON_SCHEMA_TRANSITIONS_PARAMS) def test_validate_previous_configs(previous_connector_spec, actual_connector_spec, should_fail): expectation = pytest.raises(NonBackwardCompatibleError) if should_fail else does_not_raise() @@ -989,6 +990,72 @@ def test_validate_previous_configs(previous_connector_spec, actual_connector_spe ) }, ), + Transition( + name="Changing a cursor in a stream should fail.", + should_fail=True, + previous={ + "test_stream": AirbyteStream.parse_obj( + { + "name": "test_stream", + "json_schema": {"properties": {"user": {"type": "object", "properties": {"username": {"type": "string"}}}}}, + "default_cursor_field": ["a"], + } + ), + }, + current={ + "test_stream": AirbyteStream.parse_obj( + { + "name": "test_stream", + "json_schema": {"properties": {"user": {"type": "object", "properties": {"username": {"type": "string"}}}}}, + "default_cursor_field": ["b"], + } + ), + }, + ), + Transition( + name="Changing a cursor in a stream should fail (nested cursors).", + should_fail=True, + previous={ + "test_stream": AirbyteStream.parse_obj( + { + "name": "test_stream", + "json_schema": {"properties": {"user": {"type": "object", "properties": {"username": {"type": "string"}}}}}, + "default_cursor_field": ["a"], + } + ), + }, + current={ + "test_stream": AirbyteStream.parse_obj( + { + "name": "test_stream", + "json_schema": {"properties": {"user": {"type": "object", "properties": {"username": {"type": "string"}}}}}, + "default_cursor_field": ["a", "b"], + } + ), + }, + ), + Transition( + name="Changing a cursor in a stream should fail (nested cursors removal).", + should_fail=True, + previous={ + "test_stream": AirbyteStream.parse_obj( + { + "name": "test_stream", + "json_schema": {"properties": {"user": {"type": "object", "properties": {"username": {"type": "string"}}}}}, + "default_cursor_field": ["a", "b"], + } + ), + }, + current={ + "test_stream": AirbyteStream.parse_obj( + { + "name": "test_stream", + "json_schema": {"properties": {"user": {"type": "object", "properties": {"username": {"type": "string"}}}}}, + "default_cursor_field": ["a"], + } + ), + }, + ), ] VALID_CATALOG_TRANSITIONS = [ @@ -1056,6 +1123,28 @@ def test_validate_previous_configs(previous_connector_spec, actual_connector_spe ) }, ), + Transition( + name="Not changing a cursor in a stream should not fail.", + should_fail=False, + previous={ + "test_stream": AirbyteStream.parse_obj( + { + "name": "test_stream", + "json_schema": {"properties": {"user": {"type": "object", "properties": {"username": {"type": "string"}}}}}, + "default_cursor_field": ["a"], + } + ), + }, + current={ + "test_stream": AirbyteStream.parse_obj( + { + "name": "test_stream", + "json_schema": {"properties": {"user": {"type": "object", "properties": {"username": {"type": "string"}}}}}, + "default_cursor_field": ["a"], + } + ), + }, + ), ] # Checking that all transitions in FAILING_CATALOG_TRANSITIONS have should_fail == True to prevent typos