From 5af9696e372669b884593fe8371f0922abb7f966 Mon Sep 17 00:00:00 2001 From: Augustin Date: Wed, 7 Feb 2024 18:02:56 +0100 Subject: [PATCH] airbyte-ci: run poe tasks declared in pyproject.toml file of internal poetry packages (#34736) --- .github/workflows/airbyte-ci-tests.yml | 144 +++------ .github/workflows/cat-tests.yml | 35 --- airbyte-ci/connectors/pipelines/README.md | 33 +- .../connectors/build_image/steps/common.py | 14 +- .../build_image/steps/java_connectors.py | 4 +- .../build_image/steps/normalization.py | 2 +- .../connectors/bump_version/pipeline.py | 20 +- .../airbyte_ci/connectors/commands.py | 18 +- .../migrate_to_base_image/pipeline.py | 36 +-- .../airbyte_ci/connectors/publish/pipeline.py | 32 +- .../connectors/test/steps/common.py | 34 +-- .../connectors/test/steps/java_connectors.py | 2 +- .../connectors/upgrade_cdk/pipeline.py | 16 +- .../airbyte_ci/format/format_command.py | 8 +- .../pipelines/airbyte_ci/steps/no_op.py | 2 +- .../pipelines/airbyte_ci/test/__init__.py | 16 + .../pipelines/airbyte_ci/test/commands.py | 220 ++++++------- .../pipelines/airbyte_ci/test/pipeline.py | 289 ++++++++++++++++++ .../pipelines/pipelines/cli/airbyte_ci.py | 3 +- .../pipelines/helpers/execution/run_steps.py | 2 +- .../pipelines/pipelines/helpers/git.py | 16 + .../pipelines/pipelines/models/steps.py | 86 ++++-- .../connectors/pipelines/pyproject.toml | 2 +- .../connectors/pipelines/tests/test_bases.py | 17 +- .../test_steps/test_common.py | 2 +- .../connectors/pipelines/tests/test_gradle.py | 2 +- .../test_execution/test_run_steps.py | 10 +- .../pipelines/tests/test_tests/test_common.py | 23 +- 28 files changed, 646 insertions(+), 442 deletions(-) delete mode 100644 .github/workflows/cat-tests.yml create mode 100644 airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/test/pipeline.py diff --git a/.github/workflows/airbyte-ci-tests.yml b/.github/workflows/airbyte-ci-tests.yml index 098943c75627a9..6d6ec1aa2f4f35 100644 --- a/.github/workflows/airbyte-ci-tests.yml +++ b/.github/workflows/airbyte-ci-tests.yml @@ -1,4 +1,4 @@ -name: Connector Ops CI - Pipeline Unit Test +name: Internal Poetry packages CI concurrency: group: ${{ github.workflow }}-${{ github.ref }} @@ -6,15 +6,24 @@ concurrency: on: workflow_dispatch: + inputs: + airbyte_ci_subcommand: + description: "Subcommand to pass to the 'airbyte-ci test' command" + default: "--poetry-package-path=airbyte-ci/connectors/pipelines" pull_request: types: - opened - reopened - synchronize jobs: - run-airbyte-ci-tests: + run-airbyte-ci-poetry-ci: + #name: Internal Poetry packages CI + # To rename in a follow up PR name: Run Airbyte CI tests runs-on: tooling-test-large + permissions: + pull-requests: read + statuses: write steps: - name: Checkout Airbyte uses: actions/checkout@v3 @@ -22,121 +31,50 @@ jobs: fetch-depth: 0 ref: ${{ github.event.pull_request.head.ref }} - # IMPORTANT! This is nessesary to make sure that a status is reported on the PR - # even if the workflow is skipped. If we used github actions filters, the workflow - # would not be reported as skipped, but instead would be forever pending. - # - # I KNOW THIS SOUNDS CRAZY, BUT IT IS TRUE. - # - # Also it gets worse - # - # IMPORTANT! DO NOT CHANGE THE QUOTES AROUND THE GLOBS. THEY ARE REQUIRED. - # MAKE SURE TO TEST ANY SYNTAX CHANGES BEFORE MERGING. 
- - name: Get changed files - uses: tj-actions/changed-files@v39 - id: changes - with: - files_yaml: | - ops: - - 'airbyte-ci/connectors/connector_ops/**' - - '!**/*.md' - base_images: - - 'airbyte-ci/connectors/connector_ops/**' - - 'airbyte-ci/connectors/base_images/**' - - '!**/*.md' - pipelines: - - 'airbyte-ci/connectors/connector_ops/**' - - 'airbyte-ci/connectors/base_images/**' - - 'airbyte-ci/connectors/pipelines/**' - - '!**/*.md' - metadata_lib: - - 'airbyte-ci/connectors/metadata_service/lib/**' - - '!**/*.md' - metadata_orchestrator: - - 'airbyte-ci/connectors/metadata_service/lib/**' - - 'airbyte-ci/connectors/metadata_service/orchestrator/**' - - '!**/*.md' - airbyte_lib: - - 'airbyte-lib/**' - - '!**/*.md' - - - name: Run airbyte-ci/connectors/connector_ops tests - if: steps.changes.outputs.ops_any_changed == 'true' - id: run-airbyte-ci-connectors-connector-ops-tests - uses: ./.github/actions/run-airbyte-ci - with: - context: "pull_request" - dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_2 }} - docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }} - docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }} - gcs_credentials: ${{ secrets.METADATA_SERVICE_PROD_GCS_CREDENTIALS }} - sentry_dsn: ${{ secrets.SENTRY_AIRBYTE_CI_DSN }} - github_token: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }} - subcommand: "test airbyte-ci/connectors/connector_ops --poetry-run-command='pytest tests'" - - - name: Run airbyte-ci/connectors/pipelines tests - id: run-airbyte-ci-connectors-pipelines-tests - if: steps.changes.outputs.pipelines_any_changed == 'true' - uses: ./.github/actions/run-airbyte-ci - with: - context: "pull_request" - dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_2 }} - docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }} - docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }} - gcs_credentials: ${{ secrets.METADATA_SERVICE_PROD_GCS_CREDENTIALS }} - sentry_dsn: ${{ secrets.SENTRY_AIRBYTE_CI_DSN }} - github_token: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }} - subcommand: "test airbyte-ci/connectors/pipelines --poetry-run-command='pytest tests' --poetry-run-command='mypy pipelines --disallow-untyped-defs' --poetry-run-command='ruff check pipelines'" + - name: Extract branch name [WORKFLOW DISPATCH] + shell: bash + if: github.event_name == 'workflow_dispatch' + run: echo "branch=${GITHUB_REF#refs/heads/}" >> $GITHUB_OUTPUT + id: extract_branch + - name: Fetch last commit id from remote branch [PULL REQUESTS] + if: github.event_name == 'pull_request' + id: fetch_last_commit_id_pr + run: echo "commit_id=$(git ls-remote --heads origin ${{ github.head_ref }} | cut -f 1)" >> $GITHUB_OUTPUT + - name: Fetch last commit id from remote branch [WORKFLOW DISPATCH] + if: github.event_name == 'workflow_dispatch' + id: fetch_last_commit_id_wd + run: echo "commit_id=$(git rev-parse origin/${{ steps.extract_branch.outputs.branch }})" >> $GITHUB_OUTPUT - - name: Run airbyte-ci/connectors/base_images tests - id: run-airbyte-ci-connectors-base-images-tests - if: steps.changes.outputs.base_images_any_changed == 'true' + - name: Run poe tasks for modified internal packages [PULL REQUEST] + if: github.event_name == 'pull_request' + id: run-airbyte-ci-test-pr uses: ./.github/actions/run-airbyte-ci with: context: "pull_request" dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_2 }} docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }} docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }} + gcp_gsm_credentials: ${{ secrets.GCP_GSM_CREDENTIALS }} gcs_credentials: ${{ 
secrets.METADATA_SERVICE_PROD_GCS_CREDENTIALS }}
+          git_branch: ${{ github.head_ref }}
+          git_revision: ${{ steps.fetch_last_commit_id_pr.outputs.commit_id }}
+          github_token: ${{ github.token }}
           sentry_dsn: ${{ secrets.SENTRY_AIRBYTE_CI_DSN }}
-          github_token: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }}
-          subcommand: "test airbyte-ci/connectors/base_images --poetry-run-command='pytest tests'"
-
-      - name: Run test pipeline for the metadata lib
-        id: metadata_lib-test-pipeline
-        if: steps.changes.outputs.metadata_lib_any_changed == 'true'
-        uses: ./.github/actions/run-airbyte-ci
-        with:
-          subcommand: "test airbyte-ci/connectors/metadata_service/lib/ --poetry-run-command='pytest tests'"
-          context: "pull_request"
-          dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_2 }}
-          github_token: ${{ secrets.GITHUB_TOKEN }}
-          docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
-          docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
-
-      - name: Run test for the metadata orchestrator
-        id: metadata_orchestrator-test-pipeline
-        if: steps.changes.outputs.metadata_orchestrator_any_changed == 'true'
-        uses: ./.github/actions/run-airbyte-ci
-        with:
-          subcommand: "test airbyte-ci/connectors/metadata_service/orchestrator/ --poetry-run-command='pytest tests'"
-          context: "pull_request"
-          dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_2 }}
-          github_token: ${{ secrets.GITHUB_TOKEN }}
-          docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
-          docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
+          subcommand: "test --modified"

-      - name: Run airbyte-lib tests
-        if: steps.changes.outputs.airbyte_lib_any_changed == 'true'
-        id: run-airbyte-lib-tests
+      - name: Run poe tasks for requested internal packages [WORKFLOW DISPATCH]
+        id: run-airbyte-ci-test-workflow-dispatch
+        if: github.event_name == 'workflow_dispatch'
         uses: ./.github/actions/run-airbyte-ci
         with:
-          context: "pull_request"
+          context: "manual"
           dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_2 }}
           docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
           docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
-          gcs_credentials: ${{ secrets.METADATA_SERVICE_PROD_GCS_CREDENTIALS }}
           gcp_gsm_credentials: ${{ secrets.GCP_GSM_CREDENTIALS }}
+          gcs_credentials: ${{ secrets.METADATA_SERVICE_PROD_GCS_CREDENTIALS }}
+          git_branch: ${{ steps.extract_branch.outputs.branch }}
+          git_revision: ${{ steps.fetch_last_commit_id_wd.outputs.commit_id }}
+          github_token: ${{ github.token }}
           sentry_dsn: ${{ secrets.SENTRY_AIRBYTE_CI_DSN }}
-          github_token: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }}
-          subcommand: "test airbyte-lib --side-car-docker-engine --pass-env-var=GCP_GSM_CREDENTIALS --poetry-run-command='pytest'"
+          subcommand: "test ${{ inputs.airbyte_ci_subcommand }}"
diff --git a/.github/workflows/cat-tests.yml b/.github/workflows/cat-tests.yml
deleted file mode 100644
index 6b5732526d624b..00000000000000
--- a/.github/workflows/cat-tests.yml
+++ /dev/null
@@ -1,35 +0,0 @@
-name: Connector Ops CI - CAT Unit Tests
-
-concurrency:
-  group: ${{ github.workflow }}-${{ github.ref }}
-  cancel-in-progress: true
-
-on:
-  workflow_dispatch:
-  pull_request:
-    types:
-      - opened
-      - reopened
-      - synchronize
-    paths:
-      - airbyte-integrations/bases/connector-acceptance-test/**
-jobs:
-  run-cat-unit-tests:
-    name: Run CAT unit tests
-    runs-on: tooling-test-large
-    steps:
-      - name: Checkout Airbyte
-        uses: actions/checkout@v3
-      - name: Run CAT unit tests
-        id: run-cat-unit-tests
-        uses: ./.github/actions/run-airbyte-ci
-        with:
-          context: "pull_request"
-          dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_2 }}
-          docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
-          docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
-          gcp_gsm_credentials: ${{ secrets.GCP_GSM_CREDENTIALS }}
-          gcs_credentials: ${{ secrets.METADATA_SERVICE_PROD_GCS_CREDENTIALS }}
-          sentry_dsn: ${{ secrets.SENTRY_AIRBYTE_CI_DSN }}
-          github_token: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }}
-          subcommand: "test airbyte-integrations/bases/connector-acceptance-test --poetry-run-command='pytest unit_tests'"
diff --git a/airbyte-ci/connectors/pipelines/README.md b/airbyte-ci/connectors/pipelines/README.md
index f18fdc144c05fc..1d35871303b2df 100644
--- a/airbyte-ci/connectors/pipelines/README.md
+++ b/airbyte-ci/connectors/pipelines/README.md
@@ -613,39 +613,34 @@ flowchart TD

 ### `tests` command

-This command runs the Python tests for a airbyte-ci poetry package.
+This command runs the poe tasks declared in the `[tool.airbyte-ci]` section of our internal poetry packages.
+Feel free to check out this [Pydantic model](https://github.com/airbytehq/airbyte/blob/main/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/test/models.py#L9) to see the list of available options in the `[tool.airbyte-ci]` section.

-#### Arguments
-
-| Option                | Required | Default | Mapped environment variable | Description                         |
-| --------------------- | -------- | ------- | --------------------------- | ----------------------------------- |
-| `poetry_package_path` | True     |         |                             | The path to poetry package to test. |
+You can find the list of internal packages [here](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/test/__init__.py#L1)

 #### Options

-| Option                    | Required | Default | Mapped environment variable | Description                                                                                  |
-| ------------------------- | -------- | ------- | --------------------------- | -------------------------------------------------------------------------------------------- |
-| `-c/--poetry-run-command` | True     | None    |                             | The command to run with `poetry run`                                                         |
-| `-e/--pass-env-var`       | False    | None    |                             | Host environment variable that is passed to the container running the poetry command         |
-| `--ci-requirements`       | False    |         |                             | Output the CI requirements as a JSON payload. It is used to determine the CI runner to use.  |
+| Option                     | Required | Multiple | Description                                                                                  |
+| -------------------------- | -------- | -------- | -------------------------------------------------------------------------------------------- |
+| `--poetry-package-path/-p` | False    | True     | Path of a poetry package to run the poe tasks for.                                           |
+| `--modified`               | False    | False    | Run the poe tasks of the modified internal poetry packages.                                  |
+| `--ci-requirements`        | False    | False    | Output the CI requirements as a JSON payload. It is used to determine the CI runner to use.  |

 #### Examples
+You can pass multiple `--poetry-package-path` options to run the poe tasks of several packages.

-You can pass multiple `-c/--poetry-run-command` options to run multiple commands.
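For illustration, here is what such a configuration could look like and how the pipeline ingests it. This is a sketch: the `poe_tasks` key is an assumed name, while the other keys mirror the attributes the new `pipeline.py` reads (`side_car_docker_engine`, `mount_docker_socket`, `required_environment_variables`, `extra_poetry_groups`); the linked Pydantic model remains the authoritative list. Parsing goes through `toml.loads` on the raw `pyproject.toml`, exactly as `get_airbyte_ci_package_config` does in `pipeline.py`.

```python
# Sketch of a plausible [tool.airbyte-ci] section, parsed the same way the
# pipeline does it. "poe_tasks" is an assumed key; the other keys mirror the
# attributes used in prepare_container_for_poe_tasks.
import toml

raw_pyproject = """
[tool.airbyte-ci]
poe_tasks = ["test", "type_check", "lint"]
required_environment_variables = ["GCP_GSM_CREDENTIALS"]
extra_poetry_groups = ["dev"]
side_car_docker_engine = false
mount_docker_socket = true
"""

# Nested table access: [tool.airbyte-ci] lands under ["tool"]["airbyte-ci"].
config = toml.loads(raw_pyproject)["tool"]["airbyte-ci"]
print(config["poe_tasks"])  # ['test', 'type_check', 'lint']
```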
- -E.G.: running `pytest` and `mypy`: -`airbyte-ci test airbyte-ci/connectors/pipelines --poetry-run-command='pytest tests' --poetry-run-command='mypy pipelines'` +E.G.: running Poe tasks on `airbyte-lib` and `airbyte-ci/connectors/pipelines`: +`airbyte-ci test --poetry-package-path=airbyte-ci/connectors/pipelines --poetry-package-path=airbyte-lib` -E.G.: passing the environment variable `GCP_GSM_CREDENTIALS` environment variable to the container -running the poetry command: `airbyte-ci test airbyte-lib --pass-env-var='GCP_GSM_CREDENTIALS'` +E.G.: running Poe tasks on the modified internal packages of the current branch: +`airbyte-ci test --modified` -E.G.: running `pytest` on a specific test folder: -`airbyte-ci tests airbyte-integrations/bases/connector-acceptance-test --poetry-run-command='pytest tests/unit_tests'` ## Changelog | Version | PR | Description | | ------- | ---------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------- | +| 4.0.0 | [#34736](https://github.com/airbytehq/airbyte/pull/34736) | Run poe tasks declared in internal poetry packages. | | 3.10.4 | [#34867](https://github.com/airbytehq/airbyte/pull/34867) | Remove connector ops team | | 3.10.3 | [#34836](https://github.com/airbytehq/airbyte/pull/34836) | Add check for python registry publishing enabled for certified python sources. | | 3.10.2 | [#34044](https://github.com/airbytehq/airbyte/pull/34044) | Add pypi validation testing. | diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/steps/common.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/steps/common.py index 2b03a45b3f7587..ce07f20846072c 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/steps/common.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/steps/common.py @@ -39,16 +39,16 @@ async def _run(self, *args: Any) -> StepResult: await connector.with_exec(["spec"]) except ExecError: return StepResult( - self, StepStatus.FAILURE, stderr=f"Failed to run spec on the connector built for platform {platform}." + step=self, status=StepStatus.FAILURE, stderr=f"Failed to run spec on the connector built for platform {platform}." ) build_results_per_platform[platform] = connector except QueryError as e: - return StepResult(self, StepStatus.FAILURE, stderr=f"Failed to build connector image for platform {platform}: {e}") + return StepResult(step=self, status=StepStatus.FAILURE, stderr=f"Failed to build connector image for platform {platform}: {e}") success_message = ( f"The {self.context.connector.technical_name} docker image " f"was successfully built for platform(s) {', '.join(self.build_platforms)}" ) - return StepResult(self, StepStatus.SUCCESS, stdout=success_message, output_artifact=build_results_per_platform) + return StepResult(step=self, status=StepStatus.SUCCESS, stdout=success_message, output_artifact=build_results_per_platform) async def _build_connector(self, platform: Platform, *args: Any, **kwargs: Any) -> Container: """Implement the generation of the image for the platform and return the corresponding container. 
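Most of the connector-pipeline hunks in this patch are one mechanical change: `StepResult` is now built with explicit keyword arguments instead of positional ones. Below is a minimal sketch of a step written against the new convention; the import path and class names come from this diff, but the step itself is invented for illustration:

```python
# Minimal sketch only: assumes the Step/StepResult/StepStatus names exported
# by pipelines.models.steps, as shown throughout this diff.
from typing import Any

from pipelines.models.steps import Step, StepResult, StepStatus


class SayHello(Step):
    """Illustrative step: does nothing but succeed."""

    title = "Say hello"

    async def _run(self, *args: Any, **kwargs: Any) -> StepResult:
        # The positional form StepResult(self, StepStatus.SUCCESS) is gone;
        # every field is now passed by keyword, which keeps call sites
        # readable and resilient to field reordering in the model.
        return StepResult(step=self, status=StepStatus.SUCCESS, stdout="hello")
```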
@@ -89,8 +89,8 @@ async def _run(self) -> StepResult: _, exported_tar_path = await export_container_to_tarball(self.context, container, platform) if not exported_tar_path: return StepResult( - self, - StepStatus.FAILURE, + step=self, + status=StepStatus.FAILURE, stderr=f"Failed to export the connector image {self.image_name}:{self.image_tag} to a tarball.", ) try: @@ -104,7 +104,7 @@ async def _run(self) -> StepResult: loaded_images.append(full_image_name) except docker.errors.DockerException as e: return StepResult( - self, StepStatus.FAILURE, stderr=f"Something went wrong while interacting with the local docker client: {e}" + step=self, status=StepStatus.FAILURE, stderr=f"Something went wrong while interacting with the local docker client: {e}" ) - return StepResult(self, StepStatus.SUCCESS, stdout=f"Loaded image {','.join(loaded_images)} to your Docker host ({image_sha}).") + return StepResult(step=self, status=StepStatus.SUCCESS, stdout=f"Loaded image {','.join(loaded_images)} to your Docker host ({image_sha}).") diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/steps/java_connectors.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/steps/java_connectors.py index f8a4c7ed0d61a3..aa0b3448ba6828 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/steps/java_connectors.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/steps/java_connectors.py @@ -36,10 +36,10 @@ async def _run(self, dist_dir: Directory) -> StepResult: if num_files == 0 else "More than one distribution tar file was built for the current java connector." ) - return StepResult(self, StepStatus.FAILURE, stderr=error_message) + return StepResult(step=self, status=StepStatus.FAILURE, stderr=error_message) dist_tar = dist_dir.file(tar_files[0]) except QueryError as e: - return StepResult(self, StepStatus.FAILURE, stderr=str(e)) + return StepResult(step=self, status=StepStatus.FAILURE, stderr=str(e)) return await super()._run(dist_tar) async def _build_connector(self, platform: Platform, dist_tar: File) -> Container: diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/steps/normalization.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/steps/normalization.py index 4774c4fe78f309..0ac35aab7fa4d1 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/steps/normalization.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/steps/normalization.py @@ -36,4 +36,4 @@ async def _run(self) -> StepResult: build_normalization_container = normalization.with_normalization(self.context, self.build_platform) else: build_normalization_container = self.context.dagger_client.container().from_(self.normalization_image) - return StepResult(self, StepStatus.SUCCESS, output_artifact=build_normalization_container) + return StepResult(step=self, status=StepStatus.SUCCESS, output_artifact=build_normalization_container) diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/bump_version/pipeline.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/bump_version/pipeline.py index b6319dc0c91701..7b2fda58b3d524 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/bump_version/pipeline.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/bump_version/pipeline.py @@ -52,8 +52,8 @@ async def _run(self) -> 
StepResult: doc_path = self.context.connector.documentation_file_path if not doc_path.exists(): return StepResult( - self, - StepStatus.SKIPPED, + step=self, + status=StepStatus.SKIPPED, stdout="Connector does not have a documentation file.", output_artifact=self.repo_dir, ) @@ -61,15 +61,15 @@ async def _run(self) -> StepResult: updated_doc = self.add_changelog_entry(doc_path.read_text()) except Exception as e: return StepResult( - self, - StepStatus.FAILURE, + step=self, + status=StepStatus.FAILURE, stdout=f"Could not add changelog entry: {e}", output_artifact=self.repo_dir, ) updated_repo_dir = self.repo_dir.with_new_file(str(doc_path), contents=updated_doc) return StepResult( - self, - StepStatus.SUCCESS, + step=self, + status=StepStatus.SUCCESS, stdout=f"Added changelog entry to {doc_path}", output_artifact=updated_repo_dir, ) @@ -115,8 +115,8 @@ async def _run(self) -> StepResult: current_version = metadata_change_helpers.get_current_version(current_metadata) if current_version is None: return StepResult( - self, - StepStatus.SKIPPED, + step=self, + status=StepStatus.SKIPPED, stdout="Can't retrieve the connector current version.", output_artifact=self.repo_dir, ) @@ -131,8 +131,8 @@ async def _run(self) -> StepResult: return metadata_validation_results return StepResult( - self, - StepStatus.SUCCESS, + step=self, + status=StepStatus.SUCCESS, stdout=f"Updated dockerImageTag from {current_version} to {self.new_version} in {metadata_path}", output_artifact=repo_dir_with_updated_metadata, ) diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/commands.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/commands.py index e3f6a5e0922c60..6bda567160fdb4 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/commands.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/commands.py @@ -11,9 +11,8 @@ from pipelines import main_logger from pipelines.cli.click_decorators import click_append_to_context_object, click_ignore_unused_kwargs, click_merge_args_into_context_obj from pipelines.cli.lazy_group import LazyGroup -from pipelines.consts import CIContext from pipelines.helpers.connectors.modifed import ConnectorWithModifiedFiles, get_connector_modified_files, get_modified_connectors -from pipelines.helpers.git import get_modified_files_in_branch, get_modified_files_in_commit +from pipelines.helpers.git import get_modified_files from pipelines.helpers.utils import transform_strs_to_paths ALL_CONNECTORS = get_all_connectors_in_repo() @@ -263,18 +262,3 @@ async def connectors( ctx.obj["enable_dependency_scanning"], ) log_selected_connectors(ctx.obj["selected_connectors_with_modified_files"]) - - -async def get_modified_files(git_branch: str, git_revision: str, diffed_branch: str, is_local: bool, ci_context: CIContext) -> Set[str]: - """Get the list of modified files in the current git branch. - If the current branch is master, it will return the list of modified files in the head commit. - The head commit on master should be the merge commit of the latest merged pull request as we squash commits on merge. - Pipelines like "publish on merge" are triggered on each new commit on master. - - If the CI context is a pull request, it will return the list of modified files in the pull request, without using git diff. - If the current branch is not master, it will return the list of modified files in the current branch. 
- This latest case is the one we encounter when running the pipeline locally, on a local branch, or manually on GHA with a workflow dispatch event. - """ - if ci_context is CIContext.MASTER or (ci_context is CIContext.MANUAL and git_branch == "master"): - return await get_modified_files_in_commit(git_branch, git_revision, is_local) - return await get_modified_files_in_branch(git_branch, git_revision, diffed_branch, is_local) diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/migrate_to_base_image/pipeline.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/migrate_to_base_image/pipeline.py index 38f0bf477713f0..47fba701ff998e 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/migrate_to_base_image/pipeline.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/migrate_to_base_image/pipeline.py @@ -61,8 +61,8 @@ async def _run(self) -> StepResult: latest_base_image_address = await self.get_latest_base_image_address() if latest_base_image_address is None: return StepResult( - self, - StepStatus.SKIPPED, + step=self, + status=StepStatus.SKIPPED, stdout="Could not find a base image for this connector language.", output_artifact=self.repo_dir, ) @@ -73,16 +73,16 @@ async def _run(self) -> StepResult: if current_base_image_address is None and not self.set_if_not_exists: return StepResult( - self, - StepStatus.SKIPPED, + step=self, + status=StepStatus.SKIPPED, stdout="Connector does not have a base image metadata field.", output_artifact=self.repo_dir, ) if current_base_image_address == latest_base_image_address: return StepResult( - self, - StepStatus.SKIPPED, + step=self, + status=StepStatus.SKIPPED, stdout="Connector already uses latest base image", output_artifact=self.repo_dir, ) @@ -90,8 +90,8 @@ async def _run(self) -> StepResult: updated_repo_dir = metadata_change_helpers.get_repo_dir_with_updated_metadata(self.repo_dir, metadata_path, updated_metadata) return StepResult( - self, - StepStatus.SUCCESS, + step=self, + status=StepStatus.SUCCESS, stdout=f"Updated base image to {latest_base_image_address} in {metadata_path}", output_artifact=updated_repo_dir, ) @@ -116,16 +116,16 @@ async def _run(self) -> StepResult: file_to_delete_path = self.context.connector.code_directory / self.file_to_delete if not file_to_delete_path.exists(): return StepResult( - self, - StepStatus.SKIPPED, + step=self, + status=StepStatus.SKIPPED, stdout=f"Connector does not have a {self.file_to_delete}", ) # As this is a deletion of a file, this has to happen on the host fs # Deleting the file in a Directory container would not work because the directory.export method would not export the deleted file from the Directory back to host. 
file_to_delete_path.unlink() return StepResult( - self, - StepStatus.SUCCESS, + step=self, + status=StepStatus.SUCCESS, stdout=f"Deleted {file_to_delete_path}", ) @@ -143,8 +143,8 @@ async def _run(self) -> StepResult: readme_path = self.context.connector.code_directory / "README.md" if not readme_path.exists(): return StepResult( - self, - StepStatus.SKIPPED, + step=self, + status=StepStatus.SKIPPED, stdout="Connector does not have a documentation file.", output_artifact=self.repo_dir, ) @@ -153,15 +153,15 @@ async def _run(self) -> StepResult: updated_readme = self.add_build_instructions(current_readme) except Exception as e: return StepResult( - self, - StepStatus.FAILURE, + step=self, + status=StepStatus.FAILURE, stdout=str(e), output_artifact=self.repo_dir, ) updated_repo_dir = await self.repo_dir.with_new_file(str(readme_path), contents=updated_readme) return StepResult( - self, - StepStatus.SUCCESS, + step=self, + status=StepStatus.SUCCESS, stdout=f"Added build instructions to {readme_path}", output_artifact=updated_repo_dir, ) diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/publish/pipeline.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/publish/pipeline.py index 35f1a961d09ef0..849a9348f1155d 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/publish/pipeline.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/publish/pipeline.py @@ -43,15 +43,15 @@ async def _run(self) -> StepResult: crane_ls_stdout = await crane_ls.stdout() except ExecError as e: if "NAME_UNKNOWN" in e.stderr: - return StepResult(self, status=StepStatus.SUCCESS, stdout=f"The docker repository {docker_repository} does not exist.") + return StepResult(step=self, status=StepStatus.SUCCESS, stdout=f"The docker repository {docker_repository} does not exist.") else: - return StepResult(self, status=StepStatus.FAILURE, stderr=e.stderr, stdout=e.stdout) + return StepResult(step=self, status=StepStatus.FAILURE, stderr=e.stderr, stdout=e.stdout) else: # The docker repo exists and ls was successful existing_tags = crane_ls_stdout.split("\n") docker_tag_already_exists = docker_tag in existing_tags if docker_tag_already_exists: - return StepResult(self, status=StepStatus.SKIPPED, stderr=f"{self.context.docker_image} already exists.") - return StepResult(self, status=StepStatus.SUCCESS, stdout=f"No manifest found for {self.context.docker_image}.") + return StepResult(step=self, status=StepStatus.SKIPPED, stderr=f"{self.context.docker_image} already exists.") + return StepResult(step=self, status=StepStatus.SUCCESS, stdout=f"No manifest found for {self.context.docker_image}.") class CheckPythonRegistryPackageDoesNotExist(Step): @@ -64,13 +64,13 @@ async def _run(self) -> StepResult: ) if is_published: return StepResult( - self, + step=self, status=StepStatus.SKIPPED, stderr=f"{self.context.package_metadata.name} already exists in version {self.context.package_metadata.version}.", ) else: return StepResult( - self, + step=self, status=StepStatus.SUCCESS, stdout=f"{self.context.package_metadata.name} does not exist in version {self.context.package_metadata.version}.", ) @@ -97,14 +97,14 @@ async def _run(self, built_containers_per_platform: List[Container], attempts: i platform_variants=built_containers_per_platform[1:], forced_compression=ImageLayerCompression.Gzip, ) - return StepResult(self, status=StepStatus.SUCCESS, stdout=f"Published {image_ref}") + return StepResult(step=self, status=StepStatus.SUCCESS, stdout=f"Published 
{image_ref}") except QueryError as e: if attempts > 0: self.context.logger.error(str(e)) self.context.logger.warn(f"Failed to publish {self.context.docker_image}. Retrying. {attempts} attempts left.") await anyio.sleep(5) return await self._run(built_containers_per_platform, attempts - 1) - return StepResult(self, status=StepStatus.FAILURE, stderr=str(e)) + return StepResult(step=self, status=StepStatus.FAILURE, stderr=str(e)) class PullConnectorImageFromRegistry(Step): @@ -145,16 +145,16 @@ async def _run(self, attempt: int = 3) -> StepResult: await anyio.sleep(10) return await self._run(attempt - 1) else: - return StepResult(self, status=StepStatus.FAILURE, stderr=f"Failed to pull {self.context.docker_image}") + return StepResult(step=self, status=StepStatus.FAILURE, stderr=f"Failed to pull {self.context.docker_image}") if not await self.check_if_image_only_has_gzip_layers(): return StepResult( - self, + step=self, status=StepStatus.FAILURE, stderr=f"Image {self.context.docker_image} does not only have gzip compressed layers. Please rebuild the connector with Docker < 21.", ) else: return StepResult( - self, + step=self, status=StepStatus.SUCCESS, stdout=f"Pulled {self.context.docker_image} and validated it has gzip only compressed layers and we can run spec on it.", ) @@ -162,7 +162,7 @@ async def _run(self, attempt: int = 3) -> StepResult: if attempt > 0: await anyio.sleep(10) return await self._run(attempt - 1) - return StepResult(self, status=StepStatus.FAILURE, stderr=str(e)) + return StepResult(step=self, status=StepStatus.FAILURE, stderr=str(e)) class UploadSpecToCache(Step): @@ -214,7 +214,7 @@ async def _run(self, built_connector: Container) -> StepResult: oss_spec: str = await self._get_connector_spec(built_connector, "OSS") cloud_spec: str = await self._get_connector_spec(built_connector, "CLOUD") except InvalidSpecOutputError as e: - return StepResult(self, status=StepStatus.FAILURE, stderr=str(e)) + return StepResult(step=self, status=StepStatus.FAILURE, stderr=str(e)) specs_to_uploads: List[Tuple[str, File]] = [(self.oss_spec_key, await self._get_spec_as_file(oss_spec))] @@ -231,8 +231,8 @@ async def _run(self, built_connector: Container) -> StepResult: flags=['--cache-control="no-cache"'], ) if exit_code != 0: - return StepResult(self, status=StepStatus.FAILURE, stdout=stdout, stderr=stderr) - return StepResult(self, status=StepStatus.SUCCESS, stdout="Uploaded connector spec to spec cache bucket.") + return StepResult(step=self, status=StepStatus.FAILURE, stdout=stdout, stderr=stderr) + return StepResult(step=self, status=StepStatus.SUCCESS, stdout="Uploaded connector spec to spec cache bucket.") # Pipeline @@ -356,7 +356,7 @@ async def _run_python_registry_publish_pipeline(context: PublishConnectorContext # If the python registry token or url are not set, we can't publish to the python registry - stop the pipeline. 
return [ StepResult( - PublishToPythonRegistry(python_registry_context), + step=PublishToPythonRegistry(python_registry_context), status=StepStatus.FAILURE, stderr="Pypi publishing is enabled, but python registry token or url are not set.", ) diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/common.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/common.py index 1a9985b629ef2d..426ae273d1a446 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/common.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/common.py @@ -62,11 +62,11 @@ def current_connector_version(self) -> semver.Version: @property def success_result(self) -> StepResult: - return StepResult(self, status=StepStatus.SUCCESS) + return StepResult(step=self, status=StepStatus.SUCCESS) @property def failure_result(self) -> StepResult: - return StepResult(self, status=StepStatus.FAILURE, stderr=self.failure_message) + return StepResult(step=self, status=StepStatus.FAILURE, stderr=self.failure_message) @abstractmethod def validate(self) -> StepResult: @@ -74,13 +74,13 @@ def validate(self) -> StepResult: async def _run(self) -> StepResult: if not self.should_run: - return StepResult(self, status=StepStatus.SKIPPED, stdout="No modified files required a version bump.") + return StepResult(step=self, status=StepStatus.SKIPPED, stdout="No modified files required a version bump.") if self.context.ci_context == CIContext.MASTER: - return StepResult(self, status=StepStatus.SKIPPED, stdout="Version check are not running in master context.") + return StepResult(step=self, status=StepStatus.SKIPPED, stdout="Version check are not running in master context.") try: return self.validate() except (requests.HTTPError, ValueError, TypeError) as e: - return StepResult(self, status=StepStatus.FAILURE, stderr=str(e)) + return StepResult(step=self, status=StepStatus.FAILURE, stderr=str(e)) class VersionIncrementCheck(VersionCheck): @@ -257,7 +257,7 @@ async def _run(self, connector_under_test_container: Container) -> StepResult: """ if not self.context.connector.acceptance_test_config: - return StepResult(self, StepStatus.SKIPPED) + return StepResult(step=self, status=StepStatus.SKIPPED) connector_dir = await self.context.get_connector_dir() cat_container = await self._build_connector_acceptance_test(connector_under_test_container, connector_dir) cat_command = await self.get_cat_command(connector_dir) @@ -272,7 +272,7 @@ async def _run(self, connector_under_test_container: Container) -> StepResult: break return step_result - async def get_cache_buster(self) -> str: + def get_cache_buster(self) -> str: """ This bursts the CAT cached results everyday and on new version or image size change. It's cool because in case of a partially failing nightly build the connectors that already ran CAT won't re-run CAT. 
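Note that `get_cache_buster` loses its `async` qualifier in the next hunk, since it no longer awaits anything. Its body is not shown in this diff; a hedged sketch of the kind of value the docstring describes (one that rotates daily and with the connector version) could be:

```python
# Illustrative only: not the actual implementation. The connector_version
# argument is hypothetical; the real method reads state off the step context
# and may mix in other inputs (e.g. image size), per the docstring above.
from datetime import datetime, timezone


def get_cache_buster(connector_version: str) -> str:
    # Changes once per UTC day and whenever the connector version changes,
    # so CAT results are re-cached at most once a day per version.
    return f"{datetime.now(timezone.utc):%Y%m%d}-{connector_version}"
```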
@@ -303,7 +303,7 @@ async def _build_connector_acceptance_test(self, connector_under_test_container: cat_container = ( cat_container.with_env_variable("RUN_IN_AIRBYTE_CI", "1") .with_exec(["mkdir", "/dagger_share"], skip_entrypoint=True) - .with_env_variable("CACHEBUSTER", await self.get_cache_buster()) + .with_env_variable("CACHEBUSTER", self.get_cache_buster()) .with_new_file("/tmp/container_id.txt", contents=str(connector_container_id)) .with_workdir("/test_input") .with_mounted_directory("/test_input", test_input) @@ -333,18 +333,18 @@ async def _run(self, *args: Any, **kwargs: Any) -> StepResult: migration_hint = f"Please run 'airbyte-ci connectors --name={self.context.connector.technical_name} migrate_to_base_image ' and commit the changes." if not is_using_base_image: return StepResult( - self, - StepStatus.FAILURE, + step=self, + status=StepStatus.FAILURE, stdout=f"Connector is certified but does not use our base image. {migration_hint}", ) has_dockerfile = "Dockerfile" in await (await self.context.get_connector_dir(include=["Dockerfile"])).entries() if has_dockerfile: return StepResult( - self, - StepStatus.FAILURE, + step=self, + status=StepStatus.FAILURE, stdout=f"Connector is certified but is still using a Dockerfile. {migration_hint}", ) - return StepResult(self, StepStatus.SUCCESS, stdout="Connector is certified and uses our base image.") + return StepResult(step=self, status=StepStatus.SUCCESS, stdout="Connector is certified and uses our base image.") class CheckPythonRegistryPublishConfiguration(Step): @@ -354,7 +354,7 @@ class CheckPythonRegistryPublishConfiguration(Step): async def _run(self, *args: Any, **kwargs: Any) -> StepResult: is_python_registry_published = self.context.connector.metadata.get("remoteRegistries", {}).get("pypi", {}).get("enabled", False) if is_python_registry_published: - return StepResult(self, StepStatus.SUCCESS, stdout="Connector is published to PyPI.") + return StepResult(step=self, status=StepStatus.SUCCESS, stdout="Connector is published to PyPI.") tags = self.context.connector.metadata.get("tags", []) is_python_registry_compatible = ("language:python" in tags or "language:low-code" in tags) and "language:java" not in tags @@ -368,8 +368,8 @@ async def _run(self, *args: Any, **kwargs: Any) -> StepResult: migration_hint = "Check the airbyte-ci readme under https://github.com/airbytehq/airbyte/tree/master/airbyte-ci/connectors/pipelines#python-registry-publishing for how to configure publishing." if not is_python_registry_published: return StepResult( - self, - StepStatus.FAILURE, + step=self, + status=StepStatus.FAILURE, stdout=f"Connector is a certified python source but publication to PyPI is not enabled. 
{migration_hint}", ) - return StepResult(self, StepStatus.SUCCESS, stdout="Connector is a certified python source and is published to PyPI.") + return StepResult(step=self, status=StepStatus.SUCCESS, stdout="Connector is a certified python source and is published to PyPI.") diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/java_connectors.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/java_connectors.py index 06b0aaea43141e..c35a48fdbf5f3b 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/java_connectors.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/java_connectors.py @@ -64,7 +64,7 @@ async def _run(self, connector_tar_file: File, normalization_tar_file: Optional[ tg.start_soon(self._load_normalization_image, normalization_tar_file) tg.start_soon(self._load_connector_image, connector_tar_file) except QueryError as e: - return StepResult(self, StepStatus.FAILURE, stderr=str(e)) + return StepResult(step=self, status=StepStatus.FAILURE, stderr=str(e)) # Run the gradle integration test task now that the required docker images have been loaded. return await super()._run() diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/upgrade_cdk/pipeline.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/upgrade_cdk/pipeline.py index 95839877f6c246..6824f9d25550a9 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/upgrade_cdk/pipeline.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/upgrade_cdk/pipeline.py @@ -44,8 +44,8 @@ async def _run(self) -> StepResult: updated_connector_dir = await self.upgrade_cdk_version_for_java_connector(og_connector_dir) else: return StepResult( - self, - StepStatus.FAILURE, + step=self, + status=StepStatus.FAILURE, stderr=f"No CDK for connector {self.context.connector.technical_name} of written in {self.context.connector.language}", ) @@ -55,15 +55,17 @@ async def _run(self) -> StepResult: exported_successfully = await diff.export(os.path.join(git.get_git_repo_path(), context.connector.code_directory)) if not exported_successfully: return StepResult( - self, - StepStatus.FAILURE, + step=self, + status=StepStatus.FAILURE, stdout="Could not export diff to local git repo.", ) - return StepResult(self, StepStatus.SUCCESS, stdout=f"Updated CDK version to {self.new_version}", output_artifact=diff) + return StepResult( + step=self, status=StepStatus.SUCCESS, stdout=f"Updated CDK version to {self.new_version}", output_artifact=diff + ) except ValueError as e: return StepResult( - self, - StepStatus.FAILURE, + step=self, + status=StepStatus.FAILURE, stderr=f"Could not set CDK version: {e}", exc_info=e, ) diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/format/format_command.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/format/format_command.py index 07b4c72381a94d..aeb17b08e50440 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/format/format_command.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/format/format_command.py @@ -203,7 +203,9 @@ async def get_format_command_result( if await dir_with_modified_files.entries(): modified_files = await list_files_in_directory(dagger_client, dir_with_modified_files) self.logger.debug(f"Modified files: {modified_files}") - return CommandResult(self, status=StepStatus.FAILURE, stdout=stdout, stderr=stderr, output_artifact=dir_with_modified_files) - 
return CommandResult(self, status=StepStatus.SUCCESS, stdout=stdout, stderr=stderr) + return CommandResult( + command=self, status=StepStatus.FAILURE, stdout=stdout, stderr=stderr, output_artifact=dir_with_modified_files + ) + return CommandResult(command=self, status=StepStatus.SUCCESS, stdout=stdout, stderr=stderr) except dagger.ExecError as e: - return CommandResult(self, status=StepStatus.FAILURE, stderr=e.stderr, stdout=e.stdout, exc_info=e) + return CommandResult(command=self, status=StepStatus.FAILURE, stderr=e.stderr, stdout=e.stdout, exc_info=e) diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/steps/no_op.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/steps/no_op.py index 2d1629e05672e7..86b9712713a30f 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/steps/no_op.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/steps/no_op.py @@ -19,4 +19,4 @@ def __init__(self, context: PipelineContext, step_status: StepStatus) -> None: self.step_status = step_status async def _run(self, *args: Any, **kwargs: Any) -> StepResult: - return StepResult(self, self.step_status) + return StepResult(step=self, status=self.step_status) diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/test/__init__.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/test/__init__.py index c941b30457953b..54ae253f8e6ab6 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/test/__init__.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/test/__init__.py @@ -1,3 +1,19 @@ # # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # + +from pathlib import Path + +INTERNAL_POETRY_PACKAGES = [ + "airbyte-lib", + "airbyte-ci/connectors/pipelines", + "airbyte-ci/connectors/base_images", + "airbyte-ci/connectors/common_utils", + "airbyte-ci/connectors/connector_ops", + "airbyte-ci/connectors/ci_credentials", + "airbyte-ci/connectors/metadata_service/lib", + "airbyte-ci/connectors/metadata_service/orchestrator", + "airbyte-integrations/bases/connector-acceptance-test" +] + +INTERNAL_POETRY_PACKAGES_PATH = [Path(package) for package in INTERNAL_POETRY_PACKAGES] diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/test/commands.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/test/commands.py index e8000ebca25317..a79e936408620e 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/test/commands.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/test/commands.py @@ -3,167 +3,127 @@ # from __future__ import annotations -import logging -import os -from pathlib import Path from typing import TYPE_CHECKING import asyncclick as click import asyncer +from pipelines.airbyte_ci.test import INTERNAL_POETRY_PACKAGES, INTERNAL_POETRY_PACKAGES_PATH, pipeline from pipelines.cli.click_decorators import click_ci_requirements_option, click_ignore_unused_kwargs, click_merge_args_into_context_obj -from pipelines.consts import DOCKER_HOST_NAME, DOCKER_HOST_PORT, DOCKER_VERSION -from pipelines.dagger.actions.system import docker -from pipelines.helpers.utils import sh_dash_c +from pipelines.helpers.git import get_modified_files +from pipelines.helpers.utils import transform_strs_to_paths from pipelines.models.contexts.click_pipeline_context import ClickPipelineContext, pass_pipeline_context +from pipelines.models.steps import StepStatus if TYPE_CHECKING: - from typing import List, Tuple + from pathlib import Path + from typing import List, Set, Tuple - import dagger -## HELPERS -async def 
run_poetry_command(container: dagger.Container, command: str) -> Tuple[str, str]: - """Run a poetry command in a container and return the stdout and stderr. +async def find_modified_internal_packages(pipeline_context: ClickPipelineContext) -> Set[Path]: + """Finds the modified internal packages according to the modified files on the branch/commit. Args: - container (dagger.Container): The container to run the command in. - command (str): The command to run. + pipeline_context (ClickPipelineContext): The context object. Returns: - Tuple[str, str]: The stdout and stderr of the command. + Set[Path]: The set of modified internal packages. """ - container = container.with_exec(["poetry", "run", *command.split(" ")]) - return await container.stdout(), await container.stderr() + modified_files = transform_strs_to_paths( + await get_modified_files( + pipeline_context.params["git_branch"], + pipeline_context.params["git_revision"], + pipeline_context.params["diffed_branch"], + pipeline_context.params["is_local"], + pipeline_context.params["ci_context"], + ) + ) + modified_packages = set() + for modified_file in modified_files: + for internal_package in INTERNAL_POETRY_PACKAGES_PATH: + if modified_file.is_relative_to(internal_package): + modified_packages.add(internal_package) + return modified_packages + + +async def get_packages_to_run(pipeline_context: ClickPipelineContext) -> Set[Path]: + """Gets the packages to run the poe tasks on. + + Args: + pipeline_context (ClickPipelineContext): The context object. + + Raises: + click.ClickException: If no packages are specified to run the poe tasks on. + + Returns: + Set[Path]: The set of packages to run the poe tasks on. + """ + if not pipeline_context.params["poetry_package_paths"] and not pipeline_context.params["modified"]: + raise click.ClickException("You must specify at least one package to test.") + + poetry_package_paths = set() + if pipeline_context.params["modified"]: + poetry_package_paths = await find_modified_internal_packages(pipeline_context) + + return poetry_package_paths.union(set(pipeline_context.params["poetry_package_paths"])) + +def crash_on_any_failure(poetry_package_poe_tasks_results: List[Tuple[Path, asyncer.SoonValue]]) -> None: + """Fail the command if any of the poe tasks failed. -def validate_env_vars_exist(_ctx: dict, _param: dict, value: List[str]) -> List[str]: - for var in value: - if var not in os.environ: - raise click.BadParameter(f"Environment variable {var} does not exist.") - return value + Args: + poetry_package_poe_tasks_results (List[Tuple[Path, asyncer.SoonValue]]): The results of the poe tasks. + + Raises: + click.ClickException: If any of the poe tasks failed. 
+ """ + failed_packages = set() + for poetry_package_paths, package_result in poetry_package_poe_tasks_results: + poe_command_results = package_result.value + if any([result.status is StepStatus.FAILURE for result in poe_command_results]): + failed_packages.add(poetry_package_paths) + if failed_packages: + raise click.ClickException( + f"The following packages failed to run poe tasks: {', '.join([str(package_path) for package_path in failed_packages])}" + ) + return None @click.command() -@click.argument("poetry_package_path") -@click_ci_requirements_option() -@click.option( - "-c", - "--poetry-run-command", - multiple=True, - help="The poetry run command to run.", - required=True, -) +@click.option("--modified", default=False, is_flag=True, help="Run on modified internal packages.") @click.option( - "--pass-env-var", - "-e", - "passed_env_vars", + "--poetry-package-path", + "-p", + "poetry_package_paths", + help="The path to the poetry package to test.", + type=click.Choice(INTERNAL_POETRY_PACKAGES), multiple=True, - help="The environment variables to pass to the container.", - required=False, - callback=validate_env_vars_exist, -) -@click.option( - "--side-car-docker-engine", help="Run a docker engine side car bound to poetry container.", default=False, type=bool, is_flag=True ) +@click_ci_requirements_option() @click_merge_args_into_context_obj @pass_pipeline_context @click_ignore_unused_kwargs +# TODO this command should be renamed ci and go under the poetry command group +# e.g. airbyte-ci poetry ci --poetry-package-path airbyte-ci/connectors/pipelines async def test(pipeline_context: ClickPipelineContext) -> None: """Runs the tests for the given airbyte-ci package Args: pipeline_context (ClickPipelineContext): The context object. """ - poetry_package_path = pipeline_context.params["poetry_package_path"] - if not Path(f"{poetry_package_path}/pyproject.toml").exists(): - raise click.UsageError(f"Could not find pyproject.toml in {poetry_package_path}") - - commands_to_run: List[str] = pipeline_context.params["poetry_run_command"] - - logger = logging.getLogger(f"{poetry_package_path}.tests") - logger.info(f"Running tests for {poetry_package_path}") - - # The following directories are always mounted because a lot of tests rely on them - directories_to_always_mount = [ - ".git", # This is needed as some package tests rely on being in a git repo - ".github", - "docs", - "airbyte-integrations", - "airbyte-ci", - "airbyte-cdk", - "pyproject.toml", - "LICENSE_SHORT", - "poetry.lock", - "spotless-maven-pom.xml", - "tools/gradle/codestyle/java-google-style.xml", - ] - directories_to_mount = list(set([poetry_package_path, *directories_to_always_mount])) - - pipeline_name = f"Unit tests for {poetry_package_path}" - - dagger_client = await pipeline_context.get_dagger_client(pipeline_name=pipeline_name) - - dockerd_service = None - if pipeline_context.params["side_car_docker_engine"]: - dockerd_service = docker.with_global_dockerd_service(dagger_client) - - await dockerd_service.start() - - test_container = await ( - dagger_client.container() - .from_("python:3.10.12") - .with_env_variable("PIPX_BIN_DIR", "/usr/local/bin") - .with_exec( - sh_dash_c( - [ - "apt-get update", - "apt-get install -y bash git curl", - "pip install pipx", - "pipx ensurepath", - "pipx install poetry", - ] + poetry_package_paths = await get_packages_to_run(pipeline_context) + click.echo(f"Running poe tasks of the following packages: {', '.join([str(package_path) for package_path in poetry_package_paths])}") + dagger_client 
= await pipeline_context.get_dagger_client(pipeline_name="Internal poetry packages CI") + + poetry_package_poe_tasks_results: List[Tuple[Path, asyncer.SoonValue]] = [] + async with asyncer.create_task_group() as poetry_packages_task_group: + for poetry_package_path in poetry_package_paths: + poetry_package_poe_tasks_results.append( + ( + poetry_package_path, + poetry_packages_task_group.soonify(pipeline.run_poe_tasks_for_package)( + dagger_client, poetry_package_path, pipeline_context.params + ), + ) ) - ) - .with_env_variable("VERSION", DOCKER_VERSION) - .with_exec(sh_dash_c(["curl -fsSL https://get.docker.com | sh"])) - .with_mounted_directory( - "/airbyte", - dagger_client.host().directory( - ".", - exclude=["**/__pycache__", "**/.pytest_cache", "**/.venv", "**.log", "**/.gradle"], - include=directories_to_mount, - ), - ) - .with_workdir(f"/airbyte/{poetry_package_path}") - .with_exec(["poetry", "install", "--with=dev"]) - .with_env_variable("CI", str(pipeline_context.params["is_ci"])) - .with_workdir(f"/airbyte/{poetry_package_path}") - ) - if dockerd_service: - test_container = ( - test_container.with_env_variable("DOCKER_HOST", f"tcp://{DOCKER_HOST_NAME}:{DOCKER_HOST_PORT}") - .with_env_variable("DOCKER_HOST_NAME", DOCKER_HOST_NAME) - .with_service_binding(DOCKER_HOST_NAME, dockerd_service) - ) - else: - test_container = test_container.with_unix_socket("/var/run/docker.sock", dagger_client.host().unix_socket("/var/run/docker.sock")) - - # register passed env vars as secrets and add them to the container - for var in pipeline_context.params["passed_env_vars"]: - secret = dagger_client.set_secret(var, os.environ[var]) - test_container = test_container.with_secret_variable(var, secret) - - soon_command_executions_results = [] - async with asyncer.create_task_group() as poetry_commands_task_group: - for command in commands_to_run: - logger.info(f"Running command: {command}") - soon_command_execution_result = poetry_commands_task_group.soonify(run_poetry_command)(test_container, command) - soon_command_executions_results.append(soon_command_execution_result) - - if dockerd_service: - await dockerd_service.stop() - for result in soon_command_executions_results: - stdout, stderr = result.value - logger.info(stdout) - logger.error(stderr) + crash_on_any_failure(poetry_package_poe_tasks_results) diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/test/pipeline.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/test/pipeline.py new file mode 100644 index 00000000000000..f8ada153f44c6f --- /dev/null +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/test/pipeline.py @@ -0,0 +1,289 @@ +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
+ +from __future__ import annotations + +import logging +import os +from typing import TYPE_CHECKING + +import asyncer +import dagger +import toml +from pipelines.airbyte_ci.test.models import deserialize_airbyte_ci_config +from pipelines.consts import DOCKER_HOST_NAME, DOCKER_HOST_PORT, DOCKER_VERSION, POETRY_CACHE_VOLUME_NAME, PYPROJECT_TOML_FILE_PATH +from pipelines.dagger.actions.system import docker +from pipelines.helpers.github import update_commit_status_check +from pipelines.helpers.utils import sh_dash_c +from pipelines.models.steps import PoeTaskResult, StepStatus + +if TYPE_CHECKING: + from logging import Logger + from pathlib import Path + from typing import Dict, List + + from pipelines.airbyte_ci.test.models import AirbyteCiPackageConfiguration + +# The following directories are always mounted because a lot of tests rely on them +DIRECTORIES_TO_ALWAYS_MOUNT = [ + ".git", # This is needed as some package tests rely on being in a git repo + ".github", + "docs", + "airbyte-integrations", + "airbyte-ci", + "airbyte-cdk", + PYPROJECT_TOML_FILE_PATH, + "LICENSE_SHORT", + "poetry.lock", + "spotless-maven-pom.xml", + "tools/gradle/codestyle/java-google-style.xml", +] + +DEFAULT_EXCLUDE = ["**/__pycache__", "**/.pytest_cache", "**/.venv", "**.log", "**/.gradle"] + + +async def get_filtered_airbyte_repo_dir(dagger_client: dagger.Client, poetry_package_path: Path) -> dagger.Directory: + """Get a filtered airbyte repo directory with the directories to always mount and the poetry package path. + + Args: + dagger_client (dagger.Client): Dagger client. + poetry_package_path (Path): Path to the poetry package in the airbyte repo. + + Returns: + dagger.Directory: The filtered airbyte repo directory. + """ + directories_to_mount = list(set([str(poetry_package_path), *DIRECTORIES_TO_ALWAYS_MOUNT])) + return dagger_client.host().directory( + ".", + exclude=DEFAULT_EXCLUDE, + include=directories_to_mount, + ) + + +async def get_poetry_package_dir(airbyte_repo_dir: dagger.Directory, poetry_package_path: Path) -> dagger.Directory: + """Get the poetry package directory from the airbyte repo directory. + + Args: + airbyte_repo_dir (dagger.Directory): The airbyte repo directory. + poetry_package_path (Path): Path to the poetry package in the airbyte repo. + + Raises: + FileNotFoundError: If the pyproject.toml file is not found in the poetry package directory. + FileNotFoundError: If the poetry package directory is not found in the airbyte repo directory. + + Returns: + dagger.Directory: The poetry package directory. + """ + try: + package_directory = await airbyte_repo_dir.directory(str(poetry_package_path)) + if PYPROJECT_TOML_FILE_PATH not in await package_directory.entries(): + raise FileNotFoundError(f"Could not find pyproject.toml in {poetry_package_path}, are you sure this is a poetry package?") + except dagger.DaggerError: + raise FileNotFoundError(f"Could not find {poetry_package_path} in the repository, are you sure this path is correct?") + return package_directory + + +async def get_airbyte_ci_package_config(poetry_package_dir: dagger.Directory) -> AirbyteCiPackageConfiguration: + """Get the airbyte ci package configuration from the pyproject.toml file in the poetry package directory. + + Args: + poetry_package_dir (dagger.Directory): The poetry package directory. + + Returns: + AirbyteCiPackageConfiguration: The airbyte ci package configuration. 
+    """
+    raw_pyproject_toml = await poetry_package_dir.file(PYPROJECT_TOML_FILE_PATH).contents()
+    pyproject_toml = toml.loads(raw_pyproject_toml)
+    return deserialize_airbyte_ci_config(pyproject_toml)
+
+
+def get_poetry_base_container(dagger_client: dagger.Client) -> dagger.Container:
+    """Get a base container with the system dependencies required to run the poe tasks of a poetry package:
+    - git: required for packages using GitPython
+    - poetry
+    - poethepoet
+    - docker: required for packages using docker in their tests
+
+    Args:
+        dagger_client (dagger.Client): The dagger client.
+
+    Returns:
+        dagger.Container: The base container.
+    """
+    poetry_cache_volume: dagger.CacheVolume = dagger_client.cache_volume(POETRY_CACHE_VOLUME_NAME)
+    poetry_cache_path = "/root/.cache/poetry"
+    return (
+        dagger_client.container()
+        .from_("python:3.10.12")
+        .with_env_variable("PIPX_BIN_DIR", "/usr/local/bin")
+        .with_env_variable("POETRY_CACHE_DIR", poetry_cache_path)
+        .with_mounted_cache(poetry_cache_path, poetry_cache_volume)
+        .with_exec(
+            sh_dash_c(
+                [
+                    "apt-get update",
+                    "apt-get install -y bash git curl",
+                    "pip install pipx",
+                    "pipx ensurepath",
+                    "pipx install poetry",
+                    "pipx install poethepoet",
+                ]
+            )
+        )
+        .with_env_variable("VERSION", DOCKER_VERSION)
+        .with_exec(sh_dash_c(["curl -fsSL https://get.docker.com | sh"]))
+    )
+
+
+def prepare_container_for_poe_tasks(
+    dagger_client: dagger.Client,
+    airbyte_repo_dir: dagger.Directory,
+    airbyte_ci_package_config: AirbyteCiPackageConfiguration,
+    poetry_package_path: Path,
+    is_ci: bool,
+) -> dagger.Container:
+    """Prepare a container to run poe tasks for a poetry package.
+
+    Args:
+        dagger_client (dagger.Client): The dagger client.
+        airbyte_repo_dir (dagger.Directory): The airbyte repo directory.
+        airbyte_ci_package_config (AirbyteCiPackageConfiguration): The airbyte ci package configuration.
+        poetry_package_path (Path): The path to the poetry package in the airbyte repo.
+        is_ci (bool): Whether the container is running in a CI environment.
+
+    Returns:
+        dagger.Container: The container to run poe tasks for the poetry package.
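+
+    Example:
+        A minimal usage sketch, mirroring the call in run_poe_tasks_for_package
+        below (the package path is illustrative):
+
+        container = prepare_container_for_poe_tasks(
+            dagger_client, airbyte_repo_dir, package_config, Path("airbyte-ci/connectors/pipelines"), is_ci=True
+        )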
+    """
+
+    # BE CAREFUL ABOUT THE ORDER OF THESE INSTRUCTIONS
+    # KEEP IN MIND THAT DAGGER OPERATIONS ARE CACHED, AS IN A DOCKERFILE:
+    # ANY CHANGE IN THE INPUTS OF AN OPERATION WILL INVALIDATE THE DOWNSTREAM OPERATIONS CACHE
+
+    # Start from the base container
+    container = get_poetry_base_container(dagger_client)
+
+    # Set the CI environment variable
+    if is_ci:
+        container = container.with_env_variable("CI", "true")
+
+    # Bind to dockerd service if needed
+    if airbyte_ci_package_config.side_car_docker_engine:
+        dockerd_service = docker.with_global_dockerd_service(dagger_client)
+        container = (
+            container.with_env_variable("DOCKER_HOST", f"tcp://{DOCKER_HOST_NAME}:{DOCKER_HOST_PORT}")
+            .with_env_variable("DOCKER_HOST_NAME", DOCKER_HOST_NAME)
+            .with_service_binding(DOCKER_HOST_NAME, dockerd_service)
+        )
+
+    # Mount the docker socket if needed
+    if airbyte_ci_package_config.mount_docker_socket:
+        container = container.with_unix_socket("/var/run/docker.sock", dagger_client.host().unix_socket("/var/run/docker.sock"))
+
+    # Set the required environment variables according to the package configuration
+    for required_env_var in airbyte_ci_package_config.required_environment_variables:
+        # We consider any environment variable as a secret for safety reasons
+        secret_env_var = dagger_client.set_secret(required_env_var, os.environ[required_env_var])
+        container = container.with_secret_variable(required_env_var, secret_env_var)
+
+    # Mount the airbyte repo directory
+    container = container.with_mounted_directory("/airbyte", airbyte_repo_dir)
+
+    # Set working directory to the poetry package directory
+    container = container.with_workdir(f"/airbyte/{poetry_package_path}")
+
+    # Install the poetry package
+    container = container.with_exec(["poetry", "install"] + [f"--with={group}" for group in airbyte_ci_package_config.extra_poetry_groups])
+    return container
+
+
+async def run_poe_task(container: dagger.Container, poe_task: str) -> PoeTaskResult:
+    """Run the poe task in the container and return a PoeTaskResult.
+
+    Args:
+        container (dagger.Container): The container to run the poe task in.
+        poe_task (str): The poe task to run.
+
+    Returns:
+        PoeTaskResult: The result of the poe task execution.
+    """
+    try:
+        executed_container = await container.pipeline(f"Run poe {poe_task}").with_exec(["poe", poe_task])
+        return PoeTaskResult(
+            task_name=poe_task,
+            status=StepStatus.SUCCESS,
+            stdout=await executed_container.stdout(),
+            stderr=await executed_container.stderr(),
+        )
+    except dagger.ExecError as e:
+        return PoeTaskResult(task_name=poe_task, status=StepStatus.FAILURE, exc_info=e)
+
+
+async def run_and_log_poe_task_results(
+    pipeline_context_params: Dict, package_name: str, container: dagger.Container, poe_task: str, logger: Logger
+) -> PoeTaskResult:
+    """Run the poe task in the container and log the result.
+
+    Args:
+        pipeline_context_params (Dict): The pipeline context parameters.
+        package_name (str): The name of the package to run the poe task for.
+        container (dagger.Container): The container to run the poe task in.
+        poe_task (str): The poe task to run.
+        logger (Logger): The logger to log the result.
+
+    Returns:
+        PoeTaskResult: The result of the poe task execution.
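+
+    Example:
+        A sketch of a single invocation; run_poe_tasks_for_package below fans
+        several of these out concurrently with asyncer (the package and task
+        names are illustrative):
+
+        result = await run_and_log_poe_task_results(pipeline_context_params, "pipelines", container, "test", logger)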
+    """
+
+    commit_status_check_params = {
+        "sha": pipeline_context_params["git_revision"],
+        "description": f"{poe_task} execution for {package_name}",
+        "context": f"{package_name} - {poe_task}",
+        "target_url": f"{pipeline_context_params['gha_workflow_run_url']}",
+        "should_send": pipeline_context_params["is_ci"],
+        "logger": logger,
+    }
+
+    logger.info(f"Running poe task: {poe_task}")
+    # Send pending status check
+    update_commit_status_check(**{**commit_status_check_params, "state": "pending"})
+    result = await run_poe_task(container, poe_task)
+    result.log(logger)
+    # Send the final status check
+    update_commit_status_check(**{**commit_status_check_params, "state": result.status.get_github_state()})
+
+    return result
+
+
+async def run_poe_tasks_for_package(
+    dagger_client: dagger.Client, poetry_package_path: Path, pipeline_context_params: Dict
+) -> List[PoeTaskResult]:
+    """Concurrently run the poe tasks declared in pyproject.toml for a poetry package.
+
+    Args:
+        dagger_client (dagger.Client): The dagger client.
+        poetry_package_path (Path): The path to the poetry package in the airbyte repo.
+        pipeline_context_params (Dict): The pipeline context parameters.
+
+    Returns:
+        List[PoeTaskResult]: The results of the poe tasks.
+    """
+    dagger_client = dagger_client.pipeline(f"Run poe tasks for {poetry_package_path}")
+    airbyte_repo_dir = await get_filtered_airbyte_repo_dir(dagger_client, poetry_package_path)
+    package_dir = await get_poetry_package_dir(airbyte_repo_dir, poetry_package_path)
+    package_config = await get_airbyte_ci_package_config(package_dir)
+    container = prepare_container_for_poe_tasks(
+        dagger_client, airbyte_repo_dir, package_config, poetry_package_path, pipeline_context_params["is_ci"]
+    )
+    logger = logging.getLogger(str(poetry_package_path))
+
+    if not package_config.poe_tasks:
+        logger.warning("No poe tasks to run.")
+        return []
+
+    poe_task_results = []
+    async with asyncer.create_task_group() as poe_tasks_task_group:
+        for task in package_config.poe_tasks:
+            poe_task_results.append(
+                poe_tasks_task_group.soonify(run_and_log_poe_task_results)(
+                    pipeline_context_params, str(poetry_package_path), container, task, logger
+                )
+            )
+    return [result.value for result in poe_task_results]
diff --git a/airbyte-ci/connectors/pipelines/pipelines/cli/airbyte_ci.py b/airbyte-ci/connectors/pipelines/pipelines/cli/airbyte_ci.py
index fafd98973da9d1..0cc95dcb056ce6 100644
--- a/airbyte-ci/connectors/pipelines/pipelines/cli/airbyte_ci.py
+++ b/airbyte-ci/connectors/pipelines/pipelines/cli/airbyte_ci.py
@@ -34,7 +34,7 @@ from pipelines.cli.confirm_prompt import pre_confirm_all_flag
 from pipelines.cli.lazy_group import LazyGroup
 from pipelines.cli.telemetry import click_track_command
-from pipelines.consts import DAGGER_WRAP_ENV_VAR_NAME, CIContext
+from pipelines.consts import DAGGER_WRAP_ENV_VAR_NAME, LOCAL_BUILD_PLATFORM, CIContext
 from pipelines.dagger.actions.connector.hooks import get_dagger_sdk_version
 from pipelines.helpers import github
 from pipelines.helpers.git import get_current_git_branch, get_current_git_revision
@@ -53,6 +53,7 @@ def log_context_info(ctx: click.Context) -> None:
     main_logger.info(f"GitHub Workflow Run URL: {ctx.obj['gha_workflow_run_url']}")
     main_logger.info(f"Pull Request Number: {ctx.obj['pull_request_number']}")
     main_logger.info(f"Pipeline Start Timestamp: {ctx.obj['pipeline_start_timestamp']}")
+    main_logger.info(f"Local build platform: {LOCAL_BUILD_PLATFORM}")
 
 
 def _get_gha_workflow_run_url(ctx: click.Context) -> Optional[str]:
diff --git a/airbyte-ci/connectors/pipelines/pipelines/helpers/execution/run_steps.py b/airbyte-ci/connectors/pipelines/pipelines/helpers/execution/run_steps.py
index 5db187af307c3c..9517e1f8d40bee 100644
--- a/airbyte-ci/connectors/pipelines/pipelines/helpers/execution/run_steps.py
+++ b/airbyte-ci/connectors/pipelines/pipelines/helpers/execution/run_steps.py
@@ -248,7 +248,7 @@ async def run_steps(
     >>> from pipelines.models.steps import Step, StepResult, StepStatus
     >>> class TestStep(Step):
     ...     async def _run(self) -> StepResult:
-    ...         return StepResult(self, StepStatus.SUCCESS)
+    ...         return StepResult(step=self, status=StepStatus.SUCCESS)
     >>> steps = [
     ...     StepToRun(id="step1", step=TestStep()),
     ...     [
diff --git a/airbyte-ci/connectors/pipelines/pipelines/helpers/git.py b/airbyte-ci/connectors/pipelines/pipelines/helpers/git.py
index 0a68e74d22713c..682b77cd45030f 100644
--- a/airbyte-ci/connectors/pipelines/pipelines/helpers/git.py
+++ b/airbyte-ci/connectors/pipelines/pipelines/helpers/git.py
@@ -7,6 +7,7 @@
 import git
 from dagger import Connection, SessionError
 
+from pipelines.consts import CIContext
 from pipelines.dagger.containers.git import checked_out_git_container
 from pipelines.helpers.utils import DAGGER_CONFIG, DIFF_FILTER
 
@@ -95,3 +96,18 @@ def get_git_repo() -> git.Repo:
 def get_git_repo_path() -> str:
     """Retrieve the git repo path."""
     return str(get_git_repo().working_tree_dir)
+
+
+async def get_modified_files(git_branch: str, git_revision: str, diffed_branch: str, is_local: bool, ci_context: CIContext) -> Set[str]:
+    """Get the modified files in the current git branch.
+    If the current branch is master, it will return the list of modified files in the head commit.
+    The head commit on master should be the merge commit of the latest merged pull request, as we squash commits on merge.
+    Pipelines like "publish on merge" are triggered on each new commit on master.
+
+    If the CI context is a pull request, it will return the list of modified files in the pull request, without using git diff.
+    If the current branch is not master, it will return the list of modified files in the current branch.
+    This latter case is the one we encounter when running the pipeline locally, on a local branch, or manually on GHA with a workflow dispatch event.
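+
+    Example:
+        An illustrative call; a manual run on a non-master branch falls through
+        to the branch diff (branch and revision values are hypothetical):
+
+        modified_files = await get_modified_files("my-branch", "abc123", "master", is_local=True, ci_context=CIContext.MANUAL)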
+    """
+    if ci_context is CIContext.MASTER or (ci_context is CIContext.MANUAL and git_branch == "master"):
+        return await get_modified_files_in_commit(git_branch, git_revision, is_local)
+    return await get_modified_files_in_branch(git_branch, git_revision, diffed_branch, is_local)
diff --git a/airbyte-ci/connectors/pipelines/pipelines/models/steps.py b/airbyte-ci/connectors/pipelines/pipelines/models/steps.py
index bc3acafebc060a..8fbc509f10ccfd 100644
--- a/airbyte-ci/connectors/pipelines/pipelines/models/steps.py
+++ b/airbyte-ci/connectors/pipelines/pipelines/models/steps.py
@@ -65,17 +65,25 @@ def is_file(self) -> bool:
         return self.get_path().is_file()
 
 
-@dataclass(frozen=True)
-class StepResult:
-    """A dataclass to capture the result of a step."""
-
-    step: Step
+@dataclass(kw_only=True, frozen=True)
+class Result:
     status: StepStatus
     created_at: datetime = field(default_factory=datetime.utcnow)
     stderr: Optional[str] = None
     stdout: Optional[str] = None
-    output_artifact: Any = None
     exc_info: Optional[Exception] = None
+    output_artifact: Any = None
+
+    @property
+    def success(self) -> bool:
+        return self.status is StepStatus.SUCCESS
+
+
+@dataclass(kw_only=True, frozen=True)
+class StepResult(Result):
+    """A dataclass to capture the result of a step."""
+
+    step: Step
 
     def __repr__(self) -> str:  # noqa D105
         return f"{self.step.title}: {self.status.value}"
@@ -85,9 +93,9 @@ def __str__(self) -> str:  # noqa D105
 
     def __post_init__(self) -> None:
         if self.stderr:
-            super().__setattr__("stderr", self.redact_secrets_from_string(self.stderr))
+            object.__setattr__(self, "stderr", self.redact_secrets_from_string(self.stderr))
         if self.stdout:
-            super().__setattr__("stdout", self.redact_secrets_from_string(self.stdout))
+            object.__setattr__(self, "stdout", self.redact_secrets_from_string(self.stdout))
 
     def redact_secrets_from_string(self, value: str) -> str:
         for secret in self.step.context.secrets_to_mask:
@@ -95,17 +103,11 @@ def redact_secrets_from_string(self, value: str) -> str:
         return value
 
 
-@dataclass(frozen=True)
-class CommandResult:
+@dataclass(kw_only=True, frozen=True)
+class CommandResult(Result):
     """A dataclass to capture the result of a command."""
 
     command: click.Command | FormatCommand
-    status: StepStatus
-    created_at: datetime = field(default_factory=datetime.utcnow)
-    stderr: Optional[str] = None
-    stdout: Optional[str] = None
-    exc_info: Optional[Exception] = None
-    output_artifact: Any = None
 
     def __repr__(self) -> str:  # noqa D105
         return f"{self.command.name}: {self.status.value}"
@@ -113,9 +115,35 @@ def __repr__(self) -> str:  # noqa D105
     def __str__(self) -> str:  # noqa D105
         return f"{self.command.name}: {self.status.value}\n\nSTDOUT:\n{self.stdout}\n\nSTDERR:\n{self.stderr}"
 
-    @property
-    def success(self) -> bool:
-        return self.status is StepStatus.SUCCESS
+
+@dataclass(kw_only=True, frozen=True)
+class PoeTaskResult(Result):
+
+    task_name: str
+
+    def __repr__(self) -> str:  # noqa D105
+        return f"{self.task_name}: {self.status.value}"
+
+    def __str__(self) -> str:  # noqa D105
+        return f"{self.task_name}: {self.status.value}\n\nSTDOUT:\n{self.stdout}\n\nSTDERR:\n{self.stderr}"
+
+    def log(self, logger: logging.Logger, verbose: bool = False) -> None:
+        """Log the poe task result.
+
+        Args:
+            logger (logging.Logger): The logger to use.
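+            verbose (bool): If True, also log the task's stdout and stderr line by line. Defaults to False.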
+ """ + if self.status is StepStatus.FAILURE: + logger.exception(self.exc_info) + else: + logger.info(f"{self.status.get_emoji()} - Poe {self.task_name} - {self.status.value}") + if verbose: + if self.stdout: + for line in self.stdout.splitlines(): + logger.info(line) + if self.stderr: + for line in self.stderr.splitlines(): + logger.error(line) class StepStatus(Enum): @@ -143,6 +171,14 @@ def get_emoji(self) -> str: if self is StepStatus.SKIPPED: return "🟡" + def get_github_state(self) -> str: + """Match state used in the GitHub commit checks to the step status.""" + if self in [StepStatus.SUCCESS, StepStatus.SKIPPED]: + return "success" + if self is StepStatus.FAILURE: + return "failure" + raise NotImplementedError(f"Unknown state for {self}") + def __str__(self) -> str: # noqa D105 return self.value @@ -270,7 +306,7 @@ async def run(self, *args: Any, **kwargs: Any) -> StepResult: step_result = soon_result.value except DaggerError as e: self.logger.error("Step failed with an unexpected dagger error", exc_info=e) - step_result = StepResult(self, StepStatus.FAILURE, stderr=str(e), exc_info=e) + step_result = StepResult(step=self, status=StepStatus.FAILURE, stderr=str(e), exc_info=e) self.stopped_at = datetime.utcnow() self.log_step_result(step_result) @@ -326,7 +362,7 @@ def skip(self, reason: Optional[str] = None) -> StepResult: Returns: StepResult: A skipped step result. """ - return StepResult(self, StepStatus.SKIPPED, stdout=reason) + return StepResult(step=self, status=StepStatus.SKIPPED, stdout=reason) def get_step_status_from_exit_code( self, @@ -363,8 +399,8 @@ async def get_step_result(self, container: Container) -> StepResult: """ exit_code, stdout, stderr = await get_exec_result(container) return StepResult( - self, - self.get_step_status_from_exit_code(exit_code), + step=self, + status=self.get_step_status_from_exit_code(exit_code), stderr=stderr, stdout=stdout, output_artifact=container, @@ -372,7 +408,7 @@ async def get_step_result(self, container: Container) -> StepResult: def _get_timed_out_step_result(self) -> StepResult: return StepResult( - self, - StepStatus.FAILURE, + step=self, + status=StepStatus.FAILURE, stdout=f"Timed out after the max duration of {format_duration(self.max_duration)}. 
Please checkout the Dagger logs to see what happened.", ) diff --git a/airbyte-ci/connectors/pipelines/pyproject.toml b/airbyte-ci/connectors/pipelines/pyproject.toml index 8b4cb54f98fc2c..277803511f19c9 100644 --- a/airbyte-ci/connectors/pipelines/pyproject.toml +++ b/airbyte-ci/connectors/pipelines/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "pipelines" -version = "3.10.4" +version = "3.10.5" description = "Packaged maintained by the connector operations team to perform CI for connectors' pipelines" authors = ["Airbyte "] diff --git a/airbyte-ci/connectors/pipelines/tests/test_bases.py b/airbyte-ci/connectors/pipelines/tests/test_bases.py index 5b4547df1e4551..a109e1e33a0b07 100644 --- a/airbyte-ci/connectors/pipelines/tests/test_bases.py +++ b/airbyte-ci/connectors/pipelines/tests/test_bases.py @@ -22,7 +22,7 @@ class DummyStep(steps.Step): async def _run(self, run_duration: timedelta) -> steps.StepResult: await anyio.sleep(run_duration.total_seconds()) - return steps.StepResult(self, steps.StepStatus.SUCCESS) + return steps.StepResult(step=self, status=steps.StepStatus.SUCCESS) @pytest.fixture def test_context(self, mocker): @@ -67,7 +67,8 @@ async def test_run_with_retries(self, mocker, test_context, step_status, exc_inf step.max_duration = timedelta(seconds=60) step.retry_delay = timedelta(seconds=0) step._run = mocker.AsyncMock( - side_effect=[steps.StepResult(step, step_status, exc_info=exc_info)] * (max(max_dagger_error_retries, max_retries) + 1) + side_effect=[steps.StepResult(step=step, status=step_status, exc_info=exc_info)] + * (max(max_dagger_error_retries, max_retries) + 1) ) step_result = await step.run() @@ -87,21 +88,23 @@ def test_context(self, mocker): def test_report_failed_if_it_has_no_step_result(self, test_context): report = reports.Report(test_context, []) assert not report.success - report = reports.Report(test_context, [steps.StepResult(None, steps.StepStatus.FAILURE)]) + report = reports.Report(test_context, [steps.StepResult(step=None, status=steps.StepStatus.FAILURE)]) assert not report.success report = reports.Report( - test_context, [steps.StepResult(None, steps.StepStatus.FAILURE), steps.StepResult(None, steps.StepStatus.SUCCESS)] + test_context, + [steps.StepResult(step=None, status=steps.StepStatus.FAILURE), steps.StepResult(step=None, status=steps.StepStatus.SUCCESS)], ) assert not report.success - report = reports.Report(test_context, [steps.StepResult(None, steps.StepStatus.SUCCESS)]) + report = reports.Report(test_context, [steps.StepResult(step=None, status=steps.StepStatus.SUCCESS)]) assert report.success report = reports.Report( - test_context, [steps.StepResult(None, steps.StepStatus.SUCCESS), steps.StepResult(None, steps.StepStatus.SKIPPED)] + test_context, + [steps.StepResult(step=None, status=steps.StepStatus.SUCCESS), steps.StepResult(step=None, status=steps.StepStatus.SKIPPED)], ) assert report.success - report = reports.Report(test_context, [steps.StepResult(None, steps.StepStatus.SKIPPED)]) + report = reports.Report(test_context, [steps.StepResult(step=None, status=steps.StepStatus.SKIPPED)]) assert report.success diff --git a/airbyte-ci/connectors/pipelines/tests/test_build_image/test_steps/test_common.py b/airbyte-ci/connectors/pipelines/tests/test_build_image/test_steps/test_common.py index 8b2fbbcf1e3d8d..b2289abb503d63 100644 --- a/airbyte-ci/connectors/pipelines/tests/test_build_image/test_steps/test_common.py +++ 
b/airbyte-ci/connectors/pipelines/tests/test_build_image/test_steps/test_common.py @@ -80,7 +80,7 @@ async def test_run(self, dagger_client, test_context, platforms): docker_client.images.get(full_image_name) # CI can't run docker arm64 containers - if platform is LOCAL_BUILD_PLATFORM or (os.getenv("CI") != "True"): + if platform is LOCAL_BUILD_PLATFORM or (os.environ.get("CI", "false").lower() != "true"): docker_client.containers.run(full_image_name, "spec") docker_client.images.remove(full_image_name, force=True) diff --git a/airbyte-ci/connectors/pipelines/tests/test_gradle.py b/airbyte-ci/connectors/pipelines/tests/test_gradle.py index 34312ec1d0d34d..96a397e7285548 100644 --- a/airbyte-ci/connectors/pipelines/tests/test_gradle.py +++ b/airbyte-ci/connectors/pipelines/tests/test_gradle.py @@ -21,7 +21,7 @@ class DummyStep(gradle.GradleTask): title = "Dummy Step" async def _run(self) -> steps.StepResult: - return steps.StepResult(self, steps.StepStatus.SUCCESS) + return steps.StepResult(step=self, status=steps.StepStatus.SUCCESS) @pytest.fixture def test_context(self, mocker, dagger_client): diff --git a/airbyte-ci/connectors/pipelines/tests/test_helpers/test_execution/test_run_steps.py b/airbyte-ci/connectors/pipelines/tests/test_helpers/test_execution/test_run_steps.py index b1975799ae354e..37bfa991eee097 100644 --- a/airbyte-ci/connectors/pipelines/tests/test_helpers/test_execution/test_run_steps.py +++ b/airbyte-ci/connectors/pipelines/tests/test_helpers/test_execution/test_run_steps.py @@ -15,7 +15,7 @@ class TestStep(Step): title = "Test Step" async def _run(self, result_status=StepStatus.SUCCESS) -> StepResult: - return StepResult(self, result_status) + return StepResult(step=self, status=result_status) @pytest.mark.anyio @@ -215,7 +215,7 @@ class SleepStep(Step): async def _run(self, name, sleep) -> StepResult: await anyio.sleep(sleep) ran_at[name] = time.time() - return StepResult(self, StepStatus.SUCCESS) + return StepResult(step=self, status=StepStatus.SUCCESS) steps = [ StepToRun(id="step1", step=SleepStep(test_context), args={"name": "step1", "sleep": 2}), @@ -242,7 +242,7 @@ class SleepStep(Step): async def _run(self, name, sleep) -> StepResult: ran_at[name] = time.time() await anyio.sleep(sleep) - return StepResult(self, StepStatus.SUCCESS) + return StepResult(step=self, status=StepStatus.SUCCESS) steps = [ StepToRun(id="step1", step=SleepStep(test_context), args={"name": "step1", "sleep": 1}), @@ -269,7 +269,7 @@ class SleepStep(Step): async def _run(self, name, sleep) -> StepResult: await anyio.sleep(sleep) ran_at[name] = time.time() - return StepResult(self, StepStatus.SUCCESS) + return StepResult(step=self, status=StepStatus.SUCCESS) steps = [ [StepToRun(id="step1", step=SleepStep(test_context), args={"name": "step1", "sleep": 1})], @@ -310,7 +310,7 @@ class Simple(Step): async def _run(self, arg1, arg2) -> StepResult: output_artifact = f"{arg1}:{arg2}" - return StepResult(self, StepStatus.SUCCESS, output_artifact=output_artifact) + return StepResult(step=self, status=StepStatus.SUCCESS, output_artifact=output_artifact) async def async_args(results): return {"arg1": results["step2"].output_artifact, "arg2": "4"} diff --git a/airbyte-ci/connectors/pipelines/tests/test_tests/test_common.py b/airbyte-ci/connectors/pipelines/tests/test_tests/test_common.py index 42ee3d85bf49d5..79b7ba853e7b33 100644 --- a/airbyte-ci/connectors/pipelines/tests/test_tests/test_common.py +++ b/airbyte-ci/connectors/pipelines/tests/test_tests/test_common.py @@ -208,37 +208,34 @@ async def 
test_cat_container_caching( with freeze_time(initial_datetime) as frozen_datetime: acceptance_test_step = self.get_patched_acceptance_test_step(dagger_client, mocker, test_context_ci, test_input_dir) - cat_container = await acceptance_test_step._build_connector_acceptance_test( + first_cat_container = await acceptance_test_step._build_connector_acceptance_test( dummy_connector_under_test_container, test_input_dir ) - cat_container = cat_container.with_exec(["date"]) - fist_date_result = await cat_container.stdout() + fist_date_result = await first_cat_container.with_exec(["date"]).stdout() frozen_datetime.tick(delta=datetime.timedelta(hours=5)) # Check that cache is used in the same day - cat_container = await acceptance_test_step._build_connector_acceptance_test( + second_cat_container = await acceptance_test_step._build_connector_acceptance_test( dummy_connector_under_test_container, test_input_dir ) - cat_container = cat_container.with_exec(["date"]) - second_date_result = await cat_container.stdout() + + second_date_result = await second_cat_container.with_exec(["date"]).stdout() assert fist_date_result == second_date_result # Check that cache bursted after a day - frozen_datetime.tick(delta=datetime.timedelta(days=1, seconds=1)) - cat_container = await acceptance_test_step._build_connector_acceptance_test( + frozen_datetime.tick(delta=datetime.timedelta(days=1, minutes=10)) + third_cat_container = await acceptance_test_step._build_connector_acceptance_test( dummy_connector_under_test_container, test_input_dir ) - cat_container = cat_container.with_exec(["date"]) - third_date_result = await cat_container.stdout() + third_date_result = await third_cat_container.with_exec(["date"]).stdout() assert third_date_result != second_date_result time.sleep(1) # Check that changing the container invalidates the cache - cat_container = await acceptance_test_step._build_connector_acceptance_test( + fourth_cat_container = await acceptance_test_step._build_connector_acceptance_test( another_dummy_connector_under_test_container, test_input_dir ) - cat_container = cat_container.with_exec(["date"]) - fourth_date_result = await cat_container.stdout() + fourth_date_result = await fourth_cat_container.with_exec(["date"]).stdout() assert fourth_date_result != third_date_result async def test_params(self, dagger_client, mocker, test_context_ci, test_input_dir):