Skip to content

Commit

Permalink
SAT: basic read on full catalog when test_strictness_level == high (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
alafanechere committed Nov 7, 2022
1 parent a990d8c commit 7d73b08
Show file tree
Hide file tree
Showing 11 changed files with 385 additions and 72 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.2.16
Run `basic_read` on the discovered catalog in `high` `test_strictness_level`. [#18937](https://github.com/airbytehq/airbyte/pull/18937).

## 0.2.15
Make `expect_records` mandatory in `high` `test_strictness_level`. [#18497](https://github.com/airbytehq/airbyte/pull/18497/).

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ COPY pytest.ini setup.py ./
COPY source_acceptance_test ./source_acceptance_test
RUN pip install .

LABEL io.airbyte.version=0.2.15
LABEL io.airbyte.version=0.2.16
LABEL io.airbyte.name=airbyte/source-acceptance-test

ENTRYPOINT ["python", "-m", "pytest", "-p", "source_acceptance_test.plugin", "-r", "fEsx"]
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,20 @@
from typing import Any, List, MutableMapping, Optional, Set

import pytest
from airbyte_cdk.models import (
AirbyteRecordMessage,
AirbyteStream,
ConfiguredAirbyteCatalog,
ConfiguredAirbyteStream,
ConnectorSpecification,
DestinationSyncMode,
Type,
)
from airbyte_cdk.models import AirbyteRecordMessage, AirbyteStream, ConfiguredAirbyteCatalog, ConnectorSpecification, Type
from docker import errors
from source_acceptance_test.base import BaseTest
from source_acceptance_test.config import Config, EmptyStreamConfiguration
from source_acceptance_test.config import Config, EmptyStreamConfiguration, ExpectedRecordsConfig
from source_acceptance_test.tests import TestBasicRead
from source_acceptance_test.utils import ConnectorRunner, SecretDict, filter_output, load_config, load_yaml_or_json_path
from source_acceptance_test.utils import (
ConnectorRunner,
SecretDict,
build_configured_catalog_from_custom_catalog,
build_configured_catalog_from_discovered_catalog_and_empty_streams,
filter_output,
load_config,
load_yaml_or_json_path,
)


@pytest.fixture(name="acceptance_test_config", scope="session")
Expand All @@ -44,7 +44,7 @@ def base_path_fixture(pytestconfig, acceptance_test_config) -> Path:


@pytest.fixture(name="test_strictness_level", scope="session")
def test_strictness_level_fixture(acceptance_test_config: Config):
def test_strictness_level_fixture(acceptance_test_config: Config) -> Config.TestStrictnessLevel:
return acceptance_test_config.test_strictness_level


Expand Down Expand Up @@ -75,24 +75,17 @@ def configured_catalog_path_fixture(inputs, base_path) -> Optional[str]:


@pytest.fixture(name="configured_catalog")
def configured_catalog_fixture(configured_catalog_path, discovered_catalog) -> ConfiguredAirbyteCatalog:
"""Take ConfiguredAirbyteCatalog from discover command by default"""
def configured_catalog_fixture(
configured_catalog_path: Optional[str],
discovered_catalog: MutableMapping[str, AirbyteStream],
) -> ConfiguredAirbyteCatalog:
"""Build a configured catalog.
If a configured catalog path is given we build a configured catalog from it, we build it from the discovered catalog otherwise.
"""
if configured_catalog_path:
catalog = ConfiguredAirbyteCatalog.parse_file(configured_catalog_path)
for configured_stream in catalog.streams:
configured_stream.stream = discovered_catalog.get(configured_stream.stream.name, configured_stream.stream)
return catalog
streams = [
ConfiguredAirbyteStream(
stream=stream,
sync_mode=stream.supported_sync_modes[0],
destination_sync_mode=DestinationSyncMode.append,
cursor_field=stream.default_cursor_field,
primary_key=stream.source_defined_primary_key,
)
for _, stream in discovered_catalog.items()
]
return ConfiguredAirbyteCatalog(streams=streams)
return build_configured_catalog_from_custom_catalog(configured_catalog_path, discovered_catalog)
else:
return build_configured_catalog_from_discovered_catalog_and_empty_streams(discovered_catalog, set())


@pytest.fixture(name="image_tag")
Expand Down Expand Up @@ -178,12 +171,17 @@ def empty_streams_fixture(inputs, test_strictness_level) -> Set[EmptyStreamConfi
return empty_streams


@pytest.fixture(name="expect_records_config")
def expect_records_config_fixture(inputs):
return inputs.expect_records


@pytest.fixture(name="expected_records_by_stream")
def expected_records_by_stream_fixture(
test_strictness_level: Config.TestStrictnessLevel,
configured_catalog: ConfiguredAirbyteCatalog,
empty_streams: Set[EmptyStreamConfiguration],
inputs,
expect_records_config: ExpectedRecordsConfig,
base_path,
) -> MutableMapping[str, List[MutableMapping]]:
def enforce_high_strictness_level_rules(expect_records_config, configured_catalog, empty_streams, records_by_stream) -> Optional[str]:
Expand All @@ -197,8 +195,9 @@ def enforce_high_strictness_level_rules(expect_records_config, configured_catalo
error_prefix
+ f"{', '.join(not_seeded_streams)} streams are declared in the catalog but do not have expected records. Please add expected records to {expect_records_config.path} or declare these streams in empty_streams."
)

expect_records_config = inputs.expect_records
else:
if not getattr(expect_records_config, "bypass_reason", None):
pytest.fail(error_prefix / "A bypass reason must be filled if no path to expected records is provided.")

expected_records_by_stream = {}
if expect_records_config:
Expand Down Expand Up @@ -246,7 +245,6 @@ def discovered_catalog_fixture(connector_config, docker_runner: ConnectorRunner,
catalogs = [message.catalog for message in output if message.type == Type.CATALOG]
for stream in catalogs[-1].streams:
cached_schemas[stream.name] = stream

return cached_schemas


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
from collections import Counter, defaultdict
from functools import reduce
from logging import Logger
from typing import Any, Dict, List, Mapping, MutableMapping, Set
from typing import Any, Dict, List, Mapping, MutableMapping, Optional, Set
from xmlrpc.client import Boolean

import dpath.util
import jsonschema
Expand All @@ -27,10 +28,23 @@
from docker.errors import ContainerError
from jsonschema._utils import flatten
from source_acceptance_test.base import BaseTest
from source_acceptance_test.config import BasicReadTestConfig, ConnectionTestConfig, DiscoveryTestConfig, SpecTestConfig
from source_acceptance_test.config import (
BasicReadTestConfig,
Config,
ConnectionTestConfig,
DiscoveryTestConfig,
EmptyStreamConfiguration,
ExpectedRecordsConfig,
SpecTestConfig,
)
from source_acceptance_test.utils import ConnectorRunner, SecretDict, filter_output, make_hashable, verify_records_schema
from source_acceptance_test.utils.backward_compatibility import CatalogDiffChecker, SpecDiffChecker, validate_previous_configs
from source_acceptance_test.utils.common import find_all_values_for_key_in_schema, find_keyword_schema
from source_acceptance_test.utils.common import (
build_configured_catalog_from_custom_catalog,
build_configured_catalog_from_discovered_catalog_and_empty_streams,
find_all_values_for_key_in_schema,
find_keyword_schema,
)
from source_acceptance_test.utils.json_schema_helper import JsonSchemaHelper, get_expected_schema_structure, get_object_structure


Expand Down Expand Up @@ -462,11 +476,58 @@ def _validate_expected_records(
detailed_logger=detailed_logger,
)

@pytest.fixture(name="should_validate_schema")
def should_validate_schema_fixture(self, inputs: BasicReadTestConfig, test_strictness_level: Config.TestStrictnessLevel):
if not inputs.validate_schema and test_strictness_level is Config.TestStrictnessLevel.high:
pytest.fail("High strictness level error: validate_schema must be set to true in the basic read test configuration.")
else:
return inputs.validate_schema

@pytest.fixture(name="should_validate_data_points")
def should_validate_data_points_fixture(self, inputs: BasicReadTestConfig) -> Boolean:
# TODO: we might want to enforce this when Config.TestStrictnessLevel.high
return inputs.validate_data_points

@pytest.fixture(name="configured_catalog")
def configured_catalog_fixture(
self,
test_strictness_level: Config.TestStrictnessLevel,
configured_catalog_path: Optional[str],
discovered_catalog: MutableMapping[str, AirbyteStream],
empty_streams: Set[EmptyStreamConfiguration],
) -> ConfiguredAirbyteCatalog:
"""Build a configured catalog for basic read only.
We discard the use of custom configured catalog if:
- No custom configured catalog is declared with configured_catalog_path.
- We are in high test strictness level.
When a custom configured catalog is discarded we use the discovered catalog from which we remove the declared empty streams.
We use a custom configured catalog if a configured_catalog_path is declared and we are not in high test strictness level.
Args:
test_strictness_level (Config.TestStrictnessLevel): The current test strictness level according to the global test configuration.
configured_catalog_path (Optional[str]): Path to a JSON file containing a custom configured catalog.
discovered_catalog (MutableMapping[str, AirbyteStream]): The discovered catalog.
empty_streams (Set[EmptyStreamConfiguration]): The empty streams declared in the test configuration.
Returns:
ConfiguredAirbyteCatalog: the configured Airbyte catalog.
"""
if test_strictness_level is Config.TestStrictnessLevel.high or not configured_catalog_path:
if configured_catalog_path:
pytest.fail(
"High strictness level error: you can't set a custom configured catalog on the basic read test when strictness level is high."
)
return build_configured_catalog_from_discovered_catalog_and_empty_streams(discovered_catalog, empty_streams)
else:
return build_configured_catalog_from_custom_catalog(configured_catalog_path, discovered_catalog)

def test_read(
self,
connector_config,
configured_catalog,
inputs: BasicReadTestConfig,
expect_records_config: ExpectedRecordsConfig,
should_validate_schema: Boolean,
should_validate_data_points: Boolean,
empty_streams: Set[EmptyStreamConfiguration],
expected_records_by_stream: MutableMapping[str, List[MutableMapping]],
docker_runner: ConnectorRunner,
detailed_logger,
Expand All @@ -476,25 +537,25 @@ def test_read(

assert records, "At least one record should be read using provided catalog"

if inputs.validate_schema:
if should_validate_schema:
self._validate_schema(records=records, configured_catalog=configured_catalog)

self._validate_empty_streams(records=records, configured_catalog=configured_catalog, allowed_empty_streams=inputs.empty_streams)
self._validate_empty_streams(records=records, configured_catalog=configured_catalog, allowed_empty_streams=empty_streams)
for pks, record in primary_keys_for_records(streams=configured_catalog.streams, records=records):
for pk_path, pk_value in pks.items():
assert (
pk_value is not None
), f"Primary key subkeys {repr(pk_path)} have null values or not present in {record.stream} stream records."

# TODO: remove this condition after https://github.com/airbytehq/airbyte/issues/8312 is done
if inputs.validate_data_points:
if should_validate_data_points:
self._validate_field_appears_at_least_once(records=records, configured_catalog=configured_catalog)

if expected_records_by_stream:
self._validate_expected_records(
records=records,
expected_records_by_stream=expected_records_by_stream,
flags=inputs.expect_records,
flags=expect_records_config,
detailed_logger=detailed_logger,
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
from .asserts import verify_records_schema
from .common import SecretDict, filter_output, full_refresh_only_catalog, incremental_only_catalog, load_config, load_yaml_or_json_path
from .common import (
SecretDict,
build_configured_catalog_from_custom_catalog,
build_configured_catalog_from_discovered_catalog_and_empty_streams,
filter_output,
full_refresh_only_catalog,
incremental_only_catalog,
load_config,
load_yaml_or_json_path,
)
from .compare import diff_dicts, make_hashable
from .connector_runner import ConnectorRunner
from .json_schema_helper import JsonSchemaHelper
Expand All @@ -19,4 +28,6 @@
"diff_dicts",
"make_hashable",
"verify_records_schema",
"build_configured_catalog_from_custom_catalog",
"build_configured_catalog_from_discovered_catalog_and_empty_streams",
]
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
#

import json
import logging
from collections import UserDict
from pathlib import Path
from typing import Iterable, List, Union
from typing import Iterable, List, MutableMapping, Set, Union

import pytest
from yaml import load
Expand All @@ -15,8 +16,15 @@
except ImportError:
from yaml import Loader

from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog, SyncMode
from source_acceptance_test.config import Config
from airbyte_cdk.models import (
AirbyteMessage,
AirbyteStream,
ConfiguredAirbyteCatalog,
ConfiguredAirbyteStream,
DestinationSyncMode,
SyncMode,
)
from source_acceptance_test.config import Config, EmptyStreamConfiguration


def load_config(path: str) -> Config:
Expand Down Expand Up @@ -111,3 +119,58 @@ def find_all_values_for_key_in_schema(schema: dict, searched_key: str):
yield value
if isinstance(value, dict) or isinstance(value, list):
yield from find_all_values_for_key_in_schema(value, searched_key)


def build_configured_catalog_from_discovered_catalog_and_empty_streams(
discovered_catalog: MutableMapping[str, AirbyteStream], empty_streams: Set[EmptyStreamConfiguration]
):
"""Build a configured catalog from the discovered catalog with empty streams removed.
Args:
discovered_catalog (MutableMapping[str, AirbyteStream]): The discovered catalog.
empty_streams (Set[EmptyStreamConfiguration]): The set of empty streams declared in the test configuration.
Returns:
ConfiguredAirbyteCatalog: a configured Airbyte catalog.
"""
empty_stream_names = [empty_stream.name for empty_stream in empty_streams]
streams = [
ConfiguredAirbyteStream(
stream=stream,
sync_mode=stream.supported_sync_modes[0],
destination_sync_mode=DestinationSyncMode.append,
cursor_field=stream.default_cursor_field,
primary_key=stream.source_defined_primary_key,
)
for _, stream in discovered_catalog.items()
if stream.name not in empty_stream_names
]
if empty_stream_names:
logging.warning(
f"The configured catalog was built with the discovered catalog from which the following empty streams were removed: {', '.join(empty_stream_names)}."
)
else:
logging.info("The configured catalog is built from a fully discovered catalog.")
return ConfiguredAirbyteCatalog(streams=streams)


def build_configured_catalog_from_custom_catalog(configured_catalog_path: str, discovered_catalog: MutableMapping[str, AirbyteStream]):
"""Build a configured catalog from a local one stored in a JSON file.
Args:
configured_catalog_path (str): Local path to a custom configured catalog path
discovered_catalog (MutableMapping[str, AirbyteStream]): The discovered catalog
Returns:
ConfiguredAirbyteCatalog: a configured Airbyte catalog
"""
catalog = ConfiguredAirbyteCatalog.parse_file(configured_catalog_path)
for configured_stream in catalog.streams:
try:
configured_stream.stream = discovered_catalog[configured_stream.stream.name]
except KeyError:
pytest.fail(
f"The {configured_stream.stream.name} stream you have set in {configured_catalog_path} is not part of the discovered_catalog"
)
logging.info("The configured catalog is built from a custom configured catalog.")
return catalog
Loading

0 comments on commit 7d73b08

Please sign in to comment.