Skip to content

Commit

Permalink
connectors-ci: check if the pushed image can be pulled (#26088)
Browse files Browse the repository at this point in the history
  • Loading branch information
alafanechere committed May 15, 2023
1 parent 95dd113 commit 0e3ec70
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 29 deletions.
58 changes: 46 additions & 12 deletions tools/ci_connector_ops/ci_connector_ops/pipelines/publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from ci_connector_ops.pipelines.bases import ConnectorReport, Step, StepResult, StepStatus
from ci_connector_ops.pipelines.contexts import PublishConnectorContext
from ci_connector_ops.pipelines.pipelines import metadata
from ci_connector_ops.pipelines.utils import with_stderr, with_stdout
from ci_connector_ops.pipelines.utils import with_exit_code, with_stderr, with_stdout
from dagger import Container, File, QueryError
from pydantic import ValidationError

Expand Down Expand Up @@ -73,6 +73,28 @@ async def _run(self, built_containers_per_platform: List[Container]) -> StepResu
return StepResult(self, status=StepStatus.FAILURE, stderr=str(e))


class PullConnectorImageFromRegistry(Step):
title = "Pull connector image from registry"

async def _run(self, attempt: int = 3) -> StepResult:
try:
exit_code = await with_exit_code(
self.context.dagger_client.container().from_(f"docker.io/{self.context.docker_image_name}").with_exec(["spec"])
)
if exit_code != 0:
if attempt > 0:
await anyio.sleep(10)
return await self._run(attempt - 1)
else:
return StepResult(self, status=StepStatus.FAILURE, stderr=f"Failed to pull {self.context.docker_image_name}")
return StepResult(self, status=StepStatus.SUCCESS, stdout=f"Pulled {self.context.docker_image_name} and ran spec command")
except QueryError as e:
if attempt > 0:
await anyio.sleep(10)
return await self._run(attempt - 1)
return StepResult(self, status=StepStatus.FAILURE, stderr=str(e))


class InvalidSpecOutputError(Exception):
pass

Expand Down Expand Up @@ -174,13 +196,14 @@ def create_connector_report(results: List[StepResult]) -> ConnectorReport:

check_connector_image_results = await CheckConnectorImageDoesNotExist(context).run()
results.append(check_connector_image_results)
if check_connector_image_results.status is StepStatus.SKIPPED and not context.pre_release:
context.logger.info(
"The connector version is already published. Let's upload metadata.yaml to GCS even if no version bump happened."
)
metadata_upload_results = await metadata.MetadataUpload(context).run()
results.append(metadata_upload_results)

if check_connector_image_results.status is not StepStatus.SUCCESS:
if check_connector_image_results.status is StepStatus.SKIPPED:
context.logger.info(
"The connector version is already published. Let's upload metadata.yaml to GCS even if no version bump happened."
)
metadata_upload_results = await metadata.MetadataUpload(context).run()
results.append(metadata_upload_results)
return create_connector_report(results)

build_connector_results = await BuildConnectorForPublish(context).run()
Expand All @@ -195,11 +218,22 @@ def create_connector_report(results: List[StepResult]) -> ConnectorReport:
if push_connector_image_results.status is not StepStatus.SUCCESS:
return create_connector_report(results)

upload_to_spec_cache_results = await UploadSpecToCache(context).run(built_connector_platform_variants[0])
results.append(upload_to_spec_cache_results)
if upload_to_spec_cache_results.status is not StepStatus.SUCCESS:
# Make sure the image published is healthy by pulling it and running SPEC on it.
# See https://github.com/airbytehq/airbyte/issues/26085
pull_connector_image_results = await PullConnectorImageFromRegistry(context).run()
results.append(pull_connector_image_results)
if pull_connector_image_results.status is not StepStatus.SUCCESS:
return create_connector_report(results)

metadata_upload_results = await metadata.MetadataUpload(context).run()
results.append(metadata_upload_results)
if not context.pre_release:
# Only upload to spec cache bucket if the connector is not a pre-release.
upload_to_spec_cache_results = await UploadSpecToCache(context).run(built_connector_platform_variants[0])
results.append(upload_to_spec_cache_results)
if upload_to_spec_cache_results.status is not StepStatus.SUCCESS:
return create_connector_report(results)

# Only upload to metadata service bucket if the connector is not a pre-release.
metadata_upload_results = await metadata.MetadataUpload(context).run()
results.append(metadata_upload_results)

return create_connector_report(results)
72 changes: 55 additions & 17 deletions tools/ci_connector_ops/tests/test_pipelines/test_publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,19 +162,21 @@ def test_parse_spec_output_no_spec(self, context):
(publish, "CheckConnectorImageDoesNotExist"),
(publish, "UploadSpecToCache"),
(publish, "PushConnectorImageToRegistry"),
(publish, "PullConnectorImageFromRegistry"),
(publish, "BuildConnectorForPublish"),
]


