diff --git a/airbyte_cdk/test/models/scenario.py b/airbyte_cdk/test/models/scenario.py
index 2704f6e96..994ddb604 100644
--- a/airbyte_cdk/test/models/scenario.py
+++ b/airbyte_cdk/test/models/scenario.py
@@ -1,8 +1,107 @@
 # Copyright (c) 2024 Airbyte, Inc., all rights reserved.
-"""Run acceptance tests in PyTest.
-
-These tests leverage the same `acceptance-test-config.yml` configuration files as the
-acceptance tests in CAT, but they run in PyTest instead of CAT. This allows us to run
+"""Models to define test scenarios (smoke tests) that leverage the Standard Tests framework.
+
+NOTE: The `acceptance-test-config.yml` file has been deprecated in favor of the new format.
+
+Connector smoke tests should be defined in the connector's `metadata.yaml` file, under the
+`connectorTestSuitesOptions` section, as shown in the examples below:
+
+## Basic Configuration of Scenarios
+
+```yaml
+data:
+  # ...
+  connectorTestSuitesOptions:
+    # ...
+    - suite: smokeTests
+      scenarios:
+        - name: default
+          config_file: secrets/config_oauth.json
+        - name: invalid_config
+          config_file: integration_tests/invalid_config.json
+          expect_failure: true
+```
+
+You can also specify config settings inline, instead of using a config file:
+
+```yaml
+data:
+  # ...
+  connectorTestSuitesOptions:
+    # ...
+    - suite: smokeTests
+      scenarios:
+        - name: default
+          config_file: secrets/config_oauth.json
+          config_settings:
+            # This will override any matching settings in `config_file`:
+            start_date: "2025-01-01T00:00:00Z"
+        - name: invalid_config
+          # No config file needed if using fully hard-coded settings:
+          config_settings:
+            client_id: invalid
+            client_secret: invalid
+```
+
+## Stream Filtering
+
+There are several ways to filter which streams are read during a test scenario:
+
+- `only_streams`: A list of stream names to include in the scenario.
+- `exclude_streams`: A list of stream names to exclude from the scenario.
+- `suggested_streams_only`: A boolean indicating whether to limit to the connector's suggested
+  streams list, if present. (Looks for the `data.suggestedStreams` field in `metadata.yaml`.)
+
+### Stream Filtering Examples
+
+Filter for just one stream:
+
+```yaml
+data:
+  # ...
+  connectorTestSuitesOptions:
+    # ...
+    - suite: smokeTests
+      scenarios:
+        - name: default
+          config_file: secrets/config_oauth.json
+          only_streams:
+            - users
+```
+
+Exclude a set of premium or restricted streams:
+
+```yaml
+data:
+  # ...
+  connectorTestSuitesOptions:
+    # ...
+    - suite: smokeTests
+      scenarios:
+        - name: default  # exclude premium streams
+          exclude_streams:
+            - premium_users
+            - restricted_users
+```
+
+Filter to just the suggested streams, minus specific excluded streams:
+
+```yaml
+data:
+  # ...
+  connectorTestSuitesOptions:
+    # ...
+    - suite: smokeTests
+      scenarios:
+        - name: default  # suggested streams, except restricted_users
+          suggested_streams_only: true
+          exclude_streams:
+            - restricted_users
+```
+
+## Legacy Configuration
+
+For legacy purposes, these tests can leverage the same `acceptance-test-config.yml` configuration
+files as the acceptance tests in CAT, but they run in PyTest instead of CAT. This allows us to run
 the acceptance tests in the same local environment as we are developing in, speeding up
 iteration cycles.
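+
+## Programmatic Usage
+
+Scenarios are normally discovered and parametrized by the test framework, but the model can
+also be constructed directly. A minimal sketch (the config values and stream names below are
+illustrative only):
+
+```python
+from airbyte_cdk.test.models.scenario import ConnectorTestScenario
+
+scenario = ConnectorTestScenario(
+    name="default",
+    config_settings={"api_key": "..."},  # Illustrative inline config.
+    only_streams=["users"],
+)
+filter_fn = scenario.get_streams_filter()
+assert filter_fn("users") is True  # Included: listed in `only_streams`.
+assert filter_fn("premium_users") is False  # Excluded: not in `only_streams`.
+```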
""" @@ -11,12 +110,13 @@ import json import tempfile +from collections.abc import Callable, Generator from contextlib import contextmanager, suppress from pathlib import Path # noqa: TC003 # Pydantic needs this (don't move to 'if typing' block) from typing import TYPE_CHECKING, Any, Literal, cast import yaml -from pydantic import BaseModel, ConfigDict +from pydantic import BaseModel, ConfigDict, Field from airbyte_cdk.test.models.outcome import ExpectedOutcome @@ -24,45 +124,191 @@ from collections.abc import Generator +_LEGACY_FAILURE_STATUSES: set[str] = {"failed", "exception"} + + class ConnectorTestScenario(BaseModel): - """Acceptance test scenario, as a Pydantic model. + """Smoke test scenario, as a Pydantic model.""" - This class represents an acceptance test scenario, which is a single test case - that can be run against a connector. It is used to deserialize and validate the - acceptance test configuration file. + name: str = Field(kw_only=True) + """What to call this scenario in test reports. + + Common names include: + - "default": the default scenario for a connector, with a valid config. + - "invalid_config": a scenario with an invalid config, to test error handling. + - "oauth_config": a scenario that uses OAuth for authentication. """ - # Allows the class to be hashable, which PyTest will require - # when we use to parameterize tests. - model_config = ConfigDict(frozen=True) + config_file: Path | None = Field(kw_only=True, default=None) + """Relative path to the config file to use for this scenario.""" - class AcceptanceTestExpectRecords(BaseModel): - path: Path - exact_order: bool = False + config_settings: dict[str, Any] | None = Field(default=None, kw_only=True) + """Optional dictionary of config settings to use for this scenario. - class AcceptanceTestFileTypes(BaseModel): - skip_test: bool - bypass_reason: str + If both `config_settings` and `config_file` are provided, keys in `config_settings` take precedence over + corresponding settings within `config_file`. This allows a single secrets file to be used for multiple scenarios, + with scenario-specific overrides applied as needed. 
+    """
 
-    class AcceptanceTestEmptyStream(BaseModel):
-        name: str
-        bypass_reason: str | None = None
+    expect_failure: bool = Field(default=False, kw_only=True)
+    """Whether the scenario is expected to fail."""
 
-        # bypass reason does not affect equality
-        def __hash__(self) -> int:
-            return hash(self.name)
+    only_streams: list[str] | None = Field(default=None, kw_only=True)
+    """List of stream names to include in the scenario."""
 
-    config_path: Path | None = None
-    config_dict: dict[str, Any] | None = None
+    exclude_streams: list[str] | None = Field(default=None, kw_only=True)
+    """List of stream names to exclude from the scenario."""
 
-    _id: str | None = None  # Used to override the default ID generation
+    suggested_streams_only: bool = Field(default=False, kw_only=True)
+    """Whether to limit to the connector's suggested streams list, if present."""
 
-    configured_catalog_path: Path | None = None
-    empty_streams: list[AcceptanceTestEmptyStream] | None = None
-    timeout_seconds: int | None = None
-    expect_records: AcceptanceTestExpectRecords | None = None
-    file_types: AcceptanceTestFileTypes | None = None
-    status: Literal["succeed", "failed", "exception"] | None = None
+    suggested_streams: list[str] | None = Field(default=None, kw_only=True)
+    """List of suggested stream names for the connector (if provided)."""
+
+    configured_catalog_path: Path | None = Field(default=None, kw_only=True)
+    """Path to the configured catalog file for the scenario."""
+
+    def get_streams_filter(
+        self,
+    ) -> Callable[[str], bool]:
+        """Return a predicate that filters streams based on the scenario's stream filters.
+
+        The returned function takes a stream name and returns True if the stream should be
+        included, honoring `only_streams`, `exclude_streams`, and `suggested_streams_only`.
+        If none of these are set, all streams are included.
+        """
+
+        def filter_fn(stream_name: str) -> bool:
+            if self.only_streams is not None and stream_name not in self.only_streams:
+                # Stream is not in the `only_streams` list; exclude it.
+                return False
+
+            if self.exclude_streams is not None and stream_name in self.exclude_streams:
+                # Stream is in the `exclude_streams` list; exclude it.
+                return False
+
+            if (
+                self.suggested_streams_only
+                and self.suggested_streams is not None
+                and stream_name not in self.suggested_streams
+            ):
+                # Limit to the connector's suggested streams when requested.
+                return False
+
+            # No exclusion reason found; include the stream.
+            return True
+
+        return filter_fn
+
+    @classmethod
+    def from_metadata_yaml(cls, metadata_yaml: Path) -> list[ConnectorTestScenario] | None:
+        """Return a list of scenarios defined within a metadata YAML file.
+
+        Example `metadata_yaml` content:
+        ```yaml
+        data:
+          # ...
+          connectorTestSuitesOptions:
+            # ...
+            - suite: smokeTests
+              scenarios:
+                - name: default
+                  config_file: secrets/config_oauth.json
+                - name: invalid_config
+                  config_file: integration_tests/invalid_config.json
+                  expect_failure: true
+        ```
+
+        This simpler config replaces the legacy `acceptance-test-config.yml` file for
+        defining smoke test (previously called "Acceptance Test") scenarios for a connector.
+
+        Returns:
+        - None if the `smokeTests` suite is not defined in the metadata.
+        - An empty list if the `smokeTests` suite is defined but has no scenarios.
+        - A list of `ConnectorTestScenario` instances if the `smokeTests` suite is defined
+          and has scenarios.
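+
+        A minimal usage sketch (assumes a `metadata.yaml` file exists at the given path):
+        ```python
+        scenarios = ConnectorTestScenario.from_metadata_yaml(Path("metadata.yaml"))
+        ```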
+        """
+        metadata_data = yaml.safe_load(metadata_yaml.read_text()).get("data", {})
+        if not metadata_data:
+            raise ValueError(f"Metadata YAML file {metadata_yaml} is missing 'data' section.")
+
+        connector_test_suite_options = metadata_data.get("connectorTestSuitesOptions", [])
+        smoke_test_config: dict[str, Any] | None = next(
+            (
+                option
+                for option in connector_test_suite_options
+                if option.get("suite") == "smokeTests"
+            ),
+            None,
+        )
+        if smoke_test_config is None:
+            # Return `None` because the `smokeTests` suite is fully undefined.
+            return None
+
+        suggested_streams = metadata_data.get("suggestedStreams")
+
+        return [
+            cls.from_metadata_smoke_test_definition(
+                definition=scenario,
+                connector_root=metadata_yaml.parent,
+                suggested_streams=suggested_streams,
+            )
+            for scenario in smoke_test_config.get("scenarios", [])
+        ]
+
+    @classmethod
+    def from_metadata_smoke_test_definition(
+        cls,
+        definition: dict[str, Any],
+        connector_root: Path,
+        suggested_streams: list[str] | None,
+    ) -> ConnectorTestScenario:
+        """Return a scenario defined within a smoke test definition.
+
+        Example `definition` content:
+        ```yaml
+        name: default
+        config_file: secrets/config_oauth.json
+        expect_failure: true
+        ```
+
+        This simpler config replaces the legacy `acceptance-test-config.yml` file for
+        defining smoke test (previously called "Acceptance Test") scenarios for a connector.
+        """
+        if "name" not in definition:
+            raise ValueError("Smoke test scenario definition must include a 'name' field.")
+        if "config_file" not in definition and "config_settings" not in definition:
+            raise ValueError(
+                "Smoke test scenario definition must include a 'config_file' or"
+                " 'config_settings' field."
+            )
+
+        return ConnectorTestScenario.model_validate({
+            **definition,
+            "suggested_streams": suggested_streams,
+        })
+
+    def with_expecting_failure(self) -> ConnectorTestScenario:
+        """Return a copy of the scenario that expects failure.
+
+        This is useful when deriving new scenarios from existing ones.
+        """
+        if self.expect_failure is True:
+            return self
+
+        return ConnectorTestScenario(
+            **self.model_dump(exclude={"expect_failure"}),
+            expect_failure=True,
+        )
+
+    def with_expecting_success(self) -> ConnectorTestScenario:
+        """Return a copy of the scenario that expects success.
+
+        This is useful when deriving new scenarios from existing ones.
+        """
+        if self.expect_failure is False:
+            return self
+
+        return ConnectorTestScenario(
+            **self.model_dump(exclude={"expect_failure"}),
+            expect_failure=False,
+        )
+
+    @property
+    def requires_creds(self) -> bool:
+        """Return True if the scenario requires credentials to run."""
+        return bool(self.config_file and "secrets" in self.config_file.parts)
 
     def get_config_dict(
         self,
@@ -79,50 +325,27 @@ def get_config_dict(
         - return an empty dictionary if `empty_if_missing` is True
         - raise a ValueError if `empty_if_missing` is False
         """
-        if self.config_dict is not None:
-            return self.config_dict
+        if not empty_if_missing and self.config_file is None and self.config_settings is None:
+            raise ValueError("No config dictionary or path provided.")
 
-        if self.config_path is not None:
-            config_path = self.config_path
+        result: dict[str, Any] = {}
+        if self.config_file is not None:
+            config_path = self.config_file
             if not config_path.is_absolute():
                 # We usually receive a relative path here. Let's resolve it.
-                config_path = (connector_root / self.config_path).resolve().absolute()
+                config_path = (connector_root / self.config_file).resolve().absolute()
 
-            return cast(
-                dict[str, Any],
-                yaml.safe_load(config_path.read_text()),
+            result.update(
+                cast(
+                    dict[str, Any],
+                    yaml.safe_load(config_path.read_text()),
+                )
             )
 
-        if empty_if_missing:
-            return {}
-
-        raise ValueError("No config dictionary or path provided.")
-
-    @property
-    def expected_outcome(self) -> ExpectedOutcome:
-        """Whether the test scenario expects an exception to be raised.
-
-        Returns True if the scenario expects an exception, False if it does not,
-        and None if there is no set expectation.
-        """
-        return ExpectedOutcome.from_status_str(self.status)
-
-    @property
-    def id(self) -> str:
-        """Return a unique identifier for the test scenario.
-
-        This is used by PyTest to identify the test scenario.
-        """
-        if self._id:
-            return self._id
+        if self.config_settings is not None:
+            result.update(self.config_settings)
 
-        if self.config_path:
-            return self.config_path.stem
-
-        return str(hash(self))
-
-    def __str__(self) -> str:
-        return f"'{self.id}' Test Scenario"
+        return result
 
     @contextmanager
     def with_temp_config_file(
@@ -152,42 +375,87 @@ def with_temp_config_file(
         with suppress(OSError):
             temp_path.unlink()
 
-    def without_expected_outcome(self) -> ConnectorTestScenario:
-        """Return a copy of the scenario that does not expect failure or success.
 
-        This is useful when running multiple steps, to defer the expectations to a later step.
+class LegacyAcceptanceTestScenario(BaseModel):
+    """Legacy acceptance test scenario, as a Pydantic model.
+
+    This class represents an acceptance test scenario, which is a single test case
+    that can be run against a connector. It is used to deserialize and validate the
+    acceptance test configuration file.
+    """
+
+    # Allows the class to be hashable, which PyTest will require
+    # when we use it to parameterize tests.
+    model_config = ConfigDict(frozen=True)
+
+    class AcceptanceTestExpectRecords(BaseModel):
+        path: Path
+        exact_order: bool = False
+
+    class AcceptanceTestFileTypes(BaseModel):
+        skip_test: bool
+        bypass_reason: str
+
+    class AcceptanceTestEmptyStream(BaseModel):
+        name: str
+        bypass_reason: str | None = None
+
+        # bypass reason does not affect equality
+        def __hash__(self) -> int:
+            return hash(self.name)
+
+    config_path: Path | None = None
+    config_dict: dict[str, Any] | None = None
+
+    _id: str | None = None  # Used to override the default ID generation
+
+    configured_catalog_path: Path | None = None
+    empty_streams: list[AcceptanceTestEmptyStream] | None = None
+    timeout_seconds: int | None = None
+    expect_records: AcceptanceTestExpectRecords | None = None
+    file_types: AcceptanceTestFileTypes | None = None
+    status: Literal["succeed", "failed", "exception"] | None = None
+
+    @property
+    def expected_outcome(self) -> ExpectedOutcome:
+        """The expected outcome of the test scenario, derived from its legacy `status` field.
+
+        The outcome reflects whether the scenario expects an exception ("failed" or
+        "exception"), expects success ("succeed"), or has no set expectation (None).
         """
-        return ConnectorTestScenario(
-            **self.model_dump(exclude={"status"}),
-        )
+        return ExpectedOutcome.from_status_str(self.status)
 
-    def with_expecting_failure(self) -> ConnectorTestScenario:
-        """Return a copy of the scenario that expects failure.
+    @property
+    def id(self) -> str:
+        """Return a unique identifier for the test scenario.
 
-        This is useful when deriving new scenarios from existing ones.
+        This is used by PyTest to identify the test scenario.
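+
+        For example, a scenario whose `config_path` is `secrets/config_oauth.json` gets the
+        ID `config_oauth` (the file stem).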
""" - if self.status == "failed": - return self + if self._id: + return self._id - return ConnectorTestScenario( - **self.model_dump(exclude={"status"}), - status="failed", - ) + if self.config_path: + return self.config_path.stem - def with_expecting_success(self) -> ConnectorTestScenario: - """Return a copy of the scenario that expects success. + return str(hash(self)) - This is useful when deriving new scenarios from existing ones. + def __str__(self) -> str: + return f"'{self.id}' Test Scenario" + + def as_test_scenario(self) -> ConnectorTestScenario: + """Return a ConnectorTestScenario representation of this scenario. + + This is useful when we want to run the same scenario as both a legacy acceptance test + and a smoke test. """ - if self.status == "succeed": - return self + if not self.config_path: + raise ValueError("Cannot convert to ConnectorTestScenario without a config_path.") return ConnectorTestScenario( - **self.model_dump(exclude={"status"}), - status="succeed", + name=self.id, + config_file=self.config_path, + config_settings=self.config_dict, + exclude_streams=[s.name for s in self.empty_streams or []], + expect_failure=bool(self.status and self.status in _LEGACY_FAILURE_STATUSES), + configured_catalog_path=self.configured_catalog_path, ) - - @property - def requires_creds(self) -> bool: - """Return True if the scenario requires credentials to run.""" - return bool(self.config_path and "secrets" in self.config_path.parts) diff --git a/airbyte_cdk/test/standard_tests/_job_runner.py b/airbyte_cdk/test/standard_tests/_job_runner.py index ddd293ffa..a52c7b87c 100644 --- a/airbyte_cdk/test/standard_tests/_job_runner.py +++ b/airbyte_cdk/test/standard_tests/_job_runner.py @@ -44,7 +44,11 @@ def run_test_job( ) -> entrypoint_wrapper.EntrypointOutput: """Run a test scenario from provided CLI args and return the result.""" # Use default (empty) scenario if not provided: - test_scenario = test_scenario or ConnectorTestScenario() + test_scenario = test_scenario or ConnectorTestScenario( + name="default-scenario", + config_file=None, + config_settings={}, + ) if not connector: raise ValueError("Connector is required") @@ -102,9 +106,8 @@ def run_test_job( result: entrypoint_wrapper.EntrypointOutput = entrypoint_wrapper._run_command( # noqa: SLF001 # Non-public API source=connector_obj, # type: ignore [arg-type] args=args, - expected_outcome=test_scenario.expected_outcome, ) - if result.errors and test_scenario.expected_outcome.expect_success(): + if result.errors and not test_scenario.expect_failure: raise result.as_exception() if verb == "check": @@ -117,7 +120,7 @@ def run_test_job( + "\n".join([str(msg) for msg in result.connection_status_messages]) + result.get_formatted_error_message() ) - if test_scenario.expected_outcome.expect_exception(): + if test_scenario.expect_failure: conn_status = result.connection_status_messages[0].connectionStatus assert conn_status, ( "Expected CONNECTION_STATUS message to be present. Got: \n" @@ -131,13 +134,7 @@ def run_test_job( return result # For all other verbs, we assert check that an exception is raised (or not). 
-    if test_scenario.expected_outcome.expect_exception():
-        if not result.errors:
-            raise AssertionError("Expected exception but got none.")
-
-        return result
-
-    if test_scenario.expected_outcome.expect_success():
+    if not test_scenario.expect_failure:
         assert not result.errors, (
             f"Test job failed with {len(result.errors)} error(s): \n"
            + result.get_formatted_error_message()
diff --git a/airbyte_cdk/test/standard_tests/declarative_sources.py b/airbyte_cdk/test/standard_tests/declarative_sources.py
index 18a2a5910..f20707fcf 100644
--- a/airbyte_cdk/test/standard_tests/declarative_sources.py
+++ b/airbyte_cdk/test/standard_tests/declarative_sources.py
@@ -73,7 +73,11 @@ def create_connector(
 
         Subclasses should not need to override this method.
         """
-        scenario = scenario or ConnectorTestScenario()  # Use default (empty) scenario if None
+        scenario = scenario or ConnectorTestScenario(
+            name="default-scenario",
+            config_file=None,
+            config_settings={},
+        )  # Use default (empty) scenario if None
         manifest_dict = yaml.safe_load(cls.manifest_yaml_path.read_text())
         config = {
             "__injected_manifest": manifest_dict,
diff --git a/airbyte_cdk/test/standard_tests/docker_base.py b/airbyte_cdk/test/standard_tests/docker_base.py
index bc26802dd..674f065cd 100644
--- a/airbyte_cdk/test/standard_tests/docker_base.py
+++ b/airbyte_cdk/test/standard_tests/docker_base.py
@@ -1,6 +1,5 @@
 # Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 """Base class for connector test suites."""
-
 from __future__ import annotations
 
 import inspect
@@ -27,8 +26,10 @@
 from airbyte_cdk.models.connector_metadata import MetadataFile
 from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput
 from airbyte_cdk.test.models import ConnectorTestScenario
+from airbyte_cdk.test.models.scenario import LegacyAcceptanceTestScenario
 from airbyte_cdk.utils.connector_paths import (
     ACCEPTANCE_TEST_CONFIG,
+    METADATA_YAML,
     find_connector_root,
 )
 from airbyte_cdk.utils.docker import (
@@ -84,6 +85,17 @@ def acceptance_test_config(cls) -> Any:
         )
         return tests_config
 
+    @classproperty
+    def connector_metadata_yaml_path(cls) -> Path:
+        """Get the path to the connector metadata file."""
+        connector_metadata_path = cls.get_connector_root_dir() / METADATA_YAML
+        if not connector_metadata_path.exists():
+            raise FileNotFoundError(
+                f"Connector metadata file not found at: {connector_metadata_path}"
+            )
+
+        return connector_metadata_path
+
     @staticmethod
     def _dedup_scenarios(scenarios: list[ConnectorTestScenario]) -> list[ConnectorTestScenario]:
         """
@@ -99,14 +111,14 @@ def _dedup_scenarios(scenarios: list[ConnectorTestScenario]) -> list[ConnectorTe
 
         for scenario in scenarios:
             for existing_scenario in deduped_scenarios:
-                if scenario.config_path == existing_scenario.config_path:
-                    # If a scenario with the same config_path already exists, we merge the empty streams.
+                if scenario.config_file == existing_scenario.config_file:
+                    # If a scenario with the same config_file already exists, we merge the excluded streams.
                     # scenarios are immutable, so we create a new one.
-                    all_empty_streams = (existing_scenario.empty_streams or []) + (
-                        scenario.empty_streams or []
+                    all_skipped_streams = (existing_scenario.exclude_streams or []) + (
+                        scenario.exclude_streams or []
                     )
                     merged_scenario = existing_scenario.model_copy(
-                        update={"empty_streams": list(set(all_empty_streams))}
+                        update={"exclude_streams": list(set(all_skipped_streams))}
                     )
                     deduped_scenarios.remove(existing_scenario)
                     deduped_scenarios.append(merged_scenario)
@@ -125,6 +137,16 @@ def get_scenarios(
         This has to be a separate function because pytest does not allow
         parametrization of fixtures with arguments from the test class itself.
         """
+        connector_metadata_path = cls.connector_metadata_yaml_path
+        result: list[ConnectorTestScenario] | None = ConnectorTestScenario.from_metadata_yaml(
+            connector_metadata_path
+        )
+        if result is not None:
+            # A non-None result indicates that the `smokeTests` key is present in metadata.yaml.
+            # Note: Modern smoke test configs do not need to be deduped.
+            return result
+
+        # Otherwise, fall back to a legacy `acceptance-test-config.yml` file.
         try:
             all_tests_config = cls.acceptance_test_config
         except FileNotFoundError as e:
@@ -154,9 +176,9 @@ def get_scenarios(
                 # We skip iam_role tests for now, as they are not supported in the test suite.
                 continue
 
-            scenario = ConnectorTestScenario.model_validate(test)
+            legacy_scenario = LegacyAcceptanceTestScenario.model_validate(test)
 
-            test_scenarios.append(scenario)
+            test_scenarios.append(legacy_scenario.as_test_scenario())
 
         deduped_test_scenarios = cls._dedup_scenarios(test_scenarios)
@@ -216,7 +238,7 @@ def test_docker_image_build_and_check(
         - In the rare case that image caches need to be cleared, please clear the
           local docker image cache using `docker image prune -a` command.
         """
-        if scenario.expected_outcome.expect_exception():
+        if scenario.expect_failure:
             pytest.skip("Skipping test_docker_image_build_and_check (expected to fail).")
 
         tag = "dev-latest"
@@ -277,7 +299,7 @@ def test_docker_image_build_and_read(
         if self.is_destination_connector():
             pytest.skip("Skipping read test for destination connector.")
 
-        if scenario.expected_outcome.expect_exception():
+        if scenario.expect_failure:
             pytest.skip("Skipping (expected to fail).")
 
         if read_from_streams == "none":
@@ -290,15 +312,15 @@ def test_docker_image_build_and_read(
         if read_scenarios == "all":
             pass
         elif read_scenarios == "default":
-            if scenario.id not in default_scenario_ids:
+            if scenario.name not in default_scenario_ids:
                 pytest.skip(
-                    f"Skipping read test for scenario '{scenario.id}' "
+                    f"Skipping read test for scenario '{scenario.name}' "
                     f"(not in default scenarios list '{default_scenario_ids}')."
                 )
-        elif scenario.id not in read_scenarios:
+        elif scenario.name not in read_scenarios:
             # pytest.skip(
             raise ValueError(
-                f"Skipping read test for scenario '{scenario.id}' "
+                f"Skipping read test for scenario '{scenario.name}' "
                 f"(not in --read-scenarios={read_scenarios})."
             )
 
@@ -363,11 +385,6 @@ def test_docker_image_build_and_read(
             # If `read_from_streams` is a list, we filter the discovered streams.
             streams_list = list(set(streams_list) & set(read_from_streams))
 
-        if scenario.empty_streams:
-            # Filter out streams marked as empty in the scenario.
-            empty_stream_names = [stream.name for stream in scenario.empty_streams]
-            streams_list = [s for s in streams_list if s.name not in empty_stream_names]
-
         configured_catalog: ConfiguredAirbyteCatalog = ConfiguredAirbyteCatalog(
             streams=[
                 ConfiguredAirbyteStream(
@@ -380,7 +397,7 @@ def test_docker_image_build_and_read(
                    destination_sync_mode=DestinationSyncMode.append,
                 )
                 for stream in discovered_catalog.streams
-                if stream.name in streams_list
+                if stream.name in streams_list and scenario.get_streams_filter()(stream.name)
             ]
         )
         configured_catalog_path = temp_dir / "catalog.json"
diff --git a/airbyte_cdk/test/standard_tests/source_base.py b/airbyte_cdk/test/standard_tests/source_base.py
index faecb03c7..eebed0c81 100644
--- a/airbyte_cdk/test/standard_tests/source_base.py
+++ b/airbyte_cdk/test/standard_tests/source_base.py
@@ -7,13 +7,11 @@
 import pytest
 
 from airbyte_cdk.models import (
-    AirbyteMessage,
     AirbyteStream,
     ConfiguredAirbyteCatalog,
     ConfiguredAirbyteStream,
     DestinationSyncMode,
     SyncMode,
-    Type,
 )
 from airbyte_cdk.test.models import (
     ConnectorTestScenario,
@@ -61,11 +59,11 @@ def test_discover(
         scenario: ConnectorTestScenario,
     ) -> None:
         """Standard test for `discover`."""
-        if scenario.expected_outcome.expect_exception():
+        if scenario.expect_failure:
             # If the scenario expects an exception, we can't ensure it specifically would fail
             # in discover, because some discover implementations do not need to make a connection.
             # We skip this test in that case.
-            pytest.skip("Skipping discover test for scenario that expects an exception.")
+            pytest.skip("Skipping `discover` test for scenario that expects failure.")
             return
 
         run_test_job(
@@ -115,18 +113,13 @@ def test_basic_read(
             self.create_connector(scenario),
             "discover",
             connector_root=self.get_connector_root_dir(),
-            test_scenario=scenario.without_expected_outcome(),
+            test_scenario=scenario,
         )
-        if scenario.expected_outcome.expect_exception() and discover_result.errors:
+        if scenario.expect_failure and discover_result.errors:
             # Failed as expected; we're done.
             return
 
-        streams = discover_result.catalog.catalog.streams  # type: ignore [reportOptionalMemberAccess, union-attr]
-
-        if scenario.empty_streams:
-            # Filter out streams marked as empty in the scenario.
-            empty_stream_names = [stream.name for stream in scenario.empty_streams]
-            streams = [s for s in streams if s.name not in empty_stream_names]
+        streams = discover_result.catalog.catalog.streams  # type: ignore [reportOptionalMemberAccess, union-attr]
 
         configured_catalog = ConfiguredAirbyteCatalog(
             streams=[
                 ConfiguredAirbyteStream(
@@ -135,6 +128,7 @@ def test_basic_read(
                     destination_sync_mode=DestinationSyncMode.append_dedup,
                 )
                 for stream in streams
+                if scenario.get_streams_filter()(stream.name)
             ]
         )
         result = run_test_job(
@@ -145,7 +139,7 @@ def test_basic_read(
             catalog=configured_catalog,
         )
 
-        if scenario.expected_outcome.expect_success() and not result.records:
+        if not scenario.expect_failure and not result.records:
             raise AssertionError("Expected records but got none.")
 
     def test_fail_read_with_bad_catalog(