Skip to content

Commit

Permalink
Connectors CI: spec upload on pre-release publish (#26691)
Browse files Browse the repository at this point in the history
* upload spec on prepublish

* remove test comments

* Fix up unit test

* Fix test_publish

---------

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@sers.noreply.github.com>
  • Loading branch information
bnchrch and Octavia Squidington III committed May 29, 2023
1 parent 5d4d71b commit 5a975c3
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 17 deletions.
27 changes: 21 additions & 6 deletions tools/ci_connector_ops/ci_connector_ops/pipelines/publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,14 +236,22 @@ def create_connector_report(results: List[StepResult]) -> ConnectorReport:

async with semaphore:
async with context:
# TODO add a strucutre to hold the results of each step. and perform skips and failures.

results = []

metadata_validation_results = await metadata.MetadataValidation(context, context.metadata_path).run()
results.append(metadata_validation_results)

# Exit early if the metadata file is invalid.
if metadata_validation_results.status is not StepStatus.SUCCESS:
return create_connector_report(results)

check_connector_image_results = await CheckConnectorImageDoesNotExist(context).run()
results.append(check_connector_image_results)

# If the connector image already exists, we don't need to build it, but we still need to upload the metadata file.
# We also need to upload the spec to the spec cache bucket.
if check_connector_image_results.status is StepStatus.SKIPPED and not context.pre_release:
context.logger.info(
"The connector version is already published. Let's upload metadata.yaml and spec to GCS even if no version bump happened."
Expand All @@ -256,34 +264,41 @@ def create_connector_report(results: List[StepResult]) -> ConnectorReport:
metadata_upload_results = await metadata_upload_step.run()
results.append(metadata_upload_results)


# Exit early if the connector image already exists or has failed to build
if check_connector_image_results.status is not StepStatus.SUCCESS:
return create_connector_report(results)

build_connector_results = await builds.run_connector_build(context)
results.append(build_connector_results)

# Exit early if the connector image failed to build
if build_connector_results.status is not StepStatus.SUCCESS:
return create_connector_report(results)

built_connector_platform_variants = list(build_connector_results.output_artifact.values())
push_connector_image_results = await PushConnectorImageToRegistry(context).run(built_connector_platform_variants)
results.append(push_connector_image_results)

# Exit early if the connector image failed to push
if push_connector_image_results.status is not StepStatus.SUCCESS:
return create_connector_report(results)

# Make sure the image published is healthy by pulling it and running SPEC on it.
# See https://github.com/airbytehq/airbyte/issues/26085
pull_connector_image_results = await PullConnectorImageFromRegistry(context).run()
results.append(pull_connector_image_results)

# Exit early if the connector image failed to pull
if pull_connector_image_results.status is not StepStatus.SUCCESS:
return create_connector_report(results)

if not context.pre_release:
# Only upload to spec cache bucket if the connector is not a pre-release.
upload_to_spec_cache_results = await UploadSpecToCache(context).run(built_connector_platform_variants[0])
results.append(upload_to_spec_cache_results)
if upload_to_spec_cache_results.status is not StepStatus.SUCCESS:
return create_connector_report(results)
upload_to_spec_cache_results = await UploadSpecToCache(context).run(built_connector_platform_variants[0])
results.append(upload_to_spec_cache_results)
if upload_to_spec_cache_results.status is not StepStatus.SUCCESS:
return create_connector_report(results)

if not context.pre_release:
# Only upload to metadata service bucket if the connector is not a pre-release.
metadata_upload_results = await metadata_upload_step.run()
results.append(metadata_upload_results)
Expand Down
29 changes: 18 additions & 11 deletions tools/ci_connector_ops/tests/test_pipelines/test_publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ def test_parse_spec_output_no_spec(self, context):
(publish, "UploadSpecToCache"),
(publish, "PushConnectorImageToRegistry"),
(publish, "PullConnectorImageFromRegistry"),
(publish, "BuildConnectorForPublish"),
(publish.builds, "run_connector_build"),
]


Expand Down Expand Up @@ -213,6 +213,10 @@ async def test_run_connector_publish_pipeline_when_image_exists_or_failed(mocker
run_metadata_validation = publish.metadata.MetadataValidation.return_value.run
run_metadata_validation.return_value = mocker.Mock(status=StepStatus.SUCCESS)

# ensure spec always succeeds
run_upload_spec_to_cache = publish.UploadSpecToCache.return_value.run
run_upload_spec_to_cache.return_value = mocker.Mock(status=StepStatus.SUCCESS)

run_check_connector_image_does_not_exist = publish.CheckConnectorImageDoesNotExist.return_value.run
run_check_connector_image_does_not_exist.return_value = mocker.Mock(status=check_image_exists_status)

Expand All @@ -226,7 +230,7 @@ async def test_run_connector_publish_pipeline_when_image_exists_or_failed(mocker

# Check that nothing else is called
for module, to_mock in STEPS_TO_PATCH:
if to_mock not in ["MetadataValidation", "MetadataUpload", "CheckConnectorImageDoesNotExist"]:
if to_mock not in ["MetadataValidation", "MetadataUpload", "CheckConnectorImageDoesNotExist", "UploadSpecToCache"]:
getattr(module, to_mock).return_value.run.assert_not_called()

if check_image_exists_status is StepStatus.SKIPPED and pre_release:
Expand All @@ -248,6 +252,7 @@ async def test_run_connector_publish_pipeline_when_image_exists_or_failed(mocker
== [
run_metadata_validation.return_value,
run_check_connector_image_does_not_exist.return_value,
run_upload_spec_to_cache.return_value,
run_metadata_upload.return_value,
]
)
Expand Down Expand Up @@ -295,9 +300,12 @@ async def test_run_connector_publish_pipeline_when_image_does_not_exist(
name="check_connector_image_does_not_exist_result", status=StepStatus.SUCCESS
)

# have output_artifact.values return []
built_connector_platform = mocker.Mock()
publish.BuildConnectorForPublish.return_value.run.return_value = mocker.Mock(
name="build_connector_for_publish_result", status=build_step_status, output_artifact=[built_connector_platform]
built_connector_platform.values.return_value = ["linux/amd64"]

publish.builds.run_connector_build.return_value = mocker.Mock(
name="build_connector_for_publish_result", status=build_step_status, output_artifact=built_connector_platform
)

publish.PushConnectorImageToRegistry.return_value.run.return_value = mocker.Mock(
Expand All @@ -315,27 +323,27 @@ async def test_run_connector_publish_pipeline_when_image_does_not_exist(
name="metadata_upload_result", status=metadata_upload_step_status
)

context = mocker.MagicMock(pre_release=pre_release)
context = mocker.MagicMock(pre_release=pre_release, )
semaphore = anyio.Semaphore(1)
report = await publish.run_connector_publish_pipeline(context, semaphore)

steps_to_run = [
publish.metadata.MetadataValidation.return_value.run,
publish.CheckConnectorImageDoesNotExist.return_value.run,
publish.BuildConnectorForPublish.return_value.run,
publish.builds.run_connector_build,
publish.PushConnectorImageToRegistry.return_value.run,
publish.PullConnectorImageFromRegistry.return_value.run,
]

if not pre_release:
steps_to_run += [publish.UploadSpecToCache.return_value.run, publish.metadata.MetadataUpload.return_value.run]
steps_to_run += [publish.metadata.MetadataUpload.return_value.run]

for i, step_to_run in enumerate(steps_to_run):
if step_to_run.return_value.status is StepStatus.FAILURE or i == len(steps_to_run) - 1:
assert len(report.steps_results) == len(context.report.steps_results) == i + 1
assert len(report.steps_results) == len(context.report.steps_results)

previous_steps = steps_to_run[:i]
for step_ran in previous_steps:
for k, step_ran in enumerate(previous_steps):
step_ran.assert_called_once()
step_ran.return_value

Expand All @@ -344,13 +352,12 @@ async def test_run_connector_publish_pipeline_when_image_does_not_exist(
step_to_run.assert_not_called()
break
if build_step_status is StepStatus.SUCCESS:
publish.PushConnectorImageToRegistry.return_value.run.assert_called_once_with([built_connector_platform])
publish.PushConnectorImageToRegistry.return_value.run.assert_called_once_with(["linux/amd64"])
else:
publish.PushConnectorImageToRegistry.return_value.run.assert_not_called()
publish.PullConnectorImageFromRegistry.return_value.run.assert_not_called()
publish.UploadSpecToCache.return_value.run.assert_not_called()
publish.metadata.MetadataUpload.return_value.run.assert_not_called()

if pre_release:
publish.UploadSpecToCache.return_value.run.assert_not_called()
publish.metadata.MetadataUpload.return_value.run.assert_not_called()

0 comments on commit 5a975c3

Please sign in to comment.