Skip to content

Commit

Permalink
live-tests: add regression tests suite (#35837)
Browse files Browse the repository at this point in the history
  • Loading branch information
alafanechere committed Mar 7, 2024
1 parent 72eb8a6 commit 0965ebd
Show file tree
Hide file tree
Showing 15 changed files with 720 additions and 75 deletions.
13 changes: 13 additions & 0 deletions airbyte-ci/connectors/live-tests/README.md
Expand Up @@ -102,7 +102,20 @@ And run:
mitmweb --rfile=http_dump.mitm
```

## Regression tests
We created a regression test suite to run tests to compare outputs of connector commands on different versions of the same connector.
You can run the existing test suites with the following command:

```bash
cd src/live_tests/regression_tests
poetry run pytest --connector-image=airbyte/source-pokeapi --config-path=<path-to-config-path> --catalog-path=<path-to-catalog-path>
```


## Changelog

### 0.2.0
Declare the regression tests suite.

### 0.1.0
Implement initial primitives and a `debug` command to run connector commands and persist the outputs to local storage.
44 changes: 22 additions & 22 deletions airbyte-ci/connectors/live-tests/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 4 additions & 6 deletions airbyte-ci/connectors/live-tests/pyproject.toml
Expand Up @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "live-tests"
version = "0.1.0"
version = "0.2.0"
description = "Contains utilities for testing connectors against live data."
authors = ["Airbyte <contact@airbyte.io>"]
license = "MIT"
Expand All @@ -20,7 +20,6 @@ airbyte-protocol-models = "<1.0.0"
cachetools = "~=5.3.3"
dagger-io = "==0.9.6"
pydantic = "*"
pytest = "~=8.0.2"
pytest-asyncio = "~=0.23.5"
pydash = "~=7.0.7"
docker = ">=6,<7"
Expand All @@ -37,10 +36,9 @@ types-cachetools = "^5.3.0.7"
[tool.poe.tasks]
test = "pytest tests"
lint = "ruff check src"
type_check = "mypy src"
format = "ruff format src"
type_check = "mypy src --disallow-untyped-defs"
pre-push = ["format", "lint", "test", "type_check"]

[tool.airbyte_ci]
poe_tasks = ["test", "lint", "type_check"]

[tool.pytest.ini_options]
pythonpath = ["src"]
2 changes: 1 addition & 1 deletion airbyte-ci/connectors/live-tests/src/live_tests/cli.py
Expand Up @@ -6,7 +6,7 @@

@click.group()
@click.pass_context
async def live_tests(ctx):
async def live_tests(ctx: click.Context) -> None:
pass


Expand Down
Expand Up @@ -2,7 +2,7 @@

import json
from pathlib import Path
from typing import Iterable, TextIO, Tuple
from typing import Any, Iterable, TextIO, Tuple

import pydash
from airbyte_protocol.models import AirbyteMessage # type: ignore
Expand All @@ -12,9 +12,9 @@


class FileDescriptorLRUCache(LRUCache):
def popitem(self):
def popitem(self) -> Tuple[Any, Any]:
filepath, fd = LRUCache.popitem(self)
fd.close() # Close the file descriptor when it's evicted from the cache
fd.close() # type: ignore # Close the file descriptor when it's evicted from the cache
return filepath, fd


Expand All @@ -33,7 +33,7 @@ class FileBackend(BaseBackend):
def __init__(self, output_directory: Path):
self._output_directory = output_directory

async def write(self, airbyte_messages: Iterable[AirbyteMessage]):
async def write(self, airbyte_messages: Iterable[AirbyteMessage]) -> None:
"""
Write AirbyteMessages to the appropriate file.
Expand Down
Expand Up @@ -30,7 +30,7 @@ async def get_container_from_id(dagger_client: dagger.Client, container_id: str)
pytest.exit(f"Failed to load connector container: {e}")


async def get_container_from_tarball_path(dagger_client: dagger.Client, tarball_path: Path):
async def get_container_from_tarball_path(dagger_client: dagger.Client, tarball_path: Path) -> dagger.Container:
if not tarball_path.exists():
pytest.exit(f"Connector image tarball {tarball_path} does not exist")
container_under_test_tar_file = (
Expand Down Expand Up @@ -149,7 +149,7 @@ def __init__(
def _connector_under_test_container(self) -> dagger.Container:
return self.connector_under_test.container

def _get_full_command(self, command: Command):
def _get_full_command(self, command: Command) -> List[str]:
if command is Command.SPEC:
return ["spec"]
elif command is Command.CHECK:
Expand Down Expand Up @@ -180,11 +180,12 @@ def _get_full_command(self, command: Command):
async def get_container_env_variable_value(self, name: str) -> Optional[str]:
return await self._connector_under_test_container.env_variable(name)

async def get_container_label(self, label: str):
async def get_container_label(self, label: str) -> Optional[str]:
return await self._connector_under_test_container.label(label)

async def get_container_entrypoint(self):
async def get_container_entrypoint(self) -> str:
entrypoint = await self._connector_under_test_container.entrypoint()
assert entrypoint, "The connector container has no entrypoint"
return " ".join(entrypoint)

async def run(
Expand Down Expand Up @@ -251,7 +252,7 @@ async def _get_proxy_container(

return proxy_container.with_exec(command)

async def _bind_connector_container_to_proxy(self, container: dagger.Container):
async def _bind_connector_container_to_proxy(self, container: dagger.Container) -> dagger.Container:
proxy_srv = await self._get_proxy_container()
proxy_host, proxy_port = "proxy_server", 8080
cert_path_in_volume = "/mitmproxy_dir/mitmproxy-ca.pem"
Expand Down
72 changes: 37 additions & 35 deletions airbyte-ci/connectors/live-tests/src/live_tests/commons/models.py
Expand Up @@ -4,7 +4,7 @@
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from typing import Any, Dict, Iterable, Iterator, List, MutableMapping, Optional, Tuple

import _collections_abc
import dagger
Expand All @@ -14,73 +14,73 @@
from pydantic import ValidationError


class UserDict(_collections_abc.MutableMapping):
class UserDict(_collections_abc.MutableMapping): # type: ignore
# Start by filling-out the abstract methods
def __init__(self, dict=None, /, **kwargs):
self.data = {}
if dict is not None:
self.update(dict)
def __init__(self, _dict: Optional[MutableMapping] = None, **kwargs: Any):
self.data: MutableMapping = {}
if _dict is not None:
self.update(_dict)
if kwargs:
self.update(kwargs)

def __len__(self):
def __len__(self) -> int:
return len(self.data)

def __getitem__(self, key):
def __getitem__(self, key: Any) -> Any:
if key in self.data:
return self.data[key]
if hasattr(self.__class__, "__missing__"):
return self.__class__.__missing__(self, key)
raise KeyError(key)

def __setitem__(self, key, item):
def __setitem__(self, key: Any, item: Any) -> None:
self.data[key] = item

def __delitem__(self, key):
def __delitem__(self, key: Any) -> None:
del self.data[key]

def __iter__(self):
def __iter__(self) -> Iterator:
return iter(self.data)

# Modify __contains__ to work correctly when __missing__ is present
def __contains__(self, key):
def __contains__(self, key: Any) -> bool:
return key in self.data

# Now, add the methods in dicts but not in MutableMapping
def __repr__(self):
def __repr__(self) -> str:
return repr(self.data)

def __or__(self, other):
def __or__(self, other: "UserDict" | dict) -> "UserDict":
if isinstance(other, UserDict):
return self.__class__(self.data | other.data)
return self.__class__(self.data | other.data) # type: ignore
if isinstance(other, dict):
return self.__class__(self.data | other)
return self.__class__(self.data | other) # type: ignore
return NotImplemented

def __ror__(self, other):
def __ror__(self, other: "UserDict" | dict) -> "UserDict":
if isinstance(other, UserDict):
return self.__class__(other.data | self.data)
return self.__class__(other.data | self.data) # type: ignore
if isinstance(other, dict):
return self.__class__(other | self.data)
return self.__class__(other | self.data) # type: ignore
return NotImplemented

def __ior__(self, other):
def __ior__(self, other: "UserDict" | dict) -> "UserDict":
if isinstance(other, UserDict):
self.data |= other.data
self.data |= other.data # type: ignore
else:
self.data |= other
self.data |= other # type: ignore
return self

def __copy__(self):
def __copy__(self) -> "UserDict":
inst = self.__class__.__new__(self.__class__)
inst.__dict__.update(self.__dict__)
# Create a copy and avoid triggering descriptors
inst.__dict__["data"] = self.__dict__["data"].copy()
return inst

def copy(self):
def copy(self) -> "UserDict":
if self.__class__ is UserDict:
return UserDict(self.data.copy())
return UserDict(self.data.copy()) # type: ignore
import copy

data = self.data
Expand All @@ -93,7 +93,7 @@ def copy(self):
return c

@classmethod
def fromkeys(cls, iterable, value=None):
def fromkeys(cls, iterable: Iterable, value: Optional[Any] = None) -> "UserDict":
d = cls()
for key in iterable:
d[key] = value
Expand Down Expand Up @@ -122,11 +122,11 @@ class ConnectorUnderTest:
container: dagger.Container

@property
def name(self):
def name(self) -> str:
return self.image_name.replace("airbyte/", "").split(":")[0]

@property
def version(self):
def version(self) -> str:
return self.image_name.replace("airbyte/", "").split(":")[1]


Expand All @@ -151,11 +151,11 @@ def to_dict(self) -> dict:
"enable_http_cache": self.enable_http_cache,
}

def raise_if_missing_attr_for_command(self, attribute: str):
def raise_if_missing_attr_for_command(self, attribute: str) -> None:
if getattr(self, attribute) is None:
raise ValueError(f"We need a {attribute} to run the {self.command.value} command")

def __post_init__(self):
def __post_init__(self) -> None:
if self.command is Command.CHECK:
self.raise_if_missing_attr_for_command("config")
if self.command is Command.DISCOVER:
Expand All @@ -178,7 +178,7 @@ class ExecutionResult:
airbyte_messages: List[AirbyteMessage] = field(default_factory=list)
airbyte_messages_parsing_errors: List[Tuple[Exception, str]] = field(default_factory=list)

def __post_init__(self):
def __post_init__(self) -> None:
self.airbyte_messages, self.airbyte_messages_parsing_errors = self.parse_airbyte_messages_from_command_output(self.stdout)

@staticmethod
Expand All @@ -200,21 +200,22 @@ class ExecutionReport:
execution_inputs: ExecutionInputs
execution_result: ExecutionResult
created_at: int = field(default_factory=lambda: int(time.time()))
saved_path: Optional[Path] = None

@property
def report_dir(self) -> str:
return f"{self.created_at}/{self.execution_inputs.connector_under_test.name}/{self.execution_inputs.command.value}/{self.execution_inputs.connector_under_test.version}/"
return f"{self.execution_inputs.connector_under_test.name}/{self.execution_inputs.command.value}/{self.execution_inputs.connector_under_test.version}/"

@property
def stdout_filename(self):
def stdout_filename(self) -> str:
return "stdout.log"

@property
def stderr_filename(self):
def stderr_filename(self) -> str:
return "stderr.log"

@property
def http_dump_filename(self):
def http_dump_filename(self) -> str:
return "http_dump.mitm"

async def save_to_disk(self, output_dir: Path) -> None:
Expand All @@ -233,3 +234,4 @@ async def save_to_disk(self, output_dir: Path) -> None:
airbyte_messages_dir = final_dir / "airbyte_messages"
airbyte_messages_dir.mkdir(parents=True, exist_ok=True)
await FileBackend(airbyte_messages_dir).write(self.execution_result.airbyte_messages)
self.saved_path = final_dir
Expand Up @@ -4,5 +4,7 @@
import os
import sys

DAGGER_EXEC_TIMEOUT = dagger.Timeout(int(os.environ.get("DAGGER_EXEC_TIMEOUT", "3600"))) # One hour by default
DAGGER_EXEC_TIMEOUT = dagger.Timeout(
int(os.environ.get("DAGGER_EXEC_TIMEOUT", "3600"))
) # One hour by default
DAGGER_CONFIG = dagger.Config(timeout=DAGGER_EXEC_TIMEOUT, log_output=sys.stderr)

0 comments on commit 0965ebd

Please sign in to comment.