airbyte-ci: run incremental acceptance tests step
alafanechere committed Jun 10, 2024
1 parent bc78187 commit 51837c8
Showing 5 changed files with 104 additions and 5 deletions.
@@ -9,6 +9,7 @@ class CONNECTOR_TEST_STEP_ID(str, Enum):
"""

ACCEPTANCE = "acceptance"
INCREMENTAL_ACCEPTANCE = "incremental_acceptance"
BUILD_NORMALIZATION = "build_normalization"
BUILD_TAR = "build_tar"
BUILD = "build"

@@ -5,6 +5,7 @@
"""This module groups steps made to run tests agnostic to a connector language."""

import datetime
import json
import os
import time
from abc import ABC, abstractmethod
@@ -334,12 +335,96 @@ async def get_step_result(self, container: Container) -> StepResult:
            status=self.get_step_status_from_exit_code(exit_code),
            stderr=stderr,
            stdout=stdout,
            output=container,
            output={"report_log": report_log_artifact},
            artifacts=[report_log_artifact],
            consider_in_overall_status=consider_result_in_overall_status,
        )


class IncrementalAcceptanceTests(Step):
    """This step runs the acceptance tests on the released image of the connector and compares the results with the current acceptance tests report log.
    It fails if there are new failing tests in the current acceptance tests report log.
    """

    title = "Incremental Acceptance Tests"
    context: ConnectorContext

    async def get_failed_pytest_node_ids(self, current_acceptance_tests_report_log: Artifact) -> Set[str]:
        """Parse the report log of the acceptance tests and return the pytest node ids of the failed tests.

        Args:
            current_acceptance_tests_report_log (Artifact): The report log of the acceptance tests.

        Returns:
            Set[str]: The pytest node ids of the failed tests.
        """
        current_report_lines = (await current_acceptance_tests_report_log.content.contents()).splitlines()
        failed_nodes = set()
        for line in current_report_lines:
            single_test_report = json.loads(line)
            if "nodeid" not in single_test_report or "outcome" not in single_test_report:
                continue
            if single_test_report["outcome"] == "failed":
                failed_nodes.add(single_test_report["nodeid"])
        return failed_nodes

    async def get_result_log_on_master(self) -> Artifact:
        """Run the acceptance tests on the released image of the connector and return the report log.

        The released image version is fetched from the master metadata file of the connector.
        We're not using the online connector registry here because some connectors might not be released to OSS or Airbyte Cloud.
        Thanks to Dagger caching, subsequent runs of this step will be cached as long as the released image did not change.

        Returns:
            Artifact: The report log of the acceptance tests run on the released image.
        """
        raw_master_metadata = requests.get(
            f"https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-integrations/connectors/{self.context.connector.technical_name}/metadata.yaml"
        )
        master_metadata = yaml.safe_load(raw_master_metadata.text)
        master_docker_image_tag = master_metadata["data"]["dockerImageTag"]
        released_image = f'{master_metadata["data"]["dockerRepository"]}:{master_docker_image_tag}'
        released_container = self.dagger_client.container().from_(released_image)
        self.logger.info(f"Running acceptance tests on released image: {released_image}")
        acceptance_tests_results_on_master = await AcceptanceTests(self.context, self.secrets).run(released_container)
        return acceptance_tests_results_on_master.output["report_log"]

    async def _run(self, current_acceptance_tests_report_log: Artifact) -> StepResult:
        """Compare the acceptance tests report log of the current image with the one of the released image.

        Fails if there are new failing tests in the current acceptance tests report log.
        """
        if self.context.connector.metadata.get("supportLevel") == "certified":
            return StepResult(
                step=self, status=StepStatus.SKIPPED, stdout="The connector is certified, skipping incremental acceptance tests."
            )

        current_failing_nodes = await self.get_failed_pytest_node_ids(current_acceptance_tests_report_log)
        if not current_failing_nodes:
            return StepResult(step=self, status=StepStatus.SUCCESS, stdout="No failing acceptance tests were detected.")

        master_failings = await self.get_failed_pytest_node_ids(await self.get_result_log_on_master())
        new_failing_nodes = current_failing_nodes - master_failings
        if not new_failing_nodes:
            return StepResult(
                step=self,
                status=StepStatus.SUCCESS,
                stdout=textwrap.dedent(
                    f"""
                    No new failing acceptance tests were detected.
                    The acceptance tests are still failing, with {len(current_failing_nodes)} failing tests.
                    This connector is not certified, so we are not enforcing a clean slate, but please check out the original acceptance test failures and assess how critical they are.
                    """
                ),
            )
        else:
            return StepResult(
                step=self,
                status=StepStatus.FAILURE,
                stdout=f"{len(new_failing_nodes)} new failing acceptance tests detected:\n- "
                + "\n- ".join(new_failing_nodes)
                + "\nPlease fix the new failing tests before merging this PR.",
            )


class RegressionTests(Step):
    """A step to run regression tests for a connector."""

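For illustration only (not part of the commit): the comparison in IncrementalAcceptanceTests boils down to parsing the acceptance tests' report log, a JSON-lines file whose test entries carry "nodeid" and "outcome" keys (as the parser above assumes), and set-differencing the failed node ids of the current image against those of the released image. A standalone sketch of that logic, with made-up report contents:

import json
from typing import Set

def failed_node_ids(report_log_text: str) -> Set[str]:
    """Return the pytest node ids marked as failed in a JSONL report log."""
    failed = set()
    for line in report_log_text.splitlines():
        entry = json.loads(line)
        # Session/collection entries have no nodeid or outcome; skip them.
        if "nodeid" not in entry or "outcome" not in entry:
            continue
        if entry["outcome"] == "failed":
            failed.add(entry["nodeid"])
    return failed

# Hypothetical report logs: one from the image under test, one from the released image.
current_log = "\n".join(
    [
        '{"nodeid": "test_core.py::test_check", "outcome": "failed", "when": "call"}',
        '{"nodeid": "test_core.py::test_spec", "outcome": "passed", "when": "call"}',
    ]
)
released_log = '{"nodeid": "test_core.py::test_spec", "outcome": "passed", "when": "call"}'

new_failures = failed_node_ids(current_log) - failed_node_ids(released_log)
print(new_failures)  # {'test_core.py::test_check'} -> the step would fail on this PR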

@@ -15,7 +15,7 @@
from pipelines.airbyte_ci.connectors.build_image.steps.python_connectors import BuildConnectorImages
from pipelines.airbyte_ci.connectors.consts import CONNECTOR_TEST_STEP_ID
from pipelines.airbyte_ci.connectors.test.context import ConnectorTestContext
from pipelines.airbyte_ci.connectors.test.steps.common import AcceptanceTests, RegressionTests
from pipelines.airbyte_ci.connectors.test.steps.common import AcceptanceTests, IncrementalAcceptanceTests, RegressionTests
from pipelines.consts import LOCAL_BUILD_PLATFORM
from pipelines.dagger.actions import secrets
from pipelines.dagger.actions.python.poetry import with_poetry
@@ -293,4 +293,14 @@ def get_test_steps(context: ConnectorTestContext) -> STEP_TREE:
                depends_on=[CONNECTOR_TEST_STEP_ID.BUILD],
            ),
        ],
        [
            StepToRun(
                id=CONNECTOR_TEST_STEP_ID.INCREMENTAL_ACCEPTANCE,
                step=IncrementalAcceptanceTests(context, secrets=context.get_secrets_for_step_id(CONNECTOR_TEST_STEP_ID.ACCEPTANCE)),
                args=lambda results: {
                    "current_acceptance_tests_report_log": results[CONNECTOR_TEST_STEP_ID.ACCEPTANCE].output["report_log"]
                },
                depends_on=[CONNECTOR_TEST_STEP_ID.ACCEPTANCE],
            )
        ],
    ]
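The `args` callable in the wiring above is how the acceptance step's report log reaches the new step: it is only evaluated once the dependencies have run, against the accumulated results dict. A minimal sketch of that lazy-argument pattern, using stand-in classes (the real StepToRun and result types in the pipelines package are richer than this):

from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List

# Stand-ins for the real pipeline classes, only to show how the args lambda is resolved.
@dataclass
class FakeResult:
    output: Dict[str, Any]

@dataclass
class FakeStepToRun:
    id: str
    args: Callable[[Dict[str, FakeResult]], Dict[str, Any]]
    depends_on: List[str] = field(default_factory=list)

results = {"acceptance": FakeResult(output={"report_log": "<report_log.jsonl artifact>"})}

incremental = FakeStepToRun(
    id="incremental_acceptance",
    args=lambda results: {"current_acceptance_tests_report_log": results["acceptance"].output["report_log"]},
    depends_on=["acceptance"],
)

# The runner resolves the callable after the dependencies have finished,
# then calls the step's _run(**resolved_args).
resolved_args = incremental.args(results)
print(resolved_args)  # {'current_acceptance_tests_report_log': '<report_log.jsonl artifact>'}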

@@ -163,7 +163,7 @@ function copyToClipBoard(htmlElement) {
<label for="{{ step_result.step.title }}" class="lbl-toggle success">{{ step_result.step.title }} | {{ format_duration(step_result.step.run_duration) }}</label>
{% elif step_result.status == StepStatus.FAILURE %}
{% if not step_result.consider_in_overall_status %}
<label for="{{ step_result.step.title }}" class="lbl-toggle failure">{{ step_result.step.title }} | Ignored | {{ format_duration(step_result.step.run_duration) }}</label>
<label for="{{ step_result.step.title }}" class="lbl-toggle failure">[Ignored] {{ step_result.step.title }} | {{ format_duration(step_result.step.run_duration) }}</label>
{% else %}
<label for="{{ step_result.step.title }}" class="lbl-toggle failure">{{ step_result.step.title }} | {{ format_duration(step_result.step.run_duration) }}</label>
{% endif %}

@@ -177,7 +177,10 @@ def _step_dependencies_succeeded(step_to_eval: StepToRun, results: RESULTS_DICT)
f"Step {step_to_eval.id} depends on {step_id} which has not been run yet. This implies that the order of the steps is not correct. Please check that the steps are in the correct order."
)

return all(results[step_id] and results[step_id].status is StepStatus.SUCCESS for step_id in step_to_eval.depends_on)
return all(
results[step_id] and (results[step_id].status is StepStatus.SUCCESS or not results[step_id].consider_in_overall_status)
for step_id in step_to_eval.depends_on
)


def _filter_skipped_steps(steps_to_evaluate: STEP_TREE, skip_steps: List[str], results: RESULTS_DICT) -> Tuple[STEP_TREE, RESULTS_DICT]:
@@ -301,7 +304,7 @@ async def run_steps(
        options.log_step_tree = False

    # If any of the previous steps failed, skip the remaining steps
    if options.fail_fast and any(result.status is StepStatus.FAILURE for result in results.values()):
    if options.fail_fast and any(result.status is StepStatus.FAILURE and result.consider_in_overall_status for result in results.values()):
        skipped_results = _skip_remaining_steps(runnables)
        return {**results, **skipped_results}

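The two run_steps changes above relax the same condition: a failed step whose result is not considered in the overall status should neither block the steps that depend on it nor trigger fail-fast, presumably so that a dependent step like IncrementalAcceptanceTests can still run when the acceptance tests failed but their result is ignored. A condensed sketch of both predicates with a stand-in result type:

from dataclasses import dataclass
from enum import Enum
from typing import Dict, List

class StepStatus(Enum):
    SUCCESS = "success"
    FAILURE = "failure"
    SKIPPED = "skipped"

@dataclass
class FakeResult:
    status: StepStatus
    consider_in_overall_status: bool = True

def dependencies_satisfied(depends_on: List[str], results: Dict[str, FakeResult]) -> bool:
    # A dependency is satisfied if it succeeded OR if its failure is ignored in the overall status.
    return all(
        results[step_id]
        and (results[step_id].status is StepStatus.SUCCESS or not results[step_id].consider_in_overall_status)
        for step_id in depends_on
    )

def should_fail_fast(results: Dict[str, FakeResult]) -> bool:
    # Only failures that count toward the overall status abort the remaining steps.
    return any(r.status is StepStatus.FAILURE and r.consider_in_overall_status for r in results.values())

results = {"acceptance": FakeResult(StepStatus.FAILURE, consider_in_overall_status=False)}
print(dependencies_satisfied(["acceptance"], results))  # True: the ignored failure does not block dependents
print(should_fail_fast(results))                        # False: it does not trigger fail-fast either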