From 957a0be226b65d0680a324124beb52179cd33adf Mon Sep 17 00:00:00 2001 From: "Pedro S. Lopez" Date: Tue, 17 May 2022 16:43:10 -0400 Subject: [PATCH] SAT: check that connectors emit an `AirbyteTraceMessage` on failure (#12796) * add SAT case for airbyte trace message on failure * add ability to opt-out * add tests * add option to docs * bump version, update changelog * fix type errors * update changelog --- .../bases/source-acceptance-test/CHANGELOG.md | 3 + .../bases/source-acceptance-test/Dockerfile | 2 +- .../source_acceptance_test/config.py | 3 +- .../source_acceptance_test/tests/test_core.py | 38 ++++++- .../utils/connector_runner.py | 4 +- .../unit_tests/test_core.py | 105 +++++++++++++++++- .../source-acceptance-tests-reference.md | 27 ++--- 7 files changed, 162 insertions(+), 20 deletions(-) diff --git a/airbyte-integrations/bases/source-acceptance-test/CHANGELOG.md b/airbyte-integrations/bases/source-acceptance-test/CHANGELOG.md index b6fb971fe5bc85..44f2593bee6021 100644 --- a/airbyte-integrations/bases/source-acceptance-test/CHANGELOG.md +++ b/airbyte-integrations/bases/source-acceptance-test/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 0.1.52 +Add test case for `AirbyteTraceMessage` emission on connector failure: [#12796](https://github.com/airbytehq/airbyte/pull/12796/). + ## 0.1.51 - Add `threshold_days` option for lookback window support in incremental tests. - Update CDK to prevent warnings when encountering new `AirbyteTraceMessage`s. diff --git a/airbyte-integrations/bases/source-acceptance-test/Dockerfile b/airbyte-integrations/bases/source-acceptance-test/Dockerfile index 3351bd6eab6de7..f0347cd1ab1330 100644 --- a/airbyte-integrations/bases/source-acceptance-test/Dockerfile +++ b/airbyte-integrations/bases/source-acceptance-test/Dockerfile @@ -33,7 +33,7 @@ COPY pytest.ini setup.py ./ COPY source_acceptance_test ./source_acceptance_test RUN pip install . -LABEL io.airbyte.version=0.1.51 +LABEL io.airbyte.version=0.1.52 LABEL io.airbyte.name=airbyte/source-acceptance-test ENTRYPOINT ["python", "-m", "pytest", "-p", "source_acceptance_test.plugin", "-r", "fEsx"] diff --git a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/config.py b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/config.py index 44c9e3d133655f..618e1fec8bf9b6 100644 --- a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/config.py +++ b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/config.py @@ -65,7 +65,7 @@ def validate_exact_order(cls, exact_order, values): @validator("extra_records", always=True) def validate_extra_records(cls, extra_records, values): if "extra_fields" in values and values["extra_fields"] and extra_records: - raise ValueError("extra_records must by off if extra_fields enabled") + raise ValueError("extra_records must be off if extra_fields enabled") return extra_records @@ -79,6 +79,7 @@ class BasicReadTestConfig(BaseConfig): validate_data_points: bool = Field( False, description="Set whether we need to validate that all fields in all streams contained at least one data point" ) + expect_trace_message_on_failure: bool = Field(True, description="Ensure that a trace message is emitted when the connector crashes") timeout_seconds: int = timeout_seconds diff --git a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_core.py b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_core.py index 87b5afb9b39769..f9e2316520f66c 100644 --- a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_core.py +++ b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_core.py @@ -13,7 +13,18 @@ import dpath.util import jsonschema import pytest -from airbyte_cdk.models import AirbyteRecordMessage, ConfiguredAirbyteCatalog, ConnectorSpecification, Status, Type +from airbyte_cdk.models import ( + AirbyteRecordMessage, + AirbyteStream, + ConfiguredAirbyteCatalog, + ConfiguredAirbyteStream, + ConnectorSpecification, + DestinationSyncMode, + Status, + SyncMode, + TraceType, + Type, +) from docker.errors import ContainerError from jsonschema._utils import flatten from source_acceptance_test.base import BaseTest @@ -401,6 +412,31 @@ def test_read( records=records, expected_records=expected_records, flags=inputs.expect_records, detailed_logger=detailed_logger ) + def test_airbyte_trace_message_on_failure(self, connector_config, inputs: BasicReadTestConfig, docker_runner: ConnectorRunner): + if not inputs.expect_trace_message_on_failure: + pytest.skip("Skipping `test_airbyte_trace_message_on_failure` because `inputs.expect_trace_message_on_failure=False`") + return + + invalid_configured_catalog = ConfiguredAirbyteCatalog( + streams=[ + ConfiguredAirbyteStream( + stream=AirbyteStream( + name="__AIRBYTE__stream_that_does_not_exist", + json_schema={"type": "object", "properties": {"f1": {"type": "string"}}}, + supported_sync_modes=[SyncMode.full_refresh], + ), + sync_mode=SyncMode.full_refresh, + destination_sync_mode=DestinationSyncMode.overwrite, + ) + ] + ) + + output = docker_runner.call_read(connector_config, invalid_configured_catalog, raise_container_error=False) + trace_messages = filter_output(output, Type.TRACE) + error_trace_messages = list(filter(lambda m: m.trace.type == TraceType.ERROR, trace_messages)) + + assert len(error_trace_messages) >= 1, "Connector should emit at least one error trace message" + @staticmethod def remove_extra_fields(record: Any, spec: Any) -> Any: """Remove keys from record that spec doesn't have, works recursively""" diff --git a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/connector_runner.py b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/connector_runner.py index 9b98ddcb0df3c4..c571f59221a40f 100644 --- a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/connector_runner.py +++ b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/connector_runner.py @@ -88,7 +88,7 @@ def call_read_with_state(self, config, catalog, state, **kwargs) -> List[Airbyte output = list(self.run(cmd=cmd, config=config, catalog=catalog, state=state, **kwargs)) return output - def run(self, cmd, config=None, state=None, catalog=None, **kwargs) -> Iterable[AirbyteMessage]: + def run(self, cmd, config=None, state=None, catalog=None, raise_container_error: bool = True, **kwargs) -> Iterable[AirbyteMessage]: self._runs += 1 volumes = self._prepare_volumes(config, state, catalog) logging.debug(f"Docker run {self._image}: \n{cmd}\n" f"input: {self.input_folder}\noutput: {self.output_folder}") @@ -103,7 +103,7 @@ def run(self, cmd, config=None, state=None, catalog=None, **kwargs) -> Iterable[ **kwargs, ) with open(self.output_folder / "raw", "wb+") as f: - for line in self.read(container, command=cmd): + for line in self.read(container, command=cmd, with_ext=raise_container_error): f.write(line.encode()) try: yield AirbyteMessage.parse_raw(line) diff --git a/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_core.py b/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_core.py index 61e36b4db89cdb..2b8643717a3953 100644 --- a/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_core.py +++ b/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_core.py @@ -2,10 +2,22 @@ # Copyright (c) 2021 Airbyte, Inc., all rights reserved. # -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch import pytest -from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, Type +from airbyte_cdk.models import ( + AirbyteErrorTraceMessage, + AirbyteLogMessage, + AirbyteMessage, + AirbyteRecordMessage, + AirbyteStream, + AirbyteTraceMessage, + ConfiguredAirbyteCatalog, + ConfiguredAirbyteStream, + Level, + TraceType, + Type, +) from source_acceptance_test.config import BasicReadTestConfig from source_acceptance_test.tests.test_core import TestBasicRead as _TestBasicRead from source_acceptance_test.tests.test_core import TestDiscovery as _TestDiscovery @@ -144,6 +156,95 @@ def test_read(schema, record, should_fail): t.test_read(None, catalog, input_config, [], docker_runner_mock, MagicMock()) +@pytest.mark.parametrize( + "output, expect_trace_message_on_failure, should_fail", + [ + ( + [ + AirbyteMessage( + type=Type.TRACE, + trace=AirbyteTraceMessage(type=TraceType.ERROR, emitted_at=111, error=AirbyteErrorTraceMessage(message="oh no")), + ) + ], + True, + False, + ), + ( + [ + AirbyteMessage( + type=Type.LOG, + log=AirbyteLogMessage(level=Level.ERROR, message="oh no"), + ), + AirbyteMessage( + type=Type.TRACE, + trace=AirbyteTraceMessage(type=TraceType.ERROR, emitted_at=111, error=AirbyteErrorTraceMessage(message="oh no")), + ), + ], + True, + False, + ), + ( + [ + AirbyteMessage( + type=Type.TRACE, + trace=AirbyteTraceMessage(type=TraceType.ERROR, emitted_at=111, error=AirbyteErrorTraceMessage(message="oh no")), + ), + AirbyteMessage( + type=Type.TRACE, + trace=AirbyteTraceMessage(type=TraceType.ERROR, emitted_at=112, error=AirbyteErrorTraceMessage(message="oh no!!")), + ), + ], + True, + False, + ), + ( + [ + AirbyteMessage( + type=Type.LOG, + log=AirbyteLogMessage(level=Level.ERROR, message="oh no"), + ) + ], + True, + True, + ), + ([], True, True), + ( + [ + AirbyteMessage( + type=Type.TRACE, + trace=AirbyteTraceMessage(type=TraceType.ERROR, emitted_at=111, error=AirbyteErrorTraceMessage(message="oh no")), + ) + ], + False, + False, + ), + ( + [ + AirbyteMessage( + type=Type.LOG, + log=AirbyteLogMessage(level=Level.ERROR, message="oh no"), + ) + ], + False, + False, + ), + ([], False, False), + ], +) +def test_airbyte_trace_message_on_failure(output, expect_trace_message_on_failure, should_fail): + t = _TestBasicRead() + input_config = BasicReadTestConfig(expect_trace_message_on_failure=expect_trace_message_on_failure) + docker_runner_mock = MagicMock() + docker_runner_mock.call_read.return_value = output + + with patch.object(pytest, "skip", return_value=None): + if should_fail: + with pytest.raises(AssertionError, match="Connector should emit at least one error trace message"): + t.test_airbyte_trace_message_on_failure(None, input_config, docker_runner_mock) + else: + t.test_airbyte_trace_message_on_failure(None, input_config, docker_runner_mock) + + @pytest.mark.parametrize( "records, configured_catalog, expected_error", [ diff --git a/docs/connector-development/testing-connectors/source-acceptance-tests-reference.md b/docs/connector-development/testing-connectors/source-acceptance-tests-reference.md index 1454868754e8bb..355c0f9222c54e 100644 --- a/docs/connector-development/testing-connectors/source-acceptance-tests-reference.md +++ b/docs/connector-development/testing-connectors/source-acceptance-tests-reference.md @@ -125,19 +125,20 @@ Verifies when a discover operation is run on the connector using the given confi Configuring all streams in the input catalog to full refresh mode verifies that a read operation produces some RECORD messages. Each stream should have some data, if you can't guarantee this for particular streams - add them to the `empty_streams` list. Set `validate_data_points=True` if possible. This validation is going to be enabled by default and won't be configurable in future releases. -| Input | Type | Default | Note | -| :--- | :--- | :--- | :--- | -| `config_path` | string | `secrets/config.json` | Path to a JSON object representing a valid connector configuration | -| `configured_catalog_path` | string | `integration_tests/configured_catalog.json` | Path to configured catalog | -| `empty_streams` | array | \[\] | List of streams that might be empty | -| `validate_schema` | boolean | True | Verify that structure and types of records matches the schema from discovery command | -| `validate_data_points` | boolean | False | Validate that all fields in all streams contained at least one data point | -| `timeout_seconds` | int | 5\*60 | Test execution timeout in seconds | -| `expect_records` | object | None | Compare produced records with expected records, see details below | -| `expect_records.path` | string | | File with expected records | -| `expect_records.extra_fields` | boolean | False | Allow output records to have other fields i.e: expected records are a subset | -| `expect_records.exact_order` | boolean | False | Ensure that records produced in exact same order | -| `expect_records.extra_records` | boolean | True | Allow connector to produce extra records, but still enforce all records from the expected file to be produced | +| Input | Type | Default | Note | +|:----------------------------------| :--- | :--- | :--- | +| `config_path` | string | `secrets/config.json` | Path to a JSON object representing a valid connector configuration | +| `configured_catalog_path` | string | `integration_tests/configured_catalog.json` | Path to configured catalog | +| `empty_streams` | array | \[\] | List of streams that might be empty | +| `validate_schema` | boolean | True | Verify that structure and types of records matches the schema from discovery command | +| `validate_data_points` | boolean | False | Validate that all fields in all streams contained at least one data point | +| `timeout_seconds` | int | 5\*60 | Test execution timeout in seconds | +| `expect_trace_message_on_failure` | boolean | True | Ensure that a trace message is emitted when the connector crashes | +| `expect_records` | object | None | Compare produced records with expected records, see details below | +| `expect_records.path` | string | | File with expected records | +| `expect_records.extra_fields` | boolean | False | Allow output records to have other fields i.e: expected records are a subset | +| `expect_records.exact_order` | boolean | False | Ensure that records produced in exact same order | +| `expect_records.extra_records` | boolean | True | Allow connector to produce extra records, but still enforce all records from the expected file to be produced | `expect_records` is a nested configuration, if omitted - the part of the test responsible for record matching will be skipped. Due to the fact that we can't identify records without primary keys, only the following flag combinations are supported: