Skip to content

Commit

Permalink
SAT: Add validation of data types in the output records #3253 (#4345)
Browse files Browse the repository at this point in the history
* validate that records match the schema from the catalog

Co-authored-by: Eugene Kulak <kulak.eugene@gmail.com>
Co-authored-by: Vadym <vege1wgw@gmail.com>
  • Loading branch information
3 people committed Jul 5, 2021
1 parent f4ceebe commit 15c8f8a
Show file tree
Hide file tree
Showing 8 changed files with 195 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changelog

## 0.1.9
Add configurable validation of schema for all records in BasicRead test: https://github.com/airbytehq/airbyte/pull/4345
The validation is ON by default.
To disable validation for the source you need to set `validate_schema: off` in the config file.

## 0.1.8
Fix cursor_path to support nested and absolute paths: https://github.com/airbytehq/airbyte/pull/4552

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ COPY setup.py ./
COPY pytest.ini ./
RUN pip install .

LABEL io.airbyte.version=0.1.8
LABEL io.airbyte.version=0.1.9
LABEL io.airbyte.name=airbyte/source-acceptance-test

ENTRYPOINT ["python", "-m", "pytest", "-p", "source_acceptance_test.plugin"]
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class BasicReadTestConfig(BaseConfig):
configured_catalog_path: Optional[str] = configured_catalog_path
validate_output_from_all_streams: bool = Field(False, description="Verify that all streams have records")
expect_records: Optional[ExpectedRecordsConfig] = Field(description="Expected records from the read")
validate_schema: bool = Field(True, description="Ensure that records match the schema of the corresponding stream")
timeout_seconds: int = timeout_seconds


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
# SOFTWARE.
#


import logging
from collections import Counter, defaultdict
from functools import reduce
from typing import Any, List, Mapping, MutableMapping
Expand All @@ -32,19 +32,31 @@
from docker.errors import ContainerError
from source_acceptance_test.base import BaseTest
from source_acceptance_test.config import BasicReadTestConfig, ConnectionTestConfig
from source_acceptance_test.utils import ConnectorRunner, serialize
from source_acceptance_test.utils import ConnectorRunner, serialize, verify_records_schema


@pytest.mark.default_timeout(10)
class TestSpec(BaseTest):
def test_spec(self, connector_spec: ConnectorSpecification, docker_runner: ConnectorRunner):
def test_match_expected(self, connector_spec: ConnectorSpecification, docker_runner: ConnectorRunner):
output = docker_runner.call_spec()
spec_messages = [message for message in output if message.type == Type.SPEC]

assert len(spec_messages) == 1, "Spec message should be emitted exactly once"
if connector_spec:
assert spec_messages[0].spec == connector_spec, "Spec should be equal to the one in spec.json file"

def test_required(self):
"""Check that connector will fail if any required field is missing"""

def test_optional(self):
"""Check that connector can work without any optional field"""

def test_has_secret(self):
"""Check that spec has a secret. Not sure if this should be always the case"""

def test_secret_never_in_the_output(self):
"""This test should be injected into any docker command it needs to know current config and spec"""


@pytest.mark.default_timeout(30)
class TestConnection(BaseTest):
Expand Down Expand Up @@ -98,7 +110,7 @@ def primary_keys_for_records(streams, records):
yield pk_values, stream_record


@pytest.mark.default_timeout(300)
@pytest.mark.default_timeout(5 * 60)
class TestBasicRead(BaseTest):
def test_read(
self,
Expand All @@ -112,6 +124,16 @@ def test_read(
records = [message.record for message in output if message.type == Type.RECORD]
counter = Counter(record.stream for record in records)

if inputs.validate_schema:
streams_with_errors = set()
for record, errors in verify_records_schema(records, configured_catalog):
if record.stream not in streams_with_errors:
logging.error(f"The {record.stream} stream has the following schema errors: {errors}")
streams_with_errors.add(record.stream)

if streams_with_errors:
pytest.fail(f"Please check your json_schema in selected streams {streams_with_errors}.")

all_streams = set(stream.stream.name for stream in configured_catalog.streams)
streams_with_records = set(counter.keys())
streams_without_records = all_streams - streams_with_records
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from .asserts import verify_records_schema
from .common import SecretDict, filter_output, full_refresh_only_catalog, incremental_only_catalog, load_config
from .compare import diff_dicts, serialize
from .connector_runner import ConnectorRunner
Expand All @@ -13,4 +14,5 @@
"ConnectorRunner",
"diff_dicts",
"serialize",
"verify_records_schema",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#

import logging
from typing import Iterator, List, Tuple

from airbyte_cdk.models import AirbyteRecordMessage, ConfiguredAirbyteCatalog
from jsonschema import Draft4Validator, ValidationError


def verify_records_schema(
    records: List[AirbyteRecordMessage], catalog: ConfiguredAirbyteCatalog
) -> Iterator[Tuple[AirbyteRecordMessage, List[ValidationError]]]:
    """Check records against the JSON schema of their stream in the catalog.

    Every record that fails validation is yielded together with its errors, in the
    order the records were passed in. Nothing is de-duplicated here: a caller that
    wants a single report per stream has to track the streams it has already seen.
    Records whose stream is not present in the catalog are logged and skipped.

    :param records: record messages to validate
    :param catalog: configured catalog providing one JSON schema per stream name
    :return: iterator of (record, errors) pairs, errors sorted by their string representation
    """
    # Build each validator once per stream instead of once per record.
    validators = {stream.stream.name: Draft4Validator(stream.stream.json_schema) for stream in catalog.streams}

    for record in records:
        validator = validators.get(record.stream)
        if validator is None:
            logging.error(f"Record from the {record.stream} stream that is not in the catalog.")
            continue

        # Sorting gives the error list a stable order regardless of how the validator walks the schema.
        errors = sorted(validator.iter_errors(record.data), key=str)
        if errors:
            yield record, errors
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
#
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#

import pytest
from airbyte_cdk.models import (
AirbyteRecordMessage,
AirbyteStream,
ConfiguredAirbyteCatalog,
ConfiguredAirbyteStream,
DestinationSyncMode,
SyncMode,
)
from source_acceptance_test.utils.asserts import verify_records_schema


@pytest.fixture(name="record_schema")
def record_schema_fixture():
    """JSON schema with two nullable and two non-nullable properties (text and number)."""
    properties = {
        "text_or_null": {"type": ["null", "string"]},
        "number_or_null": {"type": ["null", "number"]},
        "text": {"type": ["string"]},
        "number": {"type": ["number"]},
    }
    return {"properties": properties, "type": ["null", "object"]}


@pytest.fixture(name="configured_catalog")
def catalog_fixture(record_schema) -> ConfiguredAirbyteCatalog:
    """Configured catalog holding one stream, "my_stream", whose schema is the ``record_schema`` fixture."""
    airbyte_stream = AirbyteStream(name="my_stream", json_schema=record_schema)
    configured_stream = ConfiguredAirbyteStream(
        stream=airbyte_stream,
        sync_mode=SyncMode.full_refresh,
        destination_sync_mode=DestinationSyncMode.append,
    )
    return ConfiguredAirbyteCatalog(streams=[configured_stream])


def test_verify_records_schema(configured_catalog: ConfiguredAirbyteCatalog):
    """Only the records that violate the schema are yielded, each with its expected error messages."""
    raw_records = [
        # 1st record: text_or_null and number have the wrong type
        {"text_or_null": 123, "number_or_null": 10.3, "text": "text", "number": "text"},
        # 2nd record: both non-nullable fields are null
        {"text_or_null": "test", "number_or_null": None, "text": None, "number": None},
        # 3rd record: fully valid
        {"text_or_null": None, "number_or_null": None, "text": "text", "number": 77},
        # 4th record: number has the wrong type
        {"text_or_null": None, "number_or_null": None, "text": "text", "number": "text"},
    ]
    records = [AirbyteRecordMessage(stream="my_stream", data=payload, emitted_at=0) for payload in raw_records]

    # Transpose the yielded (record, errors) pairs into two parallel tuples.
    records_with_errors, error_lists = zip(*verify_records_schema(records, configured_catalog))
    messages = []
    for found in error_lists:
        messages.append([err.message for err in found])

    assert len(records_with_errors) == 3, "only 3 out of 4 records have errors"
    assert records_with_errors[0] == records[0], "1st record should have errors"
    assert records_with_errors[1] == records[1], "2nd record should have errors"
    assert records_with_errors[2] == records[3], "4th record should have errors"
    assert messages[0] == ["'text' is not of type 'number'", "123 is not of type 'null', 'string'"]
    assert messages[1] == ["None is not of type 'number'", "None is not of type 'string'"]
    assert messages[2] == ["'text' is not of type 'number'"]
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@ Verify that a spec operation issued to the connector returns a valid spec.
| Input | Type| Default | Note |
|--|--|--|--|
| `spec_path` | string | `secrets/spec.json` |Path to a JSON object representing the spec expected to be output by this connector |
| `timeout_seconds` | int | 10 |Test execution timeout in seconds|

## Test Connection
Verify that a check operation issued to the connector with the input config file returns a successful response.
| Input | Type| Default | Note |
|--|--|--|--|
| `config_path` | string | `secrets/config.json` |Path to a JSON object representing a valid connector configuration|
| `status` | `succeed` `failed` `exception`| |Indicate if connection check should succeed with provided config|
| `timeout_seconds` | int | 30 |Test execution timeout in seconds|

## Test Discovery

Expand All @@ -39,6 +41,7 @@ Verifies when a discover operation is run on the connector using the given confi
|--|--|--|--|
| `config_path` | string | `secrets/config.json` |Path to a JSON object representing a valid connector configuration|
| `configured_catalog_path` | string| `integration_tests/configured_catalog.json` |Path to configured catalog|
| `timeout_seconds` | int | 30 |Test execution timeout in seconds|

## Test Basic Read

Expand All @@ -48,6 +51,7 @@ Configuring all streams in the input catalog to full refresh mode verifies that
| `config_path` | string | `secrets/config.json` |Path to a JSON object representing a valid connector configuration|
| `configured_catalog_path` | string| `integration_tests/configured_catalog.json` |Path to configured catalog|
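| `validate_output_from_all_streams` | boolean | False | Verify that **all** streams have records|
| `validate_schema` | boolean | True | Verify that records match the schema of the corresponding stream|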
| `timeout_seconds` | int | 5*60 |Test execution timeout in seconds|
| `expect_records` | object |None| Compare produced records with expected records, see details below|
| `expect_records.path` | string | | File with expected records|
| `expect_records.extra_fields` | boolean | False | Allow output records to have other fields i.e: expected records are a subset |
Expand All @@ -71,6 +75,7 @@ This test performs two read operations on all streams which support full refresh
|--|--|--|--|
| `config_path` | string | `secrets/config.json` |Path to a JSON object representing a valid connector configuration|
| `configured_catalog_path` | string | `integration_tests/configured_catalog.json` |Path to configured catalog|
| `timeout_seconds` | int | 20*60 |Test execution timeout in seconds|

## Test Incremental sync
### TestTwoSequentialReads
Expand All @@ -79,7 +84,8 @@ This test verifies that all streams in the input catalog which support increment
|--|--|--|--|
| `config_path` | string | `secrets/config.json` |Path to a JSON object representing a valid connector configuration|
| `configured_catalog_path` | string | `integration_tests/configured_catalog.json` |Path to configured catalog|
| `cursor_paths` | dict | {} | For each stream, the path of its cursor field in the output state messages. If omitted the test will be skipped|
| `cursor_paths` | dict | {} | For each stream, the path of its cursor field in the output state messages. If omitted, the path defaults to the last element of the stream's `cursor_field`.|
| `timeout_seconds` | int | 20*60 |Test execution timeout in seconds|

### TestStateWithAbnormallyLargeValues

Expand All @@ -88,4 +94,5 @@ This test verifies that sync produces no records when run with the STATE with ab
|--|--|--|--|
| `config_path` | string | `secrets/config.json` |Path to a JSON object representing a valid connector configuration|
| `configured_catalog_path` | string | `integration_tests/configured_catalog.json` |Path to configured catalog|
| `state_path` | string | None |Path to the state file with abnormaly large cursor values|
| `future_state_path` | string | None |Path to the state file with abnormally large cursor values|
| `timeout_seconds` | int | 20*60 |Test execution timeout in seconds|

0 comments on commit 15c8f8a

Please sign in to comment.