Skip to content

Commit

Permalink
SAT: Improve error message when data mismatches schema (#4753)
Browse files Browse the repository at this point in the history
* Improve message when data mismatches schema

Co-authored-by: Eugene Kulak <kulak.eugene@gmail.com>
  • Loading branch information
keu and eugene-kulak committed Jul 15, 2021
1 parent a10c805 commit 8871777
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.1.12
Improve error message when data mismatches schema: https://github.com/airbytehq/airbyte/pull/4753

## 0.1.11
Fix error in the naming of method `test_match_expected` for class `TestSpec`.

Expand All @@ -21,4 +24,4 @@ Add: `test_spec` additionally checks if Dockerfile has `ENV AIRBYTE_ENTRYPOINT`
Add test that checks whether PKs are present and not None when `source_defined_primary_key` is defined: https://github.com/airbytehq/airbyte/pull/4140

## 0.1.5
Add configurable timeout for the acceptance tests: https://github.com/airbytehq/airbyte/pull/4296
Add configurable timeout for the acceptance tests: https://github.com/airbytehq/airbyte/pull/4296
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ COPY setup.py ./
COPY pytest.ini ./
RUN pip install .

LABEL io.airbyte.version=0.1.11
LABEL io.airbyte.version=0.1.12
LABEL io.airbyte.name=airbyte/source-acceptance-test

ENTRYPOINT ["python", "-m", "pytest", "-p", "source_acceptance_test.plugin"]
Original file line number Diff line number Diff line change
Expand Up @@ -134,16 +134,16 @@ def test_read(
output = docker_runner.call_read(connector_config, configured_catalog)
records = [message.record for message in output if message.type == Type.RECORD]
counter = Counter(record.stream for record in records)

if inputs.validate_schema:
streams_with_errors = set()
for record, errors in verify_records_schema(records, configured_catalog):
if record.stream not in streams_with_errors:
logging.error(f"The {record.stream} stream has the following schema errors: {errors}")
streams_with_errors.add(record.stream)

if streams_with_errors:
pytest.fail(f"Please check your json_schema in selected streams {streams_with_errors}.")
bar = "-" * 80
streams_errors = verify_records_schema(records, configured_catalog)
for stream_name, errors in streams_errors.items():
errors = map(str, errors.values())
str_errors = f"\n{bar}\n".join(errors)
logging.error(f"The {stream_name} stream has the following schema errors:\n{str_errors}")

if streams_errors:
pytest.fail(f"Please check your json_schema in selected streams {streams_errors.keys()}.")

all_streams = set(stream.stream.name for stream in configured_catalog.streams)
streams_with_records = set(counter.keys())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,33 @@
#

import logging
from typing import Iterator, List, Tuple
from collections import defaultdict
from typing import List, Mapping

from airbyte_cdk.models import AirbyteRecordMessage, ConfiguredAirbyteCatalog
from jsonschema import Draft4Validator, ValidationError


def verify_records_schema(
    records: List[AirbyteRecordMessage], catalog: ConfiguredAirbyteCatalog
) -> Mapping[str, Mapping[str, ValidationError]]:
    """Check records against their stream schemas from the catalog and collect the errors.

    Every record is validated with a Draft 4 validator built from the JSON schema of
    the catalog stream it belongs to. Records whose stream is not present in the
    catalog are logged and skipped.

    :param records: record messages to validate
    :param catalog: configured catalog providing one JSON schema per stream
    :return: mapping of stream name to a mapping of ``str(error.schema_path)`` to the
        validation error. Errors are de-duplicated per schema path: when several
        records violate the same schema rule, the error of the last such record is
        kept. Streams without any error are absent from the result.
    """
    validators = {stream.stream.name: Draft4Validator(stream.stream.json_schema) for stream in catalog.streams}

    stream_errors = defaultdict(dict)

    for record in records:
        validator = validators.get(record.stream)
        if not validator:
            logging.error(f"Record from the {record.stream} stream that is not in the catalog.")
            continue

        for error in validator.iter_errors(record.data):
            # Keyed by schema path so repeated violations of the same rule collapse into one entry.
            stream_errors[record.stream][str(error.schema_path)] = error

    # Hand back a plain dict: with a defaultdict, a caller looking up a clean stream
    # would silently create an empty entry and make the result look non-empty.
    return dict(stream_errors)
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,10 @@ def test_verify_records_schema(configured_catalog: ConfiguredAirbyteCatalog):

records = [AirbyteRecordMessage(stream="my_stream", data=record, emitted_at=0) for record in records]

records_with_errors, record_errors = zip(*verify_records_schema(records, configured_catalog))
errors = [[error.message for error in errors] for errors in record_errors]
streams_with_errors = verify_records_schema(records, configured_catalog)
errors = [error.message for error in streams_with_errors["my_stream"].values()]

assert len(records_with_errors) == 3, "only 3 out of 4 records have errors"
assert records_with_errors[0] == records[0], "1st record should have errors"
assert records_with_errors[1] == records[1], "2nd record should have errors"
assert records_with_errors[2] == records[3], "4th record should have errors"
assert errors[0] == ["'text' is not of type 'number'", "123 is not of type 'null', 'string'"]
assert errors[1] == ["None is not of type 'number'", "None is not of type 'string'"]
assert errors[2] == ["'text' is not of type 'number'"]
assert "my_stream" in streams_with_errors
assert len(streams_with_errors) == 1, "only one stream"
assert len(streams_with_errors["my_stream"]) == 3, "only first error for each field"
assert errors == ["123 is not of type 'null', 'string'", "'text' is not of type 'number'", "None is not of type 'string'"]

0 comments on commit 8871777

Please sign in to comment.