Skip to content

Commit

Permalink
Adding unit tests to SAT full refresh test (#8589)
Browse files Browse the repository at this point in the history
* WIP - Adding unit tests to SAT full refresh test

* unit tests to SAT full refresh test

* Format

* Update airbyte-integrations/bases/source-acceptance-test/unit_tests/test_test_full_refresh.py

Co-authored-by: Eugene Kulak <widowmakerreborn@gmail.com>

* use symmetric_difference to check for differences between expected/actual when testing stream read on SAT

* bump version

* bump version

Co-authored-by: Eugene Kulak <widowmakerreborn@gmail.com>
  • Loading branch information
eliziario and keu committed Dec 21, 2021
1 parent de2b4ab commit fd716d7
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 8 deletions.
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

setup(
name="airbyte-cdk",
version="0.1.45",
version="0.1.46",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,14 @@ class Config:

@validator("exact_order", always=True)
def validate_exact_order(cls, exact_order, values):
if "extra_fields" in values:
if values["extra_fields"] and not exact_order:
raise ValueError("exact_order must be on if extra_fields enabled")
if "extra_fields" in values and values["extra_fields"] and not exact_order:
raise ValueError("exact_order must be on if extra_fields enabled")
return exact_order

@validator("extra_records", always=True)
def validate_extra_records(cls, extra_records, values):
if "extra_fields" in values:
if values["extra_fields"] and extra_records:
raise ValueError("extra_records must by off if extra_fields enabled")
if "extra_fields" in values and values["extra_fields"] and extra_records:
raise ValueError("extra_records must by off if extra_fields enabled")
return extra_records


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@ def test_sequential_reads(
for record in records_2:
records_by_stream_2[record.stream].append(record.data)

for stream in records_by_stream_1.keys():
for stream in records_by_stream_1:
serializer = partial(make_hashable, exclude_fields=ignored_fields.get(stream))
stream_records_1 = records_by_stream_1.get(stream)
stream_records_2 = records_by_stream_2.get(stream)
# Using
output_diff = set(map(serializer, stream_records_1)).symmetric_difference(set(map(serializer, stream_records_2)))
if output_diff:
msg = f"{stream}: the two sequential reads should produce either equal set of records or one of them is a strict subset of the other"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

from typing import Dict, List
from unittest.mock import MagicMock

import pytest
from _pytest.outcomes import Failed
from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, Type
from source_acceptance_test.config import ConnectionTestConfig
from source_acceptance_test.tests.test_full_refresh import TestFullRefresh as _TestFullRefresh


class ReadTestConfigWithIgnoreFields(ConnectionTestConfig):
ignored_fields: Dict[str, List[str]] = {"test_stream": ["ignore_me", "ignore_me_too"]}


test_cases = [
(
{"type": "object", "properties": {"created": {"type": "string"}}},
{"created": "23"},
{"created": "23"},
False,
"no_ignored_fields_present",
),
(
{
"type": "object",
"properties": {
"created": {"type": "string"},
"ignore_me": {"type": "string"},
},
},
{"created": "23"},
{"created": "23", "ignore_me": "23"},
False,
"with_ignored_field",
),
(
{
"type": "object",
"required": ["created", "DONT_ignore_me"],
"properties": {
"created": {"type": "string"},
"DONT_ignore_me": {"type": "string"},
"ignore_me": {"type": "string"},
},
},
{"created": "23"},
{"created": "23", "DONT_ignore_me": "23", "ignore_me": "hello"},
True,
"ignore_field_present_but_a_required_is_not",
),
]


@pytest.mark.parametrize(
"schema, record, expected_record, should_fail, test_case_name",
test_cases,
ids=[test_case[-1] for test_case in test_cases],
)
def test_read_with_ignore_fields(schema, record, expected_record, should_fail, test_case_name):
catalog = get_default_catalog(schema)
input_config = ReadTestConfigWithIgnoreFields()
docker_runner_mock = MagicMock()

def record_message_from_record(record_: Dict) -> List[AirbyteMessage]:
return [
AirbyteMessage(
type=Type.RECORD,
record=AirbyteRecordMessage(stream="test_stream", data=record_, emitted_at=111),
)
]

sequence_of_docker_callread_results = [record, expected_record]

# Ignored fields should work both ways
for pair in (
sequence_of_docker_callread_results,
list(reversed(sequence_of_docker_callread_results)),
):
docker_runner_mock.call_read.side_effect = [
record_message_from_record(pair[0]),
record_message_from_record(pair[1]),
]

t = _TestFullRefresh()
if should_fail:
with pytest.raises(
Failed,
match="the two sequential reads should produce either equal set of records or one of them is a strict subset of the other",
):
t.test_sequential_reads(
inputs=input_config,
connector_config=MagicMock(),
configured_catalog=catalog,
docker_runner=docker_runner_mock,
detailed_logger=MagicMock(),
)
else:
t.test_sequential_reads(
inputs=input_config,
connector_config=MagicMock(),
configured_catalog=catalog,
docker_runner=docker_runner_mock,
detailed_logger=MagicMock(),
)


def get_default_catalog(schema):
return ConfiguredAirbyteCatalog(
streams=[
ConfiguredAirbyteStream(
stream=AirbyteStream(
name="test_stream",
json_schema=schema,
supported_sync_modes=["full_refresh"],
),
sync_mode="full_refresh",
destination_sync_mode="overwrite",
)
]
)

0 comments on commit fd716d7

Please sign in to comment.