From 949e41ce603ae8ad41baf1153269390db17b28bc Mon Sep 17 00:00:00 2001 From: Morgan Wowk Date: Tue, 17 Mar 2026 21:20:35 -0700 Subject: [PATCH] feat: Measure execution status duration **Changes:** * Adds histogram measurement for execution node status duration without adding additional database load to the system --- cloud_pipelines_backend/backend_types_sql.py | 23 ++- cloud_pipelines_backend/database_ops.py | 38 +++- .../instrumentation/metrics.py | 33 +++- cloud_pipelines_backend/orchestrator_sql.py | 164 ++++++++++++++---- 4 files changed, 223 insertions(+), 35 deletions(-) diff --git a/cloud_pipelines_backend/backend_types_sql.py b/cloud_pipelines_backend/backend_types_sql.py index b984c6c..8597143 100644 --- a/cloud_pipelines_backend/backend_types_sql.py +++ b/cloud_pipelines_backend/backend_types_sql.py @@ -64,7 +64,7 @@ def generate_unique_id() -> str: # # Needed to put a union type into DB # class SqlIOTypeStruct(_BaseModel): -# type: structures.TypeSpecType +# type: structures.TypeSpecType # No. We'll represent TypeSpecType as name:str + properties:dict # Supported cases: # * type: "name" @@ -358,7 +358,9 @@ class ExecutionNode(_TableBase): repr=False, ) - # updated_at: orm.Mapped[datetime.datetime | None] = orm.mapped_column(default=None) + status_updated_at: orm.Mapped[datetime.datetime | None] = orm.mapped_column( + default=None + ) # execution_kind = orm.Mapped[typing.Literal["CONTAINER", "GRAPH"]] @@ -425,6 +427,23 @@ class ExecutionNode(_TableBase): ) +@sql.event.listens_for(ExecutionNode.container_execution_status, "set") +def _stamp_execution_status_updated_at( + target: ExecutionNode, + value: ContainerExecutionStatus | None, + _old_value: object, + _initiator: object, +) -> None: + """Keep status_updated_at in sync with container_execution_status. + + Fires for every Python-level write anywhere in the codebase (orchestrator, + API server, etc.). Does NOT fire on database loads, so there is no risk of + overwriting the column when SQLAlchemy hydrates a row from a SELECT. + """ + if value is not None: + target.status_updated_at = datetime.datetime.now(datetime.timezone.utc) + + EXECUTION_NODE_EXTRA_DATA_SYSTEM_ERROR_EXCEPTION_MESSAGE_KEY = ( "system_error_exception_message" ) diff --git a/cloud_pipelines_backend/database_ops.py b/cloud_pipelines_backend/database_ops.py index caf889e..e86686f 100644 --- a/cloud_pipelines_backend/database_ops.py +++ b/cloud_pipelines_backend/database_ops.py @@ -1,5 +1,4 @@ import logging - import sqlalchemy from sqlalchemy import orm @@ -66,12 +65,49 @@ def create_db_engine( return db_engine +def _add_columns_if_missing(*, db_engine: sqlalchemy.Engine) -> None: + """Add new nullable columns to existing tables when they are not yet present. + + SQLAlchemy's create_all() only creates missing tables, not missing columns, + so new columns require an explicit migration step. All additions run in a + single transaction so the schema is updated atomically.""" + _COLUMN_MIGRATIONS = [ + bts.ExecutionNode.__table__.c.status_updated_at, + ] + inspector = sqlalchemy.inspect(db_engine) + with db_engine.connect() as conn: + for col in _COLUMN_MIGRATIONS: + existing = {c["name"] for c in inspector.get_columns(col.table.name)} + if col.name not in existing: + _logger.info( + f"Migrating: ALTER TABLE {col.table.name} ADD COLUMN {col.name} ({col.type})" + ) + try: + col_type_str = col.type.compile(dialect=db_engine.dialect) + conn.execute( + sqlalchemy.text( + f"ALTER TABLE {col.table.name}" + f" ADD COLUMN {col.name} {col_type_str}" + ) + ) + except sqlalchemy.exc.OperationalError: + _logger.info( + f"Column {col.table.name}.{col.name} already exists (concurrent migration) — skipping" + ) + else: + _logger.info( + f"Column {col.table.name}.{col.name} already exists — skipping" + ) + conn.commit() + + def migrate_db( *, db_engine: sqlalchemy.Engine, do_skip_backfill: bool, ) -> None: _logger.info("Enter migrate DB") + _add_columns_if_missing(db_engine=db_engine) # # Example: # sqlalchemy.Index( diff --git a/cloud_pipelines_backend/instrumentation/metrics.py b/cloud_pipelines_backend/instrumentation/metrics.py index 5ddb326..94c54be 100644 --- a/cloud_pipelines_backend/instrumentation/metrics.py +++ b/cloud_pipelines_backend/instrumentation/metrics.py @@ -22,8 +22,18 @@ - Instrument: orchestrator_execution_system_errors """ +import enum + from opentelemetry import metrics as otel_metrics + +class MetricUnit(str, enum.Enum): + """UCUM-style unit strings accepted by the OTel SDK.""" + + SECONDS = "s" + ERRORS = "{error}" + + # --------------------------------------------------------------------------- # tangle.orchestrator # --------------------------------------------------------------------------- @@ -32,5 +42,26 @@ execution_system_errors = orchestrator_meter.create_counter( name="execution.system_errors", description="Number of execution nodes that ended in SYSTEM_ERROR status", - unit="{error}", + unit=MetricUnit.ERRORS, ) + +execution_status_transition_duration = orchestrator_meter.create_histogram( + name="execution.status_transition.duration", + description="Duration an execution spent in a status before transitioning to the next status", + unit=MetricUnit.SECONDS, +) + + +def record_status_transition( + from_status: str, + to_status: str, + duration_seconds: float, +) -> None: + """Record a single status-transition duration observation.""" + execution_status_transition_duration.record( + duration_seconds, + attributes={ + "execution.status.from": from_status, + "execution.status.to": to_status, + }, + ) diff --git a/cloud_pipelines_backend/orchestrator_sql.py b/cloud_pipelines_backend/orchestrator_sql.py index 4ac30a9..65f1a4e 100644 --- a/cloud_pipelines_backend/orchestrator_sql.py +++ b/cloud_pipelines_backend/orchestrator_sql.py @@ -99,6 +99,12 @@ def internal_process_queued_executions_queue(self, session: orm.Session): if queued_execution: self._queued_executions_queue_idle = False start_timestamp = time.monotonic_ns() + # Capture before the try/except: session.rollback() in the except + # handler expires every attribute on every loaded instance, so + # reading these after rollback would trigger an unintended + # lazy-load SELECT instead of returning the in-memory values. + queued_prev_status = queued_execution.container_execution_status + queued_prev_status_updated_at = queued_execution.status_updated_at with contextual_logging.logging_context(execution_id=queued_execution.id): _logger.info("Before processing queued execution") @@ -109,8 +115,11 @@ def internal_process_queued_executions_queue(self, session: orm.Session): except Exception as ex: _logger.exception("Error processing queued execution") session.rollback() - queued_execution.container_execution_status = ( - bts.ContainerExecutionStatus.SYSTEM_ERROR + _transition_execution_status( + execution=queued_execution, + new_status=bts.ContainerExecutionStatus.SYSTEM_ERROR, + prev_status=queued_prev_status, + prev_status_updated_at=queued_prev_status_updated_at, ) record_system_error_exception( execution=queued_execution, exception=ex @@ -179,8 +188,11 @@ def internal_process_running_executions_queue(self, session: orm.Session): # Mark our ExecutionNode as SYSTEM_ERROR execution_nodes = running_container_execution.execution_nodes for execution_node in execution_nodes: - execution_node.container_execution_status = ( - bts.ContainerExecutionStatus.SYSTEM_ERROR + _transition_execution_status( + execution=execution_node, + new_status=bts.ContainerExecutionStatus.SYSTEM_ERROR, + prev_status=execution_node.container_execution_status, + prev_status_updated_at=execution_node.status_updated_at, ) record_system_error_exception( execution=execution_node, exception=ex @@ -239,8 +251,11 @@ def internal_process_one_queued_execution( _logger.info( f"Execution did not have all input artifact data present. Waiting for upstream. {execution.id=}" ) - execution.container_execution_status = ( - bts.ContainerExecutionStatus.WAITING_FOR_UPSTREAM + _transition_execution_status( + execution=execution, + new_status=bts.ContainerExecutionStatus.WAITING_FOR_UPSTREAM, + prev_status=execution.container_execution_status, + prev_status_updated_at=execution.status_updated_at, ) session.commit() return @@ -329,8 +344,11 @@ def internal_process_one_queued_execution( execution.extra_data = {} execution.extra_data["reused_from_execution_node_id"] = old_execution.id - execution.container_execution_status = ( - old_execution.container_execution_status + _transition_execution_status( + execution=execution, + new_status=old_execution.container_execution_status, + prev_status=execution.container_execution_status, + prev_status_updated_at=execution.status_updated_at, ) # If the execution is still running, it's enough to just link execution node to it. @@ -373,8 +391,11 @@ def internal_process_one_queued_execution( downstream_execution.container_execution_status == bts.ContainerExecutionStatus.WAITING_FOR_UPSTREAM ): - downstream_execution.container_execution_status = ( - bts.ContainerExecutionStatus.QUEUED + _transition_execution_status( + execution=downstream_execution, + new_status=bts.ContainerExecutionStatus.QUEUED, + prev_status=bts.ContainerExecutionStatus.WAITING_FOR_UPSTREAM, + prev_status_updated_at=downstream_execution.status_updated_at, ) session.commit() return @@ -410,8 +431,11 @@ def internal_process_one_queued_execution( _logger.info( f"Cancelling execution {execution.id} and skipping all downstream executions." ) - execution.container_execution_status = ( - bts.ContainerExecutionStatus.CANCELLED + _transition_execution_status( + execution=execution, + new_status=bts.ContainerExecutionStatus.CANCELLED, + prev_status=execution.container_execution_status, + prev_status_updated_at=execution.status_updated_at, ) _mark_all_downstream_executions_as_skipped( session=session, execution=execution @@ -560,6 +584,13 @@ def generate_execution_log_uri( container_execution_uuid ) + # Capture before the launcher call: both error and success paths cross a + # session.rollback() boundary, after which the instance is expired. + # Reading these here — while the instance is live — avoids an unintended + # lazy-load SELECT on the far side of the rollback. + _pre_launch_status = execution.container_execution_status + _pre_launch_status_updated_at = execution.status_updated_at + try: launched_container: launcher_interfaces.LaunchedContainer = ( self._launcher.launch_container_task( @@ -583,8 +614,11 @@ def generate_execution_log_uri( with session.begin(): # Logs whole exception _logger.exception(f"Error launching container for {execution.id=}") - execution.container_execution_status = ( - bts.ContainerExecutionStatus.SYSTEM_ERROR + _transition_execution_status( + execution=execution, + new_status=bts.ContainerExecutionStatus.SYSTEM_ERROR, + prev_status=_pre_launch_status, + prev_status_updated_at=_pre_launch_status_updated_at, ) record_system_error_exception(execution=execution, exception=ex) _mark_all_downstream_executions_as_skipped( @@ -628,7 +662,12 @@ def generate_execution_log_uri( session.add(container_execution) execution.container_execution = container_execution execution.container_execution_cache_key = cache_key - execution.container_execution_status = container_execution.status + _transition_execution_status( + execution=execution, + new_status=container_execution.status, + prev_status=_pre_launch_status, + prev_status_updated_at=_pre_launch_status_updated_at, + ) # TODO: Maybe add artifact value and URI to input ArtifactData. def internal_process_one_running_execution( @@ -694,8 +733,11 @@ def internal_process_one_running_execution( _logger.info( f"Cancelling execution {execution_node.id} and skipping all downstream executions." ) - execution_node.container_execution_status = ( - bts.ContainerExecutionStatus.CANCELLED + _transition_execution_status( + execution=execution_node, + new_status=bts.ContainerExecutionStatus.CANCELLED, + prev_status=execution_node.container_execution_status, + prev_status_updated_at=execution_node.status_updated_at, ) _mark_all_downstream_executions_as_skipped( session=session, execution=execution_node @@ -742,8 +784,11 @@ def internal_process_one_running_execution( container_execution.status = bts.ContainerExecutionStatus.RUNNING container_execution.started_at = reloaded_launched_container.started_at for execution_node in execution_nodes: - execution_node.container_execution_status = ( - bts.ContainerExecutionStatus.RUNNING + _transition_execution_status( + execution=execution_node, + new_status=bts.ContainerExecutionStatus.RUNNING, + prev_status=execution_node.container_execution_status, + prev_status_updated_at=execution_node.status_updated_at, ) elif new_status == launcher_interfaces.ContainerStatus.SUCCEEDED: container_execution.status = bts.ContainerExecutionStatus.SUCCEEDED @@ -813,8 +858,11 @@ def _maybe_preload_value( ) # Skip downstream executions for execution_node in execution_nodes: - execution_node.container_execution_status = ( - bts.ContainerExecutionStatus.FAILED + _transition_execution_status( + execution=execution_node, + new_status=bts.ContainerExecutionStatus.FAILED, + prev_status=execution_node.container_execution_status, + prev_status_updated_at=execution_node.status_updated_at, ) _mark_all_downstream_executions_as_skipped( session=session, execution=execution_node @@ -855,8 +903,11 @@ def _maybe_preload_value( session.add_all(new_output_artifact_data_map.values()) for execution_node in execution_nodes: - execution_node.container_execution_status = ( - bts.ContainerExecutionStatus.SUCCEEDED + _transition_execution_status( + execution=execution_node, + new_status=bts.ContainerExecutionStatus.SUCCEEDED, + prev_status=execution_node.container_execution_status, + prev_status_updated_at=execution_node.status_updated_at, ) # TODO: Optimize for output_name, artifact_node in session.execute( @@ -877,8 +928,11 @@ def _maybe_preload_value( downstream_execution.container_execution_status == bts.ContainerExecutionStatus.WAITING_FOR_UPSTREAM ): - downstream_execution.container_execution_status = ( - bts.ContainerExecutionStatus.QUEUED + _transition_execution_status( + execution=downstream_execution, + new_status=bts.ContainerExecutionStatus.QUEUED, + prev_status=bts.ContainerExecutionStatus.WAITING_FOR_UPSTREAM, + prev_status_updated_at=downstream_execution.status_updated_at, ) elif new_status == launcher_interfaces.ContainerStatus.FAILED: container_execution.status = bts.ContainerExecutionStatus.FAILED @@ -897,16 +951,22 @@ def _maybe_preload_value( _retry(reloaded_launched_container.upload_log) # Skip downstream executions for execution_node in execution_nodes: - execution_node.container_execution_status = ( - bts.ContainerExecutionStatus.FAILED + _transition_execution_status( + execution=execution_node, + new_status=bts.ContainerExecutionStatus.FAILED, + prev_status=execution_node.container_execution_status, + prev_status_updated_at=execution_node.status_updated_at, ) _mark_all_downstream_executions_as_skipped( session=session, execution=execution_node ) elif new_status == launcher_interfaces.ContainerStatus.PENDING: for execution_node in execution_nodes: - execution_node.container_execution_status = ( - bts.ContainerExecutionStatus.PENDING + _transition_execution_status( + execution=execution_node, + new_status=bts.ContainerExecutionStatus.PENDING, + prev_status=execution_node.container_execution_status, + prev_status_updated_at=execution_node.status_updated_at, ) # ? Should we reset `started_at` or keep it? container_execution.started_at = None @@ -948,12 +1008,18 @@ def _mark_all_downstream_executions_as_skipped( if execution.id in seen_execution_ids: return seen_execution_ids.add(execution.id) - if execution.container_execution_status in { + current_status = execution.container_execution_status + if current_status in { bts.ContainerExecutionStatus.WAITING_FOR_UPSTREAM, # A downstream ExecutionNode can be in "Queued" state when it's been "woken up" by one of its upstreams. bts.ContainerExecutionStatus.QUEUED, }: - execution.container_execution_status = bts.ContainerExecutionStatus.SKIPPED + _transition_execution_status( + execution=execution, + new_status=bts.ContainerExecutionStatus.SKIPPED, + prev_status=current_status, + prev_status_updated_at=execution.status_updated_at, + ) # for artifact_node in execution.output_artifact_nodes: # for downstream_execution in artifact_node.downstream_executions: @@ -1047,6 +1113,42 @@ def _retry( raise +def _transition_execution_status( + *, + execution: bts.ExecutionNode, + new_status: bts.ContainerExecutionStatus, + prev_status: bts.ContainerExecutionStatus | None, + prev_status_updated_at: datetime.datetime | None, +) -> None: + """Set execution node status and record the status transition duration metric. + + The caller is always responsible for supplying prev_status and + prev_status_updated_at. Typically these are read directly from the live + instance immediately before the call. + + At call sites that cross a session.rollback() boundary, capture these + values *before* the rollback. SQLAlchemy's rollback() marks every + attribute on every loaded instance as "expired", meaning the next read + of any attribute issues a fresh SELECT against the database. If + prev_status or prev_status_updated_at were read *after* rollback, they + would trigger that unexpected lazy-load rather than returning the + in-memory value that was current at transition time. + + Passing None for either argument skips metric recording for that transition + without affecting the status write. status_updated_at is maintained + automatically by the SQLAlchemy event listener in backend_types_sql.py. + """ + if prev_status is not None and prev_status_updated_at is not None: + app_metrics.record_status_transition( + from_status=prev_status.value, + to_status=new_status.value, + duration_seconds=( + _get_current_time() - prev_status_updated_at + ).total_seconds(), + ) + execution.container_execution_status = new_status + + def record_system_error_exception(execution: bts.ExecutionNode, exception: Exception): app_metrics.execution_system_errors.add(1)