Skip to content

Commit

Permalink
SAT: check that connectors emit an AirbyteTraceMessage on failure (#…
Browse files Browse the repository at this point in the history
…12796)

* add SAT case for airbyte trace message on failure

* add ability to opt-out

* add tests

* add option to docs

* bump version, update changelog

* fix type errors

* update changelog
  • Loading branch information
pedroslopez committed May 17, 2022
1 parent 7ea27d0 commit 957a0be
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.1.52
- Add test case for `AirbyteTraceMessage` emission on connector failure: [#12796](https://github.com/airbytehq/airbyte/pull/12796/).

## 0.1.51
- Add `threshold_days` option for lookback window support in incremental tests.
- Update CDK to prevent warnings when encountering new `AirbyteTraceMessage`s.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ COPY pytest.ini setup.py ./
COPY source_acceptance_test ./source_acceptance_test
RUN pip install .

LABEL io.airbyte.version=0.1.51
LABEL io.airbyte.version=0.1.52
LABEL io.airbyte.name=airbyte/source-acceptance-test

ENTRYPOINT ["python", "-m", "pytest", "-p", "source_acceptance_test.plugin", "-r", "fEsx"]
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def validate_exact_order(cls, exact_order, values):
@validator("extra_records", always=True)
def validate_extra_records(cls, extra_records, values):
    """Reject `extra_records=True` when `extra_fields` is enabled.

    The two flags are mutually exclusive: `extra_fields` compares expected
    records as a subset of output records, so also allowing extra records
    would make the comparison meaningless.
    """
    # NOTE: a stale duplicate of this raise (with the typo "must by off")
    # was left behind by a bad merge; only the corrected message is kept.
    if "extra_fields" in values and values["extra_fields"] and extra_records:
        raise ValueError("extra_records must be off if extra_fields enabled")
    return extra_records


Expand All @@ -79,6 +79,7 @@ class BasicReadTestConfig(BaseConfig):
validate_data_points: bool = Field(
False, description="Set whether we need to validate that all fields in all streams contained at least one data point"
)
expect_trace_message_on_failure: bool = Field(True, description="Ensure that a trace message is emitted when the connector crashes")
timeout_seconds: int = timeout_seconds


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,18 @@
import dpath.util
import jsonschema
import pytest
from airbyte_cdk.models import AirbyteRecordMessage, ConfiguredAirbyteCatalog, ConnectorSpecification, Status, Type
from airbyte_cdk.models import (
AirbyteRecordMessage,
AirbyteStream,
ConfiguredAirbyteCatalog,
ConfiguredAirbyteStream,
ConnectorSpecification,
DestinationSyncMode,
Status,
SyncMode,
TraceType,
Type,
)
from docker.errors import ContainerError
from jsonschema._utils import flatten
from source_acceptance_test.base import BaseTest
Expand Down Expand Up @@ -401,6 +412,31 @@ def test_read(
records=records, expected_records=expected_records, flags=inputs.expect_records, detailed_logger=detailed_logger
)

def test_airbyte_trace_message_on_failure(self, connector_config, inputs: BasicReadTestConfig, docker_runner: ConnectorRunner):
    """Check that the connector emits an ERROR `AirbyteTraceMessage` when a read fails.

    Connectors may opt out by setting `expect_trace_message_on_failure: false`
    in the test config (see `BasicReadTestConfig`).
    """
    if not inputs.expect_trace_message_on_failure:
        pytest.skip("Skipping `test_airbyte_trace_message_on_failure` because `inputs.expect_trace_message_on_failure=False`")
        # Explicit return so the guard still exits when `pytest.skip` is
        # monkeypatched to not raise — the unit tests patch it exactly that way.
        return

    # A catalog naming a stream the connector should never have, so the read
    # is expected to fail inside the container.
    invalid_configured_catalog = ConfiguredAirbyteCatalog(
        streams=[
            ConfiguredAirbyteStream(
                stream=AirbyteStream(
                    name="__AIRBYTE__stream_that_does_not_exist",
                    json_schema={"type": "object", "properties": {"f1": {"type": "string"}}},
                    supported_sync_modes=[SyncMode.full_refresh],
                ),
                sync_mode=SyncMode.full_refresh,
                destination_sync_mode=DestinationSyncMode.overwrite,
            )
        ]
    )

    # Swallow the container error (raise_container_error=False) so the
    # messages the connector emitted before dying can be inspected.
    output = docker_runner.call_read(connector_config, invalid_configured_catalog, raise_container_error=False)
    trace_messages = filter_output(output, Type.TRACE)
    error_trace_messages = list(filter(lambda m: m.trace.type == TraceType.ERROR, trace_messages))

    assert len(error_trace_messages) >= 1, "Connector should emit at least one error trace message"

@staticmethod
def remove_extra_fields(record: Any, spec: Any) -> Any:
"""Remove keys from record that spec doesn't have, works recursively"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def call_read_with_state(self, config, catalog, state, **kwargs) -> List[Airbyte
output = list(self.run(cmd=cmd, config=config, catalog=catalog, state=state, **kwargs))
return output

def run(self, cmd, config=None, state=None, catalog=None, **kwargs) -> Iterable[AirbyteMessage]:
def run(self, cmd, config=None, state=None, catalog=None, raise_container_error: bool = True, **kwargs) -> Iterable[AirbyteMessage]:
self._runs += 1
volumes = self._prepare_volumes(config, state, catalog)
logging.debug(f"Docker run {self._image}: \n{cmd}\n" f"input: {self.input_folder}\noutput: {self.output_folder}")
Expand All @@ -103,7 +103,7 @@ def run(self, cmd, config=None, state=None, catalog=None, **kwargs) -> Iterable[
**kwargs,
)
with open(self.output_folder / "raw", "wb+") as f:
for line in self.read(container, command=cmd):
for line in self.read(container, command=cmd, with_ext=raise_container_error):
f.write(line.encode())
try:
yield AirbyteMessage.parse_raw(line)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,22 @@
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

from unittest.mock import MagicMock
from unittest.mock import MagicMock, patch

import pytest
from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, Type
from airbyte_cdk.models import (
AirbyteErrorTraceMessage,
AirbyteLogMessage,
AirbyteMessage,
AirbyteRecordMessage,
AirbyteStream,
AirbyteTraceMessage,
ConfiguredAirbyteCatalog,
ConfiguredAirbyteStream,
Level,
TraceType,
Type,
)
from source_acceptance_test.config import BasicReadTestConfig
from source_acceptance_test.tests.test_core import TestBasicRead as _TestBasicRead
from source_acceptance_test.tests.test_core import TestDiscovery as _TestDiscovery
Expand Down Expand Up @@ -144,6 +156,95 @@ def test_read(schema, record, should_fail):
t.test_read(None, catalog, input_config, [], docker_runner_mock, MagicMock())


def _error_trace_message(emitted_at: int, message: str) -> AirbyteMessage:
    """Build a TRACE message carrying an ERROR trace payload."""
    return AirbyteMessage(
        type=Type.TRACE,
        trace=AirbyteTraceMessage(type=TraceType.ERROR, emitted_at=emitted_at, error=AirbyteErrorTraceMessage(message=message)),
    )


def _error_log_message(message: str) -> AirbyteMessage:
    """Build a LOG message at ERROR level."""
    return AirbyteMessage(type=Type.LOG, log=AirbyteLogMessage(level=Level.ERROR, message=message))


@pytest.mark.parametrize(
    "output, expect_trace_message_on_failure, should_fail",
    [
        # A single error trace message satisfies the check.
        ([_error_trace_message(111, "oh no")], True, False),
        # An error log alongside an error trace message still passes.
        ([_error_log_message("oh no"), _error_trace_message(111, "oh no")], True, False),
        # Multiple error trace messages pass.
        ([_error_trace_message(111, "oh no"), _error_trace_message(112, "oh no!!")], True, False),
        # Only an error log — no trace message — fails the check.
        ([_error_log_message("oh no")], True, True),
        # No output at all fails the check.
        ([], True, True),
        # With the check disabled, no output combination fails.
        ([_error_trace_message(111, "oh no")], False, False),
        ([_error_log_message("oh no")], False, False),
        ([], False, False),
    ],
)
def test_airbyte_trace_message_on_failure(output, expect_trace_message_on_failure, should_fail):
    t = _TestBasicRead()
    input_config = BasicReadTestConfig(expect_trace_message_on_failure=expect_trace_message_on_failure)
    docker_runner_mock = MagicMock()
    docker_runner_mock.call_read.return_value = output

    # `pytest.skip` is patched to not raise so the opt-out path exits via its
    # `return` statement and the disabled cases run to completion.
    with patch.object(pytest, "skip", return_value=None):
        if should_fail:
            with pytest.raises(AssertionError, match="Connector should emit at least one error trace message"):
                t.test_airbyte_trace_message_on_failure(None, input_config, docker_runner_mock)
        else:
            t.test_airbyte_trace_message_on_failure(None, input_config, docker_runner_mock)


@pytest.mark.parametrize(
"records, configured_catalog, expected_error",
[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,19 +125,20 @@ Verifies when a discover operation is run on the connector using the given confi
Configuring all streams in the input catalog to full refresh mode verifies that a read operation produces some RECORD messages. Each stream should have some data, if you can't guarantee this for particular streams - add them to the `empty_streams` list.
Set `validate_data_points=True` if possible. This validation is going to be enabled by default and won't be configurable in future releases.

| Input | Type | Default | Note |
| :--- | :--- | :--- | :--- |
| `config_path` | string | `secrets/config.json` | Path to a JSON object representing a valid connector configuration |
| `configured_catalog_path` | string | `integration_tests/configured_catalog.json` | Path to configured catalog |
| `empty_streams` | array | \[\] | List of streams that might be empty |
| `validate_schema` | boolean | True | Verify that structure and types of records matches the schema from discovery command |
| `validate_data_points` | boolean | False | Validate that all fields in all streams contained at least one data point |
| `timeout_seconds` | int | 5\*60 | Test execution timeout in seconds |
| `expect_records` | object | None | Compare produced records with expected records, see details below |
| `expect_records.path` | string | | File with expected records |
| `expect_records.extra_fields` | boolean | False | Allow output records to have other fields i.e: expected records are a subset |
| `expect_records.exact_order` | boolean | False | Ensure that records produced in exact same order |
| `expect_records.extra_records` | boolean | True | Allow connector to produce extra records, but still enforce all records from the expected file to be produced |
| Input | Type | Default | Note |
|:----------------------------------| :--- | :--- | :--- |
| `config_path` | string | `secrets/config.json` | Path to a JSON object representing a valid connector configuration |
| `configured_catalog_path` | string | `integration_tests/configured_catalog.json` | Path to configured catalog |
| `empty_streams` | array | \[\] | List of streams that might be empty |
| `validate_schema`                 | boolean | True | Verify that the structure and types of records match the schema from the discovery command |
| `validate_data_points` | boolean | False | Validate that all fields in all streams contained at least one data point |
| `timeout_seconds` | int | 5\*60 | Test execution timeout in seconds |
| `expect_trace_message_on_failure` | boolean | True | Ensure that a trace message is emitted when the connector crashes |
| `expect_records` | object | None | Compare produced records with expected records, see details below |
| `expect_records.path` | string | | File with expected records |
| `expect_records.extra_fields` | boolean | False | Allow output records to have other fields i.e: expected records are a subset |
| `expect_records.exact_order`      | boolean | False | Ensure that records are produced in the exact same order |
| `expect_records.extra_records` | boolean | True | Allow connector to produce extra records, but still enforce all records from the expected file to be produced |

`expect_records` is a nested configuration, if omitted - the part of the test responsible for record matching will be skipped. Due to the fact that we can't identify records without primary keys, only the following flag combinations are supported:

Expand Down

0 comments on commit 957a0be

Please sign in to comment.