async def test_run_connector_publish_pipeline_when_failed_validation(mocker):
@pytest.mark.parametrize("pre_release", [True, False])
async def test_run_connector_publish_pipeline_when_failed_validation(mocker, pre_release):
"""We validate the no other steps are called if the metadata validation step fails."""
for module, to_mock in STEPS_TO_PATCH:
mocker.patch.object(module, to_mock, return_value=mocker.AsyncMock())

run_metadata_validation = publish.metadata.MetadataValidation.return_value.run
run_metadata_validation.return_value = mocker.Mock(status=StepStatus.FAILURE)

context = mocker.MagicMock()
context = mocker.MagicMock(pre_release=pre_release)
semaphore = anyio.Semaphore(1)
report = await publish.run_connector_publish_pipeline(context, semaphore)
run_metadata_validation.assert_called_once()
Expand All @@ -193,8 +195,11 @@ async def test_run_connector_publish_pipeline_when_failed_validation(mocker):
)


@pytest.mark.parametrize("check_image_exists_status", [StepStatus.SKIPPED, StepStatus.FAILURE])
async def test_run_connector_publish_pipeline_when_image_exists_or_failed(mocker, check_image_exists_status):
@pytest.mark.parametrize(
"check_image_exists_status, pre_release",
[(StepStatus.SKIPPED, False), (StepStatus.SKIPPED, True), (StepStatus.FAILURE, True), (StepStatus.FAILURE, False)],
)
async def test_run_connector_publish_pipeline_when_image_exists_or_failed(mocker, check_image_exists_status, pre_release):
"""We validate that when the connector image exists or the check fails, we don't run the rest of the pipeline.
We also validate that the metadata upload step is called when the image exists (Skipped status).
We do this to ensure that the metadata is still updated in the case where the connector image already exists.
Expand All @@ -212,7 +217,7 @@ async def test_run_connector_publish_pipeline_when_image_exists_or_failed(mocker

run_metadata_upload = publish.metadata.MetadataUpload.return_value.run

context = mocker.MagicMock()
context = mocker.MagicMock(pre_release=pre_release)
semaphore = anyio.Semaphore(1)
report = await publish.run_connector_publish_pipeline(context, semaphore)
run_metadata_validation.assert_called_once()
Expand All @@ -223,7 +228,18 @@ async def test_run_connector_publish_pipeline_when_image_exists_or_failed(mocker
if to_mock not in ["MetadataValidation", "MetadataUpload", "CheckConnectorImageDoesNotExist"]:
getattr(module, to_mock).return_value.run.assert_not_called()

if check_image_exists_status == StepStatus.SKIPPED:
if check_image_exists_status is StepStatus.SKIPPED and pre_release:
run_metadata_upload.assert_not_called()
assert (
report.steps_results
== context.report.steps_results
== [
run_metadata_validation.return_value,
run_check_connector_image_does_not_exist.return_value,
]
)

if check_image_exists_status is StepStatus.SKIPPED and not pre_release:
run_metadata_upload.assert_called_once()
assert (
report.steps_results
Expand All @@ -234,7 +250,8 @@ async def test_run_connector_publish_pipeline_when_image_exists_or_failed(mocker
run_metadata_upload.return_value,
]
)
if check_image_exists_status == StepStatus.FAILURE:

if check_image_exists_status is StepStatus.FAILURE:
run_metadata_upload.assert_not_called()
assert (
report.steps_results
Expand All @@ -247,17 +264,25 @@ async def test_run_connector_publish_pipeline_when_image_exists_or_failed(mocker


@pytest.mark.parametrize(
"build_step_status, push_step_status, upload_to_spec_cache_step_status, metadata_upload_step_status",
"pre_release, build_step_status, push_step_status, pull_step_status, upload_to_spec_cache_step_status, metadata_upload_step_status",
[
(StepStatus.SUCCESS, StepStatus.SUCCESS, StepStatus.SUCCESS, StepStatus.SUCCESS),
(StepStatus.SUCCESS, StepStatus.SUCCESS, StepStatus.SUCCESS, StepStatus.FAILURE),
(StepStatus.SUCCESS, StepStatus.SUCCESS, StepStatus.FAILURE, None),
(StepStatus.SUCCESS, StepStatus.FAILURE, None, None),
(StepStatus.FAILURE, None, None, None),
(False, StepStatus.SUCCESS, StepStatus.SUCCESS, StepStatus.SUCCESS, StepStatus.SUCCESS, StepStatus.SUCCESS),
(False, StepStatus.SUCCESS, StepStatus.SUCCESS, StepStatus.SUCCESS, StepStatus.SUCCESS, StepStatus.FAILURE),
(False, StepStatus.SUCCESS, StepStatus.SUCCESS, StepStatus.SUCCESS, StepStatus.FAILURE, None),
(False, StepStatus.SUCCESS, StepStatus.SUCCESS, StepStatus.FAILURE, None, None),
(False, StepStatus.SUCCESS, StepStatus.FAILURE, None, None, None),
(False, StepStatus.FAILURE, None, None, None, None),
(True, StepStatus.SUCCESS, StepStatus.SUCCESS, StepStatus.SUCCESS, StepStatus.SUCCESS, StepStatus.SUCCESS),
],
)
async def test_run_connector_publish_pipeline_when_image_does_not_exist(
mocker, build_step_status, push_step_status, upload_to_spec_cache_step_status, metadata_upload_step_status
mocker,
pre_release,
build_step_status,
push_step_status,
pull_step_status,
upload_to_spec_cache_step_status,
metadata_upload_step_status,
):
"""We check that the full pipeline is executed as expected when the connector image does not exist and the metadata validation passed."""
for module, to_mock in STEPS_TO_PATCH:
Expand All @@ -277,14 +302,19 @@ async def test_run_connector_publish_pipeline_when_image_does_not_exist(
publish.PushConnectorImageToRegistry.return_value.run.return_value = mocker.Mock(
name="push_connector_image_to_registry_result", status=push_step_status
)

publish.PullConnectorImageFromRegistry.return_value.run.return_value = mocker.Mock(
name="pull_connector_image_from_registry_result", status=pull_step_status
)

publish.UploadSpecToCache.return_value.run.return_value = mocker.Mock(
name="upload_spec_to_cache_result", status=upload_to_spec_cache_step_status
)
publish.metadata.MetadataUpload.return_value.run.return_value = mocker.Mock(
name="metadata_upload_result", status=metadata_upload_step_status
)

context = mocker.MagicMock()
context = mocker.MagicMock(pre_release=pre_release)
semaphore = anyio.Semaphore(1)
report = await publish.run_connector_publish_pipeline(context, semaphore)

Expand All @@ -293,9 +323,12 @@ async def test_run_connector_publish_pipeline_when_image_does_not_exist(
publish.CheckConnectorImageDoesNotExist.return_value.run,
publish.BuildConnectorForPublish.return_value.run,
publish.PushConnectorImageToRegistry.return_value.run,
publish.UploadSpecToCache.return_value.run,
publish.metadata.MetadataUpload.return_value.run,
publish.PullConnectorImageFromRegistry.return_value.run,
]

if not pre_release:
steps_to_run += [publish.UploadSpecToCache.return_value.run, publish.metadata.MetadataUpload.return_value.run]

for i, step_to_run in enumerate(steps_to_run):
if step_to_run.return_value.status is StepStatus.FAILURE or i == len(steps_to_run) - 1:
assert len(report.steps_results) == len(context.report.steps_results) == i + 1
Expand All @@ -313,5 +346,10 @@ async def test_run_connector_publish_pipeline_when_image_does_not_exist(
publish.PushConnectorImageToRegistry.return_value.run.assert_called_once_with([built_connector_platform])
else:
publish.PushConnectorImageToRegistry.return_value.run.assert_not_called()
publish.PullConnectorImageFromRegistry.return_value.run.assert_not_called()
publish.UploadSpecToCache.return_value.run.assert_not_called()
publish.metadata.MetadataUpload.return_value.run.assert_not_called()

if pre_release:
publish.UploadSpecToCache.return_value.run.assert_not_called()
publish.metadata.MetadataUpload.return_value.run.assert_not_called()

0 comments on commit 0e3ec70

Please sign in to comment.