connectors-ci: better connector test debugging experience (1/2) (#27550)
alafanechere committed Jun 21, 2023
1 parent 59ff7b1 commit e742f8d
Showing 7 changed files with 325 additions and 53 deletions.
@@ -18,16 +18,19 @@
from dagger import Container


async def mask_secrets_in_gha_logs(ci_credentials_with_downloaded_secrets: Container):
async def get_secrets_to_mask(ci_credentials_with_downloaded_secrets: Container) -> list[str]:
"""This function will print the secrets to mask in the GitHub actions logs with the ::add-mask:: prefix.
We're not doing it directly from the ci_credentials tool because its stdout is wrapped around the dagger logger,
And GHA will only interpret lines starting with ::add-mask:: as secrets to mask.
"""
if secrets_to_mask := await get_file_contents(ci_credentials_with_downloaded_secrets, "/tmp/secrets_to_mask.txt"):
for secret_to_mask in secrets_to_mask.splitlines():
secrets_to_mask = []
if secrets_to_mask_file := await get_file_contents(ci_credentials_with_downloaded_secrets, "/tmp/secrets_to_mask.txt"):
for secret_to_mask in secrets_to_mask_file.splitlines():
# We print directly to stdout because the GHA runner will mask only if the log line starts with "::add-mask::"
# If we use the dagger logger, or context logger, the log line will start with other stuff and will not be masked
print(f"::add-mask::{secret_to_mask}")
secrets_to_mask.append(secret_to_mask)
return secrets_to_mask
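
For reference, the ::add-mask:: mechanism relied on here is a plain GitHub Actions workflow command printed to stdout. A minimal standalone sketch, with made-up secret values:

# Sketch of the GitHub Actions masking convention (values are made up).
# A log line starting exactly with "::add-mask::" tells the runner to redact that value in later logs.
fake_secrets = ["my-api-key", "my-db-password"]
collected = []
for secret in fake_secrets:
    print(f"::add-mask::{secret}")  # must be the very first thing on the line to be honored
    collected.append(secret)
# `collected` mirrors what get_secrets_to_mask returns, so callers can redact step output later.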


async def download(context: ConnectorContext, gcp_gsm_env_variable_name: str = "GCP_GSM_CREDENTIALS") -> Directory:
@@ -53,7 +56,7 @@ async def download(context: ConnectorContext, gcp_gsm_env_variable_name: str = "
)
# We don't want to print secrets in the logs when running locally.
if context.is_ci:
await mask_secrets_in_gha_logs(with_downloaded_secrets)
context.secrets_to_mask = await get_secrets_to_mask(with_downloaded_secrets)
return with_downloaded_secrets.directory(secrets_path)


161 changes: 128 additions & 33 deletions tools/ci_connector_ops/ci_connector_ops/pipelines/bases.py
@@ -7,6 +7,7 @@
from __future__ import annotations

import json
import webbrowser
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
@@ -21,6 +22,7 @@
from ci_connector_ops.pipelines.utils import check_path_in_workdir, slugify, with_exit_code, with_stderr, with_stdout
from ci_connector_ops.utils import console
from dagger import Container, QueryError
from jinja2 import Environment, PackageLoader, select_autoescape
from rich.console import Group
from rich.panel import Panel
from rich.style import Style
@@ -29,7 +31,7 @@
from tabulate import tabulate

if TYPE_CHECKING:
from ci_connector_ops.pipelines.contexts import ConnectorContext, PipelineContext
from ci_connector_ops.pipelines.contexts import PipelineContext


class CIContext(str, Enum):
@@ -100,7 +102,7 @@ class Step(ABC):
started_at: ClassVar[datetime]
max_retries: ClassVar[int] = 0

def __init__(self, context: ConnectorContext) -> None: # noqa D107
def __init__(self, context: PipelineContext) -> None: # noqa D107
self.context = context
self.retry_count = 0

@@ -234,6 +236,19 @@ async def _run_tests_in_directory(self, connector_under_test: Container, test_di
return StepResult(self, StepStatus.SKIPPED)


class NoOpStep(Step):
"""A step that does nothing."""

title = "No Op"

def __init__(self, context: PipelineContext, step_status: StepStatus) -> None:
super().__init__(context)
self.step_status = step_status

async def _run(self, *args, **kwargs) -> StepResult:
return StepResult(self, self.step_status)


@dataclass(frozen=True)
class StepResult:
"""A dataclass to capture the result of a step."""
@@ -248,6 +263,20 @@ class StepResult:
def __repr__(self) -> str: # noqa D105
return f"{self.step.title}: {self.status.value}"

def __str__(self) -> str: # noqa D105
return f"{self.step.title}: {self.status.value}\n\nSTDOUT:\n{self.stdout}\n\nSTDERR:\n{self.stderr}"

def __post_init__(self):
if self.stderr:
super().__setattr__("stderr", self.redact_secrets_from_string(self.stderr))
if self.stdout:
super().__setattr__("stdout", self.redact_secrets_from_string(self.stdout))

def redact_secrets_from_string(self, value: str) -> str:
for secret in self.step.context.secrets_to_mask:
value = value.replace(secret, "********")
return value
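
The effect of this hook, sketched with hypothetical step and context objects (not code from this commit): any secret registered on the pipeline context is masked at construction time.

# Sketch: secrets registered on the context are replaced by "********" in stdout/stderr.
step.context.secrets_to_mask = ["s3cr3t-token"]  # `step` is a hypothetical Step instance
result = StepResult(step, StepStatus.FAILURE, stderr="401 Unauthorized for token s3cr3t-token")
assert result.stderr == "401 Unauthorized for token ********"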


@dataclass(frozen=True)
class Report:
@@ -257,15 +286,19 @@ class Report:
steps_results: List[StepResult]
created_at: datetime = field(default_factory=datetime.utcnow)
name: str = "REPORT"
_file_path_key: str = "report.json"
filename: str = "output"

@property
def file_path_key(self) -> str:
return self._file_path_key
def report_output_prefix(self) -> str: # noqa D102
return self.pipeline_context.report_output_prefix

@file_path_key.setter
def file_path_key(self, v: str) -> None:
self._file_path_key = v
@property
def json_report_file_name(self) -> str: # noqa D102
return self.filename + ".json"

@property
def json_report_remote_storage_key(self) -> str: # noqa D102
return f"{self.report_output_prefix}/{self.json_report_file_name}"

@property
def failed_steps(self) -> List[StepResult]: # noqa D102
@@ -295,25 +328,35 @@ def lead_duration(self) -> int: # noqa D102
def remote_storage_enabled(self) -> bool: # noqa D102
return self.pipeline_context.is_ci

async def save(self) -> None:
"""Save the report as a JSON file."""
local_report_path = anyio.Path(LOCAL_REPORTS_PATH_ROOT + self.file_path_key)
await local_report_path.parents[0].mkdir(parents=True, exist_ok=True)
await local_report_path.write_text(self.to_json())
async def save_local(self, filename: str, content: str) -> Path:
"""Save the report files locally."""
local_path = anyio.Path(f"{LOCAL_REPORTS_PATH_ROOT}/{self.report_output_prefix}/{filename}")
await local_path.parents[0].mkdir(parents=True, exist_ok=True)
await local_path.write_text(content)
return local_path

async def save_remote(self, local_path: Path, remote_key: str, content_type: str = None) -> int:
gcs_cp_flags = None if content_type is None else [f"--content-type={content_type}"]
local_file = self.pipeline_context.dagger_client.host().directory(".", include=[str(local_path)]).file(str(local_path))
report_upload_exit_code, _, _ = await remote_storage.upload_to_gcs(
dagger_client=self.pipeline_context.dagger_client,
file_to_upload=local_file,
key=remote_key,
bucket=self.pipeline_context.ci_report_bucket,
gcs_credentials=self.pipeline_context.ci_gcs_credentials_secret,
flags=gcs_cp_flags,
)
if report_upload_exit_code != 0:
self.pipeline_context.logger.error(f"Uploading {local_path} to GCS Bucket: {self.pipeline_context.ci_report_bucket} failed.")
return report_upload_exit_code

async def save(self) -> None:
"""Save the report files."""
local_json_path = await self.save_local(self.json_report_file_name, self.to_json())
self.pipeline_context.logger.info(f"Report saved locally at {local_json_path}")
if self.remote_storage_enabled:
local_report_dagger_file = (
self.pipeline_context.dagger_client.host().directory(".", include=[str(local_report_path)]).file(str(local_report_path))
)
report_upload_exit_code, _stdout, _stderr = await remote_storage.upload_to_gcs(
dagger_client=self.pipeline_context.dagger_client,
file_to_upload=local_report_dagger_file,
key=self.file_path_key,
bucket=self.pipeline_context.ci_report_bucket,
gcs_credentials=self.pipeline_context.ci_gcs_credentials_secret,
)
if report_upload_exit_code != 0:
self.pipeline_context.logger.error(f"Uploading the report to GCS Bucket: {self.pipeline_context.ci_report_bucket} failed.")
await self.save_remote(local_json_path, self.json_report_remote_storage_key, "application/json")
self.pipeline_context.logger.info(f"Report saved remotely at {self.json_report_remote_storage_key}")

def to_json(self) -> str:
"""Create a JSON representation of the report.
@@ -385,17 +428,29 @@ class ConnectorReport(Report):
"""A dataclass to build connector test reports to share pipelines executions results with the user."""

@property
def file_path_key(self) -> str: # noqa D102
connector_name = self.pipeline_context.connector.technical_name
connector_version = self.pipeline_context.connector.version
def report_output_prefix(self) -> str: # noqa D102
return f"{self.pipeline_context.report_output_prefix}/{self.pipeline_context.connector.technical_name}/{self.pipeline_context.connector.version}"

suffix = f"{connector_name}/{connector_version}/output.json"
file_path_key = f"{self.pipeline_context.report_output_prefix}/{suffix}"
return file_path_key
@property
def html_report_file_name(self) -> str: # noqa D102
return self.filename + ".html"

@property
def html_report_remote_storage_key(self) -> str: # noqa D102
return f"{self.report_output_prefix}/{self.html_report_file_name}"

@property
def html_report_url(self) -> str: # noqa D102
return f"https://storage.googleapis.com/{self.pipeline_context.ci_report_bucket}/{self.html_report_remote_storage_key}"

@property
def should_be_commented_on_pr(self) -> bool: # noqa D102
return self.pipeline_context.is_ci and self.pipeline_context.pull_request and self.pipeline_context.PRODUCTION
return (
self.pipeline_context.should_save_report
and self.pipeline_context.is_ci
and self.pipeline_context.pull_request
and self.pipeline_context.PRODUCTION
)

def to_json(self) -> str:
"""Create a JSON representation of the connector test report.
@@ -421,6 +476,7 @@ def to_json(self) -> str:
"git_revision": self.pipeline_context.git_revision,
"ci_context": self.pipeline_context.ci_context,
"cdk_version": self.pipeline_context.cdk_version,
"html_report_url": self.html_report_url,
}
)

@@ -436,12 +492,51 @@ def post_comment_on_pr(self) -> None:
if step_result.status is not StepStatus.SKIPPED
]
markdown_comment += tabulate(report_data, headers=["Step", "Result"], tablefmt="pipe") + "\n\n"
markdown_comment += f"🔗 [View the logs here]({self.pipeline_context.gha_workflow_run_url})\n\n"
markdown_comment += f"🔗 [View the logs here]({self.html_report_url})\n\n"
markdown_comment += "*Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.*\n"
markdown_comment += "**You can run the same pipeline locally on this branch with the [airbyte-ci](https://github.com/airbytehq/airbyte/blob/master/tools/ci_connector_ops/ci_connector_ops/pipelines/README.md) tool with the following command**\n"
markdown_comment += f"```bash\nairbyte-ci connectors --name={self.pipeline_context.connector.technical_name} test\n```\n\n"
self.pipeline_context.pull_request.create_issue_comment(markdown_comment)
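
The PR comment table comes from tabulate's "pipe" format, which renders as GitHub-flavored Markdown. A small standalone sketch with invented step names:

from tabulate import tabulate

# Sketch: "pipe" produces a Markdown table GitHub can render (rows are invented).
report_data = [["Unit tests", "✅"], ["Acceptance tests", "❌"]]
print(tabulate(report_data, headers=["Step", "Result"], tablefmt="pipe"))
# Output (roughly):
# | Step             | Result   |
# |:-----------------|:---------|
# | Unit tests       | ✅       |
# | Acceptance tests | ❌       |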

async def to_html(self) -> str:
env = Environment(
loader=PackageLoader("ci_connector_ops.pipelines.tests"), autoescape=select_autoescape(), trim_blocks=False, lstrip_blocks=True
)
template = env.get_template("test_report.html.j2")
template.globals["StepStatus"] = StepStatus
local_icon_path = await Path(f"{self.pipeline_context.connector.code_directory}/icon.svg").resolve()
template_context = {
"connector_name": self.pipeline_context.connector.technical_name,
"step_results": self.steps_results,
"run_duration": round(self.run_duration),
"created_at": self.created_at.isoformat(),
"connector_version": self.pipeline_context.connector.version,
"gha_workflow_run_url": None,
"git_branch": self.pipeline_context.git_branch,
"git_revision": self.pipeline_context.git_revision,
"commit_url": None,
"icon_url": local_icon_path.as_uri(),
}

if self.pipeline_context.is_ci:
template_context["commit_url"] = f"https://github.com/airbytehq/airbyte/commit/{self.pipeline_context.git_revision}"
template_context["gha_workflow_run_url"] = self.pipeline_context.gha_workflow_run_url
template_context[
"icon_url"
] = f"https://raw.githubusercontent.com/airbytehq/airbyte/{self.pipeline_context.git_revision}/{self.pipeline_context.connector.code_directory}/icon.svg"
return template.render(template_context)
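
The test_report.html.j2 template itself is not part of this hunk. A minimal sketch of the same rendering approach, using an inline template string as a stand-in for the packaged template (the markup below is invented, not the real report layout):

from jinja2 import Environment, select_autoescape

# Sketch only: an inline template standing in for test_report.html.j2.
env = Environment(autoescape=select_autoescape(), trim_blocks=False, lstrip_blocks=True)
template = env.from_string(
    "<h1>{{ connector_name }} {{ connector_version }}</h1>\n"
    "<ul>{% for result in step_results %}"
    "<li>{{ result.step.title }}: {{ result.status.value }}</li>"
    "{% endfor %}</ul>"
)
html = template.render(connector_name="source-faker", connector_version="0.1.0", step_results=[])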

async def save(self) -> None:
local_html_path = await self.save_local(self.html_report_file_name, await self.to_html())
if self.pipeline_context.is_local:
absolute_path = await local_html_path.resolve()
self.pipeline_context.logger.info(f"Opening HTML report in browser: {absolute_path}")
webbrowser.open(absolute_path.as_uri())
if self.remote_storage_enabled:
await self.save_remote(local_html_path, self.html_report_remote_storage_key, "text/html")
self.pipeline_context.logger.info(f"HTML report uploaded to {self.html_report_url}")
await super().save()

def print(self):
"""Print the test report to the console in a nice way."""
connector_name = self.pipeline_context.connector.technical_name
@@ -220,6 +220,7 @@ def test(
ci_context=ctx.obj.get("ci_context"),
pull_request=ctx.obj.get("pull_request"),
ci_gcs_credentials=ctx.obj["ci_gcs_credentials"],
should_save_report=True,
)
for connector, modified_files in ctx.obj["selected_connectors_and_files"].items()
]
@@ -489,6 +490,8 @@ def format(ctx: click.Context) -> bool:
ci_gcs_credentials=ctx.obj["ci_gcs_credentials"],
ci_git_user=ctx.obj["ci_git_user"],
ci_github_access_token=ctx.obj["ci_github_access_token"],
pull_request=ctx.obj.get("pull_request"),
should_save_report=False,
)
for connector, modified_files in connectors_and_files_to_format
]
9 changes: 6 additions & 3 deletions tools/ci_connector_ops/ci_connector_ops/pipelines/contexts.py
@@ -111,6 +111,7 @@ def __init__(
self.ci_github_access_token = ci_github_access_token
self.started_at = None
self.stopped_at = None
self.secrets_to_mask = []
update_commit_status_check(**self.github_commit_status)

@property
@@ -262,7 +263,6 @@ async def __aexit__(
self.report = Report(self, steps_results=[])

self.report.print()
self.logger.info(self.report.to_json())

await asyncify(update_commit_status_check)(**self.github_commit_status)
if self.should_send_slack_message:
@@ -297,6 +297,7 @@ def __init__(
slack_webhook: Optional[str] = None,
reporting_slack_channel: Optional[str] = None,
pull_request: PullRequest = None,
should_save_report: bool = False,
):
"""Initialize a connector context.
Expand Down Expand Up @@ -326,6 +327,7 @@ def __init__(
self._secrets_dir = None
self._updated_secrets_dir = None
self.cdk_version = None
self.should_save_report = should_save_report

super().__init__(
pipeline_name=pipeline_name,
@@ -425,9 +427,9 @@ async def __aexit__(
await secrets.upload(self)

self.report.print()
self.logger.info(self.report.to_json())

await self.report.save()
if self.should_save_report:
await self.report.save()

if self.report.should_be_commented_on_pr:
self.report.post_comment_on_pr()
@@ -495,6 +497,7 @@ def __init__(
slack_webhook=slack_webhook,
reporting_slack_channel=reporting_slack_channel,
ci_gcs_credentials=ci_gcs_credentials,
should_save_report=True,
)

@property