Skip to content

Commit

Permalink
SAT: backward compatibility - check that cursor fields were not chang…
Browse files Browse the repository at this point in the history
…ed (#15520)
  • Loading branch information
alafanechere authored and girarda committed Aug 11, 2022
1 parent 2daad7b commit d78e9b7
Show file tree
Hide file tree
Showing 7 changed files with 194 additions and 89 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.2.0
Finish backward compatibility syntactic tests implementation: check that cursor fields were not changed. [#15520](https://github.com/airbytehq/airbyte/pull/15520/)

## 0.1.62
Backward compatibility tests: add syntactic validation of catalogs [#15486](https://github.com/airbytehq/airbyte/pull/15486/)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ COPY pytest.ini setup.py ./
COPY source_acceptance_test ./source_acceptance_test
RUN pip install .

LABEL io.airbyte.version=0.1.62
LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.name=airbyte/source-acceptance-test

ENTRYPOINT ["python", "-m", "pytest", "-p", "source_acceptance_test.plugin", "-r", "fEsx"]
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ These iterations are more conveniently achieved by remaining in the current dire
* Existing test modules are defined in `./source_acceptance_test/tests`
* `acceptance-test-config.yaml` structure is defined in `./source_acceptance_test/config.py`
6. Unit test your changes by adding tests to `./unit_tests`
7. Run the unit tests on SAT again: `python -m pytest unit_tests`, make sure the coverage did not decrease.
7. Run the unit tests on SAT again: `python -m pytest unit_tests`, make sure the coverage did not decrease. You can bypass slow tests by using the `slow` marker: `python -m pytest unit_tests -m "not slow"`.
8. Manually test the changes you made by running SAT on a specific connector. e.g. `python -m pytest -p source_acceptance_test.plugin --acceptance-test-config=../../connectors/source-pokeapi`
9. Make sure you updated `docs/connector-development/testing-connectors/source-acceptance-tests-reference.md` according to your changes
10. Bump the SAT version in `airbyte-integrations/bases/source-acceptance-test/Dockerfile`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ testpaths =

markers =
default_timeout
backward_compatibility
slow: marks tests as slow (deselect with '-m "not slow"')
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
TraceType,
Type,
)
from deepdiff import DeepDiff
from docker.errors import ContainerError
from jsonschema._utils import flatten
from source_acceptance_test.base import BaseTest
Expand All @@ -46,15 +45,6 @@ class TestSpec(BaseTest):
spec_cache: ConnectorSpecification = None
previous_spec_cache: ConnectorSpecification = None

@staticmethod
def compute_spec_diff(actual_connector_spec: ConnectorSpecification, previous_connector_spec: ConnectorSpecification):
return DeepDiff(
previous_connector_spec.dict()["connectionSpecification"],
actual_connector_spec.dict()["connectionSpecification"],
view="tree",
ignore_order=True,
)

@pytest.fixture(name="skip_backward_compatibility_tests")
def skip_backward_compatibility_tests_fixture(self, inputs: SpecTestConfig, previous_connector_docker_runner: ConnectorRunner) -> bool:
if previous_connector_docker_runner is None:
Expand Down Expand Up @@ -185,13 +175,9 @@ def test_backward_compatibility(
previous_connector_spec: ConnectorSpecification,
number_of_configs_to_generate: int = 100,
):
"""Check if the current spec is backward_compatible:
1. Perform multiple hardcoded syntactic checks with SpecDiffChecker.
2. Validate fake generated previous configs against the actual connector specification with validate_previous_configs.
"""
"""Check if the current spec is backward_compatible with the previous one"""
assert isinstance(actual_connector_spec, ConnectorSpecification) and isinstance(previous_connector_spec, ConnectorSpecification)
spec_diff = self.compute_spec_diff(actual_connector_spec, previous_connector_spec)
checker = SpecDiffChecker(spec_diff)
checker = SpecDiffChecker(previous=previous_connector_spec.dict(), current=actual_connector_spec.dict())
checker.assert_is_backward_compatible()
validate_previous_configs(previous_connector_spec, actual_connector_spec, number_of_configs_to_generate)

Expand Down Expand Up @@ -235,17 +221,6 @@ def test_check(self, connector_config, inputs: ConnectionTestConfig, docker_runn

@pytest.mark.default_timeout(30)
class TestDiscovery(BaseTest):
@staticmethod
def compute_discovered_catalog_diff(
discovered_catalog: MutableMapping[str, AirbyteStream], previous_discovered_catalog: MutableMapping[str, AirbyteStream]
):
return DeepDiff(
{stream_name: airbyte_stream.dict().pop("json_schema") for stream_name, airbyte_stream in previous_discovered_catalog.items()},
{stream_name: airbyte_stream.dict().pop("json_schema") for stream_name, airbyte_stream in discovered_catalog.items()},
view="tree",
ignore_order=True,
)

@pytest.fixture(name="skip_backward_compatibility_tests")
def skip_backward_compatibility_tests_fixture(
self, inputs: DiscoveryTestConfig, previous_connector_docker_runner: ConnectorRunner
Expand Down Expand Up @@ -340,13 +315,9 @@ def test_backward_compatibility(
discovered_catalog: MutableMapping[str, AirbyteStream],
previous_discovered_catalog: MutableMapping[str, AirbyteStream],
):
"""Check if the current spec is backward_compatible:
1. Perform multiple hardcoded syntactic checks with SpecDiffChecker.
2. Validate fake generated previous configs against the actual connector specification with validate_previous_configs.
"""
"""Check if the current catalog is backward_compatible with the previous one."""
assert isinstance(discovered_catalog, MutableMapping) and isinstance(previous_discovered_catalog, MutableMapping)
catalog_diff = self.compute_discovered_catalog_diff(discovered_catalog, previous_discovered_catalog)
checker = CatalogDiffChecker(catalog_diff)
checker = CatalogDiffChecker(previous_discovered_catalog, discovered_catalog)
checker.assert_is_backward_compatible()


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#

from abc import ABC, abstractmethod
from multiprocessing import context
from enum import Enum

import jsonschema
from airbyte_cdk.models import ConnectorSpecification
Expand All @@ -13,50 +13,67 @@
from source_acceptance_test.utils import SecretDict


class BackwardIncompatibilityContext(Enum):
SPEC = 1
DISCOVER = 2


class NonBackwardCompatibleError(Exception):
pass
def __init__(self, error_message: str, context: BackwardIncompatibilityContext) -> None:
self.error_message = error_message
self.context = context
super().__init__(error_message)

def __str__(self):
return f"{self.context} - {self.error_message}"


class BaseDiffChecker(ABC):
def __init__(self, diff: DeepDiff) -> None:
self._diff = diff
def __init__(self, previous: dict, current: dict) -> None:
self._previous = previous
self._current = current
self.compute_diffs()

def _raise_error(self, message: str):
raise NonBackwardCompatibleError(f"{context} - {message}. Diff: {self._diff.pretty()}")
def _raise_error(self, message: str, diff: DeepDiff):
raise NonBackwardCompatibleError(f"{message}. Diff: {diff.pretty()}", self.context)

@property
@abstractmethod
def context(self): # pragma: no cover
pass

@abstractmethod
def compute_diffs(self): # pragma: no cover
pass

@abstractmethod
def assert_is_backward_compatible(self): # pragma: no cover
pass

def check_if_value_of_type_field_changed(self):
def check_if_value_of_type_field_changed(self, diff: DeepDiff):
"""Check if a type was changed"""
# Detect type value change in case type field is declared as a string (e.g "str" -> "int"):
type_values_changed = [change for change in self._diff.get("values_changed", []) if change.path(output_format="list")[-1] == "type"]
type_values_changed = [change for change in diff.get("values_changed", []) if change.path(output_format="list")[-1] == "type"]

# Detect type value change in case type field is declared as a single item list (e.g ["str"] -> ["int"]):
type_values_changed_in_list = [
change for change in self._diff.get("values_changed", []) if change.path(output_format="list")[-2] == "type"
change for change in diff.get("values_changed", []) if change.path(output_format="list")[-2] == "type"
]
if type_values_changed or type_values_changed_in_list:
self._raise_error("The'type' field value was changed.")
self._raise_error("The'type' field value was changed.", diff)

def check_if_new_type_was_added(self): # pragma: no cover
def check_if_new_type_was_added(self, diff: DeepDiff): # pragma: no cover
"""Detect type value added to type list if new type value is not None (e.g ["str"] -> ["str", "int"])"""
new_values_in_type_list = [
change
for change in self._diff.get("iterable_item_added", [])
for change in diff.get("iterable_item_added", [])
if change.path(output_format="list")[-2] == "type"
if change.t2 != "null"
]
if new_values_in_type_list:
self._raise_error("A new value was added to a 'type' field")

def check_if_type_of_type_field_changed(self):
def check_if_type_of_type_field_changed(self, diff: DeepDiff):
"""
Detect the change of type of a type field
e.g:
Expand All @@ -68,83 +85,89 @@ def check_if_type_of_type_field_changed(self):
- ["str"] -> "int" INVALID
- ["str"] -> 1 INVALID
"""
type_changes = [change for change in self._diff.get("type_changes", []) if change.path(output_format="list")[-1] == "type"]
type_changes = [change for change in diff.get("type_changes", []) if change.path(output_format="list")[-1] == "type"]
for change in type_changes:
# We only accept change on the type field if the new type for this field is list or string
# This might be something already guaranteed by JSON schema validation.
if isinstance(change.t1, str):
if not isinstance(change.t2, list):
self._raise_error("A 'type' field was changed from string to an invalid value.")
self._raise_error("A 'type' field was changed from string to an invalid value.", diff)
# If the new type field is a list we want to make sure it only has the original type (t1) and null: e.g. "str" -> ["str", "null"]
# We want to raise an error otherwise.
t2_not_null_types = [_type for _type in change.t2 if _type != "null"]
if not (len(t2_not_null_types) == 1 and t2_not_null_types[0] == change.t1):
self._raise_error("The 'type' field was changed to a list with multiple invalid values")
self._raise_error("The 'type' field was changed to a list with multiple invalid values", diff)
if isinstance(change.t1, list):
if not isinstance(change.t2, str):
self._raise_error("The 'type' field was changed from a list to an invalid value")
self._raise_error("The 'type' field was changed from a list to an invalid value", diff)
if not (len(change.t1) == 1 and change.t2 == change.t1[0]):
self._raise_error("An element was removed from the list of 'type'")
self._raise_error("An element was removed from the list of 'type'", diff)


class SpecDiffChecker(BaseDiffChecker):
"""A class to perform backward compatibility checks on a connector specification diff"""

context = "Specification"
context = BackwardIncompatibilityContext.SPEC

def compute_diffs(self):
self.connection_specification_diff = DeepDiff(
self._previous["connectionSpecification"],
self._current["connectionSpecification"],
view="tree",
ignore_order=True,
)

def assert_is_backward_compatible(self):
self.check_if_declared_new_required_field()
self.check_if_added_a_new_required_property()
self.check_if_value_of_type_field_changed()
# self.check_if_new_type_was_added() We want to allow type expansion atm
self.check_if_type_of_type_field_changed()
self.check_if_field_was_made_not_nullable()
self.check_if_enum_was_narrowed()
self.check_if_declared_new_enum_field()

def check_if_declared_new_required_field(self):
self.check_if_declared_new_required_field(self.connection_specification_diff)
self.check_if_added_a_new_required_property(self.connection_specification_diff)
self.check_if_value_of_type_field_changed(self.connection_specification_diff)
# self.check_if_new_type_was_added(self.connection_specification_diff) We want to allow type expansion atm
self.check_if_type_of_type_field_changed(self.connection_specification_diff)
self.check_if_field_was_made_not_nullable(self.connection_specification_diff)
self.check_if_enum_was_narrowed(self.connection_specification_diff)
self.check_if_declared_new_enum_field(self.connection_specification_diff)

def check_if_declared_new_required_field(self, diff: DeepDiff):
"""Check if the new spec declared a 'required' field."""
added_required_fields = [
addition for addition in self._diff.get("dictionary_item_added", []) if addition.path(output_format="list")[-1] == "required"
addition for addition in diff.get("dictionary_item_added", []) if addition.path(output_format="list")[-1] == "required"
]
if added_required_fields:
self._raise_error("A new 'required' field was declared.")
self._raise_error("A new 'required' field was declared.", diff)

def check_if_added_a_new_required_property(self):
def check_if_added_a_new_required_property(self, diff: DeepDiff):
"""Check if the new spec added a property to the 'required' list"""
added_required_properties = [
addition for addition in self._diff.get("iterable_item_added", []) if addition.up.path(output_format="list")[-1] == "required"
addition for addition in diff.get("iterable_item_added", []) if addition.up.path(output_format="list")[-1] == "required"
]
if added_required_properties:
self._raise_error("A new property was added to 'required'")
self._raise_error("A new property was added to 'required'", diff)

def check_if_field_was_made_not_nullable(self):
def check_if_field_was_made_not_nullable(self, diff: DeepDiff):
"""Detect when field was made not nullable but is still a list: e.g ["string", "null"] -> ["string"]"""
removed_nullable = [
change for change in self._diff.get("iterable_item_removed", []) if change.path(output_format="list")[-2] == "type"
]
removed_nullable = [change for change in diff.get("iterable_item_removed", []) if change.path(output_format="list")[-2] == "type"]
if removed_nullable:
self._raise_error("A field type was narrowed or made a field not nullable")
self._raise_error("A field type was narrowed or made a field not nullable", diff)

def check_if_enum_was_narrowed(self):
def check_if_enum_was_narrowed(self, diff: DeepDiff):
"""Check if the list of values in a enum was shortened in a spec."""
enum_removals = [
enum_removal
for enum_removal in self._diff.get("iterable_item_removed", [])
for enum_removal in diff.get("iterable_item_removed", [])
if enum_removal.up.path(output_format="list")[-1] == "enum"
]
if enum_removals:
self._raise_error("An enum field was narrowed.")
self._raise_error("An enum field was narrowed.", diff)

def check_if_declared_new_enum_field(self):
def check_if_declared_new_enum_field(self, diff: DeepDiff):
"""Check if an 'enum' field was added to the spec."""
enum_additions = [
enum_addition
for enum_addition in self._diff.get("dictionary_item_added", [])
for enum_addition in diff.get("dictionary_item_added", [])
if enum_addition.path(output_format="list")[-1] == "enum"
]
if enum_additions:
self._raise_error("An 'enum' field was declared on an existing property")
self._raise_error("An 'enum' field was declared on an existing property", diff)


def validate_previous_configs(
Expand All @@ -163,26 +186,45 @@ def check_fake_previous_config_against_actual_spec(fake_previous_config):
try:
jsonschema.validate(instance=filtered_fake_previous_config, schema=actual_connector_spec.connectionSpecification)
except jsonschema.exceptions.ValidationError as err:
raise NonBackwardCompatibleError(err)
raise NonBackwardCompatibleError(err, BackwardIncompatibilityContext.SPEC)

check_fake_previous_config_against_actual_spec()


class CatalogDiffChecker(BaseDiffChecker):
"""A class to perform backward compatibility checks on a discoverd catalog diff"""

context = "Catalog"
context = BackwardIncompatibilityContext.DISCOVER

def compute_diffs(self):
self.streams_json_schemas_diff = DeepDiff(
{stream_name: airbyte_stream.dict().pop("json_schema") for stream_name, airbyte_stream in self._previous.items()},
{stream_name: airbyte_stream.dict().pop("json_schema") for stream_name, airbyte_stream in self._current.items()},
view="tree",
ignore_order=True,
)
self.streams_cursor_fields_diff = DeepDiff(
{stream_name: airbyte_stream.dict().pop("default_cursor_field") for stream_name, airbyte_stream in self._previous.items()},
{stream_name: airbyte_stream.dict().pop("default_cursor_field") for stream_name, airbyte_stream in self._current.items()},
view="tree",
)

def assert_is_backward_compatible(self):
self.check_if_stream_was_removed()
self.check_if_value_of_type_field_changed()
self.check_if_type_of_type_field_changed()
self.check_if_stream_was_removed(self.streams_json_schemas_diff)
self.check_if_value_of_type_field_changed(self.streams_json_schemas_diff)
self.check_if_type_of_type_field_changed(self.streams_json_schemas_diff)
self.check_if_cursor_field_was_changed(self.streams_cursor_fields_diff)

def check_if_stream_was_removed(self):
def check_if_stream_was_removed(self, diff: DeepDiff):
"""Check if a stream was removed from the catalog."""
removed_streams = []
for removal in self._diff.get("dictionary_item_removed", []):
for removal in diff.get("dictionary_item_removed", []):
if removal.path() != "root" and removal.up.path() == "root":
removed_streams.append(removal.path(output_format="list")[0])
if removed_streams:
self._raise_error(f"The following streams were removed: {','.join(removed_streams)}")
self._raise_error(f"The following streams were removed: {','.join(removed_streams)}", diff)

def check_if_cursor_field_was_changed(self, diff: DeepDiff):
"""Check if a default cursor field value was changed."""
if diff:
self._raise_error("The value of 'default_cursor_field' was changed", diff)
Loading

0 comments on commit d78e9b7

Please sign in to comment.