Skip to content

Commit

Permalink
cat / airbyte-ci: improve CAT container orchestration (#31699)
Browse files Browse the repository at this point in the history
  • Loading branch information
alafanechere committed Oct 24, 2023
1 parent dcfa331 commit ff2fcf8
Show file tree
Hide file tree
Showing 16 changed files with 327 additions and 175 deletions.
2 changes: 2 additions & 0 deletions airbyte-ci/connectors/pipelines/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ flowchart TD
| `--fail-fast` | False | False | Abort after any tests fail, rather than continuing to run additional tests. Use this setting to confirm a known bug is fixed (or not), or when you only require a pass/fail result. |
| `--fast-tests-only` | True | False | Run unit tests only, skipping integration tests or any tests explicitly tagged as slow. Use this for more frequent checks, when it is not feasible to run the entire test suite. |
| `--code-tests-only` | True | False | Skip any tests not directly related to code updates. For instance, metadata checks, version bump checks, changelog verification, etc. Use this setting to help focus on code quality during development. |
| `--concurrent-cat` | False | False | Make CAT tests run concurrently using pytest-xdist. Be careful about source or destination API rate limits. |

Note:

Expand Down Expand Up @@ -399,6 +400,7 @@ This command runs the Python tests for an airbyte-ci poetry package.
## Changelog
| Version | PR | Description |
| ------- | ---------------------------------------------------------- | --------------------------------------------------------------------------------------------------------- |
| 2.3.0 | [#31699](https://github.com/airbytehq/airbyte/pull/31699) | Support optional concurrent CAT execution. |
| 2.2.6 | [#31752](https://github.com/airbytehq/airbyte/pull/31752) | Only authenticate when secrets are available. |
| 2.2.5 | [#31718](https://github.com/airbytehq/airbyte/pull/31718) | Authenticate the sidecar docker daemon to DockerHub. |
| 2.2.4 | [#31535](https://github.com/airbytehq/airbyte/pull/31535) | Improve gradle caching when building java connectors. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def __init__(
docker_hub_password: Optional[str] = None,
s3_build_cache_access_key_id: Optional[str] = None,
s3_build_cache_secret_key: Optional[str] = None,
concurrent_cat: Optional[bool] = False,
):
"""Initialize a connector context.
Expand Down Expand Up @@ -86,6 +87,7 @@ def __init__(
docker_hub_password (Optional[str], optional): Docker Hub password to use to read registries. Defaults to None.
s3_build_cache_access_key_id (Optional[str], optional): Gradle S3 Build Cache credentials. Defaults to None.
s3_build_cache_secret_key (Optional[str], optional): Gradle S3 Build Cache credentials. Defaults to None.
concurrent_cat (bool, optional): Whether to run the CAT tests in parallel. Defaults to False.
"""

self.pipeline_name = pipeline_name
Expand All @@ -107,6 +109,7 @@ def __init__(
self.docker_hub_password = docker_hub_password
self.s3_build_cache_access_key_id = s3_build_cache_access_key_id
self.s3_build_cache_secret_key = s3_build_cache_secret_key
self.concurrent_cat = concurrent_cat

super().__init__(
pipeline_name=pipeline_name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,20 @@
type=bool,
is_flag=True,
)
@click.option(
"--concurrent-cat",
help="When enabled, the CAT tests will run concurrently. Be careful about rate limits",
default=False,
type=bool,
is_flag=True,
)
@click.pass_context
def test(
ctx: click.Context,
code_tests_only: bool,
fail_fast: bool,
fast_tests_only: bool,
concurrent_cat: bool,
) -> bool:
"""Runs a test pipeline for the selected connectors.
Expand Down Expand Up @@ -87,6 +95,7 @@ def test(
s3_build_cache_secret_key=ctx.obj.get("s3_build_cache_secret_key"),
docker_hub_username=ctx.obj.get("docker_hub_username"),
docker_hub_password=ctx.obj.get("docker_hub_password"),
concurrent_cat=concurrent_cat,
)
for connector in ctx.obj["selected_connectors_with_modified_files"]
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@
import semver
import yaml
from connector_ops.utils import Connector
from dagger import Container, Directory, File
from dagger import Container, Directory
from pipelines import hacks
from pipelines.consts import CIContext
from pipelines.dagger.actions import secrets
from pipelines.dagger.containers import internal_tools
from pipelines.helpers.utils import METADATA_FILE_NAME
from pipelines.models.contexts import PipelineContext
from pipelines.models.steps import Step, StepResult, StepStatus


Expand Down Expand Up @@ -187,15 +188,31 @@ class AcceptanceTests(Step):

@property
def base_cat_command(self) -> List[str]:
return [
command = [
"python",
"-m",
"pytest",
"-p",
"--disable-warnings",
"--durations=3", # Show the 3 slowest tests in the report
"-ra", # Show extra test summary info in the report for all but the passed tests
"-p", # Load the connector_acceptance_test plugin
"connector_acceptance_test.plugin",
"--acceptance-test-config",
self.CONTAINER_TEST_INPUT_DIRECTORY,
]
if self.concurrent_test_run:
command += ["--numprocesses=auto"] # Using pytest-xdist to run tests in parallel, auto means using all available cores
return command

def __init__(self, context: PipelineContext, concurrent_test_run: Optional[bool] = False) -> None:
    """Create a step to run acceptance tests for a connector if it has an acceptance test config file.

    Args:
        context (PipelineContext): The current test context, providing a connector object, a dagger client and a repository directory.
        concurrent_test_run (Optional[bool], optional): Whether to run acceptance tests in parallel. Defaults to False.
    """
    super().__init__(context)
    # When True, base_cat_command appends "--numprocesses=auto" so pytest-xdist
    # runs CAT tests across all available cores.
    self.concurrent_test_run = concurrent_test_run

async def get_cat_command(self, connector_dir: Directory) -> List[str]:
"""
Expand All @@ -209,19 +226,20 @@ async def get_cat_command(self, connector_dir: Directory) -> List[str]:
cat_command += ["-p", "integration_tests.acceptance"]
return cat_command

async def _run(self, connector_under_test_image_tar: File) -> StepResult:
async def _run(self, connector_under_test_container: Container) -> StepResult:
"""Run the acceptance test suite on a connector dev image. Build the connector acceptance test image if the tag is :dev.
Args:
connector_under_test_image_tar (File): The file holding the tar archive of the connector image.
connector_under_test_container (Container): The container holding the connector under test image.
Returns:
StepResult: Failure or success of the acceptances tests with stdout and stderr.
"""

if not self.context.connector.acceptance_test_config:
return StepResult(self, StepStatus.SKIPPED)
connector_dir = await self.context.get_connector_dir()
cat_container = await self._build_connector_acceptance_test(connector_under_test_image_tar, connector_dir)
cat_container = await self._build_connector_acceptance_test(connector_under_test_container, connector_dir)
cat_command = await self.get_cat_command(connector_dir)
cat_container = cat_container.with_(hacks.never_fail_exec(cat_command))
step_result = await self.get_step_result(cat_container)
Expand All @@ -234,28 +252,22 @@ async def _run(self, connector_under_test_image_tar: File) -> StepResult:
break
return step_result

async def get_cache_buster(self, connector_under_test_image_tar: File) -> str:
async def get_cache_buster(self) -> str:
"""
This bursts the CAT cached results everyday and on new version or image size change.
It's cool because in case of a partially failing nightly build the connectors that already ran CAT won't re-run CAT.
We keep the guarantee that a CAT runs everyday.
Args:
connector_under_test_image_tar (File): The file holding the tar archive of the connector image.
Returns:
str: A string representing the cachebuster value.
"""
return (
datetime.datetime.utcnow().strftime("%Y%m%d")
+ self.context.connector.version
+ str(await connector_under_test_image_tar.size())
)
return datetime.datetime.utcnow().strftime("%Y%m%d") + self.context.connector.version

async def _build_connector_acceptance_test(self, connector_under_test_image_tar: File, test_input: Directory) -> Container:
async def _build_connector_acceptance_test(self, connector_under_test_container: Container, test_input: Directory) -> Container:
"""Create a container to run connector acceptance tests.
Args:
connector_under_test_image_tar (File): The file containing the tar archive of the image of the connector under test.
connector_under_test_container (Container): The container holding the connector under test image.
test_input (Directory): The connector under test directory.
Returns:
Container: A container with connector acceptance tests installed.
Expand All @@ -266,12 +278,13 @@ async def _build_connector_acceptance_test(self, connector_under_test_image_tar:
else:
cat_container = self.dagger_client.container().from_(self.context.connector_acceptance_test_image)

connector_container_id = await connector_under_test_container.id()

cat_container = (
cat_container.with_env_variable("RUN_IN_AIRBYTE_CI", "1")
.with_exec(["mkdir", "/dagger_share"], skip_entrypoint=True)
.with_env_variable("CACHEBUSTER", await self.get_cache_buster(connector_under_test_image_tar))
.with_mounted_file("/dagger_share/connector_under_test_image.tar", connector_under_test_image_tar)
.with_env_variable("CONNECTOR_UNDER_TEST_IMAGE_TAR_PATH", "/dagger_share/connector_under_test_image.tar")
.with_env_variable("CACHEBUSTER", await self.get_cache_buster())
.with_new_file("/tmp/container_id.txt", str(connector_container_id))
.with_workdir("/test_input")
.with_mounted_directory("/test_input", test_input)
.with_(await secrets.mounted_connector_secrets(self.context, "/test_input/secrets"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,7 @@ async def run_docker_build_dependent_steps(dist_tar_dir: Directory) -> List[Step
soon_integration_tests_results = docker_build_dependent_group.soonify(IntegrationTests(context).run)(
connector_tar_file=connector_image_tar_file, normalization_tar_file=normalization_tar_file
)
soon_cat_results = docker_build_dependent_group.soonify(AcceptanceTests(context).run)(
connector_under_test_image_tar=connector_image_tar_file
)
soon_cat_results = docker_build_dependent_group.soonify(AcceptanceTests(context, True).run)(connector_container)

step_results += [soon_cat_results.value, soon_integration_tests_results.value]
return step_results
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
from pipelines.airbyte_ci.connectors.test.steps.common import AcceptanceTests, CheckBaseImageIsUsed
from pipelines.consts import LOCAL_BUILD_PLATFORM, PYPROJECT_TOML_FILE_PATH
from pipelines.dagger.actions import secrets
from pipelines.helpers.utils import export_container_to_tarball
from pipelines.models.steps import Step, StepResult, StepStatus


Expand Down Expand Up @@ -215,7 +214,6 @@ async def run_all_tests(context: ConnectorContext) -> List[StepResult]:
step_results.append(build_connector_image_results)

connector_container = build_connector_image_results.output_artifact[LOCAL_BUILD_PLATFORM]
connector_image_tar_file, _ = await export_container_to_tarball(context, connector_container)

context.connector_secrets = await secrets.get_connector_secrets(context)

Expand All @@ -227,7 +225,7 @@ async def run_all_tests(context: ConnectorContext) -> List[StepResult]:
async with asyncer.create_task_group() as task_group:
tasks = [
task_group.soonify(IntegrationTests(context).run)(connector_container),
task_group.soonify(AcceptanceTests(context).run)(connector_image_tar_file),
task_group.soonify(AcceptanceTests(context, context.concurrent_cat).run)(connector_container),
task_group.soonify(CheckBaseImageIsUsed(context).run)(),
]

Expand Down
2 changes: 1 addition & 1 deletion airbyte-ci/connectors/pipelines/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "pipelines"
version = "2.2.6"
version = "2.3.0"
description = "Packaged maintained by the connector operations team to perform CI for connectors' pipelines"
authors = ["Airbyte <contact@airbyte.io>"]

Expand Down
30 changes: 13 additions & 17 deletions airbyte-ci/connectors/pipelines/tests/test_tests/test_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,12 @@ def test_context(self, mocker, dagger_client):
return mocker.MagicMock(connector=ConnectorWithModifiedFiles("source-faker", frozenset()), dagger_client=dagger_client)

@pytest.fixture
def dummy_connector_under_test_image_tar(self, dagger_client, tmpdir) -> dagger.File:
dummy_tar_file = tmpdir / "dummy.tar"
dummy_tar_file.write_text("dummy", encoding="utf8")
return dagger_client.host().directory(str(tmpdir), include=["dummy.tar"]).file("dummy.tar")
def dummy_connector_under_test_container(self, dagger_client) -> dagger.Container:
    """Provide a container standing in for the connector under test."""
    image_reference = "airbyte/source-faker:latest"
    return dagger_client.container().from_(image_reference)

@pytest.fixture
def another_dummy_connector_under_test_image_tar(self, dagger_client, tmpdir) -> dagger.File:
dummy_tar_file = tmpdir / "another_dummy.tar"
dummy_tar_file.write_text("another_dummy", encoding="utf8")
return dagger_client.host().directory(str(tmpdir), include=["another_dummy.tar"]).file("another_dummy.tar")
def another_dummy_connector_under_test_container(self, dagger_client) -> dagger.Container:
    """Provide a second, distinct connector container so cache-invalidation on container change can be tested.

    Fix: the return annotation previously said ``dagger.File`` (a leftover from the
    tarball-based fixtures this replaced), but the body returns a ``dagger.Container``.
    """
    return dagger_client.container().from_("airbyte/source-pokeapi:latest")

async def test_skipped_when_no_acceptance_test_config(self, mocker, test_context):
test_context.connector = mocker.MagicMock(acceptance_test_config=None)
Expand Down Expand Up @@ -160,7 +156,7 @@ def get_patched_acceptance_test_step(self, dagger_client, mocker, test_context,
return common.AcceptanceTests(test_context)

async def test_cat_container_provisioning(
self, dagger_client, mocker, test_context, test_input_dir, dummy_connector_under_test_image_tar
self, dagger_client, mocker, test_context, test_input_dir, dummy_connector_under_test_container
):
"""Check that the acceptance test container is correctly provisioned.
We check that:
Expand All @@ -175,7 +171,7 @@ async def test_cat_container_provisioning(
test_context.is_local = False
test_context.is_ci = True
acceptance_test_step = self.get_patched_acceptance_test_step(dagger_client, mocker, test_context, test_input_dir)
cat_container = await acceptance_test_step._build_connector_acceptance_test(dummy_connector_under_test_image_tar, test_input_dir)
cat_container = await acceptance_test_step._build_connector_acceptance_test(dummy_connector_under_test_container, test_input_dir)
assert (await cat_container.with_exec(["pwd"]).stdout()).strip() == acceptance_test_step.CONTAINER_TEST_INPUT_DIRECTORY
test_input_ls_result = await cat_container.with_exec(["ls"]).stdout()
assert all(
Expand All @@ -191,8 +187,8 @@ async def test_cat_container_caching(
mocker,
test_context,
test_input_dir,
dummy_connector_under_test_image_tar,
another_dummy_connector_under_test_image_tar,
dummy_connector_under_test_container,
another_dummy_connector_under_test_container,
):
"""Check that the acceptance test container caching behavior is correct."""

Expand All @@ -201,15 +197,15 @@ async def test_cat_container_caching(
with freeze_time(initial_datetime) as frozen_datetime:
acceptance_test_step = self.get_patched_acceptance_test_step(dagger_client, mocker, test_context, test_input_dir)
cat_container = await acceptance_test_step._build_connector_acceptance_test(
dummy_connector_under_test_image_tar, test_input_dir
dummy_connector_under_test_container, test_input_dir
)
cat_container = cat_container.with_exec(["date"])
fist_date_result = await cat_container.stdout()

frozen_datetime.tick(delta=datetime.timedelta(hours=5))
# Check that cache is used in the same day
cat_container = await acceptance_test_step._build_connector_acceptance_test(
dummy_connector_under_test_image_tar, test_input_dir
dummy_connector_under_test_container, test_input_dir
)
cat_container = cat_container.with_exec(["date"])
second_date_result = await cat_container.stdout()
Expand All @@ -218,16 +214,16 @@ async def test_cat_container_caching(
# Check that cache bursted after a day
frozen_datetime.tick(delta=datetime.timedelta(days=1, seconds=1))
cat_container = await acceptance_test_step._build_connector_acceptance_test(
dummy_connector_under_test_image_tar, test_input_dir
dummy_connector_under_test_container, test_input_dir
)
cat_container = cat_container.with_exec(["date"])
third_date_result = await cat_container.stdout()
assert third_date_result != second_date_result

time.sleep(1)
# Check that changing the tarball invalidates the cache
# Check that changing the container invalidates the cache
cat_container = await acceptance_test_step._build_connector_acceptance_test(
another_dummy_connector_under_test_image_tar, test_input_dir
another_dummy_connector_under_test_container, test_input_dir
)
cat_container = cat_container.with_exec(["date"])
fourth_date_result = await cat_container.stdout()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changelog

## 2.1.0
Make the container under test a sessions scoped fixture.
Support loading it from its Dagger container id for better performance.
Install pytest-xdist to support running tests in parallel.

## 2.0.2
Make `test_two_sequential_reads` handle namespace property in stream descriptor.

Expand Down

0 comments on commit ff2fcf8

Please sign in to comment.