Skip to content

Commit

Permalink
airbyte-ci: accept extra options on connectors test and pass these …
Browse files Browse the repository at this point in the history
…to test steps
  • Loading branch information
alafanechere committed Jan 11, 2024
1 parent 773d3bf commit cb583aa
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

from __future__ import annotations

import re
from typing import TYPE_CHECKING

import asyncclick as click
from pipelines.airbyte_ci.connectors.consts import CONNECTOR_TEST_STEP_ID

if TYPE_CHECKING:
from typing import Dict, Tuple

from pipelines.models.steps import STEP_PARAMS

# Pattern for extra param options: --<step_id>.<option_name>=<option_value>
EXTRA_PARAM_PATTERN_FOR_OPTION = re.compile(r"^--([a-zA-Z_][a-zA-Z0-9_]*)\.([a-zA-Z_-][a-zA-Z0-9_-]*)=([^=]+)$")
# Pattern for extra param flag: --<step_id>.<option_name>
EXTRA_PARAM_PATTERN_FOR_FLAG = re.compile(r"^--([a-zA-Z_][a-zA-Z0-9_]*)\.([a-zA-Z_-][a-zA-Z0-9_-]*)$")
EXTRA_PARAM_PATTERN_ERROR_MESSAGE = "The extra flags must be structured as --<step_id>.<flag_name> for flags or --<step_id>.<option_name>=<option_value> for options. You can use - or -- for option/flag names."


def build_extra_params_mapping(
ctx: click.Context, argument: click.core.Argument, raw_extra_params: Tuple[str]
) -> Dict[CONNECTOR_TEST_STEP_ID, STEP_PARAMS]:
"""Build a mapping of step id to extra params.
Validate the extra params and raise a ValueError if they are invalid.
Validation rules:
- The extra params must be structured as --<step_id>.<param_name>=<param_value> for options or --<step_id>.<param_name> for flags.
- The step id must be one of the existing step ids.
Args:
ctx (click.Context): The click context.
argument (click.core.Argument): The click argument.
raw_extra_params (Tuple[str]): The extra params provided by the user.
Raises:
ValueError: Raised if the extra params format is invalid.
ValueError: Raised if the step id in the extra params is not one of the unique steps to run.
Returns:
Dict[CONNECTOR_TEST_STEP_ID, STEP_PARAMS]: The mapping of step id to extra params.
"""
extra_params_mapping: Dict[CONNECTOR_TEST_STEP_ID, STEP_PARAMS] = {}
for param in raw_extra_params:
is_flag = "=" not in param
pattern = EXTRA_PARAM_PATTERN_FOR_FLAG if is_flag else EXTRA_PARAM_PATTERN_FOR_OPTION
matches = pattern.match(param)
if not matches:
raise ValueError(f"Invalid parameter {param}. {EXTRA_PARAM_PATTERN_ERROR_MESSAGE}")
if is_flag:
step_name, param_name = matches.groups()
param_value = None
else:
step_name, param_name, param_value = matches.groups()
try:
step_id = CONNECTOR_TEST_STEP_ID(step_name)
except ValueError:
raise ValueError(f"Invalid step name {step_name}, it must be one of {[step_id.value for step_id in CONNECTOR_TEST_STEP_ID]}")

extra_params_mapping.setdefault(step_id, {}).setdefault(param_name, [])
# param_value is None if the param is a flag
if param_value is not None:
extra_params_mapping[step_id][param_name].append(param_value)
return extra_params_mapping
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,30 @@
#

import sys
from typing import List
from typing import Dict, List

