Skip to content

Commit

Permalink
Standard Tests: allow specifying exact record matches #2186 (#2960)
Browse files Browse the repository at this point in the history
* records matching

+ extra_fields
+ exact_order
+ extra_records

* fix serialization before comparison

* fix pydantic validation

* fix reporting and enable for hubspot

* format

* update docs

* update examples

* fix template

* Update docs/contributing-to-airbyte/building-new-connector/source-acceptance-tests.md

Co-authored-by: Sherif A. Nada <snadalive@gmail.com>

* Update airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_core.py

Co-authored-by: Sherif A. Nada <snadalive@gmail.com>

* Update docs/contributing-to-airbyte/building-new-connector/source-acceptance-tests.md

Co-authored-by: Sherif A. Nada <snadalive@gmail.com>

Co-authored-by: Eugene Kulak <kulak.eugene@gmail.com>
Co-authored-by: Sherif A. Nada <snadalive@gmail.com>
  • Loading branch information
3 people committed Apr 30, 2021
1 parent e9287e7 commit 7f66569
Show file tree
Hide file tree
Showing 9 changed files with 234 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ tests:
- config_path: "secrets/config.json"
configured_catalog_path: "sample_files/configured_catalog.json"
validate_output_from_all_streams: yes
expect_records:
path: "integration_tests/expected_records.txt"
extra_fields: no
exact_order: no
extra_records: yes
incremental:
- config_path: "secrets/config.json"
configured_catalog_path: "sample_files/configured_catalog.json"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@


from enum import Enum
from pathlib import Path
from typing import List, Mapping, Optional

from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, validator

config_path: str = Field(default="secrets/config.json", description="Path to a JSON object representing a valid connector configuration")
invalid_config_path: str = Field(description="Path to a JSON object representing an invalid connector configuration")
Expand Down Expand Up @@ -58,10 +59,37 @@ class DiscoveryTestConfig(BaseConfig):
configured_catalog_path: Optional[str] = configured_catalog_path


class ExpectedRecordsConfig(BaseModel):
    # Settings that control how records read from a connector are matched against an
    # expected-records file.
    #
    # Allowed combinations are enforced by the validators below: `extra_fields` may only
    # be enabled together with `exact_order` and with `extra_records` disabled.

    class Config:
        # Unknown keys in the config are rejected instead of being silently ignored.
        extra = "forbid"

    path: Path = Field(description="File with expected records")
    extra_fields: bool = Field(False, description="Allow records to have other fields")
    exact_order: bool = Field(False, description="Ensure that records produced in exact same order")
    extra_records: bool = Field(
        True, description="Allow connector to produce extra records, but still enforce all records from the expected file to be produced"
    )

    @validator("exact_order", always=True)
    def validate_exact_order(cls, exact_order, values):
        """Reject ``extra_fields`` being enabled while ``exact_order`` is off.

        :param exact_order: value supplied (or defaulted) for ``exact_order``
        :param values: fields declared earlier that have already been validated
        :return: ``exact_order`` unchanged
        :raises ValueError: if ``extra_fields`` is on and ``exact_order`` is off
        """
        # `extra_fields` is absent from `values` when its own validation failed, hence .get().
        if values.get("extra_fields") and not exact_order:
            raise ValueError("exact_order must be on if extra_fields enabled")
        return exact_order

    @validator("extra_records", always=True)
    def validate_extra_records(cls, extra_records, values):
        """Reject ``extra_fields`` being enabled while ``extra_records`` is on.

        :param extra_records: value supplied (or defaulted) for ``extra_records``
        :param values: fields declared earlier that have already been validated
        :return: ``extra_records`` unchanged
        :raises ValueError: if both ``extra_fields`` and ``extra_records`` are on
        """
        # `extra_fields` is absent from `values` when its own validation failed, hence .get().
        if values.get("extra_fields") and extra_records:
            raise ValueError("extra_records must be off if extra_fields enabled")
        return extra_records


class BasicReadTestConfig(BaseConfig):
    # Inputs for the basic read acceptance test.
    # `config_path` reuses the module-level Field of the same name; `configured_catalog_path`
    # presumably does the same (its definition is outside this excerpt -- confirm).
    config_path: str = config_path
    configured_catalog_path: Optional[str] = configured_catalog_path
    # Off by default; when on, the read test verifies that all streams have records.
    validate_output_from_all_streams: bool = Field(False, description="Verify that all streams have records")
    # Optional expected-records settings (file path plus matching restrictions).
    expect_records: Optional[ExpectedRecordsConfig] = Field(description="Expected records from the read")


