diff --git a/src/prefect/client/schemas/objects.py b/src/prefect/client/schemas/objects.py index af2fd5c1e861..3a6b5540c767 100644 --- a/src/prefect/client/schemas/objects.py +++ b/src/prefect/client/schemas/objects.py @@ -422,6 +422,11 @@ class FlowRun(ObjectBaseModel): "The id of the deployment associated with this flow run, if available." ), ) + deployment_version: Optional[str] = Field( + default=None, + description="The version of the deployment associated with this flow run.", + example="1.0", + ) work_queue_name: Optional[str] = Field( default=None, description="The work queue that handled this flow run." ) diff --git a/src/prefect/client/schemas/responses.py b/src/prefect/client/schemas/responses.py index 88a6149c9b7d..48e8fc44fe0f 100644 --- a/src/prefect/client/schemas/responses.py +++ b/src/prefect/client/schemas/responses.py @@ -172,6 +172,11 @@ class FlowRunResponse(ObjectBaseModel): "The id of the deployment associated with this flow run, if available." ), ) + deployment_version: Optional[str] = Field( + default=None, + description="The version of the deployment associated with this flow run.", + example="1.0", + ) work_queue_name: Optional[str] = Field( default=None, description="The work queue that handled this flow run." ) diff --git a/src/prefect/server/api/deployments.py b/src/prefect/server/api/deployments.py index 47b34b1a0a20..050660c43961 100644 --- a/src/prefect/server/api/deployments.py +++ b/src/prefect/server/api/deployments.py @@ -672,6 +672,7 @@ async def create_flow_run_from_deployment( ), flow_id=deployment.flow_id, deployment_id=deployment.id, + deployment_version=deployment.version, parameters=parameters, tags=set(deployment.tags).union(flow_run.tags), infrastructure_document_id=( diff --git a/src/prefect/server/database/migrations/versions/postgresql/2024_04_04_094418_bd6efa529f03_add_deployment_version_to_flow_run.py b/src/prefect/server/database/migrations/versions/postgresql/2024_04_04_094418_bd6efa529f03_add_deployment_version_to_flow_run.py new file mode 100644 index 000000000000..e22327e72942 --- /dev/null +++ b/src/prefect/server/database/migrations/versions/postgresql/2024_04_04_094418_bd6efa529f03_add_deployment_version_to_flow_run.py @@ -0,0 +1,34 @@ +"""add_deployment_version_to_flow_run + +Revision ID: bd6efa529f03 +Revises: aeea5ee6f070 +Create Date: 2024-04-03 09:44:18.666688 + +""" +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "bd6efa529f03" +down_revision = "aeea5ee6f070" +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column( + "flow_run", sa.Column("deployment_version", sa.String(), nullable=True) + ) + op.create_index( + op.f("ix_flow_run__deployment_version"), + "flow_run", + ["deployment_version"], + unique=False, + ) + # ### end Alembic commands ### + + +def downgrade(): + op.drop_index(op.f("ix_flow_run__deployment_version"), table_name="flow_run") + op.drop_column("flow_run", "deployment_version") + # ### end Alembic commands ### diff --git a/src/prefect/server/database/migrations/versions/sqlite/2024_04_04_114538_8644a9595a08_add_deployment_version_to_flow_run.py b/src/prefect/server/database/migrations/versions/sqlite/2024_04_04_114538_8644a9595a08_add_deployment_version_to_flow_run.py new file mode 100644 index 000000000000..eef9ae233930 --- /dev/null +++ b/src/prefect/server/database/migrations/versions/sqlite/2024_04_04_114538_8644a9595a08_add_deployment_version_to_flow_run.py @@ -0,0 +1,32 @@ +"""add_deployment_version_to_flow_run + +Revision ID: 8644a9595a08 +Revises: 07ed05dfd4ec +Create Date: 2024-04-02 11:45:38.210743 + +""" +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "8644a9595a08" +down_revision = "07ed05dfd4ec" +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column( + "flow_run", sa.Column("deployment_version", sa.String(), nullable=True) + ) + op.create_index( + op.f("ix_flow_run__deployment_version"), + "flow_run", + ["deployment_version"], + unique=False, + ) + + +def downgrade(): + op.drop_index(op.f("ix_flow_run__deployment_version"), table_name="flow_run") + op.drop_column("flow_run", "deployment_version") diff --git a/src/prefect/server/database/orm_models.py b/src/prefect/server/database/orm_models.py index 618e134fd178..2a193080c1ad 100644 --- a/src/prefect/server/database/orm_models.py +++ b/src/prefect/server/database/orm_models.py @@ -501,6 +501,7 @@ def flow_id(cls): deployment_id = sa.Column(UUID(), nullable=True) work_queue_name = sa.Column(sa.String, index=True) flow_version = sa.Column(sa.String, index=True) + deployment_version = sa.Column(sa.String, index=True) parameters = sa.Column(JSON, server_default="{}", default=dict, nullable=False) idempotency_key = sa.Column(sa.String) context = sa.Column(JSON, server_default="{}", default=dict, nullable=False) diff --git a/src/prefect/server/models/deployments.py b/src/prefect/server/models/deployments.py index abc0748f752a..2abd3c1b0b43 100644 --- a/src/prefect/server/models/deployments.py +++ b/src/prefect/server/models/deployments.py @@ -619,6 +619,7 @@ async def _generate_scheduled_flow_runs( "id": uuid4(), "flow_id": deployment.flow_id, "deployment_id": deployment_id, + "deplyment_version": deployment.version, "work_queue_name": deployment.work_queue_name, "work_queue_id": deployment.work_queue_id, "parameters": deployment.parameters, diff --git a/src/prefect/server/schemas/core.py b/src/prefect/server/schemas/core.py index 652ca6cf7350..e7d94cc11930 100644 --- a/src/prefect/server/schemas/core.py +++ b/src/prefect/server/schemas/core.py @@ -159,6 +159,11 @@ class FlowRun(ORMBaseModel): "The id of the deployment associated with this flow run, if available." ), ) + deployment_version: Optional[str] = Field( + default=None, + description="The version of the deployment associated with this flow run.", + example="1.0", + ) work_queue_name: Optional[str] = Field( default=None, description="The work queue that handled this flow run." ) diff --git a/src/prefect/server/schemas/responses.py b/src/prefect/server/schemas/responses.py index c0e0d58d73a3..f96944fbb17b 100644 --- a/src/prefect/server/schemas/responses.py +++ b/src/prefect/server/schemas/responses.py @@ -183,6 +183,11 @@ class FlowRunResponse(ORMBaseModel): "The id of the deployment associated with this flow run, if available." ), ) + deployment_version: Optional[str] = Field( + default=None, + description="The version of the deployment associated with this flow run.", + example="1.0", + ) work_queue_id: Optional[UUID] = Field( default=None, description="The id of the run's work pool queue." ) diff --git a/tests/fixtures/database.py b/tests/fixtures/database.py index 9c0cb13e110a..91f176d77324 100644 --- a/tests/fixtures/database.py +++ b/tests/fixtures/database.py @@ -454,6 +454,47 @@ def hello(name: str): return deployment +@pytest.fixture +async def deployment_with_version( + session, + flow, + flow_function, + infrastructure_document_id, + storage_document_id, + work_queue_1, # attached to a work pool called the work_pool fixture named "test-work-pool" +): + def hello(name: str): + pass + + deployment = await models.deployments.create_deployment( + session=session, + deployment=schemas.core.Deployment( + name=f"my-deployment-{uuid.uuid4()}", + tags=["test"], + flow_id=flow.id, + schedules=[ + schemas.core.DeploymentSchedule( + schedule=schemas.schedules.IntervalSchedule( + interval=datetime.timedelta(days=1), + anchor_date=pendulum.datetime(2020, 1, 1), + ), + active=True, + ) + ], + storage_document_id=storage_document_id, + path="./subdir", + entrypoint="/file.py:flow", + infrastructure_document_id=infrastructure_document_id, + work_queue_name=work_queue_1.name, + parameter_openapi_schema=parameter_schema(hello), + work_queue_id=work_queue_1.id, + version="1.0", + ), + ) + await session.commit() + return deployment + + @pytest.fixture async def deployment_2( session, diff --git a/tests/server/api/test_deployments.py b/tests/server/api/test_deployments.py index e4caa3f64ef3..5a35d121dac6 100644 --- a/tests/server/api/test_deployments.py +++ b/tests/server/api/test_deployments.py @@ -2528,6 +2528,17 @@ async def test_create_flow_run_from_deployment_with_defaults( ) assert response.json()["work_queue_name"] == deployment.work_queue_name assert response.json()["state_type"] == schemas.states.StateType.SCHEDULED + assert response.json()["deployment_version"] is None + + async def test_create_flow_run_from_deployment_with_deployment_version( + self, deployment_with_version, client + ): + # should use default parameters, tags, and flow runner + response = await client.post( + f"deployments/{deployment_with_version.id}/create_flow_run", json={} + ) + assert response.status_code == 201 + assert response.json()["deployment_version"] == "1.0" async def test_create_flow_run_from_deployment_uses_work_queue_name( self, deployment, client, session diff --git a/tests/server/api/test_flow_runs.py b/tests/server/api/test_flow_runs.py index 1aa1e6641e13..a24a150daaa4 100644 --- a/tests/server/api/test_flow_runs.py +++ b/tests/server/api/test_flow_runs.py @@ -311,6 +311,31 @@ async def test_read_flow_run(self, flow, flow_run, client): assert response.status_code == status.HTTP_200_OK assert response.json()["id"] == str(flow_run.id) assert response.json()["flow_id"] == str(flow.id) + assert response.json()["deployment_version"] is None + + @pytest.fixture + async def flow_run_with_deployment_version(self, flow, session): + flow_run = await models.flow_runs.create_flow_run( + session=session, + flow_run=schemas.core.FlowRun( + flow_id=flow.id, + flow_version="1.0", + deployment_version="Deployment Version 1.0", + state=schemas.states.Pending(), + ), + ) + await session.commit() + return flow_run + + async def test_read_flow_run_with_deployment_version( + self, flow, flow_run_with_deployment_version, client + ): + # make sure we we can read the flow run correctly + response = await client.get(f"/flow_runs/{flow_run_with_deployment_version.id}") + assert response.status_code == status.HTTP_200_OK + assert response.json()["id"] == str(flow_run_with_deployment_version.id) + assert response.json()["flow_id"] == str(flow.id) + assert response.json()["deployment_version"] == "Deployment Version 1.0" async def test_read_flow_run_like_the_engine_does(self, flow, flow_run, client): """Regression test for the hex format of UUIDs in `PREFECT__FLOW_RUN_ID`