From 18115ede392f924aa0d6a23836ffcfbdb4d3627e Mon Sep 17 00:00:00 2001 From: masonmenges Date: Tue, 2 Apr 2024 11:33:36 -0600 Subject: [PATCH 01/10] add deployment version to flow run db model --- src/prefect/server/database/orm_models.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/prefect/server/database/orm_models.py b/src/prefect/server/database/orm_models.py index a3b3bd8802e9..865c39da1627 100644 --- a/src/prefect/server/database/orm_models.py +++ b/src/prefect/server/database/orm_models.py @@ -493,6 +493,7 @@ def flow_id(cls): deployment_id = sa.Column(UUID(), nullable=True) work_queue_name = sa.Column(sa.String, index=True) flow_version = sa.Column(sa.String, index=True) + deployment_version = sa.Column(sa.String, index=True) parameters = sa.Column(JSON, server_default="{}", default=dict, nullable=False) idempotency_key = sa.Column(sa.String) context = sa.Column(JSON, server_default="{}", default=dict, nullable=False) From 6766ce6c5ff8c24db227301e88273be4ae85ad40 Mon Sep 17 00:00:00 2001 From: masonmenges Date: Wed, 3 Apr 2024 09:56:23 -0600 Subject: [PATCH 02/10] add deployment version to flowrun schema --- src/prefect/server/schemas/core.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/prefect/server/schemas/core.py b/src/prefect/server/schemas/core.py index 5b8a2832639d..e4d441221179 100644 --- a/src/prefect/server/schemas/core.py +++ b/src/prefect/server/schemas/core.py @@ -167,6 +167,11 @@ class FlowRun(ORMBaseModel): "The id of the deployment associated with this flow run, if available." ), ) + deployment_version: Optional[str] = Field( + default=None, + description="The version of the deployment associated with this flow run.", + example="1.0", + ) work_queue_name: Optional[str] = Field( default=None, description="The work queue that handled this flow run." ) From 69f1398c3e346fc4a7e8ed6b03cdb34e12b33bf7 Mon Sep 17 00:00:00 2001 From: masonmenges Date: Wed, 3 Apr 2024 09:57:55 -0600 Subject: [PATCH 03/10] add deployment version to flowrunresponses schema --- src/prefect/server/schemas/responses.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/prefect/server/schemas/responses.py b/src/prefect/server/schemas/responses.py index 41e644454c88..23b0baa65652 100644 --- a/src/prefect/server/schemas/responses.py +++ b/src/prefect/server/schemas/responses.py @@ -183,6 +183,11 @@ class FlowRunResponse(ORMBaseModel): "The id of the deployment associated with this flow run, if available." ), ) + deployment_version: Optional[str] = Field( + default=None, + description="The version of the deployment associated with this flow run.", + example="1.0", + ) work_queue_id: Optional[UUID] = Field( default=None, description="The id of the run's work pool queue." ) From e5a1f3d0268eb6ef5ebeee271fcea86b87c6d4e6 Mon Sep 17 00:00:00 2001 From: masonmenges Date: Wed, 3 Apr 2024 10:19:37 -0600 Subject: [PATCH 04/10] reformatting --- tests/server/api/test_flow_runs.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/tests/server/api/test_flow_runs.py b/tests/server/api/test_flow_runs.py index 1aa1e6641e13..a24a150daaa4 100644 --- a/tests/server/api/test_flow_runs.py +++ b/tests/server/api/test_flow_runs.py @@ -311,6 +311,31 @@ async def test_read_flow_run(self, flow, flow_run, client): assert response.status_code == status.HTTP_200_OK assert response.json()["id"] == str(flow_run.id) assert response.json()["flow_id"] == str(flow.id) + assert response.json()["deployment_version"] is None + + @pytest.fixture + async def flow_run_with_deployment_version(self, flow, session): + flow_run = await models.flow_runs.create_flow_run( + session=session, + flow_run=schemas.core.FlowRun( + flow_id=flow.id, + flow_version="1.0", + deployment_version="Deployment Version 1.0", + state=schemas.states.Pending(), + ), + ) + await session.commit() + return flow_run + + async def test_read_flow_run_with_deployment_version( + self, flow, flow_run_with_deployment_version, client + ): + # make sure we we can read the flow run correctly + response = await client.get(f"/flow_runs/{flow_run_with_deployment_version.id}") + assert response.status_code == status.HTTP_200_OK + assert response.json()["id"] == str(flow_run_with_deployment_version.id) + assert response.json()["flow_id"] == str(flow.id) + assert response.json()["deployment_version"] == "Deployment Version 1.0" async def test_read_flow_run_like_the_engine_does(self, flow, flow_run, client): """Regression test for the hex format of UUIDs in `PREFECT__FLOW_RUN_ID` From f2a23ccf4549f2527fa852e4f3e34153b4ed0473 Mon Sep 17 00:00:00 2001 From: masonmenges Date: Wed, 3 Apr 2024 10:21:47 -0600 Subject: [PATCH 05/10] add db migrations for sqlite and postgres --- ...9f03_add_deployment_version_to_flow_run.py | 34 +++++++++++++++++++ ...5a08_add_deployment_version_to_flow_run.py | 32 +++++++++++++++++ 2 files changed, 66 insertions(+) create mode 100644 src/prefect/server/database/migrations/versions/postgresql/2024_04_03_094418_bd6efa529f03_add_deployment_version_to_flow_run.py create mode 100644 src/prefect/server/database/migrations/versions/sqlite/2024_04_02_114538_8644a9595a08_add_deployment_version_to_flow_run.py diff --git a/src/prefect/server/database/migrations/versions/postgresql/2024_04_03_094418_bd6efa529f03_add_deployment_version_to_flow_run.py b/src/prefect/server/database/migrations/versions/postgresql/2024_04_03_094418_bd6efa529f03_add_deployment_version_to_flow_run.py new file mode 100644 index 000000000000..594e50a68569 --- /dev/null +++ b/src/prefect/server/database/migrations/versions/postgresql/2024_04_03_094418_bd6efa529f03_add_deployment_version_to_flow_run.py @@ -0,0 +1,34 @@ +"""add_deployment_version_to_flow_run + +Revision ID: bd6efa529f03 +Revises: 7a653837d9ba +Create Date: 2024-04-03 09:44:18.666688 + +""" +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "bd6efa529f03" +down_revision = "7a653837d9ba" +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column( + "flow_run", sa.Column("deployment_version", sa.String(), nullable=True) + ) + op.create_index( + op.f("ix_flow_run__deployment_version"), + "flow_run", + ["deployment_version"], + unique=False, + ) + # ### end Alembic commands ### + + +def downgrade(): + op.drop_index(op.f("ix_flow_run__deployment_version"), table_name="flow_run") + op.drop_column("flow_run", "deployment_version") + # ### end Alembic commands ### diff --git a/src/prefect/server/database/migrations/versions/sqlite/2024_04_02_114538_8644a9595a08_add_deployment_version_to_flow_run.py b/src/prefect/server/database/migrations/versions/sqlite/2024_04_02_114538_8644a9595a08_add_deployment_version_to_flow_run.py new file mode 100644 index 000000000000..ae88f38352b2 --- /dev/null +++ b/src/prefect/server/database/migrations/versions/sqlite/2024_04_02_114538_8644a9595a08_add_deployment_version_to_flow_run.py @@ -0,0 +1,32 @@ +"""add_deployment_version_to_flow_run + +Revision ID: 8644a9595a08 +Revises: bacc60edce16 +Create Date: 2024-04-02 11:45:38.210743 + +""" +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "8644a9595a08" +down_revision = "bacc60edce16" +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column( + "flow_run", sa.Column("deployment_version", sa.String(), nullable=True) + ) + op.create_index( + op.f("ix_flow_run__deployment_version"), + "flow_run", + ["deployment_version"], + unique=False, + ) + + +def downgrade(): + op.drop_index(op.f("ix_flow_run__deployment_version"), table_name="flow_run") + op.drop_column("flow_run", "deployment_version") From 900f7376aa332dfe368db3ca9e649d8f52a89970 Mon Sep 17 00:00:00 2001 From: masonmenges Date: Thu, 4 Apr 2024 09:28:53 -0600 Subject: [PATCH 06/10] add deployment version to runs from a deployment --- src/prefect/server/api/deployments.py | 1 + src/prefect/server/models/deployments.py | 1 + 2 files changed, 2 insertions(+) diff --git a/src/prefect/server/api/deployments.py b/src/prefect/server/api/deployments.py index 47b34b1a0a20..050660c43961 100644 --- a/src/prefect/server/api/deployments.py +++ b/src/prefect/server/api/deployments.py @@ -672,6 +672,7 @@ async def create_flow_run_from_deployment( ), flow_id=deployment.flow_id, deployment_id=deployment.id, + deployment_version=deployment.version, parameters=parameters, tags=set(deployment.tags).union(flow_run.tags), infrastructure_document_id=( diff --git a/src/prefect/server/models/deployments.py b/src/prefect/server/models/deployments.py index abc0748f752a..2abd3c1b0b43 100644 --- a/src/prefect/server/models/deployments.py +++ b/src/prefect/server/models/deployments.py @@ -619,6 +619,7 @@ async def _generate_scheduled_flow_runs( "id": uuid4(), "flow_id": deployment.flow_id, "deployment_id": deployment_id, + "deplyment_version": deployment.version, "work_queue_name": deployment.work_queue_name, "work_queue_id": deployment.work_queue_id, "parameters": deployment.parameters, From f80fe47dc9cc24636a59e0bfac377bc3bd58795d Mon Sep 17 00:00:00 2001 From: masonmenges Date: Thu, 4 Apr 2024 09:47:22 -0600 Subject: [PATCH 07/10] add test creating flow runs from deployment api --- tests/fixtures/database.py | 41 ++++++++++++++++++++++++++++ tests/server/api/test_deployments.py | 11 ++++++++ 2 files changed, 52 insertions(+) diff --git a/tests/fixtures/database.py b/tests/fixtures/database.py index 9c0cb13e110a..91f176d77324 100644 --- a/tests/fixtures/database.py +++ b/tests/fixtures/database.py @@ -454,6 +454,47 @@ def hello(name: str): return deployment +@pytest.fixture +async def deployment_with_version( + session, + flow, + flow_function, + infrastructure_document_id, + storage_document_id, + work_queue_1, # attached to a work pool called the work_pool fixture named "test-work-pool" +): + def hello(name: str): + pass + + deployment = await models.deployments.create_deployment( + session=session, + deployment=schemas.core.Deployment( + name=f"my-deployment-{uuid.uuid4()}", + tags=["test"], + flow_id=flow.id, + schedules=[ + schemas.core.DeploymentSchedule( + schedule=schemas.schedules.IntervalSchedule( + interval=datetime.timedelta(days=1), + anchor_date=pendulum.datetime(2020, 1, 1), + ), + active=True, + ) + ], + storage_document_id=storage_document_id, + path="./subdir", + entrypoint="/file.py:flow", + infrastructure_document_id=infrastructure_document_id, + work_queue_name=work_queue_1.name, + parameter_openapi_schema=parameter_schema(hello), + work_queue_id=work_queue_1.id, + version="1.0", + ), + ) + await session.commit() + return deployment + + @pytest.fixture async def deployment_2( session, diff --git a/tests/server/api/test_deployments.py b/tests/server/api/test_deployments.py index e4caa3f64ef3..5a35d121dac6 100644 --- a/tests/server/api/test_deployments.py +++ b/tests/server/api/test_deployments.py @@ -2528,6 +2528,17 @@ async def test_create_flow_run_from_deployment_with_defaults( ) assert response.json()["work_queue_name"] == deployment.work_queue_name assert response.json()["state_type"] == schemas.states.StateType.SCHEDULED + assert response.json()["deployment_version"] is None + + async def test_create_flow_run_from_deployment_with_deployment_version( + self, deployment_with_version, client + ): + # should use default parameters, tags, and flow runner + response = await client.post( + f"deployments/{deployment_with_version.id}/create_flow_run", json={} + ) + assert response.status_code == 201 + assert response.json()["deployment_version"] == "1.0" async def test_create_flow_run_from_deployment_uses_work_queue_name( self, deployment, client, session From 04a09c54aeb9e04e17d500c90b9a018a41a7ee76 Mon Sep 17 00:00:00 2001 From: masonmenges Date: Thu, 4 Apr 2024 10:24:47 -0600 Subject: [PATCH 08/10] add deployment version to client for flow runs --- src/prefect/client/schemas/objects.py | 5 +++++ src/prefect/client/schemas/responses.py | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/src/prefect/client/schemas/objects.py b/src/prefect/client/schemas/objects.py index c83b82b62df0..30221a152594 100644 --- a/src/prefect/client/schemas/objects.py +++ b/src/prefect/client/schemas/objects.py @@ -424,6 +424,11 @@ class FlowRun(ObjectBaseModel): "The id of the deployment associated with this flow run, if available." ), ) + deployment_version: Optional[str] = Field( + default=None, + description="The version of the deployment associated with this flow run.", + example="1.0", + ) work_queue_name: Optional[str] = Field( default=None, description="The work queue that handled this flow run." ) diff --git a/src/prefect/client/schemas/responses.py b/src/prefect/client/schemas/responses.py index 88bdf76146c9..92efa32d7835 100644 --- a/src/prefect/client/schemas/responses.py +++ b/src/prefect/client/schemas/responses.py @@ -172,6 +172,11 @@ class FlowRunResponse(ObjectBaseModel): "The id of the deployment associated with this flow run, if available." ), ) + deployment_version: Optional[str] = Field( + default=None, + description="The version of the deployment associated with this flow run.", + example="1.0", + ) work_queue_name: Optional[str] = Field( default=None, description="The work queue that handled this flow run." ) From e0455b993536a46436a212cb924156d4dc8d1cea Mon Sep 17 00:00:00 2001 From: masonmenges Date: Thu, 4 Apr 2024 12:10:58 -0600 Subject: [PATCH 09/10] fix revision order --- ..._04_094418_bd6efa529f03_add_deployment_version_to_flow_run.py} | 0 ..._04_114538_8644a9595a08_add_deployment_version_to_flow_run.py} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename src/prefect/server/database/migrations/versions/postgresql/{2024_04_03_094418_bd6efa529f03_add_deployment_version_to_flow_run.py => 2024_04_04_094418_bd6efa529f03_add_deployment_version_to_flow_run.py} (100%) rename src/prefect/server/database/migrations/versions/sqlite/{2024_04_02_114538_8644a9595a08_add_deployment_version_to_flow_run.py => 2024_04_04_114538_8644a9595a08_add_deployment_version_to_flow_run.py} (100%) diff --git a/src/prefect/server/database/migrations/versions/postgresql/2024_04_03_094418_bd6efa529f03_add_deployment_version_to_flow_run.py b/src/prefect/server/database/migrations/versions/postgresql/2024_04_04_094418_bd6efa529f03_add_deployment_version_to_flow_run.py similarity index 100% rename from src/prefect/server/database/migrations/versions/postgresql/2024_04_03_094418_bd6efa529f03_add_deployment_version_to_flow_run.py rename to src/prefect/server/database/migrations/versions/postgresql/2024_04_04_094418_bd6efa529f03_add_deployment_version_to_flow_run.py diff --git a/src/prefect/server/database/migrations/versions/sqlite/2024_04_02_114538_8644a9595a08_add_deployment_version_to_flow_run.py b/src/prefect/server/database/migrations/versions/sqlite/2024_04_04_114538_8644a9595a08_add_deployment_version_to_flow_run.py similarity index 100% rename from src/prefect/server/database/migrations/versions/sqlite/2024_04_02_114538_8644a9595a08_add_deployment_version_to_flow_run.py rename to src/prefect/server/database/migrations/versions/sqlite/2024_04_04_114538_8644a9595a08_add_deployment_version_to_flow_run.py From 4d0ed19f4bde219e484e1d5e023f2e67dabc5bc2 Mon Sep 17 00:00:00 2001 From: masonmenges Date: Thu, 4 Apr 2024 12:24:58 -0600 Subject: [PATCH 10/10] fix revision order2 --- ..._094418_bd6efa529f03_add_deployment_version_to_flow_run.py | 4 ++-- ..._114538_8644a9595a08_add_deployment_version_to_flow_run.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/prefect/server/database/migrations/versions/postgresql/2024_04_04_094418_bd6efa529f03_add_deployment_version_to_flow_run.py b/src/prefect/server/database/migrations/versions/postgresql/2024_04_04_094418_bd6efa529f03_add_deployment_version_to_flow_run.py index 594e50a68569..e22327e72942 100644 --- a/src/prefect/server/database/migrations/versions/postgresql/2024_04_04_094418_bd6efa529f03_add_deployment_version_to_flow_run.py +++ b/src/prefect/server/database/migrations/versions/postgresql/2024_04_04_094418_bd6efa529f03_add_deployment_version_to_flow_run.py @@ -1,7 +1,7 @@ """add_deployment_version_to_flow_run Revision ID: bd6efa529f03 -Revises: 7a653837d9ba +Revises: aeea5ee6f070 Create Date: 2024-04-03 09:44:18.666688 """ @@ -10,7 +10,7 @@ # revision identifiers, used by Alembic. revision = "bd6efa529f03" -down_revision = "7a653837d9ba" +down_revision = "aeea5ee6f070" branch_labels = None depends_on = None diff --git a/src/prefect/server/database/migrations/versions/sqlite/2024_04_04_114538_8644a9595a08_add_deployment_version_to_flow_run.py b/src/prefect/server/database/migrations/versions/sqlite/2024_04_04_114538_8644a9595a08_add_deployment_version_to_flow_run.py index ae88f38352b2..eef9ae233930 100644 --- a/src/prefect/server/database/migrations/versions/sqlite/2024_04_04_114538_8644a9595a08_add_deployment_version_to_flow_run.py +++ b/src/prefect/server/database/migrations/versions/sqlite/2024_04_04_114538_8644a9595a08_add_deployment_version_to_flow_run.py @@ -1,7 +1,7 @@ """add_deployment_version_to_flow_run Revision ID: 8644a9595a08 -Revises: bacc60edce16 +Revises: 07ed05dfd4ec Create Date: 2024-04-02 11:45:38.210743 """ @@ -10,7 +10,7 @@ # revision identifiers, used by Alembic. revision = "8644a9595a08" -down_revision = "bacc60edce16" +down_revision = "07ed05dfd4ec" branch_labels = None depends_on = None