Skip to content

Commit

Permalink
Adds Deployment Version to the Flow Run object (#12591)
Browse files Browse the repository at this point in the history
  • Loading branch information
masonmenges committed Apr 9, 2024
1 parent 22f8978 commit fe07861
Show file tree
Hide file tree
Showing 12 changed files with 166 additions and 0 deletions.
5 changes: 5 additions & 0 deletions src/prefect/client/schemas/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,11 @@ class FlowRun(ObjectBaseModel):
"The id of the deployment associated with this flow run, if available."
),
)
deployment_version: Optional[str] = Field(
default=None,
description="The version of the deployment associated with this flow run.",
example="1.0",
)
work_queue_name: Optional[str] = Field(
default=None, description="The work queue that handled this flow run."
)
Expand Down
5 changes: 5 additions & 0 deletions src/prefect/client/schemas/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,11 @@ class FlowRunResponse(ObjectBaseModel):
"The id of the deployment associated with this flow run, if available."
),
)
deployment_version: Optional[str] = Field(
default=None,
description="The version of the deployment associated with this flow run.",
example="1.0",
)
work_queue_name: Optional[str] = Field(
default=None, description="The work queue that handled this flow run."
)
Expand Down
1 change: 1 addition & 0 deletions src/prefect/server/api/deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,7 @@ async def create_flow_run_from_deployment(
),
flow_id=deployment.flow_id,
deployment_id=deployment.id,
deployment_version=deployment.version,
parameters=parameters,
tags=set(deployment.tags).union(flow_run.tags),
infrastructure_document_id=(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""add_deployment_version_to_flow_run
Revision ID: bd6efa529f03
Revises: aeea5ee6f070
Create Date: 2024-04-03 09:44:18.666688
"""
import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "bd6efa529f03"
down_revision = "aeea5ee6f070"
branch_labels = None
depends_on = None


def upgrade():
op.add_column(
"flow_run", sa.Column("deployment_version", sa.String(), nullable=True)
)
op.create_index(
op.f("ix_flow_run__deployment_version"),
"flow_run",
["deployment_version"],
unique=False,
)
# ### end Alembic commands ###


def downgrade():
op.drop_index(op.f("ix_flow_run__deployment_version"), table_name="flow_run")
op.drop_column("flow_run", "deployment_version")
# ### end Alembic commands ###
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""add_deployment_version_to_flow_run
Revision ID: 8644a9595a08
Revises: 07ed05dfd4ec
Create Date: 2024-04-02 11:45:38.210743
"""
import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "8644a9595a08"
down_revision = "07ed05dfd4ec"
branch_labels = None
depends_on = None


def upgrade():
op.add_column(
"flow_run", sa.Column("deployment_version", sa.String(), nullable=True)
)
op.create_index(
op.f("ix_flow_run__deployment_version"),
"flow_run",
["deployment_version"],
unique=False,
)


def downgrade():
op.drop_index(op.f("ix_flow_run__deployment_version"), table_name="flow_run")
op.drop_column("flow_run", "deployment_version")
1 change: 1 addition & 0 deletions src/prefect/server/database/orm_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,7 @@ def flow_id(cls):
deployment_id = sa.Column(UUID(), nullable=True)
work_queue_name = sa.Column(sa.String, index=True)
flow_version = sa.Column(sa.String, index=True)
deployment_version = sa.Column(sa.String, index=True)
parameters = sa.Column(JSON, server_default="{}", default=dict, nullable=False)
idempotency_key = sa.Column(sa.String)
context = sa.Column(JSON, server_default="{}", default=dict, nullable=False)
Expand Down
1 change: 1 addition & 0 deletions src/prefect/server/models/deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,7 @@ async def _generate_scheduled_flow_runs(
"id": uuid4(),
"flow_id": deployment.flow_id,
"deployment_id": deployment_id,
"deplyment_version": deployment.version,
"work_queue_name": deployment.work_queue_name,
"work_queue_id": deployment.work_queue_id,
"parameters": deployment.parameters,
Expand Down
5 changes: 5 additions & 0 deletions src/prefect/server/schemas/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ class FlowRun(ORMBaseModel):
"The id of the deployment associated with this flow run, if available."
),
)
deployment_version: Optional[str] = Field(
default=None,
description="The version of the deployment associated with this flow run.",
example="1.0",
)
work_queue_name: Optional[str] = Field(
default=None, description="The work queue that handled this flow run."
)
Expand Down
5 changes: 5 additions & 0 deletions src/prefect/server/schemas/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,11 @@ class FlowRunResponse(ORMBaseModel):
"The id of the deployment associated with this flow run, if available."
),
)
deployment_version: Optional[str] = Field(
default=None,
description="The version of the deployment associated with this flow run.",
example="1.0",
)
work_queue_id: Optional[UUID] = Field(
default=None, description="The id of the run's work pool queue."
)
Expand Down
41 changes: 41 additions & 0 deletions tests/fixtures/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,47 @@ def hello(name: str):
return deployment


