Skip to content

Commit

Permalink
Fix metadata service steps (#33981)
Browse files Browse the repository at this point in the history
Co-authored-by: erohmensing <erohmensing@gmail.com>
  • Loading branch information
bnchrch and erohmensing committed Jan 5, 2024
1 parent 3f9c700 commit e9d5377
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 24 deletions.
1 change: 1 addition & 0 deletions airbyte-ci/connectors/pipelines/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,7 @@ E.G.: running `pytest` on a specific test folder:

| Version | PR | Description |
| ------- | ---------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------------- |
| 3.0.1 | [#33981](https://github.com/airbytehq/airbyte/pull/33981) | Fix issues with deploying dagster, pin pendulum version in dagster-cli install |
| 3.0.0 | [#33582](https://github.com/airbytehq/airbyte/pull/33582) | Upgrade to Dagger 0.9.5 |
| 2.14.3 | [#33964](https://github.com/airbytehq/airbyte/pull/33964) | Reintroduce mypy with fixes for AssertionError on publish and missing report URL on connector test commit status. |
| 2.14.2 | [#33954](https://github.com/airbytehq/airbyte/pull/33954) | Revert mypy changes |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,9 @@ async def _run(self) -> StepResult:
# mount metadata_service/lib and metadata_service/orchestrator
parent_dir = self.context.get_repo_dir("airbyte-ci/connectors/metadata_service")
python_base = with_python_base(self.context, "3.9")
python_with_dependencies = with_pip_packages(python_base, ["dagster-cloud==1.2.6", "pydantic==1.10.6", "poetry2setup==1.1.0"])
python_with_dependencies = with_pip_packages(
python_base, ["dagster-cloud==1.2.6", "pydantic==1.10.6", "poetry2setup==1.1.0", "pendulum==2.1.2"]
)
dagster_cloud_api_token_secret: dagger.Secret = get_secret_host_variable(
self.context.dagger_client, "DAGSTER_CLOUD_METADATA_API_TOKEN"
)
Expand Down Expand Up @@ -179,15 +181,19 @@ async def run_metadata_orchestrator_deploy_pipeline(

async with metadata_pipeline_context:
steps: STEP_TREE = [
StepToRun(
id=CONNECTOR_TEST_STEP_ID.TEST_ORCHESTRATOR,
step=TestOrchestrator(context=metadata_pipeline_context),
),
StepToRun(
id=CONNECTOR_TEST_STEP_ID.DEPLOY_ORCHESTRATOR,
step=DeployOrchestrator(context=metadata_pipeline_context),
depends_on=[CONNECTOR_TEST_STEP_ID.TEST_ORCHESTRATOR],
),
[
StepToRun(
id=CONNECTOR_TEST_STEP_ID.TEST_ORCHESTRATOR,
step=TestOrchestrator(context=metadata_pipeline_context),
)
],
[
StepToRun(
id=CONNECTOR_TEST_STEP_ID.DEPLOY_ORCHESTRATOR,
step=DeployOrchestrator(context=metadata_pipeline_context),
depends_on=[CONNECTOR_TEST_STEP_ID.TEST_ORCHESTRATOR],
)
],
]
steps_results = await run_steps(steps)
metadata_pipeline_context.report = Report(
Expand Down
21 changes: 17 additions & 4 deletions airbyte-ci/connectors/pipelines/pipelines/helpers/run_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
ARGS_TYPE = Union[Dict, Callable[[RESULTS_DICT], Dict], Awaitable[Dict]]


class InvalidStepConfiguration(Exception):
pass


@dataclass
class RunStepOptions:
"""Options for the run_step function."""
Expand Down Expand Up @@ -80,13 +84,22 @@ def _skip_remaining_steps(remaining_steps: STEP_TREE) -> RESULTS_DICT:
return skipped_results


def _step_dependencies_succeeded(depends_on: List[str], results: RESULTS_DICT) -> bool:
def _step_dependencies_succeeded(step_to_eval: StepToRun, results: RESULTS_DICT) -> bool:
"""
Check if all dependencies of a step have succeeded.
"""
main_logger.info(f"Checking if dependencies {depends_on} have succeeded")
main_logger.info(f"Checking if dependencies {step_to_eval.depends_on} have succeeded")

# Check if all depends_on keys are in the results dict
# If not, that means a step has not been run yet
# Implying that the order of the steps are not correct
for step_id in step_to_eval.depends_on:
if step_id not in results:
raise InvalidStepConfiguration(
f"Step {step_to_eval.id} depends on {step_id} which has not been run yet. This implies that the order of the steps is not correct. Please check that the steps are in the correct order."
)

return all(results[step_id] and results[step_id].status is StepStatus.SUCCESS for step_id in depends_on)
return all(results[step_id] and results[step_id].status is StepStatus.SUCCESS for step_id in step_to_eval.depends_on)


def _filter_skipped_steps(steps_to_evaluate: STEP_TREE, skip_steps: List[str], results: RESULTS_DICT) -> Tuple[STEP_TREE, RESULTS_DICT]:
Expand All @@ -109,7 +122,7 @@ def _filter_skipped_steps(steps_to_evaluate: STEP_TREE, skip_steps: List[str], r
results[step_to_eval.id] = step_to_eval.step.skip("Skipped by user")

# skip step if a dependency failed
elif not _step_dependencies_succeeded(step_to_eval.depends_on, results):
elif not _step_dependencies_succeeded(step_to_eval, results):
main_logger.info(
f"Skipping step {step_to_eval.id} because one of the dependencies have not been met: {step_to_eval.depends_on}"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,15 +303,16 @@ async def __aexit__(
Returns:
bool: Whether the teardown operation ran successfully.
"""
self.state = self.determine_final_state(self.report, exception_value)
self.stopped_at = datetime.utcnow()

if exception_value:
self.logger.error("An error was handled by the Pipeline", exc_info=True)

if self.report is None:
self.logger.error("No test report was provided. This is probably due to an upstream error")
self.report = Report(self, steps_results=[])

self.state = self.determine_final_state(self.report, exception_value)
self.stopped_at = datetime.utcnow()

self.report.print()

await asyncify(update_commit_status_check)(**self.github_commit_status)
Expand Down
2 changes: 1 addition & 1 deletion airbyte-ci/connectors/pipelines/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "pipelines"
version = "3.0.0"
version = "3.0.1"
description = "Packaged maintained by the connector operations team to perform CI for connectors' pipelines"
authors = ["Airbyte <contact@airbyte.io>"]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import anyio
import pytest
from pipelines.helpers.run_steps import RunStepOptions, StepToRun, run_steps
from pipelines.helpers.run_steps import InvalidStepConfiguration, RunStepOptions, StepToRun, run_steps
from pipelines.models.contexts.pipeline_context import PipelineContext
from pipelines.models.steps import Step, StepResult, StepStatus

Expand Down Expand Up @@ -178,12 +178,11 @@ async def _run(self, result_status=StepStatus.SUCCESS) -> StepResult:
(
"step is skipped if the dependency fails",
[
StepToRun(id="step1", step=TestStep(test_context)),
StepToRun(id="step2", step=TestStep(test_context), args={"result_status": StepStatus.FAILURE}),
StepToRun(id="step3", step=TestStep(test_context)),
StepToRun(id="step4", step=TestStep(test_context), depends_on=["step2"]),
[StepToRun(id="step1", step=TestStep(test_context))],
[StepToRun(id="step2", step=TestStep(test_context), args={"result_status": StepStatus.FAILURE})],
[StepToRun(id="step3", step=TestStep(test_context), depends_on=["step2"])],
],
{"step1": StepStatus.SUCCESS, "step2": StepStatus.FAILURE, "step3": StepStatus.SUCCESS, "step4": StepStatus.SKIPPED},
{"step1": StepStatus.SUCCESS, "step2": StepStatus.FAILURE, "step3": StepStatus.SKIPPED},
RunStepOptions(fail_fast=False),
),
],
Expand All @@ -195,6 +194,17 @@ async def test_run_steps_output(desc, steps, expected_results, options):
assert results[step_id].status == expected_status, desc


@pytest.mark.anyio
async def test_run_steps_throws_on_invalid_order():
concurrent_steps = [
StepToRun(id="step1", step=TestStep(test_context)),
StepToRun(id="step2", step=TestStep(test_context), depends_on=["step1"]),
]

with pytest.raises(InvalidStepConfiguration):
await run_steps(concurrent_steps)


@pytest.mark.anyio
async def test_run_steps_concurrent():
ran_at = {}
Expand Down

0 comments on commit e9d5377

Please sign in to comment.