Conversation
Load Impact:
| Area | Impact |
|---|---|
| DB calls at startup | +1 SELECT per restart; +1 ALTER TABLE on first deploy only |
| DB calls per transition | None — status_updated_at piggybacks on the existing UPDATE |
| DB calls for reading previous status | None — in-memory reads from the live SQLAlchemy instance |
| CPU per transition | Negligible — one datetime.now() + one histogram bucket increment |
| Memory | Negligible — one additional datetime field per loaded ExecutionNode |
| Network (OTel export) | One additional histogram metric stream, exported in background batches |
```python
def _add_columns_if_missing(db_engine: sqlalchemy.Engine) -> None:
    """Add new nullable columns to existing tables via ALTER TABLE when they are
    not yet present. SQLAlchemy's create_all() only creates missing tables, not
    missing columns, so new columns require an explicit migration step."""
    _COLUMN_MIGRATIONS: list[tuple[str, str, str]] = [
        # (table_name, column_name, column_definition)
        ("execution_node", "status_updated_at", "DATETIME"),
    ]
    inspector = sqlalchemy.inspect(db_engine)
    for table_name, column_name, column_def in _COLUMN_MIGRATIONS:
        existing_columns = {col["name"] for col in inspector.get_columns(table_name)}
        if column_name not in existing_columns:
            with db_engine.connect() as conn:
                conn.execute(
                    sqlalchemy.text(
                        f"ALTER TABLE {table_name} ADD COLUMN {column_name} {column_def}"
                    )
                )
                conn.commit()
```
**Reviewer:**
Add `*` to the function signature to make the arguments keyword-only; good style even with just one parameter.
**Author:**
Done — added * to _add_columns_if_missing.
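As an aside for readers, the bare `*` marker can be illustrated with a minimal, generic sketch (hypothetical names, not the project code):

```python
# The bare * makes every parameter after it keyword-only, so call sites
# must spell out the argument name.
def add_columns_if_missing(*, db_engine) -> None:
    """Hypothetical stand-in for the real migration helper."""
    print(f"migrating on {db_engine}")

add_columns_if_missing(db_engine="engine")  # OK: explicit keyword

try:
    add_columns_if_missing("engine")  # rejected: positional argument
except TypeError as exc:
    print(f"TypeError: {exc}")
```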
```python
    missing columns, so new columns require an explicit migration step."""
    _COLUMN_MIGRATIONS: list[tuple[str, str, str]] = [
        # (table_name, column_name, column_definition)
        ("execution_node", "status_updated_at", "DATETIME"),
```
**Reviewer:**
Let's derive these instead of hard-coding them:

```python
_COLUMN_MIGRATIONS = [
    bts.ExecutionNode.__table__.c.status_updated_at,
]

# Example of what the column object exposes:
col = bts.ExecutionNode.__table__.c.status_updated_at
table_name = col.table.name  # "execution_node"
column_name = col.name       # "status_updated_at"
column_type = col.type       # DateTime()
```
**Author:**
Done — `_COLUMN_MIGRATIONS` now holds `bts.ExecutionNode.__table__.c.status_updated_at` and derives the table name, column name, and type from it.
```python
                conn.execute(
                    sqlalchemy.text(
                        f"ALTER TABLE {table_name} ADD COLUMN {column_name} {column_def}"
                    )
                )
```
**Reviewer:**
Let's use DDL for this and avoid `text` if possible; type safety and portability are why we're using SQLAlchemy in the first place:

```python
from sqlalchemy import Column, DateTime
from sqlalchemy.schema import AddColumn

col_obj = bts.ExecutionNode.__table__.c.status_updated_at
with db_engine.connect() as conn:
    conn.execute(AddColumn("execution_node", col_obj.copy()))
    conn.commit()
```
**Author:**
`sqlalchemy.schema.AddColumn` isn't available in our SQLAlchemy version, so I kept `sqlalchemy.text` but compile the type string from the model via `col.type.compile(dialect=db_engine.dialect)` to stay dialect-portable.
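For illustration, the dialect-aware type compilation can be sketched in isolation (assuming SQLAlchemy 2.x; a throwaway in-memory SQLite engine stands in for the real one):

```python
import sqlalchemy

# Compile a model-level type into the dialect's DDL spelling instead of
# hard-coding a string like "DATETIME".
engine = sqlalchemy.create_engine("sqlite://")
ddl_type = sqlalchemy.DateTime().compile(dialect=engine.dialect)
print(ddl_type)  # SQLite spells this "DATETIME"; other dialects differ
```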
```python
    return db_engine


def _add_columns_if_missing(db_engine: sqlalchemy.Engine) -> None:
```
**Reviewer:**
From my backfill lessons learned, I would suggest:
- add logs so we know whether the migration is happening, and surface any errors
- use a single transaction, even with only one column, for future-proofing; otherwise remove the for loop for now so the code matches its scope

Merging all my suggestions into something like this:

```python
_COLUMN_MIGRATIONS = [
    bts.ExecutionNode.__table__.c.status_updated_at,
]
inspector = sqlalchemy.inspect(db_engine)
with db_engine.connect() as conn:
    for col in _COLUMN_MIGRATIONS:
        existing = {c["name"] for c in inspector.get_columns(col.table.name)}
        if col.name not in existing:
            _logger.info(
                f"Migrating: ALTER TABLE {col.table.name} ADD COLUMN {col.name} ({col.type})"
            )
            try:
                conn.execute(sqlalchemy.schema.AddColumn(col.table.name, col.copy()))
            except sqlalchemy.exc.OperationalError:
                _logger.info(
                    f"Column {col.table.name}.{col.name} already exists (concurrent migration)"
                )
        else:
            _logger.info(
                f"Column {col.table.name}.{col.name} already exists — skipping"
            )
    conn.commit()
```
**Author:**
Done — adopted your suggestion almost verbatim: a single `with db_engine.connect()` wrapping the loop, an info log before each `ALTER TABLE`, and a catch for `OperationalError` on concurrent migration.
```python
    The caller is always responsible for supplying prev_status and
    prev_status_updated_at. Typically these are read directly from the live
    instance immediately before the call. At call sites that cross a
    session.rollback() boundary, the values should be captured before the
    rollback while the instance is still live, as rollback expires all loaded
    instances and reading an expired attribute would trigger an unintended
    lazy-load SELECT.
```
**Reviewer:**
Curious, for my own learning: what does "rollback will expire ..." mean? Can you give some examples? I'm not following the comment.
**Author:**
Expanded the docstring with a concrete explanation: `rollback()` marks every attribute on every loaded instance as "expired", so reading `prev_status` or `prev_status_updated_at` after a rollback would silently fire a fresh SELECT instead of returning the in-memory value.
```python
                    _get_current_time() - prev_status_updated_at
                ).total_seconds(),
            )
            execution.container_execution_status = new_status
```
**Reviewer:**
This is really cool! Curious: this should handle ALL node status transitions, right? Asking because I might want to piggyback off this function in the future for doing status search in the search API.
```python
execution_status_transition_duration = orchestrator_meter.create_histogram(
    name="execution.status_transition.duration",
    description="Duration an execution spent in a status before transitioning to the next status",
    unit="s",
```
**Reviewer:**
Can this be a `StrEnum`, so we know all the units that are possible?
**Author:**
Done — added `MetricUnit(str, enum.Enum)` with `SECONDS = "s"` and `ERRORS = "{error}"`, used for both instruments.
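A minimal sketch of what that enum might look like (reconstructed from the reply above; the exact project definition may differ):

```python
import enum

class MetricUnit(str, enum.Enum):
    """Known UCUM-style unit strings for the OpenTelemetry instruments."""
    SECONDS = "s"
    ERRORS = "{error}"

# Because MetricUnit mixes in str, members pass anywhere a plain unit
# string is expected, e.g. unit=MetricUnit.SECONDS in create_histogram().
print(MetricUnit.SECONDS == "s")  # True
print(MetricUnit.ERRORS.value)    # {error}
```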
**Changes:**
* Adds histogram measurement for execution node status duration without adding additional database load to the system

Changes
status_updated_atcolumn toExecutionNodetable to track when execution status last changedstatus_updated_attimestamp whencontainer_execution_statuschangesexecution_status_transition_durationhistogram metric to measure time spent in each execution status_transition_execution_status()helper function to centralize status updates and metric recording across all status transitionsstatus_updated_atcolumn to existing tablesShow of work
Note: Attribute names have since changed to the `execution_status_` prefix.

Local smoke-test and verification completed ✅