@pytest.fixture
async def deployment_with_version(
session,
flow,
flow_function,
infrastructure_document_id,
storage_document_id,
work_queue_1, # attached to a work pool called the work_pool fixture named "test-work-pool"
):
def hello(name: str):
pass

deployment = await models.deployments.create_deployment(
session=session,
deployment=schemas.core.Deployment(
name=f"my-deployment-{uuid.uuid4()}",
tags=["test"],
flow_id=flow.id,
schedules=[
schemas.core.DeploymentSchedule(
schedule=schemas.schedules.IntervalSchedule(
interval=datetime.timedelta(days=1),
anchor_date=pendulum.datetime(2020, 1, 1),
),
active=True,
)
],
storage_document_id=storage_document_id,
path="./subdir",
entrypoint="/file.py:flow",
infrastructure_document_id=infrastructure_document_id,
work_queue_name=work_queue_1.name,
parameter_openapi_schema=parameter_schema(hello),
work_queue_id=work_queue_1.id,
version="1.0",
),
)
await session.commit()
return deployment


@pytest.fixture
async def deployment_2(
session,
Expand Down
11 changes: 11 additions & 0 deletions tests/server/api/test_deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -2528,6 +2528,17 @@ async def test_create_flow_run_from_deployment_with_defaults(
)
assert response.json()["work_queue_name"] == deployment.work_queue_name
assert response.json()["state_type"] == schemas.states.StateType.SCHEDULED
assert response.json()["deployment_version"] is None

async def test_create_flow_run_from_deployment_with_deployment_version(
self, deployment_with_version, client
):
# should use default parameters, tags, and flow runner
response = await client.post(
f"deployments/{deployment_with_version.id}/create_flow_run", json={}
)
assert response.status_code == 201
assert response.json()["deployment_version"] == "1.0"

async def test_create_flow_run_from_deployment_uses_work_queue_name(
self, deployment, client, session
Expand Down
25 changes: 25 additions & 0 deletions tests/server/api/test_flow_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,31 @@ async def test_read_flow_run(self, flow, flow_run, client):
assert response.status_code == status.HTTP_200_OK
assert response.json()["id"] == str(flow_run.id)
assert response.json()["flow_id"] == str(flow.id)
assert response.json()["deployment_version"] is None

@pytest.fixture
async def flow_run_with_deployment_version(self, flow, session):
flow_run = await models.flow_runs.create_flow_run(
session=session,
flow_run=schemas.core.FlowRun(
flow_id=flow.id,
flow_version="1.0",
deployment_version="Deployment Version 1.0",
state=schemas.states.Pending(),
),
)
await session.commit()
return flow_run

async def test_read_flow_run_with_deployment_version(
self, flow, flow_run_with_deployment_version, client
):
# make sure we we can read the flow run correctly
response = await client.get(f"/flow_runs/{flow_run_with_deployment_version.id}")
assert response.status_code == status.HTTP_200_OK
assert response.json()["id"] == str(flow_run_with_deployment_version.id)
assert response.json()["flow_id"] == str(flow.id)
assert response.json()["deployment_version"] == "Deployment Version 1.0"

async def test_read_flow_run_like_the_engine_does(self, flow, flow_run, client):
"""Regression test for the hex format of UUIDs in `PREFECT__FLOW_RUN_ID`
Expand Down

0 comments on commit fe07861

Please sign in to comment.