diff --git a/tools/ci_connector_ops/ci_connector_ops/pipelines/bases.py b/tools/ci_connector_ops/ci_connector_ops/pipelines/bases.py index c0c3af18fc681..14b8b0c496f4d 100644 --- a/tools/ci_connector_ops/ci_connector_ops/pipelines/bases.py +++ b/tools/ci_connector_ops/ci_connector_ops/pipelines/bases.py @@ -16,10 +16,9 @@ import anyio import asyncer from anyio import Path -from ci_connector_ops.pipelines.consts import PYPROJECT_TOML_FILE_PATH, LOCAL_REPORTS_PATH_ROOT from ci_connector_ops.pipelines.actions import remote_storage +from ci_connector_ops.pipelines.consts import LOCAL_REPORTS_PATH_ROOT, PYPROJECT_TOML_FILE_PATH from ci_connector_ops.pipelines.utils import check_path_in_workdir, slugify, with_exit_code, with_stderr, with_stdout - from ci_connector_ops.utils import console from dagger import Container, QueryError from rich.console import Group @@ -286,7 +285,11 @@ def success(self) -> bool: # noqa D102 @property def run_duration(self) -> int: # noqa D102 - return (self.created_at - self.pipeline_context.created_at).total_seconds() + return (self.pipeline_context.stopped_at - self.pipeline_context.started_at).total_seconds() + + @property + def lead_duration(self) -> int: # noqa D102 + return (self.pipeline_context.stopped_at - self.pipeline_context.created_at).total_seconds() @property def remote_storage_enabled(self) -> bool: # noqa D102 @@ -321,7 +324,7 @@ def to_json(self) -> str: return json.dumps( { "pipeline_name": self.pipeline_context.pipeline_name, - "run_timestamp": self.created_at.isoformat(), + "run_timestamp": self.pipeline_context.started_at.isoformat(), "run_duration": self.run_duration, "success": self.success, "failed_steps": [s.step.__class__.__name__ for s in self.failed_steps], @@ -329,8 +332,8 @@ def to_json(self) -> str: "skipped_steps": [s.step.__class__.__name__ for s in self.skipped_steps], "gha_workflow_run_url": self.pipeline_context.gha_workflow_run_url, "pipeline_start_timestamp": self.pipeline_context.pipeline_start_timestamp, - "pipeline_end_timestamp": round(self.created_at.timestamp()), - "pipeline_duration": round(self.created_at.timestamp()) - self.pipeline_context.pipeline_start_timestamp, + "pipeline_end_timestamp": round(self.pipeline_context.stopped_at.timestamp()), + "pipeline_duration": round(self.pipeline_context.stopped_at.timestamp()) - self.pipeline_context.pipeline_start_timestamp, "git_branch": self.pipeline_context.git_branch, "git_revision": self.pipeline_context.git_revision, "ci_context": self.pipeline_context.ci_context, diff --git a/tools/ci_connector_ops/ci_connector_ops/pipelines/contexts.py b/tools/ci_connector_ops/ci_connector_ops/pipelines/contexts.py index d90731b6684bf..25b2f8ebf24dc 100644 --- a/tools/ci_connector_ops/ci_connector_ops/pipelines/contexts.py +++ b/tools/ci_connector_ops/ci_connector_ops/pipelines/contexts.py @@ -15,8 +15,8 @@ import yaml from anyio import Path from asyncer import asyncify -from ci_connector_ops.pipelines.actions import remote_storage, secrets -from ci_connector_ops.pipelines.bases import CIContext, ConnectorReport, Report, StepStatus +from ci_connector_ops.pipelines.actions import secrets +from ci_connector_ops.pipelines.bases import CIContext, ConnectorReport, Report from ci_connector_ops.pipelines.github import update_commit_status_check from ci_connector_ops.pipelines.slack import send_message_to_webhook from ci_connector_ops.pipelines.utils import AIRBYTE_REPO_URL, METADATA_FILE_NAME, sanitize_gcs_credentials @@ -105,7 +105,8 @@ def __init__( self.dockerd_service = None self.ci_gcs_credentials = sanitize_gcs_credentials(ci_gcs_credentials) if ci_gcs_credentials else None self.ci_report_bucket = ci_report_bucket - + self.started_at = None + self.stopped_at = None update_commit_status_check(**self.github_commit_status) @property @@ -202,6 +203,7 @@ async def __aenter__(self): if self.dagger_client is None: raise Exception("A Pipeline can't be entered with an undefined dagger_client") self.state = ContextState.RUNNING + self.started_at = datetime.utcnow() await asyncify(update_commit_status_check)(**self.github_commit_status) if self.should_send_slack_message: await asyncify(send_message_to_webhook)(self.create_slack_message(), self.reporting_slack_channel, self.slack_webhook) @@ -247,6 +249,7 @@ async def __aexit__( bool: Whether the teardown operation ran successfully. """ self.state = self.determine_final_state(self.report, exception_value) + self.stopped_at = datetime.utcnow() if exception_value: self.logger.error("An error was handled by the Pipeline", exc_info=True) @@ -402,6 +405,7 @@ async def __aexit__( Returns: bool: Whether the teardown operation ran successfully. """ + self.stopped_at = datetime.utcnow() self.state = self.determine_final_state(self.report, exception_value) if exception_value: self.logger.error("An error got handled by the ConnectorContext", exc_info=True)