import asyncclick as click
from pipelines import main_logger
from pipelines.airbyte_ci.connectors.consts import CONNECTOR_TEST_STEP_ID
from pipelines.airbyte_ci.connectors.context import ConnectorContext
from pipelines.airbyte_ci.connectors.pipeline import run_connectors_pipelines
from pipelines.airbyte_ci.connectors.test import argument_parsing
from pipelines.airbyte_ci.connectors.test.pipeline import run_connector_test_pipeline
from pipelines.cli.dagger_pipeline_command import DaggerPipelineCommand
from pipelines.consts import LOCAL_BUILD_PLATFORM, ContextState
from pipelines.helpers.github import update_global_commit_status_check_for_tests
from pipelines.helpers.run_steps import RunStepOptions
from pipelines.helpers.utils import fail_if_missing_docker_hub_creds
from pipelines.models.steps import STEP_PARAMS


@click.command(cls=DaggerPipelineCommand, help="Test all the selected connectors.")
@click.command(
cls=DaggerPipelineCommand,
help="Test all the selected connectors.",
context_settings=dict(
ignore_unknown_options=True,
),
)
@click.option(
"--code-tests-only",
is_flag=True,
Expand Down Expand Up @@ -47,13 +55,15 @@
type=click.Choice([step_id.value for step_id in CONNECTOR_TEST_STEP_ID]),
help="Skip a step by name. Can be used multiple times to skip multiple steps.",
)
@click.argument("extra_params", nargs=-1, type=click.UNPROCESSED, callback=argument_parsing.build_extra_params_mapping)
@click.pass_context
async def test(
ctx: click.Context,
code_tests_only: bool,
fail_fast: bool,
concurrent_cat: bool,
skip_step: List[str],
extra_params: Dict[CONNECTOR_TEST_STEP_ID, STEP_PARAMS],
) -> bool:
"""Runs a test pipeline for the selected connectors.
Expand All @@ -76,8 +86,8 @@ async def test(
run_step_options = RunStepOptions(
fail_fast=fail_fast,
skip_steps=[CONNECTOR_TEST_STEP_ID(step_id) for step_id in skip_step],
step_params=extra_params,
)

connectors_tests_contexts = [
ConnectorContext(
pipeline_name=f"Testing connector {connector.technical_name}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
#
"""This module groups factory like functions to dispatch tests steps according to the connector under test language."""

from __future__ import annotations

import re
from typing import TYPE_CHECKING

import anyio
from connector_ops.utils import ConnectorLanguage # type: ignore
Expand All @@ -12,7 +16,11 @@
from pipelines.airbyte_ci.connectors.test.steps import java_connectors, python_connectors
from pipelines.airbyte_ci.connectors.test.steps.common import QaChecks, VersionFollowsSemverCheck, VersionIncrementCheck
from pipelines.airbyte_ci.metadata.pipeline import MetadataValidation
from pipelines.helpers.run_steps import STEP_TREE, StepToRun, run_steps
from pipelines.helpers.run_steps import StepToRun, run_steps

if TYPE_CHECKING:

from pipelines.helpers.run_steps import STEP_TREE

LANGUAGE_MAPPING = {
"get_test_steps": {
Expand All @@ -23,14 +31,19 @@
}


EXTRA_PARAMS_PATTERN = re.compile(r"^--([a-zA-Z_][a-zA-Z0-9_]*)\.([a-zA-Z_-][a-zA-Z0-9_-]*)=([^=]+)$")
EXTRA_PARAMS_PATTERN_FOR_FLAG = re.compile(r"^--([a-zA-Z_][a-zA-Z0-9_]*)\.([a-zA-Z_-][a-zA-Z0-9_-]*)$")
EXTRA_PARAMS_PATTERN_ERROR_MESSAGE = "The extra params must be structured as --<step_id>.<param_name>=<param_value>"


def get_test_steps(context: ConnectorContext) -> STEP_TREE:
"""Get all the tests steps according to the connector language.
Args:
context (ConnectorContext): The current connector context.
Returns:
List[StepResult]: The list of tests steps.
STEP_TREE: The list of tests steps.
"""
if _get_test_steps := LANGUAGE_MAPPING["get_test_steps"].get(context.connector.language):
return _get_test_steps(context)
Expand All @@ -43,23 +56,25 @@ async def run_connector_test_pipeline(context: ConnectorContext, semaphore: anyi
"""
Compute the steps to run for a connector test pipeline.
"""
all_steps_to_run: STEP_TREE = []

steps_to_run = get_test_steps(context)
all_steps_to_run += get_test_steps(context)

if not context.code_tests_only:
steps_to_run += [
static_analysis_steps_to_run = [
[
StepToRun(id=CONNECTOR_TEST_STEP_ID.METADATA_VALIDATION, step=MetadataValidation(context)),
StepToRun(id=CONNECTOR_TEST_STEP_ID.VERSION_FOLLOW_CHECK, step=VersionFollowsSemverCheck(context)),
StepToRun(id=CONNECTOR_TEST_STEP_ID.VERSION_INC_CHECK, step=VersionIncrementCheck(context)),
StepToRun(id=CONNECTOR_TEST_STEP_ID.QA_CHECKS, step=QaChecks(context)),
]
]
all_steps_to_run += static_analysis_steps_to_run

async with semaphore:
async with context:
result_dict = await run_steps(
runnables=steps_to_run,
runnables=all_steps_to_run,
options=context.run_step_options,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
from pipelines.models.steps import StepStatus

if TYPE_CHECKING:
from pipelines.models.steps import Step, StepResult
from pipelines.airbyte_ci.connectors.consts import CONNECTOR_TEST_STEP_ID
from pipelines.models.steps import STEP_PARAMS, Step, StepResult

RESULTS_DICT = Dict[str, StepResult]
ARGS_TYPE = Union[Dict, Callable[[RESULTS_DICT], Dict], Awaitable[Dict]]
Expand All @@ -34,6 +35,7 @@ class RunStepOptions:
skip_steps: List[str] = field(default_factory=list)
log_step_tree: bool = True
concurrency: int = 10
step_params: Dict[CONNECTOR_TEST_STEP_ID, STEP_PARAMS] = field(default_factory=dict)


@dataclass(frozen=True)
Expand All @@ -44,7 +46,7 @@ class StepToRun:
Used to coordinate the execution of multiple steps inside a pipeline.
"""

