airbyte-ci: re-introduce mypy (#33964)
alafanechere committed Jan 5, 2024
1 parent 29a3da0 commit f90d8db
Showing 78 changed files with 1,326 additions and 1,090 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/airbyte-ci-tests.yml
@@ -91,7 +91,7 @@ jobs:
gcs_credentials: ${{ secrets.METADATA_SERVICE_PROD_GCS_CREDENTIALS }}
sentry_dsn: ${{ secrets.SENTRY_AIRBYTE_CI_DSN }}
github_token: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }}
subcommand: "test airbyte-ci/connectors/pipelines --poetry-run-command='pytest tests'"
subcommand: "test airbyte-ci/connectors/pipelines --poetry-run-command='pytest tests' --poetry-run-command='mypy pipelines --disallow-untyped-defs' --poetry-run-command='ruff check pipelines'"
airbyte_ci_binary_url: ${{ inputs.airbyte_ci_binary_url || 'https://connectors.airbyte.com/airbyte-ci/releases/ubuntu/latest/airbyte-ci' }}
tailscale_auth_key: ${{ secrets.TAILSCALE_AUTH_KEY }}

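For context, `--disallow-untyped-defs` makes mypy reject any function definition that is not fully annotated, which is why so much of this commit adds parameter and return annotations. A minimal sketch of what the new CI check accepts and rejects (hypothetical module, not part of this commit):

```python
# typing_check_demo.py - hypothetical example, not from the Airbyte repo
from typing import List


def read_lines(path: str) -> List[str]:
    """Fully annotated: passes `mypy --disallow-untyped-defs`."""
    with open(path) as f:
        return [line.strip() for line in f]


def read_lines_untyped(path):
    # mypy flags this def as missing type annotations under --disallow-untyped-defs
    with open(path) as f:
        return [line.strip() for line in f]
```

The workflow chains this with `pytest tests` and `ruff check pipelines`, so a single `airbyte-ci test` invocation now runs the test suite, type checking, and linting for the pipelines package.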
2 changes: 1 addition & 1 deletion .github/workflows/cat-tests.yml
@@ -31,5 +31,5 @@ jobs:
gcs_credentials: ${{ secrets.METADATA_SERVICE_PROD_GCS_CREDENTIALS }}
sentry_dsn: ${{ secrets.SENTRY_AIRBYTE_CI_DSN }}
github_token: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }}
subcommand: "test airbyte-integrations/bases/connector-acceptance-test --test-directory=unit_tests"
subcommand: "test airbyte-integrations/bases/connector-acceptance-test --poetry-run-command='pytest unit_tests'"
tailscale_auth_key: ${{ secrets.TAILSCALE_AUTH_KEY }}
223 changes: 112 additions & 111 deletions airbyte-ci/connectors/pipelines/README.md

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions airbyte-ci/connectors/pipelines/pipelines/__init__.py
@@ -5,7 +5,7 @@
"""The pipelines package."""
import logging
import os

from typing import Union
from rich.logging import RichHandler

from .helpers import sentry_utils
@@ -15,16 +15,16 @@
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("httpx").setLevel(logging.WARNING)
logging_handlers = [RichHandler(rich_tracebacks=True)]
if "CI" in os.environ:
# RichHandler does not work great in the CI
logging_handlers = [logging.StreamHandler()]

# RichHandler does not work great in the CI environment, so we use a StreamHandler instead
logging_handler: Union[RichHandler, logging.StreamHandler] = RichHandler(rich_tracebacks=True) if "CI" not in os.environ else logging.StreamHandler()


logging.basicConfig(
level=logging.INFO,
format="%(name)s: %(message)s",
datefmt="[%X]",
handlers=logging_handlers,
handlers=[logging_handler],
)

main_logger = logging.getLogger(__name__)
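The handler rework above is a typical mypy-driven change: without an annotation, `[RichHandler(...)]` is inferred as a list of `RichHandler`, so conditionally reassigning it to `[logging.StreamHandler()]` no longer type-checks. A small illustrative sketch of the pattern (mirrors the diff, simplified):

```python
import logging
import os
from typing import Union

from rich.logging import RichHandler

# Annotating a single handler as a Union keeps both branches valid for mypy,
# instead of reassigning a list whose element type was inferred as RichHandler.
handler: Union[RichHandler, logging.StreamHandler] = (
    logging.StreamHandler() if "CI" in os.environ else RichHandler(rich_tracebacks=True)
)

logging.basicConfig(level=logging.INFO, format="%(name)s: %(message)s", datefmt="[%X]", handlers=[handler])
```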
(another changed file; path not shown)
@@ -6,14 +6,15 @@
from __future__ import annotations

import anyio
from connector_ops.utils import ConnectorLanguage
from connector_ops.utils import ConnectorLanguage # type: ignore
from pipelines.airbyte_ci.connectors.build_image.steps import java_connectors, python_connectors
from pipelines.airbyte_ci.connectors.build_image.steps.common import LoadContainerToLocalDockerHost, StepStatus
from pipelines.airbyte_ci.connectors.context import ConnectorContext
from pipelines.airbyte_ci.connectors.reports import ConnectorReport
from pipelines.airbyte_ci.connectors.reports import ConnectorReport, Report
from pipelines.models.steps import StepResult



class NoBuildStepForLanguageError(Exception):
pass

@@ -32,7 +33,7 @@ async def run_connector_build(context: ConnectorContext) -> StepResult:
return await LANGUAGE_BUILD_CONNECTOR_MAPPING[context.connector.language](context)


async def run_connector_build_pipeline(context: ConnectorContext, semaphore: anyio.Semaphore, image_tag: str) -> ConnectorReport:
async def run_connector_build_pipeline(context: ConnectorContext, semaphore: anyio.Semaphore, image_tag: str) -> Report:
"""Run a build pipeline for a single connector.
Args:
@@ -51,5 +52,6 @@ async def run_connector_build_pipeline(context: ConnectorContext, semaphore: any
if context.is_local and build_result.status is StepStatus.SUCCESS:
load_image_result = await LoadContainerToLocalDockerHost(context, per_platform_built_containers, image_tag).run()
step_results.append(load_image_result)
context.report = ConnectorReport(context, step_results, name="BUILD RESULTS")
return context.report
report = ConnectorReport(context, step_results, name="BUILD RESULTS")
context.report = report
return report
(another changed file; path not shown)
@@ -7,7 +7,7 @@
from types import ModuleType
from typing import List, Optional

from connector_ops.utils import Connector
from connector_ops.utils import Connector # type: ignore
from dagger import Container

BUILD_CUSTOMIZATION_MODULE_NAME = "build_customization"
@@ -21,11 +21,15 @@ def get_build_customization_module(connector: Connector) -> Optional[ModuleType]
Optional[ModuleType]: The build_customization.py module if it exists, None otherwise.
"""
build_customization_spec_path = connector.code_directory / BUILD_CUSTOMIZATION_SPEC_NAME
if not build_customization_spec_path.exists():
return None
build_customization_spec = importlib.util.spec_from_file_location(

if not build_customization_spec_path.exists() or not (build_customization_spec := importlib.util.spec_from_file_location(
f"{connector.code_directory.name}_{BUILD_CUSTOMIZATION_MODULE_NAME}", build_customization_spec_path
)
)):
return None

if build_customization_spec.loader is None:
return None

build_customization_module = importlib.util.module_from_spec(build_customization_spec)
build_customization_spec.loader.exec_module(build_customization_module)
return build_customization_module
@@ -41,9 +45,12 @@ def get_main_file_name(connector: Connector) -> str:
str: The main file name.
"""
build_customization_module = get_build_customization_module(connector)
if hasattr(build_customization_module, "MAIN_FILE_NAME"):
return build_customization_module.MAIN_FILE_NAME
return DEFAULT_MAIN_FILE_NAME

return (
build_customization_module.MAIN_FILE_NAME
if build_customization_module and hasattr(build_customization_module, "MAIN_FILE_NAME")
else DEFAULT_MAIN_FILE_NAME
)


def get_entrypoint(connector: Connector) -> List[str]:
@@ -64,7 +71,7 @@ async def pre_install_hooks(connector: Connector, base_container: Container, log
Container: The mutated base_container.
"""
build_customization_module = get_build_customization_module(connector)
if hasattr(build_customization_module, "pre_connector_install"):
if build_customization_module and hasattr(build_customization_module, "pre_connector_install"):
base_container = await build_customization_module.pre_connector_install(base_container)
logger.info(f"Connector {connector.technical_name} pre install hook executed.")
return base_container
@@ -83,7 +90,7 @@ async def post_install_hooks(connector: Connector, connector_container: Containe
Container: The mutated connector_container.
"""
build_customization_module = get_build_customization_module(connector)
if hasattr(build_customization_module, "post_connector_install"):
if build_customization_module and hasattr(build_customization_module, "post_connector_install"):
connector_container = await build_customization_module.post_connector_install(connector_container)
logger.info(f"Connector {connector.technical_name} post install hook executed.")
return connector_container
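The `get_build_customization_module` changes above follow the standard typed pattern for loading an optional module from a file path: both `importlib.util.spec_from_file_location()` and `spec.loader` are `Optional`, so mypy requires explicit `None` guards before `exec_module()` is called. A self-contained sketch of that pattern (generic names, not the connector-specific helper):

```python
import importlib.util
from pathlib import Path
from types import ModuleType
from typing import Optional


def load_module_if_present(module_name: str, path: Path) -> Optional[ModuleType]:
    """Load a module from `path` if it exists, returning None otherwise."""
    if not path.exists():
        return None
    spec = importlib.util.spec_from_file_location(module_name, path)
    if spec is None or spec.loader is None:
        # Without these checks mypy flags spec and spec.loader as possibly None.
        return None
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module
```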
(another changed file; path not shown)
@@ -4,29 +4,33 @@
from __future__ import annotations

from abc import ABC
from typing import List, Optional, Tuple
from typing import TYPE_CHECKING

import docker
import docker # type: ignore
from dagger import Container, ExecError, Platform, QueryError
from pipelines.airbyte_ci.connectors.context import ConnectorContext
from pipelines.helpers.utils import export_container_to_tarball
from pipelines.models.steps import Step, StepResult, StepStatus

if TYPE_CHECKING:
from typing import Any

class BuildConnectorImagesBase(Step, ABC):
"""
A step to build connector images for a set of platforms.
"""

context: ConnectorContext

@property
def title(self):
def title(self) -> str:
return f"Build {self.context.connector.technical_name} docker image for platform(s) {', '.join(self.build_platforms)}"

def __init__(self, context: ConnectorContext) -> None:
self.build_platforms: List[Platform] = context.targeted_platforms
self.build_platforms = context.targeted_platforms
super().__init__(context)

async def _run(self, *args) -> StepResult:
async def _run(self, *args: Any) -> StepResult:
build_results_per_platform = {}
for platform in self.build_platforms:
try:
@@ -46,7 +50,7 @@ async def _run(self, *args) -> StepResult:
)
return StepResult(self, StepStatus.SUCCESS, stdout=success_message, output_artifact=build_results_per_platform)

async def _build_connector(self, platform: Platform, *args) -> Container:
async def _build_connector(self, platform: Platform, *args: Any, **kwargs: Any) -> Container:
"""Implement the generation of the image for the platform and return the corresponding container.
Returns:
@@ -56,24 +60,26 @@ async def _build_connector(self, platform: Platform, *args) -> Container:


class LoadContainerToLocalDockerHost(Step):
def __init__(self, context: ConnectorContext, containers: dict[Platform, Container], image_tag: Optional[str] = "dev") -> None:
context: ConnectorContext

def __init__(self, context: ConnectorContext, containers: dict[Platform, Container], image_tag: str = "dev") -> None:
super().__init__(context)
self.image_tag = image_tag
self.containers = containers

def _generate_dev_tag(self, platform: Platform, multi_platforms: bool):
def _generate_dev_tag(self, platform: Platform, multi_platforms: bool) -> str:
"""
When building for multiple platforms, we need to tag the image with the platform name.
There's no way to locally build a multi-arch image, so we need to tag the image with the platform name when the user passed multiple architecture options.
"""
return f"{self.image_tag}-{platform.replace('/', '-')}" if multi_platforms else self.image_tag

@property
def title(self):
def title(self) -> str:
return f"Load {self.image_name}:{self.image_tag} to the local docker host."

@property
def image_name(self) -> Tuple:
def image_name(self) -> str:
return f"airbyte/{self.context.connector.technical_name}"

async def _run(self) -> StepResult:
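The recurring `context: ConnectorContext` class-level annotations in these steps are how a subclass narrows the type of an attribute the base `Step` stores more generally, so mypy allows connector-specific attribute access without casts. A condensed sketch of the idea (simplified classes, not the real `Step` hierarchy):

```python
class PipelineContext:
    pipeline_name: str = "build"


class ConnectorContext(PipelineContext):
    connector_technical_name: str = "source-example"  # hypothetical field


class Step:
    context: PipelineContext

    def __init__(self, context: PipelineContext) -> None:
        self.context = context


class BuildConnectorImagesBase(Step):
    # Narrowed annotation: mypy now knows self.context is a ConnectorContext,
    # so ConnectorContext-only attributes can be used in titles and run logic.
    context: ConnectorContext

    @property
    def title(self) -> str:
        return f"Build {self.context.connector_technical_name} docker image"
```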
(another changed file; path not shown)
@@ -13,6 +13,8 @@
class BuildOrPullNormalization(Step):
"""A step to build or pull the normalization image for a connector according to the image name."""

context: ConnectorContext

def __init__(self, context: ConnectorContext, normalization_image: str, build_platform: Platform) -> None:
"""Initialize the step to build or pull the normalization image.
@@ -24,7 +26,10 @@ def __init__(self, context: ConnectorContext, normalization_image: str, build_pl
self.build_platform = build_platform
self.use_dev_normalization = normalization_image.endswith(":dev")
self.normalization_image = normalization_image
self.title = f"Build {self.normalization_image}" if self.use_dev_normalization else f"Pull {self.normalization_image}"

@property
def title(self) -> str:
return f"Build {self.normalization_image}" if self.use_dev_normalization else f"Pull {self.normalization_image}"

async def _run(self) -> StepResult:
if self.use_dev_normalization:
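`title` also moves from an instance attribute assigned in `__init__` to a read-only `@property` with an explicit `str` return type, which keeps the declared type visible to mypy while the value is computed from other instance state. A sketch of the shape, with a simplified hypothetical base class rather than the real `Step`:

```python
from abc import ABC, abstractmethod


class Step(ABC):
    @property
    @abstractmethod
    def title(self) -> str:
        ...


class BuildOrPullNormalization(Step):
    def __init__(self, normalization_image: str) -> None:
        self.use_dev_normalization = normalization_image.endswith(":dev")
        self.normalization_image = normalization_image

    @property
    def title(self) -> str:
        # Computed from instance state instead of being assigned in __init__.
        return f"Build {self.normalization_image}" if self.use_dev_normalization else f"Pull {self.normalization_image}"
```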
(another changed file; path not shown)
@@ -3,6 +3,8 @@
#


from typing import Any

from dagger import Container, Platform
from pipelines.airbyte_ci.connectors.build_image.steps import build_customization
from pipelines.airbyte_ci.connectors.build_image.steps.common import BuildConnectorImagesBase
@@ -17,9 +19,10 @@ class BuildConnectorImages(BuildConnectorImagesBase):
A spec command is run on the container to validate it was built successfully.
"""

context: ConnectorContext
PATH_TO_INTEGRATION_CODE = "/airbyte/integration_code"

async def _build_connector(self, platform: Platform):
async def _build_connector(self, platform: Platform, *args: Any) -> Container:
if (
"connectorBuildOptions" in self.context.connector.metadata
and "baseImage" in self.context.connector.metadata["connectorBuildOptions"]
@@ -74,10 +77,10 @@ async def _build_from_base_image(self, platform: Platform) -> Container:
# copy python dependencies from builder to connector container
customized_base.with_directory("/usr/local", builder.directory("/usr/local"))
.with_workdir(self.PATH_TO_INTEGRATION_CODE)
.with_file(main_file_name, (await self.context.get_connector_dir(include=main_file_name)).file(main_file_name))
.with_file(main_file_name, (await self.context.get_connector_dir(include=[main_file_name])).file(main_file_name))
.with_directory(
connector_snake_case_name,
(await self.context.get_connector_dir(include=connector_snake_case_name)).directory(connector_snake_case_name),
(await self.context.get_connector_dir(include=[connector_snake_case_name])).directory(connector_snake_case_name),
)
.with_env_variable("AIRBYTE_ENTRYPOINT", " ".join(entrypoint))
.with_entrypoint(entrypoint)
(another changed file; path not shown)
@@ -1,18 +1,21 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import datetime
from copy import deepcopy
from typing import TYPE_CHECKING

import semver
from dagger import Container
from dagger import Container, Directory
from pipelines.airbyte_ci.connectors.context import ConnectorContext
from pipelines.airbyte_ci.connectors.reports import ConnectorReport
from pipelines.airbyte_ci.connectors.reports import ConnectorReport, Report
from pipelines.helpers import git
from pipelines.helpers.connectors import metadata_change_helpers
from pipelines.models.steps import Step, StepResult, StepStatus

if TYPE_CHECKING:
from anyio import Semaphore


def get_bumped_version(version: str, bump_type: str) -> str:
current_version = semver.VersionInfo.parse(version)
@@ -28,6 +31,7 @@ def get_bumped_version(version: str, bump_type: str) -> str:


class AddChangelogEntry(Step):
context: ConnectorContext
title = "Add changelog entry"

def __init__(
@@ -37,7 +41,7 @@ def __init__(
new_version: str,
changelog_entry: str,
pull_request_number: str,
):
) -> None:
super().__init__(context)
self.repo_dir = repo_dir
self.new_version = new_version
@@ -60,7 +64,7 @@ async def _run(self) -> StepResult:
self,
StepStatus.FAILURE,
stdout=f"Could not add changelog entry: {e}",
output_artifact=self.container_with_airbyte_repo,
output_artifact=self.repo_dir,
)
updated_repo_dir = self.repo_dir.with_new_file(str(doc_path), updated_doc)
return StepResult(
@@ -70,14 +74,14 @@ def find_line_index_for_new_entry(self, markdown_text) -> int:
output_artifact=updated_repo_dir,
)

def find_line_index_for_new_entry(self, markdown_text) -> int:
def find_line_index_for_new_entry(self, markdown_text: str) -> int:
lines = markdown_text.splitlines()
for line_index, line in enumerate(lines):
if "version" in line.lower() and "date" in line.lower() and "pull request" in line.lower() and "subject" in line.lower():
return line_index + 2
raise Exception("Could not find the changelog section table in the documentation file.")

def add_changelog_entry(self, og_doc_content) -> str:
def add_changelog_entry(self, og_doc_content: str) -> str:
today = datetime.date.today().strftime("%Y-%m-%d")
lines = og_doc_content.splitlines()
line_index_for_new_entry = self.find_line_index_for_new_entry(og_doc_content)
@@ -87,14 +91,15 @@ def add_changelog_entry(self, og_doc_content) -> str:


class BumpDockerImageTagInMetadata(Step):
context: ConnectorContext
title = "Upgrade the dockerImageTag to the latest version in metadata.yaml"

def __init__(
self,
context: ConnectorContext,
repo_dir: Container,
repo_dir: Directory,
new_version: str,
):
) -> None:
super().__init__(context)
self.repo_dir = repo_dir
self.new_version = new_version
@@ -134,18 +139,18 @@ async def _run(self) -> StepResult:

async def run_connector_version_bump_pipeline(
context: ConnectorContext,
semaphore,
semaphore: Semaphore,
bump_type: str,
changelog_entry: str,
pull_request_number: str,
) -> ConnectorReport:
) -> Report:
"""Run a pipeline to upgrade for a single connector.
Args:
context (ConnectorContext): The initialized connector context.
Returns:
ConnectorReport: The reports holding the base image version upgrade results.
Report: The reports holding the base image version upgrade results.
"""
async with semaphore:
steps_results = []
@@ -172,5 +177,6 @@ async def run_connector_version_bump_pipeline(
steps_results.append(add_changelog_entry_result)
final_repo_dir = add_changelog_entry_result.output_artifact
await og_repo_dir.diff(final_repo_dir).export(str(git.get_git_repo_path()))
context.report = ConnectorReport(context, steps_results, name="CONNECTOR VERSION BUMP RESULTS")
return context.report
report = ConnectorReport(context, steps_results, name="CONNECTOR VERSION BUMP RESULTS")
context.report = report
return report
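The new `report = ...; context.report = report; return report` shape in both pipelines is likely about `Optional` narrowing: if `context.report` is declared as `Optional[Report]` (an assumption here, not shown in this diff), returning `context.report` directly would have type `Optional[Report]` and fail the declared `-> Report` return type, while returning the freshly created local keeps the non-optional type. A minimal sketch of that situation:

```python
from typing import Optional


class Report:
    name: str = "BUILD RESULTS"


class Context:
    # Assumed to be Optional on the real context class; not shown in this diff.
    report: Optional[Report] = None


def run_pipeline(context: Context) -> Report:
    report = Report()
    context.report = report
    # `return context.report` would be Optional[Report] for mypy and get rejected;
    # returning the local variable preserves the non-optional Report type.
    return report
```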
(remaining changed files not shown)
