Skip to content

Commit

Permalink
connectors-ci: setup nightly builds (#24335)
Browse files Browse the repository at this point in the history
* connectors-ci: only send commit status on PR

* increase timeout

* try semaphore

* disable qa check code format for testing semaphore

* fix typo and change semaphore

* disable acceptance tests

* use semaphore

* restore ga builds

* use CacheSharingMode.SHARED, renable acceptance tests, semaphore of 5

* expose concurrency as a CLI option, set it to 10 on the workflow

* clean

* use xlarge runner

* // 15

* declare inputs

* better job naming

* better job naming

* use run-name

* use run-name
  • Loading branch information
alafanechere committed Mar 23, 2023
1 parent 468cd1c commit 810f9e9
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 28 deletions.
22 changes: 16 additions & 6 deletions .github/workflows/connector_nightly_builds_dagger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,22 @@ on:
# 11AM UTC is 12AM CET, 4AM PST.
- cron: "0 11 * * *"
workflow_dispatch:
inputs:
runs-on:
type: string
default: large-runner
required: true
test-connectors-options:
default: --concurrency=5 --release-stage=generally_available --release-stage=beta
required: true

run-name: Test connectors ${{ inputs.test-connectors-options || 'Nightly builds GA and Beta connectors' }}

jobs:
connectors_ci:
name: Connectors CI
timeout-minutes: 240 # 4 hours
runs-on: large-runner
test_connectors:
name: Test connectors ${{ inputs.test-connectors-options || 'Nightly builds GA and Beta connectors' }}
timeout-minutes: 360 # 6 hours
runs-on: ${{ inputs.runs-on || 'large-runner' }}
steps:
- name: Get start timestamp
id: get-start-timestamp
Expand All @@ -31,7 +41,7 @@ jobs:
python-version: "3.10"
- name: Install ci-connector-ops package
run: pip install ./tools/ci_connector_ops\[pipelines]\
- name: Run nightly builds
- name: Test connectors
run: |
export _EXPERIMENTAL_DAGGER_RUNNER_HOST="unix:///var/run/buildkit/buildkitd.sock"
DAGGER_CLI_COMMIT="67c7e7635cf4ea0e446e2fed522a3e314c960f6a"
Expand All @@ -41,7 +51,7 @@ jobs:
mkdir -p "$DAGGER_TMP_BINDIR"
curl "https://dl.dagger.io/dagger/main/${DAGGER_CLI_COMMIT}/dagger_${DAGGER_CLI_COMMIT}_$(uname -s | tr A-Z a-z)_$(uname -m | sed s/x86_64/amd64/).tar.gz" | tar xvz -C "$DAGGER_TMP_BINDIR"
fi
connectors-ci --is-ci --gha-workflow-run-id=${{ github.run_id }} test-connectors --release-stage=beta --release-stage=generally_available
connectors-ci --is-ci --gha-workflow-run-id=${{ github.run_id }} test-connectors ${{ inputs.test-connectors-options || '--concurrency=5 --release-stage=generally_available --release-stage=beta' }}
env:
GCP_GSM_CREDENTIALS: ${{ secrets.GCP_GSM_CREDENTIALS }}
AWS_ACCESS_KEY_ID: ${{ secrets.STATUS_API_AWS_ACCESS_KEY_ID }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def with_python_base(context: ConnectorTestContext, python_image_name: str = "py
return (
context.dagger_client.container()
.from_(python_image_name)
.with_mounted_cache("/root/.cache/pip", pip_cache, sharing=CacheSharingMode.LOCKED)
.with_mounted_cache("/root/.cache/pip", pip_cache, sharing=CacheSharingMode.SHARED)
.with_mounted_directory("/tools", context.get_repo_dir("tools", include=["ci_credentials", "ci_common_utils"], exclude=[".venv"]))
.with_exec(["pip", "install", "--upgrade", "pip"])
)
Expand Down
42 changes: 24 additions & 18 deletions tools/ci_connector_ops/ci_connector_ops/pipelines/connectors_ci.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
logger = logging.getLogger(__name__)


async def run(context: ConnectorTestContext) -> ConnectorTestReport:
async def run(context: ConnectorTestContext, semaphore: anyio.Semaphore) -> ConnectorTestReport:
"""Runs a CI pipeline for a single connector.
A visual DAG can be found on the README.md file of the pipelines modules.
Expand All @@ -46,31 +46,34 @@ async def run(context: ConnectorTestContext) -> ConnectorTestReport:
Returns:
ConnectorTestReport: The test reports holding tests results.
"""
async with context:
async with asyncer.create_task_group() as task_group:
tasks = [
task_group.soonify(checks.QaChecks(context).run)(),
task_group.soonify(checks.CodeFormatChecks(context).run)(),
task_group.soonify(tests.run_all_tests)(context),
]
results = list(itertools.chain(*(task.value for task in tasks)))
async with semaphore:
async with context:
async with asyncer.create_task_group() as task_group:
tasks = [
task_group.soonify(checks.QaChecks(context).run)(),
task_group.soonify(checks.CodeFormatChecks(context).run)(),
task_group.soonify(tests.run_all_tests)(context),
]
results = list(itertools.chain(*(task.value for task in tasks)))

context.test_report = ConnectorTestReport(context, steps_results=results)
context.test_report = ConnectorTestReport(context, steps_results=results)

return context.test_report
return context.test_report


async def run_connectors_test_pipelines(contexts: List[ConnectorTestContext]):
async def run_connectors_test_pipelines(contexts: List[ConnectorTestContext], concurrency: int = 5):
"""Runs a CI pipeline for all the connectors passed.
Args:
contexts (List[ConnectorTestContext]): List of connector test contexts for which a CI pipeline needs to be run.
concurrency (int): Number of test pipeline that can run in parallel. Defaults to 5
"""
semaphore = anyio.Semaphore(concurrency)
async with dagger.Connection(DAGGER_CONFIG) as dagger_client:
async with anyio.create_task_group() as tg:
for context in contexts:
context.dagger_client = dagger_client.pipeline(f"{context.connector.technical_name} - Test Pipeline")
tg.start_soon(run, context)
tg.start_soon(run, context, semaphore)


@click.group()
Expand Down Expand Up @@ -120,7 +123,7 @@ def connectors_ci(
ctx.obj["gha_workflow_run_url"],
GITHUB_GLOBAL_DESCRIPTION,
GITHUB_GLOBAL_CONTEXT,
should_send=not ctx.obj["is_local"],
should_send=ctx.obj["ci_context"] == "pull_request",
logger=logger,
)

Expand All @@ -140,8 +143,11 @@ def connectors_ci(
type=click.Choice(["alpha", "beta", "generally_available"]),
)
@click.option("--modified/--not-modified", help="Only test modified connectors in the current branch.", default=False, type=bool)
@click.option("--concurrency", help="Number of connector tests pipeline to run in parallel.", default=5, type=int)
@click.pass_context
def test_connectors(ctx: click.Context, names: Tuple[str], languages: Tuple[ConnectorLanguage], release_stages: Tuple[str], modified: bool):
def test_connectors(
ctx: click.Context, names: Tuple[str], languages: Tuple[ConnectorLanguage], release_stages: Tuple[str], modified: bool, concurrency: int
):
"""Runs a CI pipeline the connector passed as CLI argument.
Args:
Expand Down Expand Up @@ -186,14 +192,14 @@ def test_connectors(ctx: click.Context, names: Tuple[str], languages: Tuple[Conn
in [ConnectorLanguage.PYTHON, ConnectorLanguage.LOW_CODE] # TODO: remove this once we implement pipelines for Java connector
]
try:
anyio.run(run_connectors_test_pipelines, connectors_tests_contexts)
anyio.run(run_connectors_test_pipelines, connectors_tests_contexts, concurrency)
update_commit_status_check(
ctx.obj["git_revision"],
"success",
ctx.obj["gha_workflow_run_url"],
GITHUB_GLOBAL_DESCRIPTION,
GITHUB_GLOBAL_CONTEXT,
should_send=not ctx.obj["is_local"],
should_send=ctx.obj.get("ci_context") == "pull_request",
logger=logger,
)
except dagger.DaggerError as e:
Expand All @@ -204,7 +210,7 @@ def test_connectors(ctx: click.Context, names: Tuple[str], languages: Tuple[Conn
ctx.obj["gha_workflow_run_url"],
GITHUB_GLOBAL_DESCRIPTION,
GITHUB_GLOBAL_CONTEXT,
should_send=not ctx.obj["is_local"],
should_send=ctx.obj.get("ci_context") == "pull_request",
logger=logger,
)

Expand Down
7 changes: 5 additions & 2 deletions tools/ci_connector_ops/ci_connector_ops/pipelines/contexts.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ def dagger_client(self, dagger_client: Client):
def is_ci(self):
return self.is_local is False

@property
def is_pr(self):
return self.ci_context == "pull_request"

@property
def repo(self):
return self.dagger_client.git(AIRBYTE_REPO_URL, keep_git_dir=True)
Expand Down Expand Up @@ -122,7 +126,7 @@ def github_commit_status(self) -> dict:
"target_url": self.gha_workflow_run_url,
"description": self.state.value["description"],
"context": f"[POC please ignore] Connector tests: {self.connector.technical_name}",
"should_send": self.is_ci,
"should_send": self.is_pr,
"logger": self.logger,
}

Expand Down Expand Up @@ -173,6 +177,5 @@ async def __aexit__(self, exception_type, exception_value, traceback) -> bool:
)
if report_upload_exit_code != 0:
self.logger.error("Uploading the report to S3 failed.")

await asyncify(update_commit_status_check)(**self.github_commit_status)
return True
2 changes: 1 addition & 1 deletion tools/ci_connector_ops/ci_connector_ops/pipelines/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ async def run(self) -> StepResult:
StepResult: Failure or success of the acceptances tests with stdout and stdout.
"""
if not self.context.connector.acceptance_test_config:
return StepResult(Step.ACCEPTANCE_TESTS, StepStatus.SKIPPED), None
return StepResult(self, StepStatus.SKIPPED), None

dagger_client = self.get_dagger_pipeline(self.context.dagger_client)

Expand Down

0 comments on commit 810f9e9

Please sign in to comment.