id: str
id: CONNECTOR_TEST_STEP_ID
step: Step
args: ARGS_TYPE = field(default_factory=dict)
depends_on: List[str] = field(default_factory=list)
Expand All @@ -71,7 +73,7 @@ def _skip_remaining_steps(remaining_steps: STEP_TREE) -> RESULTS_DICT:
"""
Skip all remaining steps.
"""
skipped_results = {}
skipped_results: Dict[str, StepResult] = {}
for runnable_step in remaining_steps:
if isinstance(runnable_step, StepToRun):
skipped_results[runnable_step.id] = runnable_step.step.skip()
Expand Down Expand Up @@ -243,6 +245,7 @@ async def run_steps(
tasks.append(task_group.soonify(run_steps)(list(step_to_run), results, options))
else:
step_args = await evaluate_run_args(step_to_run.args, results)
step_to_run.step.extra_params = options.step_params.get(step_to_run.id, {})
main_logger.info(f"QUEUING STEP {step_to_run.id}")
tasks.append(task_group.soonify(step_to_run.step.run)(**step_args))

Expand Down
3 changes: 2 additions & 1 deletion airbyte-ci/connectors/pipelines/pipelines/models/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,10 @@ def extra_params(self) -> STEP_PARAMS:

@extra_params.setter
def extra_params(self, value: STEP_PARAMS) -> None:
if not self.accept_extra_params:
if value and not self.accept_extra_params:
raise ValueError(f"{self.__class__.__name__} does not accept extra params.")
self._extra_params = value
self.logger.info(f"Will run with the following parameters: {self.params}")

@property
def default_params(self) -> STEP_PARAMS:
Expand Down

0 comments on commit cb583aa

Please sign in to comment.