Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds Deployment Version to the Flow Run object #12591

Merged
merged 11 commits into from
Apr 9, 2024
5 changes: 5 additions & 0 deletions src/prefect/client/schemas/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,11 @@ class FlowRun(ObjectBaseModel):
"The id of the deployment associated with this flow run, if available."
),
)
deployment_version: Optional[str] = Field(
default=None,
description="The version of the deployment associated with this flow run.",
example="1.0",
)
work_queue_name: Optional[str] = Field(
default=None, description="The work queue that handled this flow run."
)
Expand Down
5 changes: 5 additions & 0 deletions src/prefect/client/schemas/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,11 @@ class FlowRunResponse(ObjectBaseModel):
"The id of the deployment associated with this flow run, if available."
),
)
deployment_version: Optional[str] = Field(
default=None,
description="The version of the deployment associated with this flow run.",
example="1.0",
)
work_queue_name: Optional[str] = Field(
default=None, description="The work queue that handled this flow run."
)
Expand Down
1 change: 1 addition & 0 deletions src/prefect/server/api/deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,7 @@ async def create_flow_run_from_deployment(
),
flow_id=deployment.flow_id,
deployment_id=deployment.id,
deployment_version=deployment.version,
parameters=parameters,
tags=set(deployment.tags).union(flow_run.tags),
infrastructure_document_id=(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""add_deployment_version_to_flow_run

Revision ID: bd6efa529f03
Revises: aeea5ee6f070
Create Date: 2024-04-03 09:44:18.666688

"""
import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "bd6efa529f03"
down_revision = "aeea5ee6f070"
branch_labels = None
depends_on = None


def upgrade():
op.add_column(
"flow_run", sa.Column("deployment_version", sa.String(), nullable=True)
)
op.create_index(
op.f("ix_flow_run__deployment_version"),
"flow_run",
["deployment_version"],
unique=False,
)
# ### end Alembic commands ###


def downgrade():
op.drop_index(op.f("ix_flow_run__deployment_version"), table_name="flow_run")
op.drop_column("flow_run", "deployment_version")
# ### end Alembic commands ###
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""add_deployment_version_to_flow_run

Revision ID: 8644a9595a08
Revises: 07ed05dfd4ec
Create Date: 2024-04-02 11:45:38.210743

"""
import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "8644a9595a08"
down_revision = "07ed05dfd4ec"
branch_labels = None
depends_on = None


def upgrade():
op.add_column(
"flow_run", sa.Column("deployment_version", sa.String(), nullable=True)
)
op.create_index(
op.f("ix_flow_run__deployment_version"),
"flow_run",
["deployment_version"],
unique=False,
)


def downgrade():
op.drop_index(op.f("ix_flow_run__deployment_version"), table_name="flow_run")
op.drop_column("flow_run", "deployment_version")
1 change: 1 addition & 0 deletions src/prefect/server/database/orm_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,7 @@ def flow_id(cls):
deployment_id = sa.Column(UUID(), nullable=True)
work_queue_name = sa.Column(sa.String, index=True)
flow_version = sa.Column(sa.String, index=True)
deployment_version = sa.Column(sa.String, index=True)
parameters = sa.Column(JSON, server_default="{}", default=dict, nullable=False)
idempotency_key = sa.Column(sa.String)
context = sa.Column(JSON, server_default="{}", default=dict, nullable=False)
Expand Down
1 change: 1 addition & 0 deletions src/prefect/server/models/deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,7 @@ async def _generate_scheduled_flow_runs(
"id": uuid4(),
"flow_id": deployment.flow_id,
"deployment_id": deployment_id,
"deplyment_version": deployment.version,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

deplyment_version is this an issue?

Copy link
Contributor Author

@masonmenges masonmenges Apr 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@meggers

deplyment_version is this an issue?

It shouldn't be no, but for clarification are you just referring to the naming convention?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for my lack of clarity, was referring to the misspelling (missing o in deployment)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yep thanks for flagging that I'll get it updated 😁

"work_queue_name": deployment.work_queue_name,
"work_queue_id": deployment.work_queue_id,
"parameters": deployment.parameters,
Expand Down
5 changes: 5 additions & 0 deletions src/prefect/server/schemas/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ class FlowRun(ORMBaseModel):
"The id of the deployment associated with this flow run, if available."
),
)
deployment_version: Optional[str] = Field(
default=None,
description="The version of the deployment associated with this flow run.",
example="1.0",
)
work_queue_name: Optional[str] = Field(
default=None, description="The work queue that handled this flow run."
)
Expand Down
5 changes: 5 additions & 0 deletions src/prefect/server/schemas/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,11 @@ class FlowRunResponse(ORMBaseModel):
"The id of the deployment associated with this flow run, if available."
),
)
deployment_version: Optional[str] = Field(
default=None,
description="The version of the deployment associated with this flow run.",
example="1.0",
)
work_queue_id: Optional[UUID] = Field(
default=None, description="The id of the run's work pool queue."
)
Expand Down
41 changes: 41 additions & 0 deletions tests/fixtures/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,47 @@ def hello(name: str):
return deployment


