Skip to content

Commit

Permalink
Implement configurable test timeout (#4296)
Browse files Browse the repository at this point in the history
* Implement configurable test timeout

* Update incremental streams cursor_field formatting
  • Loading branch information
htrueman committed Jun 25, 2021
1 parent 683a67c commit 1d4164d
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 7 deletions.
@@ -0,0 +1,4 @@
# Changelog

## 0.1.5
Add configurable timeout for the acceptance tests: https://github.com/airbytehq/airbyte/pull/4296
Expand Up @@ -8,7 +8,7 @@ COPY setup.py ./
COPY pytest.ini ./
RUN pip install .

LABEL io.airbyte.version=0.1.4
LABEL io.airbyte.version=0.1.5
LABEL io.airbyte.name=airbyte/source-acceptance-test

ENTRYPOINT ["python", "-m", "pytest", "-p", "source_acceptance_test.plugin"]
Expand Up @@ -35,6 +35,7 @@
default="secrets/spec.json", description="Path to a JSON object representing the spec expected to be output by this connector"
)
configured_catalog_path: str = Field(default="integration_tests/configured_catalog.json", description="Path to configured catalog")
timeout_seconds: int = Field(default=None, description="Test execution timeout_seconds", ge=0)


class BaseConfig(BaseModel):
Expand All @@ -44,6 +45,7 @@ class Config:

class SpecTestConfig(BaseConfig):
spec_path: str = spec_path
timeout_seconds: int = timeout_seconds


class ConnectionTestConfig(BaseConfig):
Expand All @@ -54,10 +56,12 @@ class Status(Enum):

config_path: str = config_path
status: Status = Field(Status.Succeed, description="Indicate if connection check should succeed with provided config")
timeout_seconds: int = timeout_seconds


class DiscoveryTestConfig(BaseConfig):
config_path: str = config_path
timeout_seconds: int = timeout_seconds


class ExpectedRecordsConfig(BaseModel):
Expand Down Expand Up @@ -91,11 +95,13 @@ class BasicReadTestConfig(BaseConfig):
configured_catalog_path: Optional[str] = configured_catalog_path
validate_output_from_all_streams: bool = Field(False, description="Verify that all streams have records")
expect_records: Optional[ExpectedRecordsConfig] = Field(description="Expected records from the read")
timeout_seconds: int = timeout_seconds


class FullRefreshConfig(BaseConfig):
config_path: str = config_path
configured_catalog_path: str = configured_catalog_path
timeout_seconds: int = timeout_seconds


class IncrementalConfig(BaseConfig):
Expand All @@ -105,6 +111,7 @@ class IncrementalConfig(BaseConfig):
description="For each stream, the path of its cursor field in the output state messages."
)
future_state_path: Optional[str] = Field(description="Path to a state file with values in far future")
timeout_seconds: int = timeout_seconds


class TestConfig(BaseConfig):
Expand Down
Expand Up @@ -34,6 +34,10 @@
HERE = Path(__file__).parent.absolute()


def pytest_configure(config):
config.addinivalue_line("markers", "default_timeout: mark test to be wrapped by `timeout` decorator with default value")


def pytest_load_initial_conftests(early_config: Config, parser: Parser, args: List[str]):
"""Hook function to add acceptance tests to args"""
args.append(str(HERE / "tests"))
Expand Down Expand Up @@ -78,6 +82,33 @@ def pytest_generate_tests(metafunc):
metafunc.parametrize("inputs", test_inputs)


def pytest_collection_modifyitems(config, items):
"""
Get prepared test items and wrap them with `pytest.mark.timeout(timeout_seconds)` decorator.
`timeout_seconds` may be received either from acceptance test config or `pytest.mark.default_timeout(timeout_seconds)`,
if `timeout_seconds` is not specified in the acceptance test config.
"""

config = load_config(config.getoption("--acceptance-test-config"))

i = 0
packed_items = []
while i < len(items):
inner_items = [item for item in items if item.originalname == items[i].originalname]
packed_items.append(inner_items)
i += len(inner_items)

for items in packed_items:
test_configs = getattr(config.tests, items[0].cls.config_key())
for test_config, item in zip(test_configs, items):
default_timeout = item.get_closest_marker("default_timeout")
if test_config.timeout_seconds:
item.add_marker(pytest.mark.timeout(test_config.timeout_seconds))
elif default_timeout:
item.add_marker(pytest.mark.timeout(*default_timeout.args))


def pytest_assertrepr_compare(config, op, left, right):
if op != "==":
return
Expand Down
Expand Up @@ -34,7 +34,7 @@
from source_acceptance_test.utils import ConnectorRunner, serialize


@pytest.mark.timeout(10)
@pytest.mark.default_timeout(10)
class TestSpec(BaseTest):
def test_spec(self, connector_spec: ConnectorSpecification, docker_runner: ConnectorRunner):
output = docker_runner.call_spec()
Expand All @@ -45,7 +45,7 @@ def test_spec(self, connector_spec: ConnectorSpecification, docker_runner: Conne
assert spec_messages[0].spec == connector_spec, "Spec should be equal to the one in spec.json file"


@pytest.mark.timeout(30)
@pytest.mark.default_timeout(30)
class TestConnection(BaseTest):
def test_check(self, connector_config, inputs: ConnectionTestConfig, docker_runner: ConnectorRunner):
if inputs.status == ConnectionTestConfig.Status.Succeed:
Expand All @@ -68,7 +68,7 @@ def test_check(self, connector_config, inputs: ConnectionTestConfig, docker_runn
assert "Traceback" in err.value.stderr.decode("utf-8"), "Connector should print exception"


@pytest.mark.timeout(30)
@pytest.mark.default_timeout(30)
class TestDiscovery(BaseTest):
def test_discover(self, connector_config, docker_runner: ConnectorRunner):
output = docker_runner.call_discover(config=connector_config)
Expand All @@ -84,7 +84,7 @@ def test_discover(self, connector_config, docker_runner: ConnectorRunner):
# assert stream1.dict() == stream2.dict(), f"Streams {stream1.name} and {stream2.name}, stream configs should match"


@pytest.mark.timeout(300)
@pytest.mark.default_timeout(300)
class TestBasicRead(BaseTest):
def test_read(
self,
Expand Down
Expand Up @@ -29,7 +29,7 @@
from source_acceptance_test.utils import ConnectorRunner, full_refresh_only_catalog, serialize


@pytest.mark.timeout(20 * 60)
@pytest.mark.default_timeout(20 * 60)
class TestFullRefresh(BaseTest):
def test_sequential_reads(self, connector_config, configured_catalog, docker_runner: ConnectorRunner):
configured_catalog = full_refresh_only_catalog(configured_catalog)
Expand Down
Expand Up @@ -90,7 +90,7 @@ def records_with_state(records, state, stream_mapping, state_cursor_paths) -> It
yield record_value, state_value


@pytest.mark.timeout(20 * 60)
@pytest.mark.default_timeout(20 * 60)
class TestIncremental(BaseTest):
def test_two_sequential_reads(self, connector_config, configured_catalog_for_incremental, cursor_paths, docker_runner: ConnectorRunner):
stream_mapping = {stream.stream.name: stream for stream in configured_catalog_for_incremental.streams}
Expand Down

0 comments on commit 1d4164d

Please sign in to comment.