diff --git a/airbyte-integrations/bases/connector-acceptance-test/CHANGELOG.md b/airbyte-integrations/bases/connector-acceptance-test/CHANGELOG.md index a41617cf130f1..5166e0fc1dc05 100644 --- a/airbyte-integrations/bases/connector-acceptance-test/CHANGELOG.md +++ b/airbyte-integrations/bases/connector-acceptance-test/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 3.5.0 +Add `validate_stream_statuses` to TestBasicRead.test_read:: Validate all statuses for all streams in the catalogs were emitted in correct order. + ## 3.4.0 Add TestConnectorDocumentation suite for validating connectors documentation structure and content. @@ -7,7 +10,7 @@ Add TestConnectorDocumentation suite for validating connectors documentation str Аix `NoAdditionalPropertiesValidator` if no type found in `items` ## 3.3.2 -Fix TestBasicRead.test_read.validate_schema: set `additionalProperties` to False recursively for objects +Fix TestBasicRead.test_read.validate_schema: set `additionalProperties` to False recursively for objects. ## 3.3.1 Fix TestSpec.test_oauth_is_default_method to skip connectors that doesn't have predicate_key object. diff --git a/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/config.py b/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/config.py index fbb823185b52d..a0d62f6461634 100644 --- a/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/config.py +++ b/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/config.py @@ -188,6 +188,7 @@ class BasicReadTestConfig(BaseConfig): ) expect_records: Optional[ExpectedRecordsConfig] = Field(description="Expected records from the read") validate_schema: bool = Field(True, description="Ensure that records match the schema of the corresponding stream") + validate_stream_statuses: bool = Field(None, description="Ensure that all streams emit status messages") fail_on_extra_columns: bool = Field(True, description="Fail if extra top-level properties (i.e. columns) are detected in records.") # TODO: remove this field after https://github.com/airbytehq/airbyte/issues/8312 is done validate_data_points: bool = Field( diff --git a/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/conftest.py b/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/conftest.py index dd3f6b5701c59..94e9f815f24a5 100644 --- a/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/conftest.py +++ b/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/conftest.py @@ -408,3 +408,8 @@ def docs_path_fixture(base_path, connector_metadata) -> Path: def connector_documentation_fixture(docs_path: str) -> str: with open(docs_path, "r") as f: return f.read().rstrip() + + +@pytest.fixture(name="is_connector_certified") +def connector_certification_status_fixture(connector_metadata: dict) -> bool: + return connector_metadata.get("data", {}).get("ab_internal", {}).get("ql", 0) >= 400 diff --git a/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_core.py b/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_core.py index a91617944a79f..29e1fc4e39fd5 100644 --- a/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_core.py +++ b/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_core.py @@ -23,6 +23,8 @@ from airbyte_protocol.models import ( AirbyteRecordMessage, AirbyteStream, + AirbyteStreamStatus, + AirbyteStreamStatusTraceMessage, AirbyteTraceMessage, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, @@ -979,6 +981,14 @@ def should_validate_schema_fixture(self, inputs: BasicReadTestConfig, test_stric else: return inputs.validate_schema + @pytest.fixture(name="should_validate_stream_statuses") + def should_validate_stream_statuses_fixture(self, inputs: BasicReadTestConfig, is_connector_certified: bool): + if inputs.validate_stream_statuses is None and is_connector_certified: + return True + if not inputs.validate_stream_statuses and is_connector_certified: + pytest.fail("High strictness level error: validate_stream_statuses must be set to true in the basic read test configuration.") + return inputs.validate_stream_statuses + @pytest.fixture(name="should_fail_on_extra_columns") def should_fail_on_extra_columns_fixture(self, inputs: BasicReadTestConfig): # TODO (Ella): enforce this param once all connectors are passing @@ -1030,6 +1040,7 @@ async def test_read( expect_records_config: ExpectedRecordsConfig, should_validate_schema: Boolean, should_validate_data_points: Boolean, + should_validate_stream_statuses: Boolean, should_fail_on_extra_columns: Boolean, empty_streams: Set[EmptyStreamConfiguration], ignored_fields: Optional[Mapping[str, List[IgnoredFieldsConfiguration]]], @@ -1039,6 +1050,7 @@ async def test_read( certified_file_based_connector: bool, ): output = await docker_runner.call_read(connector_config, configured_catalog) + records = [message.record for message in filter_output(output, Type.RECORD)] if certified_file_based_connector: @@ -1071,6 +1083,14 @@ async def test_read( detailed_logger=detailed_logger, ) + if should_validate_stream_statuses: + all_statuses = [ + message.trace.stream_status + for message in filter_output(output, Type.TRACE) + if message.trace.type == TraceType.STREAM_STATUS + ] + self._validate_stream_statuses(configured_catalog=configured_catalog, statuses=all_statuses) + async def test_airbyte_trace_message_on_failure(self, connector_config, inputs: BasicReadTestConfig, docker_runner: ConnectorRunner): if not inputs.expect_trace_message_on_failure: pytest.skip("Skipping `test_airbyte_trace_message_on_failure` because `inputs.expect_trace_message_on_failure=False`") @@ -1189,15 +1209,13 @@ def group_by_stream(records: List[AirbyteRecordMessage]) -> MutableMapping[str, return result @pytest.fixture(name="certified_file_based_connector") - def is_certified_file_based_connector(self, connector_metadata: Dict[str, Any]) -> bool: + def is_certified_file_based_connector(self, connector_metadata: Dict[str, Any], is_connector_certified: bool) -> bool: metadata = connector_metadata.get("data", {}) # connector subtype is specified in data.connectorSubtype field file_based_connector = metadata.get("connectorSubtype") == "file" - # a certified connector has ab_internal.ql value >= 400 - certified_connector = metadata.get("ab_internal", {}).get("ql", 0) >= 400 - return file_based_connector and certified_connector + return file_based_connector and is_connector_certified @staticmethod def _get_file_extension(file_name: str) -> str: @@ -1237,6 +1255,29 @@ async def test_all_supported_file_types_present(self, certified_file_based_conne "or add them to the `file_types -> unsupported_types` list in config." ) + @staticmethod + def _validate_stream_statuses(configured_catalog: ConfiguredAirbyteCatalog, statuses: List[AirbyteStreamStatusTraceMessage]): + """Validate all statuses for all streams in the catalogs were emitted in correct order: + 1. STARTED + 2. RUNNING (can be >1) + 3. COMPLETE + """ + stream_statuses = defaultdict(list) + for status in statuses: + stream_statuses[f"{status.stream_descriptor.namespace}-{status.stream_descriptor.name}"].append(status.status) + + assert set(f"{x.stream.namespace}-{x.stream.name}" for x in configured_catalog.streams) == set( + stream_statuses + ), "All stream must emit status" + + for stream_name, status_list in stream_statuses.items(): + assert ( + len(status_list) >= 3 + ), f"Stream `{stream_name}` statuses should be emitted in the next order: `STARTED`, `RUNNING`,... `COMPLETE`" + assert status_list[0] == AirbyteStreamStatus.STARTED + assert status_list[-1] == AirbyteStreamStatus.COMPLETE + assert all(x == AirbyteStreamStatus.RUNNING for x in status_list[1:-1]) + @pytest.mark.default_timeout(TEN_MINUTES) class TestConnectorAttributes(BaseTest): @@ -1245,13 +1286,13 @@ class TestConnectorAttributes(BaseTest): MANDATORY_FOR_TEST_STRICTNESS_LEVELS = [] @pytest.fixture(name="operational_certification_test") - async def operational_certification_test_fixture(self, connector_metadata: dict) -> bool: + async def operational_certification_test_fixture(self, is_connector_certified: bool) -> bool: """ Fixture that is used to skip a test that is reserved only for connectors that are supposed to be tested against operational certification criteria """ - if connector_metadata.get("data", {}).get("ab_internal", {}).get("ql") < 400: + if not is_connector_certified: pytest.skip("Skipping operational connector certification test for uncertified connector") return True @@ -1350,12 +1391,12 @@ class TestConnectorDocumentation(BaseTest): CONNECTOR_SPECIFIC_HEADINGS = "" @pytest.fixture(name="operational_certification_test") - async def operational_certification_test_fixture(self, connector_metadata: dict) -> bool: + async def operational_certification_test_fixture(self, is_connector_certified: bool) -> bool: """ Fixture that is used to skip a test that is reserved only for connectors that are supposed to be tested against operational certification criteria """ - if connector_metadata.get("data", {}).get("ab_internal", {}).get("ql") < 400: + if not is_connector_certified: pytest.skip("Skipping testing source connector documentation due to low ql.") return True diff --git a/airbyte-integrations/bases/connector-acceptance-test/pyproject.toml b/airbyte-integrations/bases/connector-acceptance-test/pyproject.toml index d090aaa6fe8c8..ed80ea830fd8b 100644 --- a/airbyte-integrations/bases/connector-acceptance-test/pyproject.toml +++ b/airbyte-integrations/bases/connector-acceptance-test/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "connector-acceptance-test" -version = "3.4.0" +version = "3.5.0" description = "Contains acceptance tests for connectors." authors = ["Airbyte "] license = "MIT" diff --git a/airbyte-integrations/bases/connector-acceptance-test/unit_tests/test_asserts.py b/airbyte-integrations/bases/connector-acceptance-test/unit_tests/test_asserts.py index 21346053a3789..d0732458ec4da 100644 --- a/airbyte-integrations/bases/connector-acceptance-test/unit_tests/test_asserts.py +++ b/airbyte-integrations/bases/connector-acceptance-test/unit_tests/test_asserts.py @@ -90,15 +90,11 @@ def test_verify_records_schema(configured_catalog: ConfiguredAirbyteCatalog): @pytest.mark.parametrize( "json_schema, record, should_fail", [ - ( - {"type": "object", "properties": {"a": {"type": "string"}}}, - {"a": "str", "b": "extra_string"}, - True - ), + ({"type": "object", "properties": {"a": {"type": "string"}}}, {"a": "str", "b": "extra_string"}, True), ( {"type": "object", "properties": {"a": {"type": "string"}, "some_obj": {"type": ["null", "object"]}}}, {"a": "str", "some_obj": {"b": "extra_string"}}, - False + False, ), ( { @@ -106,13 +102,12 @@ def test_verify_records_schema(configured_catalog: ConfiguredAirbyteCatalog): "properties": {"a": {"type": "string"}, "some_obj": {"type": ["null", "object"], "properties": {"a": {"type": "string"}}}}, }, {"a": "str", "some_obj": {"a": "str", "b": "extra_string"}}, - True + True, ), - ( {"type": "object", "properties": {"a": {"type": "string"}, "b": {"type": "array", "items": {"type": "object"}}}}, {"a": "str", "b": [{"a": "extra_string"}]}, - False + False, ), ( { @@ -120,10 +115,10 @@ def test_verify_records_schema(configured_catalog: ConfiguredAirbyteCatalog): "properties": { "a": {"type": "string"}, "b": {"type": "array", "items": {"type": "object", "properties": {"a": {"type": "string"}}}}, - } + }, }, {"a": "str", "b": [{"a": "string", "b": "extra_string"}]}, - True + True, ), ], ids=[ @@ -136,7 +131,7 @@ def test_verify_records_schema(configured_catalog: ConfiguredAirbyteCatalog): ) def test_verify_records_schema_with_fail_on_extra_columns(configured_catalog: ConfiguredAirbyteCatalog, json_schema, record, should_fail): """Test that fail_on_extra_columns works correctly with nested objects, array of objects""" - configured_catalog.streams[0].stream.json_schema =json_schema + configured_catalog.streams[0].stream.json_schema = json_schema records = [AirbyteRecordMessage(stream="my_stream", data=record, emitted_at=0)] streams_with_errors = verify_records_schema(records, configured_catalog, fail_on_extra_columns=True) errors = [error.message for error in streams_with_errors["my_stream"].values()] diff --git a/airbyte-integrations/bases/connector-acceptance-test/unit_tests/test_core.py b/airbyte-integrations/bases/connector-acceptance-test/unit_tests/test_core.py index c3552717b22fb..3e5dbda69f3ea 100644 --- a/airbyte-integrations/bases/connector-acceptance-test/unit_tests/test_core.py +++ b/airbyte-integrations/bases/connector-acceptance-test/unit_tests/test_core.py @@ -13,10 +13,13 @@ AirbyteMessage, AirbyteRecordMessage, AirbyteStream, + AirbyteStreamStatus, + AirbyteStreamStatusTraceMessage, AirbyteTraceMessage, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, Level, + StreamDescriptor, SyncMode, TraceType, Type, @@ -688,6 +691,7 @@ async def test_read(mocker, schema, ignored_fields, expect_records_config, recor expect_records_config=expect_records_config, should_validate_schema=True, should_validate_data_points=False, + should_validate_stream_statuses=False, should_fail_on_extra_columns=False, empty_streams=set(), expected_records_by_stream=expected_records_by_stream, @@ -743,6 +747,7 @@ async def test_fail_on_extra_columns( expect_records_config=ExpectedRecordsConfig(path="foobar"), should_validate_schema=True, should_validate_data_points=False, + should_validate_stream_statuses=False, should_fail_on_extra_columns=config_fail_on_extra_columns, empty_streams=set(), expected_records_by_stream={}, @@ -758,6 +763,7 @@ async def test_fail_on_extra_columns( expect_records_config=ExpectedRecordsConfig(path="foobar"), should_validate_schema=True, should_validate_data_points=False, + should_validate_stream_statuses=False, should_fail_on_extra_columns=config_fail_on_extra_columns, empty_streams=set(), expected_records_by_stream={}, @@ -1331,22 +1337,258 @@ def test_validate_field_appears_at_least_once(records, configured_catalog, expec t._validate_field_appears_at_least_once(records=records, configured_catalog=configured_catalog) +async def test_read_validate_async_output_stream_statuses(mocker): + configured_catalog = ConfiguredAirbyteCatalog( + streams=[ + ConfiguredAirbyteStream( + stream=AirbyteStream.parse_obj({"name": f"test_stream_{x}", "json_schema": {}, "supported_sync_modes": ["full_refresh"]}), + sync_mode="full_refresh", + destination_sync_mode="overwrite", + ) + for x in range(3) + ] + ) + async_stream_output = [ + AirbyteMessage( + type=Type.TRACE, + trace=AirbyteTraceMessage( + type=TraceType.STREAM_STATUS, + emitted_at=1, + stream_status=AirbyteStreamStatusTraceMessage( + stream_descriptor=StreamDescriptor(name="test_stream_0"), status=AirbyteStreamStatus.STARTED + ), + ), + ), + AirbyteMessage( + type=Type.TRACE, + trace=AirbyteTraceMessage( + type=TraceType.STREAM_STATUS, + emitted_at=1, + stream_status=AirbyteStreamStatusTraceMessage( + stream_descriptor=StreamDescriptor(name="test_stream_2"), status=AirbyteStreamStatus.STARTED + ), + ), + ), + AirbyteMessage( + type=Type.TRACE, + trace=AirbyteTraceMessage( + type=TraceType.STREAM_STATUS, + emitted_at=1, + stream_status=AirbyteStreamStatusTraceMessage( + stream_descriptor=StreamDescriptor(name="test_stream_1"), status=AirbyteStreamStatus.STARTED + ), + ), + ), + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream_0", data={"a": 1}, emitted_at=111)), + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream_1", data={"a": 1}, emitted_at=112)), + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream_2", data={"a": 1}, emitted_at=113)), + AirbyteMessage( + type=Type.TRACE, + trace=AirbyteTraceMessage( + type=TraceType.STREAM_STATUS, + emitted_at=114, + stream_status=AirbyteStreamStatusTraceMessage( + stream_descriptor=StreamDescriptor(name="test_stream_1"), status=AirbyteStreamStatus.RUNNING + ), + ), + ), + AirbyteMessage( + type=Type.TRACE, + trace=AirbyteTraceMessage( + type=TraceType.STREAM_STATUS, + emitted_at=114, + stream_status=AirbyteStreamStatusTraceMessage( + stream_descriptor=StreamDescriptor(name="test_stream_2"), status=AirbyteStreamStatus.RUNNING + ), + ), + ), + AirbyteMessage( + type=Type.TRACE, + trace=AirbyteTraceMessage( + type=TraceType.STREAM_STATUS, + emitted_at=114, + stream_status=AirbyteStreamStatusTraceMessage( + stream_descriptor=StreamDescriptor(name="test_stream_0"), status=AirbyteStreamStatus.RUNNING + ), + ), + ), + AirbyteMessage( + type=Type.TRACE, + trace=AirbyteTraceMessage( + type=TraceType.STREAM_STATUS, + emitted_at=115, + stream_status=AirbyteStreamStatusTraceMessage( + stream_descriptor=StreamDescriptor(name="test_stream_1"), status=AirbyteStreamStatus.RUNNING + ), + ), + ), + AirbyteMessage( + type=Type.TRACE, + trace=AirbyteTraceMessage( + type=TraceType.STREAM_STATUS, + emitted_at=115, + stream_status=AirbyteStreamStatusTraceMessage( + stream_descriptor=StreamDescriptor(name="test_stream_2"), status=AirbyteStreamStatus.COMPLETE + ), + ), + ), + AirbyteMessage( + type=Type.TRACE, + trace=AirbyteTraceMessage( + type=TraceType.STREAM_STATUS, + emitted_at=116, + stream_status=AirbyteStreamStatusTraceMessage( + stream_descriptor=StreamDescriptor(name="test_stream_1"), status=AirbyteStreamStatus.COMPLETE + ), + ), + ), + AirbyteMessage( + type=Type.TRACE, + trace=AirbyteTraceMessage( + type=TraceType.STREAM_STATUS, + emitted_at=120, + stream_status=AirbyteStreamStatusTraceMessage( + stream_descriptor=StreamDescriptor(name="test_stream_0"), status=AirbyteStreamStatus.COMPLETE + ), + ), + ), + ] + docker_runner_mock = mocker.MagicMock(call_read=mocker.AsyncMock(return_value=async_stream_output)) + + t = test_core.TestBasicRead() + await t.test_read( + connector_config=None, + configured_catalog=configured_catalog, + expect_records_config=ExpectedRecordsConfig(path="foobar"), + should_validate_schema=False, + should_validate_data_points=False, + should_validate_stream_statuses=True, + should_fail_on_extra_columns=False, + empty_streams=set(), + expected_records_by_stream={}, + docker_runner=docker_runner_mock, + ignored_fields=None, + detailed_logger=MagicMock(), + certified_file_based_connector=False, + ) + + @pytest.mark.parametrize( - ("metadata", "expected_file_based"), - ( - ({"data": {"connectorSubtype": "file", "ab_internal": {"ql": 400}}}, True), - ({"data": {"connectorSubtype": "file", "ab_internal": {"ql": 500}}}, True), - ({}, False), - ({"data": {"ab_internal": {}}}, False), - ({"data": {"ab_internal": {"ql": 400}}}, False), - ({"data": {"connectorSubtype": "file"}}, False), - ({"data": {"connectorSubtype": "file", "ab_internal": {"ql": 200}}}, False), - ({"data": {"connectorSubtype": "not_file", "ab_internal": {"ql": 400}}}, False), - ), + "output", + [ + (AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream_0", data={"a": 1}, emitted_at=111)),), + ( + AirbyteMessage( + type=Type.TRACE, + trace=AirbyteTraceMessage( + type=TraceType.STREAM_STATUS, + emitted_at=1, + stream_status=AirbyteStreamStatusTraceMessage( + stream_descriptor=StreamDescriptor(name="test_stream_0"), status=AirbyteStreamStatus.STARTED + ), + ), + ), + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream_0", data={"a": 1}, emitted_at=111)), + ), + ( + AirbyteMessage( + type=Type.TRACE, + trace=AirbyteTraceMessage( + type=TraceType.STREAM_STATUS, + emitted_at=1, + stream_status=AirbyteStreamStatusTraceMessage( + stream_descriptor=StreamDescriptor(name="test_stream_0"), status=AirbyteStreamStatus.STARTED + ), + ), + ), + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream_0", data={"a": 1}, emitted_at=111)), + AirbyteMessage( + type=Type.TRACE, + trace=AirbyteTraceMessage( + type=TraceType.STREAM_STATUS, + emitted_at=2, + stream_status=AirbyteStreamStatusTraceMessage( + stream_descriptor=StreamDescriptor(name="test_stream_0"), status=AirbyteStreamStatus.RUNNING + ), + ), + ), + ), + ( + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream_0", data={"a": 1}, emitted_at=111)), + AirbyteMessage( + type=Type.TRACE, + trace=AirbyteTraceMessage( + type=TraceType.STREAM_STATUS, + emitted_at=2, + stream_status=AirbyteStreamStatusTraceMessage( + stream_descriptor=StreamDescriptor(name="test_stream_0"), status=AirbyteStreamStatus.RUNNING + ), + ), + ), + ), + ( + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream_0", data={"a": 1}, emitted_at=111)), + AirbyteMessage( + type=Type.TRACE, + trace=AirbyteTraceMessage( + type=TraceType.STREAM_STATUS, + emitted_at=2, + stream_status=AirbyteStreamStatusTraceMessage( + stream_descriptor=StreamDescriptor(name="test_stream_0"), status=AirbyteStreamStatus.COMPLETE + ), + ), + ), + ), + ], + ids=["no_statuses", "only_started_present", "only_started_and_running_present", "only_running", "only_complete"], +) +async def test_read_validate_stream_statuses_exceptions(mocker, output): + configured_catalog = ConfiguredAirbyteCatalog( + streams=[ + ConfiguredAirbyteStream( + stream=AirbyteStream.parse_obj({"name": f"test_stream_0", "json_schema": {}, "supported_sync_modes": ["full_refresh"]}), + sync_mode="full_refresh", + destination_sync_mode="overwrite", + ) + ] + ) + docker_runner_mock = mocker.MagicMock(call_read=mocker.AsyncMock(return_value=output)) + + t = test_core.TestBasicRead() + with pytest.raises(AssertionError): + await t.test_read( + connector_config=None, + configured_catalog=configured_catalog, + expect_records_config=ExpectedRecordsConfig(path="foobar"), + should_validate_schema=False, + should_validate_data_points=False, + should_validate_stream_statuses=True, + should_fail_on_extra_columns=False, + empty_streams=set(), + expected_records_by_stream={}, + docker_runner=docker_runner_mock, + ignored_fields=None, + detailed_logger=MagicMock(), + certified_file_based_connector=False, + ) + + +@pytest.mark.parametrize( + "metadata, expected_file_based, is_connector_certified", + [ + ({"data": {"connectorSubtype": "file", "ab_internal": {"ql": 400}}}, True, True), + ({"data": {"connectorSubtype": "file", "ab_internal": {"ql": 500}}}, True, True), + ({}, False, False), + ({"data": {"ab_internal": {}}}, False, False), + ({"data": {"ab_internal": {"ql": 400}}}, False, False), + ({"data": {"connectorSubtype": "file"}}, False, False), + ({"data": {"connectorSubtype": "file", "ab_internal": {"ql": 200}}}, False, False), + ({"data": {"connectorSubtype": "not_file", "ab_internal": {"ql": 400}}}, False, False), + ], ) -def test_is_certified_file_based_connector(metadata, expected_file_based): +def test_is_certified_file_based_connector(metadata, is_connector_certified, expected_file_based): t = test_core.TestBasicRead() - assert test_core.TestBasicRead.is_certified_file_based_connector.__wrapped__(t, metadata) is expected_file_based + assert test_core.TestBasicRead.is_certified_file_based_connector.__wrapped__(t, metadata, is_connector_certified) is expected_file_based @pytest.mark.parametrize(