@pytest.fixture
async def deployment_with_version(
session,
flow,
flow_function,
infrastructure_document_id,
storage_document_id,
work_queue_1, # attached to a work pool called the work_pool fixture named "test-work-pool"
):
def hello(name: str):
pass

deployment = await models.deployments.create_deployment(
session=session,
deployment=schemas.core.Deployment(
name=f"my-deployment-{uuid.uuid4()}",
tags=["test"],
flow_id=flow.id,
schedules=[
schemas.core.DeploymentSchedule(
schedule=schemas.schedules.IntervalSchedule(
interval=datetime.timedelta(days=1),
anchor_date=pendulum.datetime(2020, 1, 1),
),
active=True,
)
],
storage_document_id=storage_document_id,
path="./subdir",
entrypoint="/file.py:flow",
infrastructure_document_id=infrastructure_document_id,
work_queue_name=work_queue_1.name,
parameter_openapi_schema=parameter_schema(hello),
work_queue_id=work_queue_1.id,
version="1.0",
),
)
await session.commit()
return deployment


@pytest.fixture
async def deployment_2(
session,
Expand Down
11 changes: 11 additions & 0 deletions tests/server/api/test_deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -2528,6 +2528,17 @@ async def test_create_flow_run_from_deployment_with_defaults(
)
assert response.json()["work_queue_name"] == deployment.work_queue_name
assert response.json()["state_type"] == schemas.states.StateType.SCHEDULED
assert response.json()["deployment_version"] is None

async def test_create_flow_run_from_deployment_with_deployment_version(
self, deployment_with_version, client
):
# should use default parameters, tags, and flow runner
response = await client.post(
f"deployments/{deployment_with_version.id}/create_flow_run", json={}
)
assert response.status_code == 201
assert response.json()["deployment_version"] == "1.0"

async def test_create_flow_run_from_deployment_uses_work_queue_name(
self, deployment, client, session
Expand Down
25 changes: 25 additions & 0 deletions tests/server/api/test_flow_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,31 @@ async def test_read_flow_run(self, flow, flow_run, client):
assert response.status_code == status.HTTP_200_OK
assert response.json()["id"] == str(flow_run.id)
assert response.json()["flow_id"] == str(flow.id)
assert response.json()["deployment_version"] is None

@pytest.fixture
async def flow_run_with_deployment_version(self, flow, session):
flow_run = await models.flow_runs.create_flow_run(
session=session,
flow_run=schemas.core.FlowRun(
flow_id=flow.id,
flow_version="1.0",
deployment_version="Deployment Version 1.0",
state=schemas.states.Pending(),
),
)
await session.commit()
return flow_run

async def test_read_flow_run_with_deployment_version(
self, flow, flow_run_with_deployment_version, client
):
# make sure we we can read the flow run correctly
response = await client.get(f"/flow_runs/{flow_run_with_deployment_version.id}")
assert response.status_code == status.HTTP_200_OK
assert response.json()["id"] == str(flow_run_with_deployment_version.id)
assert response.json()["flow_id"] == str(flow.id)
assert response.json()["deployment_version"] == "Deployment Version 1.0"

async def test_read_flow_run_like_the_engine_does(self, flow, flow_run, client):
"""Regression test for the hex format of UUIDs in `PREFECT__FLOW_RUN_ID`
Expand Down
Loading