Skip to content

Commit

Permalink
cat: validate types, formats, airbyte types and their combinations on…
Browse files Browse the repository at this point in the history
… catalog (#26669)
  • Loading branch information
alafanechere committed May 30, 2023
1 parent 312a361 commit 7de2886
Show file tree
Hide file tree
Showing 4 changed files with 228 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.10.0
Discovery test: add validation that fails if the types, formats, or airbyte_types declared in the properties of the connector's streams are not [supported data types](https://docs.airbyte.com/understanding-airbyte/supported-data-types/), or if their combination is invalid.

## 0.9.0
Basic read test: add validation that fails if undeclared columns are present in records. Add `fail_on_extra_fields` input parameter to ignore this failure if desired.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ COPY pytest.ini setup.py ./
COPY connector_acceptance_test ./connector_acceptance_test
RUN pip install .

LABEL io.airbyte.version=0.9.0
LABEL io.airbyte.version=0.10.0
LABEL io.airbyte.name=airbyte/connector-acceptance-test

ENTRYPOINT ["python", "-m", "pytest", "-p", "connector_acceptance_test.plugin", "-r", "fEsx"]
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,22 @@ def test_check(self, connector_config, inputs: ConnectionTestConfig, docker_runn

@pytest.mark.default_timeout(30)
class TestDiscovery(BaseTest):

# JSON schema primitive types a stream property may declare.
VALID_TYPES = {"null", "string", "number", "integer", "boolean", "object", "array"}
# Values accepted for the `airbyte_type` keyword.
VALID_AIRBYTE_TYPES = {"timestamp_with_timezone", "timestamp_without_timezone"}
# Values accepted for the `format` keyword.
VALID_FORMATS = {"date-time", "date"}
# Allowed (set of declared types, format) pairs.
# Kept as lists of tuples because sets are unhashable; membership relies on tuple/set equality.
VALID_TYPE_FORMAT_COMBINATIONS = [
    ({"string"}, "date"),
    ({"string"}, "date-time"),
    ({"string", "null"}, "date"),
    ({"string", "null"}, "date-time"),
]
# Allowed (set of declared types, airbyte_type) pairs.
# Every string pairing has a nullable counterpart, mirroring the format combinations above.
VALID_TYPE_AIRBYTE_TYPE_COMBINATIONS = [
    ({"string"}, "timestamp_with_timezone"),
    ({"string"}, "timestamp_without_timezone"),
    ({"string", "null"}, "timestamp_with_timezone"),
    ({"string", "null"}, "timestamp_without_timezone"),
]

@pytest.fixture(name="skip_backward_compatibility_tests")
def skip_backward_compatibility_tests_fixture(
self,
Expand Down Expand Up @@ -672,6 +688,49 @@ def test_backward_compatibility(
checker = CatalogDiffChecker(previous_discovered_catalog, discovered_catalog)
checker.assert_is_backward_compatible()

def test_catalog_has_supported_data_types(self, discovered_catalog: Mapping[str, Any]):
    """Check that all streams only declare supported types, formats and airbyte_types.

    Supported data types are listed here: https://docs.airbyte.com/understanding-airbyte/supported-data-types/

    Every `type` keyword found in a stream's JSON schema is validated, together with the
    `format` and `airbyte_type` keywords declared next to it (on the same parent object).

    Args:
        discovered_catalog: Mapping of stream name to discovered stream; each stream's
            `json_schema` attribute is inspected.

    Raises:
        AssertionError: If a type, format or airbyte_type is not in the VALID_* sets, or if a
            type/format or type/airbyte_type pairing is not in the VALID_*_COMBINATIONS lists.
    """

    for stream_name, stream_data in discovered_catalog.items():
        schema_helper = JsonSchemaHelper(stream_data.json_schema)

        for type_path, type_value in dpath.util.search(stream_data.json_schema, "**/type", yielded=True):
            # The "**/type" glob also matches a *property* that happens to be named "type".
            # Its value is that property's sub-schema (a dict), not a type declaration, and
            # building a set from it would raise TypeError. The sub-schema's own "type"
            # keyword matches the same glob and is expected to be yielded separately.
            if not isinstance(type_value, (str, list)):
                continue

            parent_path = schema_helper.get_parent_path(type_path)
            parent = schema_helper.get_parent(type_path)
            type_values = set(type_value) if isinstance(type_value, list) else {type_value}

            # Check unsupported type
            if not type_values <= self.VALID_TYPES:
                raise AssertionError(f"Found unsupported type ({type_values}) in {stream_name} stream on property {parent_path}")

            # Check unsupported format
            property_format = parent.get("format")
            if property_format and property_format not in self.VALID_FORMATS:
                raise AssertionError(f"Found unsupported format ({property_format}) in {stream_name} stream on property {parent_path}")

            # Check unsupported airbyte_type and type/airbyte_type combination
            airbyte_type = parent.get("airbyte_type")
            if airbyte_type:
                if airbyte_type not in self.VALID_AIRBYTE_TYPES:
                    raise AssertionError(
                        f"Found unsupported airbyte_type ({airbyte_type}) in {stream_name} stream on property {parent_path}"
                    )
                type_airbyte_type_combination = (type_values, airbyte_type)
                if type_airbyte_type_combination not in self.VALID_TYPE_AIRBYTE_TYPE_COMBINATIONS:
                    raise AssertionError(
                        f"Found unsupported type/airbyte_type combination {type_airbyte_type_combination} in {stream_name} stream on property {parent_path}"
                    )

            # Check unsupported type/format combination
            if property_format:
                type_format_combination = (type_values, property_format)
                if type_format_combination not in self.VALID_TYPE_FORMAT_COMBINATIONS:
                    raise AssertionError(
                        f"Found unsupported type/format combination {type_format_combination} in {stream_name} stream on property {parent_path}"
                    )


def primary_keys_for_records(streams, records):
streams_with_primary_key = [stream for stream in streams if stream.stream.source_defined_primary_key]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,163 @@ def test_additional_properties_is_true(discovered_catalog, expectation):
t.test_additional_properties_is_true(discovered_catalog)


def _single_stream_catalog(json_schema):
    """Build a discovered catalog holding one stream, `test_stream_1`, that uses `json_schema`."""
    return {
        "test_stream_1": AirbyteStream.parse_obj(
            {
                "name": "test_stream_1",
                "json_schema": json_schema,
                "supported_sync_modes": ["full_refresh"],
            }
        )
    }


@pytest.mark.parametrize(
    "discovered_catalog, expectation",
    [
        # Supported types: single, nullable and nested.
        (_single_stream_catalog({"properties": {"username": {"type": "string"}}}), does_not_raise()),
        (_single_stream_catalog({"properties": {"username": {"type": ["null", "string"]}}}), does_not_raise()),
        (
            _single_stream_catalog({"properties": {"user": {"type": "object", "properties": {"name": {"type": "string"}}}}}),
            does_not_raise(),
        ),
        # Unsupported types: single and inside a type list.
        (_single_stream_catalog({"properties": {"username": {"type": "unsupported"}}}), pytest.raises(AssertionError)),
        (_single_stream_catalog({"properties": {"username": {"type": ["null", "unsupported"]}}}), pytest.raises(AssertionError)),
        # Supported formats on a string, including the nullable variant.
        (_single_stream_catalog({"properties": {"username": {"type": "string", "format": "date"}}}), does_not_raise()),
        (_single_stream_catalog({"properties": {"username": {"type": "string", "format": "date-time"}}}), does_not_raise()),
        (
            _single_stream_catalog({"properties": {"username": {"type": ["null", "string"], "format": "date-time"}}}),
            does_not_raise(),
        ),
        # Unsupported format, then a supported format on the wrong type.
        (_single_stream_catalog({"properties": {"username": {"type": "string", "format": "datetime"}}}), pytest.raises(AssertionError)),
        (_single_stream_catalog({"properties": {"username": {"type": "number", "format": "date"}}}), pytest.raises(AssertionError)),
        # Supported airbyte_type alongside a supported format.
        (
            _single_stream_catalog(
                {"properties": {"username": {"type": "string", "format": "date-time", "airbyte_type": "timestamp_with_timezone"}}}
            ),
            does_not_raise(),
        ),
        # Unsupported airbyte_type, then a supported airbyte_type on the wrong type.
        (
            _single_stream_catalog({"properties": {"username": {"type": "string", "format": "date", "airbyte_type": "unsupported"}}}),
            pytest.raises(AssertionError),
        ),
        (
            _single_stream_catalog({"properties": {"username": {"type": "number", "airbyte_type": "timestamp_with_timezone"}}}),
            pytest.raises(AssertionError),
        ),
        # Unsupported format on a nested property.
        (
            _single_stream_catalog(
                {"properties": {"user": {"type": "object", "properties": {"name": {"type": "string", "format": "unsupported"}}}}}
            ),
            pytest.raises(AssertionError),
        ),
    ],
)
def test_catalog_has_supported_data_types(discovered_catalog, expectation):
    """Run the discovery data-type validation and check that it raises only for invalid schemas."""
    t = test_core.TestDiscovery()
    with expectation:
        t.test_catalog_has_supported_data_types(discovered_catalog)


@pytest.mark.parametrize(
"test_strictness_level, configured_catalog_path",
[
Expand Down Expand Up @@ -280,22 +437,8 @@ def test_configured_catalog_fixture(mocker, test_strictness_level, configured_ca
@pytest.mark.parametrize(
"schema, ignored_fields, expect_records_config, record, expected_records_by_stream, expectation",
[
(
{"type": "object"},
{},
ExpectedRecordsConfig(path="foobar"),
{"aa": 23},
{},
does_not_raise()
),
(
{"type": "object"},
{},
ExpectedRecordsConfig(path="foobar"),
{},
{},
does_not_raise()
),
({"type": "object"}, {}, ExpectedRecordsConfig(path="foobar"), {"aa": 23}, {}, does_not_raise()),
({"type": "object"}, {}, ExpectedRecordsConfig(path="foobar"), {}, {}, does_not_raise()),
(
{"type": "object", "properties": {"created": {"type": "string"}}},
{},
Expand All @@ -310,7 +453,7 @@ def test_configured_catalog_fixture(mocker, test_strictness_level, configured_ca
ExpectedRecordsConfig(path="foobar"),
{"created": "23"},
{},
does_not_raise()
does_not_raise(),
),
(
{"type": "object", "properties": {"created": {"type": "string"}}},
Expand Down Expand Up @@ -398,12 +541,15 @@ def test_read(schema, ignored_fields, expect_records_config, record, expected_re
)


@pytest.mark.parametrize("config_fail_on_extra_columns, record_has_unexpected_column, expectation_should_fail", [
@pytest.mark.parametrize(
"config_fail_on_extra_columns, record_has_unexpected_column, expectation_should_fail",
[
(True, True, True),
(True, False, False),
(False, False, False),
(False, True, False),
])
],
)
@pytest.mark.parametrize("additional_properties", [True, False, None])
def test_fail_on_extra_columns(config_fail_on_extra_columns, record_has_unexpected_column, expectation_should_fail, additional_properties):
schema = {"type": "object", "properties": {"field_1": {"type": ["string"]}, "field_2": {"type": ["string"]}}}
Expand Down

0 comments on commit 7de2886

Please sign in to comment.