Skip to content

Commit

Permalink
airbyte-ci: accept extra options on connectors test and pass these …
Browse files Browse the repository at this point in the history
…to test steps
  • Loading branch information
alafanechere committed Jan 10, 2024
1 parent 773d3bf commit f9b4f9b
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@
from pipelines.helpers.utils import fail_if_missing_docker_hub_creds


@click.command(cls=DaggerPipelineCommand, help="Test all the selected connectors.")
@click.command(
cls=DaggerPipelineCommand,
help="Test all the selected connectors.",
context_settings=dict(
ignore_unknown_options=True,
),
)
@click.option(
"--code-tests-only",
is_flag=True,
Expand Down Expand Up @@ -47,13 +53,15 @@
type=click.Choice([step_id.value for step_id in CONNECTOR_TEST_STEP_ID]),
help="Skip a step by name. Can be used multiple times to skip multiple steps.",
)
@click.argument("extra_params", nargs=-1, type=click.UNPROCESSED)
@click.pass_context
async def test(
ctx: click.Context,
code_tests_only: bool,
fail_fast: bool,
concurrent_cat: bool,
skip_step: List[str],
extra_params: List[str],
) -> bool:
"""Runs a test pipeline for the selected connectors.
Expand Down Expand Up @@ -115,6 +123,7 @@ async def test(
ctx.obj["concurrency"],
ctx.obj["dagger_logs_path"],
ctx.obj["execute_timeout"],
extra_params,
)
except Exception as e:
main_logger.error("An error occurred while running the test pipeline", exc_info=e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,27 @@
#
"""This module groups factory like functions to dispatch tests steps according to the connector under test language."""

from __future__ import annotations

import re
from typing import TYPE_CHECKING

import anyio
from connector_ops.utils import ConnectorLanguage # type: ignore
from more_itertools import collapse
from pipelines.airbyte_ci.connectors.consts import CONNECTOR_TEST_STEP_ID
from pipelines.airbyte_ci.connectors.context import ConnectorContext
from pipelines.airbyte_ci.connectors.reports import ConnectorReport
from pipelines.airbyte_ci.connectors.test.steps import java_connectors, python_connectors
from pipelines.airbyte_ci.connectors.test.steps.common import QaChecks, VersionFollowsSemverCheck, VersionIncrementCheck
from pipelines.airbyte_ci.metadata.pipeline import MetadataValidation
from pipelines.helpers.run_steps import STEP_TREE, StepToRun, run_steps
from pipelines.helpers.run_steps import StepToRun, run_steps

if TYPE_CHECKING:
from typing import Dict, Tuple

from pipelines.helpers.run_steps import STEP_TREE
from pipelines.models.steps import STEP_PARAMS, Step

LANGUAGE_MAPPING = {
"get_test_steps": {
Expand All @@ -23,14 +34,19 @@
}


EXTRA_PARAMS_PATTERN = re.compile(r"^--([a-zA-Z_][a-zA-Z0-9_]*)\.([a-zA-Z_-][a-zA-Z0-9_-]*)=([^=]+)$")
EXTRA_PARAMS_PATTERN_FOR_FLAG = re.compile(r"^--([a-zA-Z_][a-zA-Z0-9_]*)\.([a-zA-Z_-][a-zA-Z0-9_-]*)$")
EXTRA_PARAMS_PATTERN_ERROR_MESSAGE = "The extra params must be structured as --<step_id>.<param_name>=<param_value>"


def get_test_steps(context: ConnectorContext) -> STEP_TREE:
"""Get all the tests steps according to the connector language.
Args:
context (ConnectorContext): The current connector context.
Returns:
List[StepResult]: The list of tests steps.
STEP_TREE: The list of tests steps.
"""
if _get_test_steps := LANGUAGE_MAPPING["get_test_steps"].get(context.connector.language):
return _get_test_steps(context)
Expand All @@ -39,27 +55,105 @@ def get_test_steps(context: ConnectorContext) -> STEP_TREE:
return []


async def run_connector_test_pipeline(context: ConnectorContext, semaphore: anyio.Semaphore) -> ConnectorReport:
def build_extra_params_mapping(
extra_params: Tuple[str], unique_steps_to_run: Dict[CONNECTOR_TEST_STEP_ID, Step]
) -> Dict[CONNECTOR_TEST_STEP_ID, STEP_PARAMS]:
"""Build a mapping of step id to extra params.
Validate the extra params and raise a ValueError if they are invalid.
Validation rules:
- The extra params must be structured as --<step_id>.<param_name>=<param_value>
- The step id must be one of the unique steps to run.
Args:
extra_params (Tuple[str]): The extra params provided by the user.
unique_steps_to_run (Dict[CONNECTOR_TEST_STEP_ID, Step]): The unique steps to run in the test pipeline.
Raises:
ValueError: Raised if the extra params format is invalid.
ValueError: Raised if the step id in the extra params is not one of the unique steps to run.
Returns:
Dict[CONNECTOR_TEST_STEP_ID, STEP_PARAMS]: The mapping of step id to extra params.
"""
extra_params_mapping: Dict[CONNECTOR_TEST_STEP_ID, STEP_PARAMS] = {}
valid_step_names = [step_id.value for step_id in CONNECTOR_TEST_STEP_ID]
for param in extra_params:
is_flag = "=" not in param
pattern = EXTRA_PARAMS_PATTERN_FOR_FLAG if is_flag else EXTRA_PARAMS_PATTERN
matches = pattern.match(param)
if not matches:
raise ValueError(f"Invalid parameter {param}. {EXTRA_PARAMS_PATTERN_ERROR_MESSAGE}")
if is_flag:
step_name, param_name = matches.groups()
param_value = None
else:
step_name, param_name, param_value = matches.groups()
if step_name not in valid_step_names or CONNECTOR_TEST_STEP_ID(step_name) not in unique_steps_to_run:
raise ValueError(
f"Invalid step name {step_name}, it must be one of {[step_id.value for step_id in unique_steps_to_run.keys()]}"
)
step_id = CONNECTOR_TEST_STEP_ID(step_name)
if step_id not in extra_params_mapping:
extra_params_mapping[step_id] = {}
if param_name not in extra_params_mapping[step_id]:
extra_params_mapping[step_id][param_name] = []
if param_value is not None:
extra_params_mapping[step_id][param_name].append(param_value)
return extra_params_mapping


def log_steps_params(unique_steps_to_run: Dict[CONNECTOR_TEST_STEP_ID, Step]) -> None:
"""Log the steps params.
Args:
unique_steps_to_run (Dict[CONNECTOR_TEST_STEP_ID, Step]): The unique steps to run in the test pipeline.
"""
for step_id, step in unique_steps_to_run.items():
if step.params:
step.logger.info(f"Step ID {step_id} will run with the following parameters: {step.params}")


def set_extra_params_on_steps_to_run(extra_params: Tuple[str], steps_to_run: STEP_TREE) -> None:
"""Set the extra params on the steps to run.
Args:
extra_params (Tuple[str]): The extra params provided by the user.
steps_to_run (STEP_TREE): The steps to run in the test pipeline.
"""
# As the steps to run can be nested, we need to flatten them to get the unique steps to run
unique_steps_to_run = {step_to_run.id: step_to_run.step for step_to_run in collapse(steps_to_run)}
extra_params_mapping = build_extra_params_mapping(extra_params, unique_steps_to_run)
for step_id, validated_extra_params in extra_params_mapping.items():
unique_steps_to_run[step_id].extra_params = validated_extra_params
log_steps_params(unique_steps_to_run)


async def run_connector_test_pipeline(context: ConnectorContext, semaphore: anyio.Semaphore, extra_params: Tuple[str]) -> ConnectorReport:
"""
Compute the steps to run for a connector test pipeline.
"""
all_steps_to_run: STEP_TREE = []

steps_to_run = get_test_steps(context)
all_steps_to_run += get_test_steps(context)

if not context.code_tests_only:
steps_to_run += [
static_analysis_steps_to_run = [
[
StepToRun(id=CONNECTOR_TEST_STEP_ID.METADATA_VALIDATION, step=MetadataValidation(context)),
StepToRun(id=CONNECTOR_TEST_STEP_ID.VERSION_FOLLOW_CHECK, step=VersionFollowsSemverCheck(context)),
StepToRun(id=CONNECTOR_TEST_STEP_ID.VERSION_INC_CHECK, step=VersionIncrementCheck(context)),
StepToRun(id=CONNECTOR_TEST_STEP_ID.QA_CHECKS, step=QaChecks(context)),
]
]
all_steps_to_run += static_analysis_steps_to_run

set_extra_params_on_steps_to_run(extra_params, all_steps_to_run)

async with semaphore:
async with context:
result_dict = await run_steps(
runnables=steps_to_run,
runnables=all_steps_to_run,
options=context.run_step_options,
)

Expand Down

0 comments on commit f9b4f9b

Please sign in to comment.