class FullRefreshConfig(BaseConfig):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from typing import Any, List, MutableMapping, Optional

import pytest
from airbyte_protocol import AirbyteCatalog, AirbyteMessage, ConfiguredAirbyteCatalog, ConnectorSpecification
from airbyte_protocol import AirbyteCatalog, AirbyteRecordMessage, ConfiguredAirbyteCatalog, ConnectorSpecification
from source_acceptance_test.config import Config
from source_acceptance_test.utils import ConnectorRunner, SecretDict, load_config

Expand Down Expand Up @@ -132,10 +132,10 @@ def pull_docker_image(acceptance_test_config) -> None:


@pytest.fixture(name="expected_records")
def expected_records_fixture(inputs, base_path) -> List[AirbyteRecordMessage]:
    """Load the expected records configured for the current test inputs.

    :param inputs: test config object; its ``expect_records`` attribute (if set) carries
        the ``path`` of the expected-records file
    :param base_path: directory the configured path is joined onto
    :return: one ``AirbyteRecordMessage`` per non-blank line of the file, or an empty list
        when no expected records are configured
    """
    expect_records = inputs.expect_records
    if not expect_records:
        return []

    with open(str(base_path / expect_records.path), encoding="utf-8") as f:
        # Each line holds one serialized record; blank lines are skipped
        # because parse_raw() would reject them.
        return [AirbyteRecordMessage.parse_raw(line) for line in f if line.strip()]
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
# SOFTWARE.


from collections import Counter
import json
from collections import Counter, defaultdict
from typing import Any, List, Mapping, MutableMapping

