Skip to content

Commit

Permalink
connectors-ci: improve concurrency and implement correct DAG logic (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
alafanechere committed Mar 15, 2023
1 parent 4d69d6e commit 26c3151
Show file tree
Hide file tree
Showing 10 changed files with 459 additions and 369 deletions.
40 changes: 19 additions & 21 deletions tools/ci_connector_ops/ci_connector_ops/pipelines/README.md
Expand Up @@ -82,24 +82,22 @@ The main differences are that:
## What does a connector pipeline run

```mermaid
flowchart LR;
AB_GIT_REPO[Airbyte Git Repo] --> MOUNT_AB[Mount Airbyte repo to container];
AB_GIT_REPO[Airbyte Git Repo] --> MOUNT_CONN[Mount connector source code to container];
DOWN_SECRETS[Download secrets from GSM]-->CAT
MOUNT_AB-->QA[Run QA checks];
MOUNT_CONN-->FORMAT[Code format checks];
MOUNT_CONN-->INSTALL_CONN[Install connector package in container];
INSTALL_CONN-->UNIT_TESTS[Run unit tests];
UNIT_TESTS-->INTEGRATION_TESTS[Run integration tests];
UNIT_TESTS-->DOCKER_BUILD[Docker build connector dev image];
DOCKER_BUILD-->CAT[Run acceptance tests];
CAT-->UPLOAD_SECRETS[Upload updated secrets to GSM];
CAT-->REPORT[Build test report];
UNIT_TESTS-->REPORT;
INTEGRATION_TESTS-->REPORT;
QA-->REPORT;
FORMAT-->REPORT;
REPORT--if in CI-->UPLOAD[Upload to S3];
flowchart TB
conn([Connector pipeline])==>qa & fmt & tu
src[[Mount connector source code to container]]-->fmt & pkg & ta
fmt & tu & ti & ta & qa -.-> r([Build test report])--if in CI-->s3[[Upload to S3]]
subgraph "Checks (static code analysis)"
repo[[Mount Airbyte repo to container]]-->qa[Run QA checks]
fmt[Run code format checks]
end
subgraph Tests
tu[Run unit tests]==>ti[Run integration tests] & ta[Run acceptance tests]
pkg[[Install connector package in container]]-->tu & ti
dsec[[Download secrets from GSM]] --> ti & ta
subgraph Acceptance
bld[[Build connector dev image]]-->ta-->upsec[[Upload updated secrets to GSM]]
end
end
```

This is the DAG we expect for every connector for which the pipeline is triggered.
Expand Down Expand Up @@ -146,6 +144,6 @@ A log grouping tool is under construction: https://www.youtube.com/watch&ab_chan

### Performance benchmarks

| Connector | Run integration test GHA duration | Dagger POC duration (local) | Dagger POC duration (CI) |
|----------------|-----------------------------------|-----------------------------|--------------------------|
| source-pokeapi | 3mn45s | 1mn48 | TBD |
| Connector | Run integration test GHA duration | Dagger POC duration (CI no cache) |
| -------------- | ---------------------------------------------------------------------- | ---------------------------------------------------------------------- |
| source-pokeapi | [7mn22s](https://github.com/airbytehq/airbyte/actions/runs/4395453220) | [5mn26s](https://github.com/airbytehq/airbyte/actions/runs/4403595746) |
Expand Up @@ -31,7 +31,7 @@
CI_CONNECTOR_OPS_SOURCE_PATH = "tools/ci_connector_ops"


async def with_python_base(context: ConnectorTestContext, python_image_name: str = "python:3.9-slim") -> Container:
def with_python_base(context: ConnectorTestContext, python_image_name: str = "python:3.9-slim") -> Container:
"""Builds a Python container with a cache volume for pip cache.
Args:
Expand All @@ -56,7 +56,7 @@ async def with_python_base(context: ConnectorTestContext, python_image_name: str
)


async def with_testing_dependencies(context: ConnectorTestContext) -> Container:
def with_testing_dependencies(context: ConnectorTestContext) -> Container:
"""Builds a testing environment by installing testing dependencies on top of a python base environment.
Args:
Expand All @@ -65,33 +65,30 @@ async def with_testing_dependencies(context: ConnectorTestContext) -> Container:
Returns:
Container: The testing environment container.
"""
python_environment: Container = await with_python_base(context)
python_environment: Container = with_python_base(context)
pyproject_toml_file = context.get_repo_dir(".", include=[PYPROJECT_TOML_FILE_PATH]).file(PYPROJECT_TOML_FILE_PATH)
return python_environment.with_exec(["pip", "install"] + CONNECTOR_TESTING_REQUIREMENTS).with_file(
f"/{PYPROJECT_TOML_FILE_PATH}", pyproject_toml_file
)


async def with_python_package(
def with_python_package(
context: ConnectorTestContext,
python_environment: Container,
package_source_code_path: str,
additional_dependency_groups: Optional[List] = None,
exclude: Optional[List] = None,
install: bool = True,
) -> Container:
"""Installs a python package in a python environment container.
"""Load a python package source code to a python environment container.
Args:
context (ConnectorTestContext): The current test context, providing the repository directory from which the python sources will be pulled.
python_environment (Container): An existing python environment in which the package will be installed.
package_source_code_path (str): The local path to the package source code.
additional_dependency_groups (Optional[List]): extra_requires dependency of setup.py to install. Defaults to None.
exclude (Optional[List]): A list of file or directory to exclude from the python package source code.
install (bool): Whether to install the python package or not. Defaults to True.
Returns:
Container: A python environment container with the python package installed.
Container: A python environment container with the python package source code.
"""
if exclude:
exclude = DEFAULT_PYTHON_EXCLUDE + exclude
Expand All @@ -101,45 +98,74 @@ async def with_python_package(
container = python_environment.with_mounted_directory("/" + package_source_code_path, package_source_code_directory).with_workdir(
"/" + package_source_code_path
)
return container


async def with_installed_python_package(
context: ConnectorTestContext,
python_environment: Container,
package_source_code_path: str,
additional_dependency_groups: Optional[List] = None,
exclude: Optional[List] = None,
) -> Container:
"""Installs a python package in a python environment container.
Args:
context (ConnectorTestContext): The current test context, providing the repository directory from which the python sources will be pulled.
python_environment (Container): An existing python environment in which the package will be installed.
package_source_code_path (str): The local path to the package source code.
additional_dependency_groups (Optional[List]): extra_requires dependency of setup.py to install. Defaults to None.
exclude (Optional[List]): A list of file or directory to exclude from the python package source code.
Returns:
Container: A python environment container with the python package installed.
"""

if install:
if requirements_txt := await get_file_contents(container, "requirements.txt"):
for line in requirements_txt.split("\n"):
if line.startswith("-e ."):
local_dependency_path = package_source_code_path + "/" + line[3:]
container = container.with_mounted_directory(
"/" + local_dependency_path, context.get_repo_dir(local_dependency_path, exclude=DEFAULT_PYTHON_EXCLUDE)
)
container = container.with_exec(INSTALL_LOCAL_REQUIREMENTS_CMD)
container = with_python_package(context, python_environment, package_source_code_path, exclude=exclude)
if requirements_txt := await get_file_contents(container, "requirements.txt"):
for line in requirements_txt.split("\n"):
if line.startswith("-e ."):
local_dependency_path = package_source_code_path + "/" + line[3:]
container = container.with_mounted_directory(
"/" + local_dependency_path, context.get_repo_dir(local_dependency_path, exclude=DEFAULT_PYTHON_EXCLUDE)
)
container = container.with_exec(INSTALL_LOCAL_REQUIREMENTS_CMD)

container = container.with_exec(INSTALL_CONNECTOR_PACKAGE_CMD)
container = container.with_exec(INSTALL_CONNECTOR_PACKAGE_CMD)

if additional_dependency_groups:
container = container.with_exec(
INSTALL_CONNECTOR_PACKAGE_CMD[:-1] + [INSTALL_CONNECTOR_PACKAGE_CMD[-1] + f"[{','.join(additional_dependency_groups)}]"]
)
if additional_dependency_groups:
container = container.with_exec(
INSTALL_CONNECTOR_PACKAGE_CMD[:-1] + [INSTALL_CONNECTOR_PACKAGE_CMD[-1] + f"[{','.join(additional_dependency_groups)}]"]
)

return container


async def with_airbyte_connector(context: ConnectorTestContext, install: bool = True) -> Container:
def with_airbyte_connector(context: ConnectorTestContext) -> Container:
"""Load an airbyte connector source code in a testing environment.
Args:
context (ConnectorTestContext): The current test context, providing the repository directory from which the connector sources will be pulled.
Returns:
Container: A python environment container (with the connector source code).
"""
connector_source_path = str(context.connector.code_directory)
testing_environment: Container = with_testing_dependencies(context)
return with_python_package(context, testing_environment, connector_source_path, exclude=["secrets"])


async def with_installed_airbyte_connector(context: ConnectorTestContext) -> Container:
"""Installs an airbyte connector python package in a testing environment.
Args:
context (ConnectorTestContext): The current test context, providing the repository directory from which the connector sources will be pulled.
install (bool): Whether to install the connector package or not. Defaults to True.
Returns:
Container: A python environment container (with the connector installed if install == True).
Container: A python environment container (with the connector installed).
"""
connector_source_path = str(context.connector.code_directory)
testing_environment: Container = await with_testing_dependencies(context)
return await with_python_package(
context,
testing_environment,
connector_source_path,
additional_dependency_groups=["dev", "tests", "main"],
exclude=["secrets"],
install=install,
testing_environment: Container = with_testing_dependencies(context)
return await with_installed_python_package(
context, testing_environment, connector_source_path, additional_dependency_groups=["dev", "tests", "main"], exclude=["secrets"]
)


Expand All @@ -153,8 +179,8 @@ async def with_ci_credentials(context: ConnectorTestContext, gsm_secret: Secret)
Returns:
Container: A python environment with the ci_credentials package installed.
"""
python_base_environment: Container = await with_python_base(context)
ci_credentials = await with_python_package(context, python_base_environment, CI_CREDENTIALS_SOURCE_PATH)
python_base_environment: Container = with_python_base(context)
ci_credentials = await with_installed_python_package(context, python_base_environment, CI_CREDENTIALS_SOURCE_PATH)

return ci_credentials.with_env_variable("VERSION", "dev").with_secret_variable("GCP_GSM_CREDENTIALS", gsm_secret).with_workdir("/")

Expand All @@ -168,6 +194,6 @@ async def with_ci_connector_ops(context: ConnectorTestContext) -> Container:
Returns:
Container: A python environment container with ci_connector_ops installed.
"""
python_base_environment: Container = await with_python_base(context, "python:3-alpine")
python_base_environment: Container = with_python_base(context, "python:3-alpine")
python_with_git = python_base_environment.with_exec(["apk", "add", "gcc", "libffi-dev", "musl-dev", "git"])
return await with_python_package(context, python_with_git, CI_CONNECTOR_OPS_SOURCE_PATH, exclude=["pipelines"])
return await with_installed_python_package(context, python_with_git, CI_CONNECTOR_OPS_SOURCE_PATH, exclude=["pipelines"])
Expand Up @@ -3,6 +3,7 @@
#
from pathlib import Path

from ci_connector_ops.pipelines.utils import with_exit_code
from dagger import Client, File, Secret


Expand All @@ -22,13 +23,12 @@ async def upload_to_s3(dagger_client: Client, file_to_upload_path: Path, key: st
aws_access_key_id: Secret = dagger_client.host().env_variable("AWS_ACCESS_KEY_ID").secret()
aws_secret_access_key: Secret = dagger_client.host().env_variable("AWS_SECRET_ACCESS_KEY").secret()
aws_region: Secret = dagger_client.host().env_variable("AWS_DEFAULT_REGION").secret()
return await (
return await with_exit_code(
dagger_client.container()
.from_("amazon/aws-cli:latest")
.with_file(str(file_to_upload_path), file_to_upload)
.with_secret_variable("AWS_ACCESS_KEY_ID", aws_access_key_id)
.with_secret_variable("AWS_SECRET_ACCESS_KEY", aws_secret_access_key)
.with_secret_variable("AWS_DEFAULT_REGION", aws_region)
.with_exec(["s3", "cp", str(file_to_upload_path), f"s3://{bucket}/{key}"])
.exit_code()
)
Expand Up @@ -27,7 +27,7 @@ async def download(context: ConnectorTestContext, gcp_gsm_env_variable_name: str
Directory: A directory with the downloaded secrets.
"""
gsm_secret = context.dagger_client.host().env_variable(gcp_gsm_env_variable_name).secret()
secrets_path = "/" + str(context.connector.code_directory) + "/secrets"
secrets_path = f"/{context.connector.code_directory}/secrets"

ci_credentials = await environments.with_ci_credentials(context, gsm_secret)
return (
Expand All @@ -51,7 +51,7 @@ async def upload(context: ConnectorTestContext, gcp_gsm_env_variable_name: str =
int: The exit code of the ci-credentials update-secrets command.
"""
gsm_secret = context.dagger_client.host().env_variable(gcp_gsm_env_variable_name).secret()
secrets_path = "/" + str(context.connector.code_directory) + "/secrets"
secrets_path = f"/{context.connector.code_directory}/secrets"

ci_credentials = await environments.with_ci_credentials(context, gsm_secret)

Expand Down

0 comments on commit 26c3151

Please sign in to comment.