CAT: add validation for stream statuses #34675

Merged: 24 commits, Feb 8, 2024
Changes from 13 commits
@@ -1,10 +1,13 @@
# Changelog

## 3.4.0
Add `validate_stream_statuses` to `TestBasicRead.test_read`: validate that all statuses for all streams in the catalog were emitted in the correct order.

## 3.3.3
Fix `NoAdditionalPropertiesValidator` if no type is found in `items`

## 3.3.2
Fix TestBasicRead.test_read.validate_schema: set `additionalProperties` to False recursively for objects
Fix TestBasicRead.test_read.validate_schema: set `additionalProperties` to False recursively for objects.

## 3.3.1
Fix TestSpec.test_oauth_is_default_method to skip connectors that doesn't have predicate_key object.
@@ -188,6 +188,7 @@ class BasicReadTestConfig(BaseConfig):
)
expect_records: Optional[ExpectedRecordsConfig] = Field(description="Expected records from the read")
validate_schema: bool = Field(True, description="Ensure that records match the schema of the corresponding stream")
validate_stream_statuses: bool = Field(True, description="Ensure that all streams emit status messages")
Contributor:
This means that it is a breaking change. What is the plan for updating the sources so that CATs pass after this is merged?

Collaborator (author):
We expect that all certified connectors will pass this test.

What is the plan for updating the sources so that CATs pass after this is merged?

Our connector-health engineer will take care of it.

Contributor:
Can you be more explicit about this? I want to understand whether CATs will be red for a while. We have a goal for Q1 to have CATs be green for certified connectors, which includes having a process that ensures connectors are passing CATs.

fail_on_extra_columns: bool = Field(True, description="Fail if extra top-level properties (i.e. columns) are detected in records.")
# TODO: remove this field after https://github.com/airbytehq/airbyte/issues/8312 is done
validate_data_points: bool = Field(
@@ -19,6 +19,8 @@
from airbyte_protocol.models import (
AirbyteRecordMessage,
AirbyteStream,
AirbyteStreamStatus,
AirbyteStreamStatusTraceMessage,
AirbyteTraceMessage,
ConfiguredAirbyteCatalog,
ConfiguredAirbyteStream,
@@ -975,6 +977,13 @@ def should_validate_schema_fixture(self, inputs: BasicReadTestConfig, test_stric
else:
return inputs.validate_schema

@pytest.fixture(name="should_validate_stream_statuses")
def should_validate_stream_statuses_fixture(self, inputs: BasicReadTestConfig, test_strictness_level: Config.TestStrictnessLevel):
if not inputs.validate_stream_statuses and test_strictness_level is Config.TestStrictnessLevel.high:
pytest.fail("High strictness level error: validate_stream_statuses must be set to true in the basic read test configuration.")
else:
return inputs.validate_stream_statuses
Contributor:
There is a grooming note that says we should not apply this by default for community connectors. Right now, the default value for validate_stream_statuses is True. Isn't that contradictory?

Collaborator (author):
Fixed; this test will not run if ql < 400.

Contributor:
With the new change, does that mean that if we want to enable this for ql < 400, we just can't? It feels like we should be able to test this even if the connector is ql < 400

Collaborator (author):
in this case, do we need to set it to False as default, and force to True for certified (ql > 400) connectors?

Contributor:
Can we set it to "None" so that we have three states:

  • Not explicitly defined which means ql > 400 are enabled but not the rest
  • False would fail for ql > 400 but skip for non-certified
  • True would work for everyone
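A minimal sketch of the tri-state behavior proposed above. The function name and the certification predicate are illustrative assumptions, not the actual CAT implementation:

```python
from typing import Optional

def should_run_status_validation(validate_stream_statuses: Optional[bool], is_certified: bool) -> bool:
    """Resolve the tri-state flag: None defers to the connector's certification level."""
    if validate_stream_statuses is None:
        # Not explicitly defined: enabled for certified (ql >= 400) connectors only.
        return is_certified
    # Explicit True enables the check for everyone; explicit False disables it
    # (the real fixture would additionally fail a certified connector that opts out).
    return validate_stream_statuses
```

With this resolution, community connectors are skipped by default while certified connectors cannot silently opt out.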

Collaborator (author):
Good point!
fixed.


@pytest.fixture(name="should_fail_on_extra_columns")
def should_fail_on_extra_columns_fixture(self, inputs: BasicReadTestConfig):
# TODO (Ella): enforce this param once all connectors are passing
@@ -1026,6 +1035,7 @@ async def test_read(
expect_records_config: ExpectedRecordsConfig,
should_validate_schema: Boolean,
should_validate_data_points: Boolean,
should_validate_stream_statuses: Boolean,
should_fail_on_extra_columns: Boolean,
empty_streams: Set[EmptyStreamConfiguration],
ignored_fields: Optional[Mapping[str, List[IgnoredFieldsConfiguration]]],
@@ -1035,6 +1045,7 @@ async def test_read(
certified_file_based_connector: bool,
):
output = await docker_runner.call_read(connector_config, configured_catalog)

records = [message.record for message in filter_output(output, Type.RECORD)]

if certified_file_based_connector:
@@ -1067,6 +1078,14 @@ async def test_read(
detailed_logger=detailed_logger,
)

if should_validate_stream_statuses:
all_statuses = [
message.trace.stream_status
for message in filter_output(output, Type.TRACE)
if message.trace.type == TraceType.STREAM_STATUS
]
self._validate_stream_statuses(configured_catalog=configured_catalog, statuses=all_statuses)

async def test_airbyte_trace_message_on_failure(self, connector_config, inputs: BasicReadTestConfig, docker_runner: ConnectorRunner):
if not inputs.expect_trace_message_on_failure:
pytest.skip("Skipping `test_airbyte_trace_message_on_failure` because `inputs.expect_trace_message_on_failure=False`")
@@ -1233,6 +1252,22 @@ async def test_all_supported_file_types_present(self, certified_file_based_conne
"or add them to the `file_types -> unsupported_types` list in config."
)

@staticmethod
def _validate_stream_statuses(configured_catalog: ConfiguredAirbyteCatalog, statuses: List[AirbyteStreamStatusTraceMessage]):
"""Validate all statuses for all streams in the catalogs were emitted in correct order"""
stream_statuses = defaultdict(list)
for status in statuses:
stream_statuses[status.stream_descriptor.name].append(status.status)

assert set(x.stream.name for x in configured_catalog.streams) == set(stream_statuses), "All streams must emit statuses"

for stream_name, status_list in stream_statuses.items():
assert status_list == [
AirbyteStreamStatus.STARTED,
AirbyteStreamStatus.RUNNING,
AirbyteStreamStatus.COMPLETE,
], f"Stream `{stream_name}` statuses should be emitted in the following order: `STARTED`, `RUNNING`, `COMPLETE`"
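The grouping-and-ordering logic above can be exercised standalone. A simplified sketch follows, using plain strings in place of the AirbyteStreamStatus enum; the helper is hypothetical, not part of CAT:

```python
from collections import defaultdict

def validate_statuses(catalog_stream_names, statuses):
    """statuses: list of (stream_name, status) tuples in emission order."""
    per_stream = defaultdict(list)
    for name, status in statuses:
        per_stream[name].append(status)
    # Every stream in the catalog must have emitted at least one status.
    assert set(catalog_stream_names) == set(per_stream), "All streams must emit statuses"
    # Within a stream, the statuses must appear in the canonical order.
    for name, status_list in per_stream.items():
        assert status_list == ["STARTED", "RUNNING", "COMPLETE"], (
            f"Stream `{name}` statuses should be emitted in order: STARTED, RUNNING, COMPLETE"
        )

validate_statuses(
    ["users", "orders"],
    [("users", "STARTED"), ("orders", "STARTED"),
     ("users", "RUNNING"), ("orders", "RUNNING"),
     ("orders", "COMPLETE"), ("users", "COMPLETE")],
)  # passes: ordering is per-stream, so interleaving across streams is fine
```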


@pytest.mark.default_timeout(TEN_MINUTES)
class TestConnectorAttributes(BaseTest):
@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "connector-acceptance-test"
version = "3.3.3"
version = "3.4.0"
description = "Contains acceptance tests for connectors."
authors = ["Airbyte <contact@airbyte.io>"]
license = "MIT"
@@ -90,40 +90,35 @@ def test_verify_records_schema(configured_catalog: ConfiguredAirbyteCatalog):
@pytest.mark.parametrize(
"json_schema, record, should_fail",
[
(
{"type": "object", "properties": {"a": {"type": "string"}}},
{"a": "str", "b": "extra_string"},
True
),
({"type": "object", "properties": {"a": {"type": "string"}}}, {"a": "str", "b": "extra_string"}, True),
(
{"type": "object", "properties": {"a": {"type": "string"}, "some_obj": {"type": ["null", "object"]}}},
{"a": "str", "some_obj": {"b": "extra_string"}},
False
False,
),
(
{
"type": "object",
"properties": {"a": {"type": "string"}, "some_obj": {"type": ["null", "object"], "properties": {"a": {"type": "string"}}}},
},
{"a": "str", "some_obj": {"a": "str", "b": "extra_string"}},
True
True,
),

(
{"type": "object", "properties": {"a": {"type": "string"}, "b": {"type": "array", "items": {"type": "object"}}}},
{"a": "str", "b": [{"a": "extra_string"}]},
False
False,
),
(
{
"type": "object",
"properties": {
"a": {"type": "string"},
"b": {"type": "array", "items": {"type": "object", "properties": {"a": {"type": "string"}}}},
}
},
},
{"a": "str", "b": [{"a": "string", "b": "extra_string"}]},
True
True,
),
],
ids=[
@@ -136,7 +131,7 @@ def test_verify_records_schema(configured_catalog: ConfiguredAirbyteCatalog):
)
def test_verify_records_schema_with_fail_on_extra_columns(configured_catalog: ConfiguredAirbyteCatalog, json_schema, record, should_fail):
"""Test that fail_on_extra_columns works correctly with nested objects, array of objects"""
configured_catalog.streams[0].stream.json_schema =json_schema
Contributor:
Out of curiosity, what formatter do you personally use, and what formatter does most of the team have?

@alafanechere works on setting up requirements on new PRs including type checks with mypy, and I'm considering a formatter as well.

Collaborator (author), artem1205, Feb 5, 2024:
what formatter do you personally use?

Mainly `airbyte-ci format fix all` (but the unit_test folder needs to be removed from the excluded list in pyproject.toml).

Contributor:
@natikgadzhi we already have a repo wide formatter (airbyte-ci format fix all/python/js/java) which runs in CI.
We can duplicate the same rules at the connector level if we want to localize the formatting.

configured_catalog.streams[0].stream.json_schema = json_schema
records = [AirbyteRecordMessage(stream="my_stream", data=record, emitted_at=0)]
streams_with_errors = verify_records_schema(records, configured_catalog, fail_on_extra_columns=True)
errors = [error.message for error in streams_with_errors["my_stream"].values()]
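The parametrized cases above can be illustrated without the CAT helpers. A rough stand-in for the top-level part of the `fail_on_extra_columns` check (the real validator also recurses into nested objects and arrays of objects, as the test cases show; this helper is illustrative only):

```python
def extra_top_level_columns(json_schema: dict, record: dict) -> set:
    # Top-level record keys not declared in the schema's properties.
    declared = set(json_schema.get("properties", {}))
    return set(record) - declared

schema = {"type": "object", "properties": {"a": {"type": "string"}}}
assert extra_top_level_columns(schema, {"a": "str", "b": "extra_string"}) == {"b"}
assert extra_top_level_columns(schema, {"a": "str"}) == set()
```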
@@ -13,10 +13,13 @@
AirbyteMessage,
AirbyteRecordMessage,
AirbyteStream,
AirbyteStreamStatus,
AirbyteStreamStatusTraceMessage,
AirbyteTraceMessage,
ConfiguredAirbyteCatalog,
ConfiguredAirbyteStream,
Level,
StreamDescriptor,
SyncMode,
TraceType,
Type,
@@ -688,6 +691,7 @@ async def test_read(mocker, schema, ignored_fields, expect_records_config, recor
expect_records_config=expect_records_config,
should_validate_schema=True,
should_validate_data_points=False,
should_validate_stream_statuses=False,
should_fail_on_extra_columns=False,
empty_streams=set(),
expected_records_by_stream=expected_records_by_stream,
@@ -743,6 +747,7 @@ async def test_fail_on_extra_columns(
expect_records_config=ExpectedRecordsConfig(path="foobar"),
should_validate_schema=True,
should_validate_data_points=False,
should_validate_stream_statuses=False,
should_fail_on_extra_columns=config_fail_on_extra_columns,
empty_streams=set(),
expected_records_by_stream={},
@@ -758,6 +763,7 @@
expect_records_config=ExpectedRecordsConfig(path="foobar"),
should_validate_schema=True,
should_validate_data_points=False,
should_validate_stream_statuses=False,
should_fail_on_extra_columns=config_fail_on_extra_columns,
empty_streams=set(),
expected_records_by_stream={},
@@ -1331,6 +1337,188 @@ def test_validate_field_appears_at_least_once(records, configured_catalog, expec
t._validate_field_appears_at_least_once(records=records, configured_catalog=configured_catalog)


@pytest.mark.parametrize(
"output, expected_exception",
[
(
[
AirbyteMessage(
type=Type.TRACE,
trace=AirbyteTraceMessage(
type=TraceType.STREAM_STATUS,
emitted_at=1,
stream_status=AirbyteStreamStatusTraceMessage(
stream_descriptor=StreamDescriptor(name="test_stream_0"), status=AirbyteStreamStatus.STARTED
),
),
),
AirbyteMessage(
type=Type.TRACE,
trace=AirbyteTraceMessage(
type=TraceType.STREAM_STATUS,
emitted_at=1,
stream_status=AirbyteStreamStatusTraceMessage(
stream_descriptor=StreamDescriptor(name="test_stream_2"), status=AirbyteStreamStatus.STARTED
),
),
),
AirbyteMessage(
type=Type.TRACE,
trace=AirbyteTraceMessage(
type=TraceType.STREAM_STATUS,
emitted_at=1,
stream_status=AirbyteStreamStatusTraceMessage(
stream_descriptor=StreamDescriptor(name="test_stream_1"), status=AirbyteStreamStatus.STARTED
),
),
),
AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream_0", data={"a": 1}, emitted_at=111)),
AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream_1", data={"a": 1}, emitted_at=112)),
AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream_2", data={"a": 1}, emitted_at=113)),
AirbyteMessage(
type=Type.TRACE,
trace=AirbyteTraceMessage(
type=TraceType.STREAM_STATUS,
emitted_at=114,
stream_status=AirbyteStreamStatusTraceMessage(
stream_descriptor=StreamDescriptor(name="test_stream_1"), status=AirbyteStreamStatus.RUNNING
),
),
),
AirbyteMessage(
type=Type.TRACE,
trace=AirbyteTraceMessage(
type=TraceType.STREAM_STATUS,
emitted_at=114,
stream_status=AirbyteStreamStatusTraceMessage(
stream_descriptor=StreamDescriptor(name="test_stream_2"), status=AirbyteStreamStatus.RUNNING
),
),
),
AirbyteMessage(
type=Type.TRACE,
trace=AirbyteTraceMessage(
type=TraceType.STREAM_STATUS,
emitted_at=114,
stream_status=AirbyteStreamStatusTraceMessage(
stream_descriptor=StreamDescriptor(name="test_stream_0"), status=AirbyteStreamStatus.RUNNING
),
),
),
AirbyteMessage(
type=Type.TRACE,
trace=AirbyteTraceMessage(
type=TraceType.STREAM_STATUS,
emitted_at=115,
stream_status=AirbyteStreamStatusTraceMessage(
stream_descriptor=StreamDescriptor(name="test_stream_2"), status=AirbyteStreamStatus.COMPLETE
),
),
),
AirbyteMessage(
type=Type.TRACE,
trace=AirbyteTraceMessage(
type=TraceType.STREAM_STATUS,
emitted_at=116,
stream_status=AirbyteStreamStatusTraceMessage(
stream_descriptor=StreamDescriptor(name="test_stream_1"), status=AirbyteStreamStatus.COMPLETE
),
),
),
AirbyteMessage(
type=Type.TRACE,
trace=AirbyteTraceMessage(
type=TraceType.STREAM_STATUS,
emitted_at=120,
stream_status=AirbyteStreamStatusTraceMessage(
stream_descriptor=StreamDescriptor(name="test_stream_0"), status=AirbyteStreamStatus.COMPLETE
),
),
),
],
does_not_raise(),
),
(
[
AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream_0", data={"a": 1}, emitted_at=111)),
AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream_1", data={"a": 1}, emitted_at=112)),
AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream_2", data={"a": 1}, emitted_at=113)),
],
pytest.raises(AssertionError),
),
(
[
AirbyteMessage(
type=Type.TRACE,
trace=AirbyteTraceMessage(
type=TraceType.STREAM_STATUS,
emitted_at=1,
stream_status=AirbyteStreamStatusTraceMessage(
stream_descriptor=StreamDescriptor(name="test_stream_0"), status=AirbyteStreamStatus.STARTED
),
),
),
AirbyteMessage(
type=Type.TRACE,
trace=AirbyteTraceMessage(
type=TraceType.STREAM_STATUS,
emitted_at=1,
stream_status=AirbyteStreamStatusTraceMessage(
stream_descriptor=StreamDescriptor(name="test_stream_2"), status=AirbyteStreamStatus.STARTED
),
),
),
AirbyteMessage(
type=Type.TRACE,
trace=AirbyteTraceMessage(
type=TraceType.STREAM_STATUS,
emitted_at=1,
stream_status=AirbyteStreamStatusTraceMessage(
stream_descriptor=StreamDescriptor(name="test_stream_1"), status=AirbyteStreamStatus.STARTED
),
),
),
AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream_0", data={"a": 1}, emitted_at=111)),
AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream_1", data={"a": 1}, emitted_at=112)),
AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream_2", data={"a": 1}, emitted_at=113)),
],
pytest.raises(AssertionError),
),
],
ids=["no_exception", "with_exception_no_statuses", "with_exception_only_started_present"],
)
async def test_read_validate_stream_statuses(mocker, output, expected_exception):
configured_catalog = ConfiguredAirbyteCatalog(
streams=[
ConfiguredAirbyteStream(
stream=AirbyteStream.parse_obj({"name": f"test_stream_{x}", "json_schema": {}, "supported_sync_modes": ["full_refresh"]}),
sync_mode="full_refresh",
destination_sync_mode="overwrite",
)
for x in range(3)
]
)
docker_runner_mock = mocker.MagicMock(call_read=mocker.AsyncMock(return_value=output))

t = test_core.TestBasicRead()
with expected_exception:
await t.test_read(
connector_config=None,
configured_catalog=configured_catalog,
expect_records_config=ExpectedRecordsConfig(path="foobar"),
should_validate_schema=False,
should_validate_data_points=False,
should_validate_stream_statuses=True,
should_fail_on_extra_columns=False,
empty_streams=set(),
expected_records_by_stream={},
docker_runner=docker_runner_mock,
ignored_fields=None,
detailed_logger=MagicMock(),
certified_file_based_connector=False,
)


@pytest.mark.parametrize(
("metadata", "expected_file_based"),
(