diff --git a/airbyte-integrations/bases/source-acceptance-test/setup.py b/airbyte-integrations/bases/source-acceptance-test/setup.py index 68f2d73994b6ec..664c15ed4f017e 100644 --- a/airbyte-integrations/bases/source-acceptance-test/setup.py +++ b/airbyte-integrations/bases/source-acceptance-test/setup.py @@ -38,6 +38,7 @@ "pytest-timeout~=1.4", "pprintpp~=0.4", "dpath~=2.0.1", + "jsonschema~=3.2.0", ] setuptools.setup( diff --git a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/asserts.py b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/asserts.py index aecd97fdd9a7d3..2bd5501c9421eb 100644 --- a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/asserts.py +++ b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/asserts.py @@ -23,11 +23,41 @@ # import logging +import re +import pendulum from collections import defaultdict from typing import List, Mapping from airbyte_cdk.models import AirbyteRecordMessage, ConfiguredAirbyteCatalog -from jsonschema import Draft4Validator, ValidationError +from jsonschema import Draft7Validator, FormatChecker, ValidationError, FormatError + +timestamp_regex = re.compile(("^\d{4}-\d?\d-\d?\d" # date + "(\s|T)" # separator + "\d?\d:\d?\d:\d?\d(.\d+)?" 
# time + ".*$" #timezone + )) + + +class CustomFormatChecker(FormatChecker): + + @staticmethod + def check_datetime(value: str) -> bool: + valid_format = timestamp_regex.match(value) + try: + pendulum.parse(value, strict=False) + except ValueError: + valid_time = False + else: + valid_time = True + return valid_format and valid_time + + def check(self, instance, format): + if format == "date-time": + if not self.check_datetime(instance): + raise FormatError(f"{instance} has invalid datetime format") + else: + return super().check(instance, format) + def verify_records_schema( @@ -38,7 +68,7 @@ def verify_records_schema( """ validators = {} for stream in catalog.streams: - validators[stream.stream.name] = Draft4Validator(stream.stream.json_schema) + validators[stream.stream.name] = Draft7Validator(stream.stream.json_schema, format_checker=CustomFormatChecker()) stream_errors = defaultdict(dict) diff --git a/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_asserts.py b/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_asserts.py index fb16aa3ddb4958..7583bc2d20bd04 100644 --- a/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_asserts.py +++ b/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_asserts.py @@ -48,7 +48,8 @@ def record_schema_fixture(): @pytest.fixture(name="configured_catalog") -def catalog_fixture(record_schema) -> ConfiguredAirbyteCatalog: +def catalog_fixture(request, record_schema) -> ConfiguredAirbyteCatalog: + record_schema = request.param if hasattr(request, "param") else record_schema stream = ConfiguredAirbyteStream( stream=AirbyteStream(name="my_stream", json_schema=record_schema), sync_mode=SyncMode.full_refresh, @@ -96,3 +97,61 @@ def test_verify_records_schema(configured_catalog: ConfiguredAirbyteCatalog): assert len(streams_with_errors) == 1, "only one stream" assert len(streams_with_errors["my_stream"]) == 3, "only first error for each field" assert errors == ["123 is not of 
type 'null', 'string'", "'text' is not of type 'number'", "None is not of type 'string'"] + + +@pytest.mark.parametrize( + "record, configured_catalog, valid", + [ + # Send null data + ({"a": None}, {"type": "object", "properties": {"a": {"type": "string", "format": "time"}}}, False), + # time + ({"a": "sdf"}, {"type": "object", "properties": {"a": {"type": "string", "format": "time"}}}, False), + ({"a": "12:00"}, {"type": "object", "properties": {"a": {"type": "string", "format": "time"}}}, False), + ({"a": "12:00:90"}, {"type": "object", "properties": {"a": {"type": "string", "format": "time"}}}, False), + ({"a": "12:00:22"}, {"type": "object", "properties": {"a": {"type": "string", "format": "time"}}}, True), + # date + ({"a": "12:00:90"}, {"type": "object", "properties": {"a": {"type": "string", "format": "date"}}}, False), + ({"a": "2020-12-20"}, {"type": "object", "properties": {"a": {"type": "string", "format": "date"}}}, True), + ({"a": "2020-20-20"}, {"type": "object", "properties": {"a": {"type": "string", "format": "date"}}}, False), + # date-time + # full date-time format with timezone only valid, according to https://datatracker.ietf.org/doc/html/rfc3339#section-5.6 + ({"a": "12:11:00"}, {"type": "object", "properties": {"a": {"type": "string", "format": "date-time"}}}, False), + ({"a": "2018-11-13 20:20:39"}, {"type": "object", "properties": {"a": {"type": "string", "format": "date-time"}}}, True), + ({"a": "2021-08-10T12:43:15"}, {"type": "object", "properties": {"a": {"type": "string", "format": "date-time"}}}, True), + ({"a": "2021-08-10T12:43:15Z"}, {"type": "object", "properties": {"a": {"type": "string", "format": "date-time"}}}, True), + ({"a": "2018-11-13T20:20:39+00:00"}, {"type": "object", "properties": {"a": {"type": "string", "format": "date-time"}}}, True), + ({"a": "2018-21-13T20:20:39+00:00"}, {"type": "object", "properties": {"a": {"type": "string", "format": "date-time"}}}, False), + # This is valid for postgres sql but not valid for 
bigquery + ({"a": "2014-09-27 9:35z"}, {"type": "object", "properties": {"a": {"type": "string", "format": "date-time"}}}, False), + # Seconds are obligatory for bigquery timestamp + ({"a": "2014-09-27 9:35"}, {"type": "object", "properties": {"a": {"type": "string", "format": "date-time"}}}, False), + ({"a": "2014-09-27 9:35:0z"}, {"type": "object", "properties": {"a": {"type": "string", "format": "date-time"}}}, True), + # email + ({"a": "2018-11-13 20:20:39"}, {"type": "object", "properties": {"a": {"type": "string", "format": "email"}}}, False), + ({"a": "hi@example.com"}, {"type": "object", "properties": {"a": {"type": "string", "format": "email"}}}, True), + ({"a": "Пример@example.com"}, {"type": "object", "properties": {"a": {"type": "string", "format": "email"}}}, True), + ({"a": "写电子邮件@子邮件"}, {"type": "object", "properties": {"a": {"type": "string", "format": "email"}}}, True), + # hostname + ({"a": "2018-11-13 20:20:39"}, {"type": "object", "properties": {"a": {"type": "string", "format": "hostname"}}}, False), + ({"a": "hi@example.com"}, {"type": "object", "properties": {"a": {"type": "string", "format": "hostname"}}}, False), + ({"a": "localhost"}, {"type": "object", "properties": {"a": {"type": "string", "format": "hostname"}}}, True), + ({"a": "example.com"}, {"type": "object", "properties": {"a": {"type": "string", "format": "hostname"}}}, True), + # ipv4 + ({"a": "example.com"}, {"type": "object", "properties": {"a": {"type": "string", "format": "ipv4"}}}, False), + ({"a": "0.0.0.1000"}, {"type": "object", "properties": {"a": {"type": "string", "format": "ipv4"}}}, False), + ({"a": "0.0.0.0"}, {"type": "object", "properties": {"a": {"type": "string", "format": "ipv4"}}}, True), + # ipv6 + ({"a": "example.com"}, {"type": "object", "properties": {"a": {"type": "string", "format": "ipv6"}}}, False), + ({"a": "1080:0:0:0:8:800:200C:417A"}, {"type": "object", "properties": {"a": {"type": "string", "format": "ipv6"}}}, True), + ({"a": "::1"}, {"type": 
"object", "properties": {"a": {"type": "string", "format": "ipv6"}}}, True), + ({"a": "::"}, {"type": "object", "properties": {"a": {"type": "string", "format": "ipv6"}}}, True), + ], + indirect=["configured_catalog"], +) +def test_validate_records_format(record, configured_catalog, valid): + records = [AirbyteRecordMessage(stream="my_stream", data=record, emitted_at=0)] + streams_with_errors = verify_records_schema(records, configured_catalog) + if valid: + assert not streams_with_errors + else: + assert streams_with_errors, f"Record {record} should produce errors against {configured_catalog.streams[0].stream.json_schema}" diff --git a/docs/connector-development/testing-connectors/source-acceptance-tests-reference.md b/docs/connector-development/testing-connectors/source-acceptance-tests-reference.md index 6d7cf5558b3b67..d3605a98a9aa17 100644 --- a/docs/connector-development/testing-connectors/source-acceptance-tests-reference.md +++ b/docs/connector-development/testing-connectors/source-acceptance-tests-reference.md @@ -132,7 +132,11 @@ Each stream should have some data, if you can't guarantee this for particular st ||x|| |||x| |||| +### Schema format checking +If some field has [format](https://json-schema.org/understanding-json-schema/reference/string.html#format) attribute specified on its catalog json schema, Source Acceptance Testing framework performs checking against format. It support checking of all [builtin](https://json-schema.org/understanding-json-schema/reference/string.html#built-in-formats) jsonschema formats for draft 7 specification: email, hostnames, ip addresses, time, date and date-time formats. + +Note: For date-time we are not checking against compliance against ISO8601 (and RFC3339 as subset of it). 
Since we are using the specified format to set the database column type during the normalization stage, values should be compliant with the BigQuery [timestamp](https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#timestamp_type) and SQL "timestamp with timezone" formats. ### Example of `expected_records.txt`: In general, the expected_records.json should contain the subset of output of the records of particular stream you need to test. The required fields are: `stream, data, emitted_at`