Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 105 additions & 1 deletion datajunction-server/tests/api/deployments_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from datajunction_server.internal.git.github_service import GitHubServiceError
from datajunction_server.api.deployments import InProcessExecutor
from datajunction_server.models.dimensionlink import JoinType
from datajunction_server.database.node import Node
from datajunction_server.database.node import Node, NodeRelationship
from datajunction_server.database.tag import Tag
from datajunction_server.models.node import (
MetricDirection,
Expand Down Expand Up @@ -3997,3 +3997,107 @@ def test_falls_back_to_current_user_when_no_source(self):
spec = DeploymentSpec(namespace="test")
orchestrator = self._make_orchestrator(spec, username="admin")
assert orchestrator._history_user == "admin"


class TestDeploymentRevalidation:
"""
Deployment write path: _create_node_revision sets NodeRelationship rows
from node_graph, ensuring each new revision has correct parent links.
"""

@pytest.mark.asyncio
async def test_redeploy_creates_new_revision_with_correct_parents(
self,
client,
session,
):
"""
Force-redeploying a spec creates new node revisions via _create_node_revision,
which derives parents from node_graph. Verify the new revision has the
correct NodeRelationship row regardless of any corruption on the old revision.
"""
from sqlalchemy import select, delete

namespace = "revalidate_parents_test"

source = SourceSpec(
name="default.parts",
catalog="default",
schema="shop",
table="parts",
columns=[
ColumnSpec(name="part_id", type="int"),
ColumnSpec(name="name", type="string"),
],
)
transform = TransformSpec(
name="default.parts_enriched",
query="SELECT part_id, name FROM ${prefix}default.parts",
columns=[
ColumnSpec(name="part_id", type="int"),
ColumnSpec(name="name", type="string"),
],
)
spec = DeploymentSpec(namespace=namespace, nodes=[source, transform])

# Initial deployment
data = await deploy_and_wait(client, spec)
assert data["status"] == DeploymentStatus.SUCCESS.value

# Fetch the deployed transform and verify it has a parent
transform_name = f"{namespace}.default.parts_enriched"
from sqlalchemy.orm import joinedload

transform_node = (
await session.execute(
select(Node)
.where(Node.name == transform_name)
.options(joinedload(Node.current)),
)
).scalar_one()
original_rev_id = transform_node.current.id
original_rows = (
await session.execute(
select(NodeRelationship).where(
NodeRelationship.child_id == original_rev_id,
),
)
).all()
assert len(original_rows) == 1, "transform should have one parent after deploy"

# Corrupt: delete the NodeRelationship rows
await session.execute(
delete(NodeRelationship).where(
NodeRelationship.child_id == original_rev_id,
),
)
await session.commit()
assert (
await session.execute(
select(NodeRelationship).where(
NodeRelationship.child_id == original_rev_id,
),
)
).all() == []

# Re-deploy with force=True so even unchanged nodes get new revisions
# with correct parent links set by _create_node_revision.
force_spec = DeploymentSpec(
namespace=namespace,
nodes=[source, transform],
force=True,
)
data = await deploy_and_wait(client, force_spec)
assert data["status"] == DeploymentStatus.SUCCESS.value

# Re-fetch to get the new current revision created by re-deployment
await session.refresh(transform_node, ["current"])
new_rev_id = transform_node.current.id
restored_rows = (
await session.execute(
select(NodeRelationship).where(NodeRelationship.child_id == new_rev_id),
)
).all()
assert len(restored_rows) == 1, (
"_create_node_revision should have written correct parent relationships"
)
Loading