import pytest
from airbyte_protocol import ConnectorSpecification, Status, Type
from airbyte_protocol import AirbyteMessage, ConnectorSpecification, Status, Type
from docker.errors import ContainerError
from source_acceptance_test.base import BaseTest
from source_acceptance_test.config import BasicReadTestConfig, ConnectionTestConfig
Expand Down Expand Up @@ -82,7 +84,14 @@ def test_discover(self, connector_config, catalog, docker_runner: ConnectorRunne

@pytest.mark.timeout(300)
class TestBasicRead(BaseTest):
def test_read(self, connector_config, configured_catalog, inputs: BasicReadTestConfig, docker_runner: ConnectorRunner):
def test_read(
self,
connector_config,
configured_catalog,
inputs: BasicReadTestConfig,
expected_records: List[AirbyteMessage],
docker_runner: ConnectorRunner,
):
output = docker_runner.call_read(connector_config, configured_catalog)
records = [message.record for message in output if message.type == Type.RECORD]
counter = Counter(record.stream for record in records)
Expand All @@ -97,3 +106,68 @@ def test_read(self, connector_config, configured_catalog, inputs: BasicReadTestC
assert (
not streams_without_records
), f"All streams should return some records, streams without records: {streams_without_records}"

if expected_records:
actual_by_stream = self.group_by_stream(records)
expected_by_stream = self.group_by_stream(expected_records)
for stream_name, expected in expected_by_stream.items():
actual = actual_by_stream.get(stream_name, [])

self.compare_records(
stream_name=stream_name,
actual=actual,
expected=expected,
extra_fields=inputs.expect_records.extra_fields,
exact_order=inputs.expect_records.exact_order,
extra_records=inputs.expect_records.extra_records,
)

@staticmethod
def remove_extra_fields(record: Any, spec: Any) -> Any:
    """Project ``record`` onto the keys of ``spec``, descending into nested mappings.

    :param record: actual value (a mapping wherever ``spec`` is a mapping)
    :param spec: expected value whose keys decide what is kept
    :return: ``record`` unchanged when ``spec`` is not a mapping, otherwise a new dict
        holding only the keys of ``spec``
    :raises AssertionError: if ``spec`` is a mapping but ``record`` is not, or ``record``
        lacks one of the keys of ``spec``
    """
    if isinstance(spec, Mapping):
        assert isinstance(record, Mapping), "Record or part of it is not a dictionary, but expected record is."
        pruned = {}
        for key, expected_value in spec.items():
            assert key in record, "Record or part of it doesn't have attribute that has expected record."
            # Nested values are pruned the same way, guided by the expected value.
            pruned[key] = TestBasicRead.remove_extra_fields(record[key], expected_value)
        return pruned

    # Non-mapping expected values give nothing to prune against.
    return record

@staticmethod
def compare_records(stream_name, actual, expected, extra_fields, exact_order, extra_records):
    """Compare the records of one stream against the expected ones.

    :param stream_name: stream name, used only in failure messages
    :param actual: record payloads produced by the connector
    :param expected: record payloads taken from the expected-records file
    :param extra_fields: in ordered mode, strip keys the expected record doesn't have
        from the actual record before comparing
    :param exact_order: compare records pairwise by position; otherwise compare them
        as unordered sets of serialized records
    :param extra_records: tolerate actual records beyond the expected ones
    :raises AssertionError: if an expected record is missing or different, or if surplus
        records are produced while ``extra_records`` is off
    """
    from itertools import zip_longest

    if exact_order:
        # zip_longest (unlike zip) keeps iterating past the shorter sequence, so both
        # surplus actual records and missing expected records are detected.
        exhausted = object()
        for r1, r2 in zip_longest(expected, actual, fillvalue=exhausted):
            if r1 is exhausted:
                assert extra_records, f"Stream {stream_name}: There are more records than expected, but extra_records is off"
                break
            assert r2 is not exhausted, f"Stream {stream_name}: All expected records must be produced"
            if extra_fields:
                r2 = TestBasicRead.remove_extra_fields(r2, r1)
            assert r1 == r2, f"Stream {stream_name}: Mismatch of record order or values"
    else:
        # Records are dicts (unhashable), so they are compared by canonical serialization.
        expected_set = set(map(TestBasicRead.serialize_record_for_comparison, expected))
        actual_set = set(map(TestBasicRead.serialize_record_for_comparison, actual))
        missing_expected = expected_set - actual_set

        assert not missing_expected, f"Stream {stream_name}: All expected records must be produced"

        if not extra_records:
            extra_actual = actual_set - expected_set
            assert not extra_actual, f"Stream {stream_name}: There are more records than expected, but extra_records is off"

@staticmethod
def group_by_stream(records) -> MutableMapping[str, List[MutableMapping]]:
    """Bucket record payloads by the name of the stream they came from.

    :param records: iterable of objects exposing ``stream`` and ``data`` attributes
    :return: mapping of stream name to the ``data`` payloads, in input order
    """
    grouped = defaultdict(list)
    for item in records:
        bucket = grouped[item.stream]
        bucket.append(item.data)

    return grouped

@staticmethod
def serialize_record_for_comparison(record: Mapping) -> str:
    """Serialize a record to JSON text with keys sorted, so that equal records yield equal strings."""
    canonical = json.dumps(record, sort_keys=True)
    return canonical
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
validate_output_from_all_streams: yes
expect_records:
path: "integration_tests/expected_records.txt"
extra_fields: no
exact_order: no
extra_records: yes
incremental: # TODO if your connector does not implement incremental sync, remove this block
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# See [Source Acceptance Tests](https://docs.airbyte.io/contributing-to-airbyte/building-new-connector/source-acceptance-tests.md)
# for more information about how to configure these tests
connector_image: airbyte/{{dashCase name}}
connector_image: airbyte/{{dashCase name}}-singer
tests:
spec:
- spec_path: "source_{{snakeCase name}}/spec.json"
Expand All @@ -15,6 +15,11 @@ tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
validate_output_from_all_streams: yes
expect_records:
path: "integration_tests/expected_records.txt"
extra_fields: no
exact_order: no
extra_records: yes
incremental: # TODO if your connector does not implement incremental sync, remove this block
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ tests:
- config_path: "secrets/config.json"
configured_catalog_path: "sample_files/configured_catalog.json"
validate_output_from_all_streams: yes
expect_records:
path: "integration_tests/expected_records.txt"
extra_fields: yes
exact_order: yes
extra_records: no
# incremental: fixme (eugene): '<=' not supported between instances of 'int' and 'str'
# - config_path: "secrets/config.json"
# configured_catalog_path: "sample_files/configured_catalog.json"
Expand Down
Loading

0 comments on commit 7f66569

Please